001/*
002 * Copyright (c) 2016 Chris K Wensel <chris@wensel.net>. All Rights Reserved.
003 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved.
004 *
005 * Project and contact information: http://www.cascading.org/
006 *
007 * This file is part of the Cascading project.
008 *
009 * Licensed under the Apache License, Version 2.0 (the "License");
010 * you may not use this file except in compliance with the License.
011 * You may obtain a copy of the License at
012 *
013 *     http://www.apache.org/licenses/LICENSE-2.0
014 *
015 * Unless required by applicable law or agreed to in writing, software
016 * distributed under the License is distributed on an "AS IS" BASIS,
017 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
018 * See the License for the specific language governing permissions and
019 * limitations under the License.
020 */
021
022package cascading.flow.stream.element;
023
024import java.util.ArrayList;
025import java.util.Arrays;
026import java.util.Collection;
027import java.util.Collections;
028import java.util.concurrent.CountDownLatch;
029
030import cascading.flow.FlowProcess;
031import cascading.flow.stream.duct.Duct;
032import cascading.flow.stream.graph.StreamGraph;
033import cascading.pipe.HashJoin;
034import cascading.tuple.Tuple;
035import cascading.tuple.TupleEntry;
036import org.slf4j.Logger;
037import org.slf4j.LoggerFactory;
038
039/**
040 *
041 */
042public class MemoryHashJoinGate extends MemorySpliceGate
043  {
044  private static final Logger LOG = LoggerFactory.getLogger( MemoryHashJoinGate.class );
045
046  protected CountDownLatch latch;
047
048  private Collection<Tuple>[] collections;
049  private ArrayList<Tuple> streamedCollection;
050
051  public MemoryHashJoinGate( FlowProcess flowProcess, HashJoin join )
052    {
053    super( flowProcess, join );
054    }
055
056  @Override
057  public void bind( StreamGraph streamGraph )
058    {
059    super.bind( streamGraph );
060
061    count.set( numIncomingEventingPaths ); // the number of paths incoming
062    latch = new CountDownLatch( numIncomingEventingPaths - 1 );
063    }
064
065  @Override
066  public void prepare()
067    {
068    super.prepare();
069
070    streamedCollection = new ArrayList<Tuple>( Arrays.asList( new Tuple() ) ); // placeholder in collection
071    collections = new Collection[ getNumDeclaredIncomingBranches() ];
072    collections[ 0 ] = streamedCollection;
073
074    if( nullsAreNotEqual )
075      LOG.warn( "HashJoin does not fully support key comparators where null values are not treated equal" );
076    }
077
078  @Override
079  public void receive( Duct previous, int ordinal, TupleEntry incomingEntry )
080    {
081    Tuple incomingTuple = ordinal != 0 ? incomingEntry.getTupleCopy() : incomingEntry.getTuple();
082    Tuple keyTuple = keyBuilder[ ordinal ].makeResult( incomingTuple, null ); // view in incomingTuple
083
084    keyTuple = getDelegatedTuple( keyTuple );
085
086    if( ordinal != 0 )
087      {
088      keys.add( keyTuple );
089      keyValues[ ordinal ].get( keyTuple ).add( incomingTuple ); // always a copy
090      return;
091      }
092
093    waitOnLatch();
094
095    keys.remove( keyTuple );
096
097    streamedCollection.set( 0, incomingTuple ); // no need to copy, temp setting
098
099    performJoinWith( keyTuple );
100    }
101
102  private void performJoinWith( Tuple keyTuple )
103    {
104    // never replace the first array, pos == 0
105    for( int i = 1; i < keyValues.length; i++ )
106      {
107      // if key does not exist, #get will create an empty array list,
108      // and store the key, which is not a copy
109      if( keyValues[ i ].containsKey( keyTuple ) )
110        collections[ i ] = keyValues[ i ].get( keyTuple );
111      else
112        collections[ i ] = Collections.EMPTY_LIST;
113      }
114
115    closure.reset( collections );
116
117    keyEntry.setTuple( keyTuple );
118    tupleEntryIterator.reset( splice.getJoiner().getIterator( closure ) );
119
120    next.receive( this, 0, grouping );
121    }
122
123  @Override
124  public void complete( Duct previous )
125    {
126    countDownLatch();
127
128    if( count.decrementAndGet() != 0 )
129      return;
130
131    collections[ 0 ] = Collections.EMPTY_LIST;
132
133    for( Tuple keyTuple : keys )
134      performJoinWith( keyTuple );
135
136    keys = createKeySet();
137    keyValues = createKeyValuesArray();
138
139    super.complete( previous );
140    }
141
142  protected void waitOnLatch()
143    {
144    try
145      {
146      latch.await();
147      }
148    catch( InterruptedException exception )
149      {
150      throw new RuntimeException( "interrupted", exception );
151      }
152    }
153
154  protected void countDownLatch()
155    {
156    latch.countDown();
157    }
158
159  @Override
160  protected boolean isBlockingStreamed()
161    {
162    return false;
163    }
164  }