/*
 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved.
 *
 * Project and contact information: http://www.cascading.org/
 *
 * This file is part of the Cascading project.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package cascading.flow.hadoop;

import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

import cascading.CascadingException;
import cascading.flow.FlowProcess;
import cascading.flow.FlowSession;
import cascading.flow.hadoop.util.HadoopUtil;
import cascading.tap.Tap;
import cascading.tuple.Fields;
import cascading.tuple.TupleEntry;
import cascading.tuple.TupleEntryCollector;
import cascading.tuple.TupleEntryIterator;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.util.ReflectionUtils;

/**
 * Class HadoopFlowProcess is an implementation of {@link FlowProcess} for Hadoop. Use this class to get direct
 * access to the Hadoop JobConf and Reporter interfaces.
 * <p/>
 * Be warned that coupling to this implementation will cause custom {@link cascading.operation.Operation}s to
 * fail if they are executed on a system other than Hadoop.
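 * <p/>
 * For example, a sketch of reaching the underlying {@link Reporter} from a custom operation, assuming
 * {@code flowProcess} is the instance handed to the operation at runtime:
 * <pre>{@code
 * if( flowProcess instanceof HadoopFlowProcess )
 *   {
 *   Reporter reporter = ( (HadoopFlowProcess) flowProcess ).getReporter();
 *
 *   reporter.progress(); // signal liveness directly; equivalent to flowProcess.keepAlive()
 *   }
 * }</pre>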
 *
 * @see cascading.flow.FlowSession
 * @see JobConf
 * @see Reporter
 */
public class HadoopFlowProcess extends FlowProcess<JobConf> implements MapRed
  {
  /** Field jobConf */
  final JobConf jobConf;
  /** Field isMapper */
  private final boolean isMapper;
  /** Field reporter */
  Reporter reporter = Reporter.NULL;
  /** Field outputCollector */
  private OutputCollector outputCollector;

  /** Constructor HadoopFlowProcess creates a new HadoopFlowProcess instance backed by a default JobConf. */
  public HadoopFlowProcess()
    {
    this.jobConf = new JobConf();
    this.isMapper = true;
    }

  /** Constructor HadoopFlowProcess creates a new HadoopFlowProcess instance from the given Configuration. */
  public HadoopFlowProcess( Configuration jobConf )
    {
    this( new JobConf( jobConf ) );
    }

  /** Constructor HadoopFlowProcess creates a new HadoopFlowProcess instance from the given JobConf. */
  public HadoopFlowProcess( JobConf jobConf )
    {
    this.jobConf = jobConf;
    this.isMapper = true;
    }

  /** Constructor HadoopFlowProcess creates a new HadoopFlowProcess instance for the given session and JobConf. */
  public HadoopFlowProcess( FlowSession flowSession, JobConf jobConf )
    {
    super( flowSession );
    this.jobConf = jobConf;
    this.isMapper = true;
    }

  /**
   * Constructor HadoopFlowProcess creates a new HadoopFlowProcess instance.
   *
   * @param flowSession of type FlowSession
   * @param jobConf     of type JobConf
   * @param isMapper    of type boolean, true if this process is a mapper, false if a reducer
   */
  public HadoopFlowProcess( FlowSession flowSession, JobConf jobConf, boolean isMapper )
    {
    super( flowSession );
    this.jobConf = jobConf;
    this.isMapper = isMapper;
    }

  /** Copy constructor, retaining the mapper flag and reporter of the given HadoopFlowProcess. */
  public HadoopFlowProcess( HadoopFlowProcess flowProcess, JobConf jobConf )
    {
    super( flowProcess );
    this.jobConf = jobConf;
    this.isMapper = flowProcess.isMapper();
    this.reporter = flowProcess.getReporter();
    }

  @Override
  public FlowProcess copyWith( JobConf jobConf )
    {
    return new HadoopFlowProcess( this, jobConf );
    }

  /**
   * Method getJobConf returns the jobConf of this HadoopFlowProcess object.
   *
   * @return the jobConf (type JobConf) of this HadoopFlowProcess object.
   */
  public JobConf getJobConf()
    {
    return jobConf;
    }

  @Override
  public JobConf getConfig()
    {
    return jobConf;
    }

  @Override
  public JobConf getConfigCopy()
    {
    return HadoopUtil.copyJobConf( jobConf );
    }

  /**
   * Method isMapper returns true if this part of the FlowProcess is a MapReduce mapper. If false, it is a reducer.
   *
   * @return boolean
   */
  public boolean isMapper()
    {
    return isMapper;
    }

  /** Method getCurrentNumMappers returns the number of map tasks configured for the current job. */
  public int getCurrentNumMappers()
    {
    return getJobConf().getNumMapTasks();
    }

  /** Method getCurrentNumReducers returns the number of reduce tasks configured for the current job. */
  public int getCurrentNumReducers()
    {
    return getJobConf().getNumReduceTasks();
    }

  /**
   * Method getCurrentSliceNum returns the zero-based number of the current slice (task). Slice 0 is the first slice.
   *
   * @return int
   */
  @Override
  public int getCurrentSliceNum()
    {
    return getJobConf().getInt( "mapred.task.partition", 0 );
    }

  @Override
  public int getNumProcessSlices()
    {
    if( isMapper() )
      return getCurrentNumMappers();
    else
      return getCurrentNumReducers();
    }
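  /*
   * Example (a sketch, names illustrative): the slice number and slice count can be combined inside an
   * operation to report per-task progress:
   *
   *   HadoopFlowProcess hadoopFlowProcess = (HadoopFlowProcess) flowProcess;
   *
   *   hadoopFlowProcess.setStatus( String.format( "slice %d of %d",
   *     hadoopFlowProcess.getCurrentSliceNum(), hadoopFlowProcess.getNumProcessSlices() ) );
   */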
  /**
   * Method setReporter sets the reporter of this HadoopFlowProcess object.
   *
   * @param reporter the reporter of this HadoopFlowProcess object.
   */
  public void setReporter( Reporter reporter )
    {
    if( reporter == null )
      this.reporter = Reporter.NULL;
    else
      this.reporter = reporter;
    }

  @Override
  public Reporter getReporter()
    {
    return reporter;
    }

  public void setOutputCollector( OutputCollector outputCollector )
    {
    this.outputCollector = outputCollector;
    }

  public OutputCollector getOutputCollector()
    {
    return outputCollector;
    }

  @Override
  public Object getProperty( String key )
    {
    return jobConf.get( key );
    }

  @Override
  public Collection<String> getPropertyKeys()
    {
    Set<String> keys = new HashSet<String>();

    for( Map.Entry<String, String> entry : jobConf )
      keys.add( entry.getKey() );

    return Collections.unmodifiableSet( keys );
    }

  @Override
  public Object newInstance( String className )
    {
    if( className == null || className.isEmpty() )
      return null;

    try
      {
      Class<?> type = HadoopFlowProcess.class.getClassLoader().loadClass( className );

      return ReflectionUtils.newInstance( type, jobConf );
      }
    catch( ClassNotFoundException exception )
      {
      throw new CascadingException( "unable to load class: " + className, exception );
      }
    }

  @Override
  public void keepAlive()
    {
    getReporter().progress();
    }

  @Override
  public void increment( Enum counter, long amount )
    {
    getReporter().incrCounter( counter, amount );
    }

  @Override
  public void increment( String group, String counter, long amount )
    {
    getReporter().incrCounter( group, counter, amount );
    }

  @Override
  public long getCounterValue( Enum counter )
    {
    return getReporter().getCounter( counter ).getValue();
    }

  @Override
  public long getCounterValue( String group, String counter )
    {
    return getReporter().getCounter( group, counter ).getValue();
    }

  @Override
  public void setStatus( String status )
    {
    getReporter().setStatus( status );
    }

  @Override
  public boolean isCounterStatusInitialized()
    {
    return getReporter() != null;
    }

  @Override
  public TupleEntryIterator openTapForRead( Tap tap ) throws IOException
    {
    return tap.openForRead( this );
    }

  @Override
  public TupleEntryCollector openTapForWrite( Tap tap ) throws IOException
    {
    return tap.openForWrite( this, null ); // do not honor sinkmode as this may be opened across tasks
    }

  @Override
  public TupleEntryCollector openTrapForWrite( Tap trap ) throws IOException
    {
    JobConf jobConf = HadoopUtil.copyJobConf( getJobConf() );

    int stepNum = jobConf.getInt( "cascading.flow.step.num", 0 );
    String partname;

    // embed the step number and task type (map/reduce) in the trap part name
    if( jobConf.getBoolean( "mapred.task.is.map", true ) )
      partname = String.format( "-m-%05d-", stepNum );
    else
      partname = String.format( "-r-%05d-", stepNum );

    jobConf.set( "cascading.tapcollector.partname", "%s%spart" + partname + "%05d" );

    return trap.openForWrite( new HadoopFlowProcess( this, jobConf ), null ); // do not honor sinkmode as this may be opened across tasks
    }
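  /**
   * Method openSystemIntermediateForWrite returns a {@link TupleEntryCollector} expecting two-element tuples,
   * forwarding the first element to the underlying {@link OutputCollector} as the key and the second as the
   * value. A usage sketch, with illustrative names:
   * <pre>{@code
   * TupleEntryCollector collector = flowProcess.openSystemIntermediateForWrite();
   *
   * collector.add( new Tuple( key, value ) ); // becomes outputCollector.collect( key, value )
   * }</pre>
   */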
  @Override
  public TupleEntryCollector openSystemIntermediateForWrite() throws IOException
    {
    return new TupleEntryCollector( Fields.size( 2 ) )
      {
      @Override
      protected void collect( TupleEntry tupleEntry )
        {
        try
          {
          getOutputCollector().collect( tupleEntry.getObject( 0 ), tupleEntry.getObject( 1 ) );
          }
        catch( IOException exception )
          {
          throw new CascadingException( "failed collecting key and value", exception );
          }
        }
      };
    }

  @Override
  public <C> C copyConfig( C config )
    {
    return HadoopUtil.copyJobConf( config );
    }

  @Override
  public <C> Map<String, String> diffConfigIntoMap( C defaultConfig, C updatedConfig )
    {
    return HadoopUtil.getConfig( (Configuration) defaultConfig, (Configuration) updatedConfig );
    }

  @Override
  public JobConf mergeMapIntoConfig( JobConf defaultConfig, Map<String, String> map )
    {
    return HadoopUtil.mergeConf( defaultConfig, map, false );
    }
  }
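/*
 * Example (a sketch, names illustrative): the three config methods above can be combined to capture and
 * replay configuration changes without mutating the default config:
 *
 *   JobConf copy = flowProcess.copyConfig( defaultConf );                            // deep copy
 *   copy.set( "some.key", "some.value" );                                            // hypothetical change
 *   Map<String, String> diff = flowProcess.diffConfigIntoMap( defaultConf, copy );   // capture the delta
 *   JobConf merged = flowProcess.mergeMapIntoConfig( defaultConf, diff );            // replay elsewhere
 */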