001    /*
002     * Copyright (c) 2007-2014 Concurrent, Inc. All Rights Reserved.
003     *
004     * Project and contact information: http://www.cascading.org/
005     *
006     * This file is part of the Cascading project.
007     *
008     * Licensed under the Apache License, Version 2.0 (the "License");
009     * you may not use this file except in compliance with the License.
010     * You may obtain a copy of the License at
011     *
012     *     http://www.apache.org/licenses/LICENSE-2.0
013     *
014     * Unless required by applicable law or agreed to in writing, software
015     * distributed under the License is distributed on an "AS IS" BASIS,
016     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017     * See the License for the specific language governing permissions and
018     * limitations under the License.
019     */
020    
021    package cascading.tap.hadoop.io;
022    
023    import java.io.IOException;
024    import java.io.InputStream;
025    import java.security.DigestInputStream;
026    import java.security.MessageDigest;
027    import java.security.NoSuchAlgorithmException;
028    
029    import org.apache.commons.codec.binary.Hex;
030    import org.apache.hadoop.fs.FSInputStream;
031    import org.slf4j.Logger;
032    import org.slf4j.LoggerFactory;
033    
034    /**
035     * Class FSDigestInputStream is an {@link FSInputStream} implementation that can verify a
036     * {@link MessageDigest} and will count the number of bytes read for use in progress status.
037     */
038    public class FSDigestInputStream extends FSInputStream
039      {
040      /** Field LOG */
041      private static final Logger LOG = LoggerFactory.getLogger( FSDigestInputStream.class );
042    
043      /** Field count */
044      int count = 0;
045      /** Field inputStream */
046      final InputStream inputStream;
047      /** Field digestHex */
048      final String digestHex;
049    
050      /**
051       * Constructor FSDigestInputStream creates a new FSDigestInputStream instance.
052       *
053       * @param inputStream of type InputStream
054       * @param digestHex   of type String
055       * @throws IOException if unable to get md5 digest
056       */
057      public FSDigestInputStream( InputStream inputStream, String digestHex ) throws IOException
058        {
059        this( inputStream, getMD5Digest(), digestHex );
060        }
061    
062      /**
063       * Constructor FSDigestInputStream creates a new FSDigestInputStream instance.
064       *
065       * @param inputStream   of type InputStream
066       * @param messageDigest of type MessageDigest
067       * @param digestHex     of type String
068       */
069      public FSDigestInputStream( InputStream inputStream, MessageDigest messageDigest, String digestHex )
070        {
071        this.inputStream = digestHex == null ? inputStream : new DigestInputStream( inputStream, messageDigest );
072        this.digestHex = digestHex;
073        }
074    
075      /**
076       * Method getMD5Digest returns the MD5Digest of this FSDigestInputStream object.
077       *
078       * @return the MD5Digest (type MessageDigest) of this FSDigestInputStream object.
079       * @throws IOException when
080       */
081      private static MessageDigest getMD5Digest() throws IOException
082        {
083        try
084          {
085          return MessageDigest.getInstance( "MD5" );
086          }
087        catch( NoSuchAlgorithmException exception )
088          {
089          throw new IOException( "digest not found: " + exception.getMessage() );
090          }
091        }
092    
093      @Override
094      public int read() throws IOException
095        {
096        count++;
097        return inputStream.read();
098        }
099    
100      @Override
101      public int read( byte[] b, int off, int len ) throws IOException
102        {
103        int result = inputStream.read( b, off, len );
104        count += result;
105        return result;
106        }
107    
108      @Override
109      public void close() throws IOException
110        {
111        inputStream.close();
112    
113        LOG.info( "closing stream, testing digest: [{}]", digestHex == null ? "none" : digestHex );
114    
115        if( digestHex == null )
116          return;
117    
118        String digestHex = new String( Hex.encodeHex( ( (DigestInputStream) inputStream ).getMessageDigest().digest() ) );
119    
120        if( !digestHex.equals( this.digestHex ) )
121          {
122          String message = "given digest: [" + this.digestHex + "], does not match input stream digest: [" + digestHex + "]";
123          LOG.error( message );
124          throw new IOException( message );
125          }
126        }
127    
128      @Override
129      public void seek( long pos ) throws IOException
130        {
131        if( getPos() == pos )
132          return;
133    
134        if( getPos() > pos )
135          throw new IOException( "cannot seek to " + pos + ", currently at" + getPos() );
136    
137        int len = (int) ( pos - getPos() );
138        byte[] bytes = new byte[ 50 * 1024 ];
139    
140        while( len > 0 )
141          len -= read( bytes, 0, Math.min( len, bytes.length ) );
142        }
143    
144      @Override
145      public long getPos() throws IOException
146        {
147        return count;
148        }
149    
150      @Override
151      public boolean seekToNewSource( long targetPos ) throws IOException
152        {
153        return false;
154        }
155      }