001/*
002 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved.
003 *
004 * Project and contact information: http://www.cascading.org/
005 *
006 * This file is part of the Cascading project.
007 *
008 * Licensed under the Apache License, Version 2.0 (the "License");
009 * you may not use this file except in compliance with the License.
010 * You may obtain a copy of the License at
011 *
012 *     http://www.apache.org/licenses/LICENSE-2.0
013 *
014 * Unless required by applicable law or agreed to in writing, software
015 * distributed under the License is distributed on an "AS IS" BASIS,
016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017 * See the License for the specific language governing permissions and
018 * limitations under the License.
019 */
020
021package cascading.flow.stream.element;
022
023import cascading.flow.FlowElement;
024import cascading.flow.FlowProcess;
025import cascading.flow.planner.Scope;
026import cascading.flow.stream.duct.Duct;
027import cascading.operation.ConcreteCall;
028import cascading.pipe.Operator;
029import cascading.tuple.Fields;
030import cascading.tuple.Tuple;
031import cascading.tuple.TupleEntry;
032import cascading.tuple.TupleEntryCollector;
033import cascading.tuple.util.TupleBuilder;
034import cascading.tuple.util.TupleViews;
035
036import static cascading.tuple.util.TupleViews.*;
037
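/**
 * Class OperatorStage is the base stage for stream elements that wrap an {@link Operator}.
 * <p>
 * On {@link #initialize()} it creates the {@link ConcreteCall} handed to the operation and the
 * {@link TupleBuilder} instances used to build the arguments tuple and the outgoing tuple. The
 * {@link #prepare()}, {@link #complete(Duct)} and {@link #cleanup()} callbacks are forwarded to the
 * operation as {@code prepare}, {@code flush} and {@code cleanup} respectively.
 */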
041public abstract class OperatorStage<Incoming> extends ElementStage<Incoming, TupleEntry>
042  {
  /**
   * As of 2.2, the output collector is set to {@code null} before
   * {@link cascading.operation.Operation#cleanup(cascading.flow.FlowProcess, cascading.operation.OperationCall)}
   * is called. Setting this property to {@code true} retains the collector, for compatibility with 2.1.
   */
048  public static final String RETAIN_COLLECTOR = "cascading.compatibility.retain.collector";
049
050  protected ConcreteCall operationCall;
051  protected TupleEntry incomingEntry;
052  protected Fields argumentsSelector;
053  protected TupleEntry argumentsEntry;
054  protected Fields remainderFields;
055  protected Fields outgoingSelector;
056  protected TupleEntry outgoingEntry;
057
058  protected TupleBuilder argumentsBuilder;
059  protected TupleBuilder outgoingBuilder;
060
061  private final boolean retainCollector;
062
063  protected TupleEntryCollector outputCollector;
064
065  public OperatorStage( FlowProcess flowProcess, FlowElement flowElement )
066    {
067    super( flowProcess, flowElement );
068
069    this.retainCollector = Boolean.parseBoolean( flowProcess.getStringProperty( RETAIN_COLLECTOR ) );
070    }
071
  /**
   * Returns the {@link Operator} handled by this stage.
   * <p>
   * {@link #initialize()} passes the returned operator to {@code createOutgoingBuilder}, which reads
   * its output selector and field declaration.
   *
   * @return the operator of this stage
   */
  public abstract Operator getOperator();
073
  /**
   * Returns the selector describing the outgoing fields.
   * <p>
   * {@link #initialize()} stores the value in {@code outgoingSelector} and passes it to
   * {@code createOutgoingBuilder}.
   *
   * @return the outgoing selector
   */
  protected abstract Fields getOutgoingSelector();
075
076  protected Fields getOperationDeclaredFields()
077    {
078    return outgoingScopes.get( 0 ).getOperationDeclaredFields();
079    }
080
  /**
   * Returns the incoming fields used when building the outgoing tuple.
   * <p>
   * {@link #initialize()} passes the value as {@code incomingFields} to {@code createOutgoingBuilder}.
   *
   * @return the incoming pass-through fields
   */
  protected abstract Fields getIncomingPassThroughFields();
082
  /**
   * Returns the incoming fields the arguments are selected from.
   * <p>
   * {@link #initialize()} passes the value as {@code incomingFields} to {@code createArgumentsBuilder}.
   *
   * @return the incoming arguments fields
   */
  protected abstract Fields getIncomingArgumentsFields();
084
085  protected TupleBuilder createArgumentsBuilder( final Fields incomingFields, final Fields argumentsSelector )
086    {
087    if( incomingFields.isUnknown() )
088      return new TupleBuilder()
089        {
090        @Override
091        public Tuple makeResult( Tuple input, Tuple output )
092          {
093          return input.get( incomingFields, argumentsSelector );
094          }
095        };
096
097    if( argumentsSelector.isAll() )
098      return new TupleBuilder()
099        {
100        @Override
101        public Tuple makeResult( Tuple input, Tuple output )
102          {
103          return input;
104          }
105        };
106
107    if( argumentsSelector.isNone() )
108      return new TupleBuilder()
109        {
110        @Override
111        public Tuple makeResult( Tuple input, Tuple output )
112          {
113          return Tuple.NULL;
114          }
115        };
116
117    final Fields inputDeclarationFields = Fields.asDeclaration( incomingFields );
118
119    return new TupleBuilder()
120      {
121      Tuple result = createNarrow( inputDeclarationFields.getPos( argumentsSelector ) );
122
123      @Override
124      public Tuple makeResult( Tuple input, Tuple output )
125        {
126        return TupleViews.reset( result, input );
127        }
128      };
129    }
130
131  protected TupleBuilder createOutgoingBuilder( final Operator operator, final Fields incomingFields, final Fields argumentSelector, final Fields remainderFields, final Fields declaredFields, final Fields outgoingSelector )
132    {
133    final Fields inputDeclarationFields = Fields.asDeclaration( incomingFields );
134
135    if( operator.getOutputSelector().isResults() )
136      return new TupleBuilder()
137        {
138        @Override
139        public Tuple makeResult( Tuple input, Tuple output )
140          {
141          return output;
142          }
143        };
144
145    if( operator.getOutputSelector().isAll() && !( incomingFields.isUnknown() || declaredFields.isUnknown() ) )
146      return new TupleBuilder()
147        {
148        Tuple result = createComposite( inputDeclarationFields, declaredFields );
149
150        @Override
151        public Tuple makeResult( Tuple input, Tuple output )
152          {
153          return TupleViews.reset( result, input, output );
154          }
155        };
156
157    if( operator.getOutputSelector().isReplace() )
158      {
159      if( incomingFields.isUnknown() )
160        return new TupleBuilder()
161          {
162          Fields resultFields = operator.getFieldDeclaration().isArguments() ? argumentSelector : declaredFields;
163
164          @Override
165          public Tuple makeResult( Tuple input, Tuple output )
166            {
167            Tuple result = new Tuple( input );
168
169            result.set( Fields.UNKNOWN, resultFields, output );
170
171            return result;
172            }
173          };
174
175      return new TupleBuilder()
176        {
177        Fields resultFields = operator.getFieldDeclaration().isArguments() ? argumentSelector : declaredFields;
178        Tuple result = createOverride( inputDeclarationFields, resultFields );
179
180        @Override
181        public Tuple makeResult( Tuple input, Tuple output )
182          {
183          return TupleViews.reset( result, input, output );
184          }
185        };
186      }
187
188    if( operator.getOutputSelector().isSwap() )
189      {
190      if( remainderFields.size() == 0 ) // the same as Fields.RESULTS
191        return new TupleBuilder()
192          {
193          @Override
194          public Tuple makeResult( Tuple input, Tuple output )
195            {
196            return output;
197            }
198          };
199      else if( declaredFields.isUnknown() )
200        return new TupleBuilder()
201          {
202          @Override
203          public Tuple makeResult( Tuple input, Tuple output )
204            {
205            return input.get( incomingFields, remainderFields ).append( output );
206            }
207          };
208      else
209        return new TupleBuilder()
210          {
211          Tuple view = createNarrow( inputDeclarationFields.getPos( remainderFields ) );
212          Tuple result = createComposite( Fields.asDeclaration( remainderFields ), declaredFields );
213
214          @Override
215          public Tuple makeResult( Tuple input, Tuple output )
216            {
217            TupleViews.reset( view, input );
218
219            return TupleViews.reset( result, view, output );
220            }
221          };
222      }
223
224    if( incomingFields.isUnknown() || declaredFields.isUnknown() )
225      return new TupleBuilder()
226        {
227        Fields selector = outgoingSelector.isUnknown() ? Fields.ALL : outgoingSelector;
228        TupleEntry incoming = new TupleEntry( incomingFields, true );
229        TupleEntry declared = new TupleEntry( declaredFields, true );
230
231        @Override
232        public Tuple makeResult( Tuple input, Tuple output )
233          {
234          incoming.setTuple( input );
235          declared.setTuple( output );
236
237          return TupleEntry.select( selector, incoming, declared );
238          }
239        };
240
241    return new TupleBuilder()
242      {
243      Fields inputFields = operator.getFieldDeclaration().isArguments() ? Fields.mask( inputDeclarationFields, declaredFields ) : inputDeclarationFields;
244      Tuple appended = createComposite( inputFields, declaredFields );
245      Fields allFields = Fields.resolve( Fields.ALL, inputFields, declaredFields );
246      Tuple result = createNarrow( allFields.getPos( outgoingSelector ), appended );
247
248      @Override
249      public Tuple makeResult( Tuple input, Tuple output )
250        {
251        TupleViews.reset( appended, input, output );
252
253        return result;
254        }
255      };
256    }
257
258  @Override
259  public void initialize()
260    {
261    Scope outgoingScope = outgoingScopes.get( 0 );
262
263    operationCall = new ConcreteCall( outgoingScope.getArgumentsDeclarator(), outgoingScope.getOperationDeclaredFields() );
264
265    argumentsSelector = outgoingScope.getArgumentsSelector();
266    remainderFields = outgoingScope.getRemainderPassThroughFields();
267    outgoingSelector = getOutgoingSelector();
268
269    argumentsEntry = new TupleEntry( outgoingScope.getArgumentsDeclarator(), true );
270
271    outgoingEntry = new TupleEntry( getOutgoingFields(), true );
272
273    operationCall.setArguments( argumentsEntry );
274
275    argumentsBuilder = createArgumentsBuilder( getIncomingArgumentsFields(), argumentsSelector );
276    outgoingBuilder = createOutgoingBuilder( getOperator(), getIncomingPassThroughFields(), argumentsSelector, remainderFields, getOperationDeclaredFields(), outgoingSelector );
277    }
278
279  @Override
280  public void prepare()
281    {
282    super.prepare(); // if fails, skip the this prepare
283
284    ( (Operator) getFlowElement() ).getOperation().prepare( flowProcess, operationCall );
285    }
286
287  @Override
288  public void complete( Duct previous )
289    {
290    try
291      {
292      ( (Operator) getFlowElement() ).getOperation().flush( flowProcess, operationCall );
293      }
294    finally
295      {
296      super.complete( previous );
297      }
298    }
299
300  @Override
301  public void cleanup()
302    {
303    if( !retainCollector ) // see comments for RETAIN_COLLECTOR
304      operationCall.setOutputCollector( null );
305
306    try
307      {
308      ( (Operator) getFlowElement() ).getOperation().cleanup( flowProcess, operationCall );
309      }
310    finally
311      {
312      super.cleanup(); // guarantee this happens
313      }
314    }
315  }