/*
 * Copyright (c) 2007-2016 Concurrent, Inc. All Rights Reserved.
 *
 * Project and contact information: http://www.cascading.org/
 *
 * This file is part of the Cascading project.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package cascading.tap.hadoop.io;

import java.io.Closeable;
import java.io.IOException;

import cascading.flow.FlowProcess;
import cascading.flow.hadoop.MapRed;
import cascading.tap.Tap;
import cascading.tap.TapException;
import cascading.tap.hadoop.util.Hadoop18TapUtil;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.OutputFormat;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.RecordWriter;
import org.apache.hadoop.mapred.Reporter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static cascading.flow.hadoop.util.HadoopUtil.asJobConfInstance;

/**
 * Class TapOutputCollector is an {@link OutputCollector} that writes key/value pairs directly to the
 * {@link RecordWriter} obtained from the {@link OutputFormat} configured by the wrapped {@link Tap}.
 * <p>
 * When the configured format is a {@link FileOutputFormat}, the collector also drives the job/task
 * setup on construction and the task commit and job cleanup on {@link #close()} through
 * {@link Hadoop18TapUtil}, and writes to a part file whose name is built from
 * {@link #PART_TASK_PATTERN} or {@link #PART_TASK_SEQ_PATTERN} (overridable with the
 * {@code cascading.tapcollector.partname} property).
 */
