001 /* 002 * Copyright (c) 2007-2015 Concurrent, Inc. All Rights Reserved. 003 * 004 * Project and contact information: http://www.cascading.org/ 005 * 006 * This file is part of the Cascading project. 007 * 008 * Licensed under the Apache License, Version 2.0 (the "License"); 009 * you may not use this file except in compliance with the License. 010 * You may obtain a copy of the License at 011 * 012 * http://www.apache.org/licenses/LICENSE-2.0 013 * 014 * Unless required by applicable law or agreed to in writing, software 015 * distributed under the License is distributed on an "AS IS" BASIS, 016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 017 * See the License for the specific language governing permissions and 018 * limitations under the License. 019 */ 020 021 package cascading.flow.local.planner; 022 023 import java.util.ArrayList; 024 import java.util.Collection; 025 import java.util.List; 026 import java.util.Properties; 027 import java.util.concurrent.Callable; 028 import java.util.concurrent.ExecutorService; 029 import java.util.concurrent.Executors; 030 import java.util.concurrent.Future; 031 032 import cascading.flow.FlowProcess; 033 import cascading.flow.local.LocalFlowStep; 034 import cascading.flow.local.stream.LocalStepStreamGraph; 035 import cascading.flow.stream.Duct; 036 import cascading.flow.stream.StreamGraph; 037 import org.slf4j.Logger; 038 import org.slf4j.LoggerFactory; 039 040 /** 041 * 042 */ 043 public class LocalStepRunner implements Callable<Throwable> 044 { 045 private static final Logger LOG = LoggerFactory.getLogger( LocalStepRunner.class ); 046 047 private final FlowProcess<Properties> flowProcess; 048 049 private boolean complete = false; 050 private boolean successful = false; 051 052 private final StreamGraph graph; 053 private final Collection<Duct> heads; 054 private Throwable throwable = null; 055 056 public LocalStepRunner( FlowProcess<Properties> flowProcess, LocalFlowStep step ) 057 { 058 this.flowProcess = flowProcess; 059 this.graph = new LocalStepStreamGraph( this.flowProcess, step ); 060 this.heads = graph.getHeads(); 061 } 062 063 public FlowProcess<Properties> getFlowProcess() 064 { 065 return flowProcess; 066 } 067 068 public boolean isComplete() 069 { 070 return complete; 071 } 072 073 public boolean isSuccessful() 074 { 075 return successful; 076 } 077 078 public Throwable getThrowable() 079 { 080 return throwable; 081 } 082 083 @Override 084 public Throwable call() throws Exception 085 { 086 boolean attemptedCleanup = false; 087 088 try 089 { 090 try 091 { 092 graph.prepare(); 093 } 094 catch( Throwable currentThrowable ) 095 { 096 if( !( currentThrowable instanceof OutOfMemoryError ) ) 097 LOG.error( "unable to prepare operation graph", currentThrowable ); 098 099 complete = true; 100 successful = false; 101 throwable = currentThrowable; 102 103 return throwable; 104 } 105 106 try 107 { 108 List<Future<Throwable>> futures = spawnHeads(); 109 110 for( Future<Throwable> future : futures ) 111 { 112 throwable = future.get(); 113 114 if( throwable != null ) 115 break; 116 } 117 } 118 catch( Throwable currentThrowable ) 119 { 120 if( !( currentThrowable instanceof OutOfMemoryError ) ) 121 LOG.error( "unable to complete step", currentThrowable ); 122 123 throwable = currentThrowable; 124 } 125 126 try 127 { 128 attemptedCleanup = true; // set so we don't try again regardless 129 130 if( !( throwable instanceof OutOfMemoryError ) ) 131 graph.cleanup(); 132 } 133 catch( Throwable currentThrowable ) 134 { 135 if( !( currentThrowable instanceof OutOfMemoryError ) ) 136 LOG.error( "unable to cleanup operation graph", currentThrowable ); 137 138 if( throwable == null ) 139 throwable = currentThrowable; 140 } 141 142 complete = true; 143 successful = throwable == null; 144 145 return throwable; 146 } 147 finally 148 { 149 try 150 { 151 if( !attemptedCleanup ) 152 graph.cleanup(); 153 } 154 catch( Throwable currentThrowable ) 155 { 156 if( !( currentThrowable instanceof OutOfMemoryError ) ) 157 LOG.error( "unable to cleanup operation graph", currentThrowable ); 158 159 if( throwable == null ) 160 throwable = currentThrowable; 161 162 successful = false; 163 } 164 } 165 } 166 167 private List<Future<Throwable>> spawnHeads() 168 { 169 // todo: consider a CyclicBarrier to syn all threads after the openForRead 170 // todo: should find all Callable Ducts and spawn them, group ducts may run on a timer etc 171 ExecutorService executors = Executors.newFixedThreadPool( heads.size() ); 172 List<Future<Throwable>> futures = new ArrayList<Future<Throwable>>(); 173 174 for( Duct head : heads ) 175 futures.add( executors.submit( (Callable) head ) ); 176 177 executors.shutdown(); 178 179 return futures; 180 } 181 }