/*
 * Copyright (c) 2007-2014 Concurrent, Inc. All Rights Reserved.
 *
 * Project and contact information: http://www.cascading.org/
 *
 * This file is part of the Cascading project.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package cascading.flow.hadoop;

import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;

import cascading.flow.FlowException;
import cascading.flow.FlowProcess;
import cascading.flow.hadoop.planner.HadoopFlowStepJob;
import cascading.flow.hadoop.util.HadoopUtil;
import cascading.flow.planner.BaseFlowStep;
import cascading.flow.planner.FlowStepJob;
import cascading.flow.planner.Scope;
import cascading.property.ConfigDef;
import cascading.tap.Tap;
import cascading.tap.hadoop.io.MultiInputFormat;
import cascading.tap.hadoop.util.Hadoop18TapUtil;
import cascading.tap.hadoop.util.TempHfs;
import cascading.tuple.Fields;
import cascading.tuple.Tuple;
import cascading.tuple.hadoop.TupleSerialization;
import cascading.tuple.hadoop.util.CoGroupingComparator;
import cascading.tuple.hadoop.util.CoGroupingPartitioner;
import cascading.tuple.hadoop.util.GroupingComparator;
import cascading.tuple.hadoop.util.GroupingPartitioner;
import cascading.tuple.hadoop.util.GroupingSortingComparator;
import cascading.tuple.hadoop.util.GroupingSortingPartitioner;
import cascading.tuple.hadoop.util.IndexTupleCoGroupingComparator;
import cascading.tuple.hadoop.util.ReverseGroupingSortingComparator;
import cascading.tuple.hadoop.util.ReverseTupleComparator;
import cascading.tuple.hadoop.util.TupleComparator;
import cascading.tuple.io.IndexTuple;
import cascading.tuple.io.TuplePair;
import cascading.util.Util;
import cascading.util.Version;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobConf;

import static cascading.flow.hadoop.util.HadoopUtil.serializeBase64;
import static cascading.flow.hadoop.util.HadoopUtil.writeStateToDistCache;

/**
 * Class HadoopFlowStep is the {@link BaseFlowStep} implementation configured through a Hadoop {@link JobConf}.
 * <p/>
 * It builds the JobConf for a single step (sources, sink, traps, grouping/sorting comparators and the
 * serialized step state), creates the {@link HadoopFlowStepJob} that runs it, and removes the temporary
 * resources the step created once it is no longer needed.
 */
