001/*
002 * Copyright (c) 2016-2018 Chris K Wensel <chris@wensel.net>. All Rights Reserved.
003 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved.
004 *
005 * Project and contact information: http://www.cascading.org/
006 *
007 * This file is part of the Cascading project.
008 *
009 * Licensed under the Apache License, Version 2.0 (the "License");
010 * you may not use this file except in compliance with the License.
011 * You may obtain a copy of the License at
012 *
013 *     http://www.apache.org/licenses/LICENSE-2.0
014 *
015 * Unless required by applicable law or agreed to in writing, software
016 * distributed under the License is distributed on an "AS IS" BASIS,
017 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
018 * See the License for the specific language governing permissions and
019 * limitations under the License.
020 */
021
022package cascading.flow.stream.element;
023
024import cascading.flow.FlowElement;
025import cascading.flow.FlowProcess;
026import cascading.flow.StepCounters;
027import cascading.flow.stream.StopDataNotificationException;
028import cascading.flow.stream.TrapException;
029import cascading.flow.stream.duct.DuctException;
030import cascading.tap.Tap;
031import cascading.tap.TapException;
032import cascading.tap.TrapProps;
033import cascading.tuple.Fields;
034import cascading.tuple.Tuple;
035import cascading.tuple.TupleEntry;
036import cascading.tuple.TupleEntryCollector;
037import cascading.util.TraceUtil;
038import cascading.util.Traceable;
039import cascading.util.Util;
040import org.slf4j.Logger;
041import org.slf4j.LoggerFactory;
042
043/**
044 *
045 */
046public class TrapHandler
047  {
048  private static final Logger LOG = LoggerFactory.getLogger( TrapHandler.class );
049
050  final FlowProcess flowProcess;
051  final FlowElement flowElement;
052  final String elementTrace;
053  final Tap trap;
054  final String trapName;
055
056  boolean recordElementTrace = false;
057  boolean recordThrowableMessage = false;
058  boolean recordThrowableStackTrace = false;
059  boolean logThrowableStackTrace = true;
060  boolean stackTraceTrimLine = true;
061  String stackTraceLineDelimiter = "|";
062
063  boolean recordAnyDiagnostics;
064
065  Fields diagnosticFields = Fields.UNKNOWN;
066  TupleEntry diagnosticEntry;
067
068  public TrapHandler( FlowProcess flowProcess )
069    {
070    this.flowProcess = flowProcess;
071    this.flowElement = null;
072    this.trap = null;
073    this.trapName = null;
074    this.elementTrace = null;
075    }
076
077  public TrapHandler( FlowProcess flowProcess, FlowElement flowElement, Tap trap, String trapName )
078    {
079    this.flowProcess = flowProcess;
080    this.flowElement = flowElement;
081    this.trap = trap;
082    this.trapName = trapName;
083
084    if( flowElement instanceof Traceable )
085      this.elementTrace = ( (Traceable) flowElement ).getTrace();
086    else
087      this.elementTrace = null;
088
089    this.recordElementTrace = flowProcess.getBooleanProperty( TrapProps.RECORD_ELEMENT_TRACE, this.recordElementTrace );
090    this.recordThrowableMessage = flowProcess.getBooleanProperty( TrapProps.RECORD_THROWABLE_MESSAGE, this.recordThrowableMessage );
091    this.recordThrowableStackTrace = flowProcess.getBooleanProperty( TrapProps.RECORD_THROWABLE_STACK_TRACE, this.recordThrowableStackTrace );
092    this.logThrowableStackTrace = flowProcess.getBooleanProperty( TrapProps.LOG_THROWABLE_STACK_TRACE, this.logThrowableStackTrace );
093    this.stackTraceLineDelimiter = flowProcess.getStringProperty( TrapProps.STACK_TRACE_LINE_DELIMITER, this.stackTraceLineDelimiter );
094    this.stackTraceTrimLine = flowProcess.getBooleanProperty( TrapProps.STACK_TRACE_LINE_TRIM, this.stackTraceTrimLine );
095
096    this.recordAnyDiagnostics = this.recordElementTrace || this.recordThrowableMessage || this.recordThrowableStackTrace;
097
098    Fields fields = new Fields();
099
100    if( this.recordElementTrace )
101      fields = fields.append( new Fields( "element-trace" ) );
102
103    if( this.recordThrowableMessage )
104      fields = fields.append( new Fields( "throwable-message" ) );
105
106    if( this.recordThrowableStackTrace )
107      fields = fields.append( new Fields( "throwable-stacktrace" ) );
108
109    if( fields.size() != 0 )
110      this.diagnosticFields = fields;
111
112    this.diagnosticEntry = new TupleEntry( diagnosticFields );
113    }
114
115  protected void handleReThrowableException( String message, Throwable throwable )
116    {
117    LOG.error( message, throwable );
118
119    if( throwable instanceof Error )
120      throw (Error) throwable;
121    else if( throwable instanceof RuntimeException )
122      throw (RuntimeException) throwable;
123    else
124      throw new DuctException( message, throwable );
125    }
126
127  protected void handleException( Throwable exception, TupleEntry tupleEntry )
128    {
129    handleException( trapName, trap, exception, tupleEntry );
130    }
131
132  protected void handleException( String trapName, Tap trap, Throwable throwable, TupleEntry tupleEntry )
133    {
134    if( throwable instanceof StopDataNotificationException )
135      throw (StopDataNotificationException) throwable;
136
137    Throwable cause = throwable.getCause();
138
139    if( cause instanceof OutOfMemoryError )
140      handleReThrowableException( "caught OutOfMemoryException, will not trap, rethrowing", cause );
141
142    if( cause instanceof TrapException )
143      handleReThrowableException( "unable to write trap data, will not trap, rethrowing", cause );
144
145    if( trap == null )
146      handleReThrowableException( "caught Throwable, no trap available, rethrowing", throwable );
147
148    TupleEntryCollector trapCollector = flowProcess.getTrapCollectorFor( trap );
149
150    TupleEntry payload;
151
152    if( cause instanceof TapException && ( (TapException) cause ).getPayload() != null )
153      {
154      payload = new TupleEntry( Fields.UNKNOWN, ( (TapException) cause ).getPayload() );
155      }
156    else if( tupleEntry != null )
157      {
158      payload = tupleEntry;
159      }
160    else
161      {
162      LOG.error( "failure resolving tuple entry", throwable );
163      throw new DuctException( "failure resolving tuple entry", throwable );
164      }
165
166    TupleEntry diagnostics = getDiagnostics( throwable );
167
168    if( diagnostics != TupleEntry.NULL ) // prepend diagnostics, payload is variable
169      payload = diagnostics.appendNew( payload );
170
171    try
172      {
173      trapCollector.add( payload );
174      }
175    catch( Throwable current )
176      {
177      throw new TrapException( "could not write to trap: " + trap.getIdentifier(), current );
178      }
179
180    flowProcess.increment( StepCounters.Tuples_Trapped, 1 );
181
182    if( logThrowableStackTrace )
183      LOG.warn( "exception trap on branch: '" + trapName + "', for " + Util.truncate( print( tupleEntry ), 75 ), throwable );
184    }
185
186  private TupleEntry getDiagnostics( Throwable throwable )
187    {
188    if( !recordAnyDiagnostics )
189      return TupleEntry.NULL;
190
191    Tuple diagnostics = new Tuple();
192
193    if( recordElementTrace )
194      diagnostics.add( elementTrace );
195
196    if( recordThrowableMessage )
197      diagnostics.add( throwable.getMessage() );
198
199    if( recordThrowableStackTrace )
200      diagnostics.add( TraceUtil.stringifyStackTrace( throwable, stackTraceLineDelimiter, stackTraceTrimLine, -1 ) );
201
202    diagnosticEntry.setTuple( diagnostics );
203
204    return diagnosticEntry;
205    }
206
207  private String print( TupleEntry tupleEntry )
208    {
209    if( tupleEntry == null || tupleEntry.getFields() == null )
210      return "[uninitialized]";
211    else if( tupleEntry.getTuple() == null )
212      return "fields: " + tupleEntry.getFields().printVerbose();
213    else
214      return "fields: " + tupleEntry.getFields().printVerbose() + " tuple: " + tupleEntry.getTuple().print();
215    }
216  }
217
218