001/*
002 * Copyright (c) 2016 Chris K Wensel <chris@wensel.net>. All Rights Reserved.
003 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved.
004 *
005 * Project and contact information: http://www.cascading.org/
006 *
007 * This file is part of the Cascading project.
008 *
009 * Licensed under the Apache License, Version 2.0 (the "License");
010 * you may not use this file except in compliance with the License.
011 * You may obtain a copy of the License at
012 *
013 *     http://www.apache.org/licenses/LICENSE-2.0
014 *
015 * Unless required by applicable law or agreed to in writing, software
016 * distributed under the License is distributed on an "AS IS" BASIS,
017 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
018 * See the License for the specific language governing permissions and
019 * limitations under the License.
020 */
021
022package cascading;
023
024import java.io.Serializable;
025import java.util.ArrayList;
026import java.util.Collection;
027import java.util.Collections;
028import java.util.Comparator;
029import java.util.HashMap;
030import java.util.List;
031import java.util.Map;
032import java.util.regex.Pattern;
033
034import cascading.cascade.Cascades;
035import cascading.flow.Flow;
036import cascading.operation.Debug;
037import cascading.operation.Filter;
038import cascading.operation.Function;
039import cascading.operation.Identity;
040import cascading.operation.Insert;
041import cascading.operation.NoOp;
042import cascading.operation.aggregator.Count;
043import cascading.operation.aggregator.First;
044import cascading.operation.expression.ExpressionFunction;
045import cascading.operation.filter.And;
046import cascading.operation.function.UnGroup;
047import cascading.operation.regex.RegexFilter;
048import cascading.operation.regex.RegexParser;
049import cascading.operation.regex.RegexSplitter;
050import cascading.pipe.Each;
051import cascading.pipe.Every;
052import cascading.pipe.GroupBy;
053import cascading.pipe.Merge;
054import cascading.pipe.Pipe;
055import cascading.tap.MultiSourceTap;
056import cascading.tap.SinkMode;
057import cascading.tap.Tap;
058import cascading.tuple.Fields;
059import cascading.tuple.Hasher;
060import cascading.tuple.Tuple;
061import org.junit.Test;
062
063import static cascading.ComparePlatformsTest.NONDETERMINISTIC;
064import static data.InputData.*;
065
066public class FieldedPipesPlatformTest extends PlatformTestCase
067  {
068  public FieldedPipesPlatformTest()
069    {
070    super( true, 5, 3 ); // leave cluster testing enabled
071    }
072
073  @Test
074  public void testSimpleGroup() throws Exception
075    {
076    getPlatform().copyFromLocal( inputFileApache );
077
078    Tap source = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileApache );
079
080    Pipe pipe = new Pipe( "test" );
081
082    pipe = new Each( pipe, new Fields( "line" ), new RegexParser( new Fields( "ip" ), "^[^ ]*" ), new Fields( "ip" ) );
083
084    pipe = new GroupBy( pipe, new Fields( "ip" ) );
085
086    pipe = new Every( pipe, new Count(), new Fields( "ip", "count" ) );
087
088    Tap sink = getPlatform().getTextFile( getOutputPath( "simple" ), SinkMode.REPLACE );
089
090    Flow flow = getPlatform().getFlowConnector().connect( source, sink, pipe );
091
092    flow.complete();
093
094    validateLength( flow.openSource(), 10 ); // validate source, this once, as a sanity check
095    validateLength( flow, 8, null );
096    }
097
098  @Test
099  public void testSimpleChain() throws Exception
100    {
101    getPlatform().copyFromLocal( inputFileApache );
102
103    Tap source = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileApache );
104
105    Pipe pipe = new Pipe( "test" );
106
107    pipe = new Each( pipe, new Fields( "line" ), new RegexParser( new Fields( "ip" ), "^[^ ]*" ), new Fields( "ip" ) );
108
109    pipe = new GroupBy( pipe, new Fields( "ip" ) );
110
111    pipe = new Every( pipe, new Count( new Fields( "count1" ) ) );
112    pipe = new Every( pipe, new Count( new Fields( "count2" ) ) );
113    pipe = new Every( pipe, new Count( new Fields( "count3" ) ) );
114    pipe = new Every( pipe, new Count( new Fields( "count4" ) ) );
115
116    Tap sink = getPlatform().getTabDelimitedFile( Fields.ALL, getOutputPath( "simplechain" ), SinkMode.REPLACE );
117
118    Flow flow = getPlatform().getFlowConnector().connect( source, sink, pipe );
119
120    flow.complete();
121
122    validateLength( flow, 8, 5 );
123    }
124
125  @Test
126  public void testChainEndingWithEach() throws Exception
127    {
128    getPlatform().copyFromLocal( inputFileApache );
129
130    Pipe pipe = new Pipe( "test" );
131
132    pipe = new Each( pipe, new Fields( "line" ), new RegexParser( new Fields( "ip" ), "^[^ ]*" ), new Fields( "ip" ) );
133
134    pipe = new GroupBy( pipe, new Fields( "ip" ) );
135
136    pipe = new Every( pipe, new Count( new Fields( "count1" ) ) );
137    pipe = new Every( pipe, new Count( new Fields( "count2" ) ) );
138
139    pipe = new Each( pipe, new Fields( "count1", "count2" ), new ExpressionFunction( new Fields( "sum" ), "count1 + count2", int.class ), Fields.ALL );
140
141    Tap source = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileApache );
142    Tap sink = getPlatform().getTextFile( getOutputPath( "chaineach" ), SinkMode.REPLACE );
143
144    Flow flow = getPlatform().getFlowConnector().connect( source, sink, pipe );
145
146    flow.complete();
147
148    validateLength( flow, 8, null );
149    }
150
151  // also tests the RegexSplitter
152
153  @Test
154  public void testNoGroup() throws Exception
155    {
156    getPlatform().copyFromLocal( inputFileApache );
157
158    Tap source = getPlatform().getTextFile( new Fields( "line" ), inputFileApache );
159
160    Pipe pipe = new Pipe( "test" );
161
162    pipe = new Each( pipe, new RegexSplitter( "\\s+" ), new Fields( 1 ) );
163
164    Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "nogroup" ), SinkMode.REPLACE );
165
166    Flow flow = getPlatform().getFlowConnector().connect( source, sink, pipe );
167
168    flow.complete();
169
170    validateLength( flow, 10, null );
171
172    List<Tuple> results = getSinkAsList( flow );
173
174    assertTrue( results.contains( new Tuple( "75.185.76.245" ) ) );
175    }
176
177  @Test
178  public void testCopy() throws Exception
179    {
180    getPlatform().copyFromLocal( inputFileApache );
181
182    Tap source = getPlatform().getTextFile( new Fields( "line" ), inputFileApache );
183
184    Pipe pipe = new Pipe( "test" );
185
186    Tap sink = getPlatform().getTextFile( getOutputPath( "copy" ), SinkMode.REPLACE );
187
188    Flow flow = getPlatform().getFlowConnector().connect( source, sink, pipe );
189
190    flow.complete();
191
192    validateLength( flow, 10, null );
193    }
194
195  @Test
196  public void testSimpleMerge() throws Exception
197    {
198    getPlatform().copyFromLocal( inputFileLower );
199    getPlatform().copyFromLocal( inputFileUpper );
200
201    Tap sourceLower = getPlatform().getTextFile( inputFileLower );
202    Tap sourceUpper = getPlatform().getTextFile( inputFileUpper );
203
204    Map sources = new HashMap();
205
206    sources.put( "lower", sourceLower );
207    sources.put( "upper", sourceUpper );
208
209    Function splitter = new RegexSplitter( new Fields( "num", "char" ), " " );
210
211    // using null pos so all fields are written
212    Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "simplemerge" ), SinkMode.REPLACE );
213
214    Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitter );
215    Pipe pipeUpper = new Each( new Pipe( "upper" ), new Fields( "line" ), splitter );
216
217    Pipe splice = new GroupBy( "merge", Pipe.pipes( pipeLower, pipeUpper ), new Fields( "num" ), null, false );
218
219    Flow flow = getPlatform().getFlowConnector().connect( sources, sink, splice );
220
221    flow.complete();
222
223    validateLength( flow, 10 );
224
225    Collection results = getSinkAsList( flow );
226
227    assertTrue( "missing value", results.contains( new Tuple( "1\ta" ) ) );
228    assertTrue( "missing value", results.contains( new Tuple( "1\tA" ) ) );
229    assertTrue( "missing value", results.contains( new Tuple( "2\tb" ) ) );
230    assertTrue( "missing value", results.contains( new Tuple( "2\tB" ) ) );
231    assertTrue( "missing value", results.contains( new Tuple( "3\tc" ) ) );
232    assertTrue( "missing value", results.contains( new Tuple( "3\tC" ) ) );
233    }
234
235  /**
236   * Specifically tests GroupBy will return the correct grouping fields to the following Every
237   * <p/>
238   * additionally tests secondary sorting during merging
239   *
240   * @throws Exception
241   */
242  @Test
243  public void testSimpleMergeThree() throws Exception
244    {
245    getPlatform().copyFromLocal( inputFileLower );
246    getPlatform().copyFromLocal( inputFileUpper );
247    getPlatform().copyFromLocal( inputFileLowerOffset );
248
249    Tap sourceLower = getPlatform().getTextFile( inputFileLower );
250    Tap sourceUpper = getPlatform().getTextFile( inputFileUpper );
251    Tap sourceLowerOffset = getPlatform().getTextFile( inputFileLowerOffset );
252
253    Map sources = new HashMap();
254
255    sources.put( "lower", sourceLower );
256    sources.put( "upper", sourceUpper );
257    sources.put( "offset", sourceLowerOffset );
258
259    Tap sink = getPlatform().getDelimitedFile( Fields.ALL, "\t", getOutputPath( "simplemergethree" ), SinkMode.REPLACE );
260
261    Function splitter = new RegexSplitter( new Fields( "num", "char" ), " " );
262
263    Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitter );
264    Pipe pipeUpper = new Each( new Pipe( "upper" ), new Fields( "line" ), splitter );
265    Pipe pipeOffset = new Each( new Pipe( "offset" ), new Fields( "line" ), splitter );
266
267    Pipe splice = new GroupBy( "merge", Pipe.pipes( pipeLower, pipeUpper, pipeOffset ), new Fields( "num" ), new Fields( "char" ) );
268
269    splice = new Every( splice, new Fields( "char" ), new First( new Fields( "first" ) ) );
270
271    splice = new Each( splice, new Fields( "num", "first" ), new Identity() );
272
273    Flow flow = getPlatform().getFlowConnector().connect( sources, sink, splice );
274
275    flow.complete();
276
277    validateLength( flow, 6 );
278
279    List<Tuple> tuples = getSinkAsList( flow );
280
281    assertTrue( tuples.contains( new Tuple( "1", "A" ) ) );
282    assertTrue( tuples.contains( new Tuple( "2", "B" ) ) );
283    assertTrue( tuples.contains( new Tuple( "3", "C" ) ) );
284    assertTrue( tuples.contains( new Tuple( "4", "D" ) ) );
285    assertTrue( tuples.contains( new Tuple( "5", "E" ) ) );
286    assertTrue( tuples.contains( new Tuple( "6", "c" ) ) );
287    }
288
289  @Test
290  public void testSameSourceMerge() throws Exception
291    {
292    getPlatform().copyFromLocal( inputFileLower );
293
294    Tap sourceLower = getPlatform().getDelimitedFile( new Fields( "num", "char" ), " ", inputFileLower );
295
296    Map sources = new HashMap();
297
298    sources.put( "lower", sourceLower );
299    sources.put( "upper", sourceLower );
300
301    // using null pos so all fields are written
302    Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath(), SinkMode.REPLACE );
303
304    Pipe pipeLower = new Pipe( "lower" );
305    Pipe pipeUpper = new Pipe( "upper" );
306
307    Pipe splice = new GroupBy( "merge", Pipe.pipes( pipeLower, pipeUpper ), new Fields( "num" ), null, false );
308
309    Flow flow = getPlatform().getFlowConnector().connect( sources, sink, splice );
310
311    flow.complete();
312
313    validateLength( flow, 10 );
314
315    Collection results = getSinkAsList( flow );
316
317    assertEquals( "missing value", 2, Collections.frequency( results, new Tuple( "1\ta" ) ) );
318    assertEquals( "missing value", 2, Collections.frequency( results, new Tuple( "2\tb" ) ) );
319    assertEquals( "missing value", 2, Collections.frequency( results, new Tuple( "3\tc" ) ) );
320    }
321
322  /**
323   * same test as MergePipesTest, but to test that chained groupby don't exhibit similar failures
324   *
325   * @throws Exception
326   */
327  @Test
328  public void testSameSourceMergeThreeChainGroup() throws Exception
329    {
330    getPlatform().copyFromLocal( inputFileLower );
331
332    Tap sourceLower = getPlatform().getTextFile( inputFileLower );
333
334    Map sources = new HashMap();
335
336    sources.put( "split", sourceLower );
337
338    Tap sink = getPlatform().getTextFile( getOutputPath( "samemergethreechaingroup" ), SinkMode.REPLACE );
339
340    Function splitter = new RegexSplitter( new Fields( "num", "char" ), " " );
341
342    Pipe pipe = new Pipe( "split" );
343
344    Pipe pipeLower = new Each( new Pipe( "lower", pipe ), new Fields( "line" ), splitter );
345    Pipe pipeUpper = new Each( new Pipe( "upper", pipe ), new Fields( "line" ), splitter );
346    Pipe pipeOffset = new Each( new Pipe( "offset", pipe ), new Fields( "line" ), splitter );
347
348    //put group before merge to test path counts
349    Pipe splice = new GroupBy( Pipe.pipes( pipeLower, pipeUpper ), new Fields( "num" ) );
350
351    // this group has its incoming paths counted, gated by the previous group
352    splice = new GroupBy( Pipe.pipes( splice, pipeOffset ), new Fields( "num" ) );
353
354    Flow flow = getPlatform().getFlowConnector().connect( sources, sink, splice );
355
356    if( getPlatform().isMapReduce() )
357      assertEquals( "wrong num jobs", 2, flow.getFlowSteps().size() );
358
359    flow.complete();
360
361    validateLength( flow, 15 );
362    }
363
364  @Test
365  public void testUnGroup() throws Exception
366    {
367    getPlatform().copyFromLocal( inputFileJoined );
368
369    Tap source = getPlatform().getTextFile( inputFileJoined );
370    Tap sink = getPlatform().getTextFile( getOutputPath( "ungrouped" ), SinkMode.REPLACE );
371
372    Pipe pipe = new Pipe( "test" );
373
374    pipe = new Each( pipe, new Fields( "line" ), new RegexSplitter( new Fields( "num", "lower", "upper" ) ) );
375
376    pipe = new Each( pipe, new UnGroup( new Fields( "num", "char" ), new Fields( "num" ), Fields.fields( new Fields( "lower" ), new Fields( "upper" ) ) ) );
377
378    Flow flow = getPlatform().getFlowConnector().connect( source, sink, pipe );
379
380    flow.complete();
381
382    validateLength( flow, 10 );
383    }
384
385  @Test
386  public void testUnGroupAnon() throws Exception
387    {
388    getPlatform().copyFromLocal( inputFileJoined );
389
390    Tap source = getPlatform().getTextFile( inputFileJoined );
391    Tap sink = getPlatform().getTextFile( getOutputPath( "ungroupedanon" ), SinkMode.REPLACE );
392
393    Pipe pipe = new Pipe( "test" );
394
395    pipe = new Each( pipe, new Fields( "line" ), new RegexSplitter( new Fields( "num", "lower", "upper" ) ) );
396
397    pipe = new Each( pipe, new UnGroup( new Fields( "num" ), Fields.fields( new Fields( "lower" ), new Fields( "upper" ) ) ) );
398
399    Flow flow = getPlatform().getFlowConnector().connect( source, sink, pipe );
400
401    flow.complete();
402
403    validateLength( flow, 10 );
404    }
405
406  @Test
407  public void testUnGroupBySize() throws Exception
408    {
409    getPlatform().copyFromLocal( inputFileJoinedExtra );
410
411    Tap source = getPlatform().getTextFile( inputFileJoinedExtra );
412    Tap sink = getPlatform().getTextFile( getOutputPath( "ungrouped_size" ), SinkMode.REPLACE );
413
414    Pipe pipe = new Pipe( "test" );
415
416    pipe = new Each( pipe, new Fields( "line" ), new RegexSplitter( new Fields( "num1", "num2", "lower", "upper" ) ) );
417
418    pipe = new Each( pipe, new UnGroup( new Fields( "num1", "num2", "char" ), new Fields( "num1", "num2" ), 1 ) );
419
420    Flow flow = getPlatform().getFlowConnector().connect( source, sink, pipe );
421
422    flow.complete();
423
424    List<Tuple> tuples = asList( flow, sink );
425    assertEquals( 10, tuples.size() );
426
427    List<Object> values = new ArrayList<Object>();
428    for( Tuple tuple : tuples )
429      values.add( tuple.getObject( 1 ) );
430
431    assertTrue( values.contains( "1\t1\ta" ) );
432    assertTrue( values.contains( "1\t1\tA" ) );
433    assertTrue( values.contains( "2\t2\tb" ) );
434    assertTrue( values.contains( "2\t2\tB" ) );
435    assertTrue( values.contains( "3\t3\tc" ) );
436    assertTrue( values.contains( "3\t3\tC" ) );
437    assertTrue( values.contains( "4\t4\td" ) );
438    assertTrue( values.contains( "4\t4\tD" ) );
439    assertTrue( values.contains( "5\t5\te" ) );
440    assertTrue( values.contains( "5\t5\tE" ) );
441    }
442
443  @Test
444  public void testFilter() throws Exception
445    {
446    getPlatform().copyFromLocal( inputFileApache );
447
448    Tap source = getPlatform().getTextFile( inputFileApache );
449    Tap sink = getPlatform().getTextFile( getOutputPath( "filter" ), SinkMode.REPLACE );
450
451    Pipe pipe = new Pipe( "test" );
452
453    Filter filter = new RegexFilter( "^68.*" );
454
455    pipe = new Each( pipe, new Fields( "line" ), filter );
456
457    Flow flow = getPlatform().getFlowConnector().connect( source, sink, pipe );
458
459    flow.complete();
460
461    validateLength( flow, 3 );
462    }
463
464  @Test
465  public void testLogicFilter() throws Exception
466    {
467    getPlatform().copyFromLocal( inputFileApache );
468
469    Tap source = getPlatform().getTextFile( inputFileApache );
470    Tap sink = getPlatform().getTextFile( getOutputPath( "logicfilter" ), SinkMode.REPLACE );
471
472    Pipe pipe = new Pipe( "test" );
473
474    Filter filter = new And( new RegexFilter( "^68.*$" ), new RegexFilter( "^1000.*$" ) );
475
476    pipe = new Each( pipe, new Fields( "line" ), filter );
477
478    Flow flow = getPlatform().getFlowConnector().connect( source, sink, pipe );
479
480    flow.complete();
481
482    validateLength( flow, 3 );
483    }
484
485  @Test
486  public void testFilterComplex() throws Exception
487    {
488    getPlatform().copyFromLocal( inputFileApache );
489
490    Tap source = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileApache );
491    Tap sink = getPlatform().getTextFile( getOutputPath( "filtercomplex" ), SinkMode.REPLACE );
492
493    Pipe pipe = new Pipe( "test" );
494
495    pipe = new Each( pipe, new Fields( "line" ), TestConstants.APACHE_COMMON_PARSER );
496
497    pipe = new Each( pipe, new Fields( "method" ), new RegexFilter( "^POST" ) );
498    pipe = new Each( pipe, new Fields( "method" ), new RegexFilter( "^POST" ) );
499
500    pipe = new Each( pipe, new Fields( "method" ), new Identity( new Fields( "value" ) ), Fields.ALL );
501
502    pipe = new GroupBy( pipe, new Fields( "value" ) );
503
504    pipe = new Every( pipe, new Count(), new Fields( "value", "count" ) );
505
506    Flow flow = getPlatform().getFlowConnector().connect( source, sink, pipe );
507
508    flow.complete();
509
510    validateLength( flow, 1, null );
511    }
512
513  /**
514   * Intentionally filters all values out to test next mr job behaves
515   *
516   * @throws Exception
517   */
518  @Test
519  public void testFilterAll() throws Exception
520    {
521    getPlatform().copyFromLocal( inputFileApache );
522
523    Tap source = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileApache );
524    Tap sink = getPlatform().getTextFile( getOutputPath( "filterall" ), SinkMode.REPLACE );
525
526    Pipe pipe = new Pipe( "test" );
527
528    String regex = "^([^ ]*) +[^ ]* +[^ ]* +\\[([^]]*)\\] +\\\"([^ ]*) ([^ ]*) [^ ]*\\\" ([^ ]*) ([^ ]*).*$";
529    Fields fieldDeclaration = new Fields( "ip", "time", "method", "event", "status", "size" );
530    int[] groups = {1, 2, 3, 4, 5, 6};
531    RegexParser function = new RegexParser( fieldDeclaration, regex, groups );
532    pipe = new Each( pipe, new Fields( "line" ), function );
533
534    pipe = new Each( pipe, new Fields( "method" ), new RegexFilter( "^fobar" ) ); // intentionally filtering all
535
536    pipe = new GroupBy( pipe, new Fields( "method" ) );
537
538    pipe = new Each( pipe, new Fields( "method" ), new Identity( new Fields( "value" ) ), Fields.ALL );
539
540    pipe = new GroupBy( pipe, new Fields( "value" ) );
541
542    pipe = new Every( pipe, new Count(), new Fields( "value", "count" ) );
543
544    Flow flow = getPlatform().getFlowConnector().connect( source, sink, pipe );
545
546    flow.complete();
547
548    validateLength( flow, 0, null );
549    }
550
551//  public void testLimitFilter() throws Exception
552//    {
553//    copyFromLocal( inputFileApache );
554//
555//    Tap source = new Hfs( new TextLine( new Fields( "offset", "line" ) ), inputFileApache );
556//    Tap sink = new Lfs( new TextLine(), outputPath + "/limitfilter", true );
557//
558//    Pipe pipe = new Pipe( "test" );
559//
560//    Filter filter = new Limit( 7 );
561//
562//    pipe = new Each( pipe, new Fields( "line" ), filter );
563//
564//    Flow flow = new FlowConnector( getProperties() ).connect( source, sink, pipe );
565//
566////    flow.writeDOT( "flow.dot" );
567//
568//    flow.complete();
569//
570//    validateLength( flow, 7, null );
571//    }
572
573  //
574
575  /*
576   *
577   * TODO: create (optional) Tez rule to consolidate into a single DAG. currently renders to two DAGs, one for each side
578   *
579   */
580  @Test
581  public void testSplit() throws Exception
582    {
583    getPlatform().copyFromLocal( inputFileApache );
584
585    // 46 192
586
587    Tap source = getPlatform().getTextFile( inputFileApache );
588    Tap sink1 = getPlatform().getTextFile( getOutputPath( "split1" ), SinkMode.REPLACE );
589    Tap sink2 = getPlatform().getTextFile( getOutputPath( "split2" ), SinkMode.REPLACE );
590
591    Pipe pipe = new Pipe( "split" );
592
593    pipe = new Each( pipe, new Fields( "line" ), new RegexFilter( "^68.*" ) );
594
595    Pipe left = new Each( new Pipe( "left", pipe ), new Fields( "line" ), new RegexFilter( ".*46.*" ) );
596    Pipe right = new Each( new Pipe( "right", pipe ), new Fields( "line" ), new RegexFilter( ".*102.*" ) );
597
598    Map sources = new HashMap();
599    sources.put( "split", source );
600
601    Map sinks = new HashMap();
602    sinks.put( "left", sink1 );
603    sinks.put( "right", sink2 );
604
605    Flow flow = getPlatform().getFlowConnector().connect( sources, sinks, left, right );
606
607    flow.complete();
608
609    validateLength( flow, 1, "left" );
610    validateLength( flow, 2, "right" );
611    }
612
613  /**
614   * verifies non-safe rules apply in the proper place
615   *
616   * @throws Exception
617   */
618  @Test
619  public void testSplitNonSafe() throws Exception
620    {
621    getPlatform().copyFromLocal( inputFileApache );
622
623    // 46 192
624
625    Tap source = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileApache );
626    Tap sink1 = getPlatform().getTextFile( getOutputPath( "nonsafesplit1" ), SinkMode.REPLACE );
627    Tap sink2 = getPlatform().getTextFile( getOutputPath( "nonsafesplit2" ), SinkMode.REPLACE );
628
629    Pipe pipe = new Pipe( "split" );
630
631    // run job on non-safe operation, forces 3 mr jobs.
632    pipe = new Each( pipe, new TestFunction( new Fields( "ignore" ), new Tuple( 1 ), false ), new Fields( "line" ) );
633
634    pipe = new Each( pipe, new Fields( "line" ), new RegexFilter( "^68.*" ) );
635
636    Pipe left = new Each( new Pipe( "left", pipe ), new Fields( "line" ), new RegexFilter( ".*46.*" ) );
637    Pipe right = new Each( new Pipe( "right", pipe ), new Fields( "line" ), new RegexFilter( ".*102.*" ) );
638
639    Map sources = new HashMap();
640    sources.put( "split", source );
641
642    Map sinks = new HashMap();
643    sinks.put( "left", sink1 );
644    sinks.put( "right", sink2 );
645
646    Flow flow = getPlatform().getFlowConnector().connect( sources, sinks, left, right );
647
648    flow.complete();
649
650    validateLength( flow, 1, "left" );
651    validateLength( flow, 2, "right" );
652    }
653
654  @Test
655  public void testSplitSameSourceMerged() throws Exception
656    {
657    getPlatform().copyFromLocal( inputFileApache );
658
659    // 46 192
660
661    Tap source = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileApache );
662    Tap sink = getPlatform().getTextFile( getOutputPath( "splitsourcemerged" ), SinkMode.REPLACE );
663
664    Pipe pipe = new Pipe( "split" );
665
666    pipe = new Each( pipe, new Fields( "line" ), new RegexFilter( "^68.*" ) );
667
668    Pipe left = new Each( new Pipe( "left", pipe ), new Fields( "line" ), new RegexFilter( ".*46.*" ) );
669    Pipe right = new Each( new Pipe( "right", pipe ), new Fields( "line" ), new RegexFilter( ".*102.*" ) );
670
671    Pipe merged = new GroupBy( "merged", Pipe.pipes( left, right ), new Fields( "line" ) );
672
673    Flow flow = getPlatform().getFlowConnector().connect( source, sink, merged );
674
675    flow.complete();
676
677    validateLength( flow, 3 );
678    }
679
680  /**
681   * verifies not inserting Identity between groups works
682   *
683   * @throws Exception
684   */
685  @Test
686  public void testSplitOut() throws Exception
687    {
688    getPlatform().copyFromLocal( inputFileApache );
689
690    Tap sourceLower = getPlatform().getTextFile( new Fields( "num", "line" ), inputFileApache );
691
692    Map sources = new HashMap();
693
694    sources.put( "lower1", sourceLower );
695
696    // using null pos so all fields are written
697    Tap sink1 = getPlatform().getTextFile( getOutputPath( "splitout1" ), SinkMode.REPLACE );
698    Tap sink2 = getPlatform().getTextFile( getOutputPath( "splitout2" ), SinkMode.REPLACE );
699
700    Map sinks = new HashMap();
701
702    sinks.put( "output1", sink1 );
703    sinks.put( "output2", sink2 );
704
705    Pipe pipeLower1 = new Pipe( "lower1" );
706
707    Pipe left = new GroupBy( "output1", pipeLower1, new Fields( 0 ) );
708    Pipe right = new GroupBy( "output2", left, new Fields( 0 ) );
709
710    Flow flow = getPlatform().getFlowConnector().connect( sources, sinks, Pipe.pipes( left, right ) );
711
712//    flow.writeDOT( "spit.dot" );
713
714    flow.complete();
715
716    validateLength( flow, 10, "output1" );
717    validateLength( flow, 10, "output2" );
718
719    assertEquals( 10, asSet( flow, sink1 ).size() );
720    assertEquals( 10, asSet( flow, sink2 ).size() );
721    }
722
723  @Test
724  public void testSplitComplex() throws Exception
725    {
726    getPlatform().copyFromLocal( inputFileApache );
727
728    // 46 192
729
730    Tap source = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileApache );
731    Tap sink1 = getPlatform().getTextFile( getOutputPath( "splitcomp1" ), SinkMode.REPLACE );
732    Tap sink2 = getPlatform().getTextFile( getOutputPath( "splitcomp2" ), SinkMode.REPLACE );
733
734    Pipe pipe = new Pipe( "split" );
735
736    pipe = new Each( pipe, new Fields( "line" ), new RegexParser( new Fields( "ip" ), "^[^ ]*" ), new Fields( "ip" ) );
737
738    pipe = new GroupBy( pipe, new Fields( "ip" ) );
739
740    pipe = new Every( pipe, new Fields( "ip" ), new Count(), new Fields( "ip", "count" ) );
741
742    pipe = new Each( pipe, new Fields( "ip" ), new RegexFilter( "^68.*" ) );
743
744    Pipe left = new Each( new Pipe( "left", pipe ), new Fields( "ip" ), new RegexFilter( ".*46.*" ) );
745
746    Pipe right = new Each( new Pipe( "right", pipe ), new Fields( "ip" ), new RegexFilter( ".*102.*" ) );
747
748    Map sources = Cascades.tapsMap( "split", source );
749    Map sinks = Cascades.tapsMap( Pipe.pipes( left, right ), Tap.taps( sink1, sink2 ) );
750
751    Flow flow = getPlatform().getFlowConnector().connect( sources, sinks, left, right );
752
753    flow.complete();
754
755    validateLength( flow, 1, "left" );
756    validateLength( flow, 1, "right" );
757    }
758
759  @Test
760  public void testSplitMultiple() throws Exception
761    {
762    getPlatform().copyFromLocal( inputFileApache );
763
764    // 46 192
765
766    Tap source = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileApache );
767    Tap sinkLeft = getPlatform().getTextFile( getOutputPath( "left" ), SinkMode.REPLACE );
768    Tap sinkRightLeft = getPlatform().getTextFile( getOutputPath( "rightleft" ), SinkMode.REPLACE );
769    Tap sinkRightRight = getPlatform().getTextFile( getOutputPath( "rightright" ), SinkMode.REPLACE );
770
771    Pipe head = new Pipe( "split" );
772
773    head = new Each( head, new Fields( "line" ), new RegexParser( new Fields( "ip" ), "^[^ ]*" ), new Fields( "ip" ) );
774
775    head = new GroupBy( head, new Fields( "ip" ) );
776
777    head = new Every( head, new Fields( "ip" ), new Count(), new Fields( "ip", "count" ) );
778
779    head = new Each( head, new Fields( "ip" ), new RegexFilter( "^68.*" ) );
780
781    Pipe left = new Each( new Pipe( "left", head ), new Fields( "ip" ), new RegexFilter( ".*46.*" ) );
782
783    Pipe right = new Each( new Pipe( "right", head ), new Fields( "ip" ), new RegexFilter( ".*102.*" ) );
784
785    right = new GroupBy( right, new Fields( "ip" ) );
786
787    Pipe rightLeft = new Each( new Pipe( "rightLeft", right ), new Fields( "ip" ), new Identity() );
788
789    Pipe rightRight = new Each( new Pipe( "rightRight", right ), new Fields( "ip" ), new Identity() );
790
791    Map sources = Cascades.tapsMap( "split", source );
792    Map sinks = Cascades.tapsMap( Pipe.pipes( left, rightLeft, rightRight ), Tap.taps( sinkLeft, sinkRightLeft, sinkRightRight ) );
793
794    Flow flow = getPlatform().getFlowConnector().connect( sources, sinks, left, rightLeft, rightRight );
795
796    flow.complete();
797
798    validateLength( flow, 1, "left" );
799    validateLength( flow, 1, "rightLeft" );
800    validateLength( flow, 1, "rightRight" );
801    }
802
803  @Test
804  public void testConcatenation() throws Exception
805    {
806    getPlatform().copyFromLocal( inputFileLower );
807    getPlatform().copyFromLocal( inputFileUpper );
808
809    Tap sourceLower = getPlatform().getTextFile( inputFileLower );
810    Tap sourceUpper = getPlatform().getTextFile( inputFileUpper );
811
812    Tap source = new MultiSourceTap( sourceLower, sourceUpper );
813
814    Function splitter = new RegexSplitter( new Fields( "num", "char" ), " " );
815
816    // using null pos so all fields are written
817    Tap sink = getPlatform().getTextFile( getOutputPath( "complexconcat" ), SinkMode.REPLACE );
818
819    Pipe pipe = new Each( new Pipe( "concat" ), new Fields( "line" ), splitter );
820
821    Pipe splice = new GroupBy( pipe, new Fields( "num" ) );
822
823    Flow countFlow = getPlatform().getFlowConnector().connect( source, sink, splice );
824
825    countFlow.complete();
826
827    validateLength( countFlow, 10, null );
828    }
829
830  @Test
831  public void testGeneratorAggregator() throws Exception
832    {
833    getPlatform().copyFromLocal( inputFileApache );
834
835    Tap source = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileApache );
836
837    Pipe pipe = new Pipe( "test" );
838
839    pipe = new Each( pipe, new Fields( "line" ), new RegexParser( new Fields( "ip" ), "^[^ ]*" ), new Fields( "ip" ) );
840
841    pipe = new GroupBy( pipe, new Fields( "ip" ) );
842
843    pipe = new Every( pipe, new TestAggregator( new Fields( "count1" ), new Fields( "ip" ), new Tuple( "first1" ), new Tuple( "first2" ) ) );
844    pipe = new Every( pipe, new TestAggregator( new Fields( "count2" ), new Fields( "ip" ), new Tuple( "second" ), new Tuple( "second2" ), new Tuple( "second3" ) ) );
845
846    Tap sink = getPlatform().getTextFile( getOutputPath( "generatoraggregator" ), SinkMode.REPLACE );
847
848    Flow flow = getPlatform().getFlowConnector().connect( source, sink, pipe );
849
850    flow.complete();
851
852    validateLength( flow, 8 * 2 * 3, null );
853    }
854
855  @Test
856  public void testReplace() throws Exception
857    {
858    Tap source = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileApache );
859    Tap sink = getPlatform().getTextFile( new Fields( "offset", "line" ), new Fields( "offset", "line" ), getOutputPath( "replace" ), SinkMode.REPLACE );
860
861    Pipe pipe = new Pipe( "test" );
862
863    Function parser = new RegexParser( new Fields( 0 ), "^[^ ]*" );
864    pipe = new Each( pipe, new Fields( "line" ), parser, Fields.REPLACE );
865    pipe = new Each( pipe, new Fields( "line" ), new Identity( Fields.ARGS ), Fields.REPLACE );
866    pipe = new Each( pipe, new Fields( "line" ), new Identity( new Fields( "line" ) ), Fields.REPLACE );
867
868    pipe = new Each( pipe, new Debug( true ) );
869
870    Flow flow = getPlatform().getFlowConnector( disableDebug() ).connect( source, sink, pipe );
871
872    flow.complete();
873
874    validateLength( flow, 10, 2, Pattern.compile( "^\\d+\\s\\d+\\s[\\d]{1,3}\\.[\\d]{1,3}\\.[\\d]{1,3}\\.[\\d]{1,3}$" ) );
875    }
876
877  @Test
878  public void testSwap() throws Exception
879    {
880    Tap source = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileApache );
881    Tap sink = getPlatform().getTextFile( new Fields( "offset", "line" ), new Fields( "count", "ipaddress" ), getOutputPath( "swap" ), SinkMode.REPLACE );
882
883    Pipe pipe = new Pipe( "test" );
884
885    Function parser = new RegexParser( new Fields( "ip" ), "^[^ ]*" );
886    pipe = new Each( pipe, new Fields( "line" ), parser, Fields.SWAP );
887    pipe = new GroupBy( pipe, new Fields( "ip" ) );
888    pipe = new Every( pipe, new Fields( "ip" ), new Count( new Fields( "count" ) ) );
889    pipe = new Each( pipe, new Fields( "ip" ), new Identity( new Fields( "ipaddress" ) ), Fields.SWAP );
890
891    Flow flow = getPlatform().getFlowConnector().connect( source, sink, pipe );
892
893    flow.complete();
894
895    validateLength( flow, 8, 2, Pattern.compile( "^\\d+\\s\\d+\\s[\\d]{1,3}\\.[\\d]{1,3}\\.[\\d]{1,3}\\.[\\d]{1,3}$" ) );
896    }
897
898  @Test
899  public void testNone() throws Exception
900    {
901    Tap source = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileApache );
902    Tap sink = getPlatform().getTextFile( new Fields( "offset", "line" ), new Fields( "count", "ip" ), getOutputPath( "none" ), SinkMode.REPLACE );
903
904    Pipe pipe = new Pipe( "test" );
905
906    Function parser = new RegexParser( new Fields( "ip" ), "^[^ ]*" );
907    pipe = new Each( pipe, new Fields( "line" ), parser, Fields.ALL );
908    pipe = new Each( pipe, new Fields( "line" ), new NoOp(), Fields.SWAP ); // declares Fields.NONE
909    pipe = new GroupBy( pipe, new Fields( "ip" ) );
910    pipe = new Every( pipe, new Fields( "ip" ), new Count( new Fields( "count" ) ) );
911    pipe = new Each( pipe, Fields.NONE, new Insert( new Fields( "ipaddress" ), "1.2.3.4" ), Fields.ALL );
912
913    Flow flow = getPlatform().getFlowConnector().connect( source, sink, pipe );
914
915    flow.complete();
916
917    validateLength( flow, 8, 2, Pattern.compile( "^\\d+\\s\\d+\\s[\\d]{1,3}\\.[\\d]{1,3}\\.[\\d]{1,3}\\.[\\d]{1,3}$" ) );
918    }
919
920  /**
921   * this tests a merge on two pipes with the same source and name.
922   *
923   * @throws Exception
924   */
925  @Test
926  public void testSplitSameSourceMergedSameName() throws Exception
927    {
928    getPlatform().copyFromLocal( inputFileApache );
929
930    // 46 192
931
932    Tap source = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileApache );
933    Tap sink = getPlatform().getTextFile( getOutputPath( "splitsourcemergedsamename" ), SinkMode.REPLACE );
934
935    Pipe pipe = new Pipe( "split" );
936
937    pipe = new Each( pipe, new Fields( "line" ), new RegexFilter( "^68.*" ) );
938
939    Pipe left = new Each( pipe, new Fields( "line" ), new RegexFilter( ".*46.*" ) );
940    Pipe right = new Each( pipe, new Fields( "line" ), new RegexFilter( ".*102.*" ) );
941
942    Pipe merged = new GroupBy( "merged", Pipe.pipes( left, right ), new Fields( "line" ) );
943
944    Flow flow = getPlatform().getFlowConnector().connect( source, sink, merged );
945
946    flow.complete();
947
948    validateLength( flow, 3 );
949    }
950
951  /**
952   * Catches failure to properly resolve the grouping fields as incoming to the second group-by
953   *
954   * @throws Exception
955   */
956  @Test
957  public void testGroupGroup() throws Exception
958    {
959    getPlatform().copyFromLocal( inputFileApache );
960
961    Tap source = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileApache );
962
963    Pipe pipe = new Pipe( "test" );
964
965    pipe = new Each( pipe, new Fields( "line" ), new RegexParser( new Fields( "ip", String.class ), "^[^ ]*" ), new Fields( "ip" ) );
966
967    pipe = new GroupBy( pipe, new Fields( "ip" ) );
968
969    pipe = new Every( pipe, new Count(), new Fields( "ip", "count" ) );
970
971    pipe = new GroupBy( pipe, new Fields( "ip" ), new Fields( "count" ) );
972
973    Tap sink = getPlatform().getTextFile( getOutputPath( "groupgroup" ), SinkMode.REPLACE );
974
975    Map<Object, Object> properties = getProperties();
976
977    properties.put( "cascading.serialization.types.required", "true" );
978
979    Flow flow = getPlatform().getFlowConnector( properties ).connect( source, sink, pipe );
980
981    flow.complete();
982
983    validateLength( flow, 8, null );
984    }
985
986  public static class LowerComparator implements Comparator<Comparable>, Hasher<Comparable>, Serializable
987    {
988    @Override
989    public int compare( Comparable lhs, Comparable rhs )
990      {
991      return lhs.toString().toLowerCase().compareTo( rhs.toString().toLowerCase() );
992      }
993
994    @Override
995    public int hashCode( Comparable value )
996      {
997      if( value == null )
998        return 0;
999
1000      return value.toString().toLowerCase().hashCode();
1001      }
1002    }
1003
1004  @Test
1005  public void testGroupByInsensitive() throws Exception
1006    {
1007    getPlatform().copyFromLocal( inputFileLower );
1008    getPlatform().copyFromLocal( inputFileUpper );
1009
1010    Tap sourceLower = getPlatform().getDelimitedFile( new Fields( "num", "char" ), " ", inputFileLower );
1011    Tap sourceUpper = getPlatform().getDelimitedFile( new Fields( "num", "char" ), " ", inputFileUpper );
1012
1013    Map sources = new HashMap();
1014
1015    sources.put( "lower", sourceLower );
1016    sources.put( "upper", sourceUpper );
1017
1018    Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "insensitivegrouping" + NONDETERMINISTIC ), SinkMode.REPLACE );
1019
1020    Pipe pipeLower = new Pipe( "lower" );
1021    Pipe pipeUpper = new Pipe( "upper" );
1022
1023    Pipe merge = new Merge( pipeLower, pipeUpper );
1024
1025    Fields charFields = new Fields( "char" );
1026    charFields.setComparator( "char", new LowerComparator() );
1027
1028    Pipe splice = new GroupBy( "groupby", merge, charFields );
1029
1030    splice = new Every( splice, new Fields( "char" ), new Count() );
1031
1032    Flow flow = getPlatform().getFlowConnector().connect( sources, sink, splice );
1033
1034    flow.complete();
1035
1036    // we can't guarantee if the grouping key will be upper or lower
1037    validateLength( flow, 5, 1, Pattern.compile( "^\\w+\\s2$" ) );
1038    }
1039  }