001/*
002 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved.
003 *
004 * Project and contact information: http://www.cascading.org/
005 *
006 * This file is part of the Cascading project.
007 *
008 * Licensed under the Apache License, Version 2.0 (the "License");
009 * you may not use this file except in compliance with the License.
010 * You may obtain a copy of the License at
011 *
012 *     http://www.apache.org/licenses/LICENSE-2.0
013 *
014 * Unless required by applicable law or agreed to in writing, software
015 * distributed under the License is distributed on an "AS IS" BASIS,
016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017 * See the License for the specific language governing permissions and
018 * limitations under the License.
019 */
020
021package cascading.flow;
022
023import java.util.LinkedHashSet;
024import java.util.Properties;
025import java.util.Set;
026
027import cascading.property.Props;
028import cascading.util.Util;
029
030/**
031 * Class FlowRuntimeProps is a fluent helper class for setting {@link Flow} specific runtime properties through
032 * a {@link FlowConnector}.
033 * <p/>
034 * These properties apply to the cluster or remote side of the Flow execution. For client (or local) side properties
035 * see {@link cascading.flow.FlowProps}.
036 * <p/>
037 * Available properties are:
038 * <p/>
039 * <ul>
 * <li>gather partitions - number of slices (partitions) to gather keys within each {@link cascading.flow.FlowNode}.
 * In MapReduce this is the number of reducers. In Tez DAG this is the scatter gather parallelization.</li>
 * <li>log counters - counter names to log to INFO when a cluster side slice completes.</li>
 * <li>combine splits - whether small source 'splits' should be combined into larger ones. If not set, the platform
 * dependent default applies.</li>
043 * </ul>
044 * <p/>
 * Note: if the number of gather partitions is not set, the Flow may fail during planning or setup, depending on the
 * platform.
047 */
048public class FlowRuntimeProps extends Props
049  {
050  public static final String GATHER_PARTITIONS = "cascading.flow.runtime.gather.partitions.num";
051  public static final String LOG_COUNTERS = "cascading.flow.runtime.log.counters";
052  public static final String COMBINE_SPLITS = "cascading.flow.runtime.splits.combine";
053
054  int gatherPartitions = 0;
055  Set<String> logCounters = new LinkedHashSet<>();
056  Boolean combineSplits;
057
058  public static FlowRuntimeProps flowRuntimeProps()
059    {
060    return new FlowRuntimeProps();
061    }
062
063  public FlowRuntimeProps()
064    {
065    }
066
067  /**
068   * Method getGatherPartitions returns the number of gather partitions
069   *
070   * @return number of gather partitions
071   */
072  public int getGatherPartitions()
073    {
074    return gatherPartitions;
075    }
076
077  /**
078   * Method setGatherPartitions sets the default number of gather partitions each {@link cascading.flow.FlowNode}
079   * should use.
080   *
081   * @param gatherPartitions number of gather partitions to use per node
082   * @return this
083   */
084  public FlowRuntimeProps setGatherPartitions( int gatherPartitions )
085    {
086    if( gatherPartitions < 1 )
087      throw new IllegalArgumentException( "gatherPartitions value must be greater than zero" );
088
089    this.gatherPartitions = gatherPartitions;
090
091    return this;
092    }
093
094  /**
095   * Method addLogCounter adds a new counter to be logged when a cluster side slice completes.
096   * <p/>
097   * The given counters will be logged using the default cluster side logging mechanism.
098   *
099   * @param counter the Enum counter to log
100   * @return this
101   */
102  public FlowRuntimeProps addLogCounter( Enum counter )
103    {
104    addLogCounter( counter.getDeclaringClass().getName(), counter.name() );
105
106    return this;
107    }
108
109  /**
110   * Method addLogCounter adds a new counter to be logged when a cluster side slice completes.
111   * <p/>
112   * The given counters will be logged using the default cluster side logging mechanism.
113   *
114   * @param group   the String counter group to log
115   * @param counter the String counter name to log
116   * @return this
117   */
118  public FlowRuntimeProps addLogCounter( String group, String counter )
119    {
120    logCounters.add( group + ":" + counter );
121
122    return this;
123    }
124
125  public Boolean getCombineSplits()
126    {
127    return combineSplits;
128    }
129
130  /**
131   * Method setCombineSplits will enable or disable combining of 'splits' on sources.
132   * <p/>
133   * A split is a sub-set of data from a {@link cascading.tap.Tap} source resource. Combining
134   * small splits into larger ones both reduce parallelism, but also reduce overhead of starting
135   * work on a very small data set.
136   * <p/>
137   * This is commonly done when sourcing large numbers of very small files.
138   * <p/>
139   * Setting this value will change the default, which is a platform dependent value.
140   *
141   * @param combineSplits
142   * @return
143   */
144  public FlowRuntimeProps setCombineSplits( Boolean combineSplits )
145    {
146    this.combineSplits = combineSplits;
147
148    return this;
149    }
150
151  @Override
152  protected void addPropertiesTo( Properties properties )
153    {
154    if( gatherPartitions > 0 )
155      properties.setProperty( GATHER_PARTITIONS, Integer.toString( gatherPartitions ) );
156
157    if( !logCounters.isEmpty() )
158      properties.setProperty( LOG_COUNTERS, Util.join( logCounters, "," ) );
159
160    if( combineSplits != null )
161      properties.setProperty( COMBINE_SPLITS, Boolean.toString( combineSplits ) );
162    }
163  }