/*
 * Copyright (c) 2007-2014 Concurrent, Inc. All Rights Reserved.
 *
 * Project and contact information: http://www.cascading.org/
 *
 * This file is part of the Cascading project.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package cascading.flow.hadoop;

import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;

import cascading.flow.FlowException;
import cascading.flow.FlowProcess;
import cascading.flow.hadoop.planner.HadoopFlowStepJob;
import cascading.flow.hadoop.util.HadoopUtil;
import cascading.flow.planner.BaseFlowStep;
import cascading.flow.planner.FlowStepJob;
import cascading.flow.planner.Scope;
import cascading.property.ConfigDef;
import cascading.tap.Tap;
import cascading.tap.hadoop.io.MultiInputFormat;
import cascading.tap.hadoop.util.Hadoop18TapUtil;
import cascading.tap.hadoop.util.TempHfs;
import cascading.tuple.Fields;
import cascading.tuple.Tuple;
import cascading.tuple.hadoop.TupleSerialization;
import cascading.tuple.hadoop.util.CoGroupingComparator;
import cascading.tuple.hadoop.util.CoGroupingPartitioner;
import cascading.tuple.hadoop.util.GroupingComparator;
import cascading.tuple.hadoop.util.GroupingPartitioner;
import cascading.tuple.hadoop.util.GroupingSortingComparator;
import cascading.tuple.hadoop.util.GroupingSortingPartitioner;
import cascading.tuple.hadoop.util.IndexTupleCoGroupingComparator;
import cascading.tuple.hadoop.util.ReverseGroupingSortingComparator;
import cascading.tuple.hadoop.util.ReverseTupleComparator;
import cascading.tuple.hadoop.util.TupleComparator;
import cascading.tuple.io.IndexTuple;
import cascading.tuple.io.TuplePair;
import cascading.util.Util;
import cascading.util.Version;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobConf;

import static cascading.flow.hadoop.util.HadoopUtil.serializeBase64;
import static cascading.flow.hadoop.util.HadoopUtil.writeStateToDistCache;
/**
 * Class HadoopFlowStep is the Hadoop MapReduce specific implementation of {@link BaseFlowStep},
 * responsible for initializing the {@link JobConf} this step will be submitted with.
 */
public class HadoopFlowStep extends BaseFlowStep<JobConf>
  {
  /** Field mapperTraps */
  private final Map<String, Tap> mapperTraps = new HashMap<String, Tap>();
  /** Field reducerTraps */
  private final Map<String, Tap> reducerTraps = new HashMap<String, Tap>();

  public HadoopFlowStep( String name, int stepNum )
    {
    super( name, stepNum );
    }

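  /**
   * Method getInitializedConfig returns a fully initialized JobConf for this step, layering the
   * step specific serializations, partitioners, comparators, and packed step state over the given
   * parent configuration. A minimal usage sketch, assuming a {@code flowProcess} and
   * {@code parentConf} are already in hand (both names are illustrative):
   * <pre>{@code
   * JobConf stepConf = step.getInitializedConfig( flowProcess, parentConf );
   * // small plans carry their packed state inline under "cascading.flow.step";
   * // larger plans reference a distributed cache path under "cascading.flow.step.path"
   * String state = stepConf.get( "cascading.flow.step" );
   * }</pre>
   *
   * @param flowProcess of type FlowProcess<JobConf>
   * @param parentConfig of type JobConf, may be null
   * @return a new, initialized JobConf
   */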
  public JobConf getInitializedConfig( FlowProcess<JobConf> flowProcess, JobConf parentConfig )
    {
    JobConf conf = parentConfig == null ? new JobConf() : HadoopUtil.copyJobConf( parentConfig );

    // disable the GenericOptionsParser warning
    conf.setBoolean( "mapred.used.genericoptionsparser", true );

    conf.setJobName( getStepDisplayName( conf.getInt( "cascading.step.display.id.truncate", Util.ID_LENGTH ) ) );

    conf.setOutputKeyClass( Tuple.class );
    conf.setOutputValueClass( Tuple.class );

    conf.setMapRunnerClass( FlowMapper.class );
    conf.setReducerClass( FlowReducer.class );

    // set for use by the shuffling phase
    TupleSerialization.setSerializations( conf );

    initFromSources( flowProcess, conf );

    initFromSink( flowProcess, conf );

    initFromTraps( flowProcess, conf );

    initFromProcessConfigDef( conf );

    if( getSink().getScheme().getNumSinkParts() != 0 )
      {
      // when there is a reducer, the number of reduce tasks controls the number of sink parts,
      // otherwise the number of map tasks does
      if( getGroup() != null )
        conf.setNumReduceTasks( getSink().getScheme().getNumSinkParts() );
      else
        conf.setNumMapTasks( getSink().getScheme().getNumSinkParts() );
      }

    conf.setOutputKeyComparatorClass( TupleComparator.class );

    if( getGroup() == null )
      {
      conf.setNumReduceTasks( 0 ); // disable reducers
      }
    else
      {
      // must set map output defaults when performing a reduce
      conf.setMapOutputKeyClass( Tuple.class );
      conf.setMapOutputValueClass( Tuple.class );
      conf.setPartitionerClass( GroupingPartitioner.class );

      // handles the case where the groupby sort should be reversed
      if( getGroup().isSortReversed() )
        conf.setOutputKeyComparatorClass( ReverseTupleComparator.class );

      addComparators( conf, "cascading.group.comparator", getGroup().getKeySelectors() );

      if( getGroup().isGroupBy() )
        {
        addComparators( conf, "cascading.sort.comparator", getGroup().getSortingSelectors() );
        }
      else
        {
        conf.setPartitionerClass( CoGroupingPartitioner.class );
        conf.setMapOutputKeyClass( IndexTuple.class ); // allows groups to be sorted by index
        conf.setMapOutputValueClass( IndexTuple.class );
        conf.setOutputKeyComparatorClass( IndexTupleCoGroupingComparator.class ); // sorts by group, then by index
        conf.setOutputValueGroupingComparator( CoGroupingComparator.class );
        }

      if( getGroup().isSorted() )
        {
        conf.setPartitionerClass( GroupingSortingPartitioner.class );
        conf.setMapOutputKeyClass( TuplePair.class );

        if( getGroup().isSortReversed() )
          conf.setOutputKeyComparatorClass( ReverseGroupingSortingComparator.class );
        else
          conf.setOutputKeyComparatorClass( GroupingSortingComparator.class );

        // no need to supply a reverse comparator, only equality is checked
        conf.setOutputValueGroupingComparator( GroupingComparator.class );
        }
      }

    // perform these settings last so the initialization above is included in the packed step state
    String versionString = Version.getRelease();

    if( versionString != null )
      conf.set( "cascading.version", versionString );

    conf.set( CASCADING_FLOW_STEP_ID, getID() );
    conf.set( "cascading.flow.step.num", Integer.toString( getStepNum() ) );

    String stepState = pack( this, conf );

    // hadoop 20.2 doesn't like dist cache when using local mode
    int maxSize = Short.MAX_VALUE; // a conservative limit for a value stored directly in the conf

    // embed the step state directly when running locally or when it is small enough,
    // otherwise write it to the distributed cache and store only its path
    if( isHadoopLocalMode( conf ) || stepState.length() < maxSize )
      conf.set( "cascading.flow.step", stepState );
    else
      conf.set( "cascading.flow.step.path", writeStateToDistCache( conf, getID(), stepState ) );

    return conf;
    }

  public boolean isHadoopLocalMode( JobConf conf )
    {
    return HadoopUtil.isLocal( conf );
    }

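  /**
   * Serializes the given object into a base64 encoded String via
   * {@link HadoopUtil#serializeBase64(Object, JobConf, boolean)}, rethrowing any IOException as an
   * unchecked FlowException so callers need not declare it.
   */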
  private String pack( Object object, JobConf conf )
    {
    try
      {
      return serializeBase64( object, conf, true );
      }
    catch( IOException exception )
      {
      throw new FlowException( "unable to pack object: " + object.getClass().getCanonicalName(), exception );
      }
    }

  protected FlowStepJob<JobConf> createFlowStepJob( FlowProcess<JobConf> flowProcess, JobConf parentConfig )
    {
    JobConf initializedConfig = getInitializedConfig( flowProcess, parentConfig );

    setConf( initializedConfig );

    return new HadoopFlowStepJob( createClientState( flowProcess ), this, initializedConfig );
    }

  /**
   * Method clean removes any temporary files used by this FlowStep instance. It will log any IOExceptions thrown.
   *
   * @param config of type JobConf
   */
  public void clean( JobConf config )
    {
    String stepStatePath = config.get( "cascading.flow.step.path" );

    if( stepStatePath != null )
      {
      try
        {
        HadoopUtil.removeStateFromDistCache( config, stepStatePath );
        }
      catch( IOException exception )
        {
        logWarn( "unable to remove step state file: " + stepStatePath, exception );
        }
      }

    if( tempSink != null )
      {
      try
        {
        tempSink.deleteResource( config );
        }
      catch( Exception exception )
        {
        // swallow all exceptions, don't fail the application
        logWarn( "unable to remove temporary file: " + tempSink, exception );
        }
      }

    if( getSink() instanceof TempHfs &&
      ( getFlow().getFlowStats().isSuccessful() || getFlow().getRunID() == null ) )
      {
      try
        {
        getSink().deleteResource( config );
        }
      catch( Exception exception )
        {
        // swallow all exceptions, don't fail the application
        logWarn( "unable to remove temporary file: " + getSink(), exception );
        }
      }
    else
      {
      cleanTapMetaData( config, getSink() );
      }

    for( Tap tap : getMapperTraps().values() )
      cleanTapMetaData( config, tap );

    for( Tap tap : getReducerTraps().values() )
      cleanTapMetaData( config, tap );
    }

  private void cleanTapMetaData( JobConf jobConf, Tap tap )
    {
    try
      {
      Hadoop18TapUtil.cleanupTapMetaData( jobConf, tap );
      }
    catch( IOException exception )
      {
      // ignore exception
      }
    }

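  /**
   * Stores the grouping or sorting Fields under the given property when custom comparators are
   * declared on them, serialized via pack(). Otherwise only the resolved field count is recorded
   * under the property name suffixed with ".size", which allows Fields.UNKNOWN to flow through.
   */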
  private void addComparators( JobConf conf, String property, Map<String, Fields> map )
    {
    Iterator<Fields> fieldsIterator = map.values().iterator();

    if( !fieldsIterator.hasNext() )
      return;

    Fields fields = fieldsIterator.next();

    if( fields.hasComparators() )
      {
      conf.set( property, pack( fields, conf ) );
      return;
      }

    // use the resolved fields if there are no custom comparators
    Set<Scope> previousScopes = getPreviousScopes( getGroup() );

    fields = previousScopes.iterator().next().getOutValuesFields();

    if( fields.size() != 0 ) // skipping a size of zero allows Fields.UNKNOWN to be used
      conf.setInt( property + ".size", fields.size() );
    }

  private void initFromTraps( FlowProcess<JobConf> flowProcess, JobConf conf, Map<String, Tap> traps )
    {
    if( !traps.isEmpty() )
      {
      JobConf trapConf = HadoopUtil.copyJobConf( conf );

      for( Tap tap : traps.values() )
        tap.sinkConfInit( flowProcess, trapConf );
      }
    }

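  /**
   * Initializes this step's conf from its source taps. Streamed sources are initialized into
   * per-tap copies of the conf that are merged by {@link MultiInputFormat}, while accumulated
   * sources (those read in full, typically the non-streamed side of a join) contribute only their
   * conf diff, stored under "cascading.step.accumulated.source.conf." plus the tap id.
   */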
  protected void initFromSources( FlowProcess<JobConf> flowProcess, JobConf conf )
    {
    // handles the case where the same tap is used on multiple branches;
    // we do not want to init the same tap multiple times
    Set<Tap> uniqueSources = getUniqueStreamedSources();

    JobConf[] streamedJobs = new JobConf[ uniqueSources.size() ];
    int i = 0;

    for( Tap tap : uniqueSources )
      {
      if( tap.getIdentifier() == null )
        throw new IllegalStateException( "tap may not have null identifier: " + tap );

      streamedJobs[ i ] = flowProcess.copyConfig( conf );
      tap.sourceConfInit( flowProcess, streamedJobs[ i ] );
      streamedJobs[ i ].set( "cascading.step.source", Tap.id( tap ) );
      i++;
      }

    Set<Tap> accumulatedSources = getAllAccumulatedSources();

    for( Tap tap : accumulatedSources )
      {
      JobConf accumulatedJob = flowProcess.copyConfig( conf );
      tap.sourceConfInit( flowProcess, accumulatedJob );
      Map<String, String> map = flowProcess.diffConfigIntoMap( conf, accumulatedJob );
      conf.set( "cascading.step.accumulated.source.conf." + Tap.id( tap ), pack( map, conf ) );
      }

    MultiInputFormat.addInputFormat( conf, streamedJobs ); // must come last
    }

  public Tap getTapForID( Set<Tap> taps, String id )
    {
    for( Tap tap : taps )
      {
      if( Tap.id( tap ).equals( id ) )
        return tap;
      }

    return null;
    }

  private void initFromProcessConfigDef( final JobConf conf )
    {
    initConfFromProcessConfigDef( getSetterFor( conf ) );
    }

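  /**
   * Returns a ConfigDef.Setter view over the given JobConf. {@code set} overwrites any existing
   * value, while {@code update} treats the value as a comma separated list and appends only when
   * the value is absent. A brief sketch, with an illustrative key and values:
   * <pre>{@code
   * ConfigDef.Setter setter = getSetterFor( conf );
   * setter.set( "cascading.example.key", "first" );      // value is now "first"
   * setter.update( "cascading.example.key", "second" );  // value is now "first,second"
   * setter.update( "cascading.example.key", "second" );  // unchanged, "second" already present
   * }</pre>
   */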
  private ConfigDef.Setter getSetterFor( final JobConf conf )
    {
    return new ConfigDef.Setter()
    {
    @Override
    public String set( String key, String value )
      {
      String oldValue = get( key );

      conf.set( key, value );

      return oldValue;
      }

    @Override
    public String update( String key, String value )
      {
      String oldValue = get( key );

      if( oldValue == null )
        conf.set( key, value );
      else if( !oldValue.contains( value ) )
        conf.set( key, oldValue + "," + value );

      return oldValue;
      }

    @Override
    public String get( String key )
      {
      String value = conf.get( key );

      if( value == null || value.isEmpty() )
        return null;

      return value;
      }
    };
    }

  /**
   * Sources are specific to a step; remove all known accumulated sources, if any.
   *
   * @return the set of source taps that will be streamed by this step
   */
  private Set<Tap> getUniqueStreamedSources()
    {
    HashSet<Tap> set = new HashSet<Tap>( sources.keySet() );

    set.removeAll( getAllAccumulatedSources() );

    return set;
    }

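  /**
   * Initializes this step's conf from its sink tap. When the sink does not set an output path on
   * the conf (a writeDirect sink), a TempHfs path is substituted so Hadoop still has a valid
   * FileOutputFormat output path to work against.
   */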
  protected void initFromSink( FlowProcess<JobConf> flowProcess, JobConf conf )
    {
    // init sink first so tempSink can take precedence
    if( getSink() != null )
      getSink().sinkConfInit( flowProcess, conf );

    if( FileOutputFormat.getOutputPath( conf ) == null )
      tempSink = new TempHfs( conf, "tmp:/" + new Path( getSink().getIdentifier() ).toUri().getPath(), true );

    // tempSink exists because sink is writeDirect
    if( tempSink != null )
      tempSink.sinkConfInit( flowProcess, conf );
    }

  protected void initFromTraps( FlowProcess<JobConf> flowProcess, JobConf conf )
    {
    initFromTraps( flowProcess, conf, getMapperTraps() );
    initFromTraps( flowProcess, conf, getReducerTraps() );
    }

  @Override
  public Set<Tap> getTraps()
    {
    Set<Tap> set = new HashSet<Tap>();

    set.addAll( mapperTraps.values() );
    set.addAll( reducerTraps.values() );

    return Collections.unmodifiableSet( set );
    }

  @Override
  public Tap getTrap( String name )
    {
    Tap trap = getMapperTrap( name );

    if( trap == null )
      trap = getReducerTrap( name );

    return trap;
    }

  public Map<String, Tap> getMapperTraps()
    {
    return mapperTraps;
    }

  public Map<String, Tap> getReducerTraps()
    {
    return reducerTraps;
    }

  public Tap getMapperTrap( String name )
    {
    return getMapperTraps().get( name );
    }

  public Tap getReducerTrap( String name )
    {
    return getReducerTraps().get( name );
    }
  }