001 /* 002 * Copyright (c) 2007-2014 Concurrent, Inc. All Rights Reserved. 003 * 004 * Project and contact information: http://www.cascading.org/ 005 * 006 * This file is part of the Cascading project. 007 * 008 * Licensed under the Apache License, Version 2.0 (the "License"); 009 * you may not use this file except in compliance with the License. 010 * You may obtain a copy of the License at 011 * 012 * http://www.apache.org/licenses/LICENSE-2.0 013 * 014 * Unless required by applicable law or agreed to in writing, software 015 * distributed under the License is distributed on an "AS IS" BASIS, 016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 017 * See the License for the specific language governing permissions and 018 * limitations under the License. 019 */ 020 021 package cascading.pipe; 022 023 import java.util.Set; 024 025 import cascading.flow.FlowElement; 026 import cascading.flow.planner.Scope; 027 import cascading.operation.Assertion; 028 import cascading.operation.AssertionLevel; 029 import cascading.operation.BaseOperation; 030 import cascading.operation.Operation; 031 import cascading.operation.PlannedOperation; 032 import cascading.operation.PlannerLevel; 033 import cascading.tuple.Fields; 034 import cascading.tuple.FieldsResolverException; 035 import cascading.tuple.TupleException; 036 037 /** 038 * An Operator is a type of {@link Pipe}. Operators pass specified arguments to a given {@link cascading.operation.BaseOperation}. 039 * </p> 040 * The argFields value select the input fields used by the operation. By default the whole input Tuple is passes as arguments. 041 * The outFields value select the fields in the result Tuple returned by this Pipe. By default, the operation results 042 * of the given operation replace the input Tuple. 043 */ 044 public abstract class Operator extends Pipe 045 { 046 /** Field operation */ 047 protected final Operation operation; 048 /** Field argumentSelector */ 049 protected Fields argumentSelector = Fields.ALL; // use wildcard. let the operation choose 050 /** Field outputSelector */ 051 protected Fields outputSelector = Fields.RESULTS; // this is overridden by the subclasses via the ctor 052 /** Field assertionLevel */ 053 protected PlannerLevel plannerLevel; // do not initialize a default 054 055 protected Operator( Operation operation ) 056 { 057 this.operation = operation; 058 verifyOperation(); 059 } 060 061 protected Operator( String name, Operation operation ) 062 { 063 super( name ); 064 this.operation = operation; 065 verifyOperation(); 066 } 067 068 protected Operator( String name, Operation operation, Fields outputSelector ) 069 { 070 super( name ); 071 this.operation = operation; 072 this.outputSelector = outputSelector; 073 verifyOperation(); 074 } 075 076 protected Operator( String name, Fields argumentSelector, Operation operation ) 077 { 078 super( name ); 079 this.operation = operation; 080 this.argumentSelector = argumentSelector; 081 verifyOperation(); 082 } 083 084 protected Operator( String name, Fields argumentSelector, Operation operation, Fields outputSelector ) 085 { 086 super( name ); 087 this.operation = operation; 088 this.argumentSelector = argumentSelector; 089 this.outputSelector = outputSelector; 090 verifyOperation(); 091 } 092 093 protected Operator( Pipe previous, Operation operation ) 094 { 095 super( previous ); 096 this.operation = operation; 097 verifyOperation(); 098 } 099 100 protected Operator( Pipe previous, Fields argumentSelector, Operation operation ) 101 { 102 super( previous ); 103 this.operation = operation; 104 this.argumentSelector = argumentSelector; 105 verifyOperation(); 106 } 107 108 protected Operator( Pipe previous, Fields argumentSelector, Operation operation, Fields outputSelector ) 109 { 110 super( previous ); 111 this.operation = operation; 112 this.argumentSelector = argumentSelector; 113 this.outputSelector = outputSelector; 114 verifyOperation(); 115 } 116 117 protected Operator( Pipe previous, Operation operation, Fields outputSelector ) 118 { 119 super( previous ); 120 this.operation = operation; 121 this.outputSelector = outputSelector; 122 verifyOperation(); 123 } 124 125 protected Operator( String name, PlannerLevel plannerLevel, PlannedOperation operation, Fields outputSelector ) 126 { 127 super( name ); 128 this.plannerLevel = plannerLevel; 129 this.operation = operation; 130 this.outputSelector = outputSelector; 131 verifyOperation(); 132 } 133 134 protected Operator( String name, Fields argumentSelector, PlannerLevel plannerLevel, PlannedOperation operation, Fields outputSelector ) 135 { 136 super( name ); 137 this.plannerLevel = plannerLevel; 138 this.operation = operation; 139 this.argumentSelector = argumentSelector; 140 this.outputSelector = outputSelector; 141 verifyOperation(); 142 } 143 144 protected Operator( Pipe previous, PlannerLevel plannerLevel, PlannedOperation operation, Fields outputSelector ) 145 { 146 super( previous ); 147 this.plannerLevel = plannerLevel; 148 this.operation = operation; 149 this.outputSelector = outputSelector; 150 verifyOperation(); 151 } 152 153 protected Operator( Pipe previous, Fields argumentSelector, PlannerLevel plannerLevel, PlannedOperation operation, Fields outputSelector ) 154 { 155 super( previous ); 156 this.plannerLevel = plannerLevel; 157 this.operation = operation; 158 this.argumentSelector = argumentSelector; 159 this.outputSelector = outputSelector; 160 verifyOperation(); 161 } 162 163 protected void verifyOperation() 164 { 165 if( operation == null ) 166 throw new IllegalArgumentException( "operation may not be null" ); 167 168 if( argumentSelector == null ) 169 throw new IllegalArgumentException( "argumentSelector may not be null" ); 170 171 if( outputSelector == null ) 172 throw new IllegalArgumentException( "outputSelector may not be null" ); 173 174 if( operation instanceof PlannedOperation ) 175 { 176 if( plannerLevel == null ) 177 throw new IllegalArgumentException( "planner level may not be null" ); 178 else if( plannerLevel.isNoneLevel() ) 179 throw new IllegalArgumentException( "given planner level: " + plannerLevel.getClass().getName() + ", may not be NONE" ); 180 } 181 } 182 183 /** 184 * Method getOperation returns the operation managed by this Operator object. 185 * 186 * @return the operation (type Operation) of this Operator object. 187 */ 188 public Operation getOperation() 189 { 190 return operation; 191 } 192 193 /** 194 * Method getArgumentSelector returns the argumentSelector of this Operator object. 195 * 196 * @return the argumentSelector (type Fields) of this Operator object. 197 */ 198 public Fields getArgumentSelector() 199 { 200 return argumentSelector; 201 } 202 203 /** 204 * Method getFieldDeclaration returns the fieldDeclaration of this Operator object. 205 * 206 * @return the fieldDeclaration (type Fields) of this Operator object. 207 */ 208 public Fields getFieldDeclaration() 209 { 210 return operation.getFieldDeclaration(); 211 } 212 213 /** 214 * Method getOutputSelector returns the outputSelector of this Operator object. 215 * 216 * @return the outputSelector (type Fields) of this Operator object. 217 */ 218 public Fields getOutputSelector() 219 { 220 return outputSelector; 221 } 222 223 /** 224 * Method getAssertionLevel returns the assertionLevel of this Operator object. Only used if the {@link cascading.operation.Operation} 225 * is an {@link Assertion}. 226 * 227 * @return the assertionLevel (type Assertion.Level) of this Operator object. 228 */ 229 @Deprecated 230 public AssertionLevel getAssertionLevel() 231 { 232 return (AssertionLevel) plannerLevel; 233 } 234 235 /** 236 * Method getPlannerLevel returns the plannerLevel of this Operator object. 237 * 238 * @return the plannerLevel (type PlannerLevel) of this Operator object. 239 */ 240 public PlannerLevel getPlannerLevel() 241 { 242 return plannerLevel; 243 } 244 245 /** 246 * Method hasPlannerLevel returns true if this Operator object holds a {@link PlannedOperation} object with an associated 247 * {@link PlannerLevel} level. 248 * 249 * @return boolean 250 */ 251 public boolean hasPlannerLevel() 252 { 253 return plannerLevel != null; 254 } 255 256 // FIELDS 257 258 protected Fields resolveRemainderFields( Set<Scope> incomingScopes, Fields argumentFields ) 259 { 260 Fields fields = resolveIncomingOperationArgumentFields( getFirst( incomingScopes ) ); 261 262 if( fields.isUnknown() ) 263 return fields; 264 265 return fields.subtract( argumentFields ); 266 } 267 268 public abstract Scope outgoingScopeFor( Set<Scope> incomingScopes ); 269 270 void verifyDeclaredFields( Fields declared ) 271 { 272 if( declared.isDefined() && declared.size() == 0 ) 273 throw new OperatorException( this, "field declaration: " + getFieldDeclaration().printVerbose() + ", resolves to an empty field set, current grouping is on all fields" ); 274 } 275 276 void verifyOutputSelector( Fields outputSelector ) 277 { 278 if( outputSelector.isDefined() && outputSelector.size() == 0 ) 279 throw new OperatorException( this, "output selector: " + getOutputSelector().printVerbose() + ", resolves to an empty field set, current grouping is on all fields" ); 280 } 281 282 void verifyArguments( Fields argumentSelector ) 283 { 284 if( argumentSelector.isUnknown() ) 285 return; 286 287 if( operation.getNumArgs() != Operation.ANY && argumentSelector.size() < operation.getNumArgs() ) 288 throw new OperatorException( this, "resolved wrong number of arguments: " + argumentSelector.printVerbose() + ", expected: " + operation.getNumArgs() ); 289 } 290 291 Fields resolveOutgoingSelector( Set<Scope> incomingScopes, Fields argumentFields, Fields declaredFields ) 292 { 293 Scope incomingScope = getFirst( incomingScopes ); 294 Fields outputSelector = getOutputSelector(); 295 296 if( outputSelector.isResults() ) 297 return declaredFields; 298 299 if( outputSelector.isArguments() ) 300 return argumentFields; 301 302 if( outputSelector.isGroup() ) 303 return incomingScope.getOutGroupingFields(); 304 305 if( outputSelector.isValues() ) 306 return incomingScope.getOutGroupingValueFields(); 307 308 Fields incomingFields = resolveIncomingOperationPassThroughFields( incomingScope ); 309 310 // not part of resolve as we need the argumentFields 311 if( outputSelector.isSwap() ) 312 return Fields.asDeclaration( incomingFields.subtract( argumentFields ) ).append( declaredFields ); 313 314 try 315 { 316 return Fields.resolve( outputSelector, Fields.asDeclaration( incomingFields ), declaredFields ); 317 } 318 catch( TupleException exception ) 319 { 320 throw new OperatorException( this, incomingFields, declaredFields, outputSelector, exception ); 321 } 322 } 323 324 Fields resolveArgumentSelector( Set<Scope> incomingScopes ) 325 { 326 Fields argumentSelector = getArgumentSelector(); 327 328 try 329 { 330 Scope incomingScope = getFirst( incomingScopes ); 331 332 if( argumentSelector.isAll() ) 333 return resolveIncomingOperationArgumentFields( incomingScope ); 334 335 if( argumentSelector.isGroup() ) 336 return incomingScope.getOutGroupingFields(); 337 338 if( argumentSelector.isValues() ) 339 return incomingScope.getOutGroupingValueFields(); 340 341 return resolveIncomingOperationArgumentFields( incomingScope ).select( argumentSelector ); 342 } 343 catch( FieldsResolverException exception ) 344 { 345 throw new OperatorException( this, OperatorException.Kind.argument, exception.getSourceFields(), argumentSelector, exception ); 346 } 347 catch( Exception exception ) 348 { 349 throw new OperatorException( this, "unable to resolve argument selector: " + argumentSelector.printVerbose(), exception ); 350 } 351 } 352 353 Fields resolveDeclared( Set<Scope> incomingScopes, Fields arguments ) 354 { 355 Fields fieldDeclaration = getFieldDeclaration(); 356 357 if( getOutputSelector().isReplace() ) 358 { 359 if( arguments.isDefined() && fieldDeclaration.isDefined() && arguments.size() != fieldDeclaration.size() ) 360 throw new OperatorException( this, "during REPLACE both the arguments selector and field declaration must be the same size, arguments: " + arguments.printVerbose() + " declaration: " + fieldDeclaration.printVerbose() ); 361 362 if( fieldDeclaration.isArguments() ) // there is no type info, so inherit it 363 return arguments; 364 365 return arguments.project( fieldDeclaration ); 366 } 367 368 try 369 { 370 Scope incomingScope = getFirst( incomingScopes ); 371 372 if( fieldDeclaration.isUnknown() ) 373 return fieldDeclaration; 374 375 if( fieldDeclaration.isArguments() ) 376 return Fields.asDeclaration( arguments ); 377 378 if( fieldDeclaration.isAll() ) 379 return resolveIncomingOperationPassThroughFields( incomingScope ); 380 381 if( fieldDeclaration.isGroup() ) 382 return incomingScope.getOutGroupingFields(); 383 384 // VALUES is the diff between all fields and group fields 385 if( fieldDeclaration.isValues() ) 386 return incomingScope.getOutGroupingValueFields(); 387 388 } 389 catch( Exception exception ) 390 { 391 throw new OperatorException( this, "could not resolve declared fields in: " + this, exception ); 392 } 393 394 return fieldDeclaration; 395 } 396 397 // OBJECT OVERRIDES 398 399 @Override 400 public String toString() 401 { 402 return super.toString() + "[" + operation + "]"; 403 } 404 405 @Override 406 protected void printInternal( StringBuffer buffer, Scope scope ) 407 { 408 super.printInternal( buffer, scope ); 409 buffer.append( "[" ); 410 BaseOperation.printOperationInternal( operation, buffer, scope ); 411 buffer.append( "]" ); 412 } 413 414 @Override 415 public boolean isEquivalentTo( FlowElement element ) 416 { 417 boolean equivalentTo = super.isEquivalentTo( element ); 418 419 if( !equivalentTo ) 420 return false; 421 422 Operator operator = (Operator) element; 423 424 equivalentTo = argumentSelector.equals( operator.argumentSelector ); 425 426 if( !equivalentTo ) 427 return false; 428 429 if( !operation.equals( operator.operation ) ) 430 return false; 431 432 equivalentTo = outputSelector.equals( operator.outputSelector ); 433 434 if( !equivalentTo ) 435 return false; 436 437 return true; 438 } 439 440 @SuppressWarnings({"RedundantIfStatement"}) 441 public boolean equals( Object object ) 442 { 443 if( this == object ) 444 return true; 445 if( object == null || getClass() != object.getClass() ) 446 return false; 447 if( !super.equals( object ) ) 448 return false; 449 450 Operator operator = (Operator) object; 451 452 if( argumentSelector != null ? !argumentSelector.equals( operator.argumentSelector ) : operator.argumentSelector != null ) 453 return false; 454 if( operation != null ? !operation.equals( operator.operation ) : operator.operation != null ) 455 return false; 456 if( outputSelector != null ? !outputSelector.equals( operator.outputSelector ) : operator.outputSelector != null ) 457 return false; 458 459 return true; 460 } 461 462 @Override 463 public int hashCode() 464 { 465 int result = super.hashCode(); 466 result = 31 * result + ( operation != null ? operation.hashCode() : 0 ); 467 result = 31 * result + ( argumentSelector != null ? argumentSelector.hashCode() : 0 ); 468 result = 31 * result + ( outputSelector != null ? outputSelector.hashCode() : 0 ); 469 return result; 470 } 471 }