001/* 002 * Copyright (c) 2016 Chris K Wensel <chris@wensel.net>. All Rights Reserved. 003 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved. 004 * 005 * Project and contact information: http://www.cascading.org/ 006 * 007 * This file is part of the Cascading project. 008 * 009 * Licensed under the Apache License, Version 2.0 (the "License"); 010 * you may not use this file except in compliance with the License. 011 * You may obtain a copy of the License at 012 * 013 * http://www.apache.org/licenses/LICENSE-2.0 014 * 015 * Unless required by applicable law or agreed to in writing, software 016 * distributed under the License is distributed on an "AS IS" BASIS, 017 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 018 * See the License for the specific language governing permissions and 019 * limitations under the License. 020 */ 021 022package cascading; 023 024import java.io.Serializable; 025import java.util.ArrayList; 026import java.util.Collections; 027import java.util.Comparator; 028import java.util.HashMap; 029import java.util.HashSet; 030import java.util.List; 031import java.util.Map; 032import java.util.Set; 033 034import cascading.flow.Flow; 035import cascading.flow.FlowDef; 036import cascading.flow.FlowStep; 037import cascading.flow.planner.graph.ElementGraph; 038import cascading.operation.Aggregator; 039import cascading.operation.Function; 040import cascading.operation.Identity; 041import cascading.operation.aggregator.Count; 042import cascading.operation.aggregator.First; 043import cascading.operation.expression.ExpressionFunction; 044import cascading.operation.regex.RegexFilter; 045import cascading.operation.regex.RegexSplitter; 046import cascading.pipe.Checkpoint; 047import cascading.pipe.CoGroup; 048import cascading.pipe.Each; 049import cascading.pipe.Every; 050import cascading.pipe.GroupBy; 051import cascading.pipe.HashJoin; 052import cascading.pipe.Merge; 053import cascading.pipe.Pipe; 054import 
cascading.pipe.assembly.Rename; 055import cascading.pipe.joiner.InnerJoin; 056import cascading.pipe.joiner.Joiner; 057import cascading.pipe.joiner.LeftJoin; 058import cascading.pipe.joiner.MixedJoin; 059import cascading.pipe.joiner.OuterJoin; 060import cascading.pipe.joiner.RightJoin; 061import cascading.tap.SinkMode; 062import cascading.tap.Tap; 063import cascading.tuple.Fields; 064import cascading.tuple.Hasher; 065import cascading.tuple.Tuple; 066import org.junit.Test; 067 068import static data.InputData.*; 069 070public class JoinFieldedPipesPlatformTest extends PlatformTestCase 071 { 072 public JoinFieldedPipesPlatformTest() 073 { 074 super( true, 4, 1 ); // leave cluster testing enabled 075 } 076 077 @Test 078 public void testCross() throws Exception 079 { 080 getPlatform().copyFromLocal( inputFileLhs ); 081 getPlatform().copyFromLocal( inputFileRhs ); 082 083 Map sources = new HashMap(); 084 085 sources.put( "lhs", getPlatform().getTextFile( inputFileLhs ) ); 086 sources.put( "rhs", getPlatform().getTextFile( inputFileRhs ) ); 087 088 Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "cross" ), SinkMode.REPLACE ); 089 090 Pipe pipeLower = new Each( "lhs", new Fields( "line" ), new RegexSplitter( new Fields( "numLHS", "charLHS" ), " " ) ); 091 Pipe pipeUpper = new Each( "rhs", new Fields( "line" ), new RegexSplitter( new Fields( "numRHS", "charRHS" ), " " ) ); 092 093 Pipe cross = new HashJoin( pipeLower, new Fields( "numLHS" ), pipeUpper, new Fields( "numRHS" ), new InnerJoin() ); 094 095 Flow flow = getPlatform().getFlowConnector().connect( sources, sink, cross ); 096 097 flow.complete(); 098 099 validateLength( flow, 37, null ); 100 101 List<Tuple> values = getSinkAsList( flow ); 102 103 assertTrue( values.contains( new Tuple( "1\ta\t1\tA" ) ) ); 104 assertTrue( values.contains( new Tuple( "1\ta\t1\tB" ) ) ); 105 } 106 107 @Test 108 public void testJoin() throws Exception 109 { 110 getPlatform().copyFromLocal( inputFileLower ); 111 
getPlatform().copyFromLocal( inputFileUpper ); 112 113 Tap sourceLower = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower ); 114 Tap sourceUpper = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileUpper ); 115 116 Map sources = new HashMap(); 117 118 sources.put( "lower", sourceLower ); 119 sources.put( "upper", sourceUpper ); 120 121 Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "join" ), SinkMode.REPLACE ); 122 123 Function splitter = new RegexSplitter( new Fields( "num", "char" ), " " ); 124 125 Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitter ); 126 Pipe pipeUpper = new Each( new Pipe( "upper" ), new Fields( "line" ), splitter ); 127 128 Pipe splice = new HashJoin( pipeLower, new Fields( "num" ), pipeUpper, new Fields( "num" ), Fields.size( 4 ) ); 129 130 Map<Object, Object> properties = getProperties(); 131 132 Flow flow = getPlatform().getFlowConnector( properties ).connect( sources, sink, splice ); 133 134 flow.complete(); 135 136 validateLength( flow, 5 ); 137 138 List<Tuple> values = getSinkAsList( flow ); 139 140 assertTrue( values.contains( new Tuple( "1\ta\t1\tA" ) ) ); 141 assertTrue( values.contains( new Tuple( "2\tb\t2\tB" ) ) ); 142 } 143 144 @Test 145 public void testJoinSamePipeName() throws Exception 146 { 147 getPlatform().copyFromLocal( inputFileLower ); 148 getPlatform().copyFromLocal( inputFileUpper ); 149 150 Tap sourceLower = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower ); 151 Tap sourceUpper = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileUpper ); 152 153 Map sources = new HashMap(); 154 155 sources.put( "lower", sourceLower ); 156 sources.put( "upper", sourceUpper ); 157 158 Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "renamedpipes" ), SinkMode.REPLACE ); 159 160 Function splitter = new RegexSplitter( new Fields( "num", "char" ), " " ); 161 162 Pipe 
pipeLower = new Pipe( "lower" ); 163 Pipe pipeUpper = new Pipe( "upper" ); 164 165 // these pipes will hide the source name, and could cause one to be lost 166 pipeLower = new Pipe( "same", pipeLower ); 167 pipeUpper = new Pipe( "same", pipeUpper ); 168 169 pipeLower = new Each( pipeLower, new Fields( "line" ), splitter ); 170 pipeUpper = new Each( pipeUpper, new Fields( "line" ), splitter ); 171 172// pipeLower = new Each( pipeLower, new Fields( "num", "char" ), new Identity( new Fields( "num", "char" ) ) ); 173// pipeUpper = new Each( pipeUpper, new Fields( "num", "char" ), new Identity( new Fields( "num", "char" ) ) ); 174 175 pipeLower = new Pipe( "left", pipeLower ); 176 pipeUpper = new Pipe( "right", pipeUpper ); 177 178// pipeLower = new Each( pipeLower, new Debug( true ) ); 179// pipeUpper = new Each( pipeUpper, new Debug( true ) ); 180 181 Pipe splice = new HashJoin( pipeLower, new Fields( "num" ), pipeUpper, new Fields( "num" ), Fields.size( 4 ) ); 182 183// splice = new Each( splice, new Debug( true ) ); 184 splice = new Pipe( "splice", splice ); 185 splice = new Pipe( "tail", splice ); 186 187 Flow flow = getPlatform().getFlowConnector().connect( sources, sink, splice ); 188 189 flow.complete(); 190 191 validateLength( flow, 5 ); 192 193 List<Tuple> values = getSinkAsList( flow ); 194 195 assertTrue( values.contains( new Tuple( "1\ta\t1\tA" ) ) ); 196 assertTrue( values.contains( new Tuple( "2\tb\t2\tB" ) ) ); 197 } 198 199 @Test 200 public void testJoinWithUnknowns() throws Exception 201 { 202 getPlatform().copyFromLocal( inputFileLower ); 203 getPlatform().copyFromLocal( inputFileUpper ); 204 205 Tap sourceLower = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower ); 206 Tap sourceUpper = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileUpper ); 207 208 Map sources = new HashMap(); 209 210 sources.put( "lower", sourceLower ); 211 sources.put( "upper", sourceUpper ); 212 213 Tap sink = 
getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "unknown" ), SinkMode.REPLACE ); 214 215 Function splitter = new RegexSplitter( Fields.UNKNOWN, " " ); 216 217 Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitter ); 218 Pipe pipeUpper = new Each( new Pipe( "upper" ), new Fields( "line" ), splitter ); 219 220 Pipe splice = new HashJoin( pipeLower, new Fields( 0 ), pipeUpper, new Fields( 0 ), Fields.size( 4 ) ); 221 222 Flow flow = getPlatform().getFlowConnector().connect( sources, sink, splice ); 223 224 flow.complete(); 225 226 validateLength( flow, 5 ); 227 228 List<Tuple> values = getSinkAsList( flow ); 229 230 assertTrue( values.contains( new Tuple( "1\ta\t1\tA" ) ) ); 231 assertTrue( values.contains( new Tuple( "2\tb\t2\tB" ) ) ); 232 } 233 234 /** 235 * this test intentionally filters out all values so the intermediate tap is empty. this tap is cogrouped with 236 * a new stream using an outerjoin. 237 * 238 * @throws Exception 239 */ 240 @Test 241 public void testJoinFilteredBranch() throws Exception 242 { 243 getPlatform().copyFromLocal( inputFileLower ); 244 getPlatform().copyFromLocal( inputFileUpper ); 245 246 Tap sourceLower = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower ); 247 Tap sourceUpper = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileUpper ); 248 249 Map sources = new HashMap(); 250 251 sources.put( "lower", sourceLower ); 252 sources.put( "upper", sourceUpper ); 253 254 Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "joinfilteredbranch" ), SinkMode.REPLACE ); 255 256 Function splitter = new RegexSplitter( new Fields( "num", "char" ), " " ); 257 258 Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitter ); 259 Pipe pipeUpper = new Each( new Pipe( "upper" ), new Fields( "line" ), splitter ); 260 pipeUpper = new Each( pipeUpper, new Fields( "num" ), new RegexFilter( "^fobar" ) ); // intentionally filtering 
all 261 pipeUpper = new GroupBy( pipeUpper, new Fields( "num" ) ); 262 263 Pipe splice = new HashJoin( pipeLower, new Fields( "num" ), pipeUpper, new Fields( "num" ), Fields.size( 4 ), new OuterJoin() ); 264 265 Flow flow = getPlatform().getFlowConnector().connect( sources, sink, splice ); 266 267 flow.complete(); 268 269 validateLength( flow, 5 ); 270 271 List<Tuple> values = getSinkAsList( flow ); 272 273 assertTrue( values.contains( new Tuple( "1\ta\tnull\tnull" ) ) ); 274 assertTrue( values.contains( new Tuple( "2\tb\tnull\tnull" ) ) ); 275 } 276 277 @Test 278 public void testJoinSelf() throws Exception 279 { 280 getPlatform().copyFromLocal( inputFileLhs ); 281 282 Tap sourceLhs = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLhs ); 283 Tap sourceRhs = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLhs ); 284 285 Map sources = new HashMap(); 286 287 sources.put( "lhs", sourceLhs ); 288 sources.put( "rhs", sourceRhs ); 289 290 Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "joinself" ), SinkMode.REPLACE ); 291 292 Function splitter = new RegexSplitter( new Fields( "num", "char" ), " " ); 293 294 Pipe pipeLower = new Each( new Pipe( "lhs" ), new Fields( "line" ), splitter ); 295 Pipe pipeUpper = new Each( new Pipe( "rhs" ), new Fields( "line" ), splitter ); 296 297 Pipe splice = new HashJoin( pipeLower, new Fields( "num" ), pipeUpper, new Fields( "num" ), Fields.size( 4 ) ); 298 299 Flow flow = getPlatform().getFlowConnector().connect( sources, sink, splice ); 300 301 flow.complete(); 302 303 validateLength( flow, 37 ); 304 305 List<Tuple> values = getSinkAsList( flow ); 306 307 assertTrue( values.contains( new Tuple( "1\ta\t1\ta" ) ) ); 308 assertTrue( values.contains( new Tuple( "2\tb\t2\tb" ) ) ); 309 } 310 311 @Test 312 public void testSameSourceJoin() throws Exception 313 { 314 getPlatform().copyFromLocal( inputFileLhs ); 315 316 Tap source = getPlatform().getDelimitedFile( new Fields( 
"num", "char" ), " ", inputFileLhs ); 317 318 Map sources = new HashMap(); 319 320 sources.put( "lhs", source ); 321 sources.put( "rhs", source ); 322 323 Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath(), SinkMode.REPLACE ); 324 325 Pipe pipeLower = new Pipe( "lhs" ); 326 Pipe pipeUpper = new Pipe( "rhs" ); 327 328 Pipe splice = new HashJoin( pipeLower, new Fields( "num" ), pipeUpper, new Fields( "num" ), Fields.size( 4 ) ); 329 330 Flow flow = getPlatform().getFlowConnector().connect( sources, sink, splice ); 331 332 flow.complete(); 333 334 validateLength( flow, 37 ); 335 336 List<Tuple> values = getSinkAsList( flow ); 337 338 assertTrue( values.contains( new Tuple( "1\ta\t1\ta" ) ) ); 339 assertTrue( values.contains( new Tuple( "2\tb\t2\tb" ) ) ); 340 } 341 342 /** 343 * Method testCoGroupAfterEvery tests that a tmp tap is inserted after the Every in the cogroup join 344 * 345 * @throws Exception when 346 */ 347 @Test 348 public void testJoinAfterEvery() throws Exception 349 { 350 getPlatform().copyFromLocal( inputFileLower ); 351 getPlatform().copyFromLocal( inputFileUpper ); 352 353 Tap sourceLower = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower ); 354 Tap sourceUpper = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileUpper ); 355 356 Map sources = new HashMap(); 357 358 sources.put( "lower", sourceLower ); 359 sources.put( "upper", sourceUpper ); 360 361 Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "afterevery" ), SinkMode.REPLACE ); 362 363 Function splitter = new RegexSplitter( new Fields( "num", "char" ), " " ); 364 365 Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitter ); 366 pipeLower = new GroupBy( pipeLower, new Fields( "num" ) ); 367 pipeLower = new Every( pipeLower, new Fields( "char" ), new First(), Fields.ALL ); 368 369 Pipe pipeUpper = new Each( new Pipe( "upper" ), new Fields( "line" ), splitter ); 370 pipeUpper = 
new GroupBy( pipeUpper, new Fields( "num" ) ); 371 pipeUpper = new Every( pipeUpper, new Fields( "char" ), new First(), Fields.ALL ); 372 373 Pipe splice = new HashJoin( pipeLower, new Fields( "num" ), pipeUpper, new Fields( "num" ), Fields.size( 4 ) ); 374 375 Flow flow = getPlatform().getFlowConnector().connect( sources, sink, splice ); 376 377 flow.complete(); 378 379 validateLength( flow, 5, null ); 380 381 List<Tuple> values = getSinkAsList( flow ); 382 383 assertTrue( values.contains( new Tuple( "1\ta\t1\tA" ) ) ); 384 assertTrue( values.contains( new Tuple( "2\tb\t2\tB" ) ) ); 385 } 386 387 @Test 388 public void testJoinInnerSingleField() throws Exception 389 { 390 getPlatform().copyFromLocal( inputFileLowerOffset ); 391 getPlatform().copyFromLocal( inputFileUpper ); 392 393 Tap sourceLower = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLowerOffset ); 394 Tap sourceUpper = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileUpper ); 395 396 Map sources = new HashMap(); 397 398 sources.put( "lower", sourceLower ); 399 sources.put( "upper", sourceUpper ); 400 401 Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "joininnersingle" ), SinkMode.REPLACE ); 402 403 Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), new RegexSplitter( new Fields( "num1", "char" ), " " ), new Fields( "num1" ) ); 404 Pipe pipeUpper = new Each( new Pipe( "upper" ), new Fields( "line" ), new RegexSplitter( new Fields( "num2", "char" ), " " ), new Fields( "num2" ) ); 405 406 Pipe join = new HashJoin( pipeLower, new Fields( "num1" ), pipeUpper, new Fields( "num2" ) ); 407 408 Flow flow = getPlatform().getFlowConnector().connect( sources, sink, join ); 409 410 flow.complete(); 411 412 validateLength( flow, 3, null ); 413 414 Set<Tuple> results = new HashSet<Tuple>(); 415 416 results.add( new Tuple( "1\t1" ) ); 417 results.add( new Tuple( "5\t5" ) ); 418 419 List<Tuple> actual = getSinkAsList( flow ); 420 421 
results.removeAll( actual ); 422 423 assertEquals( 0, results.size() ); 424 } 425 426 /** 427 * 1 a1 428 * 1 a2 429 * 1 a3 430 * 2 b1 431 * 3 c1 432 * 4 d1 433 * 4 d2 434 * 4 d3 435 * 5 e1 436 * 5 e2 437 * 5 e3 438 * 7 g1 439 * 7 g2 440 * 7 g3 441 * 7 g4 442 * 7 g5 443 * null h1 444 * <p/> 445 * 1 A1 446 * 1 A2 447 * 1 A3 448 * 2 B1 449 * 2 B2 450 * 2 B3 451 * 4 D1 452 * 6 F1 453 * 6 F2 454 * null H1 455 * <p/> 456 * 1 a1 1 A1 457 * 1 a1 1 A2 458 * 1 a1 1 A3 459 * 1 a2 1 A1 460 * 1 a2 1 A2 461 * 1 a2 1 A3 462 * 1 a3 1 A1 463 * 1 a3 1 A2 464 * 1 a3 1 A3 465 * 2 b1 2 B1 466 * 2 b1 2 B2 467 * 2 b1 2 B3 468 * 4 d1 4 D1 469 * 4 d2 4 D1 470 * 4 d3 4 D1 471 * null h1 null H1 472 * 473 * @throws Exception 474 */ 475 @Test 476 public void testJoinInner() throws Exception 477 { 478 HashSet<Tuple> results = new HashSet<Tuple>(); 479 480 results.add( new Tuple( "1", "a1", "1", "A1" ) ); 481 results.add( new Tuple( "1", "a1", "1", "A2" ) ); 482 results.add( new Tuple( "1", "a1", "1", "A3" ) ); 483 results.add( new Tuple( "1", "a2", "1", "A1" ) ); 484 results.add( new Tuple( "1", "a2", "1", "A2" ) ); 485 results.add( new Tuple( "1", "a2", "1", "A3" ) ); 486 results.add( new Tuple( "1", "a3", "1", "A1" ) ); 487 results.add( new Tuple( "1", "a3", "1", "A2" ) ); 488 results.add( new Tuple( "1", "a3", "1", "A3" ) ); 489 results.add( new Tuple( "2", "b1", "2", "B1" ) ); 490 results.add( new Tuple( "2", "b1", "2", "B2" ) ); 491 results.add( new Tuple( "2", "b1", "2", "B3" ) ); 492 results.add( new Tuple( "4", "d1", "4", "D1" ) ); 493 results.add( new Tuple( "4", "d2", "4", "D1" ) ); 494 results.add( new Tuple( "4", "d3", "4", "D1" ) ); 495 results.add( new Tuple( null, "h1", null, "H1" ) ); 496 497 handleJoins( "joininner", new InnerJoin(), results ); 498 } 499 500 /** 501 * /** 502 * 1 a1 503 * 1 a2 504 * 1 a3 505 * 2 b1 506 * 3 c1 507 * 4 d1 508 * 4 d2 509 * 4 d3 510 * 5 e1 511 * 5 e2 512 * 5 e3 513 * 7 g1 514 * 7 g2 515 * 7 g3 516 * 7 g4 517 * 7 g5 518 * null h1 519 * <p/> 520 * 1 
A1 521 * 1 A2 522 * 1 A3 523 * 2 B1 524 * 2 B2 525 * 2 B3 526 * 4 D1 527 * 6 F1 528 * 6 F2 529 * null H1 530 * <p/> 531 * 1 a1 1 A1 532 * 1 a1 1 A2 533 * 1 a1 1 A3 534 * 1 a2 1 A1 535 * 1 a2 1 A2 536 * 1 a2 1 A3 537 * 1 a3 1 A1 538 * 1 a3 1 A2 539 * 1 a3 1 A3 540 * 2 b1 2 B1 541 * 2 b1 2 B2 542 * 2 b1 2 B3 543 * 3 c1 null null 544 * 4 d1 4 D1 545 * 4 d2 4 D1 546 * 4 d3 4 D1 547 * 5 e1 null null 548 * 5 e2 null null 549 * 5 e3 null null 550 * null null 6 F1 551 * null null 6 F2 552 * 7 g1 null null 553 * 7 g2 null null 554 * 7 g3 null null 555 * 7 g4 null null 556 * 7 g5 null null 557 * null h1 null H1 558 * 559 * @throws Exception 560 */ 561 @Test 562 public void testJoinOuter() throws Exception 563 { 564 // skip if hadoop cluster mode, outer joins don't behave the same 565 if( getPlatform().isMapReduce() && getPlatform().isUseCluster() ) 566 return; 567 568 Set<Tuple> results = new HashSet<Tuple>(); 569 570 results.add( new Tuple( "1", "a1", "1", "A1" ) ); 571 results.add( new Tuple( "1", "a1", "1", "A2" ) ); 572 results.add( new Tuple( "1", "a1", "1", "A3" ) ); 573 results.add( new Tuple( "1", "a2", "1", "A1" ) ); 574 results.add( new Tuple( "1", "a2", "1", "A2" ) ); 575 results.add( new Tuple( "1", "a2", "1", "A3" ) ); 576 results.add( new Tuple( "1", "a3", "1", "A1" ) ); 577 results.add( new Tuple( "1", "a3", "1", "A2" ) ); 578 results.add( new Tuple( "1", "a3", "1", "A3" ) ); 579 results.add( new Tuple( "2", "b1", "2", "B1" ) ); 580 results.add( new Tuple( "2", "b1", "2", "B2" ) ); 581 results.add( new Tuple( "2", "b1", "2", "B3" ) ); 582 results.add( new Tuple( "3", "c1", null, null ) ); 583 results.add( new Tuple( "4", "d1", "4", "D1" ) ); 584 results.add( new Tuple( "4", "d2", "4", "D1" ) ); 585 results.add( new Tuple( "4", "d3", "4", "D1" ) ); 586 results.add( new Tuple( "5", "e1", null, null ) ); 587 results.add( new Tuple( "5", "e2", null, null ) ); 588 results.add( new Tuple( "5", "e3", null, null ) ); 589 results.add( new Tuple( null, null, "6", "F1" ) 
); 590 results.add( new Tuple( null, null, "6", "F2" ) ); 591 results.add( new Tuple( "7", "g1", null, null ) ); 592 results.add( new Tuple( "7", "g2", null, null ) ); 593 results.add( new Tuple( "7", "g3", null, null ) ); 594 results.add( new Tuple( "7", "g4", null, null ) ); 595 results.add( new Tuple( "7", "g5", null, null ) ); 596 results.add( new Tuple( null, "h1", null, "H1" ) ); 597 598 handleJoins( "joinouter", new OuterJoin(), results ); 599 } 600 601 /** 602 * 1 a1 603 * 1 a2 604 * 1 a3 605 * 2 b1 606 * 3 c1 607 * 4 d1 608 * 4 d2 609 * 4 d3 610 * 5 e1 611 * 5 e2 612 * 5 e3 613 * 7 g1 614 * 7 g2 615 * 7 g3 616 * 7 g4 617 * 7 g5 618 * null h1 619 * <p/> 620 * 1 A1 621 * 1 A2 622 * 1 A3 623 * 2 B1 624 * 2 B2 625 * 2 B3 626 * 4 D1 627 * 6 F1 628 * 6 F2 629 * null H1 630 * <p/> 631 * 1 a1 1 A1 632 * 1 a1 1 A2 633 * 1 a1 1 A3 634 * 1 a2 1 A1 635 * 1 a2 1 A2 636 * 1 a2 1 A3 637 * 1 a3 1 A1 638 * 1 a3 1 A2 639 * 1 a3 1 A3 640 * 2 b1 2 B1 641 * 2 b1 2 B2 642 * 2 b1 2 B3 643 * 3 c1 null null 644 * 4 d1 4 D1 645 * 4 d2 4 D1 646 * 4 d3 4 D1 647 * 5 e1 null null 648 * 5 e2 null null 649 * 5 e3 null null 650 * 7 g1 null null 651 * 7 g2 null null 652 * 7 g3 null null 653 * 7 g4 null null 654 * 7 g5 null null 655 * null h1 null H1 656 * 657 * @throws Exception 658 */ 659 @Test 660 public void testJoinInnerOuter() throws Exception 661 { 662 Set<Tuple> results = new HashSet<Tuple>(); 663 664 results.add( new Tuple( "1", "a1", "1", "A1" ) ); 665 results.add( new Tuple( "1", "a1", "1", "A2" ) ); 666 results.add( new Tuple( "1", "a1", "1", "A3" ) ); 667 results.add( new Tuple( "1", "a2", "1", "A1" ) ); 668 results.add( new Tuple( "1", "a2", "1", "A2" ) ); 669 results.add( new Tuple( "1", "a2", "1", "A3" ) ); 670 results.add( new Tuple( "1", "a3", "1", "A1" ) ); 671 results.add( new Tuple( "1", "a3", "1", "A2" ) ); 672 results.add( new Tuple( "1", "a3", "1", "A3" ) ); 673 results.add( new Tuple( "2", "b1", "2", "B1" ) ); 674 results.add( new Tuple( "2", "b1", "2", "B2" ) ); 
675 results.add( new Tuple( "2", "b1", "2", "B3" ) ); 676 results.add( new Tuple( "3", "c1", null, null ) ); 677 results.add( new Tuple( "4", "d1", "4", "D1" ) ); 678 results.add( new Tuple( "4", "d2", "4", "D1" ) ); 679 results.add( new Tuple( "4", "d3", "4", "D1" ) ); 680 results.add( new Tuple( "5", "e1", null, null ) ); 681 results.add( new Tuple( "5", "e2", null, null ) ); 682 results.add( new Tuple( "5", "e3", null, null ) ); 683 results.add( new Tuple( "7", "g1", null, null ) ); 684 results.add( new Tuple( "7", "g2", null, null ) ); 685 results.add( new Tuple( "7", "g3", null, null ) ); 686 results.add( new Tuple( "7", "g4", null, null ) ); 687 results.add( new Tuple( "7", "g5", null, null ) ); 688 results.add( new Tuple( null, "h1", null, "H1" ) ); 689 690 handleJoins( "joininnerouter", new LeftJoin(), results ); 691 } 692 693 /** 694 * 1 a1 695 * 1 a2 696 * 1 a3 697 * 2 b1 698 * 3 c1 699 * 4 d1 700 * 4 d2 701 * 4 d3 702 * 5 e1 703 * 5 e2 704 * 5 e3 705 * 7 g1 706 * 7 g2 707 * 7 g3 708 * 7 g4 709 * 7 g5 710 * null h1 711 * <p/> 712 * 1 A1 713 * 1 A2 714 * 1 A3 715 * 2 B1 716 * 2 B2 717 * 2 B3 718 * 4 D1 719 * 6 F1 720 * 6 F2 721 * null H1 722 * <p/> 723 * 1 a1 1 A1 724 * 1 a1 1 A2 725 * 1 a1 1 A3 726 * 1 a2 1 A1 727 * 1 a2 1 A2 728 * 1 a2 1 A3 729 * 1 a3 1 A1 730 * 1 a3 1 A2 731 * 1 a3 1 A3 732 * 2 b1 2 B1 733 * 2 b1 2 B2 734 * 2 b1 2 B3 735 * 4 d1 4 D1 736 * 4 d2 4 D1 737 * 4 d3 4 D1 738 * null null 6 F1 739 * null null 6 F2 740 * null h1 null H1 741 * 742 * @throws Exception 743 */ 744 @Test 745 public void testJoinOuterInner() throws Exception 746 { 747 // skip if hadoop cluster mode, outer joins don't behave the same 748 if( getPlatform().isMapReduce() && getPlatform().isUseCluster() ) 749 return; 750 751 Set<Tuple> results = new HashSet<Tuple>(); 752 753 results.add( new Tuple( "1", "a1", "1", "A1" ) ); 754 results.add( new Tuple( "1", "a1", "1", "A2" ) ); 755 results.add( new Tuple( "1", "a1", "1", "A3" ) ); 756 results.add( new Tuple( "1", "a2", "1", 
"A1" ) ); 757 results.add( new Tuple( "1", "a2", "1", "A2" ) ); 758 results.add( new Tuple( "1", "a2", "1", "A3" ) ); 759 results.add( new Tuple( "1", "a3", "1", "A1" ) ); 760 results.add( new Tuple( "1", "a3", "1", "A2" ) ); 761 results.add( new Tuple( "1", "a3", "1", "A3" ) ); 762 results.add( new Tuple( "2", "b1", "2", "B1" ) ); 763 results.add( new Tuple( "2", "b1", "2", "B2" ) ); 764 results.add( new Tuple( "2", "b1", "2", "B3" ) ); 765 results.add( new Tuple( "4", "d1", "4", "D1" ) ); 766 results.add( new Tuple( "4", "d2", "4", "D1" ) ); 767 results.add( new Tuple( "4", "d3", "4", "D1" ) ); 768 results.add( new Tuple( null, null, "6", "F1" ) ); 769 results.add( new Tuple( null, null, "6", "F2" ) ); 770 results.add( new Tuple( null, "h1", null, "H1" ) ); 771 772 handleJoins( "joinouterinner", new RightJoin(), results ); 773 } 774 775 private void handleJoins( String path, Joiner joiner, Set<Tuple> results ) throws Exception 776 { 777 getPlatform().copyFromLocal( inputFileLhsSparse ); 778 getPlatform().copyFromLocal( inputFileRhsSparse ); 779 780 Fields fields = new Fields( "num", "char" ).applyTypes( Integer.class, String.class ); 781 Tap sourceLower = getPlatform().getDelimitedFile( fields, " ", inputFileLhsSparse ); 782 Tap sourceUpper = getPlatform().getDelimitedFile( fields, " ", inputFileRhsSparse ); 783 784 Map sources = new HashMap(); 785 786 sources.put( "lower", sourceLower ); 787 sources.put( "upper", sourceUpper ); 788 789 Tap sink = getPlatform().getDelimitedFile( Fields.size( 4, String.class ), "\t", getOutputPath( path ), SinkMode.REPLACE ); 790 791 Pipe pipeLower = new Pipe( "lower" ); 792 Pipe pipeUpper = new Pipe( "upper" ); 793 794 Fields declaredFields = new Fields( "num", "char", "num2", "char2" ); 795 Fields groupingFields = new Fields( "num" ); 796 797 Pipe splice = new HashJoin( pipeLower, groupingFields, pipeUpper, groupingFields, declaredFields, joiner ); 798 799 splice = new Each( splice, Fields.ALL, new Identity(), Fields.RESULTS ); 
800 801 Flow flow = getPlatform().getFlowConnector().connect( sources, sink, splice ); 802 803 flow.complete(); 804 805 validateLength( flow, results.size() ); 806 807 List<Tuple> actual = getSinkAsList( flow ); 808 809 results.removeAll( actual ); 810 811 assertEquals( 0, results.size() ); 812 } 813 814 /** 815 * 1 a 816 * 5 b 817 * 6 c 818 * 5 b 819 * 5 e 820 * <p/> 821 * 1 A 822 * 2 B 823 * 3 C 824 * 4 D 825 * 5 E 826 * <p/> 827 * 1 a 828 * 2 b 829 * 3 c 830 * 4 d 831 * 5 e 832 * <p/> 833 * 1 a 1 A 1 a 834 * - - 2 B 2 b 835 * - - 3 C 3 c 836 * - - 4 D 4 d 837 * 5 b 5 E 5 e 838 * 5 e 5 E 5 e 839 * 840 * @throws Exception 841 */ 842 @Test 843 public void testJoinMixed() throws Exception 844 { 845 // skip if hadoop cluster mode, outer joins don't behave the same 846 if( getPlatform().isMapReduce() && getPlatform().isUseCluster() ) 847 return; 848 849 getPlatform().copyFromLocal( inputFileLowerOffset ); 850 getPlatform().copyFromLocal( inputFileLower ); 851 getPlatform().copyFromLocal( inputFileUpper ); 852 853 Tap sourceLowerOffset = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLowerOffset ); 854 Tap sourceUpper = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileUpper ); 855 Tap sourceLower = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower ); 856 857 Map sources = new HashMap(); 858 859 sources.put( "loweroffset", sourceLowerOffset ); 860 sources.put( "lower", sourceLower ); 861 sources.put( "upper", sourceUpper ); 862 863 Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "joinmixed" ), SinkMode.REPLACE ); 864 865 Function splitter = new RegexSplitter( new Fields( "num", "char" ), " " ); 866 867 Pipe pipeLowerOffset = new Each( new Pipe( "loweroffset" ), new Fields( "line" ), splitter ); 868 Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitter ); 869 Pipe pipeUpper = new Each( new Pipe( "upper" ), new Fields( "line" ), splitter ); 870 871 Pipe[] 
pipes = Pipe.pipes( pipeLowerOffset, pipeUpper, pipeLower ); 872 Fields[] fields = Fields.fields( new Fields( "num" ), new Fields( "num" ), new Fields( "num" ) ); 873 874 MixedJoin join = new MixedJoin( new boolean[]{MixedJoin.OUTER, MixedJoin.INNER, MixedJoin.OUTER} ); 875 Pipe splice = new HashJoin( pipes, fields, Fields.size( 6 ), join ); 876 877 Flow flow = getPlatform().getFlowConnector().connect( sources, sink, splice ); 878 879 flow.complete(); 880 881 validateLength( flow, 6 ); 882 883 Set<Tuple> results = new HashSet<Tuple>(); 884 885 results.add( new Tuple( "1\ta\t1\tA\t1\ta" ) ); 886 results.add( new Tuple( "null\tnull\t2\tB\t2\tb" ) ); 887 results.add( new Tuple( "null\tnull\t3\tC\t3\tc" ) ); 888 results.add( new Tuple( "null\tnull\t4\tD\t4\td" ) ); 889 results.add( new Tuple( "5\tb\t5\tE\t5\te" ) ); 890 results.add( new Tuple( "5\te\t5\tE\t5\te" ) ); 891 892 List<Tuple> actual = getSinkAsList( flow ); 893 894 results.removeAll( actual ); 895 896 assertEquals( 0, results.size() ); 897 } 898 899 @Test 900 public void testJoinDiffFields() throws Exception 901 { 902 getPlatform().copyFromLocal( inputFileLower ); 903 getPlatform().copyFromLocal( inputFileUpper ); 904 905 Tap sourceLower = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower ); 906 Tap sourceUpper = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileUpper ); 907 908 Map sources = new HashMap(); 909 910 sources.put( "lower", sourceLower ); 911 sources.put( "upper", sourceUpper ); 912 913 Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "difffields" ), SinkMode.REPLACE ); 914 915 Function splitterLower = new RegexSplitter( new Fields( "numA", "lower" ), " " ); 916 Function splitterUpper = new RegexSplitter( new Fields( "numB", "upper" ), " " ); 917 918 Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitterLower ); 919 Pipe pipeUpper = new Each( new Pipe( "upper" ), new Fields( "line" ), splitterUpper ); 920 
921 Pipe pipe = new HashJoin( pipeLower, new Fields( "numA" ), pipeUpper, new Fields( "numB" ) ); 922 923 Flow flow = getPlatform().getFlowConnector().connect( sources, sink, pipe ); 924 925 flow.complete(); 926 927 validateLength( flow, 5 ); 928 929 List<Tuple> actual = getSinkAsList( flow ); 930 931 assertTrue( actual.contains( new Tuple( "1\ta\t1\tA" ) ) ); 932 assertTrue( actual.contains( new Tuple( "2\tb\t2\tB" ) ) ); 933 } 934 935 @Test 936 public void testJoinGroupBy() throws Exception 937 { 938 getPlatform().copyFromLocal( inputFileLower ); 939 getPlatform().copyFromLocal( inputFileUpper ); 940 941 Tap sourceLower = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower ); 942 Tap sourceUpper = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileUpper ); 943 944 Map sources = new HashMap(); 945 946 sources.put( "lower", sourceLower ); 947 sources.put( "upper", sourceUpper ); 948 949 Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "joingroupby" ), SinkMode.REPLACE ); 950 951 Function splitterLower = new RegexSplitter( new Fields( "numA", "lower" ), " " ); 952 Function splitterUpper = new RegexSplitter( new Fields( "numB", "upper" ), " " ); 953 954 Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitterLower ); 955 Pipe pipeUpper = new Each( new Pipe( "upper" ), new Fields( "line" ), splitterUpper ); 956 957 Pipe pipe = new HashJoin( pipeLower, new Fields( "numA" ), pipeUpper, new Fields( "numB" ) ); 958 959 Pipe groupby = new GroupBy( pipe, new Fields( "numA" ) ); 960 961 Flow flow = getPlatform().getFlowConnector().connect( sources, sink, groupby ); 962 963 flow.complete(); 964 965 validateLength( flow, 5, null ); 966 967 List<Tuple> actual = getSinkAsList( flow ); 968 969 assertTrue( actual.contains( new Tuple( "1\ta\t1\tA" ) ) ); 970 assertTrue( actual.contains( new Tuple( "2\tb\t2\tB" ) ) ); 971 } 972 973 @Test 974 public void testJoinSamePipe() throws Exception 975 { 
getPlatform().copyFromLocal( inputFileLower );

    Tap source = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower );

    Map sources = new HashMap();

    sources.put( "lower", source );

    Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "samepipe" ), SinkMode.REPLACE );

    Function splitter = new RegexSplitter( new Fields( "num", "char" ), " " );

    Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitter );

    // single pipe + self join count: the declared fields cover both copies of the (num, char) tuple
    Pipe pipe = new HashJoin( pipeLower, new Fields( "num" ), 1, new Fields( "num1", "char1", "num2", "char2" ) );

    Flow flow = getPlatform().getFlowConnector().connect( sources, sink, pipe );

    flow.complete();

    validateLength( flow, 5, null );

    List<Tuple> actual = getSinkAsList( flow );

    assertTrue( actual.contains( new Tuple( "1\ta\t1\ta" ) ) );
    assertTrue( actual.contains( new Tuple( "2\tb\t2\tb" ) ) );
    }

  /**
   * Self-joins the "lower" input by passing the same pipe instance as both the left and the right side of a
   * HashJoin. Expects the same results as testJoinSamePipe.
   *
   * @throws Exception
   */
  @Test
  public void testJoinSamePipe2() throws Exception
    {
    getPlatform().copyFromLocal( inputFileLower );

    Tap source = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower );

    Map sources = new HashMap();

    sources.put( "lower", source );

    Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "samepipe2" ), SinkMode.REPLACE );

    Function splitter = new RegexSplitter( new Fields( "num", "char" ), " " );

    Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitter );

    Pipe join = new HashJoin( pipeLower, new Fields( "num" ), pipeLower, new Fields( "num" ), new Fields( "num1", "char1", "num2", "char2" ) );

    Flow flow = getPlatform().getFlowConnector().connect( sources, sink, join );

    flow.complete();

    validateLength( flow, 5, null );

    List<Tuple> actual = getSinkAsList( flow );

    assertTrue( actual.contains( new Tuple( "1\ta\t1\ta" ) ) );
    assertTrue( actual.contains( new Tuple( "2\tb\t2\tb" ) ) );
    }

  /**
   * Self-joins a space delimited source by branching one "lower" pipe into named "lhs" and "rhs" branches and
   * HashJoining the two branches on {@code num}. Expects the same results as testJoinSamePipe.
   *
   * @throws Exception
   */
  @Test
  public void testJoinSamePipe3() throws Exception
    {
    getPlatform().copyFromLocal( inputFileLower );

    Tap source = getPlatform().getDelimitedFile( new Fields( "num", "char" ), " ", inputFileLower );

    Map sources = new HashMap();

    sources.put( "lower", source );

    Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "samepipe3" ), SinkMode.REPLACE );

    Pipe pipe = new Pipe( "lower" );

    Pipe lhs = new Pipe( "lhs", pipe );
    Pipe rhs = new Pipe( "rhs", pipe );

    Pipe join = new HashJoin( lhs, new Fields( "num" ), rhs, new Fields( "num" ), new Fields( "num1", "char1", "num2", "char2" ) );

    Flow flow = getPlatform().getFlowConnector().connect( sources, sink, join );

    flow.complete();

    validateLength( flow, 5, null );

    List<Tuple> actual = getSinkAsList( flow );

    assertTrue( actual.contains( new Tuple( "1\ta\t1\ta" ) ) );
    assertTrue( actual.contains( new Tuple( "2\tb\t2\tb" ) ) );
    }

  /**
   * Same source as rightmost.
   * <p/>
   * "upper1" and "upper2" both read the upper file, and each is the rightmost (accumulated) side of one of two
   * chained HashJoins through which "lower" is streamed. This should be a single job, as the same file
   * accumulates into both joins.
   *
   * @throws Exception
   */
  @Test
  public void testJoinAroundJoinRightMost() throws Exception
    {
    getPlatform().copyFromLocal( inputFileLower );
    getPlatform().copyFromLocal( inputFileUpper );

    Tap sourceLower = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower );
    Tap sourceUpper = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileUpper );

    Map sources = new HashMap();

    sources.put( "lower", sourceLower );
    sources.put( "upper1", sourceUpper );
    sources.put( "upper2", sourceUpper );

    Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "joinaroundjoinrightmost" ),
