/*
 * Copyright (c) 2007-2016 Concurrent, Inc. All Rights Reserved.
 *
 * Project and contact information: http://www.cascading.org/
 *
 * This file is part of the Cascading project.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package cascading.flow.hadoop;

import java.beans.ConstructorProperties;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

import cascading.CascadingException;
import cascading.flow.FlowStep;
import cascading.flow.hadoop.util.HadoopUtil;
import cascading.flow.planner.process.FlowStepGraph;
import cascading.scheme.NullScheme;
import cascading.tap.SinkMode;
import cascading.tap.Tap;
import cascading.tap.hadoop.Hfs;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.Job;

/**
 * Class MapReduceFlow is a {@link cascading.flow.hadoop.HadoopFlow} subclass that supports custom MapReduce jobs
 * pre-configured via the {@link JobConf} object.
 * <p/>
 * Use this class to allow custom MapReduce jobs to participate in the {@link cascading.cascade.Cascade} scheduler. If
 * other Flow instances in the Cascade share resources with this Flow instance, all participants will be scheduled
 * according to their dependencies (topologically).
 * <p/>
 * Set the parameter {@code deleteSinkOnInit} to {@code true} if the outputPath in the jobConf should be deleted before executing the MapReduce job.
 * <p/>
 * MapReduceFlow assumes the underlying input and output paths are compatible with the {@link Hfs} Tap.
 * <p/>
 * If the configured JobConf instance uses some other identifier instead of Hadoop FS paths, you should override the
 * {@link #createSources(org.apache.hadoop.mapred.JobConf)}, {@link #createSinks(org.apache.hadoop.mapred.JobConf)}, and
 * {@link #createTraps(org.apache.hadoop.mapred.JobConf)} methods to properly resolve the configured paths into
 * usable {@link Tap} instances. By default createTraps returns an empty collection and should probably be left alone.
 * <p/>
 * MapReduceFlow supports both org.apache.hadoop.mapred.* and org.apache.hadoop.mapreduce.* API Jobs.
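 * <p/>
 * For example, a pre-configured {@code mapred} API job might be wrapped as follows. This is a
 * minimal sketch; {@code WordCountMapper}, {@code WordCountReducer}, and the input/output paths
 * are hypothetical stand-ins for a user's own job configuration.
 * <pre>{@code
 * JobConf jobConf = new JobConf();
 * jobConf.setJobName( "wordcount" );
 *
 * jobConf.setMapperClass( WordCountMapper.class );
 * jobConf.setReducerClass( WordCountReducer.class );
 * jobConf.setOutputKeyClass( Text.class );
 * jobConf.setOutputValueClass( IntWritable.class );
 *
 * FileInputFormat.setInputPaths( jobConf, new Path( "input/path" ) );
 * FileOutputFormat.setOutputPath( jobConf, new Path( "output/path" ) );
 *
 * // true deletes the output path before the job runs; the Flow may then join a Cascade
 * Flow flow = new MapReduceFlow( "wordcount", jobConf, true );
 * }</pre>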
 */
public class MapReduceFlow extends HadoopFlow
  {
  /** Field deleteSinkOnInit */
  protected boolean deleteSinkOnInit = false;

  /**
   * Constructor MapReduceFlow creates a new MapReduceFlow instance.
   *
   * @param jobConf of type JobConf
   */
  @ConstructorProperties({"jobConf"})
  public MapReduceFlow( JobConf jobConf )
    {
    this( jobConf.getJobName(), jobConf, false );
    }

  /**
   * Constructor MapReduceFlow creates a new MapReduceFlow instance.
   *
   * @param jobConf          of type JobConf
   * @param deleteSinkOnInit of type boolean
   */
  @ConstructorProperties({"jobConf", "deleteSinkOnInit"})
  public MapReduceFlow( JobConf jobConf, boolean deleteSinkOnInit )
    {
    this( jobConf.getJobName(), jobConf, deleteSinkOnInit );
    }

  /**
   * Constructor MapReduceFlow creates a new MapReduceFlow instance.
   *
   * @param name    of type String
   * @param jobConf of type JobConf
   */
  @ConstructorProperties({"name", "jobConf"})
  public MapReduceFlow( String name, JobConf jobConf )
    {
    this( name, jobConf, false );
    }

  /**
   * Constructor MapReduceFlow creates a new MapReduceFlow instance.
   *
   * @param name             of type String
   * @param jobConf          of type JobConf
   * @param deleteSinkOnInit of type boolean
   */
  @ConstructorProperties({"name", "jobConf", "deleteSinkOnInit"})
  public MapReduceFlow( String name, JobConf jobConf, boolean deleteSinkOnInit )
    {
    this( new Properties(), name, jobConf, null, deleteSinkOnInit, true );
    }

  /**
   * Constructor MapReduceFlow creates a new MapReduceFlow instance.
   *
   * @param properties       of type Properties
   * @param name             of type String
   * @param jobConf          of type JobConf
   * @param deleteSinkOnInit of type boolean
   */
  @ConstructorProperties({"properties", "name", "jobConf", "deleteSinkOnInit"})
  public MapReduceFlow( Properties properties, String name, JobConf jobConf, boolean deleteSinkOnInit )
    {
    this( properties, name, jobConf, null, deleteSinkOnInit, true );
    }

  /**
   * Constructor MapReduceFlow creates a new MapReduceFlow instance.
   *
   * @param properties       of type Properties
   * @param name             of type String
   * @param jobConf          of type JobConf
   * @param flowDescriptor   of type Map<String, String>
   * @param deleteSinkOnInit of type boolean
   */
  @ConstructorProperties({"properties", "name", "jobConf", "flowDescriptor", "deleteSinkOnInit"})
  public MapReduceFlow( Properties properties, String name, JobConf jobConf, Map<String, String> flowDescriptor, boolean deleteSinkOnInit )
    {
    this( properties, name, jobConf, flowDescriptor, deleteSinkOnInit, true );
    }

  /**
   * Constructor MapReduceFlow creates a new MapReduceFlow instance.
   *
   * @param properties       of type Properties
   * @param name             of type String
   * @param jobConf          of type JobConf
   * @param flowDescriptor   of type Map<String, String>
   * @param deleteSinkOnInit of type boolean
   * @param stopJobsOnExit   of type boolean
   */
  @ConstructorProperties({"properties", "name", "jobConf", "flowDescriptor", "deleteSinkOnInit", "stopJobsOnExit"})
  public MapReduceFlow( Properties properties, String name, JobConf jobConf, Map<String, String> flowDescriptor, boolean deleteSinkOnInit, boolean stopJobsOnExit )
    {
    super( HadoopUtil.getPlatformInfo( JobConf.class, "org/apache/hadoop", "Hadoop MR" ), properties, jobConf, name, flowDescriptor );
    this.deleteSinkOnInit = deleteSinkOnInit;
    this.stopJobsOnExit = stopJobsOnExit;

    initializeFrom( jobConf ); // push off initialization allowing for overrides
    }

  protected void initializeFrom( JobConf jobConf )
    {
    setSources( createSources( jobConf ) );
    setSinks( createSinks( jobConf ) );
    setTraps( createTraps( jobConf ) );
    setFlowStepGraph( makeStepGraph( jobConf ) );

    // this mirrors BaseFlow#initialize()

    initSteps();

    this.flowStats = createPrepareFlowStats(); // must be last

    initializeNewJobsMap();

    initializeChildStats();
    }
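  // the pre-configured job becomes a single vertex in the step graph; the Cascade scheduler
  // runs it as one unit, and Cascading does not re-plan or modify the job itself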
  protected FlowStepGraph makeStepGraph( JobConf jobConf )
    {
    FlowStepGraph flowStepGraph = new FlowStepGraph();

    Tap sink = getSinksCollection().iterator().next();
    FlowStep<JobConf> step = createFlowStep( jobConf, sink );

    flowStepGraph.addVertex( step );

    return flowStepGraph;
    }

  protected FlowStep<JobConf> createFlowStep( JobConf jobConf, Tap sink )
    {
    return new MapReduceFlowStep( getName(), sink.toString(), jobConf, sink );
    }

  protected Map<String, Tap> createSources( JobConf jobConf )
    {
    return fileInputToTaps( jobConf );
    }

  protected Map<String, Tap> fileInputToTaps( JobConf jobConf )
    {
    Path[] paths = FileInputFormat.getInputPaths( jobConf );

    if( paths == null || paths.length == 0 )
      {
      try
        {
        // no mapred API input paths were set, fall back to the mapreduce API
        paths = org.apache.hadoop.mapreduce.lib.input.FileInputFormat.getInputPaths( new Job( jobConf ) );
        }
      catch( IOException exception )
        {
        throw new CascadingException( exception );
        }
      }

    Map<String, Tap> taps = new HashMap<>();

    if( paths == null )
      return taps;

    for( Path path : paths )
      toSourceTap( taps, path );

    return taps;
    }

  protected Tap toSourceTap( Map<String, Tap> taps, Path path )
    {
    String name = makeNameFromPath( taps, path );

    return taps.put( name, new Hfs( new NullScheme(), path.toString() ) );
    }

  protected Map<String, Tap> createSinks( JobConf jobConf )
    {
    return fileOutputToTaps( jobConf );
    }

  protected Map<String, Tap> fileOutputToTaps( JobConf jobConf )
    {
    Path path = FileOutputFormat.getOutputPath( jobConf );

    if( path == null )
      {
      try
        {
        // no mapred API output path was set, fall back to the mapreduce API
        path = org.apache.hadoop.mapreduce.lib.output.FileOutputFormat.getOutputPath( new Job( jobConf ) );
        }
      catch( IOException exception )
        {
        throw new CascadingException( exception );
        }
      }

    Map<String, Tap> taps = new HashMap<>();

    if( path != null )
      toSinkTap( taps, path );

    return taps;
    }
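  // deleteSinkOnInit selects the Hfs SinkMode: REPLACE deletes any existing output before the
  // flow runs, while KEEP leaves it in place (Hadoop itself fails if the output path exists)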
  protected Tap toSinkTap( Map<String, Tap> taps, Path path )
    {
    String name = makeNameFromPath( taps, path );

    return taps.put( name, new Hfs( new NullScheme(), path.toString(), deleteSinkOnInit ? SinkMode.REPLACE : SinkMode.KEEP ) );
    }

  // find the shortest unique name for the tap, prepending parent directory names until the
  // name no longer collides with an entry already in the map
  protected String makeNameFromPath( Map<String, Tap> taps, Path path )
    {
    Path parent = path.getParent();
    String name = path.getName();

    while( taps.containsKey( name ) )
      {
      name = new Path( parent.getName(), name ).toString();
      parent = parent.getParent();
      }

    return name;
    }

  protected Map<String, Tap> createTraps( JobConf jobConf )
    {
    return new HashMap<>();
    }
  }