/*
 * Copyright (c) 2016-2017 Chris K Wensel <chris@wensel.net>. All Rights Reserved.
 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved.
 *
 * Project and contact information: http://www.cascading.org/
 *
 * This file is part of the Cascading project.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package cascading.flow.planner.process;

import java.io.Serializable;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

import cascading.flow.FlowElement;
import cascading.flow.FlowElements;
import cascading.flow.planner.Scope;
import cascading.flow.planner.graph.AnnotatedGraph;
import cascading.flow.planner.graph.ElementGraph;
import cascading.tuple.Fields;
import cascading.util.Util;

/**
 * Class ProcessEdge describes the connection between a source process and a sink process.
 * <p>
 * When created with a {@link FlowElement}, the edge also records the ordinals of the edges entering that
 * element in both the source and sink element graphs, the key, sort, and value fields resolved for each
 * sink side ordinal, and any annotations either graph holds for the element.
 * <p>
 * Free form String annotations may be attached to the edge via {@link #addEdgeAnnotation(String, String)}.
 *
 * @param <Process> the type of {@link ProcessModel} this edge connects
 */
public class ProcessEdge<Process extends ProcessModel> implements Serializable
  {
  String id;
  String sourceProcessID;
  String sinkProcessID;
  FlowElement flowElement;

  Set<Integer> sourceProvidedOrdinals;
  Set<Integer> sinkExpectedOrdinals;

  Map<Integer, Fields> resolvedKeyFields;
  Map<Integer, Fields> resolvedSortFields;
  Map<Integer, Fields> resolvedValueFields;

  Set<Enum> sinkAnnotations = Collections.emptySet();
  Set<Enum> sourceAnnotations = Collections.emptySet();

  Map<String, String> edgeAnnotations;

  /**
   * Creates an edge between the given processes through the given flowElement.
   * <p>
   * Ordinals, resolved fields, and annotations are derived from the element graph of each process, then
   * the IDs of both processes are recorded.
   *
   * @param sourceProcess the process on the source side of this edge
   * @param flowElement   the element the edge passes through
   * @param sinkProcess   the process on the sink side of this edge
   */
  public ProcessEdge( Process sourceProcess, FlowElement flowElement, Process sinkProcess )
    {
    this( sourceProcess.getElementGraph(), flowElement, sinkProcess.getElementGraph() );

    this.sourceProcessID = sourceProcess.getID();
    this.sinkProcessID = sinkProcess.getID();
    }

  /**
   * Creates an edge directly from a pair of element graphs, the source and sink process IDs are not set.
   *
   * @param sourceElementGraph the element graph on the source side of this edge
   * @param flowElement        the element the edge passes through
   * @param sinkElementGraph   the element graph on the sink side of this edge
   */
  public ProcessEdge( ElementGraph sourceElementGraph, FlowElement flowElement, ElementGraph sinkElementGraph )
    {
    this.flowElement = flowElement;

    // in both cases only the edges entering the flowElement are of interest,
    // the source may provide just a subset of the paths the sink expects
    this.sourceProvidedOrdinals = createOrdinals( sourceElementGraph.incomingEdgesOf( flowElement ) ); // provided by the source
    this.sinkExpectedOrdinals = createOrdinals( sinkElementGraph.incomingEdgesOf( flowElement ) ); // expected by the sink

    setResolvedFields( sourceElementGraph, flowElement, sinkElementGraph );

    // the current (empty) values are retained when a graph holds no annotations
    this.sourceAnnotations = annotationKeysFor( sourceElementGraph, flowElement, this.sourceAnnotations );
    this.sinkAnnotations = annotationKeysFor( sinkElementGraph, flowElement, this.sinkAnnotations );
    }

  /**
   * Creates an edge that only records the IDs of the given processes.
   * <p>
   * The flow element, ordinals, and resolved fields are left unset, the source and sink annotations
   * remain empty.
   *
   * @param sourceProcess the process on the source side of this edge
   * @param sinkProcess   the process on the sink side of this edge
   */
  public ProcessEdge( Process sourceProcess, Process sinkProcess )
    {
    this.sourceProcessID = sourceProcess.getID();
    this.sinkProcessID = sinkProcess.getID();
    }

  /**
   * Returns the keys held by the given elementGraph for the flowElement when the graph is an
   * {@link AnnotatedGraph} having annotations, otherwise returns defaultKeys.
   */
  private static Set<Enum> annotationKeysFor( ElementGraph elementGraph, FlowElement flowElement, Set<Enum> defaultKeys )
    {
    if( !( elementGraph instanceof AnnotatedGraph ) )
      return defaultKeys;

    AnnotatedGraph annotatedGraph = (AnnotatedGraph) elementGraph;

    if( !annotatedGraph.hasAnnotations() )
      return defaultKeys;

    return annotatedGraph.getAnnotations().getKeysFor( flowElement );
    }

  /**
   * Returns the ID of this edge, the ID is created on the first call and returned on every call after.
   *
   * @return the edge ID
   */
  public String getID()
    {
    if( id != null )
      return id;

    id = Util.createUniqueID(); // deferred until first requested

    return id;
    }

  /**
   * @return the ID of the source process, or null if no source process was given
   */
  public String getSourceProcessID()
    {
    return sourceProcessID;
    }

  /**
   * @return the ID of the sink process, or null if no sink process was given
   */
  public String getSinkProcessID()
    {
    return sinkProcessID;
    }

  /**
   * An edge must declare the key and value fields used to connect two models.
   * <p>
   * In most cases the streamed data is segmented into key and value pairs, depending on the communication
   * model used to move the data.
   * <p>
   * Under partitioning models, the keys are hashed and binned into a partition.
   * <p>
   * Where data is simply forwarded, all data must be placed into the key, with the value declared empty.
   * <p>
   * The selectors are read from the first edge leaving the flowElement in the source graph, and are
   * applied to every edge entering the flowElement in the sink graph, keyed by the ordinal of that edge.
   */
  private void setResolvedFields( ElementGraph sourceElementGraph, FlowElement flowElement, ElementGraph sinkElementGraph )
    {
    Set<Scope> sourceOutgoing = sourceElementGraph.outgoingEdgesOf( flowElement );
    Scope resolved = Util.getFirst( sourceOutgoing ); // the first scope is sufficient

    Set<Scope> sinkIncoming = sinkElementGraph.incomingEdgesOf( flowElement );

    resolvedKeyFields = new HashMap<>();
    resolvedSortFields = new HashMap<>();
    resolvedValueFields = new HashMap<>();

    for( Scope incoming : sinkIncoming )
      {
      int ordinal = incoming.getOrdinal();
      boolean hasKeySelectors = resolved.getKeySelectors() != null;

      if( hasKeySelectors )
        {
        // keyed by selector, a value entry is always declared for the ordinal
        Fields keyFields = resolved.getKeySelectors().get( incoming.getName() );

        if( keyFields != null )
          resolvedKeyFields.put( ordinal, keyFields );

        resolvedValueFields.put( ordinal, incoming.getIncomingSpliceFields() );
        }
      else
        {
        // no joining or merging, everything is placed in the key and no value is declared
        resolvedKeyFields.put( ordinal, incoming.getIncomingSpliceFields() );
        }

      if( resolved.getSortingSelectors() == null )
        continue;

      Fields sortFields = resolved.getSortingSelectors().get( incoming.getName() );

      if( sortFields != null )
        resolvedSortFields.put( ordinal, sortFields );
      }
    }

  /**
   * Returns any edge annotations, or an empty immutable Map.
   * <p>
   * Use {@link #addEdgeAnnotation(String, String)} to add edge annotations.
   *
   * @return an unmodifiable view of the current edge annotations, never null
   */
  public Map<String, String> getEdgeAnnotations()
    {
    return edgeAnnotations == null
      ? Collections.<String, String>emptyMap()
      : Collections.unmodifiableMap( edgeAnnotations );
    }

  /**
   * Adds the given Enum as an edge annotation, where the key is the name of the declaring class of the
   * Enum and the value is the name of the Enum constant.
   * <p>
   * A null annotation is ignored.
   *
   * @param annotation the annotation to add, may be null
   */
  public void addEdgeAnnotation( Enum annotation )
    {
    if( annotation != null )
      addEdgeAnnotation( annotation.getDeclaringClass().getName(), annotation.name() );
    }

  /**
   * Adds the given key and value as an edge annotation, replacing any value previously stored for the key.
   *
   * @param key   the annotation key
   * @param value the annotation value
   */
  public void addEdgeAnnotation( String key, String value )
    {
    if( edgeAnnotations == null )
      edgeAnnotations = new HashMap<>(); // created on first use

    edgeAnnotations.put( key, value );
    }

  /**
   * Collects the ordinal of every given scope into a sorted Set.
   */
  private static Set<Integer> createOrdinals( Set<Scope> scopes )
    {
    Set<Integer> result = new TreeSet<>();

    for( Scope current : scopes )
      result.add( current.getOrdinal() );

    return result;
    }

  /**
   * @return the element this edge passes through, or null if none was given
   */
  public FlowElement getFlowElement()
    {
    return flowElement;
    }

  /**
   * @return the result of {@link FlowElements#id(FlowElement)} for the element of this edge
   */
  public String getFlowElementID()
    {
    return FlowElements.id( flowElement );
    }

  /**
   * @return the ordinals of the edges entering the element in the sink graph, or null if no element was given
   */
  public Set<Integer> getSinkExpectedOrdinals()
    {
    return sinkExpectedOrdinals;
    }

  /**
   * @return the ordinals of the edges entering the element in the source graph, or null if no element was given
   */
  public Set<Integer> getSourceProvidedOrdinals()
    {
    return sourceProvidedOrdinals;
    }

  /**
   * @return the resolved key fields by ordinal, or null if no element was given
   */
  public Map<Integer, Fields> getResolvedKeyFields()
    {
    return resolvedKeyFields;
    }

  /**
   * @return the resolved sort fields by ordinal, or null if no element was given
   */
  public Map<Integer, Fields> getResolvedSortFields()
    {
    return resolvedSortFields;
    }

  /**
   * @return the resolved value fields by ordinal, or null if no element was given
   */
  public Map<Integer, Fields> getResolvedValueFields()
    {
    return resolvedValueFields;
    }

  /**
   * @return the annotations the sink graph holds for the element, empty if there are none
   */
  public Set<Enum> getSinkAnnotations()
    {
    return sinkAnnotations;
    }

  /**
   * @return the annotations the source graph holds for the element, empty if there are none
   */
  public Set<Enum> getSourceAnnotations()
    {
    return sourceAnnotations;
    }
  }