001/*
002 * Copyright (c) 2016-2017 Chris K Wensel <chris@wensel.net>. All Rights Reserved.
003 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved.
004 *
005 * Project and contact information: http://www.cascading.org/
006 *
007 * This file is part of the Cascading project.
008 *
009 * Licensed under the Apache License, Version 2.0 (the "License");
010 * you may not use this file except in compliance with the License.
011 * You may obtain a copy of the License at
012 *
013 *     http://www.apache.org/licenses/LICENSE-2.0
014 *
015 * Unless required by applicable law or agreed to in writing, software
016 * distributed under the License is distributed on an "AS IS" BASIS,
017 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
018 * See the License for the specific language governing permissions and
019 * limitations under the License.
020 */
021
022package cascading.cascade;
023
024import java.util.Collection;
025import java.util.HashMap;
026import java.util.Map;
027
028import cascading.flow.Flow;
029import cascading.property.UnitOfWorkDef;
030import cascading.tap.Tap;
031
032/**
033 * Class CascadeDef is a fluent interface for defining a {@link Cascade}.
034 * <p>
035 * This allows for ad-hoc building of Cascade data and meta-data like tags.
036 * <p>
037 * Instead of calling one of the {@link CascadeConnector} connect methods, {@link CascadeConnector#connect(CascadeDef)}
038 * can be called.
039 *
040 * @see cascading.property.UnitOfWorkDef
041 * @see cascading.flow.FlowDef
042 */
043public class CascadeDef extends UnitOfWorkDef<CascadeDef>
044  {
045  Map<String, Flow> flows = new HashMap<String, Flow>();
046  int maxConcurrentFlows = -1;
047
048  /**
049   * Creates a new instance of a CascadeDef.
050   *
051   * @return a CascadeDef
052   */
053  public static CascadeDef cascadeDef()
054    {
055    return new CascadeDef();
056    }
057
058  /** Constructor CascadeDef creates a new CascadeDef instance. */
059  public CascadeDef()
060    {
061    }
062
063  /**
064   * Method getFlows returns the flows of this CascadeDef object.
065   *
066   * @return the flows (type Collection) of this CascadeDef object.
067   */
068  public Collection<Flow> getFlows()
069    {
070    return flows.values();
071    }
072
073  /**
074   * Method getFlowsArray returns the flows as an array of this CascadeDef object.
075   *
076   * @return the flowsArray (type Flow[]) of this CascadeDef object.
077   */
078  public Flow[] getFlowsArray()
079    {
080    return getFlows().toArray( new Flow[ flows.size() ] );
081    }
082
083  /**
084   * Method addFlow adds a new {@link cascading.flow.Flow} instance that is intended to participate in a {@link Cascade}.
085   *
086   * @param flow of Flow
087   * @return CascadeDef
088   */
089  public CascadeDef addFlow( Flow flow )
090    {
091    if( flow == null )
092      return this;
093
094    if( flows.containsKey( flow.getName() ) )
095      throw new CascadeException( "all flow names must be unique, found duplicate: " + flow.getName() );
096
097    Collection<Tap> sinks = flow.getSinksCollection();
098
099    for( Tap sink : sinks )
100      {
101      String fullIdentifier = sink.getFullIdentifier( flow.getConfig() );
102
103      for( Flow existingFlow : flows.values() )
104        {
105        Collection<Tap> existingSinks = existingFlow.getSinksCollection();
106
107        for( Tap existingSink : existingSinks )
108          {
109          if( fullIdentifier.equals( existingSink.getFullIdentifier( existingFlow.getConfig() ) ) )
110            throw new CascadeException( "the flow: " + flow.getName() + ", has a sink identifier: " + fullIdentifier + ", in common with the flow: " + existingFlow.getName() );
111          }
112        }
113      }
114
115    flows.put( flow.getName(), flow );
116
117    return this;
118    }
119
120  /**
121   * Method addFlows adds many new {@link cascading.flow.Flow} instances intended to participate in a {@link Cascade}.
122   *
123   * @param flows of Flow[]
124   * @return CascadeDef
125   */
126  public CascadeDef addFlows( Flow... flows )
127    {
128    for( Flow flow : flows )
129      addFlow( flow );
130
131    return this;
132    }
133
134  /**
135   * Method addFlows adds many new {@link cascading.flow.Flow} instances intended to participate in a {@link Cascade}.
136   *
137   * @param flows of Collection
138   * @return CascadeDef
139   */
140  public CascadeDef addFlows( Collection<Flow> flows )
141    {
142    for( Flow flow : flows )
143      addFlow( flow );
144
145    return this;
146    }
147
148  public CascadeDef setMaxConcurrentFlows( int maxConcurrentFlows )
149    {
150    this.maxConcurrentFlows = maxConcurrentFlows;
151
152    return this;
153    }
154
155  public int getMaxConcurrentFlows()
156    {
157    return maxConcurrentFlows;
158    }
159  }