001/*
002 * Copyright (c) 2007-2016 Concurrent, Inc. All Rights Reserved.
003 *
004 * Project and contact information: http://www.cascading.org/
005 *
006 * This file is part of the Cascading project.
007 *
008 * Licensed under the Apache License, Version 2.0 (the "License");
009 * you may not use this file except in compliance with the License.
010 * You may obtain a copy of the License at
011 *
012 *     http://www.apache.org/licenses/LICENSE-2.0
013 *
014 * Unless required by applicable law or agreed to in writing, software
015 * distributed under the License is distributed on an "AS IS" BASIS,
016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017 * See the License for the specific language governing permissions and
018 * limitations under the License.
019 */
020
021package cascading.flow.hadoop.stream.graph;
022
023import java.io.IOException;
024import java.util.HashMap;
025import java.util.Map;
026import java.util.Set;
027
028import cascading.flow.FlowException;
029import cascading.flow.FlowNode;
030import cascading.flow.FlowProcess;
031import cascading.flow.hadoop.HadoopFlowProcess;
032import cascading.flow.hadoop.stream.HadoopMemoryJoinGate;
033import cascading.flow.hadoop.stream.element.HadoopCoGroupGate;
034import cascading.flow.hadoop.stream.element.HadoopGroupByGate;
035import cascading.flow.hadoop.stream.element.HadoopSinkStage;
036import cascading.flow.hadoop.util.HadoopUtil;
037import cascading.flow.planner.graph.ElementGraphs;
038import cascading.flow.stream.duct.Gate;
039import cascading.flow.stream.element.GroupingSpliceGate;
040import cascading.flow.stream.element.SinkStage;
041import cascading.flow.stream.element.SourceStage;
042import cascading.flow.stream.graph.IORole;
043import cascading.flow.stream.graph.NodeStreamGraph;
044import cascading.pipe.CoGroup;
045import cascading.pipe.GroupBy;
046import cascading.pipe.HashJoin;
047import cascading.tap.Tap;
048import org.apache.hadoop.mapred.JobConf;
049import org.apache.hadoop.mapred.Reporter;
050
051/**
052 *
053 */
054public class HadoopMapStreamGraph extends NodeStreamGraph
055  {
056  private final Tap source;
057  private SourceStage streamedHead;
058
059  public HadoopMapStreamGraph( HadoopFlowProcess flowProcess, FlowNode node, Tap source )
060    {
061    super( flowProcess, node, source );
062    this.source = source;
063
064    buildGraph();
065
066    setTraps();
067    setScopes();
068
069    printGraph( node.getID(), "map", flowProcess.getCurrentSliceNum() );
070    bind();
071    }
072
073  public SourceStage getStreamedHead()
074    {
075    return streamedHead;
076    }
077
078  protected void buildGraph()
079    {
080    streamedHead = handleHead( this.source, flowProcess );
081
082    Set<Tap> tributaries = ElementGraphs.findSources( elementGraph, Tap.class );
083
084    tributaries.remove( this.source ); // we cannot stream and accumulate the same source
085
086    // accumulated paths
087    for( Object source : tributaries )
088      {
089      final HadoopFlowProcess hadoopProcess = (HadoopFlowProcess) flowProcess;
090      JobConf conf = hadoopProcess.getJobConf();
091
092      // allows client side config to be used cluster side
093      String property = conf.getRaw( "cascading.node.accumulated.source.conf." + Tap.id( (Tap) source ) );
094
095      if( property == null )
096        throw new IllegalStateException( "accumulated source conf property missing for: " + ( (Tap) source ).getIdentifier() );
097
098      conf = getSourceConf( hadoopProcess, conf, property );
099
100      // the reporter isn't provided until after the #run method is called
101      flowProcess = new HadoopFlowProcess( hadoopProcess, conf )
102        {
103        @Override
104        public Reporter getReporter()
105          {
106          return hadoopProcess.getReporter();
107          }
108        };
109
110      handleHead( (Tap) source, flowProcess );
111      }
112    }
113
114  private JobConf getSourceConf( HadoopFlowProcess flowProcess, JobConf conf, String property )
115    {
116    Map<String, String> priorConf;
117    try
118      {
119      priorConf = (Map<String, String>) HadoopUtil.deserializeBase64( property, conf, HashMap.class, true );
120      }
121    catch( IOException exception )
122      {
123      throw new FlowException( "unable to deserialize properties", exception );
124      }
125
126    return flowProcess.mergeMapIntoConfig( conf, priorConf );
127    }
128
129  private SourceStage handleHead( Tap source, FlowProcess flowProcess )
130    {
131    SourceStage sourceDuct = new SourceStage( flowProcess, source );
132
133    addHead( sourceDuct );
134
135    handleDuct( source, sourceDuct );
136
137    return sourceDuct;
138    }
139
140  @Override
141  protected SinkStage createSinkStage( Tap element )
142    {
143    return new HadoopSinkStage( flowProcess, element );
144    }
145
146  @Override
147  protected Gate createCoGroupGate( CoGroup element, IORole role )
148    {
149    return new HadoopCoGroupGate( flowProcess, element, IORole.sink );
150    }
151
152  @Override
153  protected Gate createGroupByGate( GroupBy element, IORole role )
154    {
155    return new HadoopGroupByGate( flowProcess, element, role );
156    }
157
158  @Override
159  protected GroupingSpliceGate createNonBlockingJoinGate( HashJoin join )
160    {
161    return new HadoopMemoryJoinGate( flowProcess, join ); // does not use a latch
162    }
163  }