public class HadoopFlowStep extends BaseFlowStep<JobConf>
  {
  /** Field mapperTraps */
  private final Map<String, Tap> mapperTraps = new HashMap<String, Tap>();
  /** Field reducerTraps */
  private final Map<String, Tap> reducerTraps = new HashMap<String, Tap>();

  /**
   * Constructor HadoopFlowStep creates a new step, both arguments are handed to the super class unchanged.
   *
   * @param name    of type String
   * @param stepNum of type int
   */
  public HadoopFlowStep( String name, int stepNum )
    {
    super( name, stepNum );
    }

  /**
   * Method getInitializedConfig creates the JobConf this step will be submitted with.
   * <p/>
   * The given parent config is never modified, a copy is populated instead. When no parent is given a new
   * JobConf is used.
   *
   * @param flowProcess  of type FlowProcess, passed to every source, sink and trap being initialized
   * @param parentConfig of type JobConf, may be null
   * @return the populated JobConf, including the serialized step state or the path it was written to
   */
  public JobConf getInitializedConfig( FlowProcess<JobConf> flowProcess, JobConf parentConfig )
    {
    JobConf conf = parentConfig == null ? new JobConf() : HadoopUtil.copyJobConf( parentConfig );

    // disable warning
    conf.setBoolean( "mapred.used.genericoptionsparser", true );

    conf.setJobName( getStepDisplayName( conf.getInt( "cascading.step.display.id.truncate", Util.ID_LENGTH ) ) );

    conf.setOutputKeyClass( Tuple.class );
    conf.setOutputValueClass( Tuple.class );

    conf.setMapRunnerClass( FlowMapper.class );
    conf.setReducerClass( FlowReducer.class );

    // set for use by the shuffling phase
    TupleSerialization.setSerializations( conf );

    initFromSources( flowProcess, conf );

    initFromSink( flowProcess, conf );

    initFromTraps( flowProcess, conf );

    initFromProcessConfigDef( conf );

    int numSinkParts = getSink().getScheme().getNumSinkParts();

    if( numSinkParts != 0 )
      {
      // if no reducer, set num map tasks to control parts
      if( getGroup() != null )
        conf.setNumReduceTasks( numSinkParts );
      else
        conf.setNumMapTasks( numSinkParts );
      }

    conf.setOutputKeyComparatorClass( TupleComparator.class );

    if( getGroup() == null )
      {
      conf.setNumReduceTasks( 0 ); // disable reducers
      }
    else
      {
      // must set map output defaults when performing a reduce
      conf.setMapOutputKeyClass( Tuple.class );
      conf.setMapOutputValueClass( Tuple.class );
      conf.setPartitionerClass( GroupingPartitioner.class );

      // handles the case the groupby sort should be reversed
      if( getGroup().isSortReversed() )
        conf.setOutputKeyComparatorClass( ReverseTupleComparator.class );

      addComparators( conf, "cascading.group.comparator", getGroup().getKeySelectors() );

      if( getGroup().isGroupBy() )
        {
        addComparators( conf, "cascading.sort.comparator", getGroup().getSortingSelectors() );
        }
      else
        {
        conf.setPartitionerClass( CoGroupingPartitioner.class );
        conf.setMapOutputKeyClass( IndexTuple.class ); // allows groups to be sorted by index
        conf.setMapOutputValueClass( IndexTuple.class );
        conf.setOutputKeyComparatorClass( IndexTupleCoGroupingComparator.class ); // sorts by group, then by index
        conf.setOutputValueGroupingComparator( CoGroupingComparator.class );
        }

      // applied last so a sorted group overrides the partitioner/comparators chosen above
      if( getGroup().isSorted() )
        {
        conf.setPartitionerClass( GroupingSortingPartitioner.class );
        conf.setMapOutputKeyClass( TuplePair.class );

        if( getGroup().isSortReversed() )
          conf.setOutputKeyComparatorClass( ReverseGroupingSortingComparator.class );
        else
          conf.setOutputKeyComparatorClass( GroupingSortingComparator.class );

        // no need to supply a reverse comparator, only equality is checked
        conf.setOutputValueGroupingComparator( GroupingComparator.class );
        }
      }

    // perform last so init above will pass to tasks
    String versionString = Version.getRelease();

    if( versionString != null )
      conf.set( "cascading.version", versionString );

    conf.set( CASCADING_FLOW_STEP_ID, getID() );
    conf.set( "cascading.flow.step.num", Integer.toString( getStepNum() ) );

    String stepState = pack( this, conf );

    // hadoop 20.2 doesn't like dist cache when using local mode
    int maxSize = Short.MAX_VALUE;

    // small state, or any state in local mode, is stored inline; otherwise only its path is stored
    if( isHadoopLocalMode( conf ) || stepState.length() < maxSize ) // seems safe
      conf.set( "cascading.flow.step", stepState );
    else
      conf.set( "cascading.flow.step.path", writeStateToDistCache( conf, getID(), stepState ) );

    return conf;
    }

  /**
   * Method isHadoopLocalMode returns the result of {@link HadoopUtil#isLocal(JobConf)} for the given config.
   *
   * @param conf of type JobConf
   * @return boolean
   */
  public boolean isHadoopLocalMode( JobConf conf )
    {
    return HadoopUtil.isLocal( conf );
    }

  /**
   * Serializes the given object into a Base64 String via {@link HadoopUtil#serializeBase64}.
   *
   * @param object the object to serialize
   * @param conf   the config handed to the serializer
   * @return the serialized form of the object
   * @throws FlowException wrapping the IOException raised by serialization
   */
  private String pack( Object object, JobConf conf )
    {
    try
      {
      return serializeBase64( object, conf, true );
      }
    catch( IOException exception )
      {
      throw new FlowException( "unable to pack object: " + object.getClass().getCanonicalName(), exception );
      }
    }

  /**
   * Method createFlowStepJob initializes a config from the given parent, stores it on this step via
   * setConf, and returns a new {@link HadoopFlowStepJob} bound to it.
   *
   * @param flowProcess  of type FlowProcess
   * @param parentConfig of type JobConf
   * @return FlowStepJob
   */
  protected FlowStepJob<JobConf> createFlowStepJob( FlowProcess<JobConf> flowProcess, JobConf parentConfig )
    {
    JobConf initializedConfig = getInitializedConfig( flowProcess, parentConfig );

    setConf( initializedConfig );

    return new HadoopFlowStepJob( createClientState( flowProcess ), this, initializedConfig );
    }

  /**
   * Method clean removes any temporary files used by this FlowStep instance. It will log any IOExceptions thrown.
   * <p/>
   * Every removal is best effort, a failure is logged as a warning and the remaining cleanup still runs.
   *
   * @param config of type JobConf
   */
  public void clean( JobConf config )
    {
    String stepStatePath = config.get( "cascading.flow.step.path" );

    // only set when the step state was too large to be stored inline
    if( stepStatePath != null )
      {
      try
        {
        HadoopUtil.removeStateFromDistCache( config, stepStatePath );
        }
      catch( IOException exception )
        {
        logWarn( "unable to remove step state file: " + stepStatePath, exception );
        }
      }

    if( tempSink != null )
      {
      try
        {
        tempSink.deleteResource( config );
        }
      catch( Exception exception )
        {
        // sink all exceptions, don't fail app
        logWarn( "unable to remove temporary file: " + tempSink, exception );
        }
      }

    // a TempHfs sink is deleted only if the flow succeeded or has no run id, otherwise only its metadata is cleaned
    if( getSink() instanceof TempHfs &&
      ( getFlow().getFlowStats().isSuccessful() || getFlow().getRunID() == null ) )
      {
      try
        {
        getSink().deleteResource( config );
        }
      catch( Exception exception )
        {
        // sink all exceptions, don't fail app
        logWarn( "unable to remove temporary file: " + getSink(), exception );
        }
      }
    else
      {
      cleanTapMetaData( config, getSink() );
      }

    for( Tap tap : getMapperTraps().values() )
      cleanTapMetaData( config, tap );

    for( Tap tap : getReducerTraps().values() )
      cleanTapMetaData( config, tap );
    }

  /**
   * Calls {@link Hadoop18TapUtil#cleanupTapMetaData} for the given tap. A failure is logged as a warning and
   * never propagated, cleanup must not fail the application.
   *
   * @param jobConf the config handed to the cleanup call
   * @param tap     the tap whose metadata should be removed
   */
  private void cleanTapMetaData( JobConf jobConf, Tap tap )
    {
    try
      {
      Hadoop18TapUtil.cleanupTapMetaData( jobConf, tap );
      }
    catch( IOException exception )
      {
      // best effort only, report the failure but don't fail app
      logWarn( "unable to clean tap metadata: " + tap, exception );
      }
    }

  /**
   * Stores comparator information for the current group under the given property.
   * <p/>
   * Only the first Fields instance in the map is considered. If it declares comparators it is serialized into
   * {@code property}. Otherwise the size of the out values fields of the first scope preceding the group is
   * stored under {@code property + ".size"}, unless that size is zero.
   *
   * @param conf     the config to update
   * @param property the property name to store the serialized fields, or the prefix of the size property
   * @param map      the selectors to inspect, nothing is done if it is empty
   */
  private void addComparators( JobConf conf, String property, Map<String, Fields> map )
    {
    Iterator<Fields> fieldsIterator = map.values().iterator();

    if( !fieldsIterator.hasNext() )
      return;

    Fields fields = fieldsIterator.next();

    if( fields.hasComparators() )
      {
      conf.set( property, pack( fields, conf ) );
      return;
      }

    // use resolved fields if there are no comparators.
    Set<Scope> previousScopes = getPreviousScopes( getGroup() );

    fields = previousScopes.iterator().next().getOutValuesFields();

    if( fields.size() != 0 ) // allows fields.UNKNOWN to be used
      conf.setInt( property + ".size", fields.size() );
    }

  /**
   * Calls sinkConfInit on every given trap against a copy of the given config, so nothing a trap sets is
   * written back into {@code conf}.
   *
   * @param flowProcess of type FlowProcess
   * @param conf        the config to copy, it is not modified
   * @param traps       the traps to initialize, nothing is done if empty
   */
  private void initFromTraps( FlowProcess<JobConf> flowProcess, JobConf conf, Map<String, Tap> traps )
    {
    if( !traps.isEmpty() )
      {
      JobConf trapConf = HadoopUtil.copyJobConf( conf );

      for( Tap tap : traps.values() )
        tap.sinkConfInit( flowProcess, trapConf );
      }
    }

  /**
   * Method initFromSources initializes the given config from all sources of this step.
   * <p/>
   * Every streamed source is initialized against its own copy of the config, the copies are then registered
   * through {@link MultiInputFormat#addInputFormat}. For every accumulated source the properties it changed are
   * serialized and stored in {@code conf} under a property keyed by the tap id.
   *
   * @param flowProcess of type FlowProcess
   * @param conf        of type JobConf
   * @throws IllegalStateException if a streamed source has a null identifier
   */
  protected void initFromSources( FlowProcess<JobConf> flowProcess, JobConf conf )
    {
    // handles case where same tap is used on multiple branches
    // we do not want to init the same tap multiple times
    Set<Tap> uniqueSources = getUniqueStreamedSources();

    JobConf[] streamedJobs = new JobConf[ uniqueSources.size() ];
    int i = 0;

    for( Tap tap : uniqueSources )
      {
      if( tap.getIdentifier() == null )
        throw new IllegalStateException( "tap may not have null identifier: " + tap.toString() );

      streamedJobs[ i ] = flowProcess.copyConfig( conf );
      tap.sourceConfInit( flowProcess, streamedJobs[ i ] );
      streamedJobs[ i ].set( "cascading.step.source", Tap.id( tap ) );
      i++;
      }

    Set<Tap> accumulatedSources = getAllAccumulatedSources();

    for( Tap tap : accumulatedSources )
      {
      JobConf accumulatedJob = flowProcess.copyConfig( conf );
      tap.sourceConfInit( flowProcess, accumulatedJob );

      // keep only what the tap changed relative to conf
      Map<String, String> map = flowProcess.diffConfigIntoMap( conf, accumulatedJob );
      conf.set( "cascading.step.accumulated.source.conf." + Tap.id( tap ), pack( map, conf ) );
      }

    MultiInputFormat.addInputFormat( conf, streamedJobs ); //must come last
    }

  /**
   * Method getTapForID returns the first tap in the given set whose {@link Tap#id} equals the given id.
   *
   * @param taps of type Set
   * @param id   of type String
   * @return the matching Tap, or null if none matches
   */
  public Tap getTapForID( Set<Tap> taps, String id )
    {
    for( Tap tap : taps )
      {
      if( Tap.id( tap ).equals( id ) )
        return tap;
      }

    return null;
    }

  /**
   * Applies the process level ConfigDef values to the given config via initConfFromProcessConfigDef.
   *
   * @param conf the config to update
   */
  private void initFromProcessConfigDef( final JobConf conf )
    {
    initConfFromProcessConfigDef( getSetterFor( conf ) );
    }

  /**
   * Returns a {@link ConfigDef.Setter} that reads from and writes to the given config.
   * <p/>
   * The setter reports a missing or empty property as null. Its update operation appends the value to the
   * existing comma delimited value, unless the value is already one of its entries.
   *
   * @param conf the config backing the setter
   * @return a new Setter instance
   */
  private ConfigDef.Setter getSetterFor( final JobConf conf )
    {
    return new ConfigDef.Setter()
      {
      @Override
      public String set( String key, String value )
        {
        String oldValue = get( key );

        conf.set( key, value );

        return oldValue;
        }

      @Override
      public String update( String key, String value )
        {
        String oldValue = get( key );

        if( oldValue == null )
          conf.set( key, value );
        else if( !containsListEntry( oldValue, value ) )
          conf.set( key, oldValue + "," + value );

        return oldValue;
        }

      @Override
      public String get( String key )
        {
        String value = conf.get( key );

        if( value == null || value.isEmpty() )
          return null;

        return value;
        }
      };
    }

  /**
   * Tests if the given value is already present in the given comma delimited list.
   * <p/>
   * Whole entries are compared, ignoring surrounding whitespace, so a value that is merely a substring of an
   * existing entry (e.g. {@code a.Foo} against {@code a.FooBar}) is not considered present.
   *
   * @param list  the current comma delimited property value, never null
   * @param value the value about to be appended
   * @return true if appending the value would add nothing new
   */
  private static boolean containsListEntry( String list, String value )
    {
    String candidate = value.trim();

    // an empty value adds nothing to the list
    if( candidate.isEmpty() )
      return true;

    for( String entry : list.split( "," ) )
      {
      if( entry.trim().equals( candidate ) )
        return true;
      }

    // the value may itself be a comma delimited list, match it on entry boundaries only
    return ( "," + list + "," ).contains( "," + candidate + "," );
    }

  /**
   * sources are specific to step, remove all known accumulated sources, if any
   *
   * @return a new Set holding every source of this step that is not an accumulated source
   */
  private Set<Tap> getUniqueStreamedSources()
    {
    HashSet<Tap> set = new HashSet<Tap>( sources.keySet() );

    set.removeAll( getAllAccumulatedSources() );

    return set;
    }

  /**
   * Method initFromSink initializes the given config from the sink of this step.
   * <p/>
   * If no output path is set on the config after the sink was initialized, a {@link TempHfs} is created as
   * tempSink and initialized as well.
   *
   * @param flowProcess of type FlowProcess
   * @param conf        of type JobConf
   */
  protected void initFromSink( FlowProcess<JobConf> flowProcess, JobConf conf )
    {
    // init sink first so tempSink can take precedence
    if( getSink() != null )
      getSink().sinkConfInit( flowProcess, conf );

    // NOTE(review): getSink() is null checked above but dereferenced here without a check
    if( FileOutputFormat.getOutputPath( conf ) == null )
      tempSink = new TempHfs( conf, "tmp:/" + new Path( getSink().getIdentifier() ).toUri().getPath(), true );

    // tempSink exists because sink is writeDirect
    if( tempSink != null )
      tempSink.sinkConfInit( flowProcess, conf );
    }

  /**
   * Method initFromTraps initializes the mapper traps, then the reducer traps, of this step.
   *
   * @param flowProcess of type FlowProcess
   * @param conf        of type JobConf
   */
  protected void initFromTraps( FlowProcess<JobConf> flowProcess, JobConf conf )
    {
    initFromTraps( flowProcess, conf, getMapperTraps() );
    initFromTraps( flowProcess, conf, getReducerTraps() );
    }

  /**
   * Method getTraps returns all mapper and reducer traps of this step.
   *
   * @return an unmodifiable Set, built anew on every call
   */
  @Override
  public Set<Tap> getTraps()
    {
    Set<Tap> set = new HashSet<Tap>();

    set.addAll( mapperTraps.values() );
    set.addAll( reducerTraps.values() );

    return Collections.unmodifiableSet( set );
    }

  /**
   * Method getTrap returns the trap registered under the given name, mapper traps are searched before
   * reducer traps.
   *
   * @param name of type String
   * @return the Tap, or null if neither map holds the name
   */
  @Override
  public Tap getTrap( String name )
    {
    Tap trap = getMapperTrap( name );

    if( trap == null )
      trap = getReducerTrap( name );

    return trap;
    }

  /**
   * Method getMapperTraps returns the internal, mutable map of mapper traps keyed by name.
   *
   * @return Map
   */
  public Map<String, Tap> getMapperTraps()
    {
    return mapperTraps;
    }

  /**
   * Method getReducerTraps returns the internal, mutable map of reducer traps keyed by name.
   *
   * @return Map
   */
  public Map<String, Tap> getReducerTraps()
    {
    return reducerTraps;
    }

  /**
   * Method getMapperTrap returns the mapper trap registered under the given name.
   *
   * @param name of type String
   * @return the Tap, or null if none is registered
   */
  public Tap getMapperTrap( String name )
    {
    return getMapperTraps().get( name );
    }

  /**
   * Method getReducerTrap returns the reducer trap registered under the given name.
   *
   * @param name of type String
   * @return the Tap, or null if none is registered
   */
  public Tap getReducerTrap( String name )
    {
    return getReducerTraps().get( name );
    }
  }