001/*
002 * Copyright (c) 2016 Chris K Wensel <chris@wensel.net>. All Rights Reserved.
003 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved.
004 *
005 * Project and contact information: http://www.cascading.org/
006 *
007 * This file is part of the Cascading project.
008 *
009 * Licensed under the Apache License, Version 2.0 (the "License");
010 * you may not use this file except in compliance with the License.
011 * You may obtain a copy of the License at
012 *
013 *     http://www.apache.org/licenses/LICENSE-2.0
014 *
015 * Unless required by applicable law or agreed to in writing, software
016 * distributed under the License is distributed on an "AS IS" BASIS,
017 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
018 * See the License for the specific language governing permissions and
019 * limitations under the License.
020 */
021
022package cascading;
023
024import java.io.IOException;
025import java.util.HashMap;
026import java.util.HashSet;
027import java.util.List;
028import java.util.Map;
029import java.util.Set;
030
031import cascading.cascade.Cascades;
032import cascading.flow.Flow;
033import cascading.flow.FlowConnectorProps;
034import cascading.flow.FlowDef;
035import cascading.flow.FlowProps;
036import cascading.operation.Function;
037import cascading.operation.Identity;
038import cascading.operation.Insert;
039import cascading.operation.aggregator.Count;
040import cascading.operation.aggregator.First;
041import cascading.operation.regex.RegexFilter;
042import cascading.operation.regex.RegexSplitGenerator;
043import cascading.operation.regex.RegexSplitter;
044import cascading.pipe.CoGroup;
045import cascading.pipe.Each;
046import cascading.pipe.Every;
047import cascading.pipe.GroupBy;
048import cascading.pipe.Pipe;
049import cascading.pipe.assembly.Discard;
050import cascading.pipe.assembly.Rename;
051import cascading.pipe.assembly.Retain;
052import cascading.pipe.joiner.InnerJoin;
053import cascading.pipe.joiner.Joiner;
054import cascading.pipe.joiner.LeftJoin;
055import cascading.pipe.joiner.MixedJoin;
056import cascading.pipe.joiner.OuterJoin;
057import cascading.pipe.joiner.RightJoin;
058import cascading.tap.SinkMode;
059import cascading.tap.Tap;
060import cascading.tuple.Fields;
061import cascading.tuple.Tuple;
062import cascading.util.NullNotEquivalentComparator;
063import org.junit.Test;
064
065import static data.InputData.*;
066
067public class CoGroupFieldedPipesPlatformTest extends PlatformTestCase
068  {
  /**
   * Creates the test case by delegating to the {@code PlatformTestCase} constructor.
   * <p/>
   * NOTE(review): the meaning of the three arguments is declared in {@code PlatformTestCase}, which is not
   * visible here. Presumably {@code true} enables cluster mode and {@code 4} / {@code 1} size the test
   * cluster -- confirm against the super class.
   */
  public CoGroupFieldedPipesPlatformTest()
    {
    super( true, 4, 1 ); // leave cluster testing enabled
    }
073
074  @Test
075  public void testCross() throws Exception
076    {
077    getPlatform().copyFromLocal( inputFileLhs );
078    getPlatform().copyFromLocal( inputFileRhs );
079
080    Map sources = new HashMap();
081
082    sources.put( "lhs", getPlatform().getTextFile( inputFileLhs ) );
083    sources.put( "rhs", getPlatform().getTextFile( inputFileRhs ) );
084
085    Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "cross" ), SinkMode.REPLACE );
086
087    Pipe pipeLower = new Each( "lhs", new Fields( "line" ), new RegexSplitter( new Fields( "numLHS", "charLHS" ), " " ) );
088    Pipe pipeUpper = new Each( "rhs", new Fields( "line" ), new RegexSplitter( new Fields( "numRHS", "charRHS" ), " " ) );
089
090    Pipe cross = new CoGroup( pipeLower, new Fields( "numLHS" ), pipeUpper, new Fields( "numRHS" ), new InnerJoin() );
091
092    Flow flow = getPlatform().getFlowConnector().connect( sources, sink, cross );
093
094    flow.complete();
095
096    validateLength( flow, 37, null );
097
098    List<Tuple> values = getSinkAsList( flow );
099
100    assertTrue( values.contains( new Tuple( "1\ta\t1\tA" ) ) );
101    assertTrue( values.contains( new Tuple( "1\ta\t1\tB" ) ) );
102    }
103
104  @Test
105  public void testCoGroup() throws Exception
106    {
107    getPlatform().copyFromLocal( inputFileLower );
108    getPlatform().copyFromLocal( inputFileUpper );
109
110    Tap sourceLower = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower );
111    Tap sourceUpper = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileUpper );
112
113    Map sources = new HashMap();
114
115    sources.put( "lower", sourceLower );
116    sources.put( "upper", sourceUpper );
117
118    Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "cogroup" ), SinkMode.REPLACE );
119
120    Function splitter = new RegexSplitter( new Fields( "num", "char" ), " " );
121
122    Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitter );
123    Pipe pipeUpper = new Each( new Pipe( "upper" ), new Fields( "line" ), splitter );
124
125    Pipe splice = new CoGroup( pipeLower, new Fields( "num" ), pipeUpper, new Fields( "num" ), new InnerJoin( Fields.size( 4 ) ) );
126
127    Map<Object, Object> properties = getProperties();
128
129    // make sure hasher is getting called, but does nothing special
130    FlowProps.setDefaultTupleElementComparator( properties, getPlatform().getStringComparator( false ).getClass().getCanonicalName() );
131
132    Flow flow = getPlatform().getFlowConnector( properties ).connect( sources, sink, splice );
133
134    flow.complete();
135
136    validateLength( flow, 5 );
137
138    List<Tuple> values = getSinkAsList( flow );
139
140    assertTrue( values.contains( new Tuple( "1\ta\t1\tA" ) ) );
141    assertTrue( values.contains( new Tuple( "2\tb\t2\tB" ) ) );
142    }
143
144  @Test
145  public void testCoGroupSamePipeName() throws Exception
146    {
147    getPlatform().copyFromLocal( inputFileLower );
148    getPlatform().copyFromLocal( inputFileUpper );
149
150    Tap sourceLower = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower );
151    Tap sourceUpper = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileUpper );
152
153    Map sources = new HashMap();
154
155    sources.put( "lower", sourceLower );
156    sources.put( "upper", sourceUpper );
157
158    Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "renamedpipes" ), SinkMode.REPLACE );
159
160    Function splitter = new RegexSplitter( new Fields( "num", "char" ), " " );
161
162    Pipe pipeLower = new Pipe( "lower" );
163    Pipe pipeUpper = new Pipe( "upper" );
164
165    // these pipes will hide the source name, and could cause one to be lost
166    pipeLower = new Pipe( "same", pipeLower );
167    pipeUpper = new Pipe( "same", pipeUpper );
168
169    pipeLower = new Each( pipeLower, new Fields( "line" ), splitter );
170    pipeUpper = new Each( pipeUpper, new Fields( "line" ), splitter );
171
172//    pipeLower = new Each( pipeLower, new Fields( "num", "char" ), new Identity( new Fields( "num", "char" ) ) );
173//    pipeUpper = new Each( pipeUpper, new Fields( "num", "char" ), new Identity( new Fields( "num", "char" ) ) );
174
175    pipeLower = new Pipe( "left", pipeLower );
176    pipeUpper = new Pipe( "right", pipeUpper );
177
178//    pipeLower = new Each( pipeLower, new Debug( true ) );
179//    pipeUpper = new Each( pipeUpper, new Debug( true ) );
180
181    Pipe splice = new CoGroup( pipeLower, new Fields( "num" ), pipeUpper, new Fields( "num" ), Fields.size( 4 ) );
182
183//    splice = new Each( splice, new Debug( true ) );
184    splice = new Pipe( "splice", splice );
185    splice = new Pipe( "tail", splice );
186
187    Flow flow = getPlatform().getFlowConnector().connect( sources, sink, splice );
188
189    flow.complete();
190
191    validateLength( flow, 5 );
192
193    List<Tuple> values = getSinkAsList( flow );
194
195    assertTrue( values.contains( new Tuple( "1\ta\t1\tA" ) ) );
196    assertTrue( values.contains( new Tuple( "2\tb\t2\tB" ) ) );
197    }
198
199  @Test
200  public void testCoGroupWithUnknowns() throws Exception
201    {
202    getPlatform().copyFromLocal( inputFileLower );
203    getPlatform().copyFromLocal( inputFileUpper );
204
205    Tap sourceLower = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower );
206    Tap sourceUpper = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileUpper );
207
208    Map sources = new HashMap();
209
210    sources.put( "lower", sourceLower );
211    sources.put( "upper", sourceUpper );
212
213    Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "unknown" ), SinkMode.REPLACE );
214
215    Function splitter = new RegexSplitter( Fields.UNKNOWN, " " );
216
217    Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitter );
218    Pipe pipeUpper = new Each( new Pipe( "upper" ), new Fields( "line" ), splitter );
219
220    Pipe splice = new CoGroup( pipeLower, new Fields( 0 ), pipeUpper, new Fields( 0 ), Fields.size( 4 ) );
221
222    Flow flow = getPlatform().getFlowConnector().connect( sources, sink, splice );
223
224    flow.complete();
225
226    validateLength( flow, 5 );
227
228    List<Tuple> values = getSinkAsList( flow );
229
230    assertTrue( values.contains( new Tuple( "1\ta\t1\tA" ) ) );
231    assertTrue( values.contains( new Tuple( "2\tb\t2\tB" ) ) );
232    }
233
234  /**
235   * this test intentionally filters out all values so the intermediate tap is empty. this tap is cogrouped with
236   * a new stream using an outerjoin.
237   *
238   * @throws Exception
239   */
240  @Test
241  public void testCoGroupFilteredBranch() throws Exception
242    {
243    getPlatform().copyFromLocal( inputFileLower );
244    getPlatform().copyFromLocal( inputFileUpper );
245
246    Tap sourceLower = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower );
247    Tap sourceUpper = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileUpper );
248
249    Map sources = new HashMap();
250
251    sources.put( "lower", sourceLower );
252    sources.put( "upper", sourceUpper );
253
254    Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "cogroupfilteredbranch" ), SinkMode.REPLACE );
255
256    Function splitter = new RegexSplitter( new Fields( "num", "char" ), " " );
257
258    Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitter );
259    Pipe pipeUpper = new Each( new Pipe( "upper" ), new Fields( "line" ), splitter );
260    pipeUpper = new Each( pipeUpper, new Fields( "num" ), new RegexFilter( "^fobar" ) ); // intentionally filtering all
261    pipeUpper = new GroupBy( pipeUpper, new Fields( "num" ) );
262
263    Pipe splice = new CoGroup( pipeLower, new Fields( "num" ), pipeUpper, new Fields( "num" ), Fields.size( 4 ), new OuterJoin() );
264
265    Flow flow = getPlatform().getFlowConnector().connect( sources, sink, splice );
266
267    flow.complete();
268
269    validateLength( flow, 5 );
270
271    List<Tuple> values = getSinkAsList( flow );
272
273    assertTrue( values.contains( new Tuple( "1\ta\tnull\tnull" ) ) );
274    assertTrue( values.contains( new Tuple( "2\tb\tnull\tnull" ) ) );
275    }
276
277  @Test
278  public void testCoGroupSelf() throws Exception
279    {
280    getPlatform().copyFromLocal( inputFileLower );
281
282    // intentionally creating multiple instances that are equivalent
283    Tap sourceLower = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower );
284    Tap sourceUpper = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower );
285
286    Map sources = new HashMap();
287
288    sources.put( "lower", sourceLower );
289    sources.put( "upper", sourceUpper );
290
291    Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "cogroupself" ), SinkMode.REPLACE );
292
293    Function splitter = new RegexSplitter( new Fields( "num", "char" ), " " );
294
295    Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitter );
296    Pipe pipeUpper = new Each( new Pipe( "upper" ), new Fields( "line" ), splitter );
297
298    Pipe splice = new CoGroup( pipeLower, new Fields( "num" ), pipeUpper, new Fields( "num" ), Fields.size( 4 ) );
299
300    Flow flow = getPlatform().getFlowConnector().connect( sources, sink, splice );
301
302    flow.complete();
303
304    validateLength( flow, 5 );
305
306    List<Tuple> values = getSinkAsList( flow );
307
308    assertTrue( values.contains( new Tuple( "1\ta\t1\ta" ) ) );
309    assertTrue( values.contains( new Tuple( "2\tb\t2\tb" ) ) );
310    }
311
312  @Test
313  public void testSplitCoGroupSelf() throws Exception
314    {
315    getPlatform().copyFromLocal( inputFileLower );
316
317    Tap source = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower );
318
319    Map sources = new HashMap();
320
321    sources.put( "lowerLhs", source );
322    sources.put( "upperLhs", source );
323    sources.put( "lowerRhs", source );
324    sources.put( "upperRhs", source );
325
326    Tap sinkLhs = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "splitcogroupself/lhs" ), SinkMode.REPLACE );
327    Tap sinkRhs = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "splitcogroupself/rhs" ), SinkMode.REPLACE );
328
329    Map sinks = new HashMap();
330
331    sinks.put( "lhs", sinkLhs );
332    sinks.put( "rhs", sinkRhs );
333
334    Function splitter = new RegexSplitter( new Fields( "num", "char" ), " " );
335
336    Pipe pipeLowerLhs = new Each( new Pipe( "lowerLhs" ), new Fields( "line" ), splitter );
337    Pipe pipeUpperLhs = new Each( new Pipe( "upperLhs" ), new Fields( "line" ), splitter );
338
339    Pipe spliceLhs = new CoGroup( "lhs", pipeLowerLhs, new Fields( "num" ), pipeUpperLhs, new Fields( "num" ), Fields.size( 4 ) );
340
341    Pipe pipeLowerRhs = new Each( new Pipe( "lowerRhs" ), new Fields( "line" ), splitter );
342    Pipe pipeUpperRhs = new Each( new Pipe( "upperRhs" ), new Fields( "line" ), splitter );
343
344    Pipe spliceRhs = new CoGroup( "rhs", pipeLowerRhs, new Fields( "num" ), pipeUpperRhs, new Fields( "num" ), Fields.size( 4 ) );
345
346    Flow flow = getPlatform().getFlowConnector().connect( sources, sinks, spliceLhs, spliceRhs );
347
348    flow.complete();
349
350    List<Tuple> values = asList( flow, sinkLhs );
351
352    assertEquals( 5, values.size() );
353    assertTrue( values.contains( new Tuple( "1\ta\t1\ta" ) ) );
354    assertTrue( values.contains( new Tuple( "2\tb\t2\tb" ) ) );
355
356    values = asList( flow, sinkRhs );
357
358    assertEquals( 5, values.size() );
359    assertTrue( values.contains( new Tuple( "1\ta\t1\ta" ) ) );
360    assertTrue( values.contains( new Tuple( "2\tb\t2\tb" ) ) );
361    }
362
363  /**
364   * Method testCoGroupAfterEvery tests that a tmp tap is inserted after the Every in the cogroup join
365   *
366   * @throws Exception when
367   */
368  @Test
369  public void testCoGroupAfterEvery() throws Exception
370    {
371    getPlatform().copyFromLocal( inputFileLower );
372    getPlatform().copyFromLocal( inputFileUpper );
373
374    Tap sourceLower = getPlatform().getTextFile( new Fields( "offset", "line" ).applyTypes( Long.TYPE, String.class ), inputFileLower );
375    Tap sourceUpper = getPlatform().getTextFile( new Fields( "offset", "line" ).applyTypes( Long.TYPE, String.class ), inputFileUpper );
376
377    Map sources = new HashMap();
378
379    sources.put( "lower", sourceLower );
380    sources.put( "upper", sourceUpper );
381
382    Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "afterevery" ), SinkMode.REPLACE );
383
384    Function splitter = new RegexSplitter( new Fields( "num", "char" ).applyTypes( String.class, String.class ), " " );
385
386    Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitter );
387    pipeLower = new GroupBy( pipeLower, new Fields( "num" ) );
388    pipeLower = new Every( pipeLower, new Fields( "char" ), new First(), Fields.ALL );
389
390    Pipe pipeUpper = new Each( new Pipe( "upper" ), new Fields( "line" ), splitter );
391    pipeUpper = new GroupBy( pipeUpper, new Fields( "num" ) );
392    pipeUpper = new Every( pipeUpper, new Fields( "char" ), new First(), Fields.ALL );
393
394    Pipe splice = new CoGroup( pipeLower, new Fields( "num" ), pipeUpper, new Fields( "num" ), Fields.size( 4 ) );
395
396    Map<Object, Object> properties = getPlatform().getProperties();
397
398    properties.put( "cascading.serialization.types.required", "true" );
399
400    Flow flow = getPlatform().getFlowConnector( properties ).connect( sources, sink, splice );
401
402    flow.complete();
403
404    validateLength( flow, 5, null );
405
406    List<Tuple> values = getSinkAsList( flow );
407
408    assertTrue( values.contains( new Tuple( "1\ta\t1\tA" ) ) );
409    assertTrue( values.contains( new Tuple( "2\tb\t2\tB" ) ) );
410    }
411
412  /**
413   * Tests that CoGroup properly resolves fields when following an Every
414   *
415   * @throws Exception
416   */
417  @Test
418  public void testCoGroupAfterEveryNoDeclared() throws Exception
419    {
420    getPlatform().copyFromLocal( inputFileLower );
421    getPlatform().copyFromLocal( inputFileUpper );
422
423    Tap sourceLower = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower );
424    Tap sourceUpper = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileUpper );
425
426    Map sources = new HashMap();
427
428    sources.put( "lower", sourceLower );
429    sources.put( "upper", sourceUpper );
430
431    Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "aftereverynodeclared" ), SinkMode.REPLACE );
432
433    Function splitter1 = new RegexSplitter( new Fields( "num1", "char1" ), " " );
434
435    Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitter1 );
436    pipeLower = new Each( pipeLower, new Insert( new Fields( "one", "two", "three", "four" ), "one", "two", "three", "four" ), Fields.ALL );
437    pipeLower = new GroupBy( pipeLower, new Fields( "num1" ) );
438    pipeLower = new Every( pipeLower, new Fields( "char1" ), new First(), Fields.ALL );
439
440    Function splitter2 = new RegexSplitter( new Fields( "num2", "char2" ), " " );
441
442    Pipe pipeUpper = new Each( new Pipe( "upper" ), new Fields( "line" ), splitter2 );
443    pipeUpper = new GroupBy( pipeUpper, new Fields( "num2" ) );
444    pipeUpper = new Every( pipeUpper, new Fields( "char2" ), new First(), Fields.ALL );
445
446    Pipe splice = new CoGroup( pipeLower, new Fields( "num1" ), pipeUpper, new Fields( "num2" ) );
447
448    Flow flow = getPlatform().getFlowConnector().connect( sources, sink, splice );
449
450    flow.complete();
451
452    validateLength( flow, 5, null );
453
454    List<Tuple> values = getSinkAsList( flow );
455
456    assertTrue( values.contains( new Tuple( "1\ta\t1\tA" ) ) );
457    assertTrue( values.contains( new Tuple( "2\tb\t2\tB" ) ) );
458    }
459
460  @Test
461  public void testCoGroupInnerSingleField() throws Exception
462    {
463    getPlatform().copyFromLocal( inputFileLowerOffset );
464    getPlatform().copyFromLocal( inputFileUpper );
465
466    Tap sourceLower = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLowerOffset );
467    Tap sourceUpper = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileUpper );
468
469    Map sources = new HashMap();
470
471    sources.put( "lower", sourceLower );
472    sources.put( "upper", sourceUpper );
473
474    Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "cogroupinnersingle" ), SinkMode.REPLACE );
475
476    Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), new RegexSplitter( new Fields( "num1", "char" ), " " ), new Fields( "num1" ) );
477    Pipe pipeUpper = new Each( new Pipe( "upper" ), new Fields( "line" ), new RegexSplitter( new Fields( "num2", "char" ), " " ), new Fields( "num2" ) );
478
479    Pipe join = new CoGroup( pipeLower, new Fields( "num1" ), pipeUpper, new Fields( "num2" ) );
480
481    join = new Every( join, new Count() );
482
483//    join = new Each( join, new Debug( true ) );
484
485    Flow flow = getPlatform().getFlowConnector().connect( sources, sink, join );
486
487    flow.complete();
488
489    validateLength( flow, 2, null );
490
491    Set<Tuple> results = new HashSet<Tuple>();
492
493    results.add( new Tuple( "1\t1\t1" ) );
494    results.add( new Tuple( "5\t5\t2" ) );
495
496    List<Tuple> actual = getSinkAsList( flow );
497
498    results.removeAll( actual );
499
500    assertEquals( 0, results.size() );
501    }
502
503  /**
504   * 1 a1
505   * 1 a2
506   * 1 a3
507   * 2 b1
508   * 3 c1
509   * 4 d1
510   * 4 d2
511   * 4 d3
512   * 5 e1
513   * 5 e2
514   * 5 e3
515   * 7 g1
516   * 7 g2
517   * 7 g3
518   * 7 g4
519   * 7 g5
520   * null h1
521   * <p/>
522   * 1 A1
523   * 1 A2
524   * 1 A3
525   * 2 B1
526   * 2 B2
527   * 2 B3
528   * 4 D1
529   * 6 F1
530   * 6 F2
531   * null H1
532   * <p/>
533   * 1  a1      1       A1
534   * 1  a1      1       A2
535   * 1  a1      1       A3
536   * 1  a2      1       A1
537   * 1  a2      1       A2
538   * 1  a2      1       A3
539   * 1  a3      1       A1
540   * 1  a3      1       A2
541   * 1  a3      1       A3
542   * 2  b1      2       B1
543   * 2  b1      2       B2
544   * 2  b1      2       B3
545   * 4  d1      4       D1
546   * 4  d2      4       D1
547   * 4  d3      4       D1
548   * null h1  null  H1
549   *
550   * @throws Exception
551   */
552  @Test
553  public void testCoGroupInner() throws Exception
554    {
555    HashSet<Tuple> results = new HashSet<Tuple>();
556
557    results.add( new Tuple( "1", "a1", "1", "A1" ) );
558    results.add( new Tuple( "1", "a1", "1", "A2" ) );
559    results.add( new Tuple( "1", "a1", "1", "A3" ) );
560    results.add( new Tuple( "1", "a2", "1", "A1" ) );
561    results.add( new Tuple( "1", "a2", "1", "A2" ) );
562    results.add( new Tuple( "1", "a2", "1", "A3" ) );
563    results.add( new Tuple( "1", "a3", "1", "A1" ) );
564    results.add( new Tuple( "1", "a3", "1", "A2" ) );
565    results.add( new Tuple( "1", "a3", "1", "A3" ) );
566    results.add( new Tuple( "2", "b1", "2", "B1" ) );
567    results.add( new Tuple( "2", "b1", "2", "B2" ) );
568    results.add( new Tuple( "2", "b1", "2", "B3" ) );
569    results.add( new Tuple( "4", "d1", "4", "D1" ) );
570    results.add( new Tuple( "4", "d2", "4", "D1" ) );
571    results.add( new Tuple( "4", "d3", "4", "D1" ) );
572    results.add( new Tuple( null, "h1", null, "H1" ) );
573
574    handleJoins( "cogroupinner", new InnerJoin(), results, 8, false, null );
575    handleJoins( "cogroupinner-resultgroup", new InnerJoin(), results, 8, true, null );
576    }
577
578  /**
579   * 1 a1
580   * 1 a2
581   * 1 a3
582   * 2 b1
583   * 3 c1
584   * 4 d1
585   * 4 d2
586   * 4 d3
587   * 5 e1
588   * 5 e2
589   * 5 e3
590   * 7 g1
591   * 7 g2
592   * 7 g3
593   * 7 g4
594   * 7 g5
595   * null h1
596   * <p/>
597   * 1 A1
598   * 1 A2
599   * 1 A3
600   * 2 B1
601   * 2 B2
602   * 2 B3
603   * 4 D1
604   * 6 F1
605   * 6 F2
606   * null H1
607   * <p/>
608   * 1  a1      1       A1
609   * 1  a1      1       A2
610   * 1  a1      1       A3
611   * 1  a2      1       A1
612   * 1  a2      1       A2
613   * 1  a2      1       A3
614   * 1  a3      1       A1
615   * 1  a3      1       A2
616   * 1  a3      1       A3
617   * 2  b1      2       B1
618   * 2  b1      2       B2
619   * 2  b1      2       B3
620   * 4  d1      4       D1
621   * 4  d2      4       D1
622   * 4  d3      4       D1
623   *
624   * @throws Exception
625   */
626  @Test
627  public void testCoGroupInnerNull() throws Exception
628    {
629    HashSet<Tuple> results = new HashSet<Tuple>();
630
631    results.add( new Tuple( "1", "a1", "1", "A1" ) );
632    results.add( new Tuple( "1", "a1", "1", "A2" ) );
633    results.add( new Tuple( "1", "a1", "1", "A3" ) );
634    results.add( new Tuple( "1", "a2", "1", "A1" ) );
635    results.add( new Tuple( "1", "a2", "1", "A2" ) );
636    results.add( new Tuple( "1", "a2", "1", "A3" ) );
637    results.add( new Tuple( "1", "a3", "1", "A1" ) );
638    results.add( new Tuple( "1", "a3", "1", "A2" ) );
639    results.add( new Tuple( "1", "a3", "1", "A3" ) );
640    results.add( new Tuple( "2", "b1", "2", "B1" ) );
641    results.add( new Tuple( "2", "b1", "2", "B2" ) );
642    results.add( new Tuple( "2", "b1", "2", "B3" ) );
643    results.add( new Tuple( "4", "d1", "4", "D1" ) );
644    results.add( new Tuple( "4", "d2", "4", "D1" ) );
645    results.add( new Tuple( "4", "d3", "4", "D1" ) );
646
647    handleJoins( "cogroupinnernull", new InnerJoin(), results, 9, false, new NullNotEquivalentComparator() );
648    handleJoins( "cogroupinnernull-resultgroup", new InnerJoin(), results, 9, true, new NullNotEquivalentComparator() );
649    }
650
651  /**
652   * 1 a1
653   * 1 a2
654   * 1 a3
655   * 2 b1
656   * 3 c1
657   * 4 d1
658   * 4 d2
659   * 4 d3
660   * 5 e1
661   * 5 e2
662   * 5 e3
663   * 7 g1
664   * 7 g2
665   * 7 g3
666   * 7 g4
667   * 7 g5
668   * null h1
669   * <p/>
670   * 1 A1
671   * 1 A2
672   * 1 A3
673   * 2 B1
674   * 2 B2
675   * 2 B3
676   * 4 D1
677   * 6 F1
678   * 6 F2
679   * null H1
680   * <p/>
681   * 1  a1      1       A1
682   * 1  a1      1       A2
683   * 1  a1      1       A3
684   * 1  a2      1       A1
685   * 1  a2      1       A2
686   * 1  a2      1       A3
687   * 1  a3      1       A1
688   * 1  a3      1       A2
689   * 1  a3      1       A3
690   * 2  b1      2       B1
691   * 2  b1      2       B2
692   * 2  b1      2       B3
693   * 3  c1      null    null
694   * 4  d1      4       D1
695   * 4  d2      4       D1
696   * 4  d3      4       D1
697   * 5  e1      null    null
698   * 5  e2      null    null
699   * 5  e3      null    null
700   * null       null    6       F1
701   * null       null    6       F2
702   * 7  g1      null    null
703   * 7  g2      null    null
704   * 7  g3      null    null
705   * 7  g4      null    null
706   * 7  g5      null    null
707   * null h1  null  H1
708   *
709   * @throws Exception
710   */
711  @Test
712  public void testCoGroupOuter() throws Exception
713    {
714    Set<Tuple> results = new HashSet<Tuple>();
715
716    results.add( new Tuple( "1", "a1", "1", "A1" ) );
717    results.add( new Tuple( "1", "a1", "1", "A2" ) );
718    results.add( new Tuple( "1", "a1", "1", "A3" ) );
719    results.add( new Tuple( "1", "a2", "1", "A1" ) );
720    results.add( new Tuple( "1", "a2", "1", "A2" ) );
721    results.add( new Tuple( "1", "a2", "1", "A3" ) );
722    results.add( new Tuple( "1", "a3", "1", "A1" ) );
723    results.add( new Tuple( "1", "a3", "1", "A2" ) );
724    results.add( new Tuple( "1", "a3", "1", "A3" ) );
725    results.add( new Tuple( "2", "b1", "2", "B1" ) );
726    results.add( new Tuple( "2", "b1", "2", "B2" ) );
727    results.add( new Tuple( "2", "b1", "2", "B3" ) );
728    results.add( new Tuple( "3", "c1", null, null ) );
729    results.add( new Tuple( "4", "d1", "4", "D1" ) );
730    results.add( new Tuple( "4", "d2", "4", "D1" ) );
731    results.add( new Tuple( "4", "d3", "4", "D1" ) );
732    results.add( new Tuple( "5", "e1", null, null ) );
733    results.add( new Tuple( "5", "e2", null, null ) );
734    results.add( new Tuple( "5", "e3", null, null ) );
735    results.add( new Tuple( null, null, "6", "F1" ) );
736    results.add( new Tuple( null, null, "6", "F2" ) );
737    results.add( new Tuple( "7", "g1", null, null ) );
738    results.add( new Tuple( "7", "g2", null, null ) );
739    results.add( new Tuple( "7", "g3", null, null ) );
740    results.add( new Tuple( "7", "g4", null, null ) );
741    results.add( new Tuple( "7", "g5", null, null ) );
742    results.add( new Tuple( null, "h1", null, "H1" ) );
743
744    handleJoins( "cogroupouter", new OuterJoin(), results, 8, false, null );
745    handleJoins( "cogroupouter-resultgroup", new OuterJoin(), results, 8, true, null );
746    }
747
748  /**
749   * 1 a1
750   * 1 a2
751   * 1 a3
752   * 2 b1
753   * 3 c1
754   * 4 d1
755   * 4 d2
756   * 4 d3
757   * 5 e1
758   * 5 e2
759   * 5 e3
760   * 7 g1
761   * 7 g2
762   * 7 g3
763   * 7 g4
764   * 7 g5
765   * null h1
766   * <p/>
767   * 1 A1
768   * 1 A2
769   * 1 A3
770   * 2 B1
771   * 2 B2
772   * 2 B3
773   * 4 D1
774   * 6 F1
775   * 6 F2
776   * null H1
777   * <p/>
778   * 1  a1      1       A1
779   * 1  a1      1       A2
780   * 1  a1      1       A3
781   * 1  a2      1       A1
782   * 1  a2      1       A2
783   * 1  a2      1       A3
784   * 1  a3      1       A1
785   * 1  a3      1       A2
786   * 1  a3      1       A3
787   * 2  b1      2       B1
788   * 2  b1      2       B2
789   * 2  b1      2       B3
790   * 3  c1      null    null
791   * 4  d1      4       D1
792   * 4  d2      4       D1
793   * 4  d3      4       D1
794   * 5  e1      null    null
795   * 5  e2      null    null
796   * 5  e3      null    null
797   * null       null    6       F1
798   * null       null    6       F2
799   * 7  g1      null    null
800   * 7  g2      null    null
801   * 7  g3      null    null
802   * 7  g4      null    null
803   * 7  g5      null    null
804   * null h1  null  null
805   * null null  null  H1
806   *
807   * @throws Exception
808   */
809  @Test
810  public void testCoGroupOuterNull() throws Exception
811    {
812    Set<Tuple> results = new HashSet<Tuple>();
813
814    results.add( new Tuple( "1", "a1", "1", "A1" ) );
815    results.add( new Tuple( "1", "a1", "1", "A2" ) );
816    results.add( new Tuple( "1", "a1", "1", "A3" ) );
817    results.add( new Tuple( "1", "a2", "1", "A1" ) );
818    results.add( new Tuple( "1", "a2", "1", "A2" ) );
819    results.add( new Tuple( "1", "a2", "1", "A3" ) );
820    results.add( new Tuple( "1", "a3", "1", "A1" ) );
821    results.add( new Tuple( "1", "a3", "1", "A2" ) );
822    results.add( new Tuple( "1", "a3", "1", "A3" ) );
823    results.add( new Tuple( "2", "b1", "2", "B1" ) );
824    results.add( new Tuple( "2", "b1", "2", "B2" ) );
825    results.add( new Tuple( "2", "b1", "2", "B3" ) );
826    results.add( new Tuple( "3", "c1", null, null ) );
827    results.add( new Tuple( "4", "d1", "4", "D1" ) );
828    results.add( new Tuple( "4", "d2", "4", "D1" ) );
829    results.add( new Tuple( "4", "d3", "4", "D1" ) );
830    results.add( new Tuple( "5", "e1", null, null ) );
831    results.add( new Tuple( "5", "e2", null, null ) );
832    results.add( new Tuple( "5", "e3", null, null ) );
833    results.add( new Tuple( null, null, "6", "F1" ) );
834    results.add( new Tuple( null, null, "6", "F2" ) );
835    results.add( new Tuple( "7", "g1", null, null ) );
836    results.add( new Tuple( "7", "g2", null, null ) );
837    results.add( new Tuple( "7", "g3", null, null ) );
838    results.add( new Tuple( "7", "g4", null, null ) );
839    results.add( new Tuple( "7", "g5", null, null ) );
840    results.add( new Tuple( null, "h1", null, null ) );
841    results.add( new Tuple( null, null, null, "H1" ) );
842
843    handleJoins( "cogroupouternull", new OuterJoin(), results, 9, false, new NullNotEquivalentComparator() );
844    handleJoins( "cogroupouternull-resultgroup", new OuterJoin(), results, 9, true, new NullNotEquivalentComparator() );
845    }
846
847  /**
848   * 1 a1
849   * 1 a2
850   * 1 a3
851   * 2 b1
852   * 3 c1
853   * 4 d1
854   * 4 d2
855   * 4 d3
856   * 5 e1
857   * 5 e2
858   * 5 e3
859   * 7 g1
860   * 7 g2
861   * 7 g3
862   * 7 g4
863   * 7 g5
864   * null h1
865   * <p/>
866   * 1 A1
867   * 1 A2
868   * 1 A3
869   * 2 B1
870   * 2 B2
871   * 2 B3
872   * 4 D1
873   * 6 F1
874   * 6 F2
875   * null H1
876   * <p/>
877   * 1  a1      1       A1
878   * 1  a1      1       A2
879   * 1  a1      1       A3
880   * 1  a2      1       A1
881   * 1  a2      1       A2
882   * 1  a2      1       A3
883   * 1  a3      1       A1
884   * 1  a3      1       A2
885   * 1  a3      1       A3
886   * 2  b1      2       B1
887   * 2  b1      2       B2
888   * 2  b1      2       B3
889   * 3  c1      null    null
890   * 4  d1      4       D1
891   * 4  d2      4       D1
892   * 4  d3      4       D1
893   * 5  e1      null    null
894   * 5  e2      null    null
895   * 5  e3      null    null
896   * 7  g1      null    null
897   * 7  g2      null    null
898   * 7  g3      null    null
899   * 7  g4      null    null
900   * 7  g5      null    null
901   * null h1    null    H1
902   *
903   * @throws Exception
904   */
905  @Test
906  public void testCoGroupInnerOuter() throws Exception
907    {
908    Set<Tuple> results = new HashSet<Tuple>();
909
910    results.add( new Tuple( "1", "a1", "1", "A1" ) );
911    results.add( new Tuple( "1", "a1", "1", "A2" ) );
912    results.add( new Tuple( "1", "a1", "1", "A3" ) );
913    results.add( new Tuple( "1", "a2", "1", "A1" ) );
914    results.add( new Tuple( "1", "a2", "1", "A2" ) );
915    results.add( new Tuple( "1", "a2", "1", "A3" ) );
916    results.add( new Tuple( "1", "a3", "1", "A1" ) );
917    results.add( new Tuple( "1", "a3", "1", "A2" ) );
918    results.add( new Tuple( "1", "a3", "1", "A3" ) );
919    results.add( new Tuple( "2", "b1", "2", "B1" ) );
920    results.add( new Tuple( "2", "b1", "2", "B2" ) );
921    results.add( new Tuple( "2", "b1", "2", "B3" ) );
922    results.add( new Tuple( "3", "c1", null, null ) );
923    results.add( new Tuple( "4", "d1", "4", "D1" ) );
924    results.add( new Tuple( "4", "d2", "4", "D1" ) );
925    results.add( new Tuple( "4", "d3", "4", "D1" ) );
926    results.add( new Tuple( "5", "e1", null, null ) );
927    results.add( new Tuple( "5", "e2", null, null ) );
928    results.add( new Tuple( "5", "e3", null, null ) );
929    results.add( new Tuple( "7", "g1", null, null ) );
930    results.add( new Tuple( "7", "g2", null, null ) );
931    results.add( new Tuple( "7", "g3", null, null ) );
932    results.add( new Tuple( "7", "g4", null, null ) );
933    results.add( new Tuple( "7", "g5", null, null ) );
934    results.add( new Tuple( null, "h1", null, "H1" ) );
935
936    handleJoins( "cogroupinnerouter", new LeftJoin(), results, 8, false, null );
937    handleJoins( "cogroupinnerouter-resultgroup", new LeftJoin(), results, 8, true, null );
938    }
939
940  /**
941   * 1 a1
942   * 1 a2
943   * 1 a3
944   * 2 b1
945   * 3 c1
946   * 4 d1
947   * 4 d2
948   * 4 d3
949   * 5 e1
950   * 5 e2
951   * 5 e3
952   * 7 g1
953   * 7 g2
954   * 7 g3
955   * 7 g4
956   * 7 g5
957   * null h1
958   * <p/>
959   * 1 A1
960   * 1 A2
961   * 1 A3
962   * 2 B1
963   * 2 B2
964   * 2 B3
965   * 4 D1
966   * 6 F1
967   * 6 F2
968   * null H1
969   * <p/>
970   * 1  a1      1       A1
971   * 1  a1      1       A2
972   * 1  a1      1       A3
973   * 1  a2      1       A1
974   * 1  a2      1       A2
975   * 1  a2      1       A3
976   * 1  a3      1       A1
977   * 1  a3      1       A2
978   * 1  a3      1       A3
979   * 2  b1      2       B1
980   * 2  b1      2       B2
981   * 2  b1      2       B3
982   * 3  c1      null    null
983   * 4  d1      4       D1
984   * 4  d2      4       D1
985   * 4  d3      4       D1
986   * 5  e1      null    null
987   * 5  e2      null    null
988   * 5  e3      null    null
989   * 7  g1      null    null
990   * 7  g2      null    null
991   * 7  g3      null    null
992   * 7  g4      null    null
993   * 7  g5      null    null
994   * null h1    null    null
995   *
996   * @throws Exception
997   */
998  @Test
999  public void testCoGroupInnerOuterNull() throws Exception
1000    {
1001    Set<Tuple> results = new HashSet<Tuple>();
1002
1003    results.add( new Tuple( "1", "a1", "1", "A1" ) );
1004    results.add( new Tuple( "1", "a1", "1", "A2" ) );
1005    results.add( new Tuple( "1", "a1", "1", "A3" ) );
1006    results.add( new Tuple( "1", "a2", "1", "A1" ) );
1007    results.add( new Tuple( "1", "a2", "1", "A2" ) );
1008    results.add( new Tuple( "1", "a2", "1", "A3" ) );
1009    results.add( new Tuple( "1", "a3", "1", "A1" ) );
1010    results.add( new Tuple( "1", "a3", "1", "A2" ) );
1011    results.add( new Tuple( "1", "a3", "1", "A3" ) );
1012    results.add( new Tuple( "2", "b1", "2", "B1" ) );
1013    results.add( new Tuple( "2", "b1", "2", "B2" ) );
1014    results.add( new Tuple( "2", "b1", "2", "B3" ) );
1015    results.add( new Tuple( "3", "c1", null, null ) );
1016    results.add( new Tuple( "4", "d1", "4", "D1" ) );
1017    results.add( new Tuple( "4", "d2", "4", "D1" ) );
1018    results.add( new Tuple( "4", "d3", "4", "D1" ) );
1019    results.add( new Tuple( "5", "e1", null, null ) );
1020    results.add( new Tuple( "5", "e2", null, null ) );
1021    results.add( new Tuple( "5", "e3", null, null ) );
1022    results.add( new Tuple( "7", "g1", null, null ) );
1023    results.add( new Tuple( "7", "g2", null, null ) );
1024    results.add( new Tuple( "7", "g3", null, null ) );
1025    results.add( new Tuple( "7", "g4", null, null ) );
1026    results.add( new Tuple( "7", "g5", null, null ) );
1027    results.add( new Tuple( null, "h1", null, null ) );
1028
1029    handleJoins( "cogroupinnerouternull", new LeftJoin(), results, 9, false, new NullNotEquivalentComparator() );
1030    handleJoins( "cogroupinnerouternull-resultgroup", new LeftJoin(), results, 9, true, new NullNotEquivalentComparator() );
1031    }
1032
1033  /**
1034   * 1 a1
1035   * 1 a2
1036   * 1 a3
1037   * 2 b1
1038   * 3 c1
1039   * 4 d1
1040   * 4 d2
1041   * 4 d3
1042   * 5 e1
1043   * 5 e2
1044   * 5 e3
1045   * 7 g1
1046   * 7 g2
1047   * 7 g3
1048   * 7 g4
1049   * 7 g5
1050   * null h1
1051   * <p/>
1052   * 1 A1
1053   * 1 A2
1054   * 1 A3
1055   * 2 B1
1056   * 2 B2
1057   * 2 B3
1058   * 4 D1
1059   * 6 F1
1060   * 6 F2
1061   * null H1
1062   * <p/>
1063   * 1  a1      1       A1
1064   * 1  a1      1       A2
1065   * 1  a1      1       A3
1066   * 1  a2      1       A1
1067   * 1  a2      1       A2
1068   * 1  a2      1       A3
1069   * 1  a3      1       A1
1070   * 1  a3      1       A2
1071   * 1  a3      1       A3
1072   * 2  b1      2       B1
1073   * 2  b1      2       B2
1074   * 2  b1      2       B3
1075   * 4  d1      4       D1
1076   * 4  d2      4       D1
1077   * 4  d3      4       D1
1078   * null       null    6       F1
1079   * null       null    6       F2
1080   * null h1    null    H1
1081   *
1082   * @throws Exception
1083   */
1084  @Test
1085  public void testCoGroupOuterInner() throws Exception
1086    {
1087    Set<Tuple> results = new HashSet<Tuple>();
1088
1089    results.add( new Tuple( "1", "a1", "1", "A1" ) );
1090    results.add( new Tuple( "1", "a1", "1", "A2" ) );
1091    results.add( new Tuple( "1", "a1", "1", "A3" ) );
1092    results.add( new Tuple( "1", "a2", "1", "A1" ) );
1093    results.add( new Tuple( "1", "a2", "1", "A2" ) );
1094    results.add( new Tuple( "1", "a2", "1", "A3" ) );
1095    results.add( new Tuple( "1", "a3", "1", "A1" ) );
1096    results.add( new Tuple( "1", "a3", "1", "A2" ) );
1097    results.add( new Tuple( "1", "a3", "1", "A3" ) );
1098    results.add( new Tuple( "2", "b1", "2", "B1" ) );
1099    results.add( new Tuple( "2", "b1", "2", "B2" ) );
1100    results.add( new Tuple( "2", "b1", "2", "B3" ) );
1101    results.add( new Tuple( "4", "d1", "4", "D1" ) );
1102    results.add( new Tuple( "4", "d2", "4", "D1" ) );
1103    results.add( new Tuple( "4", "d3", "4", "D1" ) );
1104    results.add( new Tuple( null, null, "6", "F1" ) );
1105    results.add( new Tuple( null, null, "6", "F2" ) );
1106    results.add( new Tuple( null, "h1", null, "H1" ) );
1107
1108    handleJoins( "cogroupouterinner", new RightJoin(), results, 8, false, null );
1109    handleJoins( "cogroupouterinner-resultgroup", new RightJoin(), results, 8, true, null );
1110    }
1111
1112  /**
1113   * 1 a1
1114   * 1 a2
1115   * 1 a3
1116   * 2 b1
1117   * 3 c1
1118   * 4 d1
1119   * 4 d2
1120   * 4 d3
1121   * 5 e1
1122   * 5 e2
1123   * 5 e3
1124   * 7 g1
1125   * 7 g2
1126   * 7 g3
1127   * 7 g4
1128   * 7 g5
1129   * null h1
1130   * <p/>
1131   * 1 A1
1132   * 1 A2
1133   * 1 A3
1134   * 2 B1
1135   * 2 B2
1136   * 2 B3
1137   * 4 D1
1138   * 6 F1
1139   * 6 F2
1140   * null H1
1141   * <p/>
1142   * 1  a1      1       A1
1143   * 1  a1      1       A2
1144   * 1  a1      1       A3
1145   * 1  a2      1       A1
1146   * 1  a2      1       A2
1147   * 1  a2      1       A3
1148   * 1  a3      1       A1
1149   * 1  a3      1       A2
1150   * 1  a3      1       A3
1151   * 2  b1      2       B1
1152   * 2  b1      2       B2
1153   * 2  b1      2       B3
1154   * 4  d1      4       D1
1155   * 4  d2      4       D1
1156   * 4  d3      4       D1
1157   * null       null    6       F1
1158   * null       null    6       F2
1159   * null null  null    H1
1160   *
1161   * @throws Exception
1162   */
1163  @Test
1164  public void testCoGroupOuterInnerNull() throws Exception
1165    {
1166    Set<Tuple> results = new HashSet<Tuple>();
1167
1168    results.add( new Tuple( "1", "a1", "1", "A1" ) );
1169    results.add( new Tuple( "1", "a1", "1", "A2" ) );
1170    results.add( new Tuple( "1", "a1", "1", "A3" ) );
1171    results.add( new Tuple( "1", "a2", "1", "A1" ) );
1172    results.add( new Tuple( "1", "a2", "1", "A2" ) );
1173    results.add( new Tuple( "1", "a2", "1", "A3" ) );
1174    results.add( new Tuple( "1", "a3", "1", "A1" ) );
1175    results.add( new Tuple( "1", "a3", "1", "A2" ) );
1176    results.add( new Tuple( "1", "a3", "1", "A3" ) );
1177    results.add( new Tuple( "2", "b1", "2", "B1" ) );
1178    results.add( new Tuple( "2", "b1", "2", "B2" ) );
1179    results.add( new Tuple( "2", "b1", "2", "B3" ) );
1180    results.add( new Tuple( "4", "d1", "4", "D1" ) );
1181    results.add( new Tuple( "4", "d2", "4", "D1" ) );
1182    results.add( new Tuple( "4", "d3", "4", "D1" ) );
1183    results.add( new Tuple( null, null, "6", "F1" ) );
1184    results.add( new Tuple( null, null, "6", "F2" ) );
1185    results.add( new Tuple( null, null, null, "H1" ) );
1186
1187    handleJoins( "cogroupouterinnernull", new RightJoin(), results, 9, false, new NullNotEquivalentComparator() );
1188    handleJoins( "cogroupouterinnernull-resultgroup", new RightJoin(), results, 9, true, new NullNotEquivalentComparator() );
1189    }
1190
1191  private void handleJoins( String path, Joiner joiner, Set<Tuple> results, int numGroups, boolean useResultGroupFields, NullNotEquivalentComparator comparator ) throws Exception
1192    {
1193    results = new HashSet<Tuple>( results );
1194
1195    getPlatform().copyFromLocal( inputFileLhsSparse );
1196    getPlatform().copyFromLocal( inputFileRhsSparse );
1197
1198    Fields fields = new Fields( "num", "char" ).applyTypes( Integer.class, String.class );
1199    Tap sourceLower = getPlatform().getDelimitedFile( fields, " ", inputFileLhsSparse );
1200    Tap sourceUpper = getPlatform().getDelimitedFile( fields, " ", inputFileRhsSparse );
1201
1202    Map sources = new HashMap();
1203
1204    sources.put( "lower", sourceLower );
1205    sources.put( "upper", sourceUpper );
1206
1207    Tap sink = getPlatform().getDelimitedFile( Fields.size( 4, String.class ), "\t", getOutputPath( path ), SinkMode.REPLACE );
1208
1209    Pipe pipeLower = new Pipe( "lower" );
1210    Pipe pipeUpper = new Pipe( "upper" );
1211
1212    Fields declaredFields = new Fields( "num", "char", "num2", "char2" );
1213
1214    Fields groupFields = new Fields( "num" );
1215
1216    if( comparator != null )
1217      groupFields.setComparator( 0, comparator );
1218
1219    Pipe splice;
1220    if( useResultGroupFields )
1221      splice = new CoGroup( pipeLower, groupFields, pipeUpper, groupFields, declaredFields, new Fields( "num", "num2" ), joiner );
1222    else
1223      splice = new CoGroup( pipeLower, groupFields, pipeUpper, groupFields, declaredFields, joiner );
1224
1225    splice = new Every( splice, Fields.ALL, new TestIdentityBuffer( new Fields( "num", "num2" ), numGroups, true ), Fields.RESULTS );
1226
1227    Flow flow = getPlatform().getFlowConnector().connect( sources, sink, splice );
1228
1229    flow.complete();
1230
1231    validateLength( flow, results.size() );
1232
1233    List<Tuple> actual = getSinkAsList( flow );
1234
1235    results.removeAll( actual );
1236
1237    assertEquals( 0, results.size() );
1238    }
1239
1240  /**
1241   * 1 a
1242   * 5 b
1243   * 6 c
1244   * 5 b
1245   * 5 e
1246   * <p/>
1247   * 1 A
1248   * 2 B
1249   * 3 C
1250   * 4 D
1251   * 5 E
1252   * <p/>
1253   * 1 a
1254   * 2 b
1255   * 3 c
1256   * 4 d
1257   * 5 e
1258   * <p/>
1259   * 1  a       1       A  1  a
1260   * -  -   2   B  2  b
1261   * -  -   3   C  3  c
1262   * -  -   4   D  4  d
1263   * 5  b       5   E  5  e
1264   * 5  e       5   E  5  e
1265   *
1266   * @throws Exception
1267   */
1268  @Test
1269  public void testCoGroupMixed() throws Exception
1270    {
1271    getPlatform().copyFromLocal( inputFileLowerOffset );
1272    getPlatform().copyFromLocal( inputFileLower );
1273    getPlatform().copyFromLocal( inputFileUpper );
1274
1275    Tap sourceLowerOffset = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLowerOffset );
1276    Tap sourceUpper = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileUpper );
1277    Tap sourceLower = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower );
1278
1279    Map sources = new HashMap();
1280
1281    sources.put( "loweroffset", sourceLowerOffset );
1282    sources.put( "lower", sourceLower );
1283    sources.put( "upper", sourceUpper );
1284
1285    Tap sink = getPlatform().getDelimitedFile( Fields.size( 6, String.class ), "\t", getOutputPath( "cogroupmixed" ), SinkMode.REPLACE );
1286
1287    Function splitter = new RegexSplitter( new Fields( "num", "char" ), " " );
1288
1289    Pipe pipeLowerOffset = new Each( new Pipe( "loweroffset" ), new Fields( "line" ), splitter );
1290    Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitter );
1291    Pipe pipeUpper = new Each( new Pipe( "upper" ), new Fields( "line" ), splitter );
1292
1293    Pipe[] pipes = Pipe.pipes( pipeLowerOffset, pipeUpper, pipeLower );
1294    Fields[] fields = Fields.fields( new Fields( "num" ), new Fields( "num" ), new Fields( "num" ) );
1295
1296    MixedJoin join = new MixedJoin( new boolean[]{MixedJoin.OUTER, MixedJoin.INNER, MixedJoin.OUTER} );
1297    Pipe splice = new CoGroup( pipes, fields, Fields.size( 6 ), join );
1298
1299    Flow flow = getPlatform().getFlowConnector().connect( sources, sink, splice );
1300
1301    flow.complete();
1302
1303    validateLength( flow, 6 );
1304
1305    Set<Tuple> results = new HashSet<Tuple>();
1306
1307    results.add( new Tuple( "1", "a", "1", "A", "1", "a" ) );
1308    results.add( new Tuple( null, null, "2", "B", "2", "b" ) );
1309    results.add( new Tuple( null, null, "3", "C", "3", "c" ) );
1310    results.add( new Tuple( null, null, "4", "D", "4", "d" ) );
1311    results.add( new Tuple( "5", "b", "5", "E", "5", "e" ) );
1312    results.add( new Tuple( "5", "e", "5", "E", "5", "e" ) );
1313
1314    List<Tuple> actual = getSinkAsList( flow );
1315
1316    results.removeAll( actual );
1317
1318    assertEquals( 0, results.size() );
1319    }
1320
1321  @Test
1322  public void testCoGroupDiffFields() throws Exception
1323    {
1324    getPlatform().copyFromLocal( inputFileLower );
1325    getPlatform().copyFromLocal( inputFileUpper );
1326
1327    Tap sourceLower = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower );
1328    Tap sourceUpper = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileUpper );
1329
1330    Map sources = new HashMap();
1331
1332    sources.put( "lower", sourceLower );
1333    sources.put( "upper", sourceUpper );
1334
1335    Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "difffields" ), SinkMode.REPLACE );
1336
1337    Function splitterLower = new RegexSplitter( new Fields( "numA", "lower" ), " " );
1338    Function splitterUpper = new RegexSplitter( new Fields( "numB", "upper" ), " " );
1339
1340    Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitterLower );
1341    Pipe pipeUpper = new Each( new Pipe( "upper" ), new Fields( "line" ), splitterUpper );
1342
1343    Pipe cogroup = new CoGroup( pipeLower, new Fields( "numA" ), pipeUpper, new Fields( "numB" ) );
1344
1345    Flow flow = getPlatform().getFlowConnector().connect( sources, sink, cogroup );
1346
1347    flow.complete();
1348
1349    validateLength( flow, 5 );
1350
1351    List<Tuple> actual = getSinkAsList( flow );
1352
1353    assertTrue( actual.contains( new Tuple( "1\ta\t1\tA" ) ) );
1354    assertTrue( actual.contains( new Tuple( "2\tb\t2\tB" ) ) );
1355    }
1356
1357  @Test
1358  public void testCoGroupGroupBy() throws Exception
1359    {
1360    getPlatform().copyFromLocal( inputFileLower );
1361    getPlatform().copyFromLocal( inputFileUpper );
1362
1363    Tap sourceLower = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower );
1364    Tap sourceUpper = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileUpper );
1365
1366    Map sources = new HashMap();
1367
1368    sources.put( "lower", sourceLower );
1369    sources.put( "upper", sourceUpper );
1370
1371    Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "cogroupgroupby" ), SinkMode.REPLACE );
1372
1373    Function splitterLower = new RegexSplitter( new Fields( "numA", "lower" ).applyTypes( String.class, String.class ), " " );
1374    Function splitterUpper = new RegexSplitter( new Fields( "numB", "upper" ).applyTypes( String.class, String.class ), " " );
1375
1376    Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitterLower );
1377    Pipe pipeUpper = new Each( new Pipe( "upper" ), new Fields( "line" ), splitterUpper );
1378
1379    Pipe cogroup = new CoGroup( pipeLower, new Fields( "numA" ), pipeUpper, new Fields( "numB" ) );
1380
1381    Pipe groupby = new GroupBy( cogroup, new Fields( "numA" ) );
1382
1383    Map<Object, Object> properties = getPlatform().getProperties();
1384
1385    properties.put( "cascading.serialization.types.required", "true" );
1386
1387    Flow flow = getPlatform().getFlowConnector( properties ).connect( sources, sink, groupby );
1388
1389    flow.complete();
1390
1391    validateLength( flow, 5, null );
1392
1393    List<Tuple> actual = getSinkAsList( flow );
1394
1395    assertTrue( actual.contains( new Tuple( "1\ta\t1\tA" ) ) );
1396    assertTrue( actual.contains( new Tuple( "2\tb\t2\tB" ) ) );
1397    }
1398
1399  @Test
1400  public void testCoGroupSamePipe() throws Exception
1401    {
1402    getPlatform().copyFromLocal( inputFileLower );
1403
1404    Tap source = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower );
1405
1406    Map sources = new HashMap();
1407
1408    sources.put( "lower", source );
1409
1410    Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "samepipe" ), SinkMode.REPLACE );
1411
1412    Function splitter = new RegexSplitter( new Fields( "num", "char" ), " " );
1413
1414    Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitter );
1415
1416    Pipe cogroup = new CoGroup( pipeLower, new Fields( "num" ), 1, new Fields( "num1", "char1", "num2", "char2" ) );
1417
1418    Flow flow = getPlatform().getFlowConnector().connect( sources, sink, cogroup );
1419
1420    flow.complete();
1421
1422    validateLength( flow, 5, null );
1423
1424    List<Tuple> actual = getSinkAsList( flow );
1425
1426    assertTrue( actual.contains( new Tuple( "1\ta\t1\ta" ) ) );
1427    assertTrue( actual.contains( new Tuple( "2\tb\t2\tb" ) ) );
1428    }
1429
1430  @Test
1431  public void testCoGroupSamePipe2() throws Exception
1432    {
1433    getPlatform().copyFromLocal( inputFileLower );
1434
1435    Tap source = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower );
1436
1437    Map sources = new HashMap();
1438
1439    sources.put( "lower", source );
1440
1441    Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "samepipe2" ), SinkMode.REPLACE );
1442
1443    Function splitter = new RegexSplitter( new Fields( "num", "char" ), " " );
1444
1445    Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitter );
1446
1447    Pipe cogroup = new CoGroup( pipeLower, new Fields( "num" ), pipeLower, new Fields( "num" ), new Fields( "num1", "char1", "num2", "char2" ) );
1448
1449    Flow flow = getPlatform().getFlowConnector().connect( sources, sink, cogroup );
1450
1451    flow.complete();
1452
1453    validateLength( flow, 5, null );
1454
1455    List<Tuple> actual = getSinkAsList( flow );
1456
1457    assertTrue( actual.contains( new Tuple( "1\ta\t1\ta" ) ) );
1458    assertTrue( actual.contains( new Tuple( "2\tb\t2\tb" ) ) );
1459    }
1460
1461  @Test
1462  public void testCoGroupSamePipe3() throws Exception
1463    {
1464    getPlatform().copyFromLocal( inputFileLower );
1465
1466    Tap source = getPlatform().getDelimitedFile( new Fields( "num", "char" ), " ", inputFileLower );
1467
1468    Map sources = new HashMap();
1469
1470    sources.put( "lower", source );
1471
1472    Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "samepipe3" ), SinkMode.REPLACE );
1473
1474    Pipe pipe = new Pipe( "lower" );
1475
1476    Pipe lhs = new Pipe( "lhs", pipe );
1477    Pipe rhs = new Pipe( "rhs", pipe );
1478
1479    Pipe cogroup = new CoGroup( lhs, new Fields( "num" ), rhs, new Fields( "num" ), new Fields( "num1", "char1", "num2", "char2" ) );
1480
1481    Flow flow = getPlatform().getFlowConnector().connect( sources, sink, cogroup );
1482
1483    flow.complete();
1484
1485    validateLength( flow, 5, null );
1486
1487    List<Tuple> actual = getSinkAsList( flow );
1488
1489    assertTrue( actual.contains( new Tuple( "1\ta\t1\ta" ) ) );
1490    assertTrue( actual.contains( new Tuple( "2\tb\t2\tb" ) ) );
1491    }
1492
1493  @Test
1494  public void testCoGroupAroundCoGroup() throws Exception
1495    {
1496    getPlatform().copyFromLocal( inputFileLower );
1497    getPlatform().copyFromLocal( inputFileUpper );
1498
1499    Tap sourceLower = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower );
1500    Tap sourceUpper = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileUpper );
1501
1502    Map sources = new HashMap();
1503
1504    sources.put( "lower", sourceLower );
1505    sources.put( "upper1", sourceUpper );
1506    sources.put( "upper2", sourceUpper );
1507
1508    Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "cogroupacogroup" ), SinkMode.REPLACE );
1509
1510    Function splitter = new RegexSplitter( new Fields( "num", "char" ), " " );
1511
1512    Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitter );
1513    Pipe pipeUpper1 = new Each( new Pipe( "upper1" ), new Fields( "line" ), splitter );
1514    Pipe pipeUpper2 = new Each( new Pipe( "upper2" ), new Fields( "line" ), splitter );
1515
1516    Pipe splice1 = new CoGroup( pipeLower, new Fields( "num" ), pipeUpper1, new Fields( "num" ), new Fields( "num1", "char1", "num2", "char2" ) );
1517
1518    splice1 = new Each( splice1, new Identity() );
1519
1520    Pipe splice2 = new CoGroup( splice1, new Fields( "num1" ), pipeUpper2, new Fields( "num" ), new Fields( "num1", "char1", "num2", "char2", "num3", "char3" ) );
1521
1522    Flow flow = getPlatform().getFlowConnector().connect( sources, sink, splice2 );
1523
1524    flow.complete();
1525
1526    validateLength( flow, 5, null );
1527
1528    List<Tuple> actual = getSinkAsList( flow );
1529
1530    assertTrue( actual.contains( new Tuple( "1\ta\t1\tA\t1\tA" ) ) );
1531    assertTrue( actual.contains( new Tuple( "2\tb\t2\tB\t2\tB" ) ) );
1532    }
1533
1534  @Test
1535  public void testCoGroupAroundCoGroupWithout() throws Exception
1536    {
1537    runCoGroupAroundCoGroup( null, "cogroupacogroupopt1" );
1538    }
1539
1540  @Test
1541  public void testCoGroupAroundCoGroupWith() throws Exception
1542    {
1543    // hack to get classname
1544    runCoGroupAroundCoGroup( getPlatform().getDelimitedFile( new Fields( "num" ), "\t", inputFileNums10 ).getScheme().getClass(), "cogroupacogroupopt2" );
1545    }
1546
1547  private void runCoGroupAroundCoGroup( Class schemeClass, String stringPath ) throws IOException
1548    {
1549    getPlatform().copyFromLocal( inputFileNums20 );
1550    getPlatform().copyFromLocal( inputFileNums10 );
1551
1552    Tap source10 = getPlatform().getDelimitedFile( new Fields( "num" ), "\t", inputFileNums10 );
1553    Tap source20 = getPlatform().getDelimitedFile( new Fields( "num" ), "\t", inputFileNums20 );
1554
1555    Map sources = new HashMap();
1556
1557    sources.put( "source20", source20 );
1558    sources.put( "source101", source10 );
1559    sources.put( "source102", source10 );
1560
1561    Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( stringPath ), SinkMode.REPLACE );
1562
1563    Pipe pipeNum20 = new Pipe( "source20" );
1564    Pipe pipeNum101 = new Pipe( "source101" );
1565    Pipe pipeNum102 = new Pipe( "source102" );
1566
1567    Pipe splice1 = new CoGroup( pipeNum20, new Fields( "num" ), pipeNum101, new Fields( "num" ), new Fields( "num1", "num2" ) );
1568
1569    Pipe splice2 = new CoGroup( splice1, new Fields( "num1" ), pipeNum102, new Fields( "num" ), new Fields( "num1", "num2", "num3" ) );
1570
1571    splice2 = new Each( splice2, new Identity() );
1572
1573    Map<Object, Object> properties = getPlatform().getProperties();
1574
1575    if( getPlatform().isMapReduce() )
1576      FlowConnectorProps.setIntermediateSchemeClass( properties, schemeClass );
1577
1578    Flow flow = getPlatform().getFlowConnector( properties ).connect( "cogroupopt", sources, sink, splice2 );
1579
1580    if( getPlatform().isMapReduce() )
1581      assertEquals( "wrong number of steps", 2, flow.getFlowSteps().size() );
1582
1583    flow.complete();
1584
1585    validateLength( flow, 10 );
1586
1587    List<Tuple> actual = getSinkAsList( flow );
1588
1589    assertTrue( actual.contains( new Tuple( "1\t1\t1" ) ) );
1590    assertTrue( actual.contains( new Tuple( "10\t10\t10" ) ) );
1591    }
1592
1593  @Test
1594  public void testCoGroupDiffFieldsSameFile() throws Exception
1595    {
1596    getPlatform().copyFromLocal( inputFileLower );
1597    getPlatform().copyFromLocal( inputFileUpper );
1598
1599    Tap sourceOffsetLower = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower );
1600    Tap sourceLower = getPlatform().getTextFile( new Fields( "line" ), inputFileLower );
1601
1602    Map sources = new HashMap();
1603
1604    sources.put( "offsetLower", sourceOffsetLower );
1605    sources.put( "lower", sourceLower );
1606
1607    Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "samefiledifffields" ), SinkMode.REPLACE );
1608
1609    Function splitterLower = new RegexSplitter( new Fields( "numA", "left" ), " " );
1610    Function splitterUpper = new RegexSplitter( new Fields( "numB", "right" ), " " );
1611
1612    Pipe offsetLower = new Pipe( "offsetLower" );
1613    offsetLower = new Discard( offsetLower, new Fields( "offset" ) );
1614    offsetLower = new Each( offsetLower, new Fields( "line" ), splitterLower );
1615
1616    Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitterUpper );
1617
1618    Pipe cogroup = new CoGroup( offsetLower, new Fields( "numA" ), pipeLower, new Fields( "numB" ) );
1619
1620    Flow flow = getPlatform().getFlowConnector().connect( sources, sink, cogroup );
1621
1622    flow.complete();
1623
1624    validateLength( flow, 5 );
1625
1626    List<Tuple> actual = getSinkAsList( flow );
1627
1628    assertTrue( actual.contains( new Tuple( "1\ta\t1\ta" ) ) );
1629    assertTrue( actual.contains( new Tuple( "2\tb\t2\tb" ) ) );
1630    }
1631
1632  @Test
1633  public void testJoinNone() throws Exception
1634    {
1635    getPlatform().copyFromLocal( inputFileLower );
1636    getPlatform().copyFromLocal( inputFileUpper );
1637
1638    Tap sourceLower = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower );
1639    Tap sourceUpper = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileUpper );
1640
1641    Map sources = new HashMap();
1642
1643    sources.put( "lower", sourceLower );
1644    sources.put( "upper", sourceUpper );
1645
1646    Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "joinnone" ), SinkMode.REPLACE );
1647
1648    Function splitter = new RegexSplitter( new Fields( "num", "char" ), " " );
1649
1650    Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitter );
1651    Pipe pipeUpper = new Each( new Pipe( "upper" ), new Fields( "line" ), splitter );
1652
1653    Pipe splice = new CoGroup( pipeLower, Fields.NONE, pipeUpper, Fields.NONE, Fields.size( 4 ) );
1654
1655    Map<Object, Object> properties = getProperties();
1656
1657    Flow flow = getPlatform().getFlowConnector( properties ).connect( sources, sink, splice );
1658
1659    flow.complete();
1660
1661    validateLength( flow, 25 );
1662
1663    List<Tuple> values = getSinkAsList( flow );
1664
1665    assertTrue( values.contains( new Tuple( "1\ta\t1\tA" ) ) );
1666    assertTrue( values.contains( new Tuple( "1\ta\t2\tB" ) ) );
1667    assertTrue( values.contains( new Tuple( "2\tb\t2\tB" ) ) );
1668    }
1669
1670  @Test
1671  public void testMultiJoin() throws Exception
1672    {
1673    getPlatform().copyFromLocal( inputFileCrossX2 );
1674
1675    Tap source = getPlatform().getTextFile( new Fields( "line" ), inputFileCrossX2 );
1676    Tap innerSink = getPlatform().getTextFile( getOutputPath( "inner" ), SinkMode.REPLACE );
1677    Tap outerSink = getPlatform().getTextFile( getOutputPath( "outer" ), SinkMode.REPLACE );
1678    Tap leftSink = getPlatform().getTextFile( getOutputPath( "left" ), SinkMode.REPLACE );
1679    Tap rightSink = getPlatform().getTextFile( getOutputPath( "right" ), SinkMode.REPLACE );
1680
1681    Pipe uniques = new Pipe( "unique" );
1682
1683    uniques = new Each( uniques, new Fields( "line" ), new RegexSplitGenerator( new Fields( "word" ), "\\s" ) );
1684
1685    uniques = new GroupBy( uniques, new Fields( "word" ) );
1686
1687    uniques = new Every( uniques, new Fields( "word" ), new First( Fields.ARGS ), Fields.REPLACE );
1688
1689//    uniques = new Each( uniques, new Debug( true ) );
1690
1691    Pipe fielded = new Pipe( "fielded" );
1692
1693    fielded = new Each( fielded, new Fields( "line" ), new RegexSplitter( "\\s" ) );
1694
1695//    fielded = new Each( fielded, new Debug( true ) );
1696
1697    Pipe inner = new CoGroup( "inner", fielded, new Fields( 0 ), uniques, new Fields( "word" ), new InnerJoin() );
1698    Pipe outer = new CoGroup( "outer", fielded, new Fields( 0 ), uniques, new Fields( "word" ), new OuterJoin() );
1699    Pipe left = new CoGroup( "left", fielded, new Fields( 0 ), uniques, new Fields( "word" ), new LeftJoin() );
1700    Pipe right = new CoGroup( "right", fielded, new Fields( 0 ), uniques, new Fields( "word" ), new RightJoin() );
1701
1702    Pipe[] heads = Pipe.pipes( uniques, fielded );
1703    Map<String, Tap> sources = Cascades.tapsMap( heads, Tap.taps( source, source ) );
1704
1705    Pipe[] tails = Pipe.pipes( inner, outer, left, right );
1706    Map<String, Tap> sinks = Cascades.tapsMap( tails, Tap.taps( innerSink, outerSink, leftSink, rightSink ) );
1707
1708    Flow flow = getPlatform().getFlowConnector().connect( "multi-joins", sources, sinks, tails );
1709
1710    flow.complete();
1711
1712    validateLength( flow.openTapForRead( innerSink ), 74 );
1713    validateLength( flow.openTapForRead( outerSink ), 84 );
1714    validateLength( flow.openTapForRead( leftSink ), 74 );
1715    validateLength( flow.openTapForRead( rightSink ), 84 );
1716    }
1717
1718  /**
1719   * This test checks that a valid node is created when adding back remainders during unique
1720   * path sub-graph partitioning.
1721   */
1722  @Test
1723  public void testMultiJoinWithSplits() throws Exception
1724    {
1725    getPlatform().copyFromLocal( inputFileCrossX2 );
1726
1727    Tap source = getPlatform().getTextFile( new Fields( "line" ), inputFileCrossX2 );
1728    Tap innerSinkLhs = getPlatform().getTextFile( getOutputPath( "innerLhs" ), SinkMode.REPLACE );
1729    Tap uniqueSinkLhs = getPlatform().getTextFile( getOutputPath( "uniquesLhs" ), SinkMode.REPLACE );
1730    Tap innerSinkRhs = getPlatform().getTextFile( getOutputPath( "innerRhs" ), SinkMode.REPLACE );
1731    Tap uniqueSinkRhs = getPlatform().getTextFile( getOutputPath( "uniquesRhs" ), SinkMode.REPLACE );
1732
1733    Pipe incoming = new Pipe( "incoming" );
1734    Pipe uniquesLhs;
1735    Pipe innerLhs;
1736    Pipe uniquesRhs;
1737    Pipe innerRhs;
1738
1739    {
1740    Pipe generatorLhs = new Each( new Pipe( "genLhsLhs", incoming ), new Fields( "line" ), new RegexSplitGenerator( new Fields( "word" ), "\\s" ) );
1741
1742    generatorLhs = new Each( generatorLhs, new Identity() );
1743
1744    Pipe generatorRhs = new Each( new Pipe( "genRhsLhs", incoming ), new Fields( "line" ), new RegexSplitGenerator( new Fields( "word" ), "\\s" ) );
1745
1746    Pipe uniques = new Each( new Pipe( "uniquesLhs", generatorLhs ), new Identity() );
1747
1748    uniques = new GroupBy( uniques, new Fields( "word" ) );
1749
1750    uniques = new Every( uniques, new Fields( "word" ), new First( Fields.ARGS ), Fields.REPLACE );
1751
1752    Pipe lhs = new Pipe( "lhs", generatorLhs );
1753
1754    lhs = new Rename( lhs, new Fields( "word" ), new Fields( "lhs" ) );
1755
1756    Pipe rhs = new Pipe( "rhs", generatorRhs );
1757
1758    rhs = new Rename( rhs, new Fields( "word" ), new Fields( "rhs" ) );
1759
1760    Pipe inner = new CoGroup( "innerLhs", lhs, new Fields( 0 ), rhs, new Fields( 0 ), new InnerJoin() );
1761
1762    uniquesLhs = uniques;
1763    innerLhs = inner;
1764    }
1765
1766    {
1767    Pipe generatorLhs = new Each( new Pipe( "genLhsRhs", incoming ), new Fields( "line" ), new RegexSplitGenerator( new Fields( "word" ), "\\s" ) );
1768
1769    generatorLhs = new Each( generatorLhs, new Identity() );
1770
1771    Pipe generatorRhs = new Each( new Pipe( "genRhsRhs", incoming ), new Fields( "line" ), new RegexSplitGenerator( new Fields( "word" ), "\\s" ) );
1772
1773    Pipe uniques = new Each( new Pipe( "uniquesRhs", generatorLhs ), new Identity() );
1774
1775    uniques = new GroupBy( uniques, new Fields( "word" ) );
1776
1777    uniques = new Every( uniques, new Fields( "word" ), new First( Fields.ARGS ), Fields.REPLACE );
1778
1779    Pipe lhs = new Pipe( "lhs", generatorLhs );
1780
1781    lhs = new Rename( lhs, new Fields( "word" ), new Fields( "lhs" ) );
1782
1783    Pipe rhs = new Pipe( "rhs", generatorRhs );
1784
1785    rhs = new Rename( rhs, new Fields( "word" ), new Fields( "rhs" ) );
1786
1787    Pipe inner = new CoGroup( "innerRhs", lhs, new Fields( 0 ), rhs, new Fields( 0 ), new InnerJoin() );
1788
1789    uniquesRhs = uniques;
1790    innerRhs = inner;
1791    }
1792
1793    FlowDef flowDef = FlowDef.flowDef()
1794      .setName( "multi-joins" )
1795      .addSource( "incoming", source )
1796      .addTailSink( innerLhs, innerSinkLhs )
1797      .addTailSink( uniquesLhs, uniqueSinkLhs )
1798      .addTailSink( innerRhs, innerSinkRhs )
1799      .addTailSink( uniquesRhs, uniqueSinkRhs );
1800
1801    Flow flow = getPlatform().getFlowConnector().connect( flowDef );
1802
1803    flow.complete();
1804
1805    validateLength( flow.openTapForRead( innerSinkLhs ), 3900 );
1806    validateLength( flow.openTapForRead( uniqueSinkLhs ), 15 );
1807    validateLength( flow.openTapForRead( innerSinkRhs ), 3900 );
1808    validateLength( flow.openTapForRead( uniqueSinkRhs ), 15 );
1809    }
1810
1811  @Test
1812  public void testSameSourceGroupSplitCoGroup() throws Exception
1813    {
1814    getPlatform().copyFromLocal( inputFileLower );
1815
1816    Tap source = getPlatform().getTextFile( new Fields( "offset", "line" ).applyTypes( Long.TYPE, String.class ), inputFileLower );
1817
1818    Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath(), SinkMode.REPLACE );
1819
1820    Function splitter = new RegexSplitter( new Fields( "num", "char" ).applyTypes( String.class, String.class ), " " );
1821
1822    Pipe sourcePipe = new Each( new Pipe( "source" ), new Fields( "line" ), splitter );
1823    sourcePipe = new GroupBy( sourcePipe, new Fields( "num" ) );
1824    sourcePipe = new Every( sourcePipe, new Fields( "char" ), new First( new Fields( "first", String.class ) ), Fields.ALL );
1825
1826    Pipe lhsPipe = new Retain( new Pipe( "lhs", sourcePipe ), Fields.ALL );
1827    Pipe rhsPipe = new Retain( new Pipe( "rhs", sourcePipe ), Fields.ALL );
1828
1829    Pipe splice = new CoGroup( lhsPipe, new Fields( "num" ), rhsPipe, new Fields( "num" ), Fields.size( 4 ) );
1830
1831    Map<Object, Object> properties = getPlatform().getProperties();
1832
1833    properties.put( "cascading.serialization.types.required", "true" );
1834
1835    Flow flow = getPlatform().getFlowConnector( properties ).connect( source, sink, splice );
1836
1837    flow.complete();
1838
1839    validateLength( flow, 5, null );
1840
1841    List<Tuple> values = getSinkAsList( flow );
1842
1843    assertTrue( values.contains( new Tuple( "1\ta\t1\ta" ) ) );
1844    assertTrue( values.contains( new Tuple( "2\tb\t2\tb" ) ) );
1845    }
1846
1847  /**
1848   * Requires rule TapBalanceGroupSplitTriangleTransformer   *
1849   */
1850  @Test
1851  public void testGroupSplitToCoGroupsTriangle() throws Exception
1852    {
1853    getPlatform().copyFromLocal( inputFileLower );
1854    getPlatform().copyFromLocal( inputFileUpper );
1855
1856    Map sources = new HashMap();
1857    sources.put( "lower", getPlatform().getDelimitedFile( new Fields( "numLower", "charLower" ), " ", inputFileLower ) );
1858    sources.put( "upper", getPlatform().getDelimitedFile( new Fields( "numUpper", "charUpper" ), " ", inputFileUpper ) );
1859
1860    Tap sink = getPlatform().getTextFile( getOutputPath( "sink" ), SinkMode.REPLACE );
1861
1862    Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "numLower", "charLower" ), new Identity() );
1863    Pipe pipeUpper = new Each( new Pipe( "upper" ), new Fields( "numUpper", "charUpper" ), new Identity() );
1864
1865    Pipe upperGroupBy = new GroupBy( pipeUpper, new Fields( "numUpper" ) );
1866    Pipe grpByEvery = new Every( upperGroupBy, new Count(), new Fields( "numUpper", "count" ) );
1867
1868    Pipe grpByEveryE1 = new Each( grpByEvery, new Fields( "numUpper", "count" ), new Identity() );
1869    Pipe grpByEveryE2 = new Each( grpByEvery, new Fields( "numUpper", "count" ), new Identity( new Fields( "numUpperUpper", "countUpper" ) ) );
1870
1871    Pipe cogroup = new CoGroup( "cogroup", pipeLower, new Fields( "numLower" ), grpByEveryE1, new Fields( "numUpper" ) );
1872    Each cogroupEach = new Each( cogroup, new Fields( "numLower", "charLower", "numUpper", "count" ), new Identity() );
1873
1874    Pipe cogroupNested = new CoGroup( cogroupEach, new Fields( "numLower" ), grpByEveryE2, new Fields( "numUpperUpper" ) );
1875    Pipe cogrpNestedEach = new Each( cogroupNested, Fields.ALL, new Identity() );
1876
1877    Flow flow = getPlatform().getFlowConnector().connect( sources, sink, cogrpNestedEach );
1878    flow.complete();
1879
1880    validateLength( flow, 5 );
1881    }
1882  }