001/*
002 * Copyright (c) 2016-2017 Chris K Wensel <chris@wensel.net>. All Rights Reserved.
003 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved.
004 *
005 * Project and contact information: http://www.cascading.org/
006 *
007 * This file is part of the Cascading project.
008 *
009 * Licensed under the Apache License, Version 2.0 (the "License");
010 * you may not use this file except in compliance with the License.
011 * You may obtain a copy of the License at
012 *
013 *     http://www.apache.org/licenses/LICENSE-2.0
014 *
015 * Unless required by applicable law or agreed to in writing, software
016 * distributed under the License is distributed on an "AS IS" BASIS,
017 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
018 * See the License for the specific language governing permissions and
019 * limitations under the License.
020 */
021
022package cascading.flow.hadoop.planner.rule.expression;
023
024import cascading.flow.planner.iso.expression.ElementCapture;
025import cascading.flow.planner.iso.expression.ExpressionGraph;
026import cascading.flow.planner.iso.expression.FlowElementExpression;
027import cascading.flow.planner.iso.expression.NonSafeOperationExpression;
028import cascading.flow.planner.iso.expression.TypeExpression;
029import cascading.flow.planner.rule.RuleExpression;
030import cascading.flow.planner.rule.expressiongraph.NonSafeAndSplitAndSyncPipeExpressionGraph;
031import cascading.pipe.Pipe;
032import cascading.tap.Tap;
033
034/**
035 * Captures a split against a non-safe Operation to prevent the operation from running
036 * in parallel mappers.
037 * <p>
038 * THis variant allows a non-safe operation to be followed by any Pipe. this assumes other rules
039 * enforce splitting to only be on Each or Pipe types.
040 */
041public class BalanceNonSafePipeSplitExpression extends RuleExpression
042  {
043  public BalanceNonSafePipeSplitExpression()
044    {
045    super(
046      new NonSafeAndSplitAndSyncPipeExpressionGraph(),
047
048      new ExpressionGraph()
049        .arcs(
050          new FlowElementExpression( Tap.class ),
051          new NonSafeOperationExpression(),
052          new FlowElementExpression( Pipe.class, TypeExpression.Topo.SplitOnly )
053        ),
054
055      new ExpressionGraph()
056        .arcs(
057          new FlowElementExpression( ElementCapture.Primary, Pipe.class, TypeExpression.Topo.Tail )
058        )
059    );
060    }
061  }