001 /* 002 * Copyright (c) 2007-2014 Concurrent, Inc. All Rights Reserved. 003 * 004 * Project and contact information: http://www.cascading.org/ 005 * 006 * This file is part of the Cascading project. 007 * 008 * Licensed under the Apache License, Version 2.0 (the "License"); 009 * you may not use this file except in compliance with the License. 010 * You may obtain a copy of the License at 011 * 012 * http://www.apache.org/licenses/LICENSE-2.0 013 * 014 * Unless required by applicable law or agreed to in writing, software 015 * distributed under the License is distributed on an "AS IS" BASIS, 016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 017 * See the License for the specific language governing permissions and 018 * limitations under the License. 019 */ 020 021 package cascading.flow.hadoop; 022 023 import java.io.IOException; 024 import java.util.Collection; 025 import java.util.Collections; 026 import java.util.HashSet; 027 import java.util.Map; 028 import java.util.Set; 029 030 import cascading.CascadingException; 031 import cascading.flow.FlowProcess; 032 import cascading.flow.FlowSession; 033 import cascading.flow.hadoop.util.HadoopUtil; 034 import cascading.tap.Tap; 035 import cascading.tuple.Fields; 036 import cascading.tuple.TupleEntry; 037 import cascading.tuple.TupleEntryCollector; 038 import cascading.tuple.TupleEntryIterator; 039 import org.apache.hadoop.mapred.JobConf; 040 import org.apache.hadoop.mapred.OutputCollector; 041 import org.apache.hadoop.mapred.Reporter; 042 import org.apache.hadoop.util.ReflectionUtils; 043 044 /** 045 * Class HadoopFlowProcess is an implementation of {@link FlowProcess} for Hadoop. Use this interface to get direct 046 * access to the Hadoop JobConf and Reporter interfaces. 047 * <p/> 048 * Be warned that coupling to this implementation will cause custom {@link cascading.operation.Operation}s to 049 * fail if they are executed on a system other than Hadoop. 050 * 051 * @see cascading.flow.FlowSession 052 * @see JobConf 053 * @see Reporter 054 */ 055 public class HadoopFlowProcess extends FlowProcess<JobConf> 056 { 057 /** Field jobConf */ 058 final JobConf jobConf; 059 /** Field isMapper */ 060 private final boolean isMapper; 061 /** Field reporter */ 062 Reporter reporter = Reporter.NULL; 063 private OutputCollector outputCollector; 064 065 public HadoopFlowProcess() 066 { 067 this.jobConf = new JobConf(); 068 this.isMapper = true; 069 } 070 071 public HadoopFlowProcess( JobConf jobConf ) 072 { 073 this.jobConf = jobConf; 074 this.isMapper = true; 075 } 076 077 public HadoopFlowProcess( FlowSession flowSession, JobConf jobConf ) 078 { 079 super( flowSession ); 080 this.jobConf = jobConf; 081 this.isMapper = true; 082 } 083 084 /** 085 * Constructor HadoopFlowProcess creates a new HadoopFlowProcess instance. 086 * 087 * @param flowSession of type FlowSession 088 * @param jobConf of type JobConf 089 */ 090 public HadoopFlowProcess( FlowSession flowSession, JobConf jobConf, boolean isMapper ) 091 { 092 super( flowSession ); 093 this.jobConf = jobConf; 094 this.isMapper = isMapper; 095 } 096 097 public HadoopFlowProcess( HadoopFlowProcess flowProcess, JobConf jobConf ) 098 { 099 super( flowProcess.getCurrentSession() ); 100 this.jobConf = jobConf; 101 this.isMapper = flowProcess.isMapper(); 102 this.reporter = flowProcess.getReporter(); 103 } 104 105 @Override 106 public FlowProcess copyWith( JobConf jobConf ) 107 { 108 return new HadoopFlowProcess( this, jobConf ); 109 } 110 111 /** 112 * Method getJobConf returns the jobConf of this HadoopFlowProcess object. 113 * 114 * @return the jobConf (type JobConf) of this HadoopFlowProcess object. 115 */ 116 public JobConf getJobConf() 117 { 118 return jobConf; 119 } 120 121 @Override 122 public JobConf getConfigCopy() 123 { 124 return HadoopUtil.copyJobConf( jobConf ); 125 } 126 127 /** 128 * Method isMapper returns true if this part of the FlowProcess is a MapReduce mapper. If false, it is a reducer. 129 * 130 * @return boolean 131 */ 132 public boolean isMapper() 133 { 134 return isMapper; 135 } 136 137 public int getCurrentNumMappers() 138 { 139 return getJobConf().getNumMapTasks(); 140 } 141 142 public int getCurrentNumReducers() 143 { 144 return getJobConf().getNumReduceTasks(); 145 } 146 147 /** 148 * Method getCurrentTaskNum returns the task number of this task. Task 0 is the first task. 149 * 150 * @return int 151 */ 152 @Override 153 public int getCurrentSliceNum() 154 { 155 return getJobConf().getInt( "mapred.task.partition", 0 ); 156 } 157 158 @Override 159 public int getNumProcessSlices() 160 { 161 if( isMapper() ) 162 return getCurrentNumMappers(); 163 else 164 return getCurrentNumReducers(); 165 } 166 167 /** 168 * Method setReporter sets the reporter of this HadoopFlowProcess object. 169 * 170 * @param reporter the reporter of this HadoopFlowProcess object. 171 */ 172 public void setReporter( Reporter reporter ) 173 { 174 this.reporter = reporter; 175 } 176 177 /** 178 * Method getReporter returns the reporter of this HadoopFlowProcess object. 179 * 180 * @return the reporter (type Reporter) of this HadoopFlowProcess object. 181 */ 182 public Reporter getReporter() 183 { 184 return reporter; 185 } 186 187 public void setOutputCollector( OutputCollector outputCollector ) 188 { 189 this.outputCollector = outputCollector; 190 } 191 192 public OutputCollector getOutputCollector() 193 { 194 return outputCollector; 195 } 196 197 @Override 198 public Object getProperty( String key ) 199 { 200 return jobConf.get( key ); 201 } 202 203 @Override 204 public Collection<String> getPropertyKeys() 205 { 206 Set<String> keys = new HashSet<String>(); 207 208 for( Map.Entry<String, String> entry : jobConf ) 209 keys.add( entry.getKey() ); 210 211 return Collections.unmodifiableSet( keys ); 212 } 213 214 @Override 215 public Object newInstance( String className ) 216 { 217 if( className == null || className.isEmpty() ) 218 return null; 219 220 try 221 { 222 Class type = (Class) HadoopFlowProcess.class.getClassLoader().loadClass( className.toString() ); 223 224 return ReflectionUtils.newInstance( type, jobConf ); 225 } 226 catch( ClassNotFoundException exception ) 227 { 228 throw new CascadingException( "unable to load class: " + className.toString(), exception ); 229 } 230 } 231 232 @Override 233 public void keepAlive() 234 { 235 getReporter().progress(); 236 } 237 238 @Override 239 public void increment( Enum counter, long amount ) 240 { 241 getReporter().incrCounter( counter, amount ); 242 } 243 244 @Override 245 public void increment( String group, String counter, long amount ) 246 { 247 getReporter().incrCounter( group, counter, amount ); 248 } 249 250 @Override 251 public void setStatus( String status ) 252 { 253 getReporter().setStatus( status ); 254 } 255 256 @Override 257 public boolean isCounterStatusInitialized() 258 { 259 return getReporter() != null; 260 } 261 262 @Override 263 public TupleEntryIterator openTapForRead( Tap tap ) throws IOException 264 { 265 return tap.openForRead( this ); 266 } 267 268 @Override 269 public TupleEntryCollector openTapForWrite( Tap tap ) throws IOException 270 { 271 return tap.openForWrite( this, null ); // do not honor sinkmode as this may be opened across tasks 272 } 273 274 @Override 275 public TupleEntryCollector openTrapForWrite( Tap trap ) throws IOException 276 { 277 JobConf jobConf = HadoopUtil.copyJobConf( getJobConf() ); 278 279 int stepNum = jobConf.getInt( "cascading.flow.step.num", 0 ); 280 String partname; 281 282 if( jobConf.getBoolean( "mapred.task.is.map", true ) ) 283 partname = String.format( "-m-%05d-", stepNum ); 284 else 285 partname = String.format( "-r-%05d-", stepNum ); 286 287 jobConf.set( "cascading.tapcollector.partname", "%s%spart" + partname + "%05d" ); 288 289 return trap.openForWrite( new HadoopFlowProcess( this, jobConf ), null ); // do not honor sinkmode as this may be opened across tasks 290 } 291 292 @Override 293 public TupleEntryCollector openSystemIntermediateForWrite() throws IOException 294 { 295 return new TupleEntryCollector( Fields.size( 2 ) ) 296 { 297 @Override 298 protected void collect( TupleEntry tupleEntry ) 299 { 300 try 301 { 302 getOutputCollector().collect( tupleEntry.getObject( 0 ), tupleEntry.getObject( 1 ) ); 303 } 304 catch( IOException exception ) 305 { 306 throw new CascadingException( "failed collecting key and value", exception ); 307 } 308 } 309 }; 310 } 311 312 @Override 313 public JobConf copyConfig( JobConf jobConf ) 314 { 315 return HadoopUtil.copyJobConf( jobConf ); 316 } 317 318 @Override 319 public Map<String, String> diffConfigIntoMap( JobConf defaultConfig, JobConf updatedConfig ) 320 { 321 return HadoopUtil.getConfig( defaultConfig, updatedConfig ); 322 } 323 324 @Override 325 public JobConf mergeMapIntoConfig( JobConf defaultConfig, Map<String, String> map ) 326 { 327 return HadoopUtil.mergeConf( defaultConfig, map, false ); 328 } 329 }