001    /*
002     * Copyright (c) 2007-2015 Concurrent, Inc. All Rights Reserved.
003     *
004     * Project and contact information: http://www.cascading.org/
005     *
006     * This file is part of the Cascading project.
007     *
008     * Licensed under the Apache License, Version 2.0 (the "License");
009     * you may not use this file except in compliance with the License.
010     * You may obtain a copy of the License at
011     *
012     *     http://www.apache.org/licenses/LICENSE-2.0
013     *
014     * Unless required by applicable law or agreed to in writing, software
015     * distributed under the License is distributed on an "AS IS" BASIS,
016     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017     * See the License for the specific language governing permissions and
018     * limitations under the License.
019     */
020    
021    package cascading.flow.local.stream;
022    
023    import java.util.List;
024    import java.util.Properties;
025    
026    import cascading.flow.FlowElement;
027    import cascading.flow.FlowProcess;
028    import cascading.flow.local.LocalFlowProcess;
029    import cascading.flow.local.LocalFlowStep;
030    import cascading.flow.stream.Duct;
031    import cascading.flow.stream.Gate;
032    import cascading.flow.stream.MemoryCoGroupGate;
033    import cascading.flow.stream.SinkStage;
034    import cascading.flow.stream.SourceStage;
035    import cascading.flow.stream.StepStreamGraph;
036    import cascading.pipe.CoGroup;
037    import cascading.pipe.GroupBy;
038    import cascading.pipe.Merge;
039    import cascading.property.PropertyUtil;
040    import cascading.tap.Tap;
041    
042    /**
043     *
044     */
045    public class LocalStepStreamGraph extends StepStreamGraph
046      {
047      public LocalStepStreamGraph( FlowProcess<Properties> flowProcess, LocalFlowStep step )
048        {
049        super( flowProcess, step );
050    
051        buildGraph();
052        setTraps();
053        setScopes();
054    
055        printGraph( step.getID(), "local", 0 );
056    
057        bind();
058        }
059    
060      protected void buildGraph()
061        {
062        for( Object rhsElement : step.getSources() )
063          {
064          Duct rhsDuct = new SourceStage( tapFlowProcess( (Tap) rhsElement ), (Tap) rhsElement );
065    
066          addHead( rhsDuct );
067    
068          handleDuct( (FlowElement) rhsElement, rhsDuct );
069          }
070        }
071    
072      protected Gate createCoGroupGate( CoGroup element )
073        {
074        return new MemoryCoGroupGate( flowProcess, element );
075        }
076    
077      protected Gate createGroupByGate( GroupBy element )
078        {
079        return new LocalGroupByGate( flowProcess, element );
080        }
081    
082      @Override
083      protected Duct createMergeStage( Merge merge )
084        {
085        return new SyncMergeStage( flowProcess, merge );
086        }
087    
088      @Override
089      protected SinkStage createSinkStage( Tap element )
090        {
091        return new SinkStage( tapFlowProcess( element ), element );
092        }
093    
094      private LocalFlowProcess tapFlowProcess( Tap tap )
095        {
096        Properties defaultProperties = ( (LocalFlowProcess) flowProcess ).getConfigCopy();
097        Properties tapProperties = ( (LocalFlowStep) step ).getPropertiesMap().get( tap );
098    
099        tapProperties = PropertyUtil.createProperties( tapProperties, defaultProperties );
100    
101        return new LocalFlowProcess( (LocalFlowProcess) flowProcess, tapProperties );
102        }
103    
104      protected boolean stopOnElement( FlowElement lhsElement, List<FlowElement> successors )
105        {
106        if( successors.isEmpty() )
107          {
108          if( !( lhsElement instanceof Tap ) )
109            throw new IllegalStateException( "expected a Tap instance" );
110    
111          return true;
112          }
113    
114        return false;
115        }
116      }