001 /* 002 * Copyright (c) 2007-2015 Concurrent, Inc. All Rights Reserved. 003 * 004 * Project and contact information: http://www.cascading.org/ 005 * 006 * This file is part of the Cascading project. 007 * 008 * Licensed under the Apache License, Version 2.0 (the "License"); 009 * you may not use this file except in compliance with the License. 010 * You may obtain a copy of the License at 011 * 012 * http://www.apache.org/licenses/LICENSE-2.0 013 * 014 * Unless required by applicable law or agreed to in writing, software 015 * distributed under the License is distributed on an "AS IS" BASIS, 016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 017 * See the License for the specific language governing permissions and 018 * limitations under the License. 019 */ 020 021 package cascading.cascade; 022 023 import java.beans.ConstructorProperties; 024 import java.util.Collection; 025 import java.util.Map; 026 027 import cascading.cascade.planner.FlowGraph; 028 import cascading.cascade.planner.IdentifierGraph; 029 import cascading.flow.Flow; 030 import cascading.tap.Tap; 031 import cascading.util.Util; 032 import org.slf4j.Logger; 033 import org.slf4j.LoggerFactory; 034 035 import static cascading.cascade.CascadeDef.cascadeDef; 036 037 /** 038 * Class CascadeConnector is used to construct a new {@link Cascade} instance from a collection of {@link cascading.flow.Flow} instance. 039 * <p/> 040 * Note order is not significant when adding passing Flow instances to the {@code connect} 041 * method. This connector will order them based on their dependencies, if any. 042 * <p/> 043 * One Flow is dependent on another if the first sinks (produces) output that the second Flow sources (consumes) as 044 * input. A sink and source are considered equivalent if the fully qualified identifier, typically {@link Tap#getFullIdentifier(Object)} 045 * from either are {@code equals()}. 046 * <p/> 047 * <p/> 048 * Note that checkpoint sink Taps from an upstream Flow may be the sources to downstream Flow instances. 049 * <p/> 050 * The {@link CascadeDef} is a convenience class for dynamically defining a Cascade that can be passed to the 051 * {@link CascadeConnector#connect(CascadeDef)} method. 052 * <p/> 053 * Use the {@link CascadeProps} fluent helper class to create global properties to pass to the CascadeConnector 054 * constructor. 055 * 056 * @see CascadeDef 057 * @see CascadeProps 058 */ 059 public class CascadeConnector 060 { 061 /** Field LOG */ 062 private static final Logger LOG = LoggerFactory.getLogger( CascadeConnector.class ); 063 064 /** Field properties */ 065 private Map<Object, Object> properties; 066 067 /** Constructor CascadeConnector creates a new CascadeConnector instance. */ 068 public CascadeConnector() 069 { 070 } 071 072 /** 073 * Constructor CascadeConnector creates a new CascadeConnector instance. 074 * 075 * @param properties of type Map<Object, Object> 076 */ 077 @ConstructorProperties({"properties"}) 078 public CascadeConnector( Map<Object, Object> properties ) 079 { 080 this.properties = properties; 081 } 082 083 /** 084 * Given any number of {@link cascading.flow.Flow} objects, it will connect them and return a new {@link Cascade} instance. The name 085 * of the Cascade is derived from the given Flow instances. 086 * 087 * @param flows of type Collection<Flow> 088 * @return Cascade 089 */ 090 public Cascade connect( Collection<Flow> flows ) 091 { 092 return connect( null, flows.toArray( new Flow[ flows.size() ] ) ); 093 } 094 095 /** 096 * Given any number of {@link cascading.flow.Flow} objects, it will connect them and return a new {@link Cascade} instance. 097 * 098 * @param name of type String 099 * @param flows of type Collection<Flow> 100 * @return Cascade 101 */ 102 public Cascade connect( String name, Collection<Flow> flows ) 103 { 104 return connect( name, flows.toArray( new Flow[ flows.size() ] ) ); 105 } 106 107 /** 108 * Given any number of {@link cascading.flow.Flow} objects, it will connect them and return a new {@link Cascade} instance. The name 109 * of the Cascade is derived from the given Flow instances. 110 * 111 * @param flows of type Flow 112 * @return Cascade 113 */ 114 public Cascade connect( Flow... flows ) 115 { 116 return connect( null, flows ); 117 } 118 119 /** 120 * Given any number of {@link cascading.flow.Flow} objects, it will connect them and return a new {@link Cascade} instance. 121 * 122 * @param name of type String 123 * @param flows of type Flow 124 * @return Cascade 125 */ 126 public Cascade connect( String name, Flow... flows ) 127 { 128 name = name == null ? makeName( flows ) : name; 129 130 CascadeDef cascadeDef = cascadeDef() 131 .setName( name ) 132 .addFlows( flows ); 133 134 return connect( cascadeDef ); 135 } 136 137 public Cascade connect( CascadeDef cascadeDef ) 138 { 139 IdentifierGraph identifierGraph = new IdentifierGraph( cascadeDef.getFlowsArray() ); 140 FlowGraph flowGraph = new FlowGraph( identifierGraph ); 141 142 return new Cascade( cascadeDef, properties, flowGraph, identifierGraph ); 143 } 144 145 private String makeName( Flow[] flows ) 146 { 147 String[] names = new String[ flows.length ]; 148 149 for( int i = 0; i < flows.length; i++ ) 150 names[ i ] = flows[ i ].getName(); 151 152 return Util.join( names, "+" ); 153 } 154 }