001/*
002 * Copyright (c) 2007-2015 Concurrent, Inc. All Rights Reserved.
003 *
004 * Project and contact information: http://www.cascading.org/
005 *
006 * This file is part of the Cascading project.
007 *
008 * Licensed under the Apache License, Version 2.0 (the "License");
009 * you may not use this file except in compliance with the License.
010 * You may obtain a copy of the License at
011 *
012 *     http://www.apache.org/licenses/LICENSE-2.0
013 *
014 * Unless required by applicable law or agreed to in writing, software
015 * distributed under the License is distributed on an "AS IS" BASIS,
016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017 * See the License for the specific language governing permissions and
018 * limitations under the License.
019 */
020
021package cascading.flow.hadoop;
022
023import cascading.flow.FlowNode;
024import cascading.flow.FlowProcess;
025import cascading.flow.hadoop.util.HadoopMRUtil;
026import cascading.flow.planner.BaseFlowNode;
027import cascading.flow.planner.graph.BaseElementGraph;
028import cascading.flow.planner.process.FlowNodeGraph;
029import cascading.flow.planner.process.ProcessEdge;
030import cascading.tap.Tap;
031import org.apache.hadoop.mapred.JobConf;
032
033/** Class MapReduceFlowStep wraps a {@link JobConf} and allows it to be executed as a {@link cascading.flow.Flow}. */
034public class MapReduceFlowStep extends HadoopFlowStep
035  {
036  /** Field jobConf */
037  private final JobConf jobConf;
038
039  protected MapReduceFlowStep( String flowName, String stepName, JobConf jobConf, Tap sink )
040    {
041    super( BaseElementGraph.NULL, createFlowNodeGraph( jobConf ) );
042    setName( stepName );
043    setFlowName( flowName );
044    this.jobConf = jobConf;
045    addSink( "default", sink );
046    }
047
048  @Override
049  public JobConf createInitializedConfig( FlowProcess<JobConf> flowProcess, JobConf parentConfig )
050    {
051    // allow to delete
052    getSink().sinkConfInit( flowProcess, new JobConf() );
053
054    return jobConf;
055    }
056
057  private static FlowNodeGraph createFlowNodeGraph( JobConf jobConf )
058    {
059    FlowNodeGraph flowNodeGraph = new FlowNodeGraph();
060    int nodes = 1;
061    boolean hasReducer = HadoopMRUtil.hasReducer( jobConf );
062
063    if( hasReducer )
064      nodes = 2;
065
066    FlowNode mapperNode = new BaseFlowNode( String.format( "(1/%s)", nodes ), 0 );
067    flowNodeGraph.addVertex( mapperNode );
068
069    if( hasReducer )
070      {
071      FlowNode reducerNode = new BaseFlowNode( "(2/2)", 1 );
072      flowNodeGraph.addVertex( reducerNode );
073      flowNodeGraph.addEdge( mapperNode, reducerNode, new ProcessEdge( mapperNode, reducerNode ) );
074      }
075
076    return flowNodeGraph;
077    }
078  }