001/*
002 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved.
003 *
004 * Project and contact information: http://www.cascading.org/
005 *
006 * This file is part of the Cascading project.
007 *
008 * Licensed under the Apache License, Version 2.0 (the "License");
009 * you may not use this file except in compliance with the License.
010 * You may obtain a copy of the License at
011 *
012 *     http://www.apache.org/licenses/LICENSE-2.0
013 *
014 * Unless required by applicable law or agreed to in writing, software
015 * distributed under the License is distributed on an "AS IS" BASIS,
016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017 * See the License for the specific language governing permissions and
018 * limitations under the License.
019 */
020
021package cascading.stats.process;
022
023import java.util.ArrayList;
024import java.util.Arrays;
025import java.util.Collection;
026import java.util.Collections;
027import java.util.List;
028
029import cascading.CascadingException;
030import cascading.flow.Flow;
031import cascading.flow.process.ProcessFlowStep;
032import cascading.management.state.ClientState;
033import cascading.stats.FlowStats;
034import cascading.stats.FlowStepStats;
035import riffle.process.scheduler.ProcessException;
036import riffle.process.scheduler.ProcessWrapper;
037
038/**
039 * ProcessFlowStats is a sub-class of FlowStats which can fetch counters from a ProcessWrapper and hook them into the
040 * stats mechanism of Cascading.
041 */
042public class ProcessFlowStats extends FlowStats
043  {
044  /** The ProcessWrapper having the actual counters. */
045  private ProcessWrapper processWrapper;
046
047  /**
048   * Constructs a new ProcessFlowStats instance.
049   *
050   * @param flow           a Flow instance.
051   * @param clientState    The current client state.
052   * @param processWrapper a ProcessWrapper instance.
053   */
054  public ProcessFlowStats( Flow flow, ClientState clientState, ProcessWrapper processWrapper )
055    {
056    super( flow, clientState );
057    this.processWrapper = processWrapper;
058    }
059
060  @Override
061  public List<FlowStepStats> getFlowStepStats()
062    {
063    return getChildrenInternal();
064    }
065
066  @Override
067  public Collection getChildren()
068    {
069    return getChildrenInternal();
070    }
071
072  /**
073   * Internal method to retrieve the child stats objects from the ProcessWrapper.
074   */
075  private List<FlowStepStats> getChildrenInternal()
076    {
077    try
078      {
079      if( !processWrapper.hasChildren() )
080        {
081        if( processWrapper.hasCounters() )
082          return Arrays.<FlowStepStats>asList( new ProcessStepStats( clientState, processWrapper.getCounters(), new ProcessFlowStep( processWrapper, 1 ) ) );
083        else
084          return Collections.emptyList();
085        }
086
087      List<FlowStepStats> childStepStats = new ArrayList<FlowStepStats>();
088      int counter = 0;
089
090      for( Object process : processWrapper.getChildren() )
091        {
092        ProcessWrapper childWrapper = new ProcessWrapper( process );
093        if( childWrapper.hasCounters() )
094          {
095          ProcessStepStats processStepStats = new ProcessStepStats( clientState, childWrapper.getCounters(),
096            new ProcessFlowStep( processWrapper, counter ) );
097
098          counter++;
099
100          childStepStats.add( processStepStats );
101          }
102        }
103
104      return childStepStats;
105      }
106    catch( ProcessException exception )
107      {
108      throw new CascadingException( exception );
109      }
110    }
111
112  @Override
113  public int getStepsCount()
114    {
115    try
116      {
117      if( !processWrapper.hasChildren() )
118        return 1; // there is always a step, even if it is opaque to us
119
120      return processWrapper.getChildren().size();
121      }
122    catch( ProcessException exception )
123      {
124      throw new CascadingException( exception );
125      }
126    }
127  }