001/*
002 * Copyright (c) 2007-2016 Concurrent, Inc. All Rights Reserved.
003 *
004 * Project and contact information: http://www.cascading.org/
005 *
006 * This file is part of the Cascading project.
007 *
008 * Licensed under the Apache License, Version 2.0 (the "License");
009 * you may not use this file except in compliance with the License.
010 * You may obtain a copy of the License at
011 *
012 *     http://www.apache.org/licenses/LICENSE-2.0
013 *
014 * Unless required by applicable law or agreed to in writing, software
015 * distributed under the License is distributed on an "AS IS" BASIS,
016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017 * See the License for the specific language governing permissions and
018 * limitations under the License.
019 */
020
021package cascading.flow.hadoop;
022
023import cascading.flow.FlowNode;
024import cascading.flow.FlowProcess;
025import cascading.flow.hadoop.util.HadoopMRUtil;
026import cascading.flow.planner.BaseFlowNode;
027import cascading.flow.planner.graph.BaseElementGraph;
028import cascading.flow.planner.graph.ElementGraph;
029import cascading.flow.planner.process.FlowNodeGraph;
030import cascading.flow.planner.process.ProcessEdge;
031import cascading.tap.Tap;
032import org.apache.hadoop.mapred.JobConf;
033
034/** Class MapReduceFlowStep wraps a {@link JobConf} and allows it to be executed as a {@link cascading.flow.Flow}. */
035public class MapReduceFlowStep extends HadoopFlowStep
036  {
037  /** Field jobConf */
038  private final JobConf jobConf;
039
040  protected MapReduceFlowStep( String flowName, String stepName, JobConf jobConf, Tap sink )
041    {
042    this( flowName, stepName, BaseElementGraph.NULL, createFlowNodeGraph( jobConf ), jobConf );
043
044    addSink( "default", sink );
045    }
046
047  protected MapReduceFlowStep( String flowName, String stepName, ElementGraph elementGraph, FlowNodeGraph flowNodeGraph, JobConf jobConf )
048    {
049    super( elementGraph, flowNodeGraph );
050    setName( stepName );
051    setFlowName( flowName );
052    this.jobConf = jobConf;
053    }
054
055  @Override
056  public JobConf createInitializedConfig( FlowProcess<JobConf> flowProcess, JobConf parentConfig )
057    {
058    return jobConf;
059    }
060
061  protected static FlowNodeGraph createFlowNodeGraph( JobConf jobConf )
062    {
063    FlowNodeGraph flowNodeGraph = new FlowNodeGraph();
064    int nodes = 1;
065    boolean hasReducer = HadoopMRUtil.hasReducer( jobConf );
066
067    if( hasReducer )
068      nodes = 2;
069
070    FlowNode mapperNode = new BaseFlowNode( String.format( "(1/%s)", nodes ), 0 );
071    flowNodeGraph.addVertex( mapperNode );
072
073    if( hasReducer )
074      {
075      FlowNode reducerNode = new BaseFlowNode( "(2/2)", 1 );
076      flowNodeGraph.addVertex( reducerNode );
077      flowNodeGraph.addEdge( mapperNode, reducerNode, new ProcessEdge( mapperNode, reducerNode ) );
078      }
079
080    return flowNodeGraph;
081    }
082  }