001    /*
002     * Copyright (c) 2007-2015 Concurrent, Inc. All Rights Reserved.
003     *
004     * Project and contact information: http://www.cascading.org/
005     *
006     * This file is part of the Cascading project.
007     *
008     * Licensed under the Apache License, Version 2.0 (the "License");
009     * you may not use this file except in compliance with the License.
010     * You may obtain a copy of the License at
011     *
012     *     http://www.apache.org/licenses/LICENSE-2.0
013     *
014     * Unless required by applicable law or agreed to in writing, software
015     * distributed under the License is distributed on an "AS IS" BASIS,
016     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017     * See the License for the specific language governing permissions and
018     * limitations under the License.
019     */
020    
021    package cascading.flow.stream;
022    
023    import java.util.HashMap;
024    import java.util.Map;
025    
026    import cascading.flow.FlowElement;
027    import cascading.flow.FlowProcess;
028    import cascading.flow.StepCounters;
029    import cascading.tap.Tap;
030    import cascading.tap.TapException;
031    import cascading.tap.TrapProps;
032    import cascading.tuple.Fields;
033    import cascading.tuple.Tuple;
034    import cascading.tuple.TupleEntry;
035    import cascading.tuple.TupleEntryCollector;
036    import cascading.util.TraceUtil;
037    import cascading.util.Traceable;
038    import cascading.util.Util;
039    import org.slf4j.Logger;
040    import org.slf4j.LoggerFactory;
041    
042    /**
043     *
044     */
045    public class TrapHandler
046      {
047      private static final Logger LOG = LoggerFactory.getLogger( TrapHandler.class );
048    
049      static final Map<Tap, TupleEntryCollector> trapCollectors = new HashMap<Tap, TupleEntryCollector>();
050    
051      final FlowProcess flowProcess;
052      final FlowElement flowElement;
053      final String elementTrace;
054      final Tap trap;
055      final String trapName;
056    
057      boolean recordElementTrace = false;
058      boolean recordThrowableMessage = false;
059      boolean recordThrowableStackTrace = false;
060      boolean logThrowableStackTrace = true;
061      boolean stackTraceTrimLine = true;
062      String stackTraceLineDelimiter = "|";
063    
064      boolean recordAnyDiagnostics;
065    
066      Fields diagnosticFields = Fields.UNKNOWN;
067      TupleEntry diagnosticEntry;
068    
069      static TupleEntryCollector getTrapCollector( Tap trap, FlowProcess flowProcess )
070        {
071        TupleEntryCollector trapCollector = trapCollectors.get( trap );
072    
073        if( trapCollector == null )
074          {
075          try
076            {
077            trapCollector = flowProcess.openTrapForWrite( trap );
078            trapCollectors.put( trap, trapCollector );
079            }
080          catch( Throwable throwable )
081            {
082            throw new TrapException( "could not open trap: " + trap.getIdentifier(), throwable );
083            }
084          }
085    
086        return trapCollector;
087        }
088    
089      static synchronized void closeTraps()
090        {
091        for( TupleEntryCollector trapCollector : trapCollectors.values() )
092          {
093          try
094            {
095            trapCollector.close();
096            }
097          catch( Exception exception )
098            {
099            // do nothing
100            }
101          }
102    
103        trapCollectors.clear();
104        }
105    
106      public TrapHandler( FlowProcess flowProcess )
107        {
108        this.flowProcess = flowProcess;
109        this.flowElement = null;
110        this.trap = null;
111        this.trapName = null;
112        this.elementTrace = null;
113        }
114    
115      public TrapHandler( FlowProcess flowProcess, FlowElement flowElement, Tap trap, String trapName )
116        {
117        this.flowProcess = flowProcess;
118        this.flowElement = flowElement;
119        this.trap = trap;
120        this.trapName = trapName;
121    
122        if( flowElement instanceof Traceable )
123          this.elementTrace = ( (Traceable) flowElement ).getTrace();
124        else
125          this.elementTrace = null;
126    
127        this.recordElementTrace = flowProcess.getBooleanProperty( TrapProps.RECORD_ELEMENT_TRACE, this.recordElementTrace );
128        this.recordThrowableMessage = flowProcess.getBooleanProperty( TrapProps.RECORD_THROWABLE_MESSAGE, this.recordThrowableMessage );
129        this.recordThrowableStackTrace = flowProcess.getBooleanProperty( TrapProps.RECORD_THROWABLE_STACK_TRACE, this.recordThrowableStackTrace );
130        this.logThrowableStackTrace = flowProcess.getBooleanProperty( TrapProps.LOG_THROWABLE_STACK_TRACE, this.logThrowableStackTrace );
131        this.stackTraceLineDelimiter = flowProcess.getStringProperty( TrapProps.STACK_TRACE_LINE_DELIMITER, this.stackTraceLineDelimiter );
132        this.stackTraceTrimLine = flowProcess.getBooleanProperty( TrapProps.STACK_TRACE_LINE_TRIM, this.stackTraceTrimLine );
133    
134        this.recordAnyDiagnostics = this.recordElementTrace || this.recordThrowableMessage || this.recordThrowableStackTrace;
135    
136        Fields fields = new Fields();
137    
138        if( this.recordElementTrace )
139          fields = fields.append( new Fields( "element-trace" ) );
140    
141        if( this.recordThrowableMessage )
142          fields = fields.append( new Fields( "throwable-message" ) );
143    
144        if( this.recordThrowableStackTrace )
145          fields = fields.append( new Fields( "throwable-stacktrace" ) );
146    
147        if( fields.size() != 0 )
148          this.diagnosticFields = fields;
149    
150        this.diagnosticEntry = new TupleEntry( diagnosticFields );
151        }
152    
153      protected void handleReThrowableException( String message, Throwable throwable )
154        {
155        LOG.error( message, throwable );
156    
157        if( throwable instanceof Error )
158          throw (Error) throwable;
159        else if( throwable instanceof RuntimeException )
160          throw (RuntimeException) throwable;
161        else
162          throw new DuctException( message, throwable );
163        }
164    
165      protected void handleException( Throwable exception, TupleEntry tupleEntry )
166        {
167        handleException( trapName, trap, exception, tupleEntry );
168        }
169    
170      protected void handleException( String trapName, Tap trap, Throwable throwable, TupleEntry tupleEntry )
171        {
172        Throwable cause = throwable.getCause();
173    
174        if( cause instanceof OutOfMemoryError )
175          handleReThrowableException( "caught OutOfMemoryException, will not trap, rethrowing", cause );
176    
177        if( cause instanceof TrapException )
178          handleReThrowableException( "unable to write trap data, will not trap, rethrowing", cause );
179    
180        if( trap == null )
181          handleReThrowableException( "caught Throwable, no trap available, rethrowing", throwable );
182    
183        TupleEntryCollector trapCollector = getTrapCollector( trap, flowProcess );
184    
185        TupleEntry payload;
186    
187        if( cause instanceof TapException && ( (TapException) cause ).getPayload() != null )
188          {
189          payload = new TupleEntry( Fields.UNKNOWN, ( (TapException) cause ).getPayload() );
190          }
191        else if( tupleEntry != null )
192          {
193          payload = tupleEntry;
194          }
195        else
196          {
197          LOG.error( "failure resolving tuple entry", throwable );
198          throw new DuctException( "failure resolving tuple entry", throwable );
199          }
200    
201        TupleEntry diagnostics = getDiagnostics( throwable );
202    
203        if( diagnostics != TupleEntry.NULL ) // prepend diagnostics, payload is variable
204          payload = diagnostics.appendNew( payload );
205    
206        try
207          {
208          trapCollector.add( payload );
209          }
210        catch( Throwable current )
211          {
212          throw new TrapException( "could not write to trap: " + trap.getIdentifier(), current );
213          }
214    
215        flowProcess.increment( StepCounters.Tuples_Trapped, 1 );
216    
217        if( logThrowableStackTrace )
218          LOG.warn( "exception trap on branch: '" + trapName + "', for " + Util.truncate( print( tupleEntry ), 75 ), throwable );
219        }
220    
221      private TupleEntry getDiagnostics( Throwable throwable )
222        {
223        if( !recordAnyDiagnostics )
224          return TupleEntry.NULL;
225    
226        Tuple diagnostics = new Tuple();
227    
228        if( recordElementTrace )
229          diagnostics.add( elementTrace );
230    
231        if( recordThrowableMessage )
232          diagnostics.add( throwable.getMessage() );
233    
234        if( recordThrowableStackTrace )
235          diagnostics.add( TraceUtil.stringifyStackTrace( throwable, stackTraceLineDelimiter, stackTraceTrimLine, -1 ) );
236    
237        diagnosticEntry.setTuple( diagnostics );
238    
239        return diagnosticEntry;
240        }
241    
242      private String print( TupleEntry tupleEntry )
243        {
244        if( tupleEntry == null || tupleEntry.getFields() == null )
245          return "[uninitialized]";
246        else if( tupleEntry.getTuple() == null )
247          return "fields: " + tupleEntry.getFields().printVerbose();
248        else
249          return "fields: " + tupleEntry.getFields().printVerbose() + " tuple: " + tupleEntry.getTuple().print();
250        }
251      }
252    
253