001/*
002 * Copyright (c) 2007-2016 Concurrent, Inc. All Rights Reserved.
003 *
004 * Project and contact information: http://www.cascading.org/
005 *
006 * This file is part of the Cascading project.
007 *
008 * Licensed under the Apache License, Version 2.0 (the "License");
009 * you may not use this file except in compliance with the License.
010 * You may obtain a copy of the License at
011 *
012 *     http://www.apache.org/licenses/LICENSE-2.0
013 *
014 * Unless required by applicable law or agreed to in writing, software
015 * distributed under the License is distributed on an "AS IS" BASIS,
016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017 * See the License for the specific language governing permissions and
018 * limitations under the License.
019 */
020
021package cascading;
022
023import java.io.IOException;
024import java.util.HashMap;
025import java.util.HashSet;
026import java.util.List;
027import java.util.Map;
028import java.util.Set;
029
030import cascading.cascade.Cascades;
031import cascading.flow.Flow;
032import cascading.flow.FlowConnectorProps;
033import cascading.flow.FlowDef;
034import cascading.flow.FlowProps;
035import cascading.operation.Function;
036import cascading.operation.Identity;
037import cascading.operation.Insert;
038import cascading.operation.aggregator.Count;
039import cascading.operation.aggregator.First;
040import cascading.operation.regex.RegexFilter;
041import cascading.operation.regex.RegexSplitGenerator;
042import cascading.operation.regex.RegexSplitter;
043import cascading.pipe.CoGroup;
044import cascading.pipe.Each;
045import cascading.pipe.Every;
046import cascading.pipe.GroupBy;
047import cascading.pipe.Pipe;
048import cascading.pipe.assembly.Discard;
049import cascading.pipe.assembly.Rename;
050import cascading.pipe.assembly.Retain;
051import cascading.pipe.joiner.InnerJoin;
052import cascading.pipe.joiner.Joiner;
053import cascading.pipe.joiner.LeftJoin;
054import cascading.pipe.joiner.MixedJoin;
055import cascading.pipe.joiner.OuterJoin;
056import cascading.pipe.joiner.RightJoin;
057import cascading.tap.SinkMode;
058import cascading.tap.Tap;
059import cascading.tuple.Fields;
060import cascading.tuple.Tuple;
061import cascading.util.NullNotEquivalentComparator;
062import org.junit.Test;
063
064import static data.InputData.*;
065
066public class CoGroupFieldedPipesPlatformTest extends PlatformTestCase
067  {
  /**
   * Creates the test case, forwarding its fixed harness settings to the {@code PlatformTestCase} constructor.
   * <p/>
   * The first argument keeps cluster testing enabled. NOTE(review): the values 4 and 1 are presumably task or
   * partition counts -- confirm against the {@code PlatformTestCase} constructor.
   */
  public CoGroupFieldedPipesPlatformTest()
    {
    super( true, 4, 1 ); // leave cluster testing enabled
    }
072
073  @Test
074  public void testCross() throws Exception
075    {
076    getPlatform().copyFromLocal( inputFileLhs );
077    getPlatform().copyFromLocal( inputFileRhs );
078
079    Map sources = new HashMap();
080
081    sources.put( "lhs", getPlatform().getTextFile( inputFileLhs ) );
082    sources.put( "rhs", getPlatform().getTextFile( inputFileRhs ) );
083
084    Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "cross" ), SinkMode.REPLACE );
085
086    Pipe pipeLower = new Each( "lhs", new Fields( "line" ), new RegexSplitter( new Fields( "numLHS", "charLHS" ), " " ) );
087    Pipe pipeUpper = new Each( "rhs", new Fields( "line" ), new RegexSplitter( new Fields( "numRHS", "charRHS" ), " " ) );
088
089    Pipe cross = new CoGroup( pipeLower, new Fields( "numLHS" ), pipeUpper, new Fields( "numRHS" ), new InnerJoin() );
090
091    Flow flow = getPlatform().getFlowConnector().connect( sources, sink, cross );
092
093    flow.complete();
094
095    validateLength( flow, 37, null );
096
097    List<Tuple> values = getSinkAsList( flow );
098
099    assertTrue( values.contains( new Tuple( "1\ta\t1\tA" ) ) );
100    assertTrue( values.contains( new Tuple( "1\ta\t1\tB" ) ) );
101    }
102
103  @Test
104  public void testCoGroup() throws Exception
105    {
106    getPlatform().copyFromLocal( inputFileLower );
107    getPlatform().copyFromLocal( inputFileUpper );
108
109    Tap sourceLower = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower );
110    Tap sourceUpper = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileUpper );
111
112    Map sources = new HashMap();
113
114    sources.put( "lower", sourceLower );
115    sources.put( "upper", sourceUpper );
116
117    Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "cogroup" ), SinkMode.REPLACE );
118
119    Function splitter = new RegexSplitter( new Fields( "num", "char" ), " " );
120
121    Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitter );
122    Pipe pipeUpper = new Each( new Pipe( "upper" ), new Fields( "line" ), splitter );
123
124    Pipe splice = new CoGroup( pipeLower, new Fields( "num" ), pipeUpper, new Fields( "num" ), new InnerJoin( Fields.size( 4 ) ) );
125
126    Map<Object, Object> properties = getProperties();
127
128    // make sure hasher is getting called, but does nothing special
129    FlowProps.setDefaultTupleElementComparator( properties, getPlatform().getStringComparator( false ).getClass().getCanonicalName() );
130
131    Flow flow = getPlatform().getFlowConnector( properties ).connect( sources, sink, splice );
132
133    flow.complete();
134
135    validateLength( flow, 5 );
136
137    List<Tuple> values = getSinkAsList( flow );
138
139    assertTrue( values.contains( new Tuple( "1\ta\t1\tA" ) ) );
140    assertTrue( values.contains( new Tuple( "2\tb\t2\tB" ) ) );
141    }
142
143  @Test
144  public void testCoGroupSamePipeName() throws Exception
145    {
146    getPlatform().copyFromLocal( inputFileLower );
147    getPlatform().copyFromLocal( inputFileUpper );
148
149    Tap sourceLower = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower );
150    Tap sourceUpper = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileUpper );
151
152    Map sources = new HashMap();
153
154    sources.put( "lower", sourceLower );
155    sources.put( "upper", sourceUpper );
156
157    Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "renamedpipes" ), SinkMode.REPLACE );
158
159    Function splitter = new RegexSplitter( new Fields( "num", "char" ), " " );
160
161    Pipe pipeLower = new Pipe( "lower" );
162    Pipe pipeUpper = new Pipe( "upper" );
163
164    // these pipes will hide the source name, and could cause one to be lost
165    pipeLower = new Pipe( "same", pipeLower );
166    pipeUpper = new Pipe( "same", pipeUpper );
167
168    pipeLower = new Each( pipeLower, new Fields( "line" ), splitter );
169    pipeUpper = new Each( pipeUpper, new Fields( "line" ), splitter );
170
171//    pipeLower = new Each( pipeLower, new Fields( "num", "char" ), new Identity( new Fields( "num", "char" ) ) );
172//    pipeUpper = new Each( pipeUpper, new Fields( "num", "char" ), new Identity( new Fields( "num", "char" ) ) );
173
174    pipeLower = new Pipe( "left", pipeLower );
175    pipeUpper = new Pipe( "right", pipeUpper );
176
177//    pipeLower = new Each( pipeLower, new Debug( true ) );
178//    pipeUpper = new Each( pipeUpper, new Debug( true ) );
179
180    Pipe splice = new CoGroup( pipeLower, new Fields( "num" ), pipeUpper, new Fields( "num" ), Fields.size( 4 ) );
181
182//    splice = new Each( splice, new Debug( true ) );
183    splice = new Pipe( "splice", splice );
184    splice = new Pipe( "tail", splice );
185
186    Flow flow = getPlatform().getFlowConnector().connect( sources, sink, splice );
187
188    flow.complete();
189
190    validateLength( flow, 5 );
191
192    List<Tuple> values = getSinkAsList( flow );
193
194    assertTrue( values.contains( new Tuple( "1\ta\t1\tA" ) ) );
195    assertTrue( values.contains( new Tuple( "2\tb\t2\tB" ) ) );
196    }
197
198  @Test
199  public void testCoGroupWithUnknowns() throws Exception
200    {
201    getPlatform().copyFromLocal( inputFileLower );
202    getPlatform().copyFromLocal( inputFileUpper );
203
204    Tap sourceLower = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower );
205    Tap sourceUpper = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileUpper );
206
207    Map sources = new HashMap();
208
209    sources.put( "lower", sourceLower );
210    sources.put( "upper", sourceUpper );
211
212    Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "unknown" ), SinkMode.REPLACE );
213
214    Function splitter = new RegexSplitter( Fields.UNKNOWN, " " );
215
216    Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitter );
217    Pipe pipeUpper = new Each( new Pipe( "upper" ), new Fields( "line" ), splitter );
218
219    Pipe splice = new CoGroup( pipeLower, new Fields( 0 ), pipeUpper, new Fields( 0 ), Fields.size( 4 ) );
220
221    Flow flow = getPlatform().getFlowConnector().connect( sources, sink, splice );
222
223    flow.complete();
224
225    validateLength( flow, 5 );
226
227    List<Tuple> values = getSinkAsList( flow );
228
229    assertTrue( values.contains( new Tuple( "1\ta\t1\tA" ) ) );
230    assertTrue( values.contains( new Tuple( "2\tb\t2\tB" ) ) );
231    }
232
233  /**
234   * this test intentionally filters out all values so the intermediate tap is empty. this tap is cogrouped with
235   * a new stream using an outerjoin.
236   *
237   * @throws Exception
238   */
239  @Test
240  public void testCoGroupFilteredBranch() throws Exception
241    {
242    getPlatform().copyFromLocal( inputFileLower );
243    getPlatform().copyFromLocal( inputFileUpper );
244
245    Tap sourceLower = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower );
246    Tap sourceUpper = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileUpper );
247
248    Map sources = new HashMap();
249
250    sources.put( "lower", sourceLower );
251    sources.put( "upper", sourceUpper );
252
253    Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "cogroupfilteredbranch" ), SinkMode.REPLACE );
254
255    Function splitter = new RegexSplitter( new Fields( "num", "char" ), " " );
256
257    Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitter );
258    Pipe pipeUpper = new Each( new Pipe( "upper" ), new Fields( "line" ), splitter );
259    pipeUpper = new Each( pipeUpper, new Fields( "num" ), new RegexFilter( "^fobar" ) ); // intentionally filtering all
260    pipeUpper = new GroupBy( pipeUpper, new Fields( "num" ) );
261
262    Pipe splice = new CoGroup( pipeLower, new Fields( "num" ), pipeUpper, new Fields( "num" ), Fields.size( 4 ), new OuterJoin() );
263
264    Flow flow = getPlatform().getFlowConnector().connect( sources, sink, splice );
265
266    flow.complete();
267
268    validateLength( flow, 5 );
269
270    List<Tuple> values = getSinkAsList( flow );
271
272    assertTrue( values.contains( new Tuple( "1\ta\tnull\tnull" ) ) );
273    assertTrue( values.contains( new Tuple( "2\tb\tnull\tnull" ) ) );
274    }
275
276  @Test
277  public void testCoGroupSelf() throws Exception
278    {
279    getPlatform().copyFromLocal( inputFileLower );
280
281    // intentionally creating multiple instances that are equivalent
282    Tap sourceLower = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower );
283    Tap sourceUpper = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower );
284
285    Map sources = new HashMap();
286
287    sources.put( "lower", sourceLower );
288    sources.put( "upper", sourceUpper );
289
290    Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "cogroupself" ), SinkMode.REPLACE );
291
292    Function splitter = new RegexSplitter( new Fields( "num", "char" ), " " );
293
294    Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitter );
295    Pipe pipeUpper = new Each( new Pipe( "upper" ), new Fields( "line" ), splitter );
296
297    Pipe splice = new CoGroup( pipeLower, new Fields( "num" ), pipeUpper, new Fields( "num" ), Fields.size( 4 ) );
298
299    Flow flow = getPlatform().getFlowConnector().connect( sources, sink, splice );
300
301    flow.complete();
302
303    validateLength( flow, 5 );
304
305    List<Tuple> values = getSinkAsList( flow );
306
307    assertTrue( values.contains( new Tuple( "1\ta\t1\ta" ) ) );
308    assertTrue( values.contains( new Tuple( "2\tb\t2\tb" ) ) );
309    }
310
311  @Test
312  public void testSplitCoGroupSelf() throws Exception
313    {
314    getPlatform().copyFromLocal( inputFileLower );
315
316    Tap source = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower );
317
318    Map sources = new HashMap();
319
320    sources.put( "lowerLhs", source );
321    sources.put( "upperLhs", source );
322    sources.put( "lowerRhs", source );
323    sources.put( "upperRhs", source );
324
325    Tap sinkLhs = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "splitcogroupself/lhs" ), SinkMode.REPLACE );
326    Tap sinkRhs = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "splitcogroupself/rhs" ), SinkMode.REPLACE );
327
328    Map sinks = new HashMap();
329
330    sinks.put( "lhs", sinkLhs );
331    sinks.put( "rhs", sinkRhs );
332
333    Function splitter = new RegexSplitter( new Fields( "num", "char" ), " " );
334
335    Pipe pipeLowerLhs = new Each( new Pipe( "lowerLhs" ), new Fields( "line" ), splitter );
336    Pipe pipeUpperLhs = new Each( new Pipe( "upperLhs" ), new Fields( "line" ), splitter );
337
338    Pipe spliceLhs = new CoGroup( "lhs", pipeLowerLhs, new Fields( "num" ), pipeUpperLhs, new Fields( "num" ), Fields.size( 4 ) );
339
340    Pipe pipeLowerRhs = new Each( new Pipe( "lowerRhs" ), new Fields( "line" ), splitter );
341    Pipe pipeUpperRhs = new Each( new Pipe( "upperRhs" ), new Fields( "line" ), splitter );
342
343    Pipe spliceRhs = new CoGroup( "rhs", pipeLowerRhs, new Fields( "num" ), pipeUpperRhs, new Fields( "num" ), Fields.size( 4 ) );
344
345    Flow flow = getPlatform().getFlowConnector().connect( sources, sinks, spliceLhs, spliceRhs );
346
347    flow.complete();
348
349    List<Tuple> values = asList( flow, sinkLhs );
350
351    assertEquals( 5, values.size() );
352    assertTrue( values.contains( new Tuple( "1\ta\t1\ta" ) ) );
353    assertTrue( values.contains( new Tuple( "2\tb\t2\tb" ) ) );
354
355    values = asList( flow, sinkRhs );
356
357    assertEquals( 5, values.size() );
358    assertTrue( values.contains( new Tuple( "1\ta\t1\ta" ) ) );
359    assertTrue( values.contains( new Tuple( "2\tb\t2\tb" ) ) );
360    }
361
362  /**
363   * Method testCoGroupAfterEvery tests that a tmp tap is inserted after the Every in the cogroup join
364   *
365   * @throws Exception when
366   */
367  @Test
368  public void testCoGroupAfterEvery() throws Exception
369    {
370    getPlatform().copyFromLocal( inputFileLower );
371    getPlatform().copyFromLocal( inputFileUpper );
372
373    Tap sourceLower = getPlatform().getTextFile( new Fields( "offset", "line" ).applyTypes( Long.TYPE, String.class ), inputFileLower );
374    Tap sourceUpper = getPlatform().getTextFile( new Fields( "offset", "line" ).applyTypes( Long.TYPE, String.class ), inputFileUpper );
375
376    Map sources = new HashMap();
377
378    sources.put( "lower", sourceLower );
379    sources.put( "upper", sourceUpper );
380
381    Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "afterevery" ), SinkMode.REPLACE );
382
383    Function splitter = new RegexSplitter( new Fields( "num", "char" ).applyTypes( String.class, String.class ), " " );
384
385    Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitter );
386    pipeLower = new GroupBy( pipeLower, new Fields( "num" ) );
387    pipeLower = new Every( pipeLower, new Fields( "char" ), new First(), Fields.ALL );
388
389    Pipe pipeUpper = new Each( new Pipe( "upper" ), new Fields( "line" ), splitter );
390    pipeUpper = new GroupBy( pipeUpper, new Fields( "num" ) );
391    pipeUpper = new Every( pipeUpper, new Fields( "char" ), new First(), Fields.ALL );
392
393    Pipe splice = new CoGroup( pipeLower, new Fields( "num" ), pipeUpper, new Fields( "num" ), Fields.size( 4 ) );
394
395    Map<Object, Object> properties = getPlatform().getProperties();
396
397    properties.put( "cascading.serialization.types.required", "true" );
398
399    Flow flow = getPlatform().getFlowConnector( properties ).connect( sources, sink, splice );
400
401    flow.complete();
402
403    validateLength( flow, 5, null );
404
405    List<Tuple> values = getSinkAsList( flow );
406
407    assertTrue( values.contains( new Tuple( "1\ta\t1\tA" ) ) );
408    assertTrue( values.contains( new Tuple( "2\tb\t2\tB" ) ) );
409    }
410
411  /**
412   * Tests that CoGroup properly resolves fields when following an Every
413   *
414   * @throws Exception
415   */
416  @Test
417  public void testCoGroupAfterEveryNoDeclared() throws Exception
418    {
419    getPlatform().copyFromLocal( inputFileLower );
420    getPlatform().copyFromLocal( inputFileUpper );
421
422    Tap sourceLower = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower );
423    Tap sourceUpper = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileUpper );
424
425    Map sources = new HashMap();
426
427    sources.put( "lower", sourceLower );
428    sources.put( "upper", sourceUpper );
429
430    Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "aftereverynodeclared" ), SinkMode.REPLACE );
431
432    Function splitter1 = new RegexSplitter( new Fields( "num1", "char1" ), " " );
433
434    Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitter1 );
435    pipeLower = new Each( pipeLower, new Insert( new Fields( "one", "two", "three", "four" ), "one", "two", "three", "four" ), Fields.ALL );
436    pipeLower = new GroupBy( pipeLower, new Fields( "num1" ) );
437    pipeLower = new Every( pipeLower, new Fields( "char1" ), new First(), Fields.ALL );
438
439    Function splitter2 = new RegexSplitter( new Fields( "num2", "char2" ), " " );
440
441    Pipe pipeUpper = new Each( new Pipe( "upper" ), new Fields( "line" ), splitter2 );
442    pipeUpper = new GroupBy( pipeUpper, new Fields( "num2" ) );
443    pipeUpper = new Every( pipeUpper, new Fields( "char2" ), new First(), Fields.ALL );
444
445    Pipe splice = new CoGroup( pipeLower, new Fields( "num1" ), pipeUpper, new Fields( "num2" ) );
446
447    Flow flow = getPlatform().getFlowConnector().connect( sources, sink, splice );
448
449    flow.complete();
450
451    validateLength( flow, 5, null );
452
453    List<Tuple> values = getSinkAsList( flow );
454
455    assertTrue( values.contains( new Tuple( "1\ta\t1\tA" ) ) );
456    assertTrue( values.contains( new Tuple( "2\tb\t2\tB" ) ) );
457    }
458
459  @Test
460  public void testCoGroupInnerSingleField() throws Exception
461    {
462    getPlatform().copyFromLocal( inputFileLowerOffset );
463    getPlatform().copyFromLocal( inputFileUpper );
464
465    Tap sourceLower = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLowerOffset );
466    Tap sourceUpper = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileUpper );
467
468    Map sources = new HashMap();
469
470    sources.put( "lower", sourceLower );
471    sources.put( "upper", sourceUpper );
472
473    Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "cogroupinnersingle" ), SinkMode.REPLACE );
474
475    Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), new RegexSplitter( new Fields( "num1", "char" ), " " ), new Fields( "num1" ) );
476    Pipe pipeUpper = new Each( new Pipe( "upper" ), new Fields( "line" ), new RegexSplitter( new Fields( "num2", "char" ), " " ), new Fields( "num2" ) );
477
478    Pipe join = new CoGroup( pipeLower, new Fields( "num1" ), pipeUpper, new Fields( "num2" ) );
479
480    join = new Every( join, new Count() );
481
482//    join = new Each( join, new Debug( true ) );
483
484    Flow flow = getPlatform().getFlowConnector().connect( sources, sink, join );
485
486    flow.complete();
487
488    validateLength( flow, 2, null );
489
490    Set<Tuple> results = new HashSet<Tuple>();
491
492    results.add( new Tuple( "1\t1\t1" ) );
493    results.add( new Tuple( "5\t5\t2" ) );
494
495    List<Tuple> actual = getSinkAsList( flow );
496
497    results.removeAll( actual );
498
499    assertEquals( 0, results.size() );
500    }
501
502  /**
503   * 1 a1
504   * 1 a2
505   * 1 a3
506   * 2 b1
507   * 3 c1
508   * 4 d1
509   * 4 d2
510   * 4 d3
511   * 5 e1
512   * 5 e2
513   * 5 e3
514   * 7 g1
515   * 7 g2
516   * 7 g3
517   * 7 g4
518   * 7 g5
519   * null h1
520   * <p/>
521   * 1 A1
522   * 1 A2
523   * 1 A3
524   * 2 B1
525   * 2 B2
526   * 2 B3
527   * 4 D1
528   * 6 F1
529   * 6 F2
530   * null H1
531   * <p/>
532   * 1  a1      1       A1
533   * 1  a1      1       A2
534   * 1  a1      1       A3
535   * 1  a2      1       A1
536   * 1  a2      1       A2
537   * 1  a2      1       A3
538   * 1  a3      1       A1
539   * 1  a3      1       A2
540   * 1  a3      1       A3
541   * 2  b1      2       B1
542   * 2  b1      2       B2
543   * 2  b1      2       B3
544   * 4  d1      4       D1
545   * 4  d2      4       D1
546   * 4  d3      4       D1
547   * null h1  null  H1
548   *
549   * @throws Exception
550   */
551  @Test
552  public void testCoGroupInner() throws Exception
553    {
554    HashSet<Tuple> results = new HashSet<Tuple>();
555
556    results.add( new Tuple( "1", "a1", "1", "A1" ) );
557    results.add( new Tuple( "1", "a1", "1", "A2" ) );
558    results.add( new Tuple( "1", "a1", "1", "A3" ) );
559    results.add( new Tuple( "1", "a2", "1", "A1" ) );
560    results.add( new Tuple( "1", "a2", "1", "A2" ) );
561    results.add( new Tuple( "1", "a2", "1", "A3" ) );
562    results.add( new Tuple( "1", "a3", "1", "A1" ) );
563    results.add( new Tuple( "1", "a3", "1", "A2" ) );
564    results.add( new Tuple( "1", "a3", "1", "A3" ) );
565    results.add( new Tuple( "2", "b1", "2", "B1" ) );
566    results.add( new Tuple( "2", "b1", "2", "B2" ) );
567    results.add( new Tuple( "2", "b1", "2", "B3" ) );
568    results.add( new Tuple( "4", "d1", "4", "D1" ) );
569    results.add( new Tuple( "4", "d2", "4", "D1" ) );
570    results.add( new Tuple( "4", "d3", "4", "D1" ) );
571    results.add( new Tuple( null, "h1", null, "H1" ) );
572
573    handleJoins( "cogroupinner", new InnerJoin(), results, 8, false, null );
574    handleJoins( "cogroupinner-resultgroup", new InnerJoin(), results, 8, true, null );
575    }
576
577  /**
578   * 1 a1
579   * 1 a2
580   * 1 a3
581   * 2 b1
582   * 3 c1
583   * 4 d1
584   * 4 d2
585   * 4 d3
586   * 5 e1
587   * 5 e2
588   * 5 e3
589   * 7 g1
590   * 7 g2
591   * 7 g3
592   * 7 g4
593   * 7 g5
594   * null h1
595   * <p/>
596   * 1 A1
597   * 1 A2
598   * 1 A3
599   * 2 B1
600   * 2 B2
601   * 2 B3
602   * 4 D1
603   * 6 F1
604   * 6 F2
605   * null H1
606   * <p/>
607   * 1  a1      1       A1
608   * 1  a1      1       A2
609   * 1  a1      1       A3
610   * 1  a2      1       A1
611   * 1  a2      1       A2
612   * 1  a2      1       A3
613   * 1  a3      1       A1
614   * 1  a3      1       A2
615   * 1  a3      1       A3
616   * 2  b1      2       B1
617   * 2  b1      2       B2
618   * 2  b1      2       B3
619   * 4  d1      4       D1
620   * 4  d2      4       D1
621   * 4  d3      4       D1
622   *
623   * @throws Exception
624   */
625  @Test
626  public void testCoGroupInnerNull() throws Exception
627    {
628    HashSet<Tuple> results = new HashSet<Tuple>();
629
630    results.add( new Tuple( "1", "a1", "1", "A1" ) );
631    results.add( new Tuple( "1", "a1", "1", "A2" ) );
632    results.add( new Tuple( "1", "a1", "1", "A3" ) );
633    results.add( new Tuple( "1", "a2", "1", "A1" ) );
634    results.add( new Tuple( "1", "a2", "1", "A2" ) );
635    results.add( new Tuple( "1", "a2", "1", "A3" ) );
636    results.add( new Tuple( "1", "a3", "1", "A1" ) );
637    results.add( new Tuple( "1", "a3", "1", "A2" ) );
638    results.add( new Tuple( "1", "a3", "1", "A3" ) );
639    results.add( new Tuple( "2", "b1", "2", "B1" ) );
640    results.add( new Tuple( "2", "b1", "2", "B2" ) );
641    results.add( new Tuple( "2", "b1", "2", "B3" ) );
642    results.add( new Tuple( "4", "d1", "4", "D1" ) );
643    results.add( new Tuple( "4", "d2", "4", "D1" ) );
644    results.add( new Tuple( "4", "d3", "4", "D1" ) );
645
646    handleJoins( "cogroupinnernull", new InnerJoin(), results, 9, false, new NullNotEquivalentComparator() );
647    handleJoins( "cogroupinnernull-resultgroup", new InnerJoin(), results, 9, true, new NullNotEquivalentComparator() );
648    }
649
650  /**
651   * 1 a1
652   * 1 a2
653   * 1 a3
654   * 2 b1
655   * 3 c1
656   * 4 d1
657   * 4 d2
658   * 4 d3
659   * 5 e1
660   * 5 e2
661   * 5 e3
662   * 7 g1
663   * 7 g2
664   * 7 g3
665   * 7 g4
666   * 7 g5
667   * null h1
668   * <p/>
669   * 1 A1
670   * 1 A2
671   * 1 A3
672   * 2 B1
673   * 2 B2
674   * 2 B3
675   * 4 D1
676   * 6 F1
677   * 6 F2
678   * null H1
679   * <p/>
680   * 1  a1      1       A1
681   * 1  a1      1       A2
682   * 1  a1      1       A3
683   * 1  a2      1       A1
684   * 1  a2      1       A2
685   * 1  a2      1       A3
686   * 1  a3      1       A1
687   * 1  a3      1       A2
688   * 1  a3      1       A3
689   * 2  b1      2       B1
690   * 2  b1      2       B2
691   * 2  b1      2       B3
692   * 3  c1      null    null
693   * 4  d1      4       D1
694   * 4  d2      4       D1
695   * 4  d3      4       D1
696   * 5  e1      null    null
697   * 5  e2      null    null
698   * 5  e3      null    null
699   * null       null    6       F1
700   * null       null    6       F2
701   * 7  g1      null    null
702   * 7  g2      null    null
703   * 7  g3      null    null
704   * 7  g4      null    null
705   * 7  g5      null    null
706   * null h1  null  H1
707   *
708   * @throws Exception
709   */
710  @Test
711  public void testCoGroupOuter() throws Exception
712    {
713    Set<Tuple> results = new HashSet<Tuple>();
714
715    results.add( new Tuple( "1", "a1", "1", "A1" ) );
716    results.add( new Tuple( "1", "a1", "1", "A2" ) );
717    results.add( new Tuple( "1", "a1", "1", "A3" ) );
718    results.add( new Tuple( "1", "a2", "1", "A1" ) );
719    results.add( new Tuple( "1", "a2", "1", "A2" ) );
720    results.add( new Tuple( "1", "a2", "1", "A3" ) );
721    results.add( new Tuple( "1", "a3", "1", "A1" ) );
722    results.add( new Tuple( "1", "a3", "1", "A2" ) );
723    results.add( new Tuple( "1", "a3", "1", "A3" ) );
724    results.add( new Tuple( "2", "b1", "2", "B1" ) );
725    results.add( new Tuple( "2", "b1", "2", "B2" ) );
726    results.add( new Tuple( "2", "b1", "2", "B3" ) );
727    results.add( new Tuple( "3", "c1", null, null ) );
728    results.add( new Tuple( "4", "d1", "4", "D1" ) );
729    results.add( new Tuple( "4", "d2", "4", "D1" ) );
730    results.add( new Tuple( "4", "d3", "4", "D1" ) );
731    results.add( new Tuple( "5", "e1", null, null ) );
732    results.add( new Tuple( "5", "e2", null, null ) );
733    results.add( new Tuple( "5", "e3", null, null ) );
734    results.add( new Tuple( null, null, "6", "F1" ) );
735    results.add( new Tuple( null, null, "6", "F2" ) );
736    results.add( new Tuple( "7", "g1", null, null ) );
737    results.add( new Tuple( "7", "g2", null, null ) );
738    results.add( new Tuple( "7", "g3", null, null ) );
739    results.add( new Tuple( "7", "g4", null, null ) );
740    results.add( new Tuple( "7", "g5", null, null ) );
741    results.add( new Tuple( null, "h1", null, "H1" ) );
742
743    handleJoins( "cogroupouter", new OuterJoin(), results, 8, false, null );
744    handleJoins( "cogroupouter-resultgroup", new OuterJoin(), results, 8, true, null );
745    }
746
747  /**
748   * 1 a1
749   * 1 a2
750   * 1 a3
751   * 2 b1
752   * 3 c1
753   * 4 d1
754   * 4 d2
755   * 4 d3
756   * 5 e1
757   * 5 e2
758   * 5 e3
759   * 7 g1
760   * 7 g2
761   * 7 g3
762   * 7 g4
763   * 7 g5
764   * null h1
765   * <p/>
766   * 1 A1
767   * 1 A2
768   * 1 A3
769   * 2 B1
770   * 2 B2
771   * 2 B3
772   * 4 D1
773   * 6 F1
774   * 6 F2
775   * null H1
776   * <p/>
777   * 1  a1      1       A1
778   * 1  a1      1       A2
779   * 1  a1      1       A3
780   * 1  a2      1       A1
781   * 1  a2      1       A2
782   * 1  a2      1       A3
783   * 1  a3      1       A1
784   * 1  a3      1       A2
785   * 1  a3      1       A3
786   * 2  b1      2       B1
787   * 2  b1      2       B2
788   * 2  b1      2       B3
789   * 3  c1      null    null
790   * 4  d1      4       D1
791   * 4  d2      4       D1
792   * 4  d3      4       D1
793   * 5  e1      null    null
794   * 5  e2      null    null
795   * 5  e3      null    null
796   * null       null    6       F1
797   * null       null    6       F2
798   * 7  g1      null    null
799   * 7  g2      null    null
800   * 7  g3      null    null
801   * 7  g4      null    null
802   * 7  g5      null    null
803   * null h1  null  null
804   * null null  null  H1
805   *
806   * @throws Exception
807   */
808  @Test
809  public void testCoGroupOuterNull() throws Exception
810    {
811    Set<Tuple> results = new HashSet<Tuple>();
812
813    results.add( new Tuple( "1", "a1", "1", "A1" ) );
814    results.add( new Tuple( "1", "a1", "1", "A2" ) );
815    results.add( new Tuple( "1", "a1", "1", "A3" ) );
816    results.add( new Tuple( "1", "a2", "1", "A1" ) );
817    results.add( new Tuple( "1", "a2", "1", "A2" ) );
818    results.add( new Tuple( "1", "a2", "1", "A3" ) );
819    results.add( new Tuple( "1", "a3", "1", "A1" ) );
820    results.add( new Tuple( "1", "a3", "1", "A2" ) );
821    results.add( new Tuple( "1", "a3", "1", "A3" ) );
822    results.add( new Tuple( "2", "b1", "2", "B1" ) );
823    results.add( new Tuple( "2", "b1", "2", "B2" ) );
824    results.add( new Tuple( "2", "b1", "2", "B3" ) );
825    results.add( new Tuple( "3", "c1", null, null ) );
826    results.add( new Tuple( "4", "d1", "4", "D1" ) );
827    results.add( new Tuple( "4", "d2", "4", "D1" ) );
828    results.add( new Tuple( "4", "d3", "4", "D1" ) );
829    results.add( new Tuple( "5", "e1", null, null ) );
830    results.add( new Tuple( "5", "e2", null, null ) );
831    results.add( new Tuple( "5", "e3", null, null ) );
832    results.add( new Tuple( null, null, "6", "F1" ) );
833    results.add( new Tuple( null, null, "6", "F2" ) );
834    results.add( new Tuple( "7", "g1", null, null ) );
835    results.add( new Tuple( "7", "g2", null, null ) );
836    results.add( new Tuple( "7", "g3", null, null ) );
837    results.add( new Tuple( "7", "g4", null, null ) );
838    results.add( new Tuple( "7", "g5", null, null ) );
839    results.add( new Tuple( null, "h1", null, null ) );
840    results.add( new Tuple( null, null, null, "H1" ) );
841
842    handleJoins( "cogroupouternull", new OuterJoin(), results, 9, false, new NullNotEquivalentComparator() );
843    handleJoins( "cogroupouternull-resultgroup", new OuterJoin(), results, 9, true, new NullNotEquivalentComparator() );
844    }
845
846  /**
847   * 1 a1
848   * 1 a2
849   * 1 a3
850   * 2 b1
851   * 3 c1
852   * 4 d1
853   * 4 d2
854   * 4 d3
855   * 5 e1
856   * 5 e2
857   * 5 e3
858   * 7 g1
859   * 7 g2
860   * 7 g3
861   * 7 g4
862   * 7 g5
863   * null h1
864   * <p/>
865   * 1 A1
866   * 1 A2
867   * 1 A3
868   * 2 B1
869   * 2 B2
870   * 2 B3
871   * 4 D1
872   * 6 F1
873   * 6 F2
874   * null H1
875   * <p/>
876   * 1  a1      1       A1
877   * 1  a1      1       A2
878   * 1  a1      1       A3
879   * 1  a2      1       A1
880   * 1  a2      1       A2
881   * 1  a2      1       A3
882   * 1  a3      1       A1
883   * 1  a3      1       A2
884   * 1  a3      1       A3
885   * 2  b1      2       B1
886   * 2  b1      2       B2
887   * 2  b1      2       B3
888   * 3  c1      null    null
889   * 4  d1      4       D1
890   * 4  d2      4       D1
891   * 4  d3      4       D1
892   * 5  e1      null    null
893   * 5  e2      null    null
894   * 5  e3      null    null
895   * 7  g1      null    null
896   * 7  g2      null    null
897   * 7  g3      null    null
898   * 7  g4      null    null
899   * 7  g5      null    null
900   * null h1    null    H1
901   *
902   * @throws Exception
903   */
904  @Test
905  public void testCoGroupInnerOuter() throws Exception
906    {
907    Set<Tuple> results = new HashSet<Tuple>();
908
909    results.add( new Tuple( "1", "a1", "1", "A1" ) );
910    results.add( new Tuple( "1", "a1", "1", "A2" ) );
911    results.add( new Tuple( "1", "a1", "1", "A3" ) );
912    results.add( new Tuple( "1", "a2", "1", "A1" ) );
913    results.add( new Tuple( "1", "a2", "1", "A2" ) );
914    results.add( new Tuple( "1", "a2", "1", "A3" ) );
915    results.add( new Tuple( "1", "a3", "1", "A1" ) );
916    results.add( new Tuple( "1", "a3", "1", "A2" ) );
917    results.add( new Tuple( "1", "a3", "1", "A3" ) );
918    results.add( new Tuple( "2", "b1", "2", "B1" ) );
919    results.add( new Tuple( "2", "b1", "2", "B2" ) );
920    results.add( new Tuple( "2", "b1", "2", "B3" ) );
921    results.add( new Tuple( "3", "c1", null, null ) );
922    results.add( new Tuple( "4", "d1", "4", "D1" ) );
923    results.add( new Tuple( "4", "d2", "4", "D1" ) );
924    results.add( new Tuple( "4", "d3", "4", "D1" ) );
925    results.add( new Tuple( "5", "e1", null, null ) );
926    results.add( new Tuple( "5", "e2", null, null ) );
927    results.add( new Tuple( "5", "e3", null, null ) );
928    results.add( new Tuple( "7", "g1", null, null ) );
929    results.add( new Tuple( "7", "g2", null, null ) );
930    results.add( new Tuple( "7", "g3", null, null ) );
931    results.add( new Tuple( "7", "g4", null, null ) );
932    results.add( new Tuple( "7", "g5", null, null ) );
933    results.add( new Tuple( null, "h1", null, "H1" ) );
934
935    handleJoins( "cogroupinnerouter", new LeftJoin(), results, 8, false, null );
936    handleJoins( "cogroupinnerouter-resultgroup", new LeftJoin(), results, 8, true, null );
937    }
938
939  /**
940   * 1 a1
941   * 1 a2
942   * 1 a3
943   * 2 b1
944   * 3 c1
945   * 4 d1
946   * 4 d2
947   * 4 d3
948   * 5 e1
949   * 5 e2
950   * 5 e3
951   * 7 g1
952   * 7 g2
953   * 7 g3
954   * 7 g4
955   * 7 g5
956   * null h1
957   * <p/>
958   * 1 A1
959   * 1 A2
960   * 1 A3
961   * 2 B1
962   * 2 B2
963   * 2 B3
964   * 4 D1
965   * 6 F1
966   * 6 F2
967   * null H1
968   * <p/>
969   * 1  a1      1       A1
970   * 1  a1      1       A2
971   * 1  a1      1       A3
972   * 1  a2      1       A1
973   * 1  a2      1       A2
974   * 1  a2      1       A3
975   * 1  a3      1       A1
976   * 1  a3      1       A2
977   * 1  a3      1       A3
978   * 2  b1      2       B1
979   * 2  b1      2       B2
980   * 2  b1      2       B3
981   * 3  c1      null    null
982   * 4  d1      4       D1
983   * 4  d2      4       D1
984   * 4  d3      4       D1
985   * 5  e1      null    null
986   * 5  e2      null    null
987   * 5  e3      null    null
988   * 7  g1      null    null
989   * 7  g2      null    null
990   * 7  g3      null    null
991   * 7  g4      null    null
992   * 7  g5      null    null
993   * null h1    null    null
994   *
995   * @throws Exception
996   */
997  @Test
998  public void testCoGroupInnerOuterNull() throws Exception
999    {
1000    Set<Tuple> results = new HashSet<Tuple>();
1001
1002    results.add( new Tuple( "1", "a1", "1", "A1" ) );
1003    results.add( new Tuple( "1", "a1", "1", "A2" ) );
1004    results.add( new Tuple( "1", "a1", "1", "A3" ) );
1005    results.add( new Tuple( "1", "a2", "1", "A1" ) );
1006    results.add( new Tuple( "1", "a2", "1", "A2" ) );
1007    results.add( new Tuple( "1", "a2", "1", "A3" ) );
1008    results.add( new Tuple( "1", "a3", "1", "A1" ) );
1009    results.add( new Tuple( "1", "a3", "1", "A2" ) );
1010    results.add( new Tuple( "1", "a3", "1", "A3" ) );
1011    results.add( new Tuple( "2", "b1", "2", "B1" ) );
1012    results.add( new Tuple( "2", "b1", "2", "B2" ) );
1013    results.add( new Tuple( "2", "b1", "2", "B3" ) );
1014    results.add( new Tuple( "3", "c1", null, null ) );
1015    results.add( new Tuple( "4", "d1", "4", "D1" ) );
1016    results.add( new Tuple( "4", "d2", "4", "D1" ) );
1017    results.add( new Tuple( "4", "d3", "4", "D1" ) );
1018    results.add( new Tuple( "5", "e1", null, null ) );
1019    results.add( new Tuple( "5", "e2", null, null ) );
1020    results.add( new Tuple( "5", "e3", null, null ) );
1021    results.add( new Tuple( "7", "g1", null, null ) );
1022    results.add( new Tuple( "7", "g2", null, null ) );
1023    results.add( new Tuple( "7", "g3", null, null ) );
1024    results.add( new Tuple( "7", "g4", null, null ) );
1025    results.add( new Tuple( "7", "g5", null, null ) );
1026    results.add( new Tuple( null, "h1", null, null ) );
1027
1028    handleJoins( "cogroupinnerouternull", new LeftJoin(), results, 9, false, new NullNotEquivalentComparator() );
1029    handleJoins( "cogroupinnerouternull-resultgroup", new LeftJoin(), results, 9, true, new NullNotEquivalentComparator() );
1030    }
1031
1032  /**
1033   * 1 a1
1034   * 1 a2
1035   * 1 a3
1036   * 2 b1
1037   * 3 c1
1038   * 4 d1
1039   * 4 d2
1040   * 4 d3
1041   * 5 e1
1042   * 5 e2
1043   * 5 e3
1044   * 7 g1
1045   * 7 g2
1046   * 7 g3
1047   * 7 g4
1048   * 7 g5
1049   * null h1
1050   * <p/>
1051   * 1 A1
1052   * 1 A2
1053   * 1 A3
1054   * 2 B1
1055   * 2 B2
1056   * 2 B3
1057   * 4 D1
1058   * 6 F1
1059   * 6 F2
1060   * null H1
1061   * <p/>
1062   * 1  a1      1       A1
1063   * 1  a1      1       A2
1064   * 1  a1      1       A3
1065   * 1  a2      1       A1
1066   * 1  a2      1       A2
1067   * 1  a2      1       A3
1068   * 1  a3      1       A1
1069   * 1  a3      1       A2
1070   * 1  a3      1       A3
1071   * 2  b1      2       B1
1072   * 2  b1      2       B2
1073   * 2  b1      2       B3
1074   * 4  d1      4       D1
1075   * 4  d2      4       D1
1076   * 4  d3      4       D1
1077   * null       null    6       F1
1078   * null       null    6       F2
1079   * null h1    null    H1
1080   *
1081   * @throws Exception
1082   */
1083  @Test
1084  public void testCoGroupOuterInner() throws Exception
1085    {
1086    Set<Tuple> results = new HashSet<Tuple>();
1087
1088    results.add( new Tuple( "1", "a1", "1", "A1" ) );
1089    results.add( new Tuple( "1", "a1", "1", "A2" ) );
1090    results.add( new Tuple( "1", "a1", "1", "A3" ) );
1091    results.add( new Tuple( "1", "a2", "1", "A1" ) );
1092    results.add( new Tuple( "1", "a2", "1", "A2" ) );
1093    results.add( new Tuple( "1", "a2", "1", "A3" ) );
1094    results.add( new Tuple( "1", "a3", "1", "A1" ) );
1095    results.add( new Tuple( "1", "a3", "1", "A2" ) );
1096    results.add( new Tuple( "1", "a3", "1", "A3" ) );
1097    results.add( new Tuple( "2", "b1", "2", "B1" ) );
1098    results.add( new Tuple( "2", "b1", "2", "B2" ) );
1099    results.add( new Tuple( "2", "b1", "2", "B3" ) );
1100    results.add( new Tuple( "4", "d1", "4", "D1" ) );
1101    results.add( new Tuple( "4", "d2", "4", "D1" ) );
1102    results.add( new Tuple( "4", "d3", "4", "D1" ) );
1103    results.add( new Tuple( null, null, "6", "F1" ) );
1104    results.add( new Tuple( null, null, "6", "F2" ) );
1105    results.add( new Tuple( null, "h1", null, "H1" ) );
1106
1107    handleJoins( "cogroupouterinner", new RightJoin(), results, 8, false, null );
1108    handleJoins( "cogroupouterinner-resultgroup", new RightJoin(), results, 8, true, null );
1109    }
1110
1111  /**
1112   * 1 a1
1113   * 1 a2
1114   * 1 a3
1115   * 2 b1
1116   * 3 c1
1117   * 4 d1
1118   * 4 d2
1119   * 4 d3
1120   * 5 e1
1121   * 5 e2
1122   * 5 e3
1123   * 7 g1
1124   * 7 g2
1125   * 7 g3
1126   * 7 g4
1127   * 7 g5
1128   * null h1
1129   * <p/>
1130   * 1 A1
1131   * 1 A2
1132   * 1 A3
1133   * 2 B1
1134   * 2 B2
1135   * 2 B3
1136   * 4 D1
1137   * 6 F1
1138   * 6 F2
1139   * null H1
1140   * <p/>
1141   * 1  a1      1       A1
1142   * 1  a1      1       A2
1143   * 1  a1      1       A3
1144   * 1  a2      1       A1
1145   * 1  a2      1       A2
1146   * 1  a2      1       A3
1147   * 1  a3      1       A1
1148   * 1  a3      1       A2
1149   * 1  a3      1       A3
1150   * 2  b1      2       B1
1151   * 2  b1      2       B2
1152   * 2  b1      2       B3
1153   * 4  d1      4       D1
1154   * 4  d2      4       D1
1155   * 4  d3      4       D1
1156   * null       null    6       F1
1157   * null       null    6       F2
1158   * null null  null    H1
1159   *
1160   * @throws Exception
1161   */
1162  @Test
1163  public void testCoGroupOuterInnerNull() throws Exception
1164    {
1165    Set<Tuple> results = new HashSet<Tuple>();
1166
1167    results.add( new Tuple( "1", "a1", "1", "A1" ) );
1168    results.add( new Tuple( "1", "a1", "1", "A2" ) );
1169    results.add( new Tuple( "1", "a1", "1", "A3" ) );
1170    results.add( new Tuple( "1", "a2", "1", "A1" ) );
1171    results.add( new Tuple( "1", "a2", "1", "A2" ) );
1172    results.add( new Tuple( "1", "a2", "1", "A3" ) );
1173    results.add( new Tuple( "1", "a3", "1", "A1" ) );
1174    results.add( new Tuple( "1", "a3", "1", "A2" ) );
1175    results.add( new Tuple( "1", "a3", "1", "A3" ) );
1176    results.add( new Tuple( "2", "b1", "2", "B1" ) );
1177    results.add( new Tuple( "2", "b1", "2", "B2" ) );
1178    results.add( new Tuple( "2", "b1", "2", "B3" ) );
1179    results.add( new Tuple( "4", "d1", "4", "D1" ) );
1180    results.add( new Tuple( "4", "d2", "4", "D1" ) );
1181    results.add( new Tuple( "4", "d3", "4", "D1" ) );
1182    results.add( new Tuple( null, null, "6", "F1" ) );
1183    results.add( new Tuple( null, null, "6", "F2" ) );
1184    results.add( new Tuple( null, null, null, "H1" ) );
1185
1186    handleJoins( "cogroupouterinnernull", new RightJoin(), results, 9, false, new NullNotEquivalentComparator() );
1187    handleJoins( "cogroupouterinnernull-resultgroup", new RightJoin(), results, 9, true, new NullNotEquivalentComparator() );
1188    }
1189
1190  private void handleJoins( String path, Joiner joiner, Set<Tuple> results, int numGroups, boolean useResultGroupFields, NullNotEquivalentComparator comparator ) throws Exception
1191    {
1192    results = new HashSet<Tuple>( results );
1193
1194    getPlatform().copyFromLocal( inputFileLhsSparse );
1195    getPlatform().copyFromLocal( inputFileRhsSparse );
1196
1197    Fields fields = new Fields( "num", "char" ).applyTypes( Integer.class, String.class );
1198    Tap sourceLower = getPlatform().getDelimitedFile( fields, " ", inputFileLhsSparse );
1199    Tap sourceUpper = getPlatform().getDelimitedFile( fields, " ", inputFileRhsSparse );
1200
1201    Map sources = new HashMap();
1202
1203    sources.put( "lower", sourceLower );
1204    sources.put( "upper", sourceUpper );
1205
1206    Tap sink = getPlatform().getDelimitedFile( Fields.size( 4, String.class ), "\t", getOutputPath( path ), SinkMode.REPLACE );
1207
1208    Pipe pipeLower = new Pipe( "lower" );
1209    Pipe pipeUpper = new Pipe( "upper" );
1210
1211    Fields declaredFields = new Fields( "num", "char", "num2", "char2" );
1212
1213    Fields groupFields = new Fields( "num" );
1214
1215    if( comparator != null )
1216      groupFields.setComparator( 0, comparator );
1217
1218    Pipe splice;
1219    if( useResultGroupFields )
1220      splice = new CoGroup( pipeLower, groupFields, pipeUpper, groupFields, declaredFields, new Fields( "num", "num2" ), joiner );
1221    else
1222      splice = new CoGroup( pipeLower, groupFields, pipeUpper, groupFields, declaredFields, joiner );
1223
1224    splice = new Every( splice, Fields.ALL, new TestIdentityBuffer( new Fields( "num", "num2" ), numGroups, true ), Fields.RESULTS );
1225
1226    Flow flow = getPlatform().getFlowConnector().connect( sources, sink, splice );
1227
1228    flow.complete();
1229
1230    validateLength( flow, results.size() );
1231
1232    List<Tuple> actual = getSinkAsList( flow );
1233
1234    results.removeAll( actual );
1235
1236    assertEquals( 0, results.size() );
1237    }
1238
1239  /**
1240   * 1 a
1241   * 5 b
1242   * 6 c
1243   * 5 b
1244   * 5 e
1245   * <p/>
1246   * 1 A
1247   * 2 B
1248   * 3 C
1249   * 4 D
1250   * 5 E
1251   * <p/>
1252   * 1 a
1253   * 2 b
1254   * 3 c
1255   * 4 d
1256   * 5 e
1257   * <p/>
1258   * 1  a       1       A  1  a
1259   * -  -   2   B  2  b
1260   * -  -   3   C  3  c
1261   * -  -   4   D  4  d
1262   * 5  b       5   E  5  e
1263   * 5  e       5   E  5  e
1264   *
1265   * @throws Exception
1266   */
1267  @Test
1268  public void testCoGroupMixed() throws Exception
1269    {
1270    getPlatform().copyFromLocal( inputFileLowerOffset );
1271    getPlatform().copyFromLocal( inputFileLower );
1272    getPlatform().copyFromLocal( inputFileUpper );
1273
1274    Tap sourceLowerOffset = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLowerOffset );
1275    Tap sourceUpper = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileUpper );
1276    Tap sourceLower = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower );
1277
1278    Map sources = new HashMap();
1279
1280    sources.put( "loweroffset", sourceLowerOffset );
1281    sources.put( "lower", sourceLower );
1282    sources.put( "upper", sourceUpper );
1283
1284    Tap sink = getPlatform().getDelimitedFile( Fields.size( 6, String.class ), "\t", getOutputPath( "cogroupmixed" ), SinkMode.REPLACE );
1285
1286    Function splitter = new RegexSplitter( new Fields( "num", "char" ), " " );
1287
1288    Pipe pipeLowerOffset = new Each( new Pipe( "loweroffset" ), new Fields( "line" ), splitter );
1289    Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitter );
1290    Pipe pipeUpper = new Each( new Pipe( "upper" ), new Fields( "line" ), splitter );
1291
1292    Pipe[] pipes = Pipe.pipes( pipeLowerOffset, pipeUpper, pipeLower );
1293    Fields[] fields = Fields.fields( new Fields( "num" ), new Fields( "num" ), new Fields( "num" ) );
1294
1295    MixedJoin join = new MixedJoin( new boolean[]{MixedJoin.OUTER, MixedJoin.INNER, MixedJoin.OUTER} );
1296    Pipe splice = new CoGroup( pipes, fields, Fields.size( 6 ), join );
1297
1298    Flow flow = getPlatform().getFlowConnector().connect( sources, sink, splice );
1299
1300    flow.complete();
1301
1302    validateLength( flow, 6 );
1303
1304    Set<Tuple> results = new HashSet<Tuple>();
1305
1306    results.add( new Tuple( "1", "a", "1", "A", "1", "a" ) );
1307    results.add( new Tuple( null, null, "2", "B", "2", "b" ) );
1308    results.add( new Tuple( null, null, "3", "C", "3", "c" ) );
1309    results.add( new Tuple( null, null, "4", "D", "4", "d" ) );
1310    results.add( new Tuple( "5", "b", "5", "E", "5", "e" ) );
1311    results.add( new Tuple( "5", "e", "5", "E", "5", "e" ) );
1312
1313    List<Tuple> actual = getSinkAsList( flow );
1314
1315    results.removeAll( actual );
1316
1317    assertEquals( 0, results.size() );
1318    }
1319
1320  @Test
1321  public void testCoGroupDiffFields() throws Exception
1322    {
1323    getPlatform().copyFromLocal( inputFileLower );
1324    getPlatform().copyFromLocal( inputFileUpper );
1325
1326    Tap sourceLower = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower );
1327    Tap sourceUpper = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileUpper );
1328
1329    Map sources = new HashMap();
1330
1331    sources.put( "lower", sourceLower );
1332    sources.put( "upper", sourceUpper );
1333
1334    Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "difffields" ), SinkMode.REPLACE );
1335
1336    Function splitterLower = new RegexSplitter( new Fields( "numA", "lower" ), " " );
1337    Function splitterUpper = new RegexSplitter( new Fields( "numB", "upper" ), " " );
1338
1339    Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitterLower );
1340    Pipe pipeUpper = new Each( new Pipe( "upper" ), new Fields( "line" ), splitterUpper );
1341
1342    Pipe cogroup = new CoGroup( pipeLower, new Fields( "numA" ), pipeUpper, new Fields( "numB" ) );
1343
1344    Flow flow = getPlatform().getFlowConnector().connect( sources, sink, cogroup );
1345
1346    flow.complete();
1347
1348    validateLength( flow, 5 );
1349
1350    List<Tuple> actual = getSinkAsList( flow );
1351
1352    assertTrue( actual.contains( new Tuple( "1\ta\t1\tA" ) ) );
1353    assertTrue( actual.contains( new Tuple( "2\tb\t2\tB" ) ) );
1354    }
1355
1356  @Test
1357  public void testCoGroupGroupBy() throws Exception
1358    {
1359    getPlatform().copyFromLocal( inputFileLower );
1360    getPlatform().copyFromLocal( inputFileUpper );
1361
1362    Tap sourceLower = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower );
1363    Tap sourceUpper = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileUpper );
1364
1365    Map sources = new HashMap();
1366
1367    sources.put( "lower", sourceLower );
1368    sources.put( "upper", sourceUpper );
1369
1370    Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "cogroupgroupby" ), SinkMode.REPLACE );
1371
1372    Function splitterLower = new RegexSplitter( new Fields( "numA", "lower" ).applyTypes( String.class, String.class ), " " );
1373    Function splitterUpper = new RegexSplitter( new Fields( "numB", "upper" ).applyTypes( String.class, String.class ), " " );
1374
1375    Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitterLower );
1376    Pipe pipeUpper = new Each( new Pipe( "upper" ), new Fields( "line" ), splitterUpper );
1377
1378    Pipe cogroup = new CoGroup( pipeLower, new Fields( "numA" ), pipeUpper, new Fields( "numB" ) );
1379
1380    Pipe groupby = new GroupBy( cogroup, new Fields( "numA" ) );
1381
1382    Map<Object, Object> properties = getPlatform().getProperties();
1383
1384    properties.put( "cascading.serialization.types.required", "true" );
1385
1386    Flow flow = getPlatform().getFlowConnector( properties ).connect( sources, sink, groupby );
1387
1388    flow.complete();
1389
1390    validateLength( flow, 5, null );
1391
1392    List<Tuple> actual = getSinkAsList( flow );
1393
1394    assertTrue( actual.contains( new Tuple( "1\ta\t1\tA" ) ) );
1395    assertTrue( actual.contains( new Tuple( "2\tb\t2\tB" ) ) );
1396    }
1397
1398  @Test
1399  public void testCoGroupSamePipe() throws Exception
1400    {
1401    getPlatform().copyFromLocal( inputFileLower );
1402
1403    Tap source = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower );
1404
1405    Map sources = new HashMap();
1406
1407    sources.put( "lower", source );
1408
1409    Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "samepipe" ), SinkMode.REPLACE );
1410
1411    Function splitter = new RegexSplitter( new Fields( "num", "char" ), " " );
1412
1413    Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitter );
1414
1415    Pipe cogroup = new CoGroup( pipeLower, new Fields( "num" ), 1, new Fields( "num1", "char1", "num2", "char2" ) );
1416
1417    Flow flow = getPlatform().getFlowConnector().connect( sources, sink, cogroup );
1418
1419    flow.complete();
1420
1421    validateLength( flow, 5, null );
1422
1423    List<Tuple> actual = getSinkAsList( flow );
1424
1425    assertTrue( actual.contains( new Tuple( "1\ta\t1\ta" ) ) );
1426    assertTrue( actual.contains( new Tuple( "2\tb\t2\tb" ) ) );
1427    }
1428
1429  @Test
1430  public void testCoGroupSamePipe2() throws Exception
1431    {
1432    getPlatform().copyFromLocal( inputFileLower );
1433
1434    Tap source = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower );
1435
1436    Map sources = new HashMap();
1437
1438    sources.put( "lower", source );
1439
1440    Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "samepipe2" ), SinkMode.REPLACE );
1441
1442    Function splitter = new RegexSplitter( new Fields( "num", "char" ), " " );
1443
1444    Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitter );
1445
1446    Pipe cogroup = new CoGroup( pipeLower, new Fields( "num" ), pipeLower, new Fields( "num" ), new Fields( "num1", "char1", "num2", "char2" ) );
1447
1448    Flow flow = getPlatform().getFlowConnector().connect( sources, sink, cogroup );
1449
1450    flow.complete();
1451
1452    validateLength( flow, 5, null );
1453
1454    List<Tuple> actual = getSinkAsList( flow );
1455
1456    assertTrue( actual.contains( new Tuple( "1\ta\t1\ta" ) ) );
1457    assertTrue( actual.contains( new Tuple( "2\tb\t2\tb" ) ) );
1458    }
1459
1460  @Test
1461  public void testCoGroupSamePipe3() throws Exception
1462    {
1463    getPlatform().copyFromLocal( inputFileLower );
1464
1465    Tap source = getPlatform().getDelimitedFile( new Fields( "num", "char" ), " ", inputFileLower );
1466
1467    Map sources = new HashMap();
1468
1469    sources.put( "lower", source );
1470
1471    Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "samepipe3" ), SinkMode.REPLACE );
1472
1473    Pipe pipe = new Pipe( "lower" );
1474
1475    Pipe lhs = new Pipe( "lhs", pipe );
1476    Pipe rhs = new Pipe( "rhs", pipe );
1477
1478    Pipe cogroup = new CoGroup( lhs, new Fields( "num" ), rhs, new Fields( "num" ), new Fields( "num1", "char1", "num2", "char2" ) );
1479
1480    Flow flow = getPlatform().getFlowConnector().connect( sources, sink, cogroup );
1481
1482    flow.complete();
1483
1484    validateLength( flow, 5, null );
1485
1486    List<Tuple> actual = getSinkAsList( flow );
1487
1488    assertTrue( actual.contains( new Tuple( "1\ta\t1\ta" ) ) );
1489    assertTrue( actual.contains( new Tuple( "2\tb\t2\tb" ) ) );
1490    }
1491
1492  @Test
1493  public void testCoGroupAroundCoGroup() throws Exception
1494    {
1495    getPlatform().copyFromLocal( inputFileLower );
1496    getPlatform().copyFromLocal( inputFileUpper );
1497
1498    Tap sourceLower = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower );
1499    Tap sourceUpper = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileUpper );
1500
1501    Map sources = new HashMap();
1502
1503    sources.put( "lower", sourceLower );
1504    sources.put( "upper1", sourceUpper );
1505    sources.put( "upper2", sourceUpper );
1506
1507    Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "cogroupacogroup" ), SinkMode.REPLACE );
1508
1509    Function splitter = new RegexSplitter( new Fields( "num", "char" ), " " );
1510
1511    Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitter );
1512    Pipe pipeUpper1 = new Each( new Pipe( "upper1" ), new Fields( "line" ), splitter );
1513    Pipe pipeUpper2 = new Each( new Pipe( "upper2" ), new Fields( "line" ), splitter );
1514
1515    Pipe splice1 = new CoGroup( pipeLower, new Fields( "num" ), pipeUpper1, new Fields( "num" ), new Fields( "num1", "char1", "num2", "char2" ) );
1516
1517    splice1 = new Each( splice1, new Identity() );
1518
1519    Pipe splice2 = new CoGroup( splice1, new Fields( "num1" ), pipeUpper2, new Fields( "num" ), new Fields( "num1", "char1", "num2", "char2", "num3", "char3" ) );
1520
1521    Flow flow = getPlatform().getFlowConnector().connect( sources, sink, splice2 );
1522
1523    flow.complete();
1524
1525    validateLength( flow, 5, null );
1526
1527    List<Tuple> actual = getSinkAsList( flow );
1528
1529    assertTrue( actual.contains( new Tuple( "1\ta\t1\tA\t1\tA" ) ) );
1530    assertTrue( actual.contains( new Tuple( "2\tb\t2\tB\t2\tB" ) ) );
1531    }
1532
1533  @Test
1534  public void testCoGroupAroundCoGroupWithout() throws Exception
1535    {
1536    runCoGroupAroundCoGroup( null, "cogroupacogroupopt1" );
1537    }
1538
1539  @Test
1540  public void testCoGroupAroundCoGroupWith() throws Exception
1541    {
1542    // hack to get classname
1543    runCoGroupAroundCoGroup( getPlatform().getDelimitedFile( new Fields( "num" ), "\t", inputFileNums10 ).getScheme().getClass(), "cogroupacogroupopt2" );
1544    }
1545
1546  private void runCoGroupAroundCoGroup( Class schemeClass, String stringPath ) throws IOException
1547    {
1548    getPlatform().copyFromLocal( inputFileNums20 );
1549    getPlatform().copyFromLocal( inputFileNums10 );
1550
1551    Tap source10 = getPlatform().getDelimitedFile( new Fields( "num" ), "\t", inputFileNums10 );
1552    Tap source20 = getPlatform().getDelimitedFile( new Fields( "num" ), "\t", inputFileNums20 );
1553
1554    Map sources = new HashMap();
1555
1556    sources.put( "source20", source20 );
1557    sources.put( "source101", source10 );
1558    sources.put( "source102", source10 );
1559
1560    Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( stringPath ), SinkMode.REPLACE );
1561
1562    Pipe pipeNum20 = new Pipe( "source20" );
1563    Pipe pipeNum101 = new Pipe( "source101" );
1564    Pipe pipeNum102 = new Pipe( "source102" );
1565
1566    Pipe splice1 = new CoGroup( pipeNum20, new Fields( "num" ), pipeNum101, new Fields( "num" ), new Fields( "num1", "num2" ) );
1567
1568    Pipe splice2 = new CoGroup( splice1, new Fields( "num1" ), pipeNum102, new Fields( "num" ), new Fields( "num1", "num2", "num3" ) );
1569
1570    splice2 = new Each( splice2, new Identity() );
1571
1572    Map<Object, Object> properties = getPlatform().getProperties();
1573
1574    if( getPlatform().isMapReduce() )
1575      FlowConnectorProps.setIntermediateSchemeClass( properties, schemeClass );
1576
1577    Flow flow = getPlatform().getFlowConnector( properties ).connect( "cogroupopt", sources, sink, splice2 );
1578
1579    if( getPlatform().isMapReduce() )
1580      assertEquals( "wrong number of steps", 2, flow.getFlowSteps().size() );
1581
1582    flow.complete();
1583
1584    validateLength( flow, 10 );
1585
1586    List<Tuple> actual = getSinkAsList( flow );
1587
1588    assertTrue( actual.contains( new Tuple( "1\t1\t1" ) ) );
1589    assertTrue( actual.contains( new Tuple( "10\t10\t10" ) ) );
1590    }
1591
1592  @Test
1593  public void testCoGroupDiffFieldsSameFile() throws Exception
1594    {
1595    getPlatform().copyFromLocal( inputFileLower );
1596    getPlatform().copyFromLocal( inputFileUpper );
1597
1598    Tap sourceOffsetLower = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower );
1599    Tap sourceLower = getPlatform().getTextFile( new Fields( "line" ), inputFileLower );
1600
1601    Map sources = new HashMap();
1602
1603    sources.put( "offsetLower", sourceOffsetLower );
1604    sources.put( "lower", sourceLower );
1605
1606    Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "samefiledifffields" ), SinkMode.REPLACE );
1607
1608    Function splitterLower = new RegexSplitter( new Fields( "numA", "left" ), " " );
1609    Function splitterUpper = new RegexSplitter( new Fields( "numB", "right" ), " " );
1610
1611    Pipe offsetLower = new Pipe( "offsetLower" );
1612    offsetLower = new Discard( offsetLower, new Fields( "offset" ) );
1613    offsetLower = new Each( offsetLower, new Fields( "line" ), splitterLower );
1614
1615    Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitterUpper );
1616
1617    Pipe cogroup = new CoGroup( offsetLower, new Fields( "numA" ), pipeLower, new Fields( "numB" ) );
1618
1619    Flow flow = getPlatform().getFlowConnector().connect( sources, sink, cogroup );
1620
1621    flow.complete();
1622
1623    validateLength( flow, 5 );
1624
1625    List<Tuple> actual = getSinkAsList( flow );
1626
1627    assertTrue( actual.contains( new Tuple( "1\ta\t1\ta" ) ) );
1628    assertTrue( actual.contains( new Tuple( "2\tb\t2\tb" ) ) );
1629    }
1630
1631  @Test
1632  public void testJoinNone() throws Exception
1633    {
1634    getPlatform().copyFromLocal( inputFileLower );
1635    getPlatform().copyFromLocal( inputFileUpper );
1636
1637    Tap sourceLower = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower );
1638    Tap sourceUpper = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileUpper );
1639
1640    Map sources = new HashMap();
1641
1642    sources.put( "lower", sourceLower );
1643    sources.put( "upper", sourceUpper );
1644
1645    Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "joinnone" ), SinkMode.REPLACE );
1646
1647    Function splitter = new RegexSplitter( new Fields( "num", "char" ), " " );
1648
1649    Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitter );
1650    Pipe pipeUpper = new Each( new Pipe( "upper" ), new Fields( "line" ), splitter );
1651
1652    Pipe splice = new CoGroup( pipeLower, Fields.NONE, pipeUpper, Fields.NONE, Fields.size( 4 ) );
1653
1654    Map<Object, Object> properties = getProperties();
1655
1656    Flow flow = getPlatform().getFlowConnector( properties ).connect( sources, sink, splice );
1657
1658    flow.complete();
1659
1660    validateLength( flow, 25 );
1661
1662    List<Tuple> values = getSinkAsList( flow );
1663
1664    assertTrue( values.contains( new Tuple( "1\ta\t1\tA" ) ) );
1665    assertTrue( values.contains( new Tuple( "1\ta\t2\tB" ) ) );
1666    assertTrue( values.contains( new Tuple( "2\tb\t2\tB" ) ) );
1667    }
1668
1669  @Test
1670  public void testMultiJoin() throws Exception
1671    {
1672    getPlatform().copyFromLocal( inputFileCrossX2 );
1673
1674    Tap source = getPlatform().getTextFile( new Fields( "line" ), inputFileCrossX2 );
1675    Tap innerSink = getPlatform().getTextFile( getOutputPath( "inner" ), SinkMode.REPLACE );
1676    Tap outerSink = getPlatform().getTextFile( getOutputPath( "outer" ), SinkMode.REPLACE );
1677    Tap leftSink = getPlatform().getTextFile( getOutputPath( "left" ), SinkMode.REPLACE );
1678    Tap rightSink = getPlatform().getTextFile( getOutputPath( "right" ), SinkMode.REPLACE );
1679
1680    Pipe uniques = new Pipe( "unique" );
1681
1682    uniques = new Each( uniques, new Fields( "line" ), new RegexSplitGenerator( new Fields( "word" ), "\\s" ) );
1683
1684    uniques = new GroupBy( uniques, new Fields( "word" ) );
1685
1686    uniques = new Every( uniques, new Fields( "word" ), new First( Fields.ARGS ), Fields.REPLACE );
1687
1688//    uniques = new Each( uniques, new Debug( true ) );
1689
1690    Pipe fielded = new Pipe( "fielded" );
1691
1692    fielded = new Each( fielded, new Fields( "line" ), new RegexSplitter( "\\s" ) );
1693
1694//    fielded = new Each( fielded, new Debug( true ) );
1695
1696    Pipe inner = new CoGroup( "inner", fielded, new Fields( 0 ), uniques, new Fields( "word" ), new InnerJoin() );
1697    Pipe outer = new CoGroup( "outer", fielded, new Fields( 0 ), uniques, new Fields( "word" ), new OuterJoin() );
1698    Pipe left = new CoGroup( "left", fielded, new Fields( 0 ), uniques, new Fields( "word" ), new LeftJoin() );
1699    Pipe right = new CoGroup( "right", fielded, new Fields( 0 ), uniques, new Fields( "word" ), new RightJoin() );
1700
1701    Pipe[] heads = Pipe.pipes( uniques, fielded );
1702    Map<String, Tap> sources = Cascades.tapsMap( heads, Tap.taps( source, source ) );
1703
1704    Pipe[] tails = Pipe.pipes( inner, outer, left, right );
1705    Map<String, Tap> sinks = Cascades.tapsMap( tails, Tap.taps( innerSink, outerSink, leftSink, rightSink ) );
1706
1707    Flow flow = getPlatform().getFlowConnector().connect( "multi-joins", sources, sinks, tails );
1708
1709    flow.complete();
1710
1711    validateLength( flow.openTapForRead( innerSink ), 74 );
1712    validateLength( flow.openTapForRead( outerSink ), 84 );
1713    validateLength( flow.openTapForRead( leftSink ), 74 );
1714    validateLength( flow.openTapForRead( rightSink ), 84 );
1715    }
1716
1717  /**
1718   * This test checks that a valid node is created when adding back remainders during unique
1719   * path sub-graph partitioning.
1720   */
1721  @Test
1722  public void testMultiJoinWithSplits() throws Exception
1723    {
1724    getPlatform().copyFromLocal( inputFileCrossX2 );
1725
1726    Tap source = getPlatform().getTextFile( new Fields( "line" ), inputFileCrossX2 );
1727    Tap innerSinkLhs = getPlatform().getTextFile( getOutputPath( "innerLhs" ), SinkMode.REPLACE );
1728    Tap uniqueSinkLhs = getPlatform().getTextFile( getOutputPath( "uniquesLhs" ), SinkMode.REPLACE );
1729    Tap innerSinkRhs = getPlatform().getTextFile( getOutputPath( "innerRhs" ), SinkMode.REPLACE );
1730    Tap uniqueSinkRhs = getPlatform().getTextFile( getOutputPath( "uniquesRhs" ), SinkMode.REPLACE );
1731
1732    Pipe incoming = new Pipe( "incoming" );
1733    Pipe uniquesLhs;
1734    Pipe innerLhs;
1735    Pipe uniquesRhs;
1736    Pipe innerRhs;
1737
1738    {
1739    Pipe generatorLhs = new Each( new Pipe( "genLhsLhs", incoming ), new Fields( "line" ), new RegexSplitGenerator( new Fields( "word" ), "\\s" ) );
1740
1741    generatorLhs = new Each( generatorLhs, new Identity() );
1742
1743    Pipe generatorRhs = new Each( new Pipe( "genRhsLhs", incoming ), new Fields( "line" ), new RegexSplitGenerator( new Fields( "word" ), "\\s" ) );
1744
1745    Pipe uniques = new Each( new Pipe( "uniquesLhs", generatorLhs ), new Identity() );
1746
1747    uniques = new GroupBy( uniques, new Fields( "word" ) );
1748
1749    uniques = new Every( uniques, new Fields( "word" ), new First( Fields.ARGS ), Fields.REPLACE );
1750
1751    Pipe lhs = new Pipe( "lhs", generatorLhs );
1752
1753    lhs = new Rename( lhs, new Fields( "word" ), new Fields( "lhs" ) );
1754
1755    Pipe rhs = new Pipe( "rhs", generatorRhs );
1756
1757    rhs = new Rename( rhs, new Fields( "word" ), new Fields( "rhs" ) );
1758
1759    Pipe inner = new CoGroup( "innerLhs", lhs, new Fields( 0 ), rhs, new Fields( 0 ), new InnerJoin() );
1760
1761    uniquesLhs = uniques;
1762    innerLhs = inner;
1763    }
1764
1765    {
1766    Pipe generatorLhs = new Each( new Pipe( "genLhsRhs", incoming ), new Fields( "line" ), new RegexSplitGenerator( new Fields( "word" ), "\\s" ) );
1767
1768    generatorLhs = new Each( generatorLhs, new Identity() );
1769
1770    Pipe generatorRhs = new Each( new Pipe( "genRhsRhs", incoming ), new Fields( "line" ), new RegexSplitGenerator( new Fields( "word" ), "\\s" ) );
1771
1772    Pipe uniques = new Each( new Pipe( "uniquesRhs", generatorLhs ), new Identity() );
1773
1774    uniques = new GroupBy( uniques, new Fields( "word" ) );
1775
1776    uniques = new Every( uniques, new Fields( "word" ), new First( Fields.ARGS ), Fields.REPLACE );
1777
1778    Pipe lhs = new Pipe( "lhs", generatorLhs );
1779
1780    lhs = new Rename( lhs, new Fields( "word" ), new Fields( "lhs" ) );
1781
1782    Pipe rhs = new Pipe( "rhs", generatorRhs );
1783
1784    rhs = new Rename( rhs, new Fields( "word" ), new Fields( "rhs" ) );
1785
1786    Pipe inner = new CoGroup( "innerRhs", lhs, new Fields( 0 ), rhs, new Fields( 0 ), new InnerJoin() );
1787
1788    uniquesRhs = uniques;
1789    innerRhs = inner;
1790    }
1791
1792    FlowDef flowDef = FlowDef.flowDef()
1793      .setName( "multi-joins" )
1794      .addSource( "incoming", source )
1795      .addTailSink( innerLhs, innerSinkLhs )
1796      .addTailSink( uniquesLhs, uniqueSinkLhs )
1797      .addTailSink( innerRhs, innerSinkRhs )
1798      .addTailSink( uniquesRhs, uniqueSinkRhs );
1799
1800    Flow flow = getPlatform().getFlowConnector().connect( flowDef );
1801
1802    flow.complete();
1803
1804    validateLength( flow.openTapForRead( innerSinkLhs ), 3900 );
1805    validateLength( flow.openTapForRead( uniqueSinkLhs ), 15 );
1806    validateLength( flow.openTapForRead( innerSinkRhs ), 3900 );
1807    validateLength( flow.openTapForRead( uniqueSinkRhs ), 15 );
1808    }
1809
1810  @Test
1811  public void testSameSourceGroupSplitCoGroup() throws Exception
1812    {
1813    getPlatform().copyFromLocal( inputFileLower );
1814
1815    Tap source = getPlatform().getTextFile( new Fields( "offset", "line" ).applyTypes( Long.TYPE, String.class ), inputFileLower );
1816
1817    Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath(), SinkMode.REPLACE );
1818
1819    Function splitter = new RegexSplitter( new Fields( "num", "char" ).applyTypes( String.class, String.class ), " " );
1820
1821    Pipe sourcePipe = new Each( new Pipe( "source" ), new Fields( "line" ), splitter );
1822    sourcePipe = new GroupBy( sourcePipe, new Fields( "num" ) );
1823    sourcePipe = new Every( sourcePipe, new Fields( "char" ), new First( new Fields( "first", String.class ) ), Fields.ALL );
1824
1825    Pipe lhsPipe = new Retain( new Pipe( "lhs", sourcePipe ), Fields.ALL );
1826    Pipe rhsPipe = new Retain( new Pipe( "rhs", sourcePipe ), Fields.ALL );
1827
1828    Pipe splice = new CoGroup( lhsPipe, new Fields( "num" ), rhsPipe, new Fields( "num" ), Fields.size( 4 ) );
1829
1830    Map<Object, Object> properties = getPlatform().getProperties();
1831
1832    properties.put( "cascading.serialization.types.required", "true" );
1833
1834    Flow flow = getPlatform().getFlowConnector( properties ).connect( source, sink, splice );
1835
1836    flow.complete();
1837
1838    validateLength( flow, 5, null );
1839
1840    List<Tuple> values = getSinkAsList( flow );
1841
1842    assertTrue( values.contains( new Tuple( "1\ta\t1\ta" ) ) );
1843    assertTrue( values.contains( new Tuple( "2\tb\t2\tb" ) ) );
1844    }
1845  }