/*
 * Copyright (c) 2007-2015 Concurrent, Inc. All Rights Reserved.
 *
 * Project and contact information: http://www.cascading.org/
 *
 * This file is part of the Cascading project.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package cascading;

import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.regex.Pattern;

import cascading.flow.Flow;
import cascading.flow.FlowProcess;
import cascading.operation.Aggregator;
import cascading.operation.Buffer;
import cascading.operation.ConcreteCall;
import cascading.operation.Filter;
import cascading.operation.Function;
import cascading.tap.Tap;
import cascading.tuple.Fields;
import cascading.tuple.Tuple;
import cascading.tuple.TupleEntry;
import cascading.tuple.TupleEntryIterator;
import cascading.tuple.TupleListCollector;
import junit.framework.TestCase;

/**
 * Class CascadingTestCase is the base class for all Cascading tests.
 * <p>
 * It includes a few helpful utility methods for testing Cascading applications:
 * validating the contents of a sink, invoking a single operation
 * (Function, Filter, Aggregator or Buffer) outside of a Flow, and reading a Tap into
 * a collection.
 */
public class CascadingTestCase extends TestCase implements Serializable
{
  /**
   * A FlowProcess stand-in that answers {@link #getProperty(String)} from the map
   * handed to the constructor. Used by the {@code invoke*} helpers so an operation
   * under test can read properties.
   */
  static class TestFlowProcess extends FlowProcess.NullFlowProcess
  {
    private final Map<Object, Object> properties;

    public TestFlowProcess( Map<Object, Object> properties )
    {
      this.properties = properties;
    }

    @Override
    public Object getProperty( String key )
    {
      return properties.get( key );
    }
  }

  public CascadingTestCase()
  {
  }

  public CascadingTestCase( String name )
  {
    super( name );
  }

  /**
   * Asserts the default sink of the given flow holds exactly {@code length} entries.
   *
   * @param flow   the flow whose sink is opened
   * @param length the expected number of entries
   * @throws IOException if the sink cannot be opened
   */
  public static void validateLength( Flow flow, int length ) throws IOException
  {
    validateLength( flow, length, -1 );
  }

  /**
   * Asserts the named sink of the given flow holds exactly {@code length} entries.
   *
   * @param flow   the flow whose sink is opened
   * @param length the expected number of entries
   * @param name   the sink name, or {@code null} to open the default sink
   * @throws IOException if the sink cannot be opened
   */
  public static void validateLength( Flow flow, int length, String name ) throws IOException
  {
    validateLength( flow, length, -1, null, name );
  }

  /**
   * Asserts the default sink holds {@code length} entries, each with {@code size} elements.
   *
   * @param flow   the flow whose sink is opened
   * @param length the expected number of entries
   * @param size   the expected element count of every entry, or {@code -1} to skip the check
   * @throws IOException if the sink cannot be opened
   */
  public static void validateLength( Flow flow, int length, int size ) throws IOException
  {
    validateLength( flow, length, size, null, null );
  }

  /**
   * Asserts the default sink holds {@code length} entries, each with {@code size} elements
   * and each matching {@code regex}.
   *
   * @param flow   the flow whose sink is opened
   * @param length the expected number of entries
   * @param size   the expected element count of every entry, or {@code -1} to skip the check
   * @param regex  pattern the string form of every tuple must fully match, or {@code null} to skip
   * @throws IOException if the sink cannot be opened
   */
  public static void validateLength( Flow flow, int length, int size, Pattern regex ) throws IOException
  {
    validateLength( flow, length, size, regex, null );
  }

  /**
   * Asserts the named sink holds {@code length} entries, each matching {@code regex}.
   *
   * @param flow   the flow whose sink is opened
   * @param length the expected number of entries
   * @param regex  pattern the string form of every tuple must fully match, or {@code null} to skip
   * @param name   the sink name, or {@code null} to open the default sink
   * @throws IOException if the sink cannot be opened
   */
  public static void validateLength( Flow flow, int length, Pattern regex, String name ) throws IOException
  {
    validateLength( flow, length, -1, regex, name );
  }

