/*
 * Copyright (c) 2007-2016 Concurrent, Inc. All Rights Reserved.
 *
 * Project and contact information: http://www.cascading.org/
 *
 * This file is part of the Cascading project.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package cascading.flow.local.stream.duct;

import java.lang.ref.WeakReference;
import java.lang.reflect.UndeclaredThrowableException;
import java.util.ArrayList;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;

import cascading.flow.stream.duct.Duct;
import cascading.flow.stream.duct.Fork;
import cascading.tuple.TupleEntry;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * This "Fork" avoids a possible deadlock in Fork-and-Join scenarios by running downstream edges in parallel threads.
 * <p>
 * Each downstream {@link Duct} is fed through its own unbounded {@link LinkedBlockingQueue} by a dedicated worker
 * thread, so one slow (or blocked) branch cannot starve the others. Lifecycle events ({@code start},
 * {@code receive}, {@code complete}) are converted into {@link Message} objects and broadcast to every queue.
 */
public class ParallelFork<Outgoing> extends Fork<TupleEntry, Outgoing>
  {
  private static final Logger LOG = LoggerFactory.getLogger( ParallelFork.class );

  /**
   * A lifecycle event destined for a single downstream Duct. Subclasses encode which
   * Duct callback to invoke; {@link #isTermination()} tells the worker loop when to exit.
   */
  abstract static class Message
    {
    final protected Duct previous;

    public Message( Duct previous )
      {
      this.previous = previous;
      }

    /** Deliver this event to the given downstream Duct. */
    abstract public void passOn( Duct next );

    /** True when the receiving worker thread should stop after delivering this message. */
    abstract public boolean isTermination();
    }

  /** Propagates {@link Duct#start(Duct)} and counts down the latch so the caller can await all workers. */
  static final class StartMessage extends Message
    {
    final CountDownLatch startLatch;

    public StartMessage( Duct previous, CountDownLatch startLatch )
      {
      super( previous );
      this.startLatch = startLatch;
      }

    public void passOn( Duct next )
      {
      startLatch.countDown();
      next.start( previous );
      }

    public boolean isTermination()
      {
      return false;
      }
    }

  /** Propagates {@link Duct#receive(Duct, Object)} with a defensive copy of the tuple. */
  static final class ReceiveMessage extends Message
    {
    final TupleEntry tuple;

    public ReceiveMessage( Duct previous, TupleEntry tuple )
      {
      super( previous );

      // we make a new copy right here, to avoid cross-thread trouble when upstream changes the tuple
      this.tuple = new TupleEntry( tuple );
      }

    public void passOn( Duct next )
      {
      next.receive( previous, tuple );
      }

    public boolean isTermination()
      {
      return false;
      }
    }

  /** Propagates {@link Duct#complete(Duct)}; terminates the worker loop that delivers it. */
  static final class CompleteMessage extends Message
    {
    public CompleteMessage( Duct previous )
      {
      super( previous );
      }

    public void passOn( Duct next )
      {
      next.complete( previous );
      }

    public boolean isTermination()
      {
      return true;
      }
    }

  /** One unbounded message queue per downstream Duct, in the same order as {@code allNext}. */
  private final ArrayList<LinkedBlockingQueue<Message>> buffers;
  private final ExecutorService executor;
  /** One worker per downstream Duct; each returns null on clean exit, or the Throwable it caught. */
  private final ArrayList<Callable<Throwable>> actions;
  private final ArrayList<Future<Throwable>> futures;

  public ParallelFork( Duct[] allNext )
    {
    super( allNext );

    // Obvious choices for nThread in newFixedThreadPool:
    //   nThreads = allNext.length. Potential to create a lot of thread-thrashing on machines with few cores, but
    //     the OS scheduler should ensure any executable thread gets a chance to proceed (and possibly
    //     complete, enabling others down a Local*Gate to proceed)
    //   nThreads = #of CPU. Would work, possibly by chance as long as #of CPU is "big enough" (see below)
    //   nThreads = 1 : "parallel" is parallel with respect to upstream. This could work sometimes, but will still
    //     deadlock in the Fork-CoGroup-HashJoin scenario, as the other side of join could still be starved by
    //     one side not completing.
    //   nThreads = max(# of pipes merged into a CoGroup or HashJoin downstream from here). This is the minimum
    //     required to guarantee one side can't starve another. It COULD probably be queried from the flow graph,
    //     factoring in for all potential combinations...
    //
    // Therefore, the easy safe choice is to take allNext.length.
    //
    this.executor = Executors.newFixedThreadPool( allNext.length );

    ArrayList<LinkedBlockingQueue<Message>> buffers = new ArrayList<>( allNext.length );
    ArrayList<Future<Throwable>> futures = new ArrayList<>( allNext.length );
    ArrayList<Callable<Throwable>> actions = new ArrayList<>( allNext.length );

    for( final Duct anAllNext : allNext )
      {
      final LinkedBlockingQueue<Message> queue = new LinkedBlockingQueue<>();

      buffers.add( queue );

      // worker loop: drain the queue into the downstream Duct until a termination message arrives.
      // any Throwable is captured and returned (not thrown) so complete() can surface it from the Future.
      Callable<Throwable> action = new Callable<Throwable>()
        {
        @Override
        public Throwable call() throws Exception
          {
          try
            {
            while( true )
              {
              Message message = queue.take();

              message.passOn( anAllNext );

              if( message.isTermination() )
                return null;
              }
            }
          catch( Throwable throwable )
            {
            return throwable;
            }
          }
        };

      actions.add( action );
      }

    this.buffers = buffers;
    this.actions = actions;
    this.futures = futures;
    }

  @Override
  public void initialize()
    {
    super.initialize();
    }

  /** Enqueue the given message onto every downstream buffer. The queues are unbounded, so offer() cannot fail. */
  private void broadcastMessage( Message message )
    {
    for( LinkedBlockingQueue<Message> queue : buffers )
      queue.offer( message );
    }

  /** Records the Duct that started this fork; weak so we do not pin the upstream graph. */
  private WeakReference<Duct> started = null;

  @Override
  public void start( Duct previous )
    {
    LOG.debug( "StartMessage {} BEGIN", previous );

    synchronized( this )
      {
      if( started != null )
        {
        LOG.error( "ParallelFork already started! former previous={}, new previous={}", started.get(), previous );
        return;
        }

      if( completed != null )
        throw new IllegalStateException( "cannot start an already completed ParallelFork" );

      started = new WeakReference<>( previous );
      }

    try
      {
      for( Callable<Throwable> action : actions )
        {
        Future<Throwable> future = executor.submit( action );
        futures.add( future );
        }

      CountDownLatch startLatch = new CountDownLatch( allNext.length );

      broadcastMessage( new StartMessage( previous, startLatch ) );

      startLatch.await(); // wait for all threads to have started
      }
    catch( InterruptedException iex )
      {
      Thread.currentThread().interrupt(); // restore interrupt status for callers further up the stack

      // complete() will never run after this throw, so reap the workers here instead of leaking them
      executor.shutdownNow();

      throw new UndeclaredThrowableException( iex );
      }
    }

  @Override
  public void receive( Duct previous, TupleEntry incoming )
    {
    // incoming is copied once for each downstream pipe, within the current thread.
    broadcastMessage( new ReceiveMessage( previous, incoming ) );
    }

  /** Records the Duct that completed this fork; weak so we do not pin the upstream graph. */
  private WeakReference<Duct> completed = null;

  @Override
  public void complete( Duct previous )
    {
    synchronized( this )
      {
      if( completed != null )
        {
        LOG.error( "ParallelFork already complete! former previous={} new previous={}", completed.get(), previous );
        return;
        }

      completed = new WeakReference<>( previous );
      }

    // the CompleteMessage will cause the downstream threads to complete
    broadcastMessage( new CompleteMessage( previous ) );

    try
      {
      for( Future<Throwable> future : futures )
        {
        Throwable throwable;

        try
          {
          throwable = future.get();
          }
        catch( InterruptedException iex )
          {
          Thread.currentThread().interrupt(); // restore interrupt status before propagating
          throwable = iex;
          }
        catch( ExecutionException cex )
          {
          // the worker Callable returns its failures rather than throwing, so this is unexpected;
          // unwrap to surface the root cause instead of the ExecutionException shell
          throwable = cex.getCause() != null ? cex.getCause() : cex;
          }

        if( throwable != null )
          throw new RuntimeException( throwable );
        }
      }
    finally
      {
      // allow already-queued work to drain, then release the worker threads
      executor.shutdown();
      }
    }
  }