/*
 * Copyright (c) 2007-2014 Concurrent, Inc. All Rights Reserved.
 *
 * Project and contact information: http://www.cascading.org/
 *
 * This file is part of the Cascading project.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package cascading;

import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.regex.Pattern;

import cascading.flow.Flow;
import cascading.flow.FlowProcess;
import cascading.operation.Aggregator;
import cascading.operation.Buffer;
import cascading.operation.ConcreteCall;
import cascading.operation.Filter;
import cascading.operation.Function;
import cascading.tap.Tap;
import cascading.tuple.Fields;
import cascading.tuple.Tuple;
import cascading.tuple.TupleEntry;
import cascading.tuple.TupleEntryIterator;
import cascading.tuple.TupleListCollector;
import junit.framework.TestCase;

/**
 * Class CascadingTestCase is the base class for all Cascading tests.
 * <p/>
 * It includes a few helpful utility methods for testing Cascading applications:
 * <ul>
 * <li>{@code validateLength} - assert the number (and optionally the width and textual form) of tuples</li>
 * <li>{@code invokeFunction}, {@code invokeFilter}, {@code invokeAggregator}, {@code invokeBuffer} - drive a single
 * operation through its life-cycle methods using {@link FlowProcess#NULL}</li>
 * <li>{@code asList}, {@code asSet}, {@code asCollection} - copy the tuples readable from a tap or iterator into a
 * collection</li>
 * </ul>
 */
public class CascadingTestCase extends TestCase implements Serializable
{
  /** Creates an unnamed test case. */
  public CascadingTestCase()
  {
  }

  /**
   * Creates a test case with the given name.
   *
   * @param name the name handed to the {@link TestCase} constructor
   */
  public CascadingTestCase( String name )
  {
    super( name );
  }

  /**
   * Asserts that the sink opened via {@code flow.openSink()} yields exactly {@code length} entries.
   *
   * @param flow   the flow whose sink is read
   * @param length the expected number of entries
   * @throws IOException if the sink cannot be opened
   */
  public static void validateLength( Flow flow, int length ) throws IOException
  {
    validateLength( flow, length, -1 );
  }

  /**
   * Asserts that the named sink of the given flow yields exactly {@code length} entries.
   *
   * @param flow   the flow whose sink is read
   * @param length the expected number of entries
   * @param name   the sink name passed to {@code flow.openSink( name )}, or {@code null} to call
   *               {@code flow.openSink()}
   * @throws IOException if the sink cannot be opened
   */
  public static void validateLength( Flow flow, int length, String name ) throws IOException
  {
    validateLength( flow, length, -1, null, name );
  }

  /**
   * Asserts that the sink opened via {@code flow.openSink()} yields exactly {@code length} entries, each of
   * the given size.
   *
   * @param flow   the flow whose sink is read
   * @param length the expected number of entries
   * @param size   the expected {@code size()} of every entry, or {@code -1} to skip the check
   * @throws IOException if the sink cannot be opened
   */
  public static void validateLength( Flow flow, int length, int size ) throws IOException
  {
    validateLength( flow, length, size, null, null );
  }

  /**
   * Asserts that the sink opened via {@code flow.openSink()} yields exactly {@code length} entries, each of
   * the given size and matching the given pattern.
   *
   * @param flow   the flow whose sink is read
   * @param length the expected number of entries
   * @param size   the expected {@code size()} of every entry, or {@code -1} to skip the check
   * @param regex  a pattern the string form of every tuple must fully match, or {@code null} to skip the check
   * @throws IOException if the sink cannot be opened
   */
  public static void validateLength( Flow flow, int length, int size, Pattern regex ) throws IOException
  {
    validateLength( flow, length, size, regex, null );
  }

  /**
   * Asserts that the named sink yields exactly {@code length} entries, each matching the given pattern.
   *
   * @param flow   the flow whose sink is read
   * @param length the expected number of entries
   * @param regex  a pattern the string form of every tuple must fully match, or {@code null} to skip the check
   * @param name   the sink name passed to {@code flow.openSink( name )}, or {@code null} to call
   *               {@code flow.openSink()}
   * @throws IOException if the sink cannot be opened
   */
  public static void validateLength( Flow flow, int length, Pattern regex, String name ) throws IOException
  {
    validateLength( flow, length, -1, regex, name );
  }

