001    /*
002     * Copyright (c) 2007-2015 Concurrent, Inc. All Rights Reserved.
003     *
004     * Project and contact information: http://www.cascading.org/
005     *
006     * This file is part of the Cascading project.
007     *
008     * Licensed under the Apache License, Version 2.0 (the "License");
009     * you may not use this file except in compliance with the License.
010     * You may obtain a copy of the License at
011     *
012     *     http://www.apache.org/licenses/LICENSE-2.0
013     *
014     * Unless required by applicable law or agreed to in writing, software
015     * distributed under the License is distributed on an "AS IS" BASIS,
016     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017     * See the License for the specific language governing permissions and
018     * limitations under the License.
019     */
020    
021    package cascading.flow.local.planner;
022    
023    import java.io.IOException;
024    import java.util.Properties;
025    import java.util.concurrent.ExecutorService;
026    import java.util.concurrent.Executors;
027    import java.util.concurrent.Future;
028    
029    import cascading.flow.local.LocalFlowProcess;
030    import cascading.flow.local.LocalFlowStep;
031    import cascading.flow.planner.FlowStepJob;
032    import cascading.management.state.ClientState;
033    import cascading.stats.FlowStepStats;
034    import cascading.stats.local.LocalStepStats;
035    import org.slf4j.Logger;
036    import org.slf4j.LoggerFactory;
037    
038    /**
039     *
040     */
041    public class LocalFlowStepJob extends FlowStepJob<Properties>
042      {
043      private static final Logger LOG = LoggerFactory.getLogger( LocalFlowStepJob.class );
044    
045      private final LocalStepRunner stackRunner;
046      private Future<Throwable> future;
047    
048      public LocalFlowStepJob( ClientState clientState, LocalFlowProcess flowProcess, LocalFlowStep flowStep )
049        {
050        super( clientState, flowStep, 200, 1000 );
051        flowProcess.setStepStats( (LocalStepStats) this.flowStepStats );
052        this.stackRunner = new LocalStepRunner( flowProcess, flowStep );
053        }
054    
055      @Override
056      public Properties getConfig()
057        {
058        return flowStep.getConfig();
059        }
060    
061      @Override
062      protected FlowStepStats createStepStats( ClientState clientState )
063        {
064        return new LocalStepStats( flowStep, clientState );
065        }
066    
067      @Override
068      protected boolean isRemoteExecution()
069        {
070        return false;
071        }
072    
073      @Override
074      protected String internalJobId()
075        {
076        return "flow";
077        }
078    
079      @Override
080      protected void internalNonBlockingStart() throws IOException
081        {
082        ExecutorService executors = Executors.newFixedThreadPool( 1 );
083    
084        future = executors.submit( stackRunner );
085    
086        executors.shutdown();
087        }
088    
089      @Override
090      protected boolean internalIsStarted()
091        {
092        return future != null;
093        }
094    
095      @Override
096      protected boolean internalNonBlockingIsComplete() throws IOException
097        {
098        return stackRunner.isComplete();
099        }
100    
101      @Override
102      protected Throwable getThrowable()
103        {
104        return stackRunner.getThrowable();
105        }
106    
107      @Override
108      protected boolean internalNonBlockingIsSuccessful() throws IOException
109        {
110        return stackRunner.isSuccessful();
111        }
112    
113      @Override
114      protected void internalBlockOnStop() throws IOException
115        {
116        }
117    
118      @Override
119      protected void dumpDebugInfo()
120        {
121        }
122      }