  /**
   * Opens a sink of the given flow and validates its contents.
   * <p>
   * The opened iterator is closed by the delegate
   * {@link #validateLength(TupleEntryIterator, int, int, Pattern)}.
   *
   * @param flow   the flow whose sink is opened
   * @param length the expected number of entries
   * @param size   the expected element count of every entry, or {@code -1} to skip the check
   * @param regex  pattern the string form of every tuple must fully match, or {@code null} to skip
   * @param name   the sink name, or {@code null} to open the default sink
   * @throws IOException if the sink cannot be opened
   */
  public static void validateLength( Flow flow, int length, int size, Pattern regex, String name ) throws IOException
  {
    TupleEntryIterator iterator = name == null ? flow.openSink() : flow.openSink( name );

    validateLength( iterator, length, size, regex );
  }

  /**
   * Asserts the iterator yields exactly {@code length} entries, then closes it.
   *
   * @param iterator the entries to validate
   * @param length   the expected number of entries
   */
  public static void validateLength( TupleEntryIterator iterator, int length )
  {
    validateLength( iterator, length, -1, null );
  }

  /**
   * Asserts the iterator yields {@code length} entries of {@code size} elements each, then closes it.
   *
   * @param iterator the entries to validate
   * @param length   the expected number of entries
   * @param size     the expected element count of every entry, or {@code -1} to skip the check
   */
  public static void validateLength( TupleEntryIterator iterator, int length, int size )
  {
    validateLength( iterator, length, size, null );
  }

  /**
   * Asserts the iterator yields {@code length} entries, each matching {@code regex}, then closes it.
   *
   * @param iterator the entries to validate
   * @param length   the expected number of entries
   * @param regex    pattern the string form of every tuple must fully match, or {@code null} to skip
   */
  public static void validateLength( TupleEntryIterator iterator, int length, Pattern regex )
  {
    validateLength( iterator, length, -1, regex );
  }

  /**
   * Walks the iterator, asserting per-entry size and pattern constraints, and finally
   * asserts the total number of entries seen.
   * <p>
   * The iterator is always closed, including when a per-entry assertion fails, so a
   * failing test does not leave the underlying resource open.
   *
   * @param iterator the entries to validate
   * @param length   the expected number of entries
   * @param size     the expected element count of every entry, or {@code -1} to skip the check
   * @param regex    pattern the string form of every tuple must fully match, or {@code null} to skip
   * @throws RuntimeException wrapping the IOException if closing the iterator fails
   */
  public static void validateLength( TupleEntryIterator iterator, int length, int size, Pattern regex )
  {
    int count = 0;

    try
    {
      while( iterator.hasNext() )
      {
        TupleEntry tupleEntry = iterator.next();

        if( size != -1 )
          assertEquals( "wrong number of elements", size, tupleEntry.size() );

        if( regex != null )
        {
          // render the tuple once; it is needed for both the match and the failure message
          String tupleString = tupleEntry.getTuple().toString();

          assertTrue( "regex: " + regex + " does not match: " + tupleString, regex.matcher( tupleString ).matches() );
        }

        count++;
      }
    }
    finally
    {
      closeIterator( iterator );
    }

    assertEquals( "wrong number of lines", length, count );
  }

  /** Closes the iterator, rethrowing any IOException as a RuntimeException. */
  private static void closeIterator( TupleEntryIterator iterator )
  {
    try
    {
      iterator.close();
    }
    catch( IOException exception )
    {
      throw new RuntimeException( exception );
    }
  }

  /**
   * Invokes the function once with the given argument tuple and no properties.
   *
   * @param function     the function under test
   * @param arguments    the argument values
   * @param resultFields the fields of the collected results
   * @return the collector holding everything the function emitted
   */
  public static TupleListCollector invokeFunction( Function function, Tuple arguments, Fields resultFields )
  {
    return invokeFunction( function, new TupleEntry( arguments ), resultFields );
  }

