001    /*
002     * Copyright (c) 2007-2015 Concurrent, Inc. All Rights Reserved.
003     *
004     * Project and contact information: http://www.cascading.org/
005     *
006     * This file is part of the Cascading project.
007     *
008     * Licensed under the Apache License, Version 2.0 (the "License");
009     * you may not use this file except in compliance with the License.
010     * You may obtain a copy of the License at
011     *
012     *     http://www.apache.org/licenses/LICENSE-2.0
013     *
014     * Unless required by applicable law or agreed to in writing, software
015     * distributed under the License is distributed on an "AS IS" BASIS,
016     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017     * See the License for the specific language governing permissions and
018     * limitations under the License.
019     */
020    
021    package cascading.flow.local;
022    
023    import java.io.IOException;
024    import java.util.Collection;
025    import java.util.Collections;
026    import java.util.Map;
027    import java.util.Properties;
028    
029    import cascading.CascadingException;
030    import cascading.flow.FlowProcess;
031    import cascading.flow.FlowSession;
032    import cascading.stats.local.LocalStepStats;
033    import cascading.tap.Tap;
034    import cascading.tuple.TupleEntryCollector;
035    import cascading.tuple.TupleEntryIterator;
036    
037    /** Class LocalFlowProcess is the local mode implementation of {@link FlowProcess}. */
038    public class LocalFlowProcess extends FlowProcess<Properties>
039      {
040      private final Properties config;
041      private LocalStepStats stepStats;
042    
043      public LocalFlowProcess()
044        {
045        config = new Properties();
046        }
047    
048      public LocalFlowProcess( Properties config )
049        {
050        this.config = config;
051        }
052    
053      public LocalFlowProcess( FlowSession flowSession, Properties config )
054        {
055        super( flowSession );
056        this.config = config;
057        }
058    
059      public LocalFlowProcess( LocalFlowProcess flowProcess, Properties properties )
060        {
061        super( flowProcess.getCurrentSession() );
062        this.config = properties;
063        this.stepStats = flowProcess.stepStats;
064        }
065    
066      public void setStepStats( LocalStepStats stepStats )
067        {
068        this.stepStats = stepStats;
069        }
070    
071      @Override
072      public int getNumProcessSlices()
073        {
074        return 1;
075        }
076    
077      @Override
078      public int getCurrentSliceNum()
079        {
080        return 0;
081        }
082    
083      @Override
084      public Object getProperty( String key )
085        {
086        return config.getProperty( key );
087        }
088    
089      @Override
090      public Collection<String> getPropertyKeys()
091        {
092        return Collections.unmodifiableSet( config.stringPropertyNames() );
093        }
094    
095      @Override
096      public Object newInstance( String className )
097        {
098        if( className == null || className.isEmpty() )
099          return null;
100    
101        try
102          {
103          Class type = (Class) LocalFlowProcess.class.getClassLoader().loadClass( className.toString() );
104    
105          return type.newInstance();
106          }
107        catch( ClassNotFoundException exception )
108          {
109          throw new CascadingException( "unable to load class: " + className.toString(), exception );
110          }
111        catch( InstantiationException exception )
112          {
113          throw new CascadingException( "unable to instantiate class: " + className.toString(), exception );
114          }
115        catch( IllegalAccessException exception )
116          {
117          throw new CascadingException( "unable to access class: " + className.toString(), exception );
118          }
119        }
120    
121      @Override
122      public void keepAlive()
123        {
124        }
125    
126      @Override
127      public void increment( Enum counter, long amount )
128        {
129        stepStats.increment( counter, amount );
130        }
131    
132      @Override
133      public void increment( String group, String counter, long amount )
134        {
135        stepStats.increment( group, counter, amount );
136        }
137    
138      @Override
139      public void setStatus( String status )
140        {
141    
142        }
143    
144      @Override
145      public boolean isCounterStatusInitialized()
146        {
147        return true;
148        }
149    
150      @Override
151      public TupleEntryIterator openTapForRead( Tap tap ) throws IOException
152        {
153        return tap.openForRead( this );
154        }
155    
156      @Override
157      public TupleEntryCollector openTapForWrite( Tap tap ) throws IOException
158        {
159        return tap.openForWrite( this, null ); // do not honor sinkmode as this may be opened across tasks
160        }
161    
162      @Override
163      public TupleEntryCollector openTrapForWrite( Tap trap ) throws IOException
164        {
165        return trap.openForWrite( this, null ); // do not honor sinkmode as this may be opened across tasks
166        }
167    
168      @Override
169      public TupleEntryCollector openSystemIntermediateForWrite() throws IOException
170        {
171        return null;
172        }
173    
174      @Override
175      public FlowProcess copyWith( Properties object )
176        {
177        return new LocalFlowProcess( object );
178        }
179    
180      @Override
181      public Properties getConfigCopy()
182        {
183        return new Properties( config );
184        }
185    
186      @Override
187      public Properties copyConfig( Properties config )
188        {
189        return new Properties( config );
190        }
191    
192      @Override
193      public Map<String, String> diffConfigIntoMap( Properties defaultConfig, Properties updatedConfig )
194        {
195        return null;
196        }
197    
198      @Override
199      public Properties mergeMapIntoConfig( Properties defaultConfig, Map<String, String> map )
200        {
201        return null;
202        }
203      }