001/*
002 * Copyright (c) 2016-2017 Chris K Wensel <chris@wensel.net>. All Rights Reserved.
003 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved.
004 *
005 * Project and contact information: http://www.cascading.org/
006 *
007 * This file is part of the Cascading project.
008 *
009 * Licensed under the Apache License, Version 2.0 (the "License");
010 * you may not use this file except in compliance with the License.
011 * You may obtain a copy of the License at
012 *
013 *     http://www.apache.org/licenses/LICENSE-2.0
014 *
015 * Unless required by applicable law or agreed to in writing, software
016 * distributed under the License is distributed on an "AS IS" BASIS,
017 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
018 * See the License for the specific language governing permissions and
019 * limitations under the License.
020 */
021
022package cascading.flow.tez;
023
024import java.io.IOException;
025import java.util.Collection;
026import java.util.Collections;
027import java.util.HashSet;
028import java.util.Map;
029import java.util.Set;
030
031import cascading.CascadingException;
032import cascading.flow.FlowProcess;
033import cascading.flow.FlowSession;
034import cascading.flow.hadoop.MapRed;
035import cascading.flow.hadoop.util.HadoopUtil;
036import cascading.tap.Tap;
037import cascading.tuple.TupleEntryCollector;
038import cascading.tuple.TupleEntryIterator;
039import org.apache.hadoop.conf.Configuration;
040import org.apache.hadoop.mapred.Reporter;
041import org.apache.hadoop.util.ReflectionUtils;
042import org.apache.tez.dag.api.TezConfiguration;
043import org.apache.tez.mapreduce.processor.MRTaskReporter;
044import org.apache.tez.runtime.api.ProcessorContext;
045import org.apache.tez.runtime.api.Writer;
046
047/**
048 * Class HadoopFlowProcess is an implementation of {@link cascading.flow.FlowProcess} for Hadoop. Use this interface to get direct
049 * access to the Hadoop JobConf and Reporter interfaces.
050 * <p>
051 * Be warned that coupling to this implementation will cause custom {@link cascading.operation.Operation}s to
052 * fail if they are executed on a system other than Hadoop.
053 *
054 * @see cascading.flow.FlowSession
055 */
056public class Hadoop2TezFlowProcess extends FlowProcess<TezConfiguration> implements MapRed
057  {
058  /** Field jobConf */
059  final TezConfiguration configuration;
060  private ProcessorContext context;
061  private Writer writer;
062
063  public Hadoop2TezFlowProcess()
064    {
065    this.configuration = new TezConfiguration();
066    }
067
068  public Hadoop2TezFlowProcess( TezConfiguration configuration )
069    {
070    this.configuration = configuration;
071    }
072
073  public Hadoop2TezFlowProcess( FlowSession flowSession, ProcessorContext context, TezConfiguration configuration )
074    {
075    super( flowSession );
076    this.context = context;
077    this.configuration = configuration;
078    }
079
080  public Hadoop2TezFlowProcess( Hadoop2TezFlowProcess flowProcess, TezConfiguration configuration )
081    {
082    super( flowProcess );
083    this.context = flowProcess.context;
084    this.configuration = configuration;
085    }
086
087  public ProcessorContext getContext()
088    {
089    return context;
090    }
091
092  public void setWriter( Writer writer )
093    {
094    this.writer = writer;
095    }
096
097  @Override
098  public FlowProcess copyWith( TezConfiguration configuration )
099    {
100    return new Hadoop2TezFlowProcess( this, configuration );
101    }
102
103  /**
104   * Method getJobConf returns the jobConf of this HadoopFlowProcess object.
105   *
106   * @return the jobConf (type JobConf) of this HadoopFlowProcess object.
107   */
108  public TezConfiguration getConfiguration()
109    {
110    return configuration;
111    }
112
113  @Override
114  public TezConfiguration getConfig()
115    {
116    return configuration;
117    }
118
119  @Override
120  public TezConfiguration getConfigCopy()
121    {
122    return new TezConfiguration( configuration );
123    }
124
125  /**
126   * Method getCurrentTaskNum returns the task number of this task. Task 0 is the first task.
127   *
128   * @return int
129   */
130  @Override
131  public int getCurrentSliceNum()
132    {
133    return getConfiguration().getInt( "mapred.task.partition", 0 ); // TODO: likely incorrect
134    }
135
136  @Override
137  public int getNumProcessSlices()
138    {
139    return 0;
140    }
141
142  /**
143   * Method getReporter returns the reporter of this HadoopFlowProcess object.
144   *
145   * @return the reporter (type Reporter) of this HadoopFlowProcess object.
146   */
147  @Override
148  public Reporter getReporter()
149    {
150    if( context == null )
151      return Reporter.NULL;
152
153    return new MRTaskReporter( context );
154    }
155
156  @Override
157  public Object getProperty( String key )
158    {
159    return configuration.get( key );
160    }
161
162  @Override
163  public Collection<String> getPropertyKeys()
164    {
165    Set<String> keys = new HashSet<String>();
166
167    for( Map.Entry<String, String> entry : configuration )
168      keys.add( entry.getKey() );
169
170    return Collections.unmodifiableSet( keys );
171    }
172
173  @Override
174  public Object newInstance( String className )
175    {
176    if( className == null || className.isEmpty() )
177      return null;
178
179    try
180      {
181      Class type = (Class) Hadoop2TezFlowProcess.class.getClassLoader().loadClass( className.toString() );
182
183      return ReflectionUtils.newInstance( type, configuration );
184      }
185    catch( ClassNotFoundException exception )
186      {
187      throw new CascadingException( "unable to load class: " + className.toString(), exception );
188      }
189    }
190
191  @Override
192  public void keepAlive()
193    {
194    // unsupported
195    }
196
197  @Override
198  public void increment( Enum counter, long amount )
199    {
200    if( context != null )
201      context.getCounters().findCounter( counter ).increment( amount );
202    }
203
204  @Override
205  public void increment( String group, String counter, long amount )
206    {
207    if( context != null )
208      context.getCounters().findCounter( group, counter ).increment( amount );
209    }
210
211  @Override
212  public long getCounterValue( Enum counter )
213    {
214    if( context == null )
215      return 0;
216
217    return context.getCounters().findCounter( counter ).getValue();
218    }
219
220  @Override
221  public long getCounterValue( String group, String counter )
222    {
223    if( context == null )
224      return 0;
225
226    return context.getCounters().findCounter( group, counter ).getValue();
227    }
228
229  @Override
230  public void setStatus( String status )
231    {
232    // unsupported
233    }
234
235  @Override
236  public boolean isCounterStatusInitialized()
237    {
238    if( context == null )
239      return false;
240
241    return context.getCounters() != null;
242    }
243
244  @Override
245  public TupleEntryIterator openTapForRead( Tap tap ) throws IOException
246    {
247    return tap.openForRead( this );
248    }
249
250  @Override
251  public TupleEntryCollector openTapForWrite( Tap tap ) throws IOException
252    {
253    return tap.openForWrite( this, null ); // do not honor sinkmode as this may be opened across tasks
254    }
255
256  @Override
257  public TupleEntryCollector openTrapForWrite( Tap trap ) throws IOException
258    {
259    TezConfiguration jobConf = new TezConfiguration( getConfiguration() );
260
261    int stepNum = jobConf.getInt( "cascading.flow.step.num", 0 );
262    int nodeNum = jobConf.getInt( "cascading.flow.node.num", 0 );
263
264    String partname = String.format( "-%05d-%05d-", stepNum, nodeNum );
265
266    jobConf.set( "cascading.tapcollector.partname", "%s%spart" + partname + "%05d" );
267
268    return trap.openForWrite( new Hadoop2TezFlowProcess( this, jobConf ), null ); // do not honor sinkmode as this may be opened across tasks
269    }
270
271  @Override
272  public TupleEntryCollector openSystemIntermediateForWrite() throws IOException
273    {
274    return null;
275/*
276    return new TupleEntryCollector( Fields.size( 2 ) )
277    {
278    @Override
279    protected void collect( TupleEntry tupleEntry )
280      {
281      try
282        {
283        getOutputCollector().collect( tupleEntry.getObject( 0 ), tupleEntry.getObject( 1 ) );
284        }
285      catch( IOException exception )
286        {
287        throw new CascadingException( "failed collecting key and value", exception );
288        }
289      }
290    };
291*/
292    }
293
294  @Override
295  public <C> C copyConfig( C config )
296    {
297    return HadoopUtil.copyJobConf( config );
298    }
299
300  @Override
301  public <C> Map<String, String> diffConfigIntoMap( C defaultConfig, C updatedConfig )
302    {
303    return HadoopUtil.getConfig( (Configuration) defaultConfig, (Configuration) updatedConfig );
304    }
305
306  @Override
307  public TezConfiguration mergeMapIntoConfig( TezConfiguration defaultConfig, Map<String, String> map )
308    {
309    return HadoopUtil.mergeConf( new TezConfiguration( defaultConfig ), map, true );
310    }
311  }