001/*
002 * Copyright (c) 2007-2016 Concurrent, Inc. All Rights Reserved.
003 *
004 * Project and contact information: http://www.cascading.org/
005 *
006 * This file is part of the Cascading project.
007 *
008 * Licensed under the Apache License, Version 2.0 (the "License");
009 * you may not use this file except in compliance with the License.
010 * You may obtain a copy of the License at
011 *
012 *     http://www.apache.org/licenses/LICENSE-2.0
013 *
014 * Unless required by applicable law or agreed to in writing, software
015 * distributed under the License is distributed on an "AS IS" BASIS,
016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017 * See the License for the specific language governing permissions and
018 * limitations under the License.
019 */
020
021package cascading.platform.local;
022
023import java.io.File;
024import java.io.IOException;
025import java.util.Comparator;
026import java.util.Map;
027import java.util.Properties;
028
029import cascading.flow.FlowConnector;
030import cascading.flow.FlowProcess;
031import cascading.flow.FlowSession;
032import cascading.flow.local.LocalFlowConnector;
033import cascading.flow.local.LocalFlowProcess;
034import cascading.platform.TestPlatform;
035import cascading.scheme.Scheme;
036import cascading.scheme.local.TextDelimited;
037import cascading.scheme.local.TextLine;
038import cascading.scheme.util.DelimitedParser;
039import cascading.scheme.util.FieldTypeResolver;
040import cascading.tap.SinkMode;
041import cascading.tap.Tap;
042import cascading.tap.local.FileTap;
043import cascading.tap.local.PartitionTap;
044import cascading.tap.partition.Partition;
045import cascading.tuple.Fields;
046import org.apache.commons.io.FileUtils;
047
048/**
049 * Class LocalPlatform is automatically loaded and injected into a {@link cascading.PlatformTestCase} instance
050 * so that all *PlatformTest classes can be tested against the Cascading local mode planner.
051 */
052public class LocalPlatform extends TestPlatform
053  {
054  private Properties properties = new Properties();
055
056  {
057  properties.putAll( getGlobalProperties() );
058  }
059
060  @Override
061  public void setUp() throws IOException
062    {
063    }
064
065  @Override
066  public Map<Object, Object> getProperties()
067    {
068    return new Properties( properties );
069    }
070
071  @Override
072  public void tearDown()
073    {
074    }
075
076  @Override
077  public boolean supportsGroupByAfterMerge()
078    {
079    return true;
080    }
081
082  @Override
083  public void copyFromLocal( String inputFile ) throws IOException
084    {
085    }
086
087  @Override
088  public void copyToLocal( String outputFile ) throws IOException
089    {
090    }
091
092  @Override
093  public boolean remoteExists( String outputFile ) throws IOException
094    {
095    return new File( outputFile ).exists();
096    }
097
098  @Override
099  public boolean remoteRemove( String outputFile, boolean recursive ) throws IOException
100    {
101    if( !remoteExists( outputFile ) )
102      return true;
103
104    File file = new File( outputFile );
105
106    if( !recursive || !file.isDirectory() )
107      return file.delete();
108
109    try
110      {
111      FileUtils.deleteDirectory( file );
112      }
113    catch( IOException exception )
114      {
115      return false;
116      }
117
118    return !file.exists();
119    }
120
121  @Override
122  public FlowProcess getFlowProcess()
123    {
124    return new LocalFlowProcess( FlowSession.NULL, (Properties) getProperties() );
125    }
126
127  @Override
128  public FlowConnector getFlowConnector( Map<Object, Object> properties )
129    {
130    return new LocalFlowConnector( properties );
131    }
132
133  @Override
134  public Tap getTap( Scheme scheme, String filename, SinkMode mode )
135    {
136    return new FileTap( scheme, filename, mode );
137    }
138
139  @Override
140  public Tap getTextFile( Fields sourceFields, Fields sinkFields, String filename, SinkMode mode )
141    {
142    if( sourceFields == null )
143      return new FileTap( new TextLine(), filename, mode );
144
145    return new FileTap( new TextLine( sourceFields, sinkFields ), filename, mode );
146    }
147
148  @Override
149  public Tap getDelimitedFile( Fields fields, boolean hasHeader, String delimiter, String quote, Class[] types, String filename, SinkMode mode )
150    {
151    return new FileTap( new TextDelimited( fields, hasHeader, delimiter, quote, types ), filename, mode );
152    }
153
154  @Override
155  public Tap getDelimitedFile( Fields fields, boolean skipHeader, boolean writeHeader, String delimiter, String quote, Class[] types, String filename, SinkMode mode )
156    {
157    return new FileTap( new TextDelimited( fields, skipHeader, writeHeader, delimiter, quote, types ), filename, mode );
158    }
159
160  @Override
161  public Tap getDelimitedFile( String delimiter, String quote, FieldTypeResolver fieldTypeResolver, String filename, SinkMode mode )
162    {
163    return new FileTap( new TextDelimited( true, new DelimitedParser( delimiter, quote, fieldTypeResolver ) ), filename, mode );
164    }
165
166  @Override
167  public Tap getPartitionTap( Tap sink, Partition partition, int openThreshold )
168    {
169    return new PartitionTap( (FileTap) sink, partition, openThreshold );
170    }
171
172  @Override
173  public Scheme getTestConfigDefScheme()
174    {
175    return new LocalConfigDefScheme( new Fields( "line" ) );
176    }
177
178  @Override
179  public Scheme getTestFailScheme()
180    {
181    return new LocalFailScheme( new Fields( "line" ) );
182    }
183
184  @Override
185  public Comparator getLongComparator( boolean reverseSort )
186    {
187    return new TestLongComparator( reverseSort );
188    }
189
190  @Override
191  public Comparator getStringComparator( boolean reverseSort )
192    {
193    return new TestStringComparator( reverseSort );
194    }
195
196  @Override
197  public String getHiddenTemporaryPath()
198    {
199    return null;
200    }
201  }