001    /*
002     * Copyright (c) 2007-2015 Concurrent, Inc. All Rights Reserved.
003     *
004     * Project and contact information: http://www.cascading.org/
005     *
006     * This file is part of the Cascading project.
007     *
008     * Licensed under the Apache License, Version 2.0 (the "License");
009     * you may not use this file except in compliance with the License.
010     * You may obtain a copy of the License at
011     *
012     *     http://www.apache.org/licenses/LICENSE-2.0
013     *
014     * Unless required by applicable law or agreed to in writing, software
015     * distributed under the License is distributed on an "AS IS" BASIS,
016     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017     * See the License for the specific language governing permissions and
018     * limitations under the License.
019     */
020    
021    package cascading.flow.stream;
022    
023    import java.util.ArrayList;
024    import java.util.List;
025    import java.util.Map;
026    import java.util.Set;
027    
028    import cascading.flow.FlowElement;
029    import cascading.flow.FlowProcess;
030    import cascading.flow.planner.Scope;
031    import cascading.pipe.Pipe;
032    import cascading.pipe.Splice;
033    import cascading.tuple.Fields;
034    import cascading.tuple.Tuple;
035    import cascading.tuple.TupleEntry;
036    import cascading.tuple.TupleEntryChainIterator;
037    import cascading.tuple.TupleEntryIterator;
038    import cascading.tuple.Tuples;
039    import cascading.tuple.util.TupleBuilder;
040    import org.slf4j.Logger;
041    import org.slf4j.LoggerFactory;
042    
043    import static cascading.tuple.util.TupleViews.*;
044    
045    /**
046     *
047     */
048    public abstract class SpliceGate extends Gate<TupleEntry, Grouping<TupleEntry, TupleEntryIterator>> implements ElementDuct, Collapsing
049      {
050      private static final Logger LOG = LoggerFactory.getLogger( SpliceGate.class );
051    
052      protected Duct[] orderedPrevious;
053    
054      public enum Role
055        {
056          sink, source, both
057        }
058    
059      protected final FlowProcess flowProcess;
060      protected Role role = Role.both;
061    
062      private TrapHandler trapHandler;
063      private Set<String> branchNames;
064    
065      protected final Splice splice;
066      protected final List<Scope> incomingScopes = new ArrayList<Scope>();
067      protected final List<Scope> outgoingScopes = new ArrayList<Scope>();
068      protected Fields[] keyFields;
069      protected Fields[] sortFields;
070      protected Fields[] valuesFields;
071    
072      protected TupleBuilder[] keyBuilder;
073      protected TupleBuilder[] valuesBuilder;
074      protected TupleBuilder[] sortBuilder;
075    
076      protected Grouping<TupleEntry, TupleEntryIterator> grouping;
077      protected TupleEntry keyEntry;
078      protected TupleEntryChainIterator tupleEntryIterator;
079    
080      public SpliceGate( FlowProcess flowProcess, Splice splice )
081        {
082        this.splice = splice;
083    
084        FlowElement element = splice;
085    
086        while( element != null )
087          {
088          if( element.hasConfigDef() )
089            flowProcess = new ElementFlowProcess( flowProcess, element.getConfigDef() );
090    
091          element = ( (Pipe) element ).getParent();
092          }
093    
094        this.flowProcess = flowProcess;
095        }
096    
097      public SpliceGate( FlowProcess flowProcess, Splice splice, Role role )
098        {
099        this.splice = splice;
100        this.flowProcess = flowProcess;
101        this.role = role;
102        }
103    
104      public void setBranchNames( Set<String> branchNames )
105        {
106        this.branchNames = branchNames;
107        }
108    
109      public Set<String> getBranchNames()
110        {
111        return branchNames;
112        }
113    
114      @Override
115      public void setTrapHandler( TrapHandler trapHandler )
116        {
117        this.trapHandler = trapHandler;
118        }
119    
120      @Override
121      public boolean hasTrapHandler()
122        {
123        return trapHandler != null;
124        }
125    
126      protected void handleReThrowableException( String message, Throwable throwable )
127        {
128        trapHandler.handleReThrowableException( message, throwable );
129        }
130    
131      protected void handleException( Throwable exception, TupleEntry tupleEntry )
132        {
133        trapHandler.handleException( exception, tupleEntry );
134        }
135    
136      protected TupleBuilder createNarrowBuilder( final Fields incomingFields, final Fields narrowFields )
137        {
138        if( narrowFields.isNone() )
139          return new TupleBuilder()
140          {
141          @Override
142          public Tuple makeResult( Tuple input, Tuple output )
143            {
144            return Tuple.NULL;
145            }
146          };
147    
148        if( incomingFields.isUnknown() )
149          return new TupleBuilder()
150          {
151          @Override
152          public Tuple makeResult( Tuple input, Tuple output )
153            {
154            return input.get( incomingFields, narrowFields );
155            }
156          };
157    
158        if( narrowFields.isAll() ) // dubious this is ever reached
159          return new TupleBuilder()
160          {
161          @Override
162          public Tuple makeResult( Tuple input, Tuple output )
163            {
164            return input;
165            }
166          };
167    
168        return createDefaultNarrowBuilder( incomingFields, narrowFields );
169        }
170    
171      protected TupleBuilder createDefaultNarrowBuilder( final Fields incomingFields, final Fields narrowFields )
172        {
173        return new TupleBuilder()
174        {
175        Tuple result = createNarrow( incomingFields.getPos( narrowFields ) );
176    
177        @Override
178        public Tuple makeResult( Tuple input, Tuple output )
179          {
180          return reset( result, input );
181          }
182        };
183        }
184    
185      protected TupleBuilder createNulledBuilder( final Fields incomingFields, final Fields keyField )
186        {
187        if( incomingFields.isUnknown() )
188          return new TupleBuilder()
189          {
190          @Override
191          public Tuple makeResult( Tuple input, Tuple output )
192            {
193            return Tuples.nulledCopy( incomingFields, input, keyField );
194            }
195          };
196    
197        if( keyField.isNone() )
198          return new TupleBuilder()
199          {
200          @Override
201          public Tuple makeResult( Tuple input, Tuple output )
202            {
203            return input;
204            }
205          };
206    
207        if( keyField.isAll() )
208          return new TupleBuilder()
209          {
210          Tuple nullTuple = Tuple.size( incomingFields.size() );
211    
212          @Override
213          public Tuple makeResult( Tuple input, Tuple output )
214            {
215            return nullTuple;
216            }
217          };
218    
219        return new TupleBuilder()
220        {
221        Tuple nullTuple = Tuple.size( keyField.size() );
222        Tuple result = createOverride( incomingFields, keyField );
223    
224        @Override
225        public Tuple makeResult( Tuple baseTuple, Tuple output )
226          {
227          return reset( result, baseTuple, nullTuple );
228          }
229        };
230        }
231    
232      @Override
233      public void initialize()
234        {
235        super.initialize();
236    
237        if( incomingScopes.size() == 0 )
238          throw new IllegalStateException( "incoming scopes may not be empty" );
239    
240        if( outgoingScopes.size() == 0 )
241          throw new IllegalStateException( "outgoing scope may not be empty" );
242    
243        int size = splice.isGroupBy() ? 1 : incomingScopes.size();
244    
245        keyFields = new Fields[ size ];
246        valuesFields = new Fields[ size ];
247    
248        keyBuilder = new TupleBuilder[ size ];
249        valuesBuilder = new TupleBuilder[ size ];
250    
251        if( splice.isSorted() )
252          {
253          sortFields = new Fields[ size ];
254          sortBuilder = new TupleBuilder[ size ];
255          }
256    
257        Scope outgoingScope = outgoingScopes.get( 0 );
258    
259        for( int i = 0; i < size; i++ )
260          {
261          Scope incomingScope = incomingScopes.get( i );
262    
263          // for GroupBy, incoming may have same name, but guaranteed to have same key/value/sort fields for merge
264          int pos = splice.isGroupBy() ? 0 : splice.getPipePos().get( incomingScope.getName() );
265    
266          keyFields[ pos ] = outgoingScope.getKeySelectors().get( incomingScope.getName() );
267          valuesFields[ pos ] = incomingScope.getIncomingSpliceFields();
268    
269          keyBuilder[ pos ] = createNarrowBuilder( incomingScope.getIncomingSpliceFields(), keyFields[ pos ] );
270          valuesBuilder[ pos ] = createNulledBuilder( incomingScope.getIncomingSpliceFields(), keyFields[ pos ] );
271    
272          if( sortFields != null )
273            {
274            sortFields[ pos ] = outgoingScope.getSortingSelectors().get( incomingScope.getName() );
275            sortBuilder[ pos ] = createNarrowBuilder( incomingScope.getIncomingSpliceFields(), sortFields[ pos ] );
276            }
277    
278          if( LOG.isDebugEnabled() )
279            {
280            LOG.debug( "incomingScope: {}, in pos: {}", incomingScope.getName(), pos );
281            LOG.debug( "keyFields: {}", printSafe( keyFields[ pos ] ) );
282            LOG.debug( "valueFields: {}", printSafe( valuesFields[ pos ] ) );
283    
284            if( sortFields != null )
285              LOG.debug( "sortFields: {}", printSafe( sortFields[ pos ] ) );
286            }
287          }
288    
289        if( role == Role.sink )
290          return;
291    
292        keyEntry = new TupleEntry( outgoingScope.getOutGroupingFields(), true );
293        tupleEntryIterator = new TupleEntryChainIterator( outgoingScope.getOutValuesFields() );
294    
295        grouping = new Grouping<TupleEntry, TupleEntryIterator>();
296        grouping.key = keyEntry;
297        grouping.joinIterator = tupleEntryIterator;
298        }
299    
300      @Override
301      public FlowElement getFlowElement()
302        {
303        return splice;
304        }
305    
306      @Override
307      public List<Scope> getOutgoingScopes()
308        {
309        return outgoingScopes;
310        }
311    
312      @Override
313      public List<Scope> getIncomingScopes()
314        {
315        return incomingScopes;
316        }
317    
318      public void addIncomingScope( Scope incomingScope )
319        {
320        incomingScopes.add( incomingScope );
321        }
322    
323      public void addOutgoingScope( Scope outgoingScope )
324        {
325        outgoingScopes.add( outgoingScope );
326        }
327    
328      @Override
329      public void cleanup()
330        {
331        super.cleanup();
332    
333        // close if top of stack
334        if( next == null )
335          TrapHandler.closeTraps();
336        }
337    
338      protected synchronized void orderDucts( StreamGraph streamGraph )
339        {
340        orderedPrevious = new Duct[ incomingScopes.size() ];
341    
342        if( incomingScopes.size() == 1 && splice.getPrevious().length == 1 )
343          {
344          orderedPrevious[ 0 ] = allPrevious[ 0 ];
345          return;
346          }
347    
348        for( Duct previous : allPrevious )
349          orderedPrevious[ streamGraph.ordinalBetween( previous, this ) ] = previous;
350        }
351    
352      protected void makePosMap( Map<Duct, Integer> posMap )
353        {
354        for( int i = 0; i < orderedPrevious.length; i++ )
355          {
356          if( orderedPrevious[ i ] != null )
357            posMap.put( orderedPrevious[ i ], i );
358          }
359        }
360    
361      private String printSafe( Fields fields )
362        {
363        if( fields != null )
364          return fields.printVerbose();
365    
366        return "";
367        }
368    
369      @Override
370      public final boolean equals( Object object )
371        {
372        if( this == object )
373          return true;
374        if( !( object instanceof SpliceGate ) )
375          return false;
376    
377        SpliceGate spliceGate = (SpliceGate) object;
378    
379        if( splice != null ? splice != spliceGate.splice : spliceGate.splice != null )
380          return false;
381    
382        return true;
383        }
384    
385      @Override
386      public final int hashCode()
387        {
388        return splice != null ? System.identityHashCode( splice ) : 0;
389        }
390    
391      @Override
392      public String toString()
393        {
394        final StringBuilder sb = new StringBuilder();
395        sb.append( getClass().getSimpleName() );
396        sb.append( "{splice=" ).append( splice );
397        sb.append( '}' );
398        return sb.toString();
399        }
400      }