001/* 002 * Copyright (c) 2016-2017 Chris K Wensel <chris@wensel.net>. All Rights Reserved. 003 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved. 004 * 005 * Project and contact information: http://www.cascading.org/ 006 * 007 * This file is part of the Cascading project. 008 * 009 * Licensed under the Apache License, Version 2.0 (the "License"); 010 * you may not use this file except in compliance with the License. 011 * You may obtain a copy of the License at 012 * 013 * http://www.apache.org/licenses/LICENSE-2.0 014 * 015 * Unless required by applicable law or agreed to in writing, software 016 * distributed under the License is distributed on an "AS IS" BASIS, 017 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 018 * See the License for the specific language governing permissions and 019 * limitations under the License. 020 */ 021 022package cascading.pipe; 023 024import java.beans.ConstructorProperties; 025 026import cascading.pipe.joiner.Joiner; 027import cascading.tuple.Fields; 028 029/** 030 * The HashJoin pipe allows for two or more tuple streams to join into a single stream via a {@link Joiner} when 031 * all but one tuple stream is considered small enough to fit into memory. 032 * <p> 033 * When planned onto MapReduce, this is effectively a non-blocking "asymmetrical join" or "replicated join", 034 * where the left-most side will not block (accumulate into memory) in order to complete the join, but the right-most 035 * sides will. See below... 036 * <p> 037 * No aggregations can be performed with a HashJoin pipe as there is no guarantee all value will be associated with 038 * a given grouping key. In fact, an Aggregator would see the same grouping many times with a partial set of values. 039 * <p> 040 * For every incoming {@link Pipe} instance, a {@link Fields} instance must be specified that denotes the field names 041 * or positions that should be joined with the other given Pipe instances. If the incoming Pipe instances declare 042 * one or more field with the same name, the declaredFields must be given to name the outgoing Tuple stream fields 043 * to overcome field name collisions. 044 * <p> 045 * By default HashJoin performs an inner join via the {@link cascading.pipe.joiner.InnerJoin} 046 * {@link cascading.pipe.joiner.Joiner} class. 047 * <p> 048 * Self joins can be achieved by using a constructor that takes a single Pipe and a numSelfJoins value. A value of 049 * 1 for numSelfJoins will join the Pipe with itself once. Note that a self join will block until all data is accumulated 050 * thus the stream must be reasonably small. 051 * <p> 052 * Note "outer" joins on the left most side will not behave as expected. All observed keys on the right most sides 053 * will be emitted with {@code null} for the left most stream, thus when running distributed, duplicate values will 054 * emerge from every Map task split on the MapReduce platform. 055 * <p> 056 * HashJoin does not scale well to large data sizes and thus requires streams with more data on the left hand side to 057 * join with more sparse data on the right hand side. That is, always attempt to effect M x N joins where M is large 058 * and N is small, instead of where M is small and N is large. Right hand side streams will be accumulated, and 059 * spilled to disk if the collection reaches a specific threshold when using Hadoop. 060 * <p> 061 * If spills are happening, consider increasing the spill thresholds, see {@link cascading.tuple.collect.SpillableTupleMap}. 062 * <p> 063 * If one of the right hand side streams starts larger than memory but is filtered (likely by a 064 * {@link cascading.operation.Filter} implementation) down to the point it fits into memory, it may be useful to use 065 * a {@link Checkpoint} Pipe to persist the stream and force a new FlowStep (MapReduce job) to read the data from 066 * disk, instead of applying the filter redundantly. This will minimize the amount of data "replicated" across the 067 * network. 068 * <p> 069 * See the {@link cascading.tuple.collect.TupleCollectionFactory} and {@link cascading.tuple.collect.TupleMapFactory} for a means 070 * to use alternative spillable types. 071 * 072 * @see cascading.pipe.joiner.InnerJoin 073 * @see cascading.pipe.joiner.OuterJoin 074 * @see cascading.pipe.joiner.LeftJoin 075 * @see cascading.pipe.joiner.RightJoin 076 * @see cascading.pipe.joiner.MixedJoin 077 * @see cascading.tuple.Fields 078 * @see cascading.tuple.collect.SpillableTupleMap 079 */ 080public class HashJoin extends Splice 081 { 082 /** 083 * Constructor HashJoin creates a new HashJoin instance. 084 * 085 * @param joinName 086 * @param lhs 087 * @param lhsJoinFields 088 * @param rhs 089 * @param rhsJoinFields 090 */ 091 @ConstructorProperties({"joinName", "lhs", "lhsJoinFields", "rhs", "rhsJoinFields"}) 092 public HashJoin( String joinName, Pipe lhs, Fields lhsJoinFields, Pipe rhs, Fields rhsJoinFields ) 093 { 094 super( joinName, Pipe.pipes( lhs, rhs ), Fields.fields( lhsJoinFields, rhsJoinFields ), null, null ); 095 } 096 097 /** 098 * Constructor HashJoin creates a new HashJoin instance. 099 * 100 * @param joinName 101 * @param lhs 102 * @param lhsJoinFields 103 * @param rhs 104 * @param rhsJoinFields 105 * @param joiner 106 */ 107 @ConstructorProperties({"joinName", "lhs", "lhsJoinFields", "rhs", "rhsJoinFields", "joiner"}) 108 public HashJoin( String joinName, Pipe lhs, Fields lhsJoinFields, Pipe rhs, Fields rhsJoinFields, Joiner joiner ) 109 { 110 super( joinName, Pipe.pipes( lhs, rhs ), Fields.fields( lhsJoinFields, rhsJoinFields ), null, null, joiner ); 111 } 112 113 /** 114 * Constructor HashJoin creates a new HashJoin instance. 115 * 116 * @param joinName 117 * @param lhs 118 * @param lhsJoinFields 119 * @param rhs 120 * @param rhsJoinFields 121 * @param declaredFields 122 */ 123 @ConstructorProperties({"joinName", "lhs", "lhsJoinFields", "rhs", "rhsJoinFields", "declaredFields"}) 124 public HashJoin( String joinName, Pipe lhs, Fields lhsJoinFields, Pipe rhs, Fields rhsJoinFields, Fields declaredFields ) 125 { 126 super( joinName, Pipe.pipes( lhs, rhs ), Fields.fields( lhsJoinFields, rhsJoinFields ), declaredFields, null ); 127 } 128 129 /** 130 * Constructor HashJoin creates a new HashJoin instance. 131 * 132 * @param joinName 133 * @param lhs 134 * @param lhsJoinFields 135 * @param rhs 136 * @param rhsJoinFields 137 * @param declaredFields 138 * @param joiner 139 */ 140 @ConstructorProperties({"joinName", "lhs", "lhsJoinFields", "rhs", "rhsJoinFields", "declaredFields", "joiner"}) 141 public HashJoin( String joinName, Pipe lhs, Fields lhsJoinFields, Pipe rhs, Fields rhsJoinFields, Fields declaredFields, Joiner joiner ) 142 { 143 super( joinName, Pipe.pipes( lhs, rhs ), Fields.fields( lhsJoinFields, rhsJoinFields ), declaredFields, null, joiner ); 144 } 145 146 /** 147 * Constructor HashJoin creates a new HashJoin instance. 148 * 149 * @param joinName 150 * @param pipe 151 * @param joinFields 152 * @param numSelfJoins 153 * @param declaredFields 154 * @param joiner 155 */ 156 @ConstructorProperties({"joinName", "pipe", "joinFields", "numSelfJoins", "declaredFields", "joiner"}) 157 public HashJoin( String joinName, Pipe pipe, Fields joinFields, int numSelfJoins, Fields declaredFields, Joiner joiner ) 158 { 159 super( joinName, pipe, joinFields, numSelfJoins, declaredFields, joiner ); 160 } 161 162 /** 163 * Constructor HashJoin creates a new HashJoin instance. 164 * 165 * @param joinName 166 * @param pipe 167 * @param joinFields 168 * @param numSelfJoins 169 * @param declaredFields 170 */ 171 @ConstructorProperties({"joinName", "pipe", "joinFields", "numSelfJoins", "declaredFields"}) 172 public HashJoin( String joinName, Pipe pipe, Fields joinFields, int numSelfJoins, Fields declaredFields ) 173 { 174 super( joinName, pipe, joinFields, numSelfJoins, declaredFields, null, null ); 175 } 176 177 /** 178 * Constructor HashJoin creates a new HashJoin instance. 179 * 180 * @param joinName 181 * @param pipe 182 * @param joinFields 183 * @param numSelfJoins 184 * @param joiner 185 */ 186 @ConstructorProperties({"joinName", "pipe", "joinFields", "numSelfJoins", "joiner"}) 187 public HashJoin( String joinName, Pipe pipe, Fields joinFields, int numSelfJoins, Joiner joiner ) 188 { 189 super( joinName, pipe, joinFields, numSelfJoins, null, joiner ); 190 } 191 192 /** 193 * Constructor HashJoin creates a new HashJoin instance. 194 * 195 * @param joinName 196 * @param pipes 197 * @param joinFields 198 * @param declaredFields 199 * @param joiner 200 */ 201 @ConstructorProperties({"joinName", "pipes", "joinFields", "declaredFields", "joiner"}) 202 public HashJoin( String joinName, Pipe[] pipes, Fields[] joinFields, Fields declaredFields, Joiner joiner ) 203 { 204 super( joinName, pipes, joinFields, declaredFields, null, joiner ); 205 } 206 207 /** 208 * Constructor HashJoin creates a new HashJoin instance. 209 * 210 * @param lhs 211 * @param lhsJoinFields 212 * @param rhs 213 * @param rhsJoinFields 214 */ 215 @ConstructorProperties({"lhs", "lhsJoinFields", "rhs", "rhsJoinFields"}) 216 public HashJoin( Pipe lhs, Fields lhsJoinFields, Pipe rhs, Fields rhsJoinFields ) 217 { 218 super( null, Pipe.pipes( lhs, rhs ), Fields.fields( lhsJoinFields, rhsJoinFields ), null, null ); 219 } 220 221 /** 222 * Constructor HashJoin creates a new HashJoin instance. 223 * 224 * @param lhs 225 * @param lhsJoinFields 226 * @param rhs 227 * @param rhsJoinFields 228 * @param joiner 229 */ 230 @ConstructorProperties({"lhs", "lhsJoinFields", "rhs", "rhsJoinFields", "joiner"}) 231 public HashJoin( Pipe lhs, Fields lhsJoinFields, Pipe rhs, Fields rhsJoinFields, Joiner joiner ) 232 { 233 super( null, Pipe.pipes( lhs, rhs ), Fields.fields( lhsJoinFields, rhsJoinFields ), null, null, joiner ); 234 } 235 236 /** 237 * Constructor HashJoin creates a new HashJoin instance. 238 * 239 * @param lhs 240 * @param lhsJoinFields 241 * @param rhs 242 * @param rhsJoinFields 243 * @param declaredFields 244 */ 245 @ConstructorProperties({"lhs", "lhsJoinFields", "rhs", "rhsJoinFields", "declaredFields"}) 246 public HashJoin( Pipe lhs, Fields lhsJoinFields, Pipe rhs, Fields rhsJoinFields, Fields declaredFields ) 247 { 248 super( null, Pipe.pipes( lhs, rhs ), Fields.fields( lhsJoinFields, rhsJoinFields ), declaredFields, null ); 249 } 250 251 /** 252 * Constructor HashJoin creates a new HashJoin instance. 253 * 254 * @param lhs 255 * @param lhsJoinFields 256 * @param rhs 257 * @param rhsJoinFields 258 * @param declaredFields 259 * @param joiner 260 */ 261 @ConstructorProperties({"lhs", "lhsJoinFields", "rhs", "rhsJoinFields", "declaredFields", "joiner"}) 262 public HashJoin( Pipe lhs, Fields lhsJoinFields, Pipe rhs, Fields rhsJoinFields, Fields declaredFields, Joiner joiner ) 263 { 264 super( null, Pipe.pipes( lhs, rhs ), Fields.fields( lhsJoinFields, rhsJoinFields ), declaredFields, null, joiner ); 265 } 266 267 /** 268 * Constructor HashJoin creates a new HashJoin instance. 269 * 270 * @param pipe 271 * @param joinFields 272 * @param numSelfJoins 273 * @param declaredFields 274 * @param joiner 275 */ 276 @ConstructorProperties({"pipe", "joinFields", "numSelfJoins", "declaredFields", "joiner"}) 277 public HashJoin( Pipe pipe, Fields joinFields, int numSelfJoins, Fields declaredFields, Joiner joiner ) 278 { 279 super( null, pipe, joinFields, numSelfJoins, declaredFields, joiner ); 280 } 281 282 /** 283 * Constructor HashJoin creates a new HashJoin instance. 284 * 285 * @param pipe 286 * @param joinFields 287 * @param numSelfJoins 288 * @param declaredFields 289 */ 290 @ConstructorProperties({"pipe", "joinFields", "numSelfJoins", "declaredFields"}) 291 public HashJoin( Pipe pipe, Fields joinFields, int numSelfJoins, Fields declaredFields ) 292 { 293 super( null, pipe, joinFields, numSelfJoins, declaredFields ); 294 } 295 296 /** 297 * Constructor HashJoin creates a new HashJoin instance. 298 * 299 * @param pipe 300 * @param joinFields 301 * @param numSelfJoins 302 * @param joiner 303 */ 304 @ConstructorProperties({"pipe", "joinFields", "numSelfJoins", "joiner"}) 305 public HashJoin( Pipe pipe, Fields joinFields, int numSelfJoins, Joiner joiner ) 306 { 307 super( null, pipe, joinFields, numSelfJoins, null, joiner ); 308 } 309 310 /** 311 * Constructor HashJoin creates a new Join instance. 312 * 313 * @param pipes 314 * @param joinFields 315 * @param declaredFields 316 * @param joiner 317 */ 318 @ConstructorProperties({"pipes", "joinFields", "declaredFields", "joiner"}) 319 public HashJoin( Pipe[] pipes, Fields[] joinFields, Fields declaredFields, Joiner joiner ) 320 { 321 super( null, pipes, joinFields, declaredFields, null, joiner ); 322 } 323 }