/*
 * Copyright (c) 2007-2016 Concurrent, Inc. All Rights Reserved.
 *
 * Project and contact information: http://www.cascading.org/
 *
 * This file is part of the Cascading project.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package cascading.flow.hadoop;

import java.io.IOException;
import java.util.Iterator;

import cascading.CascadingException;
import cascading.flow.FlowException;
import cascading.flow.FlowNode;
import cascading.flow.FlowSession;
import cascading.flow.FlowStep;
import cascading.flow.SliceCounters;
import cascading.flow.hadoop.planner.HadoopFlowStepJob;
import cascading.flow.hadoop.stream.HadoopGroupGate;
import cascading.flow.hadoop.stream.graph.HadoopReduceStreamGraph;
import cascading.flow.hadoop.util.HadoopUtil;
import cascading.flow.hadoop.util.TimedIterator;
import cascading.flow.planner.BaseFlowNode;
import cascading.flow.stream.duct.Duct;
import cascading.flow.stream.element.ElementDuct;
import cascading.tap.Tap;
import cascading.tuple.Tuple;
import cascading.util.Util;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static cascading.flow.hadoop.util.HadoopMRUtil.readStateFromDistCache;
import static cascading.flow.hadoop.util.HadoopUtil.deserializeBase64;
import static cascading.util.LogUtil.logCounters;
import static cascading.util.LogUtil.logMemory;
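
/*
 * Note: this reducer is installed on the job by the Cascading Hadoop planner, not
 * by user code. A minimal sketch of that wiring, for illustration only (the real
 * planner also serializes the reduce-side FlowNode state into the configuration):
 *
 *   JobConf jobConf = new JobConf();
 *   jobConf.setReducerClass( FlowReducer.class );
 */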
/** Class FlowReducer is the Hadoop Reducer implementation. */
public class FlowReducer extends MapReduceBase implements Reducer
  {
  private static final Logger LOG = LoggerFactory.getLogger( FlowReducer.class );

  private FlowNode flowNode;
  private HadoopReduceStreamGraph streamGraph;
  private HadoopFlowProcess currentProcess;
  private TimedIterator<Tuple>[] timedIterators;

  private boolean calledPrepare = false;
  private HadoopGroupGate group;
  private long processBeginTime;

  /** Constructor FlowReducer creates a new FlowReducer instance. */
  public FlowReducer()
    {
    }
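
  /*
   * Called by the Hadoop framework once per task, before any reduce() call:
   * restores the serialized reduce-side FlowNode from the job configuration (or
   * the distributed cache) and assembles the stream graph the grouped tuples
   * will flow through.
   */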
  @Override
  public void configure( JobConf jobConf )
    {
    try
      {
      super.configure( jobConf );
      HadoopUtil.initLog4j( jobConf );

      LOG.info( "cascading version: {}", jobConf.get( "cascading.version", "" ) );
      LOG.info( "child jvm opts: {}", jobConf.get( "mapred.child.java.opts", "" ) );

      currentProcess = new HadoopFlowProcess( new FlowSession(), jobConf, false );

      timedIterators = TimedIterator.iterators( new TimedIterator<Tuple>( currentProcess, SliceCounters.Read_Duration, SliceCounters.Tuples_Read ) );

      String reduceNodeState = jobConf.getRaw( "cascading.flow.step.node.reduce" );
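
      // when the serialized node state is absent from the job conf, typically
      // because it was too large to embed, it is read from the distributed cache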
      if( reduceNodeState == null )
        reduceNodeState = readStateFromDistCache( jobConf, jobConf.get( FlowStep.CASCADING_FLOW_STEP_ID ), "reduce" );

      flowNode = deserializeBase64( reduceNodeState, jobConf, BaseFlowNode.class );

      LOG.info( "flow node id: {}, ordinal: {}", flowNode.getID(), flowNode.getOrdinal() );

      streamGraph = new HadoopReduceStreamGraph( currentProcess, flowNode, Util.getFirst( flowNode.getSourceElements() ) );
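
      // the first (and in practice only) head of the reduce-side graph is the
      // group gate that accepts each grouping key with its iterator of values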
      group = (HadoopGroupGate) streamGraph.getHeads().iterator().next();

      for( Duct head : streamGraph.getHeads() )
        LOG.info( "sourcing from: {}", ( (ElementDuct) head ).getFlowElement() );

      for( Duct tail : streamGraph.getTails() )
        LOG.info( "sinking to: {}", ( (ElementDuct) tail ).getFlowElement() );

      for( Tap trap : flowNode.getTraps() )
        LOG.info( "trapping to: {}", trap );

      logMemory( LOG, "flow node id: " + flowNode.getID() + ", mem on start" );
      }
    catch( Throwable throwable )
      {
      reportIfLocal( throwable );

      if( throwable instanceof CascadingException )
        throw (CascadingException) throwable;

      throw new FlowException( "internal error during reducer configuration", throwable );
      }
    }
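
  /*
   * Called by the Hadoop framework once per grouping key. The stream graph is
   * prepared lazily on the first call, so setup cost is paid only when the
   * reducer actually receives data.
   */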
  @Override
  public void reduce( Object key, Iterator values, OutputCollector output, Reporter reporter ) throws IOException
    {
    currentProcess.setReporter( reporter );
    currentProcess.setOutputCollector( output );

    timedIterators[ 0 ].reset( values ); // allows us to count read tuples

    if( !calledPrepare )
      {
      streamGraph.prepare();

      calledPrepare = true;

      processBeginTime = System.currentTimeMillis();
      currentProcess.increment( SliceCounters.Process_Begin_Time, processBeginTime );

      group.start( group );
      }

    try
      {
      group.accept( (Tuple) key, timedIterators );
      }
    catch( OutOfMemoryError error )
      {
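      // rethrow as-is so the OOME is not wrapped by the Throwable handler below
      // and the task fails fast, letting Hadoop retry it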
      throw error;
      }
    catch( Throwable throwable )
      {
      reportIfLocal( throwable );

      if( throwable instanceof CascadingException )
        throw (CascadingException) throwable;

      throw new FlowException( "internal error during reducer execution", throwable );
      }
    }
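
  /*
   * Called by the Hadoop framework once per task, after the last reduce() call:
   * completes the group gate, cleans up the stream graph, then records duration
   * counters and logs final memory use.
   */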
  @Override
  public void close() throws IOException
    {
    try
      {
      if( calledPrepare )
        {
        group.complete( group );

        streamGraph.cleanup();
        }

      super.close();
      }
    finally
      {
      if( currentProcess != null )
        {
        long processEndTime = System.currentTimeMillis();
        currentProcess.increment( SliceCounters.Process_End_Time, processEndTime );
        currentProcess.increment( SliceCounters.Process_Duration, processEndTime - processBeginTime );
        }

      String message = "flow node id: " + flowNode.getID();
      logMemory( LOG, message + ", mem on close" );
      logCounters( LOG, message + ", counter:", currentProcess );
      }
    }

  /**
   * Reports the error to HadoopFlowStepJob if we are running in Hadoop's local mode.
   *
   * @param throwable the throwable that was thrown
   */
  private void reportIfLocal( Throwable throwable )
    {
    if( HadoopUtil.isLocal( currentProcess.getJobConf() ) )
      HadoopFlowStepJob.reportLocalError( throwable );
    }
  }