001    /*
002     * Copyright (c) 2007-2015 Concurrent, Inc. All Rights Reserved.
003     *
004     * Project and contact information: http://www.cascading.org/
005     *
006     * This file is part of the Cascading project.
007     *
008     * Licensed under the Apache License, Version 2.0 (the "License");
009     * you may not use this file except in compliance with the License.
010     * You may obtain a copy of the License at
011     *
012     *     http://www.apache.org/licenses/LICENSE-2.0
013     *
014     * Unless required by applicable law or agreed to in writing, software
015     * distributed under the License is distributed on an "AS IS" BASIS,
016     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017     * See the License for the specific language governing permissions and
018     * limitations under the License.
019     */
020    
021    package cascading.flow.stream;
022    
023    import cascading.CascadingException;
024    import cascading.flow.FlowProcess;
025    import cascading.operation.GroupAssertion;
026    import cascading.pipe.Every;
027    import cascading.pipe.OperatorException;
028    import cascading.tuple.Fields;
029    import cascading.tuple.TupleEntry;
030    
031    /**
032     *
033     */
034    public class GroupAssertionEveryStage extends EveryStage<TupleEntry> implements Reducing<TupleEntry, TupleEntry>
035      {
036      private GroupAssertion groupAssertion;
037      private Reducing reducing;
038    
039      public GroupAssertionEveryStage( FlowProcess flowProcess, Every every )
040        {
041        super( flowProcess, every );
042        }
043    
044      @Override
045      protected Fields getIncomingPassThroughFields()
046        {
047        return incomingScopes.get( 0 ).getIncomingAggregatorPassThroughFields();
048        }
049    
050      @Override
051      protected Fields getIncomingArgumentsFields()
052        {
053        return incomingScopes.get( 0 ).getIncomingAggregatorArgumentFields();
054        }
055    
056      @Override
057      protected Fields getOutgoingSelector()
058        {
059        return outgoingScopes.get( 0 ).getOutGroupingSelector();
060        }
061    
062      @Override
063      public void initialize()
064        {
065        super.initialize();
066    
067        groupAssertion = every.getGroupAssertion();
068    
069        reducing = (Reducing) getNext();
070        }
071    
072      @Override
073      public void startGroup( Duct previous, TupleEntry groupEntry )
074        {
075        operationCall.setGroup( groupEntry );
076        operationCall.setArguments( null );  // zero it out
077        operationCall.setOutputCollector( null ); // zero it out
078    
079        try
080          {
081          groupAssertion.start( flowProcess, operationCall );
082          }
083        catch( CascadingException exception )
084          {
085          handleException( exception, groupEntry );
086          }
087        catch( Throwable throwable )
088          {
089          handleException( new OperatorException( every, "operator Every failed starting operation: " + every.getOperation(), throwable ), groupEntry );
090          }
091    
092        reducing.startGroup( this, groupEntry );
093        }
094    
095      @Override
096      public void receive( Duct previous, TupleEntry tupleEntry )
097        {
098        try
099          {
100          argumentsEntry.setTuple( argumentsBuilder.makeResult( tupleEntry.getTuple(), null ) );
101          operationCall.setArguments( argumentsEntry );
102    
103          groupAssertion.aggregate( flowProcess, operationCall );
104          }
105        catch( CascadingException exception )
106          {
107          handleException( exception, argumentsEntry );
108          }
109        catch( Throwable throwable )
110          {
111          handleException( new OperatorException( every, "operator Every failed executing operation: " + every.getOperation(), throwable ), argumentsEntry );
112          }
113    
114        next.receive( this, tupleEntry );
115        }
116    
117      @Override
118      public void completeGroup( Duct previous, TupleEntry incomingEntry )
119        {
120        this.incomingEntry = incomingEntry;
121        operationCall.setArguments( null );
122    
123        try
124          {
125          groupAssertion.doAssert( flowProcess, operationCall ); // collector calls next
126    
127          reducing.completeGroup( this, incomingEntry );
128          }
129        catch( CascadingException exception )
130          {
131          handleException( exception, incomingEntry );
132          }
133        catch( Throwable throwable )
134          {
135          handleException( new OperatorException( every, "operator Every failed completing operation: " + every.getOperation(), throwable ), incomingEntry );
136          }
137        }
138      }