001/* 002 * Copyright (c) 2007-2015 Concurrent, Inc. All Rights Reserved. 003 * 004 * Project and contact information: http://www.cascading.org/ 005 * 006 * This file is part of the Cascading project. 007 * 008 * Licensed under the Apache License, Version 2.0 (the "License"); 009 * you may not use this file except in compliance with the License. 010 * You may obtain a copy of the License at 011 * 012 * http://www.apache.org/licenses/LICENSE-2.0 013 * 014 * Unless required by applicable law or agreed to in writing, software 015 * distributed under the License is distributed on an "AS IS" BASIS, 016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 017 * See the License for the specific language governing permissions and 018 * limitations under the License. 019 */ 020 021package cascading.flow.planner.process; 022 023import java.io.Serializable; 024import java.util.Collections; 025import java.util.HashMap; 026import java.util.HashSet; 027import java.util.Map; 028import java.util.Set; 029 030import cascading.flow.FlowElement; 031import cascading.flow.FlowElements; 032import cascading.flow.planner.Scope; 033import cascading.flow.planner.graph.AnnotatedGraph; 034import cascading.flow.planner.graph.ElementGraph; 035import cascading.util.Util; 036 037/** 038 * 039 */ 040public class ProcessEdge<Process extends ProcessModel> implements Serializable 041 { 042 String id; 043 String sourceProcessID; 044 String sinkProcessID; 045 FlowElement flowElement; 046 Set<Integer> sourceProvidedOrdinals; 047 Set<Integer> sinkExpectedOrdinals; 048 Set<Enum> sinkAnnotations = Collections.emptySet(); 049 Set<Enum> sourceAnnotations = Collections.emptySet(); 050 Map<String, String> edgeAnnotations; 051 052 public ProcessEdge( Process sourceProcess, FlowElement flowElement, Process sinkProcess ) 053 { 054 this.flowElement = flowElement; 055 this.sourceProcessID = sourceProcess.getID(); 056 this.sinkProcessID = sinkProcess.getID(); 057 058 ElementGraph sourceElementGraph = sourceProcess.getElementGraph(); 059 ElementGraph sinkElementGraph = sinkProcess.getElementGraph(); 060 061 // for both set of ordinals, we only care about the edges entering the flowElement 062 // the source may only provide a subset of the paths expected by the sink 063 064 // all ordinals the source process provides 065 this.sourceProvidedOrdinals = createOrdinals( sourceElementGraph.incomingEdgesOf( flowElement ) ); 066 067 // all ordinals the sink process expects 068 this.sinkExpectedOrdinals = createOrdinals( sinkElementGraph.incomingEdgesOf( flowElement ) ); 069 070 if( sourceElementGraph instanceof AnnotatedGraph && ( (AnnotatedGraph) sourceElementGraph ).hasAnnotations() ) 071 this.sourceAnnotations = ( (AnnotatedGraph) sourceElementGraph ).getAnnotations().getKeysFor( flowElement ); 072 073 if( sinkElementGraph instanceof AnnotatedGraph && ( (AnnotatedGraph) sinkElementGraph ).hasAnnotations() ) 074 this.sinkAnnotations = ( (AnnotatedGraph) sinkElementGraph ).getAnnotations().getKeysFor( flowElement ); 075 } 076 077 public ProcessEdge( Process sourceProcess, Process sinkProcess ) 078 { 079 this.sourceProcessID = sourceProcess.getID(); 080 this.sinkProcessID = sinkProcess.getID(); 081 } 082 083 public String getID() 084 { 085 if( id == null ) // make it lazy 086 id = Util.createUniqueID(); 087 088 return id; 089 } 090 091 public String getSourceProcessID() 092 { 093 return sourceProcessID; 094 } 095 096 public String getSinkProcessID() 097 { 098 return sinkProcessID; 099 } 100 101 /** 102 * Returns any edge annotations, or an empty immutable Map. 103 * <p/> 104 * Use {@link #addEdgeAnnotation(String, String)} to add edge annotations. 105 * 106 * @return 107 */ 108 public Map<String, String> getEdgeAnnotations() 109 { 110 if( edgeAnnotations == null ) 111 return Collections.emptyMap(); 112 113 return Collections.unmodifiableMap( edgeAnnotations ); 114 } 115 116 public void addEdgeAnnotation( Enum annotation ) 117 { 118 if( annotation == null ) 119 return; 120 121 addEdgeAnnotation( annotation.getDeclaringClass().getName(), annotation.name() ); 122 } 123 124 public void addEdgeAnnotation( String key, String value ) 125 { 126 if( edgeAnnotations == null ) 127 edgeAnnotations = new HashMap<>(); 128 129 edgeAnnotations.put( key, value ); 130 } 131 132 private Set<Integer> createOrdinals( Set<Scope> scopes ) 133 { 134 Set<Integer> ordinals = new HashSet<>(); 135 136 for( Scope scope : scopes ) 137 ordinals.add( scope.getOrdinal() ); 138 139 return ordinals; 140 } 141 142 public FlowElement getFlowElement() 143 { 144 return flowElement; 145 } 146 147 public String getFlowElementID() 148 { 149 return FlowElements.id( flowElement ); 150 } 151 152 public Set<Integer> getSinkExpectedOrdinals() 153 { 154 return sinkExpectedOrdinals; 155 } 156 157 public Set<Integer> getSourceProvidedOrdinals() 158 { 159 return sourceProvidedOrdinals; 160 } 161 162 public Set<Enum> getSinkAnnotations() 163 { 164 return sinkAnnotations; 165 } 166 167 public Set<Enum> getSourceAnnotations() 168 { 169 return sourceAnnotations; 170 } 171 }