001/*
002 * Copyright (c) 2016 Chris K Wensel <chris@wensel.net>. All Rights Reserved.
003 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved.
004 *
005 * Project and contact information: http://www.cascading.org/
006 *
007 * This file is part of the Cascading project.
008 *
009 * Licensed under the Apache License, Version 2.0 (the "License");
010 * you may not use this file except in compliance with the License.
011 * You may obtain a copy of the License at
012 *
013 *     http://www.apache.org/licenses/LICENSE-2.0
014 *
015 * Unless required by applicable law or agreed to in writing, software
016 * distributed under the License is distributed on an "AS IS" BASIS,
017 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
018 * See the License for the specific language governing permissions and
019 * limitations under the License.
020 */
021
022package cascading.flow.stream.element;
023
024import cascading.CascadingException;
025import cascading.flow.FlowProcess;
026import cascading.flow.stream.duct.Duct;
027import cascading.flow.stream.duct.Reducing;
028import cascading.operation.GroupAssertion;
029import cascading.pipe.Every;
030import cascading.pipe.OperatorException;
031import cascading.tuple.Fields;
032import cascading.tuple.TupleEntry;
033
034/**
035 *
036 */
037public class GroupAssertionEveryStage extends EveryStage<TupleEntry> implements Reducing<TupleEntry, TupleEntry>
038  {
039  private GroupAssertion groupAssertion;
040  private Reducing reducing;
041
042  public GroupAssertionEveryStage( FlowProcess flowProcess, Every every )
043    {
044    super( flowProcess, every );
045    }
046
047  @Override
048  protected Fields getIncomingPassThroughFields()
049    {
050    return incomingScopes.get( 0 ).getIncomingAggregatorPassThroughFields();
051    }
052
053  @Override
054  protected Fields getIncomingArgumentsFields()
055    {
056    return incomingScopes.get( 0 ).getIncomingAggregatorArgumentFields();
057    }
058
059  @Override
060  protected Fields getOutgoingSelector()
061    {
062    return outgoingScopes.get( 0 ).getOutGroupingSelector();
063    }
064
065  @Override
066  public void initialize()
067    {
068    super.initialize();
069
070    groupAssertion = every.getGroupAssertion();
071
072    reducing = (Reducing) getNext();
073    }
074
075  @Override
076  public void startGroup( Duct previous, TupleEntry groupEntry )
077    {
078    operationCall.setGroup( groupEntry );
079    operationCall.setArguments( null );  // zero it out
080    operationCall.setOutputCollector( null ); // zero it out
081
082    try
083      {
084      groupAssertion.start( flowProcess, operationCall );
085      }
086    catch( CascadingException exception )
087      {
088      handleException( exception, groupEntry );
089      }
090    catch( Throwable throwable )
091      {
092      handleException( new OperatorException( every, "operator Every failed starting operation: " + every.getOperation(), throwable ), groupEntry );
093      }
094
095    reducing.startGroup( this, groupEntry );
096    }
097
098  @Override
099  public void receive( Duct previous, int ordinal, TupleEntry tupleEntry )
100    {
101    try
102      {
103      argumentsEntry.setTuple( argumentsBuilder.makeResult( tupleEntry.getTuple(), null ) );
104      operationCall.setArguments( argumentsEntry );
105
106      groupAssertion.aggregate( flowProcess, operationCall );
107      }
108    catch( CascadingException exception )
109      {
110      handleException( exception, argumentsEntry );
111      }
112    catch( Throwable throwable )
113      {
114      handleException( new OperatorException( every, "operator Every failed executing operation: " + every.getOperation(), throwable ), argumentsEntry );
115      }
116
117    next.receive( this, 0, tupleEntry );
118    }
119
120  @Override
121  public void completeGroup( Duct previous, TupleEntry incomingEntry )
122    {
123    this.incomingEntry = incomingEntry;
124    operationCall.setArguments( null );
125
126    try
127      {
128      groupAssertion.doAssert( flowProcess, operationCall ); // collector calls next
129
130      reducing.completeGroup( this, incomingEntry );
131      }
132    catch( CascadingException exception )
133      {
134      handleException( exception, incomingEntry );
135      }
136    catch( Throwable throwable )
137      {
138      handleException( new OperatorException( every, "operator Every failed completing operation: " + every.getOperation(), throwable ), incomingEntry );
139      }
140    }
141  }