/*
 * Copyright (c) 2007-2016 Concurrent, Inc. All Rights Reserved.
 *
 * Project and contact information: http://www.cascading.org/
 *
 * This file is part of the Cascading project.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package cascading.pipe;

import java.util.Set;

import cascading.flow.planner.Scope;
import cascading.operation.BaseOperation;
import cascading.operation.Operation;
import cascading.operation.PlannedOperation;
import cascading.operation.PlannerLevel;
import cascading.tuple.Fields;
import cascading.tuple.FieldsResolverException;
import cascading.tuple.TupleException;

/**
 * An Operator is a type of {@link Pipe}. Operators pass specified arguments to a given
 * {@link cascading.operation.BaseOperation}.
 * <p>
 * The argFields value selects the input fields used by the operation. By default ({@link Fields#ALL}) the whole
 * input Tuple is passed as arguments.
 * The outFields value selects the fields in the result Tuple returned by this Pipe. By default
 * ({@link Fields#RESULTS}), the results of the given operation replace the input Tuple.
 */
public abstract class Operator extends Pipe
  {
  /** Field operation */
  protected final Operation operation;
  /** Field argumentSelector */
  protected Fields argumentSelector = Fields.ALL; // use wildcard. let the operation choose
  /** Field outputSelector */
  protected Fields outputSelector = Fields.RESULTS; // this is overridden by the subclasses via the ctor
  /** Field plannerLevel */
  protected PlannerLevel plannerLevel; // do not initialize a default

  /**
   * Creates an Operator around the given operation using the default argument and output selectors.
   *
   * @param operation the operation to apply, may not be null
   * @throws IllegalArgumentException if {@link #verifyOperation()} rejects the resulting state
   */
  protected Operator( Operation operation )
    {
    this.operation = operation;
    verifyOperation();
    }

  /**
   * Creates a named Operator around the given operation using the default argument and output selectors.
   *
   * @param name      the name handed to the {@link Pipe} constructor
   * @param operation the operation to apply, may not be null
   * @throws IllegalArgumentException if {@link #verifyOperation()} rejects the resulting state
   */
  protected Operator( String name, Operation operation )
    {
    super( name );
    this.operation = operation;
    verifyOperation();
    }

  /**
   * Creates a named Operator with an explicit output selector.
   *
   * @param name           the name handed to the {@link Pipe} constructor
   * @param operation      the operation to apply, may not be null
   * @param outputSelector the output selector, may not be null
   * @throws IllegalArgumentException if {@link #verifyOperation()} rejects the resulting state
   */
  protected Operator( String name, Operation operation, Fields outputSelector )
    {
    super( name );
    this.operation = operation;
    this.outputSelector = outputSelector;
    verifyOperation();
    }

  /**
   * Creates a named Operator with an explicit argument selector.
   *
   * @param name             the name handed to the {@link Pipe} constructor
   * @param argumentSelector the argument selector, may not be null
   * @param operation        the operation to apply, may not be null
   * @throws IllegalArgumentException if {@link #verifyOperation()} rejects the resulting state
   */
  protected Operator( String name, Fields argumentSelector, Operation operation )
    {
    super( name );
    this.operation = operation;
    this.argumentSelector = argumentSelector;
    verifyOperation();
    }

  /**
   * Creates a named Operator with explicit argument and output selectors.
   *
   * @param name             the name handed to the {@link Pipe} constructor
   * @param argumentSelector the argument selector, may not be null
   * @param operation        the operation to apply, may not be null
   * @param outputSelector   the output selector, may not be null
   * @throws IllegalArgumentException if {@link #verifyOperation()} rejects the resulting state
   */
  protected Operator( String name, Fields argumentSelector, Operation operation, Fields outputSelector )
    {
    super( name );
    this.operation = operation;
    this.argumentSelector = argumentSelector;
    this.outputSelector = outputSelector;
    verifyOperation();
    }

  /**
   * Creates an Operator chained to the given pipe using the default argument and output selectors.
   *
   * @param previous  the pipe handed to the {@link Pipe} constructor
   * @param operation the operation to apply, may not be null
   * @throws IllegalArgumentException if {@link #verifyOperation()} rejects the resulting state
   */
  protected Operator( Pipe previous, Operation operation )
    {
    super( previous );
    this.operation = operation;
    verifyOperation();
    }

  /**
   * Creates an Operator chained to the given pipe with an explicit argument selector.
   *
   * @param previous         the pipe handed to the {@link Pipe} constructor
   * @param argumentSelector the argument selector, may not be null
   * @param operation        the operation to apply, may not be null
   * @throws IllegalArgumentException if {@link #verifyOperation()} rejects the resulting state
   */
  protected Operator( Pipe previous, Fields argumentSelector, Operation operation )
    {
    super( previous );
    this.operation = operation;
    this.argumentSelector = argumentSelector;
    verifyOperation();
    }

  /**
   * Creates an Operator chained to the given pipe with explicit argument and output selectors.
   *
   * @param previous         the pipe handed to the {@link Pipe} constructor
   * @param argumentSelector the argument selector, may not be null
   * @param operation        the operation to apply, may not be null
   * @param outputSelector   the output selector, may not be null
   * @throws IllegalArgumentException if {@link #verifyOperation()} rejects the resulting state
   */
  protected Operator( Pipe previous, Fields argumentSelector, Operation operation, Fields outputSelector )
    {
    super( previous );
    this.operation = operation;
    this.argumentSelector = argumentSelector;
    this.outputSelector = outputSelector;
    verifyOperation();
    }

  /**
   * Creates an Operator chained to the given pipe with an explicit output selector.
   *
   * @param previous       the pipe handed to the {@link Pipe} constructor
   * @param operation      the operation to apply, may not be null
   * @param outputSelector the output selector, may not be null
   * @throws IllegalArgumentException if {@link #verifyOperation()} rejects the resulting state
   */
  protected Operator( Pipe previous, Operation operation, Fields outputSelector )
    {
    super( previous );
    this.operation = operation;
    this.outputSelector = outputSelector;
    verifyOperation();
    }

  /**
   * Creates a named Operator around a {@link PlannedOperation} with its planner level and output selector.
   *
   * @param name           the name handed to the {@link Pipe} constructor
   * @param plannerLevel   the planner level, may not be null nor a "none" level
   * @param operation      the planned operation to apply, may not be null
   * @param outputSelector the output selector, may not be null
   * @throws IllegalArgumentException if {@link #verifyOperation()} rejects the resulting state
   */
  protected Operator( String name, PlannerLevel plannerLevel, PlannedOperation operation, Fields outputSelector )
    {
    super( name );
    this.plannerLevel = plannerLevel;
    this.operation = operation;
    this.outputSelector = outputSelector;
    verifyOperation();
    }

  /**
   * Creates a named Operator around a {@link PlannedOperation} with its planner level and explicit
   * argument and output selectors.
   *
   * @param name             the name handed to the {@link Pipe} constructor
   * @param argumentSelector the argument selector, may not be null
   * @param plannerLevel     the planner level, may not be null nor a "none" level
   * @param operation        the planned operation to apply, may not be null
   * @param outputSelector   the output selector, may not be null
   * @throws IllegalArgumentException if {@link #verifyOperation()} rejects the resulting state
   */
  protected Operator( String name, Fields argumentSelector, PlannerLevel plannerLevel, PlannedOperation operation, Fields outputSelector )
    {
    super( name );
    this.plannerLevel = plannerLevel;
    this.operation = operation;
    this.argumentSelector = argumentSelector;
    this.outputSelector = outputSelector;
    verifyOperation();
    }

  /**
   * Creates an Operator chained to the given pipe around a {@link PlannedOperation} with its planner level
   * and output selector.
   *
   * @param previous       the pipe handed to the {@link Pipe} constructor
   * @param plannerLevel   the planner level, may not be null nor a "none" level
   * @param operation      the planned operation to apply, may not be null
   * @param outputSelector the output selector, may not be null
   * @throws IllegalArgumentException if {@link #verifyOperation()} rejects the resulting state
   */
  protected Operator( Pipe previous, PlannerLevel plannerLevel, PlannedOperation operation, Fields outputSelector )
    {
    super( previous );
    this.plannerLevel = plannerLevel;
    this.operation = operation;
    this.outputSelector = outputSelector;
    verifyOperation();
    }

  /**
   * Creates an Operator chained to the given pipe around a {@link PlannedOperation} with its planner level
   * and explicit argument and output selectors.
   *
   * @param previous         the pipe handed to the {@link Pipe} constructor
   * @param argumentSelector the argument selector, may not be null
   * @param plannerLevel     the planner level, may not be null nor a "none" level
   * @param operation        the planned operation to apply, may not be null
   * @param outputSelector   the output selector, may not be null
   * @throws IllegalArgumentException if {@link #verifyOperation()} rejects the resulting state
   */
  protected Operator( Pipe previous, Fields argumentSelector, PlannerLevel plannerLevel, PlannedOperation operation, Fields outputSelector )
    {
    super( previous );
    this.plannerLevel = plannerLevel;
    this.operation = operation;
    this.argumentSelector = argumentSelector;
    this.outputSelector = outputSelector;
    verifyOperation();
    }

  /**
   * Method verifyOperation validates the state assembled by a constructor.
   * <p>
   * The operation, argument selector and output selector must all be non-null. When the operation is a
   * {@link PlannedOperation}, a planner level must also be present and must not report itself as a "none" level.
   * <p>
   * NOTE: every constructor of this class calls this method as its last statement. Because it is overridable,
   * an override runs before the overriding subclass has initialized its own fields.
   *
   * @throws IllegalArgumentException if any of the above conditions is violated
   */
  protected void verifyOperation()
    {
    if( operation == null )
      throw new IllegalArgumentException( "operation may not be null" );

    if( argumentSelector == null )
      throw new IllegalArgumentException( "argumentSelector may not be null" );

    if( outputSelector == null )
      throw new IllegalArgumentException( "outputSelector may not be null" );

    if( operation instanceof PlannedOperation )
      {
      if( plannerLevel == null )
        throw new IllegalArgumentException( "planner level may not be null" );
      else if( plannerLevel.isNoneLevel() )
        throw new IllegalArgumentException( "given planner level: " + plannerLevel.getClass().getName() + ", may not be NONE" );
      }
    }

  /**
   * Method getOperation returns the operation managed by this Operator object.
   *
   * @return the operation (type Operation) of this Operator object.
   */
  public Operation getOperation()
    {
    return operation;
    }

  /**
   * Method getArgumentSelector returns the argumentSelector of this Operator object.
   *
   * @return the argumentSelector (type Fields) of this Operator object.
   */
  public Fields getArgumentSelector()
    {
    return argumentSelector;
    }

  /**
   * Method getFieldDeclaration returns the fieldDeclaration of this Operator object, as reported by the
   * managed operation.
   *
   * @return the fieldDeclaration (type Fields) of this Operator object.
   */
  public Fields getFieldDeclaration()
    {
    return operation.getFieldDeclaration();
    }

  /**
   * Method getOutputSelector returns the outputSelector of this Operator object.
   *
   * @return the outputSelector (type Fields) of this Operator object.
   */
  public Fields getOutputSelector()
    {
    return outputSelector;
    }

  /**
   * Method getPlannerLevel returns the plannerLevel of this Operator object.
   *
   * @return the plannerLevel (type PlannerLevel) of this Operator object, may be null.
   */
  public PlannerLevel getPlannerLevel()
    {
    return plannerLevel;
    }

  /**
   * Method hasPlannerLevel returns true if this Operator object holds a {@link PlannedOperation} object with an associated
   * {@link PlannerLevel} level.
   *
   * @return boolean true if a planner level has been set
   */
  public boolean hasPlannerLevel()
    {
    return plannerLevel != null;
    }

  // FIELDS

  /**
   * Method resolveRemainderFields returns the incoming operation argument fields that are not part of the
   * given argument fields.
   *
   * @param incomingScopes the incoming scopes, only the one returned by getFirst is consulted
   * @param argumentFields the fields to subtract from the incoming fields
   * @return the incoming fields unchanged when they are unknown, otherwise the incoming fields minus argumentFields
   */
  protected Fields resolveRemainderFields( Set<Scope> incomingScopes, Fields argumentFields )
    {
    Fields fields = resolveIncomingOperationArgumentFields( getFirst( incomingScopes ) );

    // nothing can be subtracted from an unknown set of fields
    if( fields.isUnknown() )
      return fields;

    return fields.subtract( argumentFields );
    }

  /**
   * Method outgoingScopeFor returns the Scope this Operator hands downstream for the given incoming scopes.
   *
   * @param incomingScopes the incoming scopes
   * @return the outgoing Scope
   */
  public abstract Scope outgoingScopeFor( Set<Scope> incomingScopes );

  /**
   * Method verifyDeclaredFields fails if the resolved field declaration is defined but empty.
   *
   * @param declared the resolved declared fields
   * @throws OperatorException if declared is defined and has a size of zero
   */
  void verifyDeclaredFields( Fields declared )
    {
    if( declared.isDefined() && declared.size() == 0 )
      throw new OperatorException( this, "field declaration: " + getFieldDeclaration().printVerbose() + ", resolves to an empty field set, current grouping is on all fields" );
    }

  /**
   * Method verifyOutputSelector fails if the resolved output selector is defined but empty.
   *
   * @param outputSelector the resolved output selector
   * @throws OperatorException if outputSelector is defined and has a size of zero
   */
  void verifyOutputSelector( Fields outputSelector )
    {
    if( outputSelector.isDefined() && outputSelector.size() == 0 )
      throw new OperatorException( this, "output selector: " + getOutputSelector().printVerbose() + ", resolves to an empty field set, current grouping is on all fields" );
    }

  /**
   * Method verifyArguments fails if fewer arguments were resolved than the operation expects.
   * <p>
   * No check is made when the resolved selector is unknown or when the operation declares
   * {@link Operation#ANY} as its number of arguments.
   *
   * @param argumentSelector the resolved argument fields
   * @throws OperatorException if the resolved fields are fewer than operation.getNumArgs()
   */
  void verifyArguments( Fields argumentSelector )
    {
    if( argumentSelector.isUnknown() )
      return;

    if( operation.getNumArgs() != Operation.ANY && argumentSelector.size() < operation.getNumArgs() )
      throw new OperatorException( this, "resolved wrong number of arguments: " + argumentSelector.printVerbose() + ", expected: " + operation.getNumArgs() );
    }

  /**
   * Method resolveOutgoingSelector resolves this Operator's output selector into concrete outgoing fields.
   * <p>
   * The RESULTS, ARGS, GROUP and VALUES style selectors are answered directly from the given declared fields,
   * the given argument fields, or the incoming scope. A SWAP selector yields the incoming pass-through fields
   * minus the argument fields, with the declared fields appended. Any other selector is resolved against the
   * incoming pass-through fields and the declared fields.
   *
   * @param incomingScopes the incoming scopes, only the one returned by getFirst is consulted
   * @param argumentFields the resolved argument fields
   * @param declaredFields the resolved declared fields
   * @return the resolved outgoing fields
   * @throws OperatorException if the final resolution raises a TupleException
   */
  Fields resolveOutgoingSelector( Set<Scope> incomingScopes, Fields argumentFields, Fields declaredFields )
    {
    Scope incomingScope = getFirst( incomingScopes );
    Fields outputSelector = getOutputSelector();

    if( outputSelector.isResults() )
      return declaredFields;

    if( outputSelector.isArguments() )
      return argumentFields;

    if( outputSelector.isGroup() )
      return incomingScope.getOutGroupingFields();

    if( outputSelector.isValues() )
      return incomingScope.getOutGroupingValueFields();

    Fields incomingFields = resolveIncomingOperationPassThroughFields( incomingScope );

    // not part of resolve as we need the argumentFields
    if( outputSelector.isSwap() )
      return Fields.asDeclaration( incomingFields.subtract( argumentFields ) ).append( declaredFields );

    try
      {
      return Fields.resolve( outputSelector, Fields.asDeclaration( incomingFields ), declaredFields );
      }
    catch( TupleException exception )
      {
      throw new OperatorException( this, incomingFields, declaredFields, outputSelector, exception );
      }
    }

  /**
   * Method resolveArgumentSelector resolves this Operator's argument selector into concrete argument fields.
   * <p>
   * The ALL, GROUP and VALUES style selectors are answered directly from the incoming scope. Any other selector
   * is used to select from the incoming operation argument fields.
   *
   * @param incomingScopes the incoming scopes, only the one returned by getFirst is consulted
   * @return the resolved argument fields
   * @throws OperatorException if resolution fails for any reason; the original exception is retained as the cause
   */
  Fields resolveArgumentSelector( Set<Scope> incomingScopes )
    {
    Fields argumentSelector = getArgumentSelector();

    try
      {
      Scope incomingScope = getFirst( incomingScopes );

      if( argumentSelector.isAll() )
        return resolveIncomingOperationArgumentFields( incomingScope );

      if( argumentSelector.isGroup() )
        return incomingScope.getOutGroupingFields();

      if( argumentSelector.isValues() )
        return incomingScope.getOutGroupingValueFields();

      return resolveIncomingOperationArgumentFields( incomingScope ).select( argumentSelector );
      }
    catch( FieldsResolverException exception )
      {
      // carries the source fields so the failure can be reported against them
      throw new OperatorException( this, OperatorException.Kind.argument, exception.getSourceFields(), argumentSelector, exception );
      }
    catch( Exception exception )
      {
      throw new OperatorException( this, "unable to resolve argument selector: " + argumentSelector.printVerbose(), exception );
      }
    }

  /**
   * Method resolveDeclared resolves the operation's field declaration into concrete declared fields.
   * <p>
   * When the output selector is REPLACE, the declaration must match the arguments in size (when both are
   * defined) and the result is derived from the arguments. Otherwise the UNKNOWN, ARGS, ALL, GROUP and VALUES
   * style declarations are answered from the arguments or the incoming scope, and any other declaration is
   * returned as is.
   *
   * @param incomingScopes the incoming scopes, only the one returned by getFirst is consulted
   * @param arguments      the resolved argument fields
   * @return the resolved declared fields
   * @throws OperatorException if the REPLACE sizes differ, or if resolving against the incoming scope fails
   */
  Fields resolveDeclared( Set<Scope> incomingScopes, Fields arguments )
    {
    Fields fieldDeclaration = getFieldDeclaration();

    if( getOutputSelector().isReplace() )
      {
      if( arguments.isDefined() && fieldDeclaration.isDefined() && arguments.size() != fieldDeclaration.size() )
        throw new OperatorException( this, "during REPLACE both the arguments selector and field declaration must be the same size, arguments: " + arguments.printVerbose() + " declaration: " + fieldDeclaration.printVerbose() );

      if( fieldDeclaration.isArguments() ) // there is no type info, so inherit it
        return arguments;

      return arguments.project( fieldDeclaration );
      }

    try
      {
      Scope incomingScope = getFirst( incomingScopes );

      if( fieldDeclaration.isUnknown() )
        return fieldDeclaration;

      if( fieldDeclaration.isArguments() )
        return Fields.asDeclaration( arguments );

      if( fieldDeclaration.isAll() )
        return resolveIncomingOperationPassThroughFields( incomingScope );

      if( fieldDeclaration.isGroup() )
        return incomingScope.getOutGroupingFields();

      // VALUES is the diff between all fields and group fields
      if( fieldDeclaration.isValues() )
        return incomingScope.getOutGroupingValueFields();

      }
    catch( Exception exception )
      {
      throw new OperatorException( this, "could not resolve declared fields in: " + this, exception );
      }

    return fieldDeclaration;
    }

  // OBJECT OVERRIDES

  /**
   * Returns the {@link Pipe} string form followed by the operation in square brackets.
   *
   * @return the String representation of this Operator
   */
  @Override
  public String toString()
    {
    return super.toString() + "[" + operation + "]";
    }

  /**
   * Appends the {@link Pipe} representation followed by the operation, wrapped in square brackets, to the
   * given buffer.
   *
   * @param buffer the buffer to append to
   * @param scope  the scope passed through to the operation printer
   */
  @Override
  protected void printInternal( StringBuffer buffer, Scope scope )
    {
    super.printInternal( buffer, scope );
    buffer.append( "[" );
    BaseOperation.printOperationInternal( operation, buffer, scope );
    buffer.append( "]" );
    }

  /**
   * Two Operators are equal when they are of the same class, their {@link Pipe} state is equal, and their
   * argument selector, operation and output selector are equal. The planner level is not compared.
   *
   * @param object the object to compare with
   * @return true if the given object is equal to this Operator
   */
  @SuppressWarnings({"RedundantIfStatement"})
  @Override
  public boolean equals( Object object )
    {
    if( this == object )
      return true;
    if( object == null || getClass() != object.getClass() )
      return false;
    if( !super.equals( object ) )
      return false;

    Operator operator = (Operator) object;

    if( argumentSelector != null ? !argumentSelector.equals( operator.argumentSelector ) : operator.argumentSelector != null )
      return false;
    if( operation != null ? !operation.equals( operator.operation ) : operator.operation != null )
      return false;
    if( outputSelector != null ? !outputSelector.equals( operator.outputSelector ) : operator.outputSelector != null )
      return false;

    return true;
    }

  /**
   * Combines the {@link Pipe} hash code with those of the operation, argument selector and output selector,
   * the same members that {@link #equals(Object)} compares.
   *
   * @return the hash code of this Operator
   */
  @Override
  public int hashCode()
    {
    int result = super.hashCode();
    result = 31 * result + ( operation != null ? operation.hashCode() : 0 );
    result = 31 * result + ( argumentSelector != null ? argumentSelector.hashCode() : 0 );
    result = 31 * result + ( outputSelector != null ? outputSelector.hashCode() : 0 );
    return result;
    }
  }