/*
 * Copyright (c) 2016 Chris K Wensel <chris@wensel.net>. All Rights Reserved.
 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved.
 *
 * Project and contact information: http://www.cascading.org/
 *
 * This file is part of the Cascading project.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package cascading;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;

import cascading.cascade.Cascades;
import cascading.flow.Flow;
import cascading.operation.Debug;
import cascading.operation.Filter;
import cascading.operation.Function;
import cascading.operation.Identity;
import cascading.operation.Insert;
import cascading.operation.NoOp;
import cascading.operation.aggregator.Count;
import cascading.operation.aggregator.First;
import cascading.operation.expression.ExpressionFunction;
import cascading.operation.filter.And;
import cascading.operation.function.UnGroup;
import cascading.operation.regex.RegexFilter;
import cascading.operation.regex.RegexParser;
import cascading.operation.regex.RegexSplitter;
import cascading.pipe.Each;
import cascading.pipe.Every;
import cascading.pipe.GroupBy;
import cascading.pipe.Merge;
import cascading.pipe.Pipe;
import cascading.tap.MultiSourceTap;
import cascading.tap.SinkMode;
import cascading.tap.Tap;
import cascading.tuple.Fields;
import cascading.tuple.Hasher;
import cascading.tuple.Tuple;
import org.junit.Test;

import static cascading.ComparePlatformsTest.NONDETERMINISTIC;
import static data.InputData.*;

/**
 * Platform tests that assemble pipes addressed by field name (Each, Every, GroupBy, Merge),
 * run the resulting Flow on the platform returned by {@code getPlatform()}, and validate the
 * sink contents.
 * <p/>
 * Every test stages its input through {@code getPlatform().copyFromLocal(...)} before building
 * taps so that no test depends on another test having copied the file first.
 */