  /**
   * Invokes the function once with the given argument tuple.
   *
   * @param function     the function under test
   * @param arguments    the argument values
   * @param resultFields the fields of the collected results
   * @param properties   values returned by the FlowProcess handed to the function
   * @return the collector holding everything the function emitted
   */
  public static TupleListCollector invokeFunction( Function function, Tuple arguments, Fields resultFields, Map<Object, Object> properties )
  {
    return invokeFunction( function, new TupleEntry( arguments ), resultFields, properties );
  }

  /**
   * Invokes the function once with the given argument entry and no properties.
   *
   * @param function     the function under test
   * @param arguments    the argument entry
   * @param resultFields the fields of the collected results
   * @return the collector holding everything the function emitted
   */
  public static TupleListCollector invokeFunction( Function function, TupleEntry arguments, Fields resultFields )
  {
    return invokeFunction( function, arguments, resultFields, new HashMap<Object, Object>() );
  }

  /**
   * Invokes the function once: {@code prepare}, {@code operate}, then {@code cleanup}.
   *
   * @param function     the function under test
   * @param arguments    the argument entry; its fields become the call's argument fields
   * @param resultFields the fields of the collected results
   * @param properties   values returned by the FlowProcess handed to the function
   * @return the collector holding everything the function emitted
   */
  public static TupleListCollector invokeFunction( Function function, TupleEntry arguments, Fields resultFields, Map<Object, Object> properties )
  {
    FlowProcess flowProcess = new TestFlowProcess( properties );
    ConcreteCall operationCall = new ConcreteCall( arguments.getFields() );
    TupleListCollector collector = new TupleListCollector( resultFields, true );

    operationCall.setArguments( arguments );
    operationCall.setOutputCollector( collector );

    function.prepare( flowProcess, operationCall );
    function.operate( flowProcess, operationCall );
    function.cleanup( flowProcess, operationCall );

    return collector;
  }

  /**
   * Invokes the function once per argument tuple, with no properties.
   *
   * @param function       the function under test
   * @param argumentsArray the argument values, one tuple per invocation; must not be empty
   * @param resultFields   the fields of the collected results
   * @return the collector holding everything the function emitted
   */
  public static TupleListCollector invokeFunction( Function function, Tuple[] argumentsArray, Fields resultFields )
  {
    TupleEntry[] entries = makeArgumentsArray( argumentsArray );

    return invokeFunction( function, entries, resultFields );
  }

  /**
   * Invokes the function once per argument tuple.
   *
   * @param function       the function under test
   * @param argumentsArray the argument values, one tuple per invocation; must not be empty
   * @param resultFields   the fields of the collected results
   * @param properties     values returned by the FlowProcess handed to the function
   * @return the collector holding everything the function emitted
   */
  public static TupleListCollector invokeFunction( Function function, Tuple[] argumentsArray, Fields resultFields, Map<Object, Object> properties )
  {
    TupleEntry[] entries = makeArgumentsArray( argumentsArray );

    return invokeFunction( function, entries, resultFields, properties );
  }

  /**
   * Invokes the function once per argument entry, with no properties.
   *
   * @param function       the function under test
   * @param argumentsArray the argument entries, one per invocation; must not be empty
   * @param resultFields   the fields of the collected results
   * @return the collector holding everything the function emitted
   */
  public static TupleListCollector invokeFunction( Function function, TupleEntry[] argumentsArray, Fields resultFields )
  {
    return invokeFunction( function, argumentsArray, resultFields, new HashMap<Object, Object>() );
  }

  /**
   * Invokes the function over all argument entries: {@code prepare}, one {@code operate}
   * per entry, then {@code flush} and {@code cleanup}.
   *
   * @param function       the function under test
   * @param argumentsArray the argument entries, one per invocation; the fields of the first
   *                       entry become the call's argument fields
   * @param resultFields   the fields of the collected results
   * @param properties     values returned by the FlowProcess handed to the function
   * @return the collector holding everything the function emitted
   * @throws IllegalArgumentException if {@code argumentsArray} is null or empty
   */
  public static TupleListCollector invokeFunction( Function function, TupleEntry[] argumentsArray, Fields resultFields, Map<Object, Object> properties )
  {
    FlowProcess flowProcess = new TestFlowProcess( properties );
    ConcreteCall operationCall = new ConcreteCall( argumentFieldsOf( argumentsArray ) );
    TupleListCollector collector = new TupleListCollector( resultFields, true );

    function.prepare( flowProcess, operationCall );
    operationCall.setOutputCollector( collector );

    for( TupleEntry arguments : argumentsArray )
    {
      operationCall.setArguments( arguments );
      function.operate( flowProcess, operationCall );
    }

    function.flush( flowProcess, operationCall );
    function.cleanup( flowProcess, operationCall );

    return collector;
  }

