001/* 002 * Copyright (c) 2016-2018 Chris K Wensel <chris@wensel.net>. All Rights Reserved. 003 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved. 004 * 005 * Project and contact information: http://www.cascading.org/ 006 * 007 * This file is part of the Cascading project. 008 * 009 * Licensed under the Apache License, Version 2.0 (the "License"); 010 * you may not use this file except in compliance with the License. 011 * You may obtain a copy of the License at 012 * 013 * http://www.apache.org/licenses/LICENSE-2.0 014 * 015 * Unless required by applicable law or agreed to in writing, software 016 * distributed under the License is distributed on an "AS IS" BASIS, 017 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 018 * See the License for the specific language governing permissions and 019 * limitations under the License. 020 */ 021 022package cascading.flow.stream.element; 023 024import cascading.flow.FlowElement; 025import cascading.flow.FlowProcess; 026import cascading.flow.StepCounters; 027import cascading.flow.stream.StopDataNotificationException; 028import cascading.flow.stream.TrapException; 029import cascading.flow.stream.duct.DuctException; 030import cascading.tap.Tap; 031import cascading.tap.TapException; 032import cascading.tap.TrapProps; 033import cascading.tuple.Fields; 034import cascading.tuple.Tuple; 035import cascading.tuple.TupleEntry; 036import cascading.tuple.TupleEntryCollector; 037import cascading.util.TraceUtil; 038import cascading.util.Traceable; 039import cascading.util.Util; 040import org.slf4j.Logger; 041import org.slf4j.LoggerFactory; 042 043/** 044 * 045 */ 046public class TrapHandler 047 { 048 private static final Logger LOG = LoggerFactory.getLogger( TrapHandler.class ); 049 050 final FlowProcess flowProcess; 051 final FlowElement flowElement; 052 final String elementTrace; 053 final Tap trap; 054 final String trapName; 055 056 boolean recordElementTrace = false; 057 boolean recordThrowableMessage = false; 058 boolean recordThrowableStackTrace = false; 059 boolean logThrowableStackTrace = true; 060 boolean stackTraceTrimLine = true; 061 String stackTraceLineDelimiter = "|"; 062 063 boolean recordAnyDiagnostics; 064 065 Fields diagnosticFields = Fields.UNKNOWN; 066 TupleEntry diagnosticEntry; 067 068 public TrapHandler( FlowProcess flowProcess ) 069 { 070 this.flowProcess = flowProcess; 071 this.flowElement = null; 072 this.trap = null; 073 this.trapName = null; 074 this.elementTrace = null; 075 } 076 077 public TrapHandler( FlowProcess flowProcess, FlowElement flowElement, Tap trap, String trapName ) 078 { 079 this.flowProcess = flowProcess; 080 this.flowElement = flowElement; 081 this.trap = trap; 082 this.trapName = trapName; 083 084 if( flowElement instanceof Traceable ) 085 this.elementTrace = ( (Traceable) flowElement ).getTrace(); 086 else 087 this.elementTrace = null; 088 089 this.recordElementTrace = flowProcess.getBooleanProperty( TrapProps.RECORD_ELEMENT_TRACE, this.recordElementTrace ); 090 this.recordThrowableMessage = flowProcess.getBooleanProperty( TrapProps.RECORD_THROWABLE_MESSAGE, this.recordThrowableMessage ); 091 this.recordThrowableStackTrace = flowProcess.getBooleanProperty( TrapProps.RECORD_THROWABLE_STACK_TRACE, this.recordThrowableStackTrace ); 092 this.logThrowableStackTrace = flowProcess.getBooleanProperty( TrapProps.LOG_THROWABLE_STACK_TRACE, this.logThrowableStackTrace ); 093 this.stackTraceLineDelimiter = flowProcess.getStringProperty( TrapProps.STACK_TRACE_LINE_DELIMITER, this.stackTraceLineDelimiter ); 094 this.stackTraceTrimLine = flowProcess.getBooleanProperty( TrapProps.STACK_TRACE_LINE_TRIM, this.stackTraceTrimLine ); 095 096 this.recordAnyDiagnostics = this.recordElementTrace || this.recordThrowableMessage || this.recordThrowableStackTrace; 097 098 Fields fields = new Fields(); 099 100 if( this.recordElementTrace ) 101 fields = fields.append( new Fields( "element-trace" ) ); 102 103 if( this.recordThrowableMessage ) 104 fields = fields.append( new Fields( "throwable-message" ) ); 105 106 if( this.recordThrowableStackTrace ) 107 fields = fields.append( new Fields( "throwable-stacktrace" ) ); 108 109 if( fields.size() != 0 ) 110 this.diagnosticFields = fields; 111 112 this.diagnosticEntry = new TupleEntry( diagnosticFields ); 113 } 114 115 protected void handleReThrowableException( String message, Throwable throwable ) 116 { 117 LOG.error( message, throwable ); 118 119 if( throwable instanceof Error ) 120 throw (Error) throwable; 121 else if( throwable instanceof RuntimeException ) 122 throw (RuntimeException) throwable; 123 else 124 throw new DuctException( message, throwable ); 125 } 126 127 protected void handleException( Throwable exception, TupleEntry tupleEntry ) 128 { 129 handleException( trapName, trap, exception, tupleEntry ); 130 } 131 132 protected void handleException( String trapName, Tap trap, Throwable throwable, TupleEntry tupleEntry ) 133 { 134 if( throwable instanceof StopDataNotificationException ) 135 throw (StopDataNotificationException) throwable; 136 137 Throwable cause = throwable.getCause(); 138 139 if( cause instanceof OutOfMemoryError ) 140 handleReThrowableException( "caught OutOfMemoryException, will not trap, rethrowing", cause ); 141 142 if( cause instanceof TrapException ) 143 handleReThrowableException( "unable to write trap data, will not trap, rethrowing", cause ); 144 145 if( trap == null ) 146 handleReThrowableException( "caught Throwable, no trap available, rethrowing", throwable ); 147 148 TupleEntryCollector trapCollector = flowProcess.getTrapCollectorFor( trap ); 149 150 TupleEntry payload; 151 152 if( cause instanceof TapException && ( (TapException) cause ).getPayload() != null ) 153 { 154 payload = new TupleEntry( Fields.UNKNOWN, ( (TapException) cause ).getPayload() ); 155 } 156 else if( tupleEntry != null ) 157 { 158 payload = tupleEntry; 159 } 160 else 161 { 162 LOG.error( "failure resolving tuple entry", throwable ); 163 throw new DuctException( "failure resolving tuple entry", throwable ); 164 } 165 166 TupleEntry diagnostics = getDiagnostics( throwable ); 167 168 if( diagnostics != TupleEntry.NULL ) // prepend diagnostics, payload is variable 169 payload = diagnostics.appendNew( payload ); 170 171 try 172 { 173 trapCollector.add( payload ); 174 } 175 catch( Throwable current ) 176 { 177 throw new TrapException( "could not write to trap: " + trap.getIdentifier(), current ); 178 } 179 180 flowProcess.increment( StepCounters.Tuples_Trapped, 1 ); 181 182 if( logThrowableStackTrace ) 183 LOG.warn( "exception trap on branch: '" + trapName + "', for " + Util.truncate( print( tupleEntry ), 75 ), throwable ); 184 } 185 186 private TupleEntry getDiagnostics( Throwable throwable ) 187 { 188 if( !recordAnyDiagnostics ) 189 return TupleEntry.NULL; 190 191 Tuple diagnostics = new Tuple(); 192 193 if( recordElementTrace ) 194 diagnostics.add( elementTrace ); 195 196 if( recordThrowableMessage ) 197 diagnostics.add( throwable.getMessage() ); 198 199 if( recordThrowableStackTrace ) 200 diagnostics.add( TraceUtil.stringifyStackTrace( throwable, stackTraceLineDelimiter, stackTraceTrimLine, -1 ) ); 201 202 diagnosticEntry.setTuple( diagnostics ); 203 204 return diagnosticEntry; 205 } 206 207 private String print( TupleEntry tupleEntry ) 208 { 209 if( tupleEntry == null || tupleEntry.getFields() == null ) 210 return "[uninitialized]"; 211 else if( tupleEntry.getTuple() == null ) 212 return "fields: " + tupleEntry.getFields().printVerbose(); 213 else 214 return "fields: " + tupleEntry.getFields().printVerbose() + " tuple: " + tupleEntry.getTuple().print(); 215 } 216 } 217 218