001    /*
002     * Copyright (c) 2007-2015 Concurrent, Inc. All Rights Reserved.
003     *
004     * Project and contact information: http://www.cascading.org/
005     *
006     * This file is part of the Cascading project.
007     *
008     * Licensed under the Apache License, Version 2.0 (the "License");
009     * you may not use this file except in compliance with the License.
010     * You may obtain a copy of the License at
011     *
012     *     http://www.apache.org/licenses/LICENSE-2.0
013     *
014     * Unless required by applicable law or agreed to in writing, software
015     * distributed under the License is distributed on an "AS IS" BASIS,
016     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017     * See the License for the specific language governing permissions and
018     * limitations under the License.
019     */
020    
021    package cascading.flow.stream;
022    
023    import java.util.ArrayList;
024    import java.util.List;
025    import java.util.Set;
026    
027    import cascading.flow.FlowElement;
028    import cascading.flow.FlowProcess;
029    import cascading.flow.planner.Scope;
030    import cascading.pipe.Pipe;
031    import cascading.tuple.Fields;
032    import cascading.tuple.TupleEntry;
033    
034    /**
035     *
036     */
037    public abstract class ElementStage<Incoming, Outgoing> extends Stage<Incoming, Outgoing> implements ElementDuct
038      {
039      protected final FlowProcess flowProcess;
040      protected final FlowElement flowElement;
041      protected Set<String> branchNames;
042      protected TrapHandler trapHandler;
043    
044      protected final List<Scope> incomingScopes = new ArrayList<Scope>();
045      protected final List<Scope> outgoingScopes = new ArrayList<Scope>();
046    
047      public ElementStage( FlowProcess flowProcess, FlowElement flowElement )
048        {
049        this.flowElement = flowElement;
050    
051        FlowElement element = flowElement;
052    
053        while( element != null )
054          {
055          if( element.hasConfigDef() )
056            flowProcess = new ElementFlowProcess( flowProcess, element.getConfigDef() );
057    
058          if( element instanceof Pipe )
059            element = ( (Pipe) element ).getParent();
060          else
061            element = null;
062          }
063    
064        this.flowProcess = flowProcess;
065        }
066    
067      public FlowElement getFlowElement()
068        {
069        return flowElement;
070        }
071    
072      @Override
073      public List<Scope> getIncomingScopes()
074        {
075        return incomingScopes;
076        }
077    
078      public Set<String> getBranchNames()
079        {
080        return branchNames;
081        }
082    
083      public void setBranchNames( Set<String> branchNames )
084        {
085        this.branchNames = branchNames;
086        }
087    
088      public void setTrapHandler( TrapHandler trapHandler )
089        {
090        this.trapHandler = trapHandler;
091        }
092    
093      @Override
094      public boolean hasTrapHandler()
095        {
096        return trapHandler != null;
097        }
098    
099      public void addIncomingScope( Scope incomingScope )
100        {
101        incomingScopes.add( incomingScope );
102        }
103    
104      @Override
105      public List<Scope> getOutgoingScopes()
106        {
107        return outgoingScopes;
108        }
109    
110      public void addOutgoingScope( Scope outgoingScope )
111        {
112        outgoingScopes.add( outgoingScope );
113        }
114    
115      protected Fields getOutgoingFields()
116        {
117        return unwind( next ).getFlowElement().resolveIncomingOperationPassThroughFields( outgoingScopes.get( 0 ) );
118        }
119    
120      private ElementDuct unwind( Duct next )
121        {
122        if( next instanceof ElementDuct )
123          return (ElementDuct) next;
124    
125        return unwind( next.getNext() );
126        }
127    
128      @Override
129      public void cleanup()
130        {
131        super.cleanup();
132    
133        // close if top of stack
134        if( next == null )
135          TrapHandler.closeTraps();
136        }
137    
138      protected void handleReThrowableException( String message, Throwable throwable )
139        {
140        trapHandler.handleReThrowableException( message, throwable );
141        }
142    
143      protected void handleException( Throwable exception, TupleEntry tupleEntry )
144        {
145        trapHandler.handleException( exception, tupleEntry );
146        }
147    
148      @Override
149      public final boolean equals( Object object )
150        {
151        if( this == object )
152          return true;
153        if( !( object instanceof ElementStage ) )
154          return false;
155    
156        ElementStage that = (ElementStage) object;
157    
158        if( flowElement != null ? flowElement != that.flowElement : that.flowElement != null )
159          return false;
160    
161        return true;
162        }
163    
164      @Override
165      public final int hashCode()
166        {
167        return flowElement != null ? System.identityHashCode( flowElement ) : 0;
168        }
169    
170      @Override
171      public String toString()
172        {
173        final StringBuilder sb = new StringBuilder();
174        sb.append( getClass().getSimpleName() );
175        sb.append( "{flowElement=" ).append( flowElement );
176        sb.append( '}' );
177        return sb.toString();
178        }
179      }