001/*
002 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved.
003 *
004 * Project and contact information: http://www.cascading.org/
005 *
006 * This file is part of the Cascading project.
007 *
008 * Licensed under the Apache License, Version 2.0 (the "License");
009 * you may not use this file except in compliance with the License.
010 * You may obtain a copy of the License at
011 *
012 *     http://www.apache.org/licenses/LICENSE-2.0
013 *
014 * Unless required by applicable law or agreed to in writing, software
015 * distributed under the License is distributed on an "AS IS" BASIS,
016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017 * See the License for the specific language governing permissions and
018 * limitations under the License.
019 */
020
021package cascading.flow.planner.process;
022
023import java.io.Serializable;
024import java.util.Collections;
025import java.util.HashMap;
026import java.util.Map;
027import java.util.Set;
028import java.util.TreeSet;
029
030import cascading.flow.FlowElement;
031import cascading.flow.FlowElements;
032import cascading.flow.planner.Scope;
033import cascading.flow.planner.graph.AnnotatedGraph;
034import cascading.flow.planner.graph.ElementGraph;
035import cascading.tuple.Fields;
036import cascading.util.Util;
037
038/**
039 *
040 */
041public class ProcessEdge<Process extends ProcessModel> implements Serializable
042  {
043  String id;
044  String sourceProcessID;
045  String sinkProcessID;
046  FlowElement flowElement;
047
048  Set<Integer> sourceProvidedOrdinals;
049  Set<Integer> sinkExpectedOrdinals;
050
051  Map<Integer, Fields> resolvedKeyFields;
052  Map<Integer, Fields> resolvedSortFields;
053  Map<Integer, Fields> resolvedValueFields;
054
055  Set<Enum> sinkAnnotations = Collections.emptySet();
056  Set<Enum> sourceAnnotations = Collections.emptySet();
057
058  Map<String, String> edgeAnnotations;
059
060  public ProcessEdge( Process sourceProcess, FlowElement flowElement, Process sinkProcess )
061    {
062    this( sourceProcess.getElementGraph(), flowElement, sinkProcess.getElementGraph() );
063
064    this.sourceProcessID = sourceProcess.getID();
065    this.sinkProcessID = sinkProcess.getID();
066    }
067
068  public ProcessEdge( ElementGraph sourceElementGraph, FlowElement flowElement, ElementGraph sinkElementGraph )
069    {
070    this.flowElement = flowElement;
071
072    // for both set of ordinals, we only care about the edges entering the flowElement
073    // the source may only provide a subset of the paths expected by the sink
074
075    // all ordinals the source process provides
076    this.sourceProvidedOrdinals = createOrdinals( sourceElementGraph.incomingEdgesOf( flowElement ) );
077
078    // all ordinals the sink process expects
079    this.sinkExpectedOrdinals = createOrdinals( sinkElementGraph.incomingEdgesOf( flowElement ) );
080
081    setResolvedFields( sourceElementGraph, flowElement, sinkElementGraph );
082
083    if( sourceElementGraph instanceof AnnotatedGraph && ( (AnnotatedGraph) sourceElementGraph ).hasAnnotations() )
084      this.sourceAnnotations = ( (AnnotatedGraph) sourceElementGraph ).getAnnotations().getKeysFor( flowElement );
085
086    if( sinkElementGraph instanceof AnnotatedGraph && ( (AnnotatedGraph) sinkElementGraph ).hasAnnotations() )
087      this.sinkAnnotations = ( (AnnotatedGraph) sinkElementGraph ).getAnnotations().getKeysFor( flowElement );
088    }
089
090  public ProcessEdge( Process sourceProcess, Process sinkProcess )
091    {
092    this.sourceProcessID = sourceProcess.getID();
093    this.sinkProcessID = sinkProcess.getID();
094    }
095
096  public String getID()
097    {
098    if( id == null ) // make it lazy
099      id = Util.createUniqueID();
100
101    return id;
102    }
103
104  public String getSourceProcessID()
105    {
106    return sourceProcessID;
107    }
108
109  public String getSinkProcessID()
110    {
111    return sinkProcessID;
112    }
113
114  /**
115   * Since this is an edge, we must declare the key and value fields used to connect two models
116   * <p/>
117   * In most cases, the data being streamed is segmented into key and value pairs depending on the
118   * type of communication model used to move the data.
119   * <p/>
120   * under partitioning models, the keys are hashed, and binned into a partition.
121   * <p/>
122   * where data is simply forwarded, all data must be put into the key, and value declared to be empty
123   */
124  private void setResolvedFields( ElementGraph sourceElementGraph, FlowElement flowElement, ElementGraph sinkElementGraph )
125    {
126    Set<Scope> outgoingScopes = sourceElementGraph.outgoingEdgesOf( flowElement ); // resolved fields
127    Scope resolvedScope = Util.getFirst( outgoingScopes ); // only need first
128
129    Set<Scope> incomingScopes = sinkElementGraph.incomingEdgesOf( flowElement );
130
131    resolvedKeyFields = new HashMap<>();
132    resolvedSortFields = new HashMap<>();
133    resolvedValueFields = new HashMap<>();
134
135    for( Scope incomingScope : incomingScopes )
136      {
137      int ordinal = incomingScope.getOrdinal();
138
139      // no joining or merging happening
140      if( resolvedScope.getKeySelectors() == null )
141        {
142        resolvedKeyFields.put( ordinal, incomingScope.getIncomingSpliceFields() );
143        }
144      else if( resolvedScope.getKeySelectors() != null )
145        {
146        Fields value = resolvedScope.getKeySelectors().get( incomingScope.getName() );
147
148        if( value != null )
149          resolvedKeyFields.put( ordinal, value );
150        }
151
152      if( resolvedScope.getSortingSelectors() != null )
153        {
154        Fields value = resolvedScope.getSortingSelectors().get( incomingScope.getName() );
155
156        if( value != null )
157          resolvedSortFields.put( ordinal, value );
158        }
159
160      if( resolvedScope.getKeySelectors() != null )
161        resolvedValueFields.put( ordinal, incomingScope.getIncomingSpliceFields() );
162      }
163    }
164
165  /**
166   * Returns any edge annotations, or an empty immutable Map.
167   * <p/>
168   * Use {@link #addEdgeAnnotation(String, String)} to add edge annotations.
169   *
170   * @return
171   */
172  public Map<String, String> getEdgeAnnotations()
173    {
174    if( edgeAnnotations == null )
175      return Collections.emptyMap();
176
177    return Collections.unmodifiableMap( edgeAnnotations );
178    }
179
180  public void addEdgeAnnotation( Enum annotation )
181    {
182    if( annotation == null )
183      return;
184
185    addEdgeAnnotation( annotation.getDeclaringClass().getName(), annotation.name() );
186    }
187
188  public void addEdgeAnnotation( String key, String value )
189    {
190    if( edgeAnnotations == null )
191      edgeAnnotations = new HashMap<>();
192
193    edgeAnnotations.put( key, value );
194    }
195
196  private Set<Integer> createOrdinals( Set<Scope> scopes )
197    {
198    Set<Integer> ordinals = new TreeSet<>();
199
200    for( Scope scope : scopes )
201      ordinals.add( scope.getOrdinal() );
202
203    return ordinals;
204    }
205
206  public FlowElement getFlowElement()
207    {
208    return flowElement;
209    }
210
211  public String getFlowElementID()
212    {
213    return FlowElements.id( flowElement );
214    }
215
216  public Set<Integer> getSinkExpectedOrdinals()
217    {
218    return sinkExpectedOrdinals;
219    }
220
221  public Set<Integer> getSourceProvidedOrdinals()
222    {
223    return sourceProvidedOrdinals;
224    }
225
226  public Map<Integer, Fields> getResolvedKeyFields()
227    {
228    return resolvedKeyFields;
229    }
230
231  public Map<Integer, Fields> getResolvedSortFields()
232    {
233    return resolvedSortFields;
234    }
235
236  public Map<Integer, Fields> getResolvedValueFields()
237    {
238    return resolvedValueFields;
239    }
240
241  public Set<Enum> getSinkAnnotations()
242    {
243    return sinkAnnotations;
244    }
245
246  public Set<Enum> getSourceAnnotations()
247    {
248    return sourceAnnotations;
249    }
250  }