/*
 * Copyright (c) 2016-2018 Chris K Wensel <chris@wensel.net>. All Rights Reserved.
 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved.
 *
 * Project and contact information: http://www.cascading.org/
 *
 * This file is part of the Cascading project.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package cascading;

import java.io.File;
import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.regex.Pattern;

import cascading.flow.Flow;
import cascading.flow.FlowProcess;
import cascading.flow.planner.FlowPlanner;
import cascading.flow.stream.graph.StreamGraph;
import cascading.operation.Aggregator;
import cascading.operation.Buffer;
import cascading.operation.ConcreteCall;
import cascading.operation.Filter;
import cascading.operation.Function;
import cascading.tap.Tap;
import cascading.tuple.Fields;
import cascading.tuple.Tuple;
import cascading.tuple.TupleEntry;
import cascading.tuple.TupleEntryIterator;
import cascading.tuple.TupleListCollector;
import cascading.util.Util;
import junit.framework.TestCase;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.rules.TestName;
import org.junit.runner.RunWith;
import org.junit.runners.BlockJUnit4ClassRunner;

/**
 * Class CascadingTestCase is the base class for all Cascading tests.
 * <p>
 * It includes a few helpful utility methods for testing Cascading applications.
 */
@RunWith(BlockJUnit4ClassRunner.class)
public abstract class CascadingTestCase extends TestCase implements Serializable
  {
  /** System property naming the root directory for test output, see {@link #getTestOutputRoot()}. */
  public static final String ROOT_OUTPUT_PATH = "test.output.root";
  /** System property naming the root directory for plan output, see {@link #getTestPlanRoot()}. */
  public static final String ROOT_PLAN_PATH = "test.plan.root";
  /** Boolean system property that, when true, makes {@link #setUp()} configure the plan trace paths. */
  public static final String TEST_TRACEPLAN_ENABLED = "test.traceplan.enabled";

  // lazily computed by getOutputPath() and getPlanPath()
  private String outputPath;
  private String planPath;

  @Rule
  public transient TestName name = new TestName();

  /** A {@link FlowProcess} whose {@link #getProperty(String)} is answered from the Map given at construction. */
  static class TestFlowProcess extends FlowProcess.NullFlowProcess
    {
    private final Map<Object, Object> properties;

    public TestFlowProcess( Map<Object, Object> properties )
      {
      this.properties = properties;
      }

    @Override
    public Object getProperty( String key )
      {
      return properties.get( key );
      }
    }

  public CascadingTestCase()
    {
    }

  public CascadingTestCase( String name )
    {
    super( name );
    }

  /**
   * When the {@link #TEST_TRACEPLAN_ENABLED} system property is true, points the planner trace
   * and stream graph system properties at sub-directories of {@link #getPlanPath()}.
   */
  @Override
  @Before
  public void setUp() throws Exception
    {
    super.setUp();

    if( Boolean.getBoolean( TEST_TRACEPLAN_ENABLED ) )
      {
      String plannerPath = Util.join( "/", getPlanPath(), "planner" );
      String streamPath = Util.join( "/", getPlanPath(), "stream" );

      System.setProperty( FlowPlanner.TRACE_PLAN_PATH, plannerPath );
      System.setProperty( FlowPlanner.TRACE_PLAN_TRANSFORM_PATH, plannerPath );
      System.setProperty( FlowPlanner.TRACE_STATS_PATH, plannerPath );
      System.setProperty( "platform." + StreamGraph.DOT_FILE_PATH, streamPath ); // pass down
      }
    }

  @Override
  @After
  public void tearDown() throws Exception
    {
    super.tearDown();
    }

  /**
   * Returns the value of the {@link #ROOT_OUTPUT_PATH} system property, defaulting to
   * {@code build/test/output}, with every ':' replaced by '_'.
   */
  protected static String getTestOutputRoot()
    {
    return System.getProperty( ROOT_OUTPUT_PATH, "build/test/output" ).replace( ":", "_" );
    }

  /**
   * Returns the value of the {@link #ROOT_PLAN_PATH} system property, defaulting to
   * {@code build/test/plan}, with every ':' replaced by '_'.
   */
  protected static String getTestPlanRoot()
    {
    return System.getProperty( ROOT_PLAN_PATH, "build/test/plan" ).replace( ":", "_" );
    }

  /** Returns the output root, test case name and test name, in that order. */
  protected String[] getOutputPathElements()
    {
    return new String[]{getTestOutputRoot(), getTestCaseName(), getTestName()};
    }

  /** Returns the plan root, test case name and test name, in that order. */
  protected String[] getPlanPathElements()
    {
    return new String[]{getTestPlanRoot(), getTestCaseName(), getTestName()};
    }

  /** Returns {@link #getOutputPathElements()} joined with {@link File#separator}; computed once and cached. */
  protected String getOutputPath()
    {
    if( outputPath == null )
      outputPath = Util.join( getOutputPathElements(), File.separator );

    return outputPath;
    }

  /** Returns {@link #getPlanPathElements()} joined with {@link File#separator}; computed once and cached. */
  protected String getPlanPath()
    {
    if( planPath == null )
      planPath = Util.join( getPlanPathElements(), File.separator );

    return planPath;
    }

  /**
   * Returns the simple class name, lower-cased, with the last occurrence of "Test" and
   * everything after it removed. A name not containing "Test" is only lower-cased.
   */
  public String getTestCaseName()
    {
    // Locale.ROOT keeps the derived directory name independent of the JVM default locale
    return getClass().getSimpleName().replaceAll( "^(.*)Test.*$", "$1" ).toLowerCase( Locale.ROOT );
    }

  /** Returns the method name reported by the {@link TestName} rule. */
  public String getTestName()
    {
    return name.getMethodName();
    }

  /** Validates the tuple count of the default sink of the given flow, without size or regex checks. */
  public static void validateLength( Flow flow, int numTuples ) throws IOException
    {
    validateLength( flow, numTuples, -1 );
    }

  /** Validates the tuple count of the named sink of the given flow, without size or regex checks. */
  public static void validateLength( Flow flow, int numTuples, String name ) throws IOException
    {
    validateLength( flow, numTuples, -1, null, name );
    }

  /** Validates the tuple count and per-tuple size of the default sink of the given flow. */
  public static void validateLength( Flow flow, int numTuples, int tupleSize ) throws IOException
    {
    validateLength( flow, numTuples, tupleSize, null, null );
    }

  /** Validates the tuple count, per-tuple size and regex match of the default sink of the given flow. */
  public static void validateLength( Flow flow, int numTuples, int tupleSize, Pattern regex ) throws IOException
    {
    validateLength( flow, numTuples, tupleSize, regex, null );
    }

  /** Validates the tuple count and regex match of the named sink of the given flow. */
  public static void validateLength( Flow flow, int numTuples, Pattern regex, String name ) throws IOException
    {
    validateLength( flow, numTuples, -1, regex, name );
    }

  /**
   * Opens a sink of the given flow and validates its contents.
   *
   * @param flow      the flow whose sink is read
   * @param numTuples the expected number of tuples
   * @param tupleSize the expected size of every tuple, or -1 to skip the size check
   * @param regex     a pattern the String form of every tuple must match, or null to skip the check
   * @param name      the sink name handed to {@code flow.openSink( name )}, or null to use {@code flow.openSink()}
   * @throws IOException if thrown while opening the sink
   */
  public static void validateLength( Flow flow, int numTuples, int tupleSize, Pattern regex, String name ) throws IOException
    {
    TupleEntryIterator iterator = name == null ? flow.openSink() : flow.openSink( name );
    validateLength( iterator, numTuples, tupleSize, regex );
    }

  /** Validates only the number of entries returned by the given iterator, then closes it. */
  public static void validateLength( TupleEntryIterator iterator, int numTuples )
    {
    validateLength( iterator, numTuples, -1, null );
    }

  /** Validates the number and size of the entries returned by the given iterator, then closes it. */
  public static void validateLength( TupleEntryIterator iterator, int numTuples, int tupleSize )
    {
    validateLength( iterator, numTuples, tupleSize, null );
    }

  /** Validates the number and regex match of the entries returned by the given iterator, then closes it. */
  public static void validateLength( TupleEntryIterator iterator, int numTuples, Pattern regex )
    {
    validateLength( iterator, numTuples, -1, regex );
    }

  /**
   * Drains the given iterator, asserting on every entry, then closes the iterator and asserts on
   * the total number of entries seen.
   * <p>
   * The iterator is closed even when a per-entry assertion fails.
   *
   * @param iterator  the entries to validate; always closed by this method
   * @param numTuples the expected number of entries
   * @param tupleSize the expected size of every entry, or -1 to skip the size check
   * @param regex     a pattern the String form of every tuple must fully match, or null to skip the check
   * @throws RuntimeException wrapping any IOException thrown while closing the iterator
   */
  public static void validateLength( TupleEntryIterator iterator, int numTuples, int tupleSize, Pattern regex )
    {
    int count = 0;

    // try-with-resources releases the iterator even if an assertion inside the loop fails
    try( TupleEntryIterator entries = iterator )
      {
      while( entries.hasNext() )
        {
        TupleEntry tupleEntry = entries.next();

        if( tupleSize != -1 )
          assertEquals( "wrong number of elements", tupleSize, tupleEntry.size() );

        if( regex != null )
          {
          String value = tupleEntry.getTuple().toString();

          assertTrue( "regex: " + regex + " does not match: " + value, regex.matcher( value ).matches() );
          }

        count++;
        }
      }
    catch( IOException exception )
      {
      throw new RuntimeException( exception );
      }

    assertEquals( "wrong number of lines", numTuples, count );
    }

  /** Invokes the function once with the given tuple wrapped in a TupleEntry and no properties. */
  public static TupleListCollector invokeFunction( Function function, Tuple arguments, Fields resultFields )
    {
    return invokeFunction( function, new TupleEntry( arguments ), resultFields );
    }

  /** Invokes the function once with the given tuple wrapped in a TupleEntry and the given properties. */
  public static TupleListCollector invokeFunction( Function function, Tuple arguments, Fields resultFields, Map<Object, Object> properties )
    {
    return invokeFunction( function, new TupleEntry( arguments ), resultFields, properties );
    }

  /** Invokes the function once with the given arguments and an empty property map. */
  public static TupleListCollector invokeFunction( Function function, TupleEntry arguments, Fields resultFields )
    {
    return invokeFunction( function, arguments, resultFields, new HashMap<Object, Object>() );
    }

  /**
   * Calls prepare, operate and cleanup on the given function for a single arguments entry.
   *
   * @param function     the function to invoke
   * @param arguments    the arguments entry passed to the function
   * @param resultFields the fields used to create the returned collector
   * @param properties   the properties visible through the FlowProcess handed to the function
   * @return the collector set as the output collector of the call
   */
  public static TupleListCollector invokeFunction( Function function, TupleEntry arguments, Fields resultFields, Map<Object, Object> properties )
    {
    FlowProcess flowProcess = new TestFlowProcess( properties );
    ConcreteCall operationCall = new ConcreteCall( arguments.getFields(), function.getFieldDeclaration() );
    TupleListCollector collector = new TupleListCollector( resultFields, true );

    operationCall.setArguments( arguments );
    operationCall.setOutputCollector( collector );

    function.prepare( flowProcess, operationCall );
    function.operate( flowProcess, operationCall );
    function.cleanup( flowProcess, operationCall );

    return collector;
    }

  /** Invokes the function once per tuple, each wrapped in a TupleEntry, with an empty property map. */
  public static TupleListCollector invokeFunction( Function function, Tuple[] argumentsArray, Fields resultFields )
    {
    TupleEntry[] entries = makeArgumentsArray( argumentsArray );

    return invokeFunction( function, entries, resultFields );
    }

  /** Invokes the function once per tuple, each wrapped in a TupleEntry, with the given properties. */
  public static TupleListCollector invokeFunction( Function function, Tuple[] argumentsArray, Fields resultFields, Map<Object, Object> properties )
    {
    TupleEntry[] entries = makeArgumentsArray( argumentsArray );

    return invokeFunction( function, entries, resultFields, properties );
    }

  /** Invokes the function once per arguments entry with an empty property map. */
  public static TupleListCollector invokeFunction( Function function, TupleEntry[] argumentsArray, Fields resultFields )
    {
    return invokeFunction( function, argumentsArray, resultFields, new HashMap<Object, Object>() );
    }

  /**
   * Calls prepare once, operate once per arguments entry, then flush and cleanup on the given function.
   *
   * @param function       the function to invoke
   * @param argumentsArray the arguments entries; must hold at least one entry, the fields of the
   *                       first entry are used as the argument fields of the call
   * @param resultFields   the fields used to create the returned collector
   * @param properties     the properties visible through the FlowProcess handed to the function
   * @return the collector set as the output collector of the call
   */
  public static TupleListCollector invokeFunction( Function function, TupleEntry[] argumentsArray, Fields resultFields, Map<Object, Object> properties )
    {
    FlowProcess flowProcess = new TestFlowProcess( properties );
    ConcreteCall operationCall = new ConcreteCall( argumentsArray[ 0 ].getFields(), function.getFieldDeclaration() );
    TupleListCollector collector = new TupleListCollector( resultFields, true );

    function.prepare( flowProcess, operationCall );
    operationCall.setOutputCollector( collector );

    for( TupleEntry arguments : argumentsArray )
      {
      operationCall.setArguments( arguments );
      function.operate( flowProcess, operationCall );
      }

    function.flush( flowProcess, operationCall );
    function.cleanup( flowProcess, operationCall );

    return collector;
    }

  /** Invokes the filter once with the given tuple wrapped in a TupleEntry and an empty property map. */
  public static boolean invokeFilter( Filter filter, Tuple arguments )
    {
    return invokeFilter( filter, new TupleEntry( arguments ) );
    }

  /** Invokes the filter once with the given tuple wrapped in a TupleEntry and the given properties. */
  public static boolean invokeFilter( Filter filter, Tuple arguments, Map<Object, Object> properties )
    {
    return invokeFilter( filter, new TupleEntry( arguments ), properties );
    }

  /** Invokes the filter once with the given arguments and an empty property map. */
  public static boolean invokeFilter( Filter filter, TupleEntry arguments )
    {
    return invokeFilter( filter, arguments, new HashMap<Object, Object>() );
    }

  /**
   * Calls prepare, isRemove and cleanup on the given filter for a single arguments entry.
   *
   * @param filter     the filter to invoke
   * @param arguments  the arguments entry passed to the filter
   * @param properties the properties visible through the FlowProcess handed to the filter
   * @return the value returned by {@code filter.isRemove}
   */
  public static boolean invokeFilter( Filter filter, TupleEntry arguments, Map<Object, Object> properties )
    {
    FlowProcess flowProcess = new TestFlowProcess( properties );
    ConcreteCall operationCall = new ConcreteCall( arguments.getFields() );

    operationCall.setArguments( arguments );

    filter.prepare( flowProcess, operationCall );

    boolean isRemove = filter.isRemove( flowProcess, operationCall );

    filter.cleanup( flowProcess, operationCall );

    return isRemove;
    }

  /** Invokes the filter once per tuple, each wrapped in a TupleEntry, with an empty property map. */
  public static boolean[] invokeFilter( Filter filter, Tuple[] argumentsArray )
    {
    TupleEntry[] entries = makeArgumentsArray( argumentsArray );

    return invokeFilter( filter, entries, Collections.emptyMap() );
    }

  /** Invokes the filter once per tuple, each wrapped in a TupleEntry, with the given properties. */
  public static boolean[] invokeFilter( Filter filter, Tuple[] argumentsArray, Map<Object, Object> properties )
    {
    TupleEntry[] entries = makeArgumentsArray( argumentsArray );

    return invokeFilter( filter, entries, properties );
    }

  /** Invokes the filter once per arguments entry with an empty property map. */
  public static boolean[] invokeFilter( Filter filter, TupleEntry[] argumentsArray )
    {
    return invokeFilter( filter, argumentsArray, Collections.emptyMap() );
    }

  /**
   * Calls prepare once, isRemove once per arguments entry, then flush and cleanup on the given filter.
   *
   * @param filter         the filter to invoke
   * @param argumentsArray the arguments entries; must hold at least one entry, the fields of the
   *                       first entry are used as the argument fields of the call
   * @param properties     the properties visible through the FlowProcess handed to the filter
   * @return the isRemove results, one per entry, in the order of argumentsArray
   */
  public static boolean[] invokeFilter( Filter filter, TupleEntry[] argumentsArray, Map<Object, Object> properties )
    {
    ConcreteCall operationCall = new ConcreteCall( argumentsArray[ 0 ].getFields() );

    FlowProcess flowProcess = new TestFlowProcess( properties );

    filter.prepare( flowProcess, operationCall );

    boolean[] results = new boolean[ argumentsArray.length ];

    for( int i = 0; i < argumentsArray.length; i++ )
      {
      operationCall.setArguments( argumentsArray[ i ] );

      results[ i ] = filter.isRemove( flowProcess, operationCall );
      }

    filter.flush( flowProcess, operationCall );
    filter.cleanup( flowProcess, operationCall );

    return results;
    }

  /** Invokes the aggregator over the given tuples with a null group and an empty property map. */
  public static TupleListCollector invokeAggregator( Aggregator aggregator, Tuple[] argumentsArray, Fields resultFields )
    {
    TupleEntry[] entries = makeArgumentsArray( argumentsArray );

    return invokeAggregator( aggregator, entries, resultFields );
    }

  /** Invokes the aggregator over the given tuples with a null group and the given properties. */
  public static TupleListCollector invokeAggregator( Aggregator aggregator, Tuple[] argumentsArray, Fields resultFields, Map<Object, Object> properties )
    {
    TupleEntry[] entries = makeArgumentsArray( argumentsArray );

    return invokeAggregator( aggregator, entries, resultFields, properties );
    }

  /** Invokes the aggregator over the given entries with a null group and an empty property map. */
  public static TupleListCollector invokeAggregator( Aggregator aggregator, TupleEntry[] argumentsArray, Fields resultFields )
    {
    return invokeAggregator( aggregator, null, argumentsArray, resultFields );
    }

  /** Invokes the aggregator over the given entries with a null group and the given properties. */
  public static TupleListCollector invokeAggregator( Aggregator aggregator, TupleEntry[] argumentsArray, Fields resultFields, Map<Object, Object> properties )
    {
    return invokeAggregator( aggregator, null, argumentsArray, resultFields, properties );
    }

  /** Invokes the aggregator over the given entries with the given group and an empty property map. */
  public static TupleListCollector invokeAggregator( Aggregator aggregator, TupleEntry group, TupleEntry[] argumentsArray, Fields resultFields )
    {
    return invokeAggregator( aggregator, group, argumentsArray, resultFields, Collections.emptyMap() );
    }

  /**
   * Calls prepare and start, aggregate once per arguments entry, then complete and cleanup on the
   * given aggregator. The output collector is only set on the call before complete is invoked.
   *
   * @param aggregator     the aggregator to invoke
   * @param group          the group entry set on the call, may be null
   * @param argumentsArray the arguments entries; must hold at least one entry, the fields of the
   *                       first entry are used as the argument fields of the call
   * @param resultFields   the fields used to create the returned collector
   * @param properties     the properties visible through the FlowProcess handed to the aggregator
   * @return the collector set as the output collector of the call
   */
  public static TupleListCollector invokeAggregator( Aggregator aggregator, TupleEntry group, TupleEntry[] argumentsArray, Fields resultFields, Map<Object, Object> properties )
    {
    FlowProcess flowProcess = new TestFlowProcess( properties );
    ConcreteCall operationCall = new ConcreteCall( argumentsArray[ 0 ].getFields(), aggregator.getFieldDeclaration() );

    operationCall.setGroup( group );

    aggregator.prepare( flowProcess, operationCall );

    aggregator.start( flowProcess, operationCall );

    for( TupleEntry arguments : argumentsArray )
      {
      operationCall.setArguments( arguments );
      aggregator.aggregate( flowProcess, operationCall );
      }

    TupleListCollector collector = new TupleListCollector( resultFields, true );
    operationCall.setOutputCollector( collector );

    aggregator.complete( flowProcess, operationCall );

    // hand cleanup the same FlowProcess as every other lifecycle call, so an operation reading
    // properties during cleanup does not fail on a null process
    aggregator.cleanup( flowProcess, operationCall );

    return collector;
    }

  /** Invokes the buffer over the given tuples with a null group and an empty property map. */
  public static TupleListCollector invokeBuffer( Buffer buffer, Tuple[] argumentsArray, Fields resultFields )
    {
    TupleEntry[] entries = makeArgumentsArray( argumentsArray );

    return invokeBuffer( buffer, entries, resultFields );
    }

  /** Invokes the buffer over the given tuples with a null group and the given properties. */
  public static TupleListCollector invokeBuffer( Buffer buffer, Tuple[] argumentsArray, Fields resultFields, Map<Object, Object> properties )
    {
    TupleEntry[] entries = makeArgumentsArray( argumentsArray );

    return invokeBuffer( buffer, entries, resultFields, properties );
    }

  /** Invokes the buffer over the given entries with a null group and an empty property map. */
  public static TupleListCollector invokeBuffer( Buffer buffer, TupleEntry[] argumentsArray, Fields resultFields )
    {
    return invokeBuffer( buffer, null, argumentsArray, resultFields );
    }

  /** Invokes the buffer over the given entries with a null group and the given properties. */
  public static TupleListCollector invokeBuffer( Buffer buffer, TupleEntry[] argumentsArray, Fields resultFields, Map<Object, Object> properties )
    {
    return invokeBuffer( buffer, null, argumentsArray, resultFields, properties );
    }

  /** Invokes the buffer over the given entries with the given group and an empty property map. */
  public static TupleListCollector invokeBuffer( Buffer buffer, TupleEntry group, TupleEntry[] argumentsArray, Fields resultFields )
    {
    return invokeBuffer( buffer, group, argumentsArray, resultFields, Collections.emptyMap() );
    }

  /**
   * Calls prepare, a single operate over an iterator of all arguments entries, then cleanup on
   * the given buffer.
   *
   * @param buffer         the buffer to invoke
   * @param group          the group entry set on the call, may be null
   * @param argumentsArray the arguments entries; must hold at least one entry, the fields of the
   *                       first entry are used as the argument fields of the call
   * @param resultFields   the fields used to create the returned collector
   * @param properties     the properties visible through the FlowProcess handed to the buffer
   * @return the collector set as the output collector of the call
   */
  public static TupleListCollector invokeBuffer( Buffer buffer, TupleEntry group, TupleEntry[] argumentsArray, Fields resultFields, Map<Object, Object> properties )
    {
    FlowProcess flowProcess = new TestFlowProcess( properties );
    ConcreteCall operationCall = new ConcreteCall( argumentsArray[ 0 ].getFields(), buffer.getFieldDeclaration() );

    operationCall.setGroup( group );

    buffer.prepare( flowProcess, operationCall );
    TupleListCollector collector = new TupleListCollector( resultFields, true );
    operationCall.setOutputCollector( collector );

    operationCall.setArgumentsIterator( Arrays.asList( argumentsArray ).iterator() );

    buffer.operate( flowProcess, operationCall );

    // hand cleanup the same FlowProcess as every other lifecycle call, so an operation reading
    // properties during cleanup does not fail on a null process
    buffer.cleanup( flowProcess, operationCall );

    return collector;
    }

  /** Wraps every tuple of the given array in a new TupleEntry, preserving order. */
  private static TupleEntry[] makeArgumentsArray( Tuple[] argumentsArray )
    {
    TupleEntry[] entries = new TupleEntry[ argumentsArray.length ];

    for( int i = 0; i < argumentsArray.length; i++ )
      entries[ i ] = new TupleEntry( argumentsArray[ i ] );

    return entries;
    }

  /** Reads the first tap returned by {@code flow.getSourcesCollection()} into a new List. */
  public static List<Tuple> getSourceAsList( Flow flow ) throws IOException
    {
    return asCollection( flow, (Tap) flow.getSourcesCollection().iterator().next(), Fields.ALL, new ArrayList<Tuple>() );
    }

  /** Reads the tap returned by {@code flow.getSink()} into a new List. */
  public static List<Tuple> getSinkAsList( Flow flow ) throws IOException
    {
    return asCollection( flow, flow.getSink(), Fields.ALL, new ArrayList<Tuple>() );
    }

  /** Reads the given tap into a new List, selecting {@link Fields#ALL}. */
  public static List<Tuple> asList( Flow flow, Tap tap ) throws IOException
    {
    return asCollection( flow, tap, Fields.ALL, new ArrayList<Tuple>() );
    }

  /** Reads the given tap into a new List, applying the given selector to every entry. */
  public static List<Tuple> asList( Flow flow, Tap tap, Fields selector ) throws IOException
    {
    return asCollection( flow, tap, selector, new ArrayList<Tuple>() );
    }

  /** Reads the given tap into a new Set, selecting {@link Fields#ALL}. */
  public static Set<Tuple> asSet( Flow flow, Tap tap ) throws IOException
    {
    return asCollection( flow, tap, Fields.ALL, new HashSet<Tuple>() );
    }

  /** Reads the given tap into a new Set, applying the given selector to every entry. */
  public static Set<Tuple> asSet( Flow flow, Tap tap, Fields selector ) throws IOException
    {
    return asCollection( flow, tap, selector, new HashSet<Tuple>() );
    }

  /** Reads the given tap into the given collection, selecting {@link Fields#ALL}. */
  public static <C extends Collection<Tuple>> C asCollection( Flow flow, Tap tap, C collection ) throws IOException
    {
    return asCollection( flow, tap, Fields.ALL, collection );
    }

  /**
   * Opens the given tap for reading through the flow, adds a selected copy of every entry to
   * the given collection, and closes the iterator.
   *
   * @return the collection passed in
   */
  public static <C extends Collection<Tuple>> C asCollection( Flow flow, Tap tap, Fields selector, C collection ) throws IOException
    {
    try( TupleEntryIterator iterator = flow.openTapForRead( tap ) )
      {
      return asCollection( iterator, selector, collection );
      }
    }

  /** Adds a copy of every remaining tuple to the given collection; the iterator is not closed. */
  public static <C extends Collection<Tuple>> C asCollection( TupleEntryIterator iterator, C result )
    {
    while( iterator.hasNext() )
      result.add( iterator.next().getTupleCopy() );

    return result;
    }

  /** Adds a selected copy of every remaining tuple to the given collection; the iterator is not closed. */
  public static <C extends Collection<Tuple>> C asCollection( TupleEntryIterator iterator, Fields selector, C result )
    {
    while( iterator.hasNext() )
      result.add( iterator.next().selectTupleCopy( selector ) );

    return result;
    }
  }