/*
 * Copyright (c) 2007-2015 Concurrent, Inc. All Rights Reserved.
 *
 * Project and contact information: http://www.cascading.org/
 *
 * This file is part of the Cascading project.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package cascading.flow.local.stream;

import java.util.List;
import java.util.Properties;

import cascading.flow.FlowElement;
import cascading.flow.FlowProcess;
import cascading.flow.local.LocalFlowProcess;
import cascading.flow.local.LocalFlowStep;
import cascading.flow.stream.Duct;
import cascading.flow.stream.Gate;
import cascading.flow.stream.MemoryCoGroupGate;
import cascading.flow.stream.SinkStage;
import cascading.flow.stream.SourceStage;
import cascading.flow.stream.StepStreamGraph;
import cascading.pipe.CoGroup;
import cascading.pipe.GroupBy;
import cascading.pipe.Merge;
import cascading.property.PropertyUtil;
import cascading.tap.Tap;

/**
 * A {@link StepStreamGraph} for a {@link LocalFlowStep}.
 * <p>
 * Every source of the step becomes a {@link SourceStage} head of the graph. CoGroup, GroupBy and Merge
 * elements are backed by {@link MemoryCoGroupGate}, {@link LocalGroupByGate} and {@link SyncMergeStage}
 * respectively. Source and sink stages each receive their own {@link LocalFlowProcess} built from the
 * properties registered for the tap on the step.
 */
public class LocalStepStreamGraph extends StepStreamGraph
  {
  /**
   * Creates the graph for the given step and fully initializes it.
   * <p>
   * After delegating to the super constructor this invokes, in this order, {@link #buildGraph()},
   * {@code setTraps()}, {@code setScopes()}, {@code printGraph( step.getID(), "local", 0 )} and
   * {@code bind()}.
   *
   * @param flowProcess the flow process handed to the super class and to the created gates and stages
   * @param step        the local flow step this graph is built for
   */
  public LocalStepStreamGraph( FlowProcess<Properties> flowProcess, LocalFlowStep step )
    {
    super( flowProcess, step );

    // the order of these calls is significant and must not be rearranged
    buildGraph();
    setTraps();
    setScopes();

    printGraph( step.getID(), "local", 0 );

    bind();
    }

  /**
   * Walks all sources of the step, registering a {@link SourceStage} head for each one and handing the
   * source and its new head duct to {@code handleDuct} for further processing.
   */
  protected void buildGraph()
    {
    for( Object rhsElement : step.getSources() )
      {
      Tap source = (Tap) rhsElement;
      Duct head = new SourceStage( tapFlowProcess( source ), source );

      addHead( head );

      handleDuct( (FlowElement) rhsElement, head );
      }
    }

  /**
   * Creates the gate used for the given CoGroup.
   *
   * @param element the CoGroup to create a gate for
   * @return a new {@link MemoryCoGroupGate} bound to this graph's flow process
   */
  protected Gate createCoGroupGate( CoGroup element )
    {
    return new MemoryCoGroupGate( flowProcess, element );
    }

  /**
   * Creates the gate used for the given GroupBy.
   *
   * @param element the GroupBy to create a gate for
   * @return a new {@link LocalGroupByGate} bound to this graph's flow process
   */
  protected Gate createGroupByGate( GroupBy element )
    {
    return new LocalGroupByGate( flowProcess, element );
    }

  /**
   * Creates the stage used for the given Merge.
   *
   * @param merge the Merge to create a stage for
   * @return a new {@link SyncMergeStage} bound to this graph's flow process
   */
  @Override
  protected Duct createMergeStage( Merge merge )
    {
    return new SyncMergeStage( flowProcess, merge );
    }

  /**
   * Creates the sink stage for the given tap, giving it a tap specific flow process.
   *
   * @param element the sink tap
   * @return a new {@link SinkStage} using the flow process returned by {@link #tapFlowProcess(Tap)}
   */
  @Override
  protected SinkStage createSinkStage( Tap element )
    {
    LocalFlowProcess sinkProcess = tapFlowProcess( element );

    return new SinkStage( sinkProcess, element );
    }

  /**
   * Creates a {@link LocalFlowProcess} dedicated to the given tap.
   * <p>
   * The properties for the new process come from {@link PropertyUtil#createProperties} applied to the
   * entry the step's properties map holds for the tap and a copy of the current process configuration.
   * NOTE(review): presumably the tap entry takes precedence over the copied defaults, and a missing
   * entry is tolerated -- confirm against PropertyUtil.
   *
   * @param tap the tap to look up properties for
   * @return a new flow process derived from this graph's flow process
   */
  private LocalFlowProcess tapFlowProcess( Tap tap )
    {
    LocalFlowProcess parentProcess = (LocalFlowProcess) flowProcess;

    Properties defaults = parentProcess.getConfigCopy();
    Properties overrides = ( (LocalFlowStep) step ).getPropertiesMap().get( tap );
    Properties merged = PropertyUtil.createProperties( overrides, defaults );

    return new LocalFlowProcess( parentProcess, merged );
    }

  /**
   * Reports whether graph traversal ends at the given element.
   *
   * @param lhsElement the element currently being visited
   * @param successors the elements following {@code lhsElement}
   * @return {@code true} if there are no successors and the element is a {@link Tap}, {@code false} if
   * there is at least one successor
   * @throws IllegalStateException if there are no successors and the element is not a {@link Tap}
   */
  protected boolean stopOnElement( FlowElement lhsElement, List<FlowElement> successors )
    {
    if( !successors.isEmpty() )
      return false;

    if( lhsElement instanceof Tap )
      return true;

    // only a Tap may terminate a branch of the graph
    throw new IllegalStateException( "expected a Tap instance" );
    }
  }