001/* 002 * Copyright (c) 2016 Chris K Wensel <chris@wensel.net>. All Rights Reserved. 003 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved. 004 * 005 * Project and contact information: http://www.cascading.org/ 006 * 007 * This file is part of the Cascading project. 008 * 009 * Licensed under the Apache License, Version 2.0 (the "License"); 010 * you may not use this file except in compliance with the License. 011 * You may obtain a copy of the License at 012 * 013 * http://www.apache.org/licenses/LICENSE-2.0 014 * 015 * Unless required by applicable law or agreed to in writing, software 016 * distributed under the License is distributed on an "AS IS" BASIS, 017 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 018 * See the License for the specific language governing permissions and 019 * limitations under the License. 020 */ 021 022package cascading.flow.stream.element; 023 024import java.util.ArrayList; 025import java.util.Arrays; 026import java.util.Collection; 027import java.util.Collections; 028import java.util.concurrent.CountDownLatch; 029 030import cascading.flow.FlowProcess; 031import cascading.flow.stream.duct.Duct; 032import cascading.flow.stream.graph.StreamGraph; 033import cascading.pipe.HashJoin; 034import cascading.tuple.Tuple; 035import cascading.tuple.TupleEntry; 036import org.slf4j.Logger; 037import org.slf4j.LoggerFactory; 038 039/** 040 * 041 */ 042public class MemoryHashJoinGate extends MemorySpliceGate 043 { 044 private static final Logger LOG = LoggerFactory.getLogger( MemoryHashJoinGate.class ); 045 046 protected CountDownLatch latch; 047 048 private Collection<Tuple>[] collections; 049 private ArrayList<Tuple> streamedCollection; 050 051 public MemoryHashJoinGate( FlowProcess flowProcess, HashJoin join ) 052 { 053 super( flowProcess, join ); 054 } 055 056 @Override 057 public void bind( StreamGraph streamGraph ) 058 { 059 super.bind( streamGraph ); 060 061 count.set( numIncomingEventingPaths ); // the number of paths incoming 062 latch = new CountDownLatch( numIncomingEventingPaths - 1 ); 063 } 064 065 @Override 066 public void prepare() 067 { 068 super.prepare(); 069 070 streamedCollection = new ArrayList<Tuple>( Arrays.asList( new Tuple() ) ); // placeholder in collection 071 collections = new Collection[ getNumDeclaredIncomingBranches() ]; 072 collections[ 0 ] = streamedCollection; 073 074 if( nullsAreNotEqual ) 075 LOG.warn( "HashJoin does not fully support key comparators where null values are not treated equal" ); 076 } 077 078 @Override 079 public void receive( Duct previous, int ordinal, TupleEntry incomingEntry ) 080 { 081 Tuple incomingTuple = ordinal != 0 ? incomingEntry.getTupleCopy() : incomingEntry.getTuple(); 082 Tuple keyTuple = keyBuilder[ ordinal ].makeResult( incomingTuple, null ); // view in incomingTuple 083 084 keyTuple = getDelegatedTuple( keyTuple ); 085 086 if( ordinal != 0 ) 087 { 088 keys.add( keyTuple ); 089 keyValues[ ordinal ].get( keyTuple ).add( incomingTuple ); // always a copy 090 return; 091 } 092 093 waitOnLatch(); 094 095 keys.remove( keyTuple ); 096 097 streamedCollection.set( 0, incomingTuple ); // no need to copy, temp setting 098 099 performJoinWith( keyTuple ); 100 } 101 102 private void performJoinWith( Tuple keyTuple ) 103 { 104 // never replace the first array, pos == 0 105 for( int i = 1; i < keyValues.length; i++ ) 106 { 107 // if key does not exist, #get will create an empty array list, 108 // and store the key, which is not a copy 109 if( keyValues[ i ].containsKey( keyTuple ) ) 110 collections[ i ] = keyValues[ i ].get( keyTuple ); 111 else 112 collections[ i ] = Collections.EMPTY_LIST; 113 } 114 115 closure.reset( collections ); 116 117 keyEntry.setTuple( keyTuple ); 118 tupleEntryIterator.reset( splice.getJoiner().getIterator( closure ) ); 119 120 next.receive( this, 0, grouping ); 121 } 122 123 @Override 124 public void complete( Duct previous ) 125 { 126 countDownLatch(); 127 128 if( count.decrementAndGet() != 0 ) 129 return; 130 131 collections[ 0 ] = Collections.EMPTY_LIST; 132 133 for( Tuple keyTuple : keys ) 134 performJoinWith( keyTuple ); 135 136 keys = createKeySet(); 137 keyValues = createKeyValuesArray(); 138 139 super.complete( previous ); 140 } 141 142 protected void waitOnLatch() 143 { 144 try 145 { 146 latch.await(); 147 } 148 catch( InterruptedException exception ) 149 { 150 throw new RuntimeException( "interrupted", exception ); 151 } 152 } 153 154 protected void countDownLatch() 155 { 156 latch.countDown(); 157 } 158 159 @Override 160 protected boolean isBlockingStreamed() 161 { 162 return false; 163 } 164 }