SinkMode.REPLACE );

    Function splitter = new RegexSplitter( new Fields( "num", "char" ), " " );

    Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitter );
    Pipe pipeUpper1 = new Each( new Pipe( "upper1" ), new Fields( "line" ), splitter );
    Pipe pipeUpper2 = new Each( new Pipe( "upper2" ), new Fields( "line" ), splitter );

    Pipe splice1 = new HashJoin( pipeLower, new Fields( "num" ), pipeUpper1, new Fields( "num" ), new Fields( "num1", "char1", "num2", "char2" ) );

    splice1 = new Each( splice1, new Identity() );

    // the first join's output stays on the leftmost side; upper2 accumulates into the second join
    Pipe splice2 = new HashJoin( splice1, new Fields( "num1" ), pipeUpper2, new Fields( "num" ), new Fields( "num1", "char1", "num2", "char2", "num3", "char3" ) );

    Flow flow = getPlatform().getFlowConnector().connect( sources, sink, splice2 );

//    flow.writeDOT( "joinaroundrightmost.dot" );

    if( getPlatform().isMapReduce() )
      assertEquals( "wrong number of steps", 1, flow.getFlowSteps().size() );

    flow.complete();

    validateLength( flow, 5, null );

    List<Tuple> actual = getSinkAsList( flow );

    assertTrue( actual.contains( new Tuple( "1\ta\t1\tA\t1\tA" ) ) );
    assertTrue( actual.contains( new Tuple( "2\tb\t2\tB\t2\tB" ) ) );
    }

  /**
   * Same source as leftmost.
   * <p/>
   * "upper1" and "upper2" (both reading the upper file) are HashJoined first; that result is the leftmost side of
   * a second HashJoin whose rightmost side is "lower". On MapReduce this is expected to plan into two steps.
   *
   * @throws Exception
   */
  @Test
  public void testJoinAroundJoinLeftMost() throws Exception
    {
    getPlatform().copyFromLocal( inputFileLower );
    getPlatform().copyFromLocal( inputFileUpper );

    Tap sourceLower = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower );
    Tap sourceUpper = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileUpper );

    Map sources = new HashMap();

    sources.put( "lower", sourceLower );
    sources.put( "upper1", sourceUpper );
    sources.put( "upper2", sourceUpper );

    Tap sink = getPlatform().getTextFile( new
Fields( "line" ), getOutputPath( "joinaroundjoinleftmost" ), SinkMode.REPLACE );

    Function splitter = new RegexSplitter( new Fields( "num", "char" ), " " );

    Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitter );
    Pipe pipeUpper1 = new Each( new Pipe( "upper1" ), new Fields( "line" ), splitter );
    Pipe pipeUpper2 = new Each( new Pipe( "upper2" ), new Fields( "line" ), splitter );

    // both sides of the first join come from the same upper file
    Pipe splice1 = new HashJoin( pipeUpper1, new Fields( "num" ), pipeUpper2, new Fields( "num" ), new Fields( "num1", "char1", "num2", "char2" ) );

    splice1 = new Each( splice1, new Identity() );

    Pipe splice2 = new HashJoin( splice1, new Fields( "num1" ), pipeLower, new Fields( "num" ), new Fields( "num1", "char1", "num2", "char2", "num3", "char3" ) );

    Flow flow = getPlatform().getFlowConnector().connect( sources, sink, splice2 );

