001/*
002 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved.
003 *
004 * Project and contact information: http://www.cascading.org/
005 *
006 * This file is part of the Cascading project.
007 *
008 * Licensed under the Apache License, Version 2.0 (the "License");
009 * you may not use this file except in compliance with the License.
010 * You may obtain a copy of the License at
011 *
012 *     http://www.apache.org/licenses/LICENSE-2.0
013 *
014 * Unless required by applicable law or agreed to in writing, software
015 * distributed under the License is distributed on an "AS IS" BASIS,
016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017 * See the License for the specific language governing permissions and
018 * limitations under the License.
019 */
020
021package cascading.flow.tez.planner;
022
023import java.net.URI;
024import java.util.Map;
025import java.util.Properties;
026import java.util.Set;
027
028import cascading.flow.FlowConnector;
029import cascading.flow.FlowConnectorProps;
030import cascading.flow.FlowDef;
031import cascading.flow.FlowElement;
032import cascading.flow.FlowStep;
033import cascading.flow.hadoop.util.HadoopUtil;
034import cascading.flow.planner.BaseFlowStepFactory;
035import cascading.flow.planner.FlowPlanner;
036import cascading.flow.planner.PlannerInfo;
037import cascading.flow.planner.PlatformInfo;
038import cascading.flow.planner.graph.ElementGraph;
039import cascading.flow.planner.process.FlowNodeGraph;
040import cascading.flow.planner.process.FlowStepFactory;
041import cascading.flow.planner.rule.RuleRegistry;
042import cascading.flow.planner.rule.transformer.BoundaryElementFactory;
043import cascading.flow.planner.rule.transformer.IntermediateTapElementFactory;
044import cascading.flow.tez.Hadoop2TezFlow;
045import cascading.flow.tez.Hadoop2TezFlowStep;
046import cascading.flow.tez.util.TezUtil;
047import cascading.pipe.Boundary;
048import cascading.property.AppProps;
049import cascading.property.PropertyUtil;
050import cascading.tap.Tap;
051import cascading.tap.hadoop.DistCacheTap;
052import cascading.tap.hadoop.Hfs;
053import cascading.tap.hadoop.util.TempHfs;
054import cascading.util.Util;
055import org.apache.hadoop.conf.Configuration;
056import org.apache.tez.dag.api.DAG;
057import org.apache.tez.dag.api.TezConfiguration;
058import org.slf4j.Logger;
059import org.slf4j.LoggerFactory;
060
061import static cascading.flow.tez.util.TezUtil.asJobConf;
062
063/**
064 */
065public class Hadoop2TezPlanner extends FlowPlanner<Hadoop2TezFlow, TezConfiguration>
066  {
067  /** Field LOG */
068  private static final Logger LOG = LoggerFactory.getLogger( Hadoop2TezPlanner.class );
069
070  public static final String PLATFORM_NAME = "hadoop2-tez";
071
072  /** Field defaultConfiguration */
073  private TezConfiguration defaultConfiguration;
074  /** Field intermediateSchemeClass */
075  private Class intermediateSchemeClass;
076
077  public static void copyConfiguration( Map<Object, Object> properties, Configuration configuration )
078    {
079    for( Map.Entry<String, String> entry : configuration )
080      properties.put( entry.getKey(), entry.getValue() );
081    }
082
083  public static TezConfiguration createConfiguration( Map<Object, Object> properties )
084    {
085    TezConfiguration conf = new TezConfiguration();
086
087    copyProperties( conf, properties );
088
089    return conf;
090    }
091
092  public static void copyProperties( Configuration jobConf, Map<Object, Object> properties )
093    {
094    if( properties instanceof Properties )
095      {
096      Properties props = (Properties) properties;
097      Set<String> keys = props.stringPropertyNames();
098
099      for( String key : keys )
100        jobConf.set( key, props.getProperty( key ) );
101      }
102    else
103      {
104      for( Map.Entry<Object, Object> entry : properties.entrySet() )
105        {
106        if( entry.getValue() != null )
107          jobConf.set( entry.getKey().toString(), entry.getValue().toString() );
108        }
109      }
110    }
111
112  @Override
113  public PlannerInfo getPlannerInfo( String registryName )
114    {
115    return new PlannerInfo( getClass().getSimpleName(), PLATFORM_NAME, registryName );
116    }
117
118  @Override
119  public TezConfiguration getDefaultConfig()
120    {
121    return defaultConfiguration;
122    }
123
124  @Override
125  public PlatformInfo getPlatformInfo()
126    {
127    return HadoopUtil.getPlatformInfo( DAG.class, null, "Tez" );
128    }
129
130  @Override
131  public void initialize( FlowConnector flowConnector, Map<Object, Object> properties )
132    {
133    super.initialize( flowConnector, properties );
134
135    defaultConfiguration = TezUtil.createTezConf( properties, createConfiguration( properties ) );
136    intermediateSchemeClass = flowConnector.getIntermediateSchemeClass( properties );
137
138    String applicationJarPath = AppProps.getApplicationJarPath( properties );
139
140    if( applicationJarPath == null )
141      {
142      Class type = AppProps.getApplicationJarClass( properties );
143
144      if( type == null )
145        type = HadoopUtil.findMainClass( Hadoop2TezPlanner.class );
146
147      if( type != null )
148        applicationJarPath = Util.findContainingJar( type );
149
150      AppProps.setApplicationJarPath( properties, applicationJarPath );
151      }
152
153    if( applicationJarPath != null )
154      LOG.info( "using application jar: {}", applicationJarPath );
155    else
156      LOG.info( "using application jar not provided, see cascading.property.AppProps for more information" );
157    }
158
159  @Override
160  public void configRuleRegistryDefaults( RuleRegistry ruleRegistry )
161    {
162    super.configRuleRegistryDefaults( ruleRegistry );
163
164    ruleRegistry.addDefaultElementFactory( IntermediateTapElementFactory.TEMP_TAP, new TempTapElementFactory() );
165    ruleRegistry.addDefaultElementFactory( BoundaryElementFactory.BOUNDARY_PIPE, new IntermediateBoundaryElementFactory() );
166
167    if( PropertyUtil.getBooleanProperty( getDefaultProperties(), FlowConnectorProps.ENABLE_DECORATE_ACCUMULATED_TAP, true ) )
168      ruleRegistry.addDefaultElementFactory( IntermediateTapElementFactory.ACCUMULATED_TAP, new TempTapElementFactory( DistCacheTap.class.getName() ) );
169    }
170
171  @Override
172  protected Hadoop2TezFlow createFlow( FlowDef flowDef )
173    {
174    return new Hadoop2TezFlow( getPlatformInfo(), getDefaultProperties(), getDefaultConfig(), flowDef );
175    }
176
177  @Override
178  public FlowStepFactory<TezConfiguration> getFlowStepFactory()
179    {
180    return new BaseFlowStepFactory<TezConfiguration>( getFlowNodeFactory() )
181      {
182      @Override
183      public FlowStep<TezConfiguration> createFlowStep( ElementGraph stepElementGraph, FlowNodeGraph flowNodeGraph )
184        {
185        return new Hadoop2TezFlowStep( stepElementGraph, flowNodeGraph );
186        }
187      };
188    }
189
190  public URI getDefaultURIScheme( Tap tap )
191    {
192    return ( (Hfs) tap ).getDefaultFileSystemURIScheme( defaultConfiguration );
193    }
194
195  public URI getURIScheme( Tap tap )
196    {
197    return ( (Hfs) tap ).getURIScheme( defaultConfiguration );
198    }
199
200  @Override
201  protected Tap makeTempTap( String prefix, String name )
202    {
203    // must give Taps unique names
204    return new TempHfs( asJobConf( defaultConfiguration ), Util.makePath( prefix, name ), intermediateSchemeClass, prefix == null );
205    }
206
207  public class IntermediateBoundaryElementFactory extends BoundaryElementFactory
208    {
209
210    @Override
211    public FlowElement create( ElementGraph graph, FlowElement flowElement )
212      {
213      return new Boundary();
214      }
215    }
216  }