001/*
002 * Copyright (c) 2016-2018 Chris K Wensel <chris@wensel.net>. All Rights Reserved.
003 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved.
004 *
005 * Project and contact information: http://www.cascading.org/
006 *
007 * This file is part of the Cascading project.
008 *
009 * Licensed under the Apache License, Version 2.0 (the "License");
010 * you may not use this file except in compliance with the License.
011 * You may obtain a copy of the License at
012 *
013 *     http://www.apache.org/licenses/LICENSE-2.0
014 *
015 * Unless required by applicable law or agreed to in writing, software
016 * distributed under the License is distributed on an "AS IS" BASIS,
017 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
018 * See the License for the specific language governing permissions and
019 * limitations under the License.
020 */
021
022package cascading.tap.hadoop.io;
023
024import java.io.Closeable;
025import java.io.IOException;
026
027import cascading.flow.FlowProcess;
028import cascading.flow.hadoop.MapRed;
029import cascading.flow.hadoop.util.HadoopUtil;
030import cascading.tap.Tap;
031import cascading.tap.TapException;
032import cascading.tap.hadoop.util.Hadoop18TapUtil;
033import org.apache.hadoop.conf.Configuration;
034import org.apache.hadoop.fs.Path;
035import org.apache.hadoop.mapred.FileOutputFormat;
036import org.apache.hadoop.mapred.OutputCollector;
037import org.apache.hadoop.mapred.OutputFormat;
038import org.apache.hadoop.mapred.RecordReader;
039import org.apache.hadoop.mapred.RecordWriter;
040import org.apache.hadoop.mapred.Reporter;
041import org.slf4j.Logger;
042import org.slf4j.LoggerFactory;
043
044import static cascading.flow.hadoop.util.HadoopUtil.asJobConfInstance;
045
046/**
047 *
048 */
049public class TapOutputCollector implements OutputCollector, Closeable
050  {
051  private static final Logger LOG = LoggerFactory.getLogger( TapOutputCollector.class );
052
053  public static final String PART_TASK_PATTERN = "%s%spart-%05d";
054  public static final String PART_TASK_SEQ_PATTERN = "%s%spart-%05d-%05d";
055
056  /** Field conf */
057  private Configuration conf;
058  /** Field writer */
059  private RecordWriter writer;
060  /** Field filenamePattern */
061  private String filenamePattern;
062  /** Field filename */
063  private String filename;
064  /** Field tap */
065  private Tap<Configuration, RecordReader, OutputCollector> tap;
066  /** Field prefix */
067  private String prefix;
068  /** Field sequence */
069  private long sequence;
070  /** Field isFileOutputFormat */
071  private boolean isFileOutputFormat;
072  private final FlowProcess<? extends Configuration> flowProcess;
073
074  public TapOutputCollector( FlowProcess<? extends Configuration> flowProcess, Tap<Configuration, RecordReader, OutputCollector> tap ) throws IOException
075    {
076    this( flowProcess, tap, null );
077    }
078
079  public TapOutputCollector( FlowProcess<? extends Configuration> flowProcess, Tap<Configuration, RecordReader, OutputCollector> tap, String prefix ) throws IOException
080    {
081    this( flowProcess, tap, prefix, -1 );
082    }
083
084  public TapOutputCollector( FlowProcess<? extends Configuration> flowProcess, Tap<Configuration, RecordReader, OutputCollector> tap, String prefix, long sequence ) throws IOException
085    {
086    this.tap = tap;
087    this.sequence = sequence;
088    this.prefix = prefix == null || prefix.length() == 0 ? null : prefix;
089    this.flowProcess = flowProcess;
090    this.conf = this.flowProcess.getConfigCopy();
091    this.filenamePattern = this.conf.get( "cascading.tapcollector.partname", sequence == -1 ? PART_TASK_PATTERN : PART_TASK_SEQ_PATTERN );
092
093    initialize();
094    }
095
096  protected void initialize() throws IOException
097    {
098    tap.sinkConfInit( flowProcess, conf );
099
100    OutputFormat outputFormat = asJobConfInstance( conf ).getOutputFormat();
101
102    // todo: use OutputCommitter class
103
104    isFileOutputFormat = outputFormat instanceof FileOutputFormat;
105
106    if( isFileOutputFormat )
107      {
108      Hadoop18TapUtil.setupJob( conf );
109      Hadoop18TapUtil.setupTask( conf );
110
111      int partition = conf.getInt( "mapred.task.partition", conf.getInt( "mapreduce.task.partition", 0 ) );
112
113      if( prefix != null )
114        filename = String.format( filenamePattern, prefix, "/", partition, sequence );
115      else
116        filename = String.format( filenamePattern, "", "", partition, sequence );
117      }
118
119    LOG.info( "creating path: {}", filename );
120
121    writer = outputFormat.getRecordWriter( null, asJobConfInstance( conf ), filename, getReporter() );
122    }
123
124  private Reporter getReporter()
125    {
126    Reporter reporter = Reporter.NULL;
127
128    if( flowProcess instanceof MapRed )
129      reporter = ( (MapRed) flowProcess ).getReporter(); // may return Reporter.NULL
130
131    return reporter;
132    }
133
134  /**
135   * Method collect writes the given values to the {@link Tap} this instance encapsulates.
136   *
137   * @param writableComparable of type WritableComparable
138   * @param writable           of type Writable
139   * @throws IOException when
140   */
141  public void collect( Object writableComparable, Object writable ) throws IOException
142    {
143    flowProcess.keepAlive();
144    writer.write( writableComparable, writable );
145    }
146
147  public void close()
148    {
149    try
150      {
151      if( isFileOutputFormat )
152        LOG.info( "closing tap collector for: {}", new Path( tap.getIdentifier(), filename ) );
153      else
154        LOG.info( "closing tap collector for: {}", tap );
155
156      try
157        {
158        writer.close( getReporter() );
159        }
160      finally
161        {
162        if( isFileOutputFormat )
163          {
164          boolean needsTaskCommit = Hadoop18TapUtil.needsTaskCommit( conf );
165
166          if( needsTaskCommit )
167            Hadoop18TapUtil.commitTask( conf );
168
169          Hadoop18TapUtil.cleanupJob( conf );
170
171          if( !HadoopUtil.isInflow( conf ) )
172            Hadoop18TapUtil.writeSuccessMarker( conf );
173          }
174        }
175      }
176    catch( IOException exception )
177      {
178      LOG.warn( "exception closing: {}", filename, exception );
179      throw new TapException( "exception closing: " + filename, exception );
180      }
181    }
182  }