/*
 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved.
 *
 * Project and contact information: http://www.cascading.org/
 *
 * This file is part of the Cascading project.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package cascading.tuple.hadoop.collect;

import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

import cascading.flow.FlowProcess;
import cascading.flow.FlowProcessWrapper;
import cascading.tuple.TupleException;
import cascading.tuple.collect.SpillableTupleList;
import cascading.tuple.hadoop.TupleSerialization;
import cascading.tuple.hadoop.io.HadoopTupleInputStream;
import cascading.tuple.hadoop.io.HadoopTupleOutputStream;
import cascading.tuple.io.TupleInputStream;
import cascading.tuple.io.TupleOutputStream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.compress.CodecPool;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.Compressor;
import org.apache.hadoop.io.compress.Decompressor;
import org.apache.hadoop.util.ReflectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * HadoopSpillableTupleList is a simple {@link Iterable} object that can store an unlimited number of {@link cascading.tuple.Tuple} instances by spilling
 * excess to a temporary disk file.
 * <p/>
 * Spills will automatically be compressed using the {@link #defaultCodecs} values. To disable compression or
 * change the codecs, see {@link cascading.tuple.collect.SpillableProps#SPILL_COMPRESS} and {@link cascading.tuple.collect.SpillableProps#SPILL_CODECS}.
 * <p/>
 * It is recommended to prepend the Lzo codec, if available:
 * {@code "org.apache.hadoop.io.compress.LzoCodec,org.apache.hadoop.io.compress.GzipCodec,org.apache.hadoop.io.compress.DefaultCodec" }
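 * <p/>
 * A minimal usage sketch; the {@code flowProcess} reference and threshold value below are hypothetical, assuming a
 * {@link cascading.flow.FlowProcess} backed by a Hadoop {@link org.apache.hadoop.conf.Configuration} is in scope:
 * <pre>{@code
 * CompressionCodec codec = HadoopSpillableTupleList.getCodec( flowProcess, HadoopSpillableTupleList.defaultCodecs );
 * HadoopSpillableTupleList list = new HadoopSpillableTupleList( 10000, codec, flowProcess.getConfig() );
 * }</pre>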
 */
public class HadoopSpillableTupleList extends SpillableTupleList
  {
  private static final Logger LOG = LoggerFactory.getLogger( HadoopSpillableTupleList.class );

  public static final String defaultCodecs = "org.apache.hadoop.io.compress.GzipCodec,org.apache.hadoop.io.compress.DefaultCodec";

  /** Field codec */
  private final CompressionCodec codec;
  /** Field tupleSerialization */
  private final TupleSerialization tupleSerialization;

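  /**
   * Method getCodec returns an instance of the first available {@link CompressionCodec} resolved from the
   * current process properties, falling back to the given comma-separated codec class list, or {@code null}
   * if no codec class is available (for example, when spill compression is disabled).
   *
   * @param flowProcess   of type FlowProcess
   * @param defaultCodecs of type String
   * @return CompressionCodec
   */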
  public static synchronized CompressionCodec getCodec( FlowProcess<? extends Configuration> flowProcess, String defaultCodecs )
    {
    Class<? extends CompressionCodec> codecClass = getCodecClass( flowProcess, defaultCodecs, CompressionCodec.class );

    if( codecClass == null )
      return null;

    if( flowProcess instanceof FlowProcessWrapper )
      flowProcess = ( (FlowProcessWrapper) flowProcess ).getDelegate();

    return ReflectionUtils.newInstance( codecClass, flowProcess.getConfig() );
    }

  /**
   * Constructor HadoopSpillableTupleList creates a new HadoopSpillableTupleList instance using the given threshold value and
   * the given compression codec, if any.
   *
   * @param threshold     of type int
   * @param codec         of type CompressionCodec
   * @param configuration of type Configuration
   */
  public HadoopSpillableTupleList( int threshold, CompressionCodec codec, Configuration configuration )
    {
    super( threshold );
    this.codec = codec;

    if( configuration == null )
      this.tupleSerialization = new TupleSerialization();
    else
      this.tupleSerialization = new TupleSerialization( configuration );
    }

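  /**
   * Constructor HadoopSpillableTupleList creates a new HadoopSpillableTupleList instance using the given threshold value,
   * the given {@link TupleSerialization} instance, and the given compression codec, if any.
   *
   * @param threshold          of type int
   * @param tupleSerialization of type TupleSerialization
   * @param codec              of type CompressionCodec
   */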
  public HadoopSpillableTupleList( int threshold, TupleSerialization tupleSerialization, CompressionCodec codec )
    {
    super( threshold );
    this.tupleSerialization = tupleSerialization;
    this.codec = codec;
    }

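  // Wraps the spill file in a compressing stream when a codec is configured; the pooled Compressor is
  // captured as a final local so the anonymous stream can return it to the CodecPool on close().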
  @Override
  protected TupleOutputStream createTupleOutputStream( File file )
    {
    OutputStream outputStream;

    try
      {
      outputStream = new FileOutputStream( file );

      Compressor compressor = null;

      if( codec != null )
        {
        compressor = getCompressor();
        outputStream = codec.createOutputStream( outputStream, compressor );
        }

      final Compressor finalCompressor = compressor;

      return new HadoopTupleOutputStream( outputStream, tupleSerialization.getElementWriter() )
      {
      @Override
      public void close() throws IOException
        {
        try
          {
          super.close();
          }
        finally
          {
          if( finalCompressor != null )
            CodecPool.returnCompressor( finalCompressor );
          }
        }
      };
      }
    catch( IOException exception )
      {
      throw new TupleException( "unable to create temporary file output stream", exception );
      }
    }

  private Compressor getCompressor()
    {
    // some codecs use direct memory, and the gc for direct memory sometimes cannot keep up,
    // so we attempt to force a gc if we see an OOME once.
    try
      {
      return CodecPool.getCompressor( codec );
      }
    catch( OutOfMemoryError error )
      {
      System.gc();
      LOG.info( "received OOME when allocating compressor for codec: {}, retrying once", codec.getClass().getCanonicalName(), error );

      return CodecPool.getCompressor( codec );
      }
    }

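  // Mirror of createTupleOutputStream: wraps the spill file in a decompressing stream when a codec is
  // configured, returning the pooled Decompressor to the CodecPool when the stream is closed.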
  @Override
  protected TupleInputStream createTupleInputStream( File file )
    {
    try
      {
      InputStream inputStream;

      inputStream = new FileInputStream( file );

      Decompressor decompressor = null;

      if( codec != null )
        {
        decompressor = getDecompressor();
        inputStream = codec.createInputStream( inputStream, decompressor );
        }

      final Decompressor finalDecompressor = decompressor;

      return new HadoopTupleInputStream( inputStream, tupleSerialization.getElementReader() )
      {
      @Override
      public void close() throws IOException
        {
        try
          {
          super.close();
          }
        finally
          {
          if( finalDecompressor != null )
            CodecPool.returnDecompressor( finalDecompressor );
          }
        }
      };
      }
    catch( IOException exception )
      {
      throw new TupleException( "unable to create temporary file input stream", exception );
      }
    }

  private Decompressor getDecompressor()
    {
    // some codecs use direct memory, and the gc for direct memory sometimes cannot keep up,
    // so we attempt to force a gc if we see an OOME once.
    try
      {
      return CodecPool.getDecompressor( codec );
      }
    catch( OutOfMemoryError error )
      {
      System.gc();
      LOG.info( "received OOME when allocating decompressor for codec: {}, retrying once", codec.getClass().getCanonicalName(), error );

      return CodecPool.getDecompressor( codec );
      }
    }
  }