/*
 * Copyright (c) 2007-2016 Concurrent, Inc. All Rights Reserved.
 *
 * Project and contact information: http://www.cascading.org/
 *
 * This file is part of the Cascading project.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package cascading.flow.hadoop;

import java.io.IOException;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;

import cascading.CascadingException;
import cascading.flow.FlowElement;
import cascading.flow.FlowException;
import cascading.flow.FlowNode;
import cascading.flow.FlowProcess;
import cascading.flow.FlowRuntimeProps;
import cascading.flow.hadoop.planner.HadoopFlowStepJob;
import cascading.flow.hadoop.util.HadoopMRUtil;
import cascading.flow.hadoop.util.HadoopUtil;
import cascading.flow.planner.BaseFlowStep;
import cascading.flow.planner.FlowStepJob;
import cascading.flow.planner.PlatformInfo;
import cascading.flow.planner.graph.ElementGraph;
import cascading.flow.planner.process.FlowNodeGraph;
import cascading.flow.planner.process.ProcessEdge;
import cascading.management.state.ClientState;
import cascading.pipe.CoGroup;
import cascading.tap.Tap;
import cascading.tap.hadoop.io.MultiInputFormat;
import cascading.tap.hadoop.util.Hadoop18TapUtil;
import cascading.tap.hadoop.util.TempHfs;
import cascading.tuple.Fields;
import cascading.tuple.hadoop.TupleSerialization;
import cascading.tuple.hadoop.util.CoGroupingComparator;
import cascading.tuple.hadoop.util.CoGroupingPartitioner;
import cascading.tuple.hadoop.util.GroupingComparator;
import cascading.tuple.hadoop.util.GroupingPartitioner;
import cascading.tuple.hadoop.util.GroupingSortingComparator;
import cascading.tuple.hadoop.util.GroupingSortingPartitioner;
import cascading.tuple.hadoop.util.IndexTupleCoGroupingComparator;
import cascading.tuple.hadoop.util.ReverseGroupingSortingComparator;
import cascading.tuple.hadoop.util.ReverseTupleComparator;
import cascading.tuple.hadoop.util.TupleComparator;
import cascading.tuple.io.KeyIndexTuple;
import cascading.tuple.io.KeyTuple;
import cascading.tuple.io.TuplePair;
import cascading.tuple.io.ValueIndexTuple;
import cascading.tuple.io.ValueTuple;
import cascading.util.ProcessLogger;
import cascading.util.Util;
import cascading.util.Version;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputFormat;

import static cascading.flow.hadoop.util.HadoopUtil.*;

/**
 * Class HadoopFlowStep is the Hadoop MapReduce specific implementation of {@link BaseFlowStep}. It
 * initializes a single MapReduce job, represented by a {@link JobConf}, from the current element
 * graph, and cleans up any temporary or intermediate resources after the job completes.
 */
public class HadoopFlowStep extends BaseFlowStep<JobConf>
  {
  protected HadoopFlowStep( String name, int ordinal )
    {
    super( name, ordinal );
    }

  public HadoopFlowStep( ElementGraph elementGraph, FlowNodeGraph flowNodeGraph )
    {
    super( elementGraph, flowNodeGraph );
    }

  @Override
  public Map<Object, Object> getConfigAsProperties()
    {
    return HadoopUtil.createProperties( getConfig() );
    }

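  /**
   * Method createInitializedConfig creates and initializes the {@link JobConf} that drives the
   * MapReduce job for this step: sources, sink, and traps are bound into the conf, and the
   * partitioner and comparator classes are selected to match the grouping being performed.
   * <p>
   * A minimal usage sketch, assuming a step, flow process, and (possibly null) parent conf are in
   * scope; in practice this method is invoked by the Cascading runtime, not user code:
   * <pre>
   * JobConf stepConf = step.createInitializedConfig( flowProcess, parentConf );
   * </pre>
   *
   * @param flowProcess  of type FlowProcess&lt;JobConf&gt;
   * @param parentConfig of type JobConf, may be null
   * @return an initialized JobConf for this step
   */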
  public JobConf createInitializedConfig( FlowProcess<JobConf> flowProcess, JobConf parentConfig )
    {
    JobConf conf = parentConfig == null ? new JobConf() : HadoopUtil.copyJobConf( parentConfig );

    // disable warning
    conf.setBoolean( "mapred.used.genericoptionsparser", true );

    conf.setJobName( getStepDisplayName( conf.getInt( "cascading.display.id.truncate", Util.ID_LENGTH ) ) );

    conf.setOutputKeyClass( KeyTuple.class );
    conf.setOutputValueClass( ValueTuple.class );

    conf.setMapRunnerClass( FlowMapper.class );
    conf.setReducerClass( FlowReducer.class );

    // set for use by the shuffling phase
    TupleSerialization.setSerializations( conf );

    initFromSources( flowProcess, conf );

    initFromSink( flowProcess, conf );

    initFromTraps( flowProcess, conf );

    initFromStepConfigDef( conf );

    int numSinkParts = getSink().getScheme().getNumSinkParts();

    if( numSinkParts != 0 )
      {
      // if no reducer, set num map tasks to control parts
      if( getGroup() != null )
        conf.setNumReduceTasks( numSinkParts );
      else
        conf.setNumMapTasks( numSinkParts );
      }
    else if( getGroup() != null )
      {
      int gatherPartitions = conf.getNumReduceTasks();

      if( gatherPartitions == 0 )
        gatherPartitions = conf.getInt( FlowRuntimeProps.GATHER_PARTITIONS, 0 );

      if( gatherPartitions == 0 )
        throw new FlowException( getName(), "a default number of gather partitions must be set, see FlowRuntimeProps" );

      conf.setNumReduceTasks( gatherPartitions );
      }

    conf.setOutputKeyComparatorClass( TupleComparator.class );

    ProcessEdge processEdge = Util.getFirst( getFlowNodeGraph().edgeSet() );

    if( getGroup() == null )
      {
      conf.setNumReduceTasks( 0 ); // disable reducers
      }
    else
      {
      // must set map output defaults when performing a reduce
      conf.setMapOutputKeyClass( KeyTuple.class );
      conf.setMapOutputValueClass( ValueTuple.class );
      conf.setPartitionerClass( GroupingPartitioner.class );

      // handles the case where the groupby sort should be reversed
      if( getGroup().isSortReversed() )
        conf.setOutputKeyComparatorClass( ReverseTupleComparator.class );

      Integer ordinal = (Integer) Util.getFirst( processEdge.getSinkExpectedOrdinals() );

      addComparators( conf, "cascading.group.comparator", getGroup().getKeySelectors(), (Fields) processEdge.getResolvedKeyFields().get( ordinal ) );

      if( getGroup().isGroupBy() )
        addComparators( conf, "cascading.sort.comparator", getGroup().getSortingSelectors(), (Fields) processEdge.getResolvedSortFields().get( ordinal ) );

      if( !getGroup().isGroupBy() )
        {
        conf.setPartitionerClass( CoGroupingPartitioner.class );
        conf.setMapOutputKeyClass( KeyIndexTuple.class ); // allows groups to be sorted by index
        conf.setMapOutputValueClass( ValueIndexTuple.class );
        conf.setOutputKeyComparatorClass( IndexTupleCoGroupingComparator.class ); // sorts by group, then by index
        conf.setOutputValueGroupingComparator( CoGroupingComparator.class );
        }

      if( getGroup().isSorted() )
        {
        conf.setPartitionerClass( GroupingSortingPartitioner.class );
        conf.setMapOutputKeyClass( TuplePair.class );

        if( getGroup().isSortReversed() )
          conf.setOutputKeyComparatorClass( ReverseGroupingSortingComparator.class );
        else
          conf.setOutputKeyComparatorClass( GroupingSortingComparator.class );

        // no need to supply a reverse comparator, only equality is checked
        conf.setOutputValueGroupingComparator( GroupingComparator.class );
        }
      }

    // if we write type information into the stream, we can perform comparisons in indexed tuples
    // thus, if the edge is a CoGroup and the keys are not common types, this block is skipped,
    // forcing type information to be written into the stream
    if( processEdge != null && ifCoGroupAndKeysHaveCommonTypes( this, processEdge.getFlowElement(), processEdge.getResolvedKeyFields() ) )
      {
      conf.set( "cascading.node.ordinals", Util.join( processEdge.getSinkExpectedOrdinals(), "," ) );
      addFields( conf, "cascading.node.key.fields", processEdge.getResolvedKeyFields() );
      addFields( conf, "cascading.node.sort.fields", processEdge.getResolvedSortFields() );
      addFields( conf, "cascading.node.value.fields", processEdge.getResolvedValueFields() );
      }

    // perform last so init above will pass to tasks
    String versionString = Version.getRelease();

    if( versionString != null )
      conf.set( "cascading.version", versionString );

    conf.set( CASCADING_FLOW_STEP_ID, getID() );
    conf.set( "cascading.flow.step.num", Integer.toString( getOrdinal() ) );

    HadoopUtil.setIsInflow( conf );

    Iterator<FlowNode> iterator = getFlowNodeGraph().getTopologicalIterator();

    FlowNode mapperNode = iterator.next();
    FlowNode reducerNode = iterator.hasNext() ? iterator.next() : null;

    if( reducerNode != null )
      reducerNode.addProcessAnnotation( FlowRuntimeProps.GATHER_PARTITIONS, Integer.toString( conf.getNumReduceTasks() ) );

    String mapState = pack( mapperNode, conf );
    String reduceState = pack( reducerNode, conf );

    // hadoop 20.2 doesn't like dist cache when using local mode
    int maxSize = Short.MAX_VALUE;

    int length = mapState.length() + reduceState.length();

    if( isHadoopLocalMode( conf ) || length < maxSize ) // seems safe
      {
      conf.set( "cascading.flow.step.node.map", mapState );

      if( !Util.isEmpty( reduceState ) )
        conf.set( "cascading.flow.step.node.reduce", reduceState );
      }
    else
      {
      conf.set( "cascading.flow.step.node.map.path", HadoopMRUtil.writeStateToDistCache( conf, getID(), "map", mapState ) );

      if( !Util.isEmpty( reduceState ) )
        conf.set( "cascading.flow.step.node.reduce.path", HadoopMRUtil.writeStateToDistCache( conf, getID(), "reduce", reduceState ) );
      }

    return conf;
    }

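  /**
   * Returns true unless the given element is a {@link CoGroup} whose resolved key fields declare
   * mismatched types across the joined branches. On a mismatch a warning is logged and false is
   * returned, signalling that type information cannot be excluded from the stream.
   */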
  private static boolean ifCoGroupAndKeysHaveCommonTypes( ProcessLogger processLogger, FlowElement flowElement, Map<Integer, Fields> resolvedKeyFields )
    {
    if( !( flowElement instanceof CoGroup ) )
      return true;

    if( resolvedKeyFields == null || resolvedKeyFields.size() < 2 )
      return true;

    Iterator<Map.Entry<Integer, Fields>> iterator = resolvedKeyFields.entrySet().iterator();

    Fields fields = iterator.next().getValue();

    while( iterator.hasNext() )
      {
      Fields next = iterator.next().getValue();

      if( !Arrays.equals( fields.getTypesClasses(), next.getTypesClasses() ) )
        {
        processLogger.logWarn( "unable to perform: {}, on mismatched join types and optimize serialization with type exclusion, fields: {} & {}", flowElement, fields, next );
        return false;
        }
      }

    return true;
    }

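  /**
   * Returns true if the given JobConf targets the Hadoop local (in-process) runner rather than a cluster.
   */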
  public boolean isHadoopLocalMode( JobConf conf )
    {
    return HadoopUtil.isLocal( conf );
    }

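  /**
   * Method createFlowStepJob wraps this step in a {@link HadoopFlowStepJob} for submission. A
   * NoClassDefFoundError here typically indicates the Hadoop version on the classpath does not match
   * the platform the flow was built against, so the mismatch is logged before the error is rethrown.
   */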
  protected FlowStepJob<JobConf> createFlowStepJob( ClientState clientState, FlowProcess<JobConf> flowProcess, JobConf initializedStepConfig )
    {
    try
      {
      return new HadoopFlowStepJob( clientState, this, initializedStepConfig );
      }
    catch( NoClassDefFoundError error )
      {
      PlatformInfo platformInfo = HadoopUtil.getPlatformInfo( JobConf.class, "org/apache/hadoop", "Hadoop MR" );
      String message = "unable to load platform specific class, please verify the Hadoop cluster version: '%s' matches the Hadoop platform build dependency and associated FlowConnector, cascading-hadoop or cascading-hadoop2-mr1";

      logError( String.format( message, platformInfo.toString() ), error );

      throw error;
      }
    }

  /**
   * Method clean removes any temporary files used by this FlowStep instance. It will log any IOExceptions thrown.
   *
   * @param config of type JobConf
   */
  public void clean( JobConf config )
    {
    String stepStatePath = config.get( "cascading.flow.step.path" );

    if( stepStatePath != null )
      {
      try
        {
        HadoopUtil.removeStateFromDistCache( config, stepStatePath );
        }
      catch( IOException exception )
        {
        logWarn( "unable to remove step state file: " + stepStatePath, exception );
        }
      }

    if( tempSink != null )
      {
      try
        {
        tempSink.deleteResource( config );
        }
      catch( Exception exception )
        {
        // sink all exceptions, don't fail app
        logWarn( "unable to remove temporary file: " + tempSink, exception );
        }
      }

    // safe way to handle zero sinks case
    for( Tap sink : getSinkTaps() )
      cleanIntermediateData( config, sink );

    for( Tap tap : getTraps() )
      cleanTapMetaData( config, tap );
    }

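  /**
   * Deletes the given sink when it holds temporary intermediate data and the flow either completed
   * successfully or has no run ID (and so will not be restarted); otherwise only the tap's job
   * metadata is cleaned up, leaving the data in place.
   */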
  protected void cleanIntermediateData( JobConf config, Tap sink )
    {
    if( sink.isTemporary() && ( getFlow().getFlowStats().isSuccessful() || getFlow().getRunID() == null ) )
      {
      try
        {
        sink.deleteResource( config );
        }
      catch( Exception exception )
        {
        // sink all exceptions, don't fail app
        logWarn( "unable to remove temporary file: " + sink, exception );
        }
      }
    else
      {
      cleanTapMetaData( config, sink );
      }
    }

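  /**
   * Best effort removal of any Hadoop job metadata left behind for the given tap; IOExceptions are
   * intentionally ignored.
   */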
  private void cleanTapMetaData( JobConf jobConf, Tap tap )
    {
    try
      {
      Hadoop18TapUtil.cleanupTapMetaData( jobConf, tap );
      }
    catch( IOException exception )
      {
      // ignore exception
      }
    }

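  /**
   * Initializes each trap tap against a copy of the given conf, so trap sink initialization cannot
   * side effect the step configuration.
   */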
  private void initFromTraps( FlowProcess<JobConf> flowProcess, JobConf conf, Map<String, Tap> traps )
    {
    if( !traps.isEmpty() )
      {
      JobConf trapConf = HadoopUtil.copyJobConf( conf );

      for( Tap tap : traps.values() )
        tap.sinkConfInit( flowProcess, trapConf );
      }
    }

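  /**
   * Initializes all source taps. Each unique streamed source is configured against its own copy of
   * the conf, then merged via {@link MultiInputFormat}; accumulated sources have their conf diffs
   * packed into the step conf, and any DistributedCache files they register are carried over.
   */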
  protected void initFromSources( FlowProcess<JobConf> flowProcess, JobConf conf )
    {
    // handles case where same tap is used on multiple branches
    // we do not want to init the same tap multiple times
    Set<Tap> uniqueSources = getUniqueStreamedSources();

    JobConf[] streamedJobs = new JobConf[ uniqueSources.size() ];
    int i = 0;

    for( Tap tap : uniqueSources )
      {
      if( tap.getIdentifier() == null )
        throw new IllegalStateException( "tap may not have null identifier: " + tap.toString() );

      streamedJobs[ i ] = flowProcess.copyConfig( conf );

      streamedJobs[ i ].set( "cascading.step.source", Tap.id( tap ) );

      tap.sourceConfInit( flowProcess, streamedJobs[ i ] );

      i++;
      }

    Set<Tap> accumulatedSources = getAllAccumulatedSources();

    for( Tap tap : accumulatedSources )
      {
      JobConf accumulatedJob = flowProcess.copyConfig( conf );

      tap.sourceConfInit( flowProcess, accumulatedJob );

      Map<String, String> map = flowProcess.diffConfigIntoMap( conf, accumulatedJob );
      conf.set( "cascading.node.accumulated.source.conf." + Tap.id( tap ), pack( map, conf ) );

      try
        {
        if( DistributedCache.getCacheFiles( accumulatedJob ) != null )
          DistributedCache.setCacheFiles( DistributedCache.getCacheFiles( accumulatedJob ), conf );
        }
      catch( IOException exception )
        {
        throw new CascadingException( exception );
        }
      }

    MultiInputFormat.addInputFormat( conf, streamedJobs ); // must come last
    }

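  /**
   * Applies any step level ConfigDef properties to the given conf.
   */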
  private void initFromStepConfigDef( final JobConf conf )
    {
    initConfFromStepConfigDef( new ConfigurationSetter( conf ) );
    }

  /**
   * Sources are specific to a step; remove all known accumulated sources, if any, leaving only the
   * streamed sources.
   *
   * @return the Set of streamed source Taps unique to this step
   */
  private Set<Tap> getUniqueStreamedSources()
    {
    Set<Tap> allAccumulatedSources = getAllAccumulatedSources();

    HashSet<Tap> set = new HashSet<>( sources.keySet() );

    set.removeAll( allAccumulatedSources );

    return set;
    }

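  /**
   * Initializes the sink tap, substituting a temporary location via {@link TempHfs} when the
   * configured OutputFormat is file based (or absent) but no output path has been set, as with
   * PartitionTap and MultiSinkTap.
   */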
  protected void initFromSink( FlowProcess<JobConf> flowProcess, JobConf conf )
    {
    // init sink first so tempSink can take precedence
    if( getSink() != null )
      getSink().sinkConfInit( flowProcess, conf );

    Class<? extends OutputFormat> outputFormat = conf.getClass( "mapred.output.format.class", null, OutputFormat.class );
    boolean isFileOutputFormat = false;

    if( outputFormat != null )
      isFileOutputFormat = FileOutputFormat.class.isAssignableFrom( outputFormat );

    Path outputPath = FileOutputFormat.getOutputPath( conf );

    // if no output path is set, we need to substitute an alternative if the OutputFormat is file based
    // PartitionTap won't set the output, but will set an OutputFormat
    // MultiSinkTap won't set the output or set the OutputFormat
    // Non file based OutputFormats don't have an output path, but do have an OutputFormat set (JDBCTap..)
    if( outputPath == null && ( isFileOutputFormat || outputFormat == null ) )
      tempSink = new TempHfs( conf, "tmp:/" + new Path( getSink().getIdentifier() ).toUri().getPath(), true );

    // tempSink exists because sink is writeDirect
    if( tempSink != null )
      tempSink.sinkConfInit( flowProcess, conf );
    }

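  /**
   * Initializes all trap taps declared on this step.
   */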
  protected void initFromTraps( FlowProcess<JobConf> flowProcess, JobConf conf )
    {
    initFromTraps( flowProcess, conf, getTrapMap() );
    }
  }