  /**
   * Invokes the filter once with the given argument tuple and no properties.
   *
   * @param filter    the filter under test
   * @param arguments the argument values
   * @return the value returned by {@code filter.isRemove}
   */
  public static boolean invokeFilter( Filter filter, Tuple arguments )
  {
    return invokeFilter( filter, new TupleEntry( arguments ) );
  }

  /**
   * Invokes the filter once with the given argument tuple.
   *
   * @param filter     the filter under test
   * @param arguments  the argument values
   * @param properties values returned by the FlowProcess handed to the filter
   * @return the value returned by {@code filter.isRemove}
   */
  public static boolean invokeFilter( Filter filter, Tuple arguments, Map<Object, Object> properties )
  {
    return invokeFilter( filter, new TupleEntry( arguments ), properties );
  }

  /**
   * Invokes the filter once with the given argument entry and no properties.
   *
   * @param filter    the filter under test
   * @param arguments the argument entry
   * @return the value returned by {@code filter.isRemove}
   */
  public static boolean invokeFilter( Filter filter, TupleEntry arguments )
  {
    return invokeFilter( filter, arguments, new HashMap<Object, Object>() );
  }

  /**
   * Invokes the filter once: {@code prepare}, {@code isRemove}, then {@code cleanup}.
   *
   * @param filter     the filter under test
   * @param arguments  the argument entry; its fields become the call's argument fields
   * @param properties values returned by the FlowProcess handed to the filter
   * @return the value returned by {@code filter.isRemove}
   */
  public static boolean invokeFilter( Filter filter, TupleEntry arguments, Map<Object, Object> properties )
  {
    FlowProcess flowProcess = new TestFlowProcess( properties );
    ConcreteCall operationCall = new ConcreteCall( arguments.getFields() );

    operationCall.setArguments( arguments );

    filter.prepare( flowProcess, operationCall );

    boolean isRemove = filter.isRemove( flowProcess, operationCall );

    filter.cleanup( flowProcess, operationCall );

    return isRemove;
  }

  /**
   * Invokes the filter once per argument tuple, with no properties.
   *
   * @param filter         the filter under test
   * @param argumentsArray the argument values, one tuple per invocation; must not be empty
   * @return the {@code isRemove} results, in argument order
   */
  public static boolean[] invokeFilter( Filter filter, Tuple[] argumentsArray )
  {
    TupleEntry[] entries = makeArgumentsArray( argumentsArray );

    return invokeFilter( filter, entries, Collections.emptyMap() );
  }

  /**
   * Invokes the filter once per argument tuple.
   *
   * @param filter         the filter under test
   * @param argumentsArray the argument values, one tuple per invocation; must not be empty
   * @param properties     values returned by the FlowProcess handed to the filter
   * @return the {@code isRemove} results, in argument order
   */
  public static boolean[] invokeFilter( Filter filter, Tuple[] argumentsArray, Map<Object, Object> properties )
  {
    TupleEntry[] entries = makeArgumentsArray( argumentsArray );

    return invokeFilter( filter, entries, properties );
  }

  /**
   * Invokes the filter once per argument entry, with no properties.
   *
   * @param filter         the filter under test
   * @param argumentsArray the argument entries, one per invocation; must not be empty
   * @return the {@code isRemove} results, in argument order
   */
  public static boolean[] invokeFilter( Filter filter, TupleEntry[] argumentsArray )
  {
    return invokeFilter( filter, argumentsArray, Collections.emptyMap() );
  }

