001/*
002 * Copyright (c) 2016-2017 Chris K Wensel <chris@wensel.net>. All Rights Reserved.
003 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved.
004 *
005 * Project and contact information: http://www.cascading.org/
006 *
007 * This file is part of the Cascading project.
008 *
009 * Licensed under the Apache License, Version 2.0 (the "License");
010 * you may not use this file except in compliance with the License.
011 * You may obtain a copy of the License at
012 *
013 *     http://www.apache.org/licenses/LICENSE-2.0
014 *
015 * Unless required by applicable law or agreed to in writing, software
016 * distributed under the License is distributed on an "AS IS" BASIS,
017 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
018 * See the License for the specific language governing permissions and
019 * limitations under the License.
020 */
021
022package cascading.flow;
023
024import java.util.LinkedHashSet;
025import java.util.Properties;
026import java.util.Set;
027
028import cascading.property.Props;
029import cascading.util.Util;
030
031/**
032 * Class FlowRuntimeProps is a fluent helper class for setting {@link Flow} specific runtime properties through
033 * a {@link FlowConnector}.
034 * <p>
035 * These properties apply to the cluster or remote side of the Flow execution. For client (or local) side properties
036 * see {@link cascading.flow.FlowProps}.
037 * <p>
038 * Available properties are:
039 * <ul>
040 * <li>gather partitions - number of slices (partitions) to gather keys within each {@link cascading.flow.FlowNode}.
041 * In MapReduce this is the number of reducers. In Tez DAG this is the scatter gather parallelization.</li>
042 * <li>log counters - counter names to log to INFO when a cluster side slice completes.</li>
043 * </ul>
044 * <p>
045 * Note, if the num of gather partitions is not set, the Flow may fail during planning or setup, depending on the
046 * platform.
047 */
048public class FlowRuntimeProps extends Props
049  {
050  public static final String GATHER_PARTITIONS = "cascading.flow.runtime.gather.partitions.num";
051  public static final String LOG_COUNTERS = "cascading.flow.runtime.log.counters";
052  public static final String COMBINE_SPLITS = "cascading.flow.runtime.splits.combine";
053
054  int gatherPartitions = 0;
055  Set<String> logCounters = new LinkedHashSet<>();
056  Boolean combineSplits;
057
058  public static FlowRuntimeProps flowRuntimeProps()
059    {
060    return new FlowRuntimeProps();
061    }
062
063  public FlowRuntimeProps()
064    {
065    }
066
067  /**
068   * Method getGatherPartitions returns the number of gather partitions
069   *
070   * @return number of gather partitions
071   */
072  public int getGatherPartitions()
073    {
074    return gatherPartitions;
075    }
076
077  /**
078   * Method setGatherPartitions sets the default number of gather partitions each {@link cascading.flow.FlowNode}
079   * should use.
080   *
081   * @param gatherPartitions number of gather partitions to use per node
082   * @return this
083   */
084  public FlowRuntimeProps setGatherPartitions( int gatherPartitions )
085    {
086    if( gatherPartitions < 1 )
087      throw new IllegalArgumentException( "gatherPartitions value must be greater than zero" );
088
089    this.gatherPartitions = gatherPartitions;
090
091    return this;
092    }
093
094  /**
095   * Method addLogCounter adds a new counter to be logged when a cluster side slice completes.
096   * <p>
097   * The given counters will be logged using the default cluster side logging mechanism.
098   *
099   * @param counter the Enum counter to log
100   * @return this
101   */
102  public FlowRuntimeProps addLogCounter( Enum counter )
103    {
104    addLogCounter( counter.getDeclaringClass().getName(), counter.name() );
105
106    return this;
107    }
108
109  /**
110   * Method addLogCounter adds a new counter to be logged when a cluster side slice completes.
111   * <p>
112   * The given counters will be logged using the default cluster side logging mechanism.
113   *
114   * @param group   the String counter group to log
115   * @param counter the String counter name to log
116   * @return this
117   */
118  public FlowRuntimeProps addLogCounter( String group, String counter )
119    {
120    logCounters.add( group + ":" + counter );
121
122    return this;
123    }
124
125  public Boolean getCombineSplits()
126    {
127    return combineSplits;
128    }
129
130  /**
131   * Method setCombineSplits will enable or disable combining of 'splits' on sources.
132   * <p>
133   * A split is a sub-set of data from a {@link cascading.tap.Tap} source resource. Combining
134   * small splits into larger ones both reduce parallelism, but also reduce overhead of starting
135   * work on a very small data set.
136   * <p>
137   * This is commonly done when sourcing large numbers of very small files.
138   * <p>
139   * Setting this value will change the default, which is a platform dependent value.
140   *
141   * @param combineSplits
142   * @return
143   */
144  public FlowRuntimeProps setCombineSplits( Boolean combineSplits )
145    {
146    this.combineSplits = combineSplits;
147
148    return this;
149    }
150
151  @Override
152  protected void addPropertiesTo( Properties properties )
153    {
154    if( gatherPartitions > 0 )
155      properties.setProperty( GATHER_PARTITIONS, Integer.toString( gatherPartitions ) );
156
157    if( !logCounters.isEmpty() )
158      properties.setProperty( LOG_COUNTERS, Util.join( logCounters, "," ) );
159
160    if( combineSplits != null )
161      properties.setProperty( COMBINE_SPLITS, Boolean.toString( combineSplits ) );
162    }
163  }