001 /* 002 * Copyright (c) 2007-2014 Concurrent, Inc. All Rights Reserved. 003 * 004 * Project and contact information: http://www.cascading.org/ 005 * 006 * This file is part of the Cascading project. 007 * 008 * Licensed under the Apache License, Version 2.0 (the "License"); 009 * you may not use this file except in compliance with the License. 010 * You may obtain a copy of the License at 011 * 012 * http://www.apache.org/licenses/LICENSE-2.0 013 * 014 * Unless required by applicable law or agreed to in writing, software 015 * distributed under the License is distributed on an "AS IS" BASIS, 016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 017 * See the License for the specific language governing permissions and 018 * limitations under the License. 019 */ 020 021 package cascading.pipe; 022 023 import java.beans.ConstructorProperties; 024 025 import cascading.pipe.joiner.Joiner; 026 import cascading.tuple.Fields; 027 028 /** 029 * The HashJoin pipe allows for two or more tuple streams to join into a single stream via a {@link Joiner} when 030 * all but one tuple stream is considered small enough to fit into memory. 031 * <p/> 032 * When planned onto MapReduce, this is effectively a non-blocking "asymmetrical join" or "replicated join", 033 * where the left-most side will not block (accumulate into memory) in order to complete the join, but the right-most 034 * sides will. See below... 035 * <p/> 036 * No aggregations can be performed with a HashJoin pipe as there is no guarantee all value will be associated with 037 * a given grouping key. In fact, an Aggregator would see the same grouping many times with a partial set of values. 038 * <p/> 039 * For every incoming {@link Pipe} instance, a {@link Fields} instance must be specified that denotes the field names 040 * or positions that should be joined with the other given Pipe instances. If the incoming Pipe instances declare 041 * one or more field with the same name, the declaredFields must be given to name the outgoing Tuple stream fields 042 * to overcome field name collisions. 043 * <p/> 044 * By default HashJoin performs an inner join via the {@link cascading.pipe.joiner.InnerJoin} 045 * {@link cascading.pipe.joiner.Joiner} class. 046 * <p/> 047 * Self joins can be achieved by using a constructor that takes a single Pipe and a numSelfJoins value. A value of 048 * 1 for numSelfJoins will join the Pipe with itself once. Note that a self join will block until all data is accumulated 049 * thus the stream must be reasonably small. 050 * <p/> 051 * Note "outer" joins on the left most side will not behave as expected. All observed keys on the right most sides 052 * will be emitted with {@code null} for the left most stream, thus when running distributed, duplicate values will 053 * emerge from every Map task split on the MapReduce platform. 054 * <p/> 055 * HashJoin does not scale well to large data sizes and thus requires streams with more data on the left hand side to 056 * join with more sparse data on the right hand side. That is, always attempt to effect M x N joins where M is large 057 * and N is small, instead of where M is small and N is large. Right hand side streams will be accumulated, and 058 * spilled to disk if the collection reaches a specific threshold when using Hadoop. 059 * <p/> 060 * If spills are happening, consider increasing the spill thresholds, see {@link cascading.tuple.collect.SpillableTupleMap}. 061 * <p/> 062 * <p/> 063 * If one of the right hand side streams starts larger than memory but is filtered (likely by a 064 * {@link cascading.operation.Filter} implementation) down to the point it fits into memory, it may be useful to use 065 * a {@link Checkpoint} Pipe to persist the stream and force a new FlowStep (MapReduce job) to read the data from 066 * disk, instead of applying the filter redundantly. This will minimize the amount of data "replicated" across the 067 * network. 068 * <p/> 069 * See the {@link cascading.tuple.collect.TupleCollectionFactory} and {@link cascading.tuple.collect.TupleMapFactory} for a means 070 * to use alternative spillable types. 071 * 072 * @see cascading.pipe.joiner.InnerJoin 073 * @see cascading.pipe.joiner.OuterJoin 074 * @see cascading.pipe.joiner.LeftJoin 075 * @see cascading.pipe.joiner.RightJoin 076 * @see cascading.pipe.joiner.MixedJoin 077 * @see cascading.tuple.Fields 078 * @see cascading.tuple.collect.SpillableTupleMap 079 */ 080 public class HashJoin extends Splice 081 { 082 /** 083 * Constructor HashJoin creates a new HashJoin instance. 084 * 085 * @param joinName 086 * @param lhs 087 * @param lhsJoinFields 088 * @param rhs 089 * @param rhsJoinFields 090 */ 091 @ConstructorProperties({"joinName", "lhs", "lhsJoinFields", "rhs", "rhsJoinFields"}) 092 public HashJoin( String joinName, Pipe lhs, Fields lhsJoinFields, Pipe rhs, Fields rhsJoinFields ) 093 { 094 super( joinName, Pipe.pipes( lhs, rhs ), Fields.fields( lhsJoinFields, rhsJoinFields ), null, null ); 095 } 096 097 /** 098 * Constructor HashJoin creates a new HashJoin instance. 099 * 100 * @param joinName 101 * @param lhs 102 * @param lhsJoinFields 103 * @param rhs 104 * @param rhsJoinFields 105 * @param joiner 106 */ 107 @ConstructorProperties({"joinName", "lhs", "lhsJoinFields", "rhs", "rhsJoinFields", "joiner"}) 108 public HashJoin( String joinName, Pipe lhs, Fields lhsJoinFields, Pipe rhs, Fields rhsJoinFields, Joiner joiner ) 109 { 110 super( joinName, Pipe.pipes( lhs, rhs ), Fields.fields( lhsJoinFields, rhsJoinFields ), null, null, joiner ); 111 } 112 113 /** 114 * Constructor HashJoin creates a new HashJoin instance. 115 * 116 * @param joinName 117 * @param lhs 118 * @param lhsJoinFields 119 * @param rhs 120 * @param rhsJoinFields 121 * @param declaredFields 122 */ 123 @ConstructorProperties({"joinName", "lhs", "lhsJoinFields", "rhs", "rhsJoinFields", "declaredFields"}) 124 public HashJoin( String joinName, Pipe lhs, Fields lhsJoinFields, Pipe rhs, Fields rhsJoinFields, Fields declaredFields ) 125 { 126 super( joinName, Pipe.pipes( lhs, rhs ), Fields.fields( lhsJoinFields, rhsJoinFields ), declaredFields, null ); 127 } 128 129 /** 130 * Constructor HashJoin creates a new HashJoin instance. 131 * 132 * @param joinName 133 * @param lhs 134 * @param lhsJoinFields 135 * @param rhs 136 * @param rhsJoinFields 137 * @param declaredFields 138 * @param joiner 139 */ 140 @ConstructorProperties({"joinName", "lhs", "lhsJoinFields", "rhs", "rhsJoinFields", "declaredFields", "joiner"}) 141 public HashJoin( String joinName, Pipe lhs, Fields lhsJoinFields, Pipe rhs, Fields rhsJoinFields, Fields declaredFields, Joiner joiner ) 142 { 143 super( joinName, Pipe.pipes( lhs, rhs ), Fields.fields( lhsJoinFields, rhsJoinFields ), declaredFields, null, joiner ); 144 } 145 146 /** 147 * Constructor HashJoin creates a new HashJoin instance. 148 * 149 * @param joinName 150 * @param pipe 151 * @param joinFields 152 * @param numSelfJoins 153 * @param declaredFields 154 * @param joiner 155 */ 156 @ConstructorProperties({"joinName", "pipe", "joinFields", "numSelfJoins", "declaredFields", "joiner"}) 157 public HashJoin( String joinName, Pipe pipe, Fields joinFields, int numSelfJoins, Fields declaredFields, Joiner joiner ) 158 { 159 super( joinName, pipe, joinFields, numSelfJoins, declaredFields, joiner ); 160 } 161 162 /** 163 * Constructor HashJoin creates a new HashJoin instance. 164 * 165 * @param joinName 166 * @param pipe 167 * @param joinFields 168 * @param numSelfJoins 169 * @param declaredFields 170 */ 171 @ConstructorProperties({"joinName", "pipe", "joinFields", "numSelfJoins", "declaredFields"}) 172 public HashJoin( String joinName, Pipe pipe, Fields joinFields, int numSelfJoins, Fields declaredFields ) 173 { 174 super( joinName, pipe, joinFields, numSelfJoins, declaredFields, null, null ); 175 } 176 177 /** 178 * Constructor HashJoin creates a new HashJoin instance. 179 * 180 * @param joinName 181 * @param pipe 182 * @param joinFields 183 * @param numSelfJoins 184 * @param joiner 185 */ 186 @ConstructorProperties({"joinName", "pipe", "joinFields", "numSelfJoins", "joiner"}) 187 public HashJoin( String joinName, Pipe pipe, Fields joinFields, int numSelfJoins, Joiner joiner ) 188 { 189 super( joinName, pipe, joinFields, numSelfJoins, null, joiner ); 190 } 191 192 /** 193 * Constructor HashJoin creates a new HashJoin instance. 194 * 195 * @param joinName 196 * @param pipes 197 * @param joinFields 198 * @param declaredFields 199 * @param joiner 200 */ 201 @ConstructorProperties({"joinName", "pipes", "joinFields", "declaredFields", "joiner"}) 202 public HashJoin( String joinName, Pipe[] pipes, Fields[] joinFields, Fields declaredFields, Joiner joiner ) 203 { 204 super( joinName, pipes, joinFields, declaredFields, null, joiner ); 205 } 206 207 /** 208 * Constructor HashJoin creates a new HashJoin instance. 209 * 210 * @param lhs 211 * @param lhsJoinFields 212 * @param rhs 213 * @param rhsJoinFields 214 */ 215 @ConstructorProperties({"lhs", "lhsJoinFields", "rhs", "rhsJoinFields"}) 216 public HashJoin( Pipe lhs, Fields lhsJoinFields, Pipe rhs, Fields rhsJoinFields ) 217 { 218 super( null, Pipe.pipes( lhs, rhs ), Fields.fields( lhsJoinFields, rhsJoinFields ), null, null ); 219 } 220 221 /** 222 * Constructor HashJoin creates a new HashJoin instance. 223 * 224 * @param lhs 225 * @param lhsJoinFields 226 * @param rhs 227 * @param rhsJoinFields 228 * @param joiner 229 */ 230 @ConstructorProperties({"lhs", "lhsJoinFields", "rhs", "rhsJoinFields", "joiner"}) 231 public HashJoin( Pipe lhs, Fields lhsJoinFields, Pipe rhs, Fields rhsJoinFields, Joiner joiner ) 232 { 233 super( null, Pipe.pipes( lhs, rhs ), Fields.fields( lhsJoinFields, rhsJoinFields ), null, null, joiner ); 234 } 235 236 /** 237 * Constructor HashJoin creates a new HashJoin instance. 238 * 239 * @param lhs 240 * @param lhsJoinFields 241 * @param rhs 242 * @param rhsJoinFields 243 * @param declaredFields 244 */ 245 @ConstructorProperties({"lhs", "lhsJoinFields", "rhs", "rhsJoinFields", "declaredFields"}) 246 public HashJoin( Pipe lhs, Fields lhsJoinFields, Pipe rhs, Fields rhsJoinFields, Fields declaredFields ) 247 { 248 super( null, Pipe.pipes( lhs, rhs ), Fields.fields( lhsJoinFields, rhsJoinFields ), declaredFields, null ); 249 } 250 251 /** 252 * Constructor HashJoin creates a new HashJoin instance. 253 * 254 * @param lhs 255 * @param lhsJoinFields 256 * @param rhs 257 * @param rhsJoinFields 258 * @param declaredFields 259 * @param joiner 260 */ 261 @ConstructorProperties({"lhs", "lhsJoinFields", "rhs", "rhsJoinFields", "declaredFields", "joiner"}) 262 public HashJoin( Pipe lhs, Fields lhsJoinFields, Pipe rhs, Fields rhsJoinFields, Fields declaredFields, Joiner joiner ) 263 { 264 super( null, Pipe.pipes( lhs, rhs ), Fields.fields( lhsJoinFields, rhsJoinFields ), declaredFields, null, joiner ); 265 } 266 267 /** 268 * Constructor HashJoin creates a new HashJoin instance. 269 * 270 * @param pipe 271 * @param joinFields 272 * @param numSelfJoins 273 * @param declaredFields 274 * @param joiner 275 */ 276 @ConstructorProperties({"pipe", "joinFields", "numSelfJoins", "declaredFields", "joiner"}) 277 public HashJoin( Pipe pipe, Fields joinFields, int numSelfJoins, Fields declaredFields, Joiner joiner ) 278 { 279 super( null, pipe, joinFields, numSelfJoins, declaredFields, joiner ); 280 } 281 282 /** 283 * Constructor HashJoin creates a new HashJoin instance. 284 * 285 * @param pipe 286 * @param joinFields 287 * @param numSelfJoins 288 * @param declaredFields 289 */ 290 @ConstructorProperties({"pipe", "joinFields", "numSelfJoins", "declaredFields"}) 291 public HashJoin( Pipe pipe, Fields joinFields, int numSelfJoins, Fields declaredFields ) 292 { 293 super( null, pipe, joinFields, numSelfJoins, declaredFields ); 294 } 295 296 /** 297 * Constructor HashJoin creates a new HashJoin instance. 298 * 299 * @param pipe 300 * @param joinFields 301 * @param numSelfJoins 302 * @param joiner 303 */ 304 @ConstructorProperties({"pipe", "joinFields", "numSelfJoins", "joiner"}) 305 public HashJoin( Pipe pipe, Fields joinFields, int numSelfJoins, Joiner joiner ) 306 { 307 super( null, pipe, joinFields, numSelfJoins, null, joiner ); 308 } 309 310 /** 311 * Constructor HashJoin creates a new Join instance. 312 * 313 * @param pipes 314 * @param joinFields 315 * @param declaredFields 316 * @param joiner 317 */ 318 @ConstructorProperties({"pipes", "joinFields", "declaredFields", "joiner"}) 319 public HashJoin( Pipe[] pipes, Fields[] joinFields, Fields declaredFields, Joiner joiner ) 320 { 321 super( null, pipes, joinFields, declaredFields, null, joiner ); 322 } 323 }