/*
 * Copyright (c) 2007-2016 Concurrent, Inc. All Rights Reserved.
 *
 * Project and contact information: http://www.cascading.org/
 *
 * This file is part of the Cascading project.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package cascading.flow.local.stream.duct;

import java.lang.ref.WeakReference;
import java.lang.reflect.UndeclaredThrowableException;
import java.util.ArrayList;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;

import cascading.flow.stream.duct.Duct;
import cascading.flow.stream.duct.Fork;
import cascading.tuple.TupleEntry;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * This {@link Fork} avoids a possible deadlock in fork-and-join scenarios by running each downstream edge in its own thread.
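 * <p>
 * Each downstream {@link Duct} gets its own unbounded message queue and worker thread: {@link #start(Duct)},
 * {@link #receive(Duct, TupleEntry)} and {@link #complete(Duct)} enqueue messages on every queue, and each
 * worker drains its own queue and forwards the calls, so one blocked branch cannot starve its siblings.
 * <p>
 * Illustrative sketch of direct construction ({@code branches} is a placeholder; forks are normally wired up
 * by the local stream graph rather than by user code):
 * <pre>{@code
 * Duct[] branches = ...; // the downstream edges to feed in parallel
 * ParallelFork<TupleEntry> fork = new ParallelFork<>( branches );
 * }</pre>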
 */
public class ParallelFork<Outgoing> extends Fork<TupleEntry, Outgoing>
  {
  private static final Logger LOG = LoggerFactory.getLogger( ParallelFork.class );

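  /**
   * A unit of work handed from the upstream (calling) thread to the per-duct worker threads; each message carries
   * the originating {@link Duct} and knows how to replay the corresponding call on a downstream duct.
   */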
  abstract static class Message
    {
    protected final Duct previous;

    public Message( Duct previous )
      {
      this.previous = previous;
      }

    public abstract void passOn( Duct next );

    public abstract boolean isTermination();
    }

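  /**
   * Signals a downstream duct to start; also counts down the shared latch so the caller knows every worker
   * thread is running before any tuples are sent.
   */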
  static final class StartMessage extends Message
    {
    final CountDownLatch startLatch;

    public StartMessage( Duct previous, CountDownLatch startLatch )
      {
      super( previous );
      this.startLatch = startLatch;
      }

    @Override
    public void passOn( Duct next )
      {
      startLatch.countDown();
      next.start( previous );
      }

    @Override
    public boolean isTermination()
      {
      return false;
      }
    }

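  /**
   * Carries a defensive copy of an incoming {@link TupleEntry} to a downstream duct.
   */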
  static final class ReceiveMessage extends Message
    {
    final TupleEntry tuple;

    public ReceiveMessage( Duct previous, TupleEntry tuple )
      {
      super( previous );

      // copy the tuple immediately, in the calling thread, so later changes by the upstream duct cannot race
      // with the downstream threads
      this.tuple = new TupleEntry( tuple );
      }

    @Override
    public void passOn( Duct next )
      {
      next.receive( previous, tuple );
      }

    @Override
    public boolean isTermination()
      {
      return false;
      }
    }

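  /**
   * Signals a downstream duct to complete; this is the termination message that ends each worker thread's loop.
   */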
  static final class CompleteMessage extends Message
    {
    public CompleteMessage( Duct previous )
      {
      super( previous );
      }

    @Override
    public void passOn( Duct next )
      {
      next.complete( previous );
      }

    @Override
    public boolean isTermination()
      {
      return true;
      }
    }

  private final ArrayList<LinkedBlockingQueue<Message>> buffers;
  private final ExecutorService executor;
  private final ArrayList<Callable<Throwable>> actions;
  private final ArrayList<Future<Throwable>> futures;

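  /**
   * Creates one unbounded message queue and one worker action per downstream duct; the workers are not submitted
   * to the executor until {@link #start(Duct)} is called.
   */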
  public ParallelFork( Duct[] allNext )
    {
    super( allNext );

    // Candidate values for nThreads in newFixedThreadPool:
    //
    // nThreads = allNext.length: may cause thread-thrashing on machines with few cores, but the OS scheduler
    //   ensures every runnable thread eventually gets a chance to proceed (and possibly complete, letting
    //   others blocked in a Local*Gate proceed).
    // nThreads = number of CPUs: would work, but possibly only by chance, as long as the CPU count is
    //   "big enough" (see below).
    // nThreads = 1: "parallel" only with respect to upstream. This can work sometimes, but still deadlocks in
    //   the Fork-CoGroup/HashJoin scenario, since one side of the join can starve the other by never completing.
    // nThreads = max(number of pipes merged into any CoGroup or HashJoin downstream from here): the minimum
    //   required to guarantee one side cannot starve another. It could probably be queried from the flow graph,
    //   factoring in all potential combinations...
    //
    // Therefore, the easy, safe choice is allNext.length.
    this.executor = Executors.newFixedThreadPool( allNext.length );

    ArrayList<LinkedBlockingQueue<Message>> buffers = new ArrayList<>( allNext.length );
    ArrayList<Future<Throwable>> futures = new ArrayList<>( allNext.length );
    ArrayList<Callable<Throwable>> actions = new ArrayList<>( allNext.length );

    for( final Duct anAllNext : allNext )
      {
      final LinkedBlockingQueue<Message> queue = new LinkedBlockingQueue<>();

      buffers.add( queue );

      // worker: drain this duct's queue and forward each message until a termination message arrives;
      // any failure is returned (not thrown) so complete() can report it
      Callable<Throwable> action = new Callable<Throwable>()
        {
        @Override
        public Throwable call() throws Exception
          {
          try
            {
            while( true )
              {
              Message message = queue.take();
              message.passOn( anAllNext );
              if( message.isTermination() )
                {
                return null;
                }
              }
            }
          catch( Throwable throwable )
            {
            return throwable;
            }
          }
        };

      actions.add( action );
      }

    this.buffers = buffers;
    this.actions = actions;
    this.futures = futures;
    }

  @Override
  public void initialize()
    {
    super.initialize();
    }

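  /** Enqueues the given message on every downstream buffer; the queues are unbounded, so this never blocks. */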
  private void broadcastMessage( Message message )
    {
    for( LinkedBlockingQueue<Message> queue : buffers )
      {
      queue.offer( message );
      }
    }

  private WeakReference<Duct> started = null; /* records the duct that started this fork */

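  /**
   * Submits one worker per downstream duct, broadcasts a {@link StartMessage}, and blocks until every worker
   * has begun consuming its queue. Repeated calls are ignored, with an error logged.
   */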
  @Override
  public void start( Duct previous )
    {
    LOG.debug( "StartMessage {} BEGIN", previous );

    synchronized( this )
      {
      if( started != null )
        {
        LOG.error( "ParallelFork already started! former previous={}, new previous={}", started.get(), previous );
        return;
        }

      if( completed != null )
        {
        throw new IllegalStateException( "cannot start an already completed ParallelFork" );
        }

      started = new WeakReference<>( previous );
      }

    try
      {
      // launch one worker per downstream duct, then signal them all to start
      for( Callable<Throwable> action : actions )
        {
        Future<Throwable> future = executor.submit( action );
        futures.add( future );
        }

      CountDownLatch startLatch = new CountDownLatch( allNext.length );
      broadcastMessage( new StartMessage( previous, startLatch ) );

      startLatch.await(); // wait for all worker threads to have started
      }
    catch( InterruptedException iex )
      {
      throw new UndeclaredThrowableException( iex );
      }
    }

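  /**
   * Copies the incoming tuple (inside {@link ReceiveMessage}) and enqueues it for every downstream duct.
   */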
  @Override
  public void receive( Duct previous, TupleEntry incoming )
    {
    // incoming is copied once, within the current thread; the single ReceiveMessage is then shared by every
    // downstream queue.
    broadcastMessage( new ReceiveMessage( previous, incoming ) );
    }

  private WeakReference<Duct> completed = null; /* records the duct that completed this fork */

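  /**
   * Broadcasts a {@link CompleteMessage}, waits for every worker thread to finish, rethrows the first failure
   * as a {@link RuntimeException}, and shuts the executor down.
   */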
  @Override
  public void complete( Duct previous )
    {
    synchronized( this )
      {
      if( completed != null )
        {
        LOG.error( "ParallelFork already complete! former previous={}, new previous={}", completed.get(), previous );
        return;
        }

      completed = new WeakReference<>( previous );
      }

    // the CompleteMessage causes each downstream worker thread to complete its duct and exit its loop
    broadcastMessage( new CompleteMessage( previous ) );

    try
      {
      // wait for every worker to finish and surface the first failure, if any
      for( Future<Throwable> future : futures )
        {
        Throwable throwable;

        try
          {
          throwable = future.get();
          }
        catch( InterruptedException | ExecutionException exception )
          {
          throwable = exception;
          }

        if( throwable != null )
          {
          throw new RuntimeException( throwable );
          }
        }
      }
    finally
      {
      executor.shutdown();
      }
    }
  }