001/*
002 * Copyright (c) 2007-2016 Concurrent, Inc. All Rights Reserved.
003 *
004 * Project and contact information: http://www.cascading.org/
005 *
006 * This file is part of the Cascading project.
007 *
008 * Licensed under the Apache License, Version 2.0 (the "License");
009 * you may not use this file except in compliance with the License.
010 * You may obtain a copy of the License at
011 *
012 *     http://www.apache.org/licenses/LICENSE-2.0
013 *
014 * Unless required by applicable law or agreed to in writing, software
015 * distributed under the License is distributed on an "AS IS" BASIS,
016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017 * See the License for the specific language governing permissions and
018 * limitations under the License.
019 */
020
021package cascading.flow.local.stream.graph;
022
023import java.util.Properties;
024
025import cascading.flow.FlowElement;
026import cascading.flow.FlowNode;
027import cascading.flow.FlowProcess;
028import cascading.flow.local.LocalFlowProcess;
029import cascading.flow.local.LocalFlowStep;
030import cascading.flow.local.stream.duct.ParallelFork;
031import cascading.flow.local.stream.element.LocalGroupByGate;
032import cascading.flow.local.stream.element.SyncMergeStage;
033import cascading.flow.stream.duct.Duct;
034import cascading.flow.stream.duct.Gate;
035import cascading.flow.stream.element.MemoryCoGroupGate;
036import cascading.flow.stream.element.SinkStage;
037import cascading.flow.stream.element.SourceStage;
038import cascading.flow.stream.graph.IORole;
039import cascading.flow.stream.graph.NodeStreamGraph;
040import cascading.pipe.CoGroup;
041import cascading.pipe.GroupBy;
042import cascading.pipe.Merge;
043import cascading.property.PropertyUtil;
044import cascading.tap.Tap;
045
046/**
047 *
048 */
049public class LocalStepStreamGraph extends NodeStreamGraph
050  {
051  private LocalFlowStep step;
052
053  public LocalStepStreamGraph( FlowProcess<Properties> flowProcess, LocalFlowStep step, FlowNode node )
054    {
055    super( flowProcess, node );
056    this.step = step;
057
058    buildGraph();
059    setTraps();
060    setScopes();
061
062    printGraph( node.getID(), "local", 0 );
063
064    bind();
065    }
066
067  protected void buildGraph()
068    {
069    for( Object rhsElement : node.getSourceTaps() )
070      {
071      Duct rhsDuct = new SourceStage( tapFlowProcess( (Tap) rhsElement ), (Tap) rhsElement );
072
073      addHead( rhsDuct );
074
075      handleDuct( (FlowElement) rhsElement, rhsDuct );
076      }
077    }
078
079  @Override
080  protected Duct createFork( Duct[] allNext )
081    {
082    return new ParallelFork( allNext );
083    }
084
085  protected Gate createCoGroupGate( CoGroup element, IORole role )
086    {
087    return new MemoryCoGroupGate( flowProcess, element );
088    }
089
090  protected Gate createGroupByGate( GroupBy element, IORole source )
091    {
092    return new LocalGroupByGate( flowProcess, element );
093    }
094
095  @Override
096  protected Duct createMergeStage( Merge merge, IORole both )
097    {
098    return new SyncMergeStage( flowProcess, merge );
099    }
100
101  @Override
102  protected SinkStage createSinkStage( Tap element )
103    {
104    return new SinkStage( tapFlowProcess( element ), element );
105    }
106
107  private LocalFlowProcess tapFlowProcess( Tap tap )
108    {
109    Properties defaultProperties = ( (LocalFlowProcess) flowProcess ).getConfig();
110    Properties tapProperties = step.getPropertiesMap().get( tap );
111
112    tapProperties = PropertyUtil.createProperties( tapProperties, defaultProperties );
113
114    return new LocalFlowProcess( (LocalFlowProcess) flowProcess, tapProperties );
115    }
116
117  }