/*
 * Copyright (c) 2007-2014 Concurrent, Inc. All Rights Reserved.
 *
 * Project and contact information: http://www.cascading.org/
 *
 * This file is part of the Cascading project.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package cascading.flow.hadoop;

import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

import cascading.CascadingException;
import cascading.flow.FlowProcess;
import cascading.flow.FlowSession;
import cascading.flow.hadoop.util.HadoopUtil;
import cascading.tap.Tap;
import cascading.tuple.Fields;
import cascading.tuple.TupleEntry;
import cascading.tuple.TupleEntryCollector;
import cascading.tuple.TupleEntryIterator;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.util.ReflectionUtils;

/**
 * Class HadoopFlowProcess is an implementation of {@link FlowProcess} for Hadoop. Use this class to get direct
 * access to the Hadoop JobConf and Reporter interfaces.
 * <p/>
 * Be warned that coupling to this implementation will cause custom {@link cascading.operation.Operation}s to
 * fail if they are executed on a system other than Hadoop.
 *
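 * For example (a minimal sketch), a custom {@link cascading.operation.Operation} running on the
 * Hadoop platform may reach the underlying JobConf through the current FlowProcess:
 * <pre>
 * if( flowProcess instanceof HadoopFlowProcess )
 *   {
 *   JobConf jobConf = ( (HadoopFlowProcess) flowProcess ).getJobConf();
 *   int numMappers = jobConf.getNumMapTasks();
 *   }
 * </pre>
 *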
 * @see cascading.flow.FlowSession
 * @see JobConf
 * @see Reporter
 */
public class HadoopFlowProcess extends FlowProcess<JobConf>
  {
  /** Field jobConf */
  final JobConf jobConf;
  /** Field isMapper */
  private final boolean isMapper;
  /** Field reporter */
  Reporter reporter = Reporter.NULL;
  /** Field outputCollector */
  private OutputCollector outputCollector;

  public HadoopFlowProcess()
    {
    this.jobConf = new JobConf();
    this.isMapper = true;
    }

  public HadoopFlowProcess( JobConf jobConf )
    {
    this.jobConf = jobConf;
    this.isMapper = true;
    }

  public HadoopFlowProcess( FlowSession flowSession, JobConf jobConf )
    {
    super( flowSession );
    this.jobConf = jobConf;
    this.isMapper = true;
    }

  /**
   * Constructor HadoopFlowProcess creates a new HadoopFlowProcess instance.
   *
   * @param flowSession of type FlowSession
   * @param jobConf     of type JobConf
   * @param isMapper    of type boolean
   */
  public HadoopFlowProcess( FlowSession flowSession, JobConf jobConf, boolean isMapper )
    {
    super( flowSession );
    this.jobConf = jobConf;
    this.isMapper = isMapper;
    }

  public HadoopFlowProcess( HadoopFlowProcess flowProcess, JobConf jobConf )
    {
    super( flowProcess.getCurrentSession() );
    this.jobConf = jobConf;
    this.isMapper = flowProcess.isMapper();
    this.reporter = flowProcess.getReporter();
    }

  @Override
  public FlowProcess copyWith( JobConf jobConf )
    {
    return new HadoopFlowProcess( this, jobConf );
    }

  /**
   * Method getJobConf returns the jobConf of this HadoopFlowProcess object.
   *
   * @return the jobConf (type JobConf) of this HadoopFlowProcess object.
   */
  public JobConf getJobConf()
    {
    return jobConf;
    }

  @Override
  public JobConf getConfigCopy()
    {
    return HadoopUtil.copyJobConf( jobConf );
    }

  /**
   * Method isMapper returns true if this part of the FlowProcess is a MapReduce mapper. If false, it is a reducer.
   *
   * @return boolean
   */
  public boolean isMapper()
    {
    return isMapper;
    }

  public int getCurrentNumMappers()
    {
    return getJobConf().getNumMapTasks();
    }

  public int getCurrentNumReducers()
    {
    return getJobConf().getNumReduceTasks();
    }

  /**
   * Method getCurrentSliceNum returns the slice (task) number of this process. Slice 0 is the first slice.
   *
   * @return int
   */
  @Override
  public int getCurrentSliceNum()
    {
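    // "mapred.task.partition" holds the zero-based number Hadoop assigns to each map or reduce task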
    return getJobConf().getInt( "mapred.task.partition", 0 );
    }

  @Override
  public int getNumProcessSlices()
    {
    if( isMapper() )
      return getCurrentNumMappers();
    else
      return getCurrentNumReducers();
    }

  /**
   * Method setReporter sets the reporter of this HadoopFlowProcess object.
   *
   * @param reporter the reporter of this HadoopFlowProcess object.
   */
  public void setReporter( Reporter reporter )
    {
    if( reporter == null )
      this.reporter = Reporter.NULL;
    else
      this.reporter = reporter;
    }

  /**
   * Method getReporter returns the reporter of this HadoopFlowProcess object.
   *
   * @return the reporter (type Reporter) of this HadoopFlowProcess object.
   */
  public Reporter getReporter()
    {
    return reporter;
    }

  public void setOutputCollector( OutputCollector outputCollector )
    {
    this.outputCollector = outputCollector;
    }

  public OutputCollector getOutputCollector()
    {
    return outputCollector;
    }

  @Override
  public Object getProperty( String key )
    {
    return jobConf.get( key );
    }

  @Override
  public Collection<String> getPropertyKeys()
    {
    Set<String> keys = new HashSet<String>();

    for( Map.Entry<String, String> entry : jobConf )
      keys.add( entry.getKey() );

    return Collections.unmodifiableSet( keys );
    }

  @Override
  public Object newInstance( String className )
    {
    if( className == null || className.isEmpty() )
      return null;

    try
      {
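      // ReflectionUtils instantiates the class and, if it implements Configurable, configures it with the JobConf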
      Class<?> type = HadoopFlowProcess.class.getClassLoader().loadClass( className );

      return ReflectionUtils.newInstance( type, jobConf );
      }
    catch( ClassNotFoundException exception )
      {
      throw new CascadingException( "unable to load class: " + className, exception );
      }
    }

  @Override
  public void keepAlive()
    {
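    // report progress so the framework does not kill a long-running task as hung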
    getReporter().progress();
    }

  @Override
  public void increment( Enum counter, long amount )
    {
    getReporter().incrCounter( counter, amount );
    }

  @Override
  public void increment( String group, String counter, long amount )
    {
    getReporter().incrCounter( group, counter, amount );
    }

  @Override
  public void setStatus( String status )
    {
    getReporter().setStatus( status );
    }

  @Override
  public boolean isCounterStatusInitialized()
    {
    return getReporter() != null;
    }

  @Override
  public TupleEntryIterator openTapForRead( Tap tap ) throws IOException
    {
    return tap.openForRead( this );
    }

  @Override
  public TupleEntryCollector openTapForWrite( Tap tap ) throws IOException
    {
    return tap.openForWrite( this, null ); // do not honor sinkmode as this may be opened across tasks
    }

  @Override
  public TupleEntryCollector openTrapForWrite( Tap trap ) throws IOException
    {
    JobConf jobConf = HadoopUtil.copyJobConf( getJobConf() );

    int stepNum = jobConf.getInt( "cascading.flow.step.num", 0 );
    String partname;

    // name trap files like task output files, e.g. part-m-00001-00004: the first number is the
    // flow step, and the trailing %05d is filled in by the tap's collector with the current task
    // number, preventing collisions when traps are written across steps and tasks
    if( jobConf.getBoolean( "mapred.task.is.map", true ) )
      partname = String.format( "-m-%05d-", stepNum );
    else
      partname = String.format( "-r-%05d-", stepNum );

    jobConf.set( "cascading.tapcollector.partname", "%s%spart" + partname + "%05d" );

    return trap.openForWrite( new HadoopFlowProcess( this, jobConf ), null ); // do not honor sinkmode as this may be opened across tasks
    }

  @Override
  public TupleEntryCollector openSystemIntermediateForWrite() throws IOException
    {
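    // adapt Cascading's TupleEntryCollector to Hadoop's key/value OutputCollector:
    // the first tuple position is emitted as the key, the second as the value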
    return new TupleEntryCollector( Fields.size( 2 ) )
    {
    @Override
    protected void collect( TupleEntry tupleEntry )
      {
      try
        {
        getOutputCollector().collect( tupleEntry.getObject( 0 ), tupleEntry.getObject( 1 ) );
        }
      catch( IOException exception )
        {
        throw new CascadingException( "failed collecting key and value", exception );
        }
      }
    };
    }

  @Override
  public JobConf copyConfig( JobConf jobConf )
    {
    return HadoopUtil.copyJobConf( jobConf );
    }

  @Override
  public Map<String, String> diffConfigIntoMap( JobConf defaultConfig, JobConf updatedConfig )
    {
    return HadoopUtil.getConfig( defaultConfig, updatedConfig );
    }

  @Override
  public JobConf mergeMapIntoConfig( JobConf defaultConfig, Map<String, String> map )
    {
    return HadoopUtil.mergeConf( defaultConfig, map, false );
    }
  }