/*
 * Copyright (c) 2016-2018 Chris K Wensel <chris@wensel.net>. All Rights Reserved.
 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved.
 *
 * Project and contact information: http://www.cascading.org/
 *
 * This file is part of the Cascading project.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package cascading.tuple;

import java.io.Closeable;
import java.io.Flushable;
import java.io.IOException;
import java.util.function.Supplier;

import cascading.flow.FlowProcess;
import cascading.scheme.ConcreteCall;
import cascading.scheme.Scheme;
import cascading.tap.Tap;
import cascading.tap.TapException;

/**
 * Class TupleEntrySchemeCollector is a helper class for wrapping a {@link Scheme} instance, calling
 * {@link Scheme#sink(cascading.flow.FlowProcess, cascading.scheme.SinkCall)} on every call to {@link #add(TupleEntry)}
 * or {@link #add(Tuple)}.
 * <p>
 * Use this class inside a custom {@link cascading.tap.Tap} when overriding the
 * {@link cascading.tap.Tap#openForWrite(cascading.flow.FlowProcess)} method.
 * <p>
 * The scheme is prepared lazily on the first {@code add} call, and cleaned up on {@link #close()} only if it
 * was prepared. On {@link #close()} the current output is flushed and closed when it implements
 * {@link Flushable} / {@link Closeable}.
 */
