/*
 * Copyright (c) 2016-2017 Chris K Wensel <chris@wensel.net>. All Rights Reserved.
 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved.
 *
 * Project and contact information: http://www.cascading.org/
 *
 * This file is part of the Cascading project.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package cascading.pipe;

import java.beans.ConstructorProperties;
import java.util.Set;

import cascading.flow.planner.Scope;
import cascading.operation.Aggregator;
import cascading.operation.AssertionLevel;
import cascading.operation.Buffer;
import cascading.operation.GroupAssertion;
import cascading.tuple.Fields;

/**
 * The Every operator applies an {@link Aggregator}, {@link Buffer}, or {@link GroupAssertion} to every grouping.
 * <p>
 * Any number of Every instances may follow other Every, {@link GroupBy}, or {@link CoGroup} instances if they apply an
 * Aggregator, not a Buffer. If a Buffer, only one Every may follow a GroupBy or CoGroup.
 * <p>
 * Every operators create aggregate values for every grouping they encounter. This aggregate value is added to the
 * current grouping Tuple.
 * <p>
 * In the case of a CoGroup, the grouping Tuple will be all the grouping keys from all joined streams,
 * and if an "outer" type join is used, one value on the grouping Tuple may be null.
 * <p>
 * Subsequent Every instances can continue to append values to the grouping Tuple. When an {@link Each} follows
 * an Every, the Each applies its operation to the grouping Tuple (thus all child values in the grouping are
 * discarded and only aggregate values are propagated).
 */
