/*
 * Copyright (c) 2007-2016 Concurrent, Inc. All Rights Reserved.
 *
 * Project and contact information: http://www.cascading.org/
 *
 * This file is part of the Cascading project.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package cascading.flow.hadoop.util;

import java.io.IOException;
import java.net.URI;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import cascading.flow.FlowException;
import cascading.flow.hadoop.HadoopFlowProcess;
import cascading.scheme.hadoop.TextLine;
import cascading.tap.SinkMode;
import cascading.tap.hadoop.Hfs;
import cascading.tap.hadoop.Lfs;
import cascading.tuple.Fields;
import cascading.tuple.Tuple;
import cascading.tuple.TupleEntryCollector;
import cascading.tuple.TupleEntryIterator;
import cascading.util.Util;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobConf;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Utility methods supporting Cascading's Hadoop MapReduce planner: persisting
 * oversized step state via the {@link DistributedCache} and adding artifacts
 * to the distributed job classpath.
 *
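 * A minimal round-trip sketch, assuming a populated {@code JobConf} and a
 * hypothetical serialized {@code state} string:
 * <pre>{@code
 * String path = HadoopMRUtil.writeStateToDistCache( conf, "step-1", "flow", state );
 * // ... later, inside a task configured with the same JobConf ...
 * String restored = HadoopMRUtil.readStateFromDistCache( conf, "step-1", "flow" );
 * }</pre>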
 */
public class HadoopMRUtil
  {
  private static final Logger LOG = LoggerFactory.getLogger( HadoopMRUtil.class );

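  /**
   * Writes the given step state to a temporary file on the Hadoop filesystem
   * and registers that file with the {@link DistributedCache}, for use when
   * the serialized state is too large to embed in the job configuration.
   *
   * @param conf      the current job configuration
   * @param id        the unique id of the step
   * @param kind      a short label used to build the state file name
   * @param stepState the serialized step state, may be null or empty
   * @return the path the state was written to, or null if stepState is empty
   */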
  public static String writeStateToDistCache( JobConf conf, String id, String kind, String stepState )
    {
    if( Util.isEmpty( stepState ) )
      return null;

    LOG.info( "writing step state to dist cache, too large for job conf, size: {}", stepState.length() );

    String statePath = Hfs.getTempPath( conf ) + "/" + kind + "-state-" + id;

    Hfs temp = new Hfs( new TextLine(), statePath, SinkMode.REPLACE );

    try
      {
      TupleEntryCollector writer = temp.openForWrite( new HadoopFlowProcess( conf ) );

      writer.add( new Tuple( stepState ) );

      writer.close();
      }
    catch( IOException exception )
      {
      throw new FlowException( "unable to write step state to Hadoop FS: " + temp.getIdentifier(), exception );
      }

    // register the state file with the distributed cache so tasks can read it locally
    URI uri = new Path( statePath ).toUri();
    DistributedCache.addCacheFile( uri, conf );

    LOG.info( "using step state path: {}", uri );

    return statePath;
    }

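  /**
   * Reads step state previously written by
   * {@link #writeStateToDistCache(JobConf, String, String, String)}, locating
   * the matching file among the local {@link DistributedCache} files by the
   * shared {@code kind + "-state-" + id} naming convention.
   *
   * @param jobConf the current job configuration
   * @param id      the unique id of the step
   * @param kind    the label used when the state was written
   * @return the persisted step state
   * @throws IOException if the local cache files cannot be listed
   */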
  public static String readStateFromDistCache( JobConf jobConf, String id, String kind ) throws IOException
    {
    Path[] files = DistributedCache.getLocalCacheFiles( jobConf );

    Path stepStatePath = null;

    if( files != null )
      {
      for( Path file : files )
        {
        if( !file.toString().contains( kind + "-state-" + id ) )
          continue;

        stepStatePath = file;
        break;
        }
      }

    if( stepStatePath == null )
      throw new FlowException( "unable to find step state from distributed cache" );

    LOG.info( "reading step state from local path: {}", stepStatePath );

    // the cached file is local to the task, so read it through the local file system
    Hfs temp = new Lfs( new TextLine( new Fields( "line" ) ), stepStatePath.toString() );

    TupleEntryIterator reader = null;

    try
      {
      reader = temp.openForRead( new HadoopFlowProcess( jobConf ) );

      if( !reader.hasNext() )
        throw new FlowException( "step state path is empty: " + temp.getIdentifier() );

      return reader.next().getString( 0 );
      }
    catch( IOException exception )
      {
      throw new FlowException( "unable to read state path: " + temp.getIdentifier(), exception );
      }
    finally
      {
      try
        {
        if( reader != null )
          reader.close();
        }
      catch( IOException exception )
        {
        LOG.warn( "error closing state path reader", exception );
        }
      }
    }

  /**
   * Adds the given paths to the job classpath via the {@link DistributedCache},
   * skipping a local copy whenever the same path also resolves remotely.
   *
   * @param config    the job configuration to update
   * @param classpath the classpath elements to add, may be null
   * @return the common paths resolved by {@link HadoopUtil#getCommonPaths}, or null if classpath is null
   */
  public static Map<Path, Path> addToClassPath( Configuration config, List<String> classpath )
    {
    if( classpath == null )
      return null;

    // resolve the given paths to fully qualified local and remote paths
    Map<String, Path> localPaths = new HashMap<String, Path>();
    Map<String, Path> remotePaths = new HashMap<String, Path>();

    HadoopUtil.resolvePaths( config, classpath, null, null, localPaths, remotePaths );

    try
      {
      LocalFileSystem localFS = HadoopUtil.getLocalFS( config );

      for( String path : localPaths.keySet() )
        {
        // only add the local artifact if there is no remote counterpart
        if( remotePaths.containsKey( path ) )
          continue;

        Path artifact = localPaths.get( path );

        DistributedCache.addFileToClassPath( artifact.makeQualified( localFS ), config );
        }

      FileSystem defaultFS = HadoopUtil.getDefaultFS( config );

      for( String path : remotePaths.keySet() )
        {
        // remote artifacts are always added
        Path artifact = remotePaths.get( path );

        DistributedCache.addFileToClassPath( artifact.makeQualified( defaultFS ), config );
        }
      }
    catch( IOException exception )
      {
      throw new FlowException( "unable to set distributed cache paths", exception );
      }

    return HadoopUtil.getCommonPaths( localPaths, remotePaths );
    }

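  /**
   * Returns true if the given {@link JobConf} returns a non-null reducer class.
   *
   * @param jobConf the job configuration to inspect
   * @return true if a reducer class is present
   */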
  public static boolean hasReducer( JobConf jobConf )
    {
    return jobConf.getReducerClass() != null;
    }
  }