001 /* 002 * Copyright (c) 2007-2015 Concurrent, Inc. All Rights Reserved. 003 * 004 * Project and contact information: http://www.cascading.org/ 005 * 006 * This file is part of the Cascading project. 007 * 008 * Licensed under the Apache License, Version 2.0 (the "License"); 009 * you may not use this file except in compliance with the License. 010 * You may obtain a copy of the License at 011 * 012 * http://www.apache.org/licenses/LICENSE-2.0 013 * 014 * Unless required by applicable law or agreed to in writing, software 015 * distributed under the License is distributed on an "AS IS" BASIS, 016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 017 * See the License for the specific language governing permissions and 018 * limitations under the License. 019 */ 020 021 package cascading.flow.stream; 022 023 import java.util.HashMap; 024 import java.util.Map; 025 026 import cascading.flow.FlowElement; 027 import cascading.flow.FlowProcess; 028 import cascading.flow.StepCounters; 029 import cascading.tap.Tap; 030 import cascading.tap.TapException; 031 import cascading.tap.TrapProps; 032 import cascading.tuple.Fields; 033 import cascading.tuple.Tuple; 034 import cascading.tuple.TupleEntry; 035 import cascading.tuple.TupleEntryCollector; 036 import cascading.util.TraceUtil; 037 import cascading.util.Traceable; 038 import cascading.util.Util; 039 import org.slf4j.Logger; 040 import org.slf4j.LoggerFactory; 041 042 /** 043 * 044 */ 045 public class TrapHandler 046 { 047 private static final Logger LOG = LoggerFactory.getLogger( TrapHandler.class ); 048 049 static final Map<Tap, TupleEntryCollector> trapCollectors = new HashMap<Tap, TupleEntryCollector>(); 050 051 final FlowProcess flowProcess; 052 final FlowElement flowElement; 053 final String elementTrace; 054 final Tap trap; 055 final String trapName; 056 057 boolean recordElementTrace = false; 058 boolean recordThrowableMessage = false; 059 boolean recordThrowableStackTrace = false; 060 boolean logThrowableStackTrace = true; 061 boolean stackTraceTrimLine = true; 062 String stackTraceLineDelimiter = "|"; 063 064 boolean recordAnyDiagnostics; 065 066 Fields diagnosticFields = Fields.UNKNOWN; 067 TupleEntry diagnosticEntry; 068 069 static TupleEntryCollector getTrapCollector( Tap trap, FlowProcess flowProcess ) 070 { 071 TupleEntryCollector trapCollector = trapCollectors.get( trap ); 072 073 if( trapCollector == null ) 074 { 075 try 076 { 077 trapCollector = flowProcess.openTrapForWrite( trap ); 078 trapCollectors.put( trap, trapCollector ); 079 } 080 catch( Throwable throwable ) 081 { 082 throw new TrapException( "could not open trap: " + trap.getIdentifier(), throwable ); 083 } 084 } 085 086 return trapCollector; 087 } 088 089 static synchronized void closeTraps() 090 { 091 for( TupleEntryCollector trapCollector : trapCollectors.values() ) 092 { 093 try 094 { 095 trapCollector.close(); 096 } 097 catch( Exception exception ) 098 { 099 // do nothing 100 } 101 } 102 103 trapCollectors.clear(); 104 } 105 106 public TrapHandler( FlowProcess flowProcess ) 107 { 108 this.flowProcess = flowProcess; 109 this.flowElement = null; 110 this.trap = null; 111 this.trapName = null; 112 this.elementTrace = null; 113 } 114 115 public TrapHandler( FlowProcess flowProcess, FlowElement flowElement, Tap trap, String trapName ) 116 { 117 this.flowProcess = flowProcess; 118 this.flowElement = flowElement; 119 this.trap = trap; 120 this.trapName = trapName; 121 122 if( flowElement instanceof Traceable ) 123 this.elementTrace = ( (Traceable) flowElement ).getTrace(); 124 else 125 this.elementTrace = null; 126 127 this.recordElementTrace = flowProcess.getBooleanProperty( TrapProps.RECORD_ELEMENT_TRACE, this.recordElementTrace ); 128 this.recordThrowableMessage = flowProcess.getBooleanProperty( TrapProps.RECORD_THROWABLE_MESSAGE, this.recordThrowableMessage ); 129 this.recordThrowableStackTrace = flowProcess.getBooleanProperty( TrapProps.RECORD_THROWABLE_STACK_TRACE, this.recordThrowableStackTrace ); 130 this.logThrowableStackTrace = flowProcess.getBooleanProperty( TrapProps.LOG_THROWABLE_STACK_TRACE, this.logThrowableStackTrace ); 131 this.stackTraceLineDelimiter = flowProcess.getStringProperty( TrapProps.STACK_TRACE_LINE_DELIMITER, this.stackTraceLineDelimiter ); 132 this.stackTraceTrimLine = flowProcess.getBooleanProperty( TrapProps.STACK_TRACE_LINE_TRIM, this.stackTraceTrimLine ); 133 134 this.recordAnyDiagnostics = this.recordElementTrace || this.recordThrowableMessage || this.recordThrowableStackTrace; 135 136 Fields fields = new Fields(); 137 138 if( this.recordElementTrace ) 139 fields = fields.append( new Fields( "element-trace" ) ); 140 141 if( this.recordThrowableMessage ) 142 fields = fields.append( new Fields( "throwable-message" ) ); 143 144 if( this.recordThrowableStackTrace ) 145 fields = fields.append( new Fields( "throwable-stacktrace" ) ); 146 147 if( fields.size() != 0 ) 148 this.diagnosticFields = fields; 149 150 this.diagnosticEntry = new TupleEntry( diagnosticFields ); 151 } 152 153 protected void handleReThrowableException( String message, Throwable throwable ) 154 { 155 LOG.error( message, throwable ); 156 157 if( throwable instanceof Error ) 158 throw (Error) throwable; 159 else if( throwable instanceof RuntimeException ) 160 throw (RuntimeException) throwable; 161 else 162 throw new DuctException( message, throwable ); 163 } 164 165 protected void handleException( Throwable exception, TupleEntry tupleEntry ) 166 { 167 handleException( trapName, trap, exception, tupleEntry ); 168 } 169 170 protected void handleException( String trapName, Tap trap, Throwable throwable, TupleEntry tupleEntry ) 171 { 172 Throwable cause = throwable.getCause(); 173 174 if( cause instanceof OutOfMemoryError ) 175 handleReThrowableException( "caught OutOfMemoryException, will not trap, rethrowing", cause ); 176 177 if( cause instanceof TrapException ) 178 handleReThrowableException( "unable to write trap data, will not trap, rethrowing", cause ); 179 180 if( trap == null ) 181 handleReThrowableException( "caught Throwable, no trap available, rethrowing", throwable ); 182 183 TupleEntryCollector trapCollector = getTrapCollector( trap, flowProcess ); 184 185 TupleEntry payload; 186 187 if( cause instanceof TapException && ( (TapException) cause ).getPayload() != null ) 188 { 189 payload = new TupleEntry( Fields.UNKNOWN, ( (TapException) cause ).getPayload() ); 190 } 191 else if( tupleEntry != null ) 192 { 193 payload = tupleEntry; 194 } 195 else 196 { 197 LOG.error( "failure resolving tuple entry", throwable ); 198 throw new DuctException( "failure resolving tuple entry", throwable ); 199 } 200 201 TupleEntry diagnostics = getDiagnostics( throwable ); 202 203 if( diagnostics != TupleEntry.NULL ) // prepend diagnostics, payload is variable 204 payload = diagnostics.appendNew( payload ); 205 206 try 207 { 208 trapCollector.add( payload ); 209 } 210 catch( Throwable current ) 211 { 212 throw new TrapException( "could not write to trap: " + trap.getIdentifier(), current ); 213 } 214 215 flowProcess.increment( StepCounters.Tuples_Trapped, 1 ); 216 217 if( logThrowableStackTrace ) 218 LOG.warn( "exception trap on branch: '" + trapName + "', for " + Util.truncate( print( tupleEntry ), 75 ), throwable ); 219 } 220 221 private TupleEntry getDiagnostics( Throwable throwable ) 222 { 223 if( !recordAnyDiagnostics ) 224 return TupleEntry.NULL; 225 226 Tuple diagnostics = new Tuple(); 227 228 if( recordElementTrace ) 229 diagnostics.add( elementTrace ); 230 231 if( recordThrowableMessage ) 232 diagnostics.add( throwable.getMessage() ); 233 234 if( recordThrowableStackTrace ) 235 diagnostics.add( TraceUtil.stringifyStackTrace( throwable, stackTraceLineDelimiter, stackTraceTrimLine, -1 ) ); 236 237 diagnosticEntry.setTuple( diagnostics ); 238 239 return diagnosticEntry; 240 } 241 242 private String print( TupleEntry tupleEntry ) 243 { 244 if( tupleEntry == null || tupleEntry.getFields() == null ) 245 return "[uninitialized]"; 246 else if( tupleEntry.getTuple() == null ) 247 return "fields: " + tupleEntry.getFields().printVerbose(); 248 else 249 return "fields: " + tupleEntry.getFields().printVerbose() + " tuple: " + tupleEntry.getTuple().print(); 250 } 251 } 252 253