001 /* 002 * Copyright (c) 2007-2014 Concurrent, Inc. All Rights Reserved. 003 * 004 * Project and contact information: http://www.cascading.org/ 005 * 006 * This file is part of the Cascading project. 007 * 008 * Licensed under the Apache License, Version 2.0 (the "License"); 009 * you may not use this file except in compliance with the License. 010 * You may obtain a copy of the License at 011 * 012 * http://www.apache.org/licenses/LICENSE-2.0 013 * 014 * Unless required by applicable law or agreed to in writing, software 015 * distributed under the License is distributed on an "AS IS" BASIS, 016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 017 * See the License for the specific language governing permissions and 018 * limitations under the License. 019 */ 020 021 package cascading.tap.hadoop.io; 022 023 import java.io.Closeable; 024 import java.io.IOException; 025 026 import cascading.flow.FlowProcess; 027 import cascading.tap.Tap; 028 import cascading.tap.TapException; 029 import cascading.tap.hadoop.util.Hadoop18TapUtil; 030 import org.apache.hadoop.fs.Path; 031 import org.apache.hadoop.mapred.FileOutputFormat; 032 import org.apache.hadoop.mapred.JobConf; 033 import org.apache.hadoop.mapred.OutputCollector; 034 import org.apache.hadoop.mapred.OutputFormat; 035 import org.apache.hadoop.mapred.RecordReader; 036 import org.apache.hadoop.mapred.RecordWriter; 037 import org.apache.hadoop.mapred.Reporter; 038 import org.slf4j.Logger; 039 import org.slf4j.LoggerFactory; 040 041 /** 042 * 043 */ 044 public class TapOutputCollector implements OutputCollector, Closeable 045 { 046 private static final Logger LOG = LoggerFactory.getLogger( TapOutputCollector.class ); 047 048 public static final String PART_TASK_PATTERN = "%s%spart-%05d"; 049 public static final String PART_TASK_SEQ_PATTERN = "%s%spart-%05d-%05d"; 050 051 /** Field conf */ 052 private JobConf conf; 053 /** Field writer */ 054 private RecordWriter writer; 055 /** Field filenamePattern */ 056 private String filenamePattern; 057 /** Field filename */ 058 private String filename; 059 /** Field tap */ 060 private Tap<JobConf, RecordReader, OutputCollector> tap; 061 /** Field prefix */ 062 private String prefix; 063 /** Field sequence */ 064 private long sequence; 065 /** Field isFileOutputFormat */ 066 private boolean isFileOutputFormat; 067 /** Field reporter */ 068 private final Reporter reporter = Reporter.NULL; 069 private final FlowProcess<JobConf> flowProcess; 070 071 public TapOutputCollector( FlowProcess<JobConf> flowProcess, Tap<JobConf, RecordReader, OutputCollector> tap ) throws IOException 072 { 073 this( flowProcess, tap, null ); 074 } 075 076 public TapOutputCollector( FlowProcess<JobConf> flowProcess, Tap<JobConf, RecordReader, OutputCollector> tap, String prefix ) throws IOException 077 { 078 this( flowProcess, tap, prefix, -1 ); 079 } 080 081 public TapOutputCollector( FlowProcess<JobConf> flowProcess, Tap<JobConf, RecordReader, OutputCollector> tap, String prefix, long sequence ) throws IOException 082 { 083 this.tap = tap; 084 this.sequence = sequence; 085 this.prefix = prefix == null || prefix.length() == 0 ? null : prefix; 086 this.flowProcess = flowProcess; 087 this.conf = this.flowProcess.getConfigCopy(); 088 this.filenamePattern = this.conf.get( "cascading.tapcollector.partname", sequence == -1 ? PART_TASK_PATTERN : PART_TASK_SEQ_PATTERN ); 089 090 initialize(); 091 } 092 093 protected void initialize() throws IOException 094 { 095 tap.sinkConfInit( flowProcess, conf ); 096 097 OutputFormat outputFormat = conf.getOutputFormat(); 098 099 isFileOutputFormat = outputFormat instanceof FileOutputFormat; 100 101 if( isFileOutputFormat ) 102 { 103 Hadoop18TapUtil.setupJob( conf ); 104 Hadoop18TapUtil.setupTask( conf ); 105 106 if( prefix != null ) 107 filename = String.format( filenamePattern, prefix, "/", conf.getInt( "mapred.task.partition", 0 ), sequence ); 108 else 109 filename = String.format( filenamePattern, "", "", conf.getInt( "mapred.task.partition", 0 ), sequence ); 110 } 111 112 LOG.info( "creating path: {}", filename ); 113 114 writer = outputFormat.getRecordWriter( null, conf, filename, Reporter.NULL ); 115 } 116 117 /** 118 * Method collect writes the given values to the {@link Tap} this instance encapsulates. 119 * 120 * @param writableComparable of type WritableComparable 121 * @param writable of type Writable 122 * @throws IOException when 123 */ 124 public void collect( Object writableComparable, Object writable ) throws IOException 125 { 126 flowProcess.keepAlive(); 127 writer.write( writableComparable, writable ); 128 } 129 130 public void close() 131 { 132 try 133 { 134 if( isFileOutputFormat ) 135 LOG.info( "closing tap collector for: {}", new Path( tap.getIdentifier(), filename ) ); 136 else 137 LOG.info( "closing tap collector for: {}", tap ); 138 139 try 140 { 141 writer.close( reporter ); 142 } 143 finally 144 { 145 if( isFileOutputFormat ) 146 { 147 if( Hadoop18TapUtil.needsTaskCommit( conf ) ) 148 Hadoop18TapUtil.commitTask( conf ); 149 150 Hadoop18TapUtil.cleanupJob( conf ); 151 } 152 } 153 } 154 catch( IOException exception ) 155 { 156 LOG.warn( "exception closing: {}", filename, exception ); 157 throw new TapException( "exception closing: " + filename, exception ); 158 } 159 } 160 161 }