  /**
   * Invokes the filter over all argument entries: {@code prepare}, one {@code isRemove}
   * per entry, then {@code flush} and {@code cleanup}.
   *
   * @param filter         the filter under test
   * @param argumentsArray the argument entries, one per invocation; the fields of the first
   *                       entry become the call's argument fields
   * @param properties     values returned by the FlowProcess handed to the filter
   * @return the {@code isRemove} results, in argument order
   * @throws IllegalArgumentException if {@code argumentsArray} is null or empty
   */
  public static boolean[] invokeFilter( Filter filter, TupleEntry[] argumentsArray, Map<Object, Object> properties )
  {
    ConcreteCall operationCall = new ConcreteCall( argumentFieldsOf( argumentsArray ) );

    FlowProcess flowProcess = new TestFlowProcess( properties );

    filter.prepare( flowProcess, operationCall );

    boolean[] results = new boolean[ argumentsArray.length ];

    for( int i = 0; i < argumentsArray.length; i++ )
    {
      operationCall.setArguments( argumentsArray[ i ] );

      results[ i ] = filter.isRemove( flowProcess, operationCall );
    }

    filter.flush( flowProcess, operationCall );
    filter.cleanup( flowProcess, operationCall );

    return results;
  }

  /**
   * Invokes the aggregator over the argument tuples, with a null group and no properties.
   *
   * @param aggregator     the aggregator under test
   * @param argumentsArray the argument values, one tuple per {@code aggregate} call; must not be empty
   * @param resultFields   the fields of the collected results
   * @return the collector holding everything the aggregator emitted
   */
  public static TupleListCollector invokeAggregator( Aggregator aggregator, Tuple[] argumentsArray, Fields resultFields )
  {
    TupleEntry[] entries = makeArgumentsArray( argumentsArray );

    return invokeAggregator( aggregator, entries, resultFields );
  }

  /**
   * Invokes the aggregator over the argument tuples, with a null group.
   *
   * @param aggregator     the aggregator under test
   * @param argumentsArray the argument values, one tuple per {@code aggregate} call; must not be empty
   * @param resultFields   the fields of the collected results
   * @param properties     values returned by the FlowProcess handed to the aggregator
   * @return the collector holding everything the aggregator emitted
   */
  public static TupleListCollector invokeAggregator( Aggregator aggregator, Tuple[] argumentsArray, Fields resultFields, Map<Object, Object> properties )
  {
    TupleEntry[] entries = makeArgumentsArray( argumentsArray );

    return invokeAggregator( aggregator, entries, resultFields, properties );
  }

  /**
   * Invokes the aggregator over the argument entries, with a null group and no properties.
   *
   * @param aggregator     the aggregator under test
   * @param argumentsArray the argument entries, one per {@code aggregate} call; must not be empty
   * @param resultFields   the fields of the collected results
   * @return the collector holding everything the aggregator emitted
   */
  public static TupleListCollector invokeAggregator( Aggregator aggregator, TupleEntry[] argumentsArray, Fields resultFields )
  {
    return invokeAggregator( aggregator, null, argumentsArray, resultFields );
  }

  /**
   * Invokes the aggregator over the argument entries, with a null group.
   *
   * @param aggregator     the aggregator under test
   * @param argumentsArray the argument entries, one per {@code aggregate} call; must not be empty
   * @param resultFields   the fields of the collected results
   * @param properties     values returned by the FlowProcess handed to the aggregator
   * @return the collector holding everything the aggregator emitted
   */
  public static TupleListCollector invokeAggregator( Aggregator aggregator, TupleEntry[] argumentsArray, Fields resultFields, Map<Object, Object> properties )
  {
    return invokeAggregator( aggregator, null, argumentsArray, resultFields, properties );
  }

  /**
   * Invokes the aggregator over the argument entries for the given group, with no properties.
   *
   * @param aggregator     the aggregator under test
   * @param group          the grouping entry set on the call, may be {@code null}
   * @param argumentsArray the argument entries, one per {@code aggregate} call; must not be empty
   * @param resultFields   the fields of the collected results
   * @return the collector holding everything the aggregator emitted
   */
  public static TupleListCollector invokeAggregator( Aggregator aggregator, TupleEntry group, TupleEntry[] argumentsArray, Fields resultFields )
  {
    return invokeAggregator( aggregator, group, argumentsArray, resultFields, Collections.emptyMap() );
  }

