001    /*
002     * Copyright (c) 2007-2014 Concurrent, Inc. All Rights Reserved.
003     *
004     * Project and contact information: http://www.cascading.org/
005     *
006     * This file is part of the Cascading project.
007     *
008     * Licensed under the Apache License, Version 2.0 (the "License");
009     * you may not use this file except in compliance with the License.
010     * You may obtain a copy of the License at
011     *
012     *     http://www.apache.org/licenses/LICENSE-2.0
013     *
014     * Unless required by applicable law or agreed to in writing, software
015     * distributed under the License is distributed on an "AS IS" BASIS,
016     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017     * See the License for the specific language governing permissions and
018     * limitations under the License.
019     */
020    
021    package cascading.tap.hadoop.io;
022    
023    import java.io.Closeable;
024    import java.io.IOException;
025    
026    import cascading.flow.FlowProcess;
027    import cascading.tap.Tap;
028    import cascading.tap.TapException;
029    import cascading.tap.hadoop.util.Hadoop18TapUtil;
030    import org.apache.hadoop.fs.Path;
031    import org.apache.hadoop.mapred.FileOutputFormat;
032    import org.apache.hadoop.mapred.JobConf;
033    import org.apache.hadoop.mapred.OutputCollector;
034    import org.apache.hadoop.mapred.OutputFormat;
035    import org.apache.hadoop.mapred.RecordReader;
036    import org.apache.hadoop.mapred.RecordWriter;
037    import org.apache.hadoop.mapred.Reporter;
038    import org.slf4j.Logger;
039    import org.slf4j.LoggerFactory;
040    
041    /**
042     *
043     */
044    public class TapOutputCollector implements OutputCollector, Closeable
045      {
046      private static final Logger LOG = LoggerFactory.getLogger( TapOutputCollector.class );
047    
048      public static final String PART_TASK_PATTERN = "%s%spart-%05d";
049      public static final String PART_TASK_SEQ_PATTERN = "%s%spart-%05d-%05d";
050    
051      /** Field conf */
052      private JobConf conf;
053      /** Field writer */
054      private RecordWriter writer;
055      /** Field filenamePattern */
056      private String filenamePattern;
057      /** Field filename */
058      private String filename;
059      /** Field tap */
060      private Tap<JobConf, RecordReader, OutputCollector> tap;
061      /** Field prefix */
062      private String prefix;
063      /** Field sequence */
064      private long sequence;
065      /** Field isFileOutputFormat */
066      private boolean isFileOutputFormat;
067      /** Field reporter */
068      private final Reporter reporter = Reporter.NULL;
069      private final FlowProcess<JobConf> flowProcess;
070    
071      public TapOutputCollector( FlowProcess<JobConf> flowProcess, Tap<JobConf, RecordReader, OutputCollector> tap ) throws IOException
072        {
073        this( flowProcess, tap, null );
074        }
075    
076      public TapOutputCollector( FlowProcess<JobConf> flowProcess, Tap<JobConf, RecordReader, OutputCollector> tap, String prefix ) throws IOException
077        {
078        this( flowProcess, tap, prefix, -1 );
079        }
080    
081      public TapOutputCollector( FlowProcess<JobConf> flowProcess, Tap<JobConf, RecordReader, OutputCollector> tap, String prefix, long sequence ) throws IOException
082        {
083        this.tap = tap;
084        this.sequence = sequence;
085        this.prefix = prefix == null || prefix.length() == 0 ? null : prefix;
086        this.flowProcess = flowProcess;
087        this.conf = this.flowProcess.getConfigCopy();
088        this.filenamePattern = this.conf.get( "cascading.tapcollector.partname", sequence == -1 ? PART_TASK_PATTERN : PART_TASK_SEQ_PATTERN );
089    
090        initialize();
091        }
092    
093      protected void initialize() throws IOException
094        {
095        tap.sinkConfInit( flowProcess, conf );
096    
097        OutputFormat outputFormat = conf.getOutputFormat();
098    
099        isFileOutputFormat = outputFormat instanceof FileOutputFormat;
100    
101        if( isFileOutputFormat )
102          {
103          Hadoop18TapUtil.setupJob( conf );
104          Hadoop18TapUtil.setupTask( conf );
105    
106          if( prefix != null )
107            filename = String.format( filenamePattern, prefix, "/", conf.getInt( "mapred.task.partition", 0 ), sequence );
108          else
109            filename = String.format( filenamePattern, "", "", conf.getInt( "mapred.task.partition", 0 ), sequence );
110          }
111    
112        LOG.info( "creating path: {}", filename );
113    
114        writer = outputFormat.getRecordWriter( null, conf, filename, Reporter.NULL );
115        }
116    
117      /**
118       * Method collect writes the given values to the {@link Tap} this instance encapsulates.
119       *
120       * @param writableComparable of type WritableComparable
121       * @param writable           of type Writable
122       * @throws IOException when
123       */
124      public void collect( Object writableComparable, Object writable ) throws IOException
125        {
126        flowProcess.keepAlive();
127        writer.write( writableComparable, writable );
128        }
129    
130      public void close()
131        {
132        try
133          {
134          if( isFileOutputFormat )
135            LOG.info( "closing tap collector for: {}", new Path( tap.getIdentifier(), filename ) );
136          else
137            LOG.info( "closing tap collector for: {}", tap );
138    
139          try
140            {
141            writer.close( reporter );
142            }
143          finally
144            {
145            if( isFileOutputFormat )
146              {
147              if( Hadoop18TapUtil.needsTaskCommit( conf ) )
148                Hadoop18TapUtil.commitTask( conf );
149    
150              Hadoop18TapUtil.cleanupJob( conf );
151              }
152            }
153          }
154        catch( IOException exception )
155          {
156          LOG.warn( "exception closing: {}", filename, exception );
157          throw new TapException( "exception closing: " + filename, exception );
158          }
159        }
160    
161      }