001    /*
002     * Copyright (c) 2007-2014 Concurrent, Inc. All Rights Reserved.
003     *
004     * Project and contact information: http://www.cascading.org/
005     *
006     * This file is part of the Cascading project.
007     *
008     * Licensed under the Apache License, Version 2.0 (the "License");
009     * you may not use this file except in compliance with the License.
010     * You may obtain a copy of the License at
011     *
012     *     http://www.apache.org/licenses/LICENSE-2.0
013     *
014     * Unless required by applicable law or agreed to in writing, software
015     * distributed under the License is distributed on an "AS IS" BASIS,
016     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017     * See the License for the specific language governing permissions and
018     * limitations under the License.
019     */
020    
021    package cascading.flow.hadoop.util;
022    
023    import java.util.Iterator;
024    
025    import cascading.flow.FlowProcess;
026    
027    /**
028     *
029     */
030    public class TimedIterator implements Iterator
031      {
032      private final FlowProcess flowProcess;
033      private final Enum durationCounter;
034      private final Enum countCounter;
035    
036      Iterator iterator;
037    
038      public TimedIterator( FlowProcess flowProcess, Enum durationCounter, Enum countCounter )
039        {
040        this.flowProcess = flowProcess;
041        this.durationCounter = durationCounter;
042        this.countCounter = countCounter;
043        }
044    
045      public void reset( Iterator iterator )
046        {
047        this.iterator = iterator;
048        }
049    
050      @Override
051      public boolean hasNext()
052        {
053        long start = System.currentTimeMillis();
054    
055        try
056          {
057          return iterator.hasNext();
058          }
059        finally
060          {
061          flowProcess.increment( durationCounter, System.currentTimeMillis() - start );
062          }
063        }
064    
065      @Override
066      public Object next()
067        {
068        long start = System.currentTimeMillis();
069    
070        try
071          {
072          flowProcess.increment( countCounter, 1 );
073    
074          return iterator.next();
075          }
076        finally
077          {
078          flowProcess.increment( durationCounter, System.currentTimeMillis() - start );
079          }
080        }
081    
082      @Override
083      public void remove()
084        {
085        iterator.remove();
086        }
087      }