  /**
   * Opens a sink of the given flow and validates its contents with
   * {@link #validateLength(TupleEntryIterator, int, int, Pattern)}.
   *
   * @param flow   the flow whose sink is read
   * @param length the expected number of entries
   * @param size   the expected {@code size()} of every entry, or {@code -1} to skip the check
   * @param regex  a pattern the string form of every tuple must fully match, or {@code null} to skip the check
   * @param name   the sink name passed to {@code flow.openSink( name )}, or {@code null} to call
   *               {@code flow.openSink()}
   * @throws IOException if the sink cannot be opened
   */
  public static void validateLength( Flow flow, int length, int size, Pattern regex, String name ) throws IOException
  {
    TupleEntryIterator iterator = name == null ? flow.openSink() : flow.openSink( name );

    validateLength( iterator, length, size, regex );
  }

  /**
   * Asserts that the iterator yields exactly {@code length} entries. The iterator is closed by this call.
   *
   * @param iterator the entries to count
   * @param length   the expected number of entries
   */
  public static void validateLength( TupleEntryIterator iterator, int length )
  {
    validateLength( iterator, length, -1, null );
  }

  /**
   * Asserts that the iterator yields exactly {@code length} entries, each of the given size. The iterator is
   * closed by this call.
   *
   * @param iterator the entries to validate
   * @param length   the expected number of entries
   * @param size     the expected {@code size()} of every entry, or {@code -1} to skip the check
   */
  public static void validateLength( TupleEntryIterator iterator, int length, int size )
  {
    validateLength( iterator, length, size, null );
  }

  /**
   * Asserts that the iterator yields exactly {@code length} entries, each matching the given pattern. The
   * iterator is closed by this call.
   *
   * @param iterator the entries to validate
   * @param length   the expected number of entries
   * @param regex    a pattern the string form of every tuple must fully match, or {@code null} to skip the check
   */
  public static void validateLength( TupleEntryIterator iterator, int length, Pattern regex )
  {
    validateLength( iterator, length, -1, regex );
  }

  /**
   * Iterates over all entries, asserting the optional per-entry constraints, then asserts the total count.
   * <p/>
   * The iterator is always closed, including when a per-entry assertion fails part way through.
   *
   * @param iterator the entries to validate
   * @param length   the expected number of entries
   * @param size     the expected {@code size()} of every entry, or {@code -1} to skip the check
   * @param regex    a pattern the string form of every tuple must fully match, or {@code null} to skip the check
   * @throws RuntimeException wrapping the {@link IOException} if closing the iterator fails after all entries
   *                          were read successfully
   */
  public static void validateLength( TupleEntryIterator iterator, int length, int size, Pattern regex )
  {
    int count = 0;
    boolean completed = false;

    try
    {
      while( iterator.hasNext() )
      {
        TupleEntry tupleEntry = iterator.next();

        if( size != -1 )
          assertEquals( "wrong number of elements", size, tupleEntry.size() );

        if( regex != null )
        {
          String tupleString = tupleEntry.getTuple().toString();

          assertTrue( "regex: " + regex + " does not match: " + tupleString, regex.matcher( tupleString ).matches() );
        }

        count++;
      }

      completed = true;
    }
    finally
    {
      try
      {
        iterator.close();
      }
      catch( IOException exception )
      {
        // only report the close failure when the loop finished normally, otherwise it would
        // replace the assertion failure (or other exception) that is already propagating
        if( completed )
          throw new RuntimeException( exception );
      }
    }

    assertEquals( "wrong number of lines", length, count );
  }

  /**
   * Invokes the function once with the given arguments wrapped in a new {@link TupleEntry}.
   *
   * @param function     the function to invoke
   * @param arguments    the argument values
   * @param resultFields the fields the returned collector is created with
   * @return the collector that was set as the output collector of the call
   */
  public static TupleListCollector invokeFunction( Function function, Tuple arguments, Fields resultFields )
  {
    return invokeFunction( function, new TupleEntry( arguments ), resultFields );
  }

