001/*
002 * Copyright (c) 2007-2016 Concurrent, Inc. All Rights Reserved.
003 *
004 * Project and contact information: http://www.cascading.org/
005 *
006 * This file is part of the Cascading project.
007 *
008 * Licensed under the Apache License, Version 2.0 (the "License");
009 * you may not use this file except in compliance with the License.
010 * You may obtain a copy of the License at
011 *
012 *     http://www.apache.org/licenses/LICENSE-2.0
013 *
014 * Unless required by applicable law or agreed to in writing, software
015 * distributed under the License is distributed on an "AS IS" BASIS,
016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017 * See the License for the specific language governing permissions and
018 * limitations under the License.
019 */
020
021package cascading;
022
023import java.io.Serializable;
024import java.util.ArrayList;
025import java.util.Collections;
026import java.util.Comparator;
027import java.util.HashMap;
028import java.util.HashSet;
029import java.util.List;
030import java.util.Map;
031import java.util.Set;
032
033import cascading.flow.Flow;
034import cascading.flow.FlowDef;
035import cascading.flow.FlowStep;
036import cascading.flow.planner.graph.ElementGraph;
037import cascading.operation.Aggregator;
038import cascading.operation.Function;
039import cascading.operation.Identity;
040import cascading.operation.aggregator.Count;
041import cascading.operation.aggregator.First;
042import cascading.operation.expression.ExpressionFunction;
043import cascading.operation.regex.RegexFilter;
044import cascading.operation.regex.RegexSplitter;
045import cascading.pipe.Checkpoint;
046import cascading.pipe.CoGroup;
047import cascading.pipe.Each;
048import cascading.pipe.Every;
049import cascading.pipe.GroupBy;
050import cascading.pipe.HashJoin;
051import cascading.pipe.Merge;
052import cascading.pipe.Pipe;
053import cascading.pipe.assembly.Rename;
054import cascading.pipe.joiner.InnerJoin;
055import cascading.pipe.joiner.Joiner;
056import cascading.pipe.joiner.LeftJoin;
057import cascading.pipe.joiner.MixedJoin;
058import cascading.pipe.joiner.OuterJoin;
059import cascading.pipe.joiner.RightJoin;
060import cascading.tap.SinkMode;
061import cascading.tap.Tap;
062import cascading.tuple.Fields;
063import cascading.tuple.Hasher;
064import cascading.tuple.Tuple;
065import org.junit.Test;
066
067import static data.InputData.*;
068
069public class JoinFieldedPipesPlatformTest extends PlatformTestCase
070  {
071  public JoinFieldedPipesPlatformTest()
072    {
073    super( true, 4, 1 ); // leave cluster testing enabled
074    }
075
076  @Test
077  public void testCross() throws Exception
078    {
079    getPlatform().copyFromLocal( inputFileLhs );
080    getPlatform().copyFromLocal( inputFileRhs );
081
082    Map sources = new HashMap();
083
084    sources.put( "lhs", getPlatform().getTextFile( inputFileLhs ) );
085    sources.put( "rhs", getPlatform().getTextFile( inputFileRhs ) );
086
087    Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "cross" ), SinkMode.REPLACE );
088
089    Pipe pipeLower = new Each( "lhs", new Fields( "line" ), new RegexSplitter( new Fields( "numLHS", "charLHS" ), " " ) );
090    Pipe pipeUpper = new Each( "rhs", new Fields( "line" ), new RegexSplitter( new Fields( "numRHS", "charRHS" ), " " ) );
091
092    Pipe cross = new HashJoin( pipeLower, new Fields( "numLHS" ), pipeUpper, new Fields( "numRHS" ), new InnerJoin() );
093
094    Flow flow = getPlatform().getFlowConnector().connect( sources, sink, cross );
095
096    flow.complete();
097
098    validateLength( flow, 37, null );
099
100    List<Tuple> values = getSinkAsList( flow );
101
102    assertTrue( values.contains( new Tuple( "1\ta\t1\tA" ) ) );
103    assertTrue( values.contains( new Tuple( "1\ta\t1\tB" ) ) );
104    }
105
106  @Test
107  public void testJoin() throws Exception
108    {
109    getPlatform().copyFromLocal( inputFileLower );
110    getPlatform().copyFromLocal( inputFileUpper );
111
112    Tap sourceLower = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower );
113    Tap sourceUpper = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileUpper );
114
115    Map sources = new HashMap();
116
117    sources.put( "lower", sourceLower );
118    sources.put( "upper", sourceUpper );
119
120    Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "join" ), SinkMode.REPLACE );
121
122    Function splitter = new RegexSplitter( new Fields( "num", "char" ), " " );
123
124    Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitter );
125    Pipe pipeUpper = new Each( new Pipe( "upper" ), new Fields( "line" ), splitter );
126
127    Pipe splice = new HashJoin( pipeLower, new Fields( "num" ), pipeUpper, new Fields( "num" ), Fields.size( 4 ) );
128
129    Map<Object, Object> properties = getProperties();
130
131    Flow flow = getPlatform().getFlowConnector( properties ).connect( sources, sink, splice );
132
133    flow.complete();
134
135    validateLength( flow, 5 );
136
137    List<Tuple> values = getSinkAsList( flow );
138
139    assertTrue( values.contains( new Tuple( "1\ta\t1\tA" ) ) );
140    assertTrue( values.contains( new Tuple( "2\tb\t2\tB" ) ) );
141    }
142
143  @Test
144  public void testJoinSamePipeName() throws Exception
145    {
146    getPlatform().copyFromLocal( inputFileLower );
147    getPlatform().copyFromLocal( inputFileUpper );
148
149    Tap sourceLower = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower );
150    Tap sourceUpper = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileUpper );
151
152    Map sources = new HashMap();
153
154    sources.put( "lower", sourceLower );
155    sources.put( "upper", sourceUpper );
156
157    Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "renamedpipes" ), SinkMode.REPLACE );
158
159    Function splitter = new RegexSplitter( new Fields( "num", "char" ), " " );
160
161    Pipe pipeLower = new Pipe( "lower" );
162    Pipe pipeUpper = new Pipe( "upper" );
163
164    // these pipes will hide the source name, and could cause one to be lost
165    pipeLower = new Pipe( "same", pipeLower );
166    pipeUpper = new Pipe( "same", pipeUpper );
167
168    pipeLower = new Each( pipeLower, new Fields( "line" ), splitter );
169    pipeUpper = new Each( pipeUpper, new Fields( "line" ), splitter );
170
171//    pipeLower = new Each( pipeLower, new Fields( "num", "char" ), new Identity( new Fields( "num", "char" ) ) );
172//    pipeUpper = new Each( pipeUpper, new Fields( "num", "char" ), new Identity( new Fields( "num", "char" ) ) );
173
174    pipeLower = new Pipe( "left", pipeLower );
175    pipeUpper = new Pipe( "right", pipeUpper );
176
177//    pipeLower = new Each( pipeLower, new Debug( true ) );
178//    pipeUpper = new Each( pipeUpper, new Debug( true ) );
179
180    Pipe splice = new HashJoin( pipeLower, new Fields( "num" ), pipeUpper, new Fields( "num" ), Fields.size( 4 ) );
181
182//    splice = new Each( splice, new Debug( true ) );
183    splice = new Pipe( "splice", splice );
184    splice = new Pipe( "tail", splice );
185
186    Flow flow = getPlatform().getFlowConnector().connect( sources, sink, splice );
187
188    flow.complete();
189
190    validateLength( flow, 5 );
191
192    List<Tuple> values = getSinkAsList( flow );
193
194    assertTrue( values.contains( new Tuple( "1\ta\t1\tA" ) ) );
195    assertTrue( values.contains( new Tuple( "2\tb\t2\tB" ) ) );
196    }
197
198  @Test
199  public void testJoinWithUnknowns() throws Exception
200    {
201    getPlatform().copyFromLocal( inputFileLower );
202    getPlatform().copyFromLocal( inputFileUpper );
203
204    Tap sourceLower = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower );
205    Tap sourceUpper = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileUpper );
206
207    Map sources = new HashMap();
208
209    sources.put( "lower", sourceLower );
210    sources.put( "upper", sourceUpper );
211
212    Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "unknown" ), SinkMode.REPLACE );
213
214    Function splitter = new RegexSplitter( Fields.UNKNOWN, " " );
215
216    Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitter );
217    Pipe pipeUpper = new Each( new Pipe( "upper" ), new Fields( "line" ), splitter );
218
219    Pipe splice = new HashJoin( pipeLower, new Fields( 0 ), pipeUpper, new Fields( 0 ), Fields.size( 4 ) );
220
221    Flow flow = getPlatform().getFlowConnector().connect( sources, sink, splice );
222
223    flow.complete();
224
225    validateLength( flow, 5 );
226
227    List<Tuple> values = getSinkAsList( flow );
228
229    assertTrue( values.contains( new Tuple( "1\ta\t1\tA" ) ) );
230    assertTrue( values.contains( new Tuple( "2\tb\t2\tB" ) ) );
231    }
232
233  /**
234   * this test intentionally filters out all values so the intermediate tap is empty. this tap is cogrouped with
235   * a new stream using an outerjoin.
236   *
237   * @throws Exception
238   */
239  @Test
240  public void testJoinFilteredBranch() throws Exception
241    {
242    getPlatform().copyFromLocal( inputFileLower );
243    getPlatform().copyFromLocal( inputFileUpper );
244
245    Tap sourceLower = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower );
246    Tap sourceUpper = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileUpper );
247
248    Map sources = new HashMap();
249
250    sources.put( "lower", sourceLower );
251    sources.put( "upper", sourceUpper );
252
253    Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "joinfilteredbranch" ), SinkMode.REPLACE );
254
255    Function splitter = new RegexSplitter( new Fields( "num", "char" ), " " );
256
257    Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitter );
258    Pipe pipeUpper = new Each( new Pipe( "upper" ), new Fields( "line" ), splitter );
259    pipeUpper = new Each( pipeUpper, new Fields( "num" ), new RegexFilter( "^fobar" ) ); // intentionally filtering all
260    pipeUpper = new GroupBy( pipeUpper, new Fields( "num" ) );
261
262    Pipe splice = new HashJoin( pipeLower, new Fields( "num" ), pipeUpper, new Fields( "num" ), Fields.size( 4 ), new OuterJoin() );
263
264    Flow flow = getPlatform().getFlowConnector().connect( sources, sink, splice );
265
266    flow.complete();
267
268    validateLength( flow, 5 );
269
270    List<Tuple> values = getSinkAsList( flow );
271
272    assertTrue( values.contains( new Tuple( "1\ta\tnull\tnull" ) ) );
273    assertTrue( values.contains( new Tuple( "2\tb\tnull\tnull" ) ) );
274    }
275
276  @Test
277  public void testJoinSelf() throws Exception
278    {
279    getPlatform().copyFromLocal( inputFileLower );
280
281    Tap sourceLower = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower );
282    Tap sourceUpper = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower );
283
284    Map sources = new HashMap();
285
286    sources.put( "lower", sourceLower );
287    sources.put( "upper", sourceUpper );
288
289    Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "joinself" ), SinkMode.REPLACE );
290
291    Function splitter = new RegexSplitter( new Fields( "num", "char" ), " " );
292
293    Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitter );
294    Pipe pipeUpper = new Each( new Pipe( "upper" ), new Fields( "line" ), splitter );
295
296    Pipe splice = new HashJoin( pipeLower, new Fields( "num" ), pipeUpper, new Fields( "num" ), Fields.size( 4 ) );
297
298    Flow flow = getPlatform().getFlowConnector().connect( sources, sink, splice );
299
300    flow.complete();
301
302    validateLength( flow, 5 );
303
304    List<Tuple> values = getSinkAsList( flow );
305
306    assertTrue( values.contains( new Tuple( "1\ta\t1\ta" ) ) );
307    assertTrue( values.contains( new Tuple( "2\tb\t2\tb" ) ) );
308    }
309
310  /**
311   * Method testCoGroupAfterEvery tests that a tmp tap is inserted after the Every in the cogroup join
312   *
313   * @throws Exception when
314   */
315  @Test
316  public void testJoinAfterEvery() throws Exception
317    {
318    getPlatform().copyFromLocal( inputFileLower );
319    getPlatform().copyFromLocal( inputFileUpper );
320
321    Tap sourceLower = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower );
322    Tap sourceUpper = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileUpper );
323
324    Map sources = new HashMap();
325
326    sources.put( "lower", sourceLower );
327    sources.put( "upper", sourceUpper );
328
329    Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "afterevery" ), SinkMode.REPLACE );
330
331    Function splitter = new RegexSplitter( new Fields( "num", "char" ), " " );
332
333    Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitter );
334    pipeLower = new GroupBy( pipeLower, new Fields( "num" ) );
335    pipeLower = new Every( pipeLower, new Fields( "char" ), new First(), Fields.ALL );
336
337    Pipe pipeUpper = new Each( new Pipe( "upper" ), new Fields( "line" ), splitter );
338    pipeUpper = new GroupBy( pipeUpper, new Fields( "num" ) );
339    pipeUpper = new Every( pipeUpper, new Fields( "char" ), new First(), Fields.ALL );
340
341    Pipe splice = new HashJoin( pipeLower, new Fields( "num" ), pipeUpper, new Fields( "num" ), Fields.size( 4 ) );
342
343    Flow flow = getPlatform().getFlowConnector().connect( sources, sink, splice );
344
345    flow.complete();
346
347    validateLength( flow, 5, null );
348
349    List<Tuple> values = getSinkAsList( flow );
350
351    assertTrue( values.contains( new Tuple( "1\ta\t1\tA" ) ) );
352    assertTrue( values.contains( new Tuple( "2\tb\t2\tB" ) ) );
353    }
354
355  @Test
356  public void testJoinInnerSingleField() throws Exception
357    {
358    getPlatform().copyFromLocal( inputFileLowerOffset );
359    getPlatform().copyFromLocal( inputFileUpper );
360
361    Tap sourceLower = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLowerOffset );
362    Tap sourceUpper = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileUpper );
363
364    Map sources = new HashMap();
365
366    sources.put( "lower", sourceLower );
367    sources.put( "upper", sourceUpper );
368
369    Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "joininnersingle" ), SinkMode.REPLACE );
370
371    Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), new RegexSplitter( new Fields( "num1", "char" ), " " ), new Fields( "num1" ) );
372    Pipe pipeUpper = new Each( new Pipe( "upper" ), new Fields( "line" ), new RegexSplitter( new Fields( "num2", "char" ), " " ), new Fields( "num2" ) );
373
374    Pipe join = new HashJoin( pipeLower, new Fields( "num1" ), pipeUpper, new Fields( "num2" ) );
375
376    Flow flow = getPlatform().getFlowConnector().connect( sources, sink, join );
377
378    flow.complete();
379
380    validateLength( flow, 3, null );
381
382    Set<Tuple> results = new HashSet<Tuple>();
383
384    results.add( new Tuple( "1\t1" ) );
385    results.add( new Tuple( "5\t5" ) );
386
387    List<Tuple> actual = getSinkAsList( flow );
388
389    results.removeAll( actual );
390
391    assertEquals( 0, results.size() );
392    }
393
394  /**
395   * 1 a1
396   * 1 a2
397   * 1 a3
398   * 2 b1
399   * 3 c1
400   * 4 d1
401   * 4 d2
402   * 4 d3
403   * 5 e1
404   * 5 e2
405   * 5 e3
406   * 7 g1
407   * 7 g2
408   * 7 g3
409   * 7 g4
410   * 7 g5
411   * null h1
412   * <p/>
413   * 1 A1
414   * 1 A2
415   * 1 A3
416   * 2 B1
417   * 2 B2
418   * 2 B3
419   * 4 D1
420   * 6 F1
421   * 6 F2
422   * null H1
423   * <p/>
424   * 1  a1      1       A1
425   * 1  a1      1       A2
426   * 1  a1      1       A3
427   * 1  a2      1       A1
428   * 1  a2      1       A2
429   * 1  a2      1       A3
430   * 1  a3      1       A1
431   * 1  a3      1       A2
432   * 1  a3      1       A3
433   * 2  b1      2       B1
434   * 2  b1      2       B2
435   * 2  b1      2       B3
436   * 4  d1      4       D1
437   * 4  d2      4       D1
438   * 4  d3      4       D1
439   * null h1  null  H1
440   *
441   * @throws Exception
442   */
443  @Test
444  public void testJoinInner() throws Exception
445    {
446    HashSet<Tuple> results = new HashSet<Tuple>();
447
448    results.add( new Tuple( "1", "a1", "1", "A1" ) );
449    results.add( new Tuple( "1", "a1", "1", "A2" ) );
450    results.add( new Tuple( "1", "a1", "1", "A3" ) );
451    results.add( new Tuple( "1", "a2", "1", "A1" ) );
452    results.add( new Tuple( "1", "a2", "1", "A2" ) );
453    results.add( new Tuple( "1", "a2", "1", "A3" ) );
454    results.add( new Tuple( "1", "a3", "1", "A1" ) );
455    results.add( new Tuple( "1", "a3", "1", "A2" ) );
456    results.add( new Tuple( "1", "a3", "1", "A3" ) );
457    results.add( new Tuple( "2", "b1", "2", "B1" ) );
458    results.add( new Tuple( "2", "b1", "2", "B2" ) );
459    results.add( new Tuple( "2", "b1", "2", "B3" ) );
460    results.add( new Tuple( "4", "d1", "4", "D1" ) );
461    results.add( new Tuple( "4", "d2", "4", "D1" ) );
462    results.add( new Tuple( "4", "d3", "4", "D1" ) );
463    results.add( new Tuple( null, "h1", null, "H1" ) );
464
465    handleJoins( "joininner", new InnerJoin(), results );
466    }
467
  /**
470   * 1 a1
471   * 1 a2
472   * 1 a3
473   * 2 b1
474   * 3 c1
475   * 4 d1
476   * 4 d2
477   * 4 d3
478   * 5 e1
479   * 5 e2
480   * 5 e3
481   * 7 g1
482   * 7 g2
483   * 7 g3
484   * 7 g4
485   * 7 g5
486   * null h1
487   * <p/>
488   * 1 A1
489   * 1 A2
490   * 1 A3
491   * 2 B1
492   * 2 B2
493   * 2 B3
494   * 4 D1
495   * 6 F1
496   * 6 F2
497   * null H1
498   * <p/>
499   * 1  a1      1       A1
500   * 1  a1      1       A2
501   * 1  a1      1       A3
502   * 1  a2      1       A1
503   * 1  a2      1       A2
504   * 1  a2      1       A3
505   * 1  a3      1       A1
506   * 1  a3      1       A2
507   * 1  a3      1       A3
508   * 2  b1      2       B1
509   * 2  b1      2       B2
510   * 2  b1      2       B3
511   * 3  c1      null    null
512   * 4  d1      4       D1
513   * 4  d2      4       D1
514   * 4  d3      4       D1
515   * 5  e1      null    null
516   * 5  e2      null    null
517   * 5  e3      null    null
518   * null       null    6       F1
519   * null       null    6       F2
520   * 7  g1      null    null
521   * 7  g2      null    null
522   * 7  g3      null    null
523   * 7  g4      null    null
524   * 7  g5      null    null
525   * null h1  null  H1
526   *
527   * @throws Exception
528   */
529  @Test
530  public void testJoinOuter() throws Exception
531    {
532    // skip if hadoop cluster mode, outer joins don't behave the same
533    if( getPlatform().isMapReduce() && getPlatform().isUseCluster() )
534      return;
535
536    Set<Tuple> results = new HashSet<Tuple>();
537
538    results.add( new Tuple( "1", "a1", "1", "A1" ) );
539    results.add( new Tuple( "1", "a1", "1", "A2" ) );
540    results.add( new Tuple( "1", "a1", "1", "A3" ) );
541    results.add( new Tuple( "1", "a2", "1", "A1" ) );
542    results.add( new Tuple( "1", "a2", "1", "A2" ) );
543    results.add( new Tuple( "1", "a2", "1", "A3" ) );
544    results.add( new Tuple( "1", "a3", "1", "A1" ) );
545    results.add( new Tuple( "1", "a3", "1", "A2" ) );
546    results.add( new Tuple( "1", "a3", "1", "A3" ) );
547    results.add( new Tuple( "2", "b1", "2", "B1" ) );
548    results.add( new Tuple( "2", "b1", "2", "B2" ) );
549    results.add( new Tuple( "2", "b1", "2", "B3" ) );
550    results.add( new Tuple( "3", "c1", null, null ) );
551    results.add( new Tuple( "4", "d1", "4", "D1" ) );
552    results.add( new Tuple( "4", "d2", "4", "D1" ) );
553    results.add( new Tuple( "4", "d3", "4", "D1" ) );
554    results.add( new Tuple( "5", "e1", null, null ) );
555    results.add( new Tuple( "5", "e2", null, null ) );
556    results.add( new Tuple( "5", "e3", null, null ) );
557    results.add( new Tuple( null, null, "6", "F1" ) );
558    results.add( new Tuple( null, null, "6", "F2" ) );
559    results.add( new Tuple( "7", "g1", null, null ) );
560    results.add( new Tuple( "7", "g2", null, null ) );
561    results.add( new Tuple( "7", "g3", null, null ) );
562    results.add( new Tuple( "7", "g4", null, null ) );
563    results.add( new Tuple( "7", "g5", null, null ) );
564    results.add( new Tuple( null, "h1", null, "H1" ) );
565
566    handleJoins( "joinouter", new OuterJoin(), results );
567    }
568
569  /**
570   * 1 a1
571   * 1 a2
572   * 1 a3
573   * 2 b1
574   * 3 c1
575   * 4 d1
576   * 4 d2
577   * 4 d3
578   * 5 e1
579   * 5 e2
580   * 5 e3
581   * 7 g1
582   * 7 g2
583   * 7 g3
584   * 7 g4
585   * 7 g5
586   * null h1
587   * <p/>
588   * 1 A1
589   * 1 A2
590   * 1 A3
591   * 2 B1
592   * 2 B2
593   * 2 B3
594   * 4 D1
595   * 6 F1
596   * 6 F2
597   * null H1
598   * <p/>
599   * 1  a1      1       A1
600   * 1  a1      1       A2
601   * 1  a1      1       A3
602   * 1  a2      1       A1
603   * 1  a2      1       A2
604   * 1  a2      1       A3
605   * 1  a3      1       A1
606   * 1  a3      1       A2
607   * 1  a3      1       A3
608   * 2  b1      2       B1
609   * 2  b1      2       B2
610   * 2  b1      2       B3
611   * 3  c1      null    null
612   * 4  d1      4       D1
613   * 4  d2      4       D1
614   * 4  d3      4       D1
615   * 5  e1      null    null
616   * 5  e2      null    null
617   * 5  e3      null    null
618   * 7  g1      null    null
619   * 7  g2      null    null
620   * 7  g3      null    null
621   * 7  g4      null    null
622   * 7  g5      null    null
623   * null h1    null    H1
624   *
625   * @throws Exception
626   */
627  @Test
628  public void testJoinInnerOuter() throws Exception
629    {
630    Set<Tuple> results = new HashSet<Tuple>();
631
632    results.add( new Tuple( "1", "a1", "1", "A1" ) );
633    results.add( new Tuple( "1", "a1", "1", "A2" ) );
634    results.add( new Tuple( "1", "a1", "1", "A3" ) );
635    results.add( new Tuple( "1", "a2", "1", "A1" ) );
636    results.add( new Tuple( "1", "a2", "1", "A2" ) );
637    results.add( new Tuple( "1", "a2", "1", "A3" ) );
638    results.add( new Tuple( "1", "a3", "1", "A1" ) );
639    results.add( new Tuple( "1", "a3", "1", "A2" ) );
640    results.add( new Tuple( "1", "a3", "1", "A3" ) );
641    results.add( new Tuple( "2", "b1", "2", "B1" ) );
642    results.add( new Tuple( "2", "b1", "2", "B2" ) );
643    results.add( new Tuple( "2", "b1", "2", "B3" ) );
644    results.add( new Tuple( "3", "c1", null, null ) );
645    results.add( new Tuple( "4", "d1", "4", "D1" ) );
646    results.add( new Tuple( "4", "d2", "4", "D1" ) );
647    results.add( new Tuple( "4", "d3", "4", "D1" ) );
648    results.add( new Tuple( "5", "e1", null, null ) );
649    results.add( new Tuple( "5", "e2", null, null ) );
650    results.add( new Tuple( "5", "e3", null, null ) );
651    results.add( new Tuple( "7", "g1", null, null ) );
652    results.add( new Tuple( "7", "g2", null, null ) );
653    results.add( new Tuple( "7", "g3", null, null ) );
654    results.add( new Tuple( "7", "g4", null, null ) );
655    results.add( new Tuple( "7", "g5", null, null ) );
656    results.add( new Tuple( null, "h1", null, "H1" ) );
657
658    handleJoins( "joininnerouter", new LeftJoin(), results );
659    }
660
661  /**
662   * 1 a1
663   * 1 a2
664   * 1 a3
665   * 2 b1
666   * 3 c1
667   * 4 d1
668   * 4 d2
669   * 4 d3
670   * 5 e1
671   * 5 e2
672   * 5 e3
673   * 7 g1
674   * 7 g2
675   * 7 g3
676   * 7 g4
677   * 7 g5
678   * null h1
679   * <p/>
680   * 1 A1
681   * 1 A2
682   * 1 A3
683   * 2 B1
684   * 2 B2
685   * 2 B3
686   * 4 D1
687   * 6 F1
688   * 6 F2
689   * null H1
690   * <p/>
691   * 1  a1      1       A1
692   * 1  a1      1       A2
693   * 1  a1      1       A3
694   * 1  a2      1       A1
695   * 1  a2      1       A2
696   * 1  a2      1       A3
697   * 1  a3      1       A1
698   * 1  a3      1       A2
699   * 1  a3      1       A3
700   * 2  b1      2       B1
701   * 2  b1      2       B2
702   * 2  b1      2       B3
703   * 4  d1      4       D1
704   * 4  d2      4       D1
705   * 4  d3      4       D1
706   * null       null    6       F1
707   * null       null    6       F2
708   * null h1    null    H1
709   *
710   * @throws Exception
711   */
712  @Test
713  public void testJoinOuterInner() throws Exception
714    {
715    // skip if hadoop cluster mode, outer joins don't behave the same
716    if( getPlatform().isMapReduce() && getPlatform().isUseCluster() )
717      return;
718
719    Set<Tuple> results = new HashSet<Tuple>();
720
721    results.add( new Tuple( "1", "a1", "1", "A1" ) );
722    results.add( new Tuple( "1", "a1", "1", "A2" ) );
723    results.add( new Tuple( "1", "a1", "1", "A3" ) );
724    results.add( new Tuple( "1", "a2", "1", "A1" ) );
725    results.add( new Tuple( "1", "a2", "1", "A2" ) );
726    results.add( new Tuple( "1", "a2", "1", "A3" ) );
727    results.add( new Tuple( "1", "a3", "1", "A1" ) );
728    results.add( new Tuple( "1", "a3", "1", "A2" ) );
729    results.add( new Tuple( "1", "a3", "1", "A3" ) );
730    results.add( new Tuple( "2", "b1", "2", "B1" ) );
731    results.add( new Tuple( "2", "b1", "2", "B2" ) );
732    results.add( new Tuple( "2", "b1", "2", "B3" ) );
733    results.add( new Tuple( "4", "d1", "4", "D1" ) );
734    results.add( new Tuple( "4", "d2", "4", "D1" ) );
735    results.add( new Tuple( "4", "d3", "4", "D1" ) );
736    results.add( new Tuple( null, null, "6", "F1" ) );
737    results.add( new Tuple( null, null, "6", "F2" ) );
738    results.add( new Tuple( null, "h1", null, "H1" ) );
739
740    handleJoins( "joinouterinner", new RightJoin(), results );
741    }
742
743  private void handleJoins( String path, Joiner joiner, Set<Tuple> results ) throws Exception
744    {
745    getPlatform().copyFromLocal( inputFileLhsSparse );
746    getPlatform().copyFromLocal( inputFileRhsSparse );
747
748    Fields fields = new Fields( "num", "char" ).applyTypes( Integer.class, String.class );
749    Tap sourceLower = getPlatform().getDelimitedFile( fields, " ", inputFileLhsSparse );
750    Tap sourceUpper = getPlatform().getDelimitedFile( fields, " ", inputFileRhsSparse );
751
752    Map sources = new HashMap();
753
754    sources.put( "lower", sourceLower );
755    sources.put( "upper", sourceUpper );
756
757    Tap sink = getPlatform().getDelimitedFile( Fields.size( 4, String.class ), "\t", getOutputPath( path ), SinkMode.REPLACE );
758
759    Pipe pipeLower = new Pipe( "lower" );
760    Pipe pipeUpper = new Pipe( "upper" );
761
762    Fields declaredFields = new Fields( "num", "char", "num2", "char2" );
763    Fields groupingFields = new Fields( "num" );
764
765    Pipe splice = new HashJoin( pipeLower, groupingFields, pipeUpper, groupingFields, declaredFields, joiner );
766
767    splice = new Each( splice, Fields.ALL, new Identity(), Fields.RESULTS );
768
769    Flow flow = getPlatform().getFlowConnector().connect( sources, sink, splice );
770
771    flow.complete();
772
773    validateLength( flow, results.size() );
774
775    List<Tuple> actual = getSinkAsList( flow );
776
777    results.removeAll( actual );
778
779    assertEquals( 0, results.size() );
780    }
781
782  /**
783   * 1 a
784   * 5 b
785   * 6 c
786   * 5 b
787   * 5 e
788   * <p/>
789   * 1 A
790   * 2 B
791   * 3 C
792   * 4 D
793   * 5 E
794   * <p/>
795   * 1 a
796   * 2 b
797   * 3 c
798   * 4 d
799   * 5 e
800   * <p/>
801   * 1  a       1       A  1  a
802   * -  -   2   B  2  b
803   * -  -   3   C  3  c
804   * -  -   4   D  4  d
805   * 5  b       5   E  5  e
806   * 5  e       5   E  5  e
807   *
808   * @throws Exception
809   */
810  @Test
811  public void testJoinMixed() throws Exception
812    {
813    // skip if hadoop cluster mode, outer joins don't behave the same
814    if( getPlatform().isMapReduce() && getPlatform().isUseCluster() )
815      return;
816
817    getPlatform().copyFromLocal( inputFileLowerOffset );
818    getPlatform().copyFromLocal( inputFileLower );
819    getPlatform().copyFromLocal( inputFileUpper );
820
821    Tap sourceLowerOffset = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLowerOffset );
822    Tap sourceUpper = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileUpper );
823    Tap sourceLower = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower );
824
825    Map sources = new HashMap();
826
827    sources.put( "loweroffset", sourceLowerOffset );
828    sources.put( "lower", sourceLower );
829    sources.put( "upper", sourceUpper );
830
831    Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "joinmixed" ), SinkMode.REPLACE );
832
833    Function splitter = new RegexSplitter( new Fields( "num", "char" ), " " );
834
835    Pipe pipeLowerOffset = new Each( new Pipe( "loweroffset" ), new Fields( "line" ), splitter );
836    Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitter );
837    Pipe pipeUpper = new Each( new Pipe( "upper" ), new Fields( "line" ), splitter );
838
839    Pipe[] pipes = Pipe.pipes( pipeLowerOffset, pipeUpper, pipeLower );
840    Fields[] fields = Fields.fields( new Fields( "num" ), new Fields( "num" ), new Fields( "num" ) );
841
842    MixedJoin join = new MixedJoin( new boolean[]{MixedJoin.OUTER, MixedJoin.INNER, MixedJoin.OUTER} );
843    Pipe splice = new HashJoin( pipes, fields, Fields.size( 6 ), join );
844
845    Flow flow = getPlatform().getFlowConnector().connect( sources, sink, splice );
846
847    flow.complete();
848
849    validateLength( flow, 6 );
850
851    Set<Tuple> results = new HashSet<Tuple>();
852
853    results.add( new Tuple( "1\ta\t1\tA\t1\ta" ) );
854    results.add( new Tuple( "null\tnull\t2\tB\t2\tb" ) );
855    results.add( new Tuple( "null\tnull\t3\tC\t3\tc" ) );
856    results.add( new Tuple( "null\tnull\t4\tD\t4\td" ) );
857    results.add( new Tuple( "5\tb\t5\tE\t5\te" ) );
858    results.add( new Tuple( "5\te\t5\tE\t5\te" ) );
859
860    List<Tuple> actual = getSinkAsList( flow );
861
862    results.removeAll( actual );
863
864    assertEquals( 0, results.size() );
865    }
866
867  @Test
868  public void testJoinDiffFields() throws Exception
869    {
870    getPlatform().copyFromLocal( inputFileLower );
871    getPlatform().copyFromLocal( inputFileUpper );
872
873    Tap sourceLower = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower );
874    Tap sourceUpper = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileUpper );
875
876    Map sources = new HashMap();
877
878    sources.put( "lower", sourceLower );
879    sources.put( "upper", sourceUpper );
880
881    Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "difffields" ), SinkMode.REPLACE );
882
883    Function splitterLower = new RegexSplitter( new Fields( "numA", "lower" ), " " );
884    Function splitterUpper = new RegexSplitter( new Fields( "numB", "upper" ), " " );
885
886    Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitterLower );
887    Pipe pipeUpper = new Each( new Pipe( "upper" ), new Fields( "line" ), splitterUpper );
888
889    Pipe pipe = new HashJoin( pipeLower, new Fields( "numA" ), pipeUpper, new Fields( "numB" ) );
890
891    Flow flow = getPlatform().getFlowConnector().connect( sources, sink, pipe );
892
893    flow.complete();
894
895    validateLength( flow, 5 );
896
897    List<Tuple> actual = getSinkAsList( flow );
898
899    assertTrue( actual.contains( new Tuple( "1\ta\t1\tA" ) ) );
900    assertTrue( actual.contains( new Tuple( "2\tb\t2\tB" ) ) );
901    }
902
903  @Test
904  public void testJoinGroupBy() throws Exception
905    {
906    getPlatform().copyFromLocal( inputFileLower );
907    getPlatform().copyFromLocal( inputFileUpper );
908
909    Tap sourceLower = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower );
910    Tap sourceUpper = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileUpper );
911
912    Map sources = new HashMap();
913
914    sources.put( "lower", sourceLower );
915    sources.put( "upper", sourceUpper );
916
917    Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "joingroupby" ), SinkMode.REPLACE );
918
919    Function splitterLower = new RegexSplitter( new Fields( "numA", "lower" ), " " );
920    Function splitterUpper = new RegexSplitter( new Fields( "numB", "upper" ), " " );
921
922    Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitterLower );
923    Pipe pipeUpper = new Each( new Pipe( "upper" ), new Fields( "line" ), splitterUpper );
924
925    Pipe pipe = new HashJoin( pipeLower, new Fields( "numA" ), pipeUpper, new Fields( "numB" ) );
926
927    Pipe groupby = new GroupBy( pipe, new Fields( "numA" ) );
928
929    Flow flow = getPlatform().getFlowConnector().connect( sources, sink, groupby );
930
931    flow.complete();
932
933    validateLength( flow, 5, null );
934
935    List<Tuple> actual = getSinkAsList( flow );
936
937    assertTrue( actual.contains( new Tuple( "1\ta\t1\tA" ) ) );
938    assertTrue( actual.contains( new Tuple( "2\tb\t2\tB" ) ) );
939    }
940
941  @Test
942  public void testJoinSamePipe() throws Exception
943    {
944    getPlatform().copyFromLocal( inputFileLower );
945
946    Tap source = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower );
947
948    Map sources = new HashMap();
949
950    sources.put( "lower", source );
951
952    Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "samepipe" ), SinkMode.REPLACE );
953
954    Function splitter = new RegexSplitter( new Fields( "num", "char" ), " " );
955
956    Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitter );
957
958    Pipe pipe = new HashJoin( pipeLower, new Fields( "num" ), 1, new Fields( "num1", "char1", "num2", "char2" ) );
959
960    Flow flow = getPlatform().getFlowConnector().connect( sources, sink, pipe );
961
962    flow.complete();
963
964    validateLength( flow, 5, null );
965
966    List<Tuple> actual = getSinkAsList( flow );
967
968    assertTrue( actual.contains( new Tuple( "1\ta\t1\ta" ) ) );
969    assertTrue( actual.contains( new Tuple( "2\tb\t2\tb" ) ) );
970    }
971
972  @Test
973  public void testJoinSamePipe2() throws Exception
974    {
975    getPlatform().copyFromLocal( inputFileLower );
976
977    Tap source = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower );
978
979    Map sources = new HashMap();
980
981    sources.put( "lower", source );
982
983    Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "samepipe2" ), SinkMode.REPLACE );
984
985    Function splitter = new RegexSplitter( new Fields( "num", "char" ), " " );
986
987    Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitter );
988
989    Pipe join = new HashJoin( pipeLower, new Fields( "num" ), pipeLower, new Fields( "num" ), new Fields( "num1", "char1", "num2", "char2" ) );
990
991    Flow flow = getPlatform().getFlowConnector().connect( sources, sink, join );
992
993    flow.complete();
994
995    validateLength( flow, 5, null );
996
997    List<Tuple> actual = getSinkAsList( flow );
998
999    assertTrue( actual.contains( new Tuple( "1\ta\t1\ta" ) ) );
1000    assertTrue( actual.contains( new Tuple( "2\tb\t2\tb" ) ) );
1001    }
1002
1003  @Test
1004  public void testJoinSamePipe3() throws Exception
1005    {
1006    getPlatform().copyFromLocal( inputFileLower );
1007
1008    Tap source = getPlatform().getDelimitedFile( new Fields( "num", "char" ), " ", inputFileLower );
1009
1010    Map sources = new HashMap();
1011
1012    sources.put( "lower", source );
1013
1014    Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "samepipe3" ), SinkMode.REPLACE );
1015
1016    Pipe pipe = new Pipe( "lower" );
1017
1018    Pipe lhs = new Pipe( "lhs", pipe );
1019    Pipe rhs = new Pipe( "rhs", pipe );
1020
1021    Pipe join = new HashJoin( lhs, new Fields( "num" ), rhs, new Fields( "num" ), new Fields( "num1", "char1", "num2", "char2" ) );
1022
1023    Flow flow = getPlatform().getFlowConnector().connect( sources, sink, join );
1024
1025    flow.complete();
1026
1027    validateLength( flow, 5, null );
1028
1029    List<Tuple> actual = getSinkAsList( flow );
1030
1031    assertTrue( actual.contains( new Tuple( "1\ta\t1\ta" ) ) );
1032    assertTrue( actual.contains( new Tuple( "2\tb\t2\tb" ) ) );
1033    }
1034
1035  /**
1036   * Same source as rightmost
1037   * <p/>
1038   * should be a single job as the same file accumulates into the joins
1039   *
1040   * @throws Exception
1041   */
1042  @Test
1043  public void testJoinAroundJoinRightMost() throws Exception
1044    {
1045    getPlatform().copyFromLocal( inputFileLower );
1046    getPlatform().copyFromLocal( inputFileUpper );
1047
1048    Tap sourceLower = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower );
1049    Tap sourceUpper = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileUpper );
1050
1051    Map sources = new HashMap();
1052
1053    sources.put( "lower", sourceLower );
1054    sources.put( "upper1", sourceUpper );
1055    sources.put( "upper2", sourceUpper );
1056
1057    Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "joinaroundjoinrightmost" ), SinkMode.REPLACE );
1058
1059    Function splitter = new RegexSplitter( new Fields( "num", "char" ), " " );
1060
1061    Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitter );
1062    Pipe pipeUpper1 = new Each( new Pipe( "upper1" ), new Fields( "line" ), splitter );
1063    Pipe pipeUpper2 = new Each( new Pipe( "upper2" ), new Fields( "line" ), splitter );
1064
1065    Pipe splice1 = new HashJoin( pipeLower, new Fields( "num" ), pipeUpper1, new Fields( "num" ), new Fields( "num1", "char1", "num2", "char2" ) );
1066
1067    splice1 = new Each( splice1, new Identity() );
1068
1069    Pipe splice2 = new HashJoin( splice1, new Fields( "num1" ), pipeUpper2, new Fields( "num" ), new Fields( "num1", "char1", "num2", "char2", "num3", "char3" ) );
1070
1071    Flow flow = getPlatform().getFlowConnector().connect( sources, sink, splice2 );
1072
1073//    flow.writeDOT( "joinaroundrightmost.dot" );
1074
1075    if( getPlatform().isMapReduce() )
1076      assertEquals( "wrong number of steps", 1, flow.getFlowSteps().size() );
1077
1078    flow.complete();
1079
1080    validateLength( flow, 5, null );
1081
1082    List<Tuple> actual = getSinkAsList( flow );
1083
1084    assertTrue( actual.contains( new Tuple( "1\ta\t1\tA\t1\tA" ) ) );
1085    assertTrue( actual.contains( new Tuple( "2\tb\t2\tB\t2\tB" ) ) );
1086    }
1087
1088  /**
1089   * Same source as leftmost
1090   *
1091   * @throws Exception
1092   */
1093  @Test
1094  public void testJoinAroundJoinLeftMost() throws Exception
1095    {
1096    getPlatform().copyFromLocal( inputFileLower );
1097    getPlatform().copyFromLocal( inputFileUpper );
1098
1099    Tap sourceLower = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower );
1100    Tap sourceUpper = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileUpper );
1101
1102    Map sources = new HashMap();
1103
1104    sources.put( "lower", sourceLower );
1105    sources.put( "upper1", sourceUpper );
1106    sources.put( "upper2", sourceUpper );
1107
1108    Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "joinaroundjoinleftmost" ), SinkMode.REPLACE );
1109
1110    Function splitter = new RegexSplitter( new Fields( "num", "char" ), " " );
1111
1112    Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitter );
1113    Pipe pipeUpper1 = new Each( new Pipe( "upper1" ), new Fields( "line" ), splitter );
1114    Pipe pipeUpper2 = new Each( new Pipe( "upper2" ), new Fields( "line" ), splitter );
1115
1116    Pipe splice1 = new HashJoin( pipeUpper1, new Fields( "num" ), pipeUpper2, new Fields( "num" ), new Fields( "num1", "char1", "num2", "char2" ) );
1117
1118    splice1 = new Each( splice1, new Identity() );
1119
1120    Pipe splice2 = new HashJoin( splice1, new Fields( "num1" ), pipeLower, new Fields( "num" ), new Fields( "num1", "char1", "num2", "char2", "num3", "char3" ) );
1121
1122    Flow flow = getPlatform().getFlowConnector().connect( sources, sink, splice2 );
1123
1124//    flow.writeDOT( "joinaroundleftmost.dot" );
1125
1126    if( getPlatform().isMapReduce() )
1127      assertEquals( "wrong number of steps", 2, flow.getFlowSteps().size() );
1128
1129    flow.complete();
1130
1131    validateLength( flow, 5, null );
1132
1133    List<Tuple> actual = getSinkAsList( flow );
1134
1135    assertTrue( actual.contains( new Tuple( "1\tA\t1\tA\t1\ta" ) ) );
1136    assertTrue( actual.contains( new Tuple( "2\tB\t2\tB\t2\tb" ) ) );
1137    }
1138
1139  /**
1140   * Upper as leftmost and rightmost forcing two jobs
1141   *
1142   * @throws Exception
1143   */
1144  @Test
1145  public void testJoinAroundJoinRightMostSwapped() throws Exception
1146    {
1147    getPlatform().copyFromLocal( inputFileLower );
1148    getPlatform().copyFromLocal( inputFileUpper );
1149
1150    Tap sourceLower = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower );
1151    Tap sourceUpper = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileUpper );
1152
1153    Map sources = new HashMap();
1154
1155    sources.put( "lower", sourceLower );
1156    sources.put( "upper1", sourceUpper );
1157    sources.put( "upper2", sourceUpper );
1158
1159    Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "joinaroundjoinswapped" ), SinkMode.REPLACE );
1160
1161    Function splitter = new RegexSplitter( new Fields( "num", "char" ), " " );
1162
1163    Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitter );
1164    Pipe pipeUpper1 = new Each( new Pipe( "upper1" ), new Fields( "line" ), splitter );
1165    Pipe pipeUpper2 = new Each( new Pipe( "upper2" ), new Fields( "line" ), splitter );
1166
1167    Pipe splice1 = new HashJoin( pipeLower, new Fields( "num" ), pipeUpper1, new Fields( "num" ), new Fields( "num1", "char1", "num2", "char2" ) );
1168
1169    splice1 = new Each( splice1, new Identity() );
1170
1171    // upper2 becomes leftmost, forcing a tap between the joins
1172    Pipe splice2 = new HashJoin( pipeUpper2, new Fields( "num" ), splice1, new Fields( "num1" ), new Fields( "num1", "char1", "num2", "char2", "num3", "char3" ) );
1173
1174    Flow flow = getPlatform().getFlowConnector().connect( sources, sink, splice2 );
1175
1176    if( getPlatform().isMapReduce() )
1177      assertEquals( "wrong number of steps", 2, flow.getFlowSteps().size() );
1178
1179    flow.complete();
1180
1181    validateLength( flow, 5, null );
1182
1183    List<Tuple> actual = getSinkAsList( flow );
1184
1185    assertTrue( actual.contains( new Tuple( "1\tA\t1\ta\t1\tA" ) ) );
1186    assertTrue( actual.contains( new Tuple( "2\tB\t2\tb\t2\tB" ) ) );
1187    }
1188
1189  @Test
1190  public void testJoinGroupByJoin() throws Exception
1191    {
1192    getPlatform().copyFromLocal( inputFileLower );
1193    getPlatform().copyFromLocal( inputFileUpper );
1194    getPlatform().copyFromLocal( inputFileJoined );
1195
1196    Tap sourceLower = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower );
1197    Tap sourceUpper = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileUpper );
1198    Tap sourceJoined = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileJoined );
1199
1200    Map sources = new HashMap();
1201
1202    sources.put( "lower", sourceLower );
1203    sources.put( "upper", sourceUpper );
1204    sources.put( "joined", sourceJoined );
1205
1206    Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "joingroupbyjoin" ), SinkMode.REPLACE );
1207
1208    Function splitterLower = new RegexSplitter( new Fields( "numA", "lower" ), " " );
1209    Function splitterUpper = new RegexSplitter( new Fields( "numB", "upper" ), " " );
1210    Function splitterJoined = new RegexSplitter( new Fields( "numC", "lowerC", "upperC" ), "\t" );
1211
1212    Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitterLower );
1213    Pipe pipeUpper = new Each( new Pipe( "upper" ), new Fields( "line" ), splitterUpper );
1214    Pipe pipeJoined = new Each( new Pipe( "joined" ), new Fields( "line" ), splitterJoined );
1215
1216    Pipe pipe = new HashJoin( pipeLower, new Fields( "numA" ), pipeUpper, new Fields( "numB" ) );
1217
1218    pipe = new GroupBy( pipe, new Fields( "numA" ) );
1219
1220    pipe = new HashJoin( pipe, new Fields( "numA" ), pipeJoined, new Fields( "numC" ) );
1221
1222    Flow flow = getPlatform().getFlowConnector().connect( sources, sink, pipe );
1223
1224    if( getPlatform().isMapReduce() )
1225      assertEquals( "wrong number of steps", 2, flow.getFlowSteps().size() );
1226
1227    flow.complete();
1228
1229    validateLength( flow, 5, null );
1230
1231    List<Tuple> actual = getSinkAsList( flow );
1232
1233    assertTrue( actual.contains( new Tuple( "1\ta\t1\tA\t1\ta\tA" ) ) );
1234    assertTrue( actual.contains( new Tuple( "2\tb\t2\tB\t2\tb\tB" ) ) );
1235    }
1236
1237  /**
1238   * here the same file is fed into the same HashJoin.
1239   * <p/>
1240   * This is three jobs.
1241   * <p/>
1242   * a temp tap is inserted before the accumulated branch for two reasons on the common HashJoin
1243   * <p/>
1244   * it is assumed the accumulated side is filtered down, so pushing to disk will preserve io
1245   * if accumulated side was streamed instead via a fork, only part of the file will accumulate into the HashJoin
1246   * <p/>
1247   * /-T-\ <-- accumulated
1248   * T      HJ
1249   * \---/ <-- streamed
1250   *
1251   * @throws Exception
1252   */
1253  @Test
1254  public void testJoinSameSourceIntoJoin() throws Exception
1255    {
1256    getPlatform().copyFromLocal( inputFileLower );
1257    getPlatform().copyFromLocal( inputFileUpper );
1258
1259    Tap sourceLower = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower );
1260    Tap sourceUpper = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileUpper );
1261
1262    Map sources = new HashMap();
1263
1264    sources.put( "lower", sourceLower );
1265    sources.put( "upper1", sourceUpper );
1266    sources.put( "upper2", sourceUpper );
1267
1268    Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "joinsamesourceintojoin" ), SinkMode.REPLACE );
1269
1270    Function splitter = new RegexSplitter( new Fields( "num", "char" ), " " );
1271
1272    Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitter );
1273    Pipe pipeUpper1 = new Each( new Pipe( "upper1" ), new Fields( "line" ), splitter );
1274    Pipe pipeUpper2 = new Each( new Pipe( "upper2" ), new Fields( "line" ), splitter );
1275
1276    Pipe splice1 = new HashJoin( pipeUpper1, new Fields( "num" ), pipeUpper2, new Fields( "num" ), new Fields( "num1", "char1", "num2", "char2" ) );
1277
1278    splice1 = new Each( splice1, new Identity() );
1279
1280    Pipe splice2 = new HashJoin( pipeLower, new Fields( "num" ), splice1, new Fields( "num1" ), new Fields( "num1", "char1", "num2", "char2", "num3", "char3" ) );
1281
1282    Flow flow = getPlatform().getFlowConnector().connect( sources, sink, splice2 );
1283
1284//    flow.writeDOT( "joinsamesourceintojoin.dot" );
1285
1286    if( getPlatform().isMapReduce() )
1287      assertEquals( "wrong number of steps", 3, flow.getFlowSteps().size() );
1288
1289    flow.complete();
1290
1291    validateLength( flow, 5, null );
1292
1293    List<Tuple> actual = getSinkAsList( flow );
1294
1295    assertTrue( actual.contains( new Tuple( "1\ta\t1\tA\t1\tA" ) ) );
1296    assertTrue( actual.contains( new Tuple( "2\tb\t2\tB\t2\tB" ) ) );
1297    }
1298
1299  @Test
1300  public void testJoinSameSourceIntoJoinSimple() throws Exception
1301    {
1302    getPlatform().copyFromLocal( inputFileUpper );
1303
1304    Tap sourceUpper = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileUpper );
1305
1306    Map sources = new HashMap();
1307
1308    sources.put( "upper1", sourceUpper );
1309    sources.put( "upper2", sourceUpper );
1310
1311    Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "joinsamesourceintojoinsimple" ), SinkMode.REPLACE );
1312
1313    Function splitter = new RegexSplitter( new Fields( "num", "char" ), " " );
1314
1315    Pipe pipeUpper1 = new Each( new Pipe( "upper1" ), new Fields( "line" ), splitter );
1316    Pipe pipeUpper2 = new Each( new Pipe( "upper2" ), new Fields( "line" ), splitter );
1317
1318    Pipe splice1 = new HashJoin( pipeUpper1, new Fields( "num" ), pipeUpper2, new Fields( "num" ), new Fields( "num1", "char1", "num2", "char2" ) );
1319
1320    splice1 = new Each( splice1, new Identity() );
1321
1322    Flow flow = getPlatform().getFlowConnector().connect( sources, sink, splice1 );
1323
1324//    flow.writeDOT( "joinsamesourceintojoin.dot" );
1325
1326    if( getPlatform().isMapReduce() )
1327      assertEquals( "wrong number of steps", 2, flow.getFlowSteps().size() );
1328
1329    flow.complete();
1330
1331    validateLength( flow, 5, null );
1332
1333    List<Tuple> actual = getSinkAsList( flow );
1334
1335    assertTrue( actual.contains( new Tuple( "1\tA\t1\tA" ) ) );
1336    assertTrue( actual.contains( new Tuple( "2\tB\t2\tB" ) ) );
1337    }
1338
1339  /**
1340   * Loosely tests for a deadlock when BlockingHashJoinAnnotator rule doesn't excluce the GroupBy from the blocking
1341   * annotation.
1342   * <p/>
1343   * the deadlock is random on the order of the paths traversed from the Source Tap + fork.
1344   *
1345   * @throws Exception
1346   */
1347  @Test
1348  public void testJoinSameSourceOverGroupByIntoJoinSimple() throws Exception
1349    {
1350    getPlatform().copyFromLocal( inputFileLower );
1351    getPlatform().copyFromLocal( inputFileUpper );
1352
1353    Tap sourceUpper = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileUpper );
1354
1355    Map sources = new HashMap();
1356
1357    sources.put( "upper1", sourceUpper );
1358    sources.put( "upper2", sourceUpper );
1359
1360    Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "joinsamesourceovergroupbyintojoinsimple" ), SinkMode.REPLACE );
1361
1362    Function splitter = new RegexSplitter( new Fields( "num", "char" ), " " );
1363
1364    Pipe pipeUpper1 = new Each( new Pipe( "upper1" ), new Fields( "line" ), splitter );
1365    Pipe pipeUpper2 = new Each( new Pipe( "upper2" ), new Fields( "line" ), splitter );
1366
1367    pipeUpper1 = new GroupBy( pipeUpper1, new Fields( "num" ) );
1368    pipeUpper2 = new GroupBy( pipeUpper2, new Fields( "num" ) );
1369
1370    Pipe splice1 = new HashJoin( pipeUpper1, new Fields( "num" ), pipeUpper2, new Fields( "num" ), new Fields( "num1", "char1", "num2", "char2" ) );
1371
1372    splice1 = new Each( splice1, new Identity() );
1373
1374    Flow flow = getPlatform().getFlowConnector().connect( sources, sink, splice1 );
1375
1376    if( getPlatform().isMapReduce() )
1377      assertEquals( "wrong number of steps", 3, flow.getFlowSteps().size() );
1378
1379    flow.complete();
1380
1381    validateLength( flow, 5, null );
1382
1383    List<Tuple> actual = getSinkAsList( flow );
1384
1385    assertTrue( actual.contains( new Tuple( "1\tA\t1\tA" ) ) );
1386    assertTrue( actual.contains( new Tuple( "2\tB\t2\tB" ) ) );
1387    }
1388
1389  /**
1390   * Tests that two independent streamed sources with loadable tributaries properly plan into a GroupBy
1391   * without loading unused sources
1392   *
1393   * @throws Exception
1394   */
1395  @Test
1396  public void testJoinsIntoGroupBy() throws Exception
1397    {
1398    getPlatform().copyFromLocal( inputFileLower );
1399    getPlatform().copyFromLocal( inputFileUpper );
1400
1401    getPlatform().copyFromLocal( inputFileLhs );
1402    getPlatform().copyFromLocal( inputFileRhs );
1403
1404    Tap sourceLower = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower );
1405    Tap sourceUpper = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileUpper );
1406
1407    Tap sourceLhs = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLhs );
1408    Tap sourceRhs = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileRhs );
1409
1410    Map sources = new HashMap();
1411
1412    sources.put( "lower", sourceLower );
1413    sources.put( "upper", sourceUpper );
1414    sources.put( "lhs", sourceLhs );
1415    sources.put( "rhs", sourceRhs );
1416
1417    Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "joinsintogroupby" ), SinkMode.REPLACE );
1418
1419    Function splitter = new RegexSplitter( new Fields( "num", "char" ), " " );
1420
1421    Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitter );
1422    Pipe pipeUpper = new Each( new Pipe( "upper" ), new Fields( "line" ), splitter );
1423
1424    Pipe pipeLhs = new Each( new Pipe( "lhs" ), new Fields( "line" ), splitter );
1425    Pipe pipeRhs = new Each( new Pipe( "rhs" ), new Fields( "line" ), splitter );
1426
1427    Pipe upperLower = new HashJoin( pipeLower, new Fields( "num" ), pipeUpper, new Fields( "num" ), new Fields( "num1", "char1", "num2", "char2" ) );
1428
1429    upperLower = new Each( upperLower, new Identity() );
1430
1431    Pipe lhsRhs = new HashJoin( pipeLhs, new Fields( "num" ), pipeRhs, new Fields( "num" ), new Fields( "num1", "char1", "num2", "char2" ) );
1432
1433    lhsRhs = new Each( lhsRhs, new Identity() );
1434
1435    Pipe grouped = new GroupBy( "merging", Pipe.pipes( upperLower, lhsRhs ), new Fields( "num1" ) );
1436
1437    Flow flow = getPlatform().getFlowConnector().connect( sources, sink, grouped );
1438
1439    if( getPlatform().isMapReduce() )
1440      assertEquals( "wrong number of steps", 1, flow.getFlowSteps().size() );
1441
1442    flow.complete();
1443
1444    validateLength( flow, 42, null );
1445
1446    List<Tuple> actual = getSinkAsList( flow );
1447
1448    assertTrue( actual.contains( new Tuple( "1\ta\t1\tA" ) ) );
1449    assertTrue( actual.contains( new Tuple( "5\te\t5\tE" ) ) );
1450    }
1451
1452  @Test
1453  public void testJoinSamePipeAroundGroupBy() throws Exception
1454    {
1455    getPlatform().copyFromLocal( inputFileLower );
1456
1457    Tap source = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower );
1458    Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "samepipearoundgroupby" ), SinkMode.REPLACE );
1459
1460    Function splitter = new RegexSplitter( new Fields( "num", "char" ), " " );
1461
1462    Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitter );
1463
1464    Pipe lhsPipe = new Each( new Pipe( "lhs", pipeLower ), new Identity() );
1465
1466    Pipe rhsPipe = new Each( new Pipe( "rhs", pipeLower ), new Identity() );
1467
1468    rhsPipe = new GroupBy( rhsPipe, new Fields( "num" ) );
1469
1470    rhsPipe = new Each( rhsPipe, new Identity() );
1471
1472    Pipe pipe = new HashJoin( lhsPipe, new Fields( "num" ), rhsPipe, new Fields( "num" ), new Fields( "num1", "char1", "num2", "char2" ) );
1473
1474    Flow flow = getPlatform().getFlowConnector().connect( source, sink, pipe );
1475
1476    flow.complete();
1477
1478    validateLength( flow, 5, null );
1479
1480    List<Tuple> actual = getSinkAsList( flow );
1481
1482    assertTrue( actual.contains( new Tuple( "1\ta\t1\ta" ) ) );
1483    assertTrue( actual.contains( new Tuple( "2\tb\t2\tb" ) ) );
1484    }
1485
1486  /**
1487   * This test results in two MR jobs because one join feeds into the accumulated side of the second. A mapper
1488   * can only stream on branch at a time forcing a temp file between the mappers. see next test for swapped join
1489   *
1490   * @throws Exception
1491   */
1492  @Test
1493  public void testJoinsIntoCoGroupLhs() throws Exception
1494    {
1495    getPlatform().copyFromLocal( inputFileLower );
1496    getPlatform().copyFromLocal( inputFileUpper );
1497
1498    getPlatform().copyFromLocal( inputFileLhs );
1499    getPlatform().copyFromLocal( inputFileRhs );
1500
1501    Tap sourceLower = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower );
1502    Tap sourceUpper = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileUpper );
1503
1504    Tap sourceLhs = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLhs );
1505    Tap sourceRhs = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileRhs );
1506
1507    Map sources = new HashMap();
1508
1509    sources.put( "lower", sourceLower );
1510    sources.put( "upper", sourceUpper );
1511    sources.put( "lhs", sourceLhs );
1512    sources.put( "rhs", sourceRhs );
1513
1514    Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "joinsintocogrouplhs" ), SinkMode.REPLACE );
1515
1516    Function splitter = new RegexSplitter( new Fields( "num", "char" ), " " );
1517
1518    Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitter );
1519    Pipe pipeUpper = new Each( new Pipe( "upper" ), new Fields( "line" ), splitter );
1520
1521    Pipe pipeLhs = new Each( new Pipe( "lhs" ), new Fields( "line" ), splitter );
1522    Pipe pipeRhs = new Each( new Pipe( "rhs" ), new Fields( "line" ), splitter );
1523
1524    Pipe upperLower = new HashJoin( pipeLower, new Fields( "num" ), pipeUpper, new Fields( "num" ), new Fields( "numUpperLower", "charUpperLower", "num2UpperLower", "char2UpperLower" ) );
1525
1526    upperLower = new Each( upperLower, new Identity() );
1527
1528    Pipe lhsUpperLower = new HashJoin( pipeLhs, new Fields( "num" ), upperLower, new Fields( "numUpperLower" ), new Fields( "numLhs", "charLhs", "numUpperLower", "charUpperLower", "num2UpperLower", "char2UpperLower" ) );
1529
1530    lhsUpperLower = new Each( lhsUpperLower, new Identity() );
1531
1532    Pipe grouped = new CoGroup( "cogrouping", lhsUpperLower, new Fields( "numLhs" ), pipeRhs, new Fields( "num" ) );
1533
1534    Flow flow = getPlatform().getFlowConnector().connect( sources, sink, grouped );
1535
1536    if( getPlatform().isMapReduce() )
1537      assertEquals( "wrong number of steps", 2, flow.getFlowSteps().size() );
1538
1539    flow.complete();
1540
1541    validateLength( flow, 37, null );
1542
1543    List<Tuple> actual = getSinkAsList( flow );
1544
1545    assertTrue( actual.contains( new Tuple( "1\ta\t1\ta\t1\tA\t1\tA" ) ) );
1546    assertTrue( actual.contains( new Tuple( "5\ta\t5\te\t5\tE\t5\tA" ) ) );
1547    }
1548
1549  /**
1550   * This test results in one MR jobs because one join feeds into the streamed side of the second.
1551   *
1552   * @throws Exception
1553   */
1554  @Test
1555  public void testJoinsIntoCoGroupLhsSwappedJoin() throws Exception
1556    {
1557    getPlatform().copyFromLocal( inputFileLower );
1558    getPlatform().copyFromLocal( inputFileUpper );
1559
1560    getPlatform().copyFromLocal( inputFileLhs );
1561    getPlatform().copyFromLocal( inputFileRhs );
1562
1563    Tap sourceLower = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower );
1564    Tap sourceUpper = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileUpper );
1565
1566    Tap sourceLhs = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLhs );
1567    Tap sourceRhs = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileRhs );
1568
1569    Map sources = new HashMap();
1570
1571    sources.put( "lower", sourceLower );
1572    sources.put( "upper", sourceUpper );
1573    sources.put( "lhs", sourceLhs );
1574    sources.put( "rhs", sourceRhs );
1575
1576    Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "joinsintocogrouplhsswappedjoin" ), SinkMode.REPLACE );
1577
1578    Function splitter = new RegexSplitter( new Fields( "num", "char" ), " " );
1579
1580    Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitter );
1581    Pipe pipeUpper = new Each( new Pipe( "upper" ), new Fields( "line" ), splitter );
1582
1583    Pipe pipeLhs = new Each( new Pipe( "lhs" ), new Fields( "line" ), splitter );
1584    Pipe pipeRhs = new Each( new Pipe( "rhs" ), new Fields( "line" ), splitter );
1585
1586    Pipe upperLower = new HashJoin( pipeLower, new Fields( "num" ), pipeUpper, new Fields( "num" ), new Fields( "numUpperLower", "charUpperLower", "num2UpperLower", "char2UpperLower" ) );
1587
1588    upperLower = new Each( upperLower, new Identity() );
1589
1590    Pipe lhsUpperLower = new HashJoin( upperLower, new Fields( "numUpperLower" ), pipeLhs, new Fields( "num" ), new Fields( "numUpperLower", "charUpperLower", "num2UpperLower", "char2UpperLower", "numLhs", "charLhs" ) );
1591
1592    lhsUpperLower = new Each( lhsUpperLower, new Identity() );
1593
1594    Pipe grouped = new CoGroup( "cogrouping", lhsUpperLower, new Fields( "numLhs" ), pipeRhs, new Fields( "num" ) );
1595
1596    Flow flow = getPlatform().getFlowConnector().connect( sources, sink, grouped );
1597
1598    if( getPlatform().isMapReduce() )
1599      assertEquals( "wrong number of steps", 1, flow.getFlowSteps().size() );
1600
1601    flow.complete();
1602
1603    validateLength( flow, 37, null );
1604
1605    List<Tuple> actual = getSinkAsList( flow );
1606
1607    assertTrue( actual.contains( new Tuple( "1\ta\t1\tA\t1\ta\t1\tA" ) ) );
1608    assertTrue( actual.contains( new Tuple( "5\te\t5\tE\t5\te\t5\tE" ) ) );
1609    }
1610
1611  @Test
1612  public void testJoinsIntoCoGroupRhs() throws Exception
1613    {
1614    getPlatform().copyFromLocal( inputFileLower );
1615    getPlatform().copyFromLocal( inputFileUpper );
1616
1617    getPlatform().copyFromLocal( inputFileLhs );
1618    getPlatform().copyFromLocal( inputFileRhs );
1619
1620    Tap sourceLower = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower );
1621    Tap sourceUpper = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileUpper );
1622
1623    Tap sourceLhs = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLhs );
1624    Tap sourceRhs = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileRhs );
1625
1626    Map sources = new HashMap();
1627
1628    sources.put( "lower", sourceLower );
1629    sources.put( "upper", sourceUpper );
1630    sources.put( "lhs", sourceLhs );
1631    sources.put( "rhs", sourceRhs );
1632
1633    Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "joinsintocogrouprhs" ), SinkMode.REPLACE );
1634
1635    Function splitter = new RegexSplitter( new Fields( "num", "char" ), " " );
1636
1637    Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitter );
1638    Pipe pipeUpper = new Each( new Pipe( "upper" ), new Fields( "line" ), splitter );
1639
1640    Pipe pipeLhs = new Each( new Pipe( "lhs" ), new Fields( "line" ), splitter );
1641    Pipe pipeRhs = new Each( new Pipe( "rhs" ), new Fields( "line" ), splitter );
1642
1643    Pipe upperLower = new HashJoin( pipeLower, new Fields( "num" ), pipeUpper, new Fields( "num" ), new Fields( "numUpperLower", "charUpperLower", "num2UpperLower", "char2UpperLower" ) );
1644
1645    upperLower = new Each( upperLower, new Identity() );
1646
1647    Pipe lhsUpperLower = new HashJoin( pipeLhs, new Fields( "num" ), upperLower, new Fields( "numUpperLower" ), new Fields( "numLhs", "charLhs", "numUpperLower", "charUpperLower", "num2UpperLower", "char2UpperLower" ) );
1648
1649    lhsUpperLower = new Each( lhsUpperLower, new Identity() );
1650
1651    Pipe grouped = new CoGroup( "cogrouping", pipeRhs, new Fields( "num" ), lhsUpperLower, new Fields( "numLhs" ) );
1652
1653    Flow flow = getPlatform().getFlowConnector().connect( sources, sink, grouped );
1654
1655    if( getPlatform().isMapReduce() )
1656      assertEquals( "wrong number of steps", 2, flow.getFlowSteps().size() );
1657
1658    flow.complete();
1659
1660    validateLength( flow, 37, null );
1661
1662    List<Tuple> actual = getSinkAsList( flow );
1663
1664    assertTrue( actual.contains( new Tuple( "1\tA\t1\ta\t1\ta\t1\tA" ) ) );
1665    assertTrue( actual.contains( new Tuple( "5\tE\t5\te\t5\te\t5\tE" ) ) );
1666    }
1667
1668  @Test
1669  public void testJoinsIntoCoGroup() throws Exception
1670    {
1671    getPlatform().copyFromLocal( inputFileLower );
1672    getPlatform().copyFromLocal( inputFileUpper );
1673
1674    getPlatform().copyFromLocal( inputFileLhs );
1675    getPlatform().copyFromLocal( inputFileRhs );
1676
1677    Tap sourceLower = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower );
1678    Tap sourceUpper = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileUpper );
1679
1680    Tap sourceLhs = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLhs );
1681    Tap sourceRhs = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileRhs );
1682
1683    Map sources = new HashMap();
1684
1685    sources.put( "lower", sourceLower );
1686    sources.put( "upper", sourceUpper );
1687    sources.put( "lhs", sourceLhs );
1688    sources.put( "rhs", sourceRhs );
1689
1690    Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "joinsintocogroup" ), SinkMode.REPLACE );
1691
1692    Function splitter = new RegexSplitter( new Fields( "num", "char" ), " " );
1693
1694    Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitter );
1695    Pipe pipeUpper = new Each( new Pipe( "upper" ), new Fields( "line" ), splitter );
1696
1697    Pipe pipeLhs = new Each( new Pipe( "lhs" ), new Fields( "line" ), splitter );
1698    Pipe pipeRhs = new Each( new Pipe( "rhs" ), new Fields( "line" ), splitter );
1699
1700    Pipe upperLower = new HashJoin( pipeLower, new Fields( "num" ), pipeUpper, new Fields( "num" ), new Fields( "numUpperLower1", "charUpperLower1", "numUpperLower2", "charUpperLower2" ) );
1701
1702    upperLower = new Each( upperLower, new Identity() );
1703
1704    Pipe lhsRhs = new HashJoin( pipeLhs, new Fields( "num" ), pipeRhs, new Fields( "num" ), new Fields( "numLhsRhs1", "charLhsRhs1", "numLhsRhs2", "charLhsRhs2" ) );
1705
1706    lhsRhs = new Each( lhsRhs, new Identity() );
1707
1708    Pipe grouped = new CoGroup( "cogrouping", upperLower, new Fields( "numUpperLower1" ), lhsRhs, new Fields( "numLhsRhs1" ) );
1709
1710    Flow flow = getPlatform().getFlowConnector().connect( sources, sink, grouped );
1711
1712    if( getPlatform().isMapReduce() )
1713      assertEquals( "wrong number of steps", 1, flow.getFlowSteps().size() );
1714
1715    flow.complete();
1716
1717    validateLength( flow, 37, null );
1718
1719    List<Tuple> actual = getSinkAsList( flow );
1720
1721    assertTrue( actual.contains( new Tuple( "1\ta\t1\tA\t1\ta\t1\tA" ) ) );
1722    assertTrue( actual.contains( new Tuple( "5\te\t5\tE\t5\te\t5\tE" ) ) );
1723    }
1724
1725  public static class AllComparator implements Comparator<Comparable>, Hasher<Comparable>, Serializable
1726    {
1727
1728    @Override
1729    public int compare( Comparable lhs, Comparable rhs )
1730      {
1731      return lhs.toString().compareTo( rhs.toString() );
1732      }
1733
1734    @Override
1735    public int hashCode( Comparable value )
1736      {
1737      if( value == null )
1738        return 0;
1739
1740      return value.toString().hashCode();
1741      }
1742    }
1743
1744  /**
1745   * Tests Hasher being honored even if default comparator is null.
1746   *
1747   * @throws Exception
1748   */
1749  @Test
1750  public void testJoinWithHasher() throws Exception
1751    {
1752    getPlatform().copyFromLocal( inputFileLower );
1753    getPlatform().copyFromLocal( inputFileUpper );
1754
1755    Tap sourceLower = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower );
1756    Tap sourceUpper = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileUpper );
1757
1758    Map sources = new HashMap();
1759
1760    sources.put( "lower", sourceLower );
1761    sources.put( "upper", sourceUpper );
1762
1763    Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "joinhasher" ), SinkMode.REPLACE );
1764
1765    Function splitter = new RegexSplitter( new Fields( "num", "char" ), " " );
1766
1767    Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitter );
1768
1769    pipeLower = new Each( pipeLower, new Fields( "num" ), new ExpressionFunction( Fields.ARGS, "Integer.parseInt( num )", String.class ), Fields.REPLACE );
1770
1771    Pipe pipeUpper = new Each( new Pipe( "upper" ), new Fields( "line" ), splitter );
1772
1773    Fields num = new Fields( "num" );
1774    num.setComparator( "num", new AllComparator() );
1775
1776    Pipe splice = new HashJoin( pipeLower, num, pipeUpper, new Fields( "num" ), Fields.size( 4 ) );
1777
1778    Map<Object, Object> properties = getProperties();
1779
1780    Flow flow = getPlatform().getFlowConnector( properties ).connect( sources, sink, splice );
1781
1782    flow.complete();
1783
1784    validateLength( flow, 5 );
1785
1786    List<Tuple> values = getSinkAsList( flow );
1787
1788    assertTrue( values.contains( new Tuple( "1\ta\t1\tA" ) ) );
1789    assertTrue( values.contains( new Tuple( "2\tb\t2\tB" ) ) );
1790    }
1791
1792  @Test
1793  public void testJoinNone() throws Exception
1794    {
1795    getPlatform().copyFromLocal( inputFileLower );
1796    getPlatform().copyFromLocal( inputFileUpper );
1797
1798    Tap sourceLower = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower );
1799    Tap sourceUpper = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileUpper );
1800
1801    Map sources = new HashMap();
1802
1803    sources.put( "lower", sourceLower );
1804    sources.put( "upper", sourceUpper );
1805
1806    Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "joinnone" ), SinkMode.REPLACE );
1807
1808    Function splitter = new RegexSplitter( new Fields( "num", "char" ), " " );
1809
1810    Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitter );
1811    Pipe pipeUpper = new Each( new Pipe( "upper" ), new Fields( "line" ), splitter );
1812
1813    Pipe splice = new HashJoin( pipeLower, Fields.NONE, pipeUpper, Fields.NONE, Fields.size( 4 ) );
1814
1815    Map<Object, Object> properties = getProperties();
1816
1817    Flow flow = getPlatform().getFlowConnector( properties ).connect( sources, sink, splice );
1818
1819    flow.complete();
1820
1821    validateLength( flow, 25 );
1822
1823    List<Tuple> values = getSinkAsList( flow );
1824
1825    assertTrue( values.contains( new Tuple( "1\ta\t1\tA" ) ) );
1826    assertTrue( values.contains( new Tuple( "1\ta\t2\tB" ) ) );
1827    assertTrue( values.contains( new Tuple( "2\tb\t2\tB" ) ) );
1828    }
1829
1830  @Test
1831  public void testGroupBySplitJoins() throws Exception
1832    {
1833    getPlatform().copyFromLocal( inputFileLower );
1834    getPlatform().copyFromLocal( inputFileUpper );
1835    getPlatform().copyFromLocal( inputFileJoined );
1836
1837    Tap sourceLower = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower );
1838    Tap sourceUpper = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileUpper );
1839    Tap sourceJoined = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileJoined );
1840
1841    Map sources = new HashMap();
1842
1843    sources.put( "lower", sourceLower );
1844    sources.put( "upper", sourceUpper );
1845    sources.put( "joined", sourceJoined );
1846
1847    Tap lhsSink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "lhs" ), SinkMode.REPLACE );
1848    Tap rhsSink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "rhs" ), SinkMode.REPLACE );
1849
1850    Map sinks = new HashMap();
1851
1852    sinks.put( "lhs", lhsSink );
1853    sinks.put( "rhs", rhsSink );
1854
1855    Function splitterLower = new RegexSplitter( new Fields( "numA", "lower" ), " " );
1856    Function splitterUpper = new RegexSplitter( new Fields( "numB", "upper" ), " " );
1857    Function splitterJoined = new RegexSplitter( new Fields( "numC", "lowerC", "upperC" ), "\t" );
1858
1859    Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitterLower );
1860    Pipe pipeUpper = new Each( new Pipe( "upper" ), new Fields( "line" ), splitterUpper );
1861    Pipe pipeJoined = new Each( new Pipe( "joined" ), new Fields( "line" ), splitterJoined );
1862
1863    Pipe pipe = new GroupBy( pipeLower, new Fields( "numA" ) );
1864
1865    pipe = new Every( pipe, Fields.ALL, new TestIdentityBuffer( new Fields( "numA" ), 5, false ), Fields.RESULTS );
1866
1867    Pipe lhsPipe = new Each( pipe, new Identity() );
1868    lhsPipe = new HashJoin( "lhs", lhsPipe, new Fields( "numA" ), pipeUpper, new Fields( "numB" ) );
1869
1870    Pipe rhsPipe = new Each( pipe, new Identity() );
1871    rhsPipe = new HashJoin( "rhs", rhsPipe, new Fields( "numA" ), pipeJoined, new Fields( "numC" ) );
1872
1873    Flow flow = getPlatform().getFlowConnector().connect( sources, sinks, lhsPipe, rhsPipe );
1874
1875    if( getPlatform().isMapReduce() )
1876      assertEquals( "wrong number of steps", 3, flow.getFlowSteps().size() );
1877
1878    flow.complete();
1879
1880    validateLength( flow.openSink( "lhs" ), 5, null );
1881    validateLength( flow.openSink( "rhs" ), 5, null );
1882
1883    List<Tuple> lhsActual = asList( flow, lhsSink );
1884
1885    assertTrue( lhsActual.contains( new Tuple( "1\ta\t1\tA" ) ) );
1886    assertTrue( lhsActual.contains( new Tuple( "2\tb\t2\tB" ) ) );
1887
1888    List<Tuple> rhsActual = asList( flow, rhsSink );
1889
1890    assertTrue( rhsActual.contains( new Tuple( "1\ta\t1\ta\tA" ) ) );
1891    assertTrue( rhsActual.contains( new Tuple( "2\tb\t2\tb\tB" ) ) );
1892    }
1893
1894  /**
1895   * currently we cannot efficiently plan for this case. better to throw an error
1896   * <p/>
1897   * When run against a cluster a Merge before a GroupBy can hide the streamed/accumulated nature of a branch.
1898   * <p/>
1899   * commented code is for troubleshooting.
1900   *
1901   * @throws Exception
1902   */
1903  @Test
1904  public void testJoinMergeGroupBy() throws Exception
1905    {
1906    getPlatform().copyFromLocal( inputFileNums10 );
1907    getPlatform().copyFromLocal( inputFileNums20 );
1908
1909    Tap lhsTap = getPlatform().getTextFile( new Fields( "id" ), inputFileNums10 );
1910    Tap rhsTap = getPlatform().getTextFile( new Fields( "id2" ), inputFileNums20 );
1911
1912    Pipe lhs = new Pipe( "lhs" );
1913    Pipe rhs = new Pipe( "rhs" );
1914
1915//    Pipe joined = new CoGroup( messages, new Fields( "id" ), people, new Fields( "id2" ) );
1916    Pipe joined = new HashJoin( lhs, new Fields( "id" ), rhs, new Fields( "id2" ) );
1917
1918    Pipe pruned = new Each( joined, new Fields( "id2" ), new Identity(), Fields.RESULTS );
1919//    pruned = new Checkpoint( pruned );
1920    Pipe merged = new Merge( pruned, rhs );
1921    Pipe grouped = new GroupBy( merged, new Fields( "id2" ) );
1922//    Pipe grouped = new GroupBy( Pipe.pipes(  pruned, people  ), new Fields( "id2" ) );
1923    Aggregator count = new Count( new Fields( "count" ) );
1924    Pipe counted = new Every( grouped, count );
1925
1926    String testJoinMerge = "testJoinMergeGroupBy/" + ( ( joined instanceof CoGroup ) ? "cogroup" : "hashjoin" );
1927    Tap sink = getPlatform().getDelimitedFile( Fields.ALL, true, "\t", null, getOutputPath( testJoinMerge ), SinkMode.REPLACE );
1928
1929    FlowDef flowDef = FlowDef.flowDef()
1930      .setName( "join-merge" )
1931      .addSource( rhs, rhsTap )
1932      .addSource( lhs, lhsTap )
1933      .addTailSink( counted, sink );
1934
1935    boolean failOnPlanner = !getPlatform().supportsGroupByAfterMerge();
1936
1937    Flow flow = null;
1938
1939    try
1940      {
1941      flow = getPlatform().getFlowConnector().connect( flowDef );
1942
1943      if( failOnPlanner )
1944        fail( "planner should throw error on plan" );
1945      }
1946    catch( Exception exception )
1947      {
1948      if( !failOnPlanner )
1949        throw exception;
1950
1951      return;
1952      }
1953
1954//    flow.writeDOT( "joinmerge.dot" );
1955//    flow.writeStepsDOT( "joinmerge-steps.dot" );
1956
1957    flow.complete();
1958
1959    validateLength( flow, 20 );
1960
1961    List<Tuple> values = getSinkAsList( flow );
1962    List<Tuple> expected = new ArrayList<Tuple>();
1963
1964    expected.add( new Tuple( "1", "2" ) );
1965    expected.add( new Tuple( "10", "2" ) );
1966    expected.add( new Tuple( "11", "1" ) );
1967    expected.add( new Tuple( "12", "1" ) );
1968    expected.add( new Tuple( "13", "1" ) );
1969    expected.add( new Tuple( "14", "1" ) );
1970    expected.add( new Tuple( "15", "1" ) );
1971    expected.add( new Tuple( "16", "1" ) );
1972    expected.add( new Tuple( "17", "1" ) );
1973    expected.add( new Tuple( "18", "1" ) );
1974    expected.add( new Tuple( "19", "1" ) );
1975    expected.add( new Tuple( "2", "2" ) );
1976    expected.add( new Tuple( "20", "1" ) );
1977    expected.add( new Tuple( "3", "2" ) );
1978    expected.add( new Tuple( "4", "2" ) );
1979    expected.add( new Tuple( "5", "2" ) );
1980    expected.add( new Tuple( "6", "2" ) );
1981    expected.add( new Tuple( "7", "2" ) );
1982    expected.add( new Tuple( "8", "2" ) );
1983    expected.add( new Tuple( "9", "2" ) );
1984
1985    Collections.sort( values );
1986    Collections.sort( expected );
1987
1988    assertEquals( expected, values );
1989    }
1990
1991  /**
1992   * Under tez, this can result in the HashJoin being duplicated across nodes for each split after the HashJoin
1993   * BoundaryBalanceJoinSplitTransformer inserts a Boundary at the split, preventing duplication of the path
1994   *
1995   * @throws Exception
1996   */
1997  @Test
1998  public void testJoinSplit() throws Exception
1999    {
2000    getPlatform().copyFromLocal( inputFileLhs );
2001    getPlatform().copyFromLocal( inputFileRhs );
2002
2003    FlowDef flowDef = FlowDef.flowDef()
2004      .addSource( "lhs", getPlatform().getTextFile( inputFileLhs ) )
2005      .addSource( "rhs", getPlatform().getTextFile( inputFileRhs ) )
2006      .addSink( "lhsSink", getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "lhs" ), SinkMode.REPLACE ) )
2007      .addSink( "rhsSink", getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "rhs" ), SinkMode.REPLACE ) );
2008
2009    Pipe pipeLower = new Each( "lhs", new Fields( "line" ), new RegexSplitter( new Fields( "numLHS", "charLHS" ), " " ) );
2010    Pipe pipeUpper = new Each( "rhs", new Fields( "line" ), new RegexSplitter( new Fields( "numRHS", "charRHS" ), " " ) );
2011
2012    Pipe join = new HashJoin( pipeLower, new Fields( "numLHS" ), pipeUpper, new Fields( "numRHS" ), new InnerJoin() );
2013
2014    Pipe pipeLhs = new Each( new Pipe( "lhsSink", join ), new Identity() );
2015    Pipe pipeRhs = new Each( new Pipe( "rhsSink", join ), new Identity() );
2016
2017    flowDef
2018      .addTail( pipeLhs )
2019      .addTail( pipeRhs );
2020
2021    Flow flow = getPlatform().getFlowConnector().connect( flowDef );
2022
2023    flow.complete();
2024
2025    validateLength( flow, 37, null );
2026
2027    List<Tuple> values = asList( flow, flowDef.getSinks().get( "lhsSink" ) );
2028
2029    assertTrue( values.contains( new Tuple( "1\ta\t1\tA" ) ) );
2030    assertTrue( values.contains( new Tuple( "1\ta\t1\tB" ) ) );
2031
2032    values = asList( flow, flowDef.getSinks().get( "rhsSink" ) );
2033
2034    assertTrue( values.contains( new Tuple( "1\ta\t1\tA" ) ) );
2035    assertTrue( values.contains( new Tuple( "1\ta\t1\tB" ) ) );
2036    }
2037
2038  /**
2039   * catches a situation where BottomUpJoinedBoundariesNodePartitioner may capture an invalid HashJoin sub-graph
2040   * if the in-bound Boundary is split upon.
2041   */
2042  @Test
2043  public void testSameSourceJoinSplitIntoJoin() throws Exception
2044    {
2045    getPlatform().copyFromLocal( inputFileLhs );
2046    getPlatform().copyFromLocal( inputFileRhs );
2047
2048    FlowDef flowDef = FlowDef.flowDef()
2049      .addSource( "lhs", getPlatform().getTextFile( inputFileLhs ) )
2050      .addSource( "rhs", getPlatform().getTextFile( inputFileLhs ) )
2051      .addSource( "joinSecond", getPlatform().getTextFile( inputFileRhs ) )
2052      .addSink( "lhsSink", getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "lhs" ), SinkMode.REPLACE ) )
2053      .addSink( "rhsSink", getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "rhs" ), SinkMode.REPLACE ) );
2054
2055    Pipe pipeLower = new Each( "lhs", new Fields( "line" ), new RegexSplitter( new Fields( "numLHS", "charLHS" ), " " ) );
2056    Pipe pipeUpper = new Each( "rhs", new Fields( "line" ), new RegexSplitter( new Fields( "numRHS", "charRHS" ), " " ) );
2057
2058    Pipe joinFirst = new HashJoin( pipeLower, new Fields( "numLHS" ), pipeUpper, new Fields( "numRHS" ), new InnerJoin() );
2059
2060    Pipe pipeLhs = new Each( new Pipe( "lhsSink", joinFirst ), new Identity() );
2061
2062    Pipe joinSecond = new Each( "joinSecond", new Fields( "line" ), new RegexSplitter( new Fields( "numRHSSecond", "charRHSSecond" ), " " ) );
2063
2064    joinSecond = new HashJoin( joinFirst, new Fields( "numLHS" ), joinSecond, new Fields( "numRHSSecond" ) );
2065
2066    Pipe pipeRhs = new Each( new Pipe( "rhsSink", joinSecond ), new Identity() );
2067
2068    flowDef
2069      .addTail( pipeLhs )
2070      .addTail( pipeRhs );
2071
2072    Flow flow = getPlatform().getFlowConnector().connect( flowDef );
2073
2074    flow.complete();
2075
2076    List<Tuple> values = asList( flow, flowDef.getSinks().get( "lhsSink" ) );
2077
2078    assertEquals( 37, values.size() );
2079    assertTrue( values.contains( new Tuple( "1\ta\t1\ta" ) ) );
2080    assertTrue( values.contains( new Tuple( "1\ta\t1\tb" ) ) );
2081
2082    values = asList( flow, flowDef.getSinks().get( "rhsSink" ) );
2083
2084    assertEquals( 109, values.size() );
2085    assertTrue( values.contains( new Tuple( "1\ta\t1\ta\t1\tA" ) ) );
2086    assertTrue( values.contains( new Tuple( "1\ta\t1\tb\t1\tB" ) ) );
2087    }
2088
2089  /**
2090   * checks that a split after a HashJoin does not result in the HashJoin execution being duplicated across
2091   * multiple nodes, one for each branch in the split.
2092   */
2093  @Test
2094  public void testJoinSplitBeforeJoin() throws Exception
2095    {
2096    getPlatform().copyFromLocal( inputFileLhs );
2097    getPlatform().copyFromLocal( inputFileRhs );
2098
2099    FlowDef flowDef = FlowDef.flowDef()
2100      .addSource( "lhs", getPlatform().getTextFile( inputFileLhs ) )
2101      .addSource( "rhs", getPlatform().getTextFile( inputFileRhs ) )
2102      .addSource( "joinSecond", getPlatform().getTextFile( inputFileRhs ) )
2103      .addSink( "lhsSink", getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "lhs" ), SinkMode.REPLACE ) )
2104      .addSink( "rhsSink", getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "rhs" ), SinkMode.REPLACE ) );
2105
2106    Pipe pipeLower = new Each( "lhs", new Fields( "line" ), new RegexSplitter( new Fields( "numLHS", "charLHS" ), " " ) );
2107    Pipe pipeUpper = new Each( "rhs", new Fields( "line" ), new RegexSplitter( new Fields( "numRHS", "charRHS" ), " " ) );
2108
2109    pipeUpper = new Checkpoint( pipeUpper );
2110
2111    HashJoin hashJoin = new HashJoin( pipeLower, new Fields( "numLHS" ), pipeUpper, new Fields( "numRHS" ), new InnerJoin() );
2112
2113    Pipe joinFirst = hashJoin;
2114
2115    joinFirst = new Each( joinFirst, new Identity() );
2116
2117    Pipe pipeLhs = new Each( new Pipe( "lhsSink", joinFirst ), new Identity() );
2118
2119    pipeLhs = new GroupBy( pipeLhs, new Fields( "numLHS" ) );
2120
2121    joinFirst = new Each( new Pipe( "lhsSplit", joinFirst ), new Identity() );
2122
2123    Pipe joinSecond = new Each( "joinSecond", new Fields( "line" ), new RegexSplitter( new Fields( "numRHSSecond", "charRHSSecond" ), " " ) );
2124
2125    joinSecond = new CoGroup( joinFirst, new Fields( "numLHS" ), joinSecond, new Fields( "numRHSSecond" ) );
2126
2127    Pipe pipeRhs = new Each( new Pipe( "rhsSink", joinSecond ), new Identity() );
2128
2129    flowDef
2130      .addTail( pipeLhs )
2131      .addTail( pipeRhs );
2132
2133    Flow flow = getPlatform().getFlowConnector().connect( flowDef );
2134
2135    if( getPlatform().isDAG() )
2136      {
2137      FlowStep flowStep = (FlowStep) flow.getFlowSteps().get( 0 );
2138      List<ElementGraph> elementGraphs = flowStep.getFlowNodeGraph().getElementGraphs( hashJoin );
2139
2140      assertEquals( 1, elementGraphs.size() );
2141      }
2142
2143    flow.complete();
2144
2145    List<Tuple> values = asList( flow, flowDef.getSinks().get( "lhsSink" ) );
2146
2147    assertEquals( 37, values.size() );
2148    assertTrue( values.contains( new Tuple( "1\ta\t1\tA" ) ) );
2149    assertTrue( values.contains( new Tuple( "1\ta\t1\tB" ) ) );
2150
2151    values = asList( flow, flowDef.getSinks().get( "rhsSink" ) );
2152
2153    assertEquals( 109, values.size() );
2154    assertTrue( values.contains( new Tuple( "1\ta\t1\tA\t1\tA" ) ) );
2155    assertTrue( values.contains( new Tuple( "1\ta\t1\tB\t1\tB" ) ) );
2156    }
2157
2158  @Test
2159  public void testGroupBySplitGroupByJoin() throws Exception
2160    {
2161    getPlatform().copyFromLocal( inputFileLower );
2162
2163    Tap source = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower );
2164
2165    Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "sink" ), SinkMode.REPLACE );
2166
2167    Function splitter = new RegexSplitter( new Fields( "num", "char" ), " " );
2168
2169    Pipe pipeFirst = new Pipe( "first" );
2170    pipeFirst = new Each( pipeFirst, new Fields( "line" ), splitter );
2171    pipeFirst = new GroupBy( pipeFirst, new Fields( "num" ) );
2172    pipeFirst = new Every( pipeFirst, new Fields( "char" ), new First( new Fields( "firstFirst" ) ), Fields.ALL );
2173
2174    Pipe pipeSecond = new Pipe( "second", pipeFirst );
2175    pipeSecond = new Each( pipeSecond, new Identity() );
2176    pipeSecond = new GroupBy( pipeSecond, new Fields( "num" ) );
2177    pipeSecond = new Every( pipeSecond, new Fields( "firstFirst" ), new First( new Fields( "secondFirst" ) ), Fields.ALL );
2178    pipeSecond = new GroupBy( pipeSecond, new Fields( "num" ) );
2179    pipeSecond = new Every( pipeSecond, new Fields( "secondFirst" ), new First( new Fields( "thirdFirst" ) ), Fields.ALL );
2180
2181    Pipe splice = new HashJoin( pipeFirst, new Fields( "num" ), pipeSecond, new Fields( "num" ), Fields.size( 4 ) );
2182
2183    Flow flow = getPlatform().getFlowConnector().connect( source, sink, splice );
2184
2185    flow.complete();
2186
2187    validateLength( flow, 5, null );
2188
2189    List<Tuple> values = getSinkAsList( flow );
2190
2191    assertTrue( values.contains( new Tuple( "1\ta\t1\ta" ) ) );
2192    assertTrue( values.contains( new Tuple( "2\tb\t2\tb" ) ) );
2193    assertTrue( values.contains( new Tuple( "3\tc\t3\tc" ) ) );
2194    assertTrue( values.contains( new Tuple( "4\td\t4\td" ) ) );
2195    assertTrue( values.contains( new Tuple( "5\te\t5\te" ) ) );
2196    }
2197
2198  @Test
2199  public void testGroupBySplitSplitGroupByJoin() throws Exception
2200    {
2201    getPlatform().copyFromLocal( inputFileLower );
2202
2203    Tap source = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower );
2204
2205    Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "sink" ), SinkMode.REPLACE );
2206
2207    Function splitter = new RegexSplitter( new Fields( "num", "char" ), " " );
2208
2209    Pipe pipeFirst = new Pipe( "first" );
2210    pipeFirst = new Each( pipeFirst, new Fields( "line" ), splitter );
2211    pipeFirst = new GroupBy( pipeFirst, new Fields( "num" ) );
2212    pipeFirst = new Every( pipeFirst, new Fields( "char" ), new First( new Fields( "firstFirst" ) ), Fields.ALL );
2213
2214    Pipe pipeSecond = new Pipe( "second", pipeFirst );
2215    pipeSecond = new Each( pipeSecond, new Identity() );
2216    pipeSecond = new GroupBy( pipeSecond, new Fields( "num" ) );
2217    pipeSecond = new Every( pipeSecond, new Fields( "firstFirst" ), new First( new Fields( "secondFirst" ) ), Fields.ALL );
2218
2219    Pipe splice = new HashJoin( pipeFirst, new Fields( "num" ), pipeSecond, new Fields( "num" ), Fields.size( 4 ) );
2220//    Pipe splice = new HashJoin( pipeSecond, new Fields( "num" ), pipeFirst, new Fields( "num" ), Fields.size( 4 ) );
2221
2222    splice = new HashJoin( splice, new Fields( 0 ), pipeSecond, new Fields( "num" ), Fields.size( 6 ) );
2223
2224    Flow flow = getPlatform().getFlowConnector().connect( source, sink, splice );
2225
2226    flow.complete();
2227
2228    validateLength( flow, 5, null );
2229
2230    List<Tuple> values = getSinkAsList( flow );
2231
2232    assertTrue( values.contains( new Tuple( "1\ta\t1\ta\t1\ta" ) ) );
2233    assertTrue( values.contains( new Tuple( "2\tb\t2\tb\t2\tb" ) ) );
2234    assertTrue( values.contains( new Tuple( "3\tc\t3\tc\t3\tc" ) ) );
2235    assertTrue( values.contains( new Tuple( "4\td\t4\td\t4\td" ) ) );
2236    assertTrue( values.contains( new Tuple( "5\te\t5\te\t5\te" ) ) );
2237    }
2238
2239  @Test
2240  public void testGroupBySplitAroundSplitGroupByJoin() throws Exception
2241    {
2242    getPlatform().copyFromLocal( inputFileLower );
2243
2244    Tap source = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower );
2245
2246    Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "sink" ), SinkMode.REPLACE );
2247    Tap sink2 = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "sink2" ), SinkMode.REPLACE );
2248
2249    Function splitter = new RegexSplitter( new Fields( "num", "char" ), " " );
2250
2251    Pipe pipeInit = new Pipe( "init" );
2252    Pipe pipeFirst = new Pipe( "first", pipeInit );
2253    pipeFirst = new Each( pipeFirst, new Fields( "line" ), splitter );
2254    pipeFirst = new GroupBy( pipeFirst, new Fields( "num" ) );
2255    pipeFirst = new Every( pipeFirst, new Fields( "char" ), new First( new Fields( "firstFirst" ) ), Fields.ALL );
2256
2257    Pipe sink2Pipe = new Pipe( "sink2", pipeFirst );
2258
2259    Pipe pipeSecond = new Pipe( "second", pipeInit );
2260    pipeSecond = new Each( pipeSecond, new Fields( "line" ), splitter );
2261    pipeSecond = new GroupBy( pipeSecond, new Fields( "num" ) );
2262    pipeSecond = new Every( pipeSecond, new Fields( "char" ), new First( new Fields( "secondFirst" ) ), Fields.ALL );
2263
2264//    Pipe splice = new HashJoin( pipeFirst, new Fields( "num" ), pipeSecond, new Fields( "num" ), Fields.size( 4 ) );
2265    Pipe splice = new HashJoin( pipeSecond, new Fields( "num" ), pipeFirst, new Fields( "num" ), Fields.size( 4 ) );
2266
2267    Pipe pipeThird = new Pipe( "third", pipeSecond );
2268    pipeThird = new Each( pipeThird, new Identity() );
2269    pipeThird = new GroupBy( pipeThird, new Fields( "num" ) );
2270    pipeThird = new Every( pipeThird, new Fields( "secondFirst" ), new First( new Fields( "thirdFirst" ) ), Fields.ALL );
2271
2272    splice = new HashJoin( splice, new Fields( 0 ), pipeThird, new Fields( "num" ), Fields.size( 6 ) );
2273
2274    FlowDef flowDef = FlowDef.flowDef()
2275      .setName( splice.getName() )
2276      .addSource( "init", source )
2277      .addTailSink( splice, sink )
2278      .addTailSink( sink2Pipe, sink2 );
2279
2280    Flow flow = getPlatform().getFlowConnector().connect( flowDef );
2281
2282    flow.complete();
2283
2284    validateLength( flow, 5, null );
2285
2286    List<Tuple> values = getSinkAsList( flow );
2287
2288    assertTrue( values.contains( new Tuple( "1\ta\t1\ta\t1\ta" ) ) );
2289    assertTrue( values.contains( new Tuple( "2\tb\t2\tb\t2\tb" ) ) );
2290    assertTrue( values.contains( new Tuple( "3\tc\t3\tc\t3\tc" ) ) );
2291    assertTrue( values.contains( new Tuple( "4\td\t4\td\t4\td" ) ) );
2292    assertTrue( values.contains( new Tuple( "5\te\t5\te\t5\te" ) ) );
2293    }
2294
2295  /**
2296   * This test checks for a deadlock when the same input is forked, adapted on one edge, then hashjoined back together.
2297   *
2298   * @throws Exception
2299   */
2300  @Test
2301  public void testForkThenJoin() throws Exception
2302    {
2303    getPlatform().copyFromLocal( inputFileLower );
2304    Tap sourceLower = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower );
2305
2306    Map sources = new HashMap();
2307
2308    sources.put( "lower", sourceLower );
2309
2310    Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "join" ), SinkMode.REPLACE );
2311
2312    Function splitter = new RegexSplitter( new Fields( "num", "text" ), " " );
2313
2314    Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitter );
2315    Pipe pipeUpper = new Each( new Pipe( "upper", pipeLower ), new Fields( "text" ),
2316      new ExpressionFunction( Fields.ARGS, "text.toUpperCase(java.util.Locale.ROOT)", String.class ),
2317      Fields.REPLACE );
2318
2319    Pipe splice = new HashJoin( pipeLower, new Fields( "num" ), pipeUpper, new Fields( "num" ), Fields.size( 4 ) );
2320
2321    Map<Object, Object> properties = getProperties();
2322
2323    Flow flow = getPlatform().getFlowConnector( properties ).connect( sources, sink, splice );
2324
2325    flow.complete();
2326
2327    validateLength( flow, 5 );
2328
2329    List<Tuple> values = getSinkAsList( flow );
2330
2331    assertTrue( values.contains( new Tuple( "1\ta\t1\tA" ) ) );
2332    assertTrue( values.contains( new Tuple( "2\tb\t2\tB" ) ) );
2333    }
2334
2335  /**
2336   * This test checks for a deadlock when the same input is forked, adapted on one edge, then hashjoined back together.
2337   *
2338   * @throws Exception
2339   */
2340  @Test
2341  public void testForkCoGroupThenHashJoin() throws Exception
2342    {
2343    getPlatform().copyFromLocal( inputFileLower );
2344    Tap sourceLower = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower );
2345    Tap sourceUpper = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileUpper );
2346
2347    Map sources = new HashMap();
2348
2349    sources.put( "sourceLower", sourceLower );
2350    sources.put( "sourceUpper", sourceUpper );
2351
2352    Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "join" ), SinkMode.REPLACE );
2353
2354    Function splitter = new RegexSplitter( new Fields( "num", "text" ), " " );
2355
2356    Pipe leftPipeLower = new Each( new Pipe( "sourceLower" ), new Fields( "line" ), splitter );
2357    Pipe rightPipeUpper = new Each( new Pipe( "sourceUpper" ), new Fields( "line" ), splitter );
2358
2359    Pipe leftPipeUpper = new Each( new Pipe( "leftUpper", leftPipeLower ), new Fields( "text" ),
2360      new ExpressionFunction( Fields.ARGS, "text.toUpperCase(java.util.Locale.ROOT)", String.class ),
2361      Fields.REPLACE );
2362    Pipe rightPipeLower = new Each( new Pipe( "rightLower", rightPipeUpper ), new Fields( "text" ),
2363      new ExpressionFunction( Fields.ARGS, "text.toLowerCase(java.util.Locale.ROOT)", String.class ),
2364      Fields.REPLACE );
2365
2366    leftPipeUpper = new GroupBy( leftPipeUpper, new Fields( "num" ) );
2367    rightPipeLower = new GroupBy( rightPipeLower, new Fields( "num" ) );
2368
2369    Pipe middleSplice = new CoGroup( "middleCoGroup", leftPipeUpper, new Fields( "num" ), rightPipeLower, new Fields( "num" ), new Fields( "numM1", "charM1", "numM2", "charM2" ) );
2370
2371    Pipe leftSplice = new HashJoin( leftPipeLower, new Fields( "num" ), middleSplice, new Fields( "numM1" ) );
2372
2373    Map<Object, Object> properties = getProperties();
2374
2375    Flow flow = getPlatform().getFlowConnector( properties ).connect( sources, sink, leftSplice );
2376
2377    flow.complete();
2378
2379    validateLength( flow, 5 );
2380
2381    List<Tuple> values = getSinkAsList( flow );
2382    // that the flow completes at all is already success.
2383    assertTrue( values.contains( new Tuple( "1\ta\t1\tA\t1\ta" ) ) );
2384    assertTrue( values.contains( new Tuple( "2\tb\t2\tB\t2\tb" ) ) );
2385    }
2386
2387  /**
2388   * This test checks for a deadlock when the same input is forked, adapted on one edge, cogroup with something,
2389   * then hashjoined back together.
2390   *
2391   * @throws Exception
2392   */
2393  @Test
2394  public void testForkCoGroupThenHashJoinCoGroupAgain() throws Exception
2395    {
2396    getPlatform().copyFromLocal( inputFileLower );
2397    getPlatform().copyFromLocal( inputFileUpper );
2398
2399    Tap sourceLower = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower );
2400    Tap sourceUpper = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileUpper );
2401
2402    Map sources = new HashMap();
2403
2404    sources.put( "sourceLower", sourceLower );
2405    sources.put( "sourceUpper", sourceUpper );
2406
2407    Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "join" ), SinkMode.REPLACE );
2408
2409    Function splitter = new RegexSplitter( new Fields( "num", "text" ), " " );
2410
2411    Pipe leftPipeLower = new Each( new Pipe( "sourceLower" ), new Fields( "line" ), splitter );
2412    Pipe rightPipeUpper = new Each( new Pipe( "sourceUpper" ), new Fields( "line" ), splitter );
2413
2414    Pipe leftPipeUpper = new Each( new Pipe( "leftUpper", leftPipeLower ), new Fields( "text" ),
2415      new ExpressionFunction( Fields.ARGS, "text.toUpperCase(java.util.Locale.ROOT)", String.class ),
2416      Fields.REPLACE );
2417    Pipe rightPipeLower = new Each( new Pipe( "rightLower", rightPipeUpper ), new Fields( "text" ),
2418      new ExpressionFunction( Fields.ARGS, "text.toLowerCase(java.util.Locale.ROOT)", String.class ),
2419      Fields.REPLACE );
2420
2421    leftPipeUpper = new GroupBy( leftPipeUpper, new Fields( "num" ) );
2422    rightPipeLower = new GroupBy( rightPipeLower, new Fields( "num" ) );
2423
2424    Pipe middleSplice = new CoGroup( "middleCoGroup", leftPipeUpper, new Fields( "num" ), rightPipeLower, new Fields( "num" ), new Fields( "numM1", "charM1", "numM2", "charM2" ) );
2425
2426    Pipe leftSplice = new HashJoin( leftPipeLower, new Fields( "num" ), middleSplice, new Fields( "numM1" ) );
2427    Pipe rightSplice = new HashJoin( rightPipeUpper, new Fields( "num" ), middleSplice, new Fields( "numM2" ) );
2428
2429    leftSplice = new Rename( leftSplice, new Fields( "num", "text", "numM1", "charM1", "numM2", "charM2" ), new Fields( "numL1", "charL1", "numM1L", "charM1L", "numM2L", "charM2L" ) );
2430    rightSplice = new Rename( rightSplice, new Fields( "num", "text", "numM1", "charM1", "numM2", "charM2" ), new Fields( "numR1", "charR1", "numM1R", "charM1R", "numM2R", "charM2R" ) );
2431
2432    leftSplice = new GroupBy( leftSplice, new Fields( "numM1L" ) );
2433    rightSplice = new GroupBy( rightSplice, new Fields( "numM2R" ) );
2434
2435    Pipe splice = new CoGroup( "cogrouping", leftSplice, new Fields( "numM1L" ), rightSplice, new Fields( "numM2R" ) );
2436
2437    Map<Object, Object> properties = getProperties();
2438
2439    Flow flow = getPlatform().getFlowConnector( properties ).connect( sources, sink, splice );
2440
2441    flow.complete();
2442
2443    validateLength( flow, 5 );
2444
2445    List<Tuple> values = getSinkAsList( flow );
2446
2447    // getting this far is a success already (past old deadlocks)
2448    assertTrue( values.contains( new Tuple( "1\ta\t1\tA\t1\ta\t1\tA\t1\tA\t1\ta" ) ) );
2449    assertTrue( values.contains( new Tuple( "2\tb\t2\tB\t2\tb\t2\tB\t2\tB\t2\tb" ) ) );
2450    }
2451  }