/*
 * Copyright (c) 2007-2014 Concurrent, Inc. All Rights Reserved.
 *
 * Project and contact information: http://www.cascading.org/
 *
 * This file is part of the Cascading project.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package cascading.flow.hadoop;

import java.io.IOException;
import java.util.Iterator;

import cascading.CascadingException;
import cascading.flow.FlowException;
import cascading.flow.FlowSession;
import cascading.flow.FlowStep;
import cascading.flow.SliceCounters;
import cascading.flow.hadoop.planner.HadoopFlowStepJob;
import cascading.flow.hadoop.stream.HadoopMapStreamGraph;
import cascading.flow.hadoop.util.HadoopUtil;
import cascading.flow.stream.Duct;
import cascading.flow.stream.ElementDuct;
import cascading.flow.stream.SourceStage;
import cascading.tap.Tap;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapRunnable;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static cascading.flow.hadoop.util.HadoopUtil.deserializeBase64;
import static cascading.flow.hadoop.util.HadoopUtil.readStateFromDistCache;
/**
 * Class FlowMapper is the Hadoop Mapper implementation. It implements {@link MapRunnable},
 * rather than {@link org.apache.hadoop.mapred.Mapper}, so it can pull from the given
 * {@link RecordReader} and drive the map side stream graph directly.
 */
public class FlowMapper implements MapRunnable
  {
  private static final Logger LOG = LoggerFactory.getLogger( FlowMapper.class );

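  /** Field streamGraph */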
  private HadoopMapStreamGraph streamGraph;
  /** Field currentProcess */
  private HadoopFlowProcess currentProcess;

  /** Constructor FlowMapper creates a new FlowMapper instance. */
  public FlowMapper()
    {
    }

  @Override
  public void configure( JobConf jobConf )
    {
    try
      {
      HadoopUtil.initLog4j( jobConf );

      LOG.info( "cascading version: {}", jobConf.get( "cascading.version", "" ) );
      LOG.info( "child jvm opts: {}", jobConf.get( "mapred.child.java.opts", "" ) );

      currentProcess = new HadoopFlowProcess( new FlowSession(), jobConf, true );

      String stepState = jobConf.getRaw( "cascading.flow.step" );

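      // the serialized step state is embedded in the job conf unless it was too
      // large to embed, in which case it was shipped via the distributed cache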
      if( stepState == null )
        stepState = readStateFromDistCache( jobConf, jobConf.get( FlowStep.CASCADING_FLOW_STEP_ID ) );

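      // rebuild the flow step and locate the source Tap this mapper streams from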
      HadoopFlowStep step = deserializeBase64( stepState, jobConf, HadoopFlowStep.class );
      Tap source = step.getTapForID( step.getSources(), jobConf.get( "cascading.step.source" ) );

      streamGraph = new HadoopMapStreamGraph( currentProcess, step, source );

      for( Duct head : streamGraph.getHeads() )
        LOG.info( "sourcing from: {}", ( (ElementDuct) head ).getFlowElement() );

      for( Duct tail : streamGraph.getTails() )
        LOG.info( "sinking to: {}", ( (ElementDuct) tail ).getFlowElement() );

      for( Tap trap : step.getMapperTraps().values() )
        LOG.info( "trapping to: {}", trap );
      }
    catch( Throwable throwable )
      {
      reportIfLocal( throwable );

      if( throwable instanceof CascadingException )
        throw (CascadingException) throwable;

      throw new FlowException( "internal error during mapper configuration", throwable );
      }
    }

  @Override
  public void run( RecordReader input, OutputCollector output, Reporter reporter ) throws IOException
    {
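    // hand the reporter and output collector to the flow process so taps and
    // operations downstream can reach them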
    currentProcess.setReporter( reporter );
    currentProcess.increment( SliceCounters.Process_Begin_Time, System.currentTimeMillis() );
    currentProcess.setOutputCollector( output );

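    // initialize all ducts in the stream graph before any input is read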
    streamGraph.prepare();

    SourceStage streamedHead = streamGraph.getStreamedHead();
    Iterator<Duct> iterator = streamGraph.getHeads().iterator();

    try
      {
      try
        {
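        // run the accumulated heads first; they read their taps directly and receive
        // no input from this mapper, e.g. the non-streamed side of a HashJoin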
        while( iterator.hasNext() )
          {
          Duct next = iterator.next();

          if( next != streamedHead )
            ( (SourceStage) next ).run( null );
          }

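        // pull every record in this input split through the stream graph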
        streamedHead.run( input );
        }
      catch( OutOfMemoryError error )
        {
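        // rethrow as is so the error is neither reported locally nor wrapped below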
        throw error;
        }
      catch( IOException exception )
        {
        reportIfLocal( exception );
        throw exception;
        }
      catch( Throwable throwable )
        {
        reportIfLocal( throwable );

        if( throwable instanceof CascadingException )
          throw (CascadingException) throwable;

        throw new FlowException( "internal error during mapper execution", throwable );
        }
      }
    finally
      {
      try
        {
        streamGraph.cleanup();
        }
      finally
        {
        currentProcess.increment( SliceCounters.Process_End_Time, System.currentTimeMillis() );
        }
      }
    }
  /**
   * Report the error to {@link HadoopFlowStepJob} if we are running in Hadoop's local mode.
   *
   * @param throwable the Throwable that was thrown
   */
  private void reportIfLocal( Throwable throwable )
    {
    if( HadoopUtil.isLocal( currentProcess.getJobConf() ) )
      HadoopFlowStepJob.reportLocalError( throwable );
    }
  }