001    /*
002     * Copyright (c) 2007-2015 Concurrent, Inc. All Rights Reserved.
003     *
004     * Project and contact information: http://www.cascading.org/
005     *
006     * This file is part of the Cascading project.
007     *
008     * Licensed under the Apache License, Version 2.0 (the "License");
009     * you may not use this file except in compliance with the License.
010     * You may obtain a copy of the License at
011     *
012     *     http://www.apache.org/licenses/LICENSE-2.0
013     *
014     * Unless required by applicable law or agreed to in writing, software
015     * distributed under the License is distributed on an "AS IS" BASIS,
016     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017     * See the License for the specific language governing permissions and
018     * limitations under the License.
019     */
020    
021    package cascading.flow.stream;
022    
023    import java.io.IOException;
024    
025    import cascading.CascadingException;
026    import cascading.flow.FlowProcess;
027    import cascading.flow.SliceCounters;
028    import cascading.flow.StepCounters;
029    import cascading.tap.Tap;
030    import cascading.tuple.Fields;
031    import cascading.tuple.TupleEntry;
032    import cascading.tuple.TupleEntryCollector;
033    
034    /**
035     *
036     */
037    public class SinkStage extends ElementStage<TupleEntry, Void>
038      {
039      private final Tap sink;
040      private TupleEntryCollector collector;
041    
042      public SinkStage( FlowProcess flowProcess, Tap sink )
043        {
044        super( flowProcess, sink );
045        this.sink = sink;
046        }
047    
048      @Override
049      public void bind( StreamGraph streamGraph )
050        {
051        // do not bind
052        }
053    
054      @Override
055      public void prepare()
056        {
057        try
058          {
059          // todo: pass the resolved fields down
060          collector = sink.openForWrite( flowProcess, getOutput() );
061    
062          if( sink.getSinkFields().isAll() )
063            {
064            Fields fields = getIncomingScopes().get( 0 ).getIncomingTapFields();
065            collector.setFields( fields );
066            }
067          }
068        catch( IOException exception )
069          {
070          throw new DuctException( "failed opening sink", exception );
071          }
072        }
073    
074      protected Object getOutput()
075        {
076        return null;
077        }
078    
079      @Override
080      public void start( Duct previous )
081        {
082        // do nothing
083        }
084    
085      @Override
086      public void receive( Duct previous, TupleEntry tupleEntry )
087        {
088        try
089          {
090          collector.add( tupleEntry );
091          flowProcess.increment( StepCounters.Tuples_Written, 1 );
092          flowProcess.increment( SliceCounters.Tuples_Written, 1 );
093          }
094        catch( OutOfMemoryError error )
095          {
096          handleReThrowableException( "out of memory, try increasing task memory allocation", error );
097          }
098        catch( CascadingException exception )
099          {
100          handleException( exception, tupleEntry );
101          }
102        catch( Throwable throwable )
103          {
104          handleException( new DuctException( "internal error: " + tupleEntry.getTuple().print(), throwable ), tupleEntry );
105          }
106        }
107    
108      @Override
109      public void complete( Duct previous )
110        {
111        // do nothing
112        }
113    
114      @Override
115      public void cleanup()
116        {
117        try
118          {
119          if( collector != null )
120            collector.close();
121    
122          collector = null;
123          }
124        finally
125          {
126          super.cleanup();
127          }
128        }
129      }