001/*
002 * Copyright (c) 2016-2017 Chris K Wensel <chris@wensel.net>. All Rights Reserved.
003 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved.
004 *
005 * Project and contact information: http://www.cascading.org/
006 *
007 * This file is part of the Cascading project.
008 *
009 * Licensed under the Apache License, Version 2.0 (the "License");
010 * you may not use this file except in compliance with the License.
011 * You may obtain a copy of the License at
012 *
013 *     http://www.apache.org/licenses/LICENSE-2.0
014 *
015 * Unless required by applicable law or agreed to in writing, software
016 * distributed under the License is distributed on an "AS IS" BASIS,
017 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
018 * See the License for the specific language governing permissions and
019 * limitations under the License.
020 */
021
022package cascading.flow.tez.planner;
023
024import cascading.flow.hadoop.planner.rule.transformer.ReplaceAccumulateTapWithDistCacheTransformer;
025import cascading.flow.planner.rule.RuleRegistry;
026import cascading.flow.planner.rule.annotator.LogicalMergeAnnotator;
027import cascading.flow.planner.rule.assertion.BufferAfterEveryAssert;
028import cascading.flow.planner.rule.assertion.EveryAfterBufferAssert;
029import cascading.flow.planner.rule.assertion.LoneGroupAssert;
030import cascading.flow.planner.rule.assertion.MissingGroupAssert;
031import cascading.flow.planner.rule.assertion.SplitBeforeEveryAssert;
032import cascading.flow.planner.rule.partitioner.WholeGraphStepPartitioner;
033import cascading.flow.planner.rule.transformer.ApplyAssertionLevelTransformer;
034import cascading.flow.planner.rule.transformer.ApplyDebugLevelTransformer;
035import cascading.flow.planner.rule.transformer.RemoveNoOpPipeTransformer;
036import cascading.flow.tez.planner.rule.annotator.AccumulatedPostNodeAnnotator;
037import cascading.flow.tez.planner.rule.assertion.DualStreamedAccumulatedMergeNodeAssert;
038import cascading.flow.tez.planner.rule.partitioner.BottomUpBoundariesNodePartitioner;
039import cascading.flow.tez.planner.rule.partitioner.BottomUpJoinedBoundariesNodePartitioner;
040import cascading.flow.tez.planner.rule.partitioner.BottomUpJoinedBoundariesTriangleNodePartitioner;
041import cascading.flow.tez.planner.rule.partitioner.ConsecutiveGroupOrMergesNodePartitioner;
042import cascading.flow.tez.planner.rule.partitioner.SplitJoinBoundariesNodeRePartitioner;
043import cascading.flow.tez.planner.rule.partitioner.StreamedAccumulatedBoundariesNodeRePartitioner;
044import cascading.flow.tez.planner.rule.partitioner.StreamedOnlySourcesNodeRePartitioner;
045import cascading.flow.tez.planner.rule.partitioner.TopDownSplitBoundariesNodePartitioner;
046import cascading.flow.tez.planner.rule.transformer.BoundaryBalanceCheckpointTransformer;
047import cascading.flow.tez.planner.rule.transformer.BoundaryBalanceGroupBlockingHashJoinTransformer;
048import cascading.flow.tez.planner.rule.transformer.BoundaryBalanceGroupSplitHashJoinTransformer;
049import cascading.flow.tez.planner.rule.transformer.BoundaryBalanceGroupSplitSpliceTransformer;
050import cascading.flow.tez.planner.rule.transformer.BoundaryBalanceGroupSplitTransformer;
051import cascading.flow.tez.planner.rule.transformer.BoundaryBalanceHashJoinSameSourceTransformer;
052import cascading.flow.tez.planner.rule.transformer.BoundaryBalanceHashJoinToHashJoinTransformer;
053import cascading.flow.tez.planner.rule.transformer.BoundaryBalanceJoinSplitTransformer;
054import cascading.flow.tez.planner.rule.transformer.BoundaryBalanceSplitSplitToStreamedHashJoinTransformer;
055import cascading.flow.tez.planner.rule.transformer.BoundaryBalanceSplitToStreamedHashJoinTransformer;
056import cascading.flow.tez.planner.rule.transformer.RemoveMalformedHashJoinNodeTransformer;
057
058/**
059 * The HashJoinHadoop2TezRuleRegistry provides support for assemblies using {@link cascading.pipe.HashJoin} pipes.
060 * <p>
061 * Detecting and optimizing for HashJoin pipes adds further complexity and time to converge on a valid physical plan.
062 * <p>
063 * If facing slowdowns, and no HashJoins are used, switch to the
064 * {@link cascading.flow.tez.planner.NoHashJoinHadoop2TezRuleRegistry} via the appropriate
065 * {@link cascading.flow.FlowConnector} constructor.
 * <p>
 * All rules are registered in the no-argument constructor. The registrations are grouped under the
 * section comments PreBalance, PreResolve, PostResolve, PartitionSteps, PostSteps, PartitionNodes
 * and PostNodes; the PostResolve and PostSteps sections currently register no rules.
066 */
067public class HashJoinHadoop2TezRuleRegistry extends RuleRegistry
068  {
  /**
   * Creates the registry and registers every rule listed below by calling {@code addRule} once per
   * rule, each rule being created with its no-argument constructor.
   * <p>
   * NOTE(review): the order of the {@code addRule} calls is presumably significant to the planner;
   * confirm against {@link RuleRegistry} before reordering or regrouping any of them.
   */
069  public HashJoinHadoop2TezRuleRegistry()
070    {
    // NOTE(review): debug logging is left disabled here; presumably uncommenting the call below
    // turns on planner debug output -- confirm against RuleRegistry
071//    enableDebugLogging();
072
073    // PreBalance -- assertions on the assembly are registered first
074    addRule( new LoneGroupAssert() );
075    addRule( new MissingGroupAssert() );
076    addRule( new BufferAfterEveryAssert() );
077    addRule( new EveryAfterBufferAssert() );
078    addRule( new SplitBeforeEveryAssert() );
079
    // PreBalance -- boundary balancing transformers that are not specific to HashJoin
080    addRule( new BoundaryBalanceGroupSplitTransformer() );
081    addRule( new BoundaryBalanceGroupSplitSpliceTransformer() ); // prevents AssemblyHelpersPlatformTest#testSameSourceMerge deadlock
082    addRule( new BoundaryBalanceCheckpointTransformer() );
083
084    // hash join -- boundary balancing transformers specific to HashJoin; trailing comments name the related test or case
085    addRule( new BoundaryBalanceHashJoinSameSourceTransformer() );
086    addRule( new BoundaryBalanceSplitToStreamedHashJoinTransformer() ); // testGroupBySplitGroupByJoin
087    addRule( new BoundaryBalanceSplitSplitToStreamedHashJoinTransformer() ); // testGroupBySplitSplitGroupByJoin
088    addRule( new BoundaryBalanceHashJoinToHashJoinTransformer() ); // force HJ into unique nodes
089    addRule( new BoundaryBalanceGroupBlockingHashJoinTransformer() ); // joinAfterEvery
090    addRule( new BoundaryBalanceGroupSplitHashJoinTransformer() ); // groupBySplitJoins
091    addRule( new BoundaryBalanceJoinSplitTransformer() ); // prevents duplication of HashJoin, testJoinSplit
092
093    // PreResolve
094    addRule( new RemoveNoOpPipeTransformer() );
095    addRule( new ApplyAssertionLevelTransformer() );
096    addRule( new ApplyDebugLevelTransformer() );
097    addRule( new LogicalMergeAnnotator() ); // MergePipesPlatformTest#testSameSourceMergeHashJoin
    // the next rule is imported from the cascading.flow.hadoop.planner package rather than the tez one
098    addRule( new ReplaceAccumulateTapWithDistCacheTransformer() );
099
100    // PostResolve -- no rules registered for this section
101
102    // PartitionSteps
103    addRule( new WholeGraphStepPartitioner() );
104
105    // PostSteps -- no rules registered for this section
106
107    // PartitionNodes
108
109    // no match with HashJoin inclusion
110    addRule( new TopDownSplitBoundariesNodePartitioner() ); // split from source to multiple sinks
111    addRule( new ConsecutiveGroupOrMergesNodePartitioner() );
112    addRule( new BottomUpBoundariesNodePartitioner() ); // streamed paths re-partitioned w/ StreamedOnly
113    addRule( new SplitJoinBoundariesNodeRePartitioner() ); // testCoGroupSelf - compensates for tez-1190
114
115    // hash join inclusion
116    addRule( new BottomUpJoinedBoundariesNodePartitioner() ); // will capture multiple inputs into sink for use with HashJoins
117    addRule( new BottomUpJoinedBoundariesTriangleNodePartitioner() ); // will capture multiple inputs into sink for use with HashJoins
118    addRule( new StreamedAccumulatedBoundariesNodeRePartitioner() ); // joinsIntoCoGroupLhs & groupBySplitJoins
119    addRule( new StreamedOnlySourcesNodeRePartitioner() );
120
121    // PostNodes
122    addRule( new RemoveMalformedHashJoinNodeTransformer() ); // joinsIntoCoGroupLhs
123    addRule( new AccumulatedPostNodeAnnotator() ); // allows accumulated boundaries to be identified
124
    // last rule registered: an assertion imported from the tez rule.assertion package
125    addRule( new DualStreamedAccumulatedMergeNodeAssert() );
126    }
127  }