001 /* 002 * Copyright (c) 2007-2014 Concurrent, Inc. All Rights Reserved. 003 * 004 * Project and contact information: http://www.cascading.org/ 005 * 006 * This file is part of the Cascading project. 007 * 008 * Licensed under the Apache License, Version 2.0 (the "License"); 009 * you may not use this file except in compliance with the License. 010 * You may obtain a copy of the License at 011 * 012 * http://www.apache.org/licenses/LICENSE-2.0 013 * 014 * Unless required by applicable law or agreed to in writing, software 015 * distributed under the License is distributed on an "AS IS" BASIS, 016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 017 * See the License for the specific language governing permissions and 018 * limitations under the License. 019 */ 020 021 package cascading.flow.hadoop; 022 023 import java.beans.ConstructorProperties; 024 import java.io.IOException; 025 import java.util.Arrays; 026 import java.util.Collection; 027 import java.util.HashMap; 028 import java.util.Map; 029 import java.util.Properties; 030 031 import cascading.flow.FlowException; 032 import cascading.flow.FlowProcess; 033 import cascading.flow.hadoop.util.HadoopUtil; 034 import cascading.scheme.Scheme; 035 import cascading.scheme.SinkCall; 036 import cascading.scheme.SourceCall; 037 import cascading.tap.Tap; 038 import cascading.tuple.TupleEntryCollector; 039 import cascading.tuple.TupleEntryIterator; 040 import riffle.process.scheduler.ProcessException; 041 import riffle.process.scheduler.ProcessWrapper; 042 043 /** 044 * Class ProcessFlow is a {@link cascading.flow.Flow} subclass that supports custom Riffle jobs. 045 * <p/> 046 * Use this class to allow custom Riffle jobs to participate in the {@link cascading.cascade.Cascade} scheduler. If 047 * other Flow instances in the Cascade share resources with this Flow instance, all participants will be scheduled 048 * according to their dependencies (topologically). 049 * <p/> 050 * Though this class sub-classes {@link HadoopFlow}, it does not support all the methods available or features. 051 * <p/> 052 * Currently {@link cascading.flow.FlowListener}s are supported but the 053 * {@link cascading.flow.FlowListener#onThrowable(cascading.flow.Flow, Throwable)} event is not. 054 */ 055 public class ProcessFlow<P> extends HadoopFlow 056 { 057 /** Field process */ 058 private final P process; 059 /** Field processWrapper */ 060 private final ProcessWrapper processWrapper; 061 062 private boolean isStarted = false; // only used for event handling 063 064 /** 065 * Constructor ProcessFlow creates a new ProcessFlow instance. 066 * 067 * @param name of type String 068 * @param process of type JobConf 069 */ 070 @ConstructorProperties({"name", "process"}) 071 public ProcessFlow( String name, P process ) 072 { 073 this( new Properties(), name, process ); 074 } 075 076 /** 077 * Constructor ProcessFlow creates a new ProcessFlow instance. 078 * 079 * @param properties of type Map<Object, Object> 080 * @param name of type String 081 * @param process of type P 082 */ 083 @ConstructorProperties({"properties", "name", "process"}) 084 public ProcessFlow( Map<Object, Object> properties, String name, P process ) 085 { 086 super( HadoopUtil.getPlatformInfo(), properties, null, name ); 087 this.process = process; 088 this.processWrapper = new ProcessWrapper( this.process ); 089 090 setName( name ); 091 setTapFromProcess(); 092 } 093 094 /** 095 * Method setTapFromProcess build {@link Tap} instance for the give process incoming and outgoing dependencies. 096 * <p/> 097 * This method may be called repeatedly to re-configure the source and sink taps. 098 */ 099 public void setTapFromProcess() 100 { 101 setSources( createSources( this.processWrapper ) ); 102 setSinks( createSinks( this.processWrapper ) ); 103 setTraps( createTraps( this.processWrapper ) ); 104 } 105 106 /** 107 * Method getProcess returns the process of this ProcessFlow object. 108 * 109 * @return the process (type P) of this ProcessFlow object. 110 */ 111 public P getProcess() 112 { 113 return process; 114 } 115 116 @Override 117 public void prepare() 118 { 119 try 120 { 121 processWrapper.prepare(); 122 } 123 catch( ProcessException exception ) 124 { 125 if( exception.getCause() instanceof RuntimeException ) 126 throw (RuntimeException) exception.getCause(); 127 128 throw new FlowException( "could not call prepare on process", exception.getCause() ); 129 } 130 } 131 132 @Override 133 public void start() 134 { 135 try 136 { 137 fireOnStarting(); 138 processWrapper.start(); 139 flowStats.markStarted(); 140 isStarted = true; 141 } 142 catch( ProcessException exception ) 143 { 144 if( exception.getCause() instanceof RuntimeException ) 145 throw (RuntimeException) exception.getCause(); 146 147 throw new FlowException( "could not call start on process", exception.getCause() ); 148 } 149 } 150 151 @Override 152 public void stop() 153 { 154 try 155 { 156 fireOnStopping(); 157 processWrapper.stop(); 158 flowStats.markStopped(); 159 } 160 catch( ProcessException exception ) 161 { 162 flowStats.markFailed( exception ); 163 if( exception.getCause() instanceof RuntimeException ) 164 throw (RuntimeException) exception.getCause(); 165 166 throw new FlowException( "could not call stop on process", exception.getCause() ); 167 } 168 } 169 170 @Override 171 public void complete() 172 { 173 try 174 { 175 if( !isStarted ) 176 { 177 fireOnStarting(); 178 isStarted = true; 179 flowStats.markStarted(); 180 } 181 182 flowStats.markRunning(); 183 processWrapper.complete(); 184 fireOnCompleted(); 185 flowStats.markSuccessful(); 186 } 187 catch( ProcessException exception ) 188 { 189 flowStats.markFailed( exception ); 190 if( exception.getCause() instanceof RuntimeException ) 191 throw (RuntimeException) exception.getCause(); 192 193 throw new FlowException( "could not call complete on process", exception.getCause() ); 194 } 195 } 196 197 @Override 198 public void cleanup() 199 { 200 try 201 { 202 processWrapper.cleanup(); 203 } 204 catch( ProcessException exception ) 205 { 206 if( exception.getCause() instanceof RuntimeException ) 207 throw (RuntimeException) exception.getCause(); 208 209 throw new FlowException( "could not call cleanup on process", exception.getCause() ); 210 } 211 } 212 213 private Map<String, Tap> createSources( ProcessWrapper processParent ) 214 { 215 try 216 { 217 return makeTapMap( processParent.getDependencyIncoming() ); 218 } 219 catch( ProcessException exception ) 220 { 221 if( exception.getCause() instanceof RuntimeException ) 222 throw (RuntimeException) exception.getCause(); 223 224 throw new FlowException( "could not get process incoming dependency", exception.getCause() ); 225 } 226 } 227 228 private Map<String, Tap> createSinks( ProcessWrapper processParent ) 229 { 230 try 231 { 232 return makeTapMap( processParent.getDependencyOutgoing() ); 233 } 234 catch( ProcessException exception ) 235 { 236 if( exception.getCause() instanceof RuntimeException ) 237 throw (RuntimeException) exception.getCause(); 238 239 throw new FlowException( "could not get process outgoing dependency", exception.getCause() ); 240 } 241 } 242 243 private Map<String, Tap> makeTapMap( Object resource ) 244 { 245 Collection paths = makeCollection( resource ); 246 247 Map<String, Tap> taps = new HashMap<String, Tap>(); 248 249 for( Object path : paths ) 250 { 251 if( path instanceof Tap ) 252 taps.put( ( (Tap) path ).getIdentifier(), (Tap) path ); 253 else 254 taps.put( path.toString(), new ProcessTap( new NullScheme(), path.toString() ) ); 255 } 256 return taps; 257 } 258 259 private Collection makeCollection( Object resource ) 260 { 261 if( resource instanceof Collection ) 262 return (Collection) resource; 263 else if( resource instanceof Object[] ) 264 return Arrays.asList( (Object[]) resource ); 265 else 266 return Arrays.asList( resource ); 267 } 268 269 private Map<String, Tap> createTraps( ProcessWrapper processParent ) 270 { 271 return new HashMap<String, Tap>(); 272 } 273 274 @Override 275 public String toString() 276 { 277 return getName() + ":" + process; 278 } 279 280 static class NullScheme extends Scheme 281 { 282 public void sourceConfInit( FlowProcess flowProcess, Tap tap, Object conf ) 283 { 284 } 285 286 public void sinkConfInit( FlowProcess flowProcess, Tap tap, Object conf ) 287 { 288 } 289 290 public boolean source( FlowProcess flowProcess, SourceCall sourceCall ) throws IOException 291 { 292 throw new UnsupportedOperationException( "sourcing is not supported in the scheme" ); 293 } 294 295 @Override 296 public String toString() 297 { 298 return getClass().getSimpleName(); 299 } 300 301 public void sink( FlowProcess flowProcess, SinkCall sinkCall ) throws IOException 302 { 303 throw new UnsupportedOperationException( "sinking is not supported in the scheme" ); 304 } 305 } 306 307 /** 308 * 309 */ 310 static class ProcessTap extends Tap 311 { 312 private final String token; 313 314 ProcessTap( NullScheme scheme, String token ) 315 { 316 super( scheme ); 317 this.token = token; 318 } 319 320 @Override 321 public String getIdentifier() 322 { 323 return token; 324 } 325 326 @Override 327 public TupleEntryIterator openForRead( FlowProcess flowProcess, Object input ) throws IOException 328 { 329 return null; 330 } 331 332 @Override 333 public TupleEntryCollector openForWrite( FlowProcess flowProcess, Object output ) throws IOException 334 { 335 return null; 336 } 337 338 @Override 339 public boolean createResource( Object conf ) throws IOException 340 { 341 return false; 342 } 343 344 @Override 345 public boolean deleteResource( Object conf ) throws IOException 346 { 347 return false; 348 } 349 350 @Override 351 public boolean resourceExists( Object conf ) throws IOException 352 { 353 return false; 354 } 355 356 @Override 357 public long getModifiedTime( Object conf ) throws IOException 358 { 359 return 0; 360 } 361 362 @Override 363 public String toString() 364 { 365 return token; 366 } 367 } 368 }