/*
 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved.
 *
 * Project and contact information: http://www.cascading.org/
 *
 * This file is part of the Cascading project.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package cascading.flow.hadoop;

import java.io.IOException;
import java.util.Iterator;

import cascading.CascadingException;
import cascading.flow.FlowException;
import cascading.flow.FlowNode;
import cascading.flow.FlowSession;
import cascading.flow.FlowStep;
import cascading.flow.Flows;
import cascading.flow.SliceCounters;
import cascading.flow.hadoop.planner.HadoopFlowStepJob;
import cascading.flow.hadoop.stream.graph.HadoopMapStreamGraph;
import cascading.flow.hadoop.util.HadoopUtil;
import cascading.flow.planner.BaseFlowNode;
import cascading.flow.stream.duct.Duct;
import cascading.flow.stream.element.ElementDuct;
import cascading.flow.stream.element.SourceStage;
import cascading.tap.Tap;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapRunnable;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static cascading.flow.hadoop.util.HadoopMRUtil.readStateFromDistCache;
import static cascading.flow.hadoop.util.HadoopUtil.deserializeBase64;
import static cascading.util.LogUtil.logCounters;
import static cascading.util.LogUtil.logMemory;

/** Class FlowMapper is the Hadoop Mapper implementation. */
public class FlowMapper implements MapRunnable
  {
  private static final Logger LOG = LoggerFactory.getLogger( FlowMapper.class );

  /** The flow node for the map side of this step, deserialized from job state. */
  private FlowNode flowNode;
  /** The stream graph driving tuple flow through this mapper. */
  private HadoopMapStreamGraph streamGraph;
  /** The per-task flow process; also carries the reporter and output collector. */
  private HadoopFlowProcess currentProcess;

  /** Constructor FlowMapper creates a new FlowMapper instance. */
  public FlowMapper()
    {
    }

  /**
   * Initializes this mapper from the given job configuration: deserializes the
   * map-side {@link FlowNode} (from the config itself, or from the distributed
   * cache when the serialized state was too large to inline), resolves the
   * source {@link Tap} for this split, and builds the map stream graph.
   * <p>
   * Any failure is reported to the local-mode step job (if applicable) and
   * rethrown as a {@link CascadingException} or wrapped in a {@link FlowException}.
   *
   * @param jobConf the Hadoop job configuration for this task
   */
  @Override
  public void configure( JobConf jobConf )
    {
    try
      {
      HadoopUtil.initLog4j( jobConf );

      LOG.info( "cascading version: {}", jobConf.get( "cascading.version", "" ) );
      LOG.info( "child jvm opts: {}", jobConf.get( "mapred.child.java.opts", "" ) );

      currentProcess = new HadoopFlowProcess( new FlowSession(), jobConf, true );

      String mapNodeState = jobConf.getRaw( "cascading.flow.step.node.map" );

      // node state may have been pushed to the distributed cache if it exceeded config size limits
      if( mapNodeState == null )
        mapNodeState = readStateFromDistCache( jobConf, jobConf.get( FlowStep.CASCADING_FLOW_STEP_ID ), "map" );

      flowNode = deserializeBase64( mapNodeState, jobConf, BaseFlowNode.class );

      LOG.info( "flow node id: {}, ordinal: {}", flowNode.getID(), flowNode.getOrdinal() );

      Tap source = Flows.getTapForID( flowNode.getSourceTaps(), jobConf.get( "cascading.step.source" ) );

      streamGraph = new HadoopMapStreamGraph( currentProcess, flowNode, source );

      for( Duct head : streamGraph.getHeads() )
        LOG.info( "sourcing from: {}", ( (ElementDuct) head ).getFlowElement() );

      for( Duct tail : streamGraph.getTails() )
        LOG.info( "sinking to: {}", ( (ElementDuct) tail ).getFlowElement() );

      for( Tap trap : flowNode.getTraps() )
        LOG.info( "trapping to: {}", trap );

      logMemory( LOG, "flow node id: " + flowNode.getID() + ", mem on start" );
      }
    catch( Throwable throwable )
      {
      reportIfLocal( throwable );

      if( throwable instanceof CascadingException )
        throw (CascadingException) throwable;

      throw new FlowException( "internal error during mapper configuration", throwable );
      }
    }

  /**
   * Runs the map side of the step: drives all accumulated (non-streamed) heads
   * to completion first, then streams the split's records through the streamed
   * head. Process begin/end/duration counters are always recorded, and the
   * stream graph is cleaned up even when execution fails.
   *
   * @param input    the record reader for this task's input split
   * @param output   the collector receiving map output
   * @param reporter the Hadoop progress/counter reporter
   * @throws IOException if the underlying input or output fails
   */
  @Override
  public void run( RecordReader input, OutputCollector output, Reporter reporter ) throws IOException
    {
    currentProcess.setReporter( reporter );
    currentProcess.setOutputCollector( output );

    streamGraph.prepare();

    long processBeginTime = System.currentTimeMillis();

    currentProcess.increment( SliceCounters.Process_Begin_Time, processBeginTime );

    SourceStage streamedHead = streamGraph.getStreamedHead();
    Iterator<Duct> iterator = streamGraph.getHeads().iterator();

    try
      {
      try
        {
        // drain accumulated sources (e.g. joined/accumulated branches) before streaming the split
        while( iterator.hasNext() )
          {
          Duct next = iterator.next();

          if( next != streamedHead )
            ( (SourceStage) next ).run( null );
          }

        streamedHead.run( input );
        }
      catch( OutOfMemoryError error )
        {
        // rethrow unwrapped so the task fails fast without masking the OOME
        throw error;
        }
      catch( IOException exception )
        {
        reportIfLocal( exception );
        throw exception;
        }
      catch( Throwable throwable )
        {
        reportIfLocal( throwable );

        if( throwable instanceof CascadingException )
          throw (CascadingException) throwable;

        throw new FlowException( "internal error during mapper execution", throwable );
        }
      }
    finally
      {
      try
        {
        streamGraph.cleanup();
        }
      finally
        {
        // always record timing counters, even if cleanup itself throws
        long processEndTime = System.currentTimeMillis();

        currentProcess.increment( SliceCounters.Process_End_Time, processEndTime );
        currentProcess.increment( SliceCounters.Process_Duration, processEndTime - processBeginTime );

        String message = "flow node id: " + flowNode.getID();
        logMemory( LOG, message + ", mem on close" );
        logCounters( LOG, message + ", counter:", currentProcess );
        }
      }
    }

  /**
   * Report the error to HadoopFlowStepJob if we are running in Hadoop's local mode.
   *
   * @param throwable The throwable that was thrown.
   */
  private void reportIfLocal( Throwable throwable )
    {
    // guard: configure() may fail before currentProcess is assigned (e.g. in initLog4j);
    // without this check a NullPointerException here would mask the original throwable
    if( currentProcess == null )
      return;

    if( HadoopUtil.isLocal( currentProcess.getJobConf() ) )
      HadoopFlowStepJob.reportLocalError( throwable );
    }
  }