  /**
   * Invokes the aggregator for a single group: {@code prepare}, {@code start}, one
   * {@code aggregate} per entry, {@code complete}, then {@code cleanup}.
   * <p>
   * The output collector is only set on the call just before {@code complete}.
   *
   * @param aggregator     the aggregator under test
   * @param group          the grouping entry set on the call, may be {@code null}
   * @param argumentsArray the argument entries, one per {@code aggregate} call; the fields of
   *                       the first entry become the call's argument fields
   * @param resultFields   the fields of the collected results
   * @param properties     values returned by the FlowProcess handed to the aggregator
   * @return the collector holding everything the aggregator emitted
   * @throws IllegalArgumentException if {@code argumentsArray} is null or empty
   */
  public static TupleListCollector invokeAggregator( Aggregator aggregator, TupleEntry group, TupleEntry[] argumentsArray, Fields resultFields, Map<Object, Object> properties )
  {
    FlowProcess flowProcess = new TestFlowProcess( properties );
    ConcreteCall operationCall = new ConcreteCall( argumentFieldsOf( argumentsArray ) );

    operationCall.setGroup( group );

    aggregator.prepare( flowProcess, operationCall );

    aggregator.start( flowProcess, operationCall );

    for( TupleEntry arguments : argumentsArray )
    {
      operationCall.setArguments( arguments );
      aggregator.aggregate( flowProcess, operationCall );
    }

    TupleListCollector collector = new TupleListCollector( resultFields, true );
    operationCall.setOutputCollector( collector );

    aggregator.complete( flowProcess, operationCall );

    // hand cleanup the same FlowProcess as every other lifecycle call, never null,
    // so an aggregator reading properties during cleanup does not fail
    aggregator.cleanup( flowProcess, operationCall );

    return collector;
  }

  /**
   * Invokes the buffer over the argument tuples, with a null group and no properties.
   *
   * @param buffer         the buffer under test
   * @param argumentsArray the argument values exposed through the arguments iterator; must not be empty
   * @param resultFields   the fields of the collected results
   * @return the collector holding everything the buffer emitted
   */
  public static TupleListCollector invokeBuffer( Buffer buffer, Tuple[] argumentsArray, Fields resultFields )
  {
    TupleEntry[] entries = makeArgumentsArray( argumentsArray );

    return invokeBuffer( buffer, entries, resultFields );
  }

  /**
   * Invokes the buffer over the argument tuples, with a null group.
   *
   * @param buffer         the buffer under test
   * @param argumentsArray the argument values exposed through the arguments iterator; must not be empty
   * @param resultFields   the fields of the collected results
   * @param properties     values returned by the FlowProcess handed to the buffer
   * @return the collector holding everything the buffer emitted
   */
  public static TupleListCollector invokeBuffer( Buffer buffer, Tuple[] argumentsArray, Fields resultFields, Map<Object, Object> properties )
  {
    TupleEntry[] entries = makeArgumentsArray( argumentsArray );

    return invokeBuffer( buffer, entries, resultFields, properties );
  }

  /**
   * Invokes the buffer over the argument entries, with a null group and no properties.
   *
   * @param buffer         the buffer under test
   * @param argumentsArray the argument entries exposed through the arguments iterator; must not be empty
   * @param resultFields   the fields of the collected results
   * @return the collector holding everything the buffer emitted
   */
  public static TupleListCollector invokeBuffer( Buffer buffer, TupleEntry[] argumentsArray, Fields resultFields )
  {
    return invokeBuffer( buffer, null, argumentsArray, resultFields );
  }

  /**
   * Invokes the buffer over the argument entries, with a null group.
   *
   * @param buffer         the buffer under test
   * @param argumentsArray the argument entries exposed through the arguments iterator; must not be empty
   * @param resultFields   the fields of the collected results
   * @param properties     values returned by the FlowProcess handed to the buffer
   * @return the collector holding everything the buffer emitted
   */
  public static TupleListCollector invokeBuffer( Buffer buffer, TupleEntry[] argumentsArray, Fields resultFields, Map<Object, Object> properties )
  {
    return invokeBuffer( buffer, null, argumentsArray, resultFields, properties );
  }

