/*
 * Copyright (c) 2016-2017 Chris K Wensel <chris@wensel.net>. All Rights Reserved.
 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved.
 *
 * Project and contact information: http://www.cascading.org/
 *
 * This file is part of the Cascading project.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package cascading.scheme;

import java.io.IOException;
import java.io.Serializable;

import cascading.flow.FlowProcess;
import cascading.tap.Tap;
import cascading.tuple.Fields;
import cascading.tuple.Tuple;
import cascading.util.TraceUtil;
import cascading.util.Traceable;

/**
 * A Scheme defines what is stored in a {@link Tap} instance by declaring the {@link Tuple}
 * field names, and alternately parsing or rendering the incoming or outgoing {@link Tuple}
 * stream, respectively.
 * <p/>
 * A Scheme defines the type of resource that data will be sourced from or sinked to.
 * <p/>
 * The default sourceFields are {@link Fields#UNKNOWN} and the default sinkFields are {@link Fields#ALL}.
 * <p/>
 * Any given sourceFields only label the values in the {@link Tuple}s as they are sourced.
 * It does not necessarily filter the output since a given implementation may choose to
 * collapse values and ignore keys depending on the format.
 * <p/>
 * If the sinkFields are {@link Fields#ALL}, the Cascading planner will attempt to resolve the actual field names
 * and make them available via the {@link cascading.scheme.SinkCall#getOutgoingEntry()} method. Sometimes this may
 * not be possible (in the case the {@link Tap#openForWrite(cascading.flow.FlowProcess)} method is called from user
 * code directly, without planner intervention).
 * <p/>
 * If the sinkFields are a valid selector, the {@link #sink(cascading.flow.FlowProcess, SinkCall)} method will
 * only see the fields expected.
 * <p/>
 * Setting the {@code numSinkParts} value to 1 (one) attempts to ensure the output resource has only one part.
 * In the case of MapReduce, this is only a suggestion for the Map side, on the Reduce side it does this by
 * setting the number of reducers to the given value. This may affect performance, so be cautioned.
 * <p/>
 * Note that setting numSinkParts does not force the planner to insert a final Reduce operation in the job, so
 * numSinkParts may be ignored entirely if the final job is Map only. To force the Flow to have a final Reduce,
 * add a {@link cascading.pipe.GroupBy} to the assembly before sinking.
 */
