001    /*
002     * Copyright (c) 2007-2015 Concurrent, Inc. All Rights Reserved.
003     *
004     * Project and contact information: http://www.cascading.org/
005     *
006     * This file is part of the Cascading project.
007     *
008     * Licensed under the Apache License, Version 2.0 (the "License");
009     * you may not use this file except in compliance with the License.
010     * You may obtain a copy of the License at
011     *
012     *     http://www.apache.org/licenses/LICENSE-2.0
013     *
014     * Unless required by applicable law or agreed to in writing, software
015     * distributed under the License is distributed on an "AS IS" BASIS,
016     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017     * See the License for the specific language governing permissions and
018     * limitations under the License.
019     */
020    
021    package cascading.flow.local.planner;
022    
023    import java.util.Map;
024    import java.util.Properties;
025    
026    import cascading.flow.FlowConnector;
027    import cascading.flow.FlowDef;
028    import cascading.flow.local.LocalFlow;
029    import cascading.flow.planner.ElementGraph;
030    import cascading.flow.planner.FlowPlanner;
031    import cascading.flow.planner.FlowStepGraph;
032    import cascading.flow.planner.PlatformInfo;
033    import cascading.pipe.Pipe;
034    import cascading.tap.Tap;
035    import cascading.util.Version;
036    
037    /**
038     *
039     */
040    public class LocalPlanner extends FlowPlanner<LocalFlow, Properties>
041      {
042      public LocalPlanner()
043        {
044        }
045    
046      @Override
047      public Properties getConfig()
048        {
049        return null;
050        }
051    
052      @Override
053      public PlatformInfo getPlatformInfo()
054        {
055        return new PlatformInfo( "local", "Concurrent, Inc.", Version.getRelease() );
056        }
057    
058      @Override
059      public void initialize( FlowConnector flowConnector, Map<Object, Object> properties )
060        {
061        super.initialize( flowConnector, properties );
062        }
063    
064      protected LocalFlow createFlow( FlowDef flowDef )
065        {
066        return new LocalFlow( getPlatformInfo(), getProperties(), getConfig(), flowDef );
067        }
068    
069      @Override
070      public LocalFlow buildFlow( FlowDef flowDef )
071        {
072        ElementGraph elementGraph = null;
073    
074        try
075          {
076          // generic
077          verifyAllTaps( flowDef );
078    
079          LocalFlow flow = createFlow( flowDef );
080    
081          Pipe[] tails = resolveTails( flowDef, flow );
082    
083          verifyAssembly( flowDef, tails );
084    
085          elementGraph = createElementGraph( flowDef, tails );
086    
087          // rules
088          failOnLoneGroupAssertion( elementGraph );
089          failOnMissingGroup( elementGraph );
090          failOnMisusedBuffer( elementGraph );
091          failOnGroupEverySplit( elementGraph );
092    
093          // generic
094          elementGraph.removeUnnecessaryPipes(); // groups must be added before removing pipes
095          elementGraph.resolveFields();
096    
097          // used for checkpointing
098          elementGraph = flow.updateSchemes( elementGraph );
099    
100          FlowStepGraph flowStepGraph = new LocalStepGraph( flowDef.getName(), elementGraph );
101    
102          flow.initialize( elementGraph, flowStepGraph );
103    
104          return flow;
105          }
106        catch( Exception exception )
107          {
108          throw handleExceptionDuringPlanning( exception, elementGraph );
109          }
110        }
111    
112      @Override
113      protected Tap makeTempTap( String prefix, String name )
114        {
115        return null;
116        }
117      }