001/*
002 * Copyright (c) 2007-2016 Concurrent, Inc. All Rights Reserved.
003 *
004 * Project and contact information: http://www.cascading.org/
005 *
006 * This file is part of the Cascading project.
007 *
008 * Licensed under the Apache License, Version 2.0 (the "License");
009 * you may not use this file except in compliance with the License.
010 * You may obtain a copy of the License at
011 *
012 *     http://www.apache.org/licenses/LICENSE-2.0
013 *
014 * Unless required by applicable law or agreed to in writing, software
015 * distributed under the License is distributed on an "AS IS" BASIS,
016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017 * See the License for the specific language governing permissions and
018 * limitations under the License.
019 */
020
021package cascading;
022
023import java.io.Serializable;
024import java.util.ArrayList;
025import java.util.Collection;
026import java.util.Comparator;
027import java.util.HashMap;
028import java.util.List;
029import java.util.Map;
030import java.util.regex.Pattern;
031
032import cascading.cascade.Cascades;
033import cascading.flow.Flow;
034import cascading.operation.Debug;
035import cascading.operation.Filter;
036import cascading.operation.Function;
037import cascading.operation.Identity;
038import cascading.operation.Insert;
039import cascading.operation.NoOp;
040import cascading.operation.aggregator.Count;
041import cascading.operation.aggregator.First;
042import cascading.operation.expression.ExpressionFunction;
043import cascading.operation.filter.And;
044import cascading.operation.function.UnGroup;
045import cascading.operation.regex.RegexFilter;
046import cascading.operation.regex.RegexParser;
047import cascading.operation.regex.RegexSplitter;
048import cascading.pipe.Each;
049import cascading.pipe.Every;
050import cascading.pipe.GroupBy;
051import cascading.pipe.Merge;
052import cascading.pipe.Pipe;
053import cascading.tap.MultiSourceTap;
054import cascading.tap.SinkMode;
055import cascading.tap.Tap;
056import cascading.tuple.Fields;
057import cascading.tuple.Hasher;
058import cascading.tuple.Tuple;
059import org.junit.Test;
060
061import static cascading.ComparePlatformsTest.NONDETERMINISTIC;
062import static data.InputData.*;
063
064public class FieldedPipesPlatformTest extends PlatformTestCase
065  {
  /**
   * Creates the test case by handing {@code true, 5, 3} to the {@link PlatformTestCase} constructor.
   * <p/>
   * NOTE(review): per the inline comment the boolean keeps cluster testing enabled; the meaning of
   * the two numeric arguments is not visible from this file -- confirm against PlatformTestCase.
   */
  public FieldedPipesPlatformTest()
    {
    super( true, 5, 3 ); // leave cluster testing enabled
    }
070
071  @Test
072  public void testSimpleGroup() throws Exception
073    {
074    getPlatform().copyFromLocal( inputFileApache );
075
076    Tap source = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileApache );
077
078    Pipe pipe = new Pipe( "test" );
079
080    pipe = new Each( pipe, new Fields( "line" ), new RegexParser( new Fields( "ip" ), "^[^ ]*" ), new Fields( "ip" ) );
081
082    pipe = new GroupBy( pipe, new Fields( "ip" ) );
083
084    pipe = new Every( pipe, new Count(), new Fields( "ip", "count" ) );
085
086    Tap sink = getPlatform().getTextFile( getOutputPath( "simple" ), SinkMode.REPLACE );
087
088    Flow flow = getPlatform().getFlowConnector().connect( source, sink, pipe );
089
090    flow.complete();
091
092    validateLength( flow.openSource(), 10 ); // validate source, this once, as a sanity check
093    validateLength( flow, 8, null );
094    }
095
096  @Test
097  public void testSimpleChain() throws Exception
098    {
099    getPlatform().copyFromLocal( inputFileApache );
100
101    Tap source = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileApache );
102
103    Pipe pipe = new Pipe( "test" );
104
105    pipe = new Each( pipe, new Fields( "line" ), new RegexParser( new Fields( "ip" ), "^[^ ]*" ), new Fields( "ip" ) );
106
107    pipe = new GroupBy( pipe, new Fields( "ip" ) );
108
109    pipe = new Every( pipe, new Count( new Fields( "count1" ) ) );
110    pipe = new Every( pipe, new Count( new Fields( "count2" ) ) );
111    pipe = new Every( pipe, new Count( new Fields( "count3" ) ) );
112    pipe = new Every( pipe, new Count( new Fields( "count4" ) ) );
113
114    Tap sink = getPlatform().getTabDelimitedFile( Fields.ALL, getOutputPath( "simplechain" ), SinkMode.REPLACE );
115
116    Flow flow = getPlatform().getFlowConnector().connect( source, sink, pipe );
117
118    flow.complete();
119
120    validateLength( flow, 8, 5 );
121    }
122
123  @Test
124  public void testChainEndingWithEach() throws Exception
125    {
126    getPlatform().copyFromLocal( inputFileApache );
127
128    Pipe pipe = new Pipe( "test" );
129
130    pipe = new Each( pipe, new Fields( "line" ), new RegexParser( new Fields( "ip" ), "^[^ ]*" ), new Fields( "ip" ) );
131
132    pipe = new GroupBy( pipe, new Fields( "ip" ) );
133
134    pipe = new Every( pipe, new Count( new Fields( "count1" ) ) );
135    pipe = new Every( pipe, new Count( new Fields( "count2" ) ) );
136
137    pipe = new Each( pipe, new Fields( "count1", "count2" ), new ExpressionFunction( new Fields( "sum" ), "count1 + count2", int.class ), Fields.ALL );
138
139    Tap source = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileApache );
140    Tap sink = getPlatform().getTextFile( getOutputPath( "chaineach" ), SinkMode.REPLACE );
141
142    Flow flow = getPlatform().getFlowConnector().connect( source, sink, pipe );
143
144    flow.complete();
145
146    validateLength( flow, 8, null );
147    }
148
149  // also tests the RegexSplitter
150
151  @Test
152  public void testNoGroup() throws Exception
153    {
154    getPlatform().copyFromLocal( inputFileApache );
155
156    Tap source = getPlatform().getTextFile( new Fields( "line" ), inputFileApache );
157
158    Pipe pipe = new Pipe( "test" );
159
160    pipe = new Each( pipe, new RegexSplitter( "\\s+" ), new Fields( 1 ) );
161
162    Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "nogroup" ), SinkMode.REPLACE );
163
164    Flow flow = getPlatform().getFlowConnector().connect( source, sink, pipe );
165
166    flow.complete();
167
168    validateLength( flow, 10, null );
169
170    List<Tuple> results = getSinkAsList( flow );
171
172    assertTrue( results.contains( new Tuple( "75.185.76.245" ) ) );
173    }
174
175  @Test
176  public void testCopy() throws Exception
177    {
178    getPlatform().copyFromLocal( inputFileApache );
179
180    Tap source = getPlatform().getTextFile( new Fields( "line" ), inputFileApache );
181
182    Pipe pipe = new Pipe( "test" );
183
184    Tap sink = getPlatform().getTextFile( getOutputPath( "copy" ), SinkMode.REPLACE );
185
186    Flow flow = getPlatform().getFlowConnector().connect( source, sink, pipe );
187
188    flow.complete();
189
190    validateLength( flow, 10, null );
191    }
192
193  @Test
194  public void testSimpleMerge() throws Exception
195    {
196    getPlatform().copyFromLocal( inputFileLower );
197    getPlatform().copyFromLocal( inputFileUpper );
198
199    Tap sourceLower = getPlatform().getTextFile( inputFileLower );
200    Tap sourceUpper = getPlatform().getTextFile( inputFileUpper );
201
202    Map sources = new HashMap();
203
204    sources.put( "lower", sourceLower );
205    sources.put( "upper", sourceUpper );
206
207    Function splitter = new RegexSplitter( new Fields( "num", "char" ), " " );
208
209    // using null pos so all fields are written
210    Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "simplemerge" ), SinkMode.REPLACE );
211
212    Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitter );
213    Pipe pipeUpper = new Each( new Pipe( "upper" ), new Fields( "line" ), splitter );
214
215    Pipe splice = new GroupBy( "merge", Pipe.pipes( pipeLower, pipeUpper ), new Fields( "num" ), null, false );
216
217    Flow flow = getPlatform().getFlowConnector().connect( sources, sink, splice );
218
219    flow.complete();
220
221    validateLength( flow, 10 );
222
223    Collection results = getSinkAsList( flow );
224
225    assertTrue( "missing value", results.contains( new Tuple( "1\ta" ) ) );
226    assertTrue( "missing value", results.contains( new Tuple( "1\tA" ) ) );
227    assertTrue( "missing value", results.contains( new Tuple( "2\tb" ) ) );
228    assertTrue( "missing value", results.contains( new Tuple( "2\tB" ) ) );
229    assertTrue( "missing value", results.contains( new Tuple( "3\tc" ) ) );
230    assertTrue( "missing value", results.contains( new Tuple( "3\tC" ) ) );
231    }
232
233  /**
234   * Specifically tests GroupBy will return the correct grouping fields to the following Every
235   * <p/>
236   * additionally tests secondary sorting during merging
237   *
238   * @throws Exception
239   */
240  @Test
241  public void testSimpleMergeThree() throws Exception
242    {
243    getPlatform().copyFromLocal( inputFileLower );
244    getPlatform().copyFromLocal( inputFileUpper );
245    getPlatform().copyFromLocal( inputFileLowerOffset );
246
247    Tap sourceLower = getPlatform().getTextFile( inputFileLower );
248    Tap sourceUpper = getPlatform().getTextFile( inputFileUpper );
249    Tap sourceLowerOffset = getPlatform().getTextFile( inputFileLowerOffset );
250
251    Map sources = new HashMap();
252
253    sources.put( "lower", sourceLower );
254    sources.put( "upper", sourceUpper );
255    sources.put( "offset", sourceLowerOffset );
256
257    Tap sink = getPlatform().getDelimitedFile( Fields.ALL, "\t", getOutputPath( "simplemergethree" ), SinkMode.REPLACE );
258
259    Function splitter = new RegexSplitter( new Fields( "num", "char" ), " " );
260
261    Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitter );
262    Pipe pipeUpper = new Each( new Pipe( "upper" ), new Fields( "line" ), splitter );
263    Pipe pipeOffset = new Each( new Pipe( "offset" ), new Fields( "line" ), splitter );
264
265    Pipe splice = new GroupBy( "merge", Pipe.pipes( pipeLower, pipeUpper, pipeOffset ), new Fields( "num" ), new Fields( "char" ) );
266
267    splice = new Every( splice, new Fields( "char" ), new First( new Fields( "first" ) ) );
268
269    splice = new Each( splice, new Fields( "num", "first" ), new Identity() );
270
271    Flow flow = getPlatform().getFlowConnector().connect( sources, sink, splice );
272
273    flow.complete();
274
275    validateLength( flow, 6 );
276
277    List<Tuple> tuples = getSinkAsList( flow );
278
279    assertTrue( tuples.contains( new Tuple( "1", "A" ) ) );
280    assertTrue( tuples.contains( new Tuple( "2", "B" ) ) );
281    assertTrue( tuples.contains( new Tuple( "3", "C" ) ) );
282    assertTrue( tuples.contains( new Tuple( "4", "D" ) ) );
283    assertTrue( tuples.contains( new Tuple( "5", "E" ) ) );
284    assertTrue( tuples.contains( new Tuple( "6", "c" ) ) );
285    }
286
287  /**
288   * same test as MergePipesTest, but to test that chained groupby don't exhibit similar failures
289   *
290   * @throws Exception
291   */
292  @Test
293  public void testSameSourceMergeThreeChainGroup() throws Exception
294    {
295    getPlatform().copyFromLocal( inputFileLower );
296
297    Tap sourceLower = getPlatform().getTextFile( inputFileLower );
298
299    Map sources = new HashMap();
300
301    sources.put( "split", sourceLower );
302
303    Tap sink = getPlatform().getTextFile( getOutputPath( "samemergethreechaingroup" ), SinkMode.REPLACE );
304
305    Function splitter = new RegexSplitter( new Fields( "num", "char" ), " " );
306
307    Pipe pipe = new Pipe( "split" );
308
309    Pipe pipeLower = new Each( new Pipe( "lower", pipe ), new Fields( "line" ), splitter );
310    Pipe pipeUpper = new Each( new Pipe( "upper", pipe ), new Fields( "line" ), splitter );
311    Pipe pipeOffset = new Each( new Pipe( "offset", pipe ), new Fields( "line" ), splitter );
312
313    //put group before merge to test path counts
314    Pipe splice = new GroupBy( Pipe.pipes( pipeLower, pipeUpper ), new Fields( "num" ) );
315
316    // this group has its incoming paths counted, gated by the previous group
317    splice = new GroupBy( Pipe.pipes( splice, pipeOffset ), new Fields( "num" ) );
318
319    Flow flow = getPlatform().getFlowConnector().connect( sources, sink, splice );
320
321    if( getPlatform().isMapReduce() )
322      assertEquals( "wrong num jobs", 2, flow.getFlowSteps().size() );
323
324    flow.complete();
325
326    validateLength( flow, 15 );
327    }
328
329  @Test
330  public void testUnGroup() throws Exception
331    {
332    getPlatform().copyFromLocal( inputFileJoined );
333
334    Tap source = getPlatform().getTextFile( inputFileJoined );
335    Tap sink = getPlatform().getTextFile( getOutputPath( "ungrouped" ), SinkMode.REPLACE );
336
337    Pipe pipe = new Pipe( "test" );
338
339    pipe = new Each( pipe, new Fields( "line" ), new RegexSplitter( new Fields( "num", "lower", "upper" ) ) );
340
341    pipe = new Each( pipe, new UnGroup( new Fields( "num", "char" ), new Fields( "num" ), Fields.fields( new Fields( "lower" ), new Fields( "upper" ) ) ) );
342
343    Flow flow = getPlatform().getFlowConnector().connect( source, sink, pipe );
344
345    flow.complete();
346
347    validateLength( flow, 10 );
348    }
349
350  @Test
351  public void testUnGroupAnon() throws Exception
352    {
353    getPlatform().copyFromLocal( inputFileJoined );
354
355    Tap source = getPlatform().getTextFile( inputFileJoined );
356    Tap sink = getPlatform().getTextFile( getOutputPath( "ungroupedanon" ), SinkMode.REPLACE );
357
358    Pipe pipe = new Pipe( "test" );
359
360    pipe = new Each( pipe, new Fields( "line" ), new RegexSplitter( new Fields( "num", "lower", "upper" ) ) );
361
362    pipe = new Each( pipe, new UnGroup( new Fields( "num" ), Fields.fields( new Fields( "lower" ), new Fields( "upper" ) ) ) );
363
364    Flow flow = getPlatform().getFlowConnector().connect( source, sink, pipe );
365
366    flow.complete();
367
368    validateLength( flow, 10 );
369    }
370
371  @Test
372  public void testUnGroupBySize() throws Exception
373    {
374    getPlatform().copyFromLocal( inputFileJoinedExtra );
375
376    Tap source = getPlatform().getTextFile( inputFileJoinedExtra );
377    Tap sink = getPlatform().getTextFile( getOutputPath( "ungrouped_size" ), SinkMode.REPLACE );
378
379    Pipe pipe = new Pipe( "test" );
380
381    pipe = new Each( pipe, new Fields( "line" ), new RegexSplitter( new Fields( "num1", "num2", "lower", "upper" ) ) );
382
383    pipe = new Each( pipe, new UnGroup( new Fields( "num1", "num2", "char" ), new Fields( "num1", "num2" ), 1 ) );
384
385    Flow flow = getPlatform().getFlowConnector().connect( source, sink, pipe );
386
387    flow.complete();
388
389    List<Tuple> tuples = asList( flow, sink );
390    assertEquals( 10, tuples.size() );
391
392    List<Object> values = new ArrayList<Object>();
393    for( Tuple tuple : tuples )
394      values.add( tuple.getObject( 1 ) );
395
396    assertTrue( values.contains( "1\t1\ta" ) );
397    assertTrue( values.contains( "1\t1\tA" ) );
398    assertTrue( values.contains( "2\t2\tb" ) );
399    assertTrue( values.contains( "2\t2\tB" ) );
400    assertTrue( values.contains( "3\t3\tc" ) );
401    assertTrue( values.contains( "3\t3\tC" ) );
402    assertTrue( values.contains( "4\t4\td" ) );
403    assertTrue( values.contains( "4\t4\tD" ) );
404    assertTrue( values.contains( "5\t5\te" ) );
405    assertTrue( values.contains( "5\t5\tE" ) );
406    }
407
408  @Test
409  public void testFilter() throws Exception
410    {
411    getPlatform().copyFromLocal( inputFileApache );
412
413    Tap source = getPlatform().getTextFile( inputFileApache );
414    Tap sink = getPlatform().getTextFile( getOutputPath( "filter" ), SinkMode.REPLACE );
415
416    Pipe pipe = new Pipe( "test" );
417
418    Filter filter = new RegexFilter( "^68.*" );
419
420    pipe = new Each( pipe, new Fields( "line" ), filter );
421
422    Flow flow = getPlatform().getFlowConnector().connect( source, sink, pipe );
423
424    flow.complete();
425
426    validateLength( flow, 3 );
427    }
428
429  @Test
430  public void testLogicFilter() throws Exception
431    {
432    getPlatform().copyFromLocal( inputFileApache );
433
434    Tap source = getPlatform().getTextFile( inputFileApache );
435    Tap sink = getPlatform().getTextFile( getOutputPath( "logicfilter" ), SinkMode.REPLACE );
436
437    Pipe pipe = new Pipe( "test" );
438
439    Filter filter = new And( new RegexFilter( "^68.*$" ), new RegexFilter( "^1000.*$" ) );
440
441    pipe = new Each( pipe, new Fields( "line" ), filter );
442
443    Flow flow = getPlatform().getFlowConnector().connect( source, sink, pipe );
444
445    flow.complete();
446
447    validateLength( flow, 3 );
448    }
449
450  @Test
451  public void testFilterComplex() throws Exception
452    {
453    getPlatform().copyFromLocal( inputFileApache );
454
455    Tap source = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileApache );
456    Tap sink = getPlatform().getTextFile( getOutputPath( "filtercomplex" ), SinkMode.REPLACE );
457
458    Pipe pipe = new Pipe( "test" );
459
460    pipe = new Each( pipe, new Fields( "line" ), TestConstants.APACHE_COMMON_PARSER );
461
462    pipe = new Each( pipe, new Fields( "method" ), new RegexFilter( "^POST" ) );
463    pipe = new Each( pipe, new Fields( "method" ), new RegexFilter( "^POST" ) );
464
465    pipe = new Each( pipe, new Fields( "method" ), new Identity( new Fields( "value" ) ), Fields.ALL );
466
467    pipe = new GroupBy( pipe, new Fields( "value" ) );
468
469    pipe = new Every( pipe, new Count(), new Fields( "value", "count" ) );
470
471    Flow flow = getPlatform().getFlowConnector().connect( source, sink, pipe );
472
473    flow.complete();
474
475    validateLength( flow, 1, null );
476    }
477
478  /**
479   * Intentionally filters all values out to test next mr job behaves
480   *
481   * @throws Exception
482   */
483  @Test
484  public void testFilterAll() throws Exception
485    {
486    getPlatform().copyFromLocal( inputFileApache );
487
488    Tap source = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileApache );
489    Tap sink = getPlatform().getTextFile( getOutputPath( "filterall" ), SinkMode.REPLACE );
490
491    Pipe pipe = new Pipe( "test" );
492
493    String regex = "^([^ ]*) +[^ ]* +[^ ]* +\\[([^]]*)\\] +\\\"([^ ]*) ([^ ]*) [^ ]*\\\" ([^ ]*) ([^ ]*).*$";
494    Fields fieldDeclaration = new Fields( "ip", "time", "method", "event", "status", "size" );
495    int[] groups = {1, 2, 3, 4, 5, 6};
496    RegexParser function = new RegexParser( fieldDeclaration, regex, groups );
497    pipe = new Each( pipe, new Fields( "line" ), function );
498
499    pipe = new Each( pipe, new Fields( "method" ), new RegexFilter( "^fobar" ) ); // intentionally filtering all
500
501    pipe = new GroupBy( pipe, new Fields( "method" ) );
502
503    pipe = new Each( pipe, new Fields( "method" ), new Identity( new Fields( "value" ) ), Fields.ALL );
504
505    pipe = new GroupBy( pipe, new Fields( "value" ) );
506
507    pipe = new Every( pipe, new Count(), new Fields( "value", "count" ) );
508
509    Flow flow = getPlatform().getFlowConnector().connect( source, sink, pipe );
510
511    flow.complete();
512
513    validateLength( flow, 0, null );
514    }
515
516//  public void testLimitFilter() throws Exception
517//    {
518//    copyFromLocal( inputFileApache );
519//
520//    Tap source = new Hfs( new TextLine( new Fields( "offset", "line" ) ), inputFileApache );
521//    Tap sink = new Lfs( new TextLine(), outputPath + "/limitfilter", true );
522//
523//    Pipe pipe = new Pipe( "test" );
524//
525//    Filter filter = new Limit( 7 );
526//
527//    pipe = new Each( pipe, new Fields( "line" ), filter );
528//
529//    Flow flow = new FlowConnector( getProperties() ).connect( source, sink, pipe );
530//
531////    flow.writeDOT( "flow.dot" );
532//
533//    flow.complete();
534//
535//    validateLength( flow, 7, null );
536//    }
537
538  //
539
540  /*
541   *
542   * TODO: create (optional) Tez rule to consolidate into a single DAG. currently renders to two DAGs, one for each side
543   *
544   */
545  @Test
546  public void testSplit() throws Exception
547    {
548    getPlatform().copyFromLocal( inputFileApache );
549
550    // 46 192
551
552    Tap source = getPlatform().getTextFile( inputFileApache );
553    Tap sink1 = getPlatform().getTextFile( getOutputPath( "split1" ), SinkMode.REPLACE );
554    Tap sink2 = getPlatform().getTextFile( getOutputPath( "split2" ), SinkMode.REPLACE );
555
556    Pipe pipe = new Pipe( "split" );
557
558    pipe = new Each( pipe, new Fields( "line" ), new RegexFilter( "^68.*" ) );
559
560    Pipe left = new Each( new Pipe( "left", pipe ), new Fields( "line" ), new RegexFilter( ".*46.*" ) );
561    Pipe right = new Each( new Pipe( "right", pipe ), new Fields( "line" ), new RegexFilter( ".*102.*" ) );
562
563    Map sources = new HashMap();
564    sources.put( "split", source );
565
566    Map sinks = new HashMap();
567    sinks.put( "left", sink1 );
568    sinks.put( "right", sink2 );
569
570    Flow flow = getPlatform().getFlowConnector().connect( sources, sinks, left, right );
571
572    flow.complete();
573
574    validateLength( flow, 1, "left" );
575    validateLength( flow, 2, "right" );
576    }
577
578  /**
579   * verifies non-safe rules apply in the proper place
580   *
581   * @throws Exception
582   */
583  @Test
584  public void testSplitNonSafe() throws Exception
585    {
586    getPlatform().copyFromLocal( inputFileApache );
587
588    // 46 192
589
590    Tap source = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileApache );
591    Tap sink1 = getPlatform().getTextFile( getOutputPath( "nonsafesplit1" ), SinkMode.REPLACE );
592    Tap sink2 = getPlatform().getTextFile( getOutputPath( "nonsafesplit2" ), SinkMode.REPLACE );
593
594    Pipe pipe = new Pipe( "split" );
595
596    // run job on non-safe operation, forces 3 mr jobs.
597    pipe = new Each( pipe, new TestFunction( new Fields( "ignore" ), new Tuple( 1 ), false ), new Fields( "line" ) );
598
599    pipe = new Each( pipe, new Fields( "line" ), new RegexFilter( "^68.*" ) );
600
601    Pipe left = new Each( new Pipe( "left", pipe ), new Fields( "line" ), new RegexFilter( ".*46.*" ) );
602    Pipe right = new Each( new Pipe( "right", pipe ), new Fields( "line" ), new RegexFilter( ".*102.*" ) );
603
604    Map sources = new HashMap();
605    sources.put( "split", source );
606
607    Map sinks = new HashMap();
608    sinks.put( "left", sink1 );
609    sinks.put( "right", sink2 );
610
611    Flow flow = getPlatform().getFlowConnector().connect( sources, sinks, left, right );
612
613    flow.complete();
614
615    validateLength( flow, 1, "left" );
616    validateLength( flow, 2, "right" );
617    }
618
619  @Test
620  public void testSplitSameSourceMerged() throws Exception
621    {
622    getPlatform().copyFromLocal( inputFileApache );
623
624    // 46 192
625
626    Tap source = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileApache );
627    Tap sink = getPlatform().getTextFile( getOutputPath( "splitsourcemerged" ), SinkMode.REPLACE );
628
629    Pipe pipe = new Pipe( "split" );
630
631    pipe = new Each( pipe, new Fields( "line" ), new RegexFilter( "^68.*" ) );
632
633    Pipe left = new Each( new Pipe( "left", pipe ), new Fields( "line" ), new RegexFilter( ".*46.*" ) );
634    Pipe right = new Each( new Pipe( "right", pipe ), new Fields( "line" ), new RegexFilter( ".*102.*" ) );
635
636    Pipe merged = new GroupBy( "merged", Pipe.pipes( left, right ), new Fields( "line" ) );
637
638    Flow flow = getPlatform().getFlowConnector().connect( source, sink, merged );
639
640    flow.complete();
641
642    validateLength( flow, 3 );
643    }
644
645  /**
646   * verifies not inserting Identity between groups works
647   *
648   * @throws Exception
649   */
650  @Test
651  public void testSplitOut() throws Exception
652    {
653    getPlatform().copyFromLocal( inputFileApache );
654
655    Tap sourceLower = getPlatform().getTextFile( new Fields( "num", "line" ), inputFileApache );
656
657    Map sources = new HashMap();
658
659    sources.put( "lower1", sourceLower );
660
661    // using null pos so all fields are written
662    Tap sink1 = getPlatform().getTextFile( getOutputPath( "splitout1" ), SinkMode.REPLACE );
663    Tap sink2 = getPlatform().getTextFile( getOutputPath( "splitout2" ), SinkMode.REPLACE );
664
665    Map sinks = new HashMap();
666
667    sinks.put( "output1", sink1 );
668    sinks.put( "output2", sink2 );
669
670    Pipe pipeLower1 = new Pipe( "lower1" );
671
672    Pipe left = new GroupBy( "output1", pipeLower1, new Fields( 0 ) );
673    Pipe right = new GroupBy( "output2", left, new Fields( 0 ) );
674
675    Flow flow = getPlatform().getFlowConnector().connect( sources, sinks, Pipe.pipes( left, right ) );
676
677//    flow.writeDOT( "spit.dot" );
678
679    flow.complete();
680
681    validateLength( flow, 10, "output1" );
682    validateLength( flow, 10, "output2" );
683
684    assertEquals( 10, asSet( flow, sink1 ).size() );
685    assertEquals( 10, asSet( flow, sink2 ).size() );
686    }
687
688  @Test
689  public void testSplitComplex() throws Exception
690    {
691    getPlatform().copyFromLocal( inputFileApache );
692
693    // 46 192
694
695    Tap source = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileApache );
696    Tap sink1 = getPlatform().getTextFile( getOutputPath( "splitcomp1" ), SinkMode.REPLACE );
697    Tap sink2 = getPlatform().getTextFile( getOutputPath( "splitcomp2" ), SinkMode.REPLACE );
698
699    Pipe pipe = new Pipe( "split" );
700
701    pipe = new Each( pipe, new Fields( "line" ), new RegexParser( new Fields( "ip" ), "^[^ ]*" ), new Fields( "ip" ) );
702
703    pipe = new GroupBy( pipe, new Fields( "ip" ) );
704
705    pipe = new Every( pipe, new Fields( "ip" ), new Count(), new Fields( "ip", "count" ) );
706
707    pipe = new Each( pipe, new Fields( "ip" ), new RegexFilter( "^68.*" ) );
708
709    Pipe left = new Each( new Pipe( "left", pipe ), new Fields( "ip" ), new RegexFilter( ".*46.*" ) );
710
711    Pipe right = new Each( new Pipe( "right", pipe ), new Fields( "ip" ), new RegexFilter( ".*102.*" ) );
712
713    Map sources = Cascades.tapsMap( "split", source );
714    Map sinks = Cascades.tapsMap( Pipe.pipes( left, right ), Tap.taps( sink1, sink2 ) );
715
716    Flow flow = getPlatform().getFlowConnector().connect( sources, sinks, left, right );
717
718    flow.complete();
719
720    validateLength( flow, 1, "left" );
721    validateLength( flow, 1, "right" );
722    }
723
724  @Test
725  public void testSplitMultiple() throws Exception
726    {
727    getPlatform().copyFromLocal( inputFileApache );
728
729    // 46 192
730
731    Tap source = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileApache );
732    Tap sinkLeft = getPlatform().getTextFile( getOutputPath( "left" ), SinkMode.REPLACE );
733    Tap sinkRightLeft = getPlatform().getTextFile( getOutputPath( "rightleft" ), SinkMode.REPLACE );
734    Tap sinkRightRight = getPlatform().getTextFile( getOutputPath( "rightright" ), SinkMode.REPLACE );
735
736    Pipe head = new Pipe( "split" );
737
738    head = new Each( head, new Fields( "line" ), new RegexParser( new Fields( "ip" ), "^[^ ]*" ), new Fields( "ip" ) );
739
740    head = new GroupBy( head, new Fields( "ip" ) );
741
742    head = new Every( head, new Fields( "ip" ), new Count(), new Fields( "ip", "count" ) );
743
744    head = new Each( head, new Fields( "ip" ), new RegexFilter( "^68.*" ) );
745
746    Pipe left = new Each( new Pipe( "left", head ), new Fields( "ip" ), new RegexFilter( ".*46.*" ) );
747
748    Pipe right = new Each( new Pipe( "right", head ), new Fields( "ip" ), new RegexFilter( ".*102.*" ) );
749
750    right = new GroupBy( right, new Fields( "ip" ) );
751
752    Pipe rightLeft = new Each( new Pipe( "rightLeft", right ), new Fields( "ip" ), new Identity() );
753
754    Pipe rightRight = new Each( new Pipe( "rightRight", right ), new Fields( "ip" ), new Identity() );
755
756    Map sources = Cascades.tapsMap( "split", source );
757    Map sinks = Cascades.tapsMap( Pipe.pipes( left, rightLeft, rightRight ), Tap.taps( sinkLeft, sinkRightLeft, sinkRightRight ) );
758
759    Flow flow = getPlatform().getFlowConnector().connect( sources, sinks, left, rightLeft, rightRight );
760
761    flow.complete();
762
763    validateLength( flow, 1, "left" );
764    validateLength( flow, 1, "rightLeft" );
765    validateLength( flow, 1, "rightRight" );
766    }
767
768  @Test
769  public void testConcatenation() throws Exception
770    {
771    getPlatform().copyFromLocal( inputFileLower );
772    getPlatform().copyFromLocal( inputFileUpper );
773
774    Tap sourceLower = getPlatform().getTextFile( inputFileLower );
775    Tap sourceUpper = getPlatform().getTextFile( inputFileUpper );
776
777    Tap source = new MultiSourceTap( sourceLower, sourceUpper );
778
779    Function splitter = new RegexSplitter( new Fields( "num", "char" ), " " );
780
781    // using null pos so all fields are written
782    Tap sink = getPlatform().getTextFile( getOutputPath( "complexconcat" ), SinkMode.REPLACE );
783
784    Pipe pipe = new Each( new Pipe( "concat" ), new Fields( "line" ), splitter );
785
786    Pipe splice = new GroupBy( pipe, new Fields( "num" ) );
787
788    Flow countFlow = getPlatform().getFlowConnector().connect( source, sink, splice );
789
790    countFlow.complete();
791
792    validateLength( countFlow, 10, null );
793    }
794
795  @Test
796  public void testGeneratorAggregator() throws Exception
797    {
798    getPlatform().copyFromLocal( inputFileApache );
799
800    Tap source = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileApache );
801
802    Pipe pipe = new Pipe( "test" );
803
804    pipe = new Each( pipe, new Fields( "line" ), new RegexParser( new Fields( "ip" ), "^[^ ]*" ), new Fields( "ip" ) );
805
806    pipe = new GroupBy( pipe, new Fields( "ip" ) );
807
808    pipe = new Every( pipe, new TestAggregator( new Fields( "count1" ), new Fields( "ip" ), new Tuple( "first1" ), new Tuple( "first2" ) ) );
809    pipe = new Every( pipe, new TestAggregator( new Fields( "count2" ), new Fields( "ip" ), new Tuple( "second" ), new Tuple( "second2" ), new Tuple( "second3" ) ) );
810
811    Tap sink = getPlatform().getTextFile( getOutputPath( "generatoraggregator" ), SinkMode.REPLACE );
812
813    Flow flow = getPlatform().getFlowConnector().connect( source, sink, pipe );
814
815    flow.complete();
816
817    validateLength( flow, 8 * 2 * 3, null );
818    }
819
820  @Test
821  public void testReplace() throws Exception
822    {
823    Tap source = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileApache );
824    Tap sink = getPlatform().getTextFile( new Fields( "offset", "line" ), new Fields( "offset", "line" ), getOutputPath( "replace" ), SinkMode.REPLACE );
825
826    Pipe pipe = new Pipe( "test" );
827
828    Function parser = new RegexParser( new Fields( 0 ), "^[^ ]*" );
829    pipe = new Each( pipe, new Fields( "line" ), parser, Fields.REPLACE );
830    pipe = new Each( pipe, new Fields( "line" ), new Identity( Fields.ARGS ), Fields.REPLACE );
831    pipe = new Each( pipe, new Fields( "line" ), new Identity( new Fields( "line" ) ), Fields.REPLACE );
832
833    pipe = new Each( pipe, new Debug( true ) );
834
835    Flow flow = getPlatform().getFlowConnector( disableDebug() ).connect( source, sink, pipe );
836
837    flow.complete();
838
839    validateLength( flow, 10, 2, Pattern.compile( "^\\d+\\s\\d+\\s[\\d]{1,3}\\.[\\d]{1,3}\\.[\\d]{1,3}\\.[\\d]{1,3}$" ) );
840    }
841
842  @Test
843  public void testSwap() throws Exception
844    {
845    Tap source = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileApache );
846    Tap sink = getPlatform().getTextFile( new Fields( "offset", "line" ), new Fields( "count", "ipaddress" ), getOutputPath( "swap" ), SinkMode.REPLACE );
847
848    Pipe pipe = new Pipe( "test" );
849
850    Function parser = new RegexParser( new Fields( "ip" ), "^[^ ]*" );
851    pipe = new Each( pipe, new Fields( "line" ), parser, Fields.SWAP );
852    pipe = new GroupBy( pipe, new Fields( "ip" ) );
853    pipe = new Every( pipe, new Fields( "ip" ), new Count( new Fields( "count" ) ) );
854    pipe = new Each( pipe, new Fields( "ip" ), new Identity( new Fields( "ipaddress" ) ), Fields.SWAP );
855
856    Flow flow = getPlatform().getFlowConnector().connect( source, sink, pipe );
857
858    flow.complete();
859
860    validateLength( flow, 8, 2, Pattern.compile( "^\\d+\\s\\d+\\s[\\d]{1,3}\\.[\\d]{1,3}\\.[\\d]{1,3}\\.[\\d]{1,3}$" ) );
861    }
862
863  @Test
864  public void testNone() throws Exception
865    {
866    Tap source = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileApache );
867    Tap sink = getPlatform().getTextFile( new Fields( "offset", "line" ), new Fields( "count", "ip" ), getOutputPath( "none" ), SinkMode.REPLACE );
868
869    Pipe pipe = new Pipe( "test" );
870
871    Function parser = new RegexParser( new Fields( "ip" ), "^[^ ]*" );
872    pipe = new Each( pipe, new Fields( "line" ), parser, Fields.ALL );
873    pipe = new Each( pipe, new Fields( "line" ), new NoOp(), Fields.SWAP ); // declares Fields.NONE
874    pipe = new GroupBy( pipe, new Fields( "ip" ) );
875    pipe = new Every( pipe, new Fields( "ip" ), new Count( new Fields( "count" ) ) );
876    pipe = new Each( pipe, Fields.NONE, new Insert( new Fields( "ipaddress" ), "1.2.3.4" ), Fields.ALL );
877
878    Flow flow = getPlatform().getFlowConnector().connect( source, sink, pipe );
879
880    flow.complete();
881
882    validateLength( flow, 8, 2, Pattern.compile( "^\\d+\\s\\d+\\s[\\d]{1,3}\\.[\\d]{1,3}\\.[\\d]{1,3}\\.[\\d]{1,3}$" ) );
883    }
884
885  /**
886   * this tests a merge on two pipes with the same source and name.
887   *
888   * @throws Exception
889   */
890  @Test
891  public void testSplitSameSourceMergedSameName() throws Exception
892    {
893    getPlatform().copyFromLocal( inputFileApache );
894
895    // 46 192
896
897    Tap source = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileApache );
898    Tap sink = getPlatform().getTextFile( getOutputPath( "splitsourcemergedsamename" ), SinkMode.REPLACE );
899
900    Pipe pipe = new Pipe( "split" );
901
902    pipe = new Each( pipe, new Fields( "line" ), new RegexFilter( "^68.*" ) );
903
904    Pipe left = new Each( pipe, new Fields( "line" ), new RegexFilter( ".*46.*" ) );
905    Pipe right = new Each( pipe, new Fields( "line" ), new RegexFilter( ".*102.*" ) );
906
907    Pipe merged = new GroupBy( "merged", Pipe.pipes( left, right ), new Fields( "line" ) );
908
909    Flow flow = getPlatform().getFlowConnector().connect( source, sink, merged );
910
911    flow.complete();
912
913    validateLength( flow, 3 );
914    }
915
916  /**
917   * Catches failure to properly resolve the grouping fields as incoming to the second group-by
918   *
919   * @throws Exception
920   */
921  @Test
922  public void testGroupGroup() throws Exception
923    {
924    getPlatform().copyFromLocal( inputFileApache );
925
926    Tap source = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileApache );
927
928    Pipe pipe = new Pipe( "test" );
929
930    pipe = new Each( pipe, new Fields( "line" ), new RegexParser( new Fields( "ip", String.class ), "^[^ ]*" ), new Fields( "ip" ) );
931
932    pipe = new GroupBy( pipe, new Fields( "ip" ) );
933
934    pipe = new Every( pipe, new Count(), new Fields( "ip", "count" ) );
935
936    pipe = new GroupBy( pipe, new Fields( "ip" ), new Fields( "count" ) );
937
938    Tap sink = getPlatform().getTextFile( getOutputPath( "groupgroup" ), SinkMode.REPLACE );
939
940    Map<Object, Object> properties = getProperties();
941
942    properties.put( "cascading.serialization.types.required", "true" );
943
944    Flow flow = getPlatform().getFlowConnector( properties ).connect( source, sink, pipe );
945
946    flow.complete();
947
948    validateLength( flow, 8, null );
949    }
950
951  public static class LowerComparator implements Comparator<Comparable>, Hasher<Comparable>, Serializable
952    {
953    @Override
954    public int compare( Comparable lhs, Comparable rhs )
955      {
956      return lhs.toString().toLowerCase().compareTo( rhs.toString().toLowerCase() );
957      }
958
959    @Override
960    public int hashCode( Comparable value )
961      {
962      if( value == null )
963        return 0;
964
965      return value.toString().toLowerCase().hashCode();
966      }
967    }
968
969  @Test
970  public void testGroupByInsensitive() throws Exception
971    {
972    getPlatform().copyFromLocal( inputFileLower );
973    getPlatform().copyFromLocal( inputFileUpper );
974
975    Tap sourceLower = getPlatform().getDelimitedFile( new Fields( "num", "char" ), " ", inputFileLower );
976    Tap sourceUpper = getPlatform().getDelimitedFile( new Fields( "num", "char" ), " ", inputFileUpper );
977
978    Map sources = new HashMap();
979
980    sources.put( "lower", sourceLower );
981    sources.put( "upper", sourceUpper );
982
983    Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "insensitivegrouping" + NONDETERMINISTIC ), SinkMode.REPLACE );
984
985    Pipe pipeLower = new Pipe( "lower" );
986    Pipe pipeUpper = new Pipe( "upper" );
987
988    Pipe merge = new Merge( pipeLower, pipeUpper );
989
990    Fields charFields = new Fields( "char" );
991    charFields.setComparator( "char", new LowerComparator() );
992
993    Pipe splice = new GroupBy( "groupby", merge, charFields );
994
995    splice = new Every( splice, new Fields( "char" ), new Count() );
996
997    Flow flow = getPlatform().getFlowConnector().connect( sources, sink, splice );
998
999    flow.complete();
1000
1001    // we can't guarantee if the grouping key will be upper or lower
1002    validateLength( flow, 5, 1, Pattern.compile( "^\\w+\\s2$" ) );
1003    }
1004  }