/*
 * Copyright (c) 2016-2018 Chris K Wensel <chris@wensel.net>. All Rights Reserved.
 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved.
 *
 * Project and contact information: http://www.cascading.org/
 *
 * This file is part of the Cascading project.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package cascading.tap.hadoop.io;

import java.io.Closeable;
import java.io.IOException;

import cascading.flow.FlowProcess;
import cascading.flow.hadoop.MapRed;
import cascading.flow.hadoop.util.HadoopUtil;
import cascading.tap.Tap;
import cascading.tap.TapException;
import cascading.tap.hadoop.util.Hadoop18TapUtil;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.OutputFormat;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.RecordWriter;
import org.apache.hadoop.mapred.Reporter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static cascading.flow.hadoop.util.HadoopUtil.asJobConfInstance;

/**
 * An {@link OutputCollector} that writes key/value pairs through the {@link OutputFormat}
 * configured on a sink {@link Tap}.
 * <p>
 * When the configured output format is a {@link FileOutputFormat}, this collector also drives
 * the Hadoop side-effect file lifecycle (job/task setup on open, task commit and job cleanup
 * on {@link #close()}) and names the part file from a configurable pattern.
 */
public class TapOutputCollector implements OutputCollector, Closeable
  {
  private static final Logger LOG = LoggerFactory.getLogger( TapOutputCollector.class );

  /** Part file pattern used when no sequence is given: prefix, separator, task partition. */
  public static final String PART_TASK_PATTERN = "%s%spart-%05d";
  /** Part file pattern used when a sequence is given: prefix, separator, task partition, sequence. */
  public static final String PART_TASK_SEQ_PATTERN = "%s%spart-%05d-%05d";

  /** Copy of the current process configuration, further initialized by the sink tap. */
  private final Configuration conf;
  /** Record writer created from the configured output format; null until {@link #initialize()} succeeds. */
  private RecordWriter writer;
  /** Format pattern for the part file name, overridable via "cascading.tapcollector.partname". */
  private final String filenamePattern;
  /** Resolved part file name; remains null for non file based output formats. */
  private String filename;
  /** The sink tap this collector writes into. */
  private final Tap<Configuration, RecordReader, OutputCollector> tap;
  /** Optional path prefix for the part file; null when none was given. */
  private final String prefix;
  /** Optional sequence number appended to the part file name; -1 when unused. */
  private final long sequence;
  /** True when the configured output format is a {@link FileOutputFormat}. */
  private boolean isFileOutputFormat;
  /** The current flow process, used for configuration, liveness, and progress reporting. */
  private final FlowProcess<? extends Configuration> flowProcess;

  /**
   * Creates a collector with no part file prefix and no sequence number.
   *
   * @param flowProcess the current flow process
   * @param tap         the sink tap to write into
   * @throws IOException if the record writer cannot be created
   */
  public TapOutputCollector( FlowProcess<? extends Configuration> flowProcess, Tap<Configuration, RecordReader, OutputCollector> tap ) throws IOException
    {
    this( flowProcess, tap, null );
    }

  /**
   * Creates a collector with the given part file prefix and no sequence number.
   *
   * @param flowProcess the current flow process
   * @param tap         the sink tap to write into
   * @param prefix      optional path prefix for the part file, may be null or empty
   * @throws IOException if the record writer cannot be created
   */
  public TapOutputCollector( FlowProcess<? extends Configuration> flowProcess, Tap<Configuration, RecordReader, OutputCollector> tap, String prefix ) throws IOException
    {
    this( flowProcess, tap, prefix, -1 );
    }

  /**
   * Creates a collector with the given part file prefix and sequence number.
   *
   * @param flowProcess the current flow process
   * @param tap         the sink tap to write into
   * @param prefix      optional path prefix for the part file, may be null or empty
   * @param sequence    sequence number appended to the part file name, or -1 for none
   * @throws IOException if the record writer cannot be created
   */
  public TapOutputCollector( FlowProcess<? extends Configuration> flowProcess, Tap<Configuration, RecordReader, OutputCollector> tap, String prefix, long sequence ) throws IOException
    {
    this.tap = tap;
    this.sequence = sequence;
    this.prefix = prefix == null || prefix.isEmpty() ? null : prefix;
    this.flowProcess = flowProcess;
    this.conf = this.flowProcess.getConfigCopy();
    // the pattern may be overridden per job; the default depends on whether a sequence is in play
    this.filenamePattern = this.conf.get( "cascading.tapcollector.partname", sequence == -1 ? PART_TASK_PATTERN : PART_TASK_SEQ_PATTERN );

    initialize();
    }

  /**
   * Prepares the underlying {@link OutputFormat} and creates the {@link RecordWriter}.
   * <p>
   * For file based output formats, also sets up the Hadoop job/task side-effect directories
   * and computes the part file name from {@link #filenamePattern}.
   *
   * @throws IOException if the record writer cannot be created
   */
  protected void initialize() throws IOException
    {
    tap.sinkConfInit( flowProcess, conf );

    OutputFormat outputFormat = asJobConfInstance( conf ).getOutputFormat();

    // todo: use OutputCommitter class

    isFileOutputFormat = outputFormat instanceof FileOutputFormat;

    if( isFileOutputFormat )
      {
      Hadoop18TapUtil.setupJob( conf );
      Hadoop18TapUtil.setupTask( conf );

      // check both the old and new mapreduce property names for the task partition number
      int partition = conf.getInt( "mapred.task.partition", conf.getInt( "mapreduce.task.partition", 0 ) );

      // when the pattern has only three placeholders, the trailing sequence argument is ignored by String.format
      if( prefix != null )
        filename = String.format( filenamePattern, prefix, "/", partition, sequence );
      else
        filename = String.format( filenamePattern, "", "", partition, sequence );

      // only meaningful for file based formats; previously this logged "creating path: null" for all others
      LOG.info( "creating path: {}", filename );
      }

    writer = outputFormat.getRecordWriter( null, asJobConfInstance( conf ), filename, getReporter() );
    }

  /**
   * Returns the current {@link Reporter}, or {@link Reporter#NULL} when the flow process
   * is not backed by a mapreduce task context.
   */
  private Reporter getReporter()
    {
    Reporter reporter = Reporter.NULL;

    if( flowProcess instanceof MapRed )
      reporter = ( (MapRed) flowProcess ).getReporter(); // may return Reporter.NULL

    return reporter;
    }

  /**
   * Method collect writes the given values to the {@link Tap} this instance encapsulates.
   *
   * @param writableComparable the key, of type WritableComparable
   * @param writable           the value, of type Writable
   * @throws IOException if the underlying {@link RecordWriter} fails to write the pair
   */
  @Override
  public void collect( Object writableComparable, Object writable ) throws IOException
    {
    flowProcess.keepAlive(); // signal liveness so long-running writes are not timed out by the framework
    writer.write( writableComparable, writable );
    }

  /**
   * Closes the underlying {@link RecordWriter} and, for file based output formats, commits
   * the task output, cleans up temporary job state, and writes the success marker when this
   * tap is a terminal sink.
   *
   * @throws TapException wrapping any {@link IOException} raised while closing or committing
   */
  @Override
  public void close()
    {
    try
      {
      if( isFileOutputFormat )
        LOG.info( "closing tap collector for: {}", new Path( tap.getIdentifier(), filename ) );
      else
        LOG.info( "closing tap collector for: {}", tap );

      try
        {
        // writer may be null if a subclass initialize() failed before creating it
        if( writer != null )
          writer.close( getReporter() );
        }
      finally
        {
        if( isFileOutputFormat )
          {
          // mimic the OutputCommitter lifecycle: commit this task's output when required,
          // then remove the temporary job directories
          boolean needsTaskCommit = Hadoop18TapUtil.needsTaskCommit( conf );

          if( needsTaskCommit )
            Hadoop18TapUtil.commitTask( conf );

          Hadoop18TapUtil.cleanupJob( conf );

          // only emit a success marker when writing a terminal sink, not an intermediate result
          if( !HadoopUtil.isInflow( conf ) )
            Hadoop18TapUtil.writeSuccessMarker( conf );
          }
        }
      }
    catch( IOException exception )
      {
      LOG.warn( "exception closing: {}", filename, exception );
      throw new TapException( "exception closing: " + filename, exception );
      }
    }
  }