public class Every extends Operator
  {
  /** Default argument selector, used when a constructor is not given one: all fields. */
  private static final Fields AGGREGATOR_ARGUMENTS = Fields.ALL;
  /** Default output selector for Aggregator and Buffer operations: all fields. */
  private static final Fields AGGREGATOR_SELECTOR = Fields.ALL;
  /** Output selector always used for GroupAssertion operations: {@link Fields#RESULTS}. */
  private static final Fields ASSERTION_SELECTOR = Fields.RESULTS;

  /**
   * Creates an Every applying the given Aggregator, with {@link Fields#ALL} as both the argument
   * selector and the output selector.
   *
   * @param previous   the Pipe this Every receives input Tuples from
   * @param aggregator the Aggregator to apply to each Tuple grouping
   */
  @ConstructorProperties({"previous", "aggregator"})
  public Every( Pipe previous, Aggregator aggregator )
    {
    super( previous, AGGREGATOR_ARGUMENTS, aggregator, AGGREGATOR_SELECTOR );
    }

  /**
   * Creates an Every applying the given Aggregator to the selected arguments, with {@link Fields#ALL}
   * as the output selector.
   *
   * @param previous         the Pipe this Every receives input Tuples from
   * @param argumentSelector field selector choosing the Aggregator arguments from the input Tuple
   * @param aggregator       the Aggregator to apply to each Tuple grouping
   */
  @ConstructorProperties({"previous", "argumentSelector", "aggregator"})
  public Every( Pipe previous, Fields argumentSelector, Aggregator aggregator )
    {
    super( previous, argumentSelector, aggregator, AGGREGATOR_SELECTOR );
    }

  /**
   * Creates an Every applying the given Aggregator to the selected arguments, with an explicit
   * output selector.
   *
   * @param previous         the Pipe this Every receives input Tuples from
   * @param argumentSelector field selector choosing the Aggregator arguments from the input Tuple
   * @param aggregator       the Aggregator to apply to each Tuple grouping
   * @param outputSelector   field selector choosing the output Tuple from the grouping and Aggregator result Tuples
   */
  @ConstructorProperties({"previous", "argumentSelector", "aggregator", "outputSelector"})
  public Every( Pipe previous, Fields argumentSelector, Aggregator aggregator, Fields outputSelector )
    {
    super( previous, argumentSelector, aggregator, outputSelector );
    }

  /**
   * Creates an Every applying the given Aggregator with {@link Fields#ALL} as the argument selector
   * and an explicit output selector.
   *
   * @param previous       the Pipe this Every receives input Tuples from
   * @param aggregator     the Aggregator to apply to each Tuple grouping
   * @param outputSelector field selector choosing the output Tuple from the grouping and Aggregator result Tuples
   */
  @ConstructorProperties({"previous", "aggregator", "outputSelector"})
  public Every( Pipe previous, Aggregator aggregator, Fields outputSelector )
    {
    super( previous, AGGREGATOR_ARGUMENTS, aggregator, outputSelector );
    }

  /**
   * Creates an Every applying the given Buffer, with {@link Fields#ALL} as both the argument
   * selector and the output selector.
   *
   * @param previous the Pipe this Every receives input Tuples from
   * @param buffer   the Buffer to apply to each Tuple grouping
   */
  @ConstructorProperties({"previous", "buffer"})
  public Every( Pipe previous, Buffer buffer )
    {
    super( previous, AGGREGATOR_ARGUMENTS, buffer, AGGREGATOR_SELECTOR );
    }

  /**
   * Creates an Every applying the given Buffer to the selected arguments, with {@link Fields#ALL}
   * as the output selector.
   *
   * @param previous         the Pipe this Every receives input Tuples from
   * @param argumentSelector field selector choosing the Buffer arguments from the input Tuple
   * @param buffer           the Buffer to apply to each Tuple grouping
   */
  @ConstructorProperties({"previous", "argumentSelector", "buffer"})
  public Every( Pipe previous, Fields argumentSelector, Buffer buffer )
    {
    super( previous, argumentSelector, buffer, AGGREGATOR_SELECTOR );
    }

  /**
   * Creates an Every applying the given Buffer to the selected arguments, with an explicit
   * output selector.
   *
   * @param previous         the Pipe this Every receives input Tuples from
   * @param argumentSelector field selector choosing the Buffer arguments from the input Tuple
   * @param buffer           the Buffer to apply to each Tuple grouping
   * @param outputSelector   field selector choosing the output Tuple from the grouping and Buffer result Tuples
   */
  @ConstructorProperties({"previous", "argumentSelector", "buffer", "outputSelector"})
  public Every( Pipe previous, Fields argumentSelector, Buffer buffer, Fields outputSelector )
    {
    super( previous, argumentSelector, buffer, outputSelector );
    }

  /**
   * Creates an Every applying the given Buffer with {@link Fields#ALL} as the argument selector
   * and an explicit output selector.
   *
   * @param previous       the Pipe this Every receives input Tuples from
   * @param buffer         the Buffer to apply to each Tuple grouping
   * @param outputSelector field selector choosing the output Tuple from the grouping and Buffer result Tuples
   */
  @ConstructorProperties({"previous", "buffer", "outputSelector"})
  public Every( Pipe previous, Buffer buffer, Fields outputSelector )
    {
    super( previous, AGGREGATOR_ARGUMENTS, buffer, outputSelector );
    }

  /**
   * Creates an Every applying the given GroupAssertion, with {@link Fields#ALL} as the argument
   * selector. The output selector is always {@link Fields#RESULTS}.
   *
   * @param previous       the Pipe this Every receives input Tuples from
   * @param assertionLevel the AssertionLevel to associate with the assertion
   * @param assertion      the GroupAssertion to apply to each Tuple grouping
   */
  @ConstructorProperties({"previous", "assertionLevel", "assertion"})
  public Every( Pipe previous, AssertionLevel assertionLevel, GroupAssertion assertion )
    {
    super( previous, AGGREGATOR_ARGUMENTS, assertionLevel, assertion, ASSERTION_SELECTOR );
    }

  /**
   * Creates an Every applying the given GroupAssertion to the selected arguments. The output
   * selector is always {@link Fields#RESULTS}.
   *
   * @param previous         the Pipe this Every receives input Tuples from
   * @param argumentSelector field selector choosing the GroupAssertion arguments from the input Tuple
   * @param assertionLevel   the AssertionLevel to associate with the assertion
   * @param assertion        the GroupAssertion to apply to each Tuple grouping
   */
  @ConstructorProperties({"previous", "argumentSelector", "assertionLevel", "assertion"})
  public Every( Pipe previous, Fields argumentSelector, AssertionLevel assertionLevel, GroupAssertion assertion )
    {
    super( previous, argumentSelector, assertionLevel, assertion, ASSERTION_SELECTOR );
    }

  /**
   * Method isBuffer reports whether the operation held by this Every is a {@link Buffer}.
   *
   * @return true if the operation is an instance of Buffer
   */
  public boolean isBuffer()
    {
    return operation instanceof Buffer;
    }

  /**
   * Method isAggregator reports whether the operation held by this Every is an {@link Aggregator}.
   *
   * @return true if the operation is an instance of Aggregator
   */
  public boolean isAggregator()
    {
    return operation instanceof Aggregator;
    }

  /**
   * Method isGroupAssertion reports whether the operation held by this Every is a {@link GroupAssertion}.
   *
   * @return true if the operation is an instance of GroupAssertion
   */
  public boolean isGroupAssertion()
    {
    return operation instanceof GroupAssertion;
    }

  /**
   * Method getAggregator returns the held operation cast to {@link Aggregator}.
   * <p>
   * The cast is unchecked by this method; guard the call with {@link #isAggregator()}.
   *
   * @return the operation as an Aggregator
   * @throws ClassCastException if the held operation is not an Aggregator
   */
  public Aggregator getAggregator()
    {
    return (Aggregator) operation;
    }

  /**
   * Method getBuffer returns the held operation cast to {@link Buffer}.
   * <p>
   * The cast is unchecked by this method; guard the call with {@link #isBuffer()}.
   *
   * @return the operation as a Buffer
   * @throws ClassCastException if the held operation is not a Buffer
   */
  public Buffer getBuffer()
    {
    return (Buffer) operation;
    }

  /**
   * Method getGroupAssertion returns the held operation cast to {@link GroupAssertion}.
   * <p>
   * The cast is unchecked by this method; guard the call with {@link #isGroupAssertion()}.
   *
   * @return the operation as a GroupAssertion
   * @throws ClassCastException if the held operation is not a GroupAssertion
   */
  public GroupAssertion getGroupAssertion()
    {
    return (GroupAssertion) operation;
    }

  /**
   * Picks the argument fields the incoming scope offers to this operation: the Buffer variant when
   * this Every holds a Buffer, the Aggregator variant in every other case.
   *
   * @param incomingScope the scope arriving at this Every
   * @return the incoming argument fields matching the operation kind
   */
  @Override
  public Fields resolveIncomingOperationArgumentFields( Scope incomingScope )
    {
    return isBuffer()
      ? incomingScope.getIncomingBufferArgumentFields()
      : incomingScope.getIncomingAggregatorArgumentFields();
    }

  /**
   * Picks the pass-through fields the incoming scope offers to this operation: the Buffer variant
   * when this Every holds a Buffer, the Aggregator variant in every other case.
   *
   * @param incomingScope the scope arriving at this Every
   * @return the incoming pass-through fields matching the operation kind
   */
  @Override
  public Fields resolveIncomingOperationPassThroughFields( Scope incomingScope )
    {
    return isBuffer()
      ? incomingScope.getIncomingBufferPassThroughFields()
      : incomingScope.getIncomingAggregatorPassThroughFields();
    }

  /**
   * Builds the outgoing {@link Scope} of this Every from the first of the given incoming scopes.
   * <p>
   * Argument, declared, and outgoing grouping fields are resolved and verified in that order. The
   * outgoing values fields are taken unchanged from the incoming scope.
   *
   * @param incomingScopes the scopes arriving at this Every
   * @return a new Scope of kind {@code EVERY}
   * @throws OperatorException if a non-Buffer operation receives a scope whose out values fields are NONE,
   *                           or if a non-Buffer operation selects fields declared by a preceding Every
   */
  @Override
  public Scope outgoingScopeFor( Set<Scope> incomingScopes )
    {
    Scope incoming = getFirst( incomingScopes );

    boolean notBuffer = !isBuffer();

    if( notBuffer && incoming.getOutValuesFields().isNone() )
      throw new OperatorException( this, "only a Buffer may be preceded by a CoGroup declaring Fields.NONE as the join fields" );

    Fields arguments = resolveArgumentSelector( incomingScopes );
    verifyArguments( arguments );

    // we currently don't support using result from a previous Every in the current Every
    verifyAggregatorArguments( arguments, incoming );

    Fields declared = resolveDeclared( incomingScopes, arguments );
    verifyDeclaredFields( declared );

    Fields outGrouping = resolveOutgoingGroupingSelector( incomingScopes, arguments, declared );
    verifyOutputSelector( outGrouping );

    Fields outValues = incoming.getOutValuesFields();

    // the incoming fields eligible to be outgoing, for Every only the grouping fields.
    Fields passThrough = resolveIncomingOperationPassThroughFields( incoming );
    Fields remainder = resolveRemainderFields( incomingScopes, arguments );

    return new Scope( getName(), Scope.Kind.EVERY, passThrough, remainder, arguments, declared, outGrouping, outValues );
    }

  /**
   * Rejects argument selections that reach into the fields declared by a directly preceding Every.
   * Buffers are exempt from this check.
   *
   * @param argumentFields the resolved argument fields of this Every
   * @param incomingScope  the scope arriving at this Every
   * @throws OperatorException if the restriction is violated
   */
  private void verifyAggregatorArguments( Fields argumentFields, Scope incomingScope )
    {
    if( isBuffer() )
      return;

    if( !incomingScope.isEvery() )
      return;

    if( argumentFields.contains( incomingScope.getOperationDeclaredFields() ) )
      throw new OperatorException( this, "arguments may not select a declared field from a previous Every" );
    }

  /**
   * Resolves the outgoing selector, translating any failure that is not already an
   * {@link OperatorException} into one that names this Every and keeps the original cause.
   *
   * @param incomingScopes   the scopes arriving at this Every
   * @param argumentSelector the resolved argument fields
   * @param declared         the resolved declared fields
   * @return the resolved outgoing fields
   * @throws OperatorException if the selector cannot be resolved
   */
  Fields resolveOutgoingGroupingSelector( Set<Scope> incomingScopes, Fields argumentSelector, Fields declared )
    {
    try
      {
      return resolveOutgoingSelector( incomingScopes, argumentSelector, declared );
      }
    catch( OperatorException operatorException )
      {
      // already carries operator context, propagate untouched
      throw operatorException;
      }
    catch( Exception cause )
      {
      String prefix = isBuffer()
        ? "could not resolve outgoing values selector in: "
        : "could not resolve outgoing grouping selector in: ";

      throw new OperatorException( this, prefix + this, cause );
      }
    }
  }