001    /*
002     * Copyright (c) 2007-2015 Concurrent, Inc. All Rights Reserved.
003     *
004     * Project and contact information: http://www.cascading.org/
005     *
006     * This file is part of the Cascading project.
007     *
008     * Licensed under the Apache License, Version 2.0 (the "License");
009     * you may not use this file except in compliance with the License.
010     * You may obtain a copy of the License at
011     *
012     *     http://www.apache.org/licenses/LICENSE-2.0
013     *
014     * Unless required by applicable law or agreed to in writing, software
015     * distributed under the License is distributed on an "AS IS" BASIS,
016     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017     * See the License for the specific language governing permissions and
018     * limitations under the License.
019     */
020    
021    package cascading;
022    
023    import java.io.IOException;
024    import java.io.Serializable;
025    import java.util.ArrayList;
026    import java.util.Arrays;
027    import java.util.Collection;
028    import java.util.Collections;
029    import java.util.HashMap;
030    import java.util.HashSet;
031    import java.util.List;
032    import java.util.Map;
033    import java.util.Set;
034    import java.util.regex.Pattern;
035    
036    import cascading.flow.Flow;
037    import cascading.flow.FlowProcess;
038    import cascading.operation.Aggregator;
039    import cascading.operation.Buffer;
040    import cascading.operation.ConcreteCall;
041    import cascading.operation.Filter;
042    import cascading.operation.Function;
043    import cascading.tap.Tap;
044    import cascading.tuple.Fields;
045    import cascading.tuple.Tuple;
046    import cascading.tuple.TupleEntry;
047    import cascading.tuple.TupleEntryIterator;
048    import cascading.tuple.TupleListCollector;
049    import junit.framework.TestCase;
050    
051    /**
052     * Class CascadingTestCase is the base class for all Cascading tests.
053     * <p/>
054     * It included a few helpful utility methods for testing Cascading applications.
055     */
056    public class CascadingTestCase extends TestCase implements Serializable
057      {
058      static class TestFlowProcess extends FlowProcess.NullFlowProcess
059        {
060        private final Map<Object, Object> properties;
061    
062        public TestFlowProcess( Map<Object, Object> properties )
063          {
064          this.properties = properties;
065          }
066    
067        @Override
068        public Object getProperty( String key )
069          {
070          return properties.get( key );
071          }
072        }
073    
074      public CascadingTestCase()
075        {
076        }
077    
078      public CascadingTestCase( String name )
079        {
080        super( name );
081        }
082    
083      public static void validateLength( Flow flow, int length ) throws IOException
084        {
085        validateLength( flow, length, -1 );
086        }
087    
088      public static void validateLength( Flow flow, int length, String name ) throws IOException
089        {
090        validateLength( flow, length, -1, null, name );
091        }
092    
093      public static void validateLength( Flow flow, int length, int size ) throws IOException
094        {
095        validateLength( flow, length, size, null, null );
096        }
097    
098      public static void validateLength( Flow flow, int length, int size, Pattern regex ) throws IOException
099        {
100        validateLength( flow, length, size, regex, null );
101        }
102    
103      public static void validateLength( Flow flow, int length, Pattern regex, String name ) throws IOException
104        {
105        validateLength( flow, length, -1, regex, name );
106        }
107    
108      public static void validateLength( Flow flow, int length, int size, Pattern regex, String name ) throws IOException
109        {
110        TupleEntryIterator iterator = name == null ? flow.openSink() : flow.openSink( name );
111        validateLength( iterator, length, size, regex );
112        }
113    
114      public static void validateLength( TupleEntryIterator iterator, int length )
115        {
116        validateLength( iterator, length, -1, null );
117        }
118    
119      public static void validateLength( TupleEntryIterator iterator, int length, int size )
120        {
121        validateLength( iterator, length, size, null );
122        }
123    
124      public static void validateLength( TupleEntryIterator iterator, int length, Pattern regex )
125        {
126        validateLength( iterator, length, -1, regex );
127        }
128    
129      public static void validateLength( TupleEntryIterator iterator, int length, int size, Pattern regex )
130        {
131        int count = 0;
132    
133        while( iterator.hasNext() )
134          {
135          TupleEntry tupleEntry = iterator.next();
136    
137          if( size != -1 )
138            assertEquals( "wrong number of elements", size, tupleEntry.size() );
139    
140          if( regex != null )
141            assertTrue( "regex: " + regex + " does not match: " + tupleEntry.getTuple().toString(), regex.matcher( tupleEntry.getTuple().toString() ).matches() );
142    
143          count++;
144          }
145    
146        try
147          {
148          iterator.close();
149          }
150        catch( IOException exception )
151          {
152          throw new RuntimeException( exception );
153          }
154    
155        assertEquals( "wrong number of lines", length, count );
156        }
157    
158      public static TupleListCollector invokeFunction( Function function, Tuple arguments, Fields resultFields )
159        {
160        return invokeFunction( function, new TupleEntry( arguments ), resultFields );
161        }
162    
163      public static TupleListCollector invokeFunction( Function function, Tuple arguments, Fields resultFields, Map<Object, Object> properties )
164        {
165        return invokeFunction( function, new TupleEntry( arguments ), resultFields, properties );
166        }
167    
168      public static TupleListCollector invokeFunction( Function function, TupleEntry arguments, Fields resultFields )
169        {
170        return invokeFunction( function, arguments, resultFields, new HashMap<Object, Object>() );
171        }
172    
173      public static TupleListCollector invokeFunction( Function function, TupleEntry arguments, Fields resultFields, Map<Object, Object> properties )
174        {
175        FlowProcess flowProcess = new TestFlowProcess( properties );
176        ConcreteCall operationCall = new ConcreteCall( arguments.getFields() );
177        TupleListCollector collector = new TupleListCollector( resultFields, true );
178    
179        operationCall.setArguments( arguments );
180        operationCall.setOutputCollector( collector );
181    
182        function.prepare( flowProcess, operationCall );
183        function.operate( flowProcess, operationCall );
184        function.cleanup( flowProcess, operationCall );
185    
186        return collector;
187        }
188    
189      public static TupleListCollector invokeFunction( Function function, Tuple[] argumentsArray, Fields resultFields )
190        {
191        TupleEntry[] entries = makeArgumentsArray( argumentsArray );
192    
193        return invokeFunction( function, entries, resultFields );
194        }
195    
196      public static TupleListCollector invokeFunction( Function function, Tuple[] argumentsArray, Fields resultFields, Map<Object, Object> properties )
197        {
198        TupleEntry[] entries = makeArgumentsArray( argumentsArray );
199    
200        return invokeFunction( function, entries, resultFields, properties );
201        }
202    
203      public static TupleListCollector invokeFunction( Function function, TupleEntry[] argumentsArray, Fields resultFields )
204        {
205        return invokeFunction( function, argumentsArray, resultFields, new HashMap<Object, Object>() );
206        }
207    
208      public static TupleListCollector invokeFunction( Function function, TupleEntry[] argumentsArray, Fields resultFields, Map<Object, Object> properties )
209        {
210        FlowProcess flowProcess = new TestFlowProcess( properties );
211        ConcreteCall operationCall = new ConcreteCall( argumentsArray[ 0 ].getFields() );
212        TupleListCollector collector = new TupleListCollector( resultFields, true );
213    
214        function.prepare( flowProcess, operationCall );
215        operationCall.setOutputCollector( collector );
216    
217        for( TupleEntry arguments : argumentsArray )
218          {
219          operationCall.setArguments( arguments );
220          function.operate( flowProcess, operationCall );
221          }
222    
223        function.flush( flowProcess, operationCall );
224        function.cleanup( flowProcess, operationCall );
225    
226        return collector;
227        }
228    
229      public static boolean invokeFilter( Filter filter, Tuple arguments )
230        {
231        return invokeFilter( filter, new TupleEntry( arguments ) );
232        }
233    
234      public static boolean invokeFilter( Filter filter, Tuple arguments, Map<Object, Object> properties )
235        {
236        return invokeFilter( filter, new TupleEntry( arguments ), properties );
237        }
238    
239      public static boolean invokeFilter( Filter filter, TupleEntry arguments )
240        {
241        return invokeFilter( filter, arguments, new HashMap<Object, Object>() );
242        }
243    
244      public static boolean invokeFilter( Filter filter, TupleEntry arguments, Map<Object, Object> properties )
245        {
246        FlowProcess flowProcess = new TestFlowProcess( properties );
247        ConcreteCall operationCall = new ConcreteCall( arguments.getFields() );
248    
249        operationCall.setArguments( arguments );
250    
251        filter.prepare( flowProcess, operationCall );
252    
253        boolean isRemove = filter.isRemove( flowProcess, operationCall );
254    
255        filter.cleanup( flowProcess, operationCall );
256    
257        return isRemove;
258        }
259    
260      public static boolean[] invokeFilter( Filter filter, Tuple[] argumentsArray )
261        {
262        TupleEntry[] entries = makeArgumentsArray( argumentsArray );
263    
264        return invokeFilter( filter, entries, Collections.emptyMap() );
265        }
266    
267      public static boolean[] invokeFilter( Filter filter, Tuple[] argumentsArray, Map<Object, Object> properties )
268        {
269        TupleEntry[] entries = makeArgumentsArray( argumentsArray );
270    
271        return invokeFilter( filter, entries, properties );
272        }
273    
274      public static boolean[] invokeFilter( Filter filter, TupleEntry[] argumentsArray )
275        {
276        return invokeFilter( filter, argumentsArray, Collections.emptyMap() );
277        }
278    
279      public static boolean[] invokeFilter( Filter filter, TupleEntry[] argumentsArray, Map<Object, Object> properties )
280        {
281        ConcreteCall operationCall = new ConcreteCall( argumentsArray[ 0 ].getFields() );
282    
283        FlowProcess flowProcess = new TestFlowProcess( properties );
284    
285        filter.prepare( flowProcess, operationCall );
286    
287        boolean[] results = new boolean[ argumentsArray.length ];
288    
289        for( int i = 0; i < argumentsArray.length; i++ )
290          {
291          operationCall.setArguments( argumentsArray[ i ] );
292    
293          results[ i ] = filter.isRemove( flowProcess, operationCall );
294          }
295    
296        filter.flush( flowProcess, operationCall );
297        filter.cleanup( flowProcess, operationCall );
298    
299        return results;
300        }
301    
302      public static TupleListCollector invokeAggregator( Aggregator aggregator, Tuple[] argumentsArray, Fields resultFields )
303        {
304        TupleEntry[] entries = makeArgumentsArray( argumentsArray );
305    
306        return invokeAggregator( aggregator, entries, resultFields );
307        }
308    
309      public static TupleListCollector invokeAggregator( Aggregator aggregator, Tuple[] argumentsArray, Fields resultFields, Map<Object, Object> properties )
310        {
311        TupleEntry[] entries = makeArgumentsArray( argumentsArray );
312    
313        return invokeAggregator( aggregator, entries, resultFields, properties );
314        }
315    
316      public static TupleListCollector invokeAggregator( Aggregator aggregator, TupleEntry[] argumentsArray, Fields resultFields )
317        {
318        return invokeAggregator( aggregator, null, argumentsArray, resultFields );
319        }
320    
321      public static TupleListCollector invokeAggregator( Aggregator aggregator, TupleEntry[] argumentsArray, Fields resultFields, Map<Object, Object> properties )
322        {
323        return invokeAggregator( aggregator, null, argumentsArray, resultFields, properties );
324        }
325    
326      public static TupleListCollector invokeAggregator( Aggregator aggregator, TupleEntry group, TupleEntry[] argumentsArray, Fields resultFields )
327        {
328        return invokeAggregator( aggregator, group, argumentsArray, resultFields, Collections.emptyMap() );
329        }
330    
331      public static TupleListCollector invokeAggregator( Aggregator aggregator, TupleEntry group, TupleEntry[] argumentsArray, Fields resultFields, Map<Object, Object> properties )
332        {
333        FlowProcess flowProcess = new TestFlowProcess( properties );
334        ConcreteCall operationCall = new ConcreteCall( argumentsArray[ 0 ].getFields() );
335    
336        operationCall.setGroup( group );
337    
338        aggregator.prepare( flowProcess, operationCall );
339    
340        aggregator.start( flowProcess, operationCall );
341    
342        for( TupleEntry arguments : argumentsArray )
343          {
344          operationCall.setArguments( arguments );
345          aggregator.aggregate( flowProcess, operationCall );
346          }
347    
348        TupleListCollector collector = new TupleListCollector( resultFields, true );
349        operationCall.setOutputCollector( collector );
350    
351        aggregator.complete( flowProcess, operationCall );
352    
353        aggregator.cleanup( null, operationCall );
354    
355        return collector;
356        }
357    
358      public static TupleListCollector invokeBuffer( Buffer buffer, Tuple[] argumentsArray, Fields resultFields )
359        {
360        TupleEntry[] entries = makeArgumentsArray( argumentsArray );
361    
362        return invokeBuffer( buffer, entries, resultFields );
363        }
364    
365      public static TupleListCollector invokeBuffer( Buffer buffer, Tuple[] argumentsArray, Fields resultFields, Map<Object, Object> properties )
366        {
367        TupleEntry[] entries = makeArgumentsArray( argumentsArray );
368    
369        return invokeBuffer( buffer, entries, resultFields, properties );
370        }
371    
372      public static TupleListCollector invokeBuffer( Buffer buffer, TupleEntry[] argumentsArray, Fields resultFields )
373        {
374        return invokeBuffer( buffer, null, argumentsArray, resultFields );
375        }
376    
377      public static TupleListCollector invokeBuffer( Buffer buffer, TupleEntry[] argumentsArray, Fields resultFields, Map<Object, Object> properties )
378        {
379        return invokeBuffer( buffer, null, argumentsArray, resultFields, properties );
380        }
381    
382      public static TupleListCollector invokeBuffer( Buffer buffer, TupleEntry group, TupleEntry[] argumentsArray, Fields resultFields )
383        {
384        return invokeBuffer( buffer, group, argumentsArray, resultFields, Collections.emptyMap() );
385        }
386    
387      public static TupleListCollector invokeBuffer( Buffer buffer, TupleEntry group, TupleEntry[] argumentsArray, Fields resultFields, Map<Object, Object> properties )
388        {
389        FlowProcess flowProcess = new TestFlowProcess( properties );
390        ConcreteCall operationCall = new ConcreteCall( argumentsArray[ 0 ].getFields() );
391    
392        operationCall.setGroup( group );
393    
394        buffer.prepare( flowProcess, operationCall );
395        TupleListCollector collector = new TupleListCollector( resultFields, true );
396        operationCall.setOutputCollector( collector );
397    
398        operationCall.setArgumentsIterator( Arrays.asList( argumentsArray ).iterator() );
399    
400        buffer.operate( flowProcess, operationCall );
401    
402        buffer.cleanup( null, operationCall );
403    
404        return collector;
405        }
406    
407      private static TupleEntry[] makeArgumentsArray( Tuple[] argumentsArray )
408        {
409        TupleEntry[] entries = new TupleEntry[ argumentsArray.length ];
410    
411        for( int i = 0; i < argumentsArray.length; i++ )
412          entries[ i ] = new TupleEntry( argumentsArray[ i ] );
413    
414        return entries;
415        }
416    
417      public static List<Tuple> getSourceAsList( Flow flow ) throws IOException
418        {
419        return asCollection( flow, (Tap) flow.getSourcesCollection().iterator().next(), Fields.ALL, new ArrayList<Tuple>() );
420        }
421    
422      public static List<Tuple> getSinkAsList( Flow flow ) throws IOException
423        {
424        return asCollection( flow, flow.getSink(), Fields.ALL, new ArrayList<Tuple>() );
425        }
426    
427      public static List<Tuple> asList( Flow flow, Tap tap ) throws IOException
428        {
429        return asCollection( flow, tap, Fields.ALL, new ArrayList<Tuple>() );
430        }
431    
432      public static List<Tuple> asList( Flow flow, Tap tap, Fields selector ) throws IOException
433        {
434        return asCollection( flow, tap, selector, new ArrayList<Tuple>() );
435        }
436    
437      public static Set<Tuple> asSet( Flow flow, Tap tap ) throws IOException
438        {
439        return asCollection( flow, tap, Fields.ALL, new HashSet<Tuple>() );
440        }
441    
442      public static Set<Tuple> asSet( Flow flow, Tap tap, Fields selector ) throws IOException
443        {
444        return asCollection( flow, tap, selector, new HashSet<Tuple>() );
445        }
446    
447      public static <C extends Collection<Tuple>> C asCollection( Flow flow, Tap tap, C collection ) throws IOException
448        {
449        return asCollection( flow, tap, Fields.ALL, collection );
450        }
451    
452      public static <C extends Collection<Tuple>> C asCollection( Flow flow, Tap tap, Fields selector, C collection ) throws IOException
453        {
454        TupleEntryIterator iterator = flow.openTapForRead( tap );
455    
456        try
457          {
458          return asCollection( iterator, selector, collection );
459          }
460        finally
461          {
462          iterator.close();
463          }
464        }
465    
466      public static <C extends Collection<Tuple>> C asCollection( TupleEntryIterator iterator, C result )
467        {
468        while( iterator.hasNext() )
469          result.add( iterator.next().getTupleCopy() );
470    
471        return result;
472        }
473    
474      public static <C extends Collection<Tuple>> C asCollection( TupleEntryIterator iterator, Fields selector, C result )
475        {
476        while( iterator.hasNext() )
477          result.add( iterator.next().selectTupleCopy( selector ) );
478    
479        return result;
480        }
481      }