public abstract class Scheme<Config, Input, Output, SourceContext, SinkContext> implements Serializable, Traceable
  {
  /** Field sinkFields */
  Fields sinkFields = Fields.ALL;
  /** Field sourceFields */
  Fields sourceFields = Fields.UNKNOWN;
  /** Field numSinkParts */
  int numSinkParts;
  /** Field trace */
  private String trace = TraceUtil.captureDebugTrace( this ); // see TraceUtil.setTrace() to override

  /** Constructor Scheme creates a new Scheme instance. */
  protected Scheme()
    {
    }

  /**
   * Constructor Scheme creates a new Scheme instance.
   *
   * @param sourceFields of type Fields
   */
  protected Scheme( Fields sourceFields )
    {
    // routed through the setter so Fields.ALL is normalized to Fields.UNKNOWN
    setSourceFields( sourceFields );
    }

  /**
   * Constructor Scheme creates a new Scheme instance.
   *
   * @param sourceFields of type Fields
   * @param numSinkParts of type int
   */
  protected Scheme( Fields sourceFields, int numSinkParts )
    {
    setSourceFields( sourceFields );
    this.numSinkParts = numSinkParts;
    }

  /**
   * Constructor Scheme creates a new Scheme instance.
   *
   * @param sourceFields of type Fields
   * @param sinkFields   of type Fields
   */
  protected Scheme( Fields sourceFields, Fields sinkFields )
    {
    // both values go through their setters so ALL/UNKNOWN are normalized consistently
    setSourceFields( sourceFields );
    setSinkFields( sinkFields );
    }

  /**
   * Constructor Scheme creates a new Scheme instance.
   *
   * @param sourceFields of type Fields
   * @param sinkFields   of type Fields
   * @param numSinkParts of type int
   */
  protected Scheme( Fields sourceFields, Fields sinkFields, int numSinkParts )
    {
    setSourceFields( sourceFields );
    setSinkFields( sinkFields );
    this.numSinkParts = numSinkParts;
    }

  /**
   * Method getSinkFields returns the sinkFields of this Scheme object.
   *
   * @return the sinkFields (type Fields) of this Scheme object.
   */
  public Fields getSinkFields()
    {
    return sinkFields;
    }

  /**
   * Method setSinkFields sets the sinkFields of this Scheme object.
   * <p/>
   * If the given fields report {@link Fields#isUnknown()}, {@link Fields#ALL} is stored instead.
   *
   * @param sinkFields the sinkFields of this Scheme object, must not be {@code null}.
   */
  public void setSinkFields( Fields sinkFields )
    {
    if( sinkFields.isUnknown() )
      this.sinkFields = Fields.ALL;
    else
      this.sinkFields = sinkFields;
    }

  /**
   * Method getSourceFields returns the sourceFields of this Scheme object.
   *
   * @return the sourceFields (type Fields) of this Scheme object.
   */
  public Fields getSourceFields()
    {
    return sourceFields;
    }

  /**
   * Method setSourceFields sets the sourceFields of this Scheme object.
   * <p/>
   * If the given fields report {@link Fields#isAll()}, {@link Fields#UNKNOWN} is stored instead.
   *
   * @param sourceFields the sourceFields of this Scheme object, must not be {@code null}.
   */
  public void setSourceFields( Fields sourceFields )
    {
    if( sourceFields.isAll() )
      this.sourceFields = Fields.UNKNOWN;
    else
      this.sourceFields = sourceFields;
    }

  /**
   * Method getNumSinkParts returns the numSinkParts of this Scheme object.
   *
   * @return the numSinkParts (type int) of this Scheme object.
   */
  public int getNumSinkParts()
    {
    return numSinkParts;
    }

  /**
   * Method setNumSinkParts sets the numSinkParts of this Scheme object.
   *
   * @param numSinkParts the numSinkParts of this Scheme object.
   */
  public void setNumSinkParts( int numSinkParts )
    {
    this.numSinkParts = numSinkParts;
    }

  /**
   * Method getTrace returns the trace value captured via {@link TraceUtil} when this instance was created.
   *
   * @return the trace (type String) of this Scheme object.
   */
  @Override
  public String getTrace()
    {
    return trace;
    }

  /**
   * Method isSymmetrical returns {@code true} if the sink fields equal the source fields, or if the
   * source fields are {@link Fields#UNKNOWN} while the sink fields are {@link Fields#ALL} (the defaults).
   * That is, this scheme sources the same fields as it sinks.
   *
   * @return the symmetrical (type boolean) of this Scheme object.
   */
  public boolean isSymmetrical()
    {
    return ( getSourceFields().equals( Fields.UNKNOWN ) && getSinkFields().equals( Fields.ALL ) )
      || getSinkFields().equals( getSourceFields() );
    }

  /**
   * Method isSource returns true if this Scheme instance can be used as a source.
   *
   * @return boolean
   */
  public boolean isSource()
    {
    return true;
    }

  /**
   * Method isSink returns true if this Scheme instance can be used as a sink.
   *
   * @return boolean
   */
  public boolean isSink()
    {
    return true;
    }

  /**
   * Method retrieveSourceFields notifies a Scheme when it is appropriate to dynamically
   * update the fields it sources. By default the current declared fields are returned.
   * <p/>
   * The {@code FlowProcess} presents all known properties resolved by the current planner.
   * <p/>
   * The {@code tap} instance is the parent {@link Tap} for this Scheme instance.
   *
   * @param flowProcess of type FlowProcess
   * @param tap         of type Tap
   * @return Fields
   */
  public Fields retrieveSourceFields( FlowProcess<? extends Config> flowProcess, Tap tap )
    {
    return getSourceFields();
    }

  /**
   * Method presentSourceFields is called after the planner is invoked and all fields are resolved. This
   * method presents to the Scheme the actual source fields after any planner intervention.
   * <p/>
   * This method is called after {@link #retrieveSourceFields(cascading.flow.FlowProcess, cascading.tap.Tap)}.
   *
   * @param flowProcess of type FlowProcess
   * @param tap         of type Tap
   * @param fields      of type Fields
   */
  public void presentSourceFields( FlowProcess<? extends Config> flowProcess, Tap tap, Fields fields )
    {
    presentSourceFieldsInternal( fields );
    }

  /**
   * Method presentSourceFieldsInternal adopts the given fields as the source fields, but only if the
   * current source fields are still {@link Fields#UNKNOWN}. Explicitly declared source fields are kept.
   *
   * @param fields of type Fields
   */
  protected void presentSourceFieldsInternal( Fields fields )
    {
    if( getSourceFields().equals( Fields.UNKNOWN ) )
      setSourceFields( fields );
    }

  /**
   * Method retrieveSinkFields notifies a Scheme when it is appropriate to dynamically
   * update the fields it sinks. By default the current declared fields are returned.
   * <p/>
   * The {@code FlowProcess} presents all known properties resolved by the current planner.
   * <p/>
   * The {@code tap} instance is the parent {@link Tap} for this Scheme instance.
   *
   * @param flowProcess of type FlowProcess
   * @param tap         of type Tap
   * @return Fields
   */
  public Fields retrieveSinkFields( FlowProcess<? extends Config> flowProcess, Tap tap )
    {
    return getSinkFields();
    }

  /**
   * Method presentSinkFields is called after the planner is invoked and all fields are resolved. This
   * method presents to the Scheme the actual sink fields after any planner intervention.
   * <p/>
   * This method is called after {@link #retrieveSinkFields(cascading.flow.FlowProcess, cascading.tap.Tap)}.
   *
   * @param flowProcess of type FlowProcess
   * @param tap         of type Tap
   * @param fields      of type Fields
   */
  public void presentSinkFields( FlowProcess<? extends Config> flowProcess, Tap tap, Fields fields )
    {
    presentSinkFieldsInternal( fields );
    }

  /**
   * Method presentSinkFieldsInternal adopts the given fields as the sink fields, but only if the
   * current sink fields are still {@link Fields#ALL}. Explicitly declared sink fields are kept.
   *
   * @param fields of type Fields
   */
  protected void presentSinkFieldsInternal( Fields fields )
    {
    if( getSinkFields().equals( Fields.ALL ) )
      setSinkFields( fields );
    }

  /**
   * Method sourceConfInit initializes this instance as a source.
   * <p/>
   * This method is executed client side as a means to provide necessary configuration parameters
   * used by the underlying platform.
   * <p/>
   * It is not intended to initialize resources that would be necessary during the execution of this
   * class, like a "formatter" or "parser".
   * <p/>
   * See {@link #sourcePrepare(cascading.flow.FlowProcess, SourceCall)} if resources must be initialized
   * before use. And {@link #sourceCleanup(cascading.flow.FlowProcess, SourceCall)} if resources must be
   * destroyed after use.
   *
   * @param flowProcess of type FlowProcess
   * @param tap         of type Tap
   * @param conf        of type Config
   */
  public abstract void sourceConfInit( FlowProcess<? extends Config> flowProcess, Tap<Config, Input, Output> tap, Config conf );

  /**
   * Method sinkConfInit initializes this instance as a sink.
   * <p/>
   * This method is executed client side as a means to provide necessary configuration parameters
   * used by the underlying platform.
   * <p/>
   * It is not intended to initialize resources that would be necessary during the execution of this
   * class, like a "formatter" or "parser".
   * <p/>
   * See {@link #sinkPrepare(cascading.flow.FlowProcess, SinkCall)} if resources must be initialized
   * before use. And {@link #sinkCleanup(cascading.flow.FlowProcess, SinkCall)} if resources must be
   * destroyed after use.
   *
   * @param flowProcess of type FlowProcess
   * @param tap         of type Tap
   * @param conf        of type Config
   */
  public abstract void sinkConfInit( FlowProcess<? extends Config> flowProcess, Tap<Config, Input, Output> tap, Config conf );

  /**
   * Method sourcePrepare is used to initialize resources needed during each call of
   * {@link #source(cascading.flow.FlowProcess, SourceCall)}.
   * <p/>
   * This method is guaranteed to be called once before the first invocation of {@link #source(FlowProcess, SourceCall)}.
   * <p/>
   * Be sure to place any initialized objects in the {@code SourceContext} so each instance
   * will remain thread-safe.
   *
   * @param flowProcess of type FlowProcess
   * @param sourceCall  of type {@code SourceCall<SourceContext, Input>}
   */
  public void sourcePrepare( FlowProcess<? extends Config> flowProcess, SourceCall<SourceContext, Input> sourceCall ) throws IOException
    {
    }

  /**
   * Method sourceRePrepare is used to re-initialize resources needed during each call of
   * {@link #source(cascading.flow.FlowProcess, SourceCall)} after the {@code Input} object
   * has been changed, if needed.
   * <p/>
   * This method may be called zero or more times. Note {@link #sourcePrepare(FlowProcess, SourceCall)} will always
   * be called before any {@link #source(FlowProcess, SourceCall)} invocation.
   *
   * @param flowProcess of type FlowProcess
   * @param sourceCall  of type {@code SourceCall<SourceContext, Input>}
   */
  public void sourceRePrepare( FlowProcess<? extends Config> flowProcess, SourceCall<SourceContext, Input> sourceCall ) throws IOException
    {
    }

  /**
   * Method source will read a new "record" or value from {@link cascading.scheme.SourceCall#getInput()} and populate
   * the available {@link Tuple} via {@link cascading.scheme.SourceCall#getIncomingEntry()} and return {@code true}
   * on success or {@code false} if no more values available.
   * <p/>
   * It's ok to set a new Tuple instance on the {@code incomingEntry} {@link cascading.tuple.TupleEntry}, or
   * to simply re-use the existing instance.
   * <p/>
   * Note this is the only time it is safe to modify a Tuple instance handed over via a method call.
   * <p/>
   * This method may optionally throw a {@link cascading.tap.TapException} if it cannot process a particular
   * instance of data. If the payload Tuple is set on the TapException, that Tuple will be written to
   * any applicable failure trap Tap.
   *
   * @param flowProcess of type FlowProcess
   * @param sourceCall  of SourceCall
   * @return returns {@code true} when a Tuple was successfully read
   */
  public abstract boolean source( FlowProcess<? extends Config> flowProcess, SourceCall<SourceContext, Input> sourceCall ) throws IOException;

  /**
   * Method sourceCleanup is used to destroy resources created by
   * {@link #sourcePrepare(cascading.flow.FlowProcess, SourceCall)}.
   *
   * @param flowProcess of type FlowProcess
   * @param sourceCall  of type {@code SourceCall<SourceContext, Input>}
   */
  public void sourceCleanup( FlowProcess<? extends Config> flowProcess, SourceCall<SourceContext, Input> sourceCall ) throws IOException
    {
    }

  /**
   * Method sinkPrepare is used to initialize resources needed during each call of
   * {@link #sink(cascading.flow.FlowProcess, SinkCall)}.
   * <p/>
   * This method is guaranteed to be called once before the first invocation of {@link #sink(FlowProcess, SinkCall)}.
   * <p/>
   * Be sure to place any initialized objects in the {@code SinkContext} so each instance
   * will remain thread-safe.
   *
   * @param flowProcess of type FlowProcess
   * @param sinkCall    of type {@code SinkCall<SinkContext, Output>}
   */
  public void sinkPrepare( FlowProcess<? extends Config> flowProcess, SinkCall<SinkContext, Output> sinkCall ) throws IOException
    {
    }

  /**
   * Method sink writes out the given {@link Tuple} found on {@link cascading.scheme.SinkCall#getOutgoingEntry()} to
   * the {@link cascading.scheme.SinkCall#getOutput()}.
   * <p/>
   * This method may optionally throw a {@link cascading.tap.TapException} if it cannot process a particular
   * instance of data. If the payload Tuple is set on the TapException, that Tuple will be written to
   * any applicable failure trap Tap. If not set, the incoming Tuple will be written instead.
   *
   * @param flowProcess of type FlowProcess
   * @param sinkCall    of SinkCall
   */
  public abstract void sink( FlowProcess<? extends Config> flowProcess, SinkCall<SinkContext, Output> sinkCall ) throws IOException;

  /**
   * Method sinkCleanup is used to destroy resources created by
   * {@link #sinkPrepare(cascading.flow.FlowProcess, SinkCall)}.
   *
   * @param flowProcess of type FlowProcess
   * @param sinkCall    of type {@code SinkCall<SinkContext, Output>}
   */
  public void sinkCleanup( FlowProcess<? extends Config> flowProcess, SinkCall<SinkContext, Output> sinkCall ) throws IOException
    {
    }

  /**
   * Two Scheme instances are equal when they are of exactly the same class and have equal
   * {@code numSinkParts}, {@code sinkFields} and {@code sourceFields}. The trace value is not compared.
   *
   * @param object the object to compare against
   * @return {@code true} if the given object is equal to this Scheme
   */
  @Override
  public boolean equals( Object object )
    {
    if( this == object )
      return true;
    if( object == null || getClass() != object.getClass() )
      return false;

    // unbounded wildcards avoid a raw type; only non-generic state is compared below
    Scheme<?, ?, ?, ?, ?> scheme = (Scheme<?, ?, ?, ?, ?>) object;

    if( numSinkParts != scheme.numSinkParts )
      return false;
    if( sinkFields != null ? !sinkFields.equals( scheme.sinkFields ) : scheme.sinkFields != null )
      return false;
    if( sourceFields != null ? !sourceFields.equals( scheme.sourceFields ) : scheme.sourceFields != null )
      return false;

    return true;
    }

  /**
   * Renders the simple class name followed by the source fields, and additionally the sink fields
   * when they differ from the source fields.
   *
   * @return a printable description of this Scheme
   */
  @Override
  public String toString()
    {
    if( getSinkFields().equals( getSourceFields() ) )
      return getClass().getSimpleName() + "[" + getSourceFields().print() + "]";
    else
      return getClass().getSimpleName() + "[" + getSourceFields().print() + "->" + getSinkFields().print() + "]";
    }

  /**
   * Computes a hash from the same state compared by {@link #equals(Object)}: {@code sinkFields},
   * {@code sourceFields} and {@code numSinkParts}.
   *
   * @return the hash code of this Scheme
   */
  @Override
  public int hashCode()
    {
    int result;
    result = sinkFields != null ? sinkFields.hashCode() : 0;
    result = 31 * result + ( sourceFields != null ? sourceFields.hashCode() : 0 );
    result = 31 * result + numSinkParts;
    return result;
    }
  }