  /**
   * Invokes {@code prepare}, {@code operate} and {@code cleanup} on the function, in that order, for a single
   * arguments entry.
   *
   * @param function     the function to invoke
   * @param arguments    the argument entry, its fields are used to create the operation call
   * @param resultFields the fields the returned collector is created with
   * @return the collector that was set as the output collector of the call
   */
  public static TupleListCollector invokeFunction( Function function, TupleEntry arguments, Fields resultFields )
  {
    ConcreteCall operationCall = new ConcreteCall( arguments.getFields() );
    TupleListCollector collector = new TupleListCollector( resultFields, true );

    operationCall.setArguments( arguments );
    operationCall.setOutputCollector( collector );

    function.prepare( FlowProcess.NULL, operationCall );
    function.operate( FlowProcess.NULL, operationCall );
    function.cleanup( FlowProcess.NULL, operationCall );

    return collector;
  }

  /**
   * Invokes the function once per given tuple, each tuple being wrapped in a new {@link TupleEntry}.
   *
   * @param function       the function to invoke
   * @param argumentsArray the argument values, must contain at least one element
   * @param resultFields   the fields the returned collector is created with
   * @return the collector that was set as the output collector of the call
   */
  public static TupleListCollector invokeFunction( Function function, Tuple[] argumentsArray, Fields resultFields )
  {
    TupleEntry[] entries = makeArgumentsArray( argumentsArray );

    return invokeFunction( function, entries, resultFields );
  }

  /**
   * Invokes {@code prepare} once, {@code operate} once per arguments entry, then {@code cleanup} once.
   * <p/>
   * The operation call is created from the fields of the first entry, so the array must not be empty.
   *
   * @param function       the function to invoke
   * @param argumentsArray the argument entries, must contain at least one element
   * @param resultFields   the fields the returned collector is created with
   * @return the collector that was set as the output collector of the call
   */
  public static TupleListCollector invokeFunction( Function function, TupleEntry[] argumentsArray, Fields resultFields )
  {
    ConcreteCall operationCall = new ConcreteCall( argumentsArray[ 0 ].getFields() );
    TupleListCollector collector = new TupleListCollector( resultFields, true );

    // the output collector is intentionally set after prepare, matching the original call order
    function.prepare( FlowProcess.NULL, operationCall );
    operationCall.setOutputCollector( collector );

    for( TupleEntry arguments : argumentsArray )
    {
      operationCall.setArguments( arguments );
      function.operate( FlowProcess.NULL, operationCall );
    }

    function.cleanup( FlowProcess.NULL, operationCall );

    return collector;
  }

  /**
   * Invokes the filter once with the given arguments wrapped in a new {@link TupleEntry}.
   *
   * @param filter    the filter to invoke
   * @param arguments the argument values
   * @return the value returned by {@code filter.isRemove}
   */
  public static boolean invokeFilter( Filter filter, Tuple arguments )
  {
    return invokeFilter( filter, new TupleEntry( arguments ) );
  }

  /**
   * Invokes {@code prepare}, {@code isRemove} and {@code cleanup} on the filter, in that order, for a single
   * arguments entry.
   *
   * @param filter    the filter to invoke
   * @param arguments the argument entry, its fields are used to create the operation call
   * @return the value returned by {@code filter.isRemove}
   */
  public static boolean invokeFilter( Filter filter, TupleEntry arguments )
  {
    ConcreteCall operationCall = new ConcreteCall( arguments.getFields() );

    operationCall.setArguments( arguments );

    filter.prepare( FlowProcess.NULL, operationCall );

    boolean isRemove = filter.isRemove( FlowProcess.NULL, operationCall );

    filter.cleanup( FlowProcess.NULL, operationCall );

    return isRemove;
  }

  /**
   * Invokes the filter once per given tuple, each tuple being wrapped in a new {@link TupleEntry}.
   *
   * @param filter         the filter to invoke
   * @param argumentsArray the argument values, must contain at least one element
   * @return the {@code isRemove} results, index aligned with {@code argumentsArray}
   */
  public static boolean[] invokeFilter( Filter filter, Tuple[] argumentsArray )
  {
    TupleEntry[] entries = makeArgumentsArray( argumentsArray );

    return invokeFilter( filter, entries );
  }

