/*
 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved.
 *
 * Project and contact information: http://www.cascading.org/
 *
 * This file is part of the Cascading project.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package cascading.tuple.hadoop.collect;

import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

import cascading.flow.FlowProcess;
import cascading.flow.FlowProcessWrapper;
import cascading.tuple.TupleException;
import cascading.tuple.collect.SpillableTupleList;
import cascading.tuple.hadoop.TupleSerialization;
import cascading.tuple.hadoop.io.HadoopTupleInputStream;
import cascading.tuple.hadoop.io.HadoopTupleOutputStream;
import cascading.tuple.io.TupleInputStream;
import cascading.tuple.io.TupleOutputStream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.compress.CodecPool;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.Compressor;
import org.apache.hadoop.io.compress.Decompressor;
import org.apache.hadoop.util.ReflectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * SpillableTupleList is a simple {@link Iterable} object that can store an unlimited number of {@link cascading.tuple.Tuple} instances by spilling
 * excess to a temporary disk file.
 * <p/>
 * Spills will automatically be compressed using the {@link #defaultCodecs} values. To disable compression or
 * change the codecs, see {@link cascading.tuple.collect.SpillableProps#SPILL_COMPRESS} and {@link cascading.tuple.collect.SpillableProps#SPILL_CODECS}.
 * <p/>
 * It is recommended to add Lzo if available.
 * {@code "org.apache.hadoop.io.compress.LzoCodec,org.apache.hadoop.io.compress.GzipCodec,org.apache.hadoop.io.compress.DefaultCodec" }
 */
