@Process public abstract class BaseFlow<Config> extends Object implements Flow<Config>, ProcessLogger
Modifier and Type | Class and Description |
---|---|
static class |
BaseFlow.FlowHolder
Class FlowHolder is a helper class for wrapping Flow instances.
|
Modifier and Type | Field and Description |
---|---|
protected String |
flowCanonicalHash
Field flowCanonicalHash
|
protected HashMap<String,String> |
flowDescriptor |
protected FlowElementGraph |
flowElementGraph
Field flowElementGraph
|
protected FlowStats |
flowStats
Field flowStats
|
protected FlowStepGraph |
flowStepGraph
Field flowStepGraph
|
protected PlatformInfo |
platformInfo |
protected ShutdownUtil.Hook |
shutdownHook |
protected Map<String,Tap> |
sinks
Field sinks
|
protected Map<String,Tap> |
sources
Field sources
|
protected List<FlowStep<Config>> |
steps
Field steps
|
protected boolean |
stop
Field stop
|
protected boolean |
stopJobsOnExit
Field stopJobsOnExit
|
protected Thread |
thread
Field thread
|
CASCADING_FLOW_ID
NULL
Modifier | Constructor and Description |
---|---|
protected |
BaseFlow()
Used for testing.
|
protected |
BaseFlow(PlatformInfo platformInfo,
Map<Object,Object> properties,
Config defaultConfig,
FlowDef flowDef) |
protected |
BaseFlow(PlatformInfo platformInfo,
Map<Object,Object> properties,
Config defaultConfig,
String name,
Map<String,String> flowDescriptor) |
protected |
BaseFlow(PlatformInfo platformInfo,
String name)
Does not initialize stats
|
Modifier and Type | Method and Description |
---|---|
void |
addListener(FlowListener flowListener)
Method addListener registers the given flowListener with this instance.
|
protected void |
addPlannerProperties() |
protected void |
addSessionProperties(Map<Object,Object> properties) |
void |
addStepListener(FlowStepListener flowStepListener)
Method addStepListener registers the given flowStepListener with this instance.
|
boolean |
areSinksStale()
Method areSinksStale returns true if any of the sinks referenced are out of date in relation to the sources.
|
boolean |
areSourcesNewer(long sinkModified)
Method areSourcesNewer returns true if any source is newer than the given sinkModified date value.
|
void |
cleanup() |
void |
complete()
Method complete starts the current Flow instance if it has not been previously started, then blocks until completion.
|
Config |
createConfig(Map<Object,Object> properties,
Config defaultConfig) |
protected String |
createFlowCanonicalHash(FlowElementGraph flowElementGraph) |
protected FlowStats |
createFlowStats() |
protected Thread |
createFlowThread(String threadName) |
protected FlowStats |
createPrepareFlowStats() |
void |
deleteCheckpointsIfNotUpdate() |
void |
deleteCheckpointsIfReplace() |
void |
deleteSinks()
Method deleteSinks deletes all sinks, whether or not they are configured for
SinkMode.UPDATE . |
void |
deleteSinksIfNotUpdate()
Method deleteSinksIfNotUpdate deletes all sinks if they are not configured with the
SinkMode.UPDATE flag. |
void |
deleteSinksIfReplace() |
void |
deleteTrapsIfNotUpdate() |
void |
deleteTrapsIfReplace() |
protected void |
fireOnCompleted() |
protected void |
fireOnStarting() |
protected void |
fireOnStopping() |
protected void |
fireOnThrowable() |
protected void |
fireOnThrowable(Throwable throwable) |
String |
getCascadeID()
Returns the parent Cascade ID that owns this Flow instance.
|
CascadingServices |
getCascadingServices() |
List<String> |
getCheckpointNames() |
Map<String,Tap> |
getCheckpoints()
Method getCheckpoints returns the checkpoint taps of this Flow object.
|
Collection<Tap> |
getCheckpointsCollection()
Method getCheckpointsCollection returns a
Collection of checkpoint Tap s for this Flow object. |
List<String> |
getClassPath() |
protected ClientState |
getClientState() |
protected Fields |
getFieldsFor(FlowElementGraph pipeGraph,
Tap tap) |
String |
getFlowCanonicalHash()
The hash value can be used to determine if two unique Flow instances performed the same work.
|
Map<String,String> |
getFlowDescriptor()
Returns an immutable map of properties giving more details about the Flow object.
|
FlowSession |
getFlowSession() |
FlowSkipStrategy |
getFlowSkipStrategy()
Method getFlowSkipStrategy returns the current
FlowSkipStrategy used by this Flow. |
FlowStats |
getFlowStats()
Method getFlowStats returns the flowStats of this Flow object.
|
List<FlowStep<Config>> |
getFlowSteps()
Method getFlowSteps returns the flowSteps of this Flow object.
|
FlowStepStrategy |
getFlowStepStrategy()
Returns the current
FlowStepStrategy instance. |
BaseFlow.FlowHolder |
getHolder()
Used to return a simple wrapper for use as an edge in a graph where there can only be
one instance of every edge.
|
String |
getID()
Method getID returns the ID of this Flow object.
|
protected abstract int |
getMaxNumParallelSteps() |
String |
getName()
Method getName returns the name of this Flow object.
|
PlannerInfo |
getPlannerInfo()
Returns any meta-data about the planner that created this Flow instance.
|
PlatformInfo |
getPlatformInfo()
Returns any meta-data about the underlying platform this Flow instance will run against.
|
String |
getRunID()
Returns the run ID given when this Flow instance was defined in the FlowDef.
|
Tap |
getSink()
Method getSink returns the first sink of this Flow object.
|
Tap |
getSink(String name) |
long |
getSinkModified()
Method getSinkModified returns the youngest modified date of any sink
Tap managed by this Flow instance. |
List<String> |
getSinkNames() |
Map<String,Tap> |
getSinks()
Method getSinks returns the sinks of this Flow object.
|
Collection<Tap> |
getSinksCollection()
Method getSinksCollection returns a
Collection of sink Tap s for this Flow object. |
Tap |
getSource(String name) |
List<String> |
getSourceNames() |
Map<String,Tap> |
getSources()
Method getSources returns the sources of this Flow object.
|
Collection<Tap> |
getSourcesCollection()
Method getSourcesCollection returns a
Collection of source Tap s for this Flow object. |
UnitOfWorkSpawnStrategy |
getSpawnStrategy() |
FlowStats |
getStats() |
int |
getSubmitPriority()
Method getSubmitPriority returns the submitPriority of this Flow object.
|
String |
getTags() |
protected long |
getTotalSliceCPUMilliSeconds() |
List<String> |
getTrapNames() |
Map<String,Tap> |
getTraps()
Method getTraps returns the traps of this Flow object.
|
Collection<Tap> |
getTrapsCollection()
Method getTrapsCollection returns a
Collection of trap Tap s for this Flow object. |
protected void |
handleExecutorShutdown() |
boolean |
hasListeners()
Method hasListeners returns true if
FlowListener instances have been registered. |
boolean |
hasStepListeners()
Method hasStepListeners returns true if
FlowStepListener instances have been registered
with any of the FlowStep s belonging to this instance |
protected abstract void |
initConfig(Map<Object,Object> properties,
Config parentConfig)
This method creates a new internal Config with the parentConfig as defaults using the properties to override
the defaults.
|
protected void |
initFromProperties(Map<Object,Object> properties) |
void |
initialize(FlowElementGraph flowElementGraph,
FlowStepGraph flowStepGraph) |
protected void |
initializeChildStats() |
protected void |
initializeNewJobsMap() |
protected void |
initSteps() |
protected abstract void |
internalClean(boolean stop) |
protected abstract void |
internalShutdown() |
protected abstract void |
internalStart() |
protected void |
internalStopAllJobs() |
boolean |
isDebugEnabled() |
boolean |
isInfoEnabled() |
boolean |
isSkipFlow()
Method isSkipFlow returns true if the parent
Cascade should skip this Flow instance. |
boolean |
isStopJobsOnExit()
Method isStopJobsOnExit returns the stopJobsOnExit of this Flow object.
|
void |
logDebug(String message,
Object... arguments) |
void |
logError(String message,
Object... arguments) |
void |
logError(String message,
Throwable throwable) |
void |
logInfo(String message,
Object... arguments) |
void |
logWarn(String message) |
void |
logWarn(String message,
Object... arguments) |
void |
logWarn(String message,
Throwable throwable) |
protected abstract Config |
newConfig(Config defaultConfig) |
TupleEntryIterator |
openSink()
Method openSink opens the first sink Tap.
|
TupleEntryIterator |
openSink(String name)
Method openSink opens the named sink Tap.
|
TupleEntryIterator |
openSource()
Method openSource opens the first source Tap.
|
TupleEntryIterator |
openSource(String name)
Method openSource opens the named source Tap.
|
TupleEntryIterator |
openTapForRead(Tap tap)
Method openTapForRead returns a
TupleEntryIterator for the given Tap instance. |
TupleEntryCollector |
openTapForWrite(Tap tap)
Method openTapForWrite returns a TupleEntryCollector for the given Tap instance.
|
TupleEntryIterator |
openTrap()
Method openTrap opens the first trap Tap.
|
TupleEntryIterator |
openTrap(String name)
Method openTrap opens the named trap Tap.
|
void |
prepare()
Method prepare is used by a
Cascade to notify the given Flow it should initialize or clear any resources
necessary for Flow.start() to be called successfully. |
protected void |
presentSinkFields(FlowElementGraph pipeGraph)
Present the current resolved fields for the Tap
|
protected void |
presentSourceFields(FlowElementGraph pipeGraph)
Present the current resolved fields for the Tap
|
protected void |
registerShutdownHook() |
boolean |
removeListener(FlowListener flowListener)
Method removeListener removes the given flowListener from this instance.
|
boolean |
removeStepListener(FlowStepListener flowStepListener)
Method removeStepListener removes the given flowStepListener from this instance.
|
boolean |
resourceExists(Tap tap)
Method resourceExists returns true if the resource represented by the given Tap instance exists.
|
protected void |
retrieveSinkFields()
Force a Scheme to fetch any fields from a meta-data store
|
protected void |
retrieveSourceFields()
Force a Scheme to fetch any fields from a meta-data store
|
void |
setCascade(Cascade cascade) |
protected void |
setCheckpoints(Map<String,Tap> checkpoints) |
protected abstract void |
setConfigProperty(Config config,
Object key,
Object value) |
FlowSkipStrategy |
setFlowSkipStrategy(FlowSkipStrategy flowSkipStrategy)
Method setFlowSkipStrategy sets a new
FlowSkipStrategy , the current strategy is returned. |
protected void |
setFlowStepGraph(FlowStepGraph flowStepGraph) |
void |
setFlowStepStrategy(FlowStepStrategy flowStepStrategy)
Sets a default
FlowStepStrategy instance. |
protected void |
setName(String name) |
void |
setPlannerInfo(PlannerInfo plannerInfo) |
protected void |
setSinks(Map<String,Tap> sinks) |
protected void |
setSources(Map<String,Tap> sources) |
void |
setSpawnStrategy(UnitOfWorkSpawnStrategy spawnStrategy) |
void |
setSubmitPriority(int submitPriority)
Method setSubmitPriority sets the submitPriority of this Flow object.
|
protected void |
setTraps(Map<String,Tap> traps) |
void |
start()
Method start begins the execution of this Flow instance.
|
void |
stop()
Method stop stops all running jobs, killing any currently executing.
|
String |
toString() |
FlowElementGraph |
updateSchemes(FlowElementGraph pipeGraph) |
void |
writeDOT(String filename)
Method writeDOT writes this Flow instance to the given filename as a DOT file for import into a graphics package.
|
void |
writeStepsDOT(String filename)
Method writeStepsDOT writes this Flow step graph to the given filename as a DOT file for import into a graphics package.
|
clone, equals, finalize, getClass, hashCode, notify, notifyAll, wait, wait, wait
getConfig, getConfigAsProperties, getConfigCopy, getFlowProcess, getProperty, stepsAreLocal
protected PlatformInfo platformInfo
protected boolean stopJobsOnExit
protected FlowStepGraph flowStepGraph
protected boolean stop
protected String flowCanonicalHash
protected FlowElementGraph flowElementGraph
protected ShutdownUtil.Hook shutdownHook
protected HashMap<String,String> flowDescriptor
protected BaseFlow()
protected BaseFlow(PlatformInfo platformInfo, String name)
name
- protected BaseFlow(PlatformInfo platformInfo, Map<Object,Object> properties, Config defaultConfig, String name, Map<String,String> flowDescriptor)
public void setPlannerInfo(PlannerInfo plannerInfo)
public PlannerInfo getPlannerInfo()
Flow
getPlannerInfo
in interface Flow<Config>
public PlatformInfo getPlatformInfo()
Flow
getPlatformInfo
in interface Flow<Config>
public void initialize(FlowElementGraph flowElementGraph, FlowStepGraph flowStepGraph)
public FlowElementGraph updateSchemes(FlowElementGraph pipeGraph)
protected void retrieveSourceFields()
protected void presentSourceFields(FlowElementGraph pipeGraph)
pipeGraph
- protected void retrieveSinkFields()
protected void presentSinkFields(FlowElementGraph pipeGraph)
pipeGraph
- protected Fields getFieldsFor(FlowElementGraph pipeGraph, Tap tap)
protected void addSessionProperties(Map<Object,Object> properties)
protected void addPlannerProperties()
protected FlowStats createPrepareFlowStats()
protected FlowStats createFlowStats()
public CascadingServices getCascadingServices()
protected ClientState getClientState()
protected void initSteps()
public String getName()
Flow
public String getID()
Flow
public int getSubmitPriority()
Flow
getSubmitPriority
in interface Flow<Config>
public void setSubmitPriority(int submitPriority)
Flow
setSubmitPriority
in interface Flow<Config>
submitPriority
- the submitPriority of this Flow object.
public String getFlowCanonicalHash()
protected String createFlowCanonicalHash(FlowElementGraph flowElementGraph)
protected void setSources(Map<String,Tap> sources)
protected void setCheckpoints(Map<String,Tap> checkpoints)
protected void setFlowStepGraph(FlowStepGraph flowStepGraph)
protected abstract void initConfig(Map<Object,Object> properties, Config parentConfig)
properties
- of type Map
parentConfig
- of type Config
protected abstract void setConfigProperty(Config config, Object key, Object value)
protected void initFromProperties(Map<Object,Object> properties)
public FlowSession getFlowSession()
public FlowStats getFlowStats()
Flow
getFlowStats
in interface Flow<Config>
public Map<String,String> getFlowDescriptor()
Flow
FlowDef.addDescription(String, String)
to set values on a given Flow.
Flow descriptions provide meta-data to monitoring systems describing the workload a given Flow represents.
For known description types, see FlowDescriptors
.getFlowDescriptor
in interface Flow<Config>
public FlowStats getStats()
getStats
in interface UnitOfWork<FlowStats>
public boolean hasListeners()
Flow
FlowListener
instances have been registered.hasListeners
in interface Flow<Config>
public void addListener(FlowListener flowListener)
Flow
addListener
in interface Flow<Config>
flowListener
- of type FlowListener
public boolean removeListener(FlowListener flowListener)
Flow
removeListener
in interface Flow<Config>
flowListener
- of type FlowListener
public boolean hasStepListeners()
Flow
FlowStepListener
instances have been registered
with any of the FlowStep
s belonging to this instancehasStepListeners
in interface Flow<Config>
public void addStepListener(FlowStepListener flowStepListener)
Flow
addStepListener
in interface Flow<Config>
flowStepListener
- of type FlowStepListener
public boolean removeStepListener(FlowStepListener flowStepListener)
Flow
removeStepListener
in interface Flow<Config>
flowStepListener
- of type FlowStepListenerFlowStep
belonging to this instancepublic Map<String,Tap> getSources()
Flow
getSources
in interface Flow<Config>
public List<String> getSourceNames()
getSourceNames
in interface Flow<Config>
@DependencyIncoming public Collection<Tap> getSourcesCollection()
Flow
Collection
of source Tap
s for this Flow object.getSourcesCollection
in interface Flow<Config>
public Map<String,Tap> getSinks()
Flow
public List<String> getSinkNames()
getSinkNames
in interface Flow<Config>
@DependencyOutgoing public Collection<Tap> getSinksCollection()
Flow
Collection
of sink Tap
s for this Flow object.getSinksCollection
in interface Flow<Config>
public Tap getSink()
Flow
public Map<String,Tap> getTraps()
Flow
public List<String> getTrapNames()
getTrapNames
in interface Flow<Config>
public Collection<Tap> getTrapsCollection()
Flow
Collection
of trap Tap
s for this Flow object.getTrapsCollection
in interface Flow<Config>
public Map<String,Tap> getCheckpoints()
Flow
getCheckpoints
in interface Flow<Config>
public List<String> getCheckpointNames()
getCheckpointNames
in interface Flow<Config>
public Collection<Tap> getCheckpointsCollection()
Flow
Collection
of checkpoint Tap
s for this Flow object.getCheckpointsCollection
in interface Flow<Config>
public boolean isStopJobsOnExit()
Flow
true
.isStopJobsOnExit
in interface Flow<Config>
public FlowSkipStrategy getFlowSkipStrategy()
Flow
FlowSkipStrategy
used by this Flow.getFlowSkipStrategy
in interface Flow<Config>
public FlowSkipStrategy setFlowSkipStrategy(FlowSkipStrategy flowSkipStrategy)
Flow
FlowSkipStrategy
, the current strategy is returned.
FlowSkipStrategy instances define when a Flow instance should be skipped. The default strategy is FlowSkipIfSinkNotStale
.
An alternative strategy would be FlowSkipIfSinkExists
.
A FlowSkipStrategy will not be consulted when executing a Flow directly through Flow.start()
or Flow.complete()
. Only
when the Flow is executed through a Cascade
instance.setFlowSkipStrategy
in interface Flow<Config>
flowSkipStrategy
- of type FlowSkipStrategypublic boolean isSkipFlow() throws IOException
Flow
Cascade
should skip this Flow instance. True is returned
if the current FlowSkipStrategy
returns true.isSkipFlow
in interface Flow<Config>
IOException
- whenpublic boolean areSinksStale() throws IOException
Flow
Tap.isReplace()
returns true.areSinksStale
in interface Flow<Config>
IOException
- whenpublic boolean areSourcesNewer(long sinkModified) throws IOException
Flow
areSourcesNewer
in interface Flow<Config>
sinkModified
- of type longIOException
- whenpublic long getSinkModified() throws IOException
Flow
Tap
managed by this Flow instance.
If zero (0) is returned, at least one of the sink resources does not exist. If minus one (-1) is returned,
at least one of the sinks is marked for delete (returns true
).getSinkModified
in interface Flow<Config>
IOException
- whenpublic FlowStepStrategy getFlowStepStrategy()
Flow
FlowStepStrategy
instance.getFlowStepStrategy
in interface Flow<Config>
public void setFlowStepStrategy(FlowStepStrategy flowStepStrategy)
Flow
FlowStepStrategy
instance.
Use a FlowStepStrategy to change FlowStep
configuration properties
before the properties are submitted to the underlying platform for the step
unit of work.setFlowStepStrategy
in interface Flow<Config>
flowStepStrategy
- The FlowStepStrategy to use.public List<FlowStep<Config>> getFlowSteps()
Flow
getFlowSteps
in interface Flow<Config>
@ProcessPrepare public void prepare()
Flow
Cascade
to notify the given Flow it should initialize or clear any resources
necessary for Flow.start()
to be called successfully.
Specifically, this implementation calls deleteSinksIfNotUpdate()
and deleteTrapsIfNotUpdate().
@ProcessStart public void start()
Flow
Flow.complete()
to block until this Flow completes.protected Thread createFlowThread(String threadName)
protected abstract void internalStart()
@ProcessStop public void stop()
Flow
protected abstract void internalClean(boolean stop)
@ProcessComplete public void complete()
Flow
@ProcessCleanup public void cleanup()
public TupleEntryIterator openSource() throws IOException
Flow
openSource
in interface Flow<Config>
IOException
- whenpublic TupleEntryIterator openSource(String name) throws IOException
Flow
openSource
in interface Flow<Config>
name
- of type StringIOException
- whenpublic TupleEntryIterator openSink() throws IOException
Flow
openSink
in interface Flow<Config>
IOException
- whenpublic TupleEntryIterator openSink(String name) throws IOException
Flow
openSink
in interface Flow<Config>
name
- of type StringIOException
- whenpublic TupleEntryIterator openTrap() throws IOException
Flow
openTrap
in interface Flow<Config>
IOException
- whenpublic TupleEntryIterator openTrap(String name) throws IOException
Flow
openTrap
in interface Flow<Config>
name
- of type StringIOException
- whenpublic void deleteSinks() throws IOException
SinkMode.UPDATE
.
Use with caution.IOException
- whendeleteSinksIfNotUpdate()
public void deleteSinksIfNotUpdate() throws IOException
SinkMode.UPDATE
flag.
Typically used by a Cascade
before executing the flow if the sinks are stale.
Use with caution.IOException
- whenpublic void deleteSinksIfReplace() throws IOException
IOException
public void deleteTrapsIfNotUpdate() throws IOException
IOException
public void deleteCheckpointsIfNotUpdate() throws IOException
IOException
public void deleteTrapsIfReplace() throws IOException
IOException
public void deleteCheckpointsIfReplace() throws IOException
IOException
public boolean resourceExists(Tap tap) throws IOException
Flow
resourceExists
in interface Flow<Config>
tap
- of type TapIOException
- whenpublic TupleEntryIterator openTapForRead(Tap tap) throws IOException
Flow
TupleEntryIterator
for the given Tap instance.
Note the returned iterator will return the same instance of TupleEntry
on every call,
thus a copy must be made of either the TupleEntry or the underlying Tuple
instance if they are to be
stored in a Collection.openTapForRead
in interface Flow<Config>
tap
- of type TapIOException
- when there is an error opening the resourcepublic TupleEntryCollector openTapForWrite(Tap tap) throws IOException
Flow
openTapForWrite
in interface Flow<Config>
tap
- of type TapIOException
- when there is an error opening the resourceprotected long getTotalSliceCPUMilliSeconds()
protected abstract int getMaxNumParallelSteps()
protected abstract void internalShutdown()
protected void initializeNewJobsMap()
protected void initializeChildStats()
protected void internalStopAllJobs()
protected void handleExecutorShutdown()
protected void fireOnCompleted()
protected void fireOnThrowable(Throwable throwable)
protected void fireOnThrowable()
protected void fireOnStopping()
protected void fireOnStarting()
public final boolean isInfoEnabled()
isInfoEnabled
in interface ProcessLogger
public final boolean isDebugEnabled()
isDebugEnabled
in interface ProcessLogger
public void logInfo(String message, Object... arguments)
logInfo
in interface ProcessLogger
public void logDebug(String message, Object... arguments)
logDebug
in interface ProcessLogger
public void logWarn(String message)
logWarn
in interface ProcessLogger
public void logWarn(String message, Throwable throwable)
logWarn
in interface ProcessLogger
public void logWarn(String message, Object... arguments)
logWarn
in interface ProcessLogger
public void logError(String message, Object... arguments)
logError
in interface ProcessLogger
public void logError(String message, Throwable throwable)
logError
in interface ProcessLogger
public void writeDOT(String filename)
Flow
public void writeStepsDOT(String filename)
Flow
writeStepsDOT
in interface Flow<Config>
filename
- of type Stringpublic BaseFlow.FlowHolder getHolder()
public void setCascade(Cascade cascade)
public String getCascadeID()
Flow
getCascadeID
in interface Flow<Config>
public String getRunID()
Flow
public List<String> getClassPath()
public void setSpawnStrategy(UnitOfWorkSpawnStrategy spawnStrategy)
setSpawnStrategy
in interface UnitOfWork<FlowStats>
public UnitOfWorkSpawnStrategy getSpawnStrategy()
getSpawnStrategy
in interface UnitOfWork<FlowStats>
protected void registerShutdownHook()
Copyright © 2007-2015 Concurrent, Inc. All Rights Reserved.