001 /* 002 * Copyright (c) 2007-2015 Concurrent, Inc. All Rights Reserved. 003 * 004 * Project and contact information: http://www.cascading.org/ 005 * 006 * This file is part of the Cascading project. 007 * 008 * Licensed under the Apache License, Version 2.0 (the "License"); 009 * you may not use this file except in compliance with the License. 010 * You may obtain a copy of the License at 011 * 012 * http://www.apache.org/licenses/LICENSE-2.0 013 * 014 * Unless required by applicable law or agreed to in writing, software 015 * distributed under the License is distributed on an "AS IS" BASIS, 016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 017 * See the License for the specific language governing permissions and 018 * limitations under the License. 019 */ 020 021 package cascading.pipe; 022 023 import java.lang.reflect.Type; 024 import java.util.ArrayList; 025 import java.util.Comparator; 026 import java.util.HashMap; 027 import java.util.HashSet; 028 import java.util.Iterator; 029 import java.util.LinkedHashMap; 030 import java.util.List; 031 import java.util.Map; 032 import java.util.Set; 033 034 import cascading.flow.FlowElement; 035 import cascading.flow.planner.DeclaresResults; 036 import cascading.flow.planner.Scope; 037 import cascading.pipe.joiner.BufferJoin; 038 import cascading.pipe.joiner.InnerJoin; 039 import cascading.pipe.joiner.Joiner; 040 import cascading.tuple.Fields; 041 import cascading.tuple.FieldsResolverException; 042 import cascading.tuple.TupleException; 043 import cascading.tuple.coerce.Coercions; 044 import cascading.tuple.type.CoercibleType; 045 import cascading.util.Util; 046 047 import static java.util.Arrays.asList; 048 049 /** 050 * The base class for {@link GroupBy}, {@link CoGroup}, {@link Merge}, and {@link HashJoin}. This class should not be used directly. 
051 * 052 * @see GroupBy 053 * @see CoGroup 054 * @see Merge 055 * @see HashJoin 056 */ 057 public class Splice extends Pipe 058 { 059 static enum Kind 060 { 061 GroupBy, CoGroup, Merge, Join 062 } 063 064 private Kind kind; 065 /** Field spliceName */ 066 private String spliceName; 067 /** Field pipes */ 068 private final List<Pipe> pipes = new ArrayList<Pipe>(); 069 /** Field groupFieldsMap */ 070 protected final Map<String, Fields> keyFieldsMap = new LinkedHashMap<String, Fields>(); // keep order 071 /** Field sortFieldsMap */ 072 protected Map<String, Fields> sortFieldsMap = new LinkedHashMap<String, Fields>(); // keep order 073 /** Field reverseOrder */ 074 private boolean reverseOrder = false; 075 /** Field declaredFields */ 076 protected Fields declaredFields; 077 /** Field resultGroupFields */ 078 protected Fields resultGroupFields; 079 /** Field repeat */ 080 private int numSelfJoins = 0; 081 /** Field coGrouper */ 082 private Joiner joiner; 083 084 /** Field pipePos */ 085 private transient Map<String, Integer> pipePos; 086 087 /** 088 * Constructor Splice creates a new Splice instance. 089 * 090 * @param lhs of type Pipe 091 * @param lhsGroupFields of type Fields 092 * @param rhs of type Pipe 093 * @param rhsGroupFields of type Fields 094 * @param declaredFields of type Fields 095 */ 096 protected Splice( Pipe lhs, Fields lhsGroupFields, Pipe rhs, Fields rhsGroupFields, Fields declaredFields ) 097 { 098 this( lhs, lhsGroupFields, rhs, rhsGroupFields, declaredFields, null, null ); 099 } 100 101 /** 102 * Constructor Splice creates a new Splice instance. 
103 * 104 * @param lhs of type Pipe 105 * @param lhsGroupFields of type Fields 106 * @param rhs of type Pipe 107 * @param rhsGroupFields of type Fields 108 * @param declaredFields of type Fields 109 * @param resultGroupFields of type Fields 110 */ 111 protected Splice( Pipe lhs, Fields lhsGroupFields, Pipe rhs, Fields rhsGroupFields, Fields declaredFields, Fields resultGroupFields ) 112 { 113 this( lhs, lhsGroupFields, rhs, rhsGroupFields, declaredFields, resultGroupFields, null ); 114 } 115 116 /** 117 * Constructor Splice creates a new Splice instance. 118 * 119 * @param lhs of type Pipe 120 * @param lhsGroupFields of type Fields 121 * @param rhs of type Pipe 122 * @param rhsGroupFields of type Fields 123 * @param declaredFields of type Fields 124 * @param joiner of type CoGrouper 125 */ 126 protected Splice( Pipe lhs, Fields lhsGroupFields, Pipe rhs, Fields rhsGroupFields, Fields declaredFields, Joiner joiner ) 127 { 128 this( Pipe.pipes( lhs, rhs ), Fields.fields( lhsGroupFields, rhsGroupFields ), declaredFields, joiner ); 129 } 130 131 /** 132 * Constructor Splice creates a new Splice instance. 133 * 134 * @param lhs of type Pipe 135 * @param lhsGroupFields of type Fields 136 * @param rhs of type Pipe 137 * @param rhsGroupFields of type Fields 138 * @param declaredFields of type Fields 139 * @param resultGroupFields of type Fields 140 * @param joiner of type Joiner 141 */ 142 protected Splice( Pipe lhs, Fields lhsGroupFields, Pipe rhs, Fields rhsGroupFields, Fields declaredFields, Fields resultGroupFields, Joiner joiner ) 143 { 144 this( Pipe.pipes( lhs, rhs ), Fields.fields( lhsGroupFields, rhsGroupFields ), declaredFields, resultGroupFields, joiner ); 145 } 146 147 /** 148 * Constructor Splice creates a new Splice instance. 
149 * 150 * @param lhs of type Pipe 151 * @param lhsGroupFields of type Fields 152 * @param rhs of type Pipe 153 * @param rhsGroupFields of type Fields 154 * @param joiner of type CoGrouper 155 */ 156 protected Splice( Pipe lhs, Fields lhsGroupFields, Pipe rhs, Fields rhsGroupFields, Joiner joiner ) 157 { 158 this( lhs, lhsGroupFields, rhs, rhsGroupFields, null, joiner ); 159 } 160 161 /** 162 * Constructor Splice creates a new Splice instance. 163 * 164 * @param lhs of type Pipe 165 * @param lhsGroupFields of type Fields 166 * @param rhs of type Pipe 167 * @param rhsGroupFields of type Fields 168 */ 169 protected Splice( Pipe lhs, Fields lhsGroupFields, Pipe rhs, Fields rhsGroupFields ) 170 { 171 this( Pipe.pipes( lhs, rhs ), Fields.fields( lhsGroupFields, rhsGroupFields ) ); 172 } 173 174 /** 175 * Constructor Splice creates a new Splice instance. 176 * 177 * @param pipes of type Pipe... 178 */ 179 protected Splice( Pipe... pipes ) 180 { 181 this( pipes, (Fields[]) null ); 182 } 183 184 /** 185 * Constructor Splice creates a new Splice instance. 186 * 187 * @param pipes of type Pipe[] 188 * @param groupFields of type Fields[] 189 */ 190 protected Splice( Pipe[] pipes, Fields[] groupFields ) 191 { 192 this( null, pipes, groupFields, null, null ); 193 } 194 195 /** 196 * Constructor Splice creates a new Splice instance. 197 * 198 * @param spliceName of type String 199 * @param pipes of type Pipe[] 200 * @param groupFields of type Fields[] 201 */ 202 protected Splice( String spliceName, Pipe[] pipes, Fields[] groupFields ) 203 { 204 this( spliceName, pipes, groupFields, null, null ); 205 } 206 207 /** 208 * Constructor Splice creates a new Splice instance. 
209 * 210 * @param spliceName of type String 211 * @param pipes of type Pipe[] 212 * @param groupFields of type Fields[] 213 * @param declaredFields of type Fields 214 */ 215 protected Splice( String spliceName, Pipe[] pipes, Fields[] groupFields, Fields declaredFields ) 216 { 217 this( spliceName, pipes, groupFields, declaredFields, null ); 218 } 219 220 /** 221 * Constructor Splice creates a new Splice instance. 222 * 223 * @param spliceName of type String 224 * @param pipes of type Pipe[] 225 * @param groupFields of type Fields[] 226 * @param declaredFields of type Fields 227 * @param resultGroupFields of type Fields 228 */ 229 protected Splice( String spliceName, Pipe[] pipes, Fields[] groupFields, Fields declaredFields, Fields resultGroupFields ) 230 { 231 this( spliceName, pipes, groupFields, declaredFields, resultGroupFields, null ); 232 } 233 234 /** 235 * Constructor Splice creates a new Splice instance. 236 * 237 * @param pipes of type Pipe[] 238 * @param groupFields of type Fields[] 239 * @param declaredFields of type Fields 240 * @param joiner of type CoGrouper 241 */ 242 protected Splice( Pipe[] pipes, Fields[] groupFields, Fields declaredFields, Joiner joiner ) 243 { 244 this( null, pipes, groupFields, declaredFields, null, joiner ); 245 } 246 247 /** 248 * Constructor Splice creates a new Splice instance. 249 * 250 * @param pipes of type Pipe[] 251 * @param groupFields of type Fields[] 252 * @param declaredFields of type Fields 253 * @param resultGroupFields of type Fields 254 * @param joiner of type Joiner 255 */ 256 protected Splice( Pipe[] pipes, Fields[] groupFields, Fields declaredFields, Fields resultGroupFields, Joiner joiner ) 257 { 258 this( null, pipes, groupFields, declaredFields, resultGroupFields, joiner ); 259 } 260 261 /** 262 * Constructor Splice creates a new Splice instance. 
263 * 264 * @param spliceName of type String 265 * @param pipes of type Pipe[] 266 * @param groupFields of type Fields[] 267 * @param declaredFields of type Fields 268 * @param joiner of type CoGrouper 269 */ 270 protected Splice( String spliceName, Pipe[] pipes, Fields[] groupFields, Fields declaredFields, Fields resultGroupFields, Joiner joiner ) 271 { 272 if( pipes == null ) 273 throw new IllegalArgumentException( "pipes array may not be null" ); 274 275 setKind(); 276 this.spliceName = spliceName; 277 278 int uniques = new HashSet<Pipe>( asList( Pipe.resolvePreviousAll( pipes ) ) ).size(); 279 280 if( pipes.length > 1 && uniques == 1 ) 281 { 282 if( isMerge() ) 283 throw new IllegalArgumentException( "may not merge a pipe with itself without intermediate operations after the split" ); 284 285 if( groupFields == null ) 286 throw new IllegalArgumentException( "groupFields array may not be null" ); 287 288 if( new HashSet<Fields>( asList( groupFields ) ).size() != 1 ) 289 throw new IllegalArgumentException( "all groupFields must be identical" ); 290 291 addPipe( pipes[ 0 ] ); 292 this.numSelfJoins = pipes.length - 1; 293 this.keyFieldsMap.put( pipes[ 0 ].getName(), groupFields[ 0 ] ); 294 295 if( resultGroupFields != null && groupFields[ 0 ].size() * pipes.length != resultGroupFields.size() ) 296 throw new IllegalArgumentException( "resultGroupFields and cogroup joined fields must be same size" ); 297 } 298 else 299 { 300 int last = -1; 301 for( int i = 0; i < pipes.length; i++ ) 302 { 303 addPipe( pipes[ i ] ); 304 305 if( groupFields == null || groupFields.length == 0 ) 306 { 307 addGroupFields( pipes[ i ], Fields.FIRST ); 308 continue; 309 } 310 311 if( last != -1 && last != groupFields[ i ].size() ) 312 throw new IllegalArgumentException( "all groupFields must be same size" ); 313 314 last = groupFields[ i ].size(); 315 addGroupFields( pipes[ i ], groupFields[ i ] ); 316 } 317 318 if( resultGroupFields != null && last * pipes.length != resultGroupFields.size() 
) 319 throw new IllegalArgumentException( "resultGroupFields and cogroup resulting joined fields must be same size" ); 320 } 321 322 this.declaredFields = declaredFields; 323 this.resultGroupFields = resultGroupFields; 324 this.joiner = joiner; 325 326 verifyCoGrouper(); 327 } 328 329 /** 330 * Constructor Splice creates a new Splice instance. 331 * 332 * @param spliceName of type String 333 * @param lhs of type Pipe 334 * @param lhsGroupFields of type Fields 335 * @param rhs of type Pipe 336 * @param rhsGroupFields of type Fields 337 * @param declaredFields of type Fields 338 */ 339 protected Splice( String spliceName, Pipe lhs, Fields lhsGroupFields, Pipe rhs, Fields rhsGroupFields, Fields declaredFields ) 340 { 341 this( lhs, lhsGroupFields, rhs, rhsGroupFields, declaredFields ); 342 this.spliceName = spliceName; 343 } 344 345 /** 346 * Constructor Splice creates a new Splice instance. 347 * 348 * @param spliceName of type String 349 * @param lhs of type Pipe 350 * @param lhsGroupFields of type Fields 351 * @param rhs of type Pipe 352 * @param rhsGroupFields of type Fields 353 * @param declaredFields of type Fields 354 * @param resultGroupFields of type Fields 355 */ 356 protected Splice( String spliceName, Pipe lhs, Fields lhsGroupFields, Pipe rhs, Fields rhsGroupFields, Fields declaredFields, Fields resultGroupFields ) 357 { 358 this( lhs, lhsGroupFields, rhs, rhsGroupFields, declaredFields, resultGroupFields ); 359 this.spliceName = spliceName; 360 } 361 362 /** 363 * Constructor Splice creates a new Splice instance. 
364 * 365 * @param spliceName of type String 366 * @param lhs of type Pipe 367 * @param lhsGroupFields of type Fields 368 * @param rhs of type Pipe 369 * @param rhsGroupFields of type Fields 370 * @param declaredFields of type Fields 371 * @param joiner of type CoGrouper 372 */ 373 protected Splice( String spliceName, Pipe lhs, Fields lhsGroupFields, Pipe rhs, Fields rhsGroupFields, Fields declaredFields, Joiner joiner ) 374 { 375 this( lhs, lhsGroupFields, rhs, rhsGroupFields, declaredFields, joiner ); 376 this.spliceName = spliceName; 377 } 378 379 /** 380 * Constructor Splice creates a new Splice instance. 381 * 382 * @param spliceName of type String 383 * @param lhs of type Pipe 384 * @param lhsGroupFields of type Fields 385 * @param rhs of type Pipe 386 * @param rhsGroupFields of type Fields 387 * @param declaredFields of type Fields 388 * @param resultGroupFields of type Fields 389 * @param joiner of type Joiner 390 */ 391 protected Splice( String spliceName, Pipe lhs, Fields lhsGroupFields, Pipe rhs, Fields rhsGroupFields, Fields declaredFields, Fields resultGroupFields, Joiner joiner ) 392 { 393 this( lhs, lhsGroupFields, rhs, rhsGroupFields, declaredFields, resultGroupFields, joiner ); 394 this.spliceName = spliceName; 395 } 396 397 /** 398 * Constructor Splice creates a new Splice instance. 399 * 400 * @param spliceName of type String 401 * @param lhs of type Pipe 402 * @param lhsGroupFields of type Fields 403 * @param rhs of type Pipe 404 * @param rhsGroupFields of type Fields 405 * @param joiner of type CoGrouper 406 */ 407 protected Splice( String spliceName, Pipe lhs, Fields lhsGroupFields, Pipe rhs, Fields rhsGroupFields, Joiner joiner ) 408 { 409 this( lhs, lhsGroupFields, rhs, rhsGroupFields, joiner ); 410 this.spliceName = spliceName; 411 } 412 413 /** 414 * Constructor Splice creates a new Splice instance. 
415 * 416 * @param spliceName of type String 417 * @param lhs of type Pipe 418 * @param lhsGroupFields of type Fields 419 * @param rhs of type Pipe 420 * @param rhsGroupFields of type Fields 421 */ 422 protected Splice( String spliceName, Pipe lhs, Fields lhsGroupFields, Pipe rhs, Fields rhsGroupFields ) 423 { 424 this( lhs, lhsGroupFields, rhs, rhsGroupFields ); 425 this.spliceName = spliceName; 426 } 427 428 /** 429 * Constructor Splice creates a new Splice instance. 430 * 431 * @param spliceName of type String 432 * @param pipes of type Pipe... 433 */ 434 protected Splice( String spliceName, Pipe... pipes ) 435 { 436 this( pipes ); 437 this.spliceName = spliceName; 438 } 439 440 /** 441 * Constructor Splice creates a new Splice instance. 442 * 443 * @param pipe of type Pipe 444 * @param groupFields of type Fields 445 * @param numSelfJoins of type int 446 * @param declaredFields of type Fields 447 */ 448 protected Splice( Pipe pipe, Fields groupFields, int numSelfJoins, Fields declaredFields ) 449 { 450 this( pipe, groupFields, numSelfJoins ); 451 this.declaredFields = declaredFields; 452 } 453 454 /** 455 * Constructor Splice creates a new Splice instance. 456 * 457 * @param pipe of type Pipe 458 * @param groupFields of type Fields 459 * @param numSelfJoins of type int 460 * @param declaredFields of type Fields 461 * @param resultGroupFields of type Fields 462 */ 463 protected Splice( Pipe pipe, Fields groupFields, int numSelfJoins, Fields declaredFields, Fields resultGroupFields ) 464 { 465 this( pipe, groupFields, numSelfJoins ); 466 this.declaredFields = declaredFields; 467 this.resultGroupFields = resultGroupFields; 468 469 if( resultGroupFields != null && groupFields.size() * numSelfJoins != resultGroupFields.size() ) 470 throw new IllegalArgumentException( "resultGroupFields and cogroup resulting join fields must be same size" ); 471 } 472 473 /** 474 * Constructor Splice creates a new Splice instance. 
475 * 476 * @param pipe of type Pipe 477 * @param groupFields of type Fields 478 * @param numSelfJoins of type int 479 * @param declaredFields of type Fields 480 * @param joiner of type CoGrouper 481 */ 482 protected Splice( Pipe pipe, Fields groupFields, int numSelfJoins, Fields declaredFields, Joiner joiner ) 483 { 484 this( pipe, groupFields, numSelfJoins, declaredFields ); 485 this.joiner = joiner; 486 487 verifyCoGrouper(); 488 } 489 490 /** 491 * Constructor Splice creates a new Splice instance. 492 * 493 * @param pipe of type Pipe 494 * @param groupFields of type Fields 495 * @param numSelfJoins of type int 496 * @param declaredFields of type Fields 497 * @param resultGroupFields of type Fields 498 * @param joiner of type Joiner 499 */ 500 protected Splice( Pipe pipe, Fields groupFields, int numSelfJoins, Fields declaredFields, Fields resultGroupFields, Joiner joiner ) 501 { 502 this( pipe, groupFields, numSelfJoins, declaredFields, resultGroupFields ); 503 this.joiner = joiner; 504 505 verifyCoGrouper(); 506 } 507 508 /** 509 * Constructor Splice creates a new Splice instance. 510 * 511 * @param pipe of type Pipe 512 * @param groupFields of type Fields 513 * @param numSelfJoins of type int 514 * @param joiner of type CoGrouper 515 */ 516 protected Splice( Pipe pipe, Fields groupFields, int numSelfJoins, Joiner joiner ) 517 { 518 setKind(); 519 addPipe( pipe ); 520 this.keyFieldsMap.put( pipe.getName(), groupFields ); 521 this.numSelfJoins = numSelfJoins; 522 this.joiner = joiner; 523 524 verifyCoGrouper(); 525 } 526 527 /** 528 * Constructor Splice creates a new Splice instance. 529 * 530 * @param pipe of type Pipe 531 * @param groupFields of type Fields 532 * @param numSelfJoins of type int 533 */ 534 protected Splice( Pipe pipe, Fields groupFields, int numSelfJoins ) 535 { 536 this( pipe, groupFields, numSelfJoins, (Joiner) null ); 537 } 538 539 /** 540 * Constructor Splice creates a new Splice instance. 
541 * 542 * @param spliceName of type String 543 * @param pipe of type Pipe 544 * @param groupFields of type Fields 545 * @param numSelfJoins of type int 546 * @param declaredFields of type Fields 547 */ 548 protected Splice( String spliceName, Pipe pipe, Fields groupFields, int numSelfJoins, Fields declaredFields ) 549 { 550 this( pipe, groupFields, numSelfJoins, declaredFields ); 551 this.spliceName = spliceName; 552 } 553 554 /** 555 * Constructor Splice creates a new Splice instance. 556 * 557 * @param spliceName of type String 558 * @param pipe of type Pipe 559 * @param groupFields of type Fields 560 * @param numSelfJoins of type int 561 * @param declaredFields of type Fields 562 * @param resultGroupFields of type Fields 563 */ 564 protected Splice( String spliceName, Pipe pipe, Fields groupFields, int numSelfJoins, Fields declaredFields, Fields resultGroupFields ) 565 { 566 this( pipe, groupFields, numSelfJoins, declaredFields, resultGroupFields ); 567 this.spliceName = spliceName; 568 } 569 570 /** 571 * Constructor Splice creates a new Splice instance. 572 * 573 * @param spliceName of type String 574 * @param pipe of type Pipe 575 * @param groupFields of type Fields 576 * @param numSelfJoins of type int 577 * @param declaredFields of type Fields 578 * @param joiner of type CoGrouper 579 */ 580 protected Splice( String spliceName, Pipe pipe, Fields groupFields, int numSelfJoins, Fields declaredFields, Joiner joiner ) 581 { 582 this( pipe, groupFields, numSelfJoins, declaredFields, joiner ); 583 this.spliceName = spliceName; 584 } 585 586 /** 587 * Constructor Splice creates a new Splice instance. 
588 * 589 * @param spliceName of type String 590 * @param pipe of type Pipe 591 * @param groupFields of type Fields 592 * @param numSelfJoins of type int 593 * @param declaredFields of type Fields 594 * @param resultGroupFields of type Fields 595 * @param joiner of type Joiner 596 */ 597 protected Splice( String spliceName, Pipe pipe, Fields groupFields, int numSelfJoins, Fields declaredFields, Fields resultGroupFields, Joiner joiner ) 598 { 599 this( pipe, groupFields, numSelfJoins, declaredFields, resultGroupFields, joiner ); 600 this.spliceName = spliceName; 601 } 602 603 /** 604 * Constructor Splice creates a new Splice instance. 605 * 606 * @param spliceName of type String 607 * @param pipe of type Pipe 608 * @param groupFields of type Fields 609 * @param numSelfJoins of type int 610 * @param joiner of type CoGrouper 611 */ 612 protected Splice( String spliceName, Pipe pipe, Fields groupFields, int numSelfJoins, Joiner joiner ) 613 { 614 this( pipe, groupFields, numSelfJoins, joiner ); 615 this.spliceName = spliceName; 616 } 617 618 /** 619 * Constructor Splice creates a new Splice instance. 620 * 621 * @param spliceName of type String 622 * @param pipe of type Pipe 623 * @param groupFields of type Fields 624 * @param numSelfJoins of type int 625 */ 626 protected Splice( String spliceName, Pipe pipe, Fields groupFields, int numSelfJoins ) 627 { 628 this( pipe, groupFields, numSelfJoins ); 629 this.spliceName = spliceName; 630 } 631 632 //////////// 633 // GROUPBY 634 //////////// 635 636 /** 637 * Constructor Splice creates a new Splice instance where grouping occurs on {@link Fields#ALL} fields. 638 * 639 * @param pipe of type Pipe 640 */ 641 protected Splice( Pipe pipe ) 642 { 643 this( null, pipe, Fields.ALL, null, false ); 644 } 645 646 /** 647 * Constructor Splice creates a new Splice instance. 
648 * 649 * @param pipe of type Pipe 650 * @param groupFields of type Fields 651 */ 652 protected Splice( Pipe pipe, Fields groupFields ) 653 { 654 this( null, pipe, groupFields, null, false ); 655 } 656 657 /** 658 * Constructor Splice creates a new Splice instance. 659 * 660 * @param spliceName of type String 661 * @param pipe of type Pipe 662 * @param groupFields of type Fields 663 */ 664 protected Splice( String spliceName, Pipe pipe, Fields groupFields ) 665 { 666 this( spliceName, pipe, groupFields, null, false ); 667 } 668 669 /** 670 * Constructor Splice creates a new Splice instance. 671 * 672 * @param pipe of type Pipe 673 * @param groupFields of type Fields 674 * @param sortFields of type Fields 675 */ 676 protected Splice( Pipe pipe, Fields groupFields, Fields sortFields ) 677 { 678 this( null, pipe, groupFields, sortFields, false ); 679 } 680 681 /** 682 * Constructor Splice creates a new Splice instance. 683 * 684 * @param spliceName of type String 685 * @param pipe of type Pipe 686 * @param groupFields of type Fields 687 * @param sortFields of type Fields 688 */ 689 protected Splice( String spliceName, Pipe pipe, Fields groupFields, Fields sortFields ) 690 { 691 this( spliceName, pipe, groupFields, sortFields, false ); 692 } 693 694 /** 695 * Constructor Splice creates a new Splice instance. 696 * 697 * @param pipe of type Pipe 698 * @param groupFields of type Fields 699 * @param sortFields of type Fields 700 * @param reverseOrder of type boolean 701 */ 702 protected Splice( Pipe pipe, Fields groupFields, Fields sortFields, boolean reverseOrder ) 703 { 704 this( null, pipe, groupFields, sortFields, reverseOrder ); 705 } 706 707 /** 708 * Constructor Splice creates a new Splice instance. 
709 * 710 * @param spliceName of type String 711 * @param pipe of type Pipe 712 * @param groupFields of type Fields 713 * @param sortFields of type Fields 714 * @param reverseOrder of type boolean 715 */ 716 protected Splice( String spliceName, Pipe pipe, Fields groupFields, Fields sortFields, boolean reverseOrder ) 717 { 718 this( spliceName, Pipe.pipes( pipe ), groupFields, sortFields, reverseOrder ); 719 } 720 721 /** 722 * Constructor Splice creates a new Splice instance. 723 * 724 * @param pipes of type Pipe 725 * @param groupFields of type Fields 726 */ 727 protected Splice( Pipe[] pipes, Fields groupFields ) 728 { 729 this( null, pipes, groupFields, null, false ); 730 } 731 732 /** 733 * Constructor Splice creates a new Splice instance. 734 * 735 * @param spliceName of type String 736 * @param pipes of type Pipe 737 * @param groupFields of type Fields 738 */ 739 protected Splice( String spliceName, Pipe[] pipes, Fields groupFields ) 740 { 741 this( spliceName, pipes, groupFields, null, false ); 742 } 743 744 /** 745 * Constructor Splice creates a new Splice instance. 746 * 747 * @param pipes of type Pipe 748 * @param groupFields of type Fields 749 * @param sortFields of type Fields 750 */ 751 protected Splice( Pipe[] pipes, Fields groupFields, Fields sortFields ) 752 { 753 this( null, pipes, groupFields, sortFields, false ); 754 } 755 756 /** 757 * Constructor Splice creates a new Splice instance. 758 * 759 * @param spliceName of type String 760 * @param pipe of type Pipe 761 * @param groupFields of type Fields 762 * @param sortFields of type Fields 763 */ 764 protected Splice( String spliceName, Pipe[] pipe, Fields groupFields, Fields sortFields ) 765 { 766 this( spliceName, pipe, groupFields, sortFields, false ); 767 } 768 769 /** 770 * Constructor Splice creates a new Splice instance. 
771 * 772 * @param pipes of type Pipe 773 * @param groupFields of type Fields 774 * @param sortFields of type Fields 775 * @param reverseOrder of type boolean 776 */ 777 protected Splice( Pipe[] pipes, Fields groupFields, Fields sortFields, boolean reverseOrder ) 778 { 779 this( null, pipes, groupFields, sortFields, reverseOrder ); 780 } 781 782 /** 783 * Constructor Splice creates a new Splice instance. 784 * 785 * @param spliceName of type String 786 * @param pipes of type Pipe[] 787 * @param groupFields of type Fields 788 * @param sortFields of type Fields 789 * @param reverseOrder of type boolean 790 */ 791 protected Splice( String spliceName, Pipe[] pipes, Fields groupFields, Fields sortFields, boolean reverseOrder ) 792 { 793 if( pipes == null ) 794 throw new IllegalArgumentException( "pipes array may not be null" ); 795 796 if( groupFields == null ) 797 throw new IllegalArgumentException( "groupFields may not be null" ); 798 799 setKind(); 800 this.spliceName = spliceName; 801 802 for( Pipe pipe : pipes ) 803 { 804 addPipe( pipe ); 805 this.keyFieldsMap.put( pipe.getName(), groupFields ); 806 807 if( sortFields != null ) 808 this.sortFieldsMap.put( pipe.getName(), sortFields ); 809 } 810 811 this.reverseOrder = reverseOrder; 812 this.joiner = new InnerJoin(); 813 } 814 815 private void verifyCoGrouper() 816 { 817 if( isJoin() && joiner instanceof BufferJoin ) 818 throw new IllegalArgumentException( "invalid joiner, may not use BufferJoiner in a HashJoin" ); 819 820 if( joiner == null ) 821 { 822 joiner = new InnerJoin(); 823 return; 824 } 825 826 if( joiner.numJoins() == -1 ) 827 return; 828 829 int joins = Math.max( numSelfJoins, keyFieldsMap.size() - 1 ); // joining two streams is one join 830 831 if( joins != joiner.numJoins() ) 832 throw new IllegalArgumentException( "invalid joiner, only accepts " + joiner.numJoins() + " joins, there are: " + joins ); 833 } 834 835 private void setKind() 836 { 837 if( this instanceof GroupBy ) 838 kind = Kind.GroupBy; 
839 else if( this instanceof CoGroup ) 840 kind = Kind.CoGroup; 841 else if( this instanceof Merge ) 842 kind = Kind.Merge; 843 else 844 kind = Kind.Join; 845 } 846 847 /** 848 * Method getDeclaredFields returns the declaredFields of this Splice object. 849 * 850 * @return the declaredFields (type Fields) of this Splice object. 851 */ 852 public Fields getDeclaredFields() 853 { 854 return declaredFields; 855 } 856 857 private void addPipe( Pipe pipe ) 858 { 859 if( pipe.getName() == null ) 860 throw new IllegalArgumentException( "each input pipe must have a name" ); 861 862 pipes.add( pipe ); // allow same pipe 863 } 864 865 private void addGroupFields( Pipe pipe, Fields fields ) 866 { 867 if( keyFieldsMap.containsKey( pipe.getName() ) ) 868 throw new IllegalArgumentException( "each input pipe branch must be uniquely named" ); 869 870 keyFieldsMap.put( pipe.getName(), fields ); 871 } 872 873 @Override 874 public String getName() 875 { 876 if( spliceName != null ) 877 return spliceName; 878 879 StringBuffer buffer = new StringBuffer(); 880 881 for( Pipe pipe : pipes ) 882 { 883 if( buffer.length() != 0 ) 884 { 885 if( isGroupBy() || isMerge() ) 886 buffer.append( "+" ); 887 else if( isCoGroup() || isJoin() ) 888 buffer.append( "*" ); // more semantically correct 889 } 890 891 buffer.append( pipe.getName() ); 892 } 893 894 spliceName = buffer.toString(); 895 896 return spliceName; 897 } 898 899 @Override 900 public Pipe[] getPrevious() 901 { 902 return pipes.toArray( new Pipe[ pipes.size() ] ); 903 } 904 905 /** 906 * Method getGroupingSelectors returns the groupingSelectors of this Splice object. 907 * 908 * @return the groupingSelectors (type Map<String, Fields>) of this Splice object. 909 */ 910 public Map<String, Fields> getKeySelectors() 911 { 912 return keyFieldsMap; 913 } 914 915 /** 916 * Method getSortingSelectors returns the sortingSelectors of this Splice object. 917 * 918 * @return the sortingSelectors (type Map<String, Fields>) of this Splice object. 
919 */ 920 public Map<String, Fields> getSortingSelectors() 921 { 922 return sortFieldsMap; 923 } 924 925 /** 926 * Method isSorted returns true if this Splice instance is sorting values other than the group fields. 927 * 928 * @return the sorted (type boolean) of this Splice object. 929 */ 930 public boolean isSorted() 931 { 932 return !sortFieldsMap.isEmpty(); 933 } 934 935 /** 936 * Method isSortReversed returns true if sorting is reversed. 937 * 938 * @return the sortReversed (type boolean) of this Splice object. 939 */ 940 public boolean isSortReversed() 941 { 942 return reverseOrder; 943 } 944 945 public synchronized Map<String, Integer> getPipePos() 946 { 947 if( pipePos != null ) 948 return pipePos; 949 950 pipePos = new HashMap<String, Integer>(); 951 952 int pos = 0; 953 for( Object pipe : pipes ) 954 pipePos.put( ( (Pipe) pipe ).getName(), pos++ ); 955 956 return pipePos; 957 } 958 959 public Joiner getJoiner() 960 { 961 return joiner; 962 } 963 964 /** 965 * Method isGroupBy returns true if this Splice instance will perform a GroupBy operation. 966 * 967 * @return the groupBy (type boolean) of this Splice object. 
*/
  public final boolean isGroupBy()
    {
    return kind == Kind.GroupBy;
    }

  /**
   * Method isCoGroup returns true if this Splice instance is of kind {@code CoGroup}.
   *
   * @return true when {@code kind} is {@code Kind.CoGroup}
   */
  public final boolean isCoGroup()
    {
    return kind == Kind.CoGroup;
    }

  /**
   * Method isMerge returns true if this Splice instance is of kind {@code Merge}.
   *
   * @return true when {@code kind} is {@code Kind.Merge}
   */
  public final boolean isMerge()
    {
    return kind == Kind.Merge;
    }

  /**
   * Method isJoin returns true if this Splice instance is of kind {@code Join}.
   *
   * @return true when {@code kind} is {@code Kind.Join}
   */
  public final boolean isJoin()
    {
    return kind == Kind.Join;
    }

  /**
   * Method getNumSelfJoins returns the numSelfJoins value of this Splice object.
   *
   * @return the current {@code numSelfJoins} value, zero when this is not a self join
   */
  public int getNumSelfJoins()
    {
    return numSelfJoins;
    }

  /**
   * Method isSelfJoin returns true if {@code numSelfJoins} is non-zero.
   *
   * @return true when this Splice was configured with a non-zero number of self joins
   */
  boolean isSelfJoin()
    {
    return numSelfJoins != 0;
    }

  // FIELDS

  /**
   * Method outgoingScopeFor resolves the grouping selectors, sorting selectors and declared fields
   * against the given incoming scopes, and wraps them in a new outgoing {@link Scope}.
   * <p/>
   * If no {@code resultGroupFields} were set and this is a CoGroup, the outgoing grouping fields are
   * derived via {@code createJoinFields}.
   *
   * @param incomingScopes of type Set<Scope>
   * @return a new Scope named after this pipe
   * @throws OperatorException if the grouping, sorting or declared fields cannot be resolved
   */
  @Override
  public Scope outgoingScopeFor( Set<Scope> incomingScopes )
    {
    // resolution order is retained, each step may throw its own OperatorException
    Map<String, Fields> groupingSelectors = resolveGroupingSelectors( incomingScopes );
    Map<String, Fields> sortingSelectors = resolveSortingSelectors( incomingScopes );
    Fields declared = resolveDeclared( incomingScopes );

    Fields outGroupingFields = resultGroupFields;

    if( outGroupingFields == null && isCoGroup() )
      outGroupingFields = createJoinFields( incomingScopes, groupingSelectors, declared );

    // for Group, the outgoing fields are the same as those declared
    return new Scope( getName(), declared, outGroupingFields, groupingSelectors, sortingSelectors, declared, isGroupBy() );
    }

  /**
   * Method createJoinFields assembles the outgoing grouping fields for a join by visiting the pipes in
   * their declared order and appending, for every pipe with a non-NONE grouping selector, the declared
   * fields selected by that pipe's grouping positions.
   * <p/>
   * The running {@code offset} is advanced by the size of each pipe's incoming fields, so positions are
   * presumably expressed relative to the concatenated incoming streams (see {@code Fields#selectPos}).
   *
   * @param incomingScopes    of type Set<Scope>
   * @param groupingSelectors resolved grouping fields keyed by pipe name
   * @param declared          the resolved declared fields, {@code Fields.NONE} is treated as {@code Fields.UNKNOWN}
   * @return the appended grouping fields, {@code Fields.NONE} if no pipe contributes any
   */
  private Fields createJoinFields( Set<Scope> incomingScopes, Map<String, Fields> groupingSelectors, Fields declared )
    {
    if( declared.isNone() )
      declared = Fields.UNKNOWN;

    Map<String, Fields> incomingFields = new HashMap<String, Fields>();

    for( Scope scope : incomingScopes )
      incomingFields.put( scope.getName(), scope.getIncomingSpliceFields() );

    Fields outGroupingFields = Fields.NONE;

    int offset = 0;
    for( Pipe pipe : pipes ) // need to retain order of pipes
      {
      String pipeName = pipe.getName();
      Fields pipeGroupingSelector = groupingSelectors.get( pipeName );
      Fields incomingField = incomingFields.get( pipeName );

      if( !pipeGroupingSelector.isNone() )
        {
        Fields offsetFields = incomingField.selectPos( pipeGroupingSelector, offset );
        Fields resolvedSelect = declared.select( offsetFields );

        outGroupingFields = outGroupingFields.append( resolvedSelect );
        }

      // every pipe advances the offset, even when it contributes no grouping fields
      offset += incomingField.size();
      }

    return outGroupingFields;
    }

  /**
   * Method resolveGroupingSelectors resolves the key selectors against the incoming scopes and verifies
   * that all resolved grouping fields have the same size and compatible types.
   *
   * @param incomingScopes of type Set<Scope>
   * @return the resolved grouping fields keyed by incoming scope name
   * @throws OperatorException if a selector cannot be resolved, the sizes differ, or the types differ
   */
  Map<String, Fields> resolveGroupingSelectors( Set<Scope> incomingScopes )
    {
    try
      {
      Map<String, Fields> groupingSelectors = getKeySelectors();
      Map<String, Fields> groupingFields = resolveSelectorsAgainstIncoming( incomingScopes, groupingSelectors, "grouping" );

      if( !verifySameSize( groupingFields ) )
        throw new OperatorException( this, "all grouping fields must be same size: " + toString() );

      verifySameTypes( groupingSelectors, groupingFields );

      return groupingFields;
      }
    catch( FieldsResolverException exception )
      {
      throw new OperatorException( this, OperatorException.Kind.grouping, exception.getSourceFields(), exception.getSelectorFields(), exception );
      }
    catch( RuntimeException exception )
      {
      // note this also re-wraps any OperatorException thrown above
      throw new OperatorException( this, "could not resolve grouping selector in: " + this, exception );
      }
    }

  /**
   * Method verifySameTypes checks that the resolved grouping fields of every incoming stream declare
   * the same canonical type at each position, ignoring positions for which any grouping selector
   * declares a comparator.
   * <p/>
   * Streams or individual positions without type information provide no basis for comparison and are
   * not treated as a mismatch.
   *
   * @param groupingSelectors the unresolved key selectors, consulted only for their comparators
   * @param groupingFields    the resolved grouping fields keyed by incoming scope name
   * @return always true, a mismatch is reported by exception
   * @throws OperatorException if two streams declare different types at the same position
   */
  private boolean verifySameTypes( Map<String, Fields> groupingSelectors, Map<String, Fields> groupingFields )
    {
    // create array of field positions with comparators from the grouping selectors
    // unsure which side has the comparators declared so make a union
    boolean[] hasComparator = new boolean[ groupingFields.values().iterator().next().size() ];

    for( Map.Entry<String, Fields> entry : groupingSelectors.entrySet() )
      {
      Comparator[] comparatorsArray = entry.getValue().getComparators();

      for( int i = 0; i < comparatorsArray.length; i++ )
        hasComparator[ i ] = hasComparator[ i ] || comparatorsArray[ i ] != null;
      }

    // compare all the rhs fields with the lhs (lhs and rhs are arbitrary here)
    Iterator<Fields> iterator = groupingFields.values().iterator();
    Fields lhsFields = iterator.next();
    Type[] lhsTypes = lhsFields.getTypes();

    // if types are null, no basis for comparison
    if( lhsTypes == null )
      return true;

    while( iterator.hasNext() )
      {
      Fields rhsFields = iterator.next();
      Type[] rhsTypes = rhsFields.getTypes();

      // if types are null, no basis for comparison
      if( rhsTypes == null )
        return true;

      for( int i = 0; i < lhsTypes.length; i++ )
        {
        if( hasComparator[ i ] )
          continue;

        Type lhs = getCanonicalType( lhsTypes[ i ] );
        Type rhs = getCanonicalType( rhsTypes[ i ] );

        // a position may lack a type even when the array is present (resolveDeclared guards for
        // the same case), without this check the equals call below fails with a NullPointerException
        if( lhs == null || rhs == null )
          continue;

        if( lhs.equals( rhs ) )
          continue;

        Fields lhsError = new Fields( lhsFields.get( i ), lhsFields.getType( i ) );
        Fields rhsError = new Fields( rhsFields.get( i ), rhsFields.getType( i ) );

        throw new OperatorException( this, "grouping fields must declare same types:" + lhsError.printVerbose() + " not same as " + rhsError.printVerbose() );
        }
      }

    return true;
    }

  /**
   * Method getCanonicalType normalizes the given type so that equivalent declarations compare equal.
   * A {@link CoercibleType} is replaced by its canonical type, and a {@link Class} is passed through
   * {@code Coercions.asNonPrimitive}.
   *
   * @param type of type Type, may be null
   * @return the normalized type, or the given value if neither rule applies
   */
  private Type getCanonicalType( Type type )
    {
    if( type instanceof CoercibleType )
      type = ( (CoercibleType) type ).getCanonicalType();

    // if one side is primitive, normalize to its primitive wrapper type
    if( type instanceof Class )
      type = Coercions.asNonPrimitive( (Class) type );

    return type;
    }

  /**
   * Method verifySameSize returns true if every Fields instance in the given map has the same size.
   *
   * @param groupingFields the resolved grouping fields keyed by incoming scope name, must not be empty
   * @return false as soon as one entry differs in size from the first, true otherwise
   */
  private boolean verifySameSize( Map<String, Fields> groupingFields )
    {
    Iterator<Fields> iterator = groupingFields.values().iterator();
    int size = iterator.next().size();

    while( iterator.hasNext() )
      {
      if( iterator.next().size() != size )
        return false;
      }

    return true;
    }

  /**
   * Method resolveSelectorsAgainstIncoming resolves, for every incoming scope, the selector registered
   * under the scope name into concrete fields.
   * <p/>
   * NONE, ALL, GROUP and VALUES selectors are mapped onto the corresponding fields of the incoming
   * scope, any other selector is applied to the scope's incoming splice fields.
   *
   * @param incomingScopes of type Set<Scope>
   * @param selectors      the selectors keyed by pipe name
   * @param type           a label for the kind of selector, used in the error message only
   * @return the resolved fields keyed by incoming scope name
   * @throws OperatorException if no selector is registered for an incoming scope
   */
  private Map<String, Fields> resolveSelectorsAgainstIncoming( Set<Scope> incomingScopes, Map<String, Fields> selectors, String type )
    {
    Map<String, Fields> resolvedFields = new HashMap<String, Fields>();

    for( Scope incomingScope : incomingScopes )
      {
      Fields selector = selectors.get( incomingScope.getName() );

      if( selector == null )
        throw new OperatorException( this, "no " + type + " selector found for: " + incomingScope.getName() );

      Fields incomingFields;

      if( selector.isNone() )
        incomingFields = Fields.NONE;
      else if( selector.isAll() )
        incomingFields = incomingScope.getIncomingSpliceFields();
      else if( selector.isGroup() )
        incomingFields = incomingScope.getOutGroupingFields();
      else if( selector.isValues() )
        incomingFields = incomingScope.getOutValuesFields().subtract( incomingScope.getOutGroupingFields() );
      else
        incomingFields = incomingScope.getIncomingSpliceFields().select( selector );

      resolvedFields.put( incomingScope.getName(), incomingFields );
      }

    return resolvedFields;
    }

  /**
   * Method resolveSortingSelectors resolves the sorting selectors against the incoming scopes.
   *
   * @param incomingScopes of type Set<Scope>
   * @return the resolved sorting fields keyed by incoming scope name, or null if there are no sorting selectors
   * @throws OperatorException if a sorting selector cannot be resolved
   */
  Map<String, Fields> resolveSortingSelectors( Set<Scope> incomingScopes )
    {
    try
      {
      if( getSortingSelectors().isEmpty() )
        return null;

      return resolveSelectorsAgainstIncoming( incomingScopes, getSortingSelectors(), "sorting" );
      }
    catch( FieldsResolverException exception )
      {
      throw new OperatorException( this, OperatorException.Kind.sorting, exception.getSourceFields(), exception.getSelectorFields(), exception );
      }
    catch( RuntimeException exception )
      {
      throw new OperatorException( this, "could not resolve sorting selector in: " + this, exception );
      }
    }

  /**
   * Method resolveIncomingOperationPassThroughFields returns the incoming splice fields of the given scope.
   *
   * @param incomingScope of type Scope
   * @return the value of {@code incomingScope.getIncomingSpliceFields()}
   */
  @Override
  public Fields resolveIncomingOperationPassThroughFields( Scope incomingScope )
    {
    return incomingScope.getIncomingSpliceFields();
    }

  /**
   * Method resolveDeclared resolves the fields this Splice declares for the given incoming scopes.
   * <ul>
   * <li>If the join declared fields are {@code Fields.NONE}, NONE is returned, which is only legal for a CoGroup.</li>
   * <li>If join declared fields are given, their size is verified against the incoming fields (unless
   * any incoming fields are unknown) and the incoming types are applied to them by position.</li>
   * <li>Without declared fields, a GroupBy or Merge requires all incoming streams to have equal fields
   * and returns the first stream's fields; otherwise the incoming fields are appended in pipe order.</li>
   * </ul>
   *
   * @param incomingScopes of type Set<Scope>
   * @return the resolved declared fields
   * @throws OperatorException if the declared fields cannot be resolved or verified
   */
  Fields resolveDeclared( Set<Scope> incomingScopes )
    {
    try
      {
      Fields declaredFields = getJoinDeclaredFields();

      // Fields.NONE is a flag to the CoGroup the following Buffer will use the JoinerClosure directly
      if( declaredFields != null && declaredFields.isNone() )
        {
        if( !isCoGroup() )
          throw new IllegalArgumentException( "Fields.NONE may only be declared as the join fields when using a CoGroup" );

        return Fields.NONE;
        }

      if( declaredFields != null ) // null for GroupBy
        {
        if( incomingScopes.size() != pipes.size() && isSelfJoin() )
          throw new OperatorException( this, "self joins without intermediate operators are not permitted, see 'numSelfJoins' constructor or identity function" );

        int size = 0;
        boolean foundUnknown = false;

        List<Fields> appendableFields = getOrderedResolvedFields( incomingScopes );

        for( Fields fields : appendableFields )
          {
          foundUnknown = foundUnknown || fields.isUnknown();
          size += fields.size();
          }

        // we must relax field checking in the face of unknown fields
        if( !foundUnknown && declaredFields.size() != size * ( numSelfJoins + 1 ) )
          {
          if( isSelfJoin() )
            throw new OperatorException( this, "declared grouped fields not same size as grouped values, declared: " + declaredFields.printVerbose() + " != size: " + size * ( numSelfJoins + 1 ) );
          else
            throw new OperatorException( this, "declared grouped fields not same size as grouped values, declared: " + declaredFields.printVerbose() + " resolved: " + Util.print( appendableFields, "" ) );
          }

        // carry the incoming types over to the declared fields, position by position
        int i = 0;
        for( Fields appendableField : appendableFields )
          {
          Type[] types = appendableField.getTypes();

          if( types == null )
            {
            // untyped stream, skip over its positions
            i += appendableField.size();
            continue;
            }

          for( Type type : types )
            {
            if( type != null )
              declaredFields = declaredFields.applyType( i, type );

            i++;
            }
          }

        return declaredFields;
        }

      // support merge or cogrouping here
      if( isGroupBy() || isMerge() )
        {
        Iterator<Scope> iterator = incomingScopes.iterator();
        Fields commonFields = iterator.next().getIncomingSpliceFields();

        while( iterator.hasNext() )
          {
          Scope incomingScope = iterator.next();
          Fields fields = incomingScope.getIncomingSpliceFields();

          if( !commonFields.equalsFields( fields ) )
            throw new OperatorException( this, "merged streams must declare the same field names, in the same order, expected: " + commonFields.printVerbose() + " found: " + fields.printVerbose() );
          }

        return commonFields;
        }
      else
        {
        List<Fields> appendableFields = getOrderedResolvedFields( incomingScopes );
        Fields appendedFields = new Fields();

        try
          {
          // will fail on name collisions
          for( Fields appendableField : appendableFields )
            appendedFields = appendedFields.append( appendableField );
          }
        catch( TupleException exception )
          {
          StringBuilder fields = new StringBuilder();

          for( Fields appendableField : appendableFields )
            fields.append( appendableField.print() );

          throw new OperatorException( this, "found duplicate field names in joined tuple stream: " + fields, exception );
          }

        return appendedFields;
        }
      }
    catch( OperatorException exception )
      {
      // already descriptive, do not wrap a second time
      throw exception;
      }
    catch( RuntimeException exception )
      {
      throw new OperatorException( this, "could not resolve declared fields in: " + this, exception );
      }
    }

  /**
   * Method getJoinDeclaredFields returns the declared fields of this Splice, falling back to the field
   * declaration of the joiner if no fields were declared and the joiner implements {@link DeclaresResults}.
   *
   * @return the declared fields, may be null
   */
  public Fields getJoinDeclaredFields()
    {
    Fields declaredFields = getDeclaredFields();

    if( !( joiner instanceof DeclaresResults ) )
      return declaredFields;

    if( declaredFields == null && ( (DeclaresResults) joiner ).getFieldDeclaration() != null )
      declaredFields = ( (DeclaresResults) joiner ).getFieldDeclaration();

    return declaredFields;
    }

  /**
   * Method getOrderedResolvedFields returns the incoming splice fields of every pipe, in the order the
   * pipes were added to this Splice, by looking up each pipe's scope by name.
   *
   * @param incomingScopes of type Set<Scope>
   * @return one Fields instance per pipe, in pipe order
   */
  private List<Fields> getOrderedResolvedFields( Set<Scope> incomingScopes )
    {
    Map<String, Scope> scopesMap = new HashMap<String, Scope>();

    for( Scope incomingScope : incomingScopes )
      scopesMap.put( incomingScope.getName(), incomingScope );

    List<Fields> appendableFields = new ArrayList<Fields>();

    for( Pipe pipe : pipes )
      appendableFields.add( scopesMap.get( pipe.getName() ).getIncomingSpliceFields() );

    return appendableFields;
    }

  /**
   * Method isEquivalentTo returns true if the parent class considers the given element equivalent and
   * both the key fields and the pipes of the two Splice instances are equal.
   *
   * @param element of type FlowElement
   * @return true if the given element is equivalent to this Splice
   */
  @Override
  public boolean isEquivalentTo( FlowElement element )
    {
    boolean equivalentTo = super.isEquivalentTo( element );

    if( !equivalentTo )
      return equivalentTo;

    Splice splice = (Splice) element;

    if( !keyFieldsMap.equals( splice.keyFieldsMap ) )
      return false;

    if( !pipes.equals( splice.pipes ) )
      return false;

    return true;
    }

  // OBJECT OVERRIDES

  /**
   * Two Splice instances are equal if they are of the same class, are equal according to the parent
   * class, and have equal {@code spliceName}, {@code keyFieldsMap} and {@code pipes} values.
   *
   * @param object of type Object
   * @return true if the given object is equal to this Splice
   */
  @Override
  @SuppressWarnings({"RedundantIfStatement"})
  public boolean equals( Object object )
    {
    if( this == object )
      return true;
    if( object == null || getClass() != object.getClass() )
      return false;
    if( !super.equals( object ) )
      return false;

    Splice splice = (Splice) object;

    if( spliceName != null ? !spliceName.equals( splice.spliceName ) : splice.spliceName != null )
      return false;
    if( keyFieldsMap != null ? !keyFieldsMap.equals( splice.keyFieldsMap ) : splice.keyFieldsMap != null )
      return false;
    if( pipes != null ? !pipes.equals( splice.pipes ) : splice.pipes != null )
      return false;

    return true;
    }

  /**
   * Combines the parent hash code with those of {@code pipes}, {@code keyFieldsMap} and
   * {@code spliceName}, the same members {@link #equals(Object)} compares.
   *
   * @return the hash code of this Splice
   */
  @Override
  public int hashCode()
    {
    int result = super.hashCode();
    result = 31 * result + ( pipes != null ? pipes.hashCode() : 0 );
    result = 31 * result + ( keyFieldsMap != null ? keyFieldsMap.hashCode() : 0 );
    result = 31 * result + ( spliceName != null ? spliceName.hashCode() : 0 );
    return result;
    }

  /**
   * Appends the key fields to the parent's string form, prefixing each with its pipe name when more
   * than one set of key fields is registered, followed by the number of self joins, if any.
   *
   * @return the String form of this Splice
   */
  @Override
  public String toString()
    {
    StringBuilder buffer = new StringBuilder( super.toString() );

    buffer.append( "[by:" );

    boolean printNames = keyFieldsMap.size() > 1;

    for( Map.Entry<String, Fields> entry : keyFieldsMap.entrySet() )
      {
      if( printNames )
        buffer.append( " " ).append( entry.getKey() ).append( ":" );

      buffer.append( entry.getValue().printVerbose() );
      }

    if( isSelfJoin() )
      buffer.append( "[numSelfJoins:" ).append( numSelfJoins ).append( "]" );

    buffer.append( "]" );

    return buffer.toString();
    }

  /**
   * Appends the key selectors held by the given scope to the parent's output, in the order the pipes
   * were registered with this Splice. Nothing is appended if the scope has no key selectors.
   *
   * @param buffer of type StringBuffer
   * @param scope  of type Scope
   */
  @Override
  protected void printInternal( StringBuffer buffer, Scope scope )
    {
    super.printInternal( buffer, scope );
    Map<String, Fields> map = scope.getKeySelectors();

    if( map != null )
      {
      buffer.append( "[by:" );

      // important to retain incoming pipe order
      for( Map.Entry<String, Fields> entry : keyFieldsMap.entrySet() )
        {
        String name = entry.getKey();

        if( map.size() > 1 )
          buffer.append( name ).append( ":" );

        buffer.append( map.get( name ).print() ); // get resolved keys
        }

      if( isSelfJoin() )
        buffer.append( "[numSelfJoins:" ).append( numSelfJoins ).append( "]" );

      buffer.append( "]" );
      }
    }
  }