/*
 * Copyright (c) 2007-2014 Concurrent, Inc. All Rights Reserved.
 *
 * Project and contact information: http://www.cascading.org/
 *
 * This file is part of the Cascading project.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package cascading.flow.hadoop;

import java.io.IOException;
import java.util.Iterator;

import cascading.CascadingException;
import cascading.flow.FlowException;
import cascading.flow.FlowSession;
import cascading.flow.FlowStep;
import cascading.flow.SliceCounters;
import cascading.flow.hadoop.planner.HadoopFlowStepJob;
import cascading.flow.hadoop.stream.HadoopMapStreamGraph;
import cascading.flow.hadoop.util.HadoopUtil;
import cascading.flow.stream.Duct;
import cascading.flow.stream.ElementDuct;
import cascading.flow.stream.SourceStage;
import cascading.tap.Tap;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapRunnable;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static cascading.flow.hadoop.util.HadoopUtil.deserializeBase64;
import static cascading.flow.hadoop.util.HadoopUtil.readStateFromDistCache;

/**
 * Class FlowMapper is the Hadoop Mapper implementation.
 * <p>
 * {@link #configure(JobConf)} rebuilds the {@link HadoopFlowStep} from the job configuration and
 * assembles a {@link HadoopMapStreamGraph} for it; {@link #run(RecordReader, OutputCollector, Reporter)}
 * then drives every head of that graph.
 */
public class FlowMapper implements MapRunnable
{
  private static final Logger LOG = LoggerFactory.getLogger( FlowMapper.class );

  /** Field streamGraph, assigned by {@link #configure(JobConf)} */
  private HadoopMapStreamGraph streamGraph;
  /** Field currentProcess, assigned by {@link #configure(JobConf)} */
  private HadoopFlowProcess currentProcess;

  /** Constructor FlowMapper creates a new FlowMapper instance. */
  public FlowMapper()
  {
  }

  /**
   * Initializes logging, creates the {@link HadoopFlowProcess} and builds the map side stream graph.
   * <p>
   * The serialized step is read from the raw {@code cascading.flow.step} property; when that property
   * is absent it is loaded through {@code readStateFromDistCache} using the value stored under
   * {@link FlowStep#CASCADING_FLOW_STEP_ID}. The source tap is resolved from the id held in
   * {@code cascading.step.source}.
   *
   * @param jobConf the job configuration handed to this mapper
   * @throws CascadingException if one is raised during setup (rethrown unchanged)
   * @throws FlowException      wrapping any other Throwable raised during setup
   */
  @Override
  public void configure( JobConf jobConf )
  {
    try
    {
      HadoopUtil.initLog4j( jobConf );

      LOG.info( "cascading version: {}", jobConf.get( "cascading.version", "" ) );
      LOG.info( "child jvm opts: {}", jobConf.get( "mapred.child.java.opts", "" ) );

      currentProcess = new HadoopFlowProcess( new FlowSession(), jobConf, true );

      String stepState = jobConf.getRaw( "cascading.flow.step" );

      if( stepState == null )
        stepState = readStateFromDistCache( jobConf, jobConf.get( FlowStep.CASCADING_FLOW_STEP_ID ) );

      HadoopFlowStep step = deserializeBase64( stepState, jobConf, HadoopFlowStep.class );
      Tap source = step.getTapForID( step.getSources(), jobConf.get( "cascading.step.source" ) );

      streamGraph = new HadoopMapStreamGraph( currentProcess, step, source );

      for( Duct head : streamGraph.getHeads() )
        LOG.info( "sourcing from: {}", ( (ElementDuct) head ).getFlowElement() );

      for( Duct tail : streamGraph.getTails() )
        LOG.info( "sinking to: {}", ( (ElementDuct) tail ).getFlowElement() );

      for( Tap trap : step.getMapperTraps().values() )
        LOG.info( "trapping to: {}", trap );
    }
    catch( Throwable throwable )
    {
      // Use the JobConf passed in rather than currentProcess: the failure may have happened before
      // currentProcess was assigned, and dereferencing it here would replace the real error with a
      // NullPointerException.
      reportIfLocal( jobConf, throwable );

      if( throwable instanceof CascadingException )
        throw (CascadingException) throwable;

      throw new FlowException( "internal error during mapper configuration", throwable );
    }
  }

  /**
   * Runs the stream graph built by {@link #configure(JobConf)}.
   * <p>
   * Every head other than the streamed head is run first with a {@code null} input, then the streamed
   * head is run against {@code input}. The graph is always cleaned up afterwards and the
   * {@link SliceCounters#Process_End_Time} counter is always incremented, even if cleanup fails.
   *
   * @param input    the record reader passed to the streamed head
   * @param output   the collector installed on the current process
   * @param reporter the reporter installed on the current process
   * @throws IOException        if the graph raises one (reported, then rethrown unchanged)
   * @throws CascadingException if the graph raises one (reported, then rethrown unchanged)
   * @throws FlowException      wrapping any other Throwable except OutOfMemoryError
   */
  @Override
  public void run( RecordReader input, OutputCollector output, Reporter reporter ) throws IOException
  {
    currentProcess.setReporter( reporter );
    currentProcess.increment( SliceCounters.Process_Begin_Time, System.currentTimeMillis() );
    currentProcess.setOutputCollector( output );

    streamGraph.prepare();

    SourceStage streamedHead = streamGraph.getStreamedHead();
    Iterator<Duct> iterator = streamGraph.getHeads().iterator();

    try
    {
      while( iterator.hasNext() )
      {
        Duct next = iterator.next();

        if( next != streamedHead )
          ( (SourceStage) next ).run( null );
      }

      streamedHead.run( input );
    }
    catch( OutOfMemoryError error )
    {
      // rethrown untouched: no reporting and no wrapping is attempted for this error
      throw error;
    }
    catch( IOException exception )
    {
      reportIfLocal( exception );
      throw exception;
    }
    catch( Throwable throwable )
    {
      reportIfLocal( throwable );

      if( throwable instanceof CascadingException )
        throw (CascadingException) throwable;

      throw new FlowException( "internal error during mapper execution", throwable );
    }
    finally
    {
      try
      {
        streamGraph.cleanup();
      }
      finally
      {
        currentProcess.increment( SliceCounters.Process_End_Time, System.currentTimeMillis() );
      }
    }
  }

  /**
   * Report the error to HadoopFlowStepJob if we are running in Hadoop's local mode.
   * <p>
   * Does nothing when the current process has not been created yet.
   *
   * @param throwable The throwable that was thrown.
   */
  private void reportIfLocal( Throwable throwable )
  {
    if( currentProcess == null )
      return;

    if( HadoopUtil.isLocal( currentProcess.getJobConf() ) )
      HadoopFlowStepJob.reportLocalError( throwable );
  }

  /**
   * Report the error to HadoopFlowStepJob if the given configuration describes Hadoop's local mode.
   * <p>
   * Does nothing when {@code jobConf} is {@code null}.
   *
   * @param jobConf   The configuration used to decide whether local mode is active.
   * @param throwable The throwable that was thrown.
   */
  private static void reportIfLocal( JobConf jobConf, Throwable throwable )
  {
    if( jobConf != null && HadoopUtil.isLocal( jobConf ) )
      HadoopFlowStepJob.reportLocalError( throwable );
  }
}