001/*
002 * Copyright (c) 2007-2015 Concurrent, Inc. All Rights Reserved.
003 *
004 * Project and contact information: http://www.cascading.org/
005 *
006 * This file is part of the Cascading project.
007 *
008 * Licensed under the Apache License, Version 2.0 (the "License");
009 * you may not use this file except in compliance with the License.
010 * You may obtain a copy of the License at
011 *
012 *     http://www.apache.org/licenses/LICENSE-2.0
013 *
014 * Unless required by applicable law or agreed to in writing, software
015 * distributed under the License is distributed on an "AS IS" BASIS,
016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017 * See the License for the specific language governing permissions and
018 * limitations under the License.
019 */
020
021package cascading.flow;
022
023import java.util.TreeMap;
024
025import cascading.pipe.Pipe;
026import cascading.pipe.SubAssembly;
027import cascading.tap.Tap;
028
029/**
030 *
031 */
032public class FlowElements
033  {
034  public static String id( FlowElement flowElement )
035    {
036    if( flowElement instanceof Pipe )
037      return Pipe.id( (Pipe) flowElement );
038
039    if( flowElement instanceof Tap )
040      return Tap.id( (Tap) flowElement );
041
042    throw new IllegalArgumentException( "id not supported for: " + flowElement.getClass().getCanonicalName() );
043    }
044
045  public static int isPrevious( Pipe pipe, Pipe previous )
046    {
047    if( pipe == previous )
048      return 0;
049
050    if( pipe instanceof SubAssembly )
051      {
052      Pipe[] unwind = SubAssembly.unwind( pipe );
053
054      for( Pipe unwound : unwind )
055        {
056        int result = collectPipes( unwound, 0, previous );
057
058        if( result != -1 )
059          return result;
060        }
061
062      return -1;
063      }
064
065    return collectPipes( pipe, 0, previous );
066    }
067
068  private static int collectPipes( Pipe pipe, int depth, Pipe... allPrevious )
069    {
070    depth++;
071
072    for( Pipe previous : allPrevious )
073      {
074      if( pipe == previous )
075        return depth;
076
077      int result;
078      if( previous instanceof SubAssembly )
079        result = collectPipes( pipe, depth, SubAssembly.unwind( previous ) );
080      else
081        result = collectPipes( pipe, depth, previous.getPrevious() );
082
083      if( result != -1 )
084        return result;
085      }
086
087    return -1;
088    }
089
090  public static Integer findOrdinal( Pipe pipe, Pipe previous )
091    {
092    Pipe[] previousPipes = pipe.getPrevious();
093
094    TreeMap<Integer, Integer> sorted = new TreeMap<>();
095
096    for( int i = 0; i < previousPipes.length; i++ )
097      {
098      int result = isPrevious( previousPipes[ i ], (Pipe) previous );
099
100      if( result == -1 )
101        continue;
102
103      sorted.put( result, i );
104      }
105
106    return sorted.firstEntry().getValue();
107    }
108  }