/*
 * Copyright (c) 2016-2018 Chris K Wensel <chris@wensel.net>. All Rights Reserved.
 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved.
 *
 * Project and contact information: http://www.cascading.org/
 *
 * This file is part of the Cascading project.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package cascading.flow.stream.element;

import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;

import cascading.flow.FlowProcess;
import cascading.flow.stream.StopDataNotificationException;
import cascading.flow.stream.duct.Duct;
import cascading.pipe.Splice;
import cascading.tuple.Tuple;
import cascading.tuple.TupleEntry;
import cascading.tuple.Tuples;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * An in-memory gate that collects every incoming tuple, indexed by its grouping key and by the
 * ordinal of the stream it arrived on, and only forwards data once all incoming paths have
 * completed.
 * <p>
 * On the final {@link #complete(Duct)} call each collected key is handed to the next duct together
 * with one value collection per ordinal; the configured joiner of the {@link Splice} supplies the
 * iterator over the joined values.
 * <p>
 * NOTE(review): the key set, per-ordinal value maps, closure and counters used here are inherited
 * from {@link MemorySpliceGate}; their exact types and lifecycle are not visible in this file.
 */
public class MemoryCoGroupGate extends MemorySpliceGate
  {
  private static final Logger LOG = LoggerFactory.getLogger( MemoryCoGroupGate.class );

  /**
   * Creates a new gate, delegating all initialization to the parent class.
   *
   * @param flowProcess the current flow process
   * @param splice      the splice this gate executes
   */
  public MemoryCoGroupGate( FlowProcess flowProcess, Splice splice )
    {
    super( flowProcess, splice );
    }

  /**
   * @return always {@code true}
   */
  @Override
  protected boolean isBlockingStreamed()
    {
    return true;
    }

  /**
   * Intentionally does nothing; the next duct is started from {@link #complete(Duct)} once the
   * last incoming path has completed.
   */
  @Override
  public void start( Duct previous )
    {
    }

  /**
   * Stores a copy of the incoming tuple under its grouping key for the given ordinal.
   *
   * @param previous      the duct delivering the entry
   * @param ordinal       index of the incoming stream, selects the key builder and the value map
   * @param incomingEntry the entry to collect; its tuple is copied before being retained
   */
  @Override
  public void receive( Duct previous, int ordinal, TupleEntry incomingEntry )
    {
    Tuple valuesTuple = incomingEntry.getTupleCopy();
    Tuple groupTuple = keyBuilder[ ordinal ].makeResult( valuesTuple, null ); // view on valuesTuple

    groupTuple = getDelegatedTuple( groupTuple ); // wrap so hasher/comparator is honored

    keys.add( groupTuple );
    // NOTE(review): relies on the inherited value map returning a non-null collection from get()
    keyValues[ ordinal ].get( groupTuple ).add( valuesTuple );
    }

  /**
   * Counts down the incoming paths and, when the last one completes, drains all collected keys
   * into the next duct before resetting this gate's state and completing the next duct.
   * <p>
   * When {@code nullsAreNotEqual} is set and a key contains a {@code null}, the values of each
   * ordinal are pushed separately with empty collections for every other ordinal. Otherwise the
   * values of all ordinals for the key are pushed together.
   * <p>
   * A {@link StopDataNotificationException} raised while pushing ends the drain for all remaining
   * keys, regardless of which of the two branches was pushing.
   *
   * @param previous the duct that has completed
   */
  @Override
  public void complete( Duct previous )
    {
    if( count.decrementAndGet() != 0 )
      return;

    next.start( this );

    // generic array creation is unavoidable here; only Collection<Tuple> instances are stored
    @SuppressWarnings("unchecked")
    Collection<Tuple>[] collections = new Collection[ keyValues.length ];
    Iterator<Tuple> keyIterator = keys.iterator();

    Set<Tuple> seenNulls = new HashSet<Tuple>();

    keyLoop:
    while( keyIterator.hasNext() )
      {
      Tuple keysTuple = keyIterator.next();

      keyIterator.remove();

      // provides sql like semantics
      if( nullsAreNotEqual && Tuples.frequency( keysTuple, null ) != 0 )
        {
        if( seenNulls.contains( keysTuple ) )
          continue;

        seenNulls.add( keysTuple );

        for( int i = 0; i < keyValues.length; i++ )
          {
          Collection<Tuple> values = keyValues[ i ].remove( keysTuple );

          if( values == null )
            continue;

          // only ordinal i contributes values, every other side is empty
          for( int j = 0; j < keyValues.length; j++ )
            collections[ j ] = Collections.emptyList();

          collections[ i ] = values;

          // a plain break would only leave this inner loop and the outer loop would keep
          // pushing subsequent keys after the stop notification
          if( !pushUnlessStopped( collections, keysTuple ) )
            break keyLoop;
          }
        }
      else
        {
        // drain the keys and keyValues collections to preserve memory
        for( int i = 0; i < keyValues.length; i++ )
          {
          collections[ i ] = keyValues[ i ].remove( keysTuple );

          if( collections[ i ] == null )
            collections[ i ] = Collections.emptyList();
          }

        if( !pushUnlessStopped( collections, keysTuple ) )
          break keyLoop;
        }
      }

    // start over with fresh collections, discarding anything left after an early stop
    keys = createKeySet();
    keyValues = createKeyValuesArray();

    count.set( numIncomingEventingPaths );

    next.complete( this );
    }

  /**
   * Pushes the given grouping downstream, logging a stop data notification instead of
   * propagating it.
   *
   * @param collections one value collection per ordinal
   * @param keysTuple   the grouping key the collections belong to
   * @return {@code false} if a {@link StopDataNotificationException} was raised, {@code true} otherwise
   */
  private boolean pushUnlessStopped( Collection<Tuple>[] collections, Tuple keysTuple )
    {
    try
      {
      push( collections, keysTuple );
      return true;
      }
    catch( StopDataNotificationException exception )
      {
      LOG.info( "received stop data notification: {}", exception.getMessage() );
      return false;
      }
    }

  /**
   * Resets the closure with the given collections, updates the key entry and the joined values
   * iterator, and hands the grouping to the next duct.
   *
   * @param collections one value collection per ordinal
   * @param keysTuple   the grouping key the collections belong to
   */
  private void push( Collection<Tuple>[] collections, Tuple keysTuple )
    {
    closure.reset( collections );

    keyEntry.setTuple( closure.getGroupTuple( keysTuple ) );

    // create Closure type here
    tupleEntryIterator.reset( splice.getJoiner().getIterator( closure ) );

    next.receive( this, 0, grouping );
    }
  }