001/* 002 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved. 003 * 004 * Project and contact information: http://www.cascading.org/ 005 * 006 * This file is part of the Cascading project. 007 * 008 * Licensed under the Apache License, Version 2.0 (the "License"); 009 * you may not use this file except in compliance with the License. 010 * You may obtain a copy of the License at 011 * 012 * http://www.apache.org/licenses/LICENSE-2.0 013 * 014 * Unless required by applicable law or agreed to in writing, software 015 * distributed under the License is distributed on an "AS IS" BASIS, 016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 017 * See the License for the specific language governing permissions and 018 * limitations under the License. 019 */ 020 021package cascading.flow.stream.element; 022 023import java.util.ArrayList; 024import java.util.List; 025import java.util.Set; 026 027import cascading.flow.FlowElement; 028import cascading.flow.FlowProcess; 029import cascading.flow.planner.Scope; 030import cascading.flow.stream.duct.Stage; 031import cascading.flow.stream.graph.IORole; 032import cascading.pipe.Boundary; 033import cascading.pipe.Pipe; 034import cascading.tuple.TupleEntry; 035 036/** 037 * 038 */ 039public abstract class BoundaryStage<Incoming, Outgoing> extends Stage<Incoming, Outgoing> implements ElementDuct 040 { 041 protected Boundary boundary; 042 protected final FlowProcess flowProcess; 043 protected IORole role = IORole.both; 044 045 protected final List<Scope> incomingScopes = new ArrayList<>(); 046 protected final List<Scope> outgoingScopes = new ArrayList<>(); 047 048 private TrapHandler trapHandler; 049 private Set<String> branchNames; 050 051 public BoundaryStage( FlowProcess flowProcess, Boundary boundary ) 052 { 053 this.boundary = boundary; 054 055 Pipe element = boundary; 056 057 while( element != null ) 058 { 059 if( element.hasConfigDef() ) 060 flowProcess = new ElementFlowProcess( flowProcess, element.getConfigDef() ); 061 062 element = element.getParent(); 063 } 064 065 this.flowProcess = flowProcess; 066 } 067 068 public BoundaryStage( FlowProcess flowProcess, Boundary boundary, IORole role ) 069 { 070 this.boundary = boundary; 071 this.flowProcess = flowProcess; 072 this.role = role; 073 } 074 075 public Boundary getBoundary() 076 { 077 return boundary; 078 } 079 080 protected void handleReThrowableException( String message, Throwable throwable ) 081 { 082 trapHandler.handleReThrowableException( message, throwable ); 083 } 084 085 protected void handleException( Throwable exception, TupleEntry tupleEntry ) 086 { 087 trapHandler.handleException( exception, tupleEntry ); 088 } 089 090 @Override 091 public void initialize() 092 { 093 super.initialize(); 094 095 if( incomingScopes.size() == 0 ) 096 throw new IllegalStateException( "incoming scopes may not be empty" ); 097 098 if( outgoingScopes.size() == 0 ) 099 throw new IllegalStateException( "outgoing scope may not be empty" ); 100 } 101 102 public void setBranchNames( Set<String> branchNames ) 103 { 104 this.branchNames = branchNames; 105 } 106 107 public Set<String> getBranchNames() 108 { 109 return branchNames; 110 } 111 112 @Override 113 public void setTrapHandler( TrapHandler trapHandler ) 114 { 115 this.trapHandler = trapHandler; 116 } 117 118 @Override 119 public boolean hasTrapHandler() 120 { 121 return trapHandler != null; 122 } 123 124 @Override 125 public FlowElement getFlowElement() 126 { 127 return boundary; 128 } 129 130 @Override 131 public List<Scope> getOutgoingScopes() 132 { 133 return outgoingScopes; 134 } 135 136 @Override 137 public List<Scope> getIncomingScopes() 138 { 139 return incomingScopes; 140 } 141 }