/*
 * Copyright (c) 2016 Chris K Wensel <chris@wensel.net>. All Rights Reserved.
 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved.
 *
 * Project and contact information: http://www.cascading.org/
 *
 * This file is part of the Cascading project.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package cascading.flow.hadoop;

import java.io.IOException;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;

import cascading.CascadingException;
import cascading.flow.FlowElement;
import cascading.flow.FlowException;
import cascading.flow.FlowNode;
import cascading.flow.FlowProcess;
import cascading.flow.FlowRuntimeProps;
import cascading.flow.hadoop.planner.HadoopFlowStepJob;
import cascading.flow.hadoop.util.HadoopMRUtil;
import cascading.flow.hadoop.util.HadoopUtil;
import cascading.flow.planner.BaseFlowStep;
import cascading.flow.planner.FlowStepJob;
import cascading.flow.planner.PlatformInfo;
import cascading.flow.planner.graph.ElementGraph;
import cascading.flow.planner.process.FlowNodeGraph;
import cascading.flow.planner.process.ProcessEdge;
import cascading.management.state.ClientState;
import cascading.pipe.CoGroup;
import cascading.tap.Tap;
import cascading.tap.hadoop.io.MultiInputFormat;
import cascading.tap.hadoop.util.Hadoop18TapUtil;
import cascading.tap.hadoop.util.TempHfs;
import cascading.tuple.Fields;
import cascading.tuple.hadoop.TupleSerialization;
import cascading.tuple.hadoop.util.CoGroupingComparator;
import cascading.tuple.hadoop.util.CoGroupingPartitioner;
import cascading.tuple.hadoop.util.GroupingComparator;
import cascading.tuple.hadoop.util.GroupingPartitioner;
import cascading.tuple.hadoop.util.GroupingSortingComparator;
import cascading.tuple.hadoop.util.GroupingSortingPartitioner;
import cascading.tuple.hadoop.util.IndexTupleCoGroupingComparator;
import cascading.tuple.hadoop.util.ReverseGroupingSortingComparator;
import cascading.tuple.hadoop.util.ReverseTupleComparator;
import cascading.tuple.hadoop.util.TupleComparator;
import cascading.tuple.io.KeyIndexTuple;
import cascading.tuple.io.KeyTuple;
import cascading.tuple.io.TuplePair;
import cascading.tuple.io.ValueIndexTuple;
import cascading.tuple.io.ValueTuple;
import cascading.util.ProcessLogger;
import cascading.util.Util;
import cascading.util.Version;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputFormat;

import static cascading.flow.hadoop.util.HadoopUtil.*;

/**
 * Class HadoopFlowStep is the Hadoop MapReduce specific implementation of {@link BaseFlowStep}.
 * <p>
 * It initializes the {@link JobConf} for a single MapReduce job from the step's sources, sink,
 * and traps, creates the job submission handle, and cleans up any intermediate data and
 * temporary files when the step completes.
 */
public class HadoopFlowStep extends BaseFlowStep<JobConf>
  {
  protected HadoopFlowStep()
    {
    }

  protected HadoopFlowStep( String name, int ordinal )
    {
    super( name, ordinal );
    }

  public HadoopFlowStep( ElementGraph elementGraph, FlowNodeGraph flowNodeGraph )
    {
    super( elementGraph, flowNodeGraph );
    }

  @Override
  public Map<Object, Object> getConfigAsProperties()
    {
    return HadoopUtil.createProperties( getConfig() );
    }

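  /**
   * Method createInitializedConfig initializes a {@link JobConf} for this step from the given
   * parent configuration, wiring in the mapper and reducer classes, the shuffle partitioners
   * and comparators, and the source, sink, and trap configurations.
   *
   * @param flowProcess  of type FlowProcess&lt;JobConf&gt;
   * @param parentConfig of type JobConf, may be null
   * @return the initialized JobConf for this step
   */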
  public JobConf createInitializedConfig( FlowProcess<JobConf> flowProcess, JobConf parentConfig )
    {
    JobConf conf = parentConfig == null ? new JobConf() : HadoopUtil.copyJobConf( parentConfig );

    // disable warning
    conf.setBoolean( "mapred.used.genericoptionsparser", true );

    conf.setJobName( getStepDisplayName( conf.getInt( "cascading.display.id.truncate", Util.ID_LENGTH ) ) );

    conf.setOutputKeyClass( KeyTuple.class );
    conf.setOutputValueClass( ValueTuple.class );

    conf.setMapRunnerClass( FlowMapper.class );
    conf.setReducerClass( FlowReducer.class );

    // set for use by the shuffling phase
    TupleSerialization.setSerializations( conf );

    initFromSources( flowProcess, conf );

    initFromSink( flowProcess, conf );

    initFromTraps( flowProcess, conf );

    initFromStepConfigDef( conf );

    int numSinkParts = getSink().getScheme().getNumSinkParts();

    if( numSinkParts != 0 )
      {
      // if there is a reducer, the number of reduce tasks controls the parts; otherwise the map tasks do
      if( getGroup() != null )
        conf.setNumReduceTasks( numSinkParts );
      else
        conf.setNumMapTasks( numSinkParts );
      }
    else if( getGroup() != null )
      {
      int gatherPartitions = conf.getNumReduceTasks();

      if( gatherPartitions == 0 )
        gatherPartitions = conf.getInt( FlowRuntimeProps.GATHER_PARTITIONS, 0 );

      if( gatherPartitions == 0 )
        throw new FlowException( getName(), "a default number of gather partitions must be set, see FlowRuntimeProps" );

      conf.setNumReduceTasks( gatherPartitions );
      }
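
    /*
     * A minimal sketch of supplying the gather partitions default tested above, assuming the
     * resulting properties are handed to the FlowConnector when the flow is created:
     *
     *   Properties properties = FlowRuntimeProps.flowRuntimeProps()
     *     .setGatherPartitions( 4 ) // default number of reduce tasks / gather partitions
     *     .buildProperties();
     */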

    conf.setOutputKeyComparatorClass( TupleComparator.class );

    ProcessEdge processEdge = Util.getFirst( getFlowNodeGraph().edgeSet() );

    if( getGroup() == null )
      {
      conf.setNumReduceTasks( 0 ); // disable reducers
      }
    else
      {
      // must set map output defaults when performing a reduce
      conf.setMapOutputKeyClass( KeyTuple.class );
      conf.setMapOutputValueClass( ValueTuple.class );
      conf.setPartitionerClass( GroupingPartitioner.class );

      // handles the case where the groupby sort should be reversed
      if( getGroup().isSortReversed() )
        conf.setOutputKeyComparatorClass( ReverseTupleComparator.class );

      Integer ordinal = (Integer) Util.getFirst( processEdge.getSinkExpectedOrdinals() );

      addComparators( conf, "cascading.group.comparator", getGroup().getKeySelectors(), (Fields) processEdge.getResolvedKeyFields().get( ordinal ) );

      if( getGroup().isGroupBy() )
        addComparators( conf, "cascading.sort.comparator", getGroup().getSortingSelectors(), (Fields) processEdge.getResolvedSortFields().get( ordinal ) );

      if( !getGroup().isGroupBy() )
        {
        conf.setPartitionerClass( CoGroupingPartitioner.class );
        conf.setMapOutputKeyClass( KeyIndexTuple.class ); // allows groups to be sorted by index
        conf.setMapOutputValueClass( ValueIndexTuple.class );
        conf.setOutputKeyComparatorClass( IndexTupleCoGroupingComparator.class ); // sorts by group, then by index
        conf.setOutputValueGroupingComparator( CoGroupingComparator.class );
        }

      if( getGroup().isSorted() )
        {
        conf.setPartitionerClass( GroupingSortingPartitioner.class );
        conf.setMapOutputKeyClass( TuplePair.class );

        if( getGroup().isSortReversed() )
          conf.setOutputKeyComparatorClass( ReverseGroupingSortingComparator.class );
        else
          conf.setOutputKeyComparatorClass( GroupingSortingComparator.class );

        // no need to supply a reverse comparator, only equality is checked
        conf.setOutputValueGroupingComparator( GroupingComparator.class );
        }
      }

    // declaring type information here lets serialization exclude types from the shuffle stream
    // and comparisons be performed directly on indexed tuples; this is only possible when the
    // edge is not a CoGroup, or when all the CoGroup keys share common types
    if( processEdge != null && ifCoGroupAndKeysHaveCommonTypes( this, processEdge.getFlowElement(), processEdge.getResolvedKeyFields() ) )
      {
      conf.set( "cascading.node.ordinals", Util.join( processEdge.getSinkExpectedOrdinals(), "," ) );
      addFields( conf, "cascading.node.key.fields", processEdge.getResolvedKeyFields() );
      addFields( conf, "cascading.node.sort.fields", processEdge.getResolvedSortFields() );
      addFields( conf, "cascading.node.value.fields", processEdge.getResolvedValueFields() );
      }

    // performed last so the initialization above is passed to the tasks
    String versionString = Version.getRelease();

    if( versionString != null )
      conf.set( "cascading.version", versionString );

    conf.set( CASCADING_FLOW_STEP_ID, getID() );
    conf.set( "cascading.flow.step.num", Integer.toString( getOrdinal() ) );

    HadoopUtil.setIsInflow( conf );

    Iterator<FlowNode> iterator = getFlowNodeGraph().getTopologicalIterator();

    FlowNode mapperNode = iterator.next();
    FlowNode reducerNode = iterator.hasNext() ? iterator.next() : null;

    if( reducerNode != null )
      reducerNode.addProcessAnnotation( FlowRuntimeProps.GATHER_PARTITIONS, Integer.toString( conf.getNumReduceTasks() ) );

    String mapState = pack( mapperNode, conf );
    String reduceState = pack( reducerNode, conf );

    // hadoop 20.2 doesn't like dist cache when using local mode
    int maxSize = Short.MAX_VALUE;

    int length = mapState.length() + reduceState.length();

    if( isHadoopLocalMode( conf ) || length < maxSize ) // seems safe
      {
      conf.set( "cascading.flow.step.node.map", mapState );

      if( !Util.isEmpty( reduceState ) )
        conf.set( "cascading.flow.step.node.reduce", reduceState );
      }
    else
      {
      conf.set( "cascading.flow.step.node.map.path", HadoopMRUtil.writeStateToDistCache( conf, getID(), "map", mapState ) );

      if( !Util.isEmpty( reduceState ) )
        conf.set( "cascading.flow.step.node.reduce.path", HadoopMRUtil.writeStateToDistCache( conf, getID(), "reduce", reduceState ) );
      }

    return conf;
    }

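  /**
   * Returns true if the given element is not a {@link CoGroup}, or if every branch of the
   * CoGroup resolves its join keys to the same field types; only then can type information be
   * declared in the config and excluded from the shuffle stream.
   * <p>
   * A minimal sketch of declaring common key types on both branches so the optimization applies
   * (field and pipe names here are hypothetical):
   * <pre>{@code
   * Fields lhsKey = new Fields( "id", long.class );
   * Fields rhsKey = new Fields( "userId", long.class );
   * Pipe join = new CoGroup( lhsPipe, lhsKey, rhsPipe, rhsKey );
   * }</pre>
   */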
  private static boolean ifCoGroupAndKeysHaveCommonTypes( ProcessLogger processLogger, FlowElement flowElement, Map<Integer, Fields> resolvedKeyFields )
    {
    if( !( flowElement instanceof CoGroup ) )
      return true;

    if( resolvedKeyFields == null || resolvedKeyFields.size() < 2 )
      return true;

    Iterator<Map.Entry<Integer, Fields>> iterator = resolvedKeyFields.entrySet().iterator();

    Fields fields = iterator.next().getValue();

    while( iterator.hasNext() )
      {
      Fields next = iterator.next().getValue();

      if( !Arrays.equals( fields.getTypesClasses(), next.getTypesClasses() ) )
        {
        processLogger.logWarn( "unable to perform: {}, on mismatched join types and optimize serialization with type exclusion, fields: {} & {}", flowElement, fields, next );
        return false;
        }
      }

    return true;
    }

  public boolean isHadoopLocalMode( JobConf conf )
    {
    return HadoopUtil.isLocal( conf );
    }

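  /**
   * Method createFlowStepJob creates the {@link HadoopFlowStepJob} that submits and tracks this
   * step's MapReduce job; a {@link NoClassDefFoundError} here typically indicates the Hadoop
   * platform build dependency does not match the cluster version.
   *
   * @param clientState           of type ClientState
   * @param flowProcess           of type FlowProcess&lt;JobConf&gt;
   * @param initializedStepConfig of type JobConf
   * @return FlowStepJob&lt;JobConf&gt;
   */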
  protected FlowStepJob<JobConf> createFlowStepJob( ClientState clientState, FlowProcess<JobConf> flowProcess, JobConf initializedStepConfig )
    {
    try
      {
      return new HadoopFlowStepJob( clientState, this, initializedStepConfig );
      }
    catch( NoClassDefFoundError error )
      {
      PlatformInfo platformInfo = HadoopUtil.getPlatformInfo( JobConf.class, "org/apache/hadoop", "Hadoop MR" );
      String message = "unable to load platform specific class, please verify Hadoop cluster version: '%s', matches the Hadoop platform build dependency and associated FlowConnector, cascading-hadoop or cascading-hadoop2-mr1";

      logError( String.format( message, platformInfo.toString() ), error );

      throw error;
      }
    }

  /**
   * Method clean removes any temporary files used by this FlowStep instance. It will log any IOExceptions thrown.
   *
   * @param config of type JobConf
   */
  public void clean( JobConf config )
    {
    String stepStatePath = config.get( "cascading.flow.step.path" );

    if( stepStatePath != null )
      {
      try
        {
        HadoopUtil.removeStateFromDistCache( config, stepStatePath );
        }
      catch( IOException exception )
        {
        logWarn( "unable to remove step state file: " + stepStatePath, exception );
        }
      }

    if( tempSink != null )
      {
      try
        {
        tempSink.deleteResource( config );
        }
      catch( Exception exception )
        {
        // sink all exceptions, don't fail app
        logWarn( "unable to remove temporary file: " + tempSink, exception );
        }
      }

    // safe way to handle zero sinks case
    for( Tap sink : getSinkTaps() )
      cleanIntermediateData( config, sink );

    for( Tap tap : getTraps() )
      cleanTapMetaData( config, tap );
    }

  protected void cleanIntermediateData( JobConf config, Tap sink )
    {
    if( sink.isTemporary() && ( getFlow().getFlowStats().isSuccessful() || getFlow().getRunID() == null ) )
      {
      try
        {
        sink.deleteResource( config );
        }
      catch( Exception exception )
        {
        // sink all exceptions, don't fail app
        logWarn( "unable to remove temporary file: " + sink, exception );
        }
      }
    else
      {
      cleanTapMetaData( config, sink );
      }
    }

  private void cleanTapMetaData( JobConf jobConf, Tap tap )
    {
    try
      {
      Hadoop18TapUtil.cleanupTapMetaData( jobConf, tap );
      }
    catch( IOException exception )
      {
      // ignore exception
      }
    }

  private void initFromTraps( FlowProcess<JobConf> flowProcess, JobConf conf, Map<String, Tap> traps )
    {
    if( !traps.isEmpty() )
      {
      JobConf trapConf = HadoopUtil.copyJobConf( conf );

      for( Tap tap : traps.values() )
        tap.sinkConfInit( flowProcess, trapConf );
      }
    }

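  /**
   * Method initFromSources initializes this step's source taps into the given config; streamed
   * sources are each initialized into their own copy of the config and registered with
   * {@link MultiInputFormat}, while accumulated sources have their config diffs stored for
   * node-side use and any distributed cache files carried over into the step config.
   *
   * @param flowProcess of type FlowProcess&lt;JobConf&gt;
   * @param conf        of type JobConf
   */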
  protected void initFromSources( FlowProcess<JobConf> flowProcess, JobConf conf )
    {
    // handles case where same tap is used on multiple branches
    // we do not want to init the same tap multiple times
    Set<Tap> uniqueSources = getUniqueStreamedSources();

    JobConf[] streamedJobs = new JobConf[ uniqueSources.size() ];
    int i = 0;

    for( Tap tap : uniqueSources )
      {
      if( tap.getIdentifier() == null )
        throw new IllegalStateException( "tap may not have null identifier: " + tap.toString() );

      streamedJobs[ i ] = flowProcess.copyConfig( conf );

      streamedJobs[ i ].set( "cascading.step.source", Tap.id( tap ) );

      tap.sourceConfInit( flowProcess, streamedJobs[ i ] );

      i++;
      }

    Set<Tap> accumulatedSources = getAllAccumulatedSources();

    for( Tap tap : accumulatedSources )
      {
      JobConf accumulatedJob = flowProcess.copyConfig( conf );

      tap.sourceConfInit( flowProcess, accumulatedJob );

      Map<String, String> map = flowProcess.diffConfigIntoMap( conf, accumulatedJob );
      conf.set( "cascading.node.accumulated.source.conf." + Tap.id( tap ), pack( map, conf ) );

      try
        {
        if( DistributedCache.getCacheFiles( accumulatedJob ) != null )
          DistributedCache.setCacheFiles( DistributedCache.getCacheFiles( accumulatedJob ), conf );
        }
      catch( IOException exception )
        {
        throw new CascadingException( exception );
        }
      }

    MultiInputFormat.addInputFormat( conf, streamedJobs ); // must come last
    }

  private void initFromStepConfigDef( final JobConf conf )
    {
    initConfFromStepConfigDef( new ConfigurationSetter( conf ) );
    }

  /**
   * Sources are specific to a step; removes all known accumulated sources, if any.
   *
   * @return the Set of streamed source Taps unique to this step
   */
  private Set<Tap> getUniqueStreamedSources()
    {
    Set<Tap> allAccumulatedSources = getAllAccumulatedSources();

    // if a source is both accumulated and streamed, honor the streamed annotation
    allAccumulatedSources.removeAll( getAllStreamedSources() );

    // start with the full source declaration and remove undesired taps. the above methods depend on
    // annotations which may not exist, so we are safeguarding that a declared tap is treated as streamed by default
    HashSet<Tap> set = new HashSet<>( sources.keySet() );

    set.removeAll( allAccumulatedSources );

    return set;
    }

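  /**
   * Method initFromSink initializes the sink tap into the given config, substituting a
   * temporary location when a file based {@link OutputFormat} is expected but no output path
   * was set by the sink.
   *
   * @param flowProcess of type FlowProcess&lt;JobConf&gt;
   * @param conf        of type JobConf
   */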
  protected void initFromSink( FlowProcess<JobConf> flowProcess, JobConf conf )
    {
    // init sink first so tempSink can take precedence
    if( getSink() != null )
      getSink().sinkConfInit( flowProcess, conf );

    Class<? extends OutputFormat> outputFormat = conf.getClass( "mapred.output.format.class", null, OutputFormat.class );
    boolean isFileOutputFormat = false;

    if( outputFormat != null )
      isFileOutputFormat = FileOutputFormat.class.isAssignableFrom( outputFormat );

    Path outputPath = FileOutputFormat.getOutputPath( conf );

    // if no output path is set, we need to substitute an alternative if the OutputFormat is file based
    // PartitionTap won't set the output, but will set an OutputFormat
    // MultiSinkTap won't set the output or set the OutputFormat
    // non file based OutputFormats don't have an output path, but do have an OutputFormat set (JDBCTap..)
    if( outputPath == null && ( isFileOutputFormat || outputFormat == null ) )
      tempSink = new TempHfs( conf, "tmp:/" + new Path( getSink().getIdentifier() ).toUri().getPath(), true );

    // tempSink exists because sink is writeDirect
    if( tempSink != null )
      tempSink.sinkConfInit( flowProcess, conf );
    }

  protected void initFromTraps( FlowProcess<JobConf> flowProcess, JobConf conf )
    {
    initFromTraps( flowProcess, conf, getTrapMap() );
    }
  }