001    /*
002     * Copyright (c) 2007-2015 Concurrent, Inc. All Rights Reserved.
003     *
004     * Project and contact information: http://www.cascading.org/
005     *
006     * This file is part of the Cascading project.
007     *
008     * Licensed under the Apache License, Version 2.0 (the "License");
009     * you may not use this file except in compliance with the License.
010     * You may obtain a copy of the License at
011     *
012     *     http://www.apache.org/licenses/LICENSE-2.0
013     *
014     * Unless required by applicable law or agreed to in writing, software
015     * distributed under the License is distributed on an "AS IS" BASIS,
016     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017     * See the License for the specific language governing permissions and
018     * limitations under the License.
019     */
020    
021    package cascading.flow.local;
022    
023    import java.util.Collections;
024    import java.util.HashMap;
025    import java.util.HashSet;
026    import java.util.Map;
027    import java.util.Properties;
028    import java.util.Set;
029    
030    import cascading.flow.FlowProcess;
031    import cascading.flow.local.planner.LocalFlowStepJob;
032    import cascading.flow.planner.BaseFlowStep;
033    import cascading.flow.planner.FlowStepJob;
034    import cascading.property.ConfigDef;
035    import cascading.tap.Tap;
036    
037    /** Class LocalFlowStep is the local mode implementation of {@link cascading.flow.FlowStep}. */
038    public class LocalFlowStep extends BaseFlowStep<Properties>
039      {
040      /** Field mapperTraps */
041      private final Map<String, Tap> traps = new HashMap<String, Tap>();
042    
043      /** Map of Properties modified by each Tap's sourceConfInit/sinkConfInit */
044      private final Map<Tap, Properties> tapProperties = new HashMap<Tap, Properties>();
045    
046      public LocalFlowStep( String name, int id )
047        {
048        super( name, id );
049        }
050    
051      @Override
052      public Properties getInitializedConfig( FlowProcess<Properties> flowProcess, Properties parentConfig )
053        {
054        Properties currentProperties = parentConfig == null ? new Properties() : new Properties( parentConfig );
055    
056        initTaps( flowProcess, currentProperties, getSources(), false );
057        initTaps( flowProcess, currentProperties, getSinks(), true );
058        initTaps( flowProcess, currentProperties, getTraps(), true );
059    
060        initFromProcessConfigDef( currentProperties );
061    
062        return currentProperties;
063        }
064    
065      protected void initTaps( FlowProcess<Properties> flowProcess, Properties conf, Set<Tap> taps, boolean isSink )
066        {
067        if( !taps.isEmpty() )
068          {
069          for( Tap tap : taps )
070            {
071            Properties confCopy = flowProcess.copyConfig( conf );
072            tapProperties.put( tap, confCopy ); // todo: store the diff, not the copy
073    
074            if( isSink )
075              tap.sinkConfInit( flowProcess, confCopy );
076            else
077              tap.sourceConfInit( flowProcess, confCopy );
078            }
079          }
080        }
081    
082      private void initFromProcessConfigDef( final Properties properties )
083        {
084        initConfFromProcessConfigDef( getSetterFor( properties ) );
085        }
086    
087      private ConfigDef.Setter getSetterFor( final Properties properties )
088        {
089        return new ConfigDef.Setter()
090        {
091        @Override
092        public String set( String key, String value )
093          {
094          String oldValue = get( key );
095    
096          properties.setProperty( key, value );
097    
098          return oldValue;
099          }
100    
101        @Override
102        public String update( String key, String value )
103          {
104          String oldValue = get( key );
105    
106          if( oldValue == null )
107            properties.setProperty( key, value );
108          else if( !oldValue.contains( value ) )
109            properties.setProperty( key, oldValue + "," + value );
110    
111          return oldValue;
112          }
113    
114        @Override
115        public String get( String key )
116          {
117          String value = properties.getProperty( key );
118    
119          if( value == null || value.isEmpty() )
120            return null;
121    
122          return value;
123          }
124        };
125        }
126    
127      @Override
128      public void clean( Properties config )
129        {
130        }
131    
132      @Override
133      protected FlowStepJob<Properties> createFlowStepJob( FlowProcess<Properties> flowProcess, Properties parentConfig )
134        {
135        setConf( getInitializedConfig( flowProcess, parentConfig ) );
136    
137        flowProcess = new LocalFlowProcess( flowProcess.getCurrentSession(), getConfig() );
138    
139        return new LocalFlowStepJob( createClientState( flowProcess ), (LocalFlowProcess) flowProcess, this );
140        }
141    
142      public Map<String, Tap> getTrapMap()
143        {
144        return traps;
145        }
146    
147      public Map<Tap, Properties> getPropertiesMap()
148        {
149        return tapProperties;
150        }
151    
152      @Override
153      public Set<Tap> getTraps()
154        {
155        return Collections.unmodifiableSet( new HashSet<Tap>( traps.values() ) );
156        }
157    
158      public Tap getTrap( String name )
159        {
160        return getTrapMap().get( name );
161        }
162      }