001    /*
002     * Copyright (c) 2007-2014 Concurrent, Inc. All Rights Reserved.
003     *
004     * Project and contact information: http://www.cascading.org/
005     *
006     * This file is part of the Cascading project.
007     *
008     * Licensed under the Apache License, Version 2.0 (the "License");
009     * you may not use this file except in compliance with the License.
010     * You may obtain a copy of the License at
011     *
012     *     http://www.apache.org/licenses/LICENSE-2.0
013     *
014     * Unless required by applicable law or agreed to in writing, software
015     * distributed under the License is distributed on an "AS IS" BASIS,
016     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017     * See the License for the specific language governing permissions and
018     * limitations under the License.
019     */
020    
021    package cascading;
022    
023    import java.io.IOException;
024    import java.io.Serializable;
025    import java.util.ArrayList;
026    import java.util.Arrays;
027    import java.util.Collection;
028    import java.util.HashSet;
029    import java.util.List;
030    import java.util.Set;
031    import java.util.regex.Pattern;
032    
033    import cascading.flow.Flow;
034    import cascading.flow.FlowProcess;
035    import cascading.operation.Aggregator;
036    import cascading.operation.Buffer;
037    import cascading.operation.ConcreteCall;
038    import cascading.operation.Filter;
039    import cascading.operation.Function;
040    import cascading.tap.Tap;
041    import cascading.tuple.Fields;
042    import cascading.tuple.Tuple;
043    import cascading.tuple.TupleEntry;
044    import cascading.tuple.TupleEntryIterator;
045    import cascading.tuple.TupleListCollector;
046    import junit.framework.TestCase;
047    
048    /**
049     * Class CascadingTestCase is the base class for all Cascading tests.
050     * <p/>
 * It includes a few helpful utility methods for testing Cascading applications.
052     */
053    public class CascadingTestCase extends TestCase implements Serializable
054      {
055      public CascadingTestCase()
056        {
057        }
058    
059      public CascadingTestCase( String name )
060        {
061        super( name );
062        }
063    
064      public static void validateLength( Flow flow, int length ) throws IOException
065        {
066        validateLength( flow, length, -1 );
067        }
068    
069      public static void validateLength( Flow flow, int length, String name ) throws IOException
070        {
071        validateLength( flow, length, -1, null, name );
072        }
073    
074      public static void validateLength( Flow flow, int length, int size ) throws IOException
075        {
076        validateLength( flow, length, size, null, null );
077        }
078    
079      public static void validateLength( Flow flow, int length, int size, Pattern regex ) throws IOException
080        {
081        validateLength( flow, length, size, regex, null );
082        }
083    
084      public static void validateLength( Flow flow, int length, Pattern regex, String name ) throws IOException
085        {
086        validateLength( flow, length, -1, regex, name );
087        }
088    
089      public static void validateLength( Flow flow, int length, int size, Pattern regex, String name ) throws IOException
090        {
091        TupleEntryIterator iterator = name == null ? flow.openSink() : flow.openSink( name );
092        validateLength( iterator, length, size, regex );
093        }
094    
095      public static void validateLength( TupleEntryIterator iterator, int length )
096        {
097        validateLength( iterator, length, -1, null );
098        }
099    
100      public static void validateLength( TupleEntryIterator iterator, int length, int size )
101        {
102        validateLength( iterator, length, size, null );
103        }
104    
105      public static void validateLength( TupleEntryIterator iterator, int length, Pattern regex )
106        {
107        validateLength( iterator, length, -1, regex );
108        }
109    
110      public static void validateLength( TupleEntryIterator iterator, int length, int size, Pattern regex )
111        {
112        int count = 0;
113    
114        while( iterator.hasNext() )
115          {
116          TupleEntry tupleEntry = iterator.next();
117    
118          if( size != -1 )
119            assertEquals( "wrong number of elements", size, tupleEntry.size() );
120    
121          if( regex != null )
122            assertTrue( "regex: " + regex + " does not match: " + tupleEntry.getTuple().toString(), regex.matcher( tupleEntry.getTuple().toString() ).matches() );
123    
124          count++;
125          }
126    
127        try
128          {
129          iterator.close();
130          }
131        catch( IOException exception )
132          {
133          throw new RuntimeException( exception );
134          }
135    
136        assertEquals( "wrong number of lines", length, count );
137        }
138    
139      public static TupleListCollector invokeFunction( Function function, Tuple arguments, Fields resultFields )
140        {
141        return invokeFunction( function, new TupleEntry( arguments ), resultFields );
142        }
143    
144      public static TupleListCollector invokeFunction( Function function, TupleEntry arguments, Fields resultFields )
145        {
146        ConcreteCall operationCall = new ConcreteCall( arguments.getFields() );
147        TupleListCollector collector = new TupleListCollector( resultFields, true );
148    
149        operationCall.setArguments( arguments );
150        operationCall.setOutputCollector( collector );
151    
152        function.prepare( FlowProcess.NULL, operationCall );
153        function.operate( FlowProcess.NULL, operationCall );
154        function.cleanup( FlowProcess.NULL, operationCall );
155    
156        return collector;
157        }
158    
159      public static TupleListCollector invokeFunction( Function function, Tuple[] argumentsArray, Fields resultFields )
160        {
161        TupleEntry[] entries = makeArgumentsArray( argumentsArray );
162    
163        return invokeFunction( function, entries, resultFields );
164        }
165    
166      public static TupleListCollector invokeFunction( Function function, TupleEntry[] argumentsArray, Fields resultFields )
167        {
168        ConcreteCall operationCall = new ConcreteCall( argumentsArray[ 0 ].getFields() );
169        TupleListCollector collector = new TupleListCollector( resultFields, true );
170    
171        function.prepare( FlowProcess.NULL, operationCall );
172        operationCall.setOutputCollector( collector );
173    
174        for( TupleEntry arguments : argumentsArray )
175          {
176          operationCall.setArguments( arguments );
177          function.operate( FlowProcess.NULL, operationCall );
178          }
179    
180        function.cleanup( FlowProcess.NULL, operationCall );
181    
182        return collector;
183        }
184    
185      public static boolean invokeFilter( Filter filter, Tuple arguments )
186        {
187        return invokeFilter( filter, new TupleEntry( arguments ) );
188        }
189    
190      public static boolean invokeFilter( Filter filter, TupleEntry arguments )
191        {
192        ConcreteCall operationCall = new ConcreteCall( arguments.getFields() );
193    
194        operationCall.setArguments( arguments );
195    
196        filter.prepare( FlowProcess.NULL, operationCall );
197    
198        boolean isRemove = filter.isRemove( FlowProcess.NULL, operationCall );
199    
200        filter.cleanup( FlowProcess.NULL, operationCall );
201    
202        return isRemove;
203        }
204    
205      public static boolean[] invokeFilter( Filter filter, Tuple[] argumentsArray )
206        {
207        TupleEntry[] entries = makeArgumentsArray( argumentsArray );
208    
209        return invokeFilter( filter, entries );
210        }
211    
212      public static boolean[] invokeFilter( Filter filter, TupleEntry[] argumentsArray )
213        {
214        ConcreteCall operationCall = new ConcreteCall( argumentsArray[ 0 ].getFields() );
215    
216        filter.prepare( FlowProcess.NULL, operationCall );
217    
218        boolean[] results = new boolean[ argumentsArray.length ];
219    
220        for( int i = 0; i < argumentsArray.length; i++ )
221          {
222          operationCall.setArguments( argumentsArray[ i ] );
223    
224          results[ i ] = filter.isRemove( FlowProcess.NULL, operationCall );
225          }
226    
227        filter.cleanup( FlowProcess.NULL, operationCall );
228    
229        return results;
230        }
231    
232      public static TupleListCollector invokeAggregator( Aggregator aggregator, Tuple[] argumentsArray, Fields resultFields )
233        {
234        TupleEntry[] entries = makeArgumentsArray( argumentsArray );
235    
236        return invokeAggregator( aggregator, entries, resultFields );
237        }
238    
239      public static TupleListCollector invokeAggregator( Aggregator aggregator, TupleEntry[] argumentsArray, Fields resultFields )
240        {
241        return invokeAggregator( aggregator, null, argumentsArray, resultFields );
242        }
243    
244      public static TupleListCollector invokeAggregator( Aggregator aggregator, TupleEntry group, TupleEntry[] argumentsArray, Fields resultFields )
245        {
246        ConcreteCall operationCall = new ConcreteCall( argumentsArray[ 0 ].getFields() );
247    
248        operationCall.setGroup( group );
249    
250        aggregator.prepare( FlowProcess.NULL, operationCall );
251    
252        aggregator.start( FlowProcess.NULL, operationCall );
253    
254        for( TupleEntry arguments : argumentsArray )
255          {
256          operationCall.setArguments( arguments );
257          aggregator.aggregate( FlowProcess.NULL, operationCall );
258          }
259    
260        TupleListCollector collector = new TupleListCollector( resultFields, true );
261        operationCall.setOutputCollector( collector );
262    
263        aggregator.complete( FlowProcess.NULL, operationCall );
264    
265        aggregator.cleanup( null, operationCall );
266    
267        return collector;
268        }
269    
270      public static TupleListCollector invokeBuffer( Buffer buffer, Tuple[] argumentsArray, Fields resultFields )
271        {
272        TupleEntry[] entries = makeArgumentsArray( argumentsArray );
273    
274        return invokeBuffer( buffer, entries, resultFields );
275        }
276    
277      public static TupleListCollector invokeBuffer( Buffer buffer, TupleEntry[] argumentsArray, Fields resultFields )
278        {
279        return invokeBuffer( buffer, null, argumentsArray, resultFields );
280        }
281    
282      public static TupleListCollector invokeBuffer( Buffer buffer, TupleEntry group, TupleEntry[] argumentsArray, Fields resultFields )
283        {
284        ConcreteCall operationCall = new ConcreteCall( argumentsArray[ 0 ].getFields() );
285    
286        operationCall.setGroup( group );
287    
288        buffer.prepare( FlowProcess.NULL, operationCall );
289        TupleListCollector collector = new TupleListCollector( resultFields, true );
290        operationCall.setOutputCollector( collector );
291    
292        operationCall.setArgumentsIterator( Arrays.asList( argumentsArray ).iterator() );
293    
294        buffer.operate( FlowProcess.NULL, operationCall );
295    
296        buffer.cleanup( null, operationCall );
297    
298        return collector;
299        }
300    
301      private static TupleEntry[] makeArgumentsArray( Tuple[] argumentsArray )
302        {
303        TupleEntry[] entries = new TupleEntry[ argumentsArray.length ];
304    
305        for( int i = 0; i < argumentsArray.length; i++ )
306          entries[ i ] = new TupleEntry( argumentsArray[ i ] );
307    
308        return entries;
309        }
310    
311      public static List<Tuple> getSourceAsList( Flow flow ) throws IOException
312        {
313        return asCollection( flow, (Tap) flow.getSourcesCollection().iterator().next(), Fields.ALL, new ArrayList<Tuple>() );
314        }
315    
316      public static List<Tuple> getSinkAsList( Flow flow ) throws IOException
317        {
318        return asCollection( flow, flow.getSink(), Fields.ALL, new ArrayList<Tuple>() );
319        }
320    
321      public static List<Tuple> asList( Flow flow, Tap tap ) throws IOException
322        {
323        return asCollection( flow, tap, Fields.ALL, new ArrayList<Tuple>() );
324        }
325    
326      public static List<Tuple> asList( Flow flow, Tap tap, Fields selector ) throws IOException
327        {
328        return asCollection( flow, tap, selector, new ArrayList<Tuple>() );
329        }
330    
331      public static Set<Tuple> asSet( Flow flow, Tap tap ) throws IOException
332        {
333        return asCollection( flow, tap, Fields.ALL, new HashSet<Tuple>() );
334        }
335    
336      public static Set<Tuple> asSet( Flow flow, Tap tap, Fields selector ) throws IOException
337        {
338        return asCollection( flow, tap, selector, new HashSet<Tuple>() );
339        }
340    
341      public static <C extends Collection<Tuple>> C asCollection( Flow flow, Tap tap, C collection ) throws IOException
342        {
343        return asCollection( flow, tap, Fields.ALL, collection );
344        }
345    
346      public static <C extends Collection<Tuple>> C asCollection( Flow flow, Tap tap, Fields selector, C collection ) throws IOException
347        {
348        TupleEntryIterator iterator = flow.openTapForRead( tap );
349    
350        try
351          {
352          return asCollection( iterator, selector, collection );
353          }
354        finally
355          {
356          iterator.close();
357          }
358        }
359    
360      public static <C extends Collection<Tuple>> C asCollection( TupleEntryIterator iterator, C result )
361        {
362        while( iterator.hasNext() )
363          result.add( iterator.next().getTupleCopy() );
364    
365        return result;
366        }
367    
368      public static <C extends Collection<Tuple>> C asCollection( TupleEntryIterator iterator, Fields selector, C result )
369        {
370        while( iterator.hasNext() )
371          result.add( iterator.next().selectTupleCopy( selector ) );
372    
373        return result;
374        }
375      }