001 /* 002 * Copyright (c) 2007-2015 Concurrent, Inc. All Rights Reserved. 003 * 004 * Project and contact information: http://www.cascading.org/ 005 * 006 * This file is part of the Cascading project. 007 * 008 * Licensed under the Apache License, Version 2.0 (the "License"); 009 * you may not use this file except in compliance with the License. 010 * You may obtain a copy of the License at 011 * 012 * http://www.apache.org/licenses/LICENSE-2.0 013 * 014 * Unless required by applicable law or agreed to in writing, software 015 * distributed under the License is distributed on an "AS IS" BASIS, 016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 017 * See the License for the specific language governing permissions and 018 * limitations under the License. 019 */ 020 021 package cascading.flow.stream; 022 023 import java.util.ArrayList; 024 import java.util.List; 025 import java.util.Map; 026 import java.util.Set; 027 028 import cascading.flow.FlowElement; 029 import cascading.flow.FlowProcess; 030 import cascading.flow.planner.Scope; 031 import cascading.pipe.Pipe; 032 import cascading.pipe.Splice; 033 import cascading.tuple.Fields; 034 import cascading.tuple.Tuple; 035 import cascading.tuple.TupleEntry; 036 import cascading.tuple.TupleEntryChainIterator; 037 import cascading.tuple.TupleEntryIterator; 038 import cascading.tuple.Tuples; 039 import cascading.tuple.util.TupleBuilder; 040 import org.slf4j.Logger; 041 import org.slf4j.LoggerFactory; 042 043 import static cascading.tuple.util.TupleViews.*; 044 045 /** 046 * 047 */ 048 public abstract class SpliceGate extends Gate<TupleEntry, Grouping<TupleEntry, TupleEntryIterator>> implements ElementDuct, Collapsing 049 { 050 private static final Logger LOG = LoggerFactory.getLogger( SpliceGate.class ); 051 052 protected Duct[] orderedPrevious; 053 054 public enum Role 055 { 056 sink, source, both 057 } 058 059 protected final FlowProcess flowProcess; 060 protected Role role = Role.both; 061 062 private TrapHandler trapHandler; 063 private Set<String> branchNames; 064 065 protected final Splice splice; 066 protected final List<Scope> incomingScopes = new ArrayList<Scope>(); 067 protected final List<Scope> outgoingScopes = new ArrayList<Scope>(); 068 protected Fields[] keyFields; 069 protected Fields[] sortFields; 070 protected Fields[] valuesFields; 071 072 protected TupleBuilder[] keyBuilder; 073 protected TupleBuilder[] valuesBuilder; 074 protected TupleBuilder[] sortBuilder; 075 076 protected Grouping<TupleEntry, TupleEntryIterator> grouping; 077 protected TupleEntry keyEntry; 078 protected TupleEntryChainIterator tupleEntryIterator; 079 080 public SpliceGate( FlowProcess flowProcess, Splice splice ) 081 { 082 this.splice = splice; 083 084 FlowElement element = splice; 085 086 while( element != null ) 087 { 088 if( element.hasConfigDef() ) 089 flowProcess = new ElementFlowProcess( flowProcess, element.getConfigDef() ); 090 091 element = ( (Pipe) element ).getParent(); 092 } 093 094 this.flowProcess = flowProcess; 095 } 096 097 public SpliceGate( FlowProcess flowProcess, Splice splice, Role role ) 098 { 099 this.splice = splice; 100 this.flowProcess = flowProcess; 101 this.role = role; 102 } 103 104 public void setBranchNames( Set<String> branchNames ) 105 { 106 this.branchNames = branchNames; 107 } 108 109 public Set<String> getBranchNames() 110 { 111 return branchNames; 112 } 113 114 @Override 115 public void setTrapHandler( TrapHandler trapHandler ) 116 { 117 this.trapHandler = trapHandler; 118 } 119 120 @Override 121 public boolean hasTrapHandler() 122 { 123 return trapHandler != null; 124 } 125 126 protected void handleReThrowableException( String message, Throwable throwable ) 127 { 128 trapHandler.handleReThrowableException( message, throwable ); 129 } 130 131 protected void handleException( Throwable exception, TupleEntry tupleEntry ) 132 { 133 trapHandler.handleException( exception, tupleEntry ); 134 } 135 136 protected TupleBuilder createNarrowBuilder( final Fields incomingFields, final Fields narrowFields ) 137 { 138 if( narrowFields.isNone() ) 139 return new TupleBuilder() 140 { 141 @Override 142 public Tuple makeResult( Tuple input, Tuple output ) 143 { 144 return Tuple.NULL; 145 } 146 }; 147 148 if( incomingFields.isUnknown() ) 149 return new TupleBuilder() 150 { 151 @Override 152 public Tuple makeResult( Tuple input, Tuple output ) 153 { 154 return input.get( incomingFields, narrowFields ); 155 } 156 }; 157 158 if( narrowFields.isAll() ) // dubious this is ever reached 159 return new TupleBuilder() 160 { 161 @Override 162 public Tuple makeResult( Tuple input, Tuple output ) 163 { 164 return input; 165 } 166 }; 167 168 return createDefaultNarrowBuilder( incomingFields, narrowFields ); 169 } 170 171 protected TupleBuilder createDefaultNarrowBuilder( final Fields incomingFields, final Fields narrowFields ) 172 { 173 return new TupleBuilder() 174 { 175 Tuple result = createNarrow( incomingFields.getPos( narrowFields ) ); 176 177 @Override 178 public Tuple makeResult( Tuple input, Tuple output ) 179 { 180 return reset( result, input ); 181 } 182 }; 183 } 184 185 protected TupleBuilder createNulledBuilder( final Fields incomingFields, final Fields keyField ) 186 { 187 if( incomingFields.isUnknown() ) 188 return new TupleBuilder() 189 { 190 @Override 191 public Tuple makeResult( Tuple input, Tuple output ) 192 { 193 return Tuples.nulledCopy( incomingFields, input, keyField ); 194 } 195 }; 196 197 if( keyField.isNone() ) 198 return new TupleBuilder() 199 { 200 @Override 201 public Tuple makeResult( Tuple input, Tuple output ) 202 { 203 return input; 204 } 205 }; 206 207 if( keyField.isAll() ) 208 return new TupleBuilder() 209 { 210 Tuple nullTuple = Tuple.size( incomingFields.size() ); 211 212 @Override 213 public Tuple makeResult( Tuple input, Tuple output ) 214 { 215 return nullTuple; 216 } 217 }; 218 219 return new TupleBuilder() 220 { 221 Tuple nullTuple = Tuple.size( keyField.size() ); 222 Tuple result = createOverride( incomingFields, keyField ); 223 224 @Override 225 public Tuple makeResult( Tuple baseTuple, Tuple output ) 226 { 227 return reset( result, baseTuple, nullTuple ); 228 } 229 }; 230 } 231 232 @Override 233 public void initialize() 234 { 235 super.initialize(); 236 237 if( incomingScopes.size() == 0 ) 238 throw new IllegalStateException( "incoming scopes may not be empty" ); 239 240 if( outgoingScopes.size() == 0 ) 241 throw new IllegalStateException( "outgoing scope may not be empty" ); 242 243 int size = splice.isGroupBy() ? 1 : incomingScopes.size(); 244 245 keyFields = new Fields[ size ]; 246 valuesFields = new Fields[ size ]; 247 248 keyBuilder = new TupleBuilder[ size ]; 249 valuesBuilder = new TupleBuilder[ size ]; 250 251 if( splice.isSorted() ) 252 { 253 sortFields = new Fields[ size ]; 254 sortBuilder = new TupleBuilder[ size ]; 255 } 256 257 Scope outgoingScope = outgoingScopes.get( 0 ); 258 259 for( int i = 0; i < size; i++ ) 260 { 261 Scope incomingScope = incomingScopes.get( i ); 262 263 // for GroupBy, incoming may have same name, but guaranteed to have same key/value/sort fields for merge 264 int pos = splice.isGroupBy() ? 0 : splice.getPipePos().get( incomingScope.getName() ); 265 266 keyFields[ pos ] = outgoingScope.getKeySelectors().get( incomingScope.getName() ); 267 valuesFields[ pos ] = incomingScope.getIncomingSpliceFields(); 268 269 keyBuilder[ pos ] = createNarrowBuilder( incomingScope.getIncomingSpliceFields(), keyFields[ pos ] ); 270 valuesBuilder[ pos ] = createNulledBuilder( incomingScope.getIncomingSpliceFields(), keyFields[ pos ] ); 271 272 if( sortFields != null ) 273 { 274 sortFields[ pos ] = outgoingScope.getSortingSelectors().get( incomingScope.getName() ); 275 sortBuilder[ pos ] = createNarrowBuilder( incomingScope.getIncomingSpliceFields(), sortFields[ pos ] ); 276 } 277 278 if( LOG.isDebugEnabled() ) 279 { 280 LOG.debug( "incomingScope: {}, in pos: {}", incomingScope.getName(), pos ); 281 LOG.debug( "keyFields: {}", printSafe( keyFields[ pos ] ) ); 282 LOG.debug( "valueFields: {}", printSafe( valuesFields[ pos ] ) ); 283 284 if( sortFields != null ) 285 LOG.debug( "sortFields: {}", printSafe( sortFields[ pos ] ) ); 286 } 287 } 288 289 if( role == Role.sink ) 290 return; 291 292 keyEntry = new TupleEntry( outgoingScope.getOutGroupingFields(), true ); 293 tupleEntryIterator = new TupleEntryChainIterator( outgoingScope.getOutValuesFields() ); 294 295 grouping = new Grouping<TupleEntry, TupleEntryIterator>(); 296 grouping.key = keyEntry; 297 grouping.joinIterator = tupleEntryIterator; 298 } 299 300 @Override 301 public FlowElement getFlowElement() 302 { 303 return splice; 304 } 305 306 @Override 307 public List<Scope> getOutgoingScopes() 308 { 309 return outgoingScopes; 310 } 311 312 @Override 313 public List<Scope> getIncomingScopes() 314 { 315 return incomingScopes; 316 } 317 318 public void addIncomingScope( Scope incomingScope ) 319 { 320 incomingScopes.add( incomingScope ); 321 } 322 323 public void addOutgoingScope( Scope outgoingScope ) 324 { 325 outgoingScopes.add( outgoingScope ); 326 } 327 328 @Override 329 public void cleanup() 330 { 331 super.cleanup(); 332 333 // close if top of stack 334 if( next == null ) 335 TrapHandler.closeTraps(); 336 } 337 338 protected synchronized void orderDucts( StreamGraph streamGraph ) 339 { 340 orderedPrevious = new Duct[ incomingScopes.size() ]; 341 342 if( incomingScopes.size() == 1 && splice.getPrevious().length == 1 ) 343 { 344 orderedPrevious[ 0 ] = allPrevious[ 0 ]; 345 return; 346 } 347 348 for( Duct previous : allPrevious ) 349 orderedPrevious[ streamGraph.ordinalBetween( previous, this ) ] = previous; 350 } 351 352 protected void makePosMap( Map<Duct, Integer> posMap ) 353 { 354 for( int i = 0; i < orderedPrevious.length; i++ ) 355 { 356 if( orderedPrevious[ i ] != null ) 357 posMap.put( orderedPrevious[ i ], i ); 358 } 359 } 360 361 private String printSafe( Fields fields ) 362 { 363 if( fields != null ) 364 return fields.printVerbose(); 365 366 return ""; 367 } 368 369 @Override 370 public final boolean equals( Object object ) 371 { 372 if( this == object ) 373 return true; 374 if( !( object instanceof SpliceGate ) ) 375 return false; 376 377 SpliceGate spliceGate = (SpliceGate) object; 378 379 if( splice != null ? splice != spliceGate.splice : spliceGate.splice != null ) 380 return false; 381 382 return true; 383 } 384 385 @Override 386 public final int hashCode() 387 { 388 return splice != null ? System.identityHashCode( splice ) : 0; 389 } 390 391 @Override 392 public String toString() 393 { 394 final StringBuilder sb = new StringBuilder(); 395 sb.append( getClass().getSimpleName() ); 396 sb.append( "{splice=" ).append( splice ); 397 sb.append( '}' ); 398 return sb.toString(); 399 } 400 }