public class FieldedPipesPlatformTest extends PlatformTestCase
  {
  /**
   * Forwards {@code true, 5, 3} to the {@link PlatformTestCase} constructor.
   * <p/>
   * NOTE(review): per the inline remark the first argument keeps cluster testing enabled; the
   * meaning of 5 and 3 is not visible in this file -- confirm against PlatformTestCase.
   */
  public FieldedPipesPlatformTest()
    {
    super( true, 5, 3 ); // leave cluster testing enabled
    }

  /**
   * Parses the leading non-space token of each line into "ip", groups on it and counts each group.
   * Validates a source length of 10 and a sink length of 8.
   *
   * @throws Exception
   */
  @Test
  public void testSimpleGroup() throws Exception
    {
    getPlatform().copyFromLocal( inputFileApache );

    Tap source = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileApache );

    Pipe pipe = new Pipe( "test" );

    pipe = new Each( pipe, new Fields( "line" ), new RegexParser( new Fields( "ip" ), "^[^ ]*" ), new Fields( "ip" ) );

    pipe = new GroupBy( pipe, new Fields( "ip" ) );

    pipe = new Every( pipe, new Count(), new Fields( "ip", "count" ) );

    Tap sink = getPlatform().getTextFile( getOutputPath( "simple" ), SinkMode.REPLACE );

    Flow flow = getPlatform().getFlowConnector().connect( source, sink, pipe );

    flow.complete();

    validateLength( flow.openSource(), 10 ); // validate source, this once, as a sanity check
    validateLength( flow, 8, null );
    }

  /**
   * Chains four Count aggregators ("count1".."count4") after a single GroupBy on "ip" and
   * validates a sink length of 8 with the value 5 passed as the third validateLength argument.
   *
   * @throws Exception
   */
  @Test
  public void testSimpleChain() throws Exception
    {
    getPlatform().copyFromLocal( inputFileApache );

    Tap source = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileApache );

    Pipe pipe = new Pipe( "test" );

    pipe = new Each( pipe, new Fields( "line" ), new RegexParser( new Fields( "ip" ), "^[^ ]*" ), new Fields( "ip" ) );

    pipe = new GroupBy( pipe, new Fields( "ip" ) );

    pipe = new Every( pipe, new Count( new Fields( "count1" ) ) );
    pipe = new Every( pipe, new Count( new Fields( "count2" ) ) );
    pipe = new Every( pipe, new Count( new Fields( "count3" ) ) );
    pipe = new Every( pipe, new Count( new Fields( "count4" ) ) );

    Tap sink = getPlatform().getTabDelimitedFile( Fields.ALL, getOutputPath( "simplechain" ), SinkMode.REPLACE );

    Flow flow = getPlatform().getFlowConnector().connect( source, sink, pipe );

    flow.complete();

    validateLength( flow, 8, 5 );
    }

  /**
   * Follows two chained Count aggregators with an Each that evaluates "count1 + count2" into
   * "sum", so the aggregator chain ends with an Each. Validates a sink length of 8.
   *
   * @throws Exception
   */
  @Test
  public void testChainEndingWithEach() throws Exception
    {
    getPlatform().copyFromLocal( inputFileApache );

    Pipe pipe = new Pipe( "test" );

    pipe = new Each( pipe, new Fields( "line" ), new RegexParser( new Fields( "ip" ), "^[^ ]*" ), new Fields( "ip" ) );

    pipe = new GroupBy( pipe, new Fields( "ip" ) );

    pipe = new Every( pipe, new Count( new Fields( "count1" ) ) );
    pipe = new Every( pipe, new Count( new Fields( "count2" ) ) );

    pipe = new Each( pipe, new Fields( "count1", "count2" ), new ExpressionFunction( new Fields( "sum" ), "count1 + count2", int.class ), Fields.ALL );

    Tap source = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileApache );
    Tap sink = getPlatform().getTextFile( getOutputPath( "chaineach" ), SinkMode.REPLACE );

    Flow flow = getPlatform().getFlowConnector().connect( source, sink, pipe );

    flow.complete();

    validateLength( flow, 8, null );
    }

  /**
   * Runs a flow with no grouping: splits each line on whitespace with a RegexSplitter and keeps
   * position 1 of the output selection. Validates a sink length of 10 and that the result
   * contains the tuple "75.185.76.245".
   * <p/>
   * Also tests the RegexSplitter.
   *
   * @throws Exception
   */
  @Test
  public void testNoGroup() throws Exception
    {
    getPlatform().copyFromLocal( inputFileApache );

    Tap source = getPlatform().getTextFile( new Fields( "line" ), inputFileApache );

    Pipe pipe = new Pipe( "test" );

    pipe = new Each( pipe, new RegexSplitter( "\\s+" ), new Fields( 1 ) );

    Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "nogroup" ), SinkMode.REPLACE );

    Flow flow = getPlatform().getFlowConnector().connect( source, sink, pipe );

    flow.complete();

    validateLength( flow, 10, null );

    List<Tuple> results = getSinkAsList( flow );

    assertTrue( results.contains( new Tuple( "75.185.76.245" ) ) );
    }

  /**
   * Connects a source directly to a sink through a bare Pipe with no operations and validates
   * a sink length of 10.
   *
   * @throws Exception
   */
  @Test
  public void testCopy() throws Exception
    {
    getPlatform().copyFromLocal( inputFileApache );

    Tap source = getPlatform().getTextFile( new Fields( "line" ), inputFileApache );

    Pipe pipe = new Pipe( "test" );

    Tap sink = getPlatform().getTextFile( getOutputPath( "copy" ), SinkMode.REPLACE );

    Flow flow = getPlatform().getFlowConnector().connect( source, sink, pipe );

    flow.complete();

    validateLength( flow, 10 );
    }

  /**
   * Splits the lower and upper inputs into "num" and "char" and merges both branches in a single
   * GroupBy on "num". Validates a sink length of 10 and the presence of both the lower-case and
   * upper-case value for nums 1 through 3.
   *
   * @throws Exception
   */
  @Test
  public void testSimpleMerge() throws Exception
    {
    getPlatform().copyFromLocal( inputFileLower );
    getPlatform().copyFromLocal( inputFileUpper );

    Tap sourceLower = getPlatform().getTextFile( inputFileLower );
    Tap sourceUpper = getPlatform().getTextFile( inputFileUpper );

    Map sources = new HashMap();

    sources.put( "lower", sourceLower );
    sources.put( "upper", sourceUpper );

    Function splitter = new RegexSplitter( new Fields( "num", "char" ), " " );

    // using null pos so all fields are written
    Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "simplemerge" ), SinkMode.REPLACE );

    Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitter );
    Pipe pipeUpper = new Each( new Pipe( "upper" ), new Fields( "line" ), splitter );

    Pipe splice = new GroupBy( "merge", Pipe.pipes( pipeLower, pipeUpper ), new Fields( "num" ), null, false );

    Flow flow = getPlatform().getFlowConnector().connect( sources, sink, splice );

    flow.complete();

    validateLength( flow, 10 );

    Collection results = getSinkAsList( flow );

    assertTrue( "missing value", results.contains( new Tuple( "1\ta" ) ) );
    assertTrue( "missing value", results.contains( new Tuple( "1\tA" ) ) );
    assertTrue( "missing value", results.contains( new Tuple( "2\tb" ) ) );
    assertTrue( "missing value", results.contains( new Tuple( "2\tB" ) ) );
    assertTrue( "missing value", results.contains( new Tuple( "3\tc" ) ) );
    assertTrue( "missing value", results.contains( new Tuple( "3\tC" ) ) );
    }

  /**
   * Specifically tests GroupBy will return the correct grouping fields to the following Every
   * <p/>
   * additionally tests secondary sorting during merging
   * <p/>
   * Three branches are merged in one GroupBy on "num" sorted on "char"; First then picks the
   * first "char" per group. Validates a sink length of 6 and the expected (num, first) pairs.
   *
   * @throws Exception
   */
  @Test
  public void testSimpleMergeThree() throws Exception
    {
    getPlatform().copyFromLocal( inputFileLower );
    getPlatform().copyFromLocal( inputFileUpper );
    getPlatform().copyFromLocal( inputFileLowerOffset );

    Tap sourceLower = getPlatform().getTextFile( inputFileLower );
    Tap sourceUpper = getPlatform().getTextFile( inputFileUpper );
    Tap sourceLowerOffset = getPlatform().getTextFile( inputFileLowerOffset );

    Map sources = new HashMap();

    sources.put( "lower", sourceLower );
    sources.put( "upper", sourceUpper );
    sources.put( "offset", sourceLowerOffset );

    Tap sink = getPlatform().getDelimitedFile( Fields.ALL, "\t", getOutputPath( "simplemergethree" ), SinkMode.REPLACE );

    Function splitter = new RegexSplitter( new Fields( "num", "char" ), " " );

    Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitter );
    Pipe pipeUpper = new Each( new Pipe( "upper" ), new Fields( "line" ), splitter );
    Pipe pipeOffset = new Each( new Pipe( "offset" ), new Fields( "line" ), splitter );

    Pipe splice = new GroupBy( "merge", Pipe.pipes( pipeLower, pipeUpper, pipeOffset ), new Fields( "num" ), new Fields( "char" ) );

    splice = new Every( splice, new Fields( "char" ), new First( new Fields( "first" ) ) );

    splice = new Each( splice, new Fields( "num", "first" ), new Identity() );

    Flow flow = getPlatform().getFlowConnector().connect( sources, sink, splice );

    flow.complete();

    validateLength( flow, 6 );

    List<Tuple> tuples = getSinkAsList( flow );

    assertTrue( tuples.contains( new Tuple( "1", "A" ) ) );
    assertTrue( tuples.contains( new Tuple( "2", "B" ) ) );
    assertTrue( tuples.contains( new Tuple( "3", "C" ) ) );
    assertTrue( tuples.contains( new Tuple( "4", "D" ) ) );
    assertTrue( tuples.contains( new Tuple( "5", "E" ) ) );
    assertTrue( tuples.contains( new Tuple( "6", "c" ) ) );
    }

  /**
   * Binds the same source tap to two differently named heads ("lower" and "upper") and merges
   * them in one GroupBy on "num". Validates a sink length of 10 and that each of the first three
   * values occurs exactly twice.
   *
   * @throws Exception
   */
  @Test
  public void testSameSourceMerge() throws Exception
    {
    getPlatform().copyFromLocal( inputFileLower );

    Tap sourceLower = getPlatform().getDelimitedFile( new Fields( "num", "char" ), " ", inputFileLower );

    Map sources = new HashMap();

    sources.put( "lower", sourceLower );
    sources.put( "upper", sourceLower );

    // using null pos so all fields are written
    Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath(), SinkMode.REPLACE );

    Pipe pipeLower = new Pipe( "lower" );
    Pipe pipeUpper = new Pipe( "upper" );

    Pipe splice = new GroupBy( "merge", Pipe.pipes( pipeLower, pipeUpper ), new Fields( "num" ), null, false );

    Flow flow = getPlatform().getFlowConnector().connect( sources, sink, splice );

    flow.complete();

    validateLength( flow, 10 );

    Collection results = getSinkAsList( flow );

    assertEquals( "missing value", 2, Collections.frequency( results, new Tuple( "1\ta" ) ) );
    assertEquals( "missing value", 2, Collections.frequency( results, new Tuple( "2\tb" ) ) );
    assertEquals( "missing value", 2, Collections.frequency( results, new Tuple( "3\tc" ) ) );
    }

  /**
   * same test as MergePipesTest, but to test that chained groupby don't exhibit similar failures
   * <p/>
   * One head is split into three branches; two are merged by a first GroupBy whose output is then
   * merged with the third branch by a second GroupBy. On a MapReduce platform the flow is expected
   * to have 2 steps. Validates a sink length of 15.
   *
   * @throws Exception
   */
  @Test
  public void testSameSourceMergeThreeChainGroup() throws Exception
    {
    getPlatform().copyFromLocal( inputFileLower );

    Tap sourceLower = getPlatform().getTextFile( inputFileLower );

    Map sources = new HashMap();

    sources.put( "split", sourceLower );

    Tap sink = getPlatform().getTextFile( getOutputPath( "samemergethreechaingroup" ), SinkMode.REPLACE );

    Function splitter = new RegexSplitter( new Fields( "num", "char" ), " " );

    Pipe pipe = new Pipe( "split" );

    Pipe pipeLower = new Each( new Pipe( "lower", pipe ), new Fields( "line" ), splitter );
    Pipe pipeUpper = new Each( new Pipe( "upper", pipe ), new Fields( "line" ), splitter );
    Pipe pipeOffset = new Each( new Pipe( "offset", pipe ), new Fields( "line" ), splitter );

    //put group before merge to test path counts
    Pipe splice = new GroupBy( Pipe.pipes( pipeLower, pipeUpper ), new Fields( "num" ) );

    // this group has its incoming paths counted, gated by the previous group
    splice = new GroupBy( Pipe.pipes( splice, pipeOffset ), new Fields( "num" ) );

    Flow flow = getPlatform().getFlowConnector().connect( sources, sink, splice );

    if( getPlatform().isMapReduce() )
      assertEquals( "wrong num jobs", 2, flow.getFlowSteps().size() );

    flow.complete();

    validateLength( flow, 15 );
    }

  /**
   * Splits each joined line into "num", "lower" and "upper" and applies UnGroup with the declared
   * fields ("num", "char"), keyed on "num", over the value groups "lower" and "upper".
   * Validates a sink length of 10.
   *
   * @throws Exception
   */
  @Test
  public void testUnGroup() throws Exception
    {
    getPlatform().copyFromLocal( inputFileJoined );

    Tap source = getPlatform().getTextFile( inputFileJoined );
    Tap sink = getPlatform().getTextFile( getOutputPath( "ungrouped" ), SinkMode.REPLACE );

    Pipe pipe = new Pipe( "test" );

    pipe = new Each( pipe, new Fields( "line" ), new RegexSplitter( new Fields( "num", "lower", "upper" ) ) );

    pipe = new Each( pipe, new UnGroup( new Fields( "num", "char" ), new Fields( "num" ), Fields.fields( new Fields( "lower" ), new Fields( "upper" ) ) ) );

    Flow flow = getPlatform().getFlowConnector().connect( source, sink, pipe );

    flow.complete();

    validateLength( flow, 10 );
    }

  /**
   * Same as {@link #testUnGroup()} but constructs UnGroup without a field declaration.
   * Validates a sink length of 10.
   *
   * @throws Exception
   */
  @Test
  public void testUnGroupAnon() throws Exception
    {
    getPlatform().copyFromLocal( inputFileJoined );

    Tap source = getPlatform().getTextFile( inputFileJoined );
    Tap sink = getPlatform().getTextFile( getOutputPath( "ungroupedanon" ), SinkMode.REPLACE );

    Pipe pipe = new Pipe( "test" );

    pipe = new Each( pipe, new Fields( "line" ), new RegexSplitter( new Fields( "num", "lower", "upper" ) ) );

    pipe = new Each( pipe, new UnGroup( new Fields( "num" ), Fields.fields( new Fields( "lower" ), new Fields( "upper" ) ) ) );

    Flow flow = getPlatform().getFlowConnector().connect( source, sink, pipe );

    flow.complete();

    validateLength( flow, 10 );
    }

  /**
   * Applies UnGroup keyed on ("num1", "num2") using the size-based constructor with a size of 1.
   * Expects 10 sink tuples and checks position 1 of each tuple against the expected
   * tab-delimited values for nums 1 through 5 in both cases.
   *
   * @throws Exception
   */
  @Test
  public void testUnGroupBySize() throws Exception
    {
    getPlatform().copyFromLocal( inputFileJoinedExtra );

    Tap source = getPlatform().getTextFile( inputFileJoinedExtra );
    Tap sink = getPlatform().getTextFile( getOutputPath( "ungrouped_size" ), SinkMode.REPLACE );

    Pipe pipe = new Pipe( "test" );

    pipe = new Each( pipe, new Fields( "line" ), new RegexSplitter( new Fields( "num1", "num2", "lower", "upper" ) ) );

    pipe = new Each( pipe, new UnGroup( new Fields( "num1", "num2", "char" ), new Fields( "num1", "num2" ), 1 ) );

    Flow flow = getPlatform().getFlowConnector().connect( source, sink, pipe );

    flow.complete();

    List<Tuple> tuples = asList( flow, sink );
    assertEquals( 10, tuples.size() );

    // collect position 1 of every sink tuple so the written text can be compared directly
    List<Object> values = new ArrayList<Object>();
    for( Tuple tuple : tuples )
      values.add( tuple.getObject( 1 ) );

    assertTrue( values.contains( "1\t1\ta" ) );
    assertTrue( values.contains( "1\t1\tA" ) );
    assertTrue( values.contains( "2\t2\tb" ) );
    assertTrue( values.contains( "2\t2\tB" ) );
    assertTrue( values.contains( "3\t3\tc" ) );
    assertTrue( values.contains( "3\t3\tC" ) );
    assertTrue( values.contains( "4\t4\td" ) );
    assertTrue( values.contains( "4\t4\tD" ) );
    assertTrue( values.contains( "5\t5\te" ) );
    assertTrue( values.contains( "5\t5\tE" ) );
    }

  /**
   * Applies a single RegexFilter "^68.*" to "line" and validates a sink length of 3.
   *
   * @throws Exception
   */
  @Test
  public void testFilter() throws Exception
    {
    getPlatform().copyFromLocal( inputFileApache );

    Tap source = getPlatform().getTextFile( inputFileApache );
    Tap sink = getPlatform().getTextFile( getOutputPath( "filter" ), SinkMode.REPLACE );

    Pipe pipe = new Pipe( "test" );

    Filter filter = new RegexFilter( "^68.*" );

    pipe = new Each( pipe, new Fields( "line" ), filter );

    Flow flow = getPlatform().getFlowConnector().connect( source, sink, pipe );

    flow.complete();

    validateLength( flow, 3 );
    }

  /**
   * Applies an And of two RegexFilters ("^68.*$" and "^1000.*$") to "line" and validates a sink
   * length of 3.
   *
   * @throws Exception
   */
  @Test
  public void testLogicFilter() throws Exception
    {
    getPlatform().copyFromLocal( inputFileApache );

    Tap source = getPlatform().getTextFile( inputFileApache );
    Tap sink = getPlatform().getTextFile( getOutputPath( "logicfilter" ), SinkMode.REPLACE );

    Pipe pipe = new Pipe( "test" );

    Filter filter = new And( new RegexFilter( "^68.*$" ), new RegexFilter( "^1000.*$" ) );

    pipe = new Each( pipe, new Fields( "line" ), filter );

    Flow flow = getPlatform().getFlowConnector().connect( source, sink, pipe );

    flow.complete();

    validateLength( flow, 3 );
    }

  /**
   * Parses lines with TestConstants.APACHE_COMMON_PARSER, applies the "^POST" filter on "method"
   * twice, copies "method" into "value", then groups and counts on "value".
   * Validates a sink length of 1.
   *
   * @throws Exception
   */
  @Test
  public void testFilterComplex() throws Exception
    {
    getPlatform().copyFromLocal( inputFileApache );

    Tap source = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileApache );
    Tap sink = getPlatform().getTextFile( getOutputPath( "filtercomplex" ), SinkMode.REPLACE );

    Pipe pipe = new Pipe( "test" );

    pipe = new Each( pipe, new Fields( "line" ), TestConstants.APACHE_COMMON_PARSER );

    // NOTE(review): the same filter is applied twice in a row; presumably deliberate to exercise
    // back-to-back filters -- confirm before collapsing
    pipe = new Each( pipe, new Fields( "method" ), new RegexFilter( "^POST" ) );
    pipe = new Each( pipe, new Fields( "method" ), new RegexFilter( "^POST" ) );

    pipe = new Each( pipe, new Fields( "method" ), new Identity( new Fields( "value" ) ), Fields.ALL );

    pipe = new GroupBy( pipe, new Fields( "value" ) );

    pipe = new Every( pipe, new Count(), new Fields( "value", "count" ) );

    Flow flow = getPlatform().getFlowConnector().connect( source, sink, pipe );

    flow.complete();

    validateLength( flow, 1, null );
    }

  /**
   * Intentionally filters all values out to test next mr job behaves
   * <p/>
   * The "^fobar" filter is followed by two GroupBy pipes. Validates a sink length of 0.
   *
   * @throws Exception
   */
  @Test
  public void testFilterAll() throws Exception
    {
    getPlatform().copyFromLocal( inputFileApache );

    Tap source = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileApache );
    Tap sink = getPlatform().getTextFile( getOutputPath( "filterall" ), SinkMode.REPLACE );

    Pipe pipe = new Pipe( "test" );

    String regex = "^([^ ]*) +[^ ]* +[^ ]* +\\[([^]]*)\\] +\\\"([^ ]*) ([^ ]*) [^ ]*\\\" ([^ ]*) ([^ ]*).*$";
    Fields fieldDeclaration = new Fields( "ip", "time", "method", "event", "status", "size" );
    int[] groups = {1, 2, 3, 4, 5, 6};
    RegexParser function = new RegexParser( fieldDeclaration, regex, groups );
    pipe = new Each( pipe, new Fields( "line" ), function );

    pipe = new Each( pipe, new Fields( "method" ), new RegexFilter( "^fobar" ) ); // intentionally filtering all

    pipe = new GroupBy( pipe, new Fields( "method" ) );

    pipe = new Each( pipe, new Fields( "method" ), new Identity( new Fields( "value" ) ), Fields.ALL );

    pipe = new GroupBy( pipe, new Fields( "value" ) );

    pipe = new Every( pipe, new Count(), new Fields( "value", "count" ) );

    Flow flow = getPlatform().getFlowConnector().connect( source, sink, pipe );

    flow.complete();

    validateLength( flow, 0, null );
    }

