001/*
002 * Copyright (c) 2016-2018 Chris K Wensel <chris@wensel.net>. All Rights Reserved.
003 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved.
004 *
005 * Project and contact information: http://www.cascading.org/
006 *
007 * This file is part of the Cascading project.
008 *
009 * Licensed under the Apache License, Version 2.0 (the "License");
010 * you may not use this file except in compliance with the License.
011 * You may obtain a copy of the License at
012 *
013 *     http://www.apache.org/licenses/LICENSE-2.0
014 *
015 * Unless required by applicable law or agreed to in writing, software
016 * distributed under the License is distributed on an "AS IS" BASIS,
017 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
018 * See the License for the specific language governing permissions and
019 * limitations under the License.
020 */
021
022package cascading;
023
import java.io.File;
import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.regex.Pattern;

import cascading.flow.Flow;
import cascading.flow.FlowProcess;
import cascading.flow.planner.FlowPlanner;
import cascading.flow.stream.graph.StreamGraph;
import cascading.operation.Aggregator;
import cascading.operation.Buffer;
import cascading.operation.ConcreteCall;
import cascading.operation.Filter;
import cascading.operation.Function;
import cascading.tap.Tap;
import cascading.tuple.Fields;
import cascading.tuple.Tuple;
import cascading.tuple.TupleEntry;
import cascading.tuple.TupleEntryIterator;
import cascading.tuple.TupleListCollector;
import cascading.util.Util;
import junit.framework.TestCase;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.rules.TestName;
import org.junit.runner.RunWith;
import org.junit.runners.BlockJUnit4ClassRunner;
061
062/**
063 * Class CascadingTestCase is the base class for all Cascading tests.
064 * <p>
065 * It included a few helpful utility methods for testing Cascading applications.
066 */
067@RunWith(BlockJUnit4ClassRunner.class)
068public abstract class CascadingTestCase extends TestCase implements Serializable
069  {
070  public static final String ROOT_OUTPUT_PATH = "test.output.root";
071  public static final String ROOT_PLAN_PATH = "test.plan.root";
072  public static final String TEST_TRACEPLAN_ENABLED = "test.traceplan.enabled";
073
074  private String outputPath;
075  private String planPath;
076
077  @Rule
078  public transient TestName name = new TestName();
079
080  static class TestFlowProcess extends FlowProcess.NullFlowProcess
081    {
082    private final Map<Object, Object> properties;
083
084    public TestFlowProcess( Map<Object, Object> properties )
085      {
086      this.properties = properties;
087      }
088
089    @Override
090    public Object getProperty( String key )
091      {
092      return properties.get( key );
093      }
094    }
095
096  public CascadingTestCase()
097    {
098    }
099
100  public CascadingTestCase( String name )
101    {
102    super( name );
103    }
104
105  @Override
106  @Before
107  public void setUp() throws Exception
108    {
109    super.setUp();
110
111    if( Boolean.getBoolean( TEST_TRACEPLAN_ENABLED ) )
112      {
113      System.setProperty( FlowPlanner.TRACE_PLAN_PATH, Util.join( "/", getPlanPath(), "planner" ) );
114      System.setProperty( FlowPlanner.TRACE_PLAN_TRANSFORM_PATH, Util.join( "/", getPlanPath(), "planner" ) );
115      System.setProperty( FlowPlanner.TRACE_STATS_PATH, Util.join( "/", getPlanPath(), "planner" ) );
116      System.setProperty( "platform." + StreamGraph.DOT_FILE_PATH, Util.join( "/", getPlanPath(), "stream" ) ); // pass down
117      }
118    }
119
120  @Override
121  @After
122  public void tearDown() throws Exception
123    {
124    super.tearDown();
125    }
126
127  protected static String getTestOutputRoot()
128    {
129    return System.getProperty( ROOT_OUTPUT_PATH, "build/test/output" ).replace( ":", "_" );
130    }
131
132  protected static String getTestPlanRoot()
133    {
134    return System.getProperty( ROOT_PLAN_PATH, "build/test/plan" ).replace( ":", "_" );
135    }
136
137  protected String[] getOutputPathElements()
138    {
139    return new String[]{getTestOutputRoot(), getTestCaseName(), getTestName()};
140    }
141
142  protected String[] getPlanPathElements()
143    {
144    return new String[]{getTestPlanRoot(), getTestCaseName(), getTestName()};
145    }
146
147  protected String getOutputPath()
148    {
149    if( outputPath == null )
150      outputPath = Util.join( getOutputPathElements(), File.separator );
151
152    return outputPath;
153    }
154
155  protected String getPlanPath()
156    {
157    if( planPath == null )
158      planPath = Util.join( getPlanPathElements(), File.separator );
159
160    return planPath;
161    }
162
163  public String getTestCaseName()
164    {
165    return getClass().getSimpleName().replaceAll( "^(.*)Test.*$", "$1" ).toLowerCase();
166    }
167
168  public String getTestName()
169    {
170    return name.getMethodName();
171    }
172
173  public static void validateLength( Flow flow, int numTuples ) throws IOException
174    {
175    validateLength( flow, numTuples, -1 );
176    }
177
178  public static void validateLength( Flow flow, int numTuples, String name ) throws IOException
179    {
180    validateLength( flow, numTuples, -1, null, name );
181    }
182
183  public static void validateLength( Flow flow, int numTuples, int tupleSize ) throws IOException
184    {
185    validateLength( flow, numTuples, tupleSize, null, null );
186    }
187
188  public static void validateLength( Flow flow, int numTuples, int tupleSize, Pattern regex ) throws IOException
189    {
190    validateLength( flow, numTuples, tupleSize, regex, null );
191    }
192
193  public static void validateLength( Flow flow, int numTuples, Pattern regex, String name ) throws IOException
194    {
195    validateLength( flow, numTuples, -1, regex, name );
196    }
197
198  public static void validateLength( Flow flow, int numTuples, int tupleSize, Pattern regex, String name ) throws IOException
199    {
200    TupleEntryIterator iterator = name == null ? flow.openSink() : flow.openSink( name );
201    validateLength( iterator, numTuples, tupleSize, regex );
202    }
203
204  public static void validateLength( TupleEntryIterator iterator, int numTuples )
205    {
206    validateLength( iterator, numTuples, -1, null );
207    }
208
209  public static void validateLength( TupleEntryIterator iterator, int numTuples, int tupleSize )
210    {
211    validateLength( iterator, numTuples, tupleSize, null );
212    }
213
214  public static void validateLength( TupleEntryIterator iterator, int numTuples, Pattern regex )
215    {
216    validateLength( iterator, numTuples, -1, regex );
217    }
218
219  public static void validateLength( TupleEntryIterator iterator, int numTuples, int tupleSize, Pattern regex )
220    {
221    int count = 0;
222
223    while( iterator.hasNext() )
224      {
225      TupleEntry tupleEntry = iterator.next();
226
227      if( tupleSize != -1 )
228        assertEquals( "wrong number of elements", tupleSize, tupleEntry.size() );
229
230      if( regex != null )
231        assertTrue( "regex: " + regex + " does not match: " + tupleEntry.getTuple().toString(), regex.matcher( tupleEntry.getTuple().toString() ).matches() );
232
233      count++;
234      }
235
236    try
237      {
238      iterator.close();
239      }
240    catch( IOException exception )
241      {
242      throw new RuntimeException( exception );
243      }
244
245    assertEquals( "wrong number of lines", numTuples, count );
246    }
247
248  public static TupleListCollector invokeFunction( Function function, Tuple arguments, Fields resultFields )
249    {
250    return invokeFunction( function, new TupleEntry( arguments ), resultFields );
251    }
252
253  public static TupleListCollector invokeFunction( Function function, Tuple arguments, Fields resultFields, Map<Object, Object> properties )
254    {
255    return invokeFunction( function, new TupleEntry( arguments ), resultFields, properties );
256    }
257
258  public static TupleListCollector invokeFunction( Function function, TupleEntry arguments, Fields resultFields )
259    {
260    return invokeFunction( function, arguments, resultFields, new HashMap<Object, Object>() );
261    }
262
263  public static TupleListCollector invokeFunction( Function function, TupleEntry arguments, Fields resultFields, Map<Object, Object> properties )
264    {
265    FlowProcess flowProcess = new TestFlowProcess( properties );
266    ConcreteCall operationCall = new ConcreteCall( arguments.getFields(), function.getFieldDeclaration() );
267    TupleListCollector collector = new TupleListCollector( resultFields, true );
268
269    operationCall.setArguments( arguments );
270    operationCall.setOutputCollector( collector );
271
272    function.prepare( flowProcess, operationCall );
273    function.operate( flowProcess, operationCall );
274    function.cleanup( flowProcess, operationCall );
275
276    return collector;
277    }
278
279  public static TupleListCollector invokeFunction( Function function, Tuple[] argumentsArray, Fields resultFields )
280    {
281    TupleEntry[] entries = makeArgumentsArray( argumentsArray );
282
283    return invokeFunction( function, entries, resultFields );
284    }
285
286  public static TupleListCollector invokeFunction( Function function, Tuple[] argumentsArray, Fields resultFields, Map<Object, Object> properties )
287    {
288    TupleEntry[] entries = makeArgumentsArray( argumentsArray );
289
290    return invokeFunction( function, entries, resultFields, properties );
291    }
292
293  public static TupleListCollector invokeFunction( Function function, TupleEntry[] argumentsArray, Fields resultFields )
294    {
295    return invokeFunction( function, argumentsArray, resultFields, new HashMap<Object, Object>() );
296    }
297
298  public static TupleListCollector invokeFunction( Function function, TupleEntry[] argumentsArray, Fields resultFields, Map<Object, Object> properties )
299    {
300    FlowProcess flowProcess = new TestFlowProcess( properties );
301    ConcreteCall operationCall = new ConcreteCall( argumentsArray[ 0 ].getFields(), function.getFieldDeclaration() );
302    TupleListCollector collector = new TupleListCollector( resultFields, true );
303
304    function.prepare( flowProcess, operationCall );
305    operationCall.setOutputCollector( collector );
306
307    for( TupleEntry arguments : argumentsArray )
308      {
309      operationCall.setArguments( arguments );
310      function.operate( flowProcess, operationCall );
311      }
312
313    function.flush( flowProcess, operationCall );
314    function.cleanup( flowProcess, operationCall );
315
316    return collector;
317    }
318
319  public static boolean invokeFilter( Filter filter, Tuple arguments )
320    {
321    return invokeFilter( filter, new TupleEntry( arguments ) );
322    }
323
324  public static boolean invokeFilter( Filter filter, Tuple arguments, Map<Object, Object> properties )
325    {
326    return invokeFilter( filter, new TupleEntry( arguments ), properties );
327    }
328
329  public static boolean invokeFilter( Filter filter, TupleEntry arguments )
330    {
331    return invokeFilter( filter, arguments, new HashMap<Object, Object>() );
332    }
333
334  public static boolean invokeFilter( Filter filter, TupleEntry arguments, Map<Object, Object> properties )
335    {
336    FlowProcess flowProcess = new TestFlowProcess( properties );
337    ConcreteCall operationCall = new ConcreteCall( arguments.getFields() );
338
339    operationCall.setArguments( arguments );
340
341    filter.prepare( flowProcess, operationCall );
342
343    boolean isRemove = filter.isRemove( flowProcess, operationCall );
344
345    filter.cleanup( flowProcess, operationCall );
346
347    return isRemove;
348    }
349
350  public static boolean[] invokeFilter( Filter filter, Tuple[] argumentsArray )
351    {
352    TupleEntry[] entries = makeArgumentsArray( argumentsArray );
353
354    return invokeFilter( filter, entries, Collections.emptyMap() );
355    }
356
357  public static boolean[] invokeFilter( Filter filter, Tuple[] argumentsArray, Map<Object, Object> properties )
358    {
359    TupleEntry[] entries = makeArgumentsArray( argumentsArray );
360
361    return invokeFilter( filter, entries, properties );
362    }
363
364  public static boolean[] invokeFilter( Filter filter, TupleEntry[] argumentsArray )
365    {
366    return invokeFilter( filter, argumentsArray, Collections.emptyMap() );
367    }
368
369  public static boolean[] invokeFilter( Filter filter, TupleEntry[] argumentsArray, Map<Object, Object> properties )
370    {
371    ConcreteCall operationCall = new ConcreteCall( argumentsArray[ 0 ].getFields() );
372
373    FlowProcess flowProcess = new TestFlowProcess( properties );
374
375    filter.prepare( flowProcess, operationCall );
376
377    boolean[] results = new boolean[ argumentsArray.length ];
378
379    for( int i = 0; i < argumentsArray.length; i++ )
380      {
381      operationCall.setArguments( argumentsArray[ i ] );
382
383      results[ i ] = filter.isRemove( flowProcess, operationCall );
384      }
385
386    filter.flush( flowProcess, operationCall );
387    filter.cleanup( flowProcess, operationCall );
388
389    return results;
390    }
391
392  public static TupleListCollector invokeAggregator( Aggregator aggregator, Tuple[] argumentsArray, Fields resultFields )
393    {
394    TupleEntry[] entries = makeArgumentsArray( argumentsArray );
395
396    return invokeAggregator( aggregator, entries, resultFields );
397    }
398
399  public static TupleListCollector invokeAggregator( Aggregator aggregator, Tuple[] argumentsArray, Fields resultFields, Map<Object, Object> properties )
400    {
401    TupleEntry[] entries = makeArgumentsArray( argumentsArray );
402
403    return invokeAggregator( aggregator, entries, resultFields, properties );
404    }
405
406  public static TupleListCollector invokeAggregator( Aggregator aggregator, TupleEntry[] argumentsArray, Fields resultFields )
407    {
408    return invokeAggregator( aggregator, null, argumentsArray, resultFields );
409    }
410
411  public static TupleListCollector invokeAggregator( Aggregator aggregator, TupleEntry[] argumentsArray, Fields resultFields, Map<Object, Object> properties )
412    {
413    return invokeAggregator( aggregator, null, argumentsArray, resultFields, properties );
414    }
415
416  public static TupleListCollector invokeAggregator( Aggregator aggregator, TupleEntry group, TupleEntry[] argumentsArray, Fields resultFields )
417    {
418    return invokeAggregator( aggregator, group, argumentsArray, resultFields, Collections.emptyMap() );
419    }
420
421  public static TupleListCollector invokeAggregator( Aggregator aggregator, TupleEntry group, TupleEntry[] argumentsArray, Fields resultFields, Map<Object, Object> properties )
422    {
423    FlowProcess flowProcess = new TestFlowProcess( properties );
424    ConcreteCall operationCall = new ConcreteCall( argumentsArray[ 0 ].getFields(), aggregator.getFieldDeclaration() );
425
426    operationCall.setGroup( group );
427
428    aggregator.prepare( flowProcess, operationCall );
429
430    aggregator.start( flowProcess, operationCall );
431
432    for( TupleEntry arguments : argumentsArray )
433      {
434      operationCall.setArguments( arguments );
435      aggregator.aggregate( flowProcess, operationCall );
436      }
437
438    TupleListCollector collector = new TupleListCollector( resultFields, true );
439    operationCall.setOutputCollector( collector );
440
441    aggregator.complete( flowProcess, operationCall );
442
443    aggregator.cleanup( null, operationCall );
444
445    return collector;
446    }
447
448  public static TupleListCollector invokeBuffer( Buffer buffer, Tuple[] argumentsArray, Fields resultFields )
449    {
450    TupleEntry[] entries = makeArgumentsArray( argumentsArray );
451
452    return invokeBuffer( buffer, entries, resultFields );
453    }
454
455  public static TupleListCollector invokeBuffer( Buffer buffer, Tuple[] argumentsArray, Fields resultFields, Map<Object, Object> properties )
456    {
457    TupleEntry[] entries = makeArgumentsArray( argumentsArray );
458
459    return invokeBuffer( buffer, entries, resultFields, properties );
460    }
461
462  public static TupleListCollector invokeBuffer( Buffer buffer, TupleEntry[] argumentsArray, Fields resultFields )
463    {
464    return invokeBuffer( buffer, null, argumentsArray, resultFields );
465    }
466
467  public static TupleListCollector invokeBuffer( Buffer buffer, TupleEntry[] argumentsArray, Fields resultFields, Map<Object, Object> properties )
468    {
469    return invokeBuffer( buffer, null, argumentsArray, resultFields, properties );
470    }
471
472  public static TupleListCollector invokeBuffer( Buffer buffer, TupleEntry group, TupleEntry[] argumentsArray, Fields resultFields )
473    {
474    return invokeBuffer( buffer, group, argumentsArray, resultFields, Collections.emptyMap() );
475    }
476
477  public static TupleListCollector invokeBuffer( Buffer buffer, TupleEntry group, TupleEntry[] argumentsArray, Fields resultFields, Map<Object, Object> properties )
478    {
479    FlowProcess flowProcess = new TestFlowProcess( properties );
480    ConcreteCall operationCall = new ConcreteCall( argumentsArray[ 0 ].getFields(), buffer.getFieldDeclaration() );
481
482    operationCall.setGroup( group );
483
484    buffer.prepare( flowProcess, operationCall );
485    TupleListCollector collector = new TupleListCollector( resultFields, true );
486    operationCall.setOutputCollector( collector );
487
488    operationCall.setArgumentsIterator( Arrays.asList( argumentsArray ).iterator() );
489
490    buffer.operate( flowProcess, operationCall );
491
492    buffer.cleanup( null, operationCall );
493
494    return collector;
495    }
496
497  private static TupleEntry[] makeArgumentsArray( Tuple[] argumentsArray )
498    {
499    TupleEntry[] entries = new TupleEntry[ argumentsArray.length ];
500
501    for( int i = 0; i < argumentsArray.length; i++ )
502      entries[ i ] = new TupleEntry( argumentsArray[ i ] );
503
504    return entries;
505    }
506
507  public static List<Tuple> getSourceAsList( Flow flow ) throws IOException
508    {
509    return asCollection( flow, (Tap) flow.getSourcesCollection().iterator().next(), Fields.ALL, new ArrayList<Tuple>() );
510    }
511
512  public static List<Tuple> getSinkAsList( Flow flow ) throws IOException
513    {
514    return asCollection( flow, flow.getSink(), Fields.ALL, new ArrayList<Tuple>() );
515    }
516
517  public static List<Tuple> asList( Flow flow, Tap tap ) throws IOException
518    {
519    return asCollection( flow, tap, Fields.ALL, new ArrayList<Tuple>() );
520    }
521
522  public static List<Tuple> asList( Flow flow, Tap tap, Fields selector ) throws IOException
523    {
524    return asCollection( flow, tap, selector, new ArrayList<Tuple>() );
525    }
526
527  public static Set<Tuple> asSet( Flow flow, Tap tap ) throws IOException
528    {
529    return asCollection( flow, tap, Fields.ALL, new HashSet<Tuple>() );
530    }
531
532  public static Set<Tuple> asSet( Flow flow, Tap tap, Fields selector ) throws IOException
533    {
534    return asCollection( flow, tap, selector, new HashSet<Tuple>() );
535    }
536
537  public static <C extends Collection<Tuple>> C asCollection( Flow flow, Tap tap, C collection ) throws IOException
538    {
539    return asCollection( flow, tap, Fields.ALL, collection );
540    }
541
542  public static <C extends Collection<Tuple>> C asCollection( Flow flow, Tap tap, Fields selector, C collection ) throws IOException
543    {
544    try( TupleEntryIterator iterator = flow.openTapForRead( tap ) )
545      {
546      return asCollection( iterator, selector, collection );
547      }
548    }
549
550  public static <C extends Collection<Tuple>> C asCollection( TupleEntryIterator iterator, C result )
551    {
552    while( iterator.hasNext() )
553      result.add( iterator.next().getTupleCopy() );
554
555    return result;
556    }
557
558  public static <C extends Collection<Tuple>> C asCollection( TupleEntryIterator iterator, Fields selector, C result )
559    {
560    while( iterator.hasNext() )
561      result.add( iterator.next().selectTupleCopy( selector ) );
562
563    return result;
564    }
565  }