001    /*
002     * Copyright (c) 2007-2015 Concurrent, Inc. All Rights Reserved.
003     *
004     * Project and contact information: http://www.cascading.org/
005     *
006     * This file is part of the Cascading project.
007     *
008     * Licensed under the Apache License, Version 2.0 (the "License");
009     * you may not use this file except in compliance with the License.
010     * You may obtain a copy of the License at
011     *
012     *     http://www.apache.org/licenses/LICENSE-2.0
013     *
014     * Unless required by applicable law or agreed to in writing, software
015     * distributed under the License is distributed on an "AS IS" BASIS,
016     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017     * See the License for the specific language governing permissions and
018     * limitations under the License.
019     */
020    
021    package cascading.flow.local.planner;
022    
023    import java.util.ArrayList;
024    import java.util.Collection;
025    import java.util.List;
026    import java.util.Properties;
027    import java.util.concurrent.Callable;
028    import java.util.concurrent.ExecutorService;
029    import java.util.concurrent.Executors;
030    import java.util.concurrent.Future;
031    
032    import cascading.flow.FlowProcess;
033    import cascading.flow.local.LocalFlowStep;
034    import cascading.flow.local.stream.LocalStepStreamGraph;
035    import cascading.flow.stream.Duct;
036    import cascading.flow.stream.StreamGraph;
037    import org.slf4j.Logger;
038    import org.slf4j.LoggerFactory;
039    
040    /**
041     *
042     */
043    public class LocalStepRunner implements Callable<Throwable>
044      {
045      private static final Logger LOG = LoggerFactory.getLogger( LocalStepRunner.class );
046    
047      private final FlowProcess<Properties> flowProcess;
048    
049      private boolean complete = false;
050      private boolean successful = false;
051    
052      private final StreamGraph graph;
053      private final Collection<Duct> heads;
054      private Throwable throwable = null;
055    
056      public LocalStepRunner( FlowProcess<Properties> flowProcess, LocalFlowStep step )
057        {
058        this.flowProcess = flowProcess;
059        this.graph = new LocalStepStreamGraph( this.flowProcess, step );
060        this.heads = graph.getHeads();
061        }
062    
063      public FlowProcess<Properties> getFlowProcess()
064        {
065        return flowProcess;
066        }
067    
068      public boolean isComplete()
069        {
070        return complete;
071        }
072    
073      public boolean isSuccessful()
074        {
075        return successful;
076        }
077    
078      public Throwable getThrowable()
079        {
080        return throwable;
081        }
082    
083      @Override
084      public Throwable call() throws Exception
085        {
086        boolean attemptedCleanup = false;
087    
088        try
089          {
090          try
091            {
092            graph.prepare();
093            }
094          catch( Throwable currentThrowable )
095            {
096            if( !( currentThrowable instanceof OutOfMemoryError ) )
097              LOG.error( "unable to prepare operation graph", currentThrowable );
098    
099            complete = true;
100            successful = false;
101            throwable = currentThrowable;
102    
103            return throwable;
104            }
105    
106          try
107            {
108            List<Future<Throwable>> futures = spawnHeads();
109    
110            for( Future<Throwable> future : futures )
111              {
112              throwable = future.get();
113    
114              if( throwable != null )
115                break;
116              }
117            }
118          catch( Throwable currentThrowable )
119            {
120            if( !( currentThrowable instanceof OutOfMemoryError ) )
121              LOG.error( "unable to complete step", currentThrowable );
122    
123            throwable = currentThrowable;
124            }
125    
126          try
127            {
128            attemptedCleanup = true; // set so we don't try again regardless
129    
130            if( !( throwable instanceof OutOfMemoryError ) )
131              graph.cleanup();
132            }
133          catch( Throwable currentThrowable )
134            {
135            if( !( currentThrowable instanceof OutOfMemoryError ) )
136              LOG.error( "unable to cleanup operation graph", currentThrowable );
137    
138            if( throwable == null )
139              throwable = currentThrowable;
140            }
141    
142          complete = true;
143          successful = throwable == null;
144    
145          return throwable;
146          }
147        finally
148          {
149          try
150            {
151            if( !attemptedCleanup )
152              graph.cleanup();
153            }
154          catch( Throwable currentThrowable )
155            {
156            if( !( currentThrowable instanceof OutOfMemoryError ) )
157              LOG.error( "unable to cleanup operation graph", currentThrowable );
158    
159            if( throwable == null )
160              throwable = currentThrowable;
161    
162            successful = false;
163            }
164          }
165        }
166    
167      private List<Future<Throwable>> spawnHeads()
168        {
169        // todo: consider a CyclicBarrier to syn all threads after the openForRead
170        // todo: should find all Callable Ducts and spawn them, group ducts may run on a timer etc
171        ExecutorService executors = Executors.newFixedThreadPool( heads.size() );
172        List<Future<Throwable>> futures = new ArrayList<Future<Throwable>>();
173    
174        for( Duct head : heads )
175          futures.add( executors.submit( (Callable) head ) );
176    
177        executors.shutdown();
178    
179        return futures;
180        }
181      }