001/*
002 * Copyright (c) 2016-2018 Chris K Wensel <chris@wensel.net>. All Rights Reserved.
003 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved.
004 *
005 * Project and contact information: http://www.cascading.org/
006 *
007 * This file is part of the Cascading project.
008 *
009 * Licensed under the Apache License, Version 2.0 (the "License");
010 * you may not use this file except in compliance with the License.
011 * You may obtain a copy of the License at
012 *
013 *     http://www.apache.org/licenses/LICENSE-2.0
014 *
015 * Unless required by applicable law or agreed to in writing, software
016 * distributed under the License is distributed on an "AS IS" BASIS,
017 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
018 * See the License for the specific language governing permissions and
019 * limitations under the License.
020 */
021
022package cascading.tuple;
023
024import java.io.Closeable;
025import java.io.Flushable;
026import java.io.IOException;
027import java.util.function.Supplier;
028
029import cascading.flow.FlowProcess;
030import cascading.scheme.ConcreteCall;
031import cascading.scheme.Scheme;
032import cascading.tap.Tap;
033import cascading.tap.TapException;
034
035/**
036 * Class TupleEntrySchemeCollector is a helper class for wrapping a {@link Scheme} instance, calling
037 * {@link Scheme#sink(cascading.flow.FlowProcess, cascading.scheme.SinkCall)} on every call to {@link #add(TupleEntry)}
038 * or {@link #add(Tuple)}.
039 * <p>
040 * Use this class inside a custom {@link cascading.tap.Tap} when overriding the
041 * {@link cascading.tap.Tap#openForWrite(cascading.flow.FlowProcess)} method.
042 */
043public class TupleEntrySchemeCollector<Config, Output> extends TupleEntryCollector
044  {
045  private final FlowProcess<? extends Config> flowProcess;
046  private final Scheme<Config, ?, Output, ?, Object> scheme;
047
048  protected final ConcreteCall<Object, Output> sinkCall;
049  private Supplier<String> loggableIdentifier = () -> "'unknown'";
050  private boolean prepared = false;
051
052  @Deprecated
053  public TupleEntrySchemeCollector( FlowProcess<? extends Config> flowProcess, Scheme scheme )
054    {
055    this( flowProcess, scheme, null, null );
056    }
057
058  @Deprecated
059  public TupleEntrySchemeCollector( FlowProcess<? extends Config> flowProcess, Scheme scheme, String loggableIdentifier )
060    {
061    this( flowProcess, scheme, null, loggableIdentifier );
062    }
063
064  @Deprecated
065  public TupleEntrySchemeCollector( FlowProcess<? extends Config> flowProcess, Scheme scheme, Output output )
066    {
067    this( flowProcess, scheme, output, null );
068    }
069
070  public TupleEntrySchemeCollector( FlowProcess<? extends Config> flowProcess, Tap tap, Output output )
071    {
072    this( flowProcess, tap, tap.getScheme(), output, tap.getIdentifier() );
073    }
074
075  @Deprecated
076  public TupleEntrySchemeCollector( FlowProcess<? extends Config> flowProcess, Scheme scheme, Output output, String loggableIdentifier )
077    {
078    this( flowProcess, null, scheme, output, loggableIdentifier );
079    }
080
081  public TupleEntrySchemeCollector( FlowProcess<? extends Config> flowProcess, Tap tap, Scheme scheme )
082    {
083    this( flowProcess, tap, scheme, null, (Supplier<String>) null );
084    }
085
086  public TupleEntrySchemeCollector( FlowProcess<? extends Config> flowProcess, Tap tap, Scheme scheme, String loggableIdentifier )
087    {
088    this( flowProcess, tap, scheme, null, loggableIdentifier );
089    }
090
091  public TupleEntrySchemeCollector( FlowProcess<? extends Config> flowProcess, Tap tap, Scheme scheme, Output output )
092    {
093    this( flowProcess, tap, scheme, output, (Supplier<String>) null );
094    }
095
096  public TupleEntrySchemeCollector( FlowProcess<? extends Config> flowProcess, Tap tap, Scheme scheme, Output output, String loggableIdentifier )
097    {
098    this( flowProcess, tap, scheme, output, loggableIdentifier == null ? null : () -> loggableIdentifier );
099    }
100
101  public TupleEntrySchemeCollector( FlowProcess<? extends Config> flowProcess, Tap tap, Scheme scheme, Output output, Supplier<String> loggableIdentifier )
102    {
103    super( Fields.asDeclaration( scheme.getSinkFields() ) );
104    this.flowProcess = flowProcess;
105    this.scheme = scheme;
106
107    if( loggableIdentifier != null )
108      this.loggableIdentifier = loggableIdentifier; // only used for logging
109
110    this.sinkCall = createSinkCall();
111    this.sinkCall.setTap( tap );
112    this.sinkCall.setOutgoingEntry( this.tupleEntry ); // created in super ctor
113
114    if( output != null )
115      setOutput( output );
116    }
117
118  /**
119   * Override to provide custom ConcreteCall implementation to expose Tap level resources to the underlying Scheme.
120   *
121   * @return a new ConcreteCall instance
122   */
123  protected <Context, IO> ConcreteCall<Context, IO> createSinkCall()
124    {
125    return new ConcreteCall<>();
126    }
127
128  protected FlowProcess<? extends Config> getFlowProcess()
129    {
130    return flowProcess;
131    }
132
133  @Override
134  public void setFields( Fields declared )
135    {
136    super.setFields( declared );
137
138    if( this.sinkCall != null )
139      this.sinkCall.setOutgoingEntry( this.tupleEntry );
140    }
141
142  protected Output getOutput()
143    {
144    return sinkCall.getOutput();
145    }
146
147  protected void setOutput( Output output )
148    {
149    sinkCall.setOutput( wrapOutput( output ) );
150    }
151
152  protected Output wrapOutput( Output output )
153    {
154    try
155      {
156      return scheme.sinkWrap( flowProcess, output );
157      }
158    catch( IOException exception )
159      {
160      throw new TapException( "could not wrap scheme", exception );
161      }
162    }
163
164  /** Need to defer preparing the scheme till after the fields have been resolved */
165  protected void prepare()
166    {
167    try
168      {
169      scheme.sinkPrepare( flowProcess, sinkCall );
170      }
171    catch( IOException exception )
172      {
173      throw new TapException( "could not prepare scheme", exception );
174      }
175
176    prepared = true;
177    }
178
179  @Override
180  public void add( TupleEntry tupleEntry )
181    {
182    if( !prepared )
183      prepare();
184
185    super.add( tupleEntry );
186    }
187
188  @Override
189  public void add( Tuple tuple )
190    {
191    if( !prepared ) // this is unfortunate
192      prepare();
193
194    super.add( tuple );
195    }
196
197  @Override
198  protected void collect( TupleEntry tupleEntry ) throws IOException
199    {
200    sinkCall.setOutgoingEntry( tupleEntry );
201
202    try
203      {
204      scheme.sink( flowProcess, sinkCall );
205      }
206    catch( Exception exception )
207      {
208      throw new TupleException( "unable to sink into output identifier: " + loggableIdentifier.get(), exception );
209      }
210    }
211
212  @Override
213  public void close()
214    {
215    try
216      {
217      if( sinkCall == null )
218        return;
219
220      try
221        {
222        if( prepared )
223          scheme.sinkCleanup( flowProcess, sinkCall );
224        }
225      catch( IOException exception )
226        {
227        throw new TupleException( "unable to cleanup sink for output identifier: " + loggableIdentifier.get(), exception );
228        }
229      }
230    finally
231      {
232      try
233        {
234        if( getOutput() instanceof Flushable )
235          ( (Flushable) getOutput() ).flush();
236        }
237      catch( IOException exception )
238        {
239        // do nothing
240        }
241
242      try
243        {
244        if( getOutput() instanceof Closeable )
245          ( (Closeable) getOutput() ).close();
246        }
247      catch( IOException exception )
248        {
249        // do nothing
250        }
251
252      super.close();
253      }
254    }
255  }