001    /*
002     * Copyright (c) 2007-2015 Concurrent, Inc. All Rights Reserved.
003     *
004     * Project and contact information: http://www.cascading.org/
005     *
006     * This file is part of the Cascading project.
007     *
008     * Licensed under the Apache License, Version 2.0 (the "License");
009     * you may not use this file except in compliance with the License.
010     * You may obtain a copy of the License at
011     *
012     *     http://www.apache.org/licenses/LICENSE-2.0
013     *
014     * Unless required by applicable law or agreed to in writing, software
015     * distributed under the License is distributed on an "AS IS" BASIS,
016     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017     * See the License for the specific language governing permissions and
018     * limitations under the License.
019     */
020    
021    package cascading.cascade;
022    
023    import java.beans.ConstructorProperties;
024    import java.util.Collection;
025    import java.util.Map;
026    
027    import cascading.cascade.planner.FlowGraph;
028    import cascading.cascade.planner.IdentifierGraph;
029    import cascading.flow.Flow;
030    import cascading.tap.Tap;
031    import cascading.util.Util;
032    import org.slf4j.Logger;
033    import org.slf4j.LoggerFactory;
034    
035    import static cascading.cascade.CascadeDef.cascadeDef;
036    
037    /**
038     * Class CascadeConnector is used to construct a new {@link Cascade} instance from a collection of {@link cascading.flow.Flow} instance.
039     * <p/>
040     * Note order is not significant when adding passing Flow instances to the {@code connect}
041     * method. This connector will order them based on their dependencies, if any.
042     * <p/>
043     * One Flow is dependent on another if the first sinks (produces) output that the second Flow sources (consumes) as
044     * input. A sink and source are considered equivalent if the fully qualified identifier, typically {@link Tap#getFullIdentifier(Object)}
045     * from either are {@code equals()}.
046     * <p/>
047     * <p/>
048     * Note that checkpoint sink Taps from an upstream Flow may be the sources to downstream Flow instances.
049     * <p/>
050     * The {@link CascadeDef} is a convenience class for dynamically defining a Cascade that can be passed to the
051     * {@link CascadeConnector#connect(CascadeDef)} method.
052     * <p/>
053     * Use the {@link CascadeProps} fluent helper class to create global properties to pass to the CascadeConnector
054     * constructor.
055     *
056     * @see CascadeDef
057     * @see CascadeProps
058     */
059    public class CascadeConnector
060      {
061      /** Field LOG */
062      private static final Logger LOG = LoggerFactory.getLogger( CascadeConnector.class );
063    
064      /** Field properties */
065      private Map<Object, Object> properties;
066    
067      /** Constructor CascadeConnector creates a new CascadeConnector instance. */
068      public CascadeConnector()
069        {
070        }
071    
072      /**
073       * Constructor CascadeConnector creates a new CascadeConnector instance.
074       *
075       * @param properties of type Map<Object, Object>
076       */
077      @ConstructorProperties({"properties"})
078      public CascadeConnector( Map<Object, Object> properties )
079        {
080        this.properties = properties;
081        }
082    
083      /**
084       * Given any number of {@link cascading.flow.Flow} objects, it will connect them and return a new {@link Cascade} instance. The name
085       * of the Cascade is derived from the given Flow instances.
086       *
087       * @param flows of type Collection<Flow>
088       * @return Cascade
089       */
090      public Cascade connect( Collection<Flow> flows )
091        {
092        return connect( null, flows.toArray( new Flow[ flows.size() ] ) );
093        }
094    
095      /**
096       * Given any number of {@link cascading.flow.Flow} objects, it will connect them and return a new {@link Cascade} instance.
097       *
098       * @param name  of type String
099       * @param flows of type Collection<Flow>
100       * @return Cascade
101       */
102      public Cascade connect( String name, Collection<Flow> flows )
103        {
104        return connect( name, flows.toArray( new Flow[ flows.size() ] ) );
105        }
106    
107      /**
108       * Given any number of {@link cascading.flow.Flow} objects, it will connect them and return a new {@link Cascade} instance. The name
109       * of the Cascade is derived from the given Flow instances.
110       *
111       * @param flows of type Flow
112       * @return Cascade
113       */
114      public Cascade connect( Flow... flows )
115        {
116        return connect( null, flows );
117        }
118    
119      /**
120       * Given any number of {@link cascading.flow.Flow} objects, it will connect them and return a new {@link Cascade} instance.
121       *
122       * @param name  of type String
123       * @param flows of type Flow
124       * @return Cascade
125       */
126      public Cascade connect( String name, Flow... flows )
127        {
128        name = name == null ? makeName( flows ) : name;
129    
130        CascadeDef cascadeDef = cascadeDef()
131          .setName( name )
132          .addFlows( flows );
133    
134        return connect( cascadeDef );
135        }
136    
137      public Cascade connect( CascadeDef cascadeDef )
138        {
139        IdentifierGraph identifierGraph = new IdentifierGraph( cascadeDef.getFlowsArray() );
140        FlowGraph flowGraph = new FlowGraph( identifierGraph );
141    
142        return new Cascade( cascadeDef, properties, flowGraph, identifierGraph );
143        }
144    
145      private String makeName( Flow[] flows )
146        {
147        String[] names = new String[ flows.length ];
148    
149        for( int i = 0; i < flows.length; i++ )
150          names[ i ] = flows[ i ].getName();
151    
152        return Util.join( names, "+" );
153        }
154      }