/*
 * Copyright (c) 2007-2014 Concurrent, Inc. All Rights Reserved.
 *
 * Project and contact information: http://www.cascading.org/
 *
 * This file is part of the Cascading project.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package cascading.flow.hadoop;

import java.io.IOException;
import java.util.Iterator;

import cascading.CascadingException;
import cascading.flow.FlowException;
import cascading.flow.FlowSession;
import cascading.flow.FlowStep;
import cascading.flow.SliceCounters;
import cascading.flow.hadoop.planner.HadoopFlowStepJob;
import cascading.flow.hadoop.stream.HadoopGroupGate;
import cascading.flow.hadoop.stream.HadoopReduceStreamGraph;
import cascading.flow.hadoop.util.HadoopUtil;
import cascading.flow.hadoop.util.TimedIterator;
import cascading.flow.stream.Duct;
import cascading.flow.stream.ElementDuct;
import cascading.tap.Tap;
import cascading.tuple.Tuple;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static cascading.flow.hadoop.util.HadoopUtil.deserializeBase64;
import static cascading.flow.hadoop.util.HadoopUtil.readStateFromDistCache;

/** Class FlowReducer is the Hadoop Reducer implementation. */
public class FlowReducer extends MapReduceBase implements Reducer
  {
  private static final Logger LOG = LoggerFactory.getLogger( FlowReducer.class );

  /** Field streamGraph */
  private HadoopReduceStreamGraph streamGraph;
  /** Field currentProcess */
  private HadoopFlowProcess currentProcess;
  private TimedIterator timedIterator;

  private boolean calledPrepare = false;
  private HadoopGroupGate group;

  /** Constructor FlowReducer creates a new FlowReducer instance. */
  public FlowReducer()
    {
    }

  @Override
  public void configure( JobConf jobConf )
    {
    try
      {
      super.configure( jobConf );
      HadoopUtil.initLog4j( jobConf );

      LOG.info( "cascading version: {}", jobConf.get( "cascading.version", "" ) );
      LOG.info( "child jvm opts: {}", jobConf.get( "mapred.child.java.opts", "" ) );

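      // the final flag distinguishes map side from reduce side; false marks this process as the reducer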
      currentProcess = new HadoopFlowProcess( new FlowSession(), jobConf, false );

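      // wraps the incoming values Iterator so read duration and tuple counts are tracked via slice counters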
      timedIterator = new TimedIterator( currentProcess, SliceCounters.Read_Duration, SliceCounters.Tuples_Read );

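      // retrieve the serialized flow step, falling back to the distributed cache when it is not inlined in the job conf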
      String stepState = jobConf.getRaw( "cascading.flow.step" );

      if( stepState == null )
        stepState = readStateFromDistCache( jobConf, jobConf.get( FlowStep.CASCADING_FLOW_STEP_ID ) );

      HadoopFlowStep step = deserializeBase64( stepState, jobConf, HadoopFlowStep.class );

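      // assemble the reduce side pipeline of ducts described by the deserialized flow step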
      streamGraph = new HadoopReduceStreamGraph( currentProcess, step );

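      // the reduce side stream graph begins at a single head duct, the group gate receiving the grouped tuples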
      group = (HadoopGroupGate) streamGraph.getHeads().iterator().next();

      for( Duct head : streamGraph.getHeads() )
        LOG.info( "sourcing from: " + ( (ElementDuct) head ).getFlowElement() );

      for( Duct tail : streamGraph.getTails() )
        LOG.info( "sinking to: " + ( (ElementDuct) tail ).getFlowElement() );

      for( Tap trap : step.getReducerTraps().values() )
        LOG.info( "trapping to: " + trap );
      }
    catch( Throwable throwable )
      {
      reportIfLocal( throwable );

      if( throwable instanceof CascadingException )
        throw (CascadingException) throwable;

      throw new FlowException( "internal error during reducer configuration", throwable );
      }
    }

  @Override
  public void reduce( Object key, Iterator values, OutputCollector output, Reporter reporter ) throws IOException
    {
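    // make the task's reporter and collector available to the pipeline for this call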
    currentProcess.setReporter( reporter );
    currentProcess.setOutputCollector( output );

    timedIterator.reset( values ); // allows us to count read tuples

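    // defer pipeline preparation to the first reduce call, after the reporter and collector are available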
    if( !calledPrepare )
      {
      currentProcess.increment( SliceCounters.Process_Begin_Time, System.currentTimeMillis() );

      streamGraph.prepare();

      calledPrepare = true;

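      // signal the head gate that the tuple stream is starting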
      group.start( group );
      }

    try
      {
      group.run( (Tuple) key, timedIterator );
      }
    catch( OutOfMemoryError error )
      {
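      // let OutOfMemoryError propagate as-is rather than wrapping it in a FlowException below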
      throw error;
      }
    catch( Throwable throwable )
      {
      reportIfLocal( throwable );

      if( throwable instanceof CascadingException )
        throw (CascadingException) throwable;

      throw new FlowException( "internal error during reducer execution", throwable );
      }
    }

  @Override
  public void close() throws IOException
    {
    try
      {
      if( calledPrepare )
        {
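        // signal completion downstream, then release any resources held by the pipeline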
        group.complete( group );

        streamGraph.cleanup();
        }

      super.close();
      }
    finally
      {
      if( currentProcess != null )
        currentProcess.increment( SliceCounters.Process_End_Time, System.currentTimeMillis() );
      }
    }

  /**
   * Report the error to HadoopFlowStepJob if we are running in Hadoop's local mode.
   *
   * @param throwable The throwable that was thrown.
   */
  private void reportIfLocal( Throwable throwable )
    {
    if( HadoopUtil.isLocal( currentProcess.getJobConf() ) )
      HadoopFlowStepJob.reportLocalError( throwable );
    }
  }