001/*
002 * Copyright (c) 2007-2016 Concurrent, Inc. All Rights Reserved.
003 *
004 * Project and contact information: http://www.cascading.org/
005 *
006 * This file is part of the Cascading project.
007 *
008 * Licensed under the Apache License, Version 2.0 (the "License");
009 * you may not use this file except in compliance with the License.
010 * You may obtain a copy of the License at
011 *
012 *     http://www.apache.org/licenses/LICENSE-2.0
013 *
014 * Unless required by applicable law or agreed to in writing, software
015 * distributed under the License is distributed on an "AS IS" BASIS,
016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017 * See the License for the specific language governing permissions and
018 * limitations under the License.
019 */
020
021package cascading.tap.hadoop.io;
022
023import java.io.Closeable;
024import java.io.IOException;
025
026import cascading.flow.FlowProcess;
027import cascading.flow.hadoop.MapRed;
028import cascading.tap.Tap;
029import cascading.tap.TapException;
030import cascading.tap.hadoop.util.Hadoop18TapUtil;
031import org.apache.hadoop.conf.Configuration;
032import org.apache.hadoop.fs.Path;
033import org.apache.hadoop.mapred.FileOutputFormat;
034import org.apache.hadoop.mapred.OutputCollector;
035import org.apache.hadoop.mapred.OutputFormat;
036import org.apache.hadoop.mapred.RecordReader;
037import org.apache.hadoop.mapred.RecordWriter;
038import org.apache.hadoop.mapred.Reporter;
039import org.slf4j.Logger;
040import org.slf4j.LoggerFactory;
041
042import static cascading.flow.hadoop.util.HadoopUtil.asJobConfInstance;
043
044/**
045 *
046 */
047public class TapOutputCollector implements OutputCollector, Closeable
048  {
049  private static final Logger LOG = LoggerFactory.getLogger( TapOutputCollector.class );
050
051  public static final String PART_TASK_PATTERN = "%s%spart-%05d";
052  public static final String PART_TASK_SEQ_PATTERN = "%s%spart-%05d-%05d";
053
054  /** Field conf */
055  private Configuration conf;
056  /** Field writer */
057  private RecordWriter writer;
058  /** Field filenamePattern */
059  private String filenamePattern;
060  /** Field filename */
061  private String filename;
062  /** Field tap */
063  private Tap<Configuration, RecordReader, OutputCollector> tap;
064  /** Field prefix */
065  private String prefix;
066  /** Field sequence */
067  private long sequence;
068  /** Field isFileOutputFormat */
069  private boolean isFileOutputFormat;
070  private final FlowProcess<? extends Configuration> flowProcess;
071
072  public TapOutputCollector( FlowProcess<? extends Configuration> flowProcess, Tap<Configuration, RecordReader, OutputCollector> tap ) throws IOException
073    {
074    this( flowProcess, tap, null );
075    }
076
077  public TapOutputCollector( FlowProcess<? extends Configuration> flowProcess, Tap<Configuration, RecordReader, OutputCollector> tap, String prefix ) throws IOException
078    {
079    this( flowProcess, tap, prefix, -1 );
080    }
081
082  public TapOutputCollector( FlowProcess<? extends Configuration> flowProcess, Tap<Configuration, RecordReader, OutputCollector> tap, String prefix, long sequence ) throws IOException
083    {
084    this.tap = tap;
085    this.sequence = sequence;
086    this.prefix = prefix == null || prefix.length() == 0 ? null : prefix;
087    this.flowProcess = flowProcess;
088    this.conf = this.flowProcess.getConfigCopy();
089    this.filenamePattern = this.conf.get( "cascading.tapcollector.partname", sequence == -1 ? PART_TASK_PATTERN : PART_TASK_SEQ_PATTERN );
090
091    initialize();
092    }
093
094  protected void initialize() throws IOException
095    {
096    tap.sinkConfInit( flowProcess, conf );
097
098    OutputFormat outputFormat = asJobConfInstance( conf ).getOutputFormat();
099
100    // todo: use OutputCommitter class
101
102    isFileOutputFormat = outputFormat instanceof FileOutputFormat;
103
104    if( isFileOutputFormat )
105      {
106      Hadoop18TapUtil.setupJob( conf );
107      Hadoop18TapUtil.setupTask( conf );
108
109      int partition = conf.getInt( "mapred.task.partition", conf.getInt( "mapreduce.task.partition", 0 ) );
110
111      if( prefix != null )
112        filename = String.format( filenamePattern, prefix, "/", partition, sequence );
113      else
114        filename = String.format( filenamePattern, "", "", partition, sequence );
115      }
116
117    LOG.info( "creating path: {}", filename );
118
119    writer = outputFormat.getRecordWriter( null, asJobConfInstance( conf ), filename, getReporter() );
120    }
121
122  private Reporter getReporter()
123    {
124    Reporter reporter = Reporter.NULL;
125
126    if( flowProcess instanceof MapRed )
127      reporter = ( (MapRed) flowProcess ).getReporter(); // may return Reporter.NULL
128
129    return reporter;
130    }
131
132  /**
133   * Method collect writes the given values to the {@link Tap} this instance encapsulates.
134   *
135   * @param writableComparable of type WritableComparable
136   * @param writable           of type Writable
137   * @throws IOException when
138   */
139  public void collect( Object writableComparable, Object writable ) throws IOException
140    {
141    flowProcess.keepAlive();
142    writer.write( writableComparable, writable );
143    }
144
145  public void close()
146    {
147    try
148      {
149      if( isFileOutputFormat )
150        LOG.info( "closing tap collector for: {}", new Path( tap.getIdentifier(), filename ) );
151      else
152        LOG.info( "closing tap collector for: {}", tap );
153
154      try
155        {
156        writer.close( getReporter() );
157        }
158      finally
159        {
160        if( isFileOutputFormat )
161          {
162          if( Hadoop18TapUtil.needsTaskCommit( conf ) )
163            Hadoop18TapUtil.commitTask( conf );
164
165          Hadoop18TapUtil.cleanupJob( conf );
166          }
167        }
168      }
169    catch( IOException exception )
170      {
171      LOG.warn( "exception closing: {}", filename, exception );
172      throw new TapException( "exception closing: " + filename, exception );
173      }
174    }
175  }