001 /* 002 * Copyright (c) 2007-2015 Concurrent, Inc. All Rights Reserved. 003 * 004 * Project and contact information: http://www.cascading.org/ 005 * 006 * This file is part of the Cascading project. 007 * 008 * Licensed under the Apache License, Version 2.0 (the "License"); 009 * you may not use this file except in compliance with the License. 010 * You may obtain a copy of the License at 011 * 012 * http://www.apache.org/licenses/LICENSE-2.0 013 * 014 * Unless required by applicable law or agreed to in writing, software 015 * distributed under the License is distributed on an "AS IS" BASIS, 016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 017 * See the License for the specific language governing permissions and 018 * limitations under the License. 019 */ 020 021 package cascading.flow.local; 022 023 import java.util.Collections; 024 import java.util.HashMap; 025 import java.util.HashSet; 026 import java.util.Map; 027 import java.util.Properties; 028 import java.util.Set; 029 030 import cascading.flow.FlowProcess; 031 import cascading.flow.local.planner.LocalFlowStepJob; 032 import cascading.flow.planner.BaseFlowStep; 033 import cascading.flow.planner.FlowStepJob; 034 import cascading.property.ConfigDef; 035 import cascading.tap.Tap; 036 037 /** Class LocalFlowStep is the local mode implementation of {@link cascading.flow.FlowStep}. */ 038 public class LocalFlowStep extends BaseFlowStep<Properties> 039 { 040 /** Field mapperTraps */ 041 private final Map<String, Tap> traps = new HashMap<String, Tap>(); 042 043 /** Map of Properties modified by each Tap's sourceConfInit/sinkConfInit */ 044 private final Map<Tap, Properties> tapProperties = new HashMap<Tap, Properties>(); 045 046 public LocalFlowStep( String name, int id ) 047 { 048 super( name, id ); 049 } 050 051 @Override 052 public Properties getInitializedConfig( FlowProcess<Properties> flowProcess, Properties parentConfig ) 053 { 054 Properties currentProperties = parentConfig == null ? new Properties() : new Properties( parentConfig ); 055 056 initTaps( flowProcess, currentProperties, getSources(), false ); 057 initTaps( flowProcess, currentProperties, getSinks(), true ); 058 initTaps( flowProcess, currentProperties, getTraps(), true ); 059 060 initFromProcessConfigDef( currentProperties ); 061 062 return currentProperties; 063 } 064 065 protected void initTaps( FlowProcess<Properties> flowProcess, Properties conf, Set<Tap> taps, boolean isSink ) 066 { 067 if( !taps.isEmpty() ) 068 { 069 for( Tap tap : taps ) 070 { 071 Properties confCopy = flowProcess.copyConfig( conf ); 072 tapProperties.put( tap, confCopy ); // todo: store the diff, not the copy 073 074 if( isSink ) 075 tap.sinkConfInit( flowProcess, confCopy ); 076 else 077 tap.sourceConfInit( flowProcess, confCopy ); 078 } 079 } 080 } 081 082 private void initFromProcessConfigDef( final Properties properties ) 083 { 084 initConfFromProcessConfigDef( getSetterFor( properties ) ); 085 } 086 087 private ConfigDef.Setter getSetterFor( final Properties properties ) 088 { 089 return new ConfigDef.Setter() 090 { 091 @Override 092 public String set( String key, String value ) 093 { 094 String oldValue = get( key ); 095 096 properties.setProperty( key, value ); 097 098 return oldValue; 099 } 100 101 @Override 102 public String update( String key, String value ) 103 { 104 String oldValue = get( key ); 105 106 if( oldValue == null ) 107 properties.setProperty( key, value ); 108 else if( !oldValue.contains( value ) ) 109 properties.setProperty( key, oldValue + "," + value ); 110 111 return oldValue; 112 } 113 114 @Override 115 public String get( String key ) 116 { 117 String value = properties.getProperty( key ); 118 119 if( value == null || value.isEmpty() ) 120 return null; 121 122 return value; 123 } 124 }; 125 } 126 127 @Override 128 public void clean( Properties config ) 129 { 130 } 131 132 @Override 133 protected FlowStepJob<Properties> createFlowStepJob( FlowProcess<Properties> flowProcess, Properties parentConfig ) 134 { 135 setConf( getInitializedConfig( flowProcess, parentConfig ) ); 136 137 flowProcess = new LocalFlowProcess( flowProcess.getCurrentSession(), getConfig() ); 138 139 return new LocalFlowStepJob( createClientState( flowProcess ), (LocalFlowProcess) flowProcess, this ); 140 } 141 142 public Map<String, Tap> getTrapMap() 143 { 144 return traps; 145 } 146 147 public Map<Tap, Properties> getPropertiesMap() 148 { 149 return tapProperties; 150 } 151 152 @Override 153 public Set<Tap> getTraps() 154 { 155 return Collections.unmodifiableSet( new HashSet<Tap>( traps.values() ) ); 156 } 157 158 public Tap getTrap( String name ) 159 { 160 return getTrapMap().get( name ); 161 } 162 }