001    /*
002     * Copyright (c) 2007-2015 Concurrent, Inc. All Rights Reserved.
003     *
004     * Project and contact information: http://www.cascading.org/
005     *
006     * This file is part of the Cascading project.
007     *
008     * Licensed under the Apache License, Version 2.0 (the "License");
009     * you may not use this file except in compliance with the License.
010     * You may obtain a copy of the License at
011     *
012     *     http://www.apache.org/licenses/LICENSE-2.0
013     *
014     * Unless required by applicable law or agreed to in writing, software
015     * distributed under the License is distributed on an "AS IS" BASIS,
016     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017     * See the License for the specific language governing permissions and
018     * limitations under the License.
019     */
020    
021    package cascading.flow.stream;
022    
023    import java.io.IOException;
024    import java.util.Iterator;
025    
026    import cascading.CascadingException;
027    import cascading.flow.FlowProcess;
028    import cascading.operation.Buffer;
029    import cascading.pipe.Every;
030    import cascading.pipe.OperatorException;
031    import cascading.tuple.Fields;
032    import cascading.tuple.Tuple;
033    import cascading.tuple.TupleEntry;
034    import cascading.tuple.TupleEntryCollector;
035    import cascading.tuple.TupleEntryIterator;
036    import cascading.tuple.Tuples;
037    
038    /**
039     *
040     */
041    public class BufferEveryWindow extends EveryStage<Grouping<TupleEntry, TupleEntryIterator>> implements OpenWindow
042      {
043      Buffer buffer;
044    
045      public BufferEveryWindow( FlowProcess flowProcess, Every every )
046        {
047        super( flowProcess, every );
048        }
049    
050      @Override
051      public void initialize()
052        {
053        super.initialize();
054    
055        buffer = every.getBuffer();
056    
057        outputCollector = new TupleEntryCollector( getOperationDeclaredFields() )
058        {
059        @Override
060        protected void collect( TupleEntry resultEntry ) throws IOException
061          {
062          Tuple outgoing = outgoingBuilder.makeResult( incomingEntry.getTuple(), resultEntry.getTuple() );
063    
064          outgoingEntry.setTuple( outgoing );
065    
066          try
067            {
068            next.receive( BufferEveryWindow.this, outgoingEntry );
069            }
070          finally
071            {
072            Tuples.asModifiable( outgoing );
073            }
074          }
075        };
076        }
077    
078      @Override
079      protected Fields getIncomingPassThroughFields()
080        {
081        return incomingScopes.get( 0 ).getIncomingBufferPassThroughFields();
082        }
083    
084      @Override
085      protected Fields getIncomingArgumentsFields()
086        {
087        return incomingScopes.get( 0 ).getIncomingBufferArgumentFields();
088        }
089    
090      @Override
091      protected Fields getOutgoingSelector()
092        {
093        return outgoingScopes.get( 0 ).getOutGroupingSelector();
094        }
095    
096      @Override
097      public void start( Duct previous )
098        {
099        next.start( this );
100        }
101    
102      @Override
103      public void receive( Duct previous, final Grouping<TupleEntry, TupleEntryIterator> grouping )
104        {
105        try
106          {
107          // we want to null out any 'values' before and after the iterator begins/ends
108          // this allows buffers to emit tuples before next() and when hasNext() return false;
109          final TupleEntry tupleEntry = grouping.joinIterator.getTupleEntry();
110          incomingEntry = tupleEntry;
111    
112          // if Fields.NONE are declared on the CoGroup, we don't provide arguments, only the joinerClosure
113          if( !tupleEntry.getFields().isNone() )
114            {
115            final Tuple valueNulledTuple = Tuples.setOnEmpty( tupleEntry, grouping.key );
116            tupleEntry.setTuple( valueNulledTuple );
117    
118            operationCall.setArgumentsIterator( createArgumentsIterator( grouping, tupleEntry, valueNulledTuple ) );
119            }
120    
121          operationCall.setOutputCollector( outputCollector );
122          operationCall.setJoinerClosure( grouping.joinerClosure );
123          operationCall.setGroup( grouping.key );
124    
125          buffer.operate( flowProcess, operationCall );
126          }
127        catch( CascadingException exception )
128          {
129          handleException( exception, argumentsEntry );
130          }
131        catch( Throwable throwable )
132          {
133          handleException( new OperatorException( every, "operator Every failed executing operation: " + every.getOperation(), throwable ), argumentsEntry );
134          }
135        }
136    
137      private Iterator<TupleEntry> createArgumentsIterator( final Grouping<TupleEntry, TupleEntryIterator> grouping, final TupleEntry tupleEntry, final Tuple valueNulledTuple )
138        {
139        return new Iterator<TupleEntry>()
140        {
141        public boolean hasNext()
142          {
143          boolean hasNext = grouping.joinIterator.hasNext();
144    
145          if( !hasNext && !operationCall.isRetainValues() )
146            tupleEntry.setTuple( valueNulledTuple ); // null out footer entries
147    
148          return hasNext;
149          }
150    
151        public TupleEntry next()
152          {
153          argumentsEntry.setTuple( argumentsBuilder.makeResult( grouping.joinIterator.next().getTuple(), null ) );
154    
155          return argumentsEntry;
156          }
157    
158        public void remove()
159          {
160          grouping.joinIterator.remove();
161          }
162        };
163        }
164      }