001/* 002 * Copyright (c) 2007-2016 Concurrent, Inc. All Rights Reserved. 003 * 004 * Project and contact information: http://www.cascading.org/ 005 * 006 * This file is part of the Cascading project. 007 * 008 * Licensed under the Apache License, Version 2.0 (the "License"); 009 * you may not use this file except in compliance with the License. 010 * You may obtain a copy of the License at 011 * 012 * http://www.apache.org/licenses/LICENSE-2.0 013 * 014 * Unless required by applicable law or agreed to in writing, software 015 * distributed under the License is distributed on an "AS IS" BASIS, 016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 017 * See the License for the specific language governing permissions and 018 * limitations under the License. 019 */ 020 021package cascading; 022 023import java.io.Serializable; 024import java.util.ArrayList; 025import java.util.Collections; 026import java.util.Comparator; 027import java.util.HashMap; 028import java.util.HashSet; 029import java.util.List; 030import java.util.Map; 031import java.util.Set; 032 033import cascading.flow.Flow; 034import cascading.flow.FlowDef; 035import cascading.flow.FlowStep; 036import cascading.flow.planner.graph.ElementGraph; 037import cascading.operation.Aggregator; 038import cascading.operation.Function; 039import cascading.operation.Identity; 040import cascading.operation.aggregator.Count; 041import cascading.operation.aggregator.First; 042import cascading.operation.expression.ExpressionFunction; 043import cascading.operation.regex.RegexFilter; 044import cascading.operation.regex.RegexSplitter; 045import cascading.pipe.Checkpoint; 046import cascading.pipe.CoGroup; 047import cascading.pipe.Each; 048import cascading.pipe.Every; 049import cascading.pipe.GroupBy; 050import cascading.pipe.HashJoin; 051import cascading.pipe.Merge; 052import cascading.pipe.Pipe; 053import cascading.pipe.assembly.Rename; 054import cascading.pipe.joiner.InnerJoin; 055import cascading.pipe.joiner.Joiner; 056import cascading.pipe.joiner.LeftJoin; 057import cascading.pipe.joiner.MixedJoin; 058import cascading.pipe.joiner.OuterJoin; 059import cascading.pipe.joiner.RightJoin; 060import cascading.tap.SinkMode; 061import cascading.tap.Tap; 062import cascading.tuple.Fields; 063import cascading.tuple.Hasher; 064import cascading.tuple.Tuple; 065import org.junit.Test; 066 067import static data.InputData.*; 068 069public class JoinFieldedPipesPlatformTest extends PlatformTestCase 070 { 071 public JoinFieldedPipesPlatformTest() 072 { 073 super( true, 4, 1 ); // leave cluster testing enabled 074 } 075 076 @Test 077 public void testCross() throws Exception 078 { 079 getPlatform().copyFromLocal( inputFileLhs ); 080 getPlatform().copyFromLocal( inputFileRhs ); 081 082 Map sources = new HashMap(); 083 084 sources.put( "lhs", getPlatform().getTextFile( inputFileLhs ) ); 085 sources.put( "rhs", getPlatform().getTextFile( inputFileRhs ) ); 086 087 Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "cross" ), SinkMode.REPLACE ); 088 089 Pipe pipeLower = new Each( "lhs", new Fields( "line" ), new RegexSplitter( new Fields( "numLHS", "charLHS" ), " " ) ); 090 Pipe pipeUpper = new Each( "rhs", new Fields( "line" ), new RegexSplitter( new Fields( "numRHS", "charRHS" ), " " ) ); 091 092 Pipe cross = new HashJoin( pipeLower, new Fields( "numLHS" ), pipeUpper, new Fields( "numRHS" ), new InnerJoin() ); 093 094 Flow flow = getPlatform().getFlowConnector().connect( sources, sink, cross ); 095 096 flow.complete(); 097 098 validateLength( flow, 37, null ); 099 100 List<Tuple> values = getSinkAsList( flow ); 101 102 assertTrue( values.contains( new Tuple( "1\ta\t1\tA" ) ) ); 103 assertTrue( values.contains( new Tuple( "1\ta\t1\tB" ) ) ); 104 } 105 106 @Test 107 public void testJoin() throws Exception 108 { 109 getPlatform().copyFromLocal( inputFileLower ); 110 getPlatform().copyFromLocal( inputFileUpper ); 111 112 Tap sourceLower = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower ); 113 Tap sourceUpper = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileUpper ); 114 115 Map sources = new HashMap(); 116 117 sources.put( "lower", sourceLower ); 118 sources.put( "upper", sourceUpper ); 119 120 Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "join" ), SinkMode.REPLACE ); 121 122 Function splitter = new RegexSplitter( new Fields( "num", "char" ), " " ); 123 124 Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitter ); 125 Pipe pipeUpper = new Each( new Pipe( "upper" ), new Fields( "line" ), splitter ); 126 127 Pipe splice = new HashJoin( pipeLower, new Fields( "num" ), pipeUpper, new Fields( "num" ), Fields.size( 4 ) ); 128 129 Map<Object, Object> properties = getProperties(); 130 131 Flow flow = getPlatform().getFlowConnector( properties ).connect( sources, sink, splice ); 132 133 flow.complete(); 134 135 validateLength( flow, 5 ); 136 137 List<Tuple> values = getSinkAsList( flow ); 138 139 assertTrue( values.contains( new Tuple( "1\ta\t1\tA" ) ) ); 140 assertTrue( values.contains( new Tuple( "2\tb\t2\tB" ) ) ); 141 } 142 143 @Test 144 public void testJoinSamePipeName() throws Exception 145 { 146 getPlatform().copyFromLocal( inputFileLower ); 147 getPlatform().copyFromLocal( inputFileUpper ); 148 149 Tap sourceLower = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower ); 150 Tap sourceUpper = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileUpper ); 151 152 Map sources = new HashMap(); 153 154 sources.put( "lower", sourceLower ); 155 sources.put( "upper", sourceUpper ); 156 157 Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "renamedpipes" ), SinkMode.REPLACE ); 158 159 Function splitter = new RegexSplitter( new Fields( "num", "char" ), " " ); 160 161 Pipe pipeLower = new Pipe( "lower" ); 162 Pipe pipeUpper = new Pipe( "upper" ); 163 164 // these pipes will hide the source name, and could cause one to be lost 165 pipeLower = new Pipe( "same", pipeLower ); 166 pipeUpper = new Pipe( "same", pipeUpper ); 167 168 pipeLower = new Each( pipeLower, new Fields( "line" ), splitter ); 169 pipeUpper = new Each( pipeUpper, new Fields( "line" ), splitter ); 170 171// pipeLower = new Each( pipeLower, new Fields( "num", "char" ), new Identity( new Fields( "num", "char" ) ) ); 172// pipeUpper = new Each( pipeUpper, new Fields( "num", "char" ), new Identity( new Fields( "num", "char" ) ) ); 173 174 pipeLower = new Pipe( "left", pipeLower ); 175 pipeUpper = new Pipe( "right", pipeUpper ); 176 177// pipeLower = new Each( pipeLower, new Debug( true ) ); 178// pipeUpper = new Each( pipeUpper, new Debug( true ) ); 179 180 Pipe splice = new HashJoin( pipeLower, new Fields( "num" ), pipeUpper, new Fields( "num" ), Fields.size( 4 ) ); 181 182// splice = new Each( splice, new Debug( true ) ); 183 splice = new Pipe( "splice", splice ); 184 splice = new Pipe( "tail", splice ); 185 186 Flow flow = getPlatform().getFlowConnector().connect( sources, sink, splice ); 187 188 flow.complete(); 189 190 validateLength( flow, 5 ); 191 192 List<Tuple> values = getSinkAsList( flow ); 193 194 assertTrue( values.contains( new Tuple( "1\ta\t1\tA" ) ) ); 195 assertTrue( values.contains( new Tuple( "2\tb\t2\tB" ) ) ); 196 } 197 198 @Test 199 public void testJoinWithUnknowns() throws Exception 200 { 201 getPlatform().copyFromLocal( inputFileLower ); 202 getPlatform().copyFromLocal( inputFileUpper ); 203 204 Tap sourceLower = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower ); 205 Tap sourceUpper = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileUpper ); 206 207 Map sources = new HashMap(); 208 209 sources.put( "lower", sourceLower ); 210 sources.put( "upper", sourceUpper ); 211 212 Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "unknown" ), SinkMode.REPLACE ); 213 214 Function splitter = new RegexSplitter( Fields.UNKNOWN, " " ); 215 216 Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitter ); 217 Pipe pipeUpper = new Each( new Pipe( "upper" ), new Fields( "line" ), splitter ); 218 219 Pipe splice = new HashJoin( pipeLower, new Fields( 0 ), pipeUpper, new Fields( 0 ), Fields.size( 4 ) ); 220 221 Flow flow = getPlatform().getFlowConnector().connect( sources, sink, splice ); 222 223 flow.complete(); 224 225 validateLength( flow, 5 ); 226 227 List<Tuple> values = getSinkAsList( flow ); 228 229 assertTrue( values.contains( new Tuple( "1\ta\t1\tA" ) ) ); 230 assertTrue( values.contains( new Tuple( "2\tb\t2\tB" ) ) ); 231 } 232 233 /** 234 * this test intentionally filters out all values so the intermediate tap is empty. this tap is cogrouped with 235 * a new stream using an outerjoin. 236 * 237 * @throws Exception 238 */ 239 @Test 240 public void testJoinFilteredBranch() throws Exception 241 { 242 getPlatform().copyFromLocal( inputFileLower ); 243 getPlatform().copyFromLocal( inputFileUpper ); 244 245 Tap sourceLower = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower ); 246 Tap sourceUpper = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileUpper ); 247 248 Map sources = new HashMap(); 249 250 sources.put( "lower", sourceLower ); 251 sources.put( "upper", sourceUpper ); 252 253 Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "joinfilteredbranch" ), SinkMode.REPLACE ); 254 255 Function splitter = new RegexSplitter( new Fields( "num", "char" ), " " ); 256 257 Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitter ); 258 Pipe pipeUpper = new Each( new Pipe( "upper" ), new Fields( "line" ), splitter ); 259 pipeUpper = new Each( pipeUpper, new Fields( "num" ), new RegexFilter( "^fobar" ) ); // intentionally filtering all 260 pipeUpper = new GroupBy( pipeUpper, new Fields( "num" ) ); 261 262 Pipe splice = new HashJoin( pipeLower, new Fields( "num" ), pipeUpper, new Fields( "num" ), Fields.size( 4 ), new OuterJoin() ); 263 264 Flow flow = getPlatform().getFlowConnector().connect( sources, sink, splice ); 265 266 flow.complete(); 267 268 validateLength( flow, 5 ); 269 270 List<Tuple> values = getSinkAsList( flow ); 271 272 assertTrue( values.contains( new Tuple( "1\ta\tnull\tnull" ) ) ); 273 assertTrue( values.contains( new Tuple( "2\tb\tnull\tnull" ) ) ); 274 } 275 276 @Test 277 public void testJoinSelf() throws Exception 278 { 279 getPlatform().copyFromLocal( inputFileLower ); 280 281 Tap sourceLower = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower ); 282 Tap sourceUpper = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower ); 283 284 Map sources = new HashMap(); 285 286 sources.put( "lower", sourceLower ); 287 sources.put( "upper", sourceUpper ); 288 289 Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "joinself" ), SinkMode.REPLACE ); 290 291 Function splitter = new RegexSplitter( new Fields( "num", "char" ), " " ); 292 293 Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitter ); 294 Pipe pipeUpper = new Each( new Pipe( "upper" ), new Fields( "line" ), splitter ); 295 296 Pipe splice = new HashJoin( pipeLower, new Fields( "num" ), pipeUpper, new Fields( "num" ), Fields.size( 4 ) ); 297 298 Flow flow = getPlatform().getFlowConnector().connect( sources, sink, splice ); 299 300 flow.complete(); 301 302 validateLength( flow, 5 ); 303 304 List<Tuple> values = getSinkAsList( flow ); 305 306 assertTrue( values.contains( new Tuple( "1\ta\t1\ta" ) ) ); 307 assertTrue( values.contains( new Tuple( "2\tb\t2\tb" ) ) ); 308 } 309 310 /** 311 * Method testCoGroupAfterEvery tests that a tmp tap is inserted after the Every in the cogroup join 312 * 313 * @throws Exception when 314 */ 315 @Test 316 public void testJoinAfterEvery() throws Exception 317 { 318 getPlatform().copyFromLocal( inputFileLower ); 319 getPlatform().copyFromLocal( inputFileUpper ); 320 321 Tap sourceLower = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower ); 322 Tap sourceUpper = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileUpper ); 323 324 Map sources = new HashMap(); 325 326 sources.put( "lower", sourceLower ); 327 sources.put( "upper", sourceUpper ); 328 329 Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "afterevery" ), SinkMode.REPLACE ); 330 331 Function splitter = new RegexSplitter( new Fields( "num", "char" ), " " ); 332 333 Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitter ); 334 pipeLower = new GroupBy( pipeLower, new Fields( "num" ) ); 335 pipeLower = new Every( pipeLower, new Fields( "char" ), new First(), Fields.ALL ); 336 337 Pipe pipeUpper = new Each( new Pipe( "upper" ), new Fields( "line" ), splitter ); 338 pipeUpper = new GroupBy( pipeUpper, new Fields( "num" ) ); 339 pipeUpper = new Every( pipeUpper, new Fields( "char" ), new First(), Fields.ALL ); 340 341 Pipe splice = new HashJoin( pipeLower, new Fields( "num" ), pipeUpper, new Fields( "num" ), Fields.size( 4 ) ); 342 343 Flow flow = getPlatform().getFlowConnector().connect( sources, sink, splice ); 344 345 flow.complete(); 346 347 validateLength( flow, 5, null ); 348 349 List<Tuple> values = getSinkAsList( flow ); 350 351 assertTrue( values.contains( new Tuple( "1\ta\t1\tA" ) ) ); 352 assertTrue( values.contains( new Tuple( "2\tb\t2\tB" ) ) ); 353 } 354 355 @Test 356 public void testJoinInnerSingleField() throws Exception 357 { 358 getPlatform().copyFromLocal( inputFileLowerOffset ); 359 getPlatform().copyFromLocal( inputFileUpper ); 360 361 Tap sourceLower = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLowerOffset ); 362 Tap sourceUpper = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileUpper ); 363 364 Map sources = new HashMap(); 365 366 sources.put( "lower", sourceLower ); 367 sources.put( "upper", sourceUpper ); 368 369 Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "joininnersingle" ), SinkMode.REPLACE ); 370 371 Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), new RegexSplitter( new Fields( "num1", "char" ), " " ), new Fields( "num1" ) ); 372 Pipe pipeUpper = new Each( new Pipe( "upper" ), new Fields( "line" ), new RegexSplitter( new Fields( "num2", "char" ), " " ), new Fields( "num2" ) ); 373 374 Pipe join = new HashJoin( pipeLower, new Fields( "num1" ), pipeUpper, new Fields( "num2" ) ); 375 376 Flow flow = getPlatform().getFlowConnector().connect( sources, sink, join ); 377 378 flow.complete(); 379 380 validateLength( flow, 3, null ); 381 382 Set<Tuple> results = new HashSet<Tuple>(); 383 384 results.add( new Tuple( "1\t1" ) ); 385 results.add( new Tuple( "5\t5" ) ); 386 387 List<Tuple> actual = getSinkAsList( flow ); 388 389 results.removeAll( actual ); 390 391 assertEquals( 0, results.size() ); 392 } 393 394 /** 395 * 1 a1 396 * 1 a2 397 * 1 a3 398 * 2 b1 399 * 3 c1 400 * 4 d1 401 * 4 d2 402 * 4 d3 403 * 5 e1 404 * 5 e2 405 * 5 e3 406 * 7 g1 407 * 7 g2 408 * 7 g3 409 * 7 g4 410 * 7 g5 411 * null h1 412 * <p/> 413 * 1 A1 414 * 1 A2 415 * 1 A3 416 * 2 B1 417 * 2 B2 418 * 2 B3 419 * 4 D1 420 * 6 F1 421 * 6 F2 422 * null H1 423 * <p/> 424 * 1 a1 1 A1 425 * 1 a1 1 A2 426 * 1 a1 1 A3 427 * 1 a2 1 A1 428 * 1 a2 1 A2 429 * 1 a2 1 A3 430 * 1 a3 1 A1 431 * 1 a3 1 A2 432 * 1 a3 1 A3 433 * 2 b1 2 B1 434 * 2 b1 2 B2 435 * 2 b1 2 B3 436 * 4 d1 4 D1 437 * 4 d2 4 D1 438 * 4 d3 4 D1 439 * null h1 null H1 440 * 441 * @throws Exception 442 */ 443 @Test 444 public void testJoinInner() throws Exception 445 { 446 HashSet<Tuple> results = new HashSet<Tuple>(); 447 448 results.add( new Tuple( "1", "a1", "1", "A1" ) ); 449 results.add( new Tuple( "1", "a1", "1", "A2" ) ); 450 results.add( new Tuple( "1", "a1", "1", "A3" ) ); 451 results.add( new Tuple( "1", "a2", "1", "A1" ) ); 452 results.add( new Tuple( "1", "a2", "1", "A2" ) ); 453 results.add( new Tuple( "1", "a2", "1", "A3" ) ); 454 results.add( new Tuple( "1", "a3", "1", "A1" ) ); 455 results.add( new Tuple( "1", "a3", "1", "A2" ) ); 456 results.add( new Tuple( "1", "a3", "1", "A3" ) ); 457 results.add( new Tuple( "2", "b1", "2", "B1" ) ); 458 results.add( new Tuple( "2", "b1", "2", "B2" ) ); 459 results.add( new Tuple( "2", "b1", "2", "B3" ) ); 460 results.add( new Tuple( "4", "d1", "4", "D1" ) ); 461 results.add( new Tuple( "4", "d2", "4", "D1" ) ); 462 results.add( new Tuple( "4", "d3", "4", "D1" ) ); 463 results.add( new Tuple( null, "h1", null, "H1" ) ); 464 465 handleJoins( "joininner", new InnerJoin(), results ); 466 } 467 468 /** 469 * /** 470 * 1 a1 471 * 1 a2 472 * 1 a3 473 * 2 b1 474 * 3 c1 475 * 4 d1 476 * 4 d2 477 * 4 d3 478 * 5 e1 479 * 5 e2 480 * 5 e3 481 * 7 g1 482 * 7 g2 483 * 7 g3 484 * 7 g4 485 * 7 g5 486 * null h1 487 * <p/> 488 * 1 A1 489 * 1 A2 490 * 1 A3 491 * 2 B1 492 * 2 B2 493 * 2 B3 494 * 4 D1 495 * 6 F1 496 * 6 F2 497 * null H1 498 * <p/> 499 * 1 a1 1 A1 500 * 1 a1 1 A2 501 * 1 a1 1 A3 502 * 1 a2 1 A1 503 * 1 a2 1 A2 504 * 1 a2 1 A3 505 * 1 a3 1 A1 506 * 1 a3 1 A2 507 * 1 a3 1 A3 508 * 2 b1 2 B1 509 * 2 b1 2 B2 510 * 2 b1 2 B3 511 * 3 c1 null null 512 * 4 d1 4 D1 513 * 4 d2 4 D1 514 * 4 d3 4 D1 515 * 5 e1 null null 516 * 5 e2 null null 517 * 5 e3 null null 518 * null null 6 F1 519 * null null 6 F2 520 * 7 g1 null null 521 * 7 g2 null null 522 * 7 g3 null null 523 * 7 g4 null null 524 * 7 g5 null null 525 * null h1 null H1 526 * 527 * @throws Exception 528 */ 529 @Test 530 public void testJoinOuter() throws Exception 531 { 532 // skip if hadoop cluster mode, outer joins don't behave the same 533 if( getPlatform().isMapReduce() && getPlatform().isUseCluster() ) 534 return; 535 536 Set<Tuple> results = new HashSet<Tuple>(); 537 538 results.add( new Tuple( "1", "a1", "1", "A1" ) ); 539 results.add( new Tuple( "1", "a1", "1", "A2" ) ); 540 results.add( new Tuple( "1", "a1", "1", "A3" ) ); 541 results.add( new Tuple( "1", "a2", "1", "A1" ) ); 542 results.add( new Tuple( "1", "a2", "1", "A2" ) ); 543 results.add( new Tuple( "1", "a2", "1", "A3" ) ); 544 results.add( new Tuple( "1", "a3", "1", "A1" ) ); 545 results.add( new Tuple( "1", "a3", "1", "A2" ) ); 546 results.add( new Tuple( "1", "a3", "1", "A3" ) ); 547 results.add( new Tuple( "2", "b1", "2", "B1" ) ); 548 results.add( new Tuple( "2", "b1", "2", "B2" ) ); 549 results.add( new Tuple( "2", "b1", "2", "B3" ) ); 550 results.add( new Tuple( "3", "c1", null, null ) ); 551 results.add( new Tuple( "4", "d1", "4", "D1" ) ); 552 results.add( new Tuple( "4", "d2", "4", "D1" ) ); 553 results.add( new Tuple( "4", "d3", "4", "D1" ) ); 554 results.add( new Tuple( "5", "e1", null, null ) ); 555 results.add( new Tuple( "5", "e2", null, null ) ); 556 results.add( new Tuple( "5", "e3", null, null ) ); 557 results.add( new Tuple( null, null, "6", "F1" ) ); 558 results.add( new Tuple( null, null, "6", "F2" ) ); 559 results.add( new Tuple( "7", "g1", null, null ) ); 560 results.add( new Tuple( "7", "g2", null, null ) ); 561 results.add( new Tuple( "7", "g3", null, null ) ); 562 results.add( new Tuple( "7", "g4", null, null ) ); 563 results.add( new Tuple( "7", "g5", null, null ) ); 564 results.add( new Tuple( null, "h1", null, "H1" ) ); 565 566 handleJoins( "joinouter", new OuterJoin(), results ); 567 } 568 569 /** 570 * 1 a1 571 * 1 a2 572 * 1 a3 573 * 2 b1 574 * 3 c1 575 * 4 d1 576 * 4 d2 577 * 4 d3 578 * 5 e1 579 * 5 e2 580 * 5 e3 581 * 7 g1 582 * 7 g2 583 * 7 g3 584 * 7 g4 585 * 7 g5 586 * null h1 587 * <p/> 588 * 1 A1 589 * 1 A2 590 * 1 A3 591 * 2 B1 592 * 2 B2 593 * 2 B3 594 * 4 D1 595 * 6 F1 596 * 6 F2 597 * null H1 598 * <p/> 599 * 1 a1 1 A1 600 * 1 a1 1 A2 601 * 1 a1 1 A3 602 * 1 a2 1 A1 603 * 1 a2 1 A2 604 * 1 a2 1 A3 605 * 1 a3 1 A1 606 * 1 a3 1 A2 607 * 1 a3 1 A3 608 * 2 b1 2 B1 609 * 2 b1 2 B2 610 * 2 b1 2 B3 611 * 3 c1 null null 612 * 4 d1 4 D1 613 * 4 d2 4 D1 614 * 4 d3 4 D1 615 * 5 e1 null null 616 * 5 e2 null null 617 * 5 e3 null null 618 * 7 g1 null null 619 * 7 g2 null null 620 * 7 g3 null null 621 * 7 g4 null null 622 * 7 g5 null null 623 * null h1 null H1 624 * 625 * @throws Exception 626 */ 627 @Test 628 public void testJoinInnerOuter() throws Exception 629 { 630 Set<Tuple> results = new HashSet<Tuple>(); 631 632 results.add( new Tuple( "1", "a1", "1", "A1" ) ); 633 results.add( new Tuple( "1", "a1", "1", "A2" ) ); 634 results.add( new Tuple( "1", "a1", "1", "A3" ) ); 635 results.add( new Tuple( "1", "a2", "1", "A1" ) ); 636 results.add( new Tuple( "1", "a2", "1", "A2" ) ); 637 results.add( new Tuple( "1", "a2", "1", "A3" ) ); 638 results.add( new Tuple( "1", "a3", "1", "A1" ) ); 639 results.add( new Tuple( "1", "a3", "1", "A2" ) ); 640 results.add( new Tuple( "1", "a3", "1", "A3" ) ); 641 results.add( new Tuple( "2", "b1", "2", "B1" ) ); 642 results.add( new Tuple( "2", "b1", "2", "B2" ) ); 643 results.add( new Tuple( "2", "b1", "2", "B3" ) ); 644 results.add( new Tuple( "3", "c1", null, null ) ); 645 results.add( new Tuple( "4", "d1", "4", "D1" ) ); 646 results.add( new Tuple( "4", "d2", "4", "D1" ) ); 647 results.add( new Tuple( "4", "d3", "4", "D1" ) ); 648 results.add( new Tuple( "5", "e1", null, null ) ); 649 results.add( new Tuple( "5", "e2", null, null ) ); 650 results.add( new Tuple( "5", "e3", null, null ) ); 651 results.add( new Tuple( "7", "g1", null, null ) ); 652 results.add( new Tuple( "7", "g2", null, null ) ); 653 results.add( new Tuple( "7", "g3", null, null ) ); 654 results.add( new Tuple( "7", "g4", null, null ) ); 655 results.add( new Tuple( "7", "g5", null, null ) ); 656 results.add( new Tuple( null, "h1", null, "H1" ) ); 657 658 handleJoins( "joininnerouter", new LeftJoin(), results ); 659 } 660 661 /** 662 * 1 a1 663 * 1 a2 664 * 1 a3 665 * 2 b1 666 * 3 c1 667 * 4 d1 668 * 4 d2 669 * 4 d3 670 * 5 e1 671 * 5 e2 672 * 5 e3 673 * 7 g1 674 * 7 g2 675 * 7 g3 676 * 7 g4 677 * 7 g5 678 * null h1 679 * <p/> 680 * 1 A1 681 * 1 A2 682 * 1 A3 683 * 2 B1 684 * 2 B2 685 * 2 B3 686 * 4 D1 687 * 6 F1 688 * 6 F2 689 * null H1 690 * <p/> 691 * 1 a1 1 A1 692 * 1 a1 1 A2 693 * 1 a1 1 A3 694 * 1 a2 1 A1 695 * 1 a2 1 A2 696 * 1 a2 1 A3 697 * 1 a3 1 A1 698 * 1 a3 1 A2 699 * 1 a3 1 A3 700 * 2 b1 2 B1 701 * 2 b1 2 B2 702 * 2 b1 2 B3 703 * 4 d1 4 D1 704 * 4 d2 4 D1 705 * 4 d3 4 D1 706 * null null 6 F1 707 * null null 6 F2 708 * null h1 null H1 709 * 710 * @throws Exception 711 */ 712 @Test 713 public void testJoinOuterInner() throws Exception 714 { 715 // skip if hadoop cluster mode, outer joins don't behave the same 716 if( getPlatform().isMapReduce() && getPlatform().isUseCluster() ) 717 return; 718 719 Set<Tuple> results = new HashSet<Tuple>(); 720 721 results.add( new Tuple( "1", "a1", "1", "A1" ) ); 722 results.add( new Tuple( "1", "a1", "1", "A2" ) ); 723 results.add( new Tuple( "1", "a1", "1", "A3" ) ); 724 results.add( new Tuple( "1", "a2", "1", "A1" ) ); 725 results.add( new Tuple( "1", "a2", "1", "A2" ) ); 726 results.add( new Tuple( "1", "a2", "1", "A3" ) ); 727 results.add( new Tuple( "1", "a3", "1", "A1" ) ); 728 results.add( new Tuple( "1", "a3", "1", "A2" ) ); 729 results.add( new Tuple( "1", "a3", "1", "A3" ) ); 730 results.add( new Tuple( "2", "b1", "2", "B1" ) ); 731 results.add( new Tuple( "2", "b1", "2", "B2" ) ); 732 results.add( new Tuple( "2", "b1", "2", "B3" ) ); 733 results.add( new Tuple( "4", "d1", "4", "D1" ) ); 734 results.add( new Tuple( "4", "d2", "4", "D1" ) ); 735 results.add( new Tuple( "4", "d3", "4", "D1" ) ); 736 results.add( new Tuple( null, null, "6", "F1" ) ); 737 results.add( new Tuple( null, null, "6", "F2" ) ); 738 results.add( new Tuple( null, "h1", null, "H1" ) ); 739 740 handleJoins( "joinouterinner", new RightJoin(), results ); 741 } 742 743 private void handleJoins( String path, Joiner joiner, Set<Tuple> results ) throws Exception 744 { 745 getPlatform().copyFromLocal( inputFileLhsSparse ); 746 getPlatform().copyFromLocal( inputFileRhsSparse ); 747 748 Fields fields = new Fields( "num", "char" ).applyTypes( Integer.class, String.class ); 749 Tap sourceLower = getPlatform().getDelimitedFile( fields, " ", inputFileLhsSparse ); 750 Tap sourceUpper = getPlatform().getDelimitedFile( fields, " ", inputFileRhsSparse ); 751 752 Map sources = new HashMap(); 753 754 sources.put( "lower", sourceLower ); 755 sources.put( "upper", sourceUpper ); 756 757 Tap sink = getPlatform().getDelimitedFile( Fields.size( 4, String.class ), "\t", getOutputPath( path ), SinkMode.REPLACE ); 758 759 Pipe pipeLower = new Pipe( "lower" ); 760 Pipe pipeUpper = new Pipe( "upper" ); 761 762 Fields declaredFields = new Fields( "num", "char", "num2", "char2" ); 763 Fields groupingFields = new Fields( "num" ); 764 765 Pipe splice = new HashJoin( pipeLower, groupingFields, pipeUpper, groupingFields, declaredFields, joiner ); 766 767 splice = new Each( splice, Fields.ALL, new Identity(), Fields.RESULTS ); 768 769 Flow flow = getPlatform().getFlowConnector().connect( sources, sink, splice ); 770 771 flow.complete(); 772 773 validateLength( flow, results.size() ); 774 775 List<Tuple> actual = getSinkAsList( flow ); 776 777 results.removeAll( actual ); 778 779 assertEquals( 0, results.size() ); 780 } 781 782 /** 783 * 1 a 784 * 5 b 785 * 6 c 786 * 5 b 787 * 5 e 788 * <p/> 789 * 1 A 790 * 2 B 791 * 3 C 792 * 4 D 793 * 5 E 794 * <p/> 795 * 1 a 796 * 2 b 797 * 3 c 798 * 4 d 799 * 5 e 800 * <p/> 801 * 1 a 1 A 1 a 802 * - - 2 B 2 b 803 * - - 3 C 3 c 804 * - - 4 D 4 d 805 * 5 b 5 E 5 e 806 * 5 e 5 E 5 e 807 * 808 * @throws Exception 809 */ 810 @Test 811 public void testJoinMixed() throws Exception 812 { 813 // skip if hadoop cluster mode, outer joins don't behave the same 814 if( getPlatform().isMapReduce() && getPlatform().isUseCluster() ) 815 return; 816 817 getPlatform().copyFromLocal( inputFileLowerOffset ); 818 getPlatform().copyFromLocal( inputFileLower ); 819 getPlatform().copyFromLocal( inputFileUpper ); 820 821 Tap sourceLowerOffset = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLowerOffset ); 822 Tap sourceUpper = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileUpper ); 823 Tap sourceLower = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower ); 824 825 Map sources = new HashMap(); 826 827 sources.put( "loweroffset", sourceLowerOffset ); 828 sources.put( "lower", sourceLower ); 829 sources.put( "upper", sourceUpper ); 830 831 Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "joinmixed" ), SinkMode.REPLACE ); 832 833 Function splitter = new RegexSplitter( new Fields( "num", "char" ), " " ); 834 835 Pipe pipeLowerOffset = new Each( new Pipe( "loweroffset" ), new Fields( "line" ), splitter ); 836 Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitter ); 837 Pipe pipeUpper = new Each( new Pipe( "upper" ), new Fields( "line" ), splitter ); 838 839 Pipe[] pipes = Pipe.pipes( pipeLowerOffset, pipeUpper, pipeLower ); 840 Fields[] fields = Fields.fields( new Fields( "num" ), new Fields( "num" ), new Fields( "num" ) ); 841 842 MixedJoin join = new MixedJoin( new boolean[]{MixedJoin.OUTER, MixedJoin.INNER, MixedJoin.OUTER} ); 843 Pipe splice = new HashJoin( pipes, fields, Fields.size( 6 ), join ); 844 845 Flow flow = getPlatform().getFlowConnector().connect( sources, sink, splice ); 846 847 flow.complete(); 848 849 validateLength( flow, 6 ); 850 851 Set<Tuple> results = new HashSet<Tuple>(); 852 853 results.add( new Tuple( "1\ta\t1\tA\t1\ta" ) ); 854 results.add( new Tuple( "null\tnull\t2\tB\t2\tb" ) ); 855 results.add( new Tuple( "null\tnull\t3\tC\t3\tc" ) ); 856 results.add( new Tuple( "null\tnull\t4\tD\t4\td" ) ); 857 results.add( new Tuple( "5\tb\t5\tE\t5\te" ) ); 858 results.add( new Tuple( "5\te\t5\tE\t5\te" ) ); 859 860 List<Tuple> actual = getSinkAsList( flow ); 861 862 results.removeAll( actual ); 863 864 assertEquals( 0, results.size() ); 865 } 866 867 @Test 868 public void testJoinDiffFields() throws Exception 869 { 870 getPlatform().copyFromLocal( inputFileLower ); 871 getPlatform().copyFromLocal( inputFileUpper ); 872 873 Tap sourceLower = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower ); 874 Tap sourceUpper = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileUpper ); 875 876 Map sources = new HashMap(); 877 878 sources.put( "lower", sourceLower ); 879 sources.put( "upper", sourceUpper ); 880 881 Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "difffields" ), SinkMode.REPLACE ); 882 883 Function splitterLower = new RegexSplitter( new Fields( "numA", "lower" ), " " ); 884 Function splitterUpper = new RegexSplitter( new Fields( "numB", "upper" ), " " ); 885 886 Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitterLower ); 887 Pipe pipeUpper = new Each( new Pipe( "upper" ), new Fields( "line" ), splitterUpper ); 888 889 Pipe pipe = new HashJoin( pipeLower, new Fields( "numA" ), pipeUpper, new Fields( "numB" ) ); 890 891 Flow flow = getPlatform().getFlowConnector().connect( sources, sink, pipe ); 892 893 flow.complete(); 894 895 validateLength( flow, 5 ); 896 897 List<Tuple> actual = getSinkAsList( flow ); 898 899 assertTrue( actual.contains( new Tuple( "1\ta\t1\tA" ) ) ); 900 assertTrue( actual.contains( new Tuple( "2\tb\t2\tB" ) ) ); 901 } 902 903 @Test 904 public void testJoinGroupBy() throws Exception 905 { 906 getPlatform().copyFromLocal( inputFileLower ); 907 getPlatform().copyFromLocal( inputFileUpper ); 908 909 Tap sourceLower = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower ); 910 Tap sourceUpper = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileUpper ); 911 912 Map sources = new HashMap(); 913 914 sources.put( "lower", sourceLower ); 915 sources.put( "upper", sourceUpper ); 916 917 Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "joingroupby" ), SinkMode.REPLACE ); 918 919 Function splitterLower = new RegexSplitter( new Fields( "numA", "lower" ), " " ); 920 Function splitterUpper = new RegexSplitter( new Fields( "numB", "upper" ), " " ); 921 922 Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitterLower ); 923 Pipe pipeUpper = new Each( new Pipe( "upper" ), new Fields( "line" ), splitterUpper ); 924 925 Pipe pipe = new HashJoin( pipeLower, new Fields( "numA" ), pipeUpper, new Fields( "numB" ) ); 926 927 Pipe groupby = new GroupBy( pipe, new Fields( "numA" ) ); 928 929 Flow flow = getPlatform().getFlowConnector().connect( sources, sink, groupby ); 930 931 flow.complete(); 932 933 validateLength( flow, 5, null ); 934 935 List<Tuple> actual = getSinkAsList( flow ); 936 937 assertTrue( actual.contains( new Tuple( "1\ta\t1\tA" ) ) ); 938 assertTrue( actual.contains( new Tuple( "2\tb\t2\tB" ) ) ); 939 } 940 941 @Test 942 public void testJoinSamePipe() throws Exception 943 { 944 getPlatform().copyFromLocal( inputFileLower ); 945 946 Tap source = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower ); 947 948 Map sources = new HashMap(); 949 950 sources.put( "lower", source ); 951 952 Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "samepipe" ), SinkMode.REPLACE ); 953 954 Function splitter = new RegexSplitter( new Fields( "num", "char" ), " " ); 955 956 Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitter ); 957 958 Pipe pipe = new HashJoin( pipeLower, new Fields( "num" ), 1, new Fields( "num1", "char1", "num2", "char2" ) ); 959 960 Flow flow = getPlatform().getFlowConnector().connect( sources, sink, pipe ); 961 962 flow.complete(); 963 964 validateLength( flow, 5, null ); 965 966 List<Tuple> actual = getSinkAsList( flow ); 967 968 assertTrue( actual.contains( new Tuple( "1\ta\t1\ta" ) ) ); 969 assertTrue( actual.contains( new Tuple( "2\tb\t2\tb" ) ) ); 970 } 971 972 @Test 973 public void testJoinSamePipe2() throws Exception 974 { 975 getPlatform().copyFromLocal( inputFileLower ); 976 977 Tap source = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower ); 978 979 Map sources = new HashMap(); 980 981 sources.put( "lower", source ); 982 983 Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "samepipe2" ), SinkMode.REPLACE ); 984 985 Function splitter = new RegexSplitter( new Fields( "num", "char" ), " " ); 986 987 Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitter ); 988 989 Pipe join = new HashJoin( pipeLower, new Fields( "num" ), pipeLower, new Fields( "num" ), new Fields( "num1", "char1", "num2", "char2" ) ); 990 991 Flow flow = getPlatform().getFlowConnector().connect( sources, sink, join ); 992 993 flow.complete(); 994 995 validateLength( flow, 5, null ); 996 997 List<Tuple> actual = getSinkAsList( flow ); 998 999 assertTrue( actual.contains( new Tuple( "1\ta\t1\ta" ) ) ); 1000 assertTrue( actual.contains( new Tuple( "2\tb\t2\tb" ) ) ); 1001 } 1002 1003 @Test 1004 public void testJoinSamePipe3() throws Exception 1005 { 1006 getPlatform().copyFromLocal( inputFileLower ); 1007 1008 Tap source = getPlatform().getDelimitedFile( new Fields( "num", "char" ), " ", inputFileLower ); 1009 1010 Map sources = new HashMap(); 1011 1012 sources.put( "lower", source ); 1013 1014 Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "samepipe3" ), SinkMode.REPLACE ); 1015 1016 Pipe pipe = new Pipe( "lower" ); 1017 1018 Pipe lhs = new Pipe( "lhs", pipe ); 1019 Pipe rhs = new Pipe( "rhs", pipe ); 1020 1021 Pipe join = new HashJoin( lhs, new Fields( "num" ), rhs, new Fields( "num" ), new Fields( "num1", "char1", "num2", "char2" ) ); 1022 1023 Flow flow = getPlatform().getFlowConnector().connect( sources, sink, join ); 1024 1025 flow.complete(); 1026 1027 validateLength( flow, 5, null ); 1028 1029 List<Tuple> actual = getSinkAsList( flow ); 1030 1031 assertTrue( actual.contains( new Tuple( "1\ta\t1\ta" ) ) ); 1032 assertTrue( actual.contains( new Tuple( "2\tb\t2\tb" ) ) ); 1033 } 1034 1035 /** 1036 * Same source as rightmost 1037 * <p/> 1038 * should be a single job as the same file accumulates into the joins 1039 * 1040 * @throws Exception 1041 */ 1042 @Test 1043 public void testJoinAroundJoinRightMost() throws Exception 1044 { 1045 getPlatform().copyFromLocal( inputFileLower ); 1046 getPlatform().copyFromLocal( inputFileUpper ); 1047 1048 Tap sourceLower = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower ); 1049 Tap sourceUpper = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileUpper ); 1050 1051 Map sources = new HashMap(); 1052 1053 sources.put( "lower", sourceLower ); 1054 sources.put( "upper1", sourceUpper ); 1055 sources.put( "upper2", sourceUpper ); 1056 1057 Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "joinaroundjoinrightmost" ), SinkMode.REPLACE ); 1058 1059 Function splitter = new RegexSplitter( new Fields( "num", "char" ), " " ); 1060 1061 Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitter ); 1062 Pipe pipeUpper1 = new Each( new Pipe( "upper1" ), new Fields( "line" ), splitter ); 1063 Pipe pipeUpper2 = new Each( new Pipe( "upper2" ), new Fields( "line" ), splitter ); 1064 1065 Pipe splice1 = new HashJoin( pipeLower, new Fields( "num" ), pipeUpper1, new Fields( "num" ), new Fields( "num1", "char1", "num2", "char2" ) ); 1066 1067 splice1 = new Each( splice1, new Identity() ); 1068 1069 Pipe splice2 = new HashJoin( splice1, new Fields( "num1" ), pipeUpper2, new Fields( "num" ), new Fields( "num1", "char1", "num2", "char2", "num3", "char3" ) ); 1070 1071 Flow flow = getPlatform().getFlowConnector().connect( sources, sink, splice2 ); 1072 1073// flow.writeDOT( "joinaroundrightmost.dot" ); 1074 1075 if( getPlatform().isMapReduce() ) 1076 assertEquals( "wrong number of steps", 1, flow.getFlowSteps().size() ); 1077 1078 flow.complete(); 1079 1080 validateLength( flow, 5, null ); 1081 1082 List<Tuple> actual = getSinkAsList( flow ); 1083 1084 assertTrue( actual.contains( new Tuple( "1\ta\t1\tA\t1\tA" ) ) ); 1085 assertTrue( actual.contains( new Tuple( "2\tb\t2\tB\t2\tB" ) ) ); 1086 } 1087 1088 /** 1089 * Same source as leftmost 1090 * 1091 * @throws Exception 1092 */ 1093 @Test 1094 public void testJoinAroundJoinLeftMost() throws Exception 1095 { 1096 getPlatform().copyFromLocal( inputFileLower ); 1097 getPlatform().copyFromLocal( inputFileUpper ); 1098 1099 Tap sourceLower = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower ); 1100 Tap sourceUpper = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileUpper ); 1101 1102 Map sources = new HashMap(); 1103 1104 sources.put( "lower", sourceLower ); 1105 sources.put( "upper1", sourceUpper ); 1106 sources.put( "upper2", sourceUpper ); 1107 1108 Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "joinaroundjoinleftmost" ), SinkMode.REPLACE ); 1109 1110 Function splitter = new RegexSplitter( new Fields( "num", "char" ), " " ); 1111 1112 Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitter ); 1113 Pipe pipeUpper1 = new Each( new Pipe( "upper1" ), new Fields( "line" ), splitter ); 1114 Pipe pipeUpper2 = new Each( new Pipe( "upper2" ), new Fields( "line" ), splitter ); 1115 1116 Pipe splice1 = new HashJoin( pipeUpper1, new Fields( "num" ), pipeUpper2, new Fields( "num" ), new Fields( "num1", "char1", "num2", "char2" ) ); 1117 1118 splice1 = new Each( splice1, new Identity() ); 1119 1120 Pipe splice2 = new HashJoin( splice1, new Fields( "num1" ), pipeLower, new Fields( "num" ), new Fields( "num1", "char1", "num2", "char2", "num3", "char3" ) ); 1121 1122 Flow flow = getPlatform().getFlowConnector().connect( sources, sink, splice2 ); 1123 1124// flow.writeDOT( "joinaroundleftmost.dot" ); 1125 1126 if( getPlatform().isMapReduce() ) 1127 assertEquals( "wrong number of steps", 2, flow.getFlowSteps().size() ); 1128 1129 flow.complete(); 1130 1131 validateLength( flow, 5, null ); 1132 1133 List<Tuple> actual = getSinkAsList( flow ); 1134 1135 assertTrue( actual.contains( new Tuple( "1\tA\t1\tA\t1\ta" ) ) ); 1136 assertTrue( actual.contains( new Tuple( "2\tB\t2\tB\t2\tb" ) ) ); 1137 } 1138 1139 /** 1140 * Upper as leftmost and rightmost forcing two jobs 1141 * 1142 * @throws Exception 1143 */ 1144 @Test 1145 public void testJoinAroundJoinRightMostSwapped() throws Exception 1146 { 1147 getPlatform().copyFromLocal( inputFileLower ); 1148 getPlatform().copyFromLocal( inputFileUpper ); 1149 1150 Tap sourceLower = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower ); 1151 Tap sourceUpper = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileUpper ); 1152 1153 Map sources = new HashMap(); 1154 1155 sources.put( "lower", sourceLower ); 1156 sources.put( "upper1", sourceUpper ); 1157 sources.put( "upper2", sourceUpper ); 1158 1159 Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "joinaroundjoinswapped" ), SinkMode.REPLACE ); 1160 1161 Function splitter = new RegexSplitter( new Fields( "num", "char" ), " " ); 1162 1163 Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitter ); 1164 Pipe pipeUpper1 = new Each( new Pipe( "upper1" ), new Fields( "line" ), splitter ); 1165 Pipe pipeUpper2 = new Each( new Pipe( "upper2" ), new Fields( "line" ), splitter ); 1166 1167 Pipe splice1 = new HashJoin( pipeLower, new Fields( "num" ), pipeUpper1, new Fields( "num" ), new Fields( "num1", "char1", "num2", "char2" ) ); 1168 1169 splice1 = new Each( splice1, new Identity() ); 1170 1171 // upper2 becomes leftmost, forcing a tap between the joins 1172 Pipe splice2 = new HashJoin( pipeUpper2, new Fields( "num" ), splice1, new Fields( "num1" ), new Fields( "num1", "char1", "num2", "char2", "num3", "char3" ) ); 1173 1174 Flow flow = getPlatform().getFlowConnector().connect( sources, sink, splice2 ); 1175 1176 if( getPlatform().isMapReduce() ) 1177 assertEquals( "wrong number of steps", 2, flow.getFlowSteps().size() ); 1178 1179 flow.complete(); 1180 1181 validateLength( flow, 5, null ); 1182 1183 List<Tuple> actual = getSinkAsList( flow ); 1184 1185 assertTrue( actual.contains( new Tuple( "1\tA\t1\ta\t1\tA" ) ) ); 1186 assertTrue( actual.contains( new Tuple( "2\tB\t2\tb\t2\tB" ) ) ); 1187 } 1188 1189 @Test 1190 public void testJoinGroupByJoin() throws Exception 1191 { 1192 getPlatform().copyFromLocal( inputFileLower ); 1193 getPlatform().copyFromLocal( inputFileUpper ); 1194 getPlatform().copyFromLocal( inputFileJoined ); 1195 1196 Tap sourceLower = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower ); 1197 Tap sourceUpper = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileUpper ); 1198 Tap sourceJoined = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileJoined ); 1199 1200 Map sources = new HashMap(); 1201 1202 sources.put( "lower", sourceLower ); 1203 sources.put( "upper", sourceUpper ); 1204 sources.put( "joined", sourceJoined ); 1205 1206 Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "joingroupbyjoin" ), SinkMode.REPLACE ); 1207 1208 Function splitterLower = new RegexSplitter( new Fields( "numA", "lower" ), " " ); 1209 Function splitterUpper = new RegexSplitter( new Fields( "numB", "upper" ), " " ); 1210 Function splitterJoined = new RegexSplitter( new Fields( "numC", "lowerC", "upperC" ), "\t" ); 1211 1212 Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitterLower ); 1213 Pipe pipeUpper = new Each( new Pipe( "upper" ), new Fields( "line" ), splitterUpper ); 1214 Pipe pipeJoined = new Each( new Pipe( "joined" ), new Fields( "line" ), splitterJoined ); 1215 1216 Pipe pipe = new HashJoin( pipeLower, new Fields( "numA" ), pipeUpper, new Fields( "numB" ) ); 1217 1218 pipe = new GroupBy( pipe, new Fields( "numA" ) ); 1219 1220 pipe = new HashJoin( pipe, new Fields( "numA" ), pipeJoined, new Fields( "numC" ) ); 1221 1222 Flow flow = getPlatform().getFlowConnector().connect( sources, sink, pipe ); 1223 1224 if( getPlatform().isMapReduce() ) 1225 assertEquals( "wrong number of steps", 2, flow.getFlowSteps().size() ); 1226 1227 flow.complete(); 1228 1229 validateLength( flow, 5, null ); 1230 1231 List<Tuple> actual = getSinkAsList( flow ); 1232 1233 assertTrue( actual.contains( new Tuple( "1\ta\t1\tA\t1\ta\tA" ) ) ); 1234 assertTrue( actual.contains( new Tuple( "2\tb\t2\tB\t2\tb\tB" ) ) ); 1235 } 1236 1237 /** 1238 * here the same file is fed into the same HashJoin. 1239 * <p/> 1240 * This is three jobs. 1241 * <p/> 1242 * a temp tap is inserted before the accumulated branch for two reasons on the common HashJoin 1243 * <p/> 1244 * it is assumed the accumulated side is filtered down, so pushing to disk will preserve io 1245 * if accumulated side was streamed instead via a fork, only part of the file will accumulate into the HashJoin 1246 * <p/> 1247 * /-T-\ <-- accumulated 1248 * T HJ 1249 * \---/ <-- streamed 1250 * 1251 * @throws Exception 1252 */ 1253 @Test 1254 public void testJoinSameSourceIntoJoin() throws Exception 1255 { 1256 getPlatform().copyFromLocal( inputFileLower ); 1257 getPlatform().copyFromLocal( inputFileUpper ); 1258 1259 Tap sourceLower = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower ); 1260 Tap sourceUpper = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileUpper ); 1261 1262 Map sources = new HashMap(); 1263 1264 sources.put( "lower", sourceLower ); 1265 sources.put( "upper1", sourceUpper ); 1266 sources.put( "upper2", sourceUpper ); 1267 1268 Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "joinsamesourceintojoin" ), SinkMode.REPLACE ); 1269 1270 Function splitter = new RegexSplitter( new Fields( "num", "char" ), " " ); 1271 1272 Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitter ); 1273 Pipe pipeUpper1 = new Each( new Pipe( "upper1" ), new Fields( "line" ), splitter ); 1274 Pipe pipeUpper2 = new Each( new Pipe( "upper2" ), new Fields( "line" ), splitter ); 1275 1276 Pipe splice1 = new HashJoin( pipeUpper1, new Fields( "num" ), pipeUpper2, new Fields( "num" ), new Fields( "num1", "char1", "num2", "char2" ) ); 1277 1278 splice1 = new Each( splice1, new Identity() ); 1279 1280 Pipe splice2 = new HashJoin( pipeLower, new Fields( "num" ), splice1, new Fields( "num1" ), new Fields( "num1", "char1", "num2", "char2", "num3", "char3" ) ); 1281 1282 Flow flow = getPlatform().getFlowConnector().connect( sources, sink, splice2 ); 1283 1284// flow.writeDOT( "joinsamesourceintojoin.dot" ); 1285 1286 if( getPlatform().isMapReduce() ) 1287 assertEquals( "wrong number of steps", 3, flow.getFlowSteps().size() ); 1288 1289 flow.complete(); 1290 1291 validateLength( flow, 5, null ); 1292 1293 List<Tuple> actual = getSinkAsList( flow ); 1294 1295 assertTrue( actual.contains( new Tuple( "1\ta\t1\tA\t1\tA" ) ) ); 1296 assertTrue( actual.contains( new Tuple( "2\tb\t2\tB\t2\tB" ) ) ); 1297 } 1298 1299 @Test 1300 public void testJoinSameSourceIntoJoinSimple() throws Exception 1301 { 1302 getPlatform().copyFromLocal( inputFileUpper ); 1303 1304 Tap sourceUpper = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileUpper ); 1305 1306 Map sources = new HashMap(); 1307 1308 sources.put( "upper1", sourceUpper ); 1309 sources.put( "upper2", sourceUpper ); 1310 1311 Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "joinsamesourceintojoinsimple" ), SinkMode.REPLACE ); 1312 1313 Function splitter = new RegexSplitter( new Fields( "num", "char" ), " " ); 1314 1315 Pipe pipeUpper1 = new Each( new Pipe( "upper1" ), new Fields( "line" ), splitter ); 1316 Pipe pipeUpper2 = new Each( new Pipe( "upper2" ), new Fields( "line" ), splitter ); 1317 1318 Pipe splice1 = new HashJoin( pipeUpper1, new Fields( "num" ), pipeUpper2, new Fields( "num" ), new Fields( "num1", "char1", "num2", "char2" ) ); 1319 1320 splice1 = new Each( splice1, new Identity() ); 1321 1322 Flow flow = getPlatform().getFlowConnector().connect( sources, sink, splice1 ); 1323 1324// flow.writeDOT( "joinsamesourceintojoin.dot" ); 1325 1326 if( getPlatform().isMapReduce() ) 1327 assertEquals( "wrong number of steps", 2, flow.getFlowSteps().size() ); 1328 1329 flow.complete(); 1330 1331 validateLength( flow, 5, null ); 1332 1333 List<Tuple> actual = getSinkAsList( flow ); 1334 1335 assertTrue( actual.contains( new Tuple( "1\tA\t1\tA" ) ) ); 1336 assertTrue( actual.contains( new Tuple( "2\tB\t2\tB" ) ) ); 1337 } 1338 1339 /** 1340 * Loosely tests for a deadlock when BlockingHashJoinAnnotator rule doesn't excluce the GroupBy from the blocking 1341 * annotation. 1342 * <p/> 1343 * the deadlock is random on the order of the paths traversed from the Source Tap + fork. 1344 * 1345 * @throws Exception 1346 */ 1347 @Test 1348 public void testJoinSameSourceOverGroupByIntoJoinSimple() throws Exception 1349 { 1350 getPlatform().copyFromLocal( inputFileLower ); 1351 getPlatform().copyFromLocal( inputFileUpper ); 1352 1353 Tap sourceUpper = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileUpper ); 1354 1355 Map sources = new HashMap(); 1356 1357 sources.put( "upper1", sourceUpper ); 1358 sources.put( "upper2", sourceUpper ); 1359 1360 Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "joinsamesourceovergroupbyintojoinsimple" ), SinkMode.REPLACE ); 1361 1362 Function splitter = new RegexSplitter( new Fields( "num", "char" ), " " ); 1363 1364 Pipe pipeUpper1 = new Each( new Pipe( "upper1" ), new Fields( "line" ), splitter ); 1365 Pipe pipeUpper2 = new Each( new Pipe( "upper2" ), new Fields( "line" ), splitter ); 1366 1367 pipeUpper1 = new GroupBy( pipeUpper1, new Fields( "num" ) ); 1368 pipeUpper2 = new GroupBy( pipeUpper2, new Fields( "num" ) ); 1369 1370 Pipe splice1 = new HashJoin( pipeUpper1, new Fields( "num" ), pipeUpper2, new Fields( "num" ), new Fields( "num1", "char1", "num2", "char2" ) ); 1371 1372 splice1 = new Each( splice1, new Identity() ); 1373 1374 Flow flow = getPlatform().getFlowConnector().connect( sources, sink, splice1 ); 1375 1376 if( getPlatform().isMapReduce() ) 1377 assertEquals( "wrong number of steps", 3, flow.getFlowSteps().size() ); 1378 1379 flow.complete(); 1380 1381 validateLength( flow, 5, null ); 1382 1383 List<Tuple> actual = getSinkAsList( flow ); 1384 1385 assertTrue( actual.contains( new Tuple( "1\tA\t1\tA" ) ) ); 1386 assertTrue( actual.contains( new Tuple( "2\tB\t2\tB" ) ) ); 1387 } 1388 1389 /** 1390 * Tests that two independent streamed sources with loadable tributaries properly plan into a GroupBy 1391 * without loading unused sources 1392 * 1393 * @throws Exception 1394 */ 1395 @Test 1396 public void testJoinsIntoGroupBy() throws Exception 1397 { 1398 getPlatform().copyFromLocal( inputFileLower ); 1399 getPlatform().copyFromLocal( inputFileUpper ); 1400 1401 getPlatform().copyFromLocal( inputFileLhs ); 1402 getPlatform().copyFromLocal( inputFileRhs ); 1403 1404 Tap sourceLower = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower ); 1405 Tap sourceUpper = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileUpper ); 1406 1407 Tap sourceLhs = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLhs ); 1408 Tap sourceRhs = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileRhs ); 1409 1410 Map sources = new HashMap(); 1411 1412 sources.put( "lower", sourceLower ); 1413 sources.put( "upper", sourceUpper ); 1414 sources.put( "lhs", sourceLhs ); 1415 sources.put( "rhs", sourceRhs ); 1416 1417 Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "joinsintogroupby" ), SinkMode.REPLACE ); 1418 1419 Function splitter = new RegexSplitter( new Fields( "num", "char" ), " " ); 1420 1421 Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitter ); 1422 Pipe pipeUpper = new Each( new Pipe( "upper" ), new Fields( "line" ), splitter ); 1423 1424 Pipe pipeLhs = new Each( new Pipe( "lhs" ), new Fields( "line" ), splitter ); 1425 Pipe pipeRhs = new Each( new Pipe( "rhs" ), new Fields( "line" ), splitter ); 1426 1427 Pipe upperLower = new HashJoin( pipeLower, new Fields( "num" ), pipeUpper, new Fields( "num" ), new Fields( "num1", "char1", "num2", "char2" ) ); 1428 1429 upperLower = new Each( upperLower, new Identity() ); 1430 1431 Pipe lhsRhs = new HashJoin( pipeLhs, new Fields( "num" ), pipeRhs, new Fields( "num" ), new Fields( "num1", "char1", "num2", "char2" ) ); 1432 1433 lhsRhs = new Each( lhsRhs, new Identity() ); 1434 1435 Pipe grouped = new GroupBy( "merging", Pipe.pipes( upperLower, lhsRhs ), new Fields( "num1" ) ); 1436 1437 Flow flow = getPlatform().getFlowConnector().connect( sources, sink, grouped ); 1438 1439 if( getPlatform().isMapReduce() ) 1440 assertEquals( "wrong number of steps", 1, flow.getFlowSteps().size() ); 1441 1442 flow.complete(); 1443 1444 validateLength( flow, 42, null ); 1445 1446 List<Tuple> actual = getSinkAsList( flow ); 1447 1448 assertTrue( actual.contains( new Tuple( "1\ta\t1\tA" ) ) ); 1449 assertTrue( actual.contains( new Tuple( "5\te\t5\tE" ) ) ); 1450 } 1451 1452 @Test 1453 public void testJoinSamePipeAroundGroupBy() throws Exception 1454 { 1455 getPlatform().copyFromLocal( inputFileLower ); 1456 1457 Tap source = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower ); 1458 Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "samepipearoundgroupby" ), SinkMode.REPLACE ); 1459 1460 Function splitter = new RegexSplitter( new Fields( "num", "char" ), " " ); 1461 1462 Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitter ); 1463 1464 Pipe lhsPipe = new Each( new Pipe( "lhs", pipeLower ), new Identity() ); 1465 1466 Pipe rhsPipe = new Each( new Pipe( "rhs", pipeLower ), new Identity() ); 1467 1468 rhsPipe = new GroupBy( rhsPipe, new Fields( "num" ) ); 1469 1470 rhsPipe = new Each( rhsPipe, new Identity() ); 1471 1472 Pipe pipe = new HashJoin( lhsPipe, new Fields( "num" ), rhsPipe, new Fields( "num" ), new Fields( "num1", "char1", "num2", "char2" ) ); 1473 1474 Flow flow = getPlatform().getFlowConnector().connect( source, sink, pipe ); 1475 1476 flow.complete(); 1477 1478 validateLength( flow, 5, null ); 1479 1480 List<Tuple> actual = getSinkAsList( flow ); 1481 1482 assertTrue( actual.contains( new Tuple( "1\ta\t1\ta" ) ) ); 1483 assertTrue( actual.contains( new Tuple( "2\tb\t2\tb" ) ) ); 1484 } 1485 1486 /** 1487 * This test results in two MR jobs because one join feeds into the accumulated side of the second. A mapper 1488 * can only stream on branch at a time forcing a temp file between the mappers. see next test for swapped join 1489 * 1490 * @throws Exception 1491 */ 1492 @Test 1493 public void testJoinsIntoCoGroupLhs() throws Exception 1494 { 1495 getPlatform().copyFromLocal( inputFileLower ); 1496 getPlatform().copyFromLocal( inputFileUpper ); 1497 1498 getPlatform().copyFromLocal( inputFileLhs ); 1499 getPlatform().copyFromLocal( inputFileRhs ); 1500 1501 Tap sourceLower = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower ); 1502 Tap sourceUpper = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileUpper ); 1503 1504 Tap sourceLhs = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLhs ); 1505 Tap sourceRhs = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileRhs ); 1506 1507 Map sources = new HashMap(); 1508 1509 sources.put( "lower", sourceLower ); 1510 sources.put( "upper", sourceUpper ); 1511 sources.put( "lhs", sourceLhs ); 1512 sources.put( "rhs", sourceRhs ); 1513 1514 Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "joinsintocogrouplhs" ), SinkMode.REPLACE ); 1515 1516 Function splitter = new RegexSplitter( new Fields( "num", "char" ), " " ); 1517 1518 Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitter ); 1519 Pipe pipeUpper = new Each( new Pipe( "upper" ), new Fields( "line" ), splitter ); 1520 1521 Pipe pipeLhs = new Each( new Pipe( "lhs" ), new Fields( "line" ), splitter ); 1522 Pipe pipeRhs = new Each( new Pipe( "rhs" ), new Fields( "line" ), splitter ); 1523 1524 Pipe upperLower = new HashJoin( pipeLower, new Fields( "num" ), pipeUpper, new Fields( "num" ), new Fields( "numUpperLower", "charUpperLower", "num2UpperLower", "char2UpperLower" ) ); 1525 1526 upperLower = new Each( upperLower, new Identity() ); 1527 1528 Pipe lhsUpperLower = new HashJoin( pipeLhs, new Fields( "num" ), upperLower, new Fields( "numUpperLower" ), new Fields( "numLhs", "charLhs", "numUpperLower", "charUpperLower", "num2UpperLower", "char2UpperLower" ) ); 1529 1530 lhsUpperLower = new Each( lhsUpperLower, new Identity() ); 1531 1532 Pipe grouped = new CoGroup( "cogrouping", lhsUpperLower, new Fields( "numLhs" ), pipeRhs, new Fields( "num" ) ); 1533 1534 Flow flow = getPlatform().getFlowConnector().connect( sources, sink, grouped ); 1535 1536 if( getPlatform().isMapReduce() ) 1537 assertEquals( "wrong number of steps", 2, flow.getFlowSteps().size() ); 1538 1539 flow.complete(); 1540 1541 validateLength( flow, 37, null ); 1542 1543 List<Tuple> actual = getSinkAsList( flow ); 1544 1545 assertTrue( actual.contains( new Tuple( "1\ta\t1\ta\t1\tA\t1\tA" ) ) ); 1546 assertTrue( actual.contains( new Tuple( "5\ta\t5\te\t5\tE\t5\tA" ) ) ); 1547 } 1548 1549 /** 1550 * This test results in one MR jobs because one join feeds into the streamed side of the second. 1551 * 1552 * @throws Exception 1553 */ 1554 @Test 1555 public void testJoinsIntoCoGroupLhsSwappedJoin() throws Exception 1556 { 1557 getPlatform().copyFromLocal( inputFileLower ); 1558 getPlatform().copyFromLocal( inputFileUpper ); 1559 1560 getPlatform().copyFromLocal( inputFileLhs ); 1561 getPlatform().copyFromLocal( inputFileRhs ); 1562 1563 Tap sourceLower = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower ); 1564 Tap sourceUpper = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileUpper ); 1565 1566 Tap sourceLhs = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLhs ); 1567 Tap sourceRhs = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileRhs ); 1568 1569 Map sources = new HashMap(); 1570 1571 sources.put( "lower", sourceLower ); 1572 sources.put( "upper", sourceUpper ); 1573 sources.put( "lhs", sourceLhs ); 1574 sources.put( "rhs", sourceRhs ); 1575 1576 Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "joinsintocogrouplhsswappedjoin" ), SinkMode.REPLACE ); 1577 1578 Function splitter = new RegexSplitter( new Fields( "num", "char" ), " " ); 1579 1580 Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitter ); 1581 Pipe pipeUpper = new Each( new Pipe( "upper" ), new Fields( "line" ), splitter ); 1582 1583 Pipe pipeLhs = new Each( new Pipe( "lhs" ), new Fields( "line" ), splitter ); 1584 Pipe pipeRhs = new Each( new Pipe( "rhs" ), new Fields( "line" ), splitter ); 1585 1586 Pipe upperLower = new HashJoin( pipeLower, new Fields( "num" ), pipeUpper, new Fields( "num" ), new Fields( "numUpperLower", "charUpperLower", "num2UpperLower", "char2UpperLower" ) ); 1587 1588 upperLower = new Each( upperLower, new Identity() ); 1589 1590 Pipe lhsUpperLower = new HashJoin( upperLower, new Fields( "numUpperLower" ), pipeLhs, new Fields( "num" ), new Fields( "numUpperLower", "charUpperLower", "num2UpperLower", "char2UpperLower", "numLhs", "charLhs" ) ); 1591 1592 lhsUpperLower = new Each( lhsUpperLower, new Identity() ); 1593 1594 Pipe grouped = new CoGroup( "cogrouping", lhsUpperLower, new Fields( "numLhs" ), pipeRhs, new Fields( "num" ) ); 1595 1596 Flow flow = getPlatform().getFlowConnector().connect( sources, sink, grouped ); 1597 1598 if( getPlatform().isMapReduce() ) 1599 assertEquals( "wrong number of steps", 1, flow.getFlowSteps().size() ); 1600 1601 flow.complete(); 1602 1603 validateLength( flow, 37, null ); 1604 1605 List<Tuple> actual = getSinkAsList( flow ); 1606 1607 assertTrue( actual.contains( new Tuple( "1\ta\t1\tA\t1\ta\t1\tA" ) ) ); 1608 assertTrue( actual.contains( new Tuple( "5\te\t5\tE\t5\te\t5\tE" ) ) ); 1609 } 1610 1611 @Test 1612 public void testJoinsIntoCoGroupRhs() throws Exception 1613 { 1614 getPlatform().copyFromLocal( inputFileLower ); 1615 getPlatform().copyFromLocal( inputFileUpper ); 1616 1617 getPlatform().copyFromLocal( inputFileLhs ); 1618 getPlatform().copyFromLocal( inputFileRhs ); 1619 1620 Tap sourceLower = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower ); 1621 Tap sourceUpper = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileUpper ); 1622 1623 Tap sourceLhs = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLhs ); 1624 Tap sourceRhs = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileRhs ); 1625 1626 Map sources = new HashMap(); 1627 1628 sources.put( "lower", sourceLower ); 1629 sources.put( "upper", sourceUpper ); 1630 sources.put( "lhs", sourceLhs ); 1631 sources.put( "rhs", sourceRhs ); 1632 1633 Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "joinsintocogrouprhs" ), SinkMode.REPLACE ); 1634 1635 Function splitter = new RegexSplitter( new Fields( "num", "char" ), " " ); 1636 1637 Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitter ); 1638 Pipe pipeUpper = new Each( new Pipe( "upper" ), new Fields( "line" ), splitter ); 1639 1640 Pipe pipeLhs = new Each( new Pipe( "lhs" ), new Fields( "line" ), splitter ); 1641 Pipe pipeRhs = new Each( new Pipe( "rhs" ), new Fields( "line" ), splitter ); 1642 1643 Pipe upperLower = new HashJoin( pipeLower, new Fields( "num" ), pipeUpper, new Fields( "num" ), new Fields( "numUpperLower", "charUpperLower", "num2UpperLower", "char2UpperLower" ) ); 1644 1645 upperLower = new Each( upperLower, new Identity() ); 1646 1647 Pipe lhsUpperLower = new HashJoin( pipeLhs, new Fields( "num" ), upperLower, new Fields( "numUpperLower" ), new Fields( "numLhs", "charLhs", "numUpperLower", "charUpperLower", "num2UpperLower", "char2UpperLower" ) ); 1648 1649 lhsUpperLower = new Each( lhsUpperLower, new Identity() ); 1650 1651 Pipe grouped = new CoGroup( "cogrouping", pipeRhs, new Fields( "num" ), lhsUpperLower, new Fields( "numLhs" ) ); 1652 1653 Flow flow = getPlatform().getFlowConnector().connect( sources, sink, grouped ); 1654 1655 if( getPlatform().isMapReduce() ) 1656 assertEquals( "wrong number of steps", 2, flow.getFlowSteps().size() ); 1657 1658 flow.complete(); 1659 1660 validateLength( flow, 37, null ); 1661 1662 List<Tuple> actual = getSinkAsList( flow ); 1663 1664 assertTrue( actual.contains( new Tuple( "1\tA\t1\ta\t1\ta\t1\tA" ) ) ); 1665 assertTrue( actual.contains( new Tuple( "5\tE\t5\te\t5\te\t5\tE" ) ) ); 1666 } 1667 1668 @Test 1669 public void testJoinsIntoCoGroup() throws Exception 1670 { 1671 getPlatform().copyFromLocal( inputFileLower ); 1672 getPlatform().copyFromLocal( inputFileUpper ); 1673 1674 getPlatform().copyFromLocal( inputFileLhs ); 1675 getPlatform().copyFromLocal( inputFileRhs ); 1676 1677 Tap sourceLower = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower ); 1678 Tap sourceUpper = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileUpper ); 1679 1680 Tap sourceLhs = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLhs ); 1681 Tap sourceRhs = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileRhs ); 1682 1683 Map sources = new HashMap(); 1684 1685 sources.put( "lower", sourceLower ); 1686 sources.put( "upper", sourceUpper ); 1687 sources.put( "lhs", sourceLhs ); 1688 sources.put( "rhs", sourceRhs ); 1689 1690 Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "joinsintocogroup" ), SinkMode.REPLACE ); 1691 1692 Function splitter = new RegexSplitter( new Fields( "num", "char" ), " " ); 1693 1694 Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitter ); 1695 Pipe pipeUpper = new Each( new Pipe( "upper" ), new Fields( "line" ), splitter ); 1696 1697 Pipe pipeLhs = new Each( new Pipe( "lhs" ), new Fields( "line" ), splitter ); 1698 Pipe pipeRhs = new Each( new Pipe( "rhs" ), new Fields( "line" ), splitter ); 1699 1700 Pipe upperLower = new HashJoin( pipeLower, new Fields( "num" ), pipeUpper, new Fields( "num" ), new Fields( "numUpperLower1", "charUpperLower1", "numUpperLower2", "charUpperLower2" ) ); 1701 1702 upperLower = new Each( upperLower, new Identity() ); 1703 1704 Pipe lhsRhs = new HashJoin( pipeLhs, new Fields( "num" ), pipeRhs, new Fields( "num" ), new Fields( "numLhsRhs1", "charLhsRhs1", "numLhsRhs2", "charLhsRhs2" ) ); 1705 1706 lhsRhs = new Each( lhsRhs, new Identity() ); 1707 1708 Pipe grouped = new CoGroup( "cogrouping", upperLower, new Fields( "numUpperLower1" ), lhsRhs, new Fields( "numLhsRhs1" ) ); 1709 1710 Flow flow = getPlatform().getFlowConnector().connect( sources, sink, grouped ); 1711 1712 if( getPlatform().isMapReduce() ) 1713 assertEquals( "wrong number of steps", 1, flow.getFlowSteps().size() ); 1714 1715 flow.complete(); 1716 1717 validateLength( flow, 37, null ); 1718 1719 List<Tuple> actual = getSinkAsList( flow ); 1720 1721 assertTrue( actual.contains( new Tuple( "1\ta\t1\tA\t1\ta\t1\tA" ) ) ); 1722 assertTrue( actual.contains( new Tuple( "5\te\t5\tE\t5\te\t5\tE" ) ) ); 1723 } 1724 1725 public static class AllComparator implements Comparator<Comparable>, Hasher<Comparable>, Serializable 1726 { 1727 1728 @Override 1729 public int compare( Comparable lhs, Comparable rhs ) 1730 { 1731 return lhs.toString().compareTo( rhs.toString() ); 1732 } 1733 1734 @Override 1735 public int hashCode( Comparable value ) 1736 { 1737 if( value == null ) 1738 return 0; 1739 1740 return value.toString().hashCode(); 1741 } 1742 } 1743 1744 /** 1745 * Tests Hasher being honored even if default comparator is null. 1746 * 1747 * @throws Exception 1748 */ 1749 @Test 1750 public void testJoinWithHasher() throws Exception 1751 { 1752 getPlatform().copyFromLocal( inputFileLower ); 1753 getPlatform().copyFromLocal( inputFileUpper ); 1754 1755 Tap sourceLower = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower ); 1756 Tap sourceUpper = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileUpper ); 1757 1758 Map sources = new HashMap(); 1759 1760 sources.put( "lower", sourceLower ); 1761 sources.put( "upper", sourceUpper ); 1762 1763 Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "joinhasher" ), SinkMode.REPLACE ); 1764 1765 Function splitter = new RegexSplitter( new Fields( "num", "char" ), " " ); 1766 1767 Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitter ); 1768 1769 pipeLower = new Each( pipeLower, new Fields( "num" ), new ExpressionFunction( Fields.ARGS, "Integer.parseInt( num )", String.class ), Fields.REPLACE ); 1770 1771 Pipe pipeUpper = new Each( new Pipe( "upper" ), new Fields( "line" ), splitter ); 1772 1773 Fields num = new Fields( "num" ); 1774 num.setComparator( "num", new AllComparator() ); 1775 1776 Pipe splice = new HashJoin( pipeLower, num, pipeUpper, new Fields( "num" ), Fields.size( 4 ) ); 1777 1778 Map<Object, Object> properties = getProperties(); 1779 1780 Flow flow = getPlatform().getFlowConnector( properties ).connect( sources, sink, splice ); 1781 1782 flow.complete(); 1783 1784 validateLength( flow, 5 ); 1785 1786 List<Tuple> values = getSinkAsList( flow ); 1787 1788 assertTrue( values.contains( new Tuple( "1\ta\t1\tA" ) ) ); 1789 assertTrue( values.contains( new Tuple( "2\tb\t2\tB" ) ) ); 1790 } 1791 1792 @Test 1793 public void testJoinNone() throws Exception 1794 { 1795 getPlatform().copyFromLocal( inputFileLower ); 1796 getPlatform().copyFromLocal( inputFileUpper ); 1797 1798 Tap sourceLower = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower ); 1799 Tap sourceUpper = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileUpper ); 1800 1801 Map sources = new HashMap(); 1802 1803 sources.put( "lower", sourceLower ); 1804 sources.put( "upper", sourceUpper ); 1805 1806 Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "joinnone" ), SinkMode.REPLACE ); 1807 1808 Function splitter = new RegexSplitter( new Fields( "num", "char" ), " " ); 1809 1810 Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitter ); 1811 Pipe pipeUpper = new Each( new Pipe( "upper" ), new Fields( "line" ), splitter ); 1812 1813 Pipe splice = new HashJoin( pipeLower, Fields.NONE, pipeUpper, Fields.NONE, Fields.size( 4 ) ); 1814 1815 Map<Object, Object> properties = getProperties(); 1816 1817 Flow flow = getPlatform().getFlowConnector( properties ).connect( sources, sink, splice ); 1818 1819 flow.complete(); 1820 1821 validateLength( flow, 25 ); 1822 1823 List<Tuple> values = getSinkAsList( flow ); 1824 1825 assertTrue( values.contains( new Tuple( "1\ta\t1\tA" ) ) ); 1826 assertTrue( values.contains( new Tuple( "1\ta\t2\tB" ) ) ); 1827 assertTrue( values.contains( new Tuple( "2\tb\t2\tB" ) ) ); 1828 } 1829 1830 @Test 1831 public void testGroupBySplitJoins() throws Exception 1832 { 1833 getPlatform().copyFromLocal( inputFileLower ); 1834 getPlatform().copyFromLocal( inputFileUpper ); 1835 getPlatform().copyFromLocal( inputFileJoined ); 1836 1837 Tap sourceLower = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower ); 1838 Tap sourceUpper = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileUpper ); 1839 Tap sourceJoined = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileJoined ); 1840 1841 Map sources = new HashMap(); 1842 1843 sources.put( "lower", sourceLower ); 1844 sources.put( "upper", sourceUpper ); 1845 sources.put( "joined", sourceJoined ); 1846 1847 Tap lhsSink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "lhs" ), SinkMode.REPLACE ); 1848 Tap rhsSink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "rhs" ), SinkMode.REPLACE ); 1849 1850 Map sinks = new HashMap(); 1851 1852 sinks.put( "lhs", lhsSink ); 1853 sinks.put( "rhs", rhsSink ); 1854 1855 Function splitterLower = new RegexSplitter( new Fields( "numA", "lower" ), " " ); 1856 Function splitterUpper = new RegexSplitter( new Fields( "numB", "upper" ), " " ); 1857 Function splitterJoined = new RegexSplitter( new Fields( "numC", "lowerC", "upperC" ), "\t" ); 1858 1859 Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitterLower ); 1860 Pipe pipeUpper = new Each( new Pipe( "upper" ), new Fields( "line" ), splitterUpper ); 1861 Pipe pipeJoined = new Each( new Pipe( "joined" ), new Fields( "line" ), splitterJoined ); 1862 1863 Pipe pipe = new GroupBy( pipeLower, new Fields( "numA" ) ); 1864 1865 pipe = new Every( pipe, Fields.ALL, new TestIdentityBuffer( new Fields( "numA" ), 5, false ), Fields.RESULTS ); 1866 1867 Pipe lhsPipe = new Each( pipe, new Identity() ); 1868 lhsPipe = new HashJoin( "lhs", lhsPipe, new Fields( "numA" ), pipeUpper, new Fields( "numB" ) ); 1869 1870 Pipe rhsPipe = new Each( pipe, new Identity() ); 1871 rhsPipe = new HashJoin( "rhs", rhsPipe, new Fields( "numA" ), pipeJoined, new Fields( "numC" ) ); 1872 1873 Flow flow = getPlatform().getFlowConnector().connect( sources, sinks, lhsPipe, rhsPipe ); 1874 1875 if( getPlatform().isMapReduce() ) 1876 assertEquals( "wrong number of steps", 3, flow.getFlowSteps().size() ); 1877 1878 flow.complete(); 1879 1880 validateLength( flow.openSink( "lhs" ), 5, null ); 1881 validateLength( flow.openSink( "rhs" ), 5, null ); 1882 1883 List<Tuple> lhsActual = asList( flow, lhsSink ); 1884 1885 assertTrue( lhsActual.contains( new Tuple( "1\ta\t1\tA" ) ) ); 1886 assertTrue( lhsActual.contains( new Tuple( "2\tb\t2\tB" ) ) ); 1887 1888 List<Tuple> rhsActual = asList( flow, rhsSink ); 1889 1890 assertTrue( rhsActual.contains( new Tuple( "1\ta\t1\ta\tA" ) ) ); 1891 assertTrue( rhsActual.contains( new Tuple( "2\tb\t2\tb\tB" ) ) ); 1892 } 1893 1894 /** 1895 * currently we cannot efficiently plan for this case. better to throw an error 1896 * <p/> 1897 * When run against a cluster a Merge before a GroupBy can hide the streamed/accumulated nature of a branch. 1898 * <p/> 1899 * commented code is for troubleshooting. 1900 * 1901 * @throws Exception 1902 */ 1903 @Test 1904 public void testJoinMergeGroupBy() throws Exception 1905 { 1906 getPlatform().copyFromLocal( inputFileNums10 ); 1907 getPlatform().copyFromLocal( inputFileNums20 ); 1908 1909 Tap lhsTap = getPlatform().getTextFile( new Fields( "id" ), inputFileNums10 ); 1910 Tap rhsTap = getPlatform().getTextFile( new Fields( "id2" ), inputFileNums20 ); 1911 1912 Pipe lhs = new Pipe( "lhs" ); 1913 Pipe rhs = new Pipe( "rhs" ); 1914 1915// Pipe joined = new CoGroup( messages, new Fields( "id" ), people, new Fields( "id2" ) ); 1916 Pipe joined = new HashJoin( lhs, new Fields( "id" ), rhs, new Fields( "id2" ) ); 1917 1918 Pipe pruned = new Each( joined, new Fields( "id2" ), new Identity(), Fields.RESULTS ); 1919// pruned = new Checkpoint( pruned ); 1920 Pipe merged = new Merge( pruned, rhs ); 1921 Pipe grouped = new GroupBy( merged, new Fields( "id2" ) ); 1922// Pipe grouped = new GroupBy( Pipe.pipes( pruned, people ), new Fields( "id2" ) ); 1923 Aggregator count = new Count( new Fields( "count" ) ); 1924 Pipe counted = new Every( grouped, count ); 1925 1926 String testJoinMerge = "testJoinMergeGroupBy/" + ( ( joined instanceof CoGroup ) ? "cogroup" : "hashjoin" ); 1927 Tap sink = getPlatform().getDelimitedFile( Fields.ALL, true, "\t", null, getOutputPath( testJoinMerge ), SinkMode.REPLACE ); 1928 1929 FlowDef flowDef = FlowDef.flowDef() 1930 .setName( "join-merge" ) 1931 .addSource( rhs, rhsTap ) 1932 .addSource( lhs, lhsTap ) 1933 .addTailSink( counted, sink ); 1934 1935 boolean failOnPlanner = !getPlatform().supportsGroupByAfterMerge(); 1936 1937 Flow flow = null; 1938 1939 try 1940 { 1941 flow = getPlatform().getFlowConnector().connect( flowDef ); 1942 1943 if( failOnPlanner ) 1944 fail( "planner should throw error on plan" ); 1945 } 1946 catch( Exception exception ) 1947 { 1948 if( !failOnPlanner ) 1949 throw exception; 1950 1951 return; 1952 } 1953 1954// flow.writeDOT( "joinmerge.dot" ); 1955// flow.writeStepsDOT( "joinmerge-steps.dot" ); 1956 1957 flow.complete(); 1958 1959 validateLength( flow, 20 ); 1960 1961 List<Tuple> values = getSinkAsList( flow ); 1962 List<Tuple> expected = new ArrayList<Tuple>(); 1963 1964 expected.add( new Tuple( "1", "2" ) ); 1965 expected.add( new Tuple( "10", "2" ) ); 1966 expected.add( new Tuple( "11", "1" ) ); 1967 expected.add( new Tuple( "12", "1" ) ); 1968 expected.add( new Tuple( "13", "1" ) ); 1969 expected.add( new Tuple( "14", "1" ) ); 1970 expected.add( new Tuple( "15", "1" ) ); 1971 expected.add( new Tuple( "16", "1" ) ); 1972 expected.add( new Tuple( "17", "1" ) ); 1973 expected.add( new Tuple( "18", "1" ) ); 1974 expected.add( new Tuple( "19", "1" ) ); 1975 expected.add( new Tuple( "2", "2" ) ); 1976 expected.add( new Tuple( "20", "1" ) ); 1977 expected.add( new Tuple( "3", "2" ) ); 1978 expected.add( new Tuple( "4", "2" ) ); 1979 expected.add( new Tuple( "5", "2" ) ); 1980 expected.add( new Tuple( "6", "2" ) ); 1981 expected.add( new Tuple( "7", "2" ) ); 1982 expected.add( new Tuple( "8", "2" ) ); 1983 expected.add( new Tuple( "9", "2" ) ); 1984 1985 Collections.sort( values ); 1986 Collections.sort( expected ); 1987 1988 assertEquals( expected, values ); 1989 } 1990 1991 /** 1992 * Under tez, this can result in the HashJoin being duplicated across nodes for each split after the HashJoin 1993 * BoundaryBalanceJoinSplitTransformer inserts a Boundary at the split, preventing duplication of the path 1994 * 1995 * @throws Exception 1996 */ 1997 @Test 1998 public void testJoinSplit() throws Exception 1999 { 2000 getPlatform().copyFromLocal( inputFileLhs ); 2001 getPlatform().copyFromLocal( inputFileRhs ); 2002 2003 FlowDef flowDef = FlowDef.flowDef() 2004 .addSource( "lhs", getPlatform().getTextFile( inputFileLhs ) ) 2005 .addSource( "rhs", getPlatform().getTextFile( inputFileRhs ) ) 2006 .addSink( "lhsSink", getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "lhs" ), SinkMode.REPLACE ) ) 2007 .addSink( "rhsSink", getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "rhs" ), SinkMode.REPLACE ) ); 2008 2009 Pipe pipeLower = new Each( "lhs", new Fields( "line" ), new RegexSplitter( new Fields( "numLHS", "charLHS" ), " " ) ); 2010 Pipe pipeUpper = new Each( "rhs", new Fields( "line" ), new RegexSplitter( new Fields( "numRHS", "charRHS" ), " " ) ); 2011 2012 Pipe join = new HashJoin( pipeLower, new Fields( "numLHS" ), pipeUpper, new Fields( "numRHS" ), new InnerJoin() ); 2013 2014 Pipe pipeLhs = new Each( new Pipe( "lhsSink", join ), new Identity() ); 2015 Pipe pipeRhs = new Each( new Pipe( "rhsSink", join ), new Identity() ); 2016 2017 flowDef 2018 .addTail( pipeLhs ) 2019 .addTail( pipeRhs ); 2020 2021 Flow flow = getPlatform().getFlowConnector().connect( flowDef ); 2022 2023 flow.complete(); 2024 2025 validateLength( flow, 37, null ); 2026 2027 List<Tuple> values = asList( flow, flowDef.getSinks().get( "lhsSink" ) ); 2028 2029 assertTrue( values.contains( new Tuple( "1\ta\t1\tA" ) ) ); 2030 assertTrue( values.contains( new Tuple( "1\ta\t1\tB" ) ) ); 2031 2032 values = asList( flow, flowDef.getSinks().get( "rhsSink" ) ); 2033 2034 assertTrue( values.contains( new Tuple( "1\ta\t1\tA" ) ) ); 2035 assertTrue( values.contains( new Tuple( "1\ta\t1\tB" ) ) ); 2036 } 2037 2038 /** 2039 * catches a situation where BottomUpJoinedBoundariesNodePartitioner may capture an invalid HashJoin sub-graph 2040 * if the in-bound Boundary is split upon. 2041 */ 2042 @Test 2043 public void testSameSourceJoinSplitIntoJoin() throws Exception 2044 { 2045 getPlatform().copyFromLocal( inputFileLhs ); 2046 getPlatform().copyFromLocal( inputFileRhs ); 2047 2048 FlowDef flowDef = FlowDef.flowDef() 2049 .addSource( "lhs", getPlatform().getTextFile( inputFileLhs ) ) 2050 .addSource( "rhs", getPlatform().getTextFile( inputFileLhs ) ) 2051 .addSource( "joinSecond", getPlatform().getTextFile( inputFileRhs ) ) 2052 .addSink( "lhsSink", getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "lhs" ), SinkMode.REPLACE ) ) 2053 .addSink( "rhsSink", getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "rhs" ), SinkMode.REPLACE ) ); 2054 2055 Pipe pipeLower = new Each( "lhs", new Fields( "line" ), new RegexSplitter( new Fields( "numLHS", "charLHS" ), " " ) ); 2056 Pipe pipeUpper = new Each( "rhs", new Fields( "line" ), new RegexSplitter( new Fields( "numRHS", "charRHS" ), " " ) ); 2057 2058 Pipe joinFirst = new HashJoin( pipeLower, new Fields( "numLHS" ), pipeUpper, new Fields( "numRHS" ), new InnerJoin() ); 2059 2060 Pipe pipeLhs = new Each( new Pipe( "lhsSink", joinFirst ), new Identity() ); 2061 2062 Pipe joinSecond = new Each( "joinSecond", new Fields( "line" ), new RegexSplitter( new Fields( "numRHSSecond", "charRHSSecond" ), " " ) ); 2063 2064 joinSecond = new HashJoin( joinFirst, new Fields( "numLHS" ), joinSecond, new Fields( "numRHSSecond" ) ); 2065 2066 Pipe pipeRhs = new Each( new Pipe( "rhsSink", joinSecond ), new Identity() ); 2067 2068 flowDef 2069 .addTail( pipeLhs ) 2070 .addTail( pipeRhs ); 2071 2072 Flow flow = getPlatform().getFlowConnector().connect( flowDef ); 2073 2074 flow.complete(); 2075 2076 List<Tuple> values = asList( flow, flowDef.getSinks().get( "lhsSink" ) ); 2077 2078 assertEquals( 37, values.size() ); 2079 assertTrue( values.contains( new Tuple( "1\ta\t1\ta" ) ) ); 2080 assertTrue( values.contains( new Tuple( "1\ta\t1\tb" ) ) ); 2081 2082 values = asList( flow, flowDef.getSinks().get( "rhsSink" ) ); 2083 2084 assertEquals( 109, values.size() ); 2085 assertTrue( values.contains( new Tuple( "1\ta\t1\ta\t1\tA" ) ) ); 2086 assertTrue( values.contains( new Tuple( "1\ta\t1\tb\t1\tB" ) ) ); 2087 } 2088 2089 /** 2090 * checks that a split after a HashJoin does not result in the HashJoin execution being duplicated across 2091 * multiple nodes, one for each branch in the split. 2092 */ 2093 @Test 2094 public void testJoinSplitBeforeJoin() throws Exception 2095 { 2096 getPlatform().copyFromLocal( inputFileLhs ); 2097 getPlatform().copyFromLocal( inputFileRhs ); 2098 2099 FlowDef flowDef = FlowDef.flowDef() 2100 .addSource( "lhs", getPlatform().getTextFile( inputFileLhs ) ) 2101 .addSource( "rhs", getPlatform().getTextFile( inputFileRhs ) ) 2102 .addSource( "joinSecond", getPlatform().getTextFile( inputFileRhs ) ) 2103 .addSink( "lhsSink", getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "lhs" ), SinkMode.REPLACE ) ) 2104 .addSink( "rhsSink", getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "rhs" ), SinkMode.REPLACE ) ); 2105 2106 Pipe pipeLower = new Each( "lhs", new Fields( "line" ), new RegexSplitter( new Fields( "numLHS", "charLHS" ), " " ) ); 2107 Pipe pipeUpper = new Each( "rhs", new Fields( "line" ), new RegexSplitter( new Fields( "numRHS", "charRHS" ), " " ) ); 2108 2109 pipeUpper = new Checkpoint( pipeUpper ); 2110 2111 HashJoin hashJoin = new HashJoin( pipeLower, new Fields( "numLHS" ), pipeUpper, new Fields( "numRHS" ), new InnerJoin() ); 2112 2113 Pipe joinFirst = hashJoin; 2114 2115 joinFirst = new Each( joinFirst, new Identity() ); 2116 2117 Pipe pipeLhs = new Each( new Pipe( "lhsSink", joinFirst ), new Identity() ); 2118 2119 pipeLhs = new GroupBy( pipeLhs, new Fields( "numLHS" ) ); 2120 2121 joinFirst = new Each( new Pipe( "lhsSplit", joinFirst ), new Identity() ); 2122 2123 Pipe joinSecond = new Each( "joinSecond", new Fields( "line" ), new RegexSplitter( new Fields( "numRHSSecond", "charRHSSecond" ), " " ) ); 2124 2125 joinSecond = new CoGroup( joinFirst, new Fields( "numLHS" ), joinSecond, new Fields( "numRHSSecond" ) ); 2126 2127 Pipe pipeRhs = new Each( new Pipe( "rhsSink", joinSecond ), new Identity() ); 2128 2129 flowDef 2130 .addTail( pipeLhs ) 2131 .addTail( pipeRhs ); 2132 2133 Flow flow = getPlatform().getFlowConnector().connect( flowDef ); 2134 2135 if( getPlatform().isDAG() ) 2136 { 2137 FlowStep flowStep = (FlowStep) flow.getFlowSteps().get( 0 ); 2138 List<ElementGraph> elementGraphs = flowStep.getFlowNodeGraph().getElementGraphs( hashJoin ); 2139 2140 assertEquals( 1, elementGraphs.size() ); 2141 } 2142 2143 flow.complete(); 2144 2145 List<Tuple> values = asList( flow, flowDef.getSinks().get( "lhsSink" ) ); 2146 2147 assertEquals( 37, values.size() ); 2148 assertTrue( values.contains( new Tuple( "1\ta\t1\tA" ) ) ); 2149 assertTrue( values.contains( new Tuple( "1\ta\t1\tB" ) ) ); 2150 2151 values = asList( flow, flowDef.getSinks().get( "rhsSink" ) ); 2152 2153 assertEquals( 109, values.size() ); 2154 assertTrue( values.contains( new Tuple( "1\ta\t1\tA\t1\tA" ) ) ); 2155 assertTrue( values.contains( new Tuple( "1\ta\t1\tB\t1\tB" ) ) ); 2156 } 2157 2158 @Test 2159 public void testGroupBySplitGroupByJoin() throws Exception 2160 { 2161 getPlatform().copyFromLocal( inputFileLower ); 2162 2163 Tap source = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower ); 2164 2165 Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "sink" ), SinkMode.REPLACE ); 2166 2167 Function splitter = new RegexSplitter( new Fields( "num", "char" ), " " ); 2168 2169 Pipe pipeFirst = new Pipe( "first" ); 2170 pipeFirst = new Each( pipeFirst, new Fields( "line" ), splitter ); 2171 pipeFirst = new GroupBy( pipeFirst, new Fields( "num" ) ); 2172 pipeFirst = new Every( pipeFirst, new Fields( "char" ), new First( new Fields( "firstFirst" ) ), Fields.ALL ); 2173 2174 Pipe pipeSecond = new Pipe( "second", pipeFirst ); 2175 pipeSecond = new Each( pipeSecond, new Identity() ); 2176 pipeSecond = new GroupBy( pipeSecond, new Fields( "num" ) ); 2177 pipeSecond = new Every( pipeSecond, new Fields( "firstFirst" ), new First( new Fields( "secondFirst" ) ), Fields.ALL ); 2178 pipeSecond = new GroupBy( pipeSecond, new Fields( "num" ) ); 2179 pipeSecond = new Every( pipeSecond, new Fields( "secondFirst" ), new First( new Fields( "thirdFirst" ) ), Fields.ALL ); 2180 2181 Pipe splice = new HashJoin( pipeFirst, new Fields( "num" ), pipeSecond, new Fields( "num" ), Fields.size( 4 ) ); 2182 2183 Flow flow = getPlatform().getFlowConnector().connect( source, sink, splice ); 2184 2185 flow.complete(); 2186 2187 validateLength( flow, 5, null ); 2188 2189 List<Tuple> values = getSinkAsList( flow ); 2190 2191 assertTrue( values.contains( new Tuple( "1\ta\t1\ta" ) ) ); 2192 assertTrue( values.contains( new Tuple( "2\tb\t2\tb" ) ) ); 2193 assertTrue( values.contains( new Tuple( "3\tc\t3\tc" ) ) ); 2194 assertTrue( values.contains( new Tuple( "4\td\t4\td" ) ) ); 2195 assertTrue( values.contains( new Tuple( "5\te\t5\te" ) ) ); 2196 } 2197 2198 @Test 2199 public void testGroupBySplitSplitGroupByJoin() throws Exception 2200 { 2201 getPlatform().copyFromLocal( inputFileLower ); 2202 2203 Tap source = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower ); 2204 2205 Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "sink" ), SinkMode.REPLACE ); 2206 2207 Function splitter = new RegexSplitter( new Fields( "num", "char" ), " " ); 2208 2209 Pipe pipeFirst = new Pipe( "first" ); 2210 pipeFirst = new Each( pipeFirst, new Fields( "line" ), splitter ); 2211 pipeFirst = new GroupBy( pipeFirst, new Fields( "num" ) ); 2212 pipeFirst = new Every( pipeFirst, new Fields( "char" ), new First( new Fields( "firstFirst" ) ), Fields.ALL ); 2213 2214 Pipe pipeSecond = new Pipe( "second", pipeFirst ); 2215 pipeSecond = new Each( pipeSecond, new Identity() ); 2216 pipeSecond = new GroupBy( pipeSecond, new Fields( "num" ) ); 2217 pipeSecond = new Every( pipeSecond, new Fields( "firstFirst" ), new First( new Fields( "secondFirst" ) ), Fields.ALL ); 2218 2219 Pipe splice = new HashJoin( pipeFirst, new Fields( "num" ), pipeSecond, new Fields( "num" ), Fields.size( 4 ) ); 2220// Pipe splice = new HashJoin( pipeSecond, new Fields( "num" ), pipeFirst, new Fields( "num" ), Fields.size( 4 ) ); 2221 2222 splice = new HashJoin( splice, new Fields( 0 ), pipeSecond, new Fields( "num" ), Fields.size( 6 ) ); 2223 2224 Flow flow = getPlatform().getFlowConnector().connect( source, sink, splice ); 2225 2226 flow.complete(); 2227 2228 validateLength( flow, 5, null ); 2229 2230 List<Tuple> values = getSinkAsList( flow ); 2231 2232 assertTrue( values.contains( new Tuple( "1\ta\t1\ta\t1\ta" ) ) ); 2233 assertTrue( values.contains( new Tuple( "2\tb\t2\tb\t2\tb" ) ) ); 2234 assertTrue( values.contains( new Tuple( "3\tc\t3\tc\t3\tc" ) ) ); 2235 assertTrue( values.contains( new Tuple( "4\td\t4\td\t4\td" ) ) ); 2236 assertTrue( values.contains( new Tuple( "5\te\t5\te\t5\te" ) ) ); 2237 } 2238 2239 @Test 2240 public void testGroupBySplitAroundSplitGroupByJoin() throws Exception 2241 { 2242 getPlatform().copyFromLocal( inputFileLower ); 2243 2244 Tap source = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower ); 2245 2246 Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "sink" ), SinkMode.REPLACE ); 2247 Tap sink2 = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "sink2" ), SinkMode.REPLACE ); 2248 2249 Function splitter = new RegexSplitter( new Fields( "num", "char" ), " " ); 2250 2251 Pipe pipeInit = new Pipe( "init" ); 2252 Pipe pipeFirst = new Pipe( "first", pipeInit ); 2253 pipeFirst = new Each( pipeFirst, new Fields( "line" ), splitter ); 2254 pipeFirst = new GroupBy( pipeFirst, new Fields( "num" ) ); 2255 pipeFirst = new Every( pipeFirst, new Fields( "char" ), new First( new Fields( "firstFirst" ) ), Fields.ALL ); 2256 2257 Pipe sink2Pipe = new Pipe( "sink2", pipeFirst ); 2258 2259 Pipe pipeSecond = new Pipe( "second", pipeInit ); 2260 pipeSecond = new Each( pipeSecond, new Fields( "line" ), splitter ); 2261 pipeSecond = new GroupBy( pipeSecond, new Fields( "num" ) ); 2262 pipeSecond = new Every( pipeSecond, new Fields( "char" ), new First( new Fields( "secondFirst" ) ), Fields.ALL ); 2263 2264// Pipe splice = new HashJoin( pipeFirst, new Fields( "num" ), pipeSecond, new Fields( "num" ), Fields.size( 4 ) ); 2265 Pipe splice = new HashJoin( pipeSecond, new Fields( "num" ), pipeFirst, new Fields( "num" ), Fields.size( 4 ) ); 2266 2267 Pipe pipeThird = new Pipe( "third", pipeSecond ); 2268 pipeThird = new Each( pipeThird, new Identity() ); 2269 pipeThird = new GroupBy( pipeThird, new Fields( "num" ) ); 2270 pipeThird = new Every( pipeThird, new Fields( "secondFirst" ), new First( new Fields( "thirdFirst" ) ), Fields.ALL ); 2271 2272 splice = new HashJoin( splice, new Fields( 0 ), pipeThird, new Fields( "num" ), Fields.size( 6 ) ); 2273 2274 FlowDef flowDef = FlowDef.flowDef() 2275 .setName( splice.getName() ) 2276 .addSource( "init", source ) 2277 .addTailSink( splice, sink ) 2278 .addTailSink( sink2Pipe, sink2 ); 2279 2280 Flow flow = getPlatform().getFlowConnector().connect( flowDef ); 2281 2282 flow.complete(); 2283 2284 validateLength( flow, 5, null ); 2285 2286 List<Tuple> values = getSinkAsList( flow ); 2287 2288 assertTrue( values.contains( new Tuple( "1\ta\t1\ta\t1\ta" ) ) ); 2289 assertTrue( values.contains( new Tuple( "2\tb\t2\tb\t2\tb" ) ) ); 2290 assertTrue( values.contains( new Tuple( "3\tc\t3\tc\t3\tc" ) ) ); 2291 assertTrue( values.contains( new Tuple( "4\td\t4\td\t4\td" ) ) ); 2292 assertTrue( values.contains( new Tuple( "5\te\t5\te\t5\te" ) ) ); 2293 } 2294 2295 /** 2296 * This test checks for a deadlock when the same input is forked, adapted on one edge, then hashjoined back together. 2297 * 2298 * @throws Exception 2299 */ 2300 @Test 2301 public void testForkThenJoin() throws Exception 2302 { 2303 getPlatform().copyFromLocal( inputFileLower ); 2304 Tap sourceLower = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower ); 2305 2306 Map sources = new HashMap(); 2307 2308 sources.put( "lower", sourceLower ); 2309 2310 Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "join" ), SinkMode.REPLACE ); 2311 2312 Function splitter = new RegexSplitter( new Fields( "num", "text" ), " " ); 2313 2314 Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitter ); 2315 Pipe pipeUpper = new Each( new Pipe( "upper", pipeLower ), new Fields( "text" ), 2316 new ExpressionFunction( Fields.ARGS, "text.toUpperCase(java.util.Locale.ROOT)", String.class ), 2317 Fields.REPLACE ); 2318 2319 Pipe splice = new HashJoin( pipeLower, new Fields( "num" ), pipeUpper, new Fields( "num" ), Fields.size( 4 ) ); 2320 2321 Map<Object, Object> properties = getProperties(); 2322 2323 Flow flow = getPlatform().getFlowConnector( properties ).connect( sources, sink, splice ); 2324 2325 flow.complete(); 2326 2327 validateLength( flow, 5 ); 2328 2329 List<Tuple> values = getSinkAsList( flow ); 2330 2331 assertTrue( values.contains( new Tuple( "1\ta\t1\tA" ) ) ); 2332 assertTrue( values.contains( new Tuple( "2\tb\t2\tB" ) ) ); 2333 } 2334 2335 /** 2336 * This test checks for a deadlock when the same input is forked, adapted on one edge, then hashjoined back together. 2337 * 2338 * @throws Exception 2339 */ 2340 @Test 2341 public void testForkCoGroupThenHashJoin() throws Exception 2342 { 2343 getPlatform().copyFromLocal( inputFileLower ); 2344 Tap sourceLower = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower ); 2345 Tap sourceUpper = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileUpper ); 2346 2347 Map sources = new HashMap(); 2348 2349 sources.put( "sourceLower", sourceLower ); 2350 sources.put( "sourceUpper", sourceUpper ); 2351 2352 Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "join" ), SinkMode.REPLACE ); 2353 2354 Function splitter = new RegexSplitter( new Fields( "num", "text" ), " " ); 2355 2356 Pipe leftPipeLower = new Each( new Pipe( "sourceLower" ), new Fields( "line" ), splitter ); 2357 Pipe rightPipeUpper = new Each( new Pipe( "sourceUpper" ), new Fields( "line" ), splitter ); 2358 2359 Pipe leftPipeUpper = new Each( new Pipe( "leftUpper", leftPipeLower ), new Fields( "text" ), 2360 new ExpressionFunction( Fields.ARGS, "text.toUpperCase(java.util.Locale.ROOT)", String.class ), 2361 Fields.REPLACE ); 2362 Pipe rightPipeLower = new Each( new Pipe( "rightLower", rightPipeUpper ), new Fields( "text" ), 2363 new ExpressionFunction( Fields.ARGS, "text.toLowerCase(java.util.Locale.ROOT)", String.class ), 2364 Fields.REPLACE ); 2365 2366 leftPipeUpper = new GroupBy( leftPipeUpper, new Fields( "num" ) ); 2367 rightPipeLower = new GroupBy( rightPipeLower, new Fields( "num" ) ); 2368 2369 Pipe middleSplice = new CoGroup( "middleCoGroup", leftPipeUpper, new Fields( "num" ), rightPipeLower, new Fields( "num" ), new Fields( "numM1", "charM1", "numM2", "charM2" ) ); 2370 2371 Pipe leftSplice = new HashJoin( leftPipeLower, new Fields( "num" ), middleSplice, new Fields( "numM1" ) ); 2372 2373 Map<Object, Object> properties = getProperties(); 2374 2375 Flow flow = getPlatform().getFlowConnector( properties ).connect( sources, sink, leftSplice ); 2376 2377 flow.complete(); 2378 2379 validateLength( flow, 5 ); 2380 2381 List<Tuple> values = getSinkAsList( flow ); 2382 // that the flow completes at all is already success. 2383 assertTrue( values.contains( new Tuple( "1\ta\t1\tA\t1\ta" ) ) ); 2384 assertTrue( values.contains( new Tuple( "2\tb\t2\tB\t2\tb" ) ) ); 2385 } 2386 2387 /** 2388 * This test checks for a deadlock when the same input is forked, adapted on one edge, cogroup with something, 2389 * then hashjoined back together. 2390 * 2391 * @throws Exception 2392 */ 2393 @Test 2394 public void testForkCoGroupThenHashJoinCoGroupAgain() throws Exception 2395 { 2396 getPlatform().copyFromLocal( inputFileLower ); 2397 getPlatform().copyFromLocal( inputFileUpper ); 2398 2399 Tap sourceLower = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower ); 2400 Tap sourceUpper = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileUpper ); 2401 2402 Map sources = new HashMap(); 2403 2404 sources.put( "sourceLower", sourceLower ); 2405 sources.put( "sourceUpper", sourceUpper ); 2406 2407 Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "join" ), SinkMode.REPLACE ); 2408 2409 Function splitter = new RegexSplitter( new Fields( "num", "text" ), " " ); 2410 2411 Pipe leftPipeLower = new Each( new Pipe( "sourceLower" ), new Fields( "line" ), splitter ); 2412 Pipe rightPipeUpper = new Each( new Pipe( "sourceUpper" ), new Fields( "line" ), splitter ); 2413 2414 Pipe leftPipeUpper = new Each( new Pipe( "leftUpper", leftPipeLower ), new Fields( "text" ), 2415 new ExpressionFunction( Fields.ARGS, "text.toUpperCase(java.util.Locale.ROOT)", String.class ), 2416 Fields.REPLACE ); 2417 Pipe rightPipeLower = new Each( new Pipe( "rightLower", rightPipeUpper ), new Fields( "text" ), 2418 new ExpressionFunction( Fields.ARGS, "text.toLowerCase(java.util.Locale.ROOT)", String.class ), 2419 Fields.REPLACE ); 2420 2421 leftPipeUpper = new GroupBy( leftPipeUpper, new Fields( "num" ) ); 2422 rightPipeLower = new GroupBy( rightPipeLower, new Fields( "num" ) ); 2423 2424 Pipe middleSplice = new CoGroup( "middleCoGroup", leftPipeUpper, new Fields( "num" ), rightPipeLower, new Fields( "num" ), new Fields( "numM1", "charM1", "numM2", "charM2" ) ); 2425 2426 Pipe leftSplice = new HashJoin( leftPipeLower, new Fields( "num" ), middleSplice, new Fields( "numM1" ) ); 2427 Pipe rightSplice = new HashJoin( rightPipeUpper, new Fields( "num" ), middleSplice, new Fields( "numM2" ) ); 2428 2429 leftSplice = new Rename( leftSplice, new Fields( "num", "text", "numM1", "charM1", "numM2", "charM2" ), new Fields( "numL1", "charL1", "numM1L", "charM1L", "numM2L", "charM2L" ) ); 2430 rightSplice = new Rename( rightSplice, new Fields( "num", "text", "numM1", "charM1", "numM2", "charM2" ), new Fields( "numR1", "charR1", "numM1R", "charM1R", "numM2R", "charM2R" ) ); 2431 2432 leftSplice = new GroupBy( leftSplice, new Fields( "numM1L" ) ); 2433 rightSplice = new GroupBy( rightSplice, new Fields( "numM2R" ) ); 2434 2435 Pipe splice = new CoGroup( "cogrouping", leftSplice, new Fields( "numM1L" ), rightSplice, new Fields( "numM2R" ) ); 2436 2437 Map<Object, Object> properties = getProperties(); 2438 2439 Flow flow = getPlatform().getFlowConnector( properties ).connect( sources, sink, splice ); 2440 2441 flow.complete(); 2442 2443 validateLength( flow, 5 ); 2444 2445 List<Tuple> values = getSinkAsList( flow ); 2446 2447 // getting this far is a success already (past old deadlocks) 2448 assertTrue( values.contains( new Tuple( "1\ta\t1\tA\t1\ta\t1\tA\t1\tA\t1\ta" ) ) ); 2449 assertTrue( values.contains( new Tuple( "2\tb\t2\tB\t2\tb\t2\tB\t2\tB\t2\tb" ) ) ); 2450 } 2451 }