001 /* 002 * Copyright (c) 2007-2015 Concurrent, Inc. All Rights Reserved. 003 * 004 * Project and contact information: http://www.cascading.org/ 005 * 006 * This file is part of the Cascading project. 007 * 008 * Licensed under the Apache License, Version 2.0 (the "License"); 009 * you may not use this file except in compliance with the License. 010 * You may obtain a copy of the License at 011 * 012 * http://www.apache.org/licenses/LICENSE-2.0 013 * 014 * Unless required by applicable law or agreed to in writing, software 015 * distributed under the License is distributed on an "AS IS" BASIS, 016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 017 * See the License for the specific language governing permissions and 018 * limitations under the License. 019 */ 020 021 package cascading.flow.local.planner; 022 023 import java.io.IOException; 024 import java.util.Properties; 025 import java.util.concurrent.ExecutorService; 026 import java.util.concurrent.Executors; 027 import java.util.concurrent.Future; 028 029 import cascading.flow.local.LocalFlowProcess; 030 import cascading.flow.local.LocalFlowStep; 031 import cascading.flow.planner.FlowStepJob; 032 import cascading.management.state.ClientState; 033 import cascading.stats.FlowStepStats; 034 import cascading.stats.local.LocalStepStats; 035 import org.slf4j.Logger; 036 import org.slf4j.LoggerFactory; 037 038 /** 039 * 040 */ 041 public class LocalFlowStepJob extends FlowStepJob<Properties> 042 { 043 private static final Logger LOG = LoggerFactory.getLogger( LocalFlowStepJob.class ); 044 045 private final LocalStepRunner stackRunner; 046 private Future<Throwable> future; 047 048 public LocalFlowStepJob( ClientState clientState, LocalFlowProcess flowProcess, LocalFlowStep flowStep ) 049 { 050 super( clientState, flowStep, 200, 1000 ); 051 flowProcess.setStepStats( (LocalStepStats) this.flowStepStats ); 052 this.stackRunner = new LocalStepRunner( flowProcess, flowStep ); 053 } 054 055 @Override 056 public Properties getConfig() 057 { 058 return flowStep.getConfig(); 059 } 060 061 @Override 062 protected FlowStepStats createStepStats( ClientState clientState ) 063 { 064 return new LocalStepStats( flowStep, clientState ); 065 } 066 067 @Override 068 protected boolean isRemoteExecution() 069 { 070 return false; 071 } 072 073 @Override 074 protected String internalJobId() 075 { 076 return "flow"; 077 } 078 079 @Override 080 protected void internalNonBlockingStart() throws IOException 081 { 082 ExecutorService executors = Executors.newFixedThreadPool( 1 ); 083 084 future = executors.submit( stackRunner ); 085 086 executors.shutdown(); 087 } 088 089 @Override 090 protected boolean internalIsStarted() 091 { 092 return future != null; 093 } 094 095 @Override 096 protected boolean internalNonBlockingIsComplete() throws IOException 097 { 098 return stackRunner.isComplete(); 099 } 100 101 @Override 102 protected Throwable getThrowable() 103 { 104 return stackRunner.getThrowable(); 105 } 106 107 @Override 108 protected boolean internalNonBlockingIsSuccessful() throws IOException 109 { 110 return stackRunner.isSuccessful(); 111 } 112 113 @Override 114 protected void internalBlockOnStop() throws IOException 115 { 116 } 117 118 @Override 119 protected void dumpDebugInfo() 120 { 121 } 122 }