001/*
002 * Copyright (c) 2016-2017 Chris K Wensel <chris@wensel.net>. All Rights Reserved.
003 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved.
004 *
005 * Project and contact information: http://www.cascading.org/
006 *
007 * This file is part of the Cascading project.
008 *
009 * Licensed under the Apache License, Version 2.0 (the "License");
010 * you may not use this file except in compliance with the License.
011 * You may obtain a copy of the License at
012 *
013 *     http://www.apache.org/licenses/LICENSE-2.0
014 *
015 * Unless required by applicable law or agreed to in writing, software
016 * distributed under the License is distributed on an "AS IS" BASIS,
017 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
018 * See the License for the specific language governing permissions and
019 * limitations under the License.
020 */
021
022package cascading.flow.planner.process;
023
024import java.io.Serializable;
025import java.util.Collections;
026import java.util.HashMap;
027import java.util.Map;
028import java.util.Set;
029import java.util.TreeSet;
030
031import cascading.flow.FlowElement;
032import cascading.flow.FlowElements;
033import cascading.flow.planner.Scope;
034import cascading.flow.planner.graph.AnnotatedGraph;
035import cascading.flow.planner.graph.ElementGraph;
036import cascading.tuple.Fields;
037import cascading.util.Util;
038
039/**
040 *
041 */
042public class ProcessEdge<Process extends ProcessModel> implements Serializable
043  {
044  String id;
045  String sourceProcessID;
046  String sinkProcessID;
047  FlowElement flowElement;
048
049  Set<Integer> sourceProvidedOrdinals;
050  Set<Integer> sinkExpectedOrdinals;
051
052  Map<Integer, Fields> resolvedKeyFields;
053  Map<Integer, Fields> resolvedSortFields;
054  Map<Integer, Fields> resolvedValueFields;
055
056  Set<Enum> sinkAnnotations = Collections.emptySet();
057  Set<Enum> sourceAnnotations = Collections.emptySet();
058
059  Map<String, String> edgeAnnotations;
060
061  public ProcessEdge( Process sourceProcess, FlowElement flowElement, Process sinkProcess )
062    {
063    this( sourceProcess.getElementGraph(), flowElement, sinkProcess.getElementGraph() );
064
065    this.sourceProcessID = sourceProcess.getID();
066    this.sinkProcessID = sinkProcess.getID();
067    }
068
069  public ProcessEdge( ElementGraph sourceElementGraph, FlowElement flowElement, ElementGraph sinkElementGraph )
070    {
071    this.flowElement = flowElement;
072
073    // for both set of ordinals, we only care about the edges entering the flowElement
074    // the source may only provide a subset of the paths expected by the sink
075
076    // all ordinals the source process provides
077    this.sourceProvidedOrdinals = createOrdinals( sourceElementGraph.incomingEdgesOf( flowElement ) );
078
079    // all ordinals the sink process expects
080    this.sinkExpectedOrdinals = createOrdinals( sinkElementGraph.incomingEdgesOf( flowElement ) );
081
082    setResolvedFields( sourceElementGraph, flowElement, sinkElementGraph );
083
084    if( sourceElementGraph instanceof AnnotatedGraph && ( (AnnotatedGraph) sourceElementGraph ).hasAnnotations() )
085      this.sourceAnnotations = ( (AnnotatedGraph) sourceElementGraph ).getAnnotations().getKeysFor( flowElement );
086
087    if( sinkElementGraph instanceof AnnotatedGraph && ( (AnnotatedGraph) sinkElementGraph ).hasAnnotations() )
088      this.sinkAnnotations = ( (AnnotatedGraph) sinkElementGraph ).getAnnotations().getKeysFor( flowElement );
089    }
090
091  public ProcessEdge( Process sourceProcess, Process sinkProcess )
092    {
093    this.sourceProcessID = sourceProcess.getID();
094    this.sinkProcessID = sinkProcess.getID();
095    }
096
097  public String getID()
098    {
099    if( id == null ) // make it lazy
100      id = Util.createUniqueID();
101
102    return id;
103    }
104
105  public String getSourceProcessID()
106    {
107    return sourceProcessID;
108    }
109
110  public String getSinkProcessID()
111    {
112    return sinkProcessID;
113    }
114
115  /**
116   * Since this is an edge, we must declare the key and value fields used to connect two models
117   * <p>
118   * In most cases, the data being streamed is segmented into key and value pairs depending on the
119   * type of communication model used to move the data.
120   * <p>
121   * under partitioning models, the keys are hashed, and binned into a partition.
122   * <p>
123   * where data is simply forwarded, all data must be put into the key, and value declared to be empty
124   */
125  private void setResolvedFields( ElementGraph sourceElementGraph, FlowElement flowElement, ElementGraph sinkElementGraph )
126    {
127    Set<Scope> outgoingScopes = sourceElementGraph.outgoingEdgesOf( flowElement ); // resolved fields
128    Scope resolvedScope = Util.getFirst( outgoingScopes ); // only need first
129
130    Set<Scope> incomingScopes = sinkElementGraph.incomingEdgesOf( flowElement );
131
132    resolvedKeyFields = new HashMap<>();
133    resolvedSortFields = new HashMap<>();
134    resolvedValueFields = new HashMap<>();
135
136    for( Scope incomingScope : incomingScopes )
137      {
138      int ordinal = incomingScope.getOrdinal();
139
140      // no joining or merging happening
141      if( resolvedScope.getKeySelectors() == null )
142        {
143        resolvedKeyFields.put( ordinal, incomingScope.getIncomingSpliceFields() );
144        }
145      else if( resolvedScope.getKeySelectors() != null )
146        {
147        Fields value = resolvedScope.getKeySelectors().get( incomingScope.getName() );
148
149        if( value != null )
150          resolvedKeyFields.put( ordinal, value );
151        }
152
153      if( resolvedScope.getSortingSelectors() != null )
154        {
155        Fields value = resolvedScope.getSortingSelectors().get( incomingScope.getName() );
156
157        if( value != null )
158          resolvedSortFields.put( ordinal, value );
159        }
160
161      if( resolvedScope.getKeySelectors() != null )
162        resolvedValueFields.put( ordinal, incomingScope.getIncomingSpliceFields() );
163      }
164    }
165
166  /**
167   * Returns any edge annotations, or an empty immutable Map.
168   * <p>
169   * Use {@link #addEdgeAnnotation(String, String)} to add edge annotations.
170   *
171   * @return
172   */
173  public Map<String, String> getEdgeAnnotations()
174    {
175    if( edgeAnnotations == null )
176      return Collections.emptyMap();
177
178    return Collections.unmodifiableMap( edgeAnnotations );
179    }
180
181  public void addEdgeAnnotation( Enum annotation )
182    {
183    if( annotation == null )
184      return;
185
186    addEdgeAnnotation( annotation.getDeclaringClass().getName(), annotation.name() );
187    }
188
189  public void addEdgeAnnotation( String key, String value )
190    {
191    if( edgeAnnotations == null )
192      edgeAnnotations = new HashMap<>();
193
194    edgeAnnotations.put( key, value );
195    }
196
197  private Set<Integer> createOrdinals( Set<Scope> scopes )
198    {
199    Set<Integer> ordinals = new TreeSet<>();
200
201    for( Scope scope : scopes )
202      ordinals.add( scope.getOrdinal() );
203
204    return ordinals;
205    }
206
207  public FlowElement getFlowElement()
208    {
209    return flowElement;
210    }
211
212  public String getFlowElementID()
213    {
214    return FlowElements.id( flowElement );
215    }
216
217  public Set<Integer> getSinkExpectedOrdinals()
218    {
219    return sinkExpectedOrdinals;
220    }
221
222  public Set<Integer> getSourceProvidedOrdinals()
223    {
224    return sourceProvidedOrdinals;
225    }
226
227  public Map<Integer, Fields> getResolvedKeyFields()
228    {
229    return resolvedKeyFields;
230    }
231
232  public Map<Integer, Fields> getResolvedSortFields()
233    {
234    return resolvedSortFields;
235    }
236
237  public Map<Integer, Fields> getResolvedValueFields()
238    {
239    return resolvedValueFields;
240    }
241
242  public Set<Enum> getSinkAnnotations()
243    {
244    return sinkAnnotations;
245    }
246
247  public Set<Enum> getSourceAnnotations()
248    {
249    return sourceAnnotations;
250    }
251  }