001 /* 002 * Copyright (c) 2007-2015 Concurrent, Inc. All Rights Reserved. 003 * 004 * Project and contact information: http://www.cascading.org/ 005 * 006 * This file is part of the Cascading project. 007 * 008 * Licensed under the Apache License, Version 2.0 (the "License"); 009 * you may not use this file except in compliance with the License. 010 * You may obtain a copy of the License at 011 * 012 * http://www.apache.org/licenses/LICENSE-2.0 013 * 014 * Unless required by applicable law or agreed to in writing, software 015 * distributed under the License is distributed on an "AS IS" BASIS, 016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 017 * See the License for the specific language governing permissions and 018 * limitations under the License. 019 */ 020 021 package cascading.tuple.collect; 022 023 import java.io.Closeable; 024 import java.io.File; 025 import java.io.Flushable; 026 import java.io.IOException; 027 import java.util.Collection; 028 import java.util.Collections; 029 import java.util.Iterator; 030 import java.util.LinkedList; 031 import java.util.List; 032 033 import cascading.flow.FlowProcess; 034 import cascading.tuple.Tuple; 035 import cascading.tuple.TupleException; 036 import cascading.tuple.io.TupleInputStream; 037 import cascading.tuple.io.TupleOutputStream; 038 import cascading.tuple.util.TupleViews; 039 import cascading.util.CloseableIterator; 040 import org.slf4j.Logger; 041 import org.slf4j.LoggerFactory; 042 043 /** 044 * Class SpillableTupleList is a simple durable Collection that can spill its contents to disk when the 045 * {@code threshold} is met. 046 * <p/> 047 * Using a {@code threshold } of -1 will disable the spill, all values will remain in memory. 048 * <p.></p.> 049 * This class is used by the {@link cascading.pipe.CoGroup} pipe, to set properties specific to a given 050 * CoGroup instance, see the {@link cascading.pipe.CoGroup#getConfigDef()} method. 051 * <p/> 052 * Use the {@link SpillableProps} fluent helper class to set properties. 053 * 054 * @see cascading.tuple.hadoop.collect.HadoopSpillableTupleList 055 */ 056 public abstract class SpillableTupleList implements Collection<Tuple>, Spillable 057 { 058 /** Field LOG */ 059 private static final Logger LOG = LoggerFactory.getLogger( SpillableTupleList.class ); 060 061 /** Number of tuples to hold in memory before spilling them to disk. */ 062 @Deprecated 063 public static final String SPILL_THRESHOLD = SpillableProps.LIST_THRESHOLD; 064 065 /** 066 * Whether to enable compress of the spills or not, on by default. 067 * 068 * @see Boolean#parseBoolean(String) 069 */ 070 @Deprecated 071 public static final String SPILL_COMPRESS = SpillableProps.SPILL_COMPRESS; 072 073 /** A comma delimited list of possible codecs to try. This is platform dependent. */ 074 @Deprecated 075 public static final String SPILL_CODECS = SpillableProps.SPILL_CODECS; 076 077 public static int getThreshold( FlowProcess flowProcess, int defaultValue ) 078 { 079 String value = (String) flowProcess.getProperty( SpillableProps.LIST_THRESHOLD ); 080 081 if( value == null || value.length() == 0 ) 082 return defaultValue; 083 084 return Integer.parseInt( value ); 085 } 086 087 protected static Class getCodecClass( FlowProcess flowProcess, String defaultCodecs, Class subClass ) 088 { 089 String compress = (String) flowProcess.getProperty( SpillableProps.SPILL_COMPRESS ); 090 091 if( compress != null && !Boolean.parseBoolean( compress ) ) 092 return null; 093 094 String codecs = (String) flowProcess.getProperty( SpillableProps.SPILL_CODECS ); 095 096 if( codecs == null || codecs.length() == 0 ) 097 codecs = defaultCodecs; 098 099 Class codecClass = null; 100 101 for( String codec : codecs.split( "[,\\s]+" ) ) 102 { 103 try 104 { 105 LOG.info( "attempting to load codec: {}", codec ); 106 codecClass = Thread.currentThread().getContextClassLoader().loadClass( codec ).asSubclass( subClass ); 107 108 if( codecClass != null ) 109 { 110 LOG.info( "found codec: {}", codec ); 111 break; 112 } 113 } 114 catch( ClassNotFoundException exception ) 115 { 116 // do nothing 117 } 118 } 119 120 if( codecClass == null ) 121 { 122 LOG.warn( "codecs set, but unable to load any: {}", codecs ); 123 return null; 124 } 125 126 return codecClass; 127 } 128 129 130 private SpillStrategy spillStrategy; 131 132 /** Field files */ 133 private List<File> files = Collections.EMPTY_LIST; // lazy init if we do a spill 134 /** Field current */ 135 private final List<Object[]> current = new LinkedList<Object[]>(); 136 /** Field size */ 137 private int size = 0; 138 /** Fields listener */ 139 private SpillListener spillListener = SpillListener.NULL; 140 141 private Tuple group; 142 143 protected SpillableTupleList( final int threshold ) 144 { 145 this( new SpillStrategy() 146 { 147 148 @Override 149 public boolean doSpill( Spillable spillable, int size ) 150 { 151 return size >= threshold; 152 } 153 154 @Override 155 public String getSpillReason( Spillable spillable ) 156 { 157 return "met threshold: " + threshold; 158 } 159 } ); 160 } 161 162 protected SpillableTupleList( SpillStrategy spillStrategy ) 163 { 164 this.spillStrategy = spillStrategy; 165 } 166 167 @Override 168 public void setGrouping( Tuple group ) 169 { 170 this.group = group; 171 } 172 173 @Override 174 public Tuple getGrouping() 175 { 176 return group; 177 } 178 179 @Override 180 public void setSpillStrategy( SpillStrategy spillStrategy ) 181 { 182 this.spillStrategy = spillStrategy; 183 } 184 185 @Override 186 public void setSpillListener( SpillListener spillListener ) 187 { 188 this.spillListener = spillListener; 189 } 190 191 @Override 192 public int spillCount() 193 { 194 return files.size(); 195 } 196 197 private class SpilledListIterator implements Iterator<Tuple> 198 { 199 int fileIndex = 0; 200 private Iterator<Tuple> lastIterator; 201 private Iterator<Tuple> iterator; 202 203 private SpilledListIterator() 204 { 205 lastIterator = asTupleIterator(); 206 getNextIterator(); 207 } 208 209 private void getNextIterator() 210 { 211 if( iterator instanceof Closeable ) 212 closeSilent( (Closeable) iterator ); 213 214 if( fileIndex < files.size() ) 215 iterator = getIteratorFor( files.get( fileIndex++ ) ); 216 else 217 iterator = lastIterator; 218 } 219 220 private Iterator<Tuple> getIteratorFor( File file ) 221 { 222 spillListener.notifyReadSpillBegin( SpillableTupleList.this ); 223 224 return createIterator( createTupleInputStream( file ) ); 225 } 226 227 public boolean hasNext() 228 { 229 if( isLastCollection() ) 230 return iterator.hasNext(); 231 232 if( iterator.hasNext() ) 233 return true; 234 235 getNextIterator(); 236 237 return hasNext(); 238 } 239 240 public Tuple next() 241 { 242 if( isLastCollection() || iterator.hasNext() ) 243 return iterator.next(); 244 245 getNextIterator(); 246 247 return next(); 248 } 249 250 private boolean isLastCollection() 251 { 252 return iterator == lastIterator; 253 } 254 255 public void remove() 256 { 257 throw new UnsupportedOperationException( "remove is not supported" ); 258 } 259 } 260 261 /** 262 * Method add will add the given {@link cascading.tuple.Tuple} to this list. 263 * 264 * @param tuple of type Tuple 265 */ 266 @Override 267 public boolean add( Tuple tuple ) 268 { 269 doSpill(); // spill if we break over the threshold 270 271 current.add( Tuple.elements( tuple ).toArray( new Object[ tuple.size() ] ) ); 272 size++; 273 274 return true; 275 } 276 277 @Override 278 public int size() 279 { 280 return size; 281 } 282 283 @Override 284 public boolean isEmpty() 285 { 286 return files.isEmpty() && current.size() == 0; 287 } 288 289 private final boolean doSpill() 290 { 291 if( !spillStrategy.doSpill( this, current.size() ) ) 292 return false; 293 294 long start = System.currentTimeMillis(); 295 spillListener.notifyWriteSpillBegin( this, current.size(), spillStrategy.getSpillReason( this ) ); 296 297 File file = createTempFile(); 298 TupleOutputStream dataOutputStream = createTupleOutputStream( file ); 299 300 try 301 { 302 writeList( dataOutputStream, current ); 303 } 304 finally 305 { 306 flushSilent( dataOutputStream ); 307 closeSilent( dataOutputStream ); 308 } 309 310 spillListener.notifyWriteSpillEnd( this, System.currentTimeMillis() - start ); 311 312 if( files == Collections.EMPTY_LIST ) 313 files = new LinkedList<File>(); 314 315 files.add( file ); 316 current.clear(); 317 318 return true; 319 } 320 321 private void flushSilent( Flushable flushable ) 322 { 323 try 324 { 325 flushable.flush(); 326 } 327 catch( IOException exception ) 328 { 329 // ignore 330 } 331 } 332 333 private void closeSilent( Closeable closeable ) 334 { 335 try 336 { 337 closeable.close(); 338 } 339 catch( IOException exception ) 340 { 341 // ignore 342 } 343 } 344 345 private void writeList( TupleOutputStream dataOutputStream, List<Object[]> list ) 346 { 347 try 348 { 349 dataOutputStream.writeLong( list.size() ); 350 351 for( Object[] elements : list ) 352 dataOutputStream.writeElementArray( elements ); 353 } 354 catch( IOException exception ) 355 { 356 throw new TupleException( "unable to write tuple collection to file output stream", exception ); 357 } 358 } 359 360 protected abstract TupleOutputStream createTupleOutputStream( File file ); 361 362 private Iterator<Tuple> createIterator( final TupleInputStream tupleInputStream ) 363 { 364 final long size; 365 366 try 367 { 368 size = tupleInputStream.readLong(); 369 } 370 catch( IOException exception ) 371 { 372 throw new TupleException( "unable to read 'size' of collection from file input stream", exception ); 373 } 374 375 return new CloseableIterator<Tuple>() 376 { 377 Tuple tuple = new Tuple(); 378 long count = 0; 379 380 @Override 381 public boolean hasNext() 382 { 383 return count < size; 384 } 385 386 @Override 387 public Tuple next() 388 { 389 try 390 { 391 return tupleInputStream.readTuple( tuple ); 392 } 393 catch( IOException exception ) 394 { 395 throw new TupleException( "unable to read next tuple from file input stream containing: " + size + " tuples, successfully read tuples: " + count, exception ); 396 } 397 finally 398 { 399 count++; 400 } 401 } 402 403 @Override 404 public void remove() 405 { 406 throw new UnsupportedOperationException( "remove is not supported" ); 407 } 408 409 @Override 410 public void close() throws IOException 411 { 412 tupleInputStream.close(); 413 } 414 }; 415 } 416 417 protected abstract TupleInputStream createTupleInputStream( File file ); 418 419 private File createTempFile() 420 { 421 try 422 { 423 File file = File.createTempFile( "cascading-spillover", null ); 424 file.deleteOnExit(); 425 426 return file; 427 } 428 catch( IOException exception ) 429 { 430 throw new TupleException( "unable to create temporary file", exception ); 431 } 432 } 433 434 @Override 435 public void clear() 436 { 437 files.clear(); 438 current.clear(); 439 size = 0; 440 } 441 442 @Override 443 public Iterator<Tuple> iterator() 444 { 445 if( files.isEmpty() ) 446 return asTupleIterator(); 447 448 return new SpilledListIterator(); 449 } 450 451 private Iterator<Tuple> asTupleIterator() 452 { 453 final Tuple tuple = TupleViews.createObjectArray(); 454 final Iterator<Object[]> iterator = current.iterator(); 455 456 return new Iterator<Tuple>() 457 { 458 @Override 459 public boolean hasNext() 460 { 461 return iterator.hasNext(); 462 } 463 464 @Override 465 public Tuple next() 466 { 467 return TupleViews.reset( tuple, iterator.next() ); 468 } 469 470 @Override 471 public void remove() 472 { 473 } 474 }; 475 } 476 477 // collection methods, this class cannot only be added to, so they aren't implemented 478 @Override 479 public boolean contains( Object object ) 480 { 481 return false; 482 } 483 484 @Override 485 public Object[] toArray() 486 { 487 return new Object[ 0 ]; 488 } 489 490 @Override 491 public <T> T[] toArray( T[] ts ) 492 { 493 return null; 494 } 495 496 @Override 497 public boolean remove( Object object ) 498 { 499 return false; 500 } 501 502 @Override 503 public boolean containsAll( Collection<?> objects ) 504 { 505 return false; 506 } 507 508 @Override 509 public boolean addAll( Collection<? extends Tuple> tuples ) 510 { 511 return false; 512 } 513 514 @Override 515 public boolean removeAll( Collection<?> objects ) 516 { 517 return false; 518 } 519 520 @Override 521 public boolean retainAll( Collection<?> objects ) 522 { 523 return false; 524 } 525 }