//    flow.writeDOT( "joinaroundleftmost.dot" );

    if( getPlatform().isMapReduce() )
      assertEquals( "wrong number of steps", 2, flow.getFlowSteps().size() );

    flow.complete();

    validateLength( flow, 5, null );

    List<Tuple> actual = getSinkAsList( flow );

    assertTrue( actual.contains( new Tuple( "1\tA\t1\tA\t1\ta" ) ) );
    assertTrue( actual.contains( new Tuple( "2\tB\t2\tB\t2\tb" ) ) );
    }

  /**
   * Upper as leftmost and rightmost, forcing two jobs.
   * <p/>
   * "lower" is HashJoined with "upper1"; that result is then the rightmost side of a second HashJoin whose
   * leftmost side is "upper2". On MapReduce this is expected to plan into two steps.
   *
   * @throws Exception
   */
  @Test
  public void testJoinAroundJoinRightMostSwapped() throws Exception
    {
    getPlatform().copyFromLocal( inputFileLower );
    getPlatform().copyFromLocal( inputFileUpper );

    Tap sourceLower = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower );
    Tap sourceUpper = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileUpper );

    Map sources = new HashMap();

    sources.put( "lower", sourceLower );
    sources.put( "upper1", sourceUpper );
sources.put( "upper2", sourceUpper );

    Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "joinaroundjoinswapped" ), SinkMode.REPLACE );

    Function splitter = new RegexSplitter( new Fields( "num", "char" ), " " );

    Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitter );
    Pipe pipeUpper1 = new Each( new Pipe( "upper1" ), new Fields( "line" ), splitter );
    Pipe pipeUpper2 = new Each( new Pipe( "upper2" ), new Fields( "line" ), splitter );

    Pipe splice1 = new HashJoin( pipeLower, new Fields( "num" ), pipeUpper1, new Fields( "num" ), new Fields( "num1", "char1", "num2", "char2" ) );

    splice1 = new Each( splice1, new Identity() );

    // upper2 becomes leftmost, forcing a tap between the joins
    Pipe splice2 = new HashJoin( pipeUpper2, new Fields( "num" ), splice1, new Fields( "num1" ), new Fields( "num1", "char1", "num2", "char2", "num3", "char3" ) );

    Flow flow = getPlatform().getFlowConnector().connect( sources, sink, splice2 );

    if( getPlatform().isMapReduce() )
      assertEquals( "wrong number of steps", 2, flow.getFlowSteps().size() );

    flow.complete();

    validateLength( flow, 5, null );

    List<Tuple> actual = getSinkAsList( flow );

    assertTrue( actual.contains( new Tuple( "1\tA\t1\ta\t1\tA" ) ) );
    assertTrue( actual.contains( new Tuple( "2\tB\t2\tb\t2\tB" ) ) );
    }

  /**
   * HashJoins "lower" and "upper", groups the result by {@code numA}, then HashJoins the grouped stream (as the
   * leftmost side) with the tab delimited "joined" input on {@code numC}. On MapReduce this is expected to plan
   * into two steps.
   *
   * @throws Exception
   */
  @Test
  public void testJoinGroupByJoin() throws Exception
    {
    getPlatform().copyFromLocal( inputFileLower );
    getPlatform().copyFromLocal( inputFileUpper );
    getPlatform().copyFromLocal( inputFileJoined );

    Tap sourceLower = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower );
    Tap sourceUpper = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileUpper );
    Tap sourceJoined = getPlatform().getTextFile( new Fields( "offset", "line" ),
inputFileJoined );

    Map sources = new HashMap();

    sources.put( "lower", sourceLower );
    sources.put( "upper", sourceUpper );
    sources.put( "joined", sourceJoined );

    Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "joingroupbyjoin" ), SinkMode.REPLACE );

    Function splitterLower = new RegexSplitter( new Fields( "numA", "lower" ), " " );
    Function splitterUpper = new RegexSplitter( new Fields( "numB", "upper" ), " " );
    // the "joined" file is tab delimited, unlike the space delimited lower/upper files
    Function splitterJoined = new RegexSplitter( new Fields( "numC", "lowerC", "upperC" ), "\t" );

    Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitterLower );
    Pipe pipeUpper = new Each( new Pipe( "upper" ), new Fields( "line" ), splitterUpper );
    Pipe pipeJoined = new Each( new Pipe( "joined" ), new Fields( "line" ), splitterJoined );

    Pipe pipe = new HashJoin( pipeLower, new Fields( "numA" ), pipeUpper, new Fields( "numB" ) );

    pipe = new GroupBy( pipe, new Fields( "numA" ) );

    pipe = new HashJoin( pipe, new Fields( "numA" ), pipeJoined, new Fields( "numC" ) );

    Flow flow = getPlatform().getFlowConnector().connect( sources, sink, pipe );

    if( getPlatform().isMapReduce() )
      assertEquals( "wrong number of steps", 2, flow.getFlowSteps().size() );

    flow.complete();

    validateLength( flow, 5, null );

    List<Tuple> actual = getSinkAsList( flow );

    assertTrue( actual.contains( new Tuple( "1\ta\t1\tA\t1\ta\tA" ) ) );
    assertTrue( actual.contains( new Tuple( "2\tb\t2\tB\t2\tb\tB" ) ) );
    }

  /**
   * Here the same file is fed into the same HashJoin.
   * <p/>
   * This is three jobs.
* <p/>
   * A temp tap is inserted before the accumulated branch of the common HashJoin, for two reasons:
   * <ul>
   * <li>the accumulated side is assumed to be filtered down, so pushing it to disk conserves IO;</li>
   * <li>if the accumulated side were instead streamed via a fork, only part of the file would accumulate into
   * the HashJoin.</li>
   * </ul>
   * <pre>
   *    /-T-\   <-- accumulated
   *   T     HJ
   *    \---/   <-- streamed
   * </pre>
   *
   * @throws Exception
   */
  @Test
  public void testJoinSameSourceIntoJoin() throws Exception
    {
    getPlatform().copyFromLocal( inputFileLower );
    getPlatform().copyFromLocal( inputFileUpper );

    Tap sourceLower = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower );
    Tap sourceUpper = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileUpper );

    Map sources = new HashMap();

    sources.put( "lower", sourceLower );
    sources.put( "upper1", sourceUpper );
    sources.put( "upper2", sourceUpper );

    Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "joinsamesourceintojoin" ), SinkMode.REPLACE );

    Function splitter = new RegexSplitter( new Fields( "num", "char" ), " " );

    Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitter );
    Pipe pipeUpper1 = new Each( new Pipe( "upper1" ), new Fields( "line" ), splitter );
    Pipe pipeUpper2 = new Each( new Pipe( "upper2" ), new Fields( "line" ), splitter );

    // both sides of this inner join read the same upper file
    Pipe splice1 = new HashJoin( pipeUpper1, new Fields( "num" ), pipeUpper2, new Fields( "num" ), new Fields( "num1", "char1", "num2", "char2" ) );

    splice1 = new Each( splice1, new Identity() );

    // the inner join's output is the accumulated (rightmost) side of the outer join; lower is streamed
    Pipe splice2 = new HashJoin( pipeLower, new Fields( "num" ), splice1, new Fields( "num1" ), new Fields( "num1", "char1", "num2", "char2", "num3", "char3" ) );

    Flow flow = getPlatform().getFlowConnector().connect( sources, sink, splice2 );

