001/* 002 * Copyright (c) 2007-2016 Concurrent, Inc. All Rights Reserved. 003 * 004 * Project and contact information: http://www.cascading.org/ 005 * 006 * This file is part of the Cascading project. 007 * 008 * Licensed under the Apache License, Version 2.0 (the "License"); 009 * you may not use this file except in compliance with the License. 010 * You may obtain a copy of the License at 011 * 012 * http://www.apache.org/licenses/LICENSE-2.0 013 * 014 * Unless required by applicable law or agreed to in writing, software 015 * distributed under the License is distributed on an "AS IS" BASIS, 016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 017 * See the License for the specific language governing permissions and 018 * limitations under the License. 019 */ 020 021package cascading.flow.stream.element; 022 023import java.io.IOException; 024 025import cascading.CascadingException; 026import cascading.flow.FlowProcess; 027import cascading.flow.SliceCounters; 028import cascading.flow.StepCounters; 029import cascading.flow.stream.duct.Duct; 030import cascading.flow.stream.duct.DuctException; 031import cascading.flow.stream.graph.StreamGraph; 032import cascading.tap.Tap; 033import cascading.tuple.Fields; 034import cascading.tuple.TupleEntry; 035import cascading.tuple.TupleEntryCollector; 036 037/** 038 * 039 */ 040public class SinkStage extends ElementStage<TupleEntry, Void> 041 { 042 private final Tap sink; 043 private TupleEntryCollector collector; 044 045 public SinkStage( FlowProcess flowProcess, Tap sink ) 046 { 047 super( flowProcess, sink ); 048 this.sink = sink; 049 } 050 051 public Tap getSink() 052 { 053 return sink; 054 } 055 056 @Override 057 public void bind( StreamGraph streamGraph ) 058 { 059 // do not bind 060 } 061 062 @Override 063 public void prepare() 064 { 065 try 066 { 067 // todo: pass the resolved fields down 068 collector = sink.openForWrite( flowProcess, getOutput() ); 069 070 if( sink.getSinkFields().isAll() ) 071 { 072 Fields fields = getIncomingScopes().get( 0 ).getIncomingTapFields(); 073 collector.setFields( fields ); 074 } 075 } 076 catch( IOException exception ) 077 { 078 throw new DuctException( "failed opening sink", exception ); 079 } 080 } 081 082 protected Object getOutput() 083 { 084 return null; 085 } 086 087 @Override 088 public void start( Duct previous ) 089 { 090 // do nothing 091 } 092 093 @Override 094 public void receive( Duct previous, TupleEntry tupleEntry ) 095 { 096 try 097 { 098 collector.add( tupleEntry ); 099 flowProcess.increment( StepCounters.Tuples_Written, 1 ); 100 flowProcess.increment( SliceCounters.Tuples_Written, 1 ); 101 } 102 catch( OutOfMemoryError error ) 103 { 104 handleReThrowableException( "out of memory, try increasing task memory allocation", error ); 105 } 106 catch( CascadingException exception ) 107 { 108 handleException( exception, tupleEntry ); 109 } 110 catch( Throwable throwable ) 111 { 112 handleException( new DuctException( "internal error: " + tupleEntry.getTuple().print(), throwable ), tupleEntry ); 113 } 114 } 115 116 @Override 117 public void complete( Duct previous ) 118 { 119 // do nothing 120 } 121 122 @Override 123 public void cleanup() 124 { 125 try 126 { 127 if( collector != null ) 128 collector.close(); 129 130 collector = null; 131 } 132 finally 133 { 134 super.cleanup(); 135 } 136 } 137 }