001/* 002 * Copyright (c) 2007-2016 Concurrent, Inc. All Rights Reserved. 003 * 004 * Project and contact information: http://www.cascading.org/ 005 * 006 * This file is part of the Cascading project. 007 * 008 * Licensed under the Apache License, Version 2.0 (the "License"); 009 * you may not use this file except in compliance with the License. 010 * You may obtain a copy of the License at 011 * 012 * http://www.apache.org/licenses/LICENSE-2.0 013 * 014 * Unless required by applicable law or agreed to in writing, software 015 * distributed under the License is distributed on an "AS IS" BASIS, 016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 017 * See the License for the specific language governing permissions and 018 * limitations under the License. 019 */ 020 021package cascading.flow.hadoop.util; 022 023import java.io.Closeable; 024import java.io.IOException; 025import java.util.Iterator; 026 027import cascading.flow.FlowProcess; 028import cascading.util.CloseableIterator; 029 030/** 031 * 032 */ 033public class TimedIterator<V> implements CloseableIterator<V> 034 { 035 private final FlowProcess flowProcess; 036 private final Enum durationCounter; 037 private final Enum countCounter; 038 private final int ordinal; 039 040 public static <V> TimedIterator<V>[] iterators( TimedIterator<V>... iterators ) 041 { 042 return iterators; 043 } 044 045 Iterator<V> iterator; 046 047 public TimedIterator( FlowProcess flowProcess, Enum durationCounter, Enum countCounter ) 048 { 049 this( flowProcess, durationCounter, countCounter, 0 ); 050 } 051 052 public TimedIterator( FlowProcess flowProcess, Enum durationCounter, Enum countCounter, int ordinal ) 053 { 054 this.flowProcess = flowProcess; 055 this.durationCounter = durationCounter; 056 this.countCounter = countCounter; 057 this.ordinal = ordinal; 058 } 059 060 public void reset( Iterable<V> iterable ) 061 { 062 if( iterable == null ) 063 this.iterator = null; 064 else 065 this.iterator = iterable.iterator(); 066 } 067 068 public void reset( Iterator<V> iterator ) 069 { 070 this.iterator = iterator; 071 } 072 073 @Override 074 public boolean hasNext() 075 { 076 if( iterator == null ) 077 return false; 078 079 long start = System.currentTimeMillis(); 080 081 try 082 { 083 return iterator.hasNext(); 084 } 085 finally 086 { 087 flowProcess.increment( durationCounter, System.currentTimeMillis() - start ); 088 } 089 } 090 091 @Override 092 public V next() 093 { 094 long start = System.currentTimeMillis(); 095 096 try 097 { 098 flowProcess.increment( countCounter, 1 ); 099 100 return iterator.next(); 101 } 102 finally 103 { 104 flowProcess.increment( durationCounter, System.currentTimeMillis() - start ); 105 } 106 } 107 108 @Override 109 public void remove() 110 { 111 iterator.remove(); 112 } 113 114 @Override 115 public void close() throws IOException 116 { 117 if( iterator instanceof Closeable ) 118 ( (Closeable) iterator ).close(); 119 } 120 }