//    flow.writeDOT( "joinsamesourceintojoin.dot" );

    if( getPlatform().isMapReduce() )
      assertEquals( "wrong number of steps", 3, flow.getFlowSteps().size() );

    flow.complete();

    validateLength( flow, 5, null );

    List<Tuple> actual = getSinkAsList( flow );

    assertTrue( actual.contains( new Tuple( "1\ta\t1\tA\t1\tA" ) ) );
    assertTrue( actual.contains( new Tuple( "2\tb\t2\tB\t2\tB" ) ) );
    }

  /**
   * The same upper file feeds both sides of a single HashJoin. On MapReduce this is expected to plan into two
   * steps.
   *
   * @throws Exception
   */
  @Test
  public void testJoinSameSourceIntoJoinSimple() throws Exception
    {
    getPlatform().copyFromLocal( inputFileUpper );

    Tap sourceUpper = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileUpper );

    Map sources = new HashMap();

    sources.put( "upper1", sourceUpper );
    sources.put( "upper2", sourceUpper );

    Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "joinsamesourceintojoinsimple" ), SinkMode.REPLACE );

    Function splitter = new RegexSplitter( new Fields( "num", "char" ), " " );

    Pipe pipeUpper1 = new Each( new Pipe( "upper1" ), new Fields( "line" ), splitter );
    Pipe pipeUpper2 = new Each( new Pipe( "upper2" ), new Fields( "line" ), splitter );

    Pipe splice1 = new HashJoin( pipeUpper1, new Fields( "num" ), pipeUpper2, new Fields( "num" ), new Fields( "num1", "char1", "num2", "char2" ) );

    splice1 = new Each( splice1, new Identity() );

    Flow flow = getPlatform().getFlowConnector().connect( sources, sink, splice1 );

