001/*
002 * Copyright (c) 2016-2017 Chris K Wensel <chris@wensel.net>. All Rights Reserved.
003 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved.
004 *
005 * Project and contact information: http://www.cascading.org/
006 *
007 * This file is part of the Cascading project.
008 *
009 * Licensed under the Apache License, Version 2.0 (the "License");
010 * you may not use this file except in compliance with the License.
011 * You may obtain a copy of the License at
012 *
013 *     http://www.apache.org/licenses/LICENSE-2.0
014 *
015 * Unless required by applicable law or agreed to in writing, software
016 * distributed under the License is distributed on an "AS IS" BASIS,
017 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
018 * See the License for the specific language governing permissions and
019 * limitations under the License.
020 */
021
022package cascading.scheme;
023
024import java.io.IOException;
025import java.io.Serializable;
026
027import cascading.flow.FlowProcess;
028import cascading.tap.Tap;
029import cascading.tuple.Fields;
030import cascading.tuple.Tuple;
031import cascading.util.TraceUtil;
032import cascading.util.Traceable;
033
034/**
035 * A Scheme defines what is stored in a {@link Tap} instance by declaring the {@link Tuple}
036 * field names, and alternately parsing or rendering the incoming or outgoing {@link Tuple}
037 * stream, respectively.
038 * <p>
 * A Scheme defines the type of resource that data will be sourced from or sinked to.
040 * <p>
041 * The default sourceFields are {@link Fields#UNKNOWN} and the default sinkFields are {@link Fields#ALL}.
042 * <p>
 * Any given sourceFields only label the values in the {@link Tuple}s as they are sourced.
 * They do not necessarily filter the output, since a given implementation may choose to
 * collapse values and ignore keys depending on the format.
046 * <p>
 * If the sinkFields are {@link Fields#ALL}, the Cascading planner will attempt to resolve the actual field names
 * and make them available via the {@link cascading.scheme.SinkCall#getOutgoingEntry()} method. Sometimes this may
 * not be possible, for example when the {@link Tap#openForWrite(cascading.flow.FlowProcess)} method is called from
 * user code directly (without planner intervention).
051 * <p>
052 * If the sinkFields are a valid selector, the {@link #sink(cascading.flow.FlowProcess, SinkCall)} method will
053 * only see the fields expected.
054 * <p>
 * Setting the {@code numSinkParts} value to 1 (one) attempts to ensure the output resource has only one part.
 * In the case of MapReduce, this is only a suggestion for the Map side; on the Reduce side it does so by
 * setting the number of reducers to the given value. This may affect performance, so use it with caution.
058 * <p>
059 * Note that setting numSinkParts does not force the planner to insert a final Reduce operation in the job, so
060 * numSinkParts may be ignored entirely if the final job is Map only. To force the Flow to have a final Reduce,
061 * add a {@link cascading.pipe.GroupBy} to the assembly before sinking.
062 */
063public abstract class Scheme<Config, Input, Output, SourceContext, SinkContext> implements Serializable, Traceable
064  {
065  /** Field sinkFields */
066  Fields sinkFields = Fields.ALL;
067  /** Field sourceFields */
068  Fields sourceFields = Fields.UNKNOWN;
069  /** Field numSinkParts */
070  int numSinkParts;
071  /** Field trace */
072  private String trace = TraceUtil.captureDebugTrace( this ); // see TraceUtil.setTrace() to override
073
074  /** Constructor Scheme creates a new Scheme instance. */
075  protected Scheme()
076    {
077    }
078
079  /**
080   * Constructor Scheme creates a new Scheme instance.
081   *
082   * @param sourceFields of type Fields
083   */
084  protected Scheme( Fields sourceFields )
085    {
086    setSourceFields( sourceFields );
087    }
088
089  /**
090   * Constructor Scheme creates a new Scheme instance.
091   *
092   * @param sourceFields of type Fields
093   * @param numSinkParts of type int
094   */
095  protected Scheme( Fields sourceFields, int numSinkParts )
096    {
097    setSourceFields( sourceFields );
098    this.numSinkParts = numSinkParts;
099    }
100
101  /**
102   * Constructor Scheme creates a new Scheme instance.
103   *
104   * @param sourceFields of type Fields
105   * @param sinkFields   of type Fields
106   */
107  protected Scheme( Fields sourceFields, Fields sinkFields )
108    {
109    setSourceFields( sourceFields );
110    setSinkFields( sinkFields );
111    }
112
113  /**
114   * Constructor Scheme creates a new Scheme instance.
115   *
116   * @param sourceFields of type Fields
117   * @param sinkFields   of type Fields
118   * @param numSinkParts of type int
119   */
120  protected Scheme( Fields sourceFields, Fields sinkFields, int numSinkParts )
121    {
122    setSourceFields( sourceFields );
123    setSinkFields( sinkFields );
124    this.numSinkParts = numSinkParts;
125    }
126
127  /**
128   * Method getSinkFields returns the sinkFields of this Scheme object.
129   *
130   * @return the sinkFields (type Fields) of this Scheme object.
131   */
132  public Fields getSinkFields()
133    {
134    return sinkFields;
135    }
136
137  /**
138   * Method setSinkFields sets the sinkFields of this Scheme object.
139   *
140   * @param sinkFields the sinkFields of this Scheme object.
141   */
142  public void setSinkFields( Fields sinkFields )
143    {
144    if( sinkFields.isUnknown() )
145      this.sinkFields = Fields.ALL;
146    else
147      this.sinkFields = sinkFields;
148    }
149
150  /**
151   * Method getSourceFields returns the sourceFields of this Scheme object.
152   *
153   * @return the sourceFields (type Fields) of this Scheme object.
154   */
155  public Fields getSourceFields()
156    {
157    return sourceFields;
158    }
159
160  /**
161   * Method setSourceFields sets the sourceFields of this Scheme object.
162   *
163   * @param sourceFields the sourceFields of this Scheme object.
164   */
165  public void setSourceFields( Fields sourceFields )
166    {
167    if( sourceFields.isAll() )
168      this.sourceFields = Fields.UNKNOWN;
169    else
170      this.sourceFields = sourceFields;
171    }
172
173  /**
174   * Method getNumSinkParts returns the numSinkParts of this Scheme object.
175   *
176   * @return the numSinkParts (type int) of this Scheme object.
177   */
178  public int getNumSinkParts()
179    {
180    return numSinkParts;
181    }
182
183  /**
184   * Method setNumSinkParts sets the numSinkParts of this Scheme object.
185   *
186   * @param numSinkParts the numSinkParts of this Scheme object.
187   */
188  public void setNumSinkParts( int numSinkParts )
189    {
190    this.numSinkParts = numSinkParts;
191    }
192
193  @Override
194  public String getTrace()
195    {
196    return trace;
197    }
198
199  /**
200   * Method isSymmetrical returns {@code true} if the sink fields equal the source fields. That is, this
201   * scheme sources the same fields as it sinks.
202   *
203   * @return the symmetrical (type boolean) of this Scheme object.
204   */
205  public boolean isSymmetrical()
206    {
207    return getSourceFields().equals( Fields.UNKNOWN ) && getSinkFields().equals( Fields.ALL ) || getSinkFields().equals( getSourceFields() );
208    }
209
210  /**
211   * Method isSource returns true if this Scheme instance can be used as a source.
212   *
213   * @return boolean
214   */
215  public boolean isSource()
216    {
217    return true;
218    }
219
220  /**
221   * Method isSink returns true if this Scheme instance can be used as a sink.
222   *
223   * @return boolean
224   */
225  public boolean isSink()
226    {
227    return true;
228    }
229
230  /**
231   * Method retrieveSourceFields notifies a Scheme when it is appropriate to dynamically
232   * update the fields it sources. By default the current declared fields are returned.
233   * <p>
234   * The {@code FlowProcess} presents all known properties resolved by the current planner.
235   * <p>
236   * The {@code tap} instance is the parent {@link Tap} for this Scheme instance.
237   *
238   * @param flowProcess of type FlowProcess
239   * @param tap         of type Tap
240   * @return Fields
241   */
242  public Fields retrieveSourceFields( FlowProcess<? extends Config> flowProcess, Tap tap )
243    {
244    return getSourceFields();
245    }
246
247  /**
248   * Method presentSourceFields is called after the planner is invoked and all fields are resolved. This
249   * method presents to the Scheme the actual source fields after any planner intervention.
250   * <p>
251   * This method is called after {@link #retrieveSourceFields(cascading.flow.FlowProcess, cascading.tap.Tap)}.
252   *
253   * @param flowProcess of type FlowProcess
254   * @param tap         of type Tap
255   * @param fields      of type Fields
256   */
257  public void presentSourceFields( FlowProcess<? extends Config> flowProcess, Tap tap, Fields fields )
258    {
259    presentSourceFieldsInternal( fields );
260    }
261
262  protected void presentSourceFieldsInternal( Fields fields )
263    {
264    if( getSourceFields().equals( Fields.UNKNOWN ) )
265      setSourceFields( fields );
266    }
267
268  /**
269   * Method retrieveSinkFields notifies a Scheme when it is appropriate to dynamically
270   * update the fields it sources. By default the current declared fields are returned.
271   * <p>
272   * The {@code FlowProcess} presents all known properties resolved by the current planner.
273   * <p>
274   * The {@code tap} instance is the parent {@link Tap} for this Scheme instance.
275   *
276   * @param flowProcess of type FlowProcess
277   * @param tap         of type Tap
278   * @return Fields
279   */
280  public Fields retrieveSinkFields( FlowProcess<? extends Config> flowProcess, Tap tap )
281    {
282    return getSinkFields();
283    }
284
285  /**
286   * Method presentSinkFields is called after the planner is invoked and all fields are resolved. This
287   * method presents to the Scheme the actual source fields after any planner intervention.
288   * <p>
289   * This method is called after {@link #retrieveSinkFields(cascading.flow.FlowProcess, cascading.tap.Tap)}.
290   *
291   * @param flowProcess of type FlowProcess
292   * @param tap         of type Tap
293   * @param fields      of type Fields
294   */
295  public void presentSinkFields( FlowProcess<? extends Config> flowProcess, Tap tap, Fields fields )
296    {
297    presentSinkFieldsInternal( fields );
298    }
299
300  protected void presentSinkFieldsInternal( Fields fields )
301    {
302    if( getSinkFields().equals( Fields.ALL ) )
303      setSinkFields( fields );
304    }
305
306  /**
307   * Method sourceInit initializes this instance as a source.
308   * <p>
309   * This method is executed client side as a means to provide necessary configuration parameters
310   * used by the underlying platform.
311   * <p>
312   * It is not intended to initialize resources that would be necessary during the execution of this
313   * class, like a "formatter" or "parser".
314   * <p>
315   * See {@link #sourcePrepare(cascading.flow.FlowProcess, SourceCall)} if resources much be initialized
316   * before use. And {@link #sourceCleanup(cascading.flow.FlowProcess, SourceCall)} if resources must be
317   * destroyed after use.
318   *
319   * @param flowProcess of type FlowProcess
320   * @param tap         of type Tap
321   * @param conf        of type Config
322   */
323  public abstract void sourceConfInit( FlowProcess<? extends Config> flowProcess, Tap<Config, Input, Output> tap, Config conf );
324
325  /**
326   * Method sinkInit initializes this instance as a sink.
327   * <p>
328   * This method is executed client side as a means to provide necessary configuration parameters
329   * used by the underlying platform.
330   * <p>
331   * It is not intended to initialize resources that would be necessary during the execution of this
332   * class, like a "formatter" or "parser".
333   * <p>
334   * See {@link #sinkPrepare(cascading.flow.FlowProcess, SinkCall)} if resources much be initialized
335   * before use. And {@link #sinkCleanup(cascading.flow.FlowProcess, SinkCall)} if resources must be
336   * destroyed after use.
337   *
338   * @param flowProcess of type FlowProcess
339   * @param tap         of type Tap
340   * @param conf        of type Config
341   */
342  public abstract void sinkConfInit( FlowProcess<? extends Config> flowProcess, Tap<Config, Input, Output> tap, Config conf );
343
344  /**
345   * Method sourceWrap allows the current Scheme instance to wrap the incoming Input data source giving the underlying
346   * platform direct access to to manage the object in place of the original.
347   * <p>
348   * If the Input is an InputStream, the stream can be decompressed by wrapping in an appropriate de-compressor
349   * InputStream.
350   *
351   * @param flowProcess of type FlowProcess
352   * @param input       the Input provided by the platform
353   * @return the same or an instance of the Input type wrapping the given parameter
354   * @throws IOException if unable to wrap the parameter
355   */
356  public Input sourceWrap( FlowProcess<? extends Config> flowProcess, Input input ) throws IOException
357    {
358    return input;
359    }
360
361  /**
362   * Method sourcePrepare is used to initialize resources needed during each call of
363   * {@link #source(cascading.flow.FlowProcess, SourceCall)}.
364   * <p>
365   * This method is guaranteed to be called once before the first invocation of {@link #source(FlowProcess, SourceCall)}.
366   * <p>
367   * Be sure to place any initialized objects in the {@code SourceContext} so each instance
368   * will remain thread-safe.
369   *
370   * @param flowProcess of type FlowProcess
371   * @param sourceCall  of type SourceCall
372   */
373  public void sourcePrepare( FlowProcess<? extends Config> flowProcess, SourceCall<SourceContext, Input> sourceCall ) throws IOException
374    {
375    }
376
377  /**
378   * Method sourceRePrepare is used to re-initialize resources needed during each call of
379   * {@link #source(cascading.flow.FlowProcess, SourceCall)} after the {@code Input} object
380   * has been changed, if needed.
381   * <p>
382   * This method may be called zero or more times. Note {@link #sourcePrepare(FlowProcess, SourceCall)} will always
383   * be called before any {@link #source(FlowProcess, SourceCall)} invocation.
384   *
385   * @param flowProcess of type FlowProcess
386   * @param sourceCall  of type SourceCall
387   */
388  public void sourceRePrepare( FlowProcess<? extends Config> flowProcess, SourceCall<SourceContext, Input> sourceCall ) throws IOException
389    {
390    }
391
392  /**
393   * Method source will read a new "record" or value from {@link cascading.scheme.SourceCall#getInput()} and populate
394   * the available {@link Tuple} via {@link cascading.scheme.SourceCall#getIncomingEntry()} and return {@code true}
395   * on success or {@code false} if no more values available.
396   * <p>
397   * It's ok to set a new Tuple instance on the {@code incomingEntry} {@link cascading.tuple.TupleEntry}, or
398   * to simply re-use the existing instance.
399   * <p>
400   * Note this is only time it is safe to modify a Tuple instance handed over via a method call.
401   * <p>
402   * This method may optionally throw a {@link cascading.tap.TapException} if it cannot process a particular
403   * instance of data. If the payload Tuple is set on the TapException, that Tuple will be written to
404   * any applicable failure trap Tap.
405   *
406   * @param flowProcess of type FlowProcess
407   * @param sourceCall  of SourceCall
408   * @return returns {@code true} when a Tuple was successfully read
409   */
410  public abstract boolean source( FlowProcess<? extends Config> flowProcess, SourceCall<SourceContext, Input> sourceCall ) throws IOException;
411
412  /**
413   * Method sourceCleanup is used to destroy resources created by
414   * {@link #sourcePrepare(cascading.flow.FlowProcess, SourceCall)}.
415   *
416   * @param flowProcess of Process
417   * @param sourceCall  of type SourceCall
418   */
419  public void sourceCleanup( FlowProcess<? extends Config> flowProcess, SourceCall<SourceContext, Input> sourceCall ) throws IOException
420    {
421    }
422
423  /**
424   * Method sinkWrap allows the current Scheme instance to wrap the outgoing Output data source giving the underlying
425   * platform direct access to to manage the object in place of the original.
426   * <p>
427   * If the Output is an OutputStream, the stream can be compressed by wrapping in an appropriate compressor
428   * OutputStream.
429   *
430   * @param flowProcess of type FlowProcess
431   * @param output      the Output provided by the platform
432   * @return the same or an instance of the Input type wrapping the given parameter
433   * @throws IOException if unable to wrap the parameter
434   */
435  public Output sinkWrap( FlowProcess<? extends Config> flowProcess, Output output ) throws IOException
436    {
437    return output;
438    }
439
440  /**
441   * Method sinkPrepare is used to initialize resources needed during each call of
442   * {@link #sink(cascading.flow.FlowProcess, SinkCall)}.
443   * <p>
444   * This method is guaranteed to be called once before the first invocation of {@link #sink(FlowProcess, SinkCall)}.
445   * <p>
446   * Be sure to place any initialized objects in the {@code SinkContext} so each instance
447   * will remain threadsafe.
448   *
449   * @param flowProcess of type FlowProcess
450   * @param sinkCall    of type SinkCall
451   */
452  public void sinkPrepare( FlowProcess<? extends Config> flowProcess, SinkCall<SinkContext, Output> sinkCall ) throws IOException
453    {
454    }
455
456  /**
457   * Method sink writes out the given {@link Tuple} found on {@link cascading.scheme.SinkCall#getOutgoingEntry()} to
458   * the {@link cascading.scheme.SinkCall#getOutput()}.
459   * <p>
460   * This method may optionally throw a {@link cascading.tap.TapException} if it cannot process a particular
461   * instance of data. If the payload Tuple is set on the TapException, that Tuple will be written to
462   * any applicable failure trap Tap. If not set, the incoming Tuple will be written instead.
463   *
464   * @param flowProcess of Process
465   * @param sinkCall    of SinkCall
466   */
467  public abstract void sink( FlowProcess<? extends Config> flowProcess, SinkCall<SinkContext, Output> sinkCall ) throws IOException;
468
469  /**
470   * Method sinkCleanup is used to destroy resources created by
471   * {@link #sinkPrepare(cascading.flow.FlowProcess, SinkCall)}.
472   *
473   * @param flowProcess of type FlowProcess
474   * @param sinkCall    of type SinkCall
475   */
476  public void sinkCleanup( FlowProcess<? extends Config> flowProcess, SinkCall<SinkContext, Output> sinkCall ) throws IOException
477    {
478    }
479
480  @Override
481  public boolean equals( Object object )
482    {
483    if( this == object )
484      return true;
485    if( object == null || getClass() != object.getClass() )
486      return false;
487
488    Scheme scheme = (Scheme) object;
489
490    if( numSinkParts != scheme.numSinkParts )
491      return false;
492    if( sinkFields != null ? !sinkFields.equals( scheme.sinkFields ) : scheme.sinkFields != null )
493      return false;
494    if( sourceFields != null ? !sourceFields.equals( scheme.sourceFields ) : scheme.sourceFields != null )
495      return false;
496
497    return true;
498    }
499
500  @Override
501  public String toString()
502    {
503    if( getSinkFields().equals( getSourceFields() ) )
504      return getClass().getSimpleName() + "[" + getSourceFields().print() + "]";
505    else
506      return getClass().getSimpleName() + "[" + getSourceFields().print() + "->" + getSinkFields().print() + "]";
507    }
508
509  public int hashCode()
510    {
511    int result;
512    result = sinkFields != null ? sinkFields.hashCode() : 0;
513    result = 31 * result + ( sourceFields != null ? sourceFields.hashCode() : 0 );
514    result = 31 * result + numSinkParts;
515    return result;
516    }
517  }