001/*
002 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved.
003 *
004 * Project and contact information: http://www.cascading.org/
005 *
006 * This file is part of the Cascading project.
007 *
008 * Licensed under the Apache License, Version 2.0 (the "License");
009 * you may not use this file except in compliance with the License.
010 * You may obtain a copy of the License at
011 *
012 *     http://www.apache.org/licenses/LICENSE-2.0
013 *
014 * Unless required by applicable law or agreed to in writing, software
015 * distributed under the License is distributed on an "AS IS" BASIS,
016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017 * See the License for the specific language governing permissions and
018 * limitations under the License.
019 */
020
021package cascading.cascade;
022
023import java.beans.ConstructorProperties;
024import java.util.Collection;
025import java.util.Map;
026
027import cascading.cascade.planner.FlowGraph;
028import cascading.cascade.planner.IdentifierGraph;
029import cascading.flow.Flow;
030import cascading.tap.Tap;
031import cascading.util.Util;
032
033import static cascading.cascade.CascadeDef.cascadeDef;
034
035/**
036 * Class CascadeConnector is used to construct a new {@link Cascade} instance from a collection of {@link cascading.flow.Flow} instance.
037 * <p/>
038 * Note order is not significant when adding passing Flow instances to the {@code connect}
039 * method. This connector will order them based on their dependencies, if any.
040 * <p/>
041 * One Flow is dependent on another if the first sinks (produces) output that the second Flow sources (consumes) as
042 * input. A sink and source are considered equivalent if the fully qualified identifier, typically {@link Tap#getFullIdentifier(Object)}
043 * from either are {@code equals()}.
044 * <p/>
045 * <p/>
046 * Note that checkpoint sink Taps from an upstream Flow may be the sources to downstream Flow instances.
047 * <p/>
048 * The {@link CascadeDef} is a convenience class for dynamically defining a Cascade that can be passed to the
049 * {@link CascadeConnector#connect(CascadeDef)} method.
050 * <p/>
051 * Use the {@link CascadeProps} fluent helper class to create global properties to pass to the CascadeConnector
052 * constructor.
053 *
054 * @see CascadeDef
055 * @see CascadeProps
056 */
057public class CascadeConnector
058  {
059  /** Field properties */
060  private Map<Object, Object> properties;
061
062  /** Constructor CascadeConnector creates a new CascadeConnector instance. */
063  public CascadeConnector()
064    {
065    }
066
067  /**
068   * Constructor CascadeConnector creates a new CascadeConnector instance.
069   *
070   * @param properties of type Map<Object, Object>
071   */
072  @ConstructorProperties({"properties"})
073  public CascadeConnector( Map<Object, Object> properties )
074    {
075    this.properties = properties;
076    }
077
078  /**
079   * Given any number of {@link cascading.flow.Flow} objects, it will connect them and return a new {@link Cascade} instance. The name
080   * of the Cascade is derived from the given Flow instances.
081   *
082   * @param flows of type Collection<Flow>
083   * @return Cascade
084   */
085  public Cascade connect( Collection<Flow> flows )
086    {
087    return connect( null, flows.toArray( new Flow[ flows.size() ] ) );
088    }
089
090  /**
091   * Given any number of {@link cascading.flow.Flow} objects, it will connect them and return a new {@link Cascade} instance.
092   *
093   * @param name  of type String
094   * @param flows of type Collection<Flow>
095   * @return Cascade
096   */
097  public Cascade connect( String name, Collection<Flow> flows )
098    {
099    return connect( name, flows.toArray( new Flow[ flows.size() ] ) );
100    }
101
102  /**
103   * Given any number of {@link cascading.flow.Flow} objects, it will connect them and return a new {@link Cascade} instance. The name
104   * of the Cascade is derived from the given Flow instances.
105   *
106   * @param flows of type Flow
107   * @return Cascade
108   */
109  public Cascade connect( Flow... flows )
110    {
111    return connect( null, flows );
112    }
113
114  /**
115   * Given any number of {@link cascading.flow.Flow} objects, it will connect them and return a new {@link Cascade} instance.
116   *
117   * @param name  of type String
118   * @param flows of type Flow
119   * @return Cascade
120   */
121  public Cascade connect( String name, Flow... flows )
122    {
123    name = name == null ? makeName( flows ) : name;
124
125    CascadeDef cascadeDef = cascadeDef()
126      .setName( name )
127      .addFlows( flows );
128
129    return connect( cascadeDef );
130    }
131
132  public Cascade connect( CascadeDef cascadeDef )
133    {
134    IdentifierGraph identifierGraph = new IdentifierGraph( cascadeDef.getFlowsArray() );
135    FlowGraph flowGraph = new FlowGraph( identifierGraph );
136
137    return new BaseCascade( cascadeDef, properties, flowGraph, identifierGraph );
138    }
139
140  private String makeName( Flow[] flows )
141    {
142    String[] names = new String[ flows.length ];
143
144    for( int i = 0; i < flows.length; i++ )
145      names[ i ] = flows[ i ].getName();
146
147    return Util.join( names, "+" );
148    }
149  }