001/*
002 * Copyright (c) 2007-2016 Concurrent, Inc. All Rights Reserved.
003 *
004 * Project and contact information: http://www.cascading.org/
005 *
006 * This file is part of the Cascading project.
007 *
008 * Licensed under the Apache License, Version 2.0 (the "License");
009 * you may not use this file except in compliance with the License.
010 * You may obtain a copy of the License at
011 *
012 *     http://www.apache.org/licenses/LICENSE-2.0
013 *
014 * Unless required by applicable law or agreed to in writing, software
015 * distributed under the License is distributed on an "AS IS" BASIS,
016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017 * See the License for the specific language governing permissions and
018 * limitations under the License.
019 */
020
021package cascading;
022
023import java.io.IOException;
024import java.util.HashMap;
025import java.util.HashSet;
026import java.util.Map;
027import java.util.Set;
028
029import cascading.flow.FlowConnectorProps;
030import cascading.operation.DebugLevel;
031import cascading.platform.PlatformRunner;
032import cascading.platform.TestPlatform;
033import org.junit.After;
034import org.junit.Before;
035import org.junit.runner.RunWith;
036import org.slf4j.Logger;
037import org.slf4j.LoggerFactory;
038
039/**
040 * PlatformTestCase is the base class for JUnit tests that are platform agnostic. That is using the {@link TestPlatform}
041 * interface each test can be run against all supported platform like Hadoop or Cascading local mode.
042 * <p/>
043 * It is strongly recommended users look at the source of {@link FieldedPipesPlatformTest} or related tests to see how
044 * this class is used.
045 * <p/>
046 * This test case uses the {@link PlatformRunner} to inject the available platform providers which implement the
047 * TestPlatform base class.
048 * <p/>
049 * By default the PlatformRunner looks for "cascading/platform/platform.properties" file in the classpath, and
050 * instantiates the class specified by the "platform.classname" property. If more than one "platform.properties"
051 * resource is found, each class is instantiated and the whole suite of tests will be run against each instance.
052 * <p/>
053 * To limit this, setting the system property "platform.includes" to list the platform names that should be run will
054 * cause the PlatformRunner to ignore any unlisted platforms. Thus setting {@code platform.includes=local}, only
055 * local mode will run even if the "hadoop" platform was found in the classpath.
056 * <p/>
057 * To pass custom properties to each test to be used by the {@link cascading.flow.FlowConnector}, create
058 * system properties prefixed by "platform.". These properties, minus the "platform." prefix in the property name,
059 * will override any defaults.
060 * <p/>
061 * Subclasses of PlatformTestCase can set "{@code useCluster} to {@code true} on the constructor if the underlying
062 * platform can boot a cluster for testing. By setting the system property "test.cluster.enabled" to false, this
063 * can be deactivated in order to temporarily speed test execution. By default {@code useCluster} is {@code false},
064 * typically user tests don't need to have a cluster running to test their functionality so leaving the default is
065 * reasonable.
066 */
067@RunWith(PlatformRunner.class)
068public abstract class PlatformTestCase extends CascadingTestCase
069  {
070  private static final Logger LOG = LoggerFactory.getLogger( PlatformTestCase.class );
071
072  static Set<String> allPaths = new HashSet<String>();
073
074  Set<String> currentPaths = new HashSet<String>();
075
076  private transient TestPlatform platform = null;
077
078  private transient boolean useCluster;
079  private transient int numMapTasks;
080  private transient int numGatherPartitions;
081
082  protected PlatformTestCase( boolean useCluster )
083    {
084    this.useCluster = useCluster;
085    }
086
087  protected PlatformTestCase( boolean useCluster, int numMapTasks, int numGatherPartitions )
088    {
089    this( useCluster );
090    this.numMapTasks = numMapTasks;
091    this.numGatherPartitions = numGatherPartitions;
092    }
093
094  protected PlatformTestCase()
095    {
096    this( false );
097    }
098
099  public void installPlatform( TestPlatform platform )
100    {
101    this.platform = platform;
102    this.platform.setUseCluster( useCluster );
103
104    if( this.platform.isMapReduce() )
105      {
106      platform.setNumMappers( numMapTasks );
107      platform.setNumReducers( numGatherPartitions );
108      }
109
110    if( this.platform.isDAG() )
111      platform.setNumGatherPartitions( numGatherPartitions );
112    }
113
114  public TestPlatform getPlatform()
115    {
116    return platform;
117    }
118
119  @Override
120  protected String[] getOutputPathElements()
121    {
122    return new String[]{getTestOutputRoot(), getPlatformName(), getTestCaseName(), getTestName()};
123    }
124
125  @Override
126  protected String[] getPlanPathElements()
127    {
128    return new String[]{getTestPlanRoot(), getPlatformName(), getTestCaseName(), getTestName()};
129    }
130
131  public String getOutputPath( String path )
132    {
133    String result = makeOutputPath( path );
134
135    if( allPaths.contains( result ) )
136      throw new IllegalStateException( "path already has been used:" + result );
137
138    allPaths.add( result );
139    currentPaths.add( result );
140
141    return result;
142    }
143
144  protected String makeOutputPath( String path )
145    {
146    if( path.startsWith( "/" ) )
147      return getOutputPath() + path;
148
149    return getOutputPath() + "/" + path;
150    }
151
152  public String getPlatformName()
153    {
154    return platform.getName();
155    }
156
157  @Before
158  public void setUp() throws Exception
159    {
160    super.setUp();
161    getPlatform().setUp();
162    }
163
164  public Map<Object, Object> getProperties()
165    {
166    return new HashMap<Object, Object>( getPlatform().getProperties() );
167    }
168
169  protected void copyFromLocal( String inputFile ) throws IOException
170    {
171    getPlatform().copyFromLocal( inputFile );
172    }
173
174  protected Map<Object, Object> disableDebug()
175    {
176    Map<Object, Object> properties = getProperties();
177    FlowConnectorProps.setDebugLevel( properties, DebugLevel.NONE );
178
179    return properties;
180    }
181
182  @After
183  public void tearDown() throws Exception
184    {
185    try
186      {
187      for( String path : currentPaths )
188        {
189        LOG.info( "copying to local {}", path );
190
191        if( getPlatform().isUseCluster() && getPlatform().remoteExists( path ) )
192          getPlatform().copyToLocal( path );
193        }
194
195      currentPaths.clear();
196      }
197    finally
198      {
199      getPlatform().tearDown();
200      }
201    }
202  }