/*
 * Copyright (c) 2016-2017 Chris K Wensel <chris@wensel.net>. All Rights Reserved.
 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved.
 *
 * Project and contact information: http://www.cascading.org/
 *
 * This file is part of the Cascading project.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package cascading.scheme;

import java.io.IOException;
import java.io.Serializable;

import cascading.flow.FlowProcess;
import cascading.tap.Tap;
import cascading.tuple.Fields;
import cascading.tuple.Tuple;
import cascading.util.TraceUtil;
import cascading.util.Traceable;

/**
 * A Scheme defines what is stored in a {@link Tap} instance by declaring the {@link Tuple}
 * field names, and alternately parsing or rendering the incoming or outgoing {@link Tuple}
 * stream, respectively.
 * <p>
 * A Scheme defines the type of resource that data will be sourced from or sinked to.
 * <p>
 * The default sourceFields are {@link Fields#UNKNOWN} and the default sinkFields are {@link Fields#ALL}.
 * <p>
 * Any given sourceFields only label the values in the {@link Tuple}s as they are sourced.
 * It does not necessarily filter the output since a given implementation may choose to
 * collapse values and ignore keys depending on the format.
 * <p>
 * If the sinkFields are {@link Fields#ALL}, the Cascading planner will attempt to resolve the actual field names
 * and make them available via the {@link cascading.scheme.SinkCall#getOutgoingEntry()} method. Sometimes this may
 * not be possible (in the case the {@link Tap#openForWrite(cascading.flow.FlowProcess)} method is called from user
 * code directly, without planner intervention).
 * <p>
 * If the sinkFields are a valid selector, the {@link #sink(cascading.flow.FlowProcess, SinkCall)} method will
 * only see the fields expected.
 * <p>
 * Setting the {@code numSinkParts} value to 1 (one) attempts to ensure the output resource has only one part.
 * In the case of MapReduce, this is only a suggestion for the Map side, on the Reduce side it does this by
 * setting the number of reducers to the given value. This may affect performance, so be cautioned.
 * <p>
 * Note that setting numSinkParts does not force the planner to insert a final Reduce operation in the job, so
 * numSinkParts may be ignored entirely if the final job is Map only. To force the Flow to have a final Reduce,
 * add a {@link cascading.pipe.GroupBy} to the assembly before sinking.
 */
