001/*
002 * Copyright (c) 2007-2016 Concurrent, Inc. All Rights Reserved.
003 *
004 * Project and contact information: http://www.cascading.org/
005 *
006 * This file is part of the Cascading project.
007 *
008 * Licensed under the Apache License, Version 2.0 (the "License");
009 * you may not use this file except in compliance with the License.
010 * You may obtain a copy of the License at
011 *
012 *     http://www.apache.org/licenses/LICENSE-2.0
013 *
014 * Unless required by applicable law or agreed to in writing, software
015 * distributed under the License is distributed on an "AS IS" BASIS,
016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017 * See the License for the specific language governing permissions and
018 * limitations under the License.
019 */
020
021package cascading.tap.hadoop.util;
022
023import java.io.Closeable;
024import java.io.Flushable;
025import java.io.IOException;
026
027import cascading.flow.FlowProcess;
028import org.apache.hadoop.mapred.OutputCollector;
029
030/**
031 *
032 */
033public class MeasuredOutputCollector implements OutputCollector, Flushable, Closeable
034  {
035  private final FlowProcess flowProcess;
036  private final Enum counter;
037
038  private OutputCollector outputCollector;
039
040  public MeasuredOutputCollector( FlowProcess flowProcess, Enum counter )
041    {
042    this.flowProcess = flowProcess;
043    this.counter = counter;
044    }
045
046  public MeasuredOutputCollector( FlowProcess flowProcess, Enum counter, OutputCollector outputCollector )
047    {
048    this.flowProcess = flowProcess;
049    this.counter = counter;
050    this.outputCollector = outputCollector;
051    }
052
053  public OutputCollector getOutputCollector()
054    {
055    return outputCollector;
056    }
057
058  public void setOutputCollector( OutputCollector outputCollector )
059    {
060    this.outputCollector = outputCollector;
061    }
062
063  @Override
064  public void collect( Object key, Object value ) throws IOException
065    {
066    long start = System.currentTimeMillis();
067
068    try
069      {
070      outputCollector.collect( key, value );
071      }
072    finally
073      {
074      flowProcess.increment( counter, System.currentTimeMillis() - start );
075      }
076    }
077
078  @Override
079  public void flush() throws IOException
080    {
081    if( outputCollector instanceof Flushable )
082      ( (Flushable) outputCollector ).flush();
083    }
084
085  @Override
086  public void close() throws IOException
087    {
088    if( outputCollector instanceof Closeable )
089      ( (Closeable) outputCollector ).close();
090    }
091  }