001 /* 002 * Copyright (c) 2007-2015 Concurrent, Inc. All Rights Reserved. 003 * 004 * Project and contact information: http://www.cascading.org/ 005 * 006 * This file is part of the Cascading project. 007 * 008 * Licensed under the Apache License, Version 2.0 (the "License"); 009 * you may not use this file except in compliance with the License. 010 * You may obtain a copy of the License at 011 * 012 * http://www.apache.org/licenses/LICENSE-2.0 013 * 014 * Unless required by applicable law or agreed to in writing, software 015 * distributed under the License is distributed on an "AS IS" BASIS, 016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 017 * See the License for the specific language governing permissions and 018 * limitations under the License. 019 */ 020 021 package cascading.pipe; 022 023 import java.beans.ConstructorProperties; 024 import java.util.Set; 025 026 import cascading.flow.planner.Scope; 027 import cascading.operation.Assertion; 028 import cascading.operation.AssertionLevel; 029 import cascading.operation.Debug; 030 import cascading.operation.DebugLevel; 031 import cascading.operation.Filter; 032 import cascading.operation.Function; 033 import cascading.operation.ValueAssertion; 034 import cascading.tuple.Fields; 035 import cascading.tuple.Tuple; 036 037 /** 038 * The Each operator applies either a {@link Function} or a {@link Filter} to each entry in the {@link Tuple} 039 * stream. Any number of Each operators can follow an Each, {@link Splice}, or {@link Every} 040 * operator. 041 */ 042 public class Each extends Operator 043 { 044 /** Field FUNCTION_SELECTOR */ 045 private static final Fields FUNCTION_SELECTOR = Fields.RESULTS; 046 /** Field FILTER_SELECTOR */ 047 private static final Fields FILTER_SELECTOR = Fields.RESULTS; 048 049 /////////////////// 050 // TAKE FUNCTIONS 051 /////////////////// 052 053 /** 054 * Pass all fields to the given function, only return fields declared by the function. 055 * 056 * @param name name for this branch of Pipes 057 * @param function Function to be applied to each input Tuple 058 */ 059 @ConstructorProperties({"name", "function"}) 060 public Each( String name, Function function ) 061 { 062 super( name, function, FUNCTION_SELECTOR ); 063 } 064 065 /** 066 * Only pass argumentFields to the given function, only return fields declared by the function. 067 * 068 * @param name name for this branch of Pipes 069 * @param argumentSelector field selector that selects Function arguments from the input Tuple 070 * @param function Function to be applied to each input Tuple 071 */ 072 @ConstructorProperties({"name", "argumentSelector", "function"}) 073 public Each( String name, Fields argumentSelector, Function function ) 074 { 075 super( name, argumentSelector, function, FUNCTION_SELECTOR ); 076 } 077 078 /** 079 * Only pass argumentFields to the given function, only return fields selected by the outputSelector. 080 * 081 * @param name name for this branch of Pipes 082 * @param argumentSelector field selector that selects Function arguments from the input Tuple 083 * @param function Function to be applied to each input Tuple 084 * @param outputSelector field selector that selects the output Tuple from the input and Function results Tuples 085 */ 086 @ConstructorProperties({"name", "argumentSelector", "function", "outputSelector"}) 087 public Each( String name, Fields argumentSelector, Function function, Fields outputSelector ) 088 { 089 super( name, argumentSelector, function, outputSelector ); 090 } 091 092 /** 093 * Only return fields selected by the outputSelector. 094 * 095 * @param name name for this branch of Pipes 096 * @param function Function to be applied to each input Tuple 097 * @param outputSelector field selector that selects the output Tuple from the input and Function results Tuples 098 */ 099 @ConstructorProperties({"name", "function", "outputSelector"}) 100 public Each( String name, Function function, Fields outputSelector ) 101 { 102 super( name, function, outputSelector ); 103 } 104 105 /** 106 * Pass all fields to the given function, only return fields declared by the function. 107 * 108 * @param previous previous Pipe to receive input Tuples from 109 * @param function Function to be applied to each input Tuple 110 */ 111 @ConstructorProperties({"previous", "function"}) 112 public Each( Pipe previous, Function function ) 113 { 114 super( previous, function, FUNCTION_SELECTOR ); 115 } 116 117 /** 118 * Only pass argumentFields to the given function, only return fields declared by the function. 119 * 120 * @param previous previous Pipe to receive input Tuples from 121 * @param argumentSelector field selector that selects Function arguments from the input Tuple 122 * @param function Function to be applied to each input Tuple 123 */ 124 @ConstructorProperties({"previous", "argumentSelector", "function"}) 125 public Each( Pipe previous, Fields argumentSelector, Function function ) 126 { 127 super( previous, argumentSelector, function, FUNCTION_SELECTOR ); 128 } 129 130 /** 131 * Only pass argumentFields to the given function, only return fields selected by the outputSelector. 132 * 133 * @param previous previous Pipe to receive input Tuples from 134 * @param argumentSelector field selector that selects Function arguments from the input Tuple 135 * @param function Function to be applied to each input Tuple 136 * @param outputSelector field selector that selects the output Tuple from the input and Function results Tuples 137 */ 138 @ConstructorProperties({"previous", "argumentSelector", "function", "outputSelector"}) 139 public Each( Pipe previous, Fields argumentSelector, Function function, Fields outputSelector ) 140 { 141 super( previous, argumentSelector, function, outputSelector ); 142 } 143 144 /** 145 * Only pass argumentFields to the given function, only return fields selected by the outputSelector. 146 * 147 * @param previous previous Pipe to receive input Tuples from 148 * @param function Function to be applied to each input Tuple 149 * @param outputSelector field selector that selects the output Tuple from the input and Function results Tuples 150 */ 151 @ConstructorProperties({"previous", "function", "outputSelector"}) 152 public Each( Pipe previous, Function function, Fields outputSelector ) 153 { 154 super( previous, function, outputSelector ); 155 } 156 157 ///////////////// 158 // TAKE FILTERS 159 ///////////////// 160 161 /** 162 * Constructor Each creates a new Each instance. 163 * 164 * @param name name for this branch of Pipes 165 * @param filter Filter to be applied to each input Tuple 166 */ 167 @ConstructorProperties({"name", "filter"}) 168 public Each( String name, Filter filter ) 169 { 170 super( name, filter, FILTER_SELECTOR ); 171 } 172 173 /** 174 * Constructor Each creates a new Each instance. 175 * 176 * @param name name for this branch of Pipes 177 * @param argumentSelector field selector that selects Function arguments from the input Tuple 178 * @param filter Filter to be applied to each input Tuple 179 */ 180 @ConstructorProperties({"name", "argumentSelector", "filter"}) 181 public Each( String name, Fields argumentSelector, Filter filter ) 182 { 183 super( name, argumentSelector, filter, FILTER_SELECTOR ); 184 } 185 186 /** 187 * Constructor Each creates a new Each instance. 188 * 189 * @param previous previous Pipe to receive input Tuples from 190 * @param filter Filter to be applied to each input Tuple 191 */ 192 @ConstructorProperties({"previous", "filter"}) 193 public Each( Pipe previous, Filter filter ) 194 { 195 super( previous, filter, FILTER_SELECTOR ); 196 } 197 198 /** 199 * Constructor Each creates a new Each instance. 200 * 201 * @param previous previous Pipe to receive input Tuples from 202 * @param argumentSelector field selector that selects Function arguments from the input Tuple 203 * @param filter Filter to be applied to each input Tuple 204 */ 205 @ConstructorProperties({"previous", "argumentSelector", "filter"}) 206 public Each( Pipe previous, Fields argumentSelector, Filter filter ) 207 { 208 super( previous, argumentSelector, filter, FILTER_SELECTOR ); 209 } 210 211 /////////////// 212 // ASSERTIONS 213 /////////////// 214 215 /** 216 * Constructor Each creates a new Each instance. 217 * 218 * @param name name for this branch of Pipes 219 * @param assertionLevel AssertionLevel to associate with the Assertion 220 * @param assertion Assertion to be applied to each input Tuple 221 */ 222 @ConstructorProperties({"name", "assertionLevel", "assertion"}) 223 public Each( String name, AssertionLevel assertionLevel, Assertion assertion ) 224 { 225 super( name, assertionLevel, assertion, FILTER_SELECTOR ); 226 } 227 228 /** 229 * @param name name for this branch of Pipes 230 * @param argumentSelector field selector that selects Function arguments from the input Tuple 231 * @param assertionLevel AssertionLevel to associate with the Assertion 232 * @param assertion Assertion to be applied to each input Tuple 233 */ 234 @ConstructorProperties({"name", "argumentSelector", "assertionLevel", "assertion"}) 235 public Each( String name, Fields argumentSelector, AssertionLevel assertionLevel, Assertion assertion ) 236 { 237 super( name, argumentSelector, assertionLevel, assertion, FILTER_SELECTOR ); 238 } 239 240 /** 241 * @param previous previous Pipe to receive input Tuples from 242 * @param assertionLevel AssertionLevel to associate with the Assertion 243 * @param assertion Assertion to be applied to each input Tuple 244 */ 245 @ConstructorProperties({"previous", "assertionLevel", "assertion"}) 246 public Each( Pipe previous, AssertionLevel assertionLevel, Assertion assertion ) 247 { 248 super( previous, assertionLevel, assertion, FILTER_SELECTOR ); 249 } 250 251 /** 252 * @param previous previous Pipe to receive input Tuples from 253 * @param argumentSelector field selector that selects Function arguments from the input Tuple 254 * @param assertionLevel AssertionLevel to associate with the Assertion 255 * @param assertion Assertion to be applied to each input Tuple 256 */ 257 @ConstructorProperties({"previous", "argumentSelector", "assertionLevel", "assertion"}) 258 public Each( Pipe previous, Fields argumentSelector, AssertionLevel assertionLevel, Assertion assertion ) 259 { 260 super( previous, argumentSelector, assertionLevel, assertion, FILTER_SELECTOR ); 261 } 262 263 ////////// 264 //DEBUG 265 ////////// 266 267 /** 268 * @param name name for this branch of Pipes 269 * @param argumentSelector field selector that selects Function arguments from the input Tuple 270 * @param debugLevel DebugLevel to associate with the Debug 271 * @param debug Debug to be applied to each input Tuple 272 */ 273 @ConstructorProperties({"name", "argumentSelector", "debugLevel", "debug"}) 274 public Each( String name, Fields argumentSelector, DebugLevel debugLevel, Debug debug ) 275 { 276 super( name, argumentSelector, debugLevel, debug, FILTER_SELECTOR ); 277 } 278 279 /** 280 * @param previous previous Pipe to receive input Tuples from 281 * @param debugLevel DebugLevel to associate with the Debug 282 * @param debug Debug to be applied to each input Tuple 283 */ 284 @ConstructorProperties({"previous", "debugLevel", "debug"}) 285 public Each( Pipe previous, DebugLevel debugLevel, Debug debug ) 286 { 287 super( previous, debugLevel, debug, FILTER_SELECTOR ); 288 } 289 290 /** 291 * @param previous previous Pipe to receive input Tuples from 292 * @param argumentSelector field selector that selects Function arguments from the input Tuple 293 * @param debugLevel DebugLevel to associate with the Debug 294 * @param debug Debug to be applied to each input Tuple 295 */ 296 @ConstructorProperties({"previous", "argumentSelector", "debugLevel", "debug"}) 297 public Each( Pipe previous, Fields argumentSelector, DebugLevel debugLevel, Debug debug ) 298 { 299 super( previous, argumentSelector, debugLevel, debug, FILTER_SELECTOR ); 300 } 301 302 @Override 303 protected void verifyOperation() 304 { 305 // backwards compatibility with 1.0 306 if( plannerLevel == null && operation instanceof Debug ) 307 plannerLevel = DebugLevel.DEFAULT; 308 309 super.verifyOperation(); 310 311 if( !argumentSelector.isArgSelector() ) 312 throw new IllegalArgumentException( "invalid argument selector: " + argumentSelector ); 313 314 if( !operation.getFieldDeclaration().isDeclarator() ) 315 throw new IllegalArgumentException( "invalid field declaration: " + operation.getFieldDeclaration() ); 316 317 if( !outputSelector.isOutSelector() ) 318 throw new IllegalArgumentException( "invalid output selector: " + outputSelector ); 319 } 320 321 public Function getFunction() 322 { 323 return (Function) operation; 324 } 325 326 public Filter getFilter() 327 { 328 return (Filter) operation; 329 } 330 331 public ValueAssertion getValueAssertion() 332 { 333 return (ValueAssertion) operation; 334 } 335 336 public boolean isFunction() 337 { 338 return operation instanceof Function; 339 } 340 341 public boolean isFilter() 342 { 343 return operation instanceof Filter; 344 } 345 346 public boolean isValueAssertion() 347 { 348 return operation instanceof ValueAssertion; 349 } 350 351 // FIELDS 352 353 @Override 354 public Fields resolveIncomingOperationArgumentFields( Scope incomingScope ) 355 { 356 return incomingScope.getIncomingFunctionArgumentFields(); 357 } 358 359 @Override 360 public Fields resolveIncomingOperationPassThroughFields( Scope incomingScope ) 361 { 362 return incomingScope.getIncomingFunctionPassThroughFields(); 363 } 364 365 @Override 366 public Scope outgoingScopeFor( Set<Scope> incomingScopes ) 367 { 368 Fields argumentFields = resolveArgumentSelector( incomingScopes ); 369 370 verifyArguments( argumentFields ); 371 372 Fields declaredFields = resolveDeclared( incomingScopes, argumentFields ); 373 374 verifyDeclaredFields( declaredFields ); 375 376 Fields outgoingValuesFields = resolveOutgoingValuesSelector( incomingScopes, argumentFields, declaredFields ); 377 378 verifyOutputSelector( outgoingValuesFields ); 379 380 Fields outgoingGroupingFields = Fields.asDeclaration( outgoingValuesFields ); 381 382 // the incoming fields eligible to be outgoing 383 Fields passThroughFields = resolveIncomingOperationPassThroughFields( getFirst( incomingScopes ) ); 384 Fields remainderFields = resolveRemainderFields( incomingScopes, argumentFields ); 385 386 return new Scope( getName(), Scope.Kind.EACH, passThroughFields, remainderFields, argumentFields, declaredFields, outgoingGroupingFields, outgoingValuesFields ); 387 } 388 389 Fields resolveOutgoingValuesSelector( Set<Scope> incomingScopes, Fields argumentFields, Fields declaredFields ) 390 { 391 try 392 { 393 return resolveOutgoingSelector( incomingScopes, argumentFields, declaredFields ); 394 } 395 catch( Exception exception ) 396 { 397 if( exception instanceof OperatorException ) 398 throw (OperatorException) exception; 399 400 throw new OperatorException( this, "could not resolve outgoing values selector in: " + this, exception ); 401 } 402 } 403 }