001    /*
002     * Copyright (c) 2007-2015 Concurrent, Inc. All Rights Reserved.
003     *
004     * Project and contact information: http://www.cascading.org/
005     *
006     * This file is part of the Cascading project.
007     *
008     * Licensed under the Apache License, Version 2.0 (the "License");
009     * you may not use this file except in compliance with the License.
010     * You may obtain a copy of the License at
011     *
012     *     http://www.apache.org/licenses/LICENSE-2.0
013     *
014     * Unless required by applicable law or agreed to in writing, software
015     * distributed under the License is distributed on an "AS IS" BASIS,
016     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017     * See the License for the specific language governing permissions and
018     * limitations under the License.
019     */
020    
021    package cascading.scheme;
022    
023    import java.io.IOException;
024    import java.io.Serializable;
025    
026    import cascading.flow.FlowProcess;
027    import cascading.tap.Tap;
028    import cascading.tuple.Fields;
029    import cascading.tuple.Tuple;
030    import cascading.util.TraceUtil;
031    import cascading.util.Traceable;
032    
033    /**
034     * A Scheme defines what is stored in a {@link Tap} instance by declaring the {@link Tuple}
035     * field names, and alternately parsing or rendering the incoming or outgoing {@link Tuple}
036     * stream, respectively.
037     * <p/>
038     * A Scheme defines the type of resource data will be sourced from or sinked to.
039     * <p/>
040     * The default sourceFields are {@link Fields#UNKNOWN} and the default sinkFields are {@link Fields#ALL}.
041     * <p/>
042     * Any given sourceFields only label the values in the {@link Tuple}s as they are sourced.
043     * It does not necessarily filter the output since a given implementation may choose to
044     * collapse values and ignore keys depending on the format.
045     * <p/>
046     * If the sinkFields are {@link Fields#ALL}, the Cascading planner will attempt to resolve the actual field names
047     * and make them available via the {@link cascading.scheme.SinkCall#getOutgoingEntry()} method. Sometimes this may
048     * not be possible (in the case the {@link Tap#openForWrite(cascading.flow.FlowProcess)} method is called from user
049     * code directly (without planner intervention).
050     * <p/>
051     * If the sinkFields are a valid selector, the {@link #sink(cascading.flow.FlowProcess, SinkCall)} method will
052     * only see the fields expected.
053     * <p/>
054     * Setting the {@code numSinkParts} value to 1 (one) attempts to ensure the output resource has only one part.
055     * In the case of MapReduce, this is only a suggestion for the Map side, on the Reduce side it does this by
056     * setting the number of reducers to the given value. This may affect performance, so be cautioned.
057     * </p>
058     * Note that setting numSinkParts does not force the planner to insert a final Reduce operation in the job, so
059     * numSinkParts may be ignored entirely if the final job is Map only. To force the Flow to have a final Reduce,
060     * add a {@link cascading.pipe.GroupBy} to the assembly before sinking.
061     */
062    public abstract class Scheme<Config, Input, Output, SourceContext, SinkContext> implements Serializable, Traceable
063      {
064      /** Field sinkFields */
065      Fields sinkFields = Fields.ALL;
066      /** Field sourceFields */
067      Fields sourceFields = Fields.UNKNOWN;
068      /** Field numSinkParts */
069      int numSinkParts;
070      /** Field trace */
071      private String trace = TraceUtil.captureDebugTrace( this ); // see TraceUtil.setTrace() to override
072    
073      /** Constructor Scheme creates a new Scheme instance. */
074      protected Scheme()
075        {
076        }
077    
078      /**
079       * Constructor Scheme creates a new Scheme instance.
080       *
081       * @param sourceFields of type Fields
082       */
083      protected Scheme( Fields sourceFields )
084        {
085        setSourceFields( sourceFields );
086        }
087    
088      /**
089       * Constructor Scheme creates a new Scheme instance.
090       *
091       * @param sourceFields of type Fields
092       * @param numSinkParts of type int
093       */
094      protected Scheme( Fields sourceFields, int numSinkParts )
095        {
096        setSourceFields( sourceFields );
097        this.numSinkParts = numSinkParts;
098        }
099    
100      /**
101       * Constructor Scheme creates a new Scheme instance.
102       *
103       * @param sourceFields of type Fields
104       * @param sinkFields   of type Fields
105       */
106      protected Scheme( Fields sourceFields, Fields sinkFields )
107        {
108        setSourceFields( sourceFields );
109        setSinkFields( sinkFields );
110        }
111    
112      /**
113       * Constructor Scheme creates a new Scheme instance.
114       *
115       * @param sourceFields of type Fields
116       * @param sinkFields   of type Fields
117       * @param numSinkParts of type int
118       */
119      protected Scheme( Fields sourceFields, Fields sinkFields, int numSinkParts )
120        {
121        setSourceFields( sourceFields );
122        setSinkFields( sinkFields );
123        this.numSinkParts = numSinkParts;
124        }
125    
126      /**
127       * Method getSinkFields returns the sinkFields of this Scheme object.
128       *
129       * @return the sinkFields (type Fields) of this Scheme object.
130       */
131      public Fields getSinkFields()
132        {
133        return sinkFields;
134        }
135    
136      /**
137       * Method setSinkFields sets the sinkFields of this Scheme object.
138       *
139       * @param sinkFields the sinkFields of this Scheme object.
140       */
141      public void setSinkFields( Fields sinkFields )
142        {
143        if( sinkFields.isUnknown() )
144          this.sinkFields = Fields.ALL;
145        else
146          this.sinkFields = sinkFields;
147        }
148    
149      /**
150       * Method getSourceFields returns the sourceFields of this Scheme object.
151       *
152       * @return the sourceFields (type Fields) of this Scheme object.
153       */
154      public Fields getSourceFields()
155        {
156        return sourceFields;
157        }
158    
159      /**
160       * Method setSourceFields sets the sourceFields of this Scheme object.
161       *
162       * @param sourceFields the sourceFields of this Scheme object.
163       */
164      public void setSourceFields( Fields sourceFields )
165        {
166        if( sourceFields.isAll() )
167          this.sourceFields = Fields.UNKNOWN;
168        else
169          this.sourceFields = sourceFields;
170        }
171    
172      /**
173       * Method getNumSinkParts returns the numSinkParts of this Scheme object.
174       *
175       * @return the numSinkParts (type int) of this Scheme object.
176       */
177      public int getNumSinkParts()
178        {
179        return numSinkParts;
180        }
181    
182      /**
183       * Method setNumSinkParts sets the numSinkParts of this Scheme object.
184       *
185       * @param numSinkParts the numSinkParts of this Scheme object.
186       */
187      public void setNumSinkParts( int numSinkParts )
188        {
189        this.numSinkParts = numSinkParts;
190        }
191    
192      @Override
193      public String getTrace()
194        {
195        return trace;
196        }
197    
198      /**
199       * Method isSymmetrical returns {@code true} if the sink fields equal the source fields. That is, this
200       * scheme sources the same fields as it sinks.
201       *
202       * @return the symmetrical (type boolean) of this Scheme object.
203       */
204      public boolean isSymmetrical()
205        {
206        return getSourceFields().equals( Fields.UNKNOWN ) && getSinkFields().equals( Fields.ALL ) || getSinkFields().equals( getSourceFields() );
207        }
208    
209      /**
210       * Method isSource returns true if this Scheme instance can be used as a source.
211       *
212       * @return boolean
213       */
214      public boolean isSource()
215        {
216        return true;
217        }
218    
219      /**
220       * Method isSink returns true if this Scheme instance can be used as a sink.
221       *
222       * @return boolean
223       */
224      public boolean isSink()
225        {
226        return true;
227        }
228    
229      /**
230       * Method retrieveSourceFields notifies a Scheme when it is appropriate to dynamically
231       * update the fields it sources. By default the current declared fields are returned.
232       * <p/>
233       * The {@code FlowProcess} presents all known properties resolved by the current planner.
234       * <p/>
235       * The {@code tap} instance is the parent {@link Tap} for this Scheme instance.
236       *
237       * @param flowProcess of type FlowProcess
238       * @param tap         of type Tap
239       * @return Fields
240       */
241      public Fields retrieveSourceFields( FlowProcess<Config> flowProcess, Tap tap )
242        {
243        return getSourceFields();
244        }
245    
246      /**
247       * Method presentSourceFields is called after the planner is invoked and all fields are resolved. This
248       * method presents to the Scheme the actual source fields after any planner intervention.
249       * <p/>
250       * This method is called after {@link #retrieveSourceFields(cascading.flow.FlowProcess, cascading.tap.Tap)}.
251       *
252       * @param flowProcess of type FlowProcess
253       * @param tap         of type Tap
254       * @param fields      of type Fields
255       */
256      public void presentSourceFields( FlowProcess<Config> flowProcess, Tap tap, Fields fields )
257        {
258        presentSourceFieldsInternal( fields );
259        }
260    
261      protected void presentSourceFieldsInternal( Fields fields )
262        {
263        if( getSourceFields().equals( Fields.UNKNOWN ) )
264          setSourceFields( fields );
265        }
266    
267      /**
268       * Method retrieveSinkFields notifies a Scheme when it is appropriate to dynamically
269       * update the fields it sources. By default the current declared fields are returned.
270       * <p/>
271       * The {@code FlowProcess} presents all known properties resolved by the current planner.
272       * <p/>
273       * The {@code tap} instance is the parent {@link Tap} for this Scheme instance.
274       *
275       * @param flowProcess of type FlowProcess
276       * @param tap         of type Tap
277       * @return Fields
278       */
279      public Fields retrieveSinkFields( FlowProcess<Config> flowProcess, Tap tap )
280        {
281        return getSinkFields();
282        }
283    
284      /**
285       * Method presentSinkFields is called after the planner is invoked and all fields are resolved. This
286       * method presents to the Scheme the actual source fields after any planner intervention.
287       * <p/>
288       * This method is called after {@link #retrieveSinkFields(cascading.flow.FlowProcess, cascading.tap.Tap)}.
289       *
290       * @param flowProcess of type FlowProcess
291       * @param tap         of type Tap
292       * @param fields      of type Fields
293       */
294      public void presentSinkFields( FlowProcess<Config> flowProcess, Tap tap, Fields fields )
295        {
296        presentSinkFieldsInternal( fields );
297        }
298    
299      protected void presentSinkFieldsInternal( Fields fields )
300        {
301        if( getSinkFields().equals( Fields.ALL ) )
302          setSinkFields( fields );
303        }
304    
305      /**
306       * Method sourceInit initializes this instance as a source.
307       * <p/>
308       * This method is executed client side as a means to provide necessary configuration parameters
309       * used by the underlying platform.
310       * <p/>
311       * It is not intended to initialize resources that would be necessary during the execution of this
312       * class, like a "formatter" or "parser".
313       * <p/>
314       * See {@link #sourcePrepare(cascading.flow.FlowProcess, SourceCall)} if resources much be initialized
315       * before use. And {@link #sourceCleanup(cascading.flow.FlowProcess, SourceCall)} if resources must be
316       * destroyed after use.
317       *
318       * @param flowProcess of type FlowProcess
319       * @param tap         of type Tap
320       * @param conf        of type Config
321       */
322      public abstract void sourceConfInit( FlowProcess<Config> flowProcess, Tap<Config, Input, Output> tap, Config conf );
323    
324      /**
325       * Method sinkInit initializes this instance as a sink.
326       * <p/>
327       * This method is executed client side as a means to provide necessary configuration parameters
328       * used by the underlying platform.
329       * <p/>
330       * It is not intended to initialize resources that would be necessary during the execution of this
331       * class, like a "formatter" or "parser".
332       * <p/>
333       * See {@link #sinkPrepare(cascading.flow.FlowProcess, SinkCall)} if resources much be initialized
334       * before use. And {@link #sinkCleanup(cascading.flow.FlowProcess, SinkCall)} if resources must be
335       * destroyed after use.
336       *
337       * @param flowProcess of type FlowProcess
338       * @param tap         of type Tap
339       * @param conf        of type Config
340       */
341      public abstract void sinkConfInit( FlowProcess<Config> flowProcess, Tap<Config, Input, Output> tap, Config conf );
342    
343      /**
344       * Method sourcePrepare is used to initialize resources needed during each call of
345       * {@link #source(cascading.flow.FlowProcess, SourceCall)}.
346       * <p/>
347       * Be sure to place any initialized objects in the {@code SourceContext} so each instance
348       * will remain threadsafe.
349       *
350       * @param flowProcess of type FlowProcess
351       * @param sourceCall  of type SourceCall<SourceContext, Input>
352       */
353      public void sourcePrepare( FlowProcess<Config> flowProcess, SourceCall<SourceContext, Input> sourceCall ) throws IOException
354        {
355        }
356    
357      /**
358       * Method source will read a new "record" or value from {@link cascading.scheme.SourceCall#getInput()} and populate
359       * the available {@link Tuple} via {@link cascading.scheme.SourceCall#getIncomingEntry()} and return {@code true}
360       * on success or {@code false} if no more values available.
361       * <p/>
362       * It's ok to set a new Tuple instance on the {@code incomingEntry} {@link cascading.tuple.TupleEntry}, or
363       * to simply re-use the existing instance.
364       * <p/>
365       * Note this is only time it is safe to modify a Tuple instance handed over via a method call.
366       * <p/>
367       * This method may optionally throw a {@link cascading.tap.TapException} if it cannot process a particular
368       * instance of data. If the payload Tuple is set on the TapException, that Tuple will be written to
369       * any applicable failure trap Tap.
370       *
371       * @param flowProcess of type FlowProcess
372       * @param sourceCall  of SourceCall
373       * @return returns {@code true} when a Tuple was successfully read
374       */
375      public abstract boolean source( FlowProcess<Config> flowProcess, SourceCall<SourceContext, Input> sourceCall ) throws IOException;
376    
377      /**
378       * Method sourceCleanup is used to destroy resources created by
379       * {@link #sourcePrepare(cascading.flow.FlowProcess, SourceCall)}.
380       *
381       * @param flowProcess of Process
382       * @param sourceCall  of type SourceCall<SourceContext, Input>
383       */
384      public void sourceCleanup( FlowProcess<Config> flowProcess, SourceCall<SourceContext, Input> sourceCall ) throws IOException
385        {
386        }
387    
388      /**
389       * Method sinkPrepare is used to initialize resources needed during each call of
390       * {@link #sink(cascading.flow.FlowProcess, SinkCall)}.
391       * <p/>
392       * Be sure to place any initialized objects in the {@code SinkContext} so each instance
393       * will remain threadsafe.
394       *
395       * @param flowProcess of type FlowProcess
396       * @param sinkCall    of type SinkCall<SinkContext, Output>
397       */
398      public void sinkPrepare( FlowProcess<Config> flowProcess, SinkCall<SinkContext, Output> sinkCall ) throws IOException
399        {
400        }
401    
402      /**
403       * Method sink writes out the given {@link Tuple} found on {@link cascading.scheme.SinkCall#getOutgoingEntry()} to
404       * the {@link cascading.scheme.SinkCall#getOutput()}.
405       * <p/>
406       * This method may optionally throw a {@link cascading.tap.TapException} if it cannot process a particular
407       * instance of data. If the payload Tuple is set on the TapException, that Tuple will be written to
408       * any applicable failure trap Tap. If not set, the incoming Tuple will be written instead.
409       *
410       * @param flowProcess of Process
411       * @param sinkCall    of SinkCall
412       */
413      public abstract void sink( FlowProcess<Config> flowProcess, SinkCall<SinkContext, Output> sinkCall ) throws IOException;
414    
415      /**
416       * Method sinkCleanup is used to destroy resources created by
417       * {@link #sinkPrepare(cascading.flow.FlowProcess, SinkCall)}.
418       *
419       * @param flowProcess of type FlowProcess
420       * @param sinkCall    of type SinkCall<SinkContext, Output>
421       */
422      public void sinkCleanup( FlowProcess<Config> flowProcess, SinkCall<SinkContext, Output> sinkCall ) throws IOException
423        {
424        }
425    
426      @Override
427      public boolean equals( Object object )
428        {
429        if( this == object )
430          return true;
431        if( object == null || getClass() != object.getClass() )
432          return false;
433    
434        Scheme scheme = (Scheme) object;
435    
436        if( numSinkParts != scheme.numSinkParts )
437          return false;
438        if( sinkFields != null ? !sinkFields.equals( scheme.sinkFields ) : scheme.sinkFields != null )
439          return false;
440        if( sourceFields != null ? !sourceFields.equals( scheme.sourceFields ) : scheme.sourceFields != null )
441          return false;
442    
443        return true;
444        }
445    
446      @Override
447      public String toString()
448        {
449        if( getSinkFields().equals( getSourceFields() ) )
450          return getClass().getSimpleName() + "[" + getSourceFields().print() + "]";
451        else
452          return getClass().getSimpleName() + "[" + getSourceFields().print() + "->" + getSinkFields().print() + "]";
453        }
454    
455      public int hashCode()
456        {
457        int result;
458        result = sinkFields != null ? sinkFields.hashCode() : 0;
459        result = 31 * result + ( sourceFields != null ? sourceFields.hashCode() : 0 );
460        result = 31 * result + numSinkParts;
461        return result;
462        }
463      }