  /**
   * Invokes {@code prepare} once, {@code isRemove} once per arguments entry, then {@code cleanup} once.
   * <p/>
   * The operation call is created from the fields of the first entry, so the array must not be empty.
   *
   * @param filter         the filter to invoke
   * @param argumentsArray the argument entries, must contain at least one element
   * @return the {@code isRemove} results, index aligned with {@code argumentsArray}
   */
  public static boolean[] invokeFilter( Filter filter, TupleEntry[] argumentsArray )
  {
    ConcreteCall operationCall = new ConcreteCall( argumentsArray[ 0 ].getFields() );

    filter.prepare( FlowProcess.NULL, operationCall );

    boolean[] results = new boolean[ argumentsArray.length ];

    for( int i = 0; i < argumentsArray.length; i++ )
    {
      operationCall.setArguments( argumentsArray[ i ] );

      results[ i ] = filter.isRemove( FlowProcess.NULL, operationCall );
    }

    filter.cleanup( FlowProcess.NULL, operationCall );

    return results;
  }

  /**
   * Invokes the aggregator over the given tuples, each tuple being wrapped in a new {@link TupleEntry}, with a
   * {@code null} group.
   *
   * @param aggregator     the aggregator to invoke
   * @param argumentsArray the argument values, must contain at least one element
   * @param resultFields   the fields the returned collector is created with
   * @return the collector that was set as the output collector of the call
   */
  public static TupleListCollector invokeAggregator( Aggregator aggregator, Tuple[] argumentsArray, Fields resultFields )
  {
    TupleEntry[] entries = makeArgumentsArray( argumentsArray );

    return invokeAggregator( aggregator, entries, resultFields );
  }

  /**
   * Invokes the aggregator over the given entries with a {@code null} group.
   *
   * @param aggregator     the aggregator to invoke
   * @param argumentsArray the argument entries, must contain at least one element
   * @param resultFields   the fields the returned collector is created with
   * @return the collector that was set as the output collector of the call
   */
  public static TupleListCollector invokeAggregator( Aggregator aggregator, TupleEntry[] argumentsArray, Fields resultFields )
  {
    return invokeAggregator( aggregator, null, argumentsArray, resultFields );
  }

  /**
   * Invokes {@code prepare} and {@code start}, then {@code aggregate} once per arguments entry, then
   * {@code complete} and {@code cleanup}.
   * <p/>
   * The output collector is only set on the call after all entries were aggregated, immediately before
   * {@code complete}. The operation call is created from the fields of the first entry, so the array must not
   * be empty.
   *
   * @param aggregator     the aggregator to invoke
   * @param group          the group entry set on the operation call, may be {@code null}
   * @param argumentsArray the argument entries, must contain at least one element
   * @param resultFields   the fields the returned collector is created with
   * @return the collector that was set as the output collector of the call
   */
  public static TupleListCollector invokeAggregator( Aggregator aggregator, TupleEntry group, TupleEntry[] argumentsArray, Fields resultFields )
  {
    ConcreteCall operationCall = new ConcreteCall( argumentsArray[ 0 ].getFields() );

    operationCall.setGroup( group );

    aggregator.prepare( FlowProcess.NULL, operationCall );

    aggregator.start( FlowProcess.NULL, operationCall );

    for( TupleEntry arguments : argumentsArray )
    {
      operationCall.setArguments( arguments );
      aggregator.aggregate( FlowProcess.NULL, operationCall );
    }

    TupleListCollector collector = new TupleListCollector( resultFields, true );
    operationCall.setOutputCollector( collector );

    aggregator.complete( FlowProcess.NULL, operationCall );

    // use the same FlowProcess.NULL as every other life-cycle call so cleanup never sees a null flow process
    aggregator.cleanup( FlowProcess.NULL, operationCall );

    return collector;
  }