  /**
   * Invokes the buffer over the argument entries for the given group, with no properties.
   *
   * @param buffer         the buffer under test
   * @param group          the grouping entry set on the call, may be {@code null}
   * @param argumentsArray the argument entries exposed through the arguments iterator; must not be empty
   * @param resultFields   the fields of the collected results
   * @return the collector holding everything the buffer emitted
   */
  public static TupleListCollector invokeBuffer( Buffer buffer, TupleEntry group, TupleEntry[] argumentsArray, Fields resultFields )
  {
    return invokeBuffer( buffer, group, argumentsArray, resultFields, Collections.emptyMap() );
  }

  /**
   * Invokes the buffer for a single group: {@code prepare}, a single {@code operate}
   * with all entries available through the arguments iterator, then {@code cleanup}.
   *
   * @param buffer         the buffer under test
   * @param group          the grouping entry set on the call, may be {@code null}
   * @param argumentsArray the argument entries exposed through the arguments iterator; the
   *                       fields of the first entry become the call's argument fields
   * @param resultFields   the fields of the collected results
   * @param properties     values returned by the FlowProcess handed to the buffer
   * @return the collector holding everything the buffer emitted
   * @throws IllegalArgumentException if {@code argumentsArray} is null or empty
   */
  public static TupleListCollector invokeBuffer( Buffer buffer, TupleEntry group, TupleEntry[] argumentsArray, Fields resultFields, Map<Object, Object> properties )
  {
    FlowProcess flowProcess = new TestFlowProcess( properties );
    ConcreteCall operationCall = new ConcreteCall( argumentFieldsOf( argumentsArray ) );

    operationCall.setGroup( group );

    buffer.prepare( flowProcess, operationCall );
    TupleListCollector collector = new TupleListCollector( resultFields, true );
    operationCall.setOutputCollector( collector );

    operationCall.setArgumentsIterator( Arrays.asList( argumentsArray ).iterator() );

    buffer.operate( flowProcess, operationCall );

    // hand cleanup the same FlowProcess as every other lifecycle call, never null,
    // so a buffer reading properties during cleanup does not fail
    buffer.cleanup( flowProcess, operationCall );

    return collector;
  }

  /**
   * Returns the fields of the first argument entry, which the multi-entry helpers use
   * as the argument fields of the operation call. Fails fast with a descriptive message
   * rather than an index or null-pointer error when there is no first entry.
   */
  private static Fields argumentFieldsOf( TupleEntry[] argumentsArray )
  {
    if( argumentsArray == null || argumentsArray.length == 0 )
      throw new IllegalArgumentException( "argumentsArray must contain at least one entry" );

    return argumentsArray[ 0 ].getFields();
  }

  /** Wraps every tuple in a new TupleEntry, preserving order. */
  private static TupleEntry[] makeArgumentsArray( Tuple[] argumentsArray )
  {
    TupleEntry[] entries = new TupleEntry[ argumentsArray.length ];

    for( int i = 0; i < argumentsArray.length; i++ )
      entries[ i ] = new TupleEntry( argumentsArray[ i ] );

    return entries;
  }

  /**
   * Reads all tuples of the first source in {@code flow.getSourcesCollection()} into a list.
   *
   * @param flow the flow to read from
   * @return a new list of tuple copies
   * @throws IOException if the tap cannot be opened or closed
   */
  public static List<Tuple> getSourceAsList( Flow flow ) throws IOException
  {
    return asCollection( flow, (Tap) flow.getSourcesCollection().iterator().next(), Fields.ALL, new ArrayList<Tuple>() );
  }

  /**
   * Reads all tuples of {@code flow.getSink()} into a list.
   *
   * @param flow the flow to read from
   * @return a new list of tuple copies
   * @throws IOException if the tap cannot be opened or closed
   */
  public static List<Tuple> getSinkAsList( Flow flow ) throws IOException
  {
    return asCollection( flow, flow.getSink(), Fields.ALL, new ArrayList<Tuple>() );
  }

