001/*
002 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved.
003 *
004 * Project and contact information: http://www.cascading.org/
005 *
006 * This file is part of the Cascading project.
007 *
008 * Licensed under the Apache License, Version 2.0 (the "License");
009 * you may not use this file except in compliance with the License.
010 * You may obtain a copy of the License at
011 *
012 *     http://www.apache.org/licenses/LICENSE-2.0
013 *
014 * Unless required by applicable law or agreed to in writing, software
015 * distributed under the License is distributed on an "AS IS" BASIS,
016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017 * See the License for the specific language governing permissions and
018 * limitations under the License.
019 */
020
021package cascading.stats.tez;
022
023import java.io.IOException;
024import java.util.Iterator;
025
026import cascading.flow.FlowNode;
027import cascading.flow.FlowStep;
028import cascading.management.state.ClientState;
029import cascading.stats.BaseCachedStepStats;
030import cascading.stats.CascadingStats;
031import cascading.stats.FlowNodeStats;
032import cascading.stats.tez.util.TezStatsUtil;
033import org.apache.hadoop.conf.Configuration;
034import org.apache.tez.common.counters.TezCounters;
035import org.apache.tez.dag.api.client.DAGClient;
036import org.apache.tez.dag.api.client.DAGStatus;
037
038/**
039 *
040 */
041public abstract class TezStepStats extends BaseCachedStepStats<Configuration, DAGClient, TezCounters>
042  {
043  /**
044   * Constructor CascadingStats creates a new CascadingStats instance.
045   *
046   * @param flowStep
047   * @param clientState
048   */
049  protected TezStepStats( FlowStep flowStep, ClientState clientState )
050    {
051    super( flowStep, clientState );
052
053    Configuration config = (Configuration) flowStep.getConfig();
054
055    this.counterCache = new TezCounterCache<DAGClient>( this, config )
056      {
057      @Override
058      protected DAGClient getJobStatusClient()
059        {
060        return TezStepStats.this.getJobStatusClient();
061        }
062
063      protected TezCounters getCounters( DAGClient statusClient ) throws IOException
064        {
065        DAGStatus dagStatus = TezStatsUtil.getDagStatusWithCounters( statusClient );
066
067        if( dagStatus != null )
068          {
069          TezCounters counters = dagStatus.getDAGCounters();
070
071          if( counters != null && counters.countCounters() != 0 )
072            return counters;
073          }
074
075        return null;
076        }
077      };
078
079    Iterator<FlowNode> iterator = flowStep.getFlowNodeGraph().getOrderedTopologicalIterator();
080
081    while( iterator.hasNext() )
082      addNodeStats( new TezNodeStats( this, iterator.next(), clientState, config ) );
083    }
084
085  /** Method captureDetail captures statistics task details and completion events. */
086  @Override
087  public void captureDetail( CascadingStats.Type depth )
088    {
089    if( !getType().isChild( depth ) || !isDetailStale() )
090      return;
091
092    DAGClient dagClient = getJobStatusClient();
093
094    if( dagClient == null )
095      return;
096
097    synchronized( this )
098      {
099      if( !isDetailStale() )
100        return;
101
102      for( FlowNodeStats flowNodeStats : getFlowNodeStatsMap().values() )
103        flowNodeStats.captureDetail( depth );
104
105      markDetailCaptured();
106      }
107    }
108  }