/*
 * Copyright (c) 2007-2015 Concurrent, Inc. All Rights Reserved.
 *
 * Project and contact information: http://www.cascading.org/
 *
 * This file is part of the Cascading project.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package cascading.flow.local.planner;

import java.util.Map;
import java.util.Properties;

import cascading.flow.FlowConnector;
import cascading.flow.FlowDef;
import cascading.flow.local.LocalFlow;
import cascading.flow.planner.ElementGraph;
import cascading.flow.planner.FlowPlanner;
import cascading.flow.planner.FlowStepGraph;
import cascading.flow.planner.PlatformInfo;
import cascading.pipe.Pipe;
import cascading.tap.Tap;
import cascading.util.Version;

/**
 * A {@link FlowPlanner} that turns a {@link FlowDef} into a {@link LocalFlow}.
 * <p>
 * The planner reports itself under the platform name {@code "local"}, carries no
 * {@link Properties} configuration object of its own ({@link #getConfig()} is always
 * {@code null}), and never produces temporary taps.
 */
public class LocalPlanner extends FlowPlanner<LocalFlow, Properties>
  {
  /** Creates a planner with no additional state. */
  public LocalPlanner()
    {
    }

  /**
   * Returns the configuration object for this planner.
   *
   * @return always {@code null}
   */
  @Override
  public Properties getConfig()
    {
    return null;
    }

  /**
   * Describes the platform this planner targets.
   *
   * @return a new {@link PlatformInfo} built from {@code "local"}, {@code "Concurrent, Inc."}
   *         and the value of {@link Version#getRelease()}
   */
  @Override
  public PlatformInfo getPlatformInfo()
    {
    return new PlatformInfo( "local", "Concurrent, Inc.", Version.getRelease() );
    }

  /**
   * Initializes the planner by delegating, unchanged, to the superclass implementation.
   *
   * @param flowConnector the connector passed through to the superclass
   * @param properties    the properties passed through to the superclass
   */
  @Override
  public void initialize( FlowConnector flowConnector, Map<Object, Object> properties )
    {
    super.initialize( flowConnector, properties );
    }

  /**
   * Instantiates the {@link LocalFlow} that {@link #buildFlow(FlowDef)} will populate.
   *
   * @param flowDef the flow definition handed to the new flow
   * @return a new {@link LocalFlow} constructed from this planner's platform info,
   *         properties, config (see {@link #getConfig()}) and the given definition
   */
  protected LocalFlow createFlow( FlowDef flowDef )
    {
    PlatformInfo platformInfo = getPlatformInfo();

    return new LocalFlow( platformInfo, getProperties(), getConfig(), flowDef );
    }

  /**
   * Plans the given definition and returns the initialized flow.
   * <p>
   * Any {@link Exception} raised along the way is handed to
   * {@code handleExceptionDuringPlanning} together with the element graph built so far
   * (which is {@code null} if the failure happened before the graph was created), and the
   * throwable it returns is thrown.
   *
   * @param flowDef the flow definition to plan
   * @return the initialized {@link LocalFlow}
   */
  @Override
  public LocalFlow buildFlow( FlowDef flowDef )
    {
    // declared outside the try so the catch clause can report whatever graph exists
    ElementGraph graph = null;

    try
      {
      // platform independent validation and assembly resolution
      verifyAllTaps( flowDef );

      LocalFlow localFlow = createFlow( flowDef );
      Pipe[] tailPipes = resolveTails( flowDef, localFlow );

      verifyAssembly( flowDef, tailPipes );

      graph = createElementGraph( flowDef, tailPipes );

      // planner rules, applied in this fixed order
      failOnLoneGroupAssertion( graph );
      failOnMissingGroup( graph );
      failOnMisusedBuffer( graph );
      failOnGroupEverySplit( graph );

      // platform independent clean-up; groups must be in place before pipes are removed
      graph.removeUnnecessaryPipes();
      graph.resolveFields();

      // needed for checkpointing
      graph = localFlow.updateSchemes( graph );

      FlowStepGraph stepGraph = new LocalStepGraph( flowDef.getName(), graph );

      localFlow.initialize( graph, stepGraph );

      return localFlow;
      }
    catch( Exception cause )
      {
      throw handleExceptionDuringPlanning( cause, graph );
      }
    }

  /**
   * Temporary tap factory hook.
   * <p>
   * NOTE(review): presumably local planning never needs temporary taps - confirm against
   * the superclass contract.
   *
   * @param prefix ignored
   * @param name   ignored
   * @return always {@code null}
   */
  @Override
  protected Tap makeTempTap( String prefix, String name )
    {
    return null;
    }
  }