001 /* 002 * Copyright (c) 2007-2015 Concurrent, Inc. All Rights Reserved. 003 * 004 * Project and contact information: http://www.cascading.org/ 005 * 006 * This file is part of the Cascading project. 007 * 008 * Licensed under the Apache License, Version 2.0 (the "License"); 009 * you may not use this file except in compliance with the License. 010 * You may obtain a copy of the License at 011 * 012 * http://www.apache.org/licenses/LICENSE-2.0 013 * 014 * Unless required by applicable law or agreed to in writing, software 015 * distributed under the License is distributed on an "AS IS" BASIS, 016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 017 * See the License for the specific language governing permissions and 018 * limitations under the License. 019 */ 020 021 package cascading.flow.stream; 022 023 import java.io.IOException; 024 025 import cascading.CascadingException; 026 import cascading.flow.FlowProcess; 027 import cascading.flow.SliceCounters; 028 import cascading.flow.StepCounters; 029 import cascading.tap.Tap; 030 import cascading.tuple.Fields; 031 import cascading.tuple.TupleEntry; 032 import cascading.tuple.TupleEntryCollector; 033 034 /** 035 * 036 */ 037 public class SinkStage extends ElementStage<TupleEntry, Void> 038 { 039 private final Tap sink; 040 private TupleEntryCollector collector; 041 042 public SinkStage( FlowProcess flowProcess, Tap sink ) 043 { 044 super( flowProcess, sink ); 045 this.sink = sink; 046 } 047 048 @Override 049 public void bind( StreamGraph streamGraph ) 050 { 051 // do not bind 052 } 053 054 @Override 055 public void prepare() 056 { 057 try 058 { 059 // todo: pass the resolved fields down 060 collector = sink.openForWrite( flowProcess, getOutput() ); 061 062 if( sink.getSinkFields().isAll() ) 063 { 064 Fields fields = getIncomingScopes().get( 0 ).getIncomingTapFields(); 065 collector.setFields( fields ); 066 } 067 } 068 catch( IOException exception ) 069 { 070 throw new DuctException( "failed opening sink", exception ); 071 } 072 } 073 074 protected Object getOutput() 075 { 076 return null; 077 } 078 079 @Override 080 public void start( Duct previous ) 081 { 082 // do nothing 083 } 084 085 @Override 086 public void receive( Duct previous, TupleEntry tupleEntry ) 087 { 088 try 089 { 090 collector.add( tupleEntry ); 091 flowProcess.increment( StepCounters.Tuples_Written, 1 ); 092 flowProcess.increment( SliceCounters.Tuples_Written, 1 ); 093 } 094 catch( OutOfMemoryError error ) 095 { 096 handleReThrowableException( "out of memory, try increasing task memory allocation", error ); 097 } 098 catch( CascadingException exception ) 099 { 100 handleException( exception, tupleEntry ); 101 } 102 catch( Throwable throwable ) 103 { 104 handleException( new DuctException( "internal error: " + tupleEntry.getTuple().print(), throwable ), tupleEntry ); 105 } 106 } 107 108 @Override 109 public void complete( Duct previous ) 110 { 111 // do nothing 112 } 113 114 @Override 115 public void cleanup() 116 { 117 try 118 { 119 if( collector != null ) 120 collector.close(); 121 122 collector = null; 123 } 124 finally 125 { 126 super.cleanup(); 127 } 128 } 129 }