001/* 002 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved. 003 * 004 * Project and contact information: http://www.cascading.org/ 005 * 006 * This file is part of the Cascading project. 007 * 008 * Licensed under the Apache License, Version 2.0 (the "License"); 009 * you may not use this file except in compliance with the License. 010 * You may obtain a copy of the License at 011 * 012 * http://www.apache.org/licenses/LICENSE-2.0 013 * 014 * Unless required by applicable law or agreed to in writing, software 015 * distributed under the License is distributed on an "AS IS" BASIS, 016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 017 * See the License for the specific language governing permissions and 018 * limitations under the License. 019 */ 020 021package cascading.pipe; 022 023import java.lang.reflect.Type; 024import java.util.ArrayList; 025import java.util.Comparator; 026import java.util.HashMap; 027import java.util.HashSet; 028import java.util.Iterator; 029import java.util.LinkedHashMap; 030import java.util.List; 031import java.util.Map; 032import java.util.Set; 033 034import cascading.flow.planner.DeclaresResults; 035import cascading.flow.planner.Scope; 036import cascading.pipe.joiner.BufferJoin; 037import cascading.pipe.joiner.InnerJoin; 038import cascading.pipe.joiner.Joiner; 039import cascading.tuple.Fields; 040import cascading.tuple.FieldsResolverException; 041import cascading.tuple.TupleException; 042import cascading.tuple.coerce.Coercions; 043import cascading.tuple.type.CoercibleType; 044import cascading.util.Util; 045 046import static java.util.Arrays.asList; 047 048/** 049 * The base class for {@link GroupBy}, {@link CoGroup}, {@link Merge}, and {@link HashJoin}. This class should not be used directly. 
050 * 051 * @see GroupBy 052 * @see CoGroup 053 * @see Merge 054 * @see HashJoin 055 */ 056public class Splice extends Pipe 057 { 058 enum Kind 059 { 060 GroupBy, CoGroup, Merge, Join 061 } 062 063 private Kind kind; 064 /** Field spliceName */ 065 private String spliceName; 066 /** Field pipes */ 067 private final List<Pipe> pipes = new ArrayList<Pipe>(); 068 /** Field groupFieldsMap */ 069 protected final Map<String, Fields> keyFieldsMap = new LinkedHashMap<String, Fields>(); // keep order 070 /** Field sortFieldsMap */ 071 protected Map<String, Fields> sortFieldsMap = new LinkedHashMap<String, Fields>(); // keep order 072 /** Field reverseOrder */ 073 private boolean reverseOrder = false; 074 /** Field declaredFields */ 075 protected Fields declaredFields; 076 /** Field resultGroupFields */ 077 protected Fields resultGroupFields; 078 /** Field repeat */ 079 private int numSelfJoins = 0; 080 /** Field coGrouper */ 081 private Joiner joiner; 082 083 /** Field pipePos */ 084 private transient Map<String, Integer> pipePos; 085 086 /** 087 * Constructor Splice creates a new Splice instance. 088 * 089 * @param lhs of type Pipe 090 * @param lhsGroupFields of type Fields 091 * @param rhs of type Pipe 092 * @param rhsGroupFields of type Fields 093 * @param declaredFields of type Fields 094 */ 095 protected Splice( Pipe lhs, Fields lhsGroupFields, Pipe rhs, Fields rhsGroupFields, Fields declaredFields ) 096 { 097 this( lhs, lhsGroupFields, rhs, rhsGroupFields, declaredFields, null, null ); 098 } 099 100 /** 101 * Constructor Splice creates a new Splice instance. 
102 * 103 * @param lhs of type Pipe 104 * @param lhsGroupFields of type Fields 105 * @param rhs of type Pipe 106 * @param rhsGroupFields of type Fields 107 * @param declaredFields of type Fields 108 * @param resultGroupFields of type Fields 109 */ 110 protected Splice( Pipe lhs, Fields lhsGroupFields, Pipe rhs, Fields rhsGroupFields, Fields declaredFields, Fields resultGroupFields ) 111 { 112 this( lhs, lhsGroupFields, rhs, rhsGroupFields, declaredFields, resultGroupFields, null ); 113 } 114 115 /** 116 * Constructor Splice creates a new Splice instance. 117 * 118 * @param lhs of type Pipe 119 * @param lhsGroupFields of type Fields 120 * @param rhs of type Pipe 121 * @param rhsGroupFields of type Fields 122 * @param declaredFields of type Fields 123 * @param joiner of type CoGrouper 124 */ 125 protected Splice( Pipe lhs, Fields lhsGroupFields, Pipe rhs, Fields rhsGroupFields, Fields declaredFields, Joiner joiner ) 126 { 127 this( Pipe.pipes( lhs, rhs ), Fields.fields( lhsGroupFields, rhsGroupFields ), declaredFields, joiner ); 128 } 129 130 /** 131 * Constructor Splice creates a new Splice instance. 132 * 133 * @param lhs of type Pipe 134 * @param lhsGroupFields of type Fields 135 * @param rhs of type Pipe 136 * @param rhsGroupFields of type Fields 137 * @param declaredFields of type Fields 138 * @param resultGroupFields of type Fields 139 * @param joiner of type Joiner 140 */ 141 protected Splice( Pipe lhs, Fields lhsGroupFields, Pipe rhs, Fields rhsGroupFields, Fields declaredFields, Fields resultGroupFields, Joiner joiner ) 142 { 143 this( Pipe.pipes( lhs, rhs ), Fields.fields( lhsGroupFields, rhsGroupFields ), declaredFields, resultGroupFields, joiner ); 144 } 145 146 /** 147 * Constructor Splice creates a new Splice instance. 
148 * 149 * @param lhs of type Pipe 150 * @param lhsGroupFields of type Fields 151 * @param rhs of type Pipe 152 * @param rhsGroupFields of type Fields 153 * @param joiner of type CoGrouper 154 */ 155 protected Splice( Pipe lhs, Fields lhsGroupFields, Pipe rhs, Fields rhsGroupFields, Joiner joiner ) 156 { 157 this( lhs, lhsGroupFields, rhs, rhsGroupFields, null, joiner ); 158 } 159 160 /** 161 * Constructor Splice creates a new Splice instance. 162 * 163 * @param lhs of type Pipe 164 * @param lhsGroupFields of type Fields 165 * @param rhs of type Pipe 166 * @param rhsGroupFields of type Fields 167 */ 168 protected Splice( Pipe lhs, Fields lhsGroupFields, Pipe rhs, Fields rhsGroupFields ) 169 { 170 this( Pipe.pipes( lhs, rhs ), Fields.fields( lhsGroupFields, rhsGroupFields ) ); 171 } 172 173 /** 174 * Constructor Splice creates a new Splice instance. 175 * 176 * @param pipes of type Pipe... 177 */ 178 protected Splice( Pipe... pipes ) 179 { 180 this( pipes, (Fields[]) null ); 181 } 182 183 /** 184 * Constructor Splice creates a new Splice instance. 185 * 186 * @param pipes of type Pipe[] 187 * @param groupFields of type Fields[] 188 */ 189 protected Splice( Pipe[] pipes, Fields[] groupFields ) 190 { 191 this( null, pipes, groupFields, null, null ); 192 } 193 194 /** 195 * Constructor Splice creates a new Splice instance. 196 * 197 * @param spliceName of type String 198 * @param pipes of type Pipe[] 199 * @param groupFields of type Fields[] 200 */ 201 protected Splice( String spliceName, Pipe[] pipes, Fields[] groupFields ) 202 { 203 this( spliceName, pipes, groupFields, null, null ); 204 } 205 206 /** 207 * Constructor Splice creates a new Splice instance. 
208 * 209 * @param spliceName of type String 210 * @param pipes of type Pipe[] 211 * @param groupFields of type Fields[] 212 * @param declaredFields of type Fields 213 */ 214 protected Splice( String spliceName, Pipe[] pipes, Fields[] groupFields, Fields declaredFields ) 215 { 216 this( spliceName, pipes, groupFields, declaredFields, null ); 217 } 218 219 /** 220 * Constructor Splice creates a new Splice instance. 221 * 222 * @param spliceName of type String 223 * @param pipes of type Pipe[] 224 * @param groupFields of type Fields[] 225 * @param declaredFields of type Fields 226 * @param resultGroupFields of type Fields 227 */ 228 protected Splice( String spliceName, Pipe[] pipes, Fields[] groupFields, Fields declaredFields, Fields resultGroupFields ) 229 { 230 this( spliceName, pipes, groupFields, declaredFields, resultGroupFields, null ); 231 } 232 233 /** 234 * Constructor Splice creates a new Splice instance. 235 * 236 * @param pipes of type Pipe[] 237 * @param groupFields of type Fields[] 238 * @param declaredFields of type Fields 239 * @param joiner of type CoGrouper 240 */ 241 protected Splice( Pipe[] pipes, Fields[] groupFields, Fields declaredFields, Joiner joiner ) 242 { 243 this( null, pipes, groupFields, declaredFields, null, joiner ); 244 } 245 246 /** 247 * Constructor Splice creates a new Splice instance. 248 * 249 * @param pipes of type Pipe[] 250 * @param groupFields of type Fields[] 251 * @param declaredFields of type Fields 252 * @param resultGroupFields of type Fields 253 * @param joiner of type Joiner 254 */ 255 protected Splice( Pipe[] pipes, Fields[] groupFields, Fields declaredFields, Fields resultGroupFields, Joiner joiner ) 256 { 257 this( null, pipes, groupFields, declaredFields, resultGroupFields, joiner ); 258 } 259 260 /** 261 * Constructor Splice creates a new Splice instance. 
262 * 263 * @param spliceName of type String 264 * @param pipes of type Pipe[] 265 * @param groupFields of type Fields[] 266 * @param declaredFields of type Fields 267 * @param joiner of type CoGrouper 268 */ 269 protected Splice( String spliceName, Pipe[] pipes, Fields[] groupFields, Fields declaredFields, Fields resultGroupFields, Joiner joiner ) 270 { 271 if( pipes == null ) 272 throw new IllegalArgumentException( "pipes array may not be null" ); 273 274 setKind(); 275 this.spliceName = spliceName; 276 277 int uniques = new HashSet<Pipe>( asList( Pipe.resolvePreviousAll( pipes ) ) ).size(); 278 279 if( pipes.length > 1 && uniques == 1 ) 280 { 281 if( isMerge() ) 282 throw new IllegalArgumentException( "may not merge a pipe with itself without intermediate operations after the split" ); 283 284 if( groupFields == null ) 285 throw new IllegalArgumentException( "groupFields array may not be null" ); 286 287 if( new HashSet<Fields>( asList( groupFields ) ).size() != 1 ) 288 throw new IllegalArgumentException( "all groupFields must be identical" ); 289 290 addPipe( pipes[ 0 ] ); 291 this.numSelfJoins = pipes.length - 1; 292 this.keyFieldsMap.put( pipes[ 0 ].getName(), groupFields[ 0 ] ); 293 294 if( resultGroupFields != null && groupFields[ 0 ].size() * pipes.length != resultGroupFields.size() ) 295 throw new IllegalArgumentException( "resultGroupFields and cogroup joined fields must be same size" ); 296 } 297 else 298 { 299 int last = -1; 300 for( int i = 0; i < pipes.length; i++ ) 301 { 302 addPipe( pipes[ i ] ); 303 304 if( groupFields == null || groupFields.length == 0 ) 305 { 306 addGroupFields( pipes[ i ], Fields.FIRST ); 307 continue; 308 } 309 310 if( last != -1 && last != groupFields[ i ].size() ) 311 throw new IllegalArgumentException( "all groupFields must be same size" ); 312 313 last = groupFields[ i ].size(); 314 addGroupFields( pipes[ i ], groupFields[ i ] ); 315 } 316 317 if( resultGroupFields != null && last * pipes.length != resultGroupFields.size() 
) 318 throw new IllegalArgumentException( "resultGroupFields and cogroup resulting joined fields must be same size" ); 319 } 320 321 this.declaredFields = declaredFields; 322 this.resultGroupFields = resultGroupFields; 323 this.joiner = joiner; 324 325 verifyCoGrouper(); 326 } 327 328 /** 329 * Constructor Splice creates a new Splice instance. 330 * 331 * @param spliceName of type String 332 * @param lhs of type Pipe 333 * @param lhsGroupFields of type Fields 334 * @param rhs of type Pipe 335 * @param rhsGroupFields of type Fields 336 * @param declaredFields of type Fields 337 */ 338 protected Splice( String spliceName, Pipe lhs, Fields lhsGroupFields, Pipe rhs, Fields rhsGroupFields, Fields declaredFields ) 339 { 340 this( lhs, lhsGroupFields, rhs, rhsGroupFields, declaredFields ); 341 this.spliceName = spliceName; 342 } 343 344 /** 345 * Constructor Splice creates a new Splice instance. 346 * 347 * @param spliceName of type String 348 * @param lhs of type Pipe 349 * @param lhsGroupFields of type Fields 350 * @param rhs of type Pipe 351 * @param rhsGroupFields of type Fields 352 * @param declaredFields of type Fields 353 * @param resultGroupFields of type Fields 354 */ 355 protected Splice( String spliceName, Pipe lhs, Fields lhsGroupFields, Pipe rhs, Fields rhsGroupFields, Fields declaredFields, Fields resultGroupFields ) 356 { 357 this( lhs, lhsGroupFields, rhs, rhsGroupFields, declaredFields, resultGroupFields ); 358 this.spliceName = spliceName; 359 } 360 361 /** 362 * Constructor Splice creates a new Splice instance. 
363 * 364 * @param spliceName of type String 365 * @param lhs of type Pipe 366 * @param lhsGroupFields of type Fields 367 * @param rhs of type Pipe 368 * @param rhsGroupFields of type Fields 369 * @param declaredFields of type Fields 370 * @param joiner of type CoGrouper 371 */ 372 protected Splice( String spliceName, Pipe lhs, Fields lhsGroupFields, Pipe rhs, Fields rhsGroupFields, Fields declaredFields, Joiner joiner ) 373 { 374 this( lhs, lhsGroupFields, rhs, rhsGroupFields, declaredFields, joiner ); 375 this.spliceName = spliceName; 376 } 377 378 /** 379 * Constructor Splice creates a new Splice instance. 380 * 381 * @param spliceName of type String 382 * @param lhs of type Pipe 383 * @param lhsGroupFields of type Fields 384 * @param rhs of type Pipe 385 * @param rhsGroupFields of type Fields 386 * @param declaredFields of type Fields 387 * @param resultGroupFields of type Fields 388 * @param joiner of type Joiner 389 */ 390 protected Splice( String spliceName, Pipe lhs, Fields lhsGroupFields, Pipe rhs, Fields rhsGroupFields, Fields declaredFields, Fields resultGroupFields, Joiner joiner ) 391 { 392 this( lhs, lhsGroupFields, rhs, rhsGroupFields, declaredFields, resultGroupFields, joiner ); 393 this.spliceName = spliceName; 394 } 395 396 /** 397 * Constructor Splice creates a new Splice instance. 398 * 399 * @param spliceName of type String 400 * @param lhs of type Pipe 401 * @param lhsGroupFields of type Fields 402 * @param rhs of type Pipe 403 * @param rhsGroupFields of type Fields 404 * @param joiner of type CoGrouper 405 */ 406 protected Splice( String spliceName, Pipe lhs, Fields lhsGroupFields, Pipe rhs, Fields rhsGroupFields, Joiner joiner ) 407 { 408 this( lhs, lhsGroupFields, rhs, rhsGroupFields, joiner ); 409 this.spliceName = spliceName; 410 } 411 412 /** 413 * Constructor Splice creates a new Splice instance. 
414 * 415 * @param spliceName of type String 416 * @param lhs of type Pipe 417 * @param lhsGroupFields of type Fields 418 * @param rhs of type Pipe 419 * @param rhsGroupFields of type Fields 420 */ 421 protected Splice( String spliceName, Pipe lhs, Fields lhsGroupFields, Pipe rhs, Fields rhsGroupFields ) 422 { 423 this( lhs, lhsGroupFields, rhs, rhsGroupFields ); 424 this.spliceName = spliceName; 425 } 426 427 /** 428 * Constructor Splice creates a new Splice instance. 429 * 430 * @param spliceName of type String 431 * @param pipes of type Pipe... 432 */ 433 protected Splice( String spliceName, Pipe... pipes ) 434 { 435 this( pipes ); 436 this.spliceName = spliceName; 437 } 438 439 /** 440 * Constructor Splice creates a new Splice instance. 441 * 442 * @param pipe of type Pipe 443 * @param groupFields of type Fields 444 * @param numSelfJoins of type int 445 * @param declaredFields of type Fields 446 */ 447 protected Splice( Pipe pipe, Fields groupFields, int numSelfJoins, Fields declaredFields ) 448 { 449 this( pipe, groupFields, numSelfJoins ); 450 this.declaredFields = declaredFields; 451 } 452 453 /** 454 * Constructor Splice creates a new Splice instance. 455 * 456 * @param pipe of type Pipe 457 * @param groupFields of type Fields 458 * @param numSelfJoins of type int 459 * @param declaredFields of type Fields 460 * @param resultGroupFields of type Fields 461 */ 462 protected Splice( Pipe pipe, Fields groupFields, int numSelfJoins, Fields declaredFields, Fields resultGroupFields ) 463 { 464 this( pipe, groupFields, numSelfJoins ); 465 this.declaredFields = declaredFields; 466 this.resultGroupFields = resultGroupFields; 467 468 if( resultGroupFields != null && groupFields.size() * numSelfJoins != resultGroupFields.size() ) 469 throw new IllegalArgumentException( "resultGroupFields and cogroup resulting join fields must be same size" ); 470 } 471 472 /** 473 * Constructor Splice creates a new Splice instance. 
474 * 475 * @param pipe of type Pipe 476 * @param groupFields of type Fields 477 * @param numSelfJoins of type int 478 * @param declaredFields of type Fields 479 * @param joiner of type CoGrouper 480 */ 481 protected Splice( Pipe pipe, Fields groupFields, int numSelfJoins, Fields declaredFields, Joiner joiner ) 482 { 483 this( pipe, groupFields, numSelfJoins, declaredFields ); 484 this.joiner = joiner; 485 486 verifyCoGrouper(); 487 } 488 489 /** 490 * Constructor Splice creates a new Splice instance. 491 * 492 * @param pipe of type Pipe 493 * @param groupFields of type Fields 494 * @param numSelfJoins of type int 495 * @param declaredFields of type Fields 496 * @param resultGroupFields of type Fields 497 * @param joiner of type Joiner 498 */ 499 protected Splice( Pipe pipe, Fields groupFields, int numSelfJoins, Fields declaredFields, Fields resultGroupFields, Joiner joiner ) 500 { 501 this( pipe, groupFields, numSelfJoins, declaredFields, resultGroupFields ); 502 this.joiner = joiner; 503 504 verifyCoGrouper(); 505 } 506 507 /** 508 * Constructor Splice creates a new Splice instance. 509 * 510 * @param pipe of type Pipe 511 * @param groupFields of type Fields 512 * @param numSelfJoins of type int 513 * @param joiner of type CoGrouper 514 */ 515 protected Splice( Pipe pipe, Fields groupFields, int numSelfJoins, Joiner joiner ) 516 { 517 setKind(); 518 addPipe( pipe ); 519 this.keyFieldsMap.put( pipe.getName(), groupFields ); 520 this.numSelfJoins = numSelfJoins; 521 this.joiner = joiner; 522 523 verifyCoGrouper(); 524 } 525 526 /** 527 * Constructor Splice creates a new Splice instance. 528 * 529 * @param pipe of type Pipe 530 * @param groupFields of type Fields 531 * @param numSelfJoins of type int 532 */ 533 protected Splice( Pipe pipe, Fields groupFields, int numSelfJoins ) 534 { 535 this( pipe, groupFields, numSelfJoins, (Joiner) null ); 536 } 537 538 /** 539 * Constructor Splice creates a new Splice instance. 
540 * 541 * @param spliceName of type String 542 * @param pipe of type Pipe 543 * @param groupFields of type Fields 544 * @param numSelfJoins of type int 545 * @param declaredFields of type Fields 546 */ 547 protected Splice( String spliceName, Pipe pipe, Fields groupFields, int numSelfJoins, Fields declaredFields ) 548 { 549 this( pipe, groupFields, numSelfJoins, declaredFields ); 550 this.spliceName = spliceName; 551 } 552 553 /** 554 * Constructor Splice creates a new Splice instance. 555 * 556 * @param spliceName of type String 557 * @param pipe of type Pipe 558 * @param groupFields of type Fields 559 * @param numSelfJoins of type int 560 * @param declaredFields of type Fields 561 * @param resultGroupFields of type Fields 562 */ 563 protected Splice( String spliceName, Pipe pipe, Fields groupFields, int numSelfJoins, Fields declaredFields, Fields resultGroupFields ) 564 { 565 this( pipe, groupFields, numSelfJoins, declaredFields, resultGroupFields ); 566 this.spliceName = spliceName; 567 } 568 569 /** 570 * Constructor Splice creates a new Splice instance. 571 * 572 * @param spliceName of type String 573 * @param pipe of type Pipe 574 * @param groupFields of type Fields 575 * @param numSelfJoins of type int 576 * @param declaredFields of type Fields 577 * @param joiner of type CoGrouper 578 */ 579 protected Splice( String spliceName, Pipe pipe, Fields groupFields, int numSelfJoins, Fields declaredFields, Joiner joiner ) 580 { 581 this( pipe, groupFields, numSelfJoins, declaredFields, joiner ); 582 this.spliceName = spliceName; 583 } 584 585 /** 586 * Constructor Splice creates a new Splice instance. 
587 * 588 * @param spliceName of type String 589 * @param pipe of type Pipe 590 * @param groupFields of type Fields 591 * @param numSelfJoins of type int 592 * @param declaredFields of type Fields 593 * @param resultGroupFields of type Fields 594 * @param joiner of type Joiner 595 */ 596 protected Splice( String spliceName, Pipe pipe, Fields groupFields, int numSelfJoins, Fields declaredFields, Fields resultGroupFields, Joiner joiner ) 597 { 598 this( pipe, groupFields, numSelfJoins, declaredFields, resultGroupFields, joiner ); 599 this.spliceName = spliceName; 600 } 601 602 /** 603 * Constructor Splice creates a new Splice instance. 604 * 605 * @param spliceName of type String 606 * @param pipe of type Pipe 607 * @param groupFields of type Fields 608 * @param numSelfJoins of type int 609 * @param joiner of type CoGrouper 610 */ 611 protected Splice( String spliceName, Pipe pipe, Fields groupFields, int numSelfJoins, Joiner joiner ) 612 { 613 this( pipe, groupFields, numSelfJoins, joiner ); 614 this.spliceName = spliceName; 615 } 616 617 /** 618 * Constructor Splice creates a new Splice instance. 619 * 620 * @param spliceName of type String 621 * @param pipe of type Pipe 622 * @param groupFields of type Fields 623 * @param numSelfJoins of type int 624 */ 625 protected Splice( String spliceName, Pipe pipe, Fields groupFields, int numSelfJoins ) 626 { 627 this( pipe, groupFields, numSelfJoins ); 628 this.spliceName = spliceName; 629 } 630 631 //////////// 632 // GROUPBY 633 //////////// 634 635 /** 636 * Constructor Splice creates a new Splice instance where grouping occurs on {@link Fields#ALL} fields. 637 * 638 * @param pipe of type Pipe 639 */ 640 protected Splice( Pipe pipe ) 641 { 642 this( null, pipe, Fields.ALL, null, false ); 643 } 644 645 /** 646 * Constructor Splice creates a new Splice instance. 
647 * 648 * @param pipe of type Pipe 649 * @param groupFields of type Fields 650 */ 651 protected Splice( Pipe pipe, Fields groupFields ) 652 { 653 this( null, pipe, groupFields, null, false ); 654 } 655 656 /** 657 * Constructor Splice creates a new Splice instance. 658 * 659 * @param spliceName of type String 660 * @param pipe of type Pipe 661 * @param groupFields of type Fields 662 */ 663 protected Splice( String spliceName, Pipe pipe, Fields groupFields ) 664 { 665 this( spliceName, pipe, groupFields, null, false ); 666 } 667 668 /** 669 * Constructor Splice creates a new Splice instance. 670 * 671 * @param pipe of type Pipe 672 * @param groupFields of type Fields 673 * @param sortFields of type Fields 674 */ 675 protected Splice( Pipe pipe, Fields groupFields, Fields sortFields ) 676 { 677 this( null, pipe, groupFields, sortFields, false ); 678 } 679 680 /** 681 * Constructor Splice creates a new Splice instance. 682 * 683 * @param spliceName of type String 684 * @param pipe of type Pipe 685 * @param groupFields of type Fields 686 * @param sortFields of type Fields 687 */ 688 protected Splice( String spliceName, Pipe pipe, Fields groupFields, Fields sortFields ) 689 { 690 this( spliceName, pipe, groupFields, sortFields, false ); 691 } 692 693 /** 694 * Constructor Splice creates a new Splice instance. 695 * 696 * @param pipe of type Pipe 697 * @param groupFields of type Fields 698 * @param sortFields of type Fields 699 * @param reverseOrder of type boolean 700 */ 701 protected Splice( Pipe pipe, Fields groupFields, Fields sortFields, boolean reverseOrder ) 702 { 703 this( null, pipe, groupFields, sortFields, reverseOrder ); 704 } 705 706 /** 707 * Constructor Splice creates a new Splice instance. 
708 * 709 * @param spliceName of type String 710 * @param pipe of type Pipe 711 * @param groupFields of type Fields 712 * @param sortFields of type Fields 713 * @param reverseOrder of type boolean 714 */ 715 protected Splice( String spliceName, Pipe pipe, Fields groupFields, Fields sortFields, boolean reverseOrder ) 716 { 717 this( spliceName, Pipe.pipes( pipe ), groupFields, sortFields, reverseOrder ); 718 } 719 720 /** 721 * Constructor Splice creates a new Splice instance. 722 * 723 * @param pipes of type Pipe 724 * @param groupFields of type Fields 725 */ 726 protected Splice( Pipe[] pipes, Fields groupFields ) 727 { 728 this( null, pipes, groupFields, null, false ); 729 } 730 731 /** 732 * Constructor Splice creates a new Splice instance. 733 * 734 * @param spliceName of type String 735 * @param pipes of type Pipe 736 * @param groupFields of type Fields 737 */ 738 protected Splice( String spliceName, Pipe[] pipes, Fields groupFields ) 739 { 740 this( spliceName, pipes, groupFields, null, false ); 741 } 742 743 /** 744 * Constructor Splice creates a new Splice instance. 745 * 746 * @param pipes of type Pipe 747 * @param groupFields of type Fields 748 * @param sortFields of type Fields 749 */ 750 protected Splice( Pipe[] pipes, Fields groupFields, Fields sortFields ) 751 { 752 this( null, pipes, groupFields, sortFields, false ); 753 } 754 755 /** 756 * Constructor Splice creates a new Splice instance. 757 * 758 * @param spliceName of type String 759 * @param pipe of type Pipe 760 * @param groupFields of type Fields 761 * @param sortFields of type Fields 762 */ 763 protected Splice( String spliceName, Pipe[] pipe, Fields groupFields, Fields sortFields ) 764 { 765 this( spliceName, pipe, groupFields, sortFields, false ); 766 } 767 768 /** 769 * Constructor Splice creates a new Splice instance. 
770 * 771 * @param pipes of type Pipe 772 * @param groupFields of type Fields 773 * @param sortFields of type Fields 774 * @param reverseOrder of type boolean 775 */ 776 protected Splice( Pipe[] pipes, Fields groupFields, Fields sortFields, boolean reverseOrder ) 777 { 778 this( null, pipes, groupFields, sortFields, reverseOrder ); 779 } 780 781 /** 782 * Constructor Splice creates a new Splice instance. 783 * 784 * @param spliceName of type String 785 * @param pipes of type Pipe[] 786 * @param groupFields of type Fields 787 * @param sortFields of type Fields 788 * @param reverseOrder of type boolean 789 */ 790 protected Splice( String spliceName, Pipe[] pipes, Fields groupFields, Fields sortFields, boolean reverseOrder ) 791 { 792 if( pipes == null ) 793 throw new IllegalArgumentException( "pipes array may not be null" ); 794 795 if( groupFields == null ) 796 throw new IllegalArgumentException( "groupFields may not be null" ); 797 798 setKind(); 799 this.spliceName = spliceName; 800 801 for( Pipe pipe : pipes ) 802 { 803 addPipe( pipe ); 804 this.keyFieldsMap.put( pipe.getName(), groupFields ); 805 806 if( sortFields != null ) 807 this.sortFieldsMap.put( pipe.getName(), sortFields ); 808 } 809 810 this.reverseOrder = reverseOrder; 811 this.joiner = new InnerJoin(); 812 } 813 814 private void verifyCoGrouper() 815 { 816 if( isJoin() && joiner instanceof BufferJoin ) 817 throw new IllegalArgumentException( "invalid joiner, may not use BufferJoiner in a HashJoin" ); 818 819 if( joiner == null ) 820 { 821 joiner = new InnerJoin(); 822 return; 823 } 824 825 if( joiner.numJoins() == -1 ) 826 return; 827 828 int joins = Math.max( numSelfJoins, keyFieldsMap.size() - 1 ); // joining two streams is one join 829 830 if( joins != joiner.numJoins() ) 831 throw new IllegalArgumentException( "invalid joiner, only accepts " + joiner.numJoins() + " joins, there are: " + joins ); 832 } 833 834 private void setKind() 835 { 836 if( this instanceof GroupBy ) 837 kind = Kind.GroupBy; 
838 else if( this instanceof CoGroup ) 839 kind = Kind.CoGroup; 840 else if( this instanceof Merge ) 841 kind = Kind.Merge; 842 else 843 kind = Kind.Join; 844 } 845 846 /** 847 * Method getDeclaredFields returns the declaredFields of this Splice object. 848 * 849 * @return the declaredFields (type Fields) of this Splice object. 850 */ 851 public Fields getDeclaredFields() 852 { 853 return declaredFields; 854 } 855 856 private void addPipe( Pipe pipe ) 857 { 858 if( pipe.getName() == null ) 859 throw new IllegalArgumentException( "each input pipe must have a name" ); 860 861 pipes.add( pipe ); // allow same pipe 862 } 863 864 private void addGroupFields( Pipe pipe, Fields fields ) 865 { 866 if( keyFieldsMap.containsKey( pipe.getName() ) ) 867 throw new IllegalArgumentException( "each input pipe branch must be uniquely named" ); 868 869 keyFieldsMap.put( pipe.getName(), fields ); 870 } 871 872 @Override 873 public String getName() 874 { 875 if( spliceName != null ) 876 return spliceName; 877 878 StringBuffer buffer = new StringBuffer(); 879 880 for( Pipe pipe : pipes ) 881 { 882 if( buffer.length() != 0 ) 883 { 884 if( isGroupBy() || isMerge() ) 885 buffer.append( "+" ); 886 else if( isCoGroup() || isJoin() ) 887 buffer.append( "*" ); // more semantically correct 888 } 889 890 buffer.append( pipe.getName() ); 891 } 892 893 spliceName = buffer.toString(); 894 895 return spliceName; 896 } 897 898 @Override 899 public Pipe[] getPrevious() 900 { 901 return pipes.toArray( new Pipe[ pipes.size() ] ); 902 } 903 904 /** 905 * Method getGroupingSelectors returns the groupingSelectors of this Splice object. 906 * 907 * @return the groupingSelectors (type Map<String, Fields>) of this Splice object. 908 */ 909 public Map<String, Fields> getKeySelectors() 910 { 911 return keyFieldsMap; 912 } 913 914 /** 915 * Method getSortingSelectors returns the sortingSelectors of this Splice object. 916 * 917 * @return the sortingSelectors (type Map<String, Fields>) of this Splice object. 
918 */ 919 public Map<String, Fields> getSortingSelectors() 920 { 921 return sortFieldsMap; 922 } 923 924 /** 925 * Method isSorted returns true if this Splice instance is sorting values other than the group fields. 926 * 927 * @return the sorted (type boolean) of this Splice object. 928 */ 929 public boolean isSorted() 930 { 931 return !sortFieldsMap.isEmpty(); 932 } 933 934 /** 935 * Method isSortReversed returns true if sorting is reversed. 936 * 937 * @return the sortReversed (type boolean) of this Splice object. 938 */ 939 public boolean isSortReversed() 940 { 941 return reverseOrder; 942 } 943 944 public synchronized Map<String, Integer> getPipePos() 945 { 946 if( pipePos != null ) 947 return pipePos; 948 949 pipePos = new HashMap<String, Integer>(); 950 951 int pos = 0; 952 for( Object pipe : pipes ) 953 pipePos.put( ( (Pipe) pipe ).getName(), pos++ ); 954 955 return pipePos; 956 } 957 958 public Joiner getJoiner() 959 { 960 return joiner; 961 } 962 963 /** 964 * Method isGroupBy returns true if this Splice instance will perform a GroupBy operation. 965 * 966 * @return the groupBy (type boolean) of this Splice object. 
*/
  public final boolean isGroupBy()
    {
    return kind == Kind.GroupBy;
    }

  /**
   * Method isCoGroup returns true if this Splice instance has the kind {@link Kind#CoGroup}.
   *
   * @return boolean
   */
  public final boolean isCoGroup()
    {
    return kind == Kind.CoGroup;
    }

  /**
   * Method isMerge returns true if this Splice instance has the kind {@link Kind#Merge}.
   *
   * @return boolean
   */
  public final boolean isMerge()
    {
    return kind == Kind.Merge;
    }

  /**
   * Method isJoin returns true if this Splice instance has the kind {@link Kind#Join}.
   *
   * @return boolean
   */
  public final boolean isJoin()
    {
    return kind == Kind.Join;
    }

  /**
   * Method getNumSelfJoins returns the value of the numSelfJoins field.
   *
   * @return int
   */
  public int getNumSelfJoins()
    {
    return numSelfJoins;
    }

  /**
   * Method isSelfJoin returns true if numSelfJoins is not zero.
   *
   * @return boolean
   */
  public boolean isSelfJoin()
    {
    return numSelfJoins != 0;
    }

  // FIELDS

  /**
   * Method outgoingScopeFor resolves the grouping selectors, sorting selectors and declared fields against
   * the given incoming scopes and returns them as a new {@link Scope} named after this pipe.
   * <p>
   * The outgoing grouping fields are the resultGroupFields when set. Otherwise, for a CoGroup only, they
   * are derived by {@link #createJoinFields(Set, Map, Fields)}; for every other kind they remain null.
   *
   * @param incomingScopes of type Set
   * @return Scope
   * @throws OperatorException if the grouping, sorting, or declared fields cannot be resolved
   */
  @Override
  public Scope outgoingScopeFor( Set<Scope> incomingScopes )
    {
    Map<String, Fields> groupingSelectors = resolveGroupingSelectors( incomingScopes );
    Map<String, Fields> sortingSelectors = resolveSortingSelectors( incomingScopes );
    Fields declared = resolveDeclared( incomingScopes );

    Fields outGroupingFields = resultGroupFields;

    if( outGroupingFields == null && isCoGroup() )
      outGroupingFields = createJoinFields( incomingScopes, groupingSelectors, declared );

    // for Group, the outgoing fields are the same as those declared
    // named scopeKind so it does not shadow the 'kind' field, which is a different enum type
    Scope.Kind scopeKind = getScopeKind();

    return new Scope( getName(), declared, outGroupingFields, groupingSelectors, sortingSelectors, declared, scopeKind );
    }

  /**
   * Maps the kind of this Splice onto the corresponding {@link Scope.Kind} value.
   *
   * @return Scope.Kind
   * @throws IllegalStateException if the kind matches none of the handled cases
   */
  private Scope.Kind getScopeKind()
    {
    switch( kind )
      {
      case GroupBy:
        return Scope.Kind.GROUPBY;
      case CoGroup:
        return Scope.Kind.COGROUP;
      case Merge:
        return Scope.Kind.MERGE;
      case Join:
        return Scope.Kind.HASHJOIN;
      }

    throw new IllegalStateException( "unknown kind: " + kind );
    }

  /**
   * Builds the outgoing grouping fields by visiting the pipes in their declared order and, for every pipe
   * whose grouping selector is not NONE, selecting the matching fields out of the declared fields.
   * <p>
   * A running offset, advanced by the size of each pipe's incoming fields, is handed to
   * {@code Fields#selectPos} so the selection is made relative to that pipe's place in the joined stream.
   *
   * @param incomingScopes    of type Set
   * @param groupingSelectors the resolved grouping fields, keyed by pipe name
   * @param declared          the declared fields, {@link Fields#UNKNOWN} is substituted if they are NONE
   * @return Fields
   */
  private Fields createJoinFields( Set<Scope> incomingScopes, Map<String, Fields> groupingSelectors, Fields declared )
    {
    if( declared.isNone() )
      declared = Fields.UNKNOWN;

    Map<String, Fields> incomingFields = new HashMap<String, Fields>();

    for( Scope scope : incomingScopes )
      incomingFields.put( scope.getName(), scope.getIncomingSpliceFields() );

    Fields outGroupingFields = Fields.NONE;

    int offset = 0;
    for( Pipe pipe : pipes ) // need to retain order of pipes
      {
      String pipeName = pipe.getName();
      Fields pipeGroupingSelector = groupingSelectors.get( pipeName );
      Fields incomingField = incomingFields.get( pipeName );

      if( !pipeGroupingSelector.isNone() )
        {
        Fields offsetFields = incomingField.selectPos( pipeGroupingSelector, offset );
        Fields resolvedSelect = declared.select( offsetFields );

        outGroupingFields = outGroupingFields.append( resolvedSelect );
        }

      offset += incomingField.size();
      }

    return outGroupingFields;
    }

  /**
   * Resolves the key selectors against the incoming scopes, then verifies that all resolved grouping
   * fields have the same size and declare compatible types.
   *
   * @param incomingScopes of type Set
   * @return the resolved grouping fields, keyed by incoming scope name
   * @throws OperatorException if a selector cannot be resolved or the verification fails
   */
  Map<String, Fields> resolveGroupingSelectors( Set<Scope> incomingScopes )
    {
    try
      {
      Map<String, Fields> groupingSelectors = getKeySelectors();
      Map<String, Fields> groupingFields = resolveSelectorsAgainstIncoming( incomingScopes, groupingSelectors, "grouping" );

      if( !verifySameSize( groupingFields ) )
        throw new OperatorException( this, "all grouping fields must be same size: " + toString() );

      verifySameTypes( groupingSelectors, groupingFields );

      return groupingFields;
      }
    catch( FieldsResolverException exception )
      {
      throw new OperatorException( this, OperatorException.Kind.grouping, exception.getSourceFields(), exception.getSelectorFields(), exception );
      }
    catch( OperatorException exception )
      {
      // already carries a specific message, do not bury it under the generic one below
      // this mirrors the handling in resolveDeclared
      throw exception;
      }
    catch( RuntimeException exception )
      {
      throw new OperatorException( this, "could not resolve grouping selector in: " + this, exception );
      }
    }

  /**
   * Verifies that the resolved grouping fields declare the same type at every position, after
   * normalizing each type through {@link #getCanonicalType(Type)}.
   * <p>
   * Positions for which any grouping selector declares a comparator are skipped.
   *
   * @param groupingSelectors the grouping selectors as declared, used only to find comparators
   * @param groupingFields    the grouping fields resolved against the incoming scopes
   * @return always true, a mismatch is reported by exception
   * @throws OperatorException if two compared fields declare different types
   */
  private boolean verifySameTypes( Map<String, Fields> groupingSelectors, Map<String, Fields> groupingFields )
    {
    // create array of field positions with comparators from the grouping selectors
    // unsure which side has the comparators declared so make a union
    boolean[] hasComparator = new boolean[ groupingFields.values().iterator().next().size() ];

    for( Map.Entry<String, Fields> entry : groupingSelectors.entrySet() )
      {
      Comparator[] comparatorsArray = entry.getValue().getComparators();

      for( int i = 0; i < comparatorsArray.length; i++ )
        hasComparator[ i ] = hasComparator[ i ] || comparatorsArray[ i ] != null;
      }

    // compare all the rhs fields with the lhs (lhs and rhs are arbitrary here)
    Iterator<Fields> iterator = groupingFields.values().iterator();
    Fields lhsFields = iterator.next();
    Type[] lhsTypes = lhsFields.getTypes();

    // if types are null, no basis for comparison
    if( lhsTypes == null )
      return true;

    while( iterator.hasNext() )
      {
      Fields rhsFields = iterator.next();
      Type[] rhsTypes = rhsFields.getTypes();

      // if types are null, no basis for comparison
      // NOTE(review): this ends the whole verification, so any field sets after the first untyped one
      // are never compared. Which sets get compared therefore depends on the iteration order of the
      // given map (a HashMap when built by resolveSelectorsAgainstIncoming). Confirm whether skipping
      // only this entry was intended.
      if( rhsTypes == null )
        return true;

      for( int i = 0; i < lhsTypes.length; i++ )
        {
        if( hasComparator[ i ] )
          continue;

        Type lhs = lhsTypes[ i ];
        Type rhs = rhsTypes[ i ];

        lhs = getCanonicalType( lhs );
        rhs = getCanonicalType( rhs );

        if( lhs.equals( rhs ) )
          continue;

        Fields lhsError = new Fields( lhsFields.get( i ), lhsFields.getType( i ) );
        Fields rhsError = new Fields( rhsFields.get( i ), rhsFields.getType( i ) );

        throw new OperatorException( this, "grouping fields must declare same types:" + lhsError.printVerbose() + " not same as " + rhsError.printVerbose() );
        }
      }

    return true;
    }

  /**
   * Normalizes the given type so two declarations of the same logical type compare as equal.
   * <p>
   * A {@link CoercibleType} is replaced by its canonical type, and a {@link Class} is passed through
   * {@code Coercions#asNonPrimitive}.
   *
   * @param type of type Type
   * @return Type
   */
  private Type getCanonicalType( Type type )
    {
    if( type instanceof CoercibleType )
      type = ( (CoercibleType) type ).getCanonicalType();

    // if one side is primitive, normalize to its primitive wrapper type
    if( type instanceof Class )
      type = Coercions.asNonPrimitive( (Class) type );

    return type;
    }

  /**
   * Returns true if every Fields instance in the given map has the same size as the first one.
   *
   * @param groupingFields of type Map, expected to hold at least one entry
   * @return boolean
   */
  private boolean verifySameSize( Map<String, Fields> groupingFields )
    {
    Iterator<Fields> iterator = groupingFields.values().iterator();
    int size = iterator.next().size();

    while( iterator.hasNext() )
      {
      if( iterator.next().size() != size )
        return false;
      }

    return true;
    }

  /**
   * Resolves the selector registered for each incoming scope into concrete fields.
   * <p>
   * NONE resolves to {@link Fields#NONE}, ALL to the incoming splice fields, GROUP to the outgoing
   * grouping fields of the scope, and VALUES to the outgoing values fields less the grouping fields.
   * Any other selector is applied to the incoming splice fields.
   *
   * @param incomingScopes of type Set
   * @param selectors      the selectors keyed by incoming scope name
   * @param type           a label for the kind of selector, only used in the error message
   * @return the resolved fields, keyed by incoming scope name
   * @throws OperatorException if no selector is registered for an incoming scope
   */
  private Map<String, Fields> resolveSelectorsAgainstIncoming( Set<Scope> incomingScopes, Map<String, Fields> selectors, String type )
    {
    Map<String, Fields> resolvedFields = new HashMap<String, Fields>();

    for( Scope incomingScope : incomingScopes )
      {
      Fields selector = selectors.get( incomingScope.getName() );

      if( selector == null )
        throw new OperatorException( this, "no " + type + " selector found for: " + incomingScope.getName() );

      Fields incomingFields;

      if( selector.isNone() )
        incomingFields = Fields.NONE;
      else if( selector.isAll() )
        incomingFields = incomingScope.getIncomingSpliceFields();
      else if( selector.isGroup() )
        incomingFields = incomingScope.getOutGroupingFields();
      else if( selector.isValues() )
        incomingFields = incomingScope.getOutValuesFields().subtract( incomingScope.getOutGroupingFields() );
      else
        incomingFields = incomingScope.getIncomingSpliceFields().select( selector );

      resolvedFields.put( incomingScope.getName(), incomingFields );
      }

    return resolvedFields;
    }

  /**
   * Resolves the sorting selectors against the incoming scopes.
   *
   * @param incomingScopes of type Set
   * @return the resolved sorting fields keyed by incoming scope name, or null if no sorting selectors are set
   * @throws OperatorException if a selector cannot be resolved
   */
  Map<String, Fields> resolveSortingSelectors( Set<Scope> incomingScopes )
    {
    try
      {
      if( getSortingSelectors().isEmpty() )
        return null;

      return resolveSelectorsAgainstIncoming( incomingScopes, getSortingSelectors(), "sorting" );
      }
    catch( FieldsResolverException exception )
      {
      throw new OperatorException( this, OperatorException.Kind.sorting, exception.getSourceFields(), exception.getSelectorFields(), exception );
      }
    catch( OperatorException exception )
      {
      // already carries a specific message, do not bury it under the generic one below
      // this mirrors the handling in resolveDeclared
      throw exception;
      }
    catch( RuntimeException exception )
      {
      throw new OperatorException( this, "could not resolve sorting selector in: " + this, exception );
      }
    }

  /**
   * Returns the incoming splice fields of the given scope.
   *
   * @param incomingScope of type Scope
   * @return Fields
   */
  @Override
  public Fields resolveIncomingOperationPassThroughFields( Scope incomingScope )
    {
    return incomingScope.getIncomingSpliceFields();
    }

  /**
   * Resolves the fields this Splice declares for the given incoming scopes.
   * <ul>
   * <li>If the join declared fields are NONE, {@link Fields#NONE} is returned; this is only accepted for a CoGroup.</li>
   * <li>If join declared fields are otherwise given, their size is checked against the incoming fields
   * (unless any incoming fields are unknown) and the non-null incoming types are applied to them by position.</li>
   * <li>Without declared fields, a GroupBy or Merge returns the incoming fields, which must be equal
   * across all incoming scopes.</li>
   * <li>Otherwise the incoming fields are appended in pipe order, failing on duplicate names.</li>
   * </ul>
   *
   * @param incomingScopes of type Set
   * @return Fields
   * @throws OperatorException if the declared fields cannot be resolved
   */
  Fields resolveDeclared( Set<Scope> incomingScopes )
    {
    try
      {
      Fields declaredFields = getJoinDeclaredFields();

      // Fields.NONE is a flag to the CoGroup the following Buffer will use the JoinerClosure directly
      if( declaredFields != null && declaredFields.isNone() )
        {
        if( !isCoGroup() )
          throw new IllegalArgumentException( "Fields.NONE may only be declared as the join fields when using a CoGroup" );

        return Fields.NONE;
        }

      if( declaredFields != null ) // null for GroupBy
        {
        if( incomingScopes.size() != pipes.size() && isSelfJoin() )
          throw new OperatorException( this, "self joins without intermediate operators are not permitted, see 'numSelfJoins' constructor or identity function" );

        int size = 0;
        boolean foundUnknown = false;

        List<Fields> appendableFields = getOrderedResolvedFields( incomingScopes );

        for( Fields fields : appendableFields )
          {
          foundUnknown = foundUnknown || fields.isUnknown();
          size += fields.size();
          }

        // we must relax field checking in the face of unknown fields
        if( !foundUnknown && declaredFields.size() != size * ( numSelfJoins + 1 ) )
          {
          if( isSelfJoin() )
            throw new OperatorException( this, "declared grouped fields not same size as grouped values, declared: " + declaredFields.printVerbose() + " != size: " + size * ( numSelfJoins + 1 ) );
          else
            throw new OperatorException( this, "declared grouped fields not same size as grouped values, declared: " + declaredFields.printVerbose() + " resolved: " + Util.print( appendableFields, "" ) );
          }

        // carry the incoming types over to the declared fields, position by position
        int i = 0;
        for( Fields appendableField : appendableFields )
          {
          Type[] types = appendableField.getTypes();

          // untyped fields still occupy positions, so skip over them
          if( types == null )
            {
            i += appendableField.size();
            continue;
            }

          for( Type type : types )
            {
            if( type != null )
              declaredFields = declaredFields.applyType( i, type );

            i++;
            }
          }

        return declaredFields;
        }

      // support merge or cogrouping here
      if( isGroupBy() || isMerge() )
        {
        Iterator<Scope> iterator = incomingScopes.iterator();
        Fields commonFields = iterator.next().getIncomingSpliceFields();

        while( iterator.hasNext() )
          {
          Scope incomingScope = iterator.next();
          Fields fields = incomingScope.getIncomingSpliceFields();

          if( !commonFields.equalsFields( fields ) )
            throw new OperatorException( this, "merged streams must declare the same field names, in the same order, expected: " + commonFields.printVerbose() + " found: " + fields.printVerbose() );
          }

        return commonFields;
        }
      else
        {
        List<Fields> appendableFields = getOrderedResolvedFields( incomingScopes );
        Fields appendedFields = new Fields();

        try
          {
          // will fail on name collisions
          for( Fields appendableField : appendableFields )
            appendedFields = appendedFields.append( appendableField );
          }
        catch( TupleException exception )
          {
          StringBuilder fields = new StringBuilder();

          for( Fields appendableField : appendableFields )
            fields.append( appendableField.print() );

          throw new OperatorException( this, "found duplicate field names in joined tuple stream: " + fields, exception );
          }

        return appendedFields;
        }
      }
    catch( OperatorException exception )
      {
      throw exception;
      }
    catch( RuntimeException exception )
      {
      throw new OperatorException( this, "could not resolve declared fields in: " + this, exception );
      }
    }
1346 1347 public Fields getJoinDeclaredFields() 1348 { 1349 Fields declaredFields = getDeclaredFields(); 1350 1351 if( !( joiner instanceof DeclaresResults ) ) 1352 return declaredFields; 1353 1354 if( declaredFields == null && ( (DeclaresResults) joiner ).getFieldDeclaration() != null ) 1355 declaredFields = ( (DeclaresResults) joiner ).getFieldDeclaration(); 1356 1357 return declaredFields; 1358 } 1359 1360 private List<Fields> getOrderedResolvedFields( Set<Scope> incomingScopes ) 1361 { 1362 Map<String, Scope> scopesMap = new HashMap<String, Scope>(); 1363 1364 for( Scope incomingScope : incomingScopes ) 1365 scopesMap.put( incomingScope.getName(), incomingScope ); 1366 1367 List<Fields> appendableFields = new ArrayList<Fields>(); 1368 1369 for( Pipe pipe : pipes ) 1370 appendableFields.add( scopesMap.get( pipe.getName() ).getIncomingSpliceFields() ); 1371 return appendableFields; 1372 } 1373 1374 // OBJECT OVERRIDES 1375 1376 @Override 1377 @SuppressWarnings({"RedundantIfStatement"}) 1378 public boolean equals( Object object ) 1379 { 1380 if( this == object ) 1381 return true; 1382 if( object == null || getClass() != object.getClass() ) 1383 return false; 1384 if( !super.equals( object ) ) 1385 return false; 1386 1387 Splice splice = (Splice) object; 1388 1389 if( spliceName != null ? !spliceName.equals( splice.spliceName ) : splice.spliceName != null ) 1390 return false; 1391 if( keyFieldsMap != null ? !keyFieldsMap.equals( splice.keyFieldsMap ) : splice.keyFieldsMap != null ) 1392 return false; 1393 if( pipes != null ? !pipes.equals( splice.pipes ) : splice.pipes != null ) 1394 return false; 1395 1396 return true; 1397 } 1398 1399 @Override 1400 public int hashCode() 1401 { 1402 int result = super.hashCode(); 1403 result = 31 * result + ( pipes != null ? pipes.hashCode() : 0 ); 1404 result = 31 * result + ( keyFieldsMap != null ? keyFieldsMap.hashCode() : 0 ); 1405 result = 31 * result + ( spliceName != null ? 
spliceName.hashCode() : 0 ); 1406 return result; 1407 } 1408 1409 @Override 1410 public String toString() 1411 { 1412 StringBuilder buffer = new StringBuilder( super.toString() ); 1413 1414 buffer.append( "[by:" ); 1415 1416 for( String name : keyFieldsMap.keySet() ) 1417 { 1418 if( keyFieldsMap.size() > 1 ) 1419 buffer.append( " " ).append( name ).append( ":" ); 1420 1421 buffer.append( keyFieldsMap.get( name ).printVerbose() ); 1422 } 1423 1424 if( isSelfJoin() ) 1425 buffer.append( "[numSelfJoins:" ).append( numSelfJoins ).append( "]" ); 1426 1427 buffer.append( "]" ); 1428 1429 return buffer.toString(); 1430 } 1431 1432 @Override 1433 protected void printInternal( StringBuffer buffer, Scope scope ) 1434 { 1435 super.printInternal( buffer, scope ); 1436 Map<String, Fields> map = scope.getKeySelectors(); 1437 1438 if( map != null ) 1439 { 1440 buffer.append( "[by:" ); 1441 1442 // important to retain incoming pipe order 1443 for( Map.Entry<String, Fields> entry : keyFieldsMap.entrySet() ) 1444 { 1445 String name = entry.getKey(); 1446 1447 if( map.size() > 1 ) 1448 buffer.append( name ).append( ":" ); 1449 1450 Fields keys = map.get( name ); 1451 1452 // if keys null, this is likely an edge contracted map 1453 if( keys == null ) 1454 buffer.append( "<unavailable>" ); 1455 else 1456 buffer.append( keys.print() ); // get resolved keys 1457 } 1458 1459 if( isSelfJoin() ) 1460 buffer.append( "[numSelfJoins:" ).append( numSelfJoins ).append( "]" ); 1461 1462 buffer.append( "]" ); 1463 } 1464 } 1465 }