001/* 002 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved. 003 * 004 * Project and contact information: http://www.cascading.org/ 005 * 006 * This file is part of the Cascading project. 007 * 008 * Licensed under the Apache License, Version 2.0 (the "License"); 009 * you may not use this file except in compliance with the License. 010 * You may obtain a copy of the License at 011 * 012 * http://www.apache.org/licenses/LICENSE-2.0 013 * 014 * Unless required by applicable law or agreed to in writing, software 015 * distributed under the License is distributed on an "AS IS" BASIS, 016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 017 * See the License for the specific language governing permissions and 018 * limitations under the License. 019 */ 020 021package cascading.flow.tez.planner; 022 023import java.net.URI; 024import java.util.Map; 025import java.util.Properties; 026import java.util.Set; 027 028import cascading.flow.FlowConnector; 029import cascading.flow.FlowConnectorProps; 030import cascading.flow.FlowDef; 031import cascading.flow.FlowElement; 032import cascading.flow.FlowStep; 033import cascading.flow.hadoop.util.HadoopUtil; 034import cascading.flow.planner.BaseFlowStepFactory; 035import cascading.flow.planner.FlowPlanner; 036import cascading.flow.planner.PlannerInfo; 037import cascading.flow.planner.PlatformInfo; 038import cascading.flow.planner.graph.ElementGraph; 039import cascading.flow.planner.process.FlowNodeGraph; 040import cascading.flow.planner.process.FlowStepFactory; 041import cascading.flow.planner.rule.RuleRegistry; 042import cascading.flow.planner.rule.transformer.BoundaryElementFactory; 043import cascading.flow.planner.rule.transformer.IntermediateTapElementFactory; 044import cascading.flow.tez.Hadoop2TezFlow; 045import cascading.flow.tez.Hadoop2TezFlowStep; 046import cascading.flow.tez.util.TezUtil; 047import cascading.pipe.Boundary; 048import cascading.property.AppProps; 049import cascading.property.PropertyUtil; 050import cascading.tap.Tap; 051import cascading.tap.hadoop.DistCacheTap; 052import cascading.tap.hadoop.Hfs; 053import cascading.tap.hadoop.util.TempHfs; 054import cascading.util.Util; 055import org.apache.hadoop.conf.Configuration; 056import org.apache.tez.dag.api.DAG; 057import org.apache.tez.dag.api.TezConfiguration; 058import org.slf4j.Logger; 059import org.slf4j.LoggerFactory; 060 061import static cascading.flow.tez.util.TezUtil.asJobConf; 062 063/** 064 */ 065public class Hadoop2TezPlanner extends FlowPlanner<Hadoop2TezFlow, TezConfiguration> 066 { 067 /** Field LOG */ 068 private static final Logger LOG = LoggerFactory.getLogger( Hadoop2TezPlanner.class ); 069 070 public static final String PLATFORM_NAME = "hadoop2-tez"; 071 072 /** Field defaultConfiguration */ 073 private TezConfiguration defaultConfiguration; 074 /** Field intermediateSchemeClass */ 075 private Class intermediateSchemeClass; 076 077 public static void copyConfiguration( Map<Object, Object> properties, Configuration configuration ) 078 { 079 for( Map.Entry<String, String> entry : configuration ) 080 properties.put( entry.getKey(), entry.getValue() ); 081 } 082 083 public static TezConfiguration createConfiguration( Map<Object, Object> properties ) 084 { 085 TezConfiguration conf = new TezConfiguration(); 086 087 copyProperties( conf, properties ); 088 089 return conf; 090 } 091 092 public static void copyProperties( Configuration jobConf, Map<Object, Object> properties ) 093 { 094 if( properties instanceof Properties ) 095 { 096 Properties props = (Properties) properties; 097 Set<String> keys = props.stringPropertyNames(); 098 099 for( String key : keys ) 100 jobConf.set( key, props.getProperty( key ) ); 101 } 102 else 103 { 104 for( Map.Entry<Object, Object> entry : properties.entrySet() ) 105 { 106 if( entry.getValue() != null ) 107 jobConf.set( entry.getKey().toString(), entry.getValue().toString() ); 108 } 109 } 110 } 111 112 @Override 113 public PlannerInfo getPlannerInfo( String registryName ) 114 { 115 return new PlannerInfo( getClass().getSimpleName(), PLATFORM_NAME, registryName ); 116 } 117 118 @Override 119 public TezConfiguration getDefaultConfig() 120 { 121 return defaultConfiguration; 122 } 123 124 @Override 125 public PlatformInfo getPlatformInfo() 126 { 127 return HadoopUtil.getPlatformInfo( DAG.class, null, "Tez" ); 128 } 129 130 @Override 131 public void initialize( FlowConnector flowConnector, Map<Object, Object> properties ) 132 { 133 super.initialize( flowConnector, properties ); 134 135 defaultConfiguration = TezUtil.createTezConf( properties, createConfiguration( properties ) ); 136 intermediateSchemeClass = flowConnector.getIntermediateSchemeClass( properties ); 137 138 String applicationJarPath = AppProps.getApplicationJarPath( properties ); 139 140 if( applicationJarPath == null ) 141 { 142 Class type = AppProps.getApplicationJarClass( properties ); 143 144 if( type == null ) 145 type = HadoopUtil.findMainClass( Hadoop2TezPlanner.class ); 146 147 if( type != null ) 148 applicationJarPath = Util.findContainingJar( type ); 149 150 AppProps.setApplicationJarPath( properties, applicationJarPath ); 151 } 152 153 if( applicationJarPath != null ) 154 LOG.info( "using application jar: {}", applicationJarPath ); 155 else 156 LOG.info( "using application jar not provided, see cascading.property.AppProps for more information" ); 157 } 158 159 @Override 160 public void configRuleRegistryDefaults( RuleRegistry ruleRegistry ) 161 { 162 super.configRuleRegistryDefaults( ruleRegistry ); 163 164 ruleRegistry.addDefaultElementFactory( IntermediateTapElementFactory.TEMP_TAP, new TempTapElementFactory() ); 165 ruleRegistry.addDefaultElementFactory( BoundaryElementFactory.BOUNDARY_PIPE, new IntermediateBoundaryElementFactory() ); 166 167 if( PropertyUtil.getBooleanProperty( getDefaultProperties(), FlowConnectorProps.ENABLE_DECORATE_ACCUMULATED_TAP, true ) ) 168 ruleRegistry.addDefaultElementFactory( IntermediateTapElementFactory.ACCUMULATED_TAP, new TempTapElementFactory( DistCacheTap.class.getName() ) ); 169 } 170 171 @Override 172 protected Hadoop2TezFlow createFlow( FlowDef flowDef ) 173 { 174 return new Hadoop2TezFlow( getPlatformInfo(), getDefaultProperties(), getDefaultConfig(), flowDef ); 175 } 176 177 @Override 178 public FlowStepFactory<TezConfiguration> getFlowStepFactory() 179 { 180 return new BaseFlowStepFactory<TezConfiguration>( getFlowNodeFactory() ) 181 { 182 @Override 183 public FlowStep<TezConfiguration> createFlowStep( ElementGraph stepElementGraph, FlowNodeGraph flowNodeGraph ) 184 { 185 return new Hadoop2TezFlowStep( stepElementGraph, flowNodeGraph ); 186 } 187 }; 188 } 189 190 public URI getDefaultURIScheme( Tap tap ) 191 { 192 return ( (Hfs) tap ).getDefaultFileSystemURIScheme( defaultConfiguration ); 193 } 194 195 public URI getURIScheme( Tap tap ) 196 { 197 return ( (Hfs) tap ).getURIScheme( defaultConfiguration ); 198 } 199 200 @Override 201 protected Tap makeTempTap( String prefix, String name ) 202 { 203 // must give Taps unique names 204 return new TempHfs( asJobConf( defaultConfiguration ), Util.makePath( prefix, name ), intermediateSchemeClass, prefix == null ); 205 } 206 207 public class IntermediateBoundaryElementFactory extends BoundaryElementFactory 208 { 209 210 @Override 211 public FlowElement create( ElementGraph graph, FlowElement flowElement ) 212 { 213 return new Boundary(); 214 } 215 } 216 }