001/* 002 * Copyright (c) 2007-2016 Concurrent, Inc. All Rights Reserved. 003 * 004 * Project and contact information: http://www.cascading.org/ 005 * 006 * This file is part of the Cascading project. 007 * 008 * Licensed under the Apache License, Version 2.0 (the "License"); 009 * you may not use this file except in compliance with the License. 010 * You may obtain a copy of the License at 011 * 012 * http://www.apache.org/licenses/LICENSE-2.0 013 * 014 * Unless required by applicable law or agreed to in writing, software 015 * distributed under the License is distributed on an "AS IS" BASIS, 016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 017 * See the License for the specific language governing permissions and 018 * limitations under the License. 019 */ 020 021package cascading.flow.stream.element; 022 023import java.util.ArrayList; 024import java.util.List; 025import java.util.Set; 026 027import cascading.flow.FlowElement; 028import cascading.flow.FlowProcess; 029import cascading.flow.planner.Scope; 030import cascading.flow.planner.ScopedElement; 031import cascading.flow.stream.duct.Duct; 032import cascading.flow.stream.duct.Stage; 033import cascading.pipe.Pipe; 034import cascading.tuple.Fields; 035import cascading.tuple.TupleEntry; 036 037/** 038 * 039 */ 040public abstract class ElementStage<Incoming, Outgoing> extends Stage<Incoming, Outgoing> implements ElementDuct 041 { 042 protected final FlowProcess flowProcess; 043 protected final FlowElement flowElement; 044 protected Set<String> branchNames; 045 protected TrapHandler trapHandler; 046 047 protected final List<Scope> incomingScopes = new ArrayList<Scope>(); 048 protected final List<Scope> outgoingScopes = new ArrayList<Scope>(); 049 050 public ElementStage( FlowProcess flowProcess, FlowElement flowElement ) 051 { 052 this.flowElement = flowElement; 053 054 FlowElement element = flowElement; 055 056 while( element != null ) 057 { 058 if( element instanceof ScopedElement && ( (ScopedElement) element ).hasConfigDef() ) 059 flowProcess = new ElementFlowProcess( flowProcess, ( (ScopedElement) element ).getConfigDef() ); 060 061 if( element instanceof Pipe ) 062 element = ( (Pipe) element ).getParent(); 063 else 064 element = null; 065 } 066 067 this.flowProcess = flowProcess; 068 } 069 070 public FlowElement getFlowElement() 071 { 072 return flowElement; 073 } 074 075 @Override 076 public List<Scope> getIncomingScopes() 077 { 078 return incomingScopes; 079 } 080 081 public Set<String> getBranchNames() 082 { 083 return branchNames; 084 } 085 086 public void setBranchNames( Set<String> branchNames ) 087 { 088 this.branchNames = branchNames; 089 } 090 091 public void setTrapHandler( TrapHandler trapHandler ) 092 { 093 this.trapHandler = trapHandler; 094 } 095 096 @Override 097 public boolean hasTrapHandler() 098 { 099 return trapHandler != null; 100 } 101 102 @Override 103 public List<Scope> getOutgoingScopes() 104 { 105 return outgoingScopes; 106 } 107 108 protected Fields getOutgoingFields() 109 { 110 return ( (ScopedElement) unwind( next ).getFlowElement() ).resolveIncomingOperationPassThroughFields( outgoingScopes.get( 0 ) ); 111 } 112 113 private ElementDuct unwind( Duct next ) 114 { 115 if( next instanceof ElementDuct ) 116 return (ElementDuct) next; 117 118 return unwind( next.getNext() ); 119 } 120 121 @Override 122 public void cleanup() 123 { 124 super.cleanup(); 125 126 // close if top of stack 127 if( next == null ) 128 flowProcess.closeTrapCollectors(); 129 } 130 131 protected void handleReThrowableException( String message, Throwable throwable ) 132 { 133 trapHandler.handleReThrowableException( message, throwable ); 134 } 135 136 protected void handleException( Throwable exception, TupleEntry tupleEntry ) 137 { 138 trapHandler.handleException( exception, tupleEntry ); 139 } 140 141 @Override 142 public final boolean equals( Object object ) 143 { 144 if( this == object ) 145 return true; 146 if( !( object instanceof ElementStage ) ) 147 return false; 148 149 ElementStage that = (ElementStage) object; 150 151 if( flowElement != null ? flowElement != that.flowElement : that.flowElement != null ) 152 return false; 153 154 return true; 155 } 156 157 @Override 158 public final int hashCode() 159 { 160 return flowElement != null ? System.identityHashCode( flowElement ) : 0; 161 } 162 163 @Override 164 public String toString() 165 { 166 final StringBuilder sb = new StringBuilder(); 167 sb.append( getClass().getSimpleName() ); 168 sb.append( "{flowElement=" ).append( flowElement ); 169 sb.append( '}' ); 170 return sb.toString(); 171 } 172 }