public abstract class Scheme<Config, Input, Output, SourceContext, SinkContext> implements Serializable, Traceable
  {
  /** Field sinkFields */
  Fields sinkFields = Fields.ALL;
  /** Field sourceFields */
  Fields sourceFields = Fields.UNKNOWN;
  /** Field numSinkParts */
  int numSinkParts;
  /** Field trace */
  private String trace = TraceUtil.captureDebugTrace( this ); // see TraceUtil.setTrace() to override

  /** Constructor Scheme creates a new Scheme instance. */
  protected Scheme()
    {
    }

  /**
   * Constructor Scheme creates a new Scheme instance.
   *
   * @param sourceFields of type Fields
   */
  protected Scheme( Fields sourceFields )
    {
    setSourceFields( sourceFields );
    }

  /**
   * Constructor Scheme creates a new Scheme instance.
   *
   * @param sourceFields of type Fields
   * @param numSinkParts of type int
   */
  protected Scheme( Fields sourceFields, int numSinkParts )
    {
    setSourceFields( sourceFields );
    this.numSinkParts = numSinkParts;
    }

  /**
   * Constructor Scheme creates a new Scheme instance.
   *
   * @param sourceFields of type Fields
   * @param sinkFields   of type Fields
   */
  protected Scheme( Fields sourceFields, Fields sinkFields )
    {
    setSourceFields( sourceFields );
    setSinkFields( sinkFields );
    }

  /**
   * Constructor Scheme creates a new Scheme instance.
   *
   * @param sourceFields of type Fields
   * @param sinkFields   of type Fields
   * @param numSinkParts of type int
   */
  protected Scheme( Fields sourceFields, Fields sinkFields, int numSinkParts )
    {
    setSourceFields( sourceFields );
    setSinkFields( sinkFields );
    this.numSinkParts = numSinkParts;
    }

  /**
   * Method getSinkFields returns the sinkFields of this Scheme object.
   *
   * @return the sinkFields (type Fields) of this Scheme object.
   */
  public Fields getSinkFields()
    {
    return sinkFields;
    }

  /**
   * Method setSinkFields sets the sinkFields of this Scheme object.
   * <p>
   * If the given fields report {@link Fields#isUnknown()}, {@link Fields#ALL} is stored instead.
   *
   * @param sinkFields the sinkFields of this Scheme object, must not be {@code null}.
   */
  public void setSinkFields( Fields sinkFields )
    {
    if( sinkFields.isUnknown() )
      this.sinkFields = Fields.ALL;
    else
      this.sinkFields = sinkFields;
    }

  /**
   * Method getSourceFields returns the sourceFields of this Scheme object.
   *
   * @return the sourceFields (type Fields) of this Scheme object.
   */
  public Fields getSourceFields()
    {
    return sourceFields;
    }

  /**
   * Method setSourceFields sets the sourceFields of this Scheme object.
   * <p>
   * If the given fields report {@link Fields#isAll()}, {@link Fields#UNKNOWN} is stored instead.
   *
   * @param sourceFields the sourceFields of this Scheme object, must not be {@code null}.
   */
  public void setSourceFields( Fields sourceFields )
    {
    if( sourceFields.isAll() )
      this.sourceFields = Fields.UNKNOWN;
    else
      this.sourceFields = sourceFields;
    }

  /**
   * Method getNumSinkParts returns the numSinkParts of this Scheme object.
   *
   * @return the numSinkParts (type int) of this Scheme object.
   */
  public int getNumSinkParts()
    {
    return numSinkParts;
    }

  /**
   * Method setNumSinkParts sets the numSinkParts of this Scheme object.
   *
   * @param numSinkParts the numSinkParts of this Scheme object.
   */
  public void setNumSinkParts( int numSinkParts )
    {
    this.numSinkParts = numSinkParts;
    }

  /**
   * Method getTrace returns the trace value held by this Scheme object, initially the value captured by
   * {@link TraceUtil#captureDebugTrace(Object)} when the instance was created.
   *
   * @return the trace (type String) of this Scheme object.
   */
  @Override
  public String getTrace()
    {
    return trace;
    }

  /**
   * Method isSymmetrical returns {@code true} if the sink fields equal the source fields. That is, this
   * scheme sources the same fields as it sinks.
   * <p>
   * The default pairing of {@link Fields#UNKNOWN} source fields with {@link Fields#ALL} sink fields is
   * also considered symmetrical.
   *
   * @return the symmetrical (type boolean) of this Scheme object.
   */
  public boolean isSymmetrical()
    {
    return ( getSourceFields().equals( Fields.UNKNOWN ) && getSinkFields().equals( Fields.ALL ) )
      || getSinkFields().equals( getSourceFields() );
    }

  /**
   * Method isSource returns true if this Scheme instance can be used as a source.
   *
   * @return boolean
   */
  public boolean isSource()
    {
    return true;
    }

  /**
   * Method isSink returns true if this Scheme instance can be used as a sink.
   *
   * @return boolean
   */
  public boolean isSink()
    {
    return true;
    }

  /**
   * Method retrieveSourceFields notifies a Scheme when it is appropriate to dynamically
   * update the fields it sources. By default the current declared fields are returned.
   * <p>
   * The {@code FlowProcess} presents all known properties resolved by the current planner.
   * <p>
   * The {@code tap} instance is the parent {@link Tap} for this Scheme instance.
   *
   * @param flowProcess of type FlowProcess
   * @param tap         of type Tap
   * @return Fields
   */
  public Fields retrieveSourceFields( FlowProcess<? extends Config> flowProcess, Tap tap )
    {
    return getSourceFields();
    }

  /**
   * Method presentSourceFields is called after the planner is invoked and all fields are resolved. This
   * method presents to the Scheme the actual source fields after any planner intervention.
   * <p>
   * This method is called after {@link #retrieveSourceFields(cascading.flow.FlowProcess, cascading.tap.Tap)}.
   *
   * @param flowProcess of type FlowProcess
   * @param tap         of type Tap
   * @param fields      of type Fields
   */
  public void presentSourceFields( FlowProcess<? extends Config> flowProcess, Tap tap, Fields fields )
    {
    presentSourceFieldsInternal( fields );
    }

  /**
   * Method presentSourceFieldsInternal adopts the given fields as the source fields, but only if the
   * current source fields equal {@link Fields#UNKNOWN}. Explicitly declared source fields are left untouched.
   *
   * @param fields of type Fields
   */
  protected void presentSourceFieldsInternal( Fields fields )
    {
    if( getSourceFields().equals( Fields.UNKNOWN ) )
      setSourceFields( fields );
    }

  /**
   * Method retrieveSinkFields notifies a Scheme when it is appropriate to dynamically
   * update the fields it sinks. By default the current declared fields are returned.
   * <p>
   * The {@code FlowProcess} presents all known properties resolved by the current planner.
   * <p>
   * The {@code tap} instance is the parent {@link Tap} for this Scheme instance.
   *
   * @param flowProcess of type FlowProcess
   * @param tap         of type Tap
   * @return Fields
   */
  public Fields retrieveSinkFields( FlowProcess<? extends Config> flowProcess, Tap tap )
    {
    return getSinkFields();
    }

  /**
   * Method presentSinkFields is called after the planner is invoked and all fields are resolved. This
   * method presents to the Scheme the actual sink fields after any planner intervention.
   * <p>
   * This method is called after {@link #retrieveSinkFields(cascading.flow.FlowProcess, cascading.tap.Tap)}.
   *
   * @param flowProcess of type FlowProcess
   * @param tap         of type Tap
   * @param fields      of type Fields
   */
  public void presentSinkFields( FlowProcess<? extends Config> flowProcess, Tap tap, Fields fields )
    {
    presentSinkFieldsInternal( fields );
    }

  /**
   * Method presentSinkFieldsInternal adopts the given fields as the sink fields, but only if the
   * current sink fields equal {@link Fields#ALL}. Explicitly declared sink fields are left untouched.
   *
   * @param fields of type Fields
   */
  protected void presentSinkFieldsInternal( Fields fields )
    {
    if( getSinkFields().equals( Fields.ALL ) )
      setSinkFields( fields );
    }

  /**
   * Method sourceConfInit initializes this instance as a source.
   * <p>
   * This method is executed client side as a means to provide necessary configuration parameters
   * used by the underlying platform.
   * <p>
   * It is not intended to initialize resources that would be necessary during the execution of this
   * class, like a "formatter" or "parser".
   * <p>
   * See {@link #sourcePrepare(cascading.flow.FlowProcess, SourceCall)} if resources must be initialized
   * before use. And {@link #sourceCleanup(cascading.flow.FlowProcess, SourceCall)} if resources must be
   * destroyed after use.
   *
   * @param flowProcess of type FlowProcess
   * @param tap         of type Tap
   * @param conf        of type Config
   */
  public abstract void sourceConfInit( FlowProcess<? extends Config> flowProcess, Tap<Config, Input, Output> tap, Config conf );

  /**
   * Method sinkConfInit initializes this instance as a sink.
   * <p>
   * This method is executed client side as a means to provide necessary configuration parameters
   * used by the underlying platform.
   * <p>
   * It is not intended to initialize resources that would be necessary during the execution of this
   * class, like a "formatter" or "parser".
   * <p>
   * See {@link #sinkPrepare(cascading.flow.FlowProcess, SinkCall)} if resources must be initialized
   * before use. And {@link #sinkCleanup(cascading.flow.FlowProcess, SinkCall)} if resources must be
   * destroyed after use.
   *
   * @param flowProcess of type FlowProcess
   * @param tap         of type Tap
   * @param conf        of type Config
   */
  public abstract void sinkConfInit( FlowProcess<? extends Config> flowProcess, Tap<Config, Input, Output> tap, Config conf );

  /**
   * Method sourceWrap allows the current Scheme instance to wrap the incoming Input data source giving the underlying
   * platform direct access to manage the object in place of the original.
   * <p>
   * If the Input is an InputStream, the stream can be decompressed by wrapping in an appropriate de-compressor
   * InputStream.
   * <p>
   * By default the given input is returned unchanged.
   *
   * @param flowProcess of type FlowProcess
   * @param input       the Input provided by the platform
   * @return the same or an instance of the Input type wrapping the given parameter
   * @throws IOException if unable to wrap the parameter
   */
  public Input sourceWrap( FlowProcess<? extends Config> flowProcess, Input input ) throws IOException
    {
    return input;
    }

  /**
   * Method sourcePrepare is used to initialize resources needed during each call of
   * {@link #source(cascading.flow.FlowProcess, SourceCall)}.
   * <p>
   * This method is guaranteed to be called once before the first invocation of {@link #source(FlowProcess, SourceCall)}.
   * <p>
   * Be sure to place any initialized objects in the {@code SourceContext} so each instance
   * will remain thread-safe.
   * <p>
   * By default this method does nothing.
   *
   * @param flowProcess of type FlowProcess
   * @param sourceCall  of type SourceCall
   * @throws IOException may be thrown by implementations
   */
  public void sourcePrepare( FlowProcess<? extends Config> flowProcess, SourceCall<SourceContext, Input> sourceCall ) throws IOException
    {
    }

  /**
   * Method sourceRePrepare is used to re-initialize resources needed during each call of
   * {@link #source(cascading.flow.FlowProcess, SourceCall)} after the {@code Input} object
   * has been changed, if needed.
   * <p>
   * This method may be called zero or more times. Note {@link #sourcePrepare(FlowProcess, SourceCall)} will always
   * be called before any {@link #source(FlowProcess, SourceCall)} invocation.
   * <p>
   * By default this method does nothing.
   *
   * @param flowProcess of type FlowProcess
   * @param sourceCall  of type SourceCall
   * @throws IOException may be thrown by implementations
   */
  public void sourceRePrepare( FlowProcess<? extends Config> flowProcess, SourceCall<SourceContext, Input> sourceCall ) throws IOException
    {
    }

  /**
   * Method source will read a new "record" or value from {@link cascading.scheme.SourceCall#getInput()} and populate
   * the available {@link Tuple} via {@link cascading.scheme.SourceCall#getIncomingEntry()} and return {@code true}
   * on success or {@code false} if no more values available.
   * <p>
   * It's ok to set a new Tuple instance on the {@code incomingEntry} {@link cascading.tuple.TupleEntry}, or
   * to simply re-use the existing instance.
   * <p>
   * Note this is the only time it is safe to modify a Tuple instance handed over via a method call.
   * <p>
   * This method may optionally throw a {@link cascading.tap.TapException} if it cannot process a particular
   * instance of data. If the payload Tuple is set on the TapException, that Tuple will be written to
   * any applicable failure trap Tap.
   *
   * @param flowProcess of type FlowProcess
   * @param sourceCall  of type SourceCall
   * @return returns {@code true} when a Tuple was successfully read
   * @throws IOException may be thrown by implementations
   */
  public abstract boolean source( FlowProcess<? extends Config> flowProcess, SourceCall<SourceContext, Input> sourceCall ) throws IOException;

  /**
   * Method sourceCleanup is used to destroy resources created by
   * {@link #sourcePrepare(cascading.flow.FlowProcess, SourceCall)}.
   * <p>
   * By default this method does nothing.
   *
   * @param flowProcess of type FlowProcess
   * @param sourceCall  of type SourceCall
   * @throws IOException may be thrown by implementations
   */
  public void sourceCleanup( FlowProcess<? extends Config> flowProcess, SourceCall<SourceContext, Input> sourceCall ) throws IOException
    {
    }

  /**
   * Method sinkWrap allows the current Scheme instance to wrap the outgoing Output data source giving the underlying
   * platform direct access to manage the object in place of the original.
   * <p>
   * If the Output is an OutputStream, the stream can be compressed by wrapping in an appropriate compressor
   * OutputStream.
   * <p>
   * By default the given output is returned unchanged.
   *
   * @param flowProcess of type FlowProcess
   * @param output      the Output provided by the platform
   * @return the same or an instance of the Output type wrapping the given parameter
   * @throws IOException if unable to wrap the parameter
   */
  public Output sinkWrap( FlowProcess<? extends Config> flowProcess, Output output ) throws IOException
    {
    return output;
    }

  /**
   * Method sinkPrepare is used to initialize resources needed during each call of
   * {@link #sink(cascading.flow.FlowProcess, SinkCall)}.
   * <p>
   * This method is guaranteed to be called once before the first invocation of {@link #sink(FlowProcess, SinkCall)}.
   * <p>
   * Be sure to place any initialized objects in the {@code SinkContext} so each instance
   * will remain thread-safe.
   * <p>
   * By default this method does nothing.
   *
   * @param flowProcess of type FlowProcess
   * @param sinkCall    of type SinkCall
   * @throws IOException may be thrown by implementations
   */
  public void sinkPrepare( FlowProcess<? extends Config> flowProcess, SinkCall<SinkContext, Output> sinkCall ) throws IOException
    {
    }

  /**
   * Method sink writes out the given {@link Tuple} found on {@link cascading.scheme.SinkCall#getOutgoingEntry()} to
   * the {@link cascading.scheme.SinkCall#getOutput()}.
   * <p>
   * This method may optionally throw a {@link cascading.tap.TapException} if it cannot process a particular
   * instance of data. If the payload Tuple is set on the TapException, that Tuple will be written to
   * any applicable failure trap Tap. If not set, the incoming Tuple will be written instead.
   *
   * @param flowProcess of type FlowProcess
   * @param sinkCall    of type SinkCall
   * @throws IOException may be thrown by implementations
   */
  public abstract void sink( FlowProcess<? extends Config> flowProcess, SinkCall<SinkContext, Output> sinkCall ) throws IOException;

  /**
   * Method sinkCleanup is used to destroy resources created by
   * {@link #sinkPrepare(cascading.flow.FlowProcess, SinkCall)}.
   * <p>
   * By default this method does nothing.
   *
   * @param flowProcess of type FlowProcess
   * @param sinkCall    of type SinkCall
   * @throws IOException may be thrown by implementations
   */
  public void sinkCleanup( FlowProcess<? extends Config> flowProcess, SinkCall<SinkContext, Output> sinkCall ) throws IOException
    {
    }

  /**
   * Two Scheme instances are equal when they are of exactly the same class and have equal
   * numSinkParts, sinkFields and sourceFields. The trace value is not compared.
   *
   * @param object the object to compare against
   * @return {@code true} if the given object is equal to this Scheme
   */
  @Override
  public boolean equals( Object object )
    {
    if( this == object )
      return true;
    if( object == null || getClass() != object.getClass() )
      return false;

    // wildcard cast: the type parameters are irrelevant to the comparison and a raw type is avoided
    Scheme<?, ?, ?, ?, ?> scheme = (Scheme<?, ?, ?, ?, ?>) object;

    if( numSinkParts != scheme.numSinkParts )
      return false;
    if( sinkFields != null ? !sinkFields.equals( scheme.sinkFields ) : scheme.sinkFields != null )
      return false;
    if( sourceFields != null ? !sourceFields.equals( scheme.sourceFields ) : scheme.sourceFields != null )
      return false;

    return true;
    }

  /**
   * Renders the simple class name followed by the source fields, and additionally the sink fields
   * when they differ from the source fields.
   *
   * @return a printable description of this Scheme
   */
  @Override
  public String toString()
    {
    if( getSinkFields().equals( getSourceFields() ) )
      return getClass().getSimpleName() + "[" + getSourceFields().print() + "]";
    else
      return getClass().getSimpleName() + "[" + getSourceFields().print() + "->" + getSinkFields().print() + "]";
    }

  /**
   * Hash code derived from the same values compared by {@link #equals(Object)}: sinkFields,
   * sourceFields and numSinkParts.
   *
   * @return the hash code of this Scheme
   */
  @Override
  public int hashCode()
    {
    int result;
    result = sinkFields != null ? sinkFields.hashCode() : 0;
    result = 31 * result + ( sourceFields != null ? sourceFields.hashCode() : 0 );
    result = 31 * result + numSinkParts;
    return result;
    }
  }