/*
 * Copyright (c) 2007-2016 Concurrent, Inc. All Rights Reserved.
 *
 * Project and contact information: http://www.cascading.org/
 *
 * This file is part of the Cascading project.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package cascading;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;

import cascading.cascade.Cascades;
import cascading.flow.Flow;
import cascading.operation.Debug;
import cascading.operation.Filter;
import cascading.operation.Function;
import cascading.operation.Identity;
import cascading.operation.Insert;
import cascading.operation.NoOp;
import cascading.operation.aggregator.Count;
import cascading.operation.aggregator.First;
import cascading.operation.expression.ExpressionFunction;
import cascading.operation.filter.And;
import cascading.operation.function.UnGroup;
import cascading.operation.regex.RegexFilter;
import cascading.operation.regex.RegexParser;
import cascading.operation.regex.RegexSplitter;
import cascading.pipe.Each;
import cascading.pipe.Every;
import cascading.pipe.GroupBy;
import cascading.pipe.Merge;
import cascading.pipe.Pipe;
import cascading.tap.MultiSourceTap;
import cascading.tap.SinkMode;
import cascading.tap.Tap;
import cascading.tuple.Fields;
import cascading.tuple.Hasher;
import cascading.tuple.Tuple;
import org.junit.Test;

import static cascading.ComparePlatformsTest.NONDETERMINISTIC;
import static data.InputData.*;

/**
 * Platform tests for pipe assemblies that address tuple values by field name or position.
 * <p>
 * Each test stages its input via {@code getPlatform().copyFromLocal(..)}, builds an assembly out of
 * {@link Each}, {@link Every}, {@link GroupBy} and {@link Merge} pipes, runs the resulting {@link Flow}
 * to completion and validates the number (and sometimes the content) of the tuples written to the sink(s).
 */