//    flow.writeDOT( "joinsamesourceintojoinsimple.dot" );

    if( getPlatform().isMapReduce() )
      assertEquals( "wrong number of steps", 2, flow.getFlowSteps().size() );

    flow.complete();

    validateLength( flow, 5, null );

    List<Tuple> actual = getSinkAsList( flow );

    assertTrue( actual.contains( new Tuple( "1\tA\t1\tA" ) ) );
    assertTrue( actual.contains( new Tuple( "2\tB\t2\tB" ) ) );
    }

  /**
* Loosely tests for a deadlock when the BlockingHashJoinAnnotator rule doesn't exclude the GroupBy from the
   * blocking annotation.
   * <p/>
   * Whether the deadlock occurs depends on the (random) order in which the paths from the source Tap and its
   * fork are traversed, so a passing run does not prove the deadlock is absent.
   *
   * @throws Exception
   */
  @Test
  public void testJoinSameSourceOverGroupByIntoJoinSimple() throws Exception
    {
    // NOTE(review): inputFileLower is copied here but no tap in this test reads it
    getPlatform().copyFromLocal( inputFileLower );
    getPlatform().copyFromLocal( inputFileUpper );

    Tap sourceUpper = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileUpper );

    Map sources = new HashMap();

    sources.put( "upper1", sourceUpper );
    sources.put( "upper2", sourceUpper );

    Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "joinsamesourceovergroupbyintojoinsimple" ), SinkMode.REPLACE );

    Function splitter = new RegexSplitter( new Fields( "num", "char" ), " " );

    Pipe pipeUpper1 = new Each( new Pipe( "upper1" ), new Fields( "line" ), splitter );
    Pipe pipeUpper2 = new Each( new Pipe( "upper2" ), new Fields( "line" ), splitter );

    // each branch of the shared source passes through its own GroupBy before reaching the HashJoin
    pipeUpper1 = new GroupBy( pipeUpper1, new Fields( "num" ) );
    pipeUpper2 = new GroupBy( pipeUpper2, new Fields( "num" ) );

    Pipe splice1 = new HashJoin( pipeUpper1, new Fields( "num" ), pipeUpper2, new Fields( "num" ), new Fields( "num1", "char1", "num2", "char2" ) );

    splice1 = new Each( splice1, new Identity() );

    Flow flow = getPlatform().getFlowConnector().connect( sources, sink, splice1 );

    if( getPlatform().isMapReduce() )
      assertEquals( "wrong number of steps", 3, flow.getFlowSteps().size() );

    flow.complete();

    validateLength( flow, 5, null );

    List<Tuple> actual = getSinkAsList( flow );

    assertTrue( actual.contains( new Tuple( "1\tA\t1\tA" ) ) );
    assertTrue( actual.contains( new Tuple( "2\tB\t2\tB" ) ) );
    }

  /**
   * Tests that two independent streamed sources with loadable tributaries properly plan into a GroupBy
   * without loading unused sources.
   * <p/>
   * The "lower"/"upper" and "lhs"/"rhs" pairs are each HashJoined, then merged into a single GroupBy on
   * {@code num1}. On MapReduce this is expected to plan into a single step.
   *
   * @throws Exception
   */
  @Test
  public void testJoinsIntoGroupBy() throws Exception
    {
    getPlatform().copyFromLocal( inputFileLower );
    getPlatform().copyFromLocal( inputFileUpper );

    getPlatform().copyFromLocal( inputFileLhs );
    getPlatform().copyFromLocal( inputFileRhs );

    Tap sourceLower = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower );
    Tap sourceUpper = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileUpper );

    Tap sourceLhs = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLhs );
    Tap sourceRhs = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileRhs );

    Map sources = new HashMap();

    sources.put( "lower", sourceLower );
    sources.put( "upper", sourceUpper );
    sources.put( "lhs", sourceLhs );
    sources.put( "rhs", sourceRhs );

    Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "joinsintogroupby" ), SinkMode.REPLACE );

    Function splitter = new RegexSplitter( new Fields( "num", "char" ), " " );

    Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitter );
    Pipe pipeUpper = new Each( new Pipe( "upper" ), new Fields( "line" ), splitter );

    Pipe pipeLhs = new Each( new Pipe( "lhs" ), new Fields( "line" ), splitter );
    Pipe pipeRhs = new Each( new Pipe( "rhs" ), new Fields( "line" ), splitter );

    Pipe upperLower = new HashJoin( pipeLower, new Fields( "num" ), pipeUpper, new Fields( "num" ), new Fields( "num1", "char1", "num2", "char2" ) );

    upperLower = new Each( upperLower, new Identity() );

    Pipe lhsRhs = new HashJoin( pipeLhs, new Fields( "num" ), pipeRhs, new Fields( "num" ), new Fields( "num1", "char1", "num2", "char2" ) );

    lhsRhs =
new Each( lhsRhs, new Identity() );

    // both joins declare the same field names, so their outputs can be merged by one GroupBy
    Pipe grouped = new GroupBy( "merging", Pipe.pipes( upperLower, lhsRhs ), new Fields( "num1" ) );

    Flow flow = getPlatform().getFlowConnector().connect( sources, sink, grouped );

    if( getPlatform().isMapReduce() )
      assertEquals( "wrong number of steps", 1, flow.getFlowSteps().size() );

    flow.complete();

    validateLength( flow, 42, null );

    List<Tuple> actual = getSinkAsList( flow );

    assertTrue( actual.contains( new Tuple( "1\ta\t1\tA" ) ) );
    assertTrue( actual.contains( new Tuple( "5\te\t5\tE" ) ) );
    }

  /**
   * Branches "lower" into "lhs" and "rhs"; the "rhs" branch passes through a GroupBy on {@code num}, and the two
   * branches are then HashJoined on {@code num} with "rhs" as the rightmost side. Uses the single-source
   * {@code connect( source, sink, pipe )} form.
   *
   * @throws Exception
   */
  @Test
  public void testJoinSamePipeAroundGroupBy() throws Exception
    {
    getPlatform().copyFromLocal( inputFileLower );

    Tap source = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower );
    Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "samepipearoundgroupby" ), SinkMode.REPLACE );

    Function splitter = new RegexSplitter( new Fields( "num", "char" ), " " );

    Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitter );

    Pipe lhsPipe = new Each( new Pipe( "lhs", pipeLower ), new Identity() );

    Pipe rhsPipe = new Each( new Pipe( "rhs", pipeLower ), new Identity() );

    rhsPipe = new GroupBy( rhsPipe, new Fields( "num" ) );

    rhsPipe = new Each( rhsPipe, new Identity() );

    Pipe pipe = new HashJoin( lhsPipe, new Fields( "num" ), rhsPipe, new Fields( "num" ), new Fields( "num1", "char1", "num2", "char2" ) );

    Flow flow = getPlatform().getFlowConnector().connect( source, sink, pipe );

    flow.complete();

    validateLength( flow, 5, null );

    List<Tuple> actual = getSinkAsList( flow );

    assertTrue( actual.contains( new Tuple( "1\ta\t1\ta" ) ) );
    assertTrue( actual.contains( new Tuple( "2\tb\t2\tb" ) ) );
    }

  /**
   * This test results in two MR jobs because one join feeds into the accumulated side of the second. A mapper
   * can only stream one branch at a time, forcing a temp file between the mappers. See
   * testJoinsIntoCoGroupLhsSwappedJoin for the swapped join.
   *
   * @throws Exception
   */
  @Test
  public void testJoinsIntoCoGroupLhs() throws Exception
    {
    getPlatform().copyFromLocal( inputFileLower );
    getPlatform().copyFromLocal( inputFileUpper );

    getPlatform().copyFromLocal( inputFileLhs );
    getPlatform().copyFromLocal( inputFileRhs );

    Tap sourceLower = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower );
    Tap sourceUpper = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileUpper );

    Tap sourceLhs = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLhs );
    Tap sourceRhs = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileRhs );

    Map sources = new HashMap();

    sources.put( "lower", sourceLower );
    sources.put( "upper", sourceUpper );
    sources.put( "lhs", sourceLhs );
    sources.put( "rhs", sourceRhs );

    Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "joinsintocogrouplhs" ), SinkMode.REPLACE );

    Function splitter = new RegexSplitter( new Fields( "num", "char" ), " " );

    Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitter );
    Pipe pipeUpper = new Each( new Pipe( "upper" ), new Fields( "line" ), splitter );

    Pipe pipeLhs = new Each( new Pipe( "lhs" ), new Fields( "line" ), splitter );
    Pipe pipeRhs = new Each( new Pipe( "rhs" ), new Fields( "line" ), splitter );

    Pipe upperLower = new HashJoin( pipeLower, new Fields( "num" ), pipeUpper, new Fields( "num" ), new Fields( "numUpperLower", "charUpperLower", "num2UpperLower", "char2UpperLower" ) );

    upperLower = new Each( upperLower, new Identity() );

    // upperLower is the accumulated (rightmost) side here, which is what forces the second job
    Pipe lhsUpperLower = new HashJoin(
pipeLhs, new Fields( "num" ), upperLower, new Fields( "numUpperLower" ), new Fields( "numLhs", "charLhs", "numUpperLower", "charUpperLower", "num2UpperLower", "char2UpperLower" ) );

    lhsUpperLower = new Each( lhsUpperLower, new Identity() );

    Pipe grouped = new CoGroup( "cogrouping", lhsUpperLower, new Fields( "numLhs" ), pipeRhs, new Fields( "num" ) );

    Flow flow = getPlatform().getFlowConnector().connect( sources, sink, grouped );

    if( getPlatform().isMapReduce() )
      assertEquals( "wrong number of steps", 2, flow.getFlowSteps().size() );

    flow.complete();

    validateLength( flow, 37, null );

    List<Tuple> actual = getSinkAsList( flow );

    assertTrue( actual.contains( new Tuple( "1\ta\t1\ta\t1\tA\t1\tA" ) ) );
    assertTrue( actual.contains( new Tuple( "5\ta\t5\te\t5\tE\t5\tA" ) ) );
    }

  /**
   * This test results in one MR job because one join feeds into the streamed side of the second.
   *
   * @throws Exception
   */
  @Test
  public void testJoinsIntoCoGroupLhsSwappedJoin() throws Exception
    {
    getPlatform().copyFromLocal( inputFileLower );
    getPlatform().copyFromLocal( inputFileUpper );

    getPlatform().copyFromLocal( inputFileLhs );
    getPlatform().copyFromLocal( inputFileRhs );

    Tap sourceLower = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower );
    Tap sourceUpper = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileUpper );

    Tap sourceLhs = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLhs );
    Tap sourceRhs = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileRhs );

    Map sources = new HashMap();

    sources.put( "lower", sourceLower );
    sources.put( "upper", sourceUpper );
    sources.put( "lhs", sourceLhs );
    sources.put( "rhs", sourceRhs );

    Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "joinsintocogrouplhsswappedjoin" ), SinkMode.REPLACE );

    Function splitter = new RegexSplitter( new Fields( "num", "char" ), " " );

    Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitter );
    Pipe pipeUpper = new Each( new Pipe( "upper" ), new Fields( "line" ), splitter );

    Pipe pipeLhs = new Each( new Pipe( "lhs" ), new Fields( "line" ), splitter );
    Pipe pipeRhs = new Each( new Pipe( "rhs" ), new Fields( "line" ), splitter );

    Pipe upperLower = new HashJoin( pipeLower, new Fields( "num" ), pipeUpper, new Fields( "num" ), new Fields( "numUpperLower", "charUpperLower", "num2UpperLower", "char2UpperLower" ) );

    upperLower = new Each( upperLower, new Identity() );

    // upperLower is now the streamed (leftmost) side, so both joins can run in the same mapper
    Pipe lhsUpperLower = new HashJoin( upperLower, new Fields( "numUpperLower" ), pipeLhs, new Fields( "num" ), new Fields( "numUpperLower", "charUpperLower", "num2UpperLower", "char2UpperLower", "numLhs", "charLhs" ) );

    lhsUpperLower = new Each( lhsUpperLower, new Identity() );

    Pipe grouped = new CoGroup( "cogrouping", lhsUpperLower, new Fields( "numLhs" ), pipeRhs, new Fields( "num" ) );

    Flow flow = getPlatform().getFlowConnector().connect( sources, sink, grouped );

    if( getPlatform().isMapReduce() )
      assertEquals( "wrong number of steps", 1, flow.getFlowSteps().size() );

    flow.complete();

    validateLength( flow, 37, null );

    List<Tuple> actual = getSinkAsList( flow );

    assertTrue( actual.contains( new Tuple( "1\ta\t1\tA\t1\ta\t1\tA" ) ) );
    assertTrue( actual.contains( new Tuple( "5\te\t5\tE\t5\te\t5\tE" ) ) );
    }

  /**
   * Same pipe assembly as testJoinsIntoCoGroupLhs, but the joined branch is the right-hand side of the CoGroup
   * and "rhs" is the left-hand side. On MapReduce this is expected to plan into two steps.
   *
   * @throws Exception
   */
  @Test
  public void testJoinsIntoCoGroupRhs() throws Exception
    {
    getPlatform().copyFromLocal( inputFileLower );
    getPlatform().copyFromLocal( inputFileUpper );

    getPlatform().copyFromLocal( inputFileLhs );
    getPlatform().copyFromLocal( inputFileRhs );

    Tap
sourceLower = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower );
    Tap sourceUpper = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileUpper );

    Tap sourceLhs = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLhs );
    Tap sourceRhs = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileRhs );

    Map sources = new HashMap();

    sources.put( "lower", sourceLower );
    sources.put( "upper", sourceUpper );
    sources.put( "lhs", sourceLhs );
    sources.put( "rhs", sourceRhs );

    Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "joinsintocogrouprhs" ), SinkMode.REPLACE );

    Function splitter = new RegexSplitter( new Fields( "num", "char" ), " " );

    Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitter );
    Pipe pipeUpper = new Each( new Pipe( "upper" ), new Fields( "line" ), splitter );

    Pipe pipeLhs = new Each( new Pipe( "lhs" ), new Fields( "line" ), splitter );
    Pipe pipeRhs = new Each( new Pipe( "rhs" ), new Fields( "line" ), splitter );

    Pipe upperLower = new HashJoin( pipeLower, new Fields( "num" ), pipeUpper, new Fields( "num" ), new Fields( "numUpperLower", "charUpperLower", "num2UpperLower", "char2UpperLower" ) );

    upperLower = new Each( upperLower, new Identity() );

    Pipe lhsUpperLower = new HashJoin( pipeLhs, new Fields( "num" ), upperLower, new Fields( "numUpperLower" ), new Fields( "numLhs", "charLhs", "numUpperLower", "charUpperLower", "num2UpperLower", "char2UpperLower" ) );

    lhsUpperLower = new Each( lhsUpperLower, new Identity() );

    // rhs is the first (left) CoGroup input here; the joined branch comes second
    Pipe grouped = new CoGroup( "cogrouping", pipeRhs, new Fields( "num" ), lhsUpperLower, new Fields( "numLhs" ) );

    Flow flow = getPlatform().getFlowConnector().connect( sources, sink, grouped );

    if( getPlatform().isMapReduce() )
      assertEquals( "wrong number of steps", 2, flow.getFlowSteps().size() );

    flow.complete();

    validateLength( flow, 37, null );

    List<Tuple> actual = getSinkAsList( flow );

    assertTrue( actual.contains( new Tuple( "1\tA\t1\ta\t1\ta\t1\tA" ) ) );
    assertTrue( actual.contains( new Tuple( "5\tE\t5\te\t5\te\t5\tE" ) ) );
    }

  /**
   * Two independent HashJoins ("lower" with "upper", and "lhs" with "rhs") feed the two sides of a CoGroup.
   * On MapReduce this is expected to plan into a single step.
   *
   * @throws Exception
   */
  @Test
  public void testJoinsIntoCoGroup() throws Exception
    {
    getPlatform().copyFromLocal( inputFileLower );
    getPlatform().copyFromLocal( inputFileUpper );

    getPlatform().copyFromLocal( inputFileLhs );
    getPlatform().copyFromLocal( inputFileRhs );

    Tap sourceLower = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower );
    Tap sourceUpper = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileUpper );

    Tap sourceLhs = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLhs );
    Tap sourceRhs = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileRhs );

    Map sources = new HashMap();

    sources.put( "lower", sourceLower );
    sources.put( "upper", sourceUpper );
    sources.put( "lhs", sourceLhs );
    sources.put( "rhs", sourceRhs );

    Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "joinsintocogroup" ), SinkMode.REPLACE );

    Function splitter = new RegexSplitter( new Fields( "num", "char" ), " " );

    Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitter );
    Pipe pipeUpper = new Each( new Pipe( "upper" ), new Fields( "line" ), splitter );

    Pipe pipeLhs = new Each( new Pipe( "lhs" ), new Fields( "line" ), splitter );
    Pipe pipeRhs = new Each( new Pipe( "rhs" ), new Fields( "line" ), splitter );

    Pipe upperLower = new HashJoin( pipeLower, new Fields( "num" ), pipeUpper, new Fields( "num" ), new Fields( "numUpperLower1", "charUpperLower1", "numUpperLower2", "charUpperLower2" ) );