  /**
   * Invokes the buffer over the given tuples, each tuple being wrapped in a new {@link TupleEntry}, with a
   * {@code null} group.
   *
   * @param buffer         the buffer to invoke
   * @param argumentsArray the argument values, must contain at least one element
   * @param resultFields   the fields the returned collector is created with
   * @return the collector that was set as the output collector of the call
   */
  public static TupleListCollector invokeBuffer( Buffer buffer, Tuple[] argumentsArray, Fields resultFields )
  {
    TupleEntry[] entries = makeArgumentsArray( argumentsArray );

    return invokeBuffer( buffer, entries, resultFields );
  }

  /**
   * Invokes the buffer over the given entries with a {@code null} group.
   *
   * @param buffer         the buffer to invoke
   * @param argumentsArray the argument entries, must contain at least one element
   * @param resultFields   the fields the returned collector is created with
   * @return the collector that was set as the output collector of the call
   */
  public static TupleListCollector invokeBuffer( Buffer buffer, TupleEntry[] argumentsArray, Fields resultFields )
  {
    return invokeBuffer( buffer, null, argumentsArray, resultFields );
  }

  /**
   * Invokes {@code prepare}, {@code operate} and {@code cleanup} on the buffer, in that order, handing it an
   * iterator over all given entries.
   * <p/>
   * The operation call is created from the fields of the first entry, so the array must not be empty.
   *
   * @param buffer         the buffer to invoke
   * @param group          the group entry set on the operation call, may be {@code null}
   * @param argumentsArray the argument entries, must contain at least one element
   * @param resultFields   the fields the returned collector is created with
   * @return the collector that was set as the output collector of the call
   */
  public static TupleListCollector invokeBuffer( Buffer buffer, TupleEntry group, TupleEntry[] argumentsArray, Fields resultFields )
  {
    ConcreteCall operationCall = new ConcreteCall( argumentsArray[ 0 ].getFields() );

    operationCall.setGroup( group );

    buffer.prepare( FlowProcess.NULL, operationCall );
    TupleListCollector collector = new TupleListCollector( resultFields, true );
    operationCall.setOutputCollector( collector );

    operationCall.setArgumentsIterator( Arrays.asList( argumentsArray ).iterator() );

    buffer.operate( FlowProcess.NULL, operationCall );

    // use the same FlowProcess.NULL as every other life-cycle call so cleanup never sees a null flow process
    buffer.cleanup( FlowProcess.NULL, operationCall );

    return collector;
  }

  /**
   * Wraps every tuple in a new {@link TupleEntry}, preserving order.
   *
   * @param argumentsArray the tuples to wrap
   * @return a new array of the same length as {@code argumentsArray}
   */
  private static TupleEntry[] makeArgumentsArray( Tuple[] argumentsArray )
  {
    TupleEntry[] entries = new TupleEntry[ argumentsArray.length ];

    for( int i = 0; i < argumentsArray.length; i++ )
      entries[ i ] = new TupleEntry( argumentsArray[ i ] );

    return entries;
  }

  /**
   * Reads all tuples from the first tap returned when iterating {@code flow.getSourcesCollection()}.
   *
   * @param flow the flow whose source is read
   * @return a new list holding a copy of every tuple read
   * @throws IOException if the tap cannot be opened or closed
   */
  public static List<Tuple> getSourceAsList( Flow flow ) throws IOException
  {
    return asCollection( flow, (Tap) flow.getSourcesCollection().iterator().next(), Fields.ALL, new ArrayList<Tuple>() );
  }

  /**
   * Reads all tuples from the tap returned by {@code flow.getSink()}.
   *
   * @param flow the flow whose sink is read
   * @return a new list holding a copy of every tuple read
   * @throws IOException if the tap cannot be opened or closed
   */
  public static List<Tuple> getSinkAsList( Flow flow ) throws IOException
  {
    return asCollection( flow, flow.getSink(), Fields.ALL, new ArrayList<Tuple>() );
  }

  /**
   * Reads all tuples from the given tap into a new list, selecting {@link Fields#ALL}.
   *
   * @param flow the flow used to open the tap for reading
   * @param tap  the tap to read
   * @return a new list holding a copy of every tuple read
   * @throws IOException if the tap cannot be opened or closed
   */
  public static List<Tuple> asList( Flow flow, Tap tap ) throws IOException
  {
    return asCollection( flow, tap, Fields.ALL, new ArrayList<Tuple>() );
  }