public class FieldedPipesPlatformTest extends PlatformTestCase
  {
  public FieldedPipesPlatformTest()
    {
    super( true, 5, 3 ); // leave cluster testing enabled
    }

  /**
   * Parses the leading token of each line into "ip", groups on it and counts each group.
   *
   * @throws Exception
   */
  @Test
  public void testSimpleGroup() throws Exception
    {
    getPlatform().copyFromLocal( inputFileApache );

    Tap source = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileApache );

    Pipe pipe = new Pipe( "test" );

    pipe = new Each( pipe, new Fields( "line" ), new RegexParser( new Fields( "ip" ), "^[^ ]*" ), new Fields( "ip" ) );

    pipe = new GroupBy( pipe, new Fields( "ip" ) );

    pipe = new Every( pipe, new Count(), new Fields( "ip", "count" ) );

    Tap sink = getPlatform().getTextFile( getOutputPath( "simple" ), SinkMode.REPLACE );

    Flow flow = getPlatform().getFlowConnector().connect( source, sink, pipe );

    flow.complete();

    validateLength( flow.openSource(), 10 ); // validate source, this once, as a sanity check
    validateLength( flow, 8, null );
    }

  /**
   * Chains four Count aggregators after a single GroupBy; the sink is expected to hold the
   * grouping field plus the four declared count fields.
   *
   * @throws Exception
   */
  @Test
  public void testSimpleChain() throws Exception
    {
    getPlatform().copyFromLocal( inputFileApache );

    Tap source = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileApache );

    Pipe pipe = new Pipe( "test" );

    pipe = new Each( pipe, new Fields( "line" ), new RegexParser( new Fields( "ip" ), "^[^ ]*" ), new Fields( "ip" ) );

    pipe = new GroupBy( pipe, new Fields( "ip" ) );

    pipe = new Every( pipe, new Count( new Fields( "count1" ) ) );
    pipe = new Every( pipe, new Count( new Fields( "count2" ) ) );
    pipe = new Every( pipe, new Count( new Fields( "count3" ) ) );
    pipe = new Every( pipe, new Count( new Fields( "count4" ) ) );

    Tap sink = getPlatform().getTabDelimitedFile( Fields.ALL, getOutputPath( "simplechain" ), SinkMode.REPLACE );

    Flow flow = getPlatform().getFlowConnector().connect( source, sink, pipe );

    flow.complete();

    validateLength( flow, 8, 5 );
    }

  /**
   * Follows two chained Count aggregators with an Each that sums both counts via an expression.
   *
   * @throws Exception
   */
  @Test
  public void testChainEndingWithEach() throws Exception
    {
    getPlatform().copyFromLocal( inputFileApache );

    Pipe pipe = new Pipe( "test" );

    pipe = new Each( pipe, new Fields( "line" ), new RegexParser( new Fields( "ip" ), "^[^ ]*" ), new Fields( "ip" ) );

    pipe = new GroupBy( pipe, new Fields( "ip" ) );

    pipe = new Every( pipe, new Count( new Fields( "count1" ) ) );
    pipe = new Every( pipe, new Count( new Fields( "count2" ) ) );

    pipe = new Each( pipe, new Fields( "count1", "count2" ), new ExpressionFunction( new Fields( "sum" ), "count1 + count2", int.class ), Fields.ALL );

    Tap source = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileApache );
    Tap sink = getPlatform().getTextFile( getOutputPath( "chaineach" ), SinkMode.REPLACE );

    Flow flow = getPlatform().getFlowConnector().connect( source, sink, pipe );

    flow.complete();

    validateLength( flow, 8, null );
    }

  // also tests the RegexSplitter

  /**
   * Runs an assembly without any grouping: splits each line on whitespace and keeps the value at position 1.
   *
   * @throws Exception
   */
  @Test
  public void testNoGroup() throws Exception
    {
    getPlatform().copyFromLocal( inputFileApache );

    Tap source = getPlatform().getTextFile( new Fields( "line" ), inputFileApache );

    Pipe pipe = new Pipe( "test" );

    pipe = new Each( pipe, new RegexSplitter( "\\s+" ), new Fields( 1 ) );

    Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "nogroup" ), SinkMode.REPLACE );

    Flow flow = getPlatform().getFlowConnector().connect( source, sink, pipe );

    flow.complete();

    validateLength( flow, 10, null );

    List<Tuple> results = getSinkAsList( flow );

    assertTrue( results.contains( new Tuple( "75.185.76.245" ) ) );
    }

  /**
   * Connects a source directly to a sink through a bare Pipe, with no operations in between.
   *
   * @throws Exception
   */
  @Test
  public void testCopy() throws Exception
    {
    getPlatform().copyFromLocal( inputFileApache );

    Tap source = getPlatform().getTextFile( new Fields( "line" ), inputFileApache );

    Pipe pipe = new Pipe( "test" );

    Tap sink = getPlatform().getTextFile( getOutputPath( "copy" ), SinkMode.REPLACE );

    Flow flow = getPlatform().getFlowConnector().connect( source, sink, pipe );

    flow.complete();

    validateLength( flow, 10, null );
    }

  /**
   * Merges two split streams through a single GroupBy on "num" and checks values from both sources arrive.
   *
   * @throws Exception
   */
  @Test
  public void testSimpleMerge() throws Exception
    {
    getPlatform().copyFromLocal( inputFileLower );
    getPlatform().copyFromLocal( inputFileUpper );

    Tap sourceLower = getPlatform().getTextFile( inputFileLower );
    Tap sourceUpper = getPlatform().getTextFile( inputFileUpper );

    Map sources = new HashMap();

    sources.put( "lower", sourceLower );
    sources.put( "upper", sourceUpper );

    Function splitter = new RegexSplitter( new Fields( "num", "char" ), " " );

    // using null pos so all fields are written
    Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "simplemerge" ), SinkMode.REPLACE );

    Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitter );
    Pipe pipeUpper = new Each( new Pipe( "upper" ), new Fields( "line" ), splitter );

    Pipe splice = new GroupBy( "merge", Pipe.pipes( pipeLower, pipeUpper ), new Fields( "num" ), null, false );

    Flow flow = getPlatform().getFlowConnector().connect( sources, sink, splice );

    flow.complete();

    validateLength( flow, 10 );

    Collection results = getSinkAsList( flow );

    assertTrue( "missing value", results.contains( new Tuple( "1\ta" ) ) );
    assertTrue( "missing value", results.contains( new Tuple( "1\tA" ) ) );
    assertTrue( "missing value", results.contains( new Tuple( "2\tb" ) ) );
    assertTrue( "missing value", results.contains( new Tuple( "2\tB" ) ) );
    assertTrue( "missing value", results.contains( new Tuple( "3\tc" ) ) );
    assertTrue( "missing value", results.contains( new Tuple( "3\tC" ) ) );
    }

  /**
   * Specifically tests GroupBy will return the correct grouping fields to the following Every
   * <p>
   * additionally tests secondary sorting during merging
   *
   * @throws Exception
   */
  @Test
  public void testSimpleMergeThree() throws Exception
    {
    getPlatform().copyFromLocal( inputFileLower );
    getPlatform().copyFromLocal( inputFileUpper );
    getPlatform().copyFromLocal( inputFileLowerOffset );

    Tap sourceLower = getPlatform().getTextFile( inputFileLower );
    Tap sourceUpper = getPlatform().getTextFile( inputFileUpper );
    Tap sourceLowerOffset = getPlatform().getTextFile( inputFileLowerOffset );

    Map sources = new HashMap();

    sources.put( "lower", sourceLower );
    sources.put( "upper", sourceUpper );
    sources.put( "offset", sourceLowerOffset );

    Tap sink = getPlatform().getDelimitedFile( Fields.ALL, "\t", getOutputPath( "simplemergethree" ), SinkMode.REPLACE );

    Function splitter = new RegexSplitter( new Fields( "num", "char" ), " " );

    Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitter );
    Pipe pipeUpper = new Each( new Pipe( "upper" ), new Fields( "line" ), splitter );
    Pipe pipeOffset = new Each( new Pipe( "offset" ), new Fields( "line" ), splitter );

    // group on "num", secondary sort on "char"
    Pipe splice = new GroupBy( "merge", Pipe.pipes( pipeLower, pipeUpper, pipeOffset ), new Fields( "num" ), new Fields( "char" ) );

    splice = new Every( splice, new Fields( "char" ), new First( new Fields( "first" ) ) );

    splice = new Each( splice, new Fields( "num", "first" ), new Identity() );

    Flow flow = getPlatform().getFlowConnector().connect( sources, sink, splice );

    flow.complete();

    validateLength( flow, 6 );

    List<Tuple> tuples = getSinkAsList( flow );

    assertTrue( tuples.contains( new Tuple( "1", "A" ) ) );
    assertTrue( tuples.contains( new Tuple( "2", "B" ) ) );
    assertTrue( tuples.contains( new Tuple( "3", "C" ) ) );
    assertTrue( tuples.contains( new Tuple( "4", "D" ) ) );
    assertTrue( tuples.contains( new Tuple( "5", "E" ) ) );
    assertTrue( tuples.contains( new Tuple( "6", "c" ) ) );
    }

  /**
   * same test as MergePipesTest, but to test that chained groupby don't exhibit similar failures
   *
   * @throws Exception
   */
  @Test
  public void testSameSourceMergeThreeChainGroup() throws Exception
    {
    getPlatform().copyFromLocal( inputFileLower );

    Tap sourceLower = getPlatform().getTextFile( inputFileLower );

    Map sources = new HashMap();

    sources.put( "split", sourceLower );

    Tap sink = getPlatform().getTextFile( getOutputPath( "samemergethreechaingroup" ), SinkMode.REPLACE );

    Function splitter = new RegexSplitter( new Fields( "num", "char" ), " " );

    Pipe pipe = new Pipe( "split" );

    Pipe pipeLower = new Each( new Pipe( "lower", pipe ), new Fields( "line" ), splitter );
    Pipe pipeUpper = new Each( new Pipe( "upper", pipe ), new Fields( "line" ), splitter );
    Pipe pipeOffset = new Each( new Pipe( "offset", pipe ), new Fields( "line" ), splitter );

    //put group before merge to test path counts
    Pipe splice = new GroupBy( Pipe.pipes( pipeLower, pipeUpper ), new Fields( "num" ) );

    // this group has its incoming paths counted, gated by the previous group
    splice = new GroupBy( Pipe.pipes( splice, pipeOffset ), new Fields( "num" ) );

    Flow flow = getPlatform().getFlowConnector().connect( sources, sink, splice );

    if( getPlatform().isMapReduce() )
      assertEquals( "wrong num jobs", 2, flow.getFlowSteps().size() );

    flow.complete();

    validateLength( flow, 15 );
    }

  /**
   * Un-groups the "lower" and "upper" columns into ( "num", "char" ) tuples using declared result fields.
   *
   * @throws Exception
   */
  @Test
  public void testUnGroup() throws Exception
    {
    getPlatform().copyFromLocal( inputFileJoined );

    Tap source = getPlatform().getTextFile( inputFileJoined );
    Tap sink = getPlatform().getTextFile( getOutputPath( "ungrouped" ), SinkMode.REPLACE );

    Pipe pipe = new Pipe( "test" );

    pipe = new Each( pipe, new Fields( "line" ), new RegexSplitter( new Fields( "num", "lower", "upper" ) ) );

    pipe = new Each( pipe, new UnGroup( new Fields( "num", "char" ), new Fields( "num" ), Fields.fields( new Fields( "lower" ), new Fields( "upper" ) ) ) );

    Flow flow = getPlatform().getFlowConnector().connect( source, sink, pipe );

    flow.complete();

    validateLength( flow, 10 );
    }

  /**
   * Same as {@link #testUnGroup()}, but uses the UnGroup constructor that takes no field declaration.
   *
   * @throws Exception
   */
  @Test
  public void testUnGroupAnon() throws Exception
    {
    getPlatform().copyFromLocal( inputFileJoined );

    Tap source = getPlatform().getTextFile( inputFileJoined );
    Tap sink = getPlatform().getTextFile( getOutputPath( "ungroupedanon" ), SinkMode.REPLACE );

    Pipe pipe = new Pipe( "test" );

    pipe = new Each( pipe, new Fields( "line" ), new RegexSplitter( new Fields( "num", "lower", "upper" ) ) );

    pipe = new Each( pipe, new UnGroup( new Fields( "num" ), Fields.fields( new Fields( "lower" ), new Fields( "upper" ) ) ) );

    Flow flow = getPlatform().getFlowConnector().connect( source, sink, pipe );

    flow.complete();

    validateLength( flow, 10 );
    }

  /**
   * Un-groups by value count (1) rather than by explicit value selectors and verifies every emitted line.
   *
   * @throws Exception
   */
  @Test
  public void testUnGroupBySize() throws Exception
    {
    getPlatform().copyFromLocal( inputFileJoinedExtra );

    Tap source = getPlatform().getTextFile( inputFileJoinedExtra );
    Tap sink = getPlatform().getTextFile( getOutputPath( "ungrouped_size" ), SinkMode.REPLACE );

    Pipe pipe = new Pipe( "test" );

    pipe = new Each( pipe, new Fields( "line" ), new RegexSplitter( new Fields( "num1", "num2", "lower", "upper" ) ) );

    pipe = new Each( pipe, new UnGroup( new Fields( "num1", "num2", "char" ), new Fields( "num1", "num2" ), 1 ) );

    Flow flow = getPlatform().getFlowConnector().connect( source, sink, pipe );

    flow.complete();

    List<Tuple> tuples = asList( flow, sink );
    assertEquals( 10, tuples.size() );

    // collect the value at position 1 of every sink tuple, which the assertions below treat as the written line
    List<Object> values = new ArrayList<Object>();
    for( Tuple tuple : tuples )
      values.add( tuple.getObject( 1 ) );

    assertTrue( values.contains( "1\t1\ta" ) );
    assertTrue( values.contains( "1\t1\tA" ) );
    assertTrue( values.contains( "2\t2\tb" ) );
    assertTrue( values.contains( "2\t2\tB" ) );
    assertTrue( values.contains( "3\t3\tc" ) );
    assertTrue( values.contains( "3\t3\tC" ) );
    assertTrue( values.contains( "4\t4\td" ) );
    assertTrue( values.contains( "4\t4\tD" ) );
    assertTrue( values.contains( "5\t5\te" ) );
    assertTrue( values.contains( "5\t5\tE" ) );
    }

  /**
   * Applies a single RegexFilter to the "line" field and expects three tuples in the sink.
   *
   * @throws Exception
   */
  @Test
  public void testFilter() throws Exception
    {
    getPlatform().copyFromLocal( inputFileApache );

    Tap source = getPlatform().getTextFile( inputFileApache );
    Tap sink = getPlatform().getTextFile( getOutputPath( "filter" ), SinkMode.REPLACE );

    Pipe pipe = new Pipe( "test" );

    Filter filter = new RegexFilter( "^68.*" );

    pipe = new Each( pipe, new Fields( "line" ), filter );

    Flow flow = getPlatform().getFlowConnector().connect( source, sink, pipe );

    flow.complete();

    validateLength( flow, 3 );
    }

  /**
   * Combines two RegexFilters with the {@link And} logical filter and expects three tuples in the sink.
   *
   * @throws Exception
   */
  @Test
  public void testLogicFilter() throws Exception
    {
    getPlatform().copyFromLocal( inputFileApache );

    Tap source = getPlatform().getTextFile( inputFileApache );
    Tap sink = getPlatform().getTextFile( getOutputPath( "logicfilter" ), SinkMode.REPLACE );

    Pipe pipe = new Pipe( "test" );

    Filter filter = new And( new RegexFilter( "^68.*$" ), new RegexFilter( "^1000.*$" ) );

    pipe = new Each( pipe, new Fields( "line" ), filter );

    Flow flow = getPlatform().getFlowConnector().connect( source, sink, pipe );

    flow.complete();

    validateLength( flow, 3 );
    }

  /**
   * Parses apache log lines, filters on the "method" field, then groups and counts the surviving values.
   *
   * @throws Exception
   */
  @Test
  public void testFilterComplex() throws Exception
    {
    getPlatform().copyFromLocal( inputFileApache );

    Tap source = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileApache );
    Tap sink = getPlatform().getTextFile( getOutputPath( "filtercomplex" ), SinkMode.REPLACE );

    Pipe pipe = new Pipe( "test" );

    pipe = new Each( pipe, new Fields( "line" ), TestConstants.APACHE_COMMON_PARSER );

    // NOTE(review): the same filter is applied twice -- presumably intentional, to exercise consecutive filters; confirm
    pipe = new Each( pipe, new Fields( "method" ), new RegexFilter( "^POST" ) );
    pipe = new Each( pipe, new Fields( "method" ), new RegexFilter( "^POST" ) );

    pipe = new Each( pipe, new Fields( "method" ), new Identity( new Fields( "value" ) ), Fields.ALL );

    pipe = new GroupBy( pipe, new Fields( "value" ) );

    pipe = new Every( pipe, new Count(), new Fields( "value", "count" ) );

    Flow flow = getPlatform().getFlowConnector().connect( source, sink, pipe );

    flow.complete();

    validateLength( flow, 1, null );
    }

  /**
   * Intentionally filters all values out to test next mr job behaves
   *
   * @throws Exception
   */
  @Test
  public void testFilterAll() throws Exception
    {
    getPlatform().copyFromLocal( inputFileApache );

    Tap source = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileApache );
    Tap sink = getPlatform().getTextFile( getOutputPath( "filterall" ), SinkMode.REPLACE );

    Pipe pipe = new Pipe( "test" );

    String regex = "^([^ ]*) +[^ ]* +[^ ]* +\\[([^]]*)\\] +\\\"([^ ]*) ([^ ]*) [^ ]*\\\" ([^ ]*) ([^ ]*).*$";
    Fields fieldDeclaration = new Fields( "ip", "time", "method", "event", "status", "size" );
    int[] groups = {1, 2, 3, 4, 5, 6};
    RegexParser function = new RegexParser( fieldDeclaration, regex, groups );
    pipe = new Each( pipe, new Fields( "line" ), function );

    pipe = new Each( pipe, new Fields( "method" ), new RegexFilter( "^fobar" ) ); // intentionally filtering all

    pipe = new GroupBy( pipe, new Fields( "method" ) );

    pipe = new Each( pipe, new Fields( "method" ), new Identity( new Fields( "value" ) ), Fields.ALL );

    pipe = new GroupBy( pipe, new Fields( "value" ) );

    pipe = new Every( pipe, new Count(), new Fields( "value", "count" ) );

    Flow flow = getPlatform().getFlowConnector().connect( source, sink, pipe );

    flow.complete();

    validateLength( flow, 0, null );
    }

