001 /* 002 * Copyright (c) 2007-2014 Concurrent, Inc. All Rights Reserved. 003 * 004 * Project and contact information: http://www.cascading.org/ 005 * 006 * This file is part of the Cascading project. 007 * 008 * Licensed under the Apache License, Version 2.0 (the "License"); 009 * you may not use this file except in compliance with the License. 010 * You may obtain a copy of the License at 011 * 012 * http://www.apache.org/licenses/LICENSE-2.0 013 * 014 * Unless required by applicable law or agreed to in writing, software 015 * distributed under the License is distributed on an "AS IS" BASIS, 016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 017 * See the License for the specific language governing permissions and 018 * limitations under the License. 019 */ 020 021 package cascading.flow.hadoop; 022 023 import java.io.IOException; 024 import java.util.Iterator; 025 026 import cascading.CascadingException; 027 import cascading.flow.FlowException; 028 import cascading.flow.FlowSession; 029 import cascading.flow.FlowStep; 030 import cascading.flow.SliceCounters; 031 import cascading.flow.hadoop.planner.HadoopFlowStepJob; 032 import cascading.flow.hadoop.stream.HadoopGroupGate; 033 import cascading.flow.hadoop.stream.HadoopReduceStreamGraph; 034 import cascading.flow.hadoop.util.HadoopUtil; 035 import cascading.flow.hadoop.util.TimedIterator; 036 import cascading.flow.stream.Duct; 037 import cascading.flow.stream.ElementDuct; 038 import cascading.tap.Tap; 039 import cascading.tuple.Tuple; 040 import org.apache.hadoop.mapred.JobConf; 041 import org.apache.hadoop.mapred.MapReduceBase; 042 import org.apache.hadoop.mapred.OutputCollector; 043 import org.apache.hadoop.mapred.Reducer; 044 import org.apache.hadoop.mapred.Reporter; 045 import org.slf4j.Logger; 046 import org.slf4j.LoggerFactory; 047 048 import static cascading.flow.hadoop.util.HadoopUtil.deserializeBase64; 049 import static cascading.flow.hadoop.util.HadoopUtil.readStateFromDistCache; 050 051 /** Class FlowReducer is the Hadoop Reducer implementation. */ 052 public class FlowReducer extends MapReduceBase implements Reducer 053 { 054 private static final Logger LOG = LoggerFactory.getLogger( FlowReducer.class ); 055 056 /** Field flowReducerStack */ 057 private HadoopReduceStreamGraph streamGraph; 058 /** Field currentProcess */ 059 private HadoopFlowProcess currentProcess; 060 private TimedIterator timedIterator; 061 062 private boolean calledPrepare = false; 063 private HadoopGroupGate group; 064 065 /** Constructor FlowReducer creates a new FlowReducer instance. */ 066 public FlowReducer() 067 { 068 } 069 070 @Override 071 public void configure( JobConf jobConf ) 072 { 073 try 074 { 075 super.configure( jobConf ); 076 HadoopUtil.initLog4j( jobConf ); 077 078 LOG.info( "cascading version: {}", jobConf.get( "cascading.version", "" ) ); 079 LOG.info( "child jvm opts: {}", jobConf.get( "mapred.child.java.opts", "" ) ); 080 081 currentProcess = new HadoopFlowProcess( new FlowSession(), jobConf, false ); 082 083 timedIterator = new TimedIterator( currentProcess, SliceCounters.Read_Duration, SliceCounters.Tuples_Read ); 084 085 String stepState = jobConf.getRaw( "cascading.flow.step" ); 086 087 if( stepState == null ) 088 stepState = readStateFromDistCache( jobConf, jobConf.get( FlowStep.CASCADING_FLOW_STEP_ID ) ); 089 090 HadoopFlowStep step = deserializeBase64( stepState, jobConf, HadoopFlowStep.class ); 091 092 streamGraph = new HadoopReduceStreamGraph( currentProcess, step ); 093 094 group = (HadoopGroupGate) streamGraph.getHeads().iterator().next(); 095 096 for( Duct head : streamGraph.getHeads() ) 097 LOG.info( "sourcing from: " + ( (ElementDuct) head ).getFlowElement() ); 098 099 for( Duct tail : streamGraph.getTails() ) 100 LOG.info( "sinking to: " + ( (ElementDuct) tail ).getFlowElement() ); 101 102 for( Tap trap : step.getReducerTraps().values() ) 103 LOG.info( "trapping to: " + trap ); 104 } 105 catch( Throwable throwable ) 106 { 107 reportIfLocal( throwable ); 108 109 if( throwable instanceof CascadingException ) 110 throw (CascadingException) throwable; 111 112 throw new FlowException( "internal error during reducer configuration", throwable ); 113 } 114 } 115 116 public void reduce( Object key, Iterator values, OutputCollector output, Reporter reporter ) throws IOException 117 { 118 currentProcess.setReporter( reporter ); 119 currentProcess.setOutputCollector( output ); 120 121 timedIterator.reset( values ); // allows us to count read tuples 122 123 if( !calledPrepare ) 124 { 125 currentProcess.increment( SliceCounters.Process_Begin_Time, System.currentTimeMillis() ); 126 127 streamGraph.prepare(); 128 129 calledPrepare = true; 130 131 group.start( group ); 132 } 133 134 try 135 { 136 group.run( (Tuple) key, timedIterator ); 137 } 138 catch( OutOfMemoryError error ) 139 { 140 throw error; 141 } 142 catch( Throwable throwable ) 143 { 144 reportIfLocal( throwable ); 145 146 if( throwable instanceof CascadingException ) 147 throw (CascadingException) throwable; 148 149 throw new FlowException( "internal error during reducer execution", throwable ); 150 } 151 } 152 153 154 @Override 155 public void close() throws IOException 156 { 157 try 158 { 159 if( calledPrepare ) 160 { 161 group.complete( group ); 162 163 streamGraph.cleanup(); 164 } 165 166 super.close(); 167 } 168 finally 169 { 170 if( currentProcess != null ) 171 currentProcess.increment( SliceCounters.Process_End_Time, System.currentTimeMillis() ); 172 } 173 } 174 175 /** 176 * Report the error to HadoopFlowStepJob if we are running in Hadoops local mode. 177 * @param throwable The throwable that was thrown. 178 */ 179 private void reportIfLocal( Throwable throwable ) 180 { 181 if ( HadoopUtil.isLocal( currentProcess.getJobConf() ) ) 182 HadoopFlowStepJob.reportLocalError( throwable ); 183 } 184 }