//  public void testLimitFilter() throws Exception
//    {
//    copyFromLocal( inputFileApache );
//
//    Tap source = new Hfs( new TextLine( new Fields( "offset", "line" ) ), inputFileApache );
//    Tap sink = new Lfs( new TextLine(), outputPath + "/limitfilter", true );
//
//    Pipe pipe = new Pipe( "test" );
//
//    Filter filter = new Limit( 7 );
//
//    pipe = new Each( pipe, new Fields( "line" ), filter );
//
//    Flow flow = new FlowConnector( getProperties() ).connect( source, sink, pipe );
//
////    flow.writeDOT( "flow.dot" );
//
//    flow.complete();
//
//    validateLength( flow, 7, null );
//    }

  //

  /**
   * Filters on "^68.*" and then splits into a "left" branch (".*46.*") and a "right" branch
   * (".*102.*"), each bound to its own sink. Validates a length of 1 for "left" and 2 for "right".
   * <p/>
   * TODO: create (optional) Tez rule to consolidate into a single DAG. currently renders to two DAGs, one for each side
   *
   * @throws Exception
   */
  @Test
  public void testSplit() throws Exception
    {
    getPlatform().copyFromLocal( inputFileApache );

    // 46 192

    Tap source = getPlatform().getTextFile( inputFileApache );
    Tap sink1 = getPlatform().getTextFile( getOutputPath( "split1" ), SinkMode.REPLACE );
    Tap sink2 = getPlatform().getTextFile( getOutputPath( "split2" ), SinkMode.REPLACE );

    Pipe pipe = new Pipe( "split" );

    pipe = new Each( pipe, new Fields( "line" ), new RegexFilter( "^68.*" ) );

    Pipe left = new Each( new Pipe( "left", pipe ), new Fields( "line" ), new RegexFilter( ".*46.*" ) );
    Pipe right = new Each( new Pipe( "right", pipe ), new Fields( "line" ), new RegexFilter( ".*102.*" ) );

    Map sources = new HashMap();
    sources.put( "split", source );

    Map sinks = new HashMap();
    sinks.put( "left", sink1 );
    sinks.put( "right", sink2 );

    Flow flow = getPlatform().getFlowConnector().connect( sources, sinks, left, right );

    flow.complete();

    validateLength( flow, 1, "left" );
    validateLength( flow, 2, "right" );
    }

  /**
   * verifies non-safe rules apply in the proper place
   * <p/>
   * Same split as {@link #testSplit()} but preceded by a TestFunction constructed with
   * {@code false} as its last argument.
   *
   * @throws Exception
   */
  @Test
  public void testSplitNonSafe() throws Exception
    {
    getPlatform().copyFromLocal( inputFileApache );

    // 46 192

    Tap source = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileApache );
    Tap sink1 = getPlatform().getTextFile( getOutputPath( "nonsafesplit1" ), SinkMode.REPLACE );
    Tap sink2 = getPlatform().getTextFile( getOutputPath( "nonsafesplit2" ), SinkMode.REPLACE );

    Pipe pipe = new Pipe( "split" );

    // run job on non-safe operation, forces 3 mr jobs.
    pipe = new Each( pipe, new TestFunction( new Fields( "ignore" ), new Tuple( 1 ), false ), new Fields( "line" ) );

    pipe = new Each( pipe, new Fields( "line" ), new RegexFilter( "^68.*" ) );

    Pipe left = new Each( new Pipe( "left", pipe ), new Fields( "line" ), new RegexFilter( ".*46.*" ) );
    Pipe right = new Each( new Pipe( "right", pipe ), new Fields( "line" ), new RegexFilter( ".*102.*" ) );

    Map sources = new HashMap();
    sources.put( "split", source );

    Map sinks = new HashMap();
    sinks.put( "left", sink1 );
    sinks.put( "right", sink2 );

    Flow flow = getPlatform().getFlowConnector().connect( sources, sinks, left, right );

    flow.complete();

    validateLength( flow, 1, "left" );
    validateLength( flow, 2, "right" );
    }

  /**
   * Splits one head into "left" and "right" filtered branches and merges them again in a GroupBy
   * on "line". Validates a sink length of 3.
   *
   * @throws Exception
   */
  @Test
  public void testSplitSameSourceMerged() throws Exception
    {
    getPlatform().copyFromLocal( inputFileApache );

    // 46 192

    Tap source = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileApache );
    Tap sink = getPlatform().getTextFile( getOutputPath( "splitsourcemerged" ), SinkMode.REPLACE );

    Pipe pipe = new Pipe( "split" );

    pipe = new Each( pipe, new Fields( "line" ), new RegexFilter( "^68.*" ) );

    Pipe left = new Each( new Pipe( "left", pipe ), new Fields( "line" ), new RegexFilter( ".*46.*" ) );
    Pipe right = new Each( new Pipe( "right", pipe ), new Fields( "line" ), new RegexFilter( ".*102.*" ) );

    Pipe merged = new GroupBy( "merged", Pipe.pipes( left, right ), new Fields( "line" ) );

    Flow flow = getPlatform().getFlowConnector().connect( source, sink, merged );

    flow.complete();

    validateLength( flow, 3 );
    }

  /**
   * verifies not inserting Identity between groups works
   * <p/>
   * Two GroupBy pipes are chained directly and both are used as tails bound to their own sinks.
   * Validates a length of 10, and 10 distinct entries, for each sink.
   *
   * @throws Exception
   */
  @Test
  public void testSplitOut() throws Exception
    {
    getPlatform().copyFromLocal( inputFileApache );

    Tap sourceLower = getPlatform().getTextFile( new Fields( "num", "line" ), inputFileApache );

    Map sources = new HashMap();

    sources.put( "lower1", sourceLower );

    // using null pos so all fields are written
    Tap sink1 = getPlatform().getTextFile( getOutputPath( "splitout1" ), SinkMode.REPLACE );
    Tap sink2 = getPlatform().getTextFile( getOutputPath( "splitout2" ), SinkMode.REPLACE );

    Map sinks = new HashMap();

    sinks.put( "output1", sink1 );
    sinks.put( "output2", sink2 );

    Pipe pipeLower1 = new Pipe( "lower1" );

    Pipe left = new GroupBy( "output1", pipeLower1, new Fields( 0 ) );
    Pipe right = new GroupBy( "output2", left, new Fields( 0 ) );

    Flow flow = getPlatform().getFlowConnector().connect( sources, sinks, Pipe.pipes( left, right ) );

//    flow.writeDOT( "spit.dot" );

    flow.complete();

    validateLength( flow, 10, "output1" );
    validateLength( flow, 10, "output2" );

    assertEquals( 10, asSet( flow, sink1 ).size() );
    assertEquals( 10, asSet( flow, sink2 ).size() );
    }

  /**
   * Groups and counts on "ip", filters on "^68.*", then splits into "left" and "right" filtered
   * branches, building the tap maps through Cascades.tapsMap. Validates a length of 1 per branch.
   *
   * @throws Exception
   */
  @Test
  public void testSplitComplex() throws Exception
    {
    getPlatform().copyFromLocal( inputFileApache );

    // 46 192

    Tap source = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileApache );
    Tap sink1 = getPlatform().getTextFile( getOutputPath( "splitcomp1" ), SinkMode.REPLACE );
    Tap sink2 = getPlatform().getTextFile( getOutputPath( "splitcomp2" ), SinkMode.REPLACE );

    Pipe pipe = new Pipe( "split" );

    pipe = new Each( pipe, new Fields( "line" ), new RegexParser( new Fields( "ip" ), "^[^ ]*" ), new Fields( "ip" ) );

    pipe = new GroupBy( pipe, new Fields( "ip" ) );

    pipe = new Every( pipe, new Fields( "ip" ), new Count(), new Fields( "ip", "count" ) );

    pipe = new Each( pipe, new Fields( "ip" ), new RegexFilter( "^68.*" ) );

    Pipe left = new Each( new Pipe( "left", pipe ), new Fields( "ip" ), new RegexFilter( ".*46.*" ) );

    Pipe right = new Each( new Pipe( "right", pipe ), new Fields( "ip" ), new RegexFilter( ".*102.*" ) );

    Map sources = Cascades.tapsMap( "split", source );
    Map sinks = Cascades.tapsMap( Pipe.pipes( left, right ), Tap.taps( sink1, sink2 ) );

    Flow flow = getPlatform().getFlowConnector().connect( sources, sinks, left, right );

    flow.complete();

    validateLength( flow, 1, "left" );
    validateLength( flow, 1, "right" );
    }

  /**
   * Extends {@link #testSplitComplex()} by grouping the "right" branch again and splitting it
   * into "rightLeft" and "rightRight", giving three tails. Validates a length of 1 per tail.
   *
   * @throws Exception
   */
  @Test
  public void testSplitMultiple() throws Exception
    {
    getPlatform().copyFromLocal( inputFileApache );

    // 46 192

    Tap source = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileApache );
    Tap sinkLeft = getPlatform().getTextFile( getOutputPath( "left" ), SinkMode.REPLACE );
    Tap sinkRightLeft = getPlatform().getTextFile( getOutputPath( "rightleft" ), SinkMode.REPLACE );
    Tap sinkRightRight = getPlatform().getTextFile( getOutputPath( "rightright" ), SinkMode.REPLACE );

    Pipe head = new Pipe( "split" );

    head = new Each( head, new Fields( "line" ), new RegexParser( new Fields( "ip" ), "^[^ ]*" ), new Fields( "ip" ) );

    head = new GroupBy( head, new Fields( "ip" ) );

    head = new Every( head, new Fields( "ip" ), new Count(), new Fields( "ip", "count" ) );

    head = new Each( head, new Fields( "ip" ), new RegexFilter( "^68.*" ) );

    Pipe left = new Each( new Pipe( "left", head ), new Fields( "ip" ), new RegexFilter( ".*46.*" ) );

    Pipe right = new Each( new Pipe( "right", head ), new Fields( "ip" ), new RegexFilter( ".*102.*" ) );

    right = new GroupBy( right, new Fields( "ip" ) );

    Pipe rightLeft = new Each( new Pipe( "rightLeft", right ), new Fields( "ip" ), new Identity() );

    Pipe rightRight = new Each( new Pipe( "rightRight", right ), new Fields( "ip" ), new Identity() );

    Map sources = Cascades.tapsMap( "split", source );
    Map sinks = Cascades.tapsMap( Pipe.pipes( left, rightLeft, rightRight ), Tap.taps( sinkLeft, sinkRightLeft, sinkRightRight ) );

    Flow flow = getPlatform().getFlowConnector().connect( sources, sinks, left, rightLeft, rightRight );

    flow.complete();

    validateLength( flow, 1, "left" );
    validateLength( flow, 1, "rightLeft" );
    validateLength( flow, 1, "rightRight" );
    }

  /**
   * Wraps the lower and upper sources in a single MultiSourceTap, splits the lines and groups on
   * "num". Validates a sink length of 10.
   *
   * @throws Exception
   */
  @Test
  public void testConcatenation() throws Exception
    {
    getPlatform().copyFromLocal( inputFileLower );
    getPlatform().copyFromLocal( inputFileUpper );

    Tap sourceLower = getPlatform().getTextFile( inputFileLower );
    Tap sourceUpper = getPlatform().getTextFile( inputFileUpper );

    Tap source = new MultiSourceTap( sourceLower, sourceUpper );

    Function splitter = new RegexSplitter( new Fields( "num", "char" ), " " );

    // using null pos so all fields are written
    Tap sink = getPlatform().getTextFile( getOutputPath( "complexconcat" ), SinkMode.REPLACE );

    Pipe pipe = new Each( new Pipe( "concat" ), new Fields( "line" ), splitter );

    Pipe splice = new GroupBy( pipe, new Fields( "num" ) );

    Flow countFlow = getPlatform().getFlowConnector().connect( source, sink, splice );

    countFlow.complete();

    validateLength( countFlow, 10, null );
    }

  /**
   * Chains two TestAggregator instances after a GroupBy on "ip", the first constructed with two
   * value tuples and the second with three. Validates a sink length of 8 * 2 * 3.
   *
   * @throws Exception
   */
  @Test
  public void testGeneratorAggregator() throws Exception
    {
    getPlatform().copyFromLocal( inputFileApache );

    Tap source = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileApache );

    Pipe pipe = new Pipe( "test" );

    pipe = new Each( pipe, new Fields( "line" ), new RegexParser( new Fields( "ip" ), "^[^ ]*" ), new Fields( "ip" ) );

    pipe = new GroupBy( pipe, new Fields( "ip" ) );

    pipe = new Every( pipe, new TestAggregator( new Fields( "count1" ), new Fields( "ip" ), new Tuple( "first1" ), new Tuple( "first2" ) ) );
    pipe = new Every( pipe, new TestAggregator( new Fields( "count2" ), new Fields( "ip" ), new Tuple( "second" ), new Tuple( "second2" ), new Tuple( "second3" ) ) );

    Tap sink = getPlatform().getTextFile( getOutputPath( "generatoraggregator" ), SinkMode.REPLACE );

    Flow flow = getPlatform().getFlowConnector().connect( source, sink, pipe );

    flow.complete();

    validateLength( flow, 8 * 2 * 3, null );
    }

  /**
   * Exercises the Fields.REPLACE output selector three times on "line" (with a RegexParser, an
   * Identity declaring Fields.ARGS, and an Identity declaring "line"), followed by a Debug.
   * Validates a sink length of 10 against the given width and line pattern.
   *
   * @throws Exception
   */
  @Test
  public void testReplace() throws Exception
    {
    // stage the input like every other test here, so this test does not rely on another
    // test having copied the file to the platform first
    getPlatform().copyFromLocal( inputFileApache );

    Tap source = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileApache );
    Tap sink = getPlatform().getTextFile( new Fields( "offset", "line" ), new Fields( "offset", "line" ), getOutputPath( "replace" ), SinkMode.REPLACE );

    Pipe pipe = new Pipe( "test" );

    Function parser = new RegexParser( new Fields( 0 ), "^[^ ]*" );
    pipe = new Each( pipe, new Fields( "line" ), parser, Fields.REPLACE );
    pipe = new Each( pipe, new Fields( "line" ), new Identity( Fields.ARGS ), Fields.REPLACE );
    pipe = new Each( pipe, new Fields( "line" ), new Identity( new Fields( "line" ) ), Fields.REPLACE );

    pipe = new Each( pipe, new Debug( true ) );

    Flow flow = getPlatform().getFlowConnector( disableDebug() ).connect( source, sink, pipe );

    flow.complete();

    validateLength( flow, 10, 2, Pattern.compile( "^\\d+\\s\\d+\\s[\\d]{1,3}\\.[\\d]{1,3}\\.[\\d]{1,3}\\.[\\d]{1,3}$" ) );
    }

  /**
   * Exercises the Fields.SWAP output selector: "line" is swapped for the parsed "ip", and after
   * grouping and counting "ip" is swapped for "ipaddress". Validates a sink length of 8 against
   * the given width and line pattern.
   *
   * @throws Exception
   */
  @Test
  public void testSwap() throws Exception
    {
    // stage the input like every other test here, so this test does not rely on another
    // test having copied the file to the platform first
    getPlatform().copyFromLocal( inputFileApache );

    Tap source = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileApache );
    Tap sink = getPlatform().getTextFile( new Fields( "offset", "line" ), new Fields( "count", "ipaddress" ), getOutputPath( "swap" ), SinkMode.REPLACE );

    Pipe pipe = new Pipe( "test" );

    Function parser = new RegexParser( new Fields( "ip" ), "^[^ ]*" );
    pipe = new Each( pipe, new Fields( "line" ), parser, Fields.SWAP );
    pipe = new GroupBy( pipe, new Fields( "ip" ) );
    pipe = new Every( pipe, new Fields( "ip" ), new Count( new Fields( "count" ) ) );
    pipe = new Each( pipe, new Fields( "ip" ), new Identity( new Fields( "ipaddress" ) ), Fields.SWAP );

    Flow flow = getPlatform().getFlowConnector().connect( source, sink, pipe );

    flow.complete();

    validateLength( flow, 8, 2, Pattern.compile( "^\\d+\\s\\d+\\s[\\d]{1,3}\\.[\\d]{1,3}\\.[\\d]{1,3}\\.[\\d]{1,3}$" ) );
    }

  /**
   * Exercises Fields.NONE: a NoOp is applied with Fields.SWAP on "line", and an Insert is applied
   * with Fields.NONE as its argument selector. Validates a sink length of 8 against the given
   * width and line pattern.
   *
   * @throws Exception
   */
  @Test
  public void testNone() throws Exception
    {
    // stage the input like every other test here, so this test does not rely on another
    // test having copied the file to the platform first
    getPlatform().copyFromLocal( inputFileApache );

    Tap source = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileApache );
    Tap sink = getPlatform().getTextFile( new Fields( "offset", "line" ), new Fields( "count", "ip" ), getOutputPath( "none" ), SinkMode.REPLACE );

    Pipe pipe = new Pipe( "test" );

    Function parser = new RegexParser( new Fields( "ip" ), "^[^ ]*" );
    pipe = new Each( pipe, new Fields( "line" ), parser, Fields.ALL );
    pipe = new Each( pipe, new Fields( "line" ), new NoOp(), Fields.SWAP ); // declares Fields.NONE
    pipe = new GroupBy( pipe, new Fields( "ip" ) );
    pipe = new Every( pipe, new Fields( "ip" ), new Count( new Fields( "count" ) ) );
    pipe = new Each( pipe, Fields.NONE, new Insert( new Fields( "ipaddress" ), "1.2.3.4" ), Fields.ALL );

    Flow flow = getPlatform().getFlowConnector().connect( source, sink, pipe );

    flow.complete();

    validateLength( flow, 8, 2, Pattern.compile( "^\\d+\\s\\d+\\s[\\d]{1,3}\\.[\\d]{1,3}\\.[\\d]{1,3}\\.[\\d]{1,3}$" ) );
    }

  /**
   * this tests a merge on two pipes with the same source and name.
   * <p/>
   * Unlike {@link #testSplitSameSourceMerged()} the two branches are not given their own names.
   * Validates a sink length of 3.
   *
   * @throws Exception
   */
  @Test
  public void testSplitSameSourceMergedSameName() throws Exception
    {
    getPlatform().copyFromLocal( inputFileApache );

    // 46 192

    Tap source = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileApache );
    Tap sink = getPlatform().getTextFile( getOutputPath( "splitsourcemergedsamename" ), SinkMode.REPLACE );

    Pipe pipe = new Pipe( "split" );

    pipe = new Each( pipe, new Fields( "line" ), new RegexFilter( "^68.*" ) );

    Pipe left = new Each( pipe, new Fields( "line" ), new RegexFilter( ".*46.*" ) );
    Pipe right = new Each( pipe, new Fields( "line" ), new RegexFilter( ".*102.*" ) );

    Pipe merged = new GroupBy( "merged", Pipe.pipes( left, right ), new Fields( "line" ) );

    Flow flow = getPlatform().getFlowConnector().connect( source, sink, merged );

    flow.complete();

    validateLength( flow, 3 );
    }

  /**
   * Catches failure to properly resolve the grouping fields as incoming to the second group-by
   * <p/>
   * Runs with the property "cascading.serialization.types.required" set to "true".
   * Validates a sink length of 8.
   *
   * @throws Exception
   */
  @Test
  public void testGroupGroup() throws Exception
    {
    getPlatform().copyFromLocal( inputFileApache );

    Tap source = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileApache );

    Pipe pipe = new Pipe( "test" );

    pipe = new Each( pipe, new Fields( "line" ), new RegexParser( new Fields( "ip", String.class ), "^[^ ]*" ), new Fields( "ip" ) );

    pipe = new GroupBy( pipe, new Fields( "ip" ) );

    pipe = new Every( pipe, new Count(), new Fields( "ip", "count" ) );

    pipe = new GroupBy( pipe, new Fields( "ip" ), new Fields( "count" ) );

    Tap sink = getPlatform().getTextFile( getOutputPath( "groupgroup" ), SinkMode.REPLACE );

    Map<Object, Object> properties = getProperties();

    properties.put( "cascading.serialization.types.required", "true" );

    Flow flow = getPlatform().getFlowConnector( properties ).connect( source, sink, pipe );

    flow.complete();

    validateLength( flow, 8, null );
    }

  /**
   * Comparator and Hasher that treat values as equal when their {@code toString()} forms match
   * after lower-casing, so "a" and "A" compare as equal and hash identically.
   * <p/>
   * {@link #hashCode(Comparable)} returns 0 for a null value; {@link #compare(Comparable, Comparable)}
   * has no null guard and will throw a NullPointerException if either argument is null.
   */
  public static class LowerComparator implements Comparator<Comparable>, Hasher<Comparable>, Serializable
    {
    @Override
    public int compare( Comparable lhs, Comparable rhs )
      {
      return lhs.toString().toLowerCase().compareTo( rhs.toString().toLowerCase() );
      }

    @Override
    public int hashCode( Comparable value )
      {
      if( value == null )
        return 0;

      // must hash the same lower-cased form that compare() uses
      return value.toString().toLowerCase().hashCode();
      }
    }

  /**
   * Merges the lower and upper sources and groups on "char" using {@link LowerComparator}, then
   * counts each group. Validates a sink length of 5 where every line ends in a count of 2.
   *
   * @throws Exception
   */
  @Test
  public void testGroupByInsensitive() throws Exception
    {
    getPlatform().copyFromLocal( inputFileLower );
    getPlatform().copyFromLocal( inputFileUpper );

    Tap sourceLower = getPlatform().getDelimitedFile( new Fields( "num", "char" ), " ", inputFileLower );
    Tap sourceUpper = getPlatform().getDelimitedFile( new Fields( "num", "char" ), " ", inputFileUpper );

    Map sources = new HashMap();

    sources.put( "lower", sourceLower );
    sources.put( "upper", sourceUpper );

    Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "insensitivegrouping" + NONDETERMINISTIC ), SinkMode.REPLACE );

    Pipe pipeLower = new Pipe( "lower" );
    Pipe pipeUpper = new Pipe( "upper" );

    Pipe merge = new Merge( pipeLower, pipeUpper );

    Fields charFields = new Fields( "char" );
    charFields.setComparator( "char", new LowerComparator() );

    Pipe splice = new GroupBy( "groupby", merge, charFields );

    splice = new Every( splice, new Fields( "char" ), new Count() );

    Flow flow = getPlatform().getFlowConnector().connect( sources, sink, splice );

    flow.complete();

    // we can't guarantee if the grouping key will be upper or lower
    validateLength( flow, 5, 1, Pattern.compile( "^\\w+\\s2$" ) );
    }
  }