001 /* 002 * Copyright (c) 2007-2014 Concurrent, Inc. All Rights Reserved. 003 * 004 * Project and contact information: http://www.cascading.org/ 005 * 006 * This file is part of the Cascading project. 007 * 008 * Licensed under the Apache License, Version 2.0 (the "License"); 009 * you may not use this file except in compliance with the License. 010 * You may obtain a copy of the License at 011 * 012 * http://www.apache.org/licenses/LICENSE-2.0 013 * 014 * Unless required by applicable law or agreed to in writing, software 015 * distributed under the License is distributed on an "AS IS" BASIS, 016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 017 * See the License for the specific language governing permissions and 018 * limitations under the License. 019 */ 020 021 package cascading.scheme; 022 023 import java.io.IOException; 024 import java.io.Serializable; 025 026 import cascading.flow.FlowProcess; 027 import cascading.tap.Tap; 028 import cascading.tuple.Fields; 029 import cascading.tuple.Tuple; 030 import cascading.util.Util; 031 032 /** 033 * A Scheme defines what is stored in a {@link Tap} instance by declaring the {@link Tuple} 034 * field names, and alternately parsing or rendering the incoming or outgoing {@link Tuple} 035 * stream, respectively. 036 * <p/> 037 * A Scheme defines the type of resource data will be sourced from or sinked to. 038 * <p/> 039 * The default sourceFields are {@link Fields#UNKNOWN} and the default sinkFields are {@link Fields#ALL}. 040 * <p/> 041 * Any given sourceFields only label the values in the {@link Tuple}s as they are sourced. 042 * It does not necessarily filter the output since a given implementation may choose to 043 * collapse values and ignore keys depending on the format. 044 * <p/> 045 * If the sinkFields are {@link Fields#ALL}, the Cascading planner will attempt to resolve the actual field names 046 * and make them available via the {@link cascading.scheme.SinkCall#getOutgoingEntry()} method. Sometimes this may 047 * not be possible (in the case the {@link Tap#openForWrite(cascading.flow.FlowProcess)} method is called from user 048 * code directly (without planner intervention). 049 * <p/> 050 * If the sinkFields are a valid selector, the {@link #sink(cascading.flow.FlowProcess, SinkCall)} method will 051 * only see the fields expected. 052 * <p/> 053 * Setting the {@code numSinkParts} value to 1 (one) attempts to ensure the output resource has only one part. 054 * In the case of MapReduce, this is only a suggestion for the Map side, on the Reduce side it does this by 055 * setting the number of reducers to the given value. This may affect performance, so be cautioned. 056 * </p> 057 * Note that setting numSinkParts does not force the planner to insert a final Reduce operation in the job, so 058 * numSinkParts may be ignored entirely if the final job is Map only. To force the Flow to have a final Reduce, 059 * add a {@link cascading.pipe.GroupBy} to the assembly before sinking. 060 */ 061 public abstract class Scheme<Config, Input, Output, SourceContext, SinkContext> implements Serializable 062 { 063 /** Field sinkFields */ 064 Fields sinkFields = Fields.ALL; 065 /** Field sourceFields */ 066 Fields sourceFields = Fields.UNKNOWN; 067 /** Field numSinkParts */ 068 int numSinkParts; 069 /** Field trace */ 070 private String trace = Util.captureDebugTrace( getClass() ); 071 072 /** Constructor Scheme creates a new Scheme instance. */ 073 protected Scheme() 074 { 075 } 076 077 /** 078 * Constructor Scheme creates a new Scheme instance. 079 * 080 * @param sourceFields of type Fields 081 */ 082 protected Scheme( Fields sourceFields ) 083 { 084 setSourceFields( sourceFields ); 085 } 086 087 /** 088 * Constructor Scheme creates a new Scheme instance. 089 * 090 * @param sourceFields of type Fields 091 * @param numSinkParts of type int 092 */ 093 protected Scheme( Fields sourceFields, int numSinkParts ) 094 { 095 setSourceFields( sourceFields ); 096 this.numSinkParts = numSinkParts; 097 } 098 099 /** 100 * Constructor Scheme creates a new Scheme instance. 101 * 102 * @param sourceFields of type Fields 103 * @param sinkFields of type Fields 104 */ 105 protected Scheme( Fields sourceFields, Fields sinkFields ) 106 { 107 setSourceFields( sourceFields ); 108 setSinkFields( sinkFields ); 109 } 110 111 /** 112 * Constructor Scheme creates a new Scheme instance. 113 * 114 * @param sourceFields of type Fields 115 * @param sinkFields of type Fields 116 * @param numSinkParts of type int 117 */ 118 protected Scheme( Fields sourceFields, Fields sinkFields, int numSinkParts ) 119 { 120 setSourceFields( sourceFields ); 121 setSinkFields( sinkFields ); 122 this.numSinkParts = numSinkParts; 123 } 124 125 /** 126 * Method getSinkFields returns the sinkFields of this Scheme object. 127 * 128 * @return the sinkFields (type Fields) of this Scheme object. 129 */ 130 public Fields getSinkFields() 131 { 132 return sinkFields; 133 } 134 135 /** 136 * Method setSinkFields sets the sinkFields of this Scheme object. 137 * 138 * @param sinkFields the sinkFields of this Scheme object. 139 */ 140 public void setSinkFields( Fields sinkFields ) 141 { 142 if( sinkFields.isUnknown() ) 143 this.sinkFields = Fields.ALL; 144 else 145 this.sinkFields = sinkFields; 146 } 147 148 /** 149 * Method getSourceFields returns the sourceFields of this Scheme object. 150 * 151 * @return the sourceFields (type Fields) of this Scheme object. 152 */ 153 public Fields getSourceFields() 154 { 155 return sourceFields; 156 } 157 158 /** 159 * Method setSourceFields sets the sourceFields of this Scheme object. 160 * 161 * @param sourceFields the sourceFields of this Scheme object. 162 */ 163 public void setSourceFields( Fields sourceFields ) 164 { 165 if( sourceFields.isAll() ) 166 this.sourceFields = Fields.UNKNOWN; 167 else 168 this.sourceFields = sourceFields; 169 } 170 171 /** 172 * Method getNumSinkParts returns the numSinkParts of this Scheme object. 173 * 174 * @return the numSinkParts (type int) of this Scheme object. 175 */ 176 public int getNumSinkParts() 177 { 178 return numSinkParts; 179 } 180 181 /** 182 * Method setNumSinkParts sets the numSinkParts of this Scheme object. 183 * 184 * @param numSinkParts the numSinkParts of this Scheme object. 185 */ 186 public void setNumSinkParts( int numSinkParts ) 187 { 188 this.numSinkParts = numSinkParts; 189 } 190 191 /** 192 * Method getTrace returns a String that pinpoint where this instance was created for debugging. 193 * 194 * @return String 195 */ 196 public String getTrace() 197 { 198 return trace; 199 } 200 201 /** 202 * Method isSymmetrical returns {@code true} if the sink fields equal the source fields. That is, this 203 * scheme sources the same fields as it sinks. 204 * 205 * @return the symmetrical (type boolean) of this Scheme object. 206 */ 207 public boolean isSymmetrical() 208 { 209 return getSourceFields().equals( Fields.UNKNOWN ) && getSinkFields().equals( Fields.ALL ) || getSinkFields().equals( getSourceFields() ); 210 } 211 212 /** 213 * Method isSource returns true if this Scheme instance can be used as a source. 214 * 215 * @return boolean 216 */ 217 public boolean isSource() 218 { 219 return true; 220 } 221 222 /** 223 * Method isSink returns true if this Scheme instance can be used as a sink. 224 * 225 * @return boolean 226 */ 227 public boolean isSink() 228 { 229 return true; 230 } 231 232 /** 233 * Method retrieveSourceFields notifies a Scheme when it is appropriate to dynamically 234 * update the fields it sources. By default the current declared fields are returned. 235 * <p/> 236 * The {@code FlowProcess} presents all known properties resolved by the current planner. 237 * <p/> 238 * The {@code tap} instance is the parent {@link Tap} for this Scheme instance. 239 * 240 * @param flowProcess of type FlowProcess 241 * @param tap of type Tap 242 * @return Fields 243 */ 244 public Fields retrieveSourceFields( FlowProcess<Config> flowProcess, Tap tap ) 245 { 246 return getSourceFields(); 247 } 248 249 /** 250 * Method presentSourceFields is called after the planner is invoked and all fields are resolved. This 251 * method presents to the Scheme the actual source fields after any planner intervention. 252 * <p/> 253 * This method is called after {@link #retrieveSourceFields(cascading.flow.FlowProcess, cascading.tap.Tap)}. 254 * 255 * @param flowProcess of type FlowProcess 256 * @param tap of type Tap 257 * @param fields of type Fields 258 */ 259 public void presentSourceFields( FlowProcess<Config> flowProcess, Tap tap, Fields fields ) 260 { 261 presentSourceFieldsInternal( fields ); 262 } 263 264 protected void presentSourceFieldsInternal( Fields fields ) 265 { 266 if( getSourceFields().equals( Fields.UNKNOWN ) ) 267 setSourceFields( fields ); 268 } 269 270 /** 271 * Method retrieveSinkFields notifies a Scheme when it is appropriate to dynamically 272 * update the fields it sources. By default the current declared fields are returned. 273 * <p/> 274 * The {@code FlowProcess} presents all known properties resolved by the current planner. 275 * <p/> 276 * The {@code tap} instance is the parent {@link Tap} for this Scheme instance. 277 * 278 * @param flowProcess of type FlowProcess 279 * @param tap of type Tap 280 * @return Fields 281 */ 282 public Fields retrieveSinkFields( FlowProcess<Config> flowProcess, Tap tap ) 283 { 284 return getSinkFields(); 285 } 286 287 /** 288 * Method presentSinkFields is called after the planner is invoked and all fields are resolved. This 289 * method presents to the Scheme the actual source fields after any planner intervention. 290 * <p/> 291 * This method is called after {@link #retrieveSinkFields(cascading.flow.FlowProcess, cascading.tap.Tap)}. 292 * 293 * @param flowProcess of type FlowProcess 294 * @param tap of type Tap 295 * @param fields of type Fields 296 */ 297 public void presentSinkFields( FlowProcess<Config> flowProcess, Tap tap, Fields fields ) 298 { 299 presentSinkFieldsInternal( fields ); 300 } 301 302 protected void presentSinkFieldsInternal( Fields fields ) 303 { 304 if( getSinkFields().equals( Fields.ALL ) ) 305 setSinkFields( fields ); 306 } 307 308 /** 309 * Method sourceInit initializes this instance as a source. 310 * <p/> 311 * This method is executed client side as a means to provide necessary configuration parameters 312 * used by the underlying platform. 313 * <p/> 314 * It is not intended to initialize resources that would be necessary during the execution of this 315 * class, like a "formatter" or "parser". 316 * <p/> 317 * See {@link #sourcePrepare(cascading.flow.FlowProcess, SourceCall)} if resources much be initialized 318 * before use. And {@link #sourceCleanup(cascading.flow.FlowProcess, SourceCall)} if resources must be 319 * destroyed after use. 320 * 321 * @param flowProcess of type FlowProcess 322 * @param tap of type Tap 323 * @param conf of type Config 324 */ 325 public abstract void sourceConfInit( FlowProcess<Config> flowProcess, Tap<Config, Input, Output> tap, Config conf ); 326 327 /** 328 * Method sinkInit initializes this instance as a sink. 329 * <p/> 330 * This method is executed client side as a means to provide necessary configuration parameters 331 * used by the underlying platform. 332 * <p/> 333 * It is not intended to initialize resources that would be necessary during the execution of this 334 * class, like a "formatter" or "parser". 335 * <p/> 336 * See {@link #sinkPrepare(cascading.flow.FlowProcess, SinkCall)} if resources much be initialized 337 * before use. And {@link #sinkCleanup(cascading.flow.FlowProcess, SinkCall)} if resources must be 338 * destroyed after use. 339 * 340 * @param flowProcess of type FlowProcess 341 * @param tap of type Tap 342 * @param conf of type Config 343 */ 344 public abstract void sinkConfInit( FlowProcess<Config> flowProcess, Tap<Config, Input, Output> tap, Config conf ); 345 346 /** 347 * Method sourcePrepare is used to initialize resources needed during each call of 348 * {@link #source(cascading.flow.FlowProcess, SourceCall)}. 349 * <p/> 350 * Be sure to place any initialized objects in the {@code SourceContext} so each instance 351 * will remain threadsafe. 352 * 353 * @param flowProcess of type FlowProcess 354 * @param sourceCall of type SourceCall<SourceContext, Input> 355 */ 356 public void sourcePrepare( FlowProcess<Config> flowProcess, SourceCall<SourceContext, Input> sourceCall ) throws IOException 357 { 358 } 359 360 /** 361 * Method source will read a new "record" or value from {@link cascading.scheme.SourceCall#getInput()} and populate 362 * the available {@link Tuple} via {@link cascading.scheme.SourceCall#getIncomingEntry()} and return {@code true} 363 * on success or {@code false} if no more values available. 364 * <p/> 365 * It's ok to set a new Tuple instance on the {@code incomingEntry} {@link cascading.tuple.TupleEntry}, or 366 * to simply re-use the existing instance. 367 * <p/> 368 * Note this is only time it is safe to modify a Tuple instance handed over via a method call. 369 * <p/> 370 * This method may optionally throw a {@link cascading.tap.TapException} if it cannot process a particular 371 * instance of data. If the payload Tuple is set on the TapException, that Tuple will be written to 372 * any applicable failure trap Tap. 373 * 374 * @param flowProcess of type FlowProcess 375 * @param sourceCall of SourceCall 376 * @return returns {@code true} when a Tuple was successfully read 377 */ 378 public abstract boolean source( FlowProcess<Config> flowProcess, SourceCall<SourceContext, Input> sourceCall ) throws IOException; 379 380 /** 381 * Method sourceCleanup is used to destroy resources created by 382 * {@link #sourcePrepare(cascading.flow.FlowProcess, SourceCall)}. 383 * 384 * @param flowProcess of Process 385 * @param sourceCall of type SourceCall<SourceContext, Input> 386 */ 387 public void sourceCleanup( FlowProcess<Config> flowProcess, SourceCall<SourceContext, Input> sourceCall ) throws IOException 388 { 389 } 390 391 /** 392 * Method sinkPrepare is used to initialize resources needed during each call of 393 * {@link #sink(cascading.flow.FlowProcess, SinkCall)}. 394 * <p/> 395 * Be sure to place any initialized objects in the {@code SinkContext} so each instance 396 * will remain threadsafe. 397 * 398 * @param flowProcess of type FlowProcess 399 * @param sinkCall of type SinkCall<SinkContext, Output> 400 */ 401 public void sinkPrepare( FlowProcess<Config> flowProcess, SinkCall<SinkContext, Output> sinkCall ) throws IOException 402 { 403 } 404 405 /** 406 * Method sink writes out the given {@link Tuple} found on {@link cascading.scheme.SinkCall#getOutgoingEntry()} to 407 * the {@link cascading.scheme.SinkCall#getOutput()}. 408 * <p/> 409 * This method may optionally throw a {@link cascading.tap.TapException} if it cannot process a particular 410 * instance of data. If the payload Tuple is set on the TapException, that Tuple will be written to 411 * any applicable failure trap Tap. If not set, the incoming Tuple will be written instead. 412 * 413 * @param flowProcess of Process 414 * @param sinkCall of SinkCall 415 */ 416 public abstract void sink( FlowProcess<Config> flowProcess, SinkCall<SinkContext, Output> sinkCall ) throws IOException; 417 418 /** 419 * Method sinkCleanup is used to destroy resources created by 420 * {@link #sinkPrepare(cascading.flow.FlowProcess, SinkCall)}. 421 * 422 * @param flowProcess of type FlowProcess 423 * @param sinkCall of type SinkCall<SinkContext, Output> 424 */ 425 public void sinkCleanup( FlowProcess<Config> flowProcess, SinkCall<SinkContext, Output> sinkCall ) throws IOException 426 { 427 } 428 429 @Override 430 public boolean equals( Object object ) 431 { 432 if( this == object ) 433 return true; 434 if( object == null || getClass() != object.getClass() ) 435 return false; 436 437 Scheme scheme = (Scheme) object; 438 439 if( numSinkParts != scheme.numSinkParts ) 440 return false; 441 if( sinkFields != null ? !sinkFields.equals( scheme.sinkFields ) : scheme.sinkFields != null ) 442 return false; 443 if( sourceFields != null ? !sourceFields.equals( scheme.sourceFields ) : scheme.sourceFields != null ) 444 return false; 445 446 return true; 447 } 448 449 @Override 450 public String toString() 451 { 452 if( getSinkFields().equals( getSourceFields() ) ) 453 return getClass().getSimpleName() + "[" + getSourceFields().print() + "]"; 454 else 455 return getClass().getSimpleName() + "[" + getSourceFields().print() + "->" + getSinkFields().print() + "]"; 456 } 457 458 public int hashCode() 459 { 460 int result; 461 result = sinkFields != null ? sinkFields.hashCode() : 0; 462 result = 31 * result + ( sourceFields != null ? sourceFields.hashCode() : 0 ); 463 result = 31 * result + numSinkParts; 464 return result; 465 } 466 }