public class TapOutputCollector implements OutputCollector, Closeable
  {
  private static final Logger LOG = LoggerFactory.getLogger( TapOutputCollector.class );

  /** Default part file pattern: prefix, separator, task partition. */
  public static final String PART_TASK_PATTERN = "%s%spart-%05d";
  /** Part file pattern used when a sequence is given: prefix, separator, task partition, sequence. */
  public static final String PART_TASK_SEQ_PATTERN = "%s%spart-%05d-%05d";

  /** Field conf, a private copy of the flow process configuration */
  private final Configuration conf;
  /** Field writer, assigned by {@link #initialize()} */
  private RecordWriter writer;
  /** Field filenamePattern, the format string used to build {@link #filename} */
  private final String filenamePattern;
  /** Field filename, only set when the output format is a FileOutputFormat, otherwise null */
  private String filename;
  /** Field tap */
  private final Tap<Configuration, RecordReader, OutputCollector> tap;
  /** Field prefix, null when no (or an empty) prefix was given */
  private final String prefix;
  /** Field sequence, -1 when no sequence was given */
  private final long sequence;
  /** Field isFileOutputFormat */
  private boolean isFileOutputFormat;
  /** Field closed, true once {@link #close()} has been entered, reset by {@link #initialize()} */
  private boolean closed;
  private final FlowProcess<? extends Configuration> flowProcess;

  /**
   * Constructor TapOutputCollector creates a new collector with no prefix and no sequence.
   *
   * @param flowProcess of type FlowProcess
   * @param tap         of type Tap
   * @throws IOException when the underlying writer cannot be created
   */
  public TapOutputCollector( FlowProcess<? extends Configuration> flowProcess, Tap<Configuration, RecordReader, OutputCollector> tap ) throws IOException
    {
    this( flowProcess, tap, null );
    }

  /**
   * Constructor TapOutputCollector creates a new collector with no sequence.
   *
   * @param flowProcess of type FlowProcess
   * @param tap         of type Tap
   * @param prefix      of type String, placed in front of the part file name followed by "/"; null or empty means none
   * @throws IOException when the underlying writer cannot be created
   */
  public TapOutputCollector( FlowProcess<? extends Configuration> flowProcess, Tap<Configuration, RecordReader, OutputCollector> tap, String prefix ) throws IOException
    {
    this( flowProcess, tap, prefix, -1 );
    }

  /**
   * Constructor TapOutputCollector creates a new collector and immediately opens its writer by
   * calling {@link #initialize()}.
   *
   * @param flowProcess of type FlowProcess, the source of the configuration copy this instance works on
   * @param tap         of type Tap
   * @param prefix      of type String, placed in front of the part file name followed by "/"; null or empty means none
   * @param sequence    of type long, -1 selects {@link #PART_TASK_PATTERN} as the default file name
   *                    pattern, any other value selects {@link #PART_TASK_SEQ_PATTERN}
   * @throws IOException when the underlying writer cannot be created
   */
  public TapOutputCollector( FlowProcess<? extends Configuration> flowProcess, Tap<Configuration, RecordReader, OutputCollector> tap, String prefix, long sequence ) throws IOException
    {
    this.tap = tap;
    this.sequence = sequence;
    this.prefix = prefix == null || prefix.length() == 0 ? null : prefix; // normalize "" to null
    this.flowProcess = flowProcess;
    this.conf = this.flowProcess.getConfigCopy();

    // an explicitly configured pattern wins over the sequence based default
    this.filenamePattern = this.conf.get( "cascading.tapcollector.partname", sequence == -1 ? PART_TASK_PATTERN : PART_TASK_SEQ_PATTERN );

    initialize();
    }

  /**
   * Method initialize lets the tap configure the sink, resolves the output format and opens the
   * record writer. For a {@link FileOutputFormat} it additionally sets up the job and task and
   * computes the part file name from the task partition.
   * <p>
   * This method is invoked from the constructor; an overriding implementation therefore runs
   * before the subclass' own field initializers and constructor body.
   *
   * @throws IOException when the job/task setup or the writer creation fails
   */
  protected void initialize() throws IOException
    {
    tap.sinkConfInit( flowProcess, conf );

    OutputFormat outputFormat = asJobConfInstance( conf ).getOutputFormat();

    // todo: use OutputCommitter class

    isFileOutputFormat = outputFormat instanceof FileOutputFormat;

    if( isFileOutputFormat )
      {
      Hadoop18TapUtil.setupJob( conf );
      Hadoop18TapUtil.setupTask( conf );

      // the old property name is consulted first, the new one is the fallback, then 0
      int partition = conf.getInt( "mapred.task.partition", conf.getInt( "mapreduce.task.partition", 0 ) );

      // sequence is always passed; String.format ignores it when the pattern has no fourth specifier
      if( prefix != null )
        filename = String.format( filenamePattern, prefix, "/", partition, sequence );
      else
        filename = String.format( filenamePattern, "", "", partition, sequence );
      }

    LOG.info( "creating path: {}", filename );

    // NOTE(review): asJobConfInstance is deliberately called again instead of reusing the earlier
    // result, since setupJob/setupTask were handed conf in between -- confirm before merging the calls
    writer = outputFormat.getRecordWriter( null, asJobConfInstance( conf ), filename, getReporter() );

    // a fresh writer is open, so close() must act again
    closed = false;
    }

  /**
   * Method getReporter returns the reporter of the flow process when it is a {@link MapRed}
   * instance, {@link Reporter#NULL} otherwise.
   *
   * @return Reporter, never null unless the flow process itself hands out null
   */
  private Reporter getReporter()
    {
    Reporter reporter = Reporter.NULL;

    if( flowProcess instanceof MapRed )
      reporter = ( (MapRed) flowProcess ).getReporter(); // may return Reporter.NULL

    return reporter;
    }

  /**
   * Method collect writes the given values to the {@link Tap} this instance encapsulates.
   * The flow process is signalled via {@code keepAlive()} before every write.
   *
   * @param writableComparable of type Object, the key handed to the record writer
   * @param writable           of type Object, the value handed to the record writer
   * @throws IOException when thrown by the underlying record writer
   */
  @Override
  public void collect( Object writableComparable, Object writable ) throws IOException
    {
    flowProcess.keepAlive();
    writer.write( writableComparable, writable );
    }

  /**
   * Method close closes the record writer and, for a {@link FileOutputFormat}, commits the task
   * (if a commit is needed) and cleans up the job.
   * <p>
   * Calling close on an already closed collector has no effect, as required by {@link Closeable}.
   *
   * @throws TapException wrapping any IOException raised while closing, committing or cleaning up
   */
  @Override
  public void close()
    {
    if( closed )
      return;

    // flag first so a failure below cannot lead to a second commit/cleanup on a repeated call
    closed = true;

    try
      {
      if( isFileOutputFormat )
        LOG.info( "closing tap collector for: {}", new Path( tap.getIdentifier(), filename ) );
      else
        LOG.info( "closing tap collector for: {}", tap );

      try
        {
        writer.close( getReporter() );
        }
      finally
        {
        // NOTE(review): commit and cleanup run even when writer.close() threw -- confirm this is intended
        if( isFileOutputFormat )
          {
          if( Hadoop18TapUtil.needsTaskCommit( conf ) )
            Hadoop18TapUtil.commitTask( conf );

          Hadoop18TapUtil.cleanupJob( conf );
          }
        }
      }
    catch( IOException exception )
      {
      LOG.warn( "exception closing: {}", filename, exception );
      throw new TapException( "exception closing: " + filename, exception );
      }
    }
  }