001/* 002 * Copyright (c) 2007-2015 Concurrent, Inc. All Rights Reserved. 003 * 004 * Project and contact information: http://www.cascading.org/ 005 * 006 * This file is part of the Cascading project. 007 * 008 * Licensed under the Apache License, Version 2.0 (the "License"); 009 * you may not use this file except in compliance with the License. 010 * You may obtain a copy of the License at 011 * 012 * http://www.apache.org/licenses/LICENSE-2.0 013 * 014 * Unless required by applicable law or agreed to in writing, software 015 * distributed under the License is distributed on an "AS IS" BASIS, 016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 017 * See the License for the specific language governing permissions and 018 * limitations under the License. 019 */ 020 021package cascading.flow; 022 023import java.util.TreeMap; 024 025import cascading.pipe.Pipe; 026import cascading.pipe.SubAssembly; 027import cascading.tap.Tap; 028 029/** 030 * 031 */ 032public class FlowElements 033 { 034 public static String id( FlowElement flowElement ) 035 { 036 if( flowElement instanceof Pipe ) 037 return Pipe.id( (Pipe) flowElement ); 038 039 if( flowElement instanceof Tap ) 040 return Tap.id( (Tap) flowElement ); 041 042 throw new IllegalArgumentException( "id not supported for: " + flowElement.getClass().getCanonicalName() ); 043 } 044 045 public static int isPrevious( Pipe pipe, Pipe previous ) 046 { 047 if( pipe == previous ) 048 return 0; 049 050 if( pipe instanceof SubAssembly ) 051 { 052 Pipe[] unwind = SubAssembly.unwind( pipe ); 053 054 for( Pipe unwound : unwind ) 055 { 056 int result = collectPipes( unwound, 0, previous ); 057 058 if( result != -1 ) 059 return result; 060 } 061 062 return -1; 063 } 064 065 return collectPipes( pipe, 0, previous ); 066 } 067 068 private static int collectPipes( Pipe pipe, int depth, Pipe... allPrevious ) 069 { 070 depth++; 071 072 for( Pipe previous : allPrevious ) 073 { 074 if( pipe == previous ) 075 return depth; 076 077 int result; 078 if( previous instanceof SubAssembly ) 079 result = collectPipes( pipe, depth, SubAssembly.unwind( previous ) ); 080 else 081 result = collectPipes( pipe, depth, previous.getPrevious() ); 082 083 if( result != -1 ) 084 return result; 085 } 086 087 return -1; 088 } 089 090 public static Integer findOrdinal( Pipe pipe, Pipe previous ) 091 { 092 Pipe[] previousPipes = pipe.getPrevious(); 093 094 TreeMap<Integer, Integer> sorted = new TreeMap<>(); 095 096 for( int i = 0; i < previousPipes.length; i++ ) 097 { 098 int result = isPrevious( previousPipes[ i ], (Pipe) previous ); 099 100 if( result == -1 ) 101 continue; 102 103 sorted.put( result, i ); 104 } 105 106 return sorted.firstEntry().getValue(); 107 } 108 }