001 /* 002 * Copyright (c) 2007-2014 Concurrent, Inc. All Rights Reserved. 003 * 004 * Project and contact information: http://www.cascading.org/ 005 * 006 * This file is part of the Cascading project. 007 * 008 * Licensed under the Apache License, Version 2.0 (the "License"); 009 * you may not use this file except in compliance with the License. 010 * You may obtain a copy of the License at 011 * 012 * http://www.apache.org/licenses/LICENSE-2.0 013 * 014 * Unless required by applicable law or agreed to in writing, software 015 * distributed under the License is distributed on an "AS IS" BASIS, 016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 017 * See the License for the specific language governing permissions and 018 * limitations under the License. 019 */ 020 021 package cascading.flow.hadoop; 022 023 import java.beans.ConstructorProperties; 024 import java.io.IOException; 025 import java.util.Arrays; 026 import java.util.Collection; 027 import java.util.HashMap; 028 import java.util.Map; 029 import java.util.Properties; 030 031 import cascading.flow.FlowException; 032 import cascading.flow.FlowProcess; 033 import cascading.flow.hadoop.util.HadoopUtil; 034 import cascading.scheme.Scheme; 035 import cascading.scheme.SinkCall; 036 import cascading.scheme.SourceCall; 037 import cascading.tap.Tap; 038 import cascading.tuple.TupleEntryCollector; 039 import cascading.tuple.TupleEntryIterator; 040 import riffle.process.scheduler.ProcessException; 041 import riffle.process.scheduler.ProcessWrapper; 042 043 /** 044 * Class ProcessFlow is a {@link cascading.flow.Flow} subclass that supports custom Riffle jobs. 045 * <p/> 046 * Use this class to allow custom Riffle jobs to participate in the {@link cascading.cascade.Cascade} scheduler. If 047 * other Flow instances in the Cascade share resources with this Flow instance, all participants will be scheduled 048 * according to their dependencies (topologically). 049 * <p/> 050 * Though this class sub-classes {@link HadoopFlow}, it does not support all the methods available or features. 051 * <p/> 052 * Currently {@link cascading.flow.FlowListener}s are supported but the 053 * {@link cascading.flow.FlowListener#onThrowable(cascading.flow.Flow, Throwable)} event is not. 054 */ 055 public class ProcessFlow<P> extends HadoopFlow 056 { 057 /** Field process */ 058 private final P process; 059 /** Field processWrapper */ 060 private final ProcessWrapper processWrapper; 061 062 private boolean isStarted = false; // only used for event handling 063 064 /** 065 * Constructor ProcessFlow creates a new ProcessFlow instance. 066 * 067 * @param name of type String 068 * @param process of type JobConf 069 */ 070 @ConstructorProperties({"name", "process"}) 071 public ProcessFlow( String name, P process ) 072 { 073 this( new Properties(), name, process ); 074 } 075 076 /** 077 * Constructor ProcessFlow creates a new ProcessFlow instance. 078 * 079 * @param properties of type Map<Object, Object> 080 * @param name of type String 081 * @param process of type P 082 */ 083 @ConstructorProperties({"properties", "name", "process"}) 084 public ProcessFlow( Map<Object, Object> properties, String name, P process ) 085 { 086 this( properties, name, process, null ); 087 } 088 089 /** 090 * Constructor ProcessFlow creates a new ProcessFlow instance. 091 * 092 * @param properties of type Map<Object, Object> 093 * @param name of type String 094 * @param process of type P 095 * @param flowDescriptor pf type LinkedHashMap<String, String> 096 */ 097 @ConstructorProperties({"properties", "name", "process", "flowDescriptor"}) 098 public ProcessFlow( Map<Object, Object> properties, String name, P process, Map<String, String> flowDescriptor ) 099 { 100 super( HadoopUtil.getPlatformInfo(), properties, null, name, flowDescriptor ); 101 this.process = process; 102 this.processWrapper = new ProcessWrapper( this.process ); 103 104 setName( name ); 105 setTapFromProcess(); 106 } 107 108 /** 109 * Method setTapFromProcess build {@link Tap} instance for the give process incoming and outgoing dependencies. 110 * <p/> 111 * This method may be called repeatedly to re-configure the source and sink taps. 112 */ 113 public void setTapFromProcess() 114 { 115 setSources( createSources( this.processWrapper ) ); 116 setSinks( createSinks( this.processWrapper ) ); 117 setTraps( createTraps( this.processWrapper ) ); 118 } 119 120 /** 121 * Method getProcess returns the process of this ProcessFlow object. 122 * 123 * @return the process (type P) of this ProcessFlow object. 124 */ 125 public P getProcess() 126 { 127 return process; 128 } 129 130 @Override 131 public void prepare() 132 { 133 try 134 { 135 processWrapper.prepare(); 136 } 137 catch( ProcessException exception ) 138 { 139 if( exception.getCause() instanceof RuntimeException ) 140 throw (RuntimeException) exception.getCause(); 141 142 throw new FlowException( "could not call prepare on process", exception.getCause() ); 143 } 144 } 145 146 @Override 147 public void start() 148 { 149 try 150 { 151 fireOnStarting(); 152 processWrapper.start(); 153 flowStats.markStarted(); 154 isStarted = true; 155 } 156 catch( ProcessException exception ) 157 { 158 if( exception.getCause() instanceof RuntimeException ) 159 throw (RuntimeException) exception.getCause(); 160 161 throw new FlowException( "could not call start on process", exception.getCause() ); 162 } 163 } 164 165 @Override 166 public void stop() 167 { 168 try 169 { 170 fireOnStopping(); 171 processWrapper.stop(); 172 173 if( !flowStats.isFinished() ) 174 flowStats.markStopped(); 175 } 176 catch( ProcessException exception ) 177 { 178 flowStats.markFailed( exception ); 179 if( exception.getCause() instanceof RuntimeException ) 180 throw (RuntimeException) exception.getCause(); 181 182 throw new FlowException( "could not call stop on process", exception.getCause() ); 183 } 184 } 185 186 @Override 187 public void complete() 188 { 189 try 190 { 191 if( !isStarted ) 192 { 193 fireOnStarting(); 194 isStarted = true; 195 flowStats.markStarted(); 196 } 197 198 flowStats.markRunning(); 199 processWrapper.complete(); 200 fireOnCompleted(); 201 flowStats.markSuccessful(); 202 } 203 catch( ProcessException exception ) 204 { 205 flowStats.markFailed( exception ); 206 if( exception.getCause() instanceof RuntimeException ) 207 throw (RuntimeException) exception.getCause(); 208 209 throw new FlowException( "could not call complete on process", exception.getCause() ); 210 } 211 } 212 213 @Override 214 public void cleanup() 215 { 216 try 217 { 218 processWrapper.cleanup(); 219 } 220 catch( ProcessException exception ) 221 { 222 if( exception.getCause() instanceof RuntimeException ) 223 throw (RuntimeException) exception.getCause(); 224 225 throw new FlowException( "could not call cleanup on process", exception.getCause() ); 226 } 227 } 228 229 private Map<String, Tap> createSources( ProcessWrapper processParent ) 230 { 231 try 232 { 233 return makeTapMap( processParent.getDependencyIncoming() ); 234 } 235 catch( ProcessException exception ) 236 { 237 if( exception.getCause() instanceof RuntimeException ) 238 throw (RuntimeException) exception.getCause(); 239 240 throw new FlowException( "could not get process incoming dependency", exception.getCause() ); 241 } 242 } 243 244 private Map<String, Tap> createSinks( ProcessWrapper processParent ) 245 { 246 try 247 { 248 return makeTapMap( processParent.getDependencyOutgoing() ); 249 } 250 catch( ProcessException exception ) 251 { 252 if( exception.getCause() instanceof RuntimeException ) 253 throw (RuntimeException) exception.getCause(); 254 255 throw new FlowException( "could not get process outgoing dependency", exception.getCause() ); 256 } 257 } 258 259 private Map<String, Tap> makeTapMap( Object resource ) 260 { 261 Collection paths = makeCollection( resource ); 262 263 Map<String, Tap> taps = new HashMap<String, Tap>(); 264 265 for( Object path : paths ) 266 { 267 if( path instanceof Tap ) 268 taps.put( ( (Tap) path ).getIdentifier(), (Tap) path ); 269 else 270 taps.put( path.toString(), new ProcessTap( new NullScheme(), path.toString() ) ); 271 } 272 return taps; 273 } 274 275 private Collection makeCollection( Object resource ) 276 { 277 if( resource instanceof Collection ) 278 return (Collection) resource; 279 else if( resource instanceof Object[] ) 280 return Arrays.asList( (Object[]) resource ); 281 else 282 return Arrays.asList( resource ); 283 } 284 285 private Map<String, Tap> createTraps( ProcessWrapper processParent ) 286 { 287 return new HashMap<String, Tap>(); 288 } 289 290 @Override 291 public String toString() 292 { 293 return getName() + ":" + process; 294 } 295 296 static class NullScheme extends Scheme 297 { 298 public void sourceConfInit( FlowProcess flowProcess, Tap tap, Object conf ) 299 { 300 } 301 302 public void sinkConfInit( FlowProcess flowProcess, Tap tap, Object conf ) 303 { 304 } 305 306 public boolean source( FlowProcess flowProcess, SourceCall sourceCall ) throws IOException 307 { 308 throw new UnsupportedOperationException( "sourcing is not supported in the scheme" ); 309 } 310 311 @Override 312 public String toString() 313 { 314 return getClass().getSimpleName(); 315 } 316 317 public void sink( FlowProcess flowProcess, SinkCall sinkCall ) throws IOException 318 { 319 throw new UnsupportedOperationException( "sinking is not supported in the scheme" ); 320 } 321 } 322 323 /** 324 * 325 */ 326 static class ProcessTap extends Tap 327 { 328 private final String token; 329 330 ProcessTap( NullScheme scheme, String token ) 331 { 332 super( scheme ); 333 this.token = token; 334 } 335 336 @Override 337 public String getIdentifier() 338 { 339 return token; 340 } 341 342 @Override 343 public TupleEntryIterator openForRead( FlowProcess flowProcess, Object input ) throws IOException 344 { 345 return null; 346 } 347 348 @Override 349 public TupleEntryCollector openForWrite( FlowProcess flowProcess, Object output ) throws IOException 350 { 351 return null; 352 } 353 354 @Override 355 public boolean createResource( Object conf ) throws IOException 356 { 357 return false; 358 } 359 360 @Override 361 public boolean deleteResource( Object conf ) throws IOException 362 { 363 return false; 364 } 365 366 @Override 367 public boolean resourceExists( Object conf ) throws IOException 368 { 369 return false; 370 } 371 372 @Override 373 public long getModifiedTime( Object conf ) throws IOException 374 { 375 return 0; 376 } 377 378 @Override 379 public String toString() 380 { 381 return token; 382 } 383 } 384 }