/*
 * Copyright (c) 2007-2016 Concurrent, Inc. All Rights Reserved.
 *
 * Project and contact information: http://www.cascading.org/
 *
 * This file is part of the Cascading project.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package cascading.tuple.collect;

import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.NoSuchElementException;

import cascading.flow.FlowProcess;
import cascading.tuple.Tuple;
import cascading.tuple.TupleException;
import cascading.tuple.io.TupleInputStream;
import cascading.tuple.io.TupleOutputStream;
import cascading.tuple.util.TupleViews;
import cascading.util.CloseableIterator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Class SpillableTupleList is a simple durable Collection that can spill its contents to disk when the
 * {@code threshold} is met.
 * <p/>
 * Using a negative {@code threshold} (e.g. -1) will disable the spill; all values will remain in memory.
 * <p/>
 * This class is used by the {@link cascading.pipe.CoGroup} pipe. To set properties specific to a given
 * CoGroup instance, see the {@link cascading.pipe.CoGroup#getConfigDef()} method.
 * <p/>
 * Use the {@link SpillableProps} fluent helper class to set properties.
 * <p/>
 * Only {@link #add(Tuple)}, {@link #size()}, {@link #isEmpty()}, {@link #clear()} and {@link #iterator()}
 * are functional; the remaining {@link Collection} query/bulk methods are stubs (see below).
 *
 * @see cascading.tuple.hadoop.collect.HadoopSpillableTupleList
 */
