001/*
002 * Copyright (c) 2016 Chris K Wensel <chris@wensel.net>. All Rights Reserved.
003 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved.
004 *
005 * Project and contact information: http://www.cascading.org/
006 *
007 * This file is part of the Cascading project.
008 *
009 * Licensed under the Apache License, Version 2.0 (the "License");
010 * you may not use this file except in compliance with the License.
011 * You may obtain a copy of the License at
012 *
013 *     http://www.apache.org/licenses/LICENSE-2.0
014 *
015 * Unless required by applicable law or agreed to in writing, software
016 * distributed under the License is distributed on an "AS IS" BASIS,
017 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
018 * See the License for the specific language governing permissions and
019 * limitations under the License.
020 */
021
022package cascading.flow.stream.element;
023
024import java.io.IOException;
025
026import cascading.CascadingException;
027import cascading.flow.FlowProcess;
028import cascading.flow.stream.duct.Duct;
029import cascading.flow.stream.duct.Reducing;
030import cascading.operation.Aggregator;
031import cascading.pipe.Every;
032import cascading.pipe.OperatorException;
033import cascading.tuple.Fields;
034import cascading.tuple.Tuple;
035import cascading.tuple.TupleEntry;
036import cascading.tuple.TupleEntryCollector;
037import cascading.tuple.Tuples;
038
039/**
040 *
041 */
042public class AggregatorEveryStage extends EveryStage<TupleEntry> implements Reducing<TupleEntry, TupleEntry>
043  {
044  private Aggregator aggregator;
045  private Reducing reducing;
046
047  public AggregatorEveryStage( FlowProcess flowProcess, Every every )
048    {
049    super( flowProcess, every );
050    }
051
052  @Override
053  public void initialize()
054    {
055    super.initialize();
056
057    aggregator = every.getAggregator();
058
059    outputCollector = new TupleEntryCollector( getOperationDeclaredFields() )
060      {
061      @Override
062      protected void collect( TupleEntry resultEntry ) throws IOException
063        {
064        Tuple outgoing = outgoingBuilder.makeResult( incomingEntry.getTuple(), resultEntry.getTuple() );
065
066        outgoingEntry.setTuple( outgoing );
067
068        try
069          {
070          reducing.completeGroup( AggregatorEveryStage.this, outgoingEntry );
071          }
072        finally
073          {
074          Tuples.asModifiable( outgoing );
075          }
076        }
077      };
078
079    reducing = (Reducing) getNext();
080    }
081
082  @Override
083  protected Fields getIncomingPassThroughFields()
084    {
085    return incomingScopes.get( 0 ).getIncomingAggregatorPassThroughFields();
086    }
087
088  @Override
089  protected Fields getIncomingArgumentsFields()
090    {
091    return incomingScopes.get( 0 ).getIncomingAggregatorArgumentFields();
092    }
093
094  @Override
095  protected Fields getOutgoingSelector()
096    {
097    return outgoingScopes.get( 0 ).getOutGroupingSelector();
098    }
099
100  @Override
101  public void startGroup( Duct previous, TupleEntry groupEntry )
102    {
103    operationCall.setGroup( groupEntry );
104    operationCall.setArguments( null );  // zero it out
105    operationCall.setOutputCollector( null ); // zero it out
106
107    try
108      {
109      aggregator.start( flowProcess, operationCall );
110      }
111    catch( CascadingException exception )
112      {
113      handleException( exception, groupEntry );
114      }
115    catch( Throwable throwable )
116      {
117      handleException( new OperatorException( every, "operator Every failed starting operation: " + every.getOperation(), throwable ), groupEntry );
118      }
119
120    reducing.startGroup( this, groupEntry );
121    }
122
123  @Override
124  public void receive( Duct previous, int ordinal, TupleEntry tupleEntry )
125    {
126    try
127      {
128      argumentsEntry.setTuple( argumentsBuilder.makeResult( tupleEntry.getTuple(), null ) );
129      operationCall.setArguments( argumentsEntry );
130
131      aggregator.aggregate( flowProcess, operationCall );
132      }
133    catch( CascadingException exception )
134      {
135      handleException( exception, argumentsEntry );
136      }
137    catch( Throwable throwable )
138      {
139      handleException( new OperatorException( every, "operator Every failed executing operation: " + every.getOperation(), throwable ), argumentsEntry );
140      }
141
142    next.receive( this, ordinal, tupleEntry );
143    }
144
145  @Override
146  public void completeGroup( Duct previous, TupleEntry incomingEntry )
147    {
148    this.incomingEntry = incomingEntry;
149    operationCall.setArguments( null );
150    operationCall.setOutputCollector( outputCollector );
151
152    try
153      {
154      aggregator.complete( flowProcess, operationCall ); // collector calls next
155      }
156    catch( CascadingException exception )
157      {
158      handleException( exception, incomingEntry );
159      }
160    catch( Throwable throwable )
161      {
162      handleException( new OperatorException( every, "operator Every failed completing operation: " + every.getOperation(), throwable ), incomingEntry );
163      }
164    }
165  }