001/*
002 * Copyright (c) 2016-2017 Chris K Wensel <chris@wensel.net>. All Rights Reserved.
003 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved.
004 *
005 * Project and contact information: http://www.cascading.org/
006 *
007 * This file is part of the Cascading project.
008 *
009 * Licensed under the Apache License, Version 2.0 (the "License");
010 * you may not use this file except in compliance with the License.
011 * You may obtain a copy of the License at
012 *
013 *     http://www.apache.org/licenses/LICENSE-2.0
014 *
015 * Unless required by applicable law or agreed to in writing, software
016 * distributed under the License is distributed on an "AS IS" BASIS,
017 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
018 * See the License for the specific language governing permissions and
019 * limitations under the License.
020 */
021
022package cascading.scheme;
023
024import java.io.IOException;
025import java.io.Serializable;
026
027import cascading.flow.FlowProcess;
028import cascading.tap.Tap;
029import cascading.tuple.Fields;
030import cascading.tuple.Tuple;
031import cascading.util.TraceUtil;
032import cascading.util.Traceable;
033
034/**
035 * A Scheme defines what is stored in a {@link Tap} instance by declaring the {@link Tuple}
036 * field names, and alternately parsing or rendering the incoming or outgoing {@link Tuple}
037 * stream, respectively.
038 * <p/>
039 * A Scheme defines the type of resource data will be sourced from or sinked to.
040 * <p/>
041 * The default sourceFields are {@link Fields#UNKNOWN} and the default sinkFields are {@link Fields#ALL}.
042 * <p/>
043 * Any given sourceFields only label the values in the {@link Tuple}s as they are sourced.
044 * It does not necessarily filter the output since a given implementation may choose to
045 * collapse values and ignore keys depending on the format.
046 * <p/>
 * If the sinkFields are {@link Fields#ALL}, the Cascading planner will attempt to resolve the actual field names
 * and make them available via the {@link cascading.scheme.SinkCall#getOutgoingEntry()} method. Sometimes this may
 * not be possible, for example when the {@link Tap#openForWrite(cascading.flow.FlowProcess)} method is called from
 * user code directly (without planner intervention).
051 * <p/>
052 * If the sinkFields are a valid selector, the {@link #sink(cascading.flow.FlowProcess, SinkCall)} method will
053 * only see the fields expected.
054 * <p/>
055 * Setting the {@code numSinkParts} value to 1 (one) attempts to ensure the output resource has only one part.
056 * In the case of MapReduce, this is only a suggestion for the Map side, on the Reduce side it does this by
057 * setting the number of reducers to the given value. This may affect performance, so be cautioned.
 * <p/>
059 * Note that setting numSinkParts does not force the planner to insert a final Reduce operation in the job, so
060 * numSinkParts may be ignored entirely if the final job is Map only. To force the Flow to have a final Reduce,
061 * add a {@link cascading.pipe.GroupBy} to the assembly before sinking.
062 */
063public abstract class Scheme<Config, Input, Output, SourceContext, SinkContext> implements Serializable, Traceable
064  {
065  /** Field sinkFields */
066  Fields sinkFields = Fields.ALL;
067  /** Field sourceFields */
068  Fields sourceFields = Fields.UNKNOWN;
069  /** Field numSinkParts */
070  int numSinkParts;
071  /** Field trace */
072  private String trace = TraceUtil.captureDebugTrace( this ); // see TraceUtil.setTrace() to override
073
074  /** Constructor Scheme creates a new Scheme instance. */
075  protected Scheme()
076    {
077    }
078
079  /**
080   * Constructor Scheme creates a new Scheme instance.
081   *
082   * @param sourceFields of type Fields
083   */
084  protected Scheme( Fields sourceFields )
085    {
086    setSourceFields( sourceFields );
087    }
088
089  /**
090   * Constructor Scheme creates a new Scheme instance.
091   *
092   * @param sourceFields of type Fields
093   * @param numSinkParts of type int
094   */
095  protected Scheme( Fields sourceFields, int numSinkParts )
096    {
097    setSourceFields( sourceFields );
098    this.numSinkParts = numSinkParts;
099    }
100
101  /**
102   * Constructor Scheme creates a new Scheme instance.
103   *
104   * @param sourceFields of type Fields
105   * @param sinkFields   of type Fields
106   */
107  protected Scheme( Fields sourceFields, Fields sinkFields )
108    {
109    setSourceFields( sourceFields );
110    setSinkFields( sinkFields );
111    }
112
113  /**
114   * Constructor Scheme creates a new Scheme instance.
115   *
116   * @param sourceFields of type Fields
117   * @param sinkFields   of type Fields
118   * @param numSinkParts of type int
119   */
120  protected Scheme( Fields sourceFields, Fields sinkFields, int numSinkParts )
121    {
122    setSourceFields( sourceFields );
123    setSinkFields( sinkFields );
124    this.numSinkParts = numSinkParts;
125    }
126
127  /**
128   * Method getSinkFields returns the sinkFields of this Scheme object.
129   *
130   * @return the sinkFields (type Fields) of this Scheme object.
131   */
132  public Fields getSinkFields()
133    {
134    return sinkFields;
135    }
136
137  /**
138   * Method setSinkFields sets the sinkFields of this Scheme object.
139   *
140   * @param sinkFields the sinkFields of this Scheme object.
141   */
142  public void setSinkFields( Fields sinkFields )
143    {
144    if( sinkFields.isUnknown() )
145      this.sinkFields = Fields.ALL;
146    else
147      this.sinkFields = sinkFields;
148    }
149
150  /**
151   * Method getSourceFields returns the sourceFields of this Scheme object.
152   *
153   * @return the sourceFields (type Fields) of this Scheme object.
154   */
155  public Fields getSourceFields()
156    {
157    return sourceFields;
158    }
159
160  /**
161   * Method setSourceFields sets the sourceFields of this Scheme object.
162   *
163   * @param sourceFields the sourceFields of this Scheme object.
164   */
165  public void setSourceFields( Fields sourceFields )
166    {
167    if( sourceFields.isAll() )
168      this.sourceFields = Fields.UNKNOWN;
169    else
170      this.sourceFields = sourceFields;
171    }
172
173  /**
174   * Method getNumSinkParts returns the numSinkParts of this Scheme object.
175   *
176   * @return the numSinkParts (type int) of this Scheme object.
177   */
178  public int getNumSinkParts()
179    {
180    return numSinkParts;
181    }
182
183  /**
184   * Method setNumSinkParts sets the numSinkParts of this Scheme object.
185   *
186   * @param numSinkParts the numSinkParts of this Scheme object.
187   */
188  public void setNumSinkParts( int numSinkParts )
189    {
190    this.numSinkParts = numSinkParts;
191    }
192
193  @Override
194  public String getTrace()
195    {
196    return trace;
197    }
198
199  /**
200   * Method isSymmetrical returns {@code true} if the sink fields equal the source fields. That is, this
201   * scheme sources the same fields as it sinks.
202   *
203   * @return the symmetrical (type boolean) of this Scheme object.
204   */
205  public boolean isSymmetrical()
206    {
207    return getSourceFields().equals( Fields.UNKNOWN ) && getSinkFields().equals( Fields.ALL ) || getSinkFields().equals( getSourceFields() );
208    }
209
210  /**
211   * Method isSource returns true if this Scheme instance can be used as a source.
212   *
213   * @return boolean
214   */
215  public boolean isSource()
216    {
217    return true;
218    }
219
220  /**
221   * Method isSink returns true if this Scheme instance can be used as a sink.
222   *
223   * @return boolean
224   */
225  public boolean isSink()
226    {
227    return true;
228    }
229
230  /**
231   * Method retrieveSourceFields notifies a Scheme when it is appropriate to dynamically
232   * update the fields it sources. By default the current declared fields are returned.
233   * <p/>
234   * The {@code FlowProcess} presents all known properties resolved by the current planner.
235   * <p/>
236   * The {@code tap} instance is the parent {@link Tap} for this Scheme instance.
237   *
238   * @param flowProcess of type FlowProcess
239   * @param tap         of type Tap
240   * @return Fields
241   */
242  public Fields retrieveSourceFields( FlowProcess<? extends Config> flowProcess, Tap tap )
243    {
244    return getSourceFields();
245    }
246
247  /**
248   * Method presentSourceFields is called after the planner is invoked and all fields are resolved. This
249   * method presents to the Scheme the actual source fields after any planner intervention.
250   * <p/>
251   * This method is called after {@link #retrieveSourceFields(cascading.flow.FlowProcess, cascading.tap.Tap)}.
252   *
253   * @param flowProcess of type FlowProcess
254   * @param tap         of type Tap
255   * @param fields      of type Fields
256   */
257  public void presentSourceFields( FlowProcess<? extends Config> flowProcess, Tap tap, Fields fields )
258    {
259    presentSourceFieldsInternal( fields );
260    }
261
262  protected void presentSourceFieldsInternal( Fields fields )
263    {
264    if( getSourceFields().equals( Fields.UNKNOWN ) )
265      setSourceFields( fields );
266    }
267
268  /**
269   * Method retrieveSinkFields notifies a Scheme when it is appropriate to dynamically
270   * update the fields it sources. By default the current declared fields are returned.
271   * <p/>
272   * The {@code FlowProcess} presents all known properties resolved by the current planner.
273   * <p/>
274   * The {@code tap} instance is the parent {@link Tap} for this Scheme instance.
275   *
276   * @param flowProcess of type FlowProcess
277   * @param tap         of type Tap
278   * @return Fields
279   */
280  public Fields retrieveSinkFields( FlowProcess<? extends Config> flowProcess, Tap tap )
281    {
282    return getSinkFields();
283    }
284
285  /**
286   * Method presentSinkFields is called after the planner is invoked and all fields are resolved. This
287   * method presents to the Scheme the actual source fields after any planner intervention.
288   * <p/>
289   * This method is called after {@link #retrieveSinkFields(cascading.flow.FlowProcess, cascading.tap.Tap)}.
290   *
291   * @param flowProcess of type FlowProcess
292   * @param tap         of type Tap
293   * @param fields      of type Fields
294   */
295  public void presentSinkFields( FlowProcess<? extends Config> flowProcess, Tap tap, Fields fields )
296    {
297    presentSinkFieldsInternal( fields );
298    }
299
300  protected void presentSinkFieldsInternal( Fields fields )
301    {
302    if( getSinkFields().equals( Fields.ALL ) )
303      setSinkFields( fields );
304    }
305
306  /**
307   * Method sourceInit initializes this instance as a source.
308   * <p/>
309   * This method is executed client side as a means to provide necessary configuration parameters
310   * used by the underlying platform.
311   * <p/>
312   * It is not intended to initialize resources that would be necessary during the execution of this
313   * class, like a "formatter" or "parser".
314   * <p/>
315   * See {@link #sourcePrepare(cascading.flow.FlowProcess, SourceCall)} if resources much be initialized
316   * before use. And {@link #sourceCleanup(cascading.flow.FlowProcess, SourceCall)} if resources must be
317   * destroyed after use.
318   *
319   * @param flowProcess of type FlowProcess
320   * @param tap         of type Tap
321   * @param conf        of type Config
322   */
323  public abstract void sourceConfInit( FlowProcess<? extends Config> flowProcess, Tap<Config, Input, Output> tap, Config conf );
324
325  /**
326   * Method sinkInit initializes this instance as a sink.
327   * <p/>
328   * This method is executed client side as a means to provide necessary configuration parameters
329   * used by the underlying platform.
330   * <p/>
331   * It is not intended to initialize resources that would be necessary during the execution of this
332   * class, like a "formatter" or "parser".
333   * <p/>
334   * See {@link #sinkPrepare(cascading.flow.FlowProcess, SinkCall)} if resources much be initialized
335   * before use. And {@link #sinkCleanup(cascading.flow.FlowProcess, SinkCall)} if resources must be
336   * destroyed after use.
337   *
338   * @param flowProcess of type FlowProcess
339   * @param tap         of type Tap
340   * @param conf        of type Config
341   */
342  public abstract void sinkConfInit( FlowProcess<? extends Config> flowProcess, Tap<Config, Input, Output> tap, Config conf );
343
344  /**
345   * Method sourcePrepare is used to initialize resources needed during each call of
346   * {@link #source(cascading.flow.FlowProcess, SourceCall)}.
347   * <p/>
348   * This method is guaranteed to be called once before the first invocation of {@link #source(FlowProcess, SourceCall)}.
349   * <p/>
350   * Be sure to place any initialized objects in the {@code SourceContext} so each instance
351   * will remain thread-safe.
352   *
353   * @param flowProcess of type FlowProcess
354   * @param sourceCall  of type SourceCall<SourceContext, Input>
355   */
356  public void sourcePrepare( FlowProcess<? extends Config> flowProcess, SourceCall<SourceContext, Input> sourceCall ) throws IOException
357    {
358    }
359
360  /**
361   * Method sourceRePrepare is used to re-initialize resources needed during each call of
362   * {@link #source(cascading.flow.FlowProcess, SourceCall)} after the {@code Input} object
363   * has been changed, if needed.
364   * <p/>
365   * This method may be called zero or more times. Note {@link #sourcePrepare(FlowProcess, SourceCall)} will always
366   * be called before any {@link #source(FlowProcess, SourceCall)} invocation.
367   *
368   * @param flowProcess of type FlowProcess
369   * @param sourceCall  of type SourceCall<SourceContext, Input>
370   */
371  public void sourceRePrepare( FlowProcess<? extends Config> flowProcess, SourceCall<SourceContext, Input> sourceCall ) throws IOException
372    {
373    }
374
375  /**
376   * Method source will read a new "record" or value from {@link cascading.scheme.SourceCall#getInput()} and populate
377   * the available {@link Tuple} via {@link cascading.scheme.SourceCall#getIncomingEntry()} and return {@code true}
378   * on success or {@code false} if no more values available.
379   * <p/>
380   * It's ok to set a new Tuple instance on the {@code incomingEntry} {@link cascading.tuple.TupleEntry}, or
381   * to simply re-use the existing instance.
382   * <p/>
383   * Note this is only time it is safe to modify a Tuple instance handed over via a method call.
384   * <p/>
385   * This method may optionally throw a {@link cascading.tap.TapException} if it cannot process a particular
386   * instance of data. If the payload Tuple is set on the TapException, that Tuple will be written to
387   * any applicable failure trap Tap.
388   *
389   * @param flowProcess of type FlowProcess
390   * @param sourceCall  of SourceCall
391   * @return returns {@code true} when a Tuple was successfully read
392   */
393  public abstract boolean source( FlowProcess<? extends Config> flowProcess, SourceCall<SourceContext, Input> sourceCall ) throws IOException;
394
395  /**
396   * Method sourceCleanup is used to destroy resources created by
397   * {@link #sourcePrepare(cascading.flow.FlowProcess, SourceCall)}.
398   *
399   * @param flowProcess of Process
400   * @param sourceCall  of type SourceCall<SourceContext, Input>
401   */
402  public void sourceCleanup( FlowProcess<? extends Config> flowProcess, SourceCall<SourceContext, Input> sourceCall ) throws IOException
403    {
404    }
405
406  /**
407   * Method sinkPrepare is used to initialize resources needed during each call of
408   * {@link #sink(cascading.flow.FlowProcess, SinkCall)}.
409   * <p/>
410   * This method is guaranteed to be called once before the first invocation of {@link #sink(FlowProcess, SinkCall)}.
411   * <p>
412   * Be sure to place any initialized objects in the {@code SinkContext} so each instance
413   * will remain threadsafe.
414   *
415   * @param flowProcess of type FlowProcess
416   * @param sinkCall    of type SinkCall<SinkContext, Output>
417   */
418  public void sinkPrepare( FlowProcess<? extends Config> flowProcess, SinkCall<SinkContext, Output> sinkCall ) throws IOException
419    {
420    }
421
422  /**
423   * Method sink writes out the given {@link Tuple} found on {@link cascading.scheme.SinkCall#getOutgoingEntry()} to
424   * the {@link cascading.scheme.SinkCall#getOutput()}.
425   * <p/>
426   * This method may optionally throw a {@link cascading.tap.TapException} if it cannot process a particular
427   * instance of data. If the payload Tuple is set on the TapException, that Tuple will be written to
428   * any applicable failure trap Tap. If not set, the incoming Tuple will be written instead.
429   *
430   * @param flowProcess of Process
431   * @param sinkCall    of SinkCall
432   */
433  public abstract void sink( FlowProcess<? extends Config> flowProcess, SinkCall<SinkContext, Output> sinkCall ) throws IOException;
434
435  /**
436   * Method sinkCleanup is used to destroy resources created by
437   * {@link #sinkPrepare(cascading.flow.FlowProcess, SinkCall)}.
438   *
439   * @param flowProcess of type FlowProcess
440   * @param sinkCall    of type SinkCall<SinkContext, Output>
441   */
442  public void sinkCleanup( FlowProcess<? extends Config> flowProcess, SinkCall<SinkContext, Output> sinkCall ) throws IOException
443    {
444    }
445
446  @Override
447  public boolean equals( Object object )
448    {
449    if( this == object )
450      return true;
451    if( object == null || getClass() != object.getClass() )
452      return false;
453
454    Scheme scheme = (Scheme) object;
455
456    if( numSinkParts != scheme.numSinkParts )
457      return false;
458    if( sinkFields != null ? !sinkFields.equals( scheme.sinkFields ) : scheme.sinkFields != null )
459      return false;
460    if( sourceFields != null ? !sourceFields.equals( scheme.sourceFields ) : scheme.sourceFields != null )
461      return false;
462
463    return true;
464    }
465
466  @Override
467  public String toString()
468    {
469    if( getSinkFields().equals( getSourceFields() ) )
470      return getClass().getSimpleName() + "[" + getSourceFields().print() + "]";
471    else
472      return getClass().getSimpleName() + "[" + getSourceFields().print() + "->" + getSinkFields().print() + "]";
473    }
474
475  public int hashCode()
476    {
477    int result;
478    result = sinkFields != null ? sinkFields.hashCode() : 0;
479    result = 31 * result + ( sourceFields != null ? sourceFields.hashCode() : 0 );
480    result = 31 * result + numSinkParts;
481    return result;
482    }
483  }