001/* 002 * Copyright (c) 2016 Chris K Wensel <chris@wensel.net>. All Rights Reserved. 003 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved. 004 * 005 * Project and contact information: http://www.cascading.org/ 006 * 007 * This file is part of the Cascading project. 008 * 009 * Licensed under the Apache License, Version 2.0 (the "License"); 010 * you may not use this file except in compliance with the License. 011 * You may obtain a copy of the License at 012 * 013 * http://www.apache.org/licenses/LICENSE-2.0 014 * 015 * Unless required by applicable law or agreed to in writing, software 016 * distributed under the License is distributed on an "AS IS" BASIS, 017 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 018 * See the License for the specific language governing permissions and 019 * limitations under the License. 020 */ 021 022package cascading.flow.stream.element; 023 024import java.io.IOException; 025 026import cascading.CascadingException; 027import cascading.flow.FlowProcess; 028import cascading.flow.stream.duct.Duct; 029import cascading.flow.stream.duct.Reducing; 030import cascading.operation.Aggregator; 031import cascading.pipe.Every; 032import cascading.pipe.OperatorException; 033import cascading.tuple.Fields; 034import cascading.tuple.Tuple; 035import cascading.tuple.TupleEntry; 036import cascading.tuple.TupleEntryCollector; 037import cascading.tuple.Tuples; 038 039/** 040 * 041 */ 042public class AggregatorEveryStage extends EveryStage<TupleEntry> implements Reducing<TupleEntry, TupleEntry> 043 { 044 private Aggregator aggregator; 045 private Reducing reducing; 046 047 public AggregatorEveryStage( FlowProcess flowProcess, Every every ) 048 { 049 super( flowProcess, every ); 050 } 051 052 @Override 053 public void initialize() 054 { 055 super.initialize(); 056 057 aggregator = every.getAggregator(); 058 059 outputCollector = new TupleEntryCollector( getOperationDeclaredFields() ) 060 { 061 @Override 062 protected void collect( TupleEntry resultEntry ) throws IOException 063 { 064 Tuple outgoing = outgoingBuilder.makeResult( incomingEntry.getTuple(), resultEntry.getTuple() ); 065 066 outgoingEntry.setTuple( outgoing ); 067 068 try 069 { 070 reducing.completeGroup( AggregatorEveryStage.this, outgoingEntry ); 071 } 072 finally 073 { 074 Tuples.asModifiable( outgoing ); 075 } 076 } 077 }; 078 079 reducing = (Reducing) getNext(); 080 } 081 082 @Override 083 protected Fields getIncomingPassThroughFields() 084 { 085 return incomingScopes.get( 0 ).getIncomingAggregatorPassThroughFields(); 086 } 087 088 @Override 089 protected Fields getIncomingArgumentsFields() 090 { 091 return incomingScopes.get( 0 ).getIncomingAggregatorArgumentFields(); 092 } 093 094 @Override 095 protected Fields getOutgoingSelector() 096 { 097 return outgoingScopes.get( 0 ).getOutGroupingSelector(); 098 } 099 100 @Override 101 public void startGroup( Duct previous, TupleEntry groupEntry ) 102 { 103 operationCall.setGroup( groupEntry ); 104 operationCall.setArguments( null ); // zero it out 105 operationCall.setOutputCollector( null ); // zero it out 106 107 try 108 { 109 aggregator.start( flowProcess, operationCall ); 110 } 111 catch( CascadingException exception ) 112 { 113 handleException( exception, groupEntry ); 114 } 115 catch( Throwable throwable ) 116 { 117 handleException( new OperatorException( every, "operator Every failed starting operation: " + every.getOperation(), throwable ), groupEntry ); 118 } 119 120 reducing.startGroup( this, groupEntry ); 121 } 122 123 @Override 124 public void receive( Duct previous, int ordinal, TupleEntry tupleEntry ) 125 { 126 try 127 { 128 argumentsEntry.setTuple( argumentsBuilder.makeResult( tupleEntry.getTuple(), null ) ); 129 operationCall.setArguments( argumentsEntry ); 130 131 aggregator.aggregate( flowProcess, operationCall ); 132 } 133 catch( CascadingException exception ) 134 { 135 handleException( exception, argumentsEntry ); 136 } 137 catch( Throwable throwable ) 138 { 139 handleException( new OperatorException( every, "operator Every failed executing operation: " + every.getOperation(), throwable ), argumentsEntry ); 140 } 141 142 next.receive( this, ordinal, tupleEntry ); 143 } 144 145 @Override 146 public void completeGroup( Duct previous, TupleEntry incomingEntry ) 147 { 148 this.incomingEntry = incomingEntry; 149 operationCall.setArguments( null ); 150 operationCall.setOutputCollector( outputCollector ); 151 152 try 153 { 154 aggregator.complete( flowProcess, operationCall ); // collector calls next 155 } 156 catch( CascadingException exception ) 157 { 158 handleException( exception, incomingEntry ); 159 } 160 catch( Throwable throwable ) 161 { 162 handleException( new OperatorException( every, "operator Every failed completing operation: " + every.getOperation(), throwable ), incomingEntry ); 163 } 164 } 165 }