/*
 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved.
 *
 * Project and contact information: http://www.cascading.org/
 *
 * This file is part of the Cascading project.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package cascading.flow.planner.process;

import java.io.Serializable;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

import cascading.flow.FlowElement;
import cascading.flow.FlowElements;
import cascading.flow.planner.Scope;
import cascading.flow.planner.graph.AnnotatedGraph;
import cascading.flow.planner.graph.ElementGraph;
import cascading.tuple.Fields;
import cascading.util.Util;

/**
 * Describes the connection between a source process and a sink process, optionally made
 * through a {@link FlowElement} that appears in the element graphs of both processes.
 * <p>
 * Besides the identifiers of the two processes, an edge records the ordinals of the paths
 * entering the shared element on each side, the key, sort and value {@link Fields} resolved
 * for every ordinal the sink expects, the annotations each graph attaches to the element,
 * and free-form String annotations added after construction.
 *
 * @param <Process> the type of process model this edge connects
 */
public class ProcessEdge<Process extends ProcessModel> implements Serializable
  {
  /** Identifier of this edge, created on the first call to {@link #getID()}. */
  String id;
  /** Identifier of the process on the source side, if the edge was created from processes. */
  String sourceProcessID;
  /** Identifier of the process on the sink side, if the edge was created from processes. */
  String sinkProcessID;
  /** Element both sides are connected through; not set by the two argument constructor. */
  FlowElement flowElement;

  /** Ordinals of the edges entering the flow element in the source graph, in ascending order. */
  Set<Integer> sourceProvidedOrdinals;
  /** Ordinals of the edges entering the flow element in the sink graph, in ascending order. */
  Set<Integer> sinkExpectedOrdinals;

  /** Key fields per sink side ordinal. */
  Map<Integer, Fields> resolvedKeyFields;
  /** Sort fields per sink side ordinal, present only where a sorting selector was found. */
  Map<Integer, Fields> resolvedSortFields;
  /** Value fields per sink side ordinal, populated only when key selectors are declared. */
  Map<Integer, Fields> resolvedValueFields;

  /** Annotations the sink graph holds for the flow element, empty unless found. */
  Set<Enum> sinkAnnotations = Collections.emptySet();
  /** Annotations the source graph holds for the flow element, empty unless found. */
  Set<Enum> sourceAnnotations = Collections.emptySet();

  /** Lazily allocated by {@link #addEdgeAnnotation(String, String)}. */
  Map<String, String> edgeAnnotations;

  /**
   * Creates an edge between two processes through the given element, resolving ordinals,
   * fields and annotations from the element graph of each process.
   *
   * @param sourceProcess the process providing data
   * @param flowElement   the element the two processes are connected through
   * @param sinkProcess   the process receiving data
   */
  public ProcessEdge( Process sourceProcess, FlowElement flowElement, Process sinkProcess )
    {
    this( sourceProcess.getElementGraph(), flowElement, sinkProcess.getElementGraph() );

    this.sourceProcessID = sourceProcess.getID();
    this.sinkProcessID = sinkProcess.getID();
    }

  /**
   * Creates an edge from two element graphs sharing the given element. The source and sink
   * process identifiers are left unset by this constructor.
   *
   * @param sourceElementGraph the graph on the providing side
   * @param flowElement        the element present in both graphs
   * @param sinkElementGraph   the graph on the receiving side
   */
  public ProcessEdge( ElementGraph sourceElementGraph, FlowElement flowElement, ElementGraph sinkElementGraph )
    {
    this.flowElement = flowElement;

    // Both ordinal sets are taken from the edges entering the flow element; the source
    // may provide only a subset of the paths the sink expects.
    this.sourceProvidedOrdinals = createOrdinals( sourceElementGraph.incomingEdgesOf( flowElement ) );
    this.sinkExpectedOrdinals = createOrdinals( sinkElementGraph.incomingEdgesOf( flowElement ) );

    setResolvedFields( sourceElementGraph, flowElement, sinkElementGraph );

    this.sourceAnnotations = annotationsOf( sourceElementGraph, flowElement, this.sourceAnnotations );
    this.sinkAnnotations = annotationsOf( sinkElementGraph, flowElement, this.sinkAnnotations );
    }

  /**
   * Creates an edge that only records the identifiers of the two processes. The flow
   * element, the ordinal sets and the resolved field maps are not populated.
   *
   * @param sourceProcess the process providing data
   * @param sinkProcess   the process receiving data
   */
  public ProcessEdge( Process sourceProcess, Process sinkProcess )
    {
    this.sourceProcessID = sourceProcess.getID();
    this.sinkProcessID = sinkProcess.getID();
    }

  /**
   * Returns the identifier of this edge, creating it with {@link Util#createUniqueID()} the
   * first time it is requested.
   *
   * @return the edge identifier
   */
  public String getID()
    {
    if( id != null )
      return id;

    id = Util.createUniqueID(); // deferred until someone actually asks for it

    return id;
    }

  /**
   * @return the identifier of the source process, or null if none was recorded
   */
  public String getSourceProcessID()
    {
    return sourceProcessID;
    }

  /**
   * @return the identifier of the sink process, or null if none was recorded
   */
  public String getSinkProcessID()
    {
    return sinkProcessID;
    }

  /**
   * Looks up the annotations a graph holds for the given element.
   *
   * @param elementGraph the graph to inspect
   * @param flowElement  the element whose annotations are wanted
   * @param fallback     returned when the graph is not an {@link AnnotatedGraph} or reports no annotations
   * @return the annotation keys for the element, or the fallback
   */
  private static Set<Enum> annotationsOf( ElementGraph elementGraph, FlowElement flowElement, Set<Enum> fallback )
    {
    if( !( elementGraph instanceof AnnotatedGraph ) )
      return fallback;

    AnnotatedGraph annotatedGraph = (AnnotatedGraph) elementGraph;

    if( !annotatedGraph.hasAnnotations() )
      return fallback;

    return annotatedGraph.getAnnotations().getKeysFor( flowElement );
    }

  /**
   * Declares the key, sort and value fields used to connect the two sides of this edge.
   * <p>
   * In most cases the streamed data is segmented into key and value pairs, depending on the
   * communication model used to move it.
   * <p>
   * Under partitioning models the keys are hashed and binned into a partition.
   * <p>
   * Where data is simply forwarded, everything is placed in the key and the value is left
   * undeclared.
   * <p>
   * The selectors are read from the first edge leaving the flow element in the source graph,
   * and one entry is produced per edge entering the flow element in the sink graph.
   *
   * @param sourceElementGraph the graph on the providing side
   * @param flowElement        the element present in both graphs
   * @param sinkElementGraph   the graph on the receiving side
   */
  private void setResolvedFields( ElementGraph sourceElementGraph, FlowElement flowElement, ElementGraph sinkElementGraph )
    {
    // every outgoing scope carries the resolved selectors, so the first one is enough
    // NOTE(review): if the source graph has no outgoing edge here this is presumably null and
    // the loop below would fail on its first iteration -- confirm against Util.getFirst
    Scope resolvedScope = Util.getFirst( sourceElementGraph.outgoingEdgesOf( flowElement ) );

    Set<Scope> sinkSideScopes = sinkElementGraph.incomingEdgesOf( flowElement );

    resolvedKeyFields = new HashMap<>();
    resolvedSortFields = new HashMap<>();
    resolvedValueFields = new HashMap<>();

    for( Scope sinkSideScope : sinkSideScopes )
      {
      int ordinal = sinkSideScope.getOrdinal();
      Map<String, Fields> keySelectors = resolvedScope.getKeySelectors();
      boolean hasKeySelectors = keySelectors != null;

      if( hasKeySelectors )
        {
        Fields keyFields = keySelectors.get( sinkSideScope.getName() );

        if( keyFields != null )
          resolvedKeyFields.put( ordinal, keyFields );
        }
      else
        {
        // no joining or merging happening, the whole incoming tuple acts as the key
        resolvedKeyFields.put( ordinal, sinkSideScope.getIncomingSpliceFields() );
        }

      Map<String, Fields> sortSelectors = resolvedScope.getSortingSelectors();

      if( sortSelectors != null )
        {
        Fields sortFields = sortSelectors.get( sinkSideScope.getName() );

        if( sortFields != null )
          resolvedSortFields.put( ordinal, sortFields );
        }

      // values are only declared when keys were selected out of the incoming fields
      if( hasKeySelectors )
        resolvedValueFields.put( ordinal, sinkSideScope.getIncomingSpliceFields() );
      }
    }

  /**
   * Returns the annotations added to this edge as a read-only map, or an empty immutable
   * map when none have been added.
   * <p>
   * Use {@link #addEdgeAnnotation(String, String)} to add edge annotations.
   *
   * @return an unmodifiable view of the edge annotations, never null
   */
  public Map<String, String> getEdgeAnnotations()
    {
    return edgeAnnotations == null
      ? Collections.<String, String>emptyMap()
      : Collections.unmodifiableMap( edgeAnnotations );
    }

  /**
   * Adds an annotation keyed by the name of the enum's declaring class, with the name of the
   * constant as the value. A null argument is ignored.
   *
   * @param annotation the enum constant to record, may be null
   */
  public void addEdgeAnnotation( Enum annotation )
    {
    if( annotation != null )
      addEdgeAnnotation( annotation.getDeclaringClass().getName(), annotation.name() );
    }

  /**
   * Adds an annotation to this edge, replacing any value previously stored under the key.
   *
   * @param key   the annotation key
   * @param value the annotation value
   */
  public void addEdgeAnnotation( String key, String value )
    {
    if( edgeAnnotations == null )
      edgeAnnotations = new HashMap<>();

    edgeAnnotations.put( key, value );
    }

  /**
   * Collects the ordinals of the given scopes into a sorted set.
   *
   * @param scopes the scopes to read ordinals from
   * @return the distinct ordinals in ascending order
   */
  private Set<Integer> createOrdinals( Set<Scope> scopes )
    {
    Set<Integer> sorted = new TreeSet<>();

    for( Scope each : scopes )
      sorted.add( each.getOrdinal() );

    return sorted;
    }

  /**
   * @return the element this edge connects through, or null if none was given
   */
  public FlowElement getFlowElement()
    {
    return flowElement;
    }

  /**
   * @return the result of {@link FlowElements#id(FlowElement)} for the flow element of this edge
   */
  public String getFlowElementID()
    {
    return FlowElements.id( flowElement );
    }

  /**
   * @return the ordinals the sink side expects, or null if the edge was created without graphs
   */
  public Set<Integer> getSinkExpectedOrdinals()
    {
    return sinkExpectedOrdinals;
    }

  /**
   * @return the ordinals the source side provides, or null if the edge was created without graphs
   */
  public Set<Integer> getSourceProvidedOrdinals()
    {
    return sourceProvidedOrdinals;
    }

  /**
   * @return the key fields per ordinal, or null if the edge was created without graphs
   */
  public Map<Integer, Fields> getResolvedKeyFields()
    {
    return resolvedKeyFields;
    }

  /**
   * @return the sort fields per ordinal, or null if the edge was created without graphs
   */
  public Map<Integer, Fields> getResolvedSortFields()
    {
    return resolvedSortFields;
    }

  /**
   * @return the value fields per ordinal, or null if the edge was created without graphs
   */
  public Map<Integer, Fields> getResolvedValueFields()
    {
    return resolvedValueFields;
    }

  /**
   * @return the annotations found for the flow element in the sink graph
   */
  public Set<Enum> getSinkAnnotations()
    {
    return sinkAnnotations;
    }

  /**
   * @return the annotations found for the flow element in the source graph
   */
  public Set<Enum> getSourceAnnotations()
    {
    return sourceAnnotations;
    }
  }