001/* 002 * Copyright (c) 2007-2016 Concurrent, Inc. All Rights Reserved. 003 * 004 * Project and contact information: http://www.cascading.org/ 005 * 006 * This file is part of the Cascading project. 007 * 008 * Licensed under the Apache License, Version 2.0 (the "License"); 009 * you may not use this file except in compliance with the License. 010 * You may obtain a copy of the License at 011 * 012 * http://www.apache.org/licenses/LICENSE-2.0 013 * 014 * Unless required by applicable law or agreed to in writing, software 015 * distributed under the License is distributed on an "AS IS" BASIS, 016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 017 * See the License for the specific language governing permissions and 018 * limitations under the License. 019 */ 020 021package cascading; 022 023import java.io.IOException; 024import java.util.HashMap; 025import java.util.HashSet; 026import java.util.Map; 027import java.util.Set; 028 029import cascading.flow.FlowConnectorProps; 030import cascading.operation.DebugLevel; 031import cascading.platform.PlatformRunner; 032import cascading.platform.TestPlatform; 033import org.junit.After; 034import org.junit.Before; 035import org.junit.runner.RunWith; 036import org.slf4j.Logger; 037import org.slf4j.LoggerFactory; 038 039/** 040 * PlatformTestCase is the base class for JUnit tests that are platform agnostic. That is using the {@link TestPlatform} 041 * interface each test can be run against all supported platform like Hadoop or Cascading local mode. 042 * <p/> 043 * It is strongly recommended users look at the source of {@link FieldedPipesPlatformTest} or related tests to see how 044 * this class is used. 045 * <p/> 046 * This test case uses the {@link PlatformRunner} to inject the available platform providers which implement the 047 * TestPlatform base class. 048 * <p/> 049 * By default the PlatformRunner looks for "cascading/platform/platform.properties" file in the classpath, and 050 * instantiates the class specified by the "platform.classname" property. If more than one "platform.properties" 051 * resource is found, each class is instantiated and the whole suite of tests will be run against each instance. 052 * <p/> 053 * To limit this, setting the system property "platform.includes" to list the platform names that should be run will 054 * cause the PlatformRunner to ignore any unlisted platforms. Thus setting {@code platform.includes=local}, only 055 * local mode will run even if the "hadoop" platform was found in the classpath. 056 * <p/> 057 * To pass custom properties to each test to be used by the {@link cascading.flow.FlowConnector}, create 058 * system properties prefixed by "platform.". These properties, minus the "platform." prefix in the property name, 059 * will override any defaults. 060 * <p/> 061 * Subclasses of PlatformTestCase can set "{@code useCluster} to {@code true} on the constructor if the underlying 062 * platform can boot a cluster for testing. By setting the system property "test.cluster.enabled" to false, this 063 * can be deactivated in order to temporarily speed test execution. By default {@code useCluster} is {@code false}, 064 * typically user tests don't need to have a cluster running to test their functionality so leaving the default is 065 * reasonable. 066 */ 067@RunWith(PlatformRunner.class) 068public abstract class PlatformTestCase extends CascadingTestCase 069 { 070 private static final Logger LOG = LoggerFactory.getLogger( PlatformTestCase.class ); 071 072 static Set<String> allPaths = new HashSet<String>(); 073 074 Set<String> currentPaths = new HashSet<String>(); 075 076 private transient TestPlatform platform = null; 077 078 private transient boolean useCluster; 079 private transient int numMapTasks; 080 private transient int numGatherPartitions; 081 082 protected PlatformTestCase( boolean useCluster ) 083 { 084 this.useCluster = useCluster; 085 } 086 087 protected PlatformTestCase( boolean useCluster, int numMapTasks, int numGatherPartitions ) 088 { 089 this( useCluster ); 090 this.numMapTasks = numMapTasks; 091 this.numGatherPartitions = numGatherPartitions; 092 } 093 094 protected PlatformTestCase() 095 { 096 this( false ); 097 } 098 099 public void installPlatform( TestPlatform platform ) 100 { 101 this.platform = platform; 102 this.platform.setUseCluster( useCluster ); 103 104 if( this.platform.isMapReduce() ) 105 { 106 platform.setNumMappers( numMapTasks ); 107 platform.setNumReducers( numGatherPartitions ); 108 } 109 110 if( this.platform.isDAG() ) 111 platform.setNumGatherPartitions( numGatherPartitions ); 112 } 113 114 public TestPlatform getPlatform() 115 { 116 return platform; 117 } 118 119 @Override 120 protected String[] getOutputPathElements() 121 { 122 return new String[]{getTestOutputRoot(), getPlatformName(), getTestCaseName(), getTestName()}; 123 } 124 125 @Override 126 protected String[] getPlanPathElements() 127 { 128 return new String[]{getTestPlanRoot(), getPlatformName(), getTestCaseName(), getTestName()}; 129 } 130 131 public String getOutputPath( String path ) 132 { 133 String result = makeOutputPath( path ); 134 135 if( allPaths.contains( result ) ) 136 throw new IllegalStateException( "path already has been used:" + result ); 137 138 allPaths.add( result ); 139 currentPaths.add( result ); 140 141 return result; 142 } 143 144 protected String makeOutputPath( String path ) 145 { 146 if( path.startsWith( "/" ) ) 147 return getOutputPath() + path; 148 149 return getOutputPath() + "/" + path; 150 } 151 152 public String getPlatformName() 153 { 154 return platform.getName(); 155 } 156 157 @Before 158 public void setUp() throws Exception 159 { 160 super.setUp(); 161 getPlatform().setUp(); 162 } 163 164 public Map<Object, Object> getProperties() 165 { 166 return new HashMap<Object, Object>( getPlatform().getProperties() ); 167 } 168 169 protected void copyFromLocal( String inputFile ) throws IOException 170 { 171 getPlatform().copyFromLocal( inputFile ); 172 } 173 174 protected Map<Object, Object> disableDebug() 175 { 176 Map<Object, Object> properties = getProperties(); 177 FlowConnectorProps.setDebugLevel( properties, DebugLevel.NONE ); 178 179 return properties; 180 } 181 182 @After 183 public void tearDown() throws Exception 184 { 185 try 186 { 187 for( String path : currentPaths ) 188 { 189 LOG.info( "copying to local {}", path ); 190 191 if( getPlatform().isUseCluster() && getPlatform().remoteExists( path ) ) 192 getPlatform().copyToLocal( path ); 193 } 194 195 currentPaths.clear(); 196 } 197 finally 198 { 199 getPlatform().tearDown(); 200 } 201 } 202 }