001/*
002 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved.
003 *
004 * Project and contact information: http://www.cascading.org/
005 *
006 * This file is part of the Cascading project.
007 *
008 * Licensed under the Apache License, Version 2.0 (the "License");
009 * you may not use this file except in compliance with the License.
010 * You may obtain a copy of the License at
011 *
012 *     http://www.apache.org/licenses/LICENSE-2.0
013 *
014 * Unless required by applicable law or agreed to in writing, software
015 * distributed under the License is distributed on an "AS IS" BASIS,
016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017 * See the License for the specific language governing permissions and
018 * limitations under the License.
019 */
020
021package cascading.tap.hadoop.util;
022
023import java.io.IOException;
024import java.util.Set;
025
026import cascading.CascadingException;
027import cascading.flow.FlowProcess;
028import cascading.flow.planner.Scope;
029import cascading.scheme.Scheme;
030import cascading.scheme.SinkCall;
031import cascading.scheme.SourceCall;
032import cascading.scheme.hadoop.SequenceFile;
033import cascading.tap.Tap;
034import cascading.tap.hadoop.Hfs;
035import cascading.tuple.Fields;
036import cascading.tuple.Tuple;
037import org.apache.hadoop.conf.Configuration;
038import org.apache.hadoop.fs.Path;
039import org.apache.hadoop.mapred.OutputCollector;
040import org.apache.hadoop.mapred.OutputFormat;
041import org.apache.hadoop.mapred.RecordReader;
042import org.apache.hadoop.mapred.lib.NullOutputFormat;
043
044/** Class TempHfs creates a temporary {@link cascading.tap.Tap} instance for use internally. */
045public class TempHfs extends Hfs
046  {
047  /** Field name */
048  final String name;
049  /** Field schemeClass */
050  private Class<? extends Scheme> schemeClass;
051  /** Field temporaryPath */
052
053  /** Class NullScheme is a noop scheme used as a placeholder */
054  private static class NullScheme extends Scheme<Configuration, RecordReader, OutputCollector, Object, Object>
055    {
056    @Override
057    public void sourceConfInit( FlowProcess<? extends Configuration> flowProcess, Tap<Configuration, RecordReader, OutputCollector> tap, Configuration conf )
058      {
059      // do nothing
060      }
061
062    @Override
063    public void sinkConfInit( FlowProcess<? extends Configuration> flowProcess, Tap<Configuration, RecordReader, OutputCollector> tap, Configuration conf )
064      {
065      conf.setClass( "mapred.output.key.class", Tuple.class, Object.class );
066      conf.setClass( "mapred.output.value.class", Tuple.class, Object.class );
067      conf.setClass( "mapred.output.format.class", NullOutputFormat.class, OutputFormat.class );
068      }
069
070    @Override
071    public boolean source( FlowProcess<? extends Configuration> flowProcess, SourceCall<Object, RecordReader> sourceCall ) throws IOException
072      {
073      return false;
074      }
075
076    @Override
077    public void sink( FlowProcess<? extends Configuration> flowProcess, SinkCall<Object, OutputCollector> sinkCall ) throws IOException
078      {
079      }
080    }
081
082  /**
083   * Constructor TempHfs creates a new TempHfs instance.
084   *
085   * @param name   of type String
086   * @param isNull of type boolean
087   */
088  public TempHfs( Configuration conf, String name, boolean isNull )
089    {
090    super( isNull ? new NullScheme() : new SequenceFile()
091    {
092    } );
093    this.name = name;
094    this.stringPath = initTemporaryPath( conf, true );
095    }
096
097  /**
098   * Constructor TempDfs creates a new TempDfs instance.
099   *
100   * @param name of type String
101   */
102  public TempHfs( Configuration conf, String name, Class<? extends Scheme> schemeClass )
103    {
104    this( conf, name, schemeClass, true );
105    }
106
107  public TempHfs( Configuration conf, String name, Class<? extends Scheme> schemeClass, boolean unique )
108    {
109    this.name = name;
110
111    if( schemeClass == null )
112      this.schemeClass = SequenceFile.class;
113    else
114      this.schemeClass = schemeClass;
115
116    this.stringPath = initTemporaryPath( conf, unique );
117    }
118
119  public Class<? extends Scheme> getSchemeClass()
120    {
121    return schemeClass;
122    }
123
124  private String initTemporaryPath( Configuration conf, boolean unique )
125    {
126    String child = unique ? makeTemporaryPathDirString( name ) : name;
127
128    return new Path( getTempPath( conf ), child ).toString();
129    }
130
131  @Override
132  public Scope outgoingScopeFor( Set<Scope> incomingScopes )
133    {
134    Fields fields = incomingScopes.iterator().next().getIncomingTapFields();
135
136    setSchemeUsing( fields );
137
138    return new Scope( fields );
139    }
140
141  private void setSchemeUsing( Fields fields )
142    {
143    try
144      {
145      setScheme( schemeClass.getConstructor( Fields.class ).newInstance( fields ) );
146      }
147    catch( Exception exception )
148      {
149      throw new CascadingException( "unable to create specified scheme: " + schemeClass.getName(), exception );
150      }
151    }
152
153  @Override
154  public boolean isTemporary()
155    {
156    return true;
157    }
158
159  @Override
160  public String toString()
161    {
162    return getClass().getSimpleName() + "[\"" + getScheme() + "\"]" + "[" + name + "]";
163    }
164  }