public abstract class SpillableTupleList implements Collection<Tuple>, Spillable
  {
  /** Field LOG */
  private static final Logger LOG = LoggerFactory.getLogger( SpillableTupleList.class );

  /**
   * Returns the list spill threshold configured via {@link SpillableProps#LIST_THRESHOLD}.
   *
   * @param flowProcess  the current FlowProcess, used to read the property
   * @param defaultValue the value returned when the property is unset or blank
   * @return the configured threshold, or {@code defaultValue}
   * @throws NumberFormatException if the property is set but is not an integer
   */
  public static int getThreshold( FlowProcess flowProcess, int defaultValue )
    {
    // read as Object and use toString() so non-String property values do not fail with a ClassCastException
    Object property = flowProcess.getProperty( SpillableProps.LIST_THRESHOLD );

    if( property == null )
      return defaultValue;

    String value = property.toString().trim();

    if( value.length() == 0 )
      return defaultValue;

    return Integer.parseInt( value );
    }

  /**
   * Resolves the first loadable codec class from the configured (or default) codec list.
   * <p/>
   * Returns {@code null} when compression is explicitly disabled via {@link SpillableProps#SPILL_COMPRESS},
   * when no codecs are configured, or when none of the listed classes can be found.
   *
   * @param flowProcess   the current FlowProcess, used to read the properties
   * @param defaultCodecs comma and/or whitespace separated class names used when none are configured, may be null
   * @param subClass      the type every codec class must be a subclass of
   * @return the first codec class found, or {@code null}
   */
  protected static Class getCodecClass( FlowProcess flowProcess, String defaultCodecs, Class subClass )
    {
    Object compress = flowProcess.getProperty( SpillableProps.SPILL_COMPRESS );

    if( compress != null && !Boolean.parseBoolean( compress.toString().trim() ) )
      return null;

    Object codecsProperty = flowProcess.getProperty( SpillableProps.SPILL_CODECS );
    String codecs = codecsProperty == null ? null : codecsProperty.toString().trim();

    if( codecs == null || codecs.length() == 0 )
      codecs = defaultCodecs == null ? null : defaultCodecs.trim();

    if( codecs == null || codecs.length() == 0 )
      {
      LOG.info( "no codecs configured" );
      return null;
      }

    // the context class loader may be unset on some threads, fall back to the loader of this class
    ClassLoader classLoader = Thread.currentThread().getContextClassLoader();

    if( classLoader == null )
      classLoader = SpillableTupleList.class.getClassLoader();

    for( String codec : codecs.split( "[,\\s]+" ) )
      {
      if( codec.length() == 0 ) // a leading separator yields an empty first token
        continue;

      try
        {
        LOG.info( "attempting to load codec: {}", codec );
        Class codecClass = classLoader.loadClass( codec ).asSubclass( subClass );

        LOG.info( "found codec: {}", codec );
        return codecClass;
        }
      catch( ClassNotFoundException exception )
        {
        LOG.debug( "codec not found: {}", codec );
        }
      }

    LOG.warn( "codecs set, but unable to load any: {}", codecs );
    return null;
    }

  private SpillStrategy spillStrategy;

  /** Field files, the spill files written so far, oldest first; lazily replaced with a real list on first spill */
  private List<File> files = Collections.emptyList();
  /** Field current, the in-memory (not yet spilled) tuple elements */
  private final List<Object[]> current = new LinkedList<Object[]>();
  /** Field size, the total number of tuples added since creation or the last clear() */
  private int size = 0;
  /** Field spillListener */
  private SpillListener spillListener = SpillListener.NULL;

  private Tuple group;

  /**
   * Creates a list that spills once the in-memory portion holds {@code threshold} tuples.
   *
   * @param threshold the number of in-memory tuples that triggers a spill; a negative value disables spilling
   */
  protected SpillableTupleList( final int threshold )
    {
    this( new SpillStrategy()
    {
    @Override
    public boolean doSpill( Spillable spillable, int size )
      {
      // a negative threshold means "never spill", as documented on the class
      return threshold >= 0 && size >= threshold;
      }

    @Override
    public String getSpillReason( Spillable spillable )
      {
      return "met threshold: " + threshold;
      }
    } );
    }

  protected SpillableTupleList( SpillStrategy spillStrategy )
    {
    this.spillStrategy = spillStrategy;
    }

  @Override
  public void setGrouping( Tuple group )
    {
    this.group = group;
    }

  @Override
  public Tuple getGrouping()
    {
    return group;
    }

  @Override
  public void setSpillStrategy( SpillStrategy spillStrategy )
    {
    this.spillStrategy = spillStrategy;
    }

  @Override
  public void setSpillListener( SpillListener spillListener )
    {
    this.spillListener = spillListener;
    }

  /** Returns the number of spill files currently backing this list. */
  @Override
  public int spillCount()
    {
    return files.size();
    }

  /**
   * Iterates the spill files in the order they were written, then the in-memory tail.
   * Each spill file stream is closed when iteration moves past it.
   */
  private class SpilledListIterator implements Iterator<Tuple>
    {
    int fileIndex = 0;
    private Iterator<Tuple> lastIterator;
    private Iterator<Tuple> iterator;

    private SpilledListIterator()
      {
      lastIterator = asTupleIterator();
      getNextIterator();
      }

    /** Closes the current file iterator (if any) and opens the next spill file, or switches to the in-memory tail. */
    private void getNextIterator()
      {
      if( iterator instanceof Closeable )
        closeSilent( (Closeable) iterator );

      if( fileIndex < files.size() )
        iterator = getIteratorFor( files.get( fileIndex++ ) );
      else
        iterator = lastIterator;
      }

    private Iterator<Tuple> getIteratorFor( File file )
      {
      spillListener.notifyReadSpillBegin( SpillableTupleList.this );

      return createIterator( createTupleInputStream( file ) );
      }

    /** Skips exhausted spill files until one with remaining tuples, or the in-memory tail, is current. */
    private void advance()
      {
      while( !isLastCollection() && !iterator.hasNext() )
        getNextIterator();
      }

    public boolean hasNext()
      {
      advance();

      return iterator.hasNext();
      }

    public Tuple next()
      {
      advance();

      return iterator.next();
      }

    private boolean isLastCollection()
      {
      return iterator == lastIterator;
      }

    public void remove()
      {
      throw new UnsupportedOperationException( "remove is not supported" );
      }
    }

  /**
   * Method add will add the given {@link cascading.tuple.Tuple} to this list.
   * <p/>
   * Before adding, the in-memory tuples are spilled to disk if the current {@link SpillStrategy} requests it.
   *
   * @param tuple of type Tuple
   * @return always {@code true}
   * @throws TupleException if a spill is required and the spill file cannot be written
   */
  @Override
  public boolean add( Tuple tuple )
    {
    doSpill(); // spill if we break over the threshold

    current.add( Tuple.elements( tuple ).toArray( new Object[ tuple.size() ] ) );
    size++;

    return true;
    }

  @Override
  public int size()
    {
    return size;
    }

  /** Returns true if no tuples have been added since creation or the last {@link #clear()}; agrees with {@link #size()}. */
  @Override
  public boolean isEmpty()
    {
    return size == 0;
    }

  /**
   * Writes the in-memory tuples to a new spill file if the spill strategy requests it.
   *
   * @return true if a spill file was written
   */
  private boolean doSpill()
    {
    // spilling nothing would only create an empty file on disk
    if( current.isEmpty() )
      return false;

    if( !spillStrategy.doSpill( this, current.size() ) )
      return false;

    long start = System.currentTimeMillis();
    spillListener.notifyWriteSpillBegin( this, current.size(), spillStrategy.getSpillReason( this ) );

    File file = createTempFile();
    boolean written = false;

    try
      {
      writeFile( file, current );
      written = true;
      }
    finally
      {
      if( !written ) // don't leave a partial spill file behind
        deleteSpillFile( file );
      }

    spillListener.notifyWriteSpillEnd( this, System.currentTimeMillis() - start );

    if( files.isEmpty() )
      files = new LinkedList<File>();

    files.add( file );
    current.clear();

    return true;
    }

  /**
   * Writes the given tuple elements to the given file, then flushes and closes the stream.
   * Flush/close failures are reported, since a failed final flush would otherwise leave a truncated spill.
   */
  private void writeFile( File file, List<Object[]> list )
    {
    TupleOutputStream dataOutputStream = createTupleOutputStream( file );
    boolean closed = false;

    try
      {
      writeList( dataOutputStream, list );
      dataOutputStream.flush();
      dataOutputStream.close();
      closed = true;
      }
    catch( IOException exception )
      {
      throw new TupleException( "unable to flush or close tuple collection file output stream", exception );
      }
    finally
      {
      if( !closed ) // already failing, release the stream without masking the original error
        closeSilent( dataOutputStream );
      }
    }

  private void closeSilent( Closeable closeable )
    {
    try
      {
      closeable.close();
      }
    catch( IOException exception )
      {
      // ignore, best-effort release only
      }
    }

  /** Deletes the given spill file, logging (not throwing) if it cannot be removed. */
  private void deleteSpillFile( File file )
    {
    if( file.exists() && !file.delete() )
      LOG.warn( "unable to delete spill file: {}", file );
    }

  private void writeList( TupleOutputStream dataOutputStream, List<Object[]> list )
    {
    try
      {
      dataOutputStream.writeLong( list.size() );

      for( Object[] elements : list )
        dataOutputStream.writeElementArray( elements );
      }
    catch( IOException exception )
      {
      throw new TupleException( "unable to write tuple collection to file output stream", exception );
      }
    }

  protected abstract TupleOutputStream createTupleOutputStream( File file );

  /**
   * Returns an iterator over a spill file. The file starts with a long tuple count, followed by the tuples.
   * The returned iterator is Closeable and closes the given stream.
   */
  private Iterator<Tuple> createIterator( final TupleInputStream tupleInputStream )
    {
    final long numTuples;

    try
      {
      numTuples = tupleInputStream.readLong();
      }
    catch( IOException exception )
      {
      closeSilent( tupleInputStream ); // no iterator will own the stream, so release it here
      throw new TupleException( "unable to read 'size' of collection from file input stream", exception );
      }

    return new CloseableIterator<Tuple>()
    {
    Tuple tuple = new Tuple();
    long count = 0;

    @Override
    public boolean hasNext()
      {
      return count < numTuples;
      }

    @Override
    public Tuple next()
      {
      if( count >= numTuples )
        throw new NoSuchElementException( "no more tuples in spill file, read: " + count );

      try
        {
        return tupleInputStream.readTuple( tuple );
        }
      catch( IOException exception )
        {
        throw new TupleException( "unable to read next tuple from file input stream containing: " + numTuples + " tuples, successfully read tuples: " + count, exception );
        }
      finally
        {
        count++;
        }
      }

    @Override
    public void remove()
      {
      throw new UnsupportedOperationException( "remove is not supported" );
      }

    @Override
    public void close() throws IOException
      {
      tupleInputStream.close();
      }
    };
    }

  protected abstract TupleInputStream createTupleInputStream( File file );

  private File createTempFile()
    {
    try
      {
      File file = File.createTempFile( "cascading-spillover", null );
      file.deleteOnExit();

      return file;
      }
    catch( IOException exception )
      {
      throw new TupleException( "unable to create temporary file", exception );
      }
    }

  /**
   * Removes all tuples and deletes any spill files from disk.
   * Without the delete, spill files would remain on local disk until JVM exit.
   */
  @Override
  public void clear()
    {
    for( File file : files )
      deleteSpillFile( file );

    files = Collections.emptyList();
    current.clear();
    size = 0;
    }

  /**
   * Returns an iterator over all tuples: spilled ones first, in spill order, then the in-memory ones.
   * The in-memory iterator returns the same Tuple view instance on every call, and returned Tuples may be
   * reused between calls to next(). Copy any tuple that must be retained.
   */
  @Override
  public Iterator<Tuple> iterator()
    {
    if( files.isEmpty() )
      return asTupleIterator();

    return new SpilledListIterator();
    }

  /** Returns an iterator over the in-memory tuples, presented through a single reused Tuple view. */
  private Iterator<Tuple> asTupleIterator()
    {
    final Tuple tuple = TupleViews.createObjectArray();
    final Iterator<Object[]> iterator = current.iterator();

    return new Iterator<Tuple>()
    {
    @Override
    public boolean hasNext()
      {
      return iterator.hasNext();
      }

    @Override
    public Tuple next()
      {
      return TupleViews.reset( tuple, iterator.next() );
      }

    @Override
    public void remove()
      {
      // intentionally a no-op, removal is not supported
      }
    };
    }

  // collection methods: this class can only be added to, so the following are not implemented
  // and return fixed placeholder values rather than honoring the Collection contract
  @Override
  public boolean contains( Object object )
    {
    return false;
    }

  @Override
  public Object[] toArray()
    {
    return new Object[ 0 ];
    }

  @Override
  public <T> T[] toArray( T[] ts )
    {
    return null;
    }

  @Override
  public boolean remove( Object object )
    {
    return false;
    }

  @Override
  public boolean containsAll( Collection<?> objects )
    {
    return false;
    }

  @Override
  public boolean addAll( Collection<? extends Tuple> tuples )
    {
    return false;
    }

  @Override
  public boolean removeAll( Collection<?> objects )
    {
    return false;
    }

  @Override
  public boolean retainAll( Collection<?> objects )
    {
    return false;
    }
  }