001 /* 002 * Copyright (c) 2007-2015 Concurrent, Inc. All Rights Reserved. 003 * 004 * Project and contact information: http://www.cascading.org/ 005 * 006 * This file is part of the Cascading project. 007 * 008 * Licensed under the Apache License, Version 2.0 (the "License"); 009 * you may not use this file except in compliance with the License. 010 * You may obtain a copy of the License at 011 * 012 * http://www.apache.org/licenses/LICENSE-2.0 013 * 014 * Unless required by applicable law or agreed to in writing, software 015 * distributed under the License is distributed on an "AS IS" BASIS, 016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 017 * See the License for the specific language governing permissions and 018 * limitations under the License. 019 */ 020 021 package cascading; 022 023 import java.io.IOException; 024 import java.util.HashMap; 025 import java.util.HashSet; 026 import java.util.Map; 027 import java.util.Set; 028 029 import cascading.flow.FlowConnectorProps; 030 import cascading.operation.DebugLevel; 031 import cascading.platform.PlatformRunner; 032 import cascading.platform.TestPlatform; 033 import cascading.util.Util; 034 import org.junit.After; 035 import org.junit.Before; 036 import org.junit.Rule; 037 import org.junit.rules.TestName; 038 import org.junit.runner.RunWith; 039 import org.slf4j.Logger; 040 import org.slf4j.LoggerFactory; 041 042 /** 043 * PlatformTestCase is the base class for JUnit tests that are platform agnostic. That is using the {@link TestPlatform} 044 * interface each test can be run against all supported platform like Hadoop or Cascading local mode. 045 * <p/> 046 * It is strongly recommended users look at the source of {@link FieldedPipesPlatformTest} or related tests to see how 047 * this class is used. 048 * <p/> 049 * This test case uses the {@link PlatformRunner} to inject the available platform providers which implement the 050 * TestPlatform base class. 051 * <p/> 052 * By default the PlatformRunner looks for "cascading/platform/platform.properties" file in the classpath, and 053 * instantiates the class specified by the "platform.classname" property. If more than one "platform.properties" 054 * resource is found, each class is instantiated and the whole suite of tests will be run against each instance. 055 * <p/> 056 * To limit this, setting the system property "platform.includes" to list the platform names that should be run will 057 * cause the PlatformRunner to ignore any unlisted platforms. Thus setting {@code platform.includes=local}, only 058 * local mode will run even if the "hadoop" platform was found in the classpath. 059 * <p/> 060 * To pass custom properties to each test to be used by the {@link cascading.flow.FlowConnector}, create 061 * system properties prefixed by "platform.". These properties, minus the "platform." prefix in the property name, 062 * will override any defaults. 063 * <p/> 064 * Subclasses of PlatformTestCase can set "{@code useCluster} to {@code true} on the constructor if the underlying 065 * platform can boot a cluster for testing. By setting the system property "test.cluster.enabled" to false, this 066 * can be deactivated in order to temporarily speed test execution. By default {@code useCluster} is {@code false}, 067 * typically user tests don't need to have a cluster running to test their functionality so leaving the default is 068 * reasonable. 069 */ 070 @RunWith(PlatformRunner.class) 071 public class PlatformTestCase extends CascadingTestCase 072 { 073 private static final Logger LOG = LoggerFactory.getLogger( PlatformTestCase.class ); 074 075 public static final String ROOT_OUTPUT_PATH = "test.output.root"; 076 static Set<String> allPaths = new HashSet<String>(); 077 078 private String rootPath; 079 Set<String> currentPaths = new HashSet<String>(); 080 081 @Rule 082 public transient TestName name = new TestName(); 083 084 private transient TestPlatform platform = null; 085 086 private transient boolean useCluster; 087 private transient int numMapTasks; 088 private transient int numReduceTasks; 089 090 public PlatformTestCase( boolean useCluster ) 091 { 092 this.useCluster = useCluster; 093 } 094 095 public PlatformTestCase( boolean useCluster, int numMapTasks, int numReduceTasks ) 096 { 097 this( useCluster ); 098 this.numMapTasks = numMapTasks; 099 this.numReduceTasks = numReduceTasks; 100 } 101 102 public PlatformTestCase() 103 { 104 this( false ); 105 } 106 107 public void installPlatform( TestPlatform platform ) 108 { 109 this.platform = platform; 110 this.platform.setUseCluster( useCluster ); 111 112 if( this.platform.isMapReduce() ) 113 { 114 platform.setNumMappers( numMapTasks ); 115 platform.setNumReducers( numReduceTasks ); 116 } 117 } 118 119 public TestPlatform getPlatform() 120 { 121 return platform; 122 } 123 124 public String getTestName() 125 { 126 return name.getMethodName(); 127 } 128 129 protected String getRootPath() 130 { 131 if( rootPath == null ) 132 rootPath = Util.join( getPathElements(), "/" ); 133 134 return rootPath; 135 } 136 137 protected String[] getPathElements() 138 { 139 return new String[]{getTestRoot(), getPlatformName(), getTestCaseName()}; 140 } 141 142 public String getOutputPath( String path ) 143 { 144 String result = makeOutputPath( path ); 145 146 if( allPaths.contains( result ) ) 147 throw new IllegalStateException( "path already has been used:" + result ); 148 149 allPaths.add( result ); 150 currentPaths.add( result ); 151 152 return result; 153 } 154 155 private String makeOutputPath( String path ) 156 { 157 if( path.startsWith( "/" ) ) 158 return getRootPath() + path; 159 160 return getRootPath() + "/" + path; 161 } 162 163 public String getTestCaseName() 164 { 165 return getClass().getSimpleName().replaceAll( "^(.*)Test.*$", "$1" ).toLowerCase(); 166 } 167 168 public String getPlatformName() 169 { 170 return platform.getName(); 171 } 172 173 public static String getTestRoot() 174 { 175 return System.getProperty( ROOT_OUTPUT_PATH, "build/test/output" ).replace( ":", "_" ); 176 } 177 178 @Before 179 public void setUp() throws Exception 180 { 181 getPlatform().setUp(); 182 } 183 184 public Map<Object, Object> getProperties() 185 { 186 return new HashMap<Object, Object>( getPlatform().getProperties() ); 187 } 188 189 protected void copyFromLocal( String inputFile ) throws IOException 190 { 191 getPlatform().copyFromLocal( inputFile ); 192 } 193 194 protected Map<Object, Object> disableDebug() 195 { 196 Map<Object, Object> properties = getProperties(); 197 FlowConnectorProps.setDebugLevel( properties, DebugLevel.NONE ); 198 199 return properties; 200 } 201 202 @After 203 public void tearDown() throws Exception 204 { 205 try 206 { 207 for( String path : currentPaths ) 208 { 209 LOG.info( "copying to local {}", path ); 210 211 if( getPlatform().isUseCluster() && getPlatform().remoteExists( path ) ) 212 getPlatform().copyToLocal( path ); 213 } 214 215 currentPaths.clear(); 216 } 217 finally 218 { 219 getPlatform().tearDown(); 220 } 221 } 222 }