001/* 002 * Copyright (c) 2007-2016 Concurrent, Inc. All Rights Reserved. 003 * 004 * Project and contact information: http://www.cascading.org/ 005 * 006 * This file is part of the Cascading project. 007 * 008 * Licensed under the Apache License, Version 2.0 (the "License"); 009 * you may not use this file except in compliance with the License. 010 * You may obtain a copy of the License at 011 * 012 * http://www.apache.org/licenses/LICENSE-2.0 013 * 014 * Unless required by applicable law or agreed to in writing, software 015 * distributed under the License is distributed on an "AS IS" BASIS, 016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 017 * See the License for the specific language governing permissions and 018 * limitations under the License. 019 */ 020 021package cascading.flow; 022 023import java.util.TreeMap; 024 025import cascading.pipe.Pipe; 026import cascading.pipe.SubAssembly; 027import cascading.tap.Tap; 028import cascading.util.Util; 029 030/** 031 * 032 */ 033public class FlowElements 034 { 035 public static String id( FlowElement flowElement ) 036 { 037 if( flowElement instanceof Pipe ) 038 return Pipe.id( (Pipe) flowElement ); 039 040 if( flowElement instanceof Tap ) 041 return Tap.id( (Tap) flowElement ); 042 043 String id = Util.returnInstanceFieldIfExistsSafe( flowElement, "id" ); 044 045 if( id != null ) 046 return id; 047 048 throw new IllegalArgumentException( "id not supported for: " + flowElement.getClass().getCanonicalName() ); 049 } 050 051 public static int isPrevious( Pipe pipe, Pipe previous ) 052 { 053 if( pipe == previous ) 054 return 0; 055 056 if( pipe instanceof SubAssembly ) 057 { 058 Pipe[] unwind = SubAssembly.unwind( pipe ); 059 060 for( Pipe unwound : unwind ) 061 { 062 int result = collectPipes( unwound, 0, previous ); 063 064 if( result != -1 ) 065 return result; 066 } 067 068 return -1; 069 } 070 071 return collectPipes( pipe, 0, previous ); 072 } 073 074 private static int collectPipes( Pipe pipe, int depth, Pipe... allPrevious ) 075 { 076 depth++; 077 078 for( Pipe previous : allPrevious ) 079 { 080 if( pipe == previous ) 081 return depth; 082 083 int result; 084 if( previous instanceof SubAssembly ) 085 result = collectPipes( pipe, depth, SubAssembly.unwind( previous ) ); 086 else 087 result = collectPipes( pipe, depth, previous.getPrevious() ); 088 089 if( result != -1 ) 090 return result; 091 } 092 093 return -1; 094 } 095 096 public static Integer findOrdinal( Pipe pipe, Pipe previous ) 097 { 098 Pipe[] previousPipes = pipe.getPrevious(); 099 100 TreeMap<Integer, Integer> sorted = new TreeMap<>(); 101 102 for( int i = 0; i < previousPipes.length; i++ ) 103 { 104 int result = isPrevious( previousPipes[ i ], (Pipe) previous ); 105 106 if( result == -1 ) 107 continue; 108 109 sorted.put( result, i ); 110 } 111 112 return sorted.firstEntry().getValue(); 113 } 114 }