//  public void testLimitFilter() throws Exception
//    {
//    copyFromLocal( inputFileApache );
//
//    Tap source = new Hfs( new TextLine( new Fields( "offset", "line" ) ), inputFileApache );
//    Tap sink = new Lfs( new TextLine(), outputPath + "/limitfilter", true );
//
//    Pipe pipe = new Pipe( "test" );
//
//    Filter filter = new Limit( 7 );
//
//    pipe = new Each( pipe, new Fields( "line" ), filter );
//
//    Flow flow = new FlowConnector( getProperties() ).connect( source, sink, pipe );
//
////    flow.writeDOT( "flow.dot" );
//
//    flow.complete();
//
//    validateLength( flow, 7, null );
//    }

  //

  /*
   *
   * TODO: create (optional) Tez rule to consolidate into a single DAG. currently renders to two DAGs, one for each side
   *
   */
  @Test
  public void testSplit() throws Exception
    {
    getPlatform().copyFromLocal( inputFileApache );

    // 46 192
    // NOTE(review): the note above says 192 but the right branch filters on ".*102.*" -- confirm which is intended

    Tap source = getPlatform().getTextFile( inputFileApache );
    Tap sink1 = getPlatform().getTextFile( getOutputPath( "split1" ), SinkMode.REPLACE );
    Tap sink2 = getPlatform().getTextFile( getOutputPath( "split2" ), SinkMode.REPLACE );

    Pipe pipe = new Pipe( "split" );

    pipe = new Each( pipe, new Fields( "line" ), new RegexFilter( "^68.*" ) );

    Pipe left = new Each( new Pipe( "left", pipe ), new Fields( "line" ), new RegexFilter( ".*46.*" ) );
    Pipe right = new Each( new Pipe( "right", pipe ), new Fields( "line" ), new RegexFilter( ".*102.*" ) );

    Map sources = new HashMap();
    sources.put( "split", source );

    Map sinks = new HashMap();
    sinks.put( "left", sink1 );
    sinks.put( "right", sink2 );

    Flow flow = getPlatform().getFlowConnector().connect( sources, sinks, left, right );

    flow.complete();

    validateLength( flow, 1, "left" );
    validateLength( flow, 2, "right" );
    }

  /**
   * verifies non-safe rules apply in the proper place
   *
   * @throws Exception
   */
  @Test
  public void testSplitNonSafe() throws Exception
    {
    getPlatform().copyFromLocal( inputFileApache );

    // 46 192

    Tap source = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileApache );
    Tap sink1 = getPlatform().getTextFile( getOutputPath( "nonsafesplit1" ), SinkMode.REPLACE );
    Tap sink2 = getPlatform().getTextFile( getOutputPath( "nonsafesplit2" ), SinkMode.REPLACE );

    Pipe pipe = new Pipe( "split" );

    // run job on non-safe operation, forces 3 mr jobs.
    pipe = new Each( pipe, new TestFunction( new Fields( "ignore" ), new Tuple( 1 ), false ), new Fields( "line" ) );

    pipe = new Each( pipe, new Fields( "line" ), new RegexFilter( "^68.*" ) );

    Pipe left = new Each( new Pipe( "left", pipe ), new Fields( "line" ), new RegexFilter( ".*46.*" ) );
    Pipe right = new Each( new Pipe( "right", pipe ), new Fields( "line" ), new RegexFilter( ".*102.*" ) );

    Map sources = new HashMap();
    sources.put( "split", source );

    Map sinks = new HashMap();
    sinks.put( "left", sink1 );
    sinks.put( "right", sink2 );

    Flow flow = getPlatform().getFlowConnector().connect( sources, sinks, left, right );

    flow.complete();

    validateLength( flow, 1, "left" );
    validateLength( flow, 2, "right" );
    }

  /**
   * Splits one source into two differently named branches and merges them again through a GroupBy.
   *
   * @throws Exception
   */
  @Test
  public void testSplitSameSourceMerged() throws Exception
    {
    getPlatform().copyFromLocal( inputFileApache );

    // 46 192

    Tap source = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileApache );
    Tap sink = getPlatform().getTextFile( getOutputPath( "splitsourcemerged" ), SinkMode.REPLACE );

    Pipe pipe = new Pipe( "split" );

    pipe = new Each( pipe, new Fields( "line" ), new RegexFilter( "^68.*" ) );

    Pipe left = new Each( new Pipe( "left", pipe ), new Fields( "line" ), new RegexFilter( ".*46.*" ) );
    Pipe right = new Each( new Pipe( "right", pipe ), new Fields( "line" ), new RegexFilter( ".*102.*" ) );

    Pipe merged = new GroupBy( "merged", Pipe.pipes( left, right ), new Fields( "line" ) );

    Flow flow = getPlatform().getFlowConnector().connect( source, sink, merged );

    flow.complete();

    validateLength( flow, 3 );
    }

  /**
   * verifies not inserting Identity between groups works
   *
   * @throws Exception
   */
  @Test
  public void testSplitOut() throws Exception
    {
    getPlatform().copyFromLocal( inputFileApache );

    Tap sourceLower = getPlatform().getTextFile( new Fields( "num", "line" ), inputFileApache );

    Map sources = new HashMap();

    sources.put( "lower1", sourceLower );

    // using null pos so all fields are written
    Tap sink1 = getPlatform().getTextFile( getOutputPath( "splitout1" ), SinkMode.REPLACE );
    Tap sink2 = getPlatform().getTextFile( getOutputPath( "splitout2" ), SinkMode.REPLACE );

    Map sinks = new HashMap();

    sinks.put( "output1", sink1 );
    sinks.put( "output2", sink2 );

    Pipe pipeLower1 = new Pipe( "lower1" );

    // the second group consumes the first directly; both are also bound to their own sink
    Pipe left = new GroupBy( "output1", pipeLower1, new Fields( 0 ) );
    Pipe right = new GroupBy( "output2", left, new Fields( 0 ) );

    Flow flow = getPlatform().getFlowConnector().connect( sources, sinks, Pipe.pipes( left, right ) );

//    flow.writeDOT( "spit.dot" );

    flow.complete();

    validateLength( flow, 10, "output1" );
    validateLength( flow, 10, "output2" );

    assertEquals( 10, asSet( flow, sink1 ).size() );
    assertEquals( 10, asSet( flow, sink2 ).size() );
    }

  /**
   * Splits the stream after a GroupBy/Every pair into two filtered branches, each bound to its own sink.
   *
   * @throws Exception
   */
  @Test
  public void testSplitComplex() throws Exception
    {
    getPlatform().copyFromLocal( inputFileApache );

    // 46 192

    Tap source = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileApache );
    Tap sink1 = getPlatform().getTextFile( getOutputPath( "splitcomp1" ), SinkMode.REPLACE );
    Tap sink2 = getPlatform().getTextFile( getOutputPath( "splitcomp2" ), SinkMode.REPLACE );

    Pipe pipe = new Pipe( "split" );

    pipe = new Each( pipe, new Fields( "line" ), new RegexParser( new Fields( "ip" ), "^[^ ]*" ), new Fields( "ip" ) );

    pipe = new GroupBy( pipe, new Fields( "ip" ) );

    pipe = new Every( pipe, new Fields( "ip" ), new Count(), new Fields( "ip", "count" ) );

    pipe = new Each( pipe, new Fields( "ip" ), new RegexFilter( "^68.*" ) );

    Pipe left = new Each( new Pipe( "left", pipe ), new Fields( "ip" ), new RegexFilter( ".*46.*" ) );

    Pipe right = new Each( new Pipe( "right", pipe ), new Fields( "ip" ), new RegexFilter( ".*102.*" ) );

    Map sources = Cascades.tapsMap( "split", source );
    Map sinks = Cascades.tapsMap( Pipe.pipes( left, right ), Tap.taps( sink1, sink2 ) );

    Flow flow = getPlatform().getFlowConnector().connect( sources, sinks, left, right );

    flow.complete();

    validateLength( flow, 1, "left" );
    validateLength( flow, 1, "right" );
    }

  /**
   * Splits after an aggregation, then splits the right branch a second time after another GroupBy,
   * giving three tails bound to three sinks.
   *
   * @throws Exception
   */
  @Test
  public void testSplitMultiple() throws Exception
    {
    getPlatform().copyFromLocal( inputFileApache );

    // 46 192

    Tap source = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileApache );
    Tap sinkLeft = getPlatform().getTextFile( getOutputPath( "left" ), SinkMode.REPLACE );
    Tap sinkRightLeft = getPlatform().getTextFile( getOutputPath( "rightleft" ), SinkMode.REPLACE );
    Tap sinkRightRight = getPlatform().getTextFile( getOutputPath( "rightright" ), SinkMode.REPLACE );

    Pipe head = new Pipe( "split" );

    head = new Each( head, new Fields( "line" ), new RegexParser( new Fields( "ip" ), "^[^ ]*" ), new Fields( "ip" ) );

    head = new GroupBy( head, new Fields( "ip" ) );

    head = new Every( head, new Fields( "ip" ), new Count(), new Fields( "ip", "count" ) );

    head = new Each( head, new Fields( "ip" ), new RegexFilter( "^68.*" ) );

    Pipe left = new Each( new Pipe( "left", head ), new Fields( "ip" ), new RegexFilter( ".*46.*" ) );

    Pipe right = new Each( new Pipe( "right", head ), new Fields( "ip" ), new RegexFilter( ".*102.*" ) );

    right = new GroupBy( right, new Fields( "ip" ) );

    Pipe rightLeft = new Each( new Pipe( "rightLeft", right ), new Fields( "ip" ), new Identity() );

    Pipe rightRight = new Each( new Pipe( "rightRight", right ), new Fields( "ip" ), new Identity() );

    Map sources = Cascades.tapsMap( "split", source );
    Map sinks = Cascades.tapsMap( Pipe.pipes( left, rightLeft, rightRight ), Tap.taps( sinkLeft, sinkRightLeft, sinkRightRight ) );

    Flow flow = getPlatform().getFlowConnector().connect( sources, sinks, left, rightLeft, rightRight );

    flow.complete();

    validateLength( flow, 1, "left" );
    validateLength( flow, 1, "rightLeft" );
    validateLength( flow, 1, "rightRight" );
    }

  /**
   * Reads two files through a single {@link MultiSourceTap} and groups the combined stream on "num".
   *
   * @throws Exception
   */
  @Test
  public void testConcatenation() throws Exception
    {
    getPlatform().copyFromLocal( inputFileLower );
    getPlatform().copyFromLocal( inputFileUpper );

    Tap sourceLower = getPlatform().getTextFile( inputFileLower );
    Tap sourceUpper = getPlatform().getTextFile( inputFileUpper );

    Tap source = new MultiSourceTap( sourceLower, sourceUpper );

    Function splitter = new RegexSplitter( new Fields( "num", "char" ), " " );

    // using null pos so all fields are written
    Tap sink = getPlatform().getTextFile( getOutputPath( "complexconcat" ), SinkMode.REPLACE );

    Pipe pipe = new Each( new Pipe( "concat" ), new Fields( "line" ), splitter );

    Pipe splice = new GroupBy( pipe, new Fields( "num" ) );

    Flow countFlow = getPlatform().getFlowConnector().connect( source, sink, splice );

    countFlow.complete();

    validateLength( countFlow, 10, null );
    }

  /**
   * Chains two TestAggregators configured with two and three result tuples respectively; the sink is
   * expected to hold 8 * 2 * 3 tuples.
   *
   * @throws Exception
   */
  @Test
  public void testGeneratorAggregator() throws Exception
    {
    getPlatform().copyFromLocal( inputFileApache );

    Tap source = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileApache );

    Pipe pipe = new Pipe( "test" );

    pipe = new Each( pipe, new Fields( "line" ), new RegexParser( new Fields( "ip" ), "^[^ ]*" ), new Fields( "ip" ) );

    pipe = new GroupBy( pipe, new Fields( "ip" ) );

    pipe = new Every( pipe, new TestAggregator( new Fields( "count1" ), new Fields( "ip" ), new Tuple( "first1" ), new Tuple( "first2" ) ) );
    pipe = new Every( pipe, new TestAggregator( new Fields( "count2" ), new Fields( "ip" ), new Tuple( "second" ), new Tuple( "second2" ), new Tuple( "second3" ) ) );

    Tap sink = getPlatform().getTextFile( getOutputPath( "generatoraggregator" ), SinkMode.REPLACE );

    Flow flow = getPlatform().getFlowConnector().connect( source, sink, pipe );

    flow.complete();

    validateLength( flow, 8 * 2 * 3, null );
    }

  /**
   * Exercises the {@link Fields#REPLACE} output selector with positional, ARGS and named declarations.
   *
   * @throws Exception
   */
  @Test
  public void testReplace() throws Exception
    {
    // stage the input here as well, so this test does not rely on another test having copied it first
    getPlatform().copyFromLocal( inputFileApache );

    Tap source = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileApache );
    Tap sink = getPlatform().getTextFile( new Fields( "offset", "line" ), new Fields( "offset", "line" ), getOutputPath( "replace" ), SinkMode.REPLACE );

    Pipe pipe = new Pipe( "test" );

    Function parser = new RegexParser( new Fields( 0 ), "^[^ ]*" );
    pipe = new Each( pipe, new Fields( "line" ), parser, Fields.REPLACE );
    pipe = new Each( pipe, new Fields( "line" ), new Identity( Fields.ARGS ), Fields.REPLACE );
    pipe = new Each( pipe, new Fields( "line" ), new Identity( new Fields( "line" ) ), Fields.REPLACE );

    pipe = new Each( pipe, new Debug( true ) );

    Flow flow = getPlatform().getFlowConnector( disableDebug() ).connect( source, sink, pipe );

    flow.complete();

    validateLength( flow, 10, 2, Pattern.compile( "^\\d+\\s\\d+\\s[\\d]{1,3}\\.[\\d]{1,3}\\.[\\d]{1,3}\\.[\\d]{1,3}$" ) );
    }

  /**
   * Exercises the {@link Fields#SWAP} output selector before a GroupBy and again after the aggregation.
   *
   * @throws Exception
   */
  @Test
  public void testSwap() throws Exception
    {
    // stage the input here as well, so this test does not rely on another test having copied it first
    getPlatform().copyFromLocal( inputFileApache );

    Tap source = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileApache );
    Tap sink = getPlatform().getTextFile( new Fields( "offset", "line" ), new Fields( "count", "ipaddress" ), getOutputPath( "swap" ), SinkMode.REPLACE );

    Pipe pipe = new Pipe( "test" );

    Function parser = new RegexParser( new Fields( "ip" ), "^[^ ]*" );
    pipe = new Each( pipe, new Fields( "line" ), parser, Fields.SWAP );
    pipe = new GroupBy( pipe, new Fields( "ip" ) );
    pipe = new Every( pipe, new Fields( "ip" ), new Count( new Fields( "count" ) ) );
    pipe = new Each( pipe, new Fields( "ip" ), new Identity( new Fields( "ipaddress" ) ), Fields.SWAP );

    Flow flow = getPlatform().getFlowConnector().connect( source, sink, pipe );

    flow.complete();

    validateLength( flow, 8, 2, Pattern.compile( "^\\d+\\s\\d+\\s[\\d]{1,3}\\.[\\d]{1,3}\\.[\\d]{1,3}\\.[\\d]{1,3}$" ) );
    }

  /**
   * Exercises operations that declare or select {@link Fields#NONE}: a NoOp swapped in for "line" and an
   * Insert that takes no arguments.
   *
   * @throws Exception
   */
  @Test
  public void testNone() throws Exception
    {
    // stage the input here as well, so this test does not rely on another test having copied it first
    getPlatform().copyFromLocal( inputFileApache );

    Tap source = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileApache );
    Tap sink = getPlatform().getTextFile( new Fields( "offset", "line" ), new Fields( "count", "ip" ), getOutputPath( "none" ), SinkMode.REPLACE );

    Pipe pipe = new Pipe( "test" );

    Function parser = new RegexParser( new Fields( "ip" ), "^[^ ]*" );
    pipe = new Each( pipe, new Fields( "line" ), parser, Fields.ALL );
    pipe = new Each( pipe, new Fields( "line" ), new NoOp(), Fields.SWAP ); // declares Fields.NONE
    pipe = new GroupBy( pipe, new Fields( "ip" ) );
    pipe = new Every( pipe, new Fields( "ip" ), new Count( new Fields( "count" ) ) );
    pipe = new Each( pipe, Fields.NONE, new Insert( new Fields( "ipaddress" ), "1.2.3.4" ), Fields.ALL );

    Flow flow = getPlatform().getFlowConnector().connect( source, sink, pipe );

    flow.complete();

    validateLength( flow, 8, 2, Pattern.compile( "^\\d+\\s\\d+\\s[\\d]{1,3}\\.[\\d]{1,3}\\.[\\d]{1,3}\\.[\\d]{1,3}$" ) );
    }

  /**
   * this tests a merge on two pipes with the same source and name.
   *
   * @throws Exception
   */
  @Test
  public void testSplitSameSourceMergedSameName() throws Exception
    {
    getPlatform().copyFromLocal( inputFileApache );

    // 46 192

    Tap source = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileApache );
    Tap sink = getPlatform().getTextFile( getOutputPath( "splitsourcemergedsamename" ), SinkMode.REPLACE );

    Pipe pipe = new Pipe( "split" );

    pipe = new Each( pipe, new Fields( "line" ), new RegexFilter( "^68.*" ) );

    // both branches are built directly on the shared pipe, so neither is given a new name
    Pipe left = new Each( pipe, new Fields( "line" ), new RegexFilter( ".*46.*" ) );
    Pipe right = new Each( pipe, new Fields( "line" ), new RegexFilter( ".*102.*" ) );

    Pipe merged = new GroupBy( "merged", Pipe.pipes( left, right ), new Fields( "line" ) );

    Flow flow = getPlatform().getFlowConnector().connect( source, sink, merged );

    flow.complete();

    validateLength( flow, 3 );
    }

  /**
   * Catches failure to properly resolve the grouping fields as incoming to the second group-by
   *
   * @throws Exception
   */
  @Test
  public void testGroupGroup() throws Exception
    {
    getPlatform().copyFromLocal( inputFileApache );

    Tap source = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileApache );

    Pipe pipe = new Pipe( "test" );

    pipe = new Each( pipe, new Fields( "line" ), new RegexParser( new Fields( "ip", String.class ), "^[^ ]*" ), new Fields( "ip" ) );

    pipe = new GroupBy( pipe, new Fields( "ip" ) );

    pipe = new Every( pipe, new Count(), new Fields( "ip", "count" ) );

    pipe = new GroupBy( pipe, new Fields( "ip" ), new Fields( "count" ) );

    Tap sink = getPlatform().getTextFile( getOutputPath( "groupgroup" ), SinkMode.REPLACE );

    Map<Object, Object> properties = getProperties();

    properties.put( "cascading.serialization.types.required", "true" );

    Flow flow = getPlatform().getFlowConnector( properties ).connect( source, sink, pipe );

    flow.complete();

    validateLength( flow, 8, null );
    }

  /**
   * Case-insensitive {@link Comparator} and {@link Hasher}: both operate on the lower-cased
   * {@code toString()} of the given values.
   * <p>
   * {@link #hashCode(Comparable)} maps {@code null} to 0; {@link #compare(Comparable, Comparable)} does not
   * guard against {@code null} arguments.
   */
  public static class LowerComparator implements Comparator<Comparable>, Hasher<Comparable>, Serializable
    {
    @Override
    public int compare( Comparable lhs, Comparable rhs )
      {
      return lhs.toString().toLowerCase().compareTo( rhs.toString().toLowerCase() );
      }

    @Override
    public int hashCode( Comparable value )
      {
      if( value == null )
        return 0;

      return value.toString().toLowerCase().hashCode();
      }
    }

  /**
   * Merges a lower case and an upper case source and groups on "char" using {@link LowerComparator}, so
   * values differing only in case fall into the same group.
   *
   * @throws Exception
   */
  @Test
  public void testGroupByInsensitive() throws Exception
    {
    getPlatform().copyFromLocal( inputFileLower );
    getPlatform().copyFromLocal( inputFileUpper );

    Tap sourceLower = getPlatform().getDelimitedFile( new Fields( "num", "char" ), " ", inputFileLower );
    Tap sourceUpper = getPlatform().getDelimitedFile( new Fields( "num", "char" ), " ", inputFileUpper );

    Map sources = new HashMap();

    sources.put( "lower", sourceLower );
    sources.put( "upper", sourceUpper );

    Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "insensitivegrouping" + NONDETERMINISTIC ), SinkMode.REPLACE );

    Pipe pipeLower = new Pipe( "lower" );
    Pipe pipeUpper = new Pipe( "upper" );

    Pipe merge = new Merge( pipeLower, pipeUpper );

    Fields charFields = new Fields( "char" );
    charFields.setComparator( "char", new LowerComparator() );

    Pipe splice = new GroupBy( "groupby", merge, charFields );

    splice = new Every( splice, new Fields( "char" ), new Count() );

    Flow flow = getPlatform().getFlowConnector().connect( sources, sink, splice );

    flow.complete();

    // we can't guarantee if the grouping key will be upper or lower
    validateLength( flow, 5, 1, Pattern.compile( "^\\w+\\s2$" ) );
    }
  }