001    /*
002     * Copyright (c) 2007-2014 Concurrent, Inc. All Rights Reserved.
003     *
004     * Project and contact information: http://www.cascading.org/
005     *
006     * This file is part of the Cascading project.
007     *
008     * Licensed under the Apache License, Version 2.0 (the "License");
009     * you may not use this file except in compliance with the License.
010     * You may obtain a copy of the License at
011     *
012     *     http://www.apache.org/licenses/LICENSE-2.0
013     *
014     * Unless required by applicable law or agreed to in writing, software
015     * distributed under the License is distributed on an "AS IS" BASIS,
016     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017     * See the License for the specific language governing permissions and
018     * limitations under the License.
019     */
020    
021    package cascading.tap.hadoop.util;
022    
023    import java.io.IOException;
024    
025    import cascading.flow.FlowProcess;
026    import org.apache.hadoop.mapred.RecordReader;
027    
028    /**
029     *
030     */
031    public class MeasuredRecordReader implements RecordReader
032      {
033      private final FlowProcess flowProcess;
034      private final Enum counter;
035    
036      private RecordReader recordReader;
037    
038      public MeasuredRecordReader( FlowProcess flowProcess, Enum counter )
039        {
040        this.flowProcess = flowProcess;
041        this.counter = counter;
042        }
043    
044      public RecordReader getRecordReader()
045        {
046        return recordReader;
047        }
048    
049      public void setRecordReader( RecordReader recordReader )
050        {
051        this.recordReader = recordReader;
052        }
053    
054      @Override
055      public boolean next( Object key, Object value ) throws IOException
056        {
057        long start = System.currentTimeMillis();
058    
059        try
060          {
061          return recordReader.next( key, value );
062          }
063        finally
064          {
065          flowProcess.increment( counter, System.currentTimeMillis() - start );
066          }
067        }
068    
069      @Override
070      public Object createKey()
071        {
072        long start = System.currentTimeMillis();
073    
074        try
075          {
076          return recordReader.createKey();
077          }
078        finally
079          {
080          flowProcess.increment( counter, System.currentTimeMillis() - start );
081          }
082        }
083    
084      @Override
085      public Object createValue()
086        {
087        long start = System.currentTimeMillis();
088    
089        try
090          {
091          return recordReader.createValue();
092          }
093        finally
094          {
095          flowProcess.increment( counter, System.currentTimeMillis() - start );
096          }
097        }
098    
099      @Override
100      public long getPos() throws IOException
101        {
102        return recordReader.getPos();
103        }
104    
105      @Override
106      public void close() throws IOException
107        {
108        long start = System.currentTimeMillis();
109    
110        try
111          {
112          recordReader.close();
113          }
114        finally
115          {
116          flowProcess.increment( counter, System.currentTimeMillis() - start );
117          }
118        }
119    
120      @Override
121      public float getProgress() throws IOException
122        {
123        long start = System.currentTimeMillis();
124    
125        try
126          {
127          return recordReader.getProgress();
128          }
129        finally
130          {
131          flowProcess.increment( counter, System.currentTimeMillis() - start );
132          }
133        }
134      }