001/* 002 * Copyright (c) 2016 Chris K Wensel <chris@wensel.net>. All Rights Reserved. 003 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved. 004 * 005 * Project and contact information: http://www.cascading.org/ 006 * 007 * This file is part of the Cascading project. 008 * 009 * Licensed under the Apache License, Version 2.0 (the "License"); 010 * you may not use this file except in compliance with the License. 011 * You may obtain a copy of the License at 012 * 013 * http://www.apache.org/licenses/LICENSE-2.0 014 * 015 * Unless required by applicable law or agreed to in writing, software 016 * distributed under the License is distributed on an "AS IS" BASIS, 017 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 018 * See the License for the specific language governing permissions and 019 * limitations under the License. 020 */ 021 022package cascading; 023 024import java.io.IOException; 025import java.util.HashMap; 026import java.util.HashSet; 027import java.util.List; 028import java.util.Map; 029import java.util.Set; 030 031import cascading.cascade.Cascades; 032import cascading.flow.Flow; 033import cascading.flow.FlowConnectorProps; 034import cascading.flow.FlowDef; 035import cascading.flow.FlowProps; 036import cascading.operation.Function; 037import cascading.operation.Identity; 038import cascading.operation.Insert; 039import cascading.operation.aggregator.Count; 040import cascading.operation.aggregator.First; 041import cascading.operation.regex.RegexFilter; 042import cascading.operation.regex.RegexSplitGenerator; 043import cascading.operation.regex.RegexSplitter; 044import cascading.pipe.CoGroup; 045import cascading.pipe.Each; 046import cascading.pipe.Every; 047import cascading.pipe.GroupBy; 048import cascading.pipe.Pipe; 049import cascading.pipe.assembly.Discard; 050import cascading.pipe.assembly.Rename; 051import cascading.pipe.assembly.Retain; 052import cascading.pipe.joiner.InnerJoin; 053import cascading.pipe.joiner.Joiner; 054import cascading.pipe.joiner.LeftJoin; 055import cascading.pipe.joiner.MixedJoin; 056import cascading.pipe.joiner.OuterJoin; 057import cascading.pipe.joiner.RightJoin; 058import cascading.tap.SinkMode; 059import cascading.tap.Tap; 060import cascading.tuple.Fields; 061import cascading.tuple.Tuple; 062import cascading.util.NullNotEquivalentComparator; 063import org.junit.Test; 064 065import static data.InputData.*; 066 067public class CoGroupFieldedPipesPlatformTest extends PlatformTestCase 068 { 069 public CoGroupFieldedPipesPlatformTest() 070 { 071 super( true, 4, 1 ); // leave cluster testing enabled 072 } 073 074 @Test 075 public void testCross() throws Exception 076 { 077 getPlatform().copyFromLocal( inputFileLhs ); 078 getPlatform().copyFromLocal( inputFileRhs ); 079 080 Map sources = new HashMap(); 081 082 sources.put( "lhs", getPlatform().getTextFile( inputFileLhs ) ); 083 sources.put( "rhs", getPlatform().getTextFile( inputFileRhs ) ); 084 085 Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "cross" ), SinkMode.REPLACE ); 086 087 Pipe pipeLower = new Each( "lhs", new Fields( "line" ), new RegexSplitter( new Fields( "numLHS", "charLHS" ), " " ) ); 088 Pipe pipeUpper = new Each( "rhs", new Fields( "line" ), new RegexSplitter( new Fields( "numRHS", "charRHS" ), " " ) ); 089 090 Pipe cross = new CoGroup( pipeLower, new Fields( "numLHS" ), pipeUpper, new Fields( "numRHS" ), new InnerJoin() ); 091 092 Flow flow = getPlatform().getFlowConnector().connect( sources, sink, cross ); 093 094 flow.complete(); 095 096 validateLength( flow, 37, null ); 097 098 List<Tuple> values = getSinkAsList( flow ); 099 100 assertTrue( values.contains( new Tuple( "1\ta\t1\tA" ) ) ); 101 assertTrue( values.contains( new Tuple( "1\ta\t1\tB" ) ) ); 102 } 103 104 @Test 105 public void testCoGroup() throws Exception 106 { 107 getPlatform().copyFromLocal( inputFileLower ); 108 getPlatform().copyFromLocal( inputFileUpper ); 109 110 Tap sourceLower = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower ); 111 Tap sourceUpper = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileUpper ); 112 113 Map sources = new HashMap(); 114 115 sources.put( "lower", sourceLower ); 116 sources.put( "upper", sourceUpper ); 117 118 Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "cogroup" ), SinkMode.REPLACE ); 119 120 Function splitter = new RegexSplitter( new Fields( "num", "char" ), " " ); 121 122 Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitter ); 123 Pipe pipeUpper = new Each( new Pipe( "upper" ), new Fields( "line" ), splitter ); 124 125 Pipe splice = new CoGroup( pipeLower, new Fields( "num" ), pipeUpper, new Fields( "num" ), new InnerJoin( Fields.size( 4 ) ) ); 126 127 Map<Object, Object> properties = getProperties(); 128 129 // make sure hasher is getting called, but does nothing special 130 FlowProps.setDefaultTupleElementComparator( properties, getPlatform().getStringComparator( false ).getClass().getCanonicalName() ); 131 132 Flow flow = getPlatform().getFlowConnector( properties ).connect( sources, sink, splice ); 133 134 flow.complete(); 135 136 validateLength( flow, 5 ); 137 138 List<Tuple> values = getSinkAsList( flow ); 139 140 assertTrue( values.contains( new Tuple( "1\ta\t1\tA" ) ) ); 141 assertTrue( values.contains( new Tuple( "2\tb\t2\tB" ) ) ); 142 } 143 144 @Test 145 public void testCoGroupSamePipeName() throws Exception 146 { 147 getPlatform().copyFromLocal( inputFileLower ); 148 getPlatform().copyFromLocal( inputFileUpper ); 149 150 Tap sourceLower = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower ); 151 Tap sourceUpper = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileUpper ); 152 153 Map sources = new HashMap(); 154 155 sources.put( "lower", sourceLower ); 156 sources.put( "upper", sourceUpper ); 157 158 Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "renamedpipes" ), SinkMode.REPLACE ); 159 160 Function splitter = new RegexSplitter( new Fields( "num", "char" ), " " ); 161 162 Pipe pipeLower = new Pipe( "lower" ); 163 Pipe pipeUpper = new Pipe( "upper" ); 164 165 // these pipes will hide the source name, and could cause one to be lost 166 pipeLower = new Pipe( "same", pipeLower ); 167 pipeUpper = new Pipe( "same", pipeUpper ); 168 169 pipeLower = new Each( pipeLower, new Fields( "line" ), splitter ); 170 pipeUpper = new Each( pipeUpper, new Fields( "line" ), splitter ); 171 172// pipeLower = new Each( pipeLower, new Fields( "num", "char" ), new Identity( new Fields( "num", "char" ) ) ); 173// pipeUpper = new Each( pipeUpper, new Fields( "num", "char" ), new Identity( new Fields( "num", "char" ) ) ); 174 175 pipeLower = new Pipe( "left", pipeLower ); 176 pipeUpper = new Pipe( "right", pipeUpper ); 177 178// pipeLower = new Each( pipeLower, new Debug( true ) ); 179// pipeUpper = new Each( pipeUpper, new Debug( true ) ); 180 181 Pipe splice = new CoGroup( pipeLower, new Fields( "num" ), pipeUpper, new Fields( "num" ), Fields.size( 4 ) ); 182 183// splice = new Each( splice, new Debug( true ) ); 184 splice = new Pipe( "splice", splice ); 185 splice = new Pipe( "tail", splice ); 186 187 Flow flow = getPlatform().getFlowConnector().connect( sources, sink, splice ); 188 189 flow.complete(); 190 191 validateLength( flow, 5 ); 192 193 List<Tuple> values = getSinkAsList( flow ); 194 195 assertTrue( values.contains( new Tuple( "1\ta\t1\tA" ) ) ); 196 assertTrue( values.contains( new Tuple( "2\tb\t2\tB" ) ) ); 197 } 198 199 @Test 200 public void testCoGroupWithUnknowns() throws Exception 201 { 202 getPlatform().copyFromLocal( inputFileLower ); 203 getPlatform().copyFromLocal( inputFileUpper ); 204 205 Tap sourceLower = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower ); 206 Tap sourceUpper = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileUpper ); 207 208 Map sources = new HashMap(); 209 210 sources.put( "lower", sourceLower ); 211 sources.put( "upper", sourceUpper ); 212 213 Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "unknown" ), SinkMode.REPLACE ); 214 215 Function splitter = new RegexSplitter( Fields.UNKNOWN, " " ); 216 217 Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitter ); 218 Pipe pipeUpper = new Each( new Pipe( "upper" ), new Fields( "line" ), splitter ); 219 220 Pipe splice = new CoGroup( pipeLower, new Fields( 0 ), pipeUpper, new Fields( 0 ), Fields.size( 4 ) ); 221 222 Flow flow = getPlatform().getFlowConnector().connect( sources, sink, splice ); 223 224 flow.complete(); 225 226 validateLength( flow, 5 ); 227 228 List<Tuple> values = getSinkAsList( flow ); 229 230 assertTrue( values.contains( new Tuple( "1\ta\t1\tA" ) ) ); 231 assertTrue( values.contains( new Tuple( "2\tb\t2\tB" ) ) ); 232 } 233 234 /** 235 * this test intentionally filters out all values so the intermediate tap is empty. this tap is cogrouped with 236 * a new stream using an outerjoin. 237 * 238 * @throws Exception 239 */ 240 @Test 241 public void testCoGroupFilteredBranch() throws Exception 242 { 243 getPlatform().copyFromLocal( inputFileLower ); 244 getPlatform().copyFromLocal( inputFileUpper ); 245 246 Tap sourceLower = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower ); 247 Tap sourceUpper = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileUpper ); 248 249 Map sources = new HashMap(); 250 251 sources.put( "lower", sourceLower ); 252 sources.put( "upper", sourceUpper ); 253 254 Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "cogroupfilteredbranch" ), SinkMode.REPLACE ); 255 256 Function splitter = new RegexSplitter( new Fields( "num", "char" ), " " ); 257 258 Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitter ); 259 Pipe pipeUpper = new Each( new Pipe( "upper" ), new Fields( "line" ), splitter ); 260 pipeUpper = new Each( pipeUpper, new Fields( "num" ), new RegexFilter( "^fobar" ) ); // intentionally filtering all 261 pipeUpper = new GroupBy( pipeUpper, new Fields( "num" ) ); 262 263 Pipe splice = new CoGroup( pipeLower, new Fields( "num" ), pipeUpper, new Fields( "num" ), Fields.size( 4 ), new OuterJoin() ); 264 265 Flow flow = getPlatform().getFlowConnector().connect( sources, sink, splice ); 266 267 flow.complete(); 268 269 validateLength( flow, 5 ); 270 271 List<Tuple> values = getSinkAsList( flow ); 272 273 assertTrue( values.contains( new Tuple( "1\ta\tnull\tnull" ) ) ); 274 assertTrue( values.contains( new Tuple( "2\tb\tnull\tnull" ) ) ); 275 } 276 277 @Test 278 public void testCoGroupSelf() throws Exception 279 { 280 getPlatform().copyFromLocal( inputFileLower ); 281 282 // intentionally creating multiple instances that are equivalent 283 Tap sourceLower = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower ); 284 Tap sourceUpper = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower ); 285 286 Map sources = new HashMap(); 287 288 sources.put( "lower", sourceLower ); 289 sources.put( "upper", sourceUpper ); 290 291 Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "cogroupself" ), SinkMode.REPLACE ); 292 293 Function splitter = new RegexSplitter( new Fields( "num", "char" ), " " ); 294 295 Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitter ); 296 Pipe pipeUpper = new Each( new Pipe( "upper" ), new Fields( "line" ), splitter ); 297 298 Pipe splice = new CoGroup( pipeLower, new Fields( "num" ), pipeUpper, new Fields( "num" ), Fields.size( 4 ) ); 299 300 Flow flow = getPlatform().getFlowConnector().connect( sources, sink, splice ); 301 302 flow.complete(); 303 304 validateLength( flow, 5 ); 305 306 List<Tuple> values = getSinkAsList( flow ); 307 308 assertTrue( values.contains( new Tuple( "1\ta\t1\ta" ) ) ); 309 assertTrue( values.contains( new Tuple( "2\tb\t2\tb" ) ) ); 310 } 311 312 @Test 313 public void testSplitCoGroupSelf() throws Exception 314 { 315 getPlatform().copyFromLocal( inputFileLower ); 316 317 Tap source = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower ); 318 319 Map sources = new HashMap(); 320 321 sources.put( "lowerLhs", source ); 322 sources.put( "upperLhs", source ); 323 sources.put( "lowerRhs", source ); 324 sources.put( "upperRhs", source ); 325 326 Tap sinkLhs = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "splitcogroupself/lhs" ), SinkMode.REPLACE ); 327 Tap sinkRhs = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "splitcogroupself/rhs" ), SinkMode.REPLACE ); 328 329 Map sinks = new HashMap(); 330 331 sinks.put( "lhs", sinkLhs ); 332 sinks.put( "rhs", sinkRhs ); 333 334 Function splitter = new RegexSplitter( new Fields( "num", "char" ), " " ); 335 336 Pipe pipeLowerLhs = new Each( new Pipe( "lowerLhs" ), new Fields( "line" ), splitter ); 337 Pipe pipeUpperLhs = new Each( new Pipe( "upperLhs" ), new Fields( "line" ), splitter ); 338 339 Pipe spliceLhs = new CoGroup( "lhs", pipeLowerLhs, new Fields( "num" ), pipeUpperLhs, new Fields( "num" ), Fields.size( 4 ) ); 340 341 Pipe pipeLowerRhs = new Each( new Pipe( "lowerRhs" ), new Fields( "line" ), splitter ); 342 Pipe pipeUpperRhs = new Each( new Pipe( "upperRhs" ), new Fields( "line" ), splitter ); 343 344 Pipe spliceRhs = new CoGroup( "rhs", pipeLowerRhs, new Fields( "num" ), pipeUpperRhs, new Fields( "num" ), Fields.size( 4 ) ); 345 346 Flow flow = getPlatform().getFlowConnector().connect( sources, sinks, spliceLhs, spliceRhs ); 347 348 flow.complete(); 349 350 List<Tuple> values = asList( flow, sinkLhs ); 351 352 assertEquals( 5, values.size() ); 353 assertTrue( values.contains( new Tuple( "1\ta\t1\ta" ) ) ); 354 assertTrue( values.contains( new Tuple( "2\tb\t2\tb" ) ) ); 355 356 values = asList( flow, sinkRhs ); 357 358 assertEquals( 5, values.size() ); 359 assertTrue( values.contains( new Tuple( "1\ta\t1\ta" ) ) ); 360 assertTrue( values.contains( new Tuple( "2\tb\t2\tb" ) ) ); 361 } 362 363 /** 364 * Method testCoGroupAfterEvery tests that a tmp tap is inserted after the Every in the cogroup join 365 * 366 * @throws Exception when 367 */ 368 @Test 369 public void testCoGroupAfterEvery() throws Exception 370 { 371 getPlatform().copyFromLocal( inputFileLower ); 372 getPlatform().copyFromLocal( inputFileUpper ); 373 374 Tap sourceLower = getPlatform().getTextFile( new Fields( "offset", "line" ).applyTypes( Long.TYPE, String.class ), inputFileLower ); 375 Tap sourceUpper = getPlatform().getTextFile( new Fields( "offset", "line" ).applyTypes( Long.TYPE, String.class ), inputFileUpper ); 376 377 Map sources = new HashMap(); 378 379 sources.put( "lower", sourceLower ); 380 sources.put( "upper", sourceUpper ); 381 382 Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "afterevery" ), SinkMode.REPLACE ); 383 384 Function splitter = new RegexSplitter( new Fields( "num", "char" ).applyTypes( String.class, String.class ), " " ); 385 386 Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitter ); 387 pipeLower = new GroupBy( pipeLower, new Fields( "num" ) ); 388 pipeLower = new Every( pipeLower, new Fields( "char" ), new First(), Fields.ALL ); 389 390 Pipe pipeUpper = new Each( new Pipe( "upper" ), new Fields( "line" ), splitter ); 391 pipeUpper = new GroupBy( pipeUpper, new Fields( "num" ) ); 392 pipeUpper = new Every( pipeUpper, new Fields( "char" ), new First(), Fields.ALL ); 393 394 Pipe splice = new CoGroup( pipeLower, new Fields( "num" ), pipeUpper, new Fields( "num" ), Fields.size( 4 ) ); 395 396 Map<Object, Object> properties = getPlatform().getProperties(); 397 398 properties.put( "cascading.serialization.types.required", "true" ); 399 400 Flow flow = getPlatform().getFlowConnector( properties ).connect( sources, sink, splice ); 401 402 flow.complete(); 403 404 validateLength( flow, 5, null ); 405 406 List<Tuple> values = getSinkAsList( flow ); 407 408 assertTrue( values.contains( new Tuple( "1\ta\t1\tA" ) ) ); 409 assertTrue( values.contains( new Tuple( "2\tb\t2\tB" ) ) ); 410 } 411 412 /** 413 * Tests that CoGroup properly resolves fields when following an Every 414 * 415 * @throws Exception 416 */ 417 @Test 418 public void testCoGroupAfterEveryNoDeclared() throws Exception 419 { 420 getPlatform().copyFromLocal( inputFileLower ); 421 getPlatform().copyFromLocal( inputFileUpper ); 422 423 Tap sourceLower = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower ); 424 Tap sourceUpper = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileUpper ); 425 426 Map sources = new HashMap(); 427 428 sources.put( "lower", sourceLower ); 429 sources.put( "upper", sourceUpper ); 430 431 Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "aftereverynodeclared" ), SinkMode.REPLACE ); 432 433 Function splitter1 = new RegexSplitter( new Fields( "num1", "char1" ), " " ); 434 435 Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitter1 ); 436 pipeLower = new Each( pipeLower, new Insert( new Fields( "one", "two", "three", "four" ), "one", "two", "three", "four" ), Fields.ALL ); 437 pipeLower = new GroupBy( pipeLower, new Fields( "num1" ) ); 438 pipeLower = new Every( pipeLower, new Fields( "char1" ), new First(), Fields.ALL ); 439 440 Function splitter2 = new RegexSplitter( new Fields( "num2", "char2" ), " " ); 441 442 Pipe pipeUpper = new Each( new Pipe( "upper" ), new Fields( "line" ), splitter2 ); 443 pipeUpper = new GroupBy( pipeUpper, new Fields( "num2" ) ); 444 pipeUpper = new Every( pipeUpper, new Fields( "char2" ), new First(), Fields.ALL ); 445 446 Pipe splice = new CoGroup( pipeLower, new Fields( "num1" ), pipeUpper, new Fields( "num2" ) ); 447 448 Flow flow = getPlatform().getFlowConnector().connect( sources, sink, splice ); 449 450 flow.complete(); 451 452 validateLength( flow, 5, null ); 453 454 List<Tuple> values = getSinkAsList( flow ); 455 456 assertTrue( values.contains( new Tuple( "1\ta\t1\tA" ) ) ); 457 assertTrue( values.contains( new Tuple( "2\tb\t2\tB" ) ) ); 458 } 459 460 @Test 461 public void testCoGroupInnerSingleField() throws Exception 462 { 463 getPlatform().copyFromLocal( inputFileLowerOffset ); 464 getPlatform().copyFromLocal( inputFileUpper ); 465 466 Tap sourceLower = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLowerOffset ); 467 Tap sourceUpper = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileUpper ); 468 469 Map sources = new HashMap(); 470 471 sources.put( "lower", sourceLower ); 472 sources.put( "upper", sourceUpper ); 473 474 Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "cogroupinnersingle" ), SinkMode.REPLACE ); 475 476 Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), new RegexSplitter( new Fields( "num1", "char" ), " " ), new Fields( "num1" ) ); 477 Pipe pipeUpper = new Each( new Pipe( "upper" ), new Fields( "line" ), new RegexSplitter( new Fields( "num2", "char" ), " " ), new Fields( "num2" ) ); 478 479 Pipe join = new CoGroup( pipeLower, new Fields( "num1" ), pipeUpper, new Fields( "num2" ) ); 480 481 join = new Every( join, new Count() ); 482 483// join = new Each( join, new Debug( true ) ); 484 485 Flow flow = getPlatform().getFlowConnector().connect( sources, sink, join ); 486 487 flow.complete(); 488 489 validateLength( flow, 2, null ); 490 491 Set<Tuple> results = new HashSet<Tuple>(); 492 493 results.add( new Tuple( "1\t1\t1" ) ); 494 results.add( new Tuple( "5\t5\t2" ) ); 495 496 List<Tuple> actual = getSinkAsList( flow ); 497 498 results.removeAll( actual ); 499 500 assertEquals( 0, results.size() ); 501 } 502 503 /** 504 * 1 a1 505 * 1 a2 506 * 1 a3 507 * 2 b1 508 * 3 c1 509 * 4 d1 510 * 4 d2 511 * 4 d3 512 * 5 e1 513 * 5 e2 514 * 5 e3 515 * 7 g1 516 * 7 g2 517 * 7 g3 518 * 7 g4 519 * 7 g5 520 * null h1 521 * <p/> 522 * 1 A1 523 * 1 A2 524 * 1 A3 525 * 2 B1 526 * 2 B2 527 * 2 B3 528 * 4 D1 529 * 6 F1 530 * 6 F2 531 * null H1 532 * <p/> 533 * 1 a1 1 A1 534 * 1 a1 1 A2 535 * 1 a1 1 A3 536 * 1 a2 1 A1 537 * 1 a2 1 A2 538 * 1 a2 1 A3 539 * 1 a3 1 A1 540 * 1 a3 1 A2 541 * 1 a3 1 A3 542 * 2 b1 2 B1 543 * 2 b1 2 B2 544 * 2 b1 2 B3 545 * 4 d1 4 D1 546 * 4 d2 4 D1 547 * 4 d3 4 D1 548 * null h1 null H1 549 * 550 * @throws Exception 551 */ 552 @Test 553 public void testCoGroupInner() throws Exception 554 { 555 HashSet<Tuple> results = new HashSet<Tuple>(); 556 557 results.add( new Tuple( "1", "a1", "1", "A1" ) ); 558 results.add( new Tuple( "1", "a1", "1", "A2" ) ); 559 results.add( new Tuple( "1", "a1", "1", "A3" ) ); 560 results.add( new Tuple( "1", "a2", "1", "A1" ) ); 561 results.add( new Tuple( "1", "a2", "1", "A2" ) ); 562 results.add( new Tuple( "1", "a2", "1", "A3" ) ); 563 results.add( new Tuple( "1", "a3", "1", "A1" ) ); 564 results.add( new Tuple( "1", "a3", "1", "A2" ) ); 565 results.add( new Tuple( "1", "a3", "1", "A3" ) ); 566 results.add( new Tuple( "2", "b1", "2", "B1" ) ); 567 results.add( new Tuple( "2", "b1", "2", "B2" ) ); 568 results.add( new Tuple( "2", "b1", "2", "B3" ) ); 569 results.add( new Tuple( "4", "d1", "4", "D1" ) ); 570 results.add( new Tuple( "4", "d2", "4", "D1" ) ); 571 results.add( new Tuple( "4", "d3", "4", "D1" ) ); 572 results.add( new Tuple( null, "h1", null, "H1" ) ); 573 574 handleJoins( "cogroupinner", new InnerJoin(), results, 8, false, null ); 575 handleJoins( "cogroupinner-resultgroup", new InnerJoin(), results, 8, true, null ); 576 } 577 578 /** 579 * 1 a1 580 * 1 a2 581 * 1 a3 582 * 2 b1 583 * 3 c1 584 * 4 d1 585 * 4 d2 586 * 4 d3 587 * 5 e1 588 * 5 e2 589 * 5 e3 590 * 7 g1 591 * 7 g2 592 * 7 g3 593 * 7 g4 594 * 7 g5 595 * null h1 596 * <p/> 597 * 1 A1 598 * 1 A2 599 * 1 A3 600 * 2 B1 601 * 2 B2 602 * 2 B3 603 * 4 D1 604 * 6 F1 605 * 6 F2 606 * null H1 607 * <p/> 608 * 1 a1 1 A1 609 * 1 a1 1 A2 610 * 1 a1 1 A3 611 * 1 a2 1 A1 612 * 1 a2 1 A2 613 * 1 a2 1 A3 614 * 1 a3 1 A1 615 * 1 a3 1 A2 616 * 1 a3 1 A3 617 * 2 b1 2 B1 618 * 2 b1 2 B2 619 * 2 b1 2 B3 620 * 4 d1 4 D1 621 * 4 d2 4 D1 622 * 4 d3 4 D1 623 * 624 * @throws Exception 625 */ 626 @Test 627 public void testCoGroupInnerNull() throws Exception 628 { 629 HashSet<Tuple> results = new HashSet<Tuple>(); 630 631 results.add( new Tuple( "1", "a1", "1", "A1" ) ); 632 results.add( new Tuple( "1", "a1", "1", "A2" ) ); 633 results.add( new Tuple( "1", "a1", "1", "A3" ) ); 634 results.add( new Tuple( "1", "a2", "1", "A1" ) ); 635 results.add( new Tuple( "1", "a2", "1", "A2" ) ); 636 results.add( new Tuple( "1", "a2", "1", "A3" ) ); 637 results.add( new Tuple( "1", "a3", "1", "A1" ) ); 638 results.add( new Tuple( "1", "a3", "1", "A2" ) ); 639 results.add( new Tuple( "1", "a3", "1", "A3" ) ); 640 results.add( new Tuple( "2", "b1", "2", "B1" ) ); 641 results.add( new Tuple( "2", "b1", "2", "B2" ) ); 642 results.add( new Tuple( "2", "b1", "2", "B3" ) ); 643 results.add( new Tuple( "4", "d1", "4", "D1" ) ); 644 results.add( new Tuple( "4", "d2", "4", "D1" ) ); 645 results.add( new Tuple( "4", "d3", "4", "D1" ) ); 646 647 handleJoins( "cogroupinnernull", new InnerJoin(), results, 9, false, new NullNotEquivalentComparator() ); 648 handleJoins( "cogroupinnernull-resultgroup", new InnerJoin(), results, 9, true, new NullNotEquivalentComparator() ); 649 } 650 651 /** 652 * 1 a1 653 * 1 a2 654 * 1 a3 655 * 2 b1 656 * 3 c1 657 * 4 d1 658 * 4 d2 659 * 4 d3 660 * 5 e1 661 * 5 e2 662 * 5 e3 663 * 7 g1 664 * 7 g2 665 * 7 g3 666 * 7 g4 667 * 7 g5 668 * null h1 669 * <p/> 670 * 1 A1 671 * 1 A2 672 * 1 A3 673 * 2 B1 674 * 2 B2 675 * 2 B3 676 * 4 D1 677 * 6 F1 678 * 6 F2 679 * null H1 680 * <p/> 681 * 1 a1 1 A1 682 * 1 a1 1 A2 683 * 1 a1 1 A3 684 * 1 a2 1 A1 685 * 1 a2 1 A2 686 * 1 a2 1 A3 687 * 1 a3 1 A1 688 * 1 a3 1 A2 689 * 1 a3 1 A3 690 * 2 b1 2 B1 691 * 2 b1 2 B2 692 * 2 b1 2 B3 693 * 3 c1 null null 694 * 4 d1 4 D1 695 * 4 d2 4 D1 696 * 4 d3 4 D1 697 * 5 e1 null null 698 * 5 e2 null null 699 * 5 e3 null null 700 * null null 6 F1 701 * null null 6 F2 702 * 7 g1 null null 703 * 7 g2 null null 704 * 7 g3 null null 705 * 7 g4 null null 706 * 7 g5 null null 707 * null h1 null H1 708 * 709 * @throws Exception 710 */ 711 @Test 712 public void testCoGroupOuter() throws Exception 713 { 714 Set<Tuple> results = new HashSet<Tuple>(); 715 716 results.add( new Tuple( "1", "a1", "1", "A1" ) ); 717 results.add( new Tuple( "1", "a1", "1", "A2" ) ); 718 results.add( new Tuple( "1", "a1", "1", "A3" ) ); 719 results.add( new Tuple( "1", "a2", "1", "A1" ) ); 720 results.add( new Tuple( "1", "a2", "1", "A2" ) ); 721 results.add( new Tuple( "1", "a2", "1", "A3" ) ); 722 results.add( new Tuple( "1", "a3", "1", "A1" ) ); 723 results.add( new Tuple( "1", "a3", "1", "A2" ) ); 724 results.add( new Tuple( "1", "a3", "1", "A3" ) ); 725 results.add( new Tuple( "2", "b1", "2", "B1" ) ); 726 results.add( new Tuple( "2", "b1", "2", "B2" ) ); 727 results.add( new Tuple( "2", "b1", "2", "B3" ) ); 728 results.add( new Tuple( "3", "c1", null, null ) ); 729 results.add( new Tuple( "4", "d1", "4", "D1" ) ); 730 results.add( new Tuple( "4", "d2", "4", "D1" ) ); 731 results.add( new Tuple( "4", "d3", "4", "D1" ) ); 732 results.add( new Tuple( "5", "e1", null, null ) ); 733 results.add( new Tuple( "5", "e2", null, null ) ); 734 results.add( new Tuple( "5", "e3", null, null ) ); 735 results.add( new Tuple( null, null, "6", "F1" ) ); 736 results.add( new Tuple( null, null, "6", "F2" ) ); 737 results.add( new Tuple( "7", "g1", null, null ) ); 738 results.add( new Tuple( "7", "g2", null, null ) ); 739 results.add( new Tuple( "7", "g3", null, null ) ); 740 results.add( new Tuple( "7", "g4", null, null ) ); 741 results.add( new Tuple( "7", "g5", null, null ) ); 742 results.add( new Tuple( null, "h1", null, "H1" ) ); 743 744 handleJoins( "cogroupouter", new OuterJoin(), results, 8, false, null ); 745 handleJoins( "cogroupouter-resultgroup", new OuterJoin(), results, 8, true, null ); 746 } 747 748 /** 749 * 1 a1 750 * 1 a2 751 * 1 a3 752 * 2 b1 753 * 3 c1 754 * 4 d1 755 * 4 d2 756 * 4 d3 757 * 5 e1 758 * 5 e2 759 * 5 e3 760 * 7 g1 761 * 7 g2 762 * 7 g3 763 * 7 g4 764 * 7 g5 765 * null h1 766 * <p/> 767 * 1 A1 768 * 1 A2 769 * 1 A3 770 * 2 B1 771 * 2 B2 772 * 2 B3 773 * 4 D1 774 * 6 F1 775 * 6 F2 776 * null H1 777 * <p/> 778 * 1 a1 1 A1 779 * 1 a1 1 A2 780 * 1 a1 1 A3 781 * 1 a2 1 A1 782 * 1 a2 1 A2 783 * 1 a2 1 A3 784 * 1 a3 1 A1 785 * 1 a3 1 A2 786 * 1 a3 1 A3 787 * 2 b1 2 B1 788 * 2 b1 2 B2 789 * 2 b1 2 B3 790 * 3 c1 null null 791 * 4 d1 4 D1 792 * 4 d2 4 D1 793 * 4 d3 4 D1 794 * 5 e1 null null 795 * 5 e2 null null 796 * 5 e3 null null 797 * null null 6 F1 798 * null null 6 F2 799 * 7 g1 null null 800 * 7 g2 null null 801 * 7 g3 null null 802 * 7 g4 null null 803 * 7 g5 null null 804 * null h1 null null 805 * null null null H1 806 * 807 * @throws Exception 808 */ 809 @Test 810 public void testCoGroupOuterNull() throws Exception 811 { 812 Set<Tuple> results = new HashSet<Tuple>(); 813 814 results.add( new Tuple( "1", "a1", "1", "A1" ) ); 815 results.add( new Tuple( "1", "a1", "1", "A2" ) ); 816 results.add( new Tuple( "1", "a1", "1", "A3" ) ); 817 results.add( new Tuple( "1", "a2", "1", "A1" ) ); 818 results.add( new Tuple( "1", "a2", "1", "A2" ) ); 819 results.add( new Tuple( "1", "a2", "1", "A3" ) ); 820 results.add( new Tuple( "1", "a3", "1", "A1" ) ); 821 results.add( new Tuple( "1", "a3", "1", "A2" ) ); 822 results.add( new Tuple( "1", "a3", "1", "A3" ) ); 823 results.add( new Tuple( "2", "b1", "2", "B1" ) ); 824 results.add( new Tuple( "2", "b1", "2", "B2" ) ); 825 results.add( new Tuple( "2", "b1", "2", "B3" ) ); 826 results.add( new Tuple( "3", "c1", null, null ) ); 827 results.add( new Tuple( "4", "d1", "4", "D1" ) ); 828 results.add( new Tuple( "4", "d2", "4", "D1" ) ); 829 results.add( new Tuple( "4", "d3", "4", "D1" ) ); 830 results.add( new Tuple( "5", "e1", null, null ) ); 831 results.add( new Tuple( "5", "e2", null, null ) ); 832 results.add( new Tuple( "5", "e3", null, null ) ); 833 results.add( new Tuple( null, null, "6", "F1" ) ); 834 results.add( new Tuple( null, null, "6", "F2" ) ); 835 results.add( new Tuple( "7", "g1", null, null ) ); 836 results.add( new Tuple( "7", "g2", null, null ) ); 837 results.add( new Tuple( "7", "g3", null, null ) ); 838 results.add( new Tuple( "7", "g4", null, null ) ); 839 results.add( new Tuple( "7", "g5", null, null ) ); 840 results.add( new Tuple( null, "h1", null, null ) ); 841 results.add( new Tuple( null, null, null, "H1" ) ); 842 843 handleJoins( "cogroupouternull", new OuterJoin(), results, 9, false, new NullNotEquivalentComparator() ); 844 handleJoins( "cogroupouternull-resultgroup", new OuterJoin(), results, 9, true, new NullNotEquivalentComparator() ); 845 } 846 847 /** 848 * 1 a1 849 * 1 a2 850 * 1 a3 851 * 2 b1 852 * 3 c1 853 * 4 d1 854 * 4 d2 855 * 4 d3 856 * 5 e1 857 * 5 e2 858 * 5 e3 859 * 7 g1 860 * 7 g2 861 * 7 g3 862 * 7 g4 863 * 7 g5 864 * null h1 865 * <p/> 866 * 1 A1 867 * 1 A2 868 * 1 A3 869 * 2 B1 870 * 2 B2 871 * 2 B3 872 * 4 D1 873 * 6 F1 874 * 6 F2 875 * null H1 876 * <p/> 877 * 1 a1 1 A1 878 * 1 a1 1 A2 879 * 1 a1 1 A3 880 * 1 a2 1 A1 881 * 1 a2 1 A2 882 * 1 a2 1 A3 883 * 1 a3 1 A1 884 * 1 a3 1 A2 885 * 1 a3 1 A3 886 * 2 b1 2 B1 887 * 2 b1 2 B2 888 * 2 b1 2 B3 889 * 3 c1 null null 890 * 4 d1 4 D1 891 * 4 d2 4 D1 892 * 4 d3 4 D1 893 * 5 e1 null null 894 * 5 e2 null null 895 * 5 e3 null null 896 * 7 g1 null null 897 * 7 g2 null null 898 * 7 g3 null null 899 * 7 g4 null null 900 * 7 g5 null null 901 * null h1 null H1 902 * 903 * @throws Exception 904 */ 905 @Test 906 public void testCoGroupInnerOuter() throws Exception 907 { 908 Set<Tuple> results = new HashSet<Tuple>(); 909 910 results.add( new Tuple( "1", "a1", "1", "A1" ) ); 911 results.add( new Tuple( "1", "a1", "1", "A2" ) ); 912 results.add( new Tuple( "1", "a1", "1", "A3" ) ); 913 results.add( new Tuple( "1", "a2", "1", "A1" ) ); 914 results.add( new Tuple( "1", "a2", "1", "A2" ) ); 915 results.add( new Tuple( "1", "a2", "1", "A3" ) ); 916 results.add( new Tuple( "1", "a3", "1", "A1" ) ); 917 results.add( new Tuple( "1", "a3", "1", "A2" ) ); 918 results.add( new Tuple( "1", "a3", "1", "A3" ) ); 919 results.add( new Tuple( "2", "b1", "2", "B1" ) ); 920 results.add( new Tuple( "2", "b1", "2", "B2" ) ); 921 results.add( new Tuple( "2", "b1", "2", "B3" ) ); 922 results.add( new Tuple( "3", "c1", null, null ) ); 923 results.add( new Tuple( "4", "d1", "4", "D1" ) ); 924 results.add( new Tuple( "4", "d2", "4", "D1" ) ); 925 results.add( new Tuple( "4", "d3", "4", "D1" ) ); 926 results.add( new Tuple( "5", "e1", null, null ) ); 927 results.add( new Tuple( "5", "e2", null, null ) ); 928 results.add( new Tuple( "5", "e3", null, null ) ); 929 results.add( new Tuple( "7", "g1", null, null ) ); 930 results.add( new Tuple( "7", "g2", null, null ) ); 931 results.add( new Tuple( "7", "g3", null, null ) ); 932 results.add( new Tuple( "7", "g4", null, null ) ); 933 results.add( new Tuple( "7", "g5", null, null ) ); 934 results.add( new Tuple( null, "h1", null, "H1" ) ); 935 936 handleJoins( "cogroupinnerouter", new LeftJoin(), results, 8, false, null ); 937 handleJoins( "cogroupinnerouter-resultgroup", new LeftJoin(), results, 8, true, null ); 938 } 939 940 /** 941 * 1 a1 942 * 1 a2 943 * 1 a3 944 * 2 b1 945 * 3 c1 946 * 4 d1 947 * 4 d2 948 * 4 d3 949 * 5 e1 950 * 5 e2 951 * 5 e3 952 * 7 g1 953 * 7 g2 954 * 7 g3 955 * 7 g4 956 * 7 g5 957 * null h1 958 * <p/> 959 * 1 A1 960 * 1 A2 961 * 1 A3 962 * 2 B1 963 * 2 B2 964 * 2 B3 965 * 4 D1 966 * 6 F1 967 * 6 F2 968 * null H1 969 * <p/> 970 * 1 a1 1 A1 971 * 1 a1 1 A2 972 * 1 a1 1 A3 973 * 1 a2 1 A1 974 * 1 a2 1 A2 975 * 1 a2 1 A3 976 * 1 a3 1 A1 977 * 1 a3 1 A2 978 * 1 a3 1 A3 979 * 2 b1 2 B1 980 * 2 b1 2 B2 981 * 2 b1 2 B3 982 * 3 c1 null null 983 * 4 d1 4 D1 984 * 4 d2 4 D1 985 * 4 d3 4 D1 986 * 5 e1 null null 987 * 5 e2 null null 988 * 5 e3 null null 989 * 7 g1 null null 990 * 7 g2 null null 991 * 7 g3 null null 992 * 7 g4 null null 993 * 7 g5 null null 994 * null h1 null null 995 * 996 * @throws Exception 997 */ 998 @Test 999 public void testCoGroupInnerOuterNull() throws Exception 1000 { 1001 Set<Tuple> results = new HashSet<Tuple>(); 1002 1003 results.add( new Tuple( "1", "a1", "1", "A1" ) ); 1004 results.add( new Tuple( "1", "a1", "1", "A2" ) ); 1005 results.add( new Tuple( "1", "a1", "1", "A3" ) ); 1006 results.add( new Tuple( "1", "a2", "1", "A1" ) ); 1007 results.add( new Tuple( "1", "a2", "1", "A2" ) ); 1008 results.add( new Tuple( "1", "a2", "1", "A3" ) ); 1009 results.add( new Tuple( "1", "a3", "1", "A1" ) ); 1010 results.add( new Tuple( "1", "a3", "1", "A2" ) ); 1011 results.add( new Tuple( "1", "a3", "1", "A3" ) ); 1012 results.add( new Tuple( "2", "b1", "2", "B1" ) ); 1013 results.add( new Tuple( "2", "b1", "2", "B2" ) ); 1014 results.add( new Tuple( "2", "b1", "2", "B3" ) ); 1015 results.add( new Tuple( "3", "c1", null, null ) ); 1016 results.add( new Tuple( "4", "d1", "4", "D1" ) ); 1017 results.add( new Tuple( "4", "d2", "4", "D1" ) ); 1018 results.add( new Tuple( "4", "d3", "4", "D1" ) ); 1019 results.add( new Tuple( "5", "e1", null, null ) ); 1020 results.add( new Tuple( "5", "e2", null, null ) ); 1021 results.add( new Tuple( "5", "e3", null, null ) ); 1022 results.add( new Tuple( "7", "g1", null, null ) ); 1023 results.add( new Tuple( "7", "g2", null, null ) ); 1024 results.add( new Tuple( "7", "g3", null, null ) ); 1025 results.add( new Tuple( "7", "g4", null, null ) ); 1026 results.add( new Tuple( "7", "g5", null, null ) ); 1027 results.add( new Tuple( null, "h1", null, null ) ); 1028 1029 handleJoins( "cogroupinnerouternull", new LeftJoin(), results, 9, false, new NullNotEquivalentComparator() ); 1030 handleJoins( "cogroupinnerouternull-resultgroup", new LeftJoin(), results, 9, true, new NullNotEquivalentComparator() ); 1031 } 1032 1033 /** 1034 * 1 a1 1035 * 1 a2 1036 * 1 a3 1037 * 2 b1 1038 * 3 c1 1039 * 4 d1 1040 * 4 d2 1041 * 4 d3 1042 * 5 e1 1043 * 5 e2 1044 * 5 e3 1045 * 7 g1 1046 * 7 g2 1047 * 7 g3 1048 * 7 g4 1049 * 7 g5 1050 * null h1 1051 * <p/> 1052 * 1 A1 1053 * 1 A2 1054 * 1 A3 1055 * 2 B1 1056 * 2 B2 1057 * 2 B3 1058 * 4 D1 1059 * 6 F1 1060 * 6 F2 1061 * null H1 1062 * <p/> 1063 * 1 a1 1 A1 1064 * 1 a1 1 A2 1065 * 1 a1 1 A3 1066 * 1 a2 1 A1 1067 * 1 a2 1 A2 1068 * 1 a2 1 A3 1069 * 1 a3 1 A1 1070 * 1 a3 1 A2 1071 * 1 a3 1 A3 1072 * 2 b1 2 B1 1073 * 2 b1 2 B2 1074 * 2 b1 2 B3 1075 * 4 d1 4 D1 1076 * 4 d2 4 D1 1077 * 4 d3 4 D1 1078 * null null 6 F1 1079 * null null 6 F2 1080 * null h1 null H1 1081 * 1082 * @throws Exception 1083 */ 1084 @Test 1085 public void testCoGroupOuterInner() throws Exception 1086 { 1087 Set<Tuple> results = new HashSet<Tuple>(); 1088 1089 results.add( new Tuple( "1", "a1", "1", "A1" ) ); 1090 results.add( new Tuple( "1", "a1", "1", "A2" ) ); 1091 results.add( new Tuple( "1", "a1", "1", "A3" ) ); 1092 results.add( new Tuple( "1", "a2", "1", "A1" ) ); 1093 results.add( new Tuple( "1", "a2", "1", "A2" ) ); 1094 results.add( new Tuple( "1", "a2", "1", "A3" ) ); 1095 results.add( new Tuple( "1", "a3", "1", "A1" ) ); 1096 results.add( new Tuple( "1", "a3", "1", "A2" ) ); 1097 results.add( new Tuple( "1", "a3", "1", "A3" ) ); 1098 results.add( new Tuple( "2", "b1", "2", "B1" ) ); 1099 results.add( new Tuple( "2", "b1", "2", "B2" ) ); 1100 results.add( new Tuple( "2", "b1", "2", "B3" ) ); 1101 results.add( new Tuple( "4", "d1", "4", "D1" ) ); 1102 results.add( new Tuple( "4", "d2", "4", "D1" ) ); 1103 results.add( new Tuple( "4", "d3", "4", "D1" ) ); 1104 results.add( new Tuple( null, null, "6", "F1" ) ); 1105 results.add( new Tuple( null, null, "6", "F2" ) ); 1106 results.add( new Tuple( null, "h1", null, "H1" ) ); 1107 1108 handleJoins( "cogroupouterinner", new RightJoin(), results, 8, false, null ); 1109 handleJoins( "cogroupouterinner-resultgroup", new RightJoin(), results, 8, true, null ); 1110 } 1111 1112 /** 1113 * 1 a1 1114 * 1 a2 1115 * 1 a3 1116 * 2 b1 1117 * 3 c1 1118 * 4 d1 1119 * 4 d2 1120 * 4 d3 1121 * 5 e1 1122 * 5 e2 1123 * 5 e3 1124 * 7 g1 1125 * 7 g2 1126 * 7 g3 1127 * 7 g4 1128 * 7 g5 1129 * null h1 1130 * <p/> 1131 * 1 A1 1132 * 1 A2 1133 * 1 A3 1134 * 2 B1 1135 * 2 B2 1136 * 2 B3 1137 * 4 D1 1138 * 6 F1 1139 * 6 F2 1140 * null H1 1141 * <p/> 1142 * 1 a1 1 A1 1143 * 1 a1 1 A2 1144 * 1 a1 1 A3 1145 * 1 a2 1 A1 1146 * 1 a2 1 A2 1147 * 1 a2 1 A3 1148 * 1 a3 1 A1 1149 * 1 a3 1 A2 1150 * 1 a3 1 A3 1151 * 2 b1 2 B1 1152 * 2 b1 2 B2 1153 * 2 b1 2 B3 1154 * 4 d1 4 D1 1155 * 4 d2 4 D1 1156 * 4 d3 4 D1 1157 * null null 6 F1 1158 * null null 6 F2 1159 * null null null H1 1160 * 1161 * @throws Exception 1162 */ 1163 @Test 1164 public void testCoGroupOuterInnerNull() throws Exception 1165 { 1166 Set<Tuple> results = new HashSet<Tuple>(); 1167 1168 results.add( new Tuple( "1", "a1", "1", "A1" ) ); 1169 results.add( new Tuple( "1", "a1", "1", "A2" ) ); 1170 results.add( new Tuple( "1", "a1", "1", "A3" ) ); 1171 results.add( new Tuple( "1", "a2", "1", "A1" ) ); 1172 results.add( new Tuple( "1", "a2", "1", "A2" ) ); 1173 results.add( new Tuple( "1", "a2", "1", "A3" ) ); 1174 results.add( new Tuple( "1", "a3", "1", "A1" ) ); 1175 results.add( new Tuple( "1", "a3", "1", "A2" ) ); 1176 results.add( new Tuple( "1", "a3", "1", "A3" ) ); 1177 results.add( new Tuple( "2", "b1", "2", "B1" ) ); 1178 results.add( new Tuple( "2", "b1", "2", "B2" ) ); 1179 results.add( new Tuple( "2", "b1", "2", "B3" ) ); 1180 results.add( new Tuple( "4", "d1", "4", "D1" ) ); 1181 results.add( new Tuple( "4", "d2", "4", "D1" ) ); 1182 results.add( new Tuple( "4", "d3", "4", "D1" ) ); 1183 results.add( new Tuple( null, null, "6", "F1" ) ); 1184 results.add( new Tuple( null, null, "6", "F2" ) ); 1185 results.add( new Tuple( null, null, null, "H1" ) ); 1186 1187 handleJoins( "cogroupouterinnernull", new RightJoin(), results, 9, false, new NullNotEquivalentComparator() ); 1188 handleJoins( "cogroupouterinnernull-resultgroup", new RightJoin(), results, 9, true, new NullNotEquivalentComparator() ); 1189 } 1190 1191 private void handleJoins( String path, Joiner joiner, Set<Tuple> results, int numGroups, boolean useResultGroupFields, NullNotEquivalentComparator comparator ) throws Exception 1192 { 1193 results = new HashSet<Tuple>( results ); 1194 1195 getPlatform().copyFromLocal( inputFileLhsSparse ); 1196 getPlatform().copyFromLocal( inputFileRhsSparse ); 1197 1198 Fields fields = new Fields( "num", "char" ).applyTypes( Integer.class, String.class ); 1199 Tap sourceLower = getPlatform().getDelimitedFile( fields, " ", inputFileLhsSparse ); 1200 Tap sourceUpper = getPlatform().getDelimitedFile( fields, " ", inputFileRhsSparse ); 1201 1202 Map sources = new HashMap(); 1203 1204 sources.put( "lower", sourceLower ); 1205 sources.put( "upper", sourceUpper ); 1206 1207 Tap sink = getPlatform().getDelimitedFile( Fields.size( 4, String.class ), "\t", getOutputPath( path ), SinkMode.REPLACE ); 1208 1209 Pipe pipeLower = new Pipe( "lower" ); 1210 Pipe pipeUpper = new Pipe( "upper" ); 1211 1212 Fields declaredFields = new Fields( "num", "char", "num2", "char2" ); 1213 1214 Fields groupFields = new Fields( "num" ); 1215 1216 if( comparator != null ) 1217 groupFields.setComparator( 0, comparator ); 1218 1219 Pipe splice; 1220 if( useResultGroupFields ) 1221 splice = new CoGroup( pipeLower, groupFields, pipeUpper, groupFields, declaredFields, new Fields( "num", "num2" ), joiner ); 1222 else 1223 splice = new CoGroup( pipeLower, groupFields, pipeUpper, groupFields, declaredFields, joiner ); 1224 1225 splice = new Every( splice, Fields.ALL, new TestIdentityBuffer( new Fields( "num", "num2" ), numGroups, true ), Fields.RESULTS ); 1226 1227 Flow flow = getPlatform().getFlowConnector().connect( sources, sink, splice ); 1228 1229 flow.complete(); 1230 1231 validateLength( flow, results.size() ); 1232 1233 List<Tuple> actual = getSinkAsList( flow ); 1234 1235 results.removeAll( actual ); 1236 1237 assertEquals( 0, results.size() ); 1238 } 1239 1240 /** 1241 * 1 a 1242 * 5 b 1243 * 6 c 1244 * 5 b 1245 * 5 e 1246 * <p/> 1247 * 1 A 1248 * 2 B 1249 * 3 C 1250 * 4 D 1251 * 5 E 1252 * <p/> 1253 * 1 a 1254 * 2 b 1255 * 3 c 1256 * 4 d 1257 * 5 e 1258 * <p/> 1259 * 1 a 1 A 1 a 1260 * - - 2 B 2 b 1261 * - - 3 C 3 c 1262 * - - 4 D 4 d 1263 * 5 b 5 E 5 e 1264 * 5 e 5 E 5 e 1265 * 1266 * @throws Exception 1267 */ 1268 @Test 1269 public void testCoGroupMixed() throws Exception 1270 { 1271 getPlatform().copyFromLocal( inputFileLowerOffset ); 1272 getPlatform().copyFromLocal( inputFileLower ); 1273 getPlatform().copyFromLocal( inputFileUpper ); 1274 1275 Tap sourceLowerOffset = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLowerOffset ); 1276 Tap sourceUpper = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileUpper ); 1277 Tap sourceLower = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower ); 1278 1279 Map sources = new HashMap(); 1280 1281 sources.put( "loweroffset", sourceLowerOffset ); 1282 sources.put( "lower", sourceLower ); 1283 sources.put( "upper", sourceUpper ); 1284 1285 Tap sink = getPlatform().getDelimitedFile( Fields.size( 6, String.class ), "\t", getOutputPath( "cogroupmixed" ), SinkMode.REPLACE ); 1286 1287 Function splitter = new RegexSplitter( new Fields( "num", "char" ), " " ); 1288 1289 Pipe pipeLowerOffset = new Each( new Pipe( "loweroffset" ), new Fields( "line" ), splitter ); 1290 Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitter ); 1291 Pipe pipeUpper = new Each( new Pipe( "upper" ), new Fields( "line" ), splitter ); 1292 1293 Pipe[] pipes = Pipe.pipes( pipeLowerOffset, pipeUpper, pipeLower ); 1294 Fields[] fields = Fields.fields( new Fields( "num" ), new Fields( "num" ), new Fields( "num" ) ); 1295 1296 MixedJoin join = new MixedJoin( new boolean[]{MixedJoin.OUTER, MixedJoin.INNER, MixedJoin.OUTER} ); 1297 Pipe splice = new CoGroup( pipes, fields, Fields.size( 6 ), join ); 1298 1299 Flow flow = getPlatform().getFlowConnector().connect( sources, sink, splice ); 1300 1301 flow.complete(); 1302 1303 validateLength( flow, 6 ); 1304 1305 Set<Tuple> results = new HashSet<Tuple>(); 1306 1307 results.add( new Tuple( "1", "a", "1", "A", "1", "a" ) ); 1308 results.add( new Tuple( null, null, "2", "B", "2", "b" ) ); 1309 results.add( new Tuple( null, null, "3", "C", "3", "c" ) ); 1310 results.add( new Tuple( null, null, "4", "D", "4", "d" ) ); 1311 results.add( new Tuple( "5", "b", "5", "E", "5", "e" ) ); 1312 results.add( new Tuple( "5", "e", "5", "E", "5", "e" ) ); 1313 1314 List<Tuple> actual = getSinkAsList( flow ); 1315 1316 results.removeAll( actual ); 1317 1318 assertEquals( 0, results.size() ); 1319 } 1320 1321 @Test 1322 public void testCoGroupDiffFields() throws Exception 1323 { 1324 getPlatform().copyFromLocal( inputFileLower ); 1325 getPlatform().copyFromLocal( inputFileUpper ); 1326 1327 Tap sourceLower = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower ); 1328 Tap sourceUpper = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileUpper ); 1329 1330 Map sources = new HashMap(); 1331 1332 sources.put( "lower", sourceLower ); 1333 sources.put( "upper", sourceUpper ); 1334 1335 Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "difffields" ), SinkMode.REPLACE ); 1336 1337 Function splitterLower = new RegexSplitter( new Fields( "numA", "lower" ), " " ); 1338 Function splitterUpper = new RegexSplitter( new Fields( "numB", "upper" ), " " ); 1339 1340 Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitterLower ); 1341 Pipe pipeUpper = new Each( new Pipe( "upper" ), new Fields( "line" ), splitterUpper ); 1342 1343 Pipe cogroup = new CoGroup( pipeLower, new Fields( "numA" ), pipeUpper, new Fields( "numB" ) ); 1344 1345 Flow flow = getPlatform().getFlowConnector().connect( sources, sink, cogroup ); 1346 1347 flow.complete(); 1348 1349 validateLength( flow, 5 ); 1350 1351 List<Tuple> actual = getSinkAsList( flow ); 1352 1353 assertTrue( actual.contains( new Tuple( "1\ta\t1\tA" ) ) ); 1354 assertTrue( actual.contains( new Tuple( "2\tb\t2\tB" ) ) ); 1355 } 1356 1357 @Test 1358 public void testCoGroupGroupBy() throws Exception 1359 { 1360 getPlatform().copyFromLocal( inputFileLower ); 1361 getPlatform().copyFromLocal( inputFileUpper ); 1362 1363 Tap sourceLower = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower ); 1364 Tap sourceUpper = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileUpper ); 1365 1366 Map sources = new HashMap(); 1367 1368 sources.put( "lower", sourceLower ); 1369 sources.put( "upper", sourceUpper ); 1370 1371 Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "cogroupgroupby" ), SinkMode.REPLACE ); 1372 1373 Function splitterLower = new RegexSplitter( new Fields( "numA", "lower" ).applyTypes( String.class, String.class ), " " ); 1374 Function splitterUpper = new RegexSplitter( new Fields( "numB", "upper" ).applyTypes( String.class, String.class ), " " ); 1375 1376 Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitterLower ); 1377 Pipe pipeUpper = new Each( new Pipe( "upper" ), new Fields( "line" ), splitterUpper ); 1378 1379 Pipe cogroup = new CoGroup( pipeLower, new Fields( "numA" ), pipeUpper, new Fields( "numB" ) ); 1380 1381 Pipe groupby = new GroupBy( cogroup, new Fields( "numA" ) ); 1382 1383 Map<Object, Object> properties = getPlatform().getProperties(); 1384 1385 properties.put( "cascading.serialization.types.required", "true" ); 1386 1387 Flow flow = getPlatform().getFlowConnector( properties ).connect( sources, sink, groupby ); 1388 1389 flow.complete(); 1390 1391 validateLength( flow, 5, null ); 1392 1393 List<Tuple> actual = getSinkAsList( flow ); 1394 1395 assertTrue( actual.contains( new Tuple( "1\ta\t1\tA" ) ) ); 1396 assertTrue( actual.contains( new Tuple( "2\tb\t2\tB" ) ) ); 1397 } 1398 1399 @Test 1400 public void testCoGroupSamePipe() throws Exception 1401 { 1402 getPlatform().copyFromLocal( inputFileLower ); 1403 1404 Tap source = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower ); 1405 1406 Map sources = new HashMap(); 1407 1408 sources.put( "lower", source ); 1409 1410 Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "samepipe" ), SinkMode.REPLACE ); 1411 1412 Function splitter = new RegexSplitter( new Fields( "num", "char" ), " " ); 1413 1414 Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitter ); 1415 1416 Pipe cogroup = new CoGroup( pipeLower, new Fields( "num" ), 1, new Fields( "num1", "char1", "num2", "char2" ) ); 1417 1418 Flow flow = getPlatform().getFlowConnector().connect( sources, sink, cogroup ); 1419 1420 flow.complete(); 1421 1422 validateLength( flow, 5, null ); 1423 1424 List<Tuple> actual = getSinkAsList( flow ); 1425 1426 assertTrue( actual.contains( new Tuple( "1\ta\t1\ta" ) ) ); 1427 assertTrue( actual.contains( new Tuple( "2\tb\t2\tb" ) ) ); 1428 } 1429 1430 @Test 1431 public void testCoGroupSamePipe2() throws Exception 1432 { 1433 getPlatform().copyFromLocal( inputFileLower ); 1434 1435 Tap source = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower ); 1436 1437 Map sources = new HashMap(); 1438 1439 sources.put( "lower", source ); 1440 1441 Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "samepipe2" ), SinkMode.REPLACE ); 1442 1443 Function splitter = new RegexSplitter( new Fields( "num", "char" ), " " ); 1444 1445 Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitter ); 1446 1447 Pipe cogroup = new CoGroup( pipeLower, new Fields( "num" ), pipeLower, new Fields( "num" ), new Fields( "num1", "char1", "num2", "char2" ) ); 1448 1449 Flow flow = getPlatform().getFlowConnector().connect( sources, sink, cogroup ); 1450 1451 flow.complete(); 1452 1453 validateLength( flow, 5, null ); 1454 1455 List<Tuple> actual = getSinkAsList( flow ); 1456 1457 assertTrue( actual.contains( new Tuple( "1\ta\t1\ta" ) ) ); 1458 assertTrue( actual.contains( new Tuple( "2\tb\t2\tb" ) ) ); 1459 } 1460 1461 @Test 1462 public void testCoGroupSamePipe3() throws Exception 1463 { 1464 getPlatform().copyFromLocal( inputFileLower ); 1465 1466 Tap source = getPlatform().getDelimitedFile( new Fields( "num", "char" ), " ", inputFileLower ); 1467 1468 Map sources = new HashMap(); 1469 1470 sources.put( "lower", source ); 1471 1472 Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "samepipe3" ), SinkMode.REPLACE ); 1473 1474 Pipe pipe = new Pipe( "lower" ); 1475 1476 Pipe lhs = new Pipe( "lhs", pipe ); 1477 Pipe rhs = new Pipe( "rhs", pipe ); 1478 1479 Pipe cogroup = new CoGroup( lhs, new Fields( "num" ), rhs, new Fields( "num" ), new Fields( "num1", "char1", "num2", "char2" ) ); 1480 1481 Flow flow = getPlatform().getFlowConnector().connect( sources, sink, cogroup ); 1482 1483 flow.complete(); 1484 1485 validateLength( flow, 5, null ); 1486 1487 List<Tuple> actual = getSinkAsList( flow ); 1488 1489 assertTrue( actual.contains( new Tuple( "1\ta\t1\ta" ) ) ); 1490 assertTrue( actual.contains( new Tuple( "2\tb\t2\tb" ) ) ); 1491 } 1492 1493 @Test 1494 public void testCoGroupAroundCoGroup() throws Exception 1495 { 1496 getPlatform().copyFromLocal( inputFileLower ); 1497 getPlatform().copyFromLocal( inputFileUpper ); 1498 1499 Tap sourceLower = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower ); 1500 Tap sourceUpper = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileUpper ); 1501 1502 Map sources = new HashMap(); 1503 1504 sources.put( "lower", sourceLower ); 1505 sources.put( "upper1", sourceUpper ); 1506 sources.put( "upper2", sourceUpper ); 1507 1508 Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "cogroupacogroup" ), SinkMode.REPLACE ); 1509 1510 Function splitter = new RegexSplitter( new Fields( "num", "char" ), " " ); 1511 1512 Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitter ); 1513 Pipe pipeUpper1 = new Each( new Pipe( "upper1" ), new Fields( "line" ), splitter ); 1514 Pipe pipeUpper2 = new Each( new Pipe( "upper2" ), new Fields( "line" ), splitter ); 1515 1516 Pipe splice1 = new CoGroup( pipeLower, new Fields( "num" ), pipeUpper1, new Fields( "num" ), new Fields( "num1", "char1", "num2", "char2" ) ); 1517 1518 splice1 = new Each( splice1, new Identity() ); 1519 1520 Pipe splice2 = new CoGroup( splice1, new Fields( "num1" ), pipeUpper2, new Fields( "num" ), new Fields( "num1", "char1", "num2", "char2", "num3", "char3" ) ); 1521 1522 Flow flow = getPlatform().getFlowConnector().connect( sources, sink, splice2 ); 1523 1524 flow.complete(); 1525 1526 validateLength( flow, 5, null ); 1527 1528 List<Tuple> actual = getSinkAsList( flow ); 1529 1530 assertTrue( actual.contains( new Tuple( "1\ta\t1\tA\t1\tA" ) ) ); 1531 assertTrue( actual.contains( new Tuple( "2\tb\t2\tB\t2\tB" ) ) ); 1532 } 1533 1534 @Test 1535 public void testCoGroupAroundCoGroupWithout() throws Exception 1536 { 1537 runCoGroupAroundCoGroup( null, "cogroupacogroupopt1" ); 1538 } 1539 1540 @Test 1541 public void testCoGroupAroundCoGroupWith() throws Exception 1542 { 1543 // hack to get classname 1544 runCoGroupAroundCoGroup( getPlatform().getDelimitedFile( new Fields( "num" ), "\t", inputFileNums10 ).getScheme().getClass(), "cogroupacogroupopt2" ); 1545 } 1546 1547 private void runCoGroupAroundCoGroup( Class schemeClass, String stringPath ) throws IOException 1548 { 1549 getPlatform().copyFromLocal( inputFileNums20 ); 1550 getPlatform().copyFromLocal( inputFileNums10 ); 1551 1552 Tap source10 = getPlatform().getDelimitedFile( new Fields( "num" ), "\t", inputFileNums10 ); 1553 Tap source20 = getPlatform().getDelimitedFile( new Fields( "num" ), "\t", inputFileNums20 ); 1554 1555 Map sources = new HashMap(); 1556 1557 sources.put( "source20", source20 ); 1558 sources.put( "source101", source10 ); 1559 sources.put( "source102", source10 ); 1560 1561 Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( stringPath ), SinkMode.REPLACE ); 1562 1563 Pipe pipeNum20 = new Pipe( "source20" ); 1564 Pipe pipeNum101 = new Pipe( "source101" ); 1565 Pipe pipeNum102 = new Pipe( "source102" ); 1566 1567 Pipe splice1 = new CoGroup( pipeNum20, new Fields( "num" ), pipeNum101, new Fields( "num" ), new Fields( "num1", "num2" ) ); 1568 1569 Pipe splice2 = new CoGroup( splice1, new Fields( "num1" ), pipeNum102, new Fields( "num" ), new Fields( "num1", "num2", "num3" ) ); 1570 1571 splice2 = new Each( splice2, new Identity() ); 1572 1573 Map<Object, Object> properties = getPlatform().getProperties(); 1574 1575 if( getPlatform().isMapReduce() ) 1576 FlowConnectorProps.setIntermediateSchemeClass( properties, schemeClass ); 1577 1578 Flow flow = getPlatform().getFlowConnector( properties ).connect( "cogroupopt", sources, sink, splice2 ); 1579 1580 if( getPlatform().isMapReduce() ) 1581 assertEquals( "wrong number of steps", 2, flow.getFlowSteps().size() ); 1582 1583 flow.complete(); 1584 1585 validateLength( flow, 10 ); 1586 1587 List<Tuple> actual = getSinkAsList( flow ); 1588 1589 assertTrue( actual.contains( new Tuple( "1\t1\t1" ) ) ); 1590 assertTrue( actual.contains( new Tuple( "10\t10\t10" ) ) ); 1591 } 1592 1593 @Test 1594 public void testCoGroupDiffFieldsSameFile() throws Exception 1595 { 1596 getPlatform().copyFromLocal( inputFileLower ); 1597 getPlatform().copyFromLocal( inputFileUpper ); 1598 1599 Tap sourceOffsetLower = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower ); 1600 Tap sourceLower = getPlatform().getTextFile( new Fields( "line" ), inputFileLower ); 1601 1602 Map sources = new HashMap(); 1603 1604 sources.put( "offsetLower", sourceOffsetLower ); 1605 sources.put( "lower", sourceLower ); 1606 1607 Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "samefiledifffields" ), SinkMode.REPLACE ); 1608 1609 Function splitterLower = new RegexSplitter( new Fields( "numA", "left" ), " " ); 1610 Function splitterUpper = new RegexSplitter( new Fields( "numB", "right" ), " " ); 1611 1612 Pipe offsetLower = new Pipe( "offsetLower" ); 1613 offsetLower = new Discard( offsetLower, new Fields( "offset" ) ); 1614 offsetLower = new Each( offsetLower, new Fields( "line" ), splitterLower ); 1615 1616 Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitterUpper ); 1617 1618 Pipe cogroup = new CoGroup( offsetLower, new Fields( "numA" ), pipeLower, new Fields( "numB" ) ); 1619 1620 Flow flow = getPlatform().getFlowConnector().connect( sources, sink, cogroup ); 1621 1622 flow.complete(); 1623 1624 validateLength( flow, 5 ); 1625 1626 List<Tuple> actual = getSinkAsList( flow ); 1627 1628 assertTrue( actual.contains( new Tuple( "1\ta\t1\ta" ) ) ); 1629 assertTrue( actual.contains( new Tuple( "2\tb\t2\tb" ) ) ); 1630 } 1631 1632 @Test 1633 public void testJoinNone() throws Exception 1634 { 1635 getPlatform().copyFromLocal( inputFileLower ); 1636 getPlatform().copyFromLocal( inputFileUpper ); 1637 1638 Tap sourceLower = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower ); 1639 Tap sourceUpper = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileUpper ); 1640 1641 Map sources = new HashMap(); 1642 1643 sources.put( "lower", sourceLower ); 1644 sources.put( "upper", sourceUpper ); 1645 1646 Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "joinnone" ), SinkMode.REPLACE ); 1647 1648 Function splitter = new RegexSplitter( new Fields( "num", "char" ), " " ); 1649 1650 Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitter ); 1651 Pipe pipeUpper = new Each( new Pipe( "upper" ), new Fields( "line" ), splitter ); 1652 1653 Pipe splice = new CoGroup( pipeLower, Fields.NONE, pipeUpper, Fields.NONE, Fields.size( 4 ) ); 1654 1655 Map<Object, Object> properties = getProperties(); 1656 1657 Flow flow = getPlatform().getFlowConnector( properties ).connect( sources, sink, splice ); 1658 1659 flow.complete(); 1660 1661 validateLength( flow, 25 ); 1662 1663 List<Tuple> values = getSinkAsList( flow ); 1664 1665 assertTrue( values.contains( new Tuple( "1\ta\t1\tA" ) ) ); 1666 assertTrue( values.contains( new Tuple( "1\ta\t2\tB" ) ) ); 1667 assertTrue( values.contains( new Tuple( "2\tb\t2\tB" ) ) ); 1668 } 1669 1670 @Test 1671 public void testMultiJoin() throws Exception 1672 { 1673 getPlatform().copyFromLocal( inputFileCrossX2 ); 1674 1675 Tap source = getPlatform().getTextFile( new Fields( "line" ), inputFileCrossX2 ); 1676 Tap innerSink = getPlatform().getTextFile( getOutputPath( "inner" ), SinkMode.REPLACE ); 1677 Tap outerSink = getPlatform().getTextFile( getOutputPath( "outer" ), SinkMode.REPLACE ); 1678 Tap leftSink = getPlatform().getTextFile( getOutputPath( "left" ), SinkMode.REPLACE ); 1679 Tap rightSink = getPlatform().getTextFile( getOutputPath( "right" ), SinkMode.REPLACE ); 1680 1681 Pipe uniques = new Pipe( "unique" ); 1682 1683 uniques = new Each( uniques, new Fields( "line" ), new RegexSplitGenerator( new Fields( "word" ), "\\s" ) ); 1684 1685 uniques = new GroupBy( uniques, new Fields( "word" ) ); 1686 1687 uniques = new Every( uniques, new Fields( "word" ), new First( Fields.ARGS ), Fields.REPLACE ); 1688 1689// uniques = new Each( uniques, new Debug( true ) ); 1690 1691 Pipe fielded = new Pipe( "fielded" ); 1692 1693 fielded = new Each( fielded, new Fields( "line" ), new RegexSplitter( "\\s" ) ); 1694 1695// fielded = new Each( fielded, new Debug( true ) ); 1696 1697 Pipe inner = new CoGroup( "inner", fielded, new Fields( 0 ), uniques, new Fields( "word" ), new InnerJoin() ); 1698 Pipe outer = new CoGroup( "outer", fielded, new Fields( 0 ), uniques, new Fields( "word" ), new OuterJoin() ); 1699 Pipe left = new CoGroup( "left", fielded, new Fields( 0 ), uniques, new Fields( "word" ), new LeftJoin() ); 1700 Pipe right = new CoGroup( "right", fielded, new Fields( 0 ), uniques, new Fields( "word" ), new RightJoin() ); 1701 1702 Pipe[] heads = Pipe.pipes( uniques, fielded ); 1703 Map<String, Tap> sources = Cascades.tapsMap( heads, Tap.taps( source, source ) ); 1704 1705 Pipe[] tails = Pipe.pipes( inner, outer, left, right ); 1706 Map<String, Tap> sinks = Cascades.tapsMap( tails, Tap.taps( innerSink, outerSink, leftSink, rightSink ) ); 1707 1708 Flow flow = getPlatform().getFlowConnector().connect( "multi-joins", sources, sinks, tails ); 1709 1710 flow.complete(); 1711 1712 validateLength( flow.openTapForRead( innerSink ), 74 ); 1713 validateLength( flow.openTapForRead( outerSink ), 84 ); 1714 validateLength( flow.openTapForRead( leftSink ), 74 ); 1715 validateLength( flow.openTapForRead( rightSink ), 84 ); 1716 } 1717 1718 /** 1719 * This test checks that a valid node is created when adding back remainders during unique 1720 * path sub-graph partitioning. 1721 */ 1722 @Test 1723 public void testMultiJoinWithSplits() throws Exception 1724 { 1725 getPlatform().copyFromLocal( inputFileCrossX2 ); 1726 1727 Tap source = getPlatform().getTextFile( new Fields( "line" ), inputFileCrossX2 ); 1728 Tap innerSinkLhs = getPlatform().getTextFile( getOutputPath( "innerLhs" ), SinkMode.REPLACE ); 1729 Tap uniqueSinkLhs = getPlatform().getTextFile( getOutputPath( "uniquesLhs" ), SinkMode.REPLACE ); 1730 Tap innerSinkRhs = getPlatform().getTextFile( getOutputPath( "innerRhs" ), SinkMode.REPLACE ); 1731 Tap uniqueSinkRhs = getPlatform().getTextFile( getOutputPath( "uniquesRhs" ), SinkMode.REPLACE ); 1732 1733 Pipe incoming = new Pipe( "incoming" ); 1734 Pipe uniquesLhs; 1735 Pipe innerLhs; 1736 Pipe uniquesRhs; 1737 Pipe innerRhs; 1738 1739 { 1740 Pipe generatorLhs = new Each( new Pipe( "genLhsLhs", incoming ), new Fields( "line" ), new RegexSplitGenerator( new Fields( "word" ), "\\s" ) ); 1741 1742 generatorLhs = new Each( generatorLhs, new Identity() ); 1743 1744 Pipe generatorRhs = new Each( new Pipe( "genRhsLhs", incoming ), new Fields( "line" ), new RegexSplitGenerator( new Fields( "word" ), "\\s" ) ); 1745 1746 Pipe uniques = new Each( new Pipe( "uniquesLhs", generatorLhs ), new Identity() ); 1747 1748 uniques = new GroupBy( uniques, new Fields( "word" ) ); 1749 1750 uniques = new Every( uniques, new Fields( "word" ), new First( Fields.ARGS ), Fields.REPLACE ); 1751 1752 Pipe lhs = new Pipe( "lhs", generatorLhs ); 1753 1754 lhs = new Rename( lhs, new Fields( "word" ), new Fields( "lhs" ) ); 1755 1756 Pipe rhs = new Pipe( "rhs", generatorRhs ); 1757 1758 rhs = new Rename( rhs, new Fields( "word" ), new Fields( "rhs" ) ); 1759 1760 Pipe inner = new CoGroup( "innerLhs", lhs, new Fields( 0 ), rhs, new Fields( 0 ), new InnerJoin() ); 1761 1762 uniquesLhs = uniques; 1763 innerLhs = inner; 1764 } 1765 1766 { 1767 Pipe generatorLhs = new Each( new Pipe( "genLhsRhs", incoming ), new Fields( "line" ), new RegexSplitGenerator( new Fields( "word" ), "\\s" ) ); 1768 1769 generatorLhs = new Each( generatorLhs, new Identity() ); 1770 1771 Pipe generatorRhs = new Each( new Pipe( "genRhsRhs", incoming ), new Fields( "line" ), new RegexSplitGenerator( new Fields( "word" ), "\\s" ) ); 1772 1773 Pipe uniques = new Each( new Pipe( "uniquesRhs", generatorLhs ), new Identity() ); 1774 1775 uniques = new GroupBy( uniques, new Fields( "word" ) ); 1776 1777 uniques = new Every( uniques, new Fields( "word" ), new First( Fields.ARGS ), Fields.REPLACE ); 1778 1779 Pipe lhs = new Pipe( "lhs", generatorLhs ); 1780 1781 lhs = new Rename( lhs, new Fields( "word" ), new Fields( "lhs" ) ); 1782 1783 Pipe rhs = new Pipe( "rhs", generatorRhs ); 1784 1785 rhs = new Rename( rhs, new Fields( "word" ), new Fields( "rhs" ) ); 1786 1787 Pipe inner = new CoGroup( "innerRhs", lhs, new Fields( 0 ), rhs, new Fields( 0 ), new InnerJoin() ); 1788 1789 uniquesRhs = uniques; 1790 innerRhs = inner; 1791 } 1792 1793 FlowDef flowDef = FlowDef.flowDef() 1794 .setName( "multi-joins" ) 1795 .addSource( "incoming", source ) 1796 .addTailSink( innerLhs, innerSinkLhs ) 1797 .addTailSink( uniquesLhs, uniqueSinkLhs ) 1798 .addTailSink( innerRhs, innerSinkRhs ) 1799 .addTailSink( uniquesRhs, uniqueSinkRhs ); 1800 1801 Flow flow = getPlatform().getFlowConnector().connect( flowDef ); 1802 1803 flow.complete(); 1804 1805 validateLength( flow.openTapForRead( innerSinkLhs ), 3900 ); 1806 validateLength( flow.openTapForRead( uniqueSinkLhs ), 15 ); 1807 validateLength( flow.openTapForRead( innerSinkRhs ), 3900 ); 1808 validateLength( flow.openTapForRead( uniqueSinkRhs ), 15 ); 1809 } 1810 1811 @Test 1812 public void testSameSourceGroupSplitCoGroup() throws Exception 1813 { 1814 getPlatform().copyFromLocal( inputFileLower ); 1815 1816 Tap source = getPlatform().getTextFile( new Fields( "offset", "line" ).applyTypes( Long.TYPE, String.class ), inputFileLower ); 1817 1818 Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath(), SinkMode.REPLACE ); 1819 1820 Function splitter = new RegexSplitter( new Fields( "num", "char" ).applyTypes( String.class, String.class ), " " ); 1821 1822 Pipe sourcePipe = new Each( new Pipe( "source" ), new Fields( "line" ), splitter ); 1823 sourcePipe = new GroupBy( sourcePipe, new Fields( "num" ) ); 1824 sourcePipe = new Every( sourcePipe, new Fields( "char" ), new First( new Fields( "first", String.class ) ), Fields.ALL ); 1825 1826 Pipe lhsPipe = new Retain( new Pipe( "lhs", sourcePipe ), Fields.ALL ); 1827 Pipe rhsPipe = new Retain( new Pipe( "rhs", sourcePipe ), Fields.ALL ); 1828 1829 Pipe splice = new CoGroup( lhsPipe, new Fields( "num" ), rhsPipe, new Fields( "num" ), Fields.size( 4 ) ); 1830 1831 Map<Object, Object> properties = getPlatform().getProperties(); 1832 1833 properties.put( "cascading.serialization.types.required", "true" ); 1834 1835 Flow flow = getPlatform().getFlowConnector( properties ).connect( source, sink, splice ); 1836 1837 flow.complete(); 1838 1839 validateLength( flow, 5, null ); 1840 1841 List<Tuple> values = getSinkAsList( flow ); 1842 1843 assertTrue( values.contains( new Tuple( "1\ta\t1\ta" ) ) ); 1844 assertTrue( values.contains( new Tuple( "2\tb\t2\tb" ) ) ); 1845 } 1846 1847 /** 1848 * Requires rule TapBalanceGroupSplitTriangleTransformer * 1849 */ 1850 @Test 1851 public void testGroupSplitToCoGroupsTriangle() throws Exception 1852 { 1853 getPlatform().copyFromLocal( inputFileLower ); 1854 getPlatform().copyFromLocal( inputFileUpper ); 1855 1856 Map sources = new HashMap(); 1857 sources.put( "lower", getPlatform().getDelimitedFile( new Fields( "numLower", "charLower" ), " ", inputFileLower ) ); 1858 sources.put( "upper", getPlatform().getDelimitedFile( new Fields( "numUpper", "charUpper" ), " ", inputFileUpper ) ); 1859 1860 Tap sink = getPlatform().getTextFile( getOutputPath( "sink" ), SinkMode.REPLACE ); 1861 1862 Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "numLower", "charLower" ), new Identity() ); 1863 Pipe pipeUpper = new Each( new Pipe( "upper" ), new Fields( "numUpper", "charUpper" ), new Identity() ); 1864 1865 Pipe upperGroupBy = new GroupBy( pipeUpper, new Fields( "numUpper" ) ); 1866 Pipe grpByEvery = new Every( upperGroupBy, new Count(), new Fields( "numUpper", "count" ) ); 1867 1868 Pipe grpByEveryE1 = new Each( grpByEvery, new Fields( "numUpper", "count" ), new Identity() ); 1869 Pipe grpByEveryE2 = new Each( grpByEvery, new Fields( "numUpper", "count" ), new Identity( new Fields( "numUpperUpper", "countUpper" ) ) ); 1870 1871 Pipe cogroup = new CoGroup( "cogroup", pipeLower, new Fields( "numLower" ), grpByEveryE1, new Fields( "numUpper" ) ); 1872 Each cogroupEach = new Each( cogroup, new Fields( "numLower", "charLower", "numUpper", "count" ), new Identity() ); 1873 1874 Pipe cogroupNested = new CoGroup( cogroupEach, new Fields( "numLower" ), grpByEveryE2, new Fields( "numUpperUpper" ) ); 1875 Pipe cogrpNestedEach = new Each( cogroupNested, Fields.ALL, new Identity() ); 1876 1877 Flow flow = getPlatform().getFlowConnector().connect( sources, sink, cogrpNestedEach ); 1878 flow.complete(); 1879 1880 validateLength( flow, 5 ); 1881 } 1882 }