/*
 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved.
 *
 * Project and contact information: http://www.cascading.org/
 *
 * This file is part of the Cascading project.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package cascading.cascade;

import java.util.Collection;
import java.util.HashMap;
import java.util.Map;

import cascading.flow.Flow;
import cascading.property.UnitOfWorkDef;
import cascading.tap.Tap;

/**
 * Class CascadeDef is a fluent interface for defining a {@link Cascade}.
 * <p>
 * This allows for ad-hoc building of Cascade data and meta-data like tags.
 * <p>
 * Instead of calling one of the {@link CascadeConnector} connect methods, {@link CascadeConnector#connect(CascadeDef)}
 * can be called.
 * <p>
 * Flows are registered by name; every mutator returns {@code this} so calls can be chained.
 *
 * @see cascading.property.UnitOfWorkDef
 * @see cascading.flow.FlowDef
 */
public class CascadeDef extends UnitOfWorkDef<CascadeDef>
  {
  /** Registered flows keyed by {@link Flow#getName()}; the map is unordered. */
  Map<String, Flow> flows = new HashMap<String, Flow>();
  /** Value handed back by {@link #getMaxConcurrentFlows()}; starts at -1 until explicitly set. */
  int maxConcurrentFlows = -1;

  /**
   * Creates a new instance of a CascadeDef.
   *
   * @return a CascadeDef
   */
  public static CascadeDef cascadeDef()
    {
    return new CascadeDef();
    }

  /** Constructor CascadeDef creates a new CascadeDef instance. */
  public CascadeDef()
    {
    }

  /**
   * Method getFlows returns the flows of this CascadeDef object.
   * <p>
   * The returned collection is the values view of the internal map, not a copy, and its iteration
   * order is unspecified.
   *
   * @return the flows (type {@code Collection<Flow>}) of this CascadeDef object.
   */
  public Collection<Flow> getFlows()
    {
    return flows.values();
    }

  /**
   * Method getFlowsArray returns the flows as an array of this CascadeDef object.
   *
   * @return the flowsArray (type {@code Flow[]}) of this CascadeDef object.
   */
  public Flow[] getFlowsArray()
    {
    return getFlows().toArray( new Flow[ flows.size() ] );
    }

  /**
   * Method addFlow adds a new {@link cascading.flow.Flow} instance that is intended to participate in a {@link Cascade}.
   * <p>
   * A {@code null} flow is ignored. Otherwise the flow name must not already be registered, and none of
   * its sinks may resolve to the same full identifier as a sink of a previously added flow.
   *
   * @param flow of Flow, may be {@code null}
   * @return CascadeDef
   * @throws CascadeException if the flow name is a duplicate, or a sink identifier is shared with an
   *                          already added flow
   */
  public CascadeDef addFlow( Flow flow )
    {
    if( flow == null )
      return this;

    if( flows.containsKey( flow.getName() ) )
      throw new CascadeException( "all flow names must be unique, found duplicate: " + flow.getName() );

    Collection<Tap> sinks = flow.getSinksCollection();

    // compare every sink of the incoming flow against every sink of every flow registered so far
    for( Tap sink : sinks )
      {
      // each identifier is resolved with the config of the flow that owns the sink
      String fullIdentifier = sink.getFullIdentifier( flow.getConfig() );

      for( Flow existingFlow : flows.values() )
        {
        Collection<Tap> existingSinks = existingFlow.getSinksCollection();

        for( Tap existingSink : existingSinks )
          {
          if( fullIdentifier.equals( existingSink.getFullIdentifier( existingFlow.getConfig() ) ) )
            throw new CascadeException( "the flow: " + flow.getName() + ", has a sink identifier: " + fullIdentifier + ", in common with the flow: " + existingFlow.getName() );
          }
        }
      }

    // only register once all checks have passed, so a rejected flow leaves this definition untouched
    flows.put( flow.getName(), flow );

    return this;
    }

  /**
   * Method addFlows adds many new {@link cascading.flow.Flow} instances intended to participate in a {@link Cascade}.
   * <p>
   * A {@code null} array is ignored, consistent with {@link #addFlow(Flow)} ignoring a {@code null} flow.
   * Flows are added one at a time; if one is rejected, those added before it remain registered.
   *
   * @param flows of Flow[], may be {@code null}
   * @return CascadeDef
   * @throws CascadeException if any flow is rejected by {@link #addFlow(Flow)}
   */
  public CascadeDef addFlows( Flow... flows )
    {
    // the parameter shadows the field of the same name; a null array is possible via an explicit cast
    if( flows == null )
      return this;

    for( Flow flow : flows )
      addFlow( flow );

    return this;
    }

  /**
   * Method addFlows adds many new {@link cascading.flow.Flow} instances intended to participate in a {@link Cascade}.
   * <p>
   * A {@code null} collection is ignored, consistent with {@link #addFlow(Flow)} ignoring a {@code null} flow.
   * Flows are added one at a time; if one is rejected, those added before it remain registered.
   *
   * @param flows of {@code Collection<Flow>}, may be {@code null}
   * @return CascadeDef
   * @throws CascadeException if any flow is rejected by {@link #addFlow(Flow)}
   */
  public CascadeDef addFlows( Collection<Flow> flows )
    {
    // the parameter shadows the field of the same name
    if( flows == null )
      return this;

    for( Flow flow : flows )
      addFlow( flow );

    return this;
    }

  /**
   * Method setMaxConcurrentFlows sets the maxConcurrentFlows of this CascadeDef object.
   * <p>
   * The value is stored as given, without validation.
   * NOTE(review): presumably this caps how many flows the resulting {@link Cascade} runs at once, and a
   * non-positive value means no explicit limit -- confirm against the code that consumes this value.
   *
   * @param maxConcurrentFlows the maxConcurrentFlows of this CascadeDef object.
   * @return CascadeDef
   */
  public CascadeDef setMaxConcurrentFlows( int maxConcurrentFlows )
    {
    this.maxConcurrentFlows = maxConcurrentFlows;

    return this;
    }

  /**
   * Method getMaxConcurrentFlows returns the maxConcurrentFlows of this CascadeDef object.
   *
   * @return the maxConcurrentFlows (type int) of this CascadeDef object, -1 if never set.
   */
  public int getMaxConcurrentFlows()
    {
    return maxConcurrentFlows;
    }
  }