001/* 002 * Copyright (c) 2016-2017 Chris K Wensel <chris@wensel.net>. All Rights Reserved. 003 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved. 004 * 005 * Project and contact information: http://www.cascading.org/ 006 * 007 * This file is part of the Cascading project. 008 * 009 * Licensed under the Apache License, Version 2.0 (the "License"); 010 * you may not use this file except in compliance with the License. 011 * You may obtain a copy of the License at 012 * 013 * http://www.apache.org/licenses/LICENSE-2.0 014 * 015 * Unless required by applicable law or agreed to in writing, software 016 * distributed under the License is distributed on an "AS IS" BASIS, 017 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 018 * See the License for the specific language governing permissions and 019 * limitations under the License. 020 */ 021 022package cascading.cascade; 023 024import java.util.Collection; 025import java.util.HashMap; 026import java.util.Map; 027 028import cascading.flow.Flow; 029import cascading.property.UnitOfWorkDef; 030import cascading.tap.Tap; 031 032/** 033 * Class CascadeDef is a fluent interface for defining a {@link Cascade}. 034 * <p> 035 * This allows for ad-hoc building of Cascade data and meta-data like tags. 036 * <p> 037 * Instead of calling one of the {@link CascadeConnector} connect methods, {@link CascadeConnector#connect(CascadeDef)} 038 * can be called. 039 * 040 * @see cascading.property.UnitOfWorkDef 041 * @see cascading.flow.FlowDef 042 */ 043public class CascadeDef extends UnitOfWorkDef<CascadeDef> 044 { 045 Map<String, Flow> flows = new HashMap<String, Flow>(); 046 int maxConcurrentFlows = -1; 047 048 /** 049 * Creates a new instance of a CascadeDef. 050 * 051 * @return a CascadeDef 052 */ 053 public static CascadeDef cascadeDef() 054 { 055 return new CascadeDef(); 056 } 057 058 /** Constructor CascadeDef creates a new CascadeDef instance. */ 059 public CascadeDef() 060 { 061 } 062 063 /** 064 * Method getFlows returns the flows of this CascadeDef object. 065 * 066 * @return the flows (type Collection) of this CascadeDef object. 067 */ 068 public Collection<Flow> getFlows() 069 { 070 return flows.values(); 071 } 072 073 /** 074 * Method getFlowsArray returns the flows as an array of this CascadeDef object. 075 * 076 * @return the flowsArray (type Flow[]) of this CascadeDef object. 077 */ 078 public Flow[] getFlowsArray() 079 { 080 return getFlows().toArray( new Flow[ flows.size() ] ); 081 } 082 083 /** 084 * Method addFlow adds a new {@link cascading.flow.Flow} instance that is intended to participate in a {@link Cascade}. 085 * 086 * @param flow of Flow 087 * @return CascadeDef 088 */ 089 public CascadeDef addFlow( Flow flow ) 090 { 091 if( flow == null ) 092 return this; 093 094 if( flows.containsKey( flow.getName() ) ) 095 throw new CascadeException( "all flow names must be unique, found duplicate: " + flow.getName() ); 096 097 Collection<Tap> sinks = flow.getSinksCollection(); 098 099 for( Tap sink : sinks ) 100 { 101 String fullIdentifier = sink.getFullIdentifier( flow.getConfig() ); 102 103 for( Flow existingFlow : flows.values() ) 104 { 105 Collection<Tap> existingSinks = existingFlow.getSinksCollection(); 106 107 for( Tap existingSink : existingSinks ) 108 { 109 if( fullIdentifier.equals( existingSink.getFullIdentifier( existingFlow.getConfig() ) ) ) 110 throw new CascadeException( "the flow: " + flow.getName() + ", has a sink identifier: " + fullIdentifier + ", in common with the flow: " + existingFlow.getName() ); 111 } 112 } 113 } 114 115 flows.put( flow.getName(), flow ); 116 117 return this; 118 } 119 120 /** 121 * Method addFlows adds many new {@link cascading.flow.Flow} instances intended to participate in a {@link Cascade}. 122 * 123 * @param flows of Flow[] 124 * @return CascadeDef 125 */ 126 public CascadeDef addFlows( Flow... flows ) 127 { 128 for( Flow flow : flows ) 129 addFlow( flow ); 130 131 return this; 132 } 133 134 /** 135 * Method addFlows adds many new {@link cascading.flow.Flow} instances intended to participate in a {@link Cascade}. 136 * 137 * @param flows of Collection 138 * @return CascadeDef 139 */ 140 public CascadeDef addFlows( Collection<Flow> flows ) 141 { 142 for( Flow flow : flows ) 143 addFlow( flow ); 144 145 return this; 146 } 147 148 public CascadeDef setMaxConcurrentFlows( int maxConcurrentFlows ) 149 { 150 this.maxConcurrentFlows = maxConcurrentFlows; 151 152 return this; 153 } 154 155 public int getMaxConcurrentFlows() 156 { 157 return maxConcurrentFlows; 158 } 159 }