/*
 * Copyright (c) 2007-2016 Concurrent, Inc. All Rights Reserved.
 *
 * Project and contact information: http://www.cascading.org/
 *
 * This file is part of the Cascading project.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package cascading.flow.local.stream.graph;

import java.util.Properties;

import cascading.flow.FlowElement;
import cascading.flow.FlowNode;
import cascading.flow.FlowProcess;
import cascading.flow.local.LocalFlowProcess;
import cascading.flow.local.LocalFlowStep;
import cascading.flow.local.stream.duct.ParallelFork;
import cascading.flow.local.stream.element.LocalGroupByGate;
import cascading.flow.local.stream.element.SyncMergeStage;
import cascading.flow.stream.duct.Duct;
import cascading.flow.stream.duct.Gate;
import cascading.flow.stream.element.MemoryCoGroupGate;
import cascading.flow.stream.element.SinkStage;
import cascading.flow.stream.element.SourceStage;
import cascading.flow.stream.graph.IORole;
import cascading.flow.stream.graph.NodeStreamGraph;
import cascading.pipe.CoGroup;
import cascading.pipe.GroupBy;
import cascading.pipe.Merge;
import cascading.property.PropertyUtil;
import cascading.tap.Tap;

/**
 * A {@link NodeStreamGraph} that assembles the duct graph for one {@link FlowNode}
 * belonging to a {@link LocalFlowStep}.
 * <p>
 * Every source tap of the node becomes a {@link SourceStage} head. The factory methods
 * below supply the fork, gate, merge and sink implementations used by this graph.
 * Source and sink stages each receive their own {@link LocalFlowProcess}, built from the
 * properties the step holds for the given tap.
 */
public class LocalStepStreamGraph extends NodeStreamGraph
  {
  /** The step that owns this graph; consulted for per-tap properties. */
  private LocalFlowStep step;

  /**
   * Creates the graph and fully initializes it.
   * <p>
   * After storing the step, the constructor builds the graph, sets traps and scopes,
   * prints the graph under the node id with the label {@code "local"}, and finally
   * binds it. These calls are made in exactly that order.
   *
   * @param flowProcess the flow process handed to the parent graph
   * @param step        the local step supplying per-tap properties
   * @param node        the flow node this graph is built for
   */
  public LocalStepStreamGraph( FlowProcess<Properties> flowProcess, LocalFlowStep step, FlowNode node )
    {
    super( flowProcess, node );

    // must be assigned before buildGraph(), which reads it through tapFlowProcess()
    this.step = step;

    buildGraph();
    setTraps();
    setScopes();

    printGraph( node.getID(), "local", 0 );

    bind();
    }

  /**
   * Registers one {@link SourceStage} head per source tap of the node and passes each
   * head on to {@code handleDuct} together with its tap.
   */
  protected void buildGraph()
    {
    for( Object source : node.getSourceTaps() )
      {
      Tap sourceTap = (Tap) source;
      Duct head = new SourceStage( tapFlowProcess( sourceTap ), sourceTap );

      addHead( head );

      handleDuct( (FlowElement) sourceTap, head );
      }
    }

  /**
   * Wraps the given downstream ducts in a {@link ParallelFork}.
   *
   * @param allNext the ducts the fork hands off to
   * @return a new {@link ParallelFork} over {@code allNext}
   */
  @Override
  protected Duct createFork( Duct[] allNext )
    {
    return new ParallelFork( allNext );
    }

  /**
   * Creates the gate for a {@link CoGroup} element.
   *
   * @param element the CoGroup pipe
   * @param role    not consulted by this implementation
   * @return a new {@link MemoryCoGroupGate} for {@code element}
   */
  protected Gate createCoGroupGate( CoGroup element, IORole role )
    {
    return new MemoryCoGroupGate( flowProcess, element );
    }

  /**
   * Creates the gate for a {@link GroupBy} element.
   *
   * @param element the GroupBy pipe
   * @param source  not consulted by this implementation
   * @return a new {@link LocalGroupByGate} for {@code element}
   */
  protected Gate createGroupByGate( GroupBy element, IORole source )
    {
    return new LocalGroupByGate( flowProcess, element );
    }

  /**
   * Creates the stage for a {@link Merge} element.
   *
   * @param merge the Merge pipe
   * @param both  not consulted by this implementation
   * @return a new {@link SyncMergeStage} for {@code merge}
   */
  @Override
  protected Duct createMergeStage( Merge merge, IORole both )
    {
    return new SyncMergeStage( flowProcess, merge );
    }

  /**
   * Creates the sink stage for a tap, giving it a flow process that carries the
   * properties the step holds for that tap.
   *
   * @param element the sink tap
   * @return a new {@link SinkStage} for {@code element}
   */
  @Override
  protected SinkStage createSinkStage( Tap element )
    {
    return new SinkStage( tapFlowProcess( element ), element );
    }

  /**
   * Derives a {@link LocalFlowProcess} for the given tap.
   * <p>
   * The properties registered for the tap in the step's properties map are combined,
   * through {@link PropertyUtil#createProperties}, with the current process
   * configuration, which is passed as the defaults argument.
   *
   * @param tap the tap whose properties should be applied
   * @return a new {@link LocalFlowProcess} derived from the current one
   */
  private LocalFlowProcess tapFlowProcess( Tap tap )
    {
    LocalFlowProcess parentProcess = (LocalFlowProcess) flowProcess;

    Properties processDefaults = parentProcess.getConfig();
    Properties tapOverrides = step.getPropertiesMap().get( tap );

    Properties combined = PropertyUtil.createProperties( tapOverrides, processDefaults );

    return new LocalFlowProcess( parentProcess, combined );
    }
  }