/*
 * Copyright (c) 2007-2015 Concurrent, Inc. All Rights Reserved.
 *
 * Project and contact information: http://www.cascading.org/
 *
 * This file is part of the Cascading project.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package cascading.pipe;

import java.util.Set;

import cascading.flow.FlowElement;
import cascading.flow.planner.Scope;
import cascading.operation.BaseOperation;
import cascading.operation.Operation;
import cascading.operation.PlannedOperation;
import cascading.operation.PlannerLevel;
import cascading.tuple.Fields;
import cascading.tuple.FieldsResolverException;
import cascading.tuple.TupleException;

/**
 * An Operator is a type of {@link Pipe}. Operators pass specified arguments to a given {@link cascading.operation.BaseOperation}.
 * <p>
 * The argFields value selects the input fields used by the operation. By default the whole input Tuple is passed as arguments.
 * The outFields value selects the fields in the result Tuple returned by this Pipe. By default, the operation results
 * of the given operation replace the input Tuple.
 */
public abstract class Operator extends Pipe
  {
  /** Field operation */
  protected final Operation operation;
  /** Field argumentSelector */
  protected Fields argumentSelector = Fields.ALL; // use wildcard. let the operation choose
  /** Field outputSelector */
  protected Fields outputSelector = Fields.RESULTS; // this is overridden by the subclasses via the ctor
  /** Field plannerLevel */
  protected PlannerLevel plannerLevel; // do not initialize a default

  /**
   * Creates an Operator around the given operation, keeping the default argument and output selectors.
   *
   * @param operation the operation to manage, may not be null
   */
  protected Operator( Operation operation )
    {
    this.operation = operation;
    verifyOperation();
    }

  /**
   * Creates a named Operator around the given operation, keeping the default argument and output selectors.
   *
   * @param name      the name handed to the {@link Pipe} constructor
   * @param operation the operation to manage, may not be null
   */
  protected Operator( String name, Operation operation )
    {
    super( name );
    this.operation = operation;
    verifyOperation();
    }

  /**
   * Creates a named Operator with an explicit output selector and the default argument selector.
   *
   * @param name           the name handed to the {@link Pipe} constructor
   * @param operation      the operation to manage, may not be null
   * @param outputSelector the output selector, may not be null
   */
  protected Operator( String name, Operation operation, Fields outputSelector )
    {
    super( name );
    this.operation = operation;
    this.outputSelector = outputSelector;
    verifyOperation();
    }

  /**
   * Creates a named Operator with an explicit argument selector and the default output selector.
   *
   * @param name             the name handed to the {@link Pipe} constructor
   * @param argumentSelector the argument selector, may not be null
   * @param operation        the operation to manage, may not be null
   */
  protected Operator( String name, Fields argumentSelector, Operation operation )
    {
    super( name );
    this.operation = operation;
    this.argumentSelector = argumentSelector;
    verifyOperation();
    }

  /**
   * Creates a named Operator with explicit argument and output selectors.
   *
   * @param name             the name handed to the {@link Pipe} constructor
   * @param argumentSelector the argument selector, may not be null
   * @param operation        the operation to manage, may not be null
   * @param outputSelector   the output selector, may not be null
   */
  protected Operator( String name, Fields argumentSelector, Operation operation, Fields outputSelector )
    {
    super( name );
    this.operation = operation;
    this.argumentSelector = argumentSelector;
    this.outputSelector = outputSelector;
    verifyOperation();
    }

  /**
   * Creates an Operator following the given pipe, keeping the default argument and output selectors.
   *
   * @param previous  the pipe handed to the {@link Pipe} constructor
   * @param operation the operation to manage, may not be null
   */
  protected Operator( Pipe previous, Operation operation )
    {
    super( previous );
    this.operation = operation;
    verifyOperation();
    }

  /**
   * Creates an Operator following the given pipe with an explicit argument selector and the default output selector.
   *
   * @param previous         the pipe handed to the {@link Pipe} constructor
   * @param argumentSelector the argument selector, may not be null
   * @param operation        the operation to manage, may not be null
   */
  protected Operator( Pipe previous, Fields argumentSelector, Operation operation )
    {
    super( previous );
    this.operation = operation;
    this.argumentSelector = argumentSelector;
    verifyOperation();
    }

  /**
   * Creates an Operator following the given pipe with explicit argument and output selectors.
   *
   * @param previous         the pipe handed to the {@link Pipe} constructor
   * @param argumentSelector the argument selector, may not be null
   * @param operation        the operation to manage, may not be null
   * @param outputSelector   the output selector, may not be null
   */
  protected Operator( Pipe previous, Fields argumentSelector, Operation operation, Fields outputSelector )
    {
    super( previous );
    this.operation = operation;
    this.argumentSelector = argumentSelector;
    this.outputSelector = outputSelector;
    verifyOperation();
    }

  /**
   * Creates an Operator following the given pipe with an explicit output selector and the default argument selector.
   *
   * @param previous       the pipe handed to the {@link Pipe} constructor
   * @param operation      the operation to manage, may not be null
   * @param outputSelector the output selector, may not be null
   */
  protected Operator( Pipe previous, Operation operation, Fields outputSelector )
    {
    super( previous );
    this.operation = operation;
    this.outputSelector = outputSelector;
    verifyOperation();
    }

  /**
   * Creates a named Operator around a {@link PlannedOperation} with an explicit output selector.
   *
   * @param name           the name handed to the {@link Pipe} constructor
   * @param plannerLevel   the planner level, may not be null nor a "none" level
   * @param operation      the planned operation to manage, may not be null
   * @param outputSelector the output selector, may not be null
   */
  protected Operator( String name, PlannerLevel plannerLevel, PlannedOperation operation, Fields outputSelector )
    {
    super( name );
    this.plannerLevel = plannerLevel;
    this.operation = operation;
    this.outputSelector = outputSelector;
    verifyOperation();
    }

  /**
   * Creates a named Operator around a {@link PlannedOperation} with explicit argument and output selectors.
   *
   * @param name             the name handed to the {@link Pipe} constructor
   * @param argumentSelector the argument selector, may not be null
   * @param plannerLevel     the planner level, may not be null nor a "none" level
   * @param operation        the planned operation to manage, may not be null
   * @param outputSelector   the output selector, may not be null
   */
  protected Operator( String name, Fields argumentSelector, PlannerLevel plannerLevel, PlannedOperation operation, Fields outputSelector )
    {
    super( name );
    this.plannerLevel = plannerLevel;
    this.operation = operation;
    this.argumentSelector = argumentSelector;
    this.outputSelector = outputSelector;
    verifyOperation();
    }

  /**
   * Creates an Operator following the given pipe around a {@link PlannedOperation} with an explicit output selector.
   *
   * @param previous       the pipe handed to the {@link Pipe} constructor
   * @param plannerLevel   the planner level, may not be null nor a "none" level
   * @param operation      the planned operation to manage, may not be null
   * @param outputSelector the output selector, may not be null
   */
  protected Operator( Pipe previous, PlannerLevel plannerLevel, PlannedOperation operation, Fields outputSelector )
    {
    super( previous );
    this.plannerLevel = plannerLevel;
    this.operation = operation;
    this.outputSelector = outputSelector;
    verifyOperation();
    }

  /**
   * Creates an Operator following the given pipe around a {@link PlannedOperation} with explicit argument and
   * output selectors.
   *
   * @param previous         the pipe handed to the {@link Pipe} constructor
   * @param argumentSelector the argument selector, may not be null
   * @param plannerLevel     the planner level, may not be null nor a "none" level
   * @param operation        the planned operation to manage, may not be null
   * @param outputSelector   the output selector, may not be null
   */
  protected Operator( Pipe previous, Fields argumentSelector, PlannerLevel plannerLevel, PlannedOperation operation, Fields outputSelector )
    {
    super( previous );
    this.plannerLevel = plannerLevel;
    this.operation = operation;
    this.argumentSelector = argumentSelector;
    this.outputSelector = outputSelector;
    verifyOperation();
    }

  /**
   * Method verifyOperation validates the state assigned by the constructors.
   * <p>
   * The operation, argument selector and output selector must all be non-null. When the operation is a
   * {@link PlannedOperation}, a planner level must also be present and must not report itself as a "none" level.
   * <p>
   * This method is invoked from every constructor of this class, so an overriding implementation runs before
   * the overriding subclass has finished its own construction.
   *
   * @throws IllegalArgumentException if any of the above constraints is violated
   */
  protected void verifyOperation()
    {
    if( operation == null )
      throw new IllegalArgumentException( "operation may not be null" );

    if( argumentSelector == null )
      throw new IllegalArgumentException( "argumentSelector may not be null" );

    if( outputSelector == null )
      throw new IllegalArgumentException( "outputSelector may not be null" );

    if( operation instanceof PlannedOperation )
      {
      if( plannerLevel == null )
        throw new IllegalArgumentException( "planner level may not be null" );
      else if( plannerLevel.isNoneLevel() )
        throw new IllegalArgumentException( "given planner level: " + plannerLevel.getClass().getName() + ", may not be NONE" );
      }
    }

  /**
   * Method getOperation returns the operation managed by this Operator object.
   *
   * @return the operation (type Operation) of this Operator object.
   */
  public Operation getOperation()
    {
    return operation;
    }

  /**
   * Method getArgumentSelector returns the argumentSelector of this Operator object.
   *
   * @return the argumentSelector (type Fields) of this Operator object.
   */
  public Fields getArgumentSelector()
    {
    return argumentSelector;
    }

  /**
   * Method getFieldDeclaration returns the fieldDeclaration of this Operator object, as reported by the
   * managed operation.
   *
   * @return the fieldDeclaration (type Fields) of this Operator object.
   */
  public Fields getFieldDeclaration()
    {
    return operation.getFieldDeclaration();
    }

  /**
   * Method getOutputSelector returns the outputSelector of this Operator object.
   *
   * @return the outputSelector (type Fields) of this Operator object.
   */
  public Fields getOutputSelector()
    {
    return outputSelector;
    }

  /**
   * Method getPlannerLevel returns the plannerLevel of this Operator object.
   *
   * @return the plannerLevel (type PlannerLevel) of this Operator object, null if none was given.
   */
  public PlannerLevel getPlannerLevel()
    {
    return plannerLevel;
    }

  /**
   * Method hasPlannerLevel returns true if this Operator object was given a {@link PlannerLevel}, which the
   * constructors only accept together with a {@link PlannedOperation}.
   *
   * @return boolean
   */
  public boolean hasPlannerLevel()
    {
    return plannerLevel != null;
    }

  // FIELDS

  /**
   * Method resolveRemainderFields returns the incoming argument fields of the first incoming scope with the
   * given argument fields removed. If the incoming fields are unknown they are returned unchanged.
   *
   * @param incomingScopes the incoming scopes, only the first is consulted
   * @param argumentFields the fields to subtract
   * @return the remaining fields
   */
  protected Fields resolveRemainderFields( Set<Scope> incomingScopes, Fields argumentFields )
    {
    Fields fields = resolveIncomingOperationArgumentFields( getFirst( incomingScopes ) );

    if( fields.isUnknown() )
      return fields;

    return fields.subtract( argumentFields );
    }

  /**
   * Method outgoingScopeFor returns the Scope this Operator hands downstream for the given incoming scopes.
   *
   * @param incomingScopes the incoming scopes
   * @return the outgoing Scope
   */
  public abstract Scope outgoingScopeFor( Set<Scope> incomingScopes );

  /**
   * Method verifyDeclaredFields rejects a resolved field declaration that is defined but empty.
   *
   * @param declared the resolved declared fields
   * @throws OperatorException if declared is defined and has a size of zero
   */
  void verifyDeclaredFields( Fields declared )
    {
    if( declared.isDefined() && declared.size() == 0 )
      throw new OperatorException( this, "field declaration: " + getFieldDeclaration().printVerbose() + ", resolves to an empty field set, current grouping is on all fields" );
    }

  /**
   * Method verifyOutputSelector rejects a resolved output selector that is defined but empty.
   *
   * @param outputSelector the resolved output selector; the error message reports this Operator's own
   *                       unresolved selector instead
   * @throws OperatorException if outputSelector is defined and has a size of zero
   */
  void verifyOutputSelector( Fields outputSelector )
    {
    if( outputSelector.isDefined() && outputSelector.size() == 0 )
      throw new OperatorException( this, "output selector: " + getOutputSelector().printVerbose() + ", resolves to an empty field set, current grouping is on all fields" );
    }

  /**
   * Method verifyArguments checks the resolved argument selector against the number of arguments the operation
   * expects. Unknown selectors and operations accepting {@link Operation#ANY} arguments are not checked; only
   * a selector with fewer fields than expected is rejected.
   *
   * @param argumentSelector the resolved argument selector
   * @throws OperatorException if fewer arguments than expected were resolved
   */
  void verifyArguments( Fields argumentSelector )
    {
    if( argumentSelector.isUnknown() )
      return;

    if( operation.getNumArgs() != Operation.ANY && argumentSelector.size() < operation.getNumArgs() )
      throw new OperatorException( this, "resolved wrong number of arguments: " + argumentSelector.printVerbose() + ", expected: " + operation.getNumArgs() );
    }

  /**
   * Method resolveOutgoingSelector resolves this Operator's output selector into concrete fields.
   * <p>
   * The RESULTS, ARGUMENTS, GROUP and VALUES selectors map directly onto the declared fields, the argument fields,
   * and the grouping / grouping value fields of the first incoming scope. SWAP yields the incoming pass-through
   * fields minus the arguments, followed by the declared fields. Any other selector is resolved against the
   * incoming and declared fields via {@link Fields#resolve}.
   *
   * @param incomingScopes the incoming scopes, only the first is consulted
   * @param argumentFields the resolved argument fields
   * @param declaredFields the resolved declared fields
   * @return the resolved outgoing fields
   * @throws OperatorException if the selector cannot be resolved against the incoming and declared fields
   */
  Fields resolveOutgoingSelector( Set<Scope> incomingScopes, Fields argumentFields, Fields declaredFields )
    {
    Scope incomingScope = getFirst( incomingScopes );
    Fields outputSelector = getOutputSelector();

    if( outputSelector.isResults() )
      return declaredFields;

    if( outputSelector.isArguments() )
      return argumentFields;

    if( outputSelector.isGroup() )
      return incomingScope.getOutGroupingFields();

    if( outputSelector.isValues() )
      return incomingScope.getOutGroupingValueFields();

    Fields incomingFields = resolveIncomingOperationPassThroughFields( incomingScope );

    // not part of resolve as we need the argumentFields
    if( outputSelector.isSwap() )
      return Fields.asDeclaration( incomingFields.subtract( argumentFields ) ).append( declaredFields );

    try
      {
      return Fields.resolve( outputSelector, Fields.asDeclaration( incomingFields ), declaredFields );
      }
    catch( TupleException exception )
      {
      throw new OperatorException( this, incomingFields, declaredFields, outputSelector, exception );
      }
    }

  /**
   * Method resolveArgumentSelector resolves this Operator's argument selector into concrete fields using the
   * first incoming scope.
   * <p>
   * ALL yields the incoming argument fields, GROUP and VALUES yield the scope's grouping / grouping value fields,
   * and any other selector is applied to the incoming argument fields.
   *
   * @param incomingScopes the incoming scopes, only the first is consulted
   * @return the resolved argument fields
   * @throws OperatorException if resolution fails for any reason; the original failure is attached as the cause
   */
  Fields resolveArgumentSelector( Set<Scope> incomingScopes )
    {
    Fields argumentSelector = getArgumentSelector();

    try
      {
      Scope incomingScope = getFirst( incomingScopes );

      if( argumentSelector.isAll() )
        return resolveIncomingOperationArgumentFields( incomingScope );

      if( argumentSelector.isGroup() )
        return incomingScope.getOutGroupingFields();

      if( argumentSelector.isValues() )
        return incomingScope.getOutGroupingValueFields();

      return resolveIncomingOperationArgumentFields( incomingScope ).select( argumentSelector );
      }
    catch( FieldsResolverException exception )
      {
      // carries the source fields so the failure can be reported against them
      throw new OperatorException( this, OperatorException.Kind.argument, exception.getSourceFields(), argumentSelector, exception );
      }
    catch( Exception exception )
      {
      throw new OperatorException( this, "unable to resolve argument selector: " + argumentSelector.printVerbose(), exception );
      }
    }

  /**
   * Method resolveDeclared resolves the operation's field declaration into concrete fields.
   * <p>
   * When the output selector is REPLACE, defined arguments and a defined declaration must be the same size; the
   * result is the arguments themselves if the declaration is ARGUMENTS, otherwise the arguments projected onto
   * the declaration. For every other output selector, the UNKNOWN, ARGUMENTS, ALL, GROUP and VALUES declarations
   * are substituted from the arguments or the first incoming scope, and any other declaration is returned as is.
   *
   * @param incomingScopes the incoming scopes, only the first is consulted
   * @param arguments      the resolved argument fields
   * @return the resolved declared fields
   * @throws OperatorException on a REPLACE size mismatch, or if the substitution fails
   */
  Fields resolveDeclared( Set<Scope> incomingScopes, Fields arguments )
    {
    Fields fieldDeclaration = getFieldDeclaration();

    if( getOutputSelector().isReplace() )
      {
      if( arguments.isDefined() && fieldDeclaration.isDefined() && arguments.size() != fieldDeclaration.size() )
        throw new OperatorException( this, "during REPLACE both the arguments selector and field declaration must be the same size, arguments: " + arguments.printVerbose() + " declaration: " + fieldDeclaration.printVerbose() );

      if( fieldDeclaration.isArguments() ) // there is no type info, so inherit it
        return arguments;

      return arguments.project( fieldDeclaration );
      }

    try
      {
      Scope incomingScope = getFirst( incomingScopes );

      if( fieldDeclaration.isUnknown() )
        return fieldDeclaration;

      if( fieldDeclaration.isArguments() )
        return Fields.asDeclaration( arguments );

      if( fieldDeclaration.isAll() )
        return resolveIncomingOperationPassThroughFields( incomingScope );

      if( fieldDeclaration.isGroup() )
        return incomingScope.getOutGroupingFields();

      // VALUES is the diff between all fields and group fields
      if( fieldDeclaration.isValues() )
        return incomingScope.getOutGroupingValueFields();

      }
    catch( Exception exception )
      {
      throw new OperatorException( this, "could not resolve declared fields in: " + this, exception );
      }

    return fieldDeclaration;
    }

  // OBJECT OVERRIDES

  /**
   * Returns the superclass representation followed by the managed operation in square brackets.
   *
   * @return String
   */
  @Override
  public String toString()
    {
    return super.toString() + "[" + operation + "]";
    }

  /**
   * Appends the superclass output to the buffer, followed by the managed operation in square brackets as
   * printed by {@link BaseOperation#printOperationInternal}.
   *
   * @param buffer the buffer to append to
   * @param scope  the scope handed through to the superclass and the operation printer
   */
  @Override
  protected void printInternal( StringBuffer buffer, Scope scope )
    {
    super.printInternal( buffer, scope );
    buffer.append( "[" );
    BaseOperation.printOperationInternal( operation, buffer, scope );
    buffer.append( "]" );
    }

  /**
   * Returns true if the superclass considers the element equivalent and the element has an equal argument
   * selector, operation and output selector.
   *
   * @param element the element to compare against
   * @return boolean
   */
  @Override
  public boolean isEquivalentTo( FlowElement element )
    {
    if( !super.isEquivalentTo( element ) )
      return false;

    // NOTE(review): the cast presumes the superclass check only accepts elements of a compatible type -
    // confirm against Pipe#isEquivalentTo
    Operator operator = (Operator) element;

    return argumentSelector.equals( operator.argumentSelector )
      && operation.equals( operator.operation )
      && outputSelector.equals( operator.outputSelector );
    }

  /**
   * Two Operators are equal when they are of the same class, the superclass considers them equal, and their
   * argument selector, operation and output selector are equal. The planner level does not take part in the
   * comparison.
   *
   * @param object the object to compare against
   * @return boolean
   */
  @SuppressWarnings({"RedundantIfStatement"})
  @Override
  public boolean equals( Object object )
    {
    if( this == object )
      return true;
    if( object == null || getClass() != object.getClass() )
      return false;
    if( !super.equals( object ) )
      return false;

    Operator operator = (Operator) object;

    if( argumentSelector != null ? !argumentSelector.equals( operator.argumentSelector ) : operator.argumentSelector != null )
      return false;
    if( operation != null ? !operation.equals( operator.operation ) : operator.operation != null )
      return false;
    if( outputSelector != null ? !outputSelector.equals( operator.outputSelector ) : operator.outputSelector != null )
      return false;

    return true;
    }

  /**
   * Combines the superclass hash with the hashes of the operation, argument selector and output selector, the
   * same members compared by {@link #equals(Object)}.
   *
   * @return int
   */
  @Override
  public int hashCode()
    {
    int result = super.hashCode();
    result = 31 * result + ( operation != null ? operation.hashCode() : 0 );
    result = 31 * result + ( argumentSelector != null ? argumentSelector.hashCode() : 0 );
    result = 31 * result + ( outputSelector != null ? outputSelector.hashCode() : 0 );
    return result;
    }
  }