/*
 * Copyright (c) 2007-2014 Concurrent, Inc. All Rights Reserved.
 *
 * Project and contact information: http://www.cascading.org/
 *
 * This file is part of the Cascading project.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package cascading.flow.hadoop;

import java.beans.ConstructorProperties;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

import cascading.flow.FlowStep;
import cascading.flow.hadoop.planner.HadoopStepGraph;
import cascading.flow.hadoop.util.HadoopUtil;
import cascading.flow.planner.FlowStepGraph;
import cascading.scheme.NullScheme;
import cascading.tap.Tap;
import cascading.tap.hadoop.Hfs;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobConf;

/**
 * Class MapReduceFlow is a {@link cascading.flow.hadoop.HadoopFlow} subclass that allows custom MapReduce
 * jobs, pre-configured via a {@link JobConf} instance, to participate in the
 * {@link cascading.cascade.Cascade} scheduler. If other Flow instances in the Cascade share resources with
 * this Flow instance, all participants will be scheduled according to their dependencies (topologically).
 * <p/>
 * Set the parameter {@code deleteSinkOnInit} to {@code true} if the outputPath held by the jobConf should
 * be deleted before the MapReduce job executes.
 * <p/>
 * MapReduceFlow assumes the underlying input and output paths are compatible with the {@link Hfs} Tap.
 * <p/>
 * If the configured JobConf instance uses some identifier other than Hadoop FS paths, override the
 * {@link #createSources(org.apache.hadoop.mapred.JobConf)}, {@link #createSinks(org.apache.hadoop.mapred.JobConf)},
 * and {@link #createTraps(org.apache.hadoop.mapred.JobConf)} methods so the configured values resolve into
 * usable {@link Tap} instances. By default createTraps returns an empty collection and should probably be
 * left alone.
 */
public class MapReduceFlow extends HadoopFlow
  {
  /** Field deleteSinkOnInit - when true, the sink output path is removed before the job runs. */
  protected boolean deleteSinkOnInit = false;

  /**
   * Constructor MapReduceFlow creates a new MapReduceFlow instance named after the given job.
   *
   * @param jobConf of type JobConf
   */
  @ConstructorProperties({"jobConf"})
  public MapReduceFlow( JobConf jobConf )
    {
    this( jobConf.getJobName(), jobConf, false );
    }

  /**
   * Constructor MapReduceFlow creates a new MapReduceFlow instance named after the given job.
   *
   * @param jobConf          of type JobConf
   * @param deleteSinkOnInit of type boolean
   */
  @ConstructorProperties({"jobConf", "deleteSinkOnInit"})
  public MapReduceFlow( JobConf jobConf, boolean deleteSinkOnInit )
    {
    this( jobConf.getJobName(), jobConf, deleteSinkOnInit );
    }

  /**
   * Constructor MapReduceFlow creates a new MapReduceFlow instance.
   *
   * @param name    of type String
   * @param jobConf of type JobConf
   */
  @ConstructorProperties({"name", "jobConf"})
  public MapReduceFlow( String name, JobConf jobConf )
    {
    this( name, jobConf, false );
    }

  /**
   * Constructor MapReduceFlow creates a new MapReduceFlow instance.
   *
   * @param name             of type String
   * @param jobConf          of type JobConf
   * @param deleteSinkOnInit of type boolean
   */
  @ConstructorProperties({"name", "jobConf", "deleteSinkOnInit"})
  public MapReduceFlow( String name, JobConf jobConf, boolean deleteSinkOnInit )
    {
    this( name, jobConf, deleteSinkOnInit, true );
    }

  /**
   * Constructor MapReduceFlow creates a new MapReduceFlow instance.
   *
   * @param name             of type String
   * @param jobConf          of type JobConf
   * @param deleteSinkOnInit of type boolean
   * @param stopJobsOnExit   of type boolean
   */
  @ConstructorProperties({"name", "jobConf", "deleteSinkOnInit", "stopJobsOnExit"})
  public MapReduceFlow( String name, JobConf jobConf, boolean deleteSinkOnInit, boolean stopJobsOnExit )
    {
    super( HadoopUtil.getPlatformInfo(), new Properties(), jobConf, name );
    this.deleteSinkOnInit = deleteSinkOnInit;
    this.stopJobsOnExit = stopJobsOnExit;

    // sinks must be registered before the step graph is built, makeStepGraph reads them back
    setSources( createSources( jobConf ) );
    setSinks( createSinks( jobConf ) );
    setTraps( createTraps( jobConf ) );
    setFlowStepGraph( makeStepGraph( jobConf ) );
    initSteps();

    initializeNewJobsMap();
    }

  /**
   * Builds a single-vertex step graph wrapping the pre-configured job.
   *
   * @param jobConf of type JobConf
   * @return a FlowStepGraph containing one MapReduceFlowStep
   */
  private FlowStepGraph makeStepGraph( JobConf jobConf )
    {
    FlowStepGraph<JobConf> stepGraph = new HadoopStepGraph();

    // the single sink identifies and names the one step in this flow
    Tap sinkTap = getSinksCollection().iterator().next();
    FlowStep<JobConf> flowStep = new MapReduceFlowStep( getName(), sinkTap.toString(), jobConf, sinkTap );

    stepGraph.addVertex( flowStep );

    return stepGraph;
    }

  /**
   * Resolves the jobConf input paths into {@link Hfs} source taps, keyed by path string.
   *
   * @param jobConf of type JobConf
   * @return a Map of path string to Tap
   */
  protected Map<String, Tap> createSources( JobConf jobConf )
    {
    Map<String, Tap> sources = new HashMap<String, Tap>();

    for( Path inputPath : FileInputFormat.getInputPaths( jobConf ) )
      {
      String identifier = inputPath.toString();
      sources.put( identifier, new Hfs( new NullScheme(), identifier ) );
      }

    return sources;
    }

  /**
   * Resolves the jobConf output path into a single {@link Hfs} sink tap, keyed by path string.
   *
   * @param jobConf of type JobConf
   * @return a Map of path string to Tap
   */
  protected Map<String, Tap> createSinks( JobConf jobConf )
    {
    String outputPath = FileOutputFormat.getOutputPath( jobConf ).toString();

    Map<String, Tap> sinks = new HashMap<String, Tap>();

    sinks.put( outputPath, new Hfs( new NullScheme(), outputPath, deleteSinkOnInit ) );

    return sinks;
    }

  /**
   * Returns an empty trap map, custom MapReduce jobs have no Cascading traps.
   *
   * @param jobConf of type JobConf
   * @return an empty Map
   */
  protected Map<String, Tap> createTraps( JobConf jobConf )
    {
    return new HashMap<String, Tap>();
    }
  }