  /**
   * Reads all tuples of the given tap into a list.
   *
   * @param flow the flow used to open the tap
   * @param tap  the tap to read
   * @return a new list of tuple copies
   * @throws IOException if the tap cannot be opened or closed
   */
  public static List<Tuple> asList( Flow flow, Tap tap ) throws IOException
  {
    return asCollection( flow, tap, Fields.ALL, new ArrayList<Tuple>() );
  }

  /**
   * Reads the selected fields of every tuple of the given tap into a list.
   *
   * @param flow     the flow used to open the tap
   * @param tap      the tap to read
   * @param selector the fields to select from every entry
   * @return a new list of tuple copies
   * @throws IOException if the tap cannot be opened or closed
   */
  public static List<Tuple> asList( Flow flow, Tap tap, Fields selector ) throws IOException
  {
    return asCollection( flow, tap, selector, new ArrayList<Tuple>() );
  }

  /**
   * Reads all tuples of the given tap into a set.
   *
   * @param flow the flow used to open the tap
   * @param tap  the tap to read
   * @return a new set of tuple copies
   * @throws IOException if the tap cannot be opened or closed
   */
  public static Set<Tuple> asSet( Flow flow, Tap tap ) throws IOException
  {
    return asCollection( flow, tap, Fields.ALL, new HashSet<Tuple>() );
  }

  /**
   * Reads the selected fields of every tuple of the given tap into a set.
   *
   * @param flow     the flow used to open the tap
   * @param tap      the tap to read
   * @param selector the fields to select from every entry
   * @return a new set of tuple copies
   * @throws IOException if the tap cannot be opened or closed
   */
  public static Set<Tuple> asSet( Flow flow, Tap tap, Fields selector ) throws IOException
  {
    return asCollection( flow, tap, selector, new HashSet<Tuple>() );
  }

  /**
   * Reads all tuples of the given tap into the supplied collection.
   *
   * @param flow       the flow used to open the tap
   * @param tap        the tap to read
   * @param collection the collection to add tuple copies to
   * @return the supplied collection
   * @throws IOException if the tap cannot be opened or closed
   */
  public static <C extends Collection<Tuple>> C asCollection( Flow flow, Tap tap, C collection ) throws IOException
  {
    return asCollection( flow, tap, Fields.ALL, collection );
  }

  /**
   * Opens the tap for reading through the flow, adds the selected fields of every entry
   * to the supplied collection, and closes the iterator even if reading fails.
   *
   * @param flow       the flow used to open the tap
   * @param tap        the tap to read
   * @param selector   the fields to select from every entry
   * @param collection the collection to add tuple copies to
   * @return the supplied collection
   * @throws IOException if the tap cannot be opened or closed
   */
  public static <C extends Collection<Tuple>> C asCollection( Flow flow, Tap tap, Fields selector, C collection ) throws IOException
  {
    TupleEntryIterator iterator = flow.openTapForRead( tap );

    try
    {
      return asCollection( iterator, selector, collection );
    }
    finally
    {
      iterator.close();
    }
  }

  /**
   * Drains the iterator, adding a copy of every tuple to {@code result}.
   * The iterator is not closed by this method.
   *
   * @param iterator the entries to read
   * @param result   the collection to add tuple copies to
   * @return the supplied collection
   */
  public static <C extends Collection<Tuple>> C asCollection( TupleEntryIterator iterator, C result )
  {
    while( iterator.hasNext() )
      result.add( iterator.next().getTupleCopy() );

    return result;
  }

  /**
   * Drains the iterator, adding a copy of the selected fields of every entry to {@code result}.
   * The iterator is not closed by this method.
   *
   * @param iterator the entries to read
   * @param selector the fields to select from every entry
   * @param result   the collection to add tuple copies to
   * @return the supplied collection
   */
  public static <C extends Collection<Tuple>> C asCollection( TupleEntryIterator iterator, Fields selector, C result )
  {
    while( iterator.hasNext() )
      result.add( iterator.next().selectTupleCopy( selector ) );

    return result;
  }
}