public class HadoopSpillableTupleList extends SpillableTupleList
  {
  private static final Logger LOG = LoggerFactory.getLogger( HadoopSpillableTupleList.class );

  /** Default comma separated list of codec class names tried, in order, when compressing spills. */
  public static final String defaultCodecs = "org.apache.hadoop.io.compress.GzipCodec,org.apache.hadoop.io.compress.DefaultCodec";

  /** Field codec */
  private final CompressionCodec codec;
  /** Field serializationElementWriter */
  private final TupleSerialization tupleSerialization;

  /**
   * Method getCodec returns the first available {@link CompressionCodec} instance from the given
   * comma separated list of codec class names, or {@code null} if none could be loaded.
   *
   * @param flowProcess   of type FlowProcess, used to resolve the codec class and supply the Configuration
   * @param defaultCodecs of type String, comma separated codec class names tried in order
   * @return CompressionCodec, or null if no codec class is available
   */
  public static synchronized CompressionCodec getCodec( FlowProcess<? extends Configuration> flowProcess, String defaultCodecs )
    {
    Class<? extends CompressionCodec> codecClass = getCodecClass( flowProcess, defaultCodecs, CompressionCodec.class );

    if( codecClass == null )
      return null;

    // unwrap so ReflectionUtils receives the underlying Configuration, not the wrapper's
    if( flowProcess instanceof FlowProcessWrapper )
      flowProcess = ( (FlowProcessWrapper) flowProcess ).getDelegate();

    return ReflectionUtils.newInstance( codecClass, flowProcess.getConfig() );
    }

  /**
   * Constructor SpillableTupleList creates a new SpillableTupleList instance using the given threshold value, and
   * the first available compression codec, if any.
   *
   * @param threshold     of type long
   * @param codec         of type CompressionCodec, may be null to disable spill compression
   * @param configuration of type Configuration, may be null, in which case default serializations are used
   */
  public HadoopSpillableTupleList( int threshold, CompressionCodec codec, Configuration configuration )
    {
    super( threshold );
    this.codec = codec;

    if( configuration == null )
      this.tupleSerialization = new TupleSerialization();
    else
      this.tupleSerialization = new TupleSerialization( configuration );
    }

  /**
   * Constructor HadoopSpillableTupleList creates a new HadoopSpillableTupleList instance using the given
   * threshold, serialization, and compression codec.
   *
   * @param threshold          of type int, number of tuples held in memory before spilling
   * @param tupleSerialization of type TupleSerialization, supplies the element readers/writers
   * @param codec              of type CompressionCodec, may be null to disable spill compression
   */
  public HadoopSpillableTupleList( int threshold, TupleSerialization tupleSerialization, CompressionCodec codec )
    {
    super( threshold );
    this.tupleSerialization = tupleSerialization;
    this.codec = codec;
    }

  /**
   * Creates the {@link TupleOutputStream} used to write spilled tuples to the given file, wrapping the
   * stream with the configured codec when present. The returned stream returns its pooled
   * {@link Compressor}, if any, to the {@link CodecPool} on close.
   *
   * @param file of type File, the spill target
   * @return TupleOutputStream over the (possibly compressed) file
   */
  @Override
  protected TupleOutputStream createTupleOutputStream( File file )
    {
    OutputStream outputStream;

    try
      {
      outputStream = new FileOutputStream( file );

      Compressor compressor = null;

      if( codec != null )
        {
        compressor = getCompressor();
        outputStream = codec.createOutputStream( outputStream, compressor );
        }

      final Compressor finalCompressor = compressor;

      return new HadoopTupleOutputStream( outputStream, tupleSerialization.getElementWriter() )
        {
        @Override
        public void close() throws IOException
          {
          try
            {
            super.close();
            }
          finally
            {
            // always return the pooled compressor, even if close fails
            if( finalCompressor != null )
              CodecPool.returnCompressor( finalCompressor );
            }
          }
        };
      }
    catch( IOException exception )
      {
      // fixed: this is the output stream factory, message previously said "input stream"
      throw new TupleException( "unable to create temporary file output stream", exception );
      }
    }

  /**
   * Fetches a pooled {@link Compressor} for the configured codec, retrying once after a forced gc
   * on {@link OutOfMemoryError}.
   *
   * @return Compressor from the CodecPool
   */
  private Compressor getCompressor()
    {
    // some codecs are using direct memory, and the gc for direct memory cannot sometimes keep up
    // so we attempt to force a gc if we see a OOME once.
    try
      {
      return CodecPool.getCompressor( codec );
      }
    catch( OutOfMemoryError error )
      {
      System.gc();
      LOG.info( "received OOME when allocating compressor for codec: {}, retrying once", codec.getClass().getCanonicalName(), error );

      return CodecPool.getCompressor( codec );
      }
    }

  /**
   * Creates the {@link TupleInputStream} used to read spilled tuples back from the given file,
   * wrapping the stream with the configured codec when present. The returned stream returns its
   * pooled {@link Decompressor}, if any, to the {@link CodecPool} on close.
   *
   * @param file of type File, the spill source
   * @return TupleInputStream over the (possibly compressed) file
   */
  @Override
  protected TupleInputStream createTupleInputStream( File file )
    {
    try
      {
      InputStream inputStream;

      inputStream = new FileInputStream( file );

      Decompressor decompressor = null;

      if( codec != null )
        {
        decompressor = getDecompressor();
        inputStream = codec.createInputStream( inputStream, decompressor );
        }

      final Decompressor finalDecompressor = decompressor;
      return new HadoopTupleInputStream( inputStream, tupleSerialization.getElementReader() )
        {
        @Override
        public void close() throws IOException
          {
          try
            {
            super.close();
            }
          finally
            {
            // always return the pooled decompressor, even if close fails
            if( finalDecompressor != null )
              CodecPool.returnDecompressor( finalDecompressor );
            }
          }
        };
      }
    catch( IOException exception )
      {
      // fixed: this is the input stream factory, message previously said "output stream"
      throw new TupleException( "unable to create temporary file input stream", exception );
      }
    }

  /**
   * Fetches a pooled {@link Decompressor} for the configured codec, retrying once after a forced gc
   * on {@link OutOfMemoryError}.
   *
   * @return Decompressor from the CodecPool
   */
  private Decompressor getDecompressor()
    {
    // some codecs are using direct memory, and the gc for direct memory cannot sometimes keep up
    // so we attempt to force a gc if we see a OOME once.
    try
      {
      return CodecPool.getDecompressor( codec );
      }
    catch( OutOfMemoryError error )
      {
      System.gc();
      LOG.info( "received OOME when allocating decompressor for codec: {}, retrying once", codec.getClass().getCanonicalName(), error );

      return CodecPool.getDecompressor( codec );
      }
    }
  }