upperLower = new Each( upperLower, new Identity() ); 1735 1736 Pipe lhsRhs = new HashJoin( pipeLhs, new Fields( "num" ), pipeRhs, new Fields( "num" ), new Fields( "numLhsRhs1", "charLhsRhs1", "numLhsRhs2", "charLhsRhs2" ) ); 1737 1738 lhsRhs = new Each( lhsRhs, new Identity() ); 1739 1740 Pipe grouped = new CoGroup( "cogrouping", upperLower, new Fields( "numUpperLower1" ), lhsRhs, new Fields( "numLhsRhs1" ) ); 1741 1742 Flow flow = getPlatform().getFlowConnector().connect( sources, sink, grouped ); 1743 1744 if( getPlatform().isMapReduce() ) 1745 assertEquals( "wrong number of steps", 1, flow.getFlowSteps().size() ); 1746 1747 flow.complete(); 1748 1749 validateLength( flow, 37, null ); 1750 1751 List<Tuple> actual = getSinkAsList( flow ); 1752 1753 assertTrue( actual.contains( new Tuple( "1\ta\t1\tA\t1\ta\t1\tA" ) ) ); 1754 assertTrue( actual.contains( new Tuple( "5\te\t5\tE\t5\te\t5\tE" ) ) ); 1755 } 1756 1757 public static class AllComparator implements Comparator<Comparable>, Hasher<Comparable>, Serializable 1758 { 1759 1760 @Override 1761 public int compare( Comparable lhs, Comparable rhs ) 1762 { 1763 return lhs.toString().compareTo( rhs.toString() ); 1764 } 1765 1766 @Override 1767 public int hashCode( Comparable value ) 1768 { 1769 if( value == null ) 1770 return 0; 1771 1772 return value.toString().hashCode(); 1773 } 1774 } 1775 1776 /** 1777 * Tests Hasher being honored even if default comparator is null. 
*
   * The lhs {@code num} values are replaced by the result of {@code Integer.parseInt( num )}, while the rhs
   * {@code num} values remain as split from the text line, so the two join keys presumably differ in type. The lhs
   * join key carries an AllComparator, which compares and hashes by {@code toString()}, so the keys still match.
   *
   * @throws Exception
   */
  @Test
  public void testJoinWithHasher() throws Exception
    {
    getPlatform().copyFromLocal( inputFileLower );
    getPlatform().copyFromLocal( inputFileUpper );

    Tap sourceLower = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower );
    Tap sourceUpper = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileUpper );

    Map sources = new HashMap();

    sources.put( "lower", sourceLower );
    sources.put( "upper", sourceUpper );

    Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "joinhasher" ), SinkMode.REPLACE );

    Function splitter = new RegexSplitter( new Fields( "num", "char" ), " " );

    Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitter );

    // replace the lhs "num" text with its parsed numeric value
    pipeLower = new Each( pipeLower, new Fields( "num" ), new ExpressionFunction( Fields.ARGS, "Integer.parseInt( num )", String.class ), Fields.REPLACE );

    Pipe pipeUpper = new Each( new Pipe( "upper" ), new Fields( "line" ), splitter );

    // only the lhs key fields carry the custom comparator/hasher
    Fields num = new Fields( "num" );
    num.setComparator( "num", new AllComparator() );

    Pipe splice = new HashJoin( pipeLower, num, pipeUpper, new Fields( "num" ), Fields.size( 4 ) );

    Map<Object, Object> properties = getProperties();

    Flow flow = getPlatform().getFlowConnector( properties ).connect( sources, sink, splice );

    flow.complete();

    validateLength( flow, 5 );

    List<Tuple> values = getSinkAsList( flow );

    assertTrue( values.contains( new Tuple( "1\ta\t1\tA" ) ) );
    assertTrue( values.contains( new Tuple( "2\tb\t2\tB" ) ) );
    }

  /**
   * HashJoins "lower" and "upper" with {@link Fields#NONE} as the join key on both sides, i.e. with no key, so
   * every lower tuple is paired with every upper tuple. Expects 25 output lines, including the cross row
   * "1 a 2 B" (tab delimited).
   *
   * @throws Exception
   */
  @Test
  public void testJoinNone() throws Exception
    {
    getPlatform().copyFromLocal( inputFileLower );
    getPlatform().copyFromLocal( inputFileUpper );

    Tap sourceLower = getPlatform().getTextFile( new Fields( "offset", "line" ),
inputFileLower ); 1831 Tap sourceUpper = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileUpper ); 1832 1833 Map sources = new HashMap(); 1834 1835 sources.put( "lower", sourceLower ); 1836 sources.put( "upper", sourceUpper ); 1837 1838 Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "joinnone" ), SinkMode.REPLACE ); 1839 1840 Function splitter = new RegexSplitter( new Fields( "num", "char" ), " " ); 1841 1842 Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitter ); 1843 Pipe pipeUpper = new Each( new Pipe( "upper" ), new Fields( "line" ), splitter ); 1844 1845 Pipe splice = new HashJoin( pipeLower, Fields.NONE, pipeUpper, Fields.NONE, Fields.size( 4 ) ); 1846 1847 Map<Object, Object> properties = getProperties(); 1848 1849 Flow flow = getPlatform().getFlowConnector( properties ).connect( sources, sink, splice ); 1850 1851 flow.complete(); 1852 1853 validateLength( flow, 25 ); 1854 1855 List<Tuple> values = getSinkAsList( flow ); 1856 1857 assertTrue( values.contains( new Tuple( "1\ta\t1\tA" ) ) ); 1858 assertTrue( values.contains( new Tuple( "1\ta\t2\tB" ) ) ); 1859 assertTrue( values.contains( new Tuple( "2\tb\t2\tB" ) ) ); 1860 } 1861 1862 @Test 1863 public void testGroupBySplitJoins() throws Exception 1864 { 1865 getPlatform().copyFromLocal( inputFileLower ); 1866 getPlatform().copyFromLocal( inputFileUpper ); 1867 getPlatform().copyFromLocal( inputFileJoined ); 1868 1869 Tap sourceLower = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower ); 1870 Tap sourceUpper = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileUpper ); 1871 Tap sourceJoined = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileJoined ); 1872 1873 Map sources = new HashMap(); 1874 1875 sources.put( "lower", sourceLower ); 1876 sources.put( "upper", sourceUpper ); 1877 sources.put( "joined", sourceJoined ); 1878 1879 Tap lhsSink = getPlatform().getTextFile( new 
Fields( "line" ), getOutputPath( "lhs" ), SinkMode.REPLACE ); 1880 Tap rhsSink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "rhs" ), SinkMode.REPLACE ); 1881 1882 Map sinks = new HashMap(); 1883 1884 sinks.put( "lhs", lhsSink ); 1885 sinks.put( "rhs", rhsSink ); 1886 1887 Function splitterLower = new RegexSplitter( new Fields( "numA", "lower" ), " " ); 1888 Function splitterUpper = new RegexSplitter( new Fields( "numB", "upper" ), " " ); 1889 Function splitterJoined = new RegexSplitter( new Fields( "numC", "lowerC", "upperC" ), "\t" ); 1890 1891 Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitterLower ); 1892 Pipe pipeUpper = new Each( new Pipe( "upper" ), new Fields( "line" ), splitterUpper ); 1893 Pipe pipeJoined = new Each( new Pipe( "joined" ), new Fields( "line" ), splitterJoined ); 1894 1895 Pipe pipe = new GroupBy( pipeLower, new Fields( "numA" ) ); 1896 1897 pipe = new Every( pipe, Fields.ALL, new TestIdentityBuffer( new Fields( "numA" ), 5, false ), Fields.RESULTS ); 1898 1899 Pipe lhsPipe = new Each( pipe, new Identity() ); 1900 lhsPipe = new HashJoin( "lhs", lhsPipe, new Fields( "numA" ), pipeUpper, new Fields( "numB" ) ); 1901 1902 Pipe rhsPipe = new Each( pipe, new Identity() ); 1903 rhsPipe = new HashJoin( "rhs", rhsPipe, new Fields( "numA" ), pipeJoined, new Fields( "numC" ) ); 1904 1905 Flow flow = getPlatform().getFlowConnector().connect( sources, sinks, lhsPipe, rhsPipe ); 1906 1907 if( getPlatform().isMapReduce() ) 1908 assertEquals( "wrong number of steps", 3, flow.getFlowSteps().size() ); 1909 1910 flow.complete(); 1911 1912 validateLength( flow.openSink( "lhs" ), 5, null ); 1913 validateLength( flow.openSink( "rhs" ), 5, null ); 1914 1915 List<Tuple> lhsActual = asList( flow, lhsSink ); 1916 1917 assertTrue( lhsActual.contains( new Tuple( "1\ta\t1\tA" ) ) ); 1918 assertTrue( lhsActual.contains( new Tuple( "2\tb\t2\tB" ) ) ); 1919 1920 List<Tuple> rhsActual = asList( flow, rhsSink ); 1921 1922 
assertTrue( rhsActual.contains( new Tuple( "1\ta\t1\ta\tA" ) ) ); 1923 assertTrue( rhsActual.contains( new Tuple( "2\tb\t2\tb\tB" ) ) ); 1924 } 1925 1926 /** 1927 * When run against a cluster a Merge before a GroupBy can hide the streamed/accumulated nature of a branch. 1928 * <p> 1929 * The planner nw 1930 * <p> 1931 * <p/> 1932 * commented code is for troubleshooting. 1933 * 1934 * @throws Exception 1935 */ 1936 @Test 1937 public void testJoinMergeGroupBy() throws Exception 1938 { 1939 getPlatform().copyFromLocal( inputFileNums10 ); 1940 getPlatform().copyFromLocal( inputFileNums20 ); 1941 1942 Tap lhsTap = getPlatform().getTextFile( new Fields( "id" ), inputFileNums10 ); 1943 Tap rhsTap = getPlatform().getTextFile( new Fields( "id2" ), inputFileNums20 ); 1944 1945 Pipe lhs = new Pipe( "lhs" ); 1946 Pipe rhs = new Pipe( "rhs" ); 1947 1948// Pipe joined = new CoGroup( messages, new Fields( "id" ), people, new Fields( "id2" ) ); 1949 Pipe joined = new HashJoin( lhs, new Fields( "id" ), rhs, new Fields( "id2" ) ); 1950 1951 Pipe pruned = new Each( joined, new Fields( "id2" ), new Identity(), Fields.RESULTS ); 1952// pruned = new Checkpoint( pruned ); 1953 Pipe merged = new Merge( pruned, rhs ); 1954 Pipe grouped = new GroupBy( merged, new Fields( "id2" ) ); 1955// Pipe grouped = new GroupBy( Pipe.pipes( pruned, people ), new Fields( "id2" ) ); 1956 Aggregator count = new Count( new Fields( "count" ) ); 1957 Pipe counted = new Every( grouped, count ); 1958 1959 String testJoinMerge = "testJoinMergeGroupBy/" + ( ( joined instanceof CoGroup ) ? 
"cogroup" : "hashjoin" ); 1960 Tap sink = getPlatform().getDelimitedFile( Fields.ALL, true, "\t", null, getOutputPath( testJoinMerge ), SinkMode.REPLACE ); 1961 1962 FlowDef flowDef = FlowDef.flowDef() 1963 .setName( "join-merge" ) 1964 .addSource( rhs, rhsTap ) 1965 .addSource( lhs, lhsTap ) 1966 .addTailSink( counted, sink ); 1967 1968 Flow flow = getPlatform().getFlowConnector().connect( flowDef ); 1969 1970// flow.writeDOT( "joinmerge.dot" ); 1971// flow.writeStepsDOT( "joinmerge-steps.dot" ); 1972 1973 flow.complete(); 1974 1975 validateLength( flow, 20 ); 1976 1977 List<Tuple> values = getSinkAsList( flow ); 1978 List<Tuple> expected = new ArrayList<Tuple>(); 1979 1980 expected.add( new Tuple( "1", "2" ) ); 1981 expected.add( new Tuple( "10", "2" ) ); 1982 expected.add( new Tuple( "11", "1" ) ); 1983 expected.add( new Tuple( "12", "1" ) ); 1984 expected.add( new Tuple( "13", "1" ) ); 1985 expected.add( new Tuple( "14", "1" ) ); 1986 expected.add( new Tuple( "15", "1" ) ); 1987 expected.add( new Tuple( "16", "1" ) ); 1988 expected.add( new Tuple( "17", "1" ) ); 1989 expected.add( new Tuple( "18", "1" ) ); 1990 expected.add( new Tuple( "19", "1" ) ); 1991 expected.add( new Tuple( "2", "2" ) ); 1992 expected.add( new Tuple( "20", "1" ) ); 1993 expected.add( new Tuple( "3", "2" ) ); 1994 expected.add( new Tuple( "4", "2" ) ); 1995 expected.add( new Tuple( "5", "2" ) ); 1996 expected.add( new Tuple( "6", "2" ) ); 1997 expected.add( new Tuple( "7", "2" ) ); 1998 expected.add( new Tuple( "8", "2" ) ); 1999 expected.add( new Tuple( "9", "2" ) ); 2000 2001 Collections.sort( values ); 2002 Collections.sort( expected ); 2003 2004 assertEquals( expected, values ); 2005 } 2006 2007 /** 2008 * Under tez, this can result in the HashJoin being duplicated across nodes for each split after the HashJoin 2009 * BoundaryBalanceJoinSplitTransformer inserts a Boundary at the split, preventing duplication of the path 2010 * 2011 * @throws Exception 2012 */ 2013 @Test 2014 public 
void testJoinSplit() throws Exception 2015 { 2016 getPlatform().copyFromLocal( inputFileLhs ); 2017 getPlatform().copyFromLocal( inputFileRhs ); 2018 2019 FlowDef flowDef = FlowDef.flowDef() 2020 .addSource( "lhs", getPlatform().getTextFile( inputFileLhs ) ) 2021 .addSource( "rhs", getPlatform().getTextFile( inputFileRhs ) ) 2022 .addSink( "lhsSink", getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "lhs" ), SinkMode.REPLACE ) ) 2023 .addSink( "rhsSink", getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "rhs" ), SinkMode.REPLACE ) ); 2024 2025 Pipe pipeLower = new Each( "lhs", new Fields( "line" ), new RegexSplitter( new Fields( "numLHS", "charLHS" ), " " ) ); 2026 Pipe pipeUpper = new Each( "rhs", new Fields( "line" ), new RegexSplitter( new Fields( "numRHS", "charRHS" ), " " ) ); 2027 2028 Pipe join = new HashJoin( pipeLower, new Fields( "numLHS" ), pipeUpper, new Fields( "numRHS" ), new InnerJoin() ); 2029 2030 Pipe pipeLhs = new Each( new Pipe( "lhsSink", join ), new Identity() ); 2031 Pipe pipeRhs = new Each( new Pipe( "rhsSink", join ), new Identity() ); 2032 2033 flowDef 2034 .addTail( pipeLhs ) 2035 .addTail( pipeRhs ); 2036 2037 Flow flow = getPlatform().getFlowConnector().connect( flowDef ); 2038 2039 flow.complete(); 2040 2041 validateLength( flow, 37, null ); 2042 2043 List<Tuple> values = asList( flow, flowDef.getSinks().get( "lhsSink" ) ); 2044 2045 assertTrue( values.contains( new Tuple( "1\ta\t1\tA" ) ) ); 2046 assertTrue( values.contains( new Tuple( "1\ta\t1\tB" ) ) ); 2047 2048 values = asList( flow, flowDef.getSinks().get( "rhsSink" ) ); 2049 2050 assertTrue( values.contains( new Tuple( "1\ta\t1\tA" ) ) ); 2051 assertTrue( values.contains( new Tuple( "1\ta\t1\tB" ) ) ); 2052 } 2053 2054 /** 2055 * catches a situation where BottomUpJoinedBoundariesNodePartitioner may capture an invalid HashJoin sub-graph 2056 * if the in-bound Boundary is split upon. 
2057 */ 2058 @Test 2059 public void testSameSourceJoinSplitIntoJoin() throws Exception 2060 { 2061 getPlatform().copyFromLocal( inputFileLhs ); 2062 getPlatform().copyFromLocal( inputFileRhs ); 2063 2064 FlowDef flowDef = FlowDef.flowDef() 2065 .addSource( "lhs", getPlatform().getTextFile( inputFileLhs ) ) 2066 .addSource( "rhs", getPlatform().getTextFile( inputFileLhs ) ) 2067 .addSource( "joinSecond", getPlatform().getTextFile( inputFileRhs ) ) 2068 .addSink( "lhsSink", getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "lhs" ), SinkMode.REPLACE ) ) 2069 .addSink( "rhsSink", getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "rhs" ), SinkMode.REPLACE ) ); 2070 2071 Pipe pipeLower = new Each( "lhs", new Fields( "line" ), new RegexSplitter( new Fields( "numLHS", "charLHS" ), " " ) ); 2072 Pipe pipeUpper = new Each( "rhs", new Fields( "line" ), new RegexSplitter( new Fields( "numRHS", "charRHS" ), " " ) ); 2073 2074 Pipe joinFirst = new HashJoin( pipeLower, new Fields( "numLHS" ), pipeUpper, new Fields( "numRHS" ), new InnerJoin() ); 2075 2076 Pipe pipeLhs = new Each( new Pipe( "lhsSink", joinFirst ), new Identity() ); 2077 2078 Pipe joinSecond = new Each( "joinSecond", new Fields( "line" ), new RegexSplitter( new Fields( "numRHSSecond", "charRHSSecond" ), " " ) ); 2079 2080 joinSecond = new HashJoin( joinFirst, new Fields( "numLHS" ), joinSecond, new Fields( "numRHSSecond" ) ); 2081 2082 Pipe pipeRhs = new Each( new Pipe( "rhsSink", joinSecond ), new Identity() ); 2083 2084 flowDef 2085 .addTail( pipeLhs ) 2086 .addTail( pipeRhs ); 2087 2088 Flow flow = getPlatform().getFlowConnector().connect( flowDef ); 2089 2090 flow.complete(); 2091 2092 List<Tuple> values = asList( flow, flowDef.getSinks().get( "lhsSink" ) ); 2093 2094 assertEquals( 37, values.size() ); 2095 assertTrue( values.contains( new Tuple( "1\ta\t1\ta" ) ) ); 2096 assertTrue( values.contains( new Tuple( "1\ta\t1\tb" ) ) ); 2097 2098 values = asList( flow, 
flowDef.getSinks().get( "rhsSink" ) ); 2099 2100 assertEquals( 109, values.size() ); 2101 assertTrue( values.contains( new Tuple( "1\ta\t1\ta\t1\tA" ) ) ); 2102 assertTrue( values.contains( new Tuple( "1\ta\t1\tb\t1\tB" ) ) ); 2103 } 2104 2105 /** 2106 * checks that a split after a HashJoin does not result in the HashJoin execution being duplicated across 2107 * multiple nodes, one for each branch in the split. 2108 */ 2109 @Test 2110 public void testJoinSplitBeforeJoin() throws Exception 2111 { 2112 getPlatform().copyFromLocal( inputFileLhs ); 2113 getPlatform().copyFromLocal( inputFileRhs ); 2114 2115 FlowDef flowDef = FlowDef.flowDef() 2116 .addSource( "lhs", getPlatform().getTextFile( inputFileLhs ) ) 2117 .addSource( "rhs", getPlatform().getTextFile( inputFileRhs ) ) 2118 .addSource( "joinSecond", getPlatform().getTextFile( inputFileRhs ) ) 2119 .addSink( "lhsSink", getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "lhs" ), SinkMode.REPLACE ) ) 2120 .addSink( "rhsSink", getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "rhs" ), SinkMode.REPLACE ) ); 2121 2122 Pipe pipeLower = new Each( "lhs", new Fields( "line" ), new RegexSplitter( new Fields( "numLHS", "charLHS" ), " " ) ); 2123 Pipe pipeUpper = new Each( "rhs", new Fields( "line" ), new RegexSplitter( new Fields( "numRHS", "charRHS" ), " " ) ); 2124 2125 pipeUpper = new Checkpoint( pipeUpper ); 2126 2127 HashJoin hashJoin = new HashJoin( pipeLower, new Fields( "numLHS" ), pipeUpper, new Fields( "numRHS" ), new InnerJoin() ); 2128 2129 Pipe joinFirst = hashJoin; 2130 2131 joinFirst = new Each( joinFirst, new Identity() ); 2132 2133 Pipe pipeLhs = new Each( new Pipe( "lhsSink", joinFirst ), new Identity() ); 2134 2135 pipeLhs = new GroupBy( pipeLhs, new Fields( "numLHS" ) ); 2136 2137 joinFirst = new Each( new Pipe( "lhsSplit", joinFirst ), new Identity() ); 2138 2139 Pipe joinSecond = new Each( "joinSecond", new Fields( "line" ), new RegexSplitter( new Fields( "numRHSSecond", 
"charRHSSecond" ), " " ) ); 2140 2141 joinSecond = new CoGroup( joinFirst, new Fields( "numLHS" ), joinSecond, new Fields( "numRHSSecond" ) ); 2142 2143 Pipe pipeRhs = new Each( new Pipe( "rhsSink", joinSecond ), new Identity() ); 2144 2145 flowDef 2146 .addTail( pipeLhs ) 2147 .addTail( pipeRhs ); 2148 2149 Flow flow = getPlatform().getFlowConnector().connect( flowDef ); 2150 2151 if( getPlatform().isDAG() ) 2152 { 2153 FlowStep flowStep = (FlowStep) flow.getFlowSteps().get( 0 ); 2154 List<ElementGraph> elementGraphs = flowStep.getFlowNodeGraph().getElementGraphs( hashJoin ); 2155 2156 assertEquals( 1, elementGraphs.size() ); 2157 } 2158 2159 flow.complete(); 2160 2161 List<Tuple> values = asList( flow, flowDef.getSinks().get( "lhsSink" ) ); 2162 2163 assertEquals( 37, values.size() ); 2164 assertTrue( values.contains( new Tuple( "1\ta\t1\tA" ) ) ); 2165 assertTrue( values.contains( new Tuple( "1\ta\t1\tB" ) ) ); 2166 2167 values = asList( flow, flowDef.getSinks().get( "rhsSink" ) ); 2168 2169 assertEquals( 109, values.size() ); 2170 assertTrue( values.contains( new Tuple( "1\ta\t1\tA\t1\tA" ) ) ); 2171 assertTrue( values.contains( new Tuple( "1\ta\t1\tB\t1\tB" ) ) ); 2172 } 2173 2174 @Test 2175 public void testGroupBySplitGroupByJoin() throws Exception 2176 { 2177 getPlatform().copyFromLocal( inputFileLower ); 2178 2179 Tap source = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower ); 2180 2181 Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "sink" ), SinkMode.REPLACE ); 2182 2183 Function splitter = new RegexSplitter( new Fields( "num", "char" ), " " ); 2184 2185 Pipe pipeFirst = new Pipe( "first" ); 2186 pipeFirst = new Each( pipeFirst, new Fields( "line" ), splitter ); 2187 pipeFirst = new GroupBy( pipeFirst, new Fields( "num" ) ); 2188 pipeFirst = new Every( pipeFirst, new Fields( "char" ), new First( new Fields( "firstFirst" ) ), Fields.ALL ); 2189 2190 Pipe pipeSecond = new Pipe( "second", pipeFirst ); 2191 
pipeSecond = new Each( pipeSecond, new Identity() ); 2192 pipeSecond = new GroupBy( pipeSecond, new Fields( "num" ) ); 2193 pipeSecond = new Every( pipeSecond, new Fields( "firstFirst" ), new First( new Fields( "secondFirst" ) ), Fields.ALL ); 2194 pipeSecond = new GroupBy( pipeSecond, new Fields( "num" ) ); 2195 pipeSecond = new Every( pipeSecond, new Fields( "secondFirst" ), new First( new Fields( "thirdFirst" ) ), Fields.ALL ); 2196 2197 Pipe splice = new HashJoin( pipeFirst, new Fields( "num" ), pipeSecond, new Fields( "num" ), Fields.size( 4 ) ); 2198 2199 Flow flow = getPlatform().getFlowConnector().connect( source, sink, splice ); 2200 2201 flow.complete(); 2202 2203 validateLength( flow, 5, null ); 2204 2205 List<Tuple> values = getSinkAsList( flow ); 2206 2207 assertTrue( values.contains( new Tuple( "1\ta\t1\ta" ) ) ); 2208 assertTrue( values.contains( new Tuple( "2\tb\t2\tb" ) ) ); 2209 assertTrue( values.contains( new Tuple( "3\tc\t3\tc" ) ) ); 2210 assertTrue( values.contains( new Tuple( "4\td\t4\td" ) ) ); 2211 assertTrue( values.contains( new Tuple( "5\te\t5\te" ) ) ); 2212 } 2213 2214 @Test 2215 public void testGroupBySplitSplitGroupByJoin() throws Exception 2216 { 2217 getPlatform().copyFromLocal( inputFileLower ); 2218 2219 Tap source = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower ); 2220 2221 Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "sink" ), SinkMode.REPLACE ); 2222 2223 Function splitter = new RegexSplitter( new Fields( "num", "char" ), " " ); 2224 2225 Pipe pipeFirst = new Pipe( "first" ); 2226 pipeFirst = new Each( pipeFirst, new Fields( "line" ), splitter ); 2227 pipeFirst = new GroupBy( pipeFirst, new Fields( "num" ) ); 2228 pipeFirst = new Every( pipeFirst, new Fields( "char" ), new First( new Fields( "firstFirst" ) ), Fields.ALL ); 2229 2230 Pipe pipeSecond = new Pipe( "second", pipeFirst ); 2231 pipeSecond = new Each( pipeSecond, new Identity() ); 2232 pipeSecond = new 
GroupBy( pipeSecond, new Fields( "num" ) ); 2233 pipeSecond = new Every( pipeSecond, new Fields( "firstFirst" ), new First( new Fields( "secondFirst" ) ), Fields.ALL ); 2234 2235 Pipe splice = new HashJoin( pipeFirst, new Fields( "num" ), pipeSecond, new Fields( "num" ), Fields.size( 4 ) ); 2236// Pipe splice = new HashJoin( pipeSecond, new Fields( "num" ), pipeFirst, new Fields( "num" ), Fields.size( 4 ) ); 2237 2238 splice = new HashJoin( splice, new Fields( 0 ), pipeSecond, new Fields( "num" ), Fields.size( 6 ) ); 2239 2240 Flow flow = getPlatform().getFlowConnector().connect( source, sink, splice ); 2241 2242 flow.complete(); 2243 2244 validateLength( flow, 5, null ); 2245 2246 List<Tuple> values = getSinkAsList( flow ); 2247 2248 assertTrue( values.contains( new Tuple( "1\ta\t1\ta\t1\ta" ) ) ); 2249 assertTrue( values.contains( new Tuple( "2\tb\t2\tb\t2\tb" ) ) ); 2250 assertTrue( values.contains( new Tuple( "3\tc\t3\tc\t3\tc" ) ) ); 2251 assertTrue( values.contains( new Tuple( "4\td\t4\td\t4\td" ) ) ); 2252 assertTrue( values.contains( new Tuple( "5\te\t5\te\t5\te" ) ) ); 2253 } 2254 2255 @Test 2256 public void testGroupBySplitAroundSplitGroupByJoin() throws Exception 2257 { 2258 getPlatform().copyFromLocal( inputFileLower ); 2259 2260 Tap source = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower ); 2261 2262 Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "sink" ), SinkMode.REPLACE ); 2263 Tap sink2 = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "sink2" ), SinkMode.REPLACE ); 2264 2265 Function splitter = new RegexSplitter( new Fields( "num", "char" ), " " ); 2266 2267 Pipe pipeInit = new Pipe( "init" ); 2268 Pipe pipeFirst = new Pipe( "first", pipeInit ); 2269 pipeFirst = new Each( pipeFirst, new Fields( "line" ), splitter ); 2270 pipeFirst = new GroupBy( pipeFirst, new Fields( "num" ) ); 2271 pipeFirst = new Every( pipeFirst, new Fields( "char" ), new First( new Fields( "firstFirst" ) 
), Fields.ALL ); 2272 2273 Pipe sink2Pipe = new Pipe( "sink2", pipeFirst ); 2274 2275 Pipe pipeSecond = new Pipe( "second", pipeInit ); 2276 pipeSecond = new Each( pipeSecond, new Fields( "line" ), splitter ); 2277 pipeSecond = new GroupBy( pipeSecond, new Fields( "num" ) ); 2278 pipeSecond = new Every( pipeSecond, new Fields( "char" ), new First( new Fields( "secondFirst" ) ), Fields.ALL ); 2279 2280// Pipe splice = new HashJoin( pipeFirst, new Fields( "num" ), pipeSecond, new Fields( "num" ), Fields.size( 4 ) ); 2281 Pipe splice = new HashJoin( pipeSecond, new Fields( "num" ), pipeFirst, new Fields( "num" ), Fields.size( 4 ) ); 2282 2283 Pipe pipeThird = new Pipe( "third", pipeSecond ); 2284 pipeThird = new Each( pipeThird, new Identity() ); 2285 pipeThird = new GroupBy( pipeThird, new Fields( "num" ) ); 2286 pipeThird = new Every( pipeThird, new Fields( "secondFirst" ), new First( new Fields( "thirdFirst" ) ), Fields.ALL ); 2287 2288 splice = new HashJoin( splice, new Fields( 0 ), pipeThird, new Fields( "num" ), Fields.size( 6 ) ); 2289 2290 FlowDef flowDef = FlowDef.flowDef() 2291 .setName( splice.getName() ) 2292 .addSource( "init", source ) 2293 .addTailSink( splice, sink ) 2294 .addTailSink( sink2Pipe, sink2 ); 2295 2296 Flow flow = getPlatform().getFlowConnector().connect( flowDef ); 2297 2298 flow.complete(); 2299 2300 validateLength( flow, 5, null ); 2301 2302 List<Tuple> values = getSinkAsList( flow ); 2303 2304 assertTrue( values.contains( new Tuple( "1\ta\t1\ta\t1\ta" ) ) ); 2305 assertTrue( values.contains( new Tuple( "2\tb\t2\tb\t2\tb" ) ) ); 2306 assertTrue( values.contains( new Tuple( "3\tc\t3\tc\t3\tc" ) ) ); 2307 assertTrue( values.contains( new Tuple( "4\td\t4\td\t4\td" ) ) ); 2308 assertTrue( values.contains( new Tuple( "5\te\t5\te\t5\te" ) ) ); 2309 } 2310 2311 /** 2312 * This test checks for a deadlock when the same input is forked, adapted on one edge, then hashjoined back together. 
2313 * 2314 * @throws Exception 2315 */ 2316 @Test 2317 public void testForkThenJoin() throws Exception 2318 { 2319 getPlatform().copyFromLocal( inputFileLower ); 2320 Tap sourceLower = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower ); 2321 2322 Map sources = new HashMap(); 2323 2324 sources.put( "lower", sourceLower ); 2325 2326 Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "join" ), SinkMode.REPLACE ); 2327 2328 Function splitter = new RegexSplitter( new Fields( "num", "text" ), " " ); 2329 2330 Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitter ); 2331 Pipe pipeUpper = new Each( new Pipe( "upper", pipeLower ), new Fields( "text" ), 2332 new ExpressionFunction( Fields.ARGS, "text.toUpperCase(java.util.Locale.ROOT)", String.class ), 2333 Fields.REPLACE ); 2334 2335 Pipe splice = new HashJoin( pipeLower, new Fields( "num" ), pipeUpper, new Fields( "num" ), Fields.size( 4 ) ); 2336 2337 Map<Object, Object> properties = getProperties(); 2338 2339 Flow flow = getPlatform().getFlowConnector( properties ).connect( sources, sink, splice ); 2340 2341 flow.complete(); 2342 2343 validateLength( flow, 5 ); 2344 2345 List<Tuple> values = getSinkAsList( flow ); 2346 2347 assertTrue( values.contains( new Tuple( "1\ta\t1\tA" ) ) ); 2348 assertTrue( values.contains( new Tuple( "2\tb\t2\tB" ) ) ); 2349 } 2350 2351 /** 2352 * This test checks for a deadlock when the same input is forked, adapted on one edge, then hashjoined back together. 
2353 * 2354 * @throws Exception 2355 */ 2356 @Test 2357 public void testForkCoGroupThenHashJoin() throws Exception 2358 { 2359 getPlatform().copyFromLocal( inputFileLower ); 2360 Tap sourceLower = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower ); 2361 Tap sourceUpper = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileUpper ); 2362 2363 Map sources = new HashMap(); 2364 2365 sources.put( "sourceLower", sourceLower ); 2366 sources.put( "sourceUpper", sourceUpper ); 2367 2368 Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "join" ), SinkMode.REPLACE ); 2369 2370 Function splitter = new RegexSplitter( new Fields( "num", "text" ), " " ); 2371 2372 Pipe leftPipeLower = new Each( new Pipe( "sourceLower" ), new Fields( "line" ), splitter ); 2373 Pipe rightPipeUpper = new Each( new Pipe( "sourceUpper" ), new Fields( "line" ), splitter ); 2374 2375 Pipe leftPipeUpper = new Each( new Pipe( "leftUpper", leftPipeLower ), new Fields( "text" ), 2376 new ExpressionFunction( Fields.ARGS, "text.toUpperCase(java.util.Locale.ROOT)", String.class ), 2377 Fields.REPLACE ); 2378 Pipe rightPipeLower = new Each( new Pipe( "rightLower", rightPipeUpper ), new Fields( "text" ), 2379 new ExpressionFunction( Fields.ARGS, "text.toLowerCase(java.util.Locale.ROOT)", String.class ), 2380 Fields.REPLACE ); 2381 2382 leftPipeUpper = new GroupBy( leftPipeUpper, new Fields( "num" ) ); 2383 rightPipeLower = new GroupBy( rightPipeLower, new Fields( "num" ) ); 2384 2385 Pipe middleSplice = new CoGroup( "middleCoGroup", leftPipeUpper, new Fields( "num" ), rightPipeLower, new Fields( "num" ), new Fields( "numM1", "charM1", "numM2", "charM2" ) ); 2386 2387 Pipe leftSplice = new HashJoin( leftPipeLower, new Fields( "num" ), middleSplice, new Fields( "numM1" ) ); 2388 2389 Map<Object, Object> properties = getProperties(); 2390 2391 Flow flow = getPlatform().getFlowConnector( properties ).connect( sources, sink, leftSplice ); 2392 2393 
flow.complete(); 2394 2395 validateLength( flow, 5 ); 2396 2397 List<Tuple> values = getSinkAsList( flow ); 2398 // that the flow completes at all is already success. 2399 assertTrue( values.contains( new Tuple( "1\ta\t1\tA\t1\ta" ) ) ); 2400 assertTrue( values.contains( new Tuple( "2\tb\t2\tB\t2\tb" ) ) ); 2401 } 2402 2403 /** 2404 * This test checks for a deadlock when the same input is forked, adapted on one edge, cogroup with something, 2405 * then hashjoined back together. 2406 * 2407 * @throws Exception 2408 */ 2409 @Test 2410 public void testForkCoGroupThenHashJoinCoGroupAgain() throws Exception 2411 { 2412 getPlatform().copyFromLocal( inputFileLower ); 2413 getPlatform().copyFromLocal( inputFileUpper ); 2414 2415 Tap sourceLower = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower ); 2416 Tap sourceUpper = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileUpper ); 2417 2418 Map sources = new HashMap(); 2419 2420 sources.put( "sourceLower", sourceLower ); 2421 sources.put( "sourceUpper", sourceUpper ); 2422 2423 Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "join" ), SinkMode.REPLACE ); 2424 2425 Function splitter = new RegexSplitter( new Fields( "num", "text" ), " " ); 2426 2427 Pipe leftPipeLower = new Each( new Pipe( "sourceLower" ), new Fields( "line" ), splitter ); 2428 Pipe rightPipeUpper = new Each( new Pipe( "sourceUpper" ), new Fields( "line" ), splitter ); 2429 2430 Pipe leftPipeUpper = new Each( new Pipe( "leftUpper", leftPipeLower ), new Fields( "text" ), 2431 new ExpressionFunction( Fields.ARGS, "text.toUpperCase(java.util.Locale.ROOT)", String.class ), 2432 Fields.REPLACE ); 2433 Pipe rightPipeLower = new Each( new Pipe( "rightLower", rightPipeUpper ), new Fields( "text" ), 2434 new ExpressionFunction( Fields.ARGS, "text.toLowerCase(java.util.Locale.ROOT)", String.class ), 2435 Fields.REPLACE ); 2436 2437 leftPipeUpper = new GroupBy( leftPipeUpper, new Fields( "num" ) ); 2438 
rightPipeLower = new GroupBy( rightPipeLower, new Fields( "num" ) ); 2439 2440 Pipe middleSplice = new CoGroup( "middleCoGroup", leftPipeUpper, new Fields( "num" ), rightPipeLower, new Fields( "num" ), new Fields( "numM1", "charM1", "numM2", "charM2" ) ); 2441 2442 Pipe leftSplice = new HashJoin( leftPipeLower, new Fields( "num" ), middleSplice, new Fields( "numM1" ) ); 2443 Pipe rightSplice = new HashJoin( rightPipeUpper, new Fields( "num" ), middleSplice, new Fields( "numM2" ) ); 2444 2445 leftSplice = new Rename( leftSplice, new Fields( "num", "text", "numM1", "charM1", "numM2", "charM2" ), new Fields( "numL1", "charL1", "numM1L", "charM1L", "numM2L", "charM2L" ) ); 2446 rightSplice = new Rename( rightSplice, new Fields( "num", "text", "numM1", "charM1", "numM2", "charM2" ), new Fields( "numR1", "charR1", "numM1R", "charM1R", "numM2R", "charM2R" ) ); 2447 2448 leftSplice = new GroupBy( leftSplice, new Fields( "numM1L" ) ); 2449 rightSplice = new GroupBy( rightSplice, new Fields( "numM2R" ) ); 2450 2451 Pipe splice = new CoGroup( "cogrouping", leftSplice, new Fields( "numM1L" ), rightSplice, new Fields( "numM2R" ) ); 2452 2453 Map<Object, Object> properties = getProperties(); 2454 2455 Flow flow = getPlatform().getFlowConnector( properties ).connect( sources, sink, splice ); 2456 2457 flow.complete(); 2458 2459 validateLength( flow, 5 ); 2460 2461 List<Tuple> values = getSinkAsList( flow ); 2462 2463 // getting this far is a success already (past old deadlocks) 2464 assertTrue( values.contains( new Tuple( "1\ta\t1\tA\t1\ta\t1\tA\t1\tA\t1\ta" ) ) ); 2465 assertTrue( values.contains( new Tuple( "2\tb\t2\tB\t2\tb\t2\tB\t2\tB\t2\tb" ) ) ); 2466 } 2467 }