001/*
002 * Copyright (c) 2016 Chris K Wensel <chris@wensel.net>. All Rights Reserved.
003 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved.
004 *
005 * Project and contact information: http://www.cascading.org/
006 *
007 * This file is part of the Cascading project.
008 *
009 * Licensed under the Apache License, Version 2.0 (the "License");
010 * you may not use this file except in compliance with the License.
011 * You may obtain a copy of the License at
012 *
013 *     http://www.apache.org/licenses/LICENSE-2.0
014 *
015 * Unless required by applicable law or agreed to in writing, software
016 * distributed under the License is distributed on an "AS IS" BASIS,
017 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
018 * See the License for the specific language governing permissions and
019 * limitations under the License.
020 */
021
022package cascading.flow.tez.stream.graph;
023
024import java.io.IOException;
025import java.util.Collection;
026import java.util.HashMap;
027import java.util.HashSet;
028import java.util.List;
029import java.util.Map;
030import java.util.Set;
031
032import cascading.flow.FlowElement;
033import cascading.flow.FlowElements;
034import cascading.flow.FlowException;
035import cascading.flow.FlowNode;
036import cascading.flow.FlowProcess;
037import cascading.flow.Flows;
038import cascading.flow.hadoop.stream.HadoopMemoryJoinGate;
039import cascading.flow.hadoop.util.HadoopUtil;
040import cascading.flow.stream.annotations.StreamMode;
041import cascading.flow.stream.duct.Duct;
042import cascading.flow.stream.duct.Gate;
043import cascading.flow.stream.element.InputSource;
044import cascading.flow.stream.element.MemoryHashJoinGate;
045import cascading.flow.stream.element.SinkStage;
046import cascading.flow.stream.element.SourceStage;
047import cascading.flow.stream.graph.IORole;
048import cascading.flow.stream.graph.NodeStreamGraph;
049import cascading.flow.tez.Hadoop2TezFlowProcess;
050import cascading.flow.tez.stream.element.TezBoundaryStage;
051import cascading.flow.tez.stream.element.TezCoGroupGate;
052import cascading.flow.tez.stream.element.TezGroupByGate;
053import cascading.flow.tez.stream.element.TezMergeGate;
054import cascading.flow.tez.stream.element.TezSinkStage;
055import cascading.flow.tez.stream.element.TezSourceStage;
056import cascading.flow.tez.util.TezUtil;
057import cascading.pipe.Boundary;
058import cascading.pipe.CoGroup;
059import cascading.pipe.Group;
060import cascading.pipe.GroupBy;
061import cascading.pipe.HashJoin;
062import cascading.pipe.Merge;
063import cascading.pipe.Pipe;
064import cascading.tap.Tap;
065import cascading.util.SetMultiMap;
066import cascading.util.SortedListMultiMap;
067import cascading.util.Util;
068import org.apache.hadoop.conf.Configuration;
069import org.apache.tez.dag.api.TezConfiguration;
070import org.apache.tez.runtime.api.LogicalInput;
071import org.apache.tez.runtime.api.LogicalOutput;
072import org.slf4j.Logger;
073import org.slf4j.LoggerFactory;
074
075import static cascading.flow.tez.util.TezUtil.*;
076
077/**
078 *
079 */
080public class Hadoop2TezStreamGraph extends NodeStreamGraph
081  {
082  private static final Logger LOG = LoggerFactory.getLogger( Hadoop2TezStreamGraph.class );
083
084  private InputSource streamedHead;
085  private Map<String, LogicalInput> inputMap;
086  private Map<String, LogicalOutput> outputMap;
087  private Map<LogicalInput, Configuration> inputConfigMap = new HashMap<>();
088  private Map<LogicalOutput, Configuration> outputConfigMap = new HashMap<>();
089  private SetMultiMap<String, LogicalInput> inputMultiMap;
090  private SetMultiMap<String, LogicalOutput> outputMultiMap;
091
092  public Hadoop2TezStreamGraph( Hadoop2TezFlowProcess currentProcess, FlowNode flowNode, Map<String, LogicalInput> inputMap, Map<String, LogicalOutput> outputMap )
093    {
094    super( currentProcess, flowNode );
095    this.inputMap = inputMap;
096    this.outputMap = outputMap;
097
098    buildGraph();
099
100    setTraps();
101    setScopes();
102
103    printGraph( node.getID(), node.getName(), flowProcess.getCurrentSliceNum() );
104
105    bind();
106
107    printBoundGraph( node.getID(), node.getName(), flowProcess.getCurrentSliceNum() );
108    }
109
110  public InputSource getStreamedHead()
111    {
112    return streamedHead;
113    }
114
115  protected void buildGraph()
116    {
117    inputMultiMap = new SetMultiMap<>();
118
119    for( Map.Entry<String, LogicalInput> entry : inputMap.entrySet() )
120      {
121      Configuration inputConfiguration = getInputConfiguration( entry.getValue() );
122      inputConfigMap.put( entry.getValue(), inputConfiguration );
123
124      inputMultiMap.addAll( getEdgeSourceID( entry.getValue(), inputConfiguration ), entry.getValue() );
125      }
126
127    outputMultiMap = new SetMultiMap<>();
128
129    for( Map.Entry<String, LogicalOutput> entry : outputMap.entrySet() )
130      {
131      Configuration outputConfiguration = getOutputConfiguration( entry.getValue() );
132      outputConfigMap.put( entry.getValue(), outputConfiguration );
133
134      outputMultiMap.addAll( TezUtil.getEdgeSinkID( entry.getValue(), outputConfiguration ), entry.getValue() );
135      }
136
137    // this made the assumption we can have a physical and logical input per vertex. seems we can't
138    if( inputMultiMap.getKeys().size() == 1 )
139      {
140      streamedSource = Flows.getFlowElementForID( node.getSourceElements(), Util.getFirst( inputMultiMap.getKeys() ) );
141      }
142    else
143      {
144      Set<FlowElement> sourceElements = new HashSet<>( node.getSourceElements() );
145      Set<? extends FlowElement> accumulated = node.getSourceElements( StreamMode.Accumulated );
146
147      sourceElements.removeAll( accumulated );
148
149      if( sourceElements.size() != 1 )
150        throw new IllegalStateException( "too many input source keys, got: " + Util.join( sourceElements, ", " ) );
151
152      streamedSource = Util.getFirst( sourceElements );
153      }
154
155    LOG.info( "using streamed source: " + streamedSource );
156
157    streamedHead = handleHead( streamedSource, flowProcess );
158
159    Set<FlowElement> accumulated = new HashSet<>( node.getSourceElements() );
160
161    accumulated.remove( streamedSource );
162
163    Hadoop2TezFlowProcess tezProcess = (Hadoop2TezFlowProcess) flowProcess;
164    TezConfiguration conf = tezProcess.getConfiguration();
165
166    for( FlowElement flowElement : accumulated )
167      {
168      LOG.info( "using accumulated source: " + flowElement );
169
170      if( flowElement instanceof Tap )
171        {
172        Tap source = (Tap) flowElement;
173
174        // allows client side config to be used cluster side
175        String property = conf.getRaw( "cascading.node.accumulated.source.conf." + Tap.id( source ) );
176
177        if( property == null )
178          throw new IllegalStateException( "accumulated source conf property missing for: " + source.getIdentifier() );
179
180        conf = getSourceConf( tezProcess, conf, property );
181        }
182      else
183        {
184        conf = (TezConfiguration) inputConfigMap.get( FlowElements.id( flowElement ) );
185        }
186
187      FlowProcess flowProcess = conf == null ? tezProcess : new Hadoop2TezFlowProcess( tezProcess, conf );
188
189      handleHead( flowElement, flowProcess );
190      }
191    }
192
193  private TezConfiguration getSourceConf( FlowProcess<TezConfiguration> flowProcess, TezConfiguration conf, String property )
194    {
195    Map<String, String> priorConf;
196
197    try
198      {
199      priorConf = (Map<String, String>) HadoopUtil.deserializeBase64( property, conf, HashMap.class, true );
200      }
201    catch( IOException exception )
202      {
203      throw new FlowException( "unable to deserialize properties", exception );
204      }
205
206    return flowProcess.mergeMapIntoConfig( conf, priorConf );
207    }
208
209  private InputSource handleHead( FlowElement source, FlowProcess flowProcess )
210    {
211    Duct sourceDuct;
212
213    if( source instanceof Tap )
214      sourceDuct = createSourceStage( (Tap) source, flowProcess );
215    else if( source instanceof Merge )
216      sourceDuct = createMergeStage( (Merge) source, IORole.source );
217    else if( source instanceof Boundary )
218      sourceDuct = createBoundaryStage( (Boundary) source, IORole.source );
219    else if( ( (Group) source ).isGroupBy() )
220      sourceDuct = createGroupByGate( (GroupBy) source, IORole.source );
221    else
222      sourceDuct = createCoGroupGate( (CoGroup) source, IORole.source );
223
224    addHead( sourceDuct );
225
226    handleDuct( source, sourceDuct );
227
228    return (InputSource) sourceDuct;
229    }
230
231  protected SourceStage createSourceStage( Tap source, FlowProcess flowProcess )
232    {
233    String id = Tap.id( source );
234    LogicalInput logicalInput = inputMap.get( id );
235
236    if( logicalInput == null )
237      logicalInput = inputMap.get( flowProcess.getStringProperty( "cascading.node.source." + id ) );
238
239    if( logicalInput == null )
240      return new SourceStage( flowProcess, source );
241
242    return new TezSourceStage( flowProcess, source, logicalInput );
243    }
244
245  @Override
246  protected SinkStage createSinkStage( Tap sink )
247    {
248    String id = Tap.id( sink );
249    LogicalOutput logicalOutput = outputMap.get( id );
250
251    if( logicalOutput == null )
252      logicalOutput = outputMap.get( flowProcess.getStringProperty( "cascading.node.sink." + id ) );
253
254    if( logicalOutput == null )
255      throw new IllegalStateException( "could not find output for: " + sink );
256
257    return new TezSinkStage( flowProcess, sink, logicalOutput );
258    }
259
260  @Override
261  protected Duct createMergeStage( Merge element, IORole role )
262    {
263    if( role == IORole.pass )
264      return super.createMergeStage( element, IORole.pass );
265    else if( role == IORole.sink )
266      return createSinkMergeGate( element );
267    else if( role == IORole.source )
268      return createSourceMergeGate( element );
269    else
270      throw new UnsupportedOperationException( "both role not supported with merge" );
271    }
272
273  private Duct createSourceMergeGate( Merge element )
274    {
275    return new TezMergeGate( flowProcess, element, IORole.source, createInputMap( element ) );
276    }
277
278  private Duct createSinkMergeGate( Merge element )
279    {
280    return new TezMergeGate( flowProcess, element, IORole.sink, findLogicalOutputs( element ) );
281    }
282
283  @Override
284  protected Duct createBoundaryStage( Boundary element, IORole role )
285    {
286    if( role == IORole.pass )
287      return super.createBoundaryStage( element, IORole.pass );
288    else if( role == IORole.sink )
289      return createSinkBoundaryStage( element );
290    else if( role == IORole.source )
291      return createSourceBoundaryStage( element );
292    else
293      throw new UnsupportedOperationException( "both role not supported with boundary" );
294    }
295
296  private Duct createSourceBoundaryStage( Boundary element )
297    {
298    return new TezBoundaryStage( flowProcess, element, IORole.source, findLogicalInput( element ) );
299    }
300
301  private Duct createSinkBoundaryStage( Boundary element )
302    {
303    return new TezBoundaryStage( flowProcess, element, IORole.sink, findLogicalOutputs( element ) );
304    }
305
306  @Override
307  protected Gate createGroupByGate( GroupBy element, IORole role )
308    {
309    if( role == IORole.sink )
310      return createSinkGroupByGate( element );
311    else
312      return createSourceGroupByGate( element );
313    }
314
315  @Override
316  protected Gate createCoGroupGate( CoGroup element, IORole role )
317    {
318    if( role == IORole.sink )
319      return createSinkCoGroupByGate( element );
320    else
321      return createSourceCoGroupByGate( element );
322    }
323
324  private Gate createSinkCoGroupByGate( CoGroup element )
325    {
326    return new TezCoGroupGate( flowProcess, element, IORole.sink, findLogicalOutput( element ) );
327    }
328
329  private Gate createSourceCoGroupByGate( CoGroup element )
330    {
331    return new TezCoGroupGate( flowProcess, element, IORole.source, createInputMap( element ) );
332    }
333
334  protected Gate createSinkGroupByGate( GroupBy element )
335    {
336    return new TezGroupByGate( flowProcess, element, IORole.sink, findLogicalOutput( element ) );
337    }
338
339  protected Gate createSourceGroupByGate( GroupBy element )
340    {
341    return new TezGroupByGate( flowProcess, element, IORole.source, createInputMap( element ) );
342    }
343
344  private LogicalOutput findLogicalOutput( Pipe element )
345    {
346    String id = Pipe.id( element );
347    LogicalOutput logicalOutput = outputMap.get( id );
348
349    if( logicalOutput == null )
350      logicalOutput = outputMap.get( flowProcess.getStringProperty( "cascading.node.sink." + id ) );
351
352    if( logicalOutput == null )
353      throw new IllegalStateException( "could not find output for: " + element );
354
355    return logicalOutput;
356    }
357
358  private Collection<LogicalOutput> findLogicalOutputs( Pipe element )
359    {
360    String id = Pipe.id( element );
361
362    return outputMultiMap.getValues( id );
363    }
364
365  private LogicalInput findLogicalInput( Pipe element )
366    {
367    String id = Pipe.id( element );
368    LogicalInput logicalInput = inputMap.get( id );
369
370    if( logicalInput == null )
371      logicalInput = inputMap.get( flowProcess.getStringProperty( "cascading.node.source." + id ) );
372
373    if( logicalInput == null )
374      throw new IllegalStateException( "could not find input for: " + element );
375
376    return logicalInput;
377    }
378
379  /**
380   * Maps each input to an ordinal on the flowelement. an input may be bound to multiple ordinals.
381   *
382   * @param element
383   */
384  private SortedListMultiMap<Integer, LogicalInput> createInputMap( FlowElement element )
385    {
386    String id = FlowElements.id( element );
387    SortedListMultiMap<Integer, LogicalInput> ordinalMap = new SortedListMultiMap<>();
388
389    for( LogicalInput logicalInput : inputMap.values() )
390      {
391      Configuration configuration = inputConfigMap.get( logicalInput );
392
393      String foundID = configuration.get( "cascading.node.source" );
394
395      if( Util.isEmpty( foundID ) )
396        throw new IllegalStateException( "cascading.node.source property not set on source LogicalInput" );
397
398      if( !foundID.equals( id ) )
399        continue;
400
401      String values = configuration.get( "cascading.node.ordinals", "" );
402      List<Integer> ordinals = Util.split( Integer.class, ",", values );
403
404      for( Integer ordinal : ordinals )
405        ordinalMap.put( ordinal, logicalInput );
406      }
407
408    return ordinalMap;
409    }
410
411  @Override
412  protected MemoryHashJoinGate createNonBlockingJoinGate( HashJoin join )
413    {
414    return new HadoopMemoryJoinGate( flowProcess, join ); // does not use a latch
415    }
416  }