/*
 * Copyright (c) 2007-2014 Concurrent, Inc. All Rights Reserved.
 *
 * Project and contact information: http://www.cascading.org/
 *
 * This file is part of the Cascading project.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package cascading.flow.hadoop;

import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

import cascading.CascadingException;
import cascading.flow.FlowProcess;
import cascading.flow.FlowSession;
import cascading.flow.hadoop.util.HadoopUtil;
import cascading.tap.Tap;
import cascading.tuple.Fields;
import cascading.tuple.TupleEntry;
import cascading.tuple.TupleEntryCollector;
import cascading.tuple.TupleEntryIterator;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.util.ReflectionUtils;

/**
 * Class HadoopFlowProcess is an implementation of {@link FlowProcess} for Hadoop. Use this class to get direct
 * access to the Hadoop JobConf and Reporter interfaces.
 * <p/>
 * Be warned that coupling to this implementation will cause custom {@link cascading.operation.Operation}s to
 * fail if they are executed on a system other than Hadoop.
 *
 * @see cascading.flow.FlowSession
 * @see JobConf
 * @see Reporter
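 * <p/>
 * For example, a custom Operation that is guaranteed to run only on the Hadoop platform may reach
 * the underlying JobConf through this class (an illustrative sketch; the property name below is
 * hypothetical):
 * <pre>
 * public void operate( FlowProcess flowProcess, FunctionCall functionCall )
 *   {
 *   // safe only when the flow always runs on Hadoop
 *   JobConf jobConf = ( (HadoopFlowProcess) flowProcess ).getJobConf();
 *
 *   String separator = jobConf.get( "my.app.field.separator", "\t" ); // hypothetical property
 *   ...
 *   }
 * </pre>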
 */
public class HadoopFlowProcess extends FlowProcess<JobConf>
  {
  /** Field jobConf */
  final JobConf jobConf;
  /** Field isMapper */
  private final boolean isMapper;
  /** Field reporter */
  Reporter reporter = Reporter.NULL;
  /** Field outputCollector */
  private OutputCollector outputCollector;

  public HadoopFlowProcess()
    {
    this.jobConf = new JobConf();
    this.isMapper = true;
    }

  public HadoopFlowProcess( JobConf jobConf )
    {
    this.jobConf = jobConf;
    this.isMapper = true;
    }

  public HadoopFlowProcess( FlowSession flowSession, JobConf jobConf )
    {
    super( flowSession );
    this.jobConf = jobConf;
    this.isMapper = true;
    }

  /**
   * Constructor HadoopFlowProcess creates a new HadoopFlowProcess instance.
   *
   * @param flowSession of type FlowSession
   * @param jobConf     of type JobConf
   * @param isMapper    of type boolean, true if this process runs map side, false if it runs reduce side
   */
  public HadoopFlowProcess( FlowSession flowSession, JobConf jobConf, boolean isMapper )
    {
    super( flowSession );
    this.jobConf = jobConf;
    this.isMapper = isMapper;
    }

  public HadoopFlowProcess( HadoopFlowProcess flowProcess, JobConf jobConf )
    {
    super( flowProcess.getCurrentSession() );
    this.jobConf = jobConf;
    this.isMapper = flowProcess.isMapper();
    this.reporter = flowProcess.getReporter();
    }

  @Override
  public FlowProcess copyWith( JobConf jobConf )
    {
    return new HadoopFlowProcess( this, jobConf );
    }

  /**
   * Method getJobConf returns the jobConf of this HadoopFlowProcess object.
   *
   * @return the jobConf (type JobConf) of this HadoopFlowProcess object.
   */
  public JobConf getJobConf()
    {
    return jobConf;
    }

  @Override
  public JobConf getConfigCopy()
    {
    return HadoopUtil.copyJobConf( jobConf );
    }

  /**
   * Method isMapper returns true if this part of the FlowProcess is a MapReduce mapper. If false, it is a reducer.
   *
   * @return boolean
   */
  public boolean isMapper()
    {
    return isMapper;
    }

  public int getCurrentNumMappers()
    {
    return getJobConf().getNumMapTasks();
    }

  public int getCurrentNumReducers()
    {
    return getJobConf().getNumReduceTasks();
    }

  /**
   * Method getCurrentSliceNum returns the slice number of this task slice. Slice 0 is the first slice.
   *
   * @return int
   */
  @Override
  public int getCurrentSliceNum()
    {
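    // "mapred.task.partition" is set by the MapReduce framework to this task's id within the job;
    // outside of a running task it is unset, so fall back to slice 0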
    return getJobConf().getInt( "mapred.task.partition", 0 );
    }

  @Override
  public int getNumProcessSlices()
    {
    if( isMapper() )
      return getCurrentNumMappers();
    else
      return getCurrentNumReducers();
    }

  /**
   * Method setReporter sets the reporter of this HadoopFlowProcess object.
   *
   * @param reporter the reporter of this HadoopFlowProcess object.
   */
  public void setReporter( Reporter reporter )
    {
    this.reporter = reporter;
    }

  /**
   * Method getReporter returns the reporter of this HadoopFlowProcess object.
   *
   * @return the reporter (type Reporter) of this HadoopFlowProcess object.
   */
  public Reporter getReporter()
    {
    return reporter;
    }

  public void setOutputCollector( OutputCollector outputCollector )
    {
    this.outputCollector = outputCollector;
    }

  public OutputCollector getOutputCollector()
    {
    return outputCollector;
    }

  @Override
  public Object getProperty( String key )
    {
    return jobConf.get( key );
    }

  @Override
  public Collection<String> getPropertyKeys()
    {
    Set<String> keys = new HashSet<String>();

    for( Map.Entry<String, String> entry : jobConf )
      keys.add( entry.getKey() );

    return Collections.unmodifiableSet( keys );
    }

  @Override
  public Object newInstance( String className )
    {
    if( className == null || className.isEmpty() )
      return null;

    try
      {
      Class<?> type = HadoopFlowProcess.class.getClassLoader().loadClass( className );

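      // ReflectionUtils also hands the jobConf to the new instance, so classes implementing
      // Configurable or JobConfigurable are configured on creation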
      return ReflectionUtils.newInstance( type, jobConf );
      }
    catch( ClassNotFoundException exception )
      {
      throw new CascadingException( "unable to load class: " + className, exception );
      }
    }

  @Override
  public void keepAlive()
    {
    getReporter().progress();
    }

  @Override
  public void increment( Enum counter, long amount )
    {
    getReporter().incrCounter( counter, amount );
    }

  @Override
  public void increment( String group, String counter, long amount )
    {
    getReporter().incrCounter( group, counter, amount );
    }

  @Override
  public void setStatus( String status )
    {
    getReporter().setStatus( status );
    }

  @Override
  public boolean isCounterStatusInitialized()
    {
    return getReporter() != null;
    }

  @Override
  public TupleEntryIterator openTapForRead( Tap tap ) throws IOException
    {
    return tap.openForRead( this );
    }

  @Override
  public TupleEntryCollector openTapForWrite( Tap tap ) throws IOException
    {
    return tap.openForWrite( this, null ); // do not honor sinkmode as this may be opened across tasks
    }

  @Override
  public TupleEntryCollector openTrapForWrite( Tap trap ) throws IOException
    {
    JobConf jobConf = HadoopUtil.copyJobConf( getJobConf() );

    int stepNum = jobConf.getInt( "cascading.flow.step.num", 0 );
    String partname;

    if( jobConf.getBoolean( "mapred.task.is.map", true ) )
      partname = String.format( "-m-%05d-", stepNum );
    else
      partname = String.format( "-r-%05d-", stepNum );

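    // the partname template ultimately yields trap files named like "part-m-00001-00042",
    // where "-m-"/"-r-" and the step number are fixed here; the leading "%s%s" and trailing
    // "%05d" are presumably filled in by the tap's output collector with a path prefix and
    // the slice sequence number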
    jobConf.set( "cascading.tapcollector.partname", "%s%spart" + partname + "%05d" );

    return trap.openForWrite( new HadoopFlowProcess( this, jobConf ), null ); // do not honor sinkmode as this may be opened across tasks
    }

  @Override
  public TupleEntryCollector openSystemIntermediateForWrite() throws IOException
    {
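    // bridge two-field tuples into the underlying Hadoop OutputCollector, treating
    // position 0 as the intermediate key and position 1 as the intermediate value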
    return new TupleEntryCollector( Fields.size( 2 ) )
    {
    @Override
    protected void collect( TupleEntry tupleEntry )
      {
      try
        {
        getOutputCollector().collect( tupleEntry.getObject( 0 ), tupleEntry.getObject( 1 ) );
        }
      catch( IOException exception )
        {
        throw new CascadingException( "failed collecting key and value", exception );
        }
      }
    };
    }

  @Override
  public JobConf copyConfig( JobConf jobConf )
    {
    return HadoopUtil.copyJobConf( jobConf );
    }

  @Override
  public Map<String, String> diffConfigIntoMap( JobConf defaultConfig, JobConf updatedConfig )
    {
    return HadoopUtil.getConfig( defaultConfig, updatedConfig );
    }

  @Override
  public JobConf mergeMapIntoConfig( JobConf defaultConfig, Map<String, String> map )
    {
    return HadoopUtil.mergeConf( defaultConfig, map, false );
    }
  }