public class TupleEntrySchemeCollector<Config, Output> extends TupleEntryCollector
  {
  private final FlowProcess<? extends Config> flowProcess;
  private final Scheme<Config, ?, Output, ?, Object> scheme;

  /** The call object handed to the scheme on every prepare, sink and cleanup invocation. */
  protected final ConcreteCall<Object, Output> sinkCall;
  /** Supplies the identifier embedded in error messages, defaults to {@code 'unknown'}. */
  private Supplier<String> loggableIdentifier = () -> "'unknown'";
  /** True once {@link #prepare()} has completed successfully. */
  private boolean prepared = false;

  /**
   * Creates a collector with no Tap, no output and the default identifier.
   *
   * @param flowProcess the current FlowProcess
   * @param scheme      the Scheme to sink into
   * @deprecated leaves the sink call without a Tap; see the constructors accepting a {@link Tap}
   */
  @Deprecated
  public TupleEntrySchemeCollector( FlowProcess<? extends Config> flowProcess, Scheme scheme )
    {
    this( flowProcess, scheme, null, null );
    }

  /**
   * Creates a collector with no Tap and no output.
   *
   * @param flowProcess        the current FlowProcess
   * @param scheme             the Scheme to sink into
   * @param loggableIdentifier identifier used in error messages, may be null
   * @deprecated leaves the sink call without a Tap; see the constructors accepting a {@link Tap}
   */
  @Deprecated
  public TupleEntrySchemeCollector( FlowProcess<? extends Config> flowProcess, Scheme scheme, String loggableIdentifier )
    {
    this( flowProcess, scheme, null, loggableIdentifier );
    }

  /**
   * Creates a collector with no Tap and the default identifier.
   *
   * @param flowProcess the current FlowProcess
   * @param scheme      the Scheme to sink into
   * @param output      the output to write to, may be null
   * @deprecated leaves the sink call without a Tap; see the constructors accepting a {@link Tap}
   */
  @Deprecated
  public TupleEntrySchemeCollector( FlowProcess<? extends Config> flowProcess, Scheme scheme, Output output )
    {
    this( flowProcess, scheme, output, null );
    }

  /**
   * Creates a collector using the Scheme and identifier reported by the given Tap.
   *
   * @param flowProcess the current FlowProcess
   * @param tap         the Tap providing the Scheme ({@code tap.getScheme()}) and the identifier
   *                    ({@code tap.getIdentifier()}), must not be null
   * @param output      the output to write to, may be null
   */
  public TupleEntrySchemeCollector( FlowProcess<? extends Config> flowProcess, Tap tap, Output output )
    {
    this( flowProcess, tap, tap.getScheme(), output, tap.getIdentifier() );
    }

  /**
   * Creates a collector with no Tap.
   *
   * @param flowProcess        the current FlowProcess
   * @param scheme             the Scheme to sink into
   * @param output             the output to write to, may be null
   * @param loggableIdentifier identifier used in error messages, may be null
   * @deprecated leaves the sink call without a Tap; see the constructors accepting a {@link Tap}
   */
  @Deprecated
  public TupleEntrySchemeCollector( FlowProcess<? extends Config> flowProcess, Scheme scheme, Output output, String loggableIdentifier )
    {
    this( flowProcess, null, scheme, output, loggableIdentifier );
    }

  /**
   * Creates a collector with no output and the default identifier.
   *
   * @param flowProcess the current FlowProcess
   * @param tap         the Tap set on the sink call
   * @param scheme      the Scheme to sink into
   */
  public TupleEntrySchemeCollector( FlowProcess<? extends Config> flowProcess, Tap tap, Scheme scheme )
    {
    this( flowProcess, tap, scheme, null, (Supplier<String>) null );
    }

  /**
   * Creates a collector with no output.
   *
   * @param flowProcess        the current FlowProcess
   * @param tap                the Tap set on the sink call
   * @param scheme             the Scheme to sink into
   * @param loggableIdentifier identifier used in error messages, may be null
   */
  public TupleEntrySchemeCollector( FlowProcess<? extends Config> flowProcess, Tap tap, Scheme scheme, String loggableIdentifier )
    {
    this( flowProcess, tap, scheme, null, loggableIdentifier );
    }

  /**
   * Creates a collector with the default identifier.
   *
   * @param flowProcess the current FlowProcess
   * @param tap         the Tap set on the sink call
   * @param scheme      the Scheme to sink into
   * @param output      the output to write to, may be null
   */
  public TupleEntrySchemeCollector( FlowProcess<? extends Config> flowProcess, Tap tap, Scheme scheme, Output output )
    {
    this( flowProcess, tap, scheme, output, (Supplier<String>) null );
    }

  /**
   * Creates a collector with a fixed identifier.
   *
   * @param flowProcess        the current FlowProcess
   * @param tap                the Tap set on the sink call
   * @param scheme             the Scheme to sink into
   * @param output             the output to write to, may be null
   * @param loggableIdentifier identifier used in error messages, when null the default is kept
   */
  public TupleEntrySchemeCollector( FlowProcess<? extends Config> flowProcess, Tap tap, Scheme scheme, Output output, String loggableIdentifier )
    {
    this( flowProcess, tap, scheme, output, loggableIdentifier == null ? null : () -> loggableIdentifier );
    }

  /**
   * Creates a collector, this is the constructor all others delegate to.
   * <p>
   * The declared fields are taken from {@code scheme.getSinkFields()}. The sink call is obtained from
   * {@link #createSinkCall()} and given the Tap and the outgoing entry. The output is only set, via
   * {@link #setOutput(Object)}, when it is not null.
   *
   * @param flowProcess        the current FlowProcess
   * @param tap                the Tap set on the sink call, may be null
   * @param scheme             the Scheme to sink into, must not be null
   * @param output             the output to write to, may be null
   * @param loggableIdentifier supplies the identifier used in error messages, when null the default is kept
   */
  public TupleEntrySchemeCollector( FlowProcess<? extends Config> flowProcess, Tap tap, Scheme scheme, Output output, Supplier<String> loggableIdentifier )
    {
    super( Fields.asDeclaration( scheme.getSinkFields() ) );
    this.flowProcess = flowProcess;
    this.scheme = scheme;

    if( loggableIdentifier != null )
      this.loggableIdentifier = loggableIdentifier; // only used for logging

    this.sinkCall = createSinkCall();
    this.sinkCall.setTap( tap );
    this.sinkCall.setOutgoingEntry( this.tupleEntry ); // created in super ctor

    if( output != null )
      setOutput( output );
    }

  /**
   * Override to provide custom ConcreteCall implementation to expose Tap level resources to the underlying Scheme.
   * <p>
   * This method is invoked from the constructor, so an override runs before the subclass' own fields
   * have been initialized.
   *
   * @return a new ConcreteCall instance
   */
  protected <Context, IO> ConcreteCall<Context, IO> createSinkCall()
    {
    return new ConcreteCall<>();
    }

  /**
   * Returns the FlowProcess given at construction.
   *
   * @return the current FlowProcess
   */
  protected FlowProcess<? extends Config> getFlowProcess()
    {
    return flowProcess;
    }

  /**
   * Sets the declared fields and re-applies the resulting entry as the outgoing entry of the sink call.
   *
   * @param declared the fields to declare
   */
  @Override
  public void setFields( Fields declared )
    {
    super.setFields( declared );

    // NOTE(review): sinkCall is final and assigned in the constructor, so it can only be null if this
    // method runs before that assignment -- presumably from the super constructor; confirm
    if( this.sinkCall != null )
      this.sinkCall.setOutgoingEntry( this.tupleEntry );
    }

  /**
   * Returns the output currently held by the sink call.
   *
   * @return the current output, as stored by {@link #setOutput(Object)}
   */
  protected Output getOutput()
    {
    return sinkCall.getOutput();
    }

  /**
   * Stores the given output on the sink call after passing it through {@link #wrapOutput(Object)}.
   *
   * @param output the raw output
   */
  protected void setOutput( Output output )
    {
    sinkCall.setOutput( wrapOutput( output ) );
    }

  /**
   * Lets the scheme wrap the given output via {@code Scheme#sinkWrap}.
   *
   * @param output the raw output
   * @return the output returned by the scheme
   * @throws TapException if the scheme fails with an IOException
   */
  protected Output wrapOutput( Output output )
    {
    try
      {
      return scheme.sinkWrap( flowProcess, output );
      }
    catch( IOException exception )
      {
      throw new TapException( "could not wrap scheme", exception );
      }
    }

  /**
   * Need to defer preparing the scheme till after the fields have been resolved.
   * <p>
   * Calls {@code Scheme#sinkPrepare} and marks this collector as prepared only when that call succeeds.
   *
   * @throws TapException if the scheme fails with an IOException
   */
  protected void prepare()
    {
    try
      {
      scheme.sinkPrepare( flowProcess, sinkCall );
      }
    catch( IOException exception )
      {
      throw new TapException( "could not prepare scheme", exception );
      }

    prepared = true;
    }

  /**
   * Adds the given entry, preparing the scheme first if that has not happened yet.
   *
   * @param tupleEntry the entry to add
   */
  @Override
  public void add( TupleEntry tupleEntry )
    {
    if( !prepared )
      prepare();

    super.add( tupleEntry );
    }

  /**
   * Adds the given tuple, preparing the scheme first if that has not happened yet.
   *
   * @param tuple the tuple to add
   */
  @Override
  public void add( Tuple tuple )
    {
    if( !prepared ) // this is unfortunate
      prepare();

    super.add( tuple );
    }

  /**
   * Makes the given entry the outgoing entry of the sink call and hands the call to {@code Scheme#sink}.
   *
   * @param tupleEntry the entry to sink
   * @throws TupleException wrapping any exception raised by the scheme, the message carries the identifier
   */
  @Override
  protected void collect( TupleEntry tupleEntry ) throws IOException
    {
    sinkCall.setOutgoingEntry( tupleEntry );

    try
      {
      scheme.sink( flowProcess, sinkCall );
      }
    catch( Exception exception )
      {
      throw new TupleException( "unable to sink into output identifier: " + loggableIdentifier.get(), exception );
      }
    }

  /**
   * Cleans up the scheme (only if it was prepared), then flushes and closes the output, then calls
   * {@code super.close()}.
   * <p>
   * The flush, close and {@code super.close()} steps are chained with {@code finally}, so a failure in
   * one step does not prevent the following steps from running. IOExceptions raised while flushing or
   * closing the output are ignored.
   *
   * @throws TupleException if the scheme cleanup fails with an IOException
   */
  @Override
  public void close()
    {
    try
      {
      if( sinkCall == null )
        return;

      try
        {
        if( prepared )
          scheme.sinkCleanup( flowProcess, sinkCall );
        }
      catch( IOException exception )
        {
        throw new TupleException( "unable to cleanup sink for output identifier: " + loggableIdentifier.get(), exception );
        }
      }
    finally
      {
      // every step sits in its own finally so an unchecked failure while flushing or closing
      // the output cannot skip the steps after it and leak the output
      try
        {
        flushOutputQuietly();
        }
      finally
        {
        try
          {
          closeOutputQuietly();
          }
        finally
          {
          super.close();
          }
        }
      }
    }

  /** Flushes the current output if it is {@link Flushable}, ignoring any IOException. */
  private void flushOutputQuietly()
    {
    Output output = getOutput();

    if( !( output instanceof Flushable ) )
      return;

    try
      {
      ( (Flushable) output ).flush();
      }
    catch( IOException ignored )
      {
      // best effort, the failure is intentionally not propagated
      }
    }

  /** Closes the current output if it is {@link Closeable}, ignoring any IOException. */
  private void closeOutputQuietly()
    {
    Output output = getOutput();

    if( !( output instanceof Closeable ) )
      return;

    try
      {
      ( (Closeable) output ).close();
      }
    catch( IOException ignored )
      {
      // best effort, the failure is intentionally not propagated
      }
    }
  }