  /**
   * Reads all tuples from the given tap into a new list, applying the given field selector to every entry.
   *
   * @param flow     the flow used to open the tap for reading
   * @param tap      the tap to read
   * @param selector the fields to select from every entry
   * @return a new list holding the selected copy of every tuple read
   * @throws IOException if the tap cannot be opened or closed
   */
  public static List<Tuple> asList( Flow flow, Tap tap, Fields selector ) throws IOException
  {
    return asCollection( flow, tap, selector, new ArrayList<Tuple>() );
  }

  /**
   * Reads all tuples from the given tap into a new {@link HashSet}, selecting {@link Fields#ALL}.
   *
   * @param flow the flow used to open the tap for reading
   * @param tap  the tap to read
   * @return a new set holding a copy of every tuple read
   * @throws IOException if the tap cannot be opened or closed
   */
  public static Set<Tuple> asSet( Flow flow, Tap tap ) throws IOException
  {
    return asCollection( flow, tap, Fields.ALL, new HashSet<Tuple>() );
  }

  /**
   * Reads all tuples from the given tap into a new {@link HashSet}, applying the given field selector to every
   * entry.
   *
   * @param flow     the flow used to open the tap for reading
   * @param tap      the tap to read
   * @param selector the fields to select from every entry
   * @return a new set holding the selected copy of every tuple read
   * @throws IOException if the tap cannot be opened or closed
   */
  public static Set<Tuple> asSet( Flow flow, Tap tap, Fields selector ) throws IOException
  {
    return asCollection( flow, tap, selector, new HashSet<Tuple>() );
  }

  /**
   * Reads all tuples from the given tap into the given collection, selecting {@link Fields#ALL}.
   *
   * @param flow       the flow used to open the tap for reading
   * @param tap        the tap to read
   * @param collection the collection the tuples are added to
   * @return the given {@code collection}
   * @throws IOException if the tap cannot be opened or closed
   */
  public static <C extends Collection<Tuple>> C asCollection( Flow flow, Tap tap, C collection ) throws IOException
  {
    return asCollection( flow, tap, Fields.ALL, collection );
  }

  /**
   * Opens the tap via {@code flow.openTapForRead( tap )}, adds the selected copy of every entry to the given
   * collection, and closes the iterator whether or not reading succeeds.
   *
   * @param flow       the flow used to open the tap for reading
   * @param tap        the tap to read
   * @param selector   the fields to select from every entry
   * @param collection the collection the tuples are added to
   * @return the given {@code collection}
   * @throws IOException if the tap cannot be opened or closed
   */
  public static <C extends Collection<Tuple>> C asCollection( Flow flow, Tap tap, Fields selector, C collection ) throws IOException
  {
    TupleEntryIterator iterator = flow.openTapForRead( tap );

    try
    {
      return asCollection( iterator, selector, collection );
    }
    finally
    {
      iterator.close();
    }
  }

  /**
   * Adds {@code getTupleCopy()} of every remaining entry to the given collection. The iterator is not closed
   * by this method.
   *
   * @param iterator the entries to copy
   * @param result   the collection the tuples are added to
   * @return the given {@code result}
   */
  public static <C extends Collection<Tuple>> C asCollection( TupleEntryIterator iterator, C result )
  {
    while( iterator.hasNext() )
      result.add( iterator.next().getTupleCopy() );

    return result;
  }

  /**
   * Adds {@code selectTupleCopy( selector )} of every remaining entry to the given collection. The iterator
   * is not closed by this method.
   *
   * @param iterator the entries to copy
   * @param selector the fields to select from every entry
   * @param result   the collection the tuples are added to
   * @return the given {@code result}
   */
  public static <C extends Collection<Tuple>> C asCollection( TupleEntryIterator iterator, Fields selector, C result )
  {
    while( iterator.hasNext() )
      result.add( iterator.next().selectTupleCopy( selector ) );

    return result;
  }
}