001/* 002 * Copyright (c) 2007-2016 Concurrent, Inc. All Rights Reserved. 003 * 004 * Project and contact information: http://www.cascading.org/ 005 * 006 * This file is part of the Cascading project. 007 * 008 * Licensed under the Apache License, Version 2.0 (the "License"); 009 * you may not use this file except in compliance with the License. 010 * You may obtain a copy of the License at 011 * 012 * http://www.apache.org/licenses/LICENSE-2.0 013 * 014 * Unless required by applicable law or agreed to in writing, software 015 * distributed under the License is distributed on an "AS IS" BASIS, 016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 017 * See the License for the specific language governing permissions and 018 * limitations under the License. 019 */ 020 021package cascading; 022 023import java.io.IOException; 024import java.util.HashMap; 025import java.util.HashSet; 026import java.util.List; 027import java.util.Map; 028import java.util.Set; 029 030import cascading.cascade.Cascades; 031import cascading.flow.Flow; 032import cascading.flow.FlowConnectorProps; 033import cascading.flow.FlowDef; 034import cascading.flow.FlowProps; 035import cascading.operation.Function; 036import cascading.operation.Identity; 037import cascading.operation.Insert; 038import cascading.operation.aggregator.Count; 039import cascading.operation.aggregator.First; 040import cascading.operation.regex.RegexFilter; 041import cascading.operation.regex.RegexSplitGenerator; 042import cascading.operation.regex.RegexSplitter; 043import cascading.pipe.CoGroup; 044import cascading.pipe.Each; 045import cascading.pipe.Every; 046import cascading.pipe.GroupBy; 047import cascading.pipe.Pipe; 048import cascading.pipe.assembly.Discard; 049import cascading.pipe.assembly.Rename; 050import cascading.pipe.assembly.Retain; 051import cascading.pipe.joiner.InnerJoin; 052import cascading.pipe.joiner.Joiner; 053import cascading.pipe.joiner.LeftJoin; 054import cascading.pipe.joiner.MixedJoin; 055import cascading.pipe.joiner.OuterJoin; 056import cascading.pipe.joiner.RightJoin; 057import cascading.tap.SinkMode; 058import cascading.tap.Tap; 059import cascading.tuple.Fields; 060import cascading.tuple.Tuple; 061import cascading.util.NullNotEquivalentComparator; 062import org.junit.Test; 063 064import static data.InputData.*; 065 066public class CoGroupFieldedPipesPlatformTest extends PlatformTestCase 067 { 068 public CoGroupFieldedPipesPlatformTest() 069 { 070 super( true, 4, 1 ); // leave cluster testing enabled 071 } 072 073 @Test 074 public void testCross() throws Exception 075 { 076 getPlatform().copyFromLocal( inputFileLhs ); 077 getPlatform().copyFromLocal( inputFileRhs ); 078 079 Map sources = new HashMap(); 080 081 sources.put( "lhs", getPlatform().getTextFile( inputFileLhs ) ); 082 sources.put( "rhs", getPlatform().getTextFile( inputFileRhs ) ); 083 084 Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "cross" ), SinkMode.REPLACE ); 085 086 Pipe pipeLower = new Each( "lhs", new Fields( "line" ), new RegexSplitter( new Fields( "numLHS", "charLHS" ), " " ) ); 087 Pipe pipeUpper = new Each( "rhs", new Fields( "line" ), new RegexSplitter( new Fields( "numRHS", "charRHS" ), " " ) ); 088 089 Pipe cross = new CoGroup( pipeLower, new Fields( "numLHS" ), pipeUpper, new Fields( "numRHS" ), new InnerJoin() ); 090 091 Flow flow = getPlatform().getFlowConnector().connect( sources, sink, cross ); 092 093 flow.complete(); 094 095 validateLength( flow, 37, null ); 096 097 List<Tuple> values = getSinkAsList( flow ); 098 099 assertTrue( values.contains( new Tuple( "1\ta\t1\tA" ) ) ); 100 assertTrue( values.contains( new Tuple( "1\ta\t1\tB" ) ) ); 101 } 102 103 @Test 104 public void testCoGroup() throws Exception 105 { 106 getPlatform().copyFromLocal( inputFileLower ); 107 getPlatform().copyFromLocal( inputFileUpper ); 108 109 Tap sourceLower = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower ); 110 Tap sourceUpper = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileUpper ); 111 112 Map sources = new HashMap(); 113 114 sources.put( "lower", sourceLower ); 115 sources.put( "upper", sourceUpper ); 116 117 Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "cogroup" ), SinkMode.REPLACE ); 118 119 Function splitter = new RegexSplitter( new Fields( "num", "char" ), " " ); 120 121 Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitter ); 122 Pipe pipeUpper = new Each( new Pipe( "upper" ), new Fields( "line" ), splitter ); 123 124 Pipe splice = new CoGroup( pipeLower, new Fields( "num" ), pipeUpper, new Fields( "num" ), new InnerJoin( Fields.size( 4 ) ) ); 125 126 Map<Object, Object> properties = getProperties(); 127 128 // make sure hasher is getting called, but does nothing special 129 FlowProps.setDefaultTupleElementComparator( properties, getPlatform().getStringComparator( false ).getClass().getCanonicalName() ); 130 131 Flow flow = getPlatform().getFlowConnector( properties ).connect( sources, sink, splice ); 132 133 flow.complete(); 134 135 validateLength( flow, 5 ); 136 137 List<Tuple> values = getSinkAsList( flow ); 138 139 assertTrue( values.contains( new Tuple( "1\ta\t1\tA" ) ) ); 140 assertTrue( values.contains( new Tuple( "2\tb\t2\tB" ) ) ); 141 } 142 143 @Test 144 public void testCoGroupSamePipeName() throws Exception 145 { 146 getPlatform().copyFromLocal( inputFileLower ); 147 getPlatform().copyFromLocal( inputFileUpper ); 148 149 Tap sourceLower = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower ); 150 Tap sourceUpper = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileUpper ); 151 152 Map sources = new HashMap(); 153 154 sources.put( "lower", sourceLower ); 155 sources.put( "upper", sourceUpper ); 156 157 Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "renamedpipes" ), SinkMode.REPLACE ); 158 159 Function splitter = new RegexSplitter( new Fields( "num", "char" ), " " ); 160 161 Pipe pipeLower = new Pipe( "lower" ); 162 Pipe pipeUpper = new Pipe( "upper" ); 163 164 // these pipes will hide the source name, and could cause one to be lost 165 pipeLower = new Pipe( "same", pipeLower ); 166 pipeUpper = new Pipe( "same", pipeUpper ); 167 168 pipeLower = new Each( pipeLower, new Fields( "line" ), splitter ); 169 pipeUpper = new Each( pipeUpper, new Fields( "line" ), splitter ); 170 171// pipeLower = new Each( pipeLower, new Fields( "num", "char" ), new Identity( new Fields( "num", "char" ) ) ); 172// pipeUpper = new Each( pipeUpper, new Fields( "num", "char" ), new Identity( new Fields( "num", "char" ) ) ); 173 174 pipeLower = new Pipe( "left", pipeLower ); 175 pipeUpper = new Pipe( "right", pipeUpper ); 176 177// pipeLower = new Each( pipeLower, new Debug( true ) ); 178// pipeUpper = new Each( pipeUpper, new Debug( true ) ); 179 180 Pipe splice = new CoGroup( pipeLower, new Fields( "num" ), pipeUpper, new Fields( "num" ), Fields.size( 4 ) ); 181 182// splice = new Each( splice, new Debug( true ) ); 183 splice = new Pipe( "splice", splice ); 184 splice = new Pipe( "tail", splice ); 185 186 Flow flow = getPlatform().getFlowConnector().connect( sources, sink, splice ); 187 188 flow.complete(); 189 190 validateLength( flow, 5 ); 191 192 List<Tuple> values = getSinkAsList( flow ); 193 194 assertTrue( values.contains( new Tuple( "1\ta\t1\tA" ) ) ); 195 assertTrue( values.contains( new Tuple( "2\tb\t2\tB" ) ) ); 196 } 197 198 @Test 199 public void testCoGroupWithUnknowns() throws Exception 200 { 201 getPlatform().copyFromLocal( inputFileLower ); 202 getPlatform().copyFromLocal( inputFileUpper ); 203 204 Tap sourceLower = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower ); 205 Tap sourceUpper = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileUpper ); 206 207 Map sources = new HashMap(); 208 209 sources.put( "lower", sourceLower ); 210 sources.put( "upper", sourceUpper ); 211 212 Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "unknown" ), SinkMode.REPLACE ); 213 214 Function splitter = new RegexSplitter( Fields.UNKNOWN, " " ); 215 216 Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitter ); 217 Pipe pipeUpper = new Each( new Pipe( "upper" ), new Fields( "line" ), splitter ); 218 219 Pipe splice = new CoGroup( pipeLower, new Fields( 0 ), pipeUpper, new Fields( 0 ), Fields.size( 4 ) ); 220 221 Flow flow = getPlatform().getFlowConnector().connect( sources, sink, splice ); 222 223 flow.complete(); 224 225 validateLength( flow, 5 ); 226 227 List<Tuple> values = getSinkAsList( flow ); 228 229 assertTrue( values.contains( new Tuple( "1\ta\t1\tA" ) ) ); 230 assertTrue( values.contains( new Tuple( "2\tb\t2\tB" ) ) ); 231 } 232 233 /** 234 * this test intentionally filters out all values so the intermediate tap is empty. this tap is cogrouped with 235 * a new stream using an outerjoin. 236 * 237 * @throws Exception 238 */ 239 @Test 240 public void testCoGroupFilteredBranch() throws Exception 241 { 242 getPlatform().copyFromLocal( inputFileLower ); 243 getPlatform().copyFromLocal( inputFileUpper ); 244 245 Tap sourceLower = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower ); 246 Tap sourceUpper = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileUpper ); 247 248 Map sources = new HashMap(); 249 250 sources.put( "lower", sourceLower ); 251 sources.put( "upper", sourceUpper ); 252 253 Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "cogroupfilteredbranch" ), SinkMode.REPLACE ); 254 255 Function splitter = new RegexSplitter( new Fields( "num", "char" ), " " ); 256 257 Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitter ); 258 Pipe pipeUpper = new Each( new Pipe( "upper" ), new Fields( "line" ), splitter ); 259 pipeUpper = new Each( pipeUpper, new Fields( "num" ), new RegexFilter( "^fobar" ) ); // intentionally filtering all 260 pipeUpper = new GroupBy( pipeUpper, new Fields( "num" ) ); 261 262 Pipe splice = new CoGroup( pipeLower, new Fields( "num" ), pipeUpper, new Fields( "num" ), Fields.size( 4 ), new OuterJoin() ); 263 264 Flow flow = getPlatform().getFlowConnector().connect( sources, sink, splice ); 265 266 flow.complete(); 267 268 validateLength( flow, 5 ); 269 270 List<Tuple> values = getSinkAsList( flow ); 271 272 assertTrue( values.contains( new Tuple( "1\ta\tnull\tnull" ) ) ); 273 assertTrue( values.contains( new Tuple( "2\tb\tnull\tnull" ) ) ); 274 } 275 276 @Test 277 public void testCoGroupSelf() throws Exception 278 { 279 getPlatform().copyFromLocal( inputFileLower ); 280 281 // intentionally creating multiple instances that are equivalent 282 Tap sourceLower = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower ); 283 Tap sourceUpper = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower ); 284 285 Map sources = new HashMap(); 286 287 sources.put( "lower", sourceLower ); 288 sources.put( "upper", sourceUpper ); 289 290 Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "cogroupself" ), SinkMode.REPLACE ); 291 292 Function splitter = new RegexSplitter( new Fields( "num", "char" ), " " ); 293 294 Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitter ); 295 Pipe pipeUpper = new Each( new Pipe( "upper" ), new Fields( "line" ), splitter ); 296 297 Pipe splice = new CoGroup( pipeLower, new Fields( "num" ), pipeUpper, new Fields( "num" ), Fields.size( 4 ) ); 298 299 Flow flow = getPlatform().getFlowConnector().connect( sources, sink, splice ); 300 301 flow.complete(); 302 303 validateLength( flow, 5 ); 304 305 List<Tuple> values = getSinkAsList( flow ); 306 307 assertTrue( values.contains( new Tuple( "1\ta\t1\ta" ) ) ); 308 assertTrue( values.contains( new Tuple( "2\tb\t2\tb" ) ) ); 309 } 310 311 @Test 312 public void testSplitCoGroupSelf() throws Exception 313 { 314 getPlatform().copyFromLocal( inputFileLower ); 315 316 Tap source = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower ); 317 318 Map sources = new HashMap(); 319 320 sources.put( "lowerLhs", source ); 321 sources.put( "upperLhs", source ); 322 sources.put( "lowerRhs", source ); 323 sources.put( "upperRhs", source ); 324 325 Tap sinkLhs = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "splitcogroupself/lhs" ), SinkMode.REPLACE ); 326 Tap sinkRhs = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "splitcogroupself/rhs" ), SinkMode.REPLACE ); 327 328 Map sinks = new HashMap(); 329 330 sinks.put( "lhs", sinkLhs ); 331 sinks.put( "rhs", sinkRhs ); 332 333 Function splitter = new RegexSplitter( new Fields( "num", "char" ), " " ); 334 335 Pipe pipeLowerLhs = new Each( new Pipe( "lowerLhs" ), new Fields( "line" ), splitter ); 336 Pipe pipeUpperLhs = new Each( new Pipe( "upperLhs" ), new Fields( "line" ), splitter ); 337 338 Pipe spliceLhs = new CoGroup( "lhs", pipeLowerLhs, new Fields( "num" ), pipeUpperLhs, new Fields( "num" ), Fields.size( 4 ) ); 339 340 Pipe pipeLowerRhs = new Each( new Pipe( "lowerRhs" ), new Fields( "line" ), splitter ); 341 Pipe pipeUpperRhs = new Each( new Pipe( "upperRhs" ), new Fields( "line" ), splitter ); 342 343 Pipe spliceRhs = new CoGroup( "rhs", pipeLowerRhs, new Fields( "num" ), pipeUpperRhs, new Fields( "num" ), Fields.size( 4 ) ); 344 345 Flow flow = getPlatform().getFlowConnector().connect( sources, sinks, spliceLhs, spliceRhs ); 346 347 flow.complete(); 348 349 List<Tuple> values = asList( flow, sinkLhs ); 350 351 assertEquals( 5, values.size() ); 352 assertTrue( values.contains( new Tuple( "1\ta\t1\ta" ) ) ); 353 assertTrue( values.contains( new Tuple( "2\tb\t2\tb" ) ) ); 354 355 values = asList( flow, sinkRhs ); 356 357 assertEquals( 5, values.size() ); 358 assertTrue( values.contains( new Tuple( "1\ta\t1\ta" ) ) ); 359 assertTrue( values.contains( new Tuple( "2\tb\t2\tb" ) ) ); 360 } 361 362 /** 363 * Method testCoGroupAfterEvery tests that a tmp tap is inserted after the Every in the cogroup join 364 * 365 * @throws Exception when 366 */ 367 @Test 368 public void testCoGroupAfterEvery() throws Exception 369 { 370 getPlatform().copyFromLocal( inputFileLower ); 371 getPlatform().copyFromLocal( inputFileUpper ); 372 373 Tap sourceLower = getPlatform().getTextFile( new Fields( "offset", "line" ).applyTypes( Long.TYPE, String.class ), inputFileLower ); 374 Tap sourceUpper = getPlatform().getTextFile( new Fields( "offset", "line" ).applyTypes( Long.TYPE, String.class ), inputFileUpper ); 375 376 Map sources = new HashMap(); 377 378 sources.put( "lower", sourceLower ); 379 sources.put( "upper", sourceUpper ); 380 381 Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "afterevery" ), SinkMode.REPLACE ); 382 383 Function splitter = new RegexSplitter( new Fields( "num", "char" ).applyTypes( String.class, String.class ), " " ); 384 385 Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitter ); 386 pipeLower = new GroupBy( pipeLower, new Fields( "num" ) ); 387 pipeLower = new Every( pipeLower, new Fields( "char" ), new First(), Fields.ALL ); 388 389 Pipe pipeUpper = new Each( new Pipe( "upper" ), new Fields( "line" ), splitter ); 390 pipeUpper = new GroupBy( pipeUpper, new Fields( "num" ) ); 391 pipeUpper = new Every( pipeUpper, new Fields( "char" ), new First(), Fields.ALL ); 392 393 Pipe splice = new CoGroup( pipeLower, new Fields( "num" ), pipeUpper, new Fields( "num" ), Fields.size( 4 ) ); 394 395 Map<Object, Object> properties = getPlatform().getProperties(); 396 397 properties.put( "cascading.serialization.types.required", "true" ); 398 399 Flow flow = getPlatform().getFlowConnector( properties ).connect( sources, sink, splice ); 400 401 flow.complete(); 402 403 validateLength( flow, 5, null ); 404 405 List<Tuple> values = getSinkAsList( flow ); 406 407 assertTrue( values.contains( new Tuple( "1\ta\t1\tA" ) ) ); 408 assertTrue( values.contains( new Tuple( "2\tb\t2\tB" ) ) ); 409 } 410 411 /** 412 * Tests that CoGroup properly resolves fields when following an Every 413 * 414 * @throws Exception 415 */ 416 @Test 417 public void testCoGroupAfterEveryNoDeclared() throws Exception 418 { 419 getPlatform().copyFromLocal( inputFileLower ); 420 getPlatform().copyFromLocal( inputFileUpper ); 421 422 Tap sourceLower = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower ); 423 Tap sourceUpper = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileUpper ); 424 425 Map sources = new HashMap(); 426 427 sources.put( "lower", sourceLower ); 428 sources.put( "upper", sourceUpper ); 429 430 Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "aftereverynodeclared" ), SinkMode.REPLACE ); 431 432 Function splitter1 = new RegexSplitter( new Fields( "num1", "char1" ), " " ); 433 434 Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitter1 ); 435 pipeLower = new Each( pipeLower, new Insert( new Fields( "one", "two", "three", "four" ), "one", "two", "three", "four" ), Fields.ALL ); 436 pipeLower = new GroupBy( pipeLower, new Fields( "num1" ) ); 437 pipeLower = new Every( pipeLower, new Fields( "char1" ), new First(), Fields.ALL ); 438 439 Function splitter2 = new RegexSplitter( new Fields( "num2", "char2" ), " " ); 440 441 Pipe pipeUpper = new Each( new Pipe( "upper" ), new Fields( "line" ), splitter2 ); 442 pipeUpper = new GroupBy( pipeUpper, new Fields( "num2" ) ); 443 pipeUpper = new Every( pipeUpper, new Fields( "char2" ), new First(), Fields.ALL ); 444 445 Pipe splice = new CoGroup( pipeLower, new Fields( "num1" ), pipeUpper, new Fields( "num2" ) ); 446 447 Flow flow = getPlatform().getFlowConnector().connect( sources, sink, splice ); 448 449 flow.complete(); 450 451 validateLength( flow, 5, null ); 452 453 List<Tuple> values = getSinkAsList( flow ); 454 455 assertTrue( values.contains( new Tuple( "1\ta\t1\tA" ) ) ); 456 assertTrue( values.contains( new Tuple( "2\tb\t2\tB" ) ) ); 457 } 458 459 @Test 460 public void testCoGroupInnerSingleField() throws Exception 461 { 462 getPlatform().copyFromLocal( inputFileLowerOffset ); 463 getPlatform().copyFromLocal( inputFileUpper ); 464 465 Tap sourceLower = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLowerOffset ); 466 Tap sourceUpper = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileUpper ); 467 468 Map sources = new HashMap(); 469 470 sources.put( "lower", sourceLower ); 471 sources.put( "upper", sourceUpper ); 472 473 Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "cogroupinnersingle" ), SinkMode.REPLACE ); 474 475 Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), new RegexSplitter( new Fields( "num1", "char" ), " " ), new Fields( "num1" ) ); 476 Pipe pipeUpper = new Each( new Pipe( "upper" ), new Fields( "line" ), new RegexSplitter( new Fields( "num2", "char" ), " " ), new Fields( "num2" ) ); 477 478 Pipe join = new CoGroup( pipeLower, new Fields( "num1" ), pipeUpper, new Fields( "num2" ) ); 479 480 join = new Every( join, new Count() ); 481 482// join = new Each( join, new Debug( true ) ); 483 484 Flow flow = getPlatform().getFlowConnector().connect( sources, sink, join ); 485 486 flow.complete(); 487 488 validateLength( flow, 2, null ); 489 490 Set<Tuple> results = new HashSet<Tuple>(); 491 492 results.add( new Tuple( "1\t1\t1" ) ); 493 results.add( new Tuple( "5\t5\t2" ) ); 494 495 List<Tuple> actual = getSinkAsList( flow ); 496 497 results.removeAll( actual ); 498 499 assertEquals( 0, results.size() ); 500 } 501 502 /** 503 * 1 a1 504 * 1 a2 505 * 1 a3 506 * 2 b1 507 * 3 c1 508 * 4 d1 509 * 4 d2 510 * 4 d3 511 * 5 e1 512 * 5 e2 513 * 5 e3 514 * 7 g1 515 * 7 g2 516 * 7 g3 517 * 7 g4 518 * 7 g5 519 * null h1 520 * <p/> 521 * 1 A1 522 * 1 A2 523 * 1 A3 524 * 2 B1 525 * 2 B2 526 * 2 B3 527 * 4 D1 528 * 6 F1 529 * 6 F2 530 * null H1 531 * <p/> 532 * 1 a1 1 A1 533 * 1 a1 1 A2 534 * 1 a1 1 A3 535 * 1 a2 1 A1 536 * 1 a2 1 A2 537 * 1 a2 1 A3 538 * 1 a3 1 A1 539 * 1 a3 1 A2 540 * 1 a3 1 A3 541 * 2 b1 2 B1 542 * 2 b1 2 B2 543 * 2 b1 2 B3 544 * 4 d1 4 D1 545 * 4 d2 4 D1 546 * 4 d3 4 D1 547 * null h1 null H1 548 * 549 * @throws Exception 550 */ 551 @Test 552 public void testCoGroupInner() throws Exception 553 { 554 HashSet<Tuple> results = new HashSet<Tuple>(); 555 556 results.add( new Tuple( "1", "a1", "1", "A1" ) ); 557 results.add( new Tuple( "1", "a1", "1", "A2" ) ); 558 results.add( new Tuple( "1", "a1", "1", "A3" ) ); 559 results.add( new Tuple( "1", "a2", "1", "A1" ) ); 560 results.add( new Tuple( "1", "a2", "1", "A2" ) ); 561 results.add( new Tuple( "1", "a2", "1", "A3" ) ); 562 results.add( new Tuple( "1", "a3", "1", "A1" ) ); 563 results.add( new Tuple( "1", "a3", "1", "A2" ) ); 564 results.add( new Tuple( "1", "a3", "1", "A3" ) ); 565 results.add( new Tuple( "2", "b1", "2", "B1" ) ); 566 results.add( new Tuple( "2", "b1", "2", "B2" ) ); 567 results.add( new Tuple( "2", "b1", "2", "B3" ) ); 568 results.add( new Tuple( "4", "d1", "4", "D1" ) ); 569 results.add( new Tuple( "4", "d2", "4", "D1" ) ); 570 results.add( new Tuple( "4", "d3", "4", "D1" ) ); 571 results.add( new Tuple( null, "h1", null, "H1" ) ); 572 573 handleJoins( "cogroupinner", new InnerJoin(), results, 8, false, null ); 574 handleJoins( "cogroupinner-resultgroup", new InnerJoin(), results, 8, true, null ); 575 } 576 577 /** 578 * 1 a1 579 * 1 a2 580 * 1 a3 581 * 2 b1 582 * 3 c1 583 * 4 d1 584 * 4 d2 585 * 4 d3 586 * 5 e1 587 * 5 e2 588 * 5 e3 589 * 7 g1 590 * 7 g2 591 * 7 g3 592 * 7 g4 593 * 7 g5 594 * null h1 595 * <p/> 596 * 1 A1 597 * 1 A2 598 * 1 A3 599 * 2 B1 600 * 2 B2 601 * 2 B3 602 * 4 D1 603 * 6 F1 604 * 6 F2 605 * null H1 606 * <p/> 607 * 1 a1 1 A1 608 * 1 a1 1 A2 609 * 1 a1 1 A3 610 * 1 a2 1 A1 611 * 1 a2 1 A2 612 * 1 a2 1 A3 613 * 1 a3 1 A1 614 * 1 a3 1 A2 615 * 1 a3 1 A3 616 * 2 b1 2 B1 617 * 2 b1 2 B2 618 * 2 b1 2 B3 619 * 4 d1 4 D1 620 * 4 d2 4 D1 621 * 4 d3 4 D1 622 * 623 * @throws Exception 624 */ 625 @Test 626 public void testCoGroupInnerNull() throws Exception 627 { 628 HashSet<Tuple> results = new HashSet<Tuple>(); 629 630 results.add( new Tuple( "1", "a1", "1", "A1" ) ); 631 results.add( new Tuple( "1", "a1", "1", "A2" ) ); 632 results.add( new Tuple( "1", "a1", "1", "A3" ) ); 633 results.add( new Tuple( "1", "a2", "1", "A1" ) ); 634 results.add( new Tuple( "1", "a2", "1", "A2" ) ); 635 results.add( new Tuple( "1", "a2", "1", "A3" ) ); 636 results.add( new Tuple( "1", "a3", "1", "A1" ) ); 637 results.add( new Tuple( "1", "a3", "1", "A2" ) ); 638 results.add( new Tuple( "1", "a3", "1", "A3" ) ); 639 results.add( new Tuple( "2", "b1", "2", "B1" ) ); 640 results.add( new Tuple( "2", "b1", "2", "B2" ) ); 641 results.add( new Tuple( "2", "b1", "2", "B3" ) ); 642 results.add( new Tuple( "4", "d1", "4", "D1" ) ); 643 results.add( new Tuple( "4", "d2", "4", "D1" ) ); 644 results.add( new Tuple( "4", "d3", "4", "D1" ) ); 645 646 handleJoins( "cogroupinnernull", new InnerJoin(), results, 9, false, new NullNotEquivalentComparator() ); 647 handleJoins( "cogroupinnernull-resultgroup", new InnerJoin(), results, 9, true, new NullNotEquivalentComparator() ); 648 } 649 650 /** 651 * 1 a1 652 * 1 a2 653 * 1 a3 654 * 2 b1 655 * 3 c1 656 * 4 d1 657 * 4 d2 658 * 4 d3 659 * 5 e1 660 * 5 e2 661 * 5 e3 662 * 7 g1 663 * 7 g2 664 * 7 g3 665 * 7 g4 666 * 7 g5 667 * null h1 668 * <p/> 669 * 1 A1 670 * 1 A2 671 * 1 A3 672 * 2 B1 673 * 2 B2 674 * 2 B3 675 * 4 D1 676 * 6 F1 677 * 6 F2 678 * null H1 679 * <p/> 680 * 1 a1 1 A1 681 * 1 a1 1 A2 682 * 1 a1 1 A3 683 * 1 a2 1 A1 684 * 1 a2 1 A2 685 * 1 a2 1 A3 686 * 1 a3 1 A1 687 * 1 a3 1 A2 688 * 1 a3 1 A3 689 * 2 b1 2 B1 690 * 2 b1 2 B2 691 * 2 b1 2 B3 692 * 3 c1 null null 693 * 4 d1 4 D1 694 * 4 d2 4 D1 695 * 4 d3 4 D1 696 * 5 e1 null null 697 * 5 e2 null null 698 * 5 e3 null null 699 * null null 6 F1 700 * null null 6 F2 701 * 7 g1 null null 702 * 7 g2 null null 703 * 7 g3 null null 704 * 7 g4 null null 705 * 7 g5 null null 706 * null h1 null H1 707 * 708 * @throws Exception 709 */ 710 @Test 711 public void testCoGroupOuter() throws Exception 712 { 713 Set<Tuple> results = new HashSet<Tuple>(); 714 715 results.add( new Tuple( "1", "a1", "1", "A1" ) ); 716 results.add( new Tuple( "1", "a1", "1", "A2" ) ); 717 results.add( new Tuple( "1", "a1", "1", "A3" ) ); 718 results.add( new Tuple( "1", "a2", "1", "A1" ) ); 719 results.add( new Tuple( "1", "a2", "1", "A2" ) ); 720 results.add( new Tuple( "1", "a2", "1", "A3" ) ); 721 results.add( new Tuple( "1", "a3", "1", "A1" ) ); 722 results.add( new Tuple( "1", "a3", "1", "A2" ) ); 723 results.add( new Tuple( "1", "a3", "1", "A3" ) ); 724 results.add( new Tuple( "2", "b1", "2", "B1" ) ); 725 results.add( new Tuple( "2", "b1", "2", "B2" ) ); 726 results.add( new Tuple( "2", "b1", "2", "B3" ) ); 727 results.add( new Tuple( "3", "c1", null, null ) ); 728 results.add( new Tuple( "4", "d1", "4", "D1" ) ); 729 results.add( new Tuple( "4", "d2", "4", "D1" ) ); 730 results.add( new Tuple( "4", "d3", "4", "D1" ) ); 731 results.add( new Tuple( "5", "e1", null, null ) ); 732 results.add( new Tuple( "5", "e2", null, null ) ); 733 results.add( new Tuple( "5", "e3", null, null ) ); 734 results.add( new Tuple( null, null, "6", "F1" ) ); 735 results.add( new Tuple( null, null, "6", "F2" ) ); 736 results.add( new Tuple( "7", "g1", null, null ) ); 737 results.add( new Tuple( "7", "g2", null, null ) ); 738 results.add( new Tuple( "7", "g3", null, null ) ); 739 results.add( new Tuple( "7", "g4", null, null ) ); 740 results.add( new Tuple( "7", "g5", null, null ) ); 741 results.add( new Tuple( null, "h1", null, "H1" ) ); 742 743 handleJoins( "cogroupouter", new OuterJoin(), results, 8, false, null ); 744 handleJoins( "cogroupouter-resultgroup", new OuterJoin(), results, 8, true, null ); 745 } 746 747 /** 748 * 1 a1 749 * 1 a2 750 * 1 a3 751 * 2 b1 752 * 3 c1 753 * 4 d1 754 * 4 d2 755 * 4 d3 756 * 5 e1 757 * 5 e2 758 * 5 e3 759 * 7 g1 760 * 7 g2 761 * 7 g3 762 * 7 g4 763 * 7 g5 764 * null h1 765 * <p/> 766 * 1 A1 767 * 1 A2 768 * 1 A3 769 * 2 B1 770 * 2 B2 771 * 2 B3 772 * 4 D1 773 * 6 F1 774 * 6 F2 775 * null H1 776 * <p/> 777 * 1 a1 1 A1 778 * 1 a1 1 A2 779 * 1 a1 1 A3 780 * 1 a2 1 A1 781 * 1 a2 1 A2 782 * 1 a2 1 A3 783 * 1 a3 1 A1 784 * 1 a3 1 A2 785 * 1 a3 1 A3 786 * 2 b1 2 B1 787 * 2 b1 2 B2 788 * 2 b1 2 B3 789 * 3 c1 null null 790 * 4 d1 4 D1 791 * 4 d2 4 D1 792 * 4 d3 4 D1 793 * 5 e1 null null 794 * 5 e2 null null 795 * 5 e3 null null 796 * null null 6 F1 797 * null null 6 F2 798 * 7 g1 null null 799 * 7 g2 null null 800 * 7 g3 null null 801 * 7 g4 null null 802 * 7 g5 null null 803 * null h1 null null 804 * null null null H1 805 * 806 * @throws Exception 807 */ 808 @Test 809 public void testCoGroupOuterNull() throws Exception 810 { 811 Set<Tuple> results = new HashSet<Tuple>(); 812 813 results.add( new Tuple( "1", "a1", "1", "A1" ) ); 814 results.add( new Tuple( "1", "a1", "1", "A2" ) ); 815 results.add( new Tuple( "1", "a1", "1", "A3" ) ); 816 results.add( new Tuple( "1", "a2", "1", "A1" ) ); 817 results.add( new Tuple( "1", "a2", "1", "A2" ) ); 818 results.add( new Tuple( "1", "a2", "1", "A3" ) ); 819 results.add( new Tuple( "1", "a3", "1", "A1" ) ); 820 results.add( new Tuple( "1", "a3", "1", "A2" ) ); 821 results.add( new Tuple( "1", "a3", "1", "A3" ) ); 822 results.add( new Tuple( "2", "b1", "2", "B1" ) ); 823 results.add( new Tuple( "2", "b1", "2", "B2" ) ); 824 results.add( new Tuple( "2", "b1", "2", "B3" ) ); 825 results.add( new Tuple( "3", "c1", null, null ) ); 826 results.add( new Tuple( "4", "d1", "4", "D1" ) ); 827 results.add( new Tuple( "4", "d2", "4", "D1" ) ); 828 results.add( new Tuple( "4", "d3", "4", "D1" ) ); 829 results.add( new Tuple( "5", "e1", null, null ) ); 830 results.add( new Tuple( "5", "e2", null, null ) ); 831 results.add( new Tuple( "5", "e3", null, null ) ); 832 results.add( new Tuple( null, null, "6", "F1" ) ); 833 results.add( new Tuple( null, null, "6", "F2" ) ); 834 results.add( new Tuple( "7", "g1", null, null ) ); 835 results.add( new Tuple( "7", "g2", null, null ) ); 836 results.add( new Tuple( "7", "g3", null, null ) ); 837 results.add( new Tuple( "7", "g4", null, null ) ); 838 results.add( new Tuple( "7", "g5", null, null ) ); 839 results.add( new Tuple( null, "h1", null, null ) ); 840 results.add( new Tuple( null, null, null, "H1" ) ); 841 842 handleJoins( "cogroupouternull", new OuterJoin(), results, 9, false, new NullNotEquivalentComparator() ); 843 handleJoins( "cogroupouternull-resultgroup", new OuterJoin(), results, 9, true, new NullNotEquivalentComparator() ); 844 } 845 846 /** 847 * 1 a1 848 * 1 a2 849 * 1 a3 850 * 2 b1 851 * 3 c1 852 * 4 d1 853 * 4 d2 854 * 4 d3 855 * 5 e1 856 * 5 e2 857 * 5 e3 858 * 7 g1 859 * 7 g2 860 * 7 g3 861 * 7 g4 862 * 7 g5 863 * null h1 864 * <p/> 865 * 1 A1 866 * 1 A2 867 * 1 A3 868 * 2 B1 869 * 2 B2 870 * 2 B3 871 * 4 D1 872 * 6 F1 873 * 6 F2 874 * null H1 875 * <p/> 876 * 1 a1 1 A1 877 * 1 a1 1 A2 878 * 1 a1 1 A3 879 * 1 a2 1 A1 880 * 1 a2 1 A2 881 * 1 a2 1 A3 882 * 1 a3 1 A1 883 * 1 a3 1 A2 884 * 1 a3 1 A3 885 * 2 b1 2 B1 886 * 2 b1 2 B2 887 * 2 b1 2 B3 888 * 3 c1 null null 889 * 4 d1 4 D1 890 * 4 d2 4 D1 891 * 4 d3 4 D1 892 * 5 e1 null null 893 * 5 e2 null null 894 * 5 e3 null null 895 * 7 g1 null null 896 * 7 g2 null null 897 * 7 g3 null null 898 * 7 g4 null null 899 * 7 g5 null null 900 * null h1 null H1 901 * 902 * @throws Exception 903 */ 904 @Test 905 public void testCoGroupInnerOuter() throws Exception 906 { 907 Set<Tuple> results = new HashSet<Tuple>(); 908 909 results.add( new Tuple( "1", "a1", "1", "A1" ) ); 910 results.add( new Tuple( "1", "a1", "1", "A2" ) ); 911 results.add( new Tuple( "1", "a1", "1", "A3" ) ); 912 results.add( new Tuple( "1", "a2", "1", "A1" ) ); 913 results.add( new Tuple( "1", "a2", "1", "A2" ) ); 914 results.add( new Tuple( "1", "a2", "1", "A3" ) ); 915 results.add( new Tuple( "1", "a3", "1", "A1" ) ); 916 results.add( new Tuple( "1", "a3", "1", "A2" ) ); 917 results.add( new Tuple( "1", "a3", "1", "A3" ) ); 918 results.add( new Tuple( "2", "b1", "2", "B1" ) ); 919 results.add( new Tuple( "2", "b1", "2", "B2" ) ); 920 results.add( new Tuple( "2", "b1", "2", "B3" ) ); 921 results.add( new Tuple( "3", "c1", null, null ) ); 922 results.add( new Tuple( "4", "d1", "4", "D1" ) ); 923 results.add( new Tuple( "4", "d2", "4", "D1" ) ); 924 results.add( new Tuple( "4", "d3", "4", "D1" ) ); 925 results.add( new Tuple( "5", "e1", null, null ) ); 926 results.add( new Tuple( "5", "e2", null, null ) ); 927 results.add( new Tuple( "5", "e3", null, null ) ); 928 results.add( new Tuple( "7", "g1", null, null ) ); 929 results.add( new Tuple( "7", "g2", null, null ) ); 930 results.add( new Tuple( "7", "g3", null, null ) ); 931 results.add( new Tuple( "7", "g4", null, null ) ); 932 results.add( new Tuple( "7", "g5", null, null ) ); 933 results.add( new Tuple( null, "h1", null, "H1" ) ); 934 935 handleJoins( "cogroupinnerouter", new LeftJoin(), results, 8, false, null ); 936 handleJoins( "cogroupinnerouter-resultgroup", new LeftJoin(), results, 8, true, null ); 937 } 938 939 /** 940 * 1 a1 941 * 1 a2 942 * 1 a3 943 * 2 b1 944 * 3 c1 945 * 4 d1 946 * 4 d2 947 * 4 d3 948 * 5 e1 949 * 5 e2 950 * 5 e3 951 * 7 g1 952 * 7 g2 953 * 7 g3 954 * 7 g4 955 * 7 g5 956 * null h1 957 * <p/> 958 * 1 A1 959 * 1 A2 960 * 1 A3 961 * 2 B1 962 * 2 B2 963 * 2 B3 964 * 4 D1 965 * 6 F1 966 * 6 F2 967 * null H1 968 * <p/> 969 * 1 a1 1 A1 970 * 1 a1 1 A2 971 * 1 a1 1 A3 972 * 1 a2 1 A1 973 * 1 a2 1 A2 974 * 1 a2 1 A3 975 * 1 a3 1 A1 976 * 1 a3 1 A2 977 * 1 a3 1 A3 978 * 2 b1 2 B1 979 * 2 b1 2 B2 980 * 2 b1 2 B3 981 * 3 c1 null null 982 * 4 d1 4 D1 983 * 4 d2 4 D1 984 * 4 d3 4 D1 985 * 5 e1 null null 986 * 5 e2 null null 987 * 5 e3 null null 988 * 7 g1 null null 989 * 7 g2 null null 990 * 7 g3 null null 991 * 7 g4 null null 992 * 7 g5 null null 993 * null h1 null null 994 * 995 * @throws Exception 996 */ 997 @Test 998 public void testCoGroupInnerOuterNull() throws Exception 999 { 1000 Set<Tuple> results = new HashSet<Tuple>(); 1001 1002 results.add( new Tuple( "1", "a1", "1", "A1" ) ); 1003 results.add( new Tuple( "1", "a1", "1", "A2" ) ); 1004 results.add( new Tuple( "1", "a1", "1", "A3" ) ); 1005 results.add( new Tuple( "1", "a2", "1", "A1" ) ); 1006 results.add( new Tuple( "1", "a2", "1", "A2" ) ); 1007 results.add( new Tuple( "1", "a2", "1", "A3" ) ); 1008 results.add( new Tuple( "1", "a3", "1", "A1" ) ); 1009 results.add( new Tuple( "1", "a3", "1", "A2" ) ); 1010 results.add( new Tuple( "1", "a3", "1", "A3" ) ); 1011 results.add( new Tuple( "2", "b1", "2", "B1" ) ); 1012 results.add( new Tuple( "2", "b1", "2", "B2" ) ); 1013 results.add( new Tuple( "2", "b1", "2", "B3" ) ); 1014 results.add( new Tuple( "3", "c1", null, null ) ); 1015 results.add( new Tuple( "4", "d1", "4", "D1" ) ); 1016 results.add( new Tuple( "4", "d2", "4", "D1" ) ); 1017 results.add( new Tuple( "4", "d3", "4", "D1" ) ); 1018 results.add( new Tuple( "5", "e1", null, null ) ); 1019 results.add( new Tuple( "5", "e2", null, null ) ); 1020 results.add( new Tuple( "5", "e3", null, null ) ); 1021 results.add( new Tuple( "7", "g1", null, null ) ); 1022 results.add( new Tuple( "7", "g2", null, null ) ); 1023 results.add( new Tuple( "7", "g3", null, null ) ); 1024 results.add( new Tuple( "7", "g4", null, null ) ); 1025 results.add( new Tuple( "7", "g5", null, null ) ); 1026 results.add( new Tuple( null, "h1", null, null ) ); 1027 1028 handleJoins( "cogroupinnerouternull", new LeftJoin(), results, 9, false, new NullNotEquivalentComparator() ); 1029 handleJoins( "cogroupinnerouternull-resultgroup", new LeftJoin(), results, 9, true, new NullNotEquivalentComparator() ); 1030 } 1031 1032 /** 1033 * 1 a1 1034 * 1 a2 1035 * 1 a3 1036 * 2 b1 1037 * 3 c1 1038 * 4 d1 1039 * 4 d2 1040 * 4 d3 1041 * 5 e1 1042 * 5 e2 1043 * 5 e3 1044 * 7 g1 1045 * 7 g2 1046 * 7 g3 1047 * 7 g4 1048 * 7 g5 1049 * null h1 1050 * <p/> 1051 * 1 A1 1052 * 1 A2 1053 * 1 A3 1054 * 2 B1 1055 * 2 B2 1056 * 2 B3 1057 * 4 D1 1058 * 6 F1 1059 * 6 F2 1060 * null H1 1061 * <p/> 1062 * 1 a1 1 A1 1063 * 1 a1 1 A2 1064 * 1 a1 1 A3 1065 * 1 a2 1 A1 1066 * 1 a2 1 A2 1067 * 1 a2 1 A3 1068 * 1 a3 1 A1 1069 * 1 a3 1 A2 1070 * 1 a3 1 A3 1071 * 2 b1 2 B1 1072 * 2 b1 2 B2 1073 * 2 b1 2 B3 1074 * 4 d1 4 D1 1075 * 4 d2 4 D1 1076 * 4 d3 4 D1 1077 * null null 6 F1 1078 * null null 6 F2 1079 * null h1 null H1 1080 * 1081 * @throws Exception 1082 */ 1083 @Test 1084 public void testCoGroupOuterInner() throws Exception 1085 { 1086 Set<Tuple> results = new HashSet<Tuple>(); 1087 1088 results.add( new Tuple( "1", "a1", "1", "A1" ) ); 1089 results.add( new Tuple( "1", "a1", "1", "A2" ) ); 1090 results.add( new Tuple( "1", "a1", "1", "A3" ) ); 1091 results.add( new Tuple( "1", "a2", "1", "A1" ) ); 1092 results.add( new Tuple( "1", "a2", "1", "A2" ) ); 1093 results.add( new Tuple( "1", "a2", "1", "A3" ) ); 1094 results.add( new Tuple( "1", "a3", "1", "A1" ) ); 1095 results.add( new Tuple( "1", "a3", "1", "A2" ) ); 1096 results.add( new Tuple( "1", "a3", "1", "A3" ) ); 1097 results.add( new Tuple( "2", "b1", "2", "B1" ) ); 1098 results.add( new Tuple( "2", "b1", "2", "B2" ) ); 1099 results.add( new Tuple( "2", "b1", "2", "B3" ) ); 1100 results.add( new Tuple( "4", "d1", "4", "D1" ) ); 1101 results.add( new Tuple( "4", "d2", "4", "D1" ) ); 1102 results.add( new Tuple( "4", "d3", "4", "D1" ) ); 1103 results.add( new Tuple( null, null, "6", "F1" ) ); 1104 results.add( new Tuple( null, null, "6", "F2" ) ); 1105 results.add( new Tuple( null, "h1", null, "H1" ) ); 1106 1107 handleJoins( "cogroupouterinner", new RightJoin(), results, 8, false, null ); 1108 handleJoins( "cogroupouterinner-resultgroup", new RightJoin(), results, 8, true, null ); 1109 } 1110 1111 /** 1112 * 1 a1 1113 * 1 a2 1114 * 1 a3 1115 * 2 b1 1116 * 3 c1 1117 * 4 d1 1118 * 4 d2 1119 * 4 d3 1120 * 5 e1 1121 * 5 e2 1122 * 5 e3 1123 * 7 g1 1124 * 7 g2 1125 * 7 g3 1126 * 7 g4 1127 * 7 g5 1128 * null h1 1129 * <p/> 1130 * 1 A1 1131 * 1 A2 1132 * 1 A3 1133 * 2 B1 1134 * 2 B2 1135 * 2 B3 1136 * 4 D1 1137 * 6 F1 1138 * 6 F2 1139 * null H1 1140 * <p/> 1141 * 1 a1 1 A1 1142 * 1 a1 1 A2 1143 * 1 a1 1 A3 1144 * 1 a2 1 A1 1145 * 1 a2 1 A2 1146 * 1 a2 1 A3 1147 * 1 a3 1 A1 1148 * 1 a3 1 A2 1149 * 1 a3 1 A3 1150 * 2 b1 2 B1 1151 * 2 b1 2 B2 1152 * 2 b1 2 B3 1153 * 4 d1 4 D1 1154 * 4 d2 4 D1 1155 * 4 d3 4 D1 1156 * null null 6 F1 1157 * null null 6 F2 1158 * null null null H1 1159 * 1160 * @throws Exception 1161 */ 1162 @Test 1163 public void testCoGroupOuterInnerNull() throws Exception 1164 { 1165 Set<Tuple> results = new HashSet<Tuple>(); 1166 1167 results.add( new Tuple( "1", "a1", "1", "A1" ) ); 1168 results.add( new Tuple( "1", "a1", "1", "A2" ) ); 1169 results.add( new Tuple( "1", "a1", "1", "A3" ) ); 1170 results.add( new Tuple( "1", "a2", "1", "A1" ) ); 1171 results.add( new Tuple( "1", "a2", "1", "A2" ) ); 1172 results.add( new Tuple( "1", "a2", "1", "A3" ) ); 1173 results.add( new Tuple( "1", "a3", "1", "A1" ) ); 1174 results.add( new Tuple( "1", "a3", "1", "A2" ) ); 1175 results.add( new Tuple( "1", "a3", "1", "A3" ) ); 1176 results.add( new Tuple( "2", "b1", "2", "B1" ) ); 1177 results.add( new Tuple( "2", "b1", "2", "B2" ) ); 1178 results.add( new Tuple( "2", "b1", "2", "B3" ) ); 1179 results.add( new Tuple( "4", "d1", "4", "D1" ) ); 1180 results.add( new Tuple( "4", "d2", "4", "D1" ) ); 1181 results.add( new Tuple( "4", "d3", "4", "D1" ) ); 1182 results.add( new Tuple( null, null, "6", "F1" ) ); 1183 results.add( new Tuple( null, null, "6", "F2" ) ); 1184 results.add( new Tuple( null, null, null, "H1" ) ); 1185 1186 handleJoins( "cogroupouterinnernull", new RightJoin(), results, 9, false, new NullNotEquivalentComparator() ); 1187 handleJoins( "cogroupouterinnernull-resultgroup", new RightJoin(), results, 9, true, new NullNotEquivalentComparator() ); 1188 } 1189 1190 private void handleJoins( String path, Joiner joiner, Set<Tuple> results, int numGroups, boolean useResultGroupFields, NullNotEquivalentComparator comparator ) throws Exception 1191 { 1192 results = new HashSet<Tuple>( results ); 1193 1194 getPlatform().copyFromLocal( inputFileLhsSparse ); 1195 getPlatform().copyFromLocal( inputFileRhsSparse ); 1196 1197 Fields fields = new Fields( "num", "char" ).applyTypes( Integer.class, String.class ); 1198 Tap sourceLower = getPlatform().getDelimitedFile( fields, " ", inputFileLhsSparse ); 1199 Tap sourceUpper = getPlatform().getDelimitedFile( fields, " ", inputFileRhsSparse ); 1200 1201 Map sources = new HashMap(); 1202 1203 sources.put( "lower", sourceLower ); 1204 sources.put( "upper", sourceUpper ); 1205 1206 Tap sink = getPlatform().getDelimitedFile( Fields.size( 4, String.class ), "\t", getOutputPath( path ), SinkMode.REPLACE ); 1207 1208 Pipe pipeLower = new Pipe( "lower" ); 1209 Pipe pipeUpper = new Pipe( "upper" ); 1210 1211 Fields declaredFields = new Fields( "num", "char", "num2", "char2" ); 1212 1213 Fields groupFields = new Fields( "num" ); 1214 1215 if( comparator != null ) 1216 groupFields.setComparator( 0, comparator ); 1217 1218 Pipe splice; 1219 if( useResultGroupFields ) 1220 splice = new CoGroup( pipeLower, groupFields, pipeUpper, groupFields, declaredFields, new Fields( "num", "num2" ), joiner ); 1221 else 1222 splice = new CoGroup( pipeLower, groupFields, pipeUpper, groupFields, declaredFields, joiner ); 1223 1224 splice = new Every( splice, Fields.ALL, new TestIdentityBuffer( new Fields( "num", "num2" ), numGroups, true ), Fields.RESULTS ); 1225 1226 Flow flow = getPlatform().getFlowConnector().connect( sources, sink, splice ); 1227 1228 flow.complete(); 1229 1230 validateLength( flow, results.size() ); 1231 1232 List<Tuple> actual = getSinkAsList( flow ); 1233 1234 results.removeAll( actual ); 1235 1236 assertEquals( 0, results.size() ); 1237 } 1238 1239 /** 1240 * 1 a 1241 * 5 b 1242 * 6 c 1243 * 5 b 1244 * 5 e 1245 * <p/> 1246 * 1 A 1247 * 2 B 1248 * 3 C 1249 * 4 D 1250 * 5 E 1251 * <p/> 1252 * 1 a 1253 * 2 b 1254 * 3 c 1255 * 4 d 1256 * 5 e 1257 * <p/> 1258 * 1 a 1 A 1 a 1259 * - - 2 B 2 b 1260 * - - 3 C 3 c 1261 * - - 4 D 4 d 1262 * 5 b 5 E 5 e 1263 * 5 e 5 E 5 e 1264 * 1265 * @throws Exception 1266 */ 1267 @Test 1268 public void testCoGroupMixed() throws Exception 1269 { 1270 getPlatform().copyFromLocal( inputFileLowerOffset ); 1271 getPlatform().copyFromLocal( inputFileLower ); 1272 getPlatform().copyFromLocal( inputFileUpper ); 1273 1274 Tap sourceLowerOffset = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLowerOffset ); 1275 Tap sourceUpper = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileUpper ); 1276 Tap sourceLower = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower ); 1277 1278 Map sources = new HashMap(); 1279 1280 sources.put( "loweroffset", sourceLowerOffset ); 1281 sources.put( "lower", sourceLower ); 1282 sources.put( "upper", sourceUpper ); 1283 1284 Tap sink = getPlatform().getDelimitedFile( Fields.size( 6, String.class ), "\t", getOutputPath( "cogroupmixed" ), SinkMode.REPLACE ); 1285 1286 Function splitter = new RegexSplitter( new Fields( "num", "char" ), " " ); 1287 1288 Pipe pipeLowerOffset = new Each( new Pipe( "loweroffset" ), new Fields( "line" ), splitter ); 1289 Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitter ); 1290 Pipe pipeUpper = new Each( new Pipe( "upper" ), new Fields( "line" ), splitter ); 1291 1292 Pipe[] pipes = Pipe.pipes( pipeLowerOffset, pipeUpper, pipeLower ); 1293 Fields[] fields = Fields.fields( new Fields( "num" ), new Fields( "num" ), new Fields( "num" ) ); 1294 1295 MixedJoin join = new MixedJoin( new boolean[]{MixedJoin.OUTER, MixedJoin.INNER, MixedJoin.OUTER} ); 1296 Pipe splice = new CoGroup( pipes, fields, Fields.size( 6 ), join ); 1297 1298 Flow flow = getPlatform().getFlowConnector().connect( sources, sink, splice ); 1299 1300 flow.complete(); 1301 1302 validateLength( flow, 6 ); 1303 1304 Set<Tuple> results = new HashSet<Tuple>(); 1305 1306 results.add( new Tuple( "1", "a", "1", "A", "1", "a" ) ); 1307 results.add( new Tuple( null, null, "2", "B", "2", "b" ) ); 1308 results.add( new Tuple( null, null, "3", "C", "3", "c" ) ); 1309 results.add( new Tuple( null, null, "4", "D", "4", "d" ) ); 1310 results.add( new Tuple( "5", "b", "5", "E", "5", "e" ) ); 1311 results.add( new Tuple( "5", "e", "5", "E", "5", "e" ) ); 1312 1313 List<Tuple> actual = getSinkAsList( flow ); 1314 1315 results.removeAll( actual ); 1316 1317 assertEquals( 0, results.size() ); 1318 } 1319 1320 @Test 1321 public void testCoGroupDiffFields() throws Exception 1322 { 1323 getPlatform().copyFromLocal( inputFileLower ); 1324 getPlatform().copyFromLocal( inputFileUpper ); 1325 1326 Tap sourceLower = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower ); 1327 Tap sourceUpper = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileUpper ); 1328 1329 Map sources = new HashMap(); 1330 1331 sources.put( "lower", sourceLower ); 1332 sources.put( "upper", sourceUpper ); 1333 1334 Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "difffields" ), SinkMode.REPLACE ); 1335 1336 Function splitterLower = new RegexSplitter( new Fields( "numA", "lower" ), " " ); 1337 Function splitterUpper = new RegexSplitter( new Fields( "numB", "upper" ), " " ); 1338 1339 Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitterLower ); 1340 Pipe pipeUpper = new Each( new Pipe( "upper" ), new Fields( "line" ), splitterUpper ); 1341 1342 Pipe cogroup = new CoGroup( pipeLower, new Fields( "numA" ), pipeUpper, new Fields( "numB" ) ); 1343 1344 Flow flow = getPlatform().getFlowConnector().connect( sources, sink, cogroup ); 1345 1346 flow.complete(); 1347 1348 validateLength( flow, 5 ); 1349 1350 List<Tuple> actual = getSinkAsList( flow ); 1351 1352 assertTrue( actual.contains( new Tuple( "1\ta\t1\tA" ) ) ); 1353 assertTrue( actual.contains( new Tuple( "2\tb\t2\tB" ) ) ); 1354 } 1355 1356 @Test 1357 public void testCoGroupGroupBy() throws Exception 1358 { 1359 getPlatform().copyFromLocal( inputFileLower ); 1360 getPlatform().copyFromLocal( inputFileUpper ); 1361 1362 Tap sourceLower = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower ); 1363 Tap sourceUpper = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileUpper ); 1364 1365 Map sources = new HashMap(); 1366 1367 sources.put( "lower", sourceLower ); 1368 sources.put( "upper", sourceUpper ); 1369 1370 Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "cogroupgroupby" ), SinkMode.REPLACE ); 1371 1372 Function splitterLower = new RegexSplitter( new Fields( "numA", "lower" ).applyTypes( String.class, String.class ), " " ); 1373 Function splitterUpper = new RegexSplitter( new Fields( "numB", "upper" ).applyTypes( String.class, String.class ), " " ); 1374 1375 Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitterLower ); 1376 Pipe pipeUpper = new Each( new Pipe( "upper" ), new Fields( "line" ), splitterUpper ); 1377 1378 Pipe cogroup = new CoGroup( pipeLower, new Fields( "numA" ), pipeUpper, new Fields( "numB" ) ); 1379 1380 Pipe groupby = new GroupBy( cogroup, new Fields( "numA" ) ); 1381 1382 Map<Object, Object> properties = getPlatform().getProperties(); 1383 1384 properties.put( "cascading.serialization.types.required", "true" ); 1385 1386 Flow flow = getPlatform().getFlowConnector( properties ).connect( sources, sink, groupby ); 1387 1388 flow.complete(); 1389 1390 validateLength( flow, 5, null ); 1391 1392 List<Tuple> actual = getSinkAsList( flow ); 1393 1394 assertTrue( actual.contains( new Tuple( "1\ta\t1\tA" ) ) ); 1395 assertTrue( actual.contains( new Tuple( "2\tb\t2\tB" ) ) ); 1396 } 1397 1398 @Test 1399 public void testCoGroupSamePipe() throws Exception 1400 { 1401 getPlatform().copyFromLocal( inputFileLower ); 1402 1403 Tap source = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower ); 1404 1405 Map sources = new HashMap(); 1406 1407 sources.put( "lower", source ); 1408 1409 Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "samepipe" ), SinkMode.REPLACE ); 1410 1411 Function splitter = new RegexSplitter( new Fields( "num", "char" ), " " ); 1412 1413 Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitter ); 1414 1415 Pipe cogroup = new CoGroup( pipeLower, new Fields( "num" ), 1, new Fields( "num1", "char1", "num2", "char2" ) ); 1416 1417 Flow flow = getPlatform().getFlowConnector().connect( sources, sink, cogroup ); 1418 1419 flow.complete(); 1420 1421 validateLength( flow, 5, null ); 1422 1423 List<Tuple> actual = getSinkAsList( flow ); 1424 1425 assertTrue( actual.contains( new Tuple( "1\ta\t1\ta" ) ) ); 1426 assertTrue( actual.contains( new Tuple( "2\tb\t2\tb" ) ) ); 1427 } 1428 1429 @Test 1430 public void testCoGroupSamePipe2() throws Exception 1431 { 1432 getPlatform().copyFromLocal( inputFileLower ); 1433 1434 Tap source = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower ); 1435 1436 Map sources = new HashMap(); 1437 1438 sources.put( "lower", source ); 1439 1440 Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "samepipe2" ), SinkMode.REPLACE ); 1441 1442 Function splitter = new RegexSplitter( new Fields( "num", "char" ), " " ); 1443 1444 Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitter ); 1445 1446 Pipe cogroup = new CoGroup( pipeLower, new Fields( "num" ), pipeLower, new Fields( "num" ), new Fields( "num1", "char1", "num2", "char2" ) ); 1447 1448 Flow flow = getPlatform().getFlowConnector().connect( sources, sink, cogroup ); 1449 1450 flow.complete(); 1451 1452 validateLength( flow, 5, null ); 1453 1454 List<Tuple> actual = getSinkAsList( flow ); 1455 1456 assertTrue( actual.contains( new Tuple( "1\ta\t1\ta" ) ) ); 1457 assertTrue( actual.contains( new Tuple( "2\tb\t2\tb" ) ) ); 1458 } 1459 1460 @Test 1461 public void testCoGroupSamePipe3() throws Exception 1462 { 1463 getPlatform().copyFromLocal( inputFileLower ); 1464 1465 Tap source = getPlatform().getDelimitedFile( new Fields( "num", "char" ), " ", inputFileLower ); 1466 1467 Map sources = new HashMap(); 1468 1469 sources.put( "lower", source ); 1470 1471 Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "samepipe3" ), SinkMode.REPLACE ); 1472 1473 Pipe pipe = new Pipe( "lower" ); 1474 1475 Pipe lhs = new Pipe( "lhs", pipe ); 1476 Pipe rhs = new Pipe( "rhs", pipe ); 1477 1478 Pipe cogroup = new CoGroup( lhs, new Fields( "num" ), rhs, new Fields( "num" ), new Fields( "num1", "char1", "num2", "char2" ) ); 1479 1480 Flow flow = getPlatform().getFlowConnector().connect( sources, sink, cogroup ); 1481 1482 flow.complete(); 1483 1484 validateLength( flow, 5, null ); 1485 1486 List<Tuple> actual = getSinkAsList( flow ); 1487 1488 assertTrue( actual.contains( new Tuple( "1\ta\t1\ta" ) ) ); 1489 assertTrue( actual.contains( new Tuple( "2\tb\t2\tb" ) ) ); 1490 } 1491 1492 @Test 1493 public void testCoGroupAroundCoGroup() throws Exception 1494 { 1495 getPlatform().copyFromLocal( inputFileLower ); 1496 getPlatform().copyFromLocal( inputFileUpper ); 1497 1498 Tap sourceLower = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower ); 1499 Tap sourceUpper = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileUpper ); 1500 1501 Map sources = new HashMap(); 1502 1503 sources.put( "lower", sourceLower ); 1504 sources.put( "upper1", sourceUpper ); 1505 sources.put( "upper2", sourceUpper ); 1506 1507 Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "cogroupacogroup" ), SinkMode.REPLACE ); 1508 1509 Function splitter = new RegexSplitter( new Fields( "num", "char" ), " " ); 1510 1511 Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitter ); 1512 Pipe pipeUpper1 = new Each( new Pipe( "upper1" ), new Fields( "line" ), splitter ); 1513 Pipe pipeUpper2 = new Each( new Pipe( "upper2" ), new Fields( "line" ), splitter ); 1514 1515 Pipe splice1 = new CoGroup( pipeLower, new Fields( "num" ), pipeUpper1, new Fields( "num" ), new Fields( "num1", "char1", "num2", "char2" ) ); 1516 1517 splice1 = new Each( splice1, new Identity() ); 1518 1519 Pipe splice2 = new CoGroup( splice1, new Fields( "num1" ), pipeUpper2, new Fields( "num" ), new Fields( "num1", "char1", "num2", "char2", "num3", "char3" ) ); 1520 1521 Flow flow = getPlatform().getFlowConnector().connect( sources, sink, splice2 ); 1522 1523 flow.complete(); 1524 1525 validateLength( flow, 5, null ); 1526 1527 List<Tuple> actual = getSinkAsList( flow ); 1528 1529 assertTrue( actual.contains( new Tuple( "1\ta\t1\tA\t1\tA" ) ) ); 1530 assertTrue( actual.contains( new Tuple( "2\tb\t2\tB\t2\tB" ) ) ); 1531 } 1532 1533 @Test 1534 public void testCoGroupAroundCoGroupWithout() throws Exception 1535 { 1536 runCoGroupAroundCoGroup( null, "cogroupacogroupopt1" ); 1537 } 1538 1539 @Test 1540 public void testCoGroupAroundCoGroupWith() throws Exception 1541 { 1542 // hack to get classname 1543 runCoGroupAroundCoGroup( getPlatform().getDelimitedFile( new Fields( "num" ), "\t", inputFileNums10 ).getScheme().getClass(), "cogroupacogroupopt2" ); 1544 } 1545 1546 private void runCoGroupAroundCoGroup( Class schemeClass, String stringPath ) throws IOException 1547 { 1548 getPlatform().copyFromLocal( inputFileNums20 ); 1549 getPlatform().copyFromLocal( inputFileNums10 ); 1550 1551 Tap source10 = getPlatform().getDelimitedFile( new Fields( "num" ), "\t", inputFileNums10 ); 1552 Tap source20 = getPlatform().getDelimitedFile( new Fields( "num" ), "\t", inputFileNums20 ); 1553 1554 Map sources = new HashMap(); 1555 1556 sources.put( "source20", source20 ); 1557 sources.put( "source101", source10 ); 1558 sources.put( "source102", source10 ); 1559 1560 Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( stringPath ), SinkMode.REPLACE ); 1561 1562 Pipe pipeNum20 = new Pipe( "source20" ); 1563 Pipe pipeNum101 = new Pipe( "source101" ); 1564 Pipe pipeNum102 = new Pipe( "source102" ); 1565 1566 Pipe splice1 = new CoGroup( pipeNum20, new Fields( "num" ), pipeNum101, new Fields( "num" ), new Fields( "num1", "num2" ) ); 1567 1568 Pipe splice2 = new CoGroup( splice1, new Fields( "num1" ), pipeNum102, new Fields( "num" ), new Fields( "num1", "num2", "num3" ) ); 1569 1570 splice2 = new Each( splice2, new Identity() ); 1571 1572 Map<Object, Object> properties = getPlatform().getProperties(); 1573 1574 if( getPlatform().isMapReduce() ) 1575 FlowConnectorProps.setIntermediateSchemeClass( properties, schemeClass ); 1576 1577 Flow flow = getPlatform().getFlowConnector( properties ).connect( "cogroupopt", sources, sink, splice2 ); 1578 1579 if( getPlatform().isMapReduce() ) 1580 assertEquals( "wrong number of steps", 2, flow.getFlowSteps().size() ); 1581 1582 flow.complete(); 1583 1584 validateLength( flow, 10 ); 1585 1586 List<Tuple> actual = getSinkAsList( flow ); 1587 1588 assertTrue( actual.contains( new Tuple( "1\t1\t1" ) ) ); 1589 assertTrue( actual.contains( new Tuple( "10\t10\t10" ) ) ); 1590 } 1591 1592 @Test 1593 public void testCoGroupDiffFieldsSameFile() throws Exception 1594 { 1595 getPlatform().copyFromLocal( inputFileLower ); 1596 getPlatform().copyFromLocal( inputFileUpper ); 1597 1598 Tap sourceOffsetLower = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower ); 1599 Tap sourceLower = getPlatform().getTextFile( new Fields( "line" ), inputFileLower ); 1600 1601 Map sources = new HashMap(); 1602 1603 sources.put( "offsetLower", sourceOffsetLower ); 1604 sources.put( "lower", sourceLower ); 1605 1606 Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "samefiledifffields" ), SinkMode.REPLACE ); 1607 1608 Function splitterLower = new RegexSplitter( new Fields( "numA", "left" ), " " ); 1609 Function splitterUpper = new RegexSplitter( new Fields( "numB", "right" ), " " ); 1610 1611 Pipe offsetLower = new Pipe( "offsetLower" ); 1612 offsetLower = new Discard( offsetLower, new Fields( "offset" ) ); 1613 offsetLower = new Each( offsetLower, new Fields( "line" ), splitterLower ); 1614 1615 Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitterUpper ); 1616 1617 Pipe cogroup = new CoGroup( offsetLower, new Fields( "numA" ), pipeLower, new Fields( "numB" ) ); 1618 1619 Flow flow = getPlatform().getFlowConnector().connect( sources, sink, cogroup ); 1620 1621 flow.complete(); 1622 1623 validateLength( flow, 5 ); 1624 1625 List<Tuple> actual = getSinkAsList( flow ); 1626 1627 assertTrue( actual.contains( new Tuple( "1\ta\t1\ta" ) ) ); 1628 assertTrue( actual.contains( new Tuple( "2\tb\t2\tb" ) ) ); 1629 } 1630 1631 @Test 1632 public void testJoinNone() throws Exception 1633 { 1634 getPlatform().copyFromLocal( inputFileLower ); 1635 getPlatform().copyFromLocal( inputFileUpper ); 1636 1637 Tap sourceLower = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower ); 1638 Tap sourceUpper = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileUpper ); 1639 1640 Map sources = new HashMap(); 1641 1642 sources.put( "lower", sourceLower ); 1643 sources.put( "upper", sourceUpper ); 1644 1645 Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "joinnone" ), SinkMode.REPLACE ); 1646 1647 Function splitter = new RegexSplitter( new Fields( "num", "char" ), " " ); 1648 1649 Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitter ); 1650 Pipe pipeUpper = new Each( new Pipe( "upper" ), new Fields( "line" ), splitter ); 1651 1652 Pipe splice = new CoGroup( pipeLower, Fields.NONE, pipeUpper, Fields.NONE, Fields.size( 4 ) ); 1653 1654 Map<Object, Object> properties = getProperties(); 1655 1656 Flow flow = getPlatform().getFlowConnector( properties ).connect( sources, sink, splice ); 1657 1658 flow.complete(); 1659 1660 validateLength( flow, 25 ); 1661 1662 List<Tuple> values = getSinkAsList( flow ); 1663 1664 assertTrue( values.contains( new Tuple( "1\ta\t1\tA" ) ) ); 1665 assertTrue( values.contains( new Tuple( "1\ta\t2\tB" ) ) ); 1666 assertTrue( values.contains( new Tuple( "2\tb\t2\tB" ) ) ); 1667 } 1668 1669 @Test 1670 public void testMultiJoin() throws Exception 1671 { 1672 getPlatform().copyFromLocal( inputFileCrossX2 ); 1673 1674 Tap source = getPlatform().getTextFile( new Fields( "line" ), inputFileCrossX2 ); 1675 Tap innerSink = getPlatform().getTextFile( getOutputPath( "inner" ), SinkMode.REPLACE ); 1676 Tap outerSink = getPlatform().getTextFile( getOutputPath( "outer" ), SinkMode.REPLACE ); 1677 Tap leftSink = getPlatform().getTextFile( getOutputPath( "left" ), SinkMode.REPLACE ); 1678 Tap rightSink = getPlatform().getTextFile( getOutputPath( "right" ), SinkMode.REPLACE ); 1679 1680 Pipe uniques = new Pipe( "unique" ); 1681 1682 uniques = new Each( uniques, new Fields( "line" ), new RegexSplitGenerator( new Fields( "word" ), "\\s" ) ); 1683 1684 uniques = new GroupBy( uniques, new Fields( "word" ) ); 1685 1686 uniques = new Every( uniques, new Fields( "word" ), new First( Fields.ARGS ), Fields.REPLACE ); 1687 1688// uniques = new Each( uniques, new Debug( true ) ); 1689 1690 Pipe fielded = new Pipe( "fielded" ); 1691 1692 fielded = new Each( fielded, new Fields( "line" ), new RegexSplitter( "\\s" ) ); 1693 1694// fielded = new Each( fielded, new Debug( true ) ); 1695 1696 Pipe inner = new CoGroup( "inner", fielded, new Fields( 0 ), uniques, new Fields( "word" ), new InnerJoin() ); 1697 Pipe outer = new CoGroup( "outer", fielded, new Fields( 0 ), uniques, new Fields( "word" ), new OuterJoin() ); 1698 Pipe left = new CoGroup( "left", fielded, new Fields( 0 ), uniques, new Fields( "word" ), new LeftJoin() ); 1699 Pipe right = new CoGroup( "right", fielded, new Fields( 0 ), uniques, new Fields( "word" ), new RightJoin() ); 1700 1701 Pipe[] heads = Pipe.pipes( uniques, fielded ); 1702 Map<String, Tap> sources = Cascades.tapsMap( heads, Tap.taps( source, source ) ); 1703 1704 Pipe[] tails = Pipe.pipes( inner, outer, left, right ); 1705 Map<String, Tap> sinks = Cascades.tapsMap( tails, Tap.taps( innerSink, outerSink, leftSink, rightSink ) ); 1706 1707 Flow flow = getPlatform().getFlowConnector().connect( "multi-joins", sources, sinks, tails ); 1708 1709 flow.complete(); 1710 1711 validateLength( flow.openTapForRead( innerSink ), 74 ); 1712 validateLength( flow.openTapForRead( outerSink ), 84 ); 1713 validateLength( flow.openTapForRead( leftSink ), 74 ); 1714 validateLength( flow.openTapForRead( rightSink ), 84 ); 1715 } 1716 1717 /** 1718 * This test checks that a valid node is created when adding back remainders during unique 1719 * path sub-graph partitioning. 1720 */ 1721 @Test 1722 public void testMultiJoinWithSplits() throws Exception 1723 { 1724 getPlatform().copyFromLocal( inputFileCrossX2 ); 1725 1726 Tap source = getPlatform().getTextFile( new Fields( "line" ), inputFileCrossX2 ); 1727 Tap innerSinkLhs = getPlatform().getTextFile( getOutputPath( "innerLhs" ), SinkMode.REPLACE ); 1728 Tap uniqueSinkLhs = getPlatform().getTextFile( getOutputPath( "uniquesLhs" ), SinkMode.REPLACE ); 1729 Tap innerSinkRhs = getPlatform().getTextFile( getOutputPath( "innerRhs" ), SinkMode.REPLACE ); 1730 Tap uniqueSinkRhs = getPlatform().getTextFile( getOutputPath( "uniquesRhs" ), SinkMode.REPLACE ); 1731 1732 Pipe incoming = new Pipe( "incoming" ); 1733 Pipe uniquesLhs; 1734 Pipe innerLhs; 1735 Pipe uniquesRhs; 1736 Pipe innerRhs; 1737 1738 { 1739 Pipe generatorLhs = new Each( new Pipe( "genLhsLhs", incoming ), new Fields( "line" ), new RegexSplitGenerator( new Fields( "word" ), "\\s" ) ); 1740 1741 generatorLhs = new Each( generatorLhs, new Identity() ); 1742 1743 Pipe generatorRhs = new Each( new Pipe( "genRhsLhs", incoming ), new Fields( "line" ), new RegexSplitGenerator( new Fields( "word" ), "\\s" ) ); 1744 1745 Pipe uniques = new Each( new Pipe( "uniquesLhs", generatorLhs ), new Identity() ); 1746 1747 uniques = new GroupBy( uniques, new Fields( "word" ) ); 1748 1749 uniques = new Every( uniques, new Fields( "word" ), new First( Fields.ARGS ), Fields.REPLACE ); 1750 1751 Pipe lhs = new Pipe( "lhs", generatorLhs ); 1752 1753 lhs = new Rename( lhs, new Fields( "word" ), new Fields( "lhs" ) ); 1754 1755 Pipe rhs = new Pipe( "rhs", generatorRhs ); 1756 1757 rhs = new Rename( rhs, new Fields( "word" ), new Fields( "rhs" ) ); 1758 1759 Pipe inner = new CoGroup( "innerLhs", lhs, new Fields( 0 ), rhs, new Fields( 0 ), new InnerJoin() ); 1760 1761 uniquesLhs = uniques; 1762 innerLhs = inner; 1763 } 1764 1765 { 1766 Pipe generatorLhs = new Each( new Pipe( "genLhsRhs", incoming ), new Fields( "line" ), new RegexSplitGenerator( new Fields( "word" ), "\\s" ) ); 1767 1768 generatorLhs = new Each( generatorLhs, new Identity() ); 1769 1770 Pipe generatorRhs = new Each( new Pipe( "genRhsRhs", incoming ), new Fields( "line" ), new RegexSplitGenerator( new Fields( "word" ), "\\s" ) ); 1771 1772 Pipe uniques = new Each( new Pipe( "uniquesRhs", generatorLhs ), new Identity() ); 1773 1774 uniques = new GroupBy( uniques, new Fields( "word" ) ); 1775 1776 uniques = new Every( uniques, new Fields( "word" ), new First( Fields.ARGS ), Fields.REPLACE ); 1777 1778 Pipe lhs = new Pipe( "lhs", generatorLhs ); 1779 1780 lhs = new Rename( lhs, new Fields( "word" ), new Fields( "lhs" ) ); 1781 1782 Pipe rhs = new Pipe( "rhs", generatorRhs ); 1783 1784 rhs = new Rename( rhs, new Fields( "word" ), new Fields( "rhs" ) ); 1785 1786 Pipe inner = new CoGroup( "innerRhs", lhs, new Fields( 0 ), rhs, new Fields( 0 ), new InnerJoin() ); 1787 1788 uniquesRhs = uniques; 1789 innerRhs = inner; 1790 } 1791 1792 FlowDef flowDef = FlowDef.flowDef() 1793 .setName( "multi-joins" ) 1794 .addSource( "incoming", source ) 1795 .addTailSink( innerLhs, innerSinkLhs ) 1796 .addTailSink( uniquesLhs, uniqueSinkLhs ) 1797 .addTailSink( innerRhs, innerSinkRhs ) 1798 .addTailSink( uniquesRhs, uniqueSinkRhs ); 1799 1800 Flow flow = getPlatform().getFlowConnector().connect( flowDef ); 1801 1802 flow.complete(); 1803 1804 validateLength( flow.openTapForRead( innerSinkLhs ), 3900 ); 1805 validateLength( flow.openTapForRead( uniqueSinkLhs ), 15 ); 1806 validateLength( flow.openTapForRead( innerSinkRhs ), 3900 ); 1807 validateLength( flow.openTapForRead( uniqueSinkRhs ), 15 ); 1808 } 1809 1810 @Test 1811 public void testSameSourceGroupSplitCoGroup() throws Exception 1812 { 1813 getPlatform().copyFromLocal( inputFileLower ); 1814 1815 Tap source = getPlatform().getTextFile( new Fields( "offset", "line" ).applyTypes( Long.TYPE, String.class ), inputFileLower ); 1816 1817 Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath(), SinkMode.REPLACE ); 1818 1819 Function splitter = new RegexSplitter( new Fields( "num", "char" ).applyTypes( String.class, String.class ), " " ); 1820 1821 Pipe sourcePipe = new Each( new Pipe( "source" ), new Fields( "line" ), splitter ); 1822 sourcePipe = new GroupBy( sourcePipe, new Fields( "num" ) ); 1823 sourcePipe = new Every( sourcePipe, new Fields( "char" ), new First( new Fields( "first", String.class ) ), Fields.ALL ); 1824 1825 Pipe lhsPipe = new Retain( new Pipe( "lhs", sourcePipe ), Fields.ALL ); 1826 Pipe rhsPipe = new Retain( new Pipe( "rhs", sourcePipe ), Fields.ALL ); 1827 1828 Pipe splice = new CoGroup( lhsPipe, new Fields( "num" ), rhsPipe, new Fields( "num" ), Fields.size( 4 ) ); 1829 1830 Map<Object, Object> properties = getPlatform().getProperties(); 1831 1832 properties.put( "cascading.serialization.types.required", "true" ); 1833 1834 Flow flow = getPlatform().getFlowConnector( properties ).connect( source, sink, splice ); 1835 1836 flow.complete(); 1837 1838 validateLength( flow, 5, null ); 1839 1840 List<Tuple> values = getSinkAsList( flow ); 1841 1842 assertTrue( values.contains( new Tuple( "1\ta\t1\ta" ) ) ); 1843 assertTrue( values.contains( new Tuple( "2\tb\t2\tb" ) ) ); 1844 } 1845 }