001/* 002 * Copyright (c) 2016-2017 Chris K Wensel <chris@wensel.net>. All Rights Reserved. 003 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved. 004 * 005 * Project and contact information: http://www.cascading.org/ 006 * 007 * This file is part of the Cascading project. 008 * 009 * Licensed under the Apache License, Version 2.0 (the "License"); 010 * you may not use this file except in compliance with the License. 011 * You may obtain a copy of the License at 012 * 013 * http://www.apache.org/licenses/LICENSE-2.0 014 * 015 * Unless required by applicable law or agreed to in writing, software 016 * distributed under the License is distributed on an "AS IS" BASIS, 017 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 018 * See the License for the specific language governing permissions and 019 * limitations under the License. 020 */ 021 022package cascading.util; 023 024import java.beans.Expression; 025import java.io.BufferedReader; 026import java.io.File; 027import java.io.FileWriter; 028import java.io.IOException; 029import java.io.InputStreamReader; 030import java.io.Writer; 031import java.lang.reflect.Constructor; 032import java.lang.reflect.Field; 033import java.lang.reflect.Method; 034import java.lang.reflect.Type; 035import java.net.URL; 036import java.net.URLDecoder; 037import java.security.MessageDigest; 038import java.security.NoSuchAlgorithmException; 039import java.util.ArrayList; 040import java.util.Arrays; 041import java.util.Collection; 042import java.util.Collections; 043import java.util.Enumeration; 044import java.util.HashMap; 045import java.util.HashSet; 046import java.util.IdentityHashMap; 047import java.util.Iterator; 048import java.util.LinkedHashSet; 049import java.util.List; 050import java.util.Map; 051import java.util.Set; 052import java.util.TreeSet; 053import java.util.UUID; 054import java.util.concurrent.Callable; 055import java.util.concurrent.ExecutionException; 056import 
java.util.concurrent.ExecutorService; 057import java.util.concurrent.Executors; 058import java.util.concurrent.Future; 059import java.util.concurrent.TimeUnit; 060import java.util.concurrent.TimeoutException; 061import java.util.regex.Pattern; 062 063import cascading.CascadingException; 064import cascading.flow.FlowException; 065import cascading.tap.MultiSourceTap; 066import cascading.tap.Tap; 067import cascading.tuple.coerce.Coercions; 068import cascading.util.jgrapht.ComponentAttributeProvider; 069import cascading.util.jgrapht.DOTExporter; 070import cascading.util.jgrapht.EdgeNameProvider; 071import cascading.util.jgrapht.IntegerNameProvider; 072import cascading.util.jgrapht.VertexNameProvider; 073import org.jgrapht.Graph; 074import org.slf4j.Logger; 075import org.slf4j.LoggerFactory; 076 077/** Class Util provides reusable operations. */ 078public class Util 079 { 080 /** 081 * On OS X only, and if the graphviz dot binary is installed, when true, dot will be invoked to convert the dot file 082 * to a pdf document. 
083 */ 084 public static final String CONVERT_DOT_TO_PDF = "util.dot.to.pdf.enabled"; 085 public static int ID_LENGTH = 32; 086 087 private static final Logger LOG = LoggerFactory.getLogger( Util.class ); 088 private static final String HEXES = "0123456789ABCDEF"; 089 090 public static final boolean IS_OSX = System.getProperty( "os.name" ).toLowerCase().contains( "Mac OS X".toLowerCase() ); 091 public static final boolean HAS_DOT_EXEC = IS_OSX && Boolean.getBoolean( CONVERT_DOT_TO_PDF ) && hasDOT(); 092 093 public static <K, V> HashMap<K, V> createHashMap() 094 { 095 return new HashMap<K, V>(); 096 } 097 098 public static <K, V> boolean reverseMap( Map<V, K> from, Map<K, V> to ) 099 { 100 boolean dupes = false; 101 102 for( Map.Entry<V, K> entry : from.entrySet() ) 103 dupes |= to.put( entry.getValue(), entry.getKey() ) != null; 104 105 return dupes; 106 } 107 108 public static <V> Set<V> createIdentitySet() 109 { 110 return Collections.<V>newSetFromMap( new IdentityHashMap() ); 111 } 112 113 public static <V> Set<V> createIdentitySet( Collection<V> collection ) 114 { 115 Set<V> identitySet = createIdentitySet(); 116 117 if( collection != null ) 118 identitySet.addAll( collection ); 119 120 return identitySet; 121 } 122 123 public static <V> V getFirst( Collection<V> collection ) 124 { 125 if( collection == null || collection.isEmpty() ) 126 return null; 127 128 return collection.iterator().next(); 129 } 130 131 public static <V> V getFirst( Iterator<V> iterator ) 132 { 133 if( iterator == null || !iterator.hasNext() ) 134 return null; 135 136 return iterator.next(); 137 } 138 139 public static <V> V getLast( Iterator<V> iterator ) 140 { 141 if( iterator == null || !iterator.hasNext() ) 142 return null; 143 144 V v = iterator.next(); 145 146 while( iterator.hasNext() ) 147 v = iterator.next(); 148 149 return v; 150 } 151 152 public static <T> List<T> asList( T t, T[] ts ) 153 { 154 List<T> list = new ArrayList<>( 1 + ts.length ); 155 156 list.add( t ); 157 
Collections.addAll( list, ts ); 158 159 return list; 160 } 161 162 public static <N extends Number> N max( Collection<N> collection ) 163 { 164 return new TreeSet<>( collection ).first(); 165 } 166 167 public static <N extends Number> N min( Collection<N> collection ) 168 { 169 return new TreeSet<>( collection ).last(); 170 } 171 172 public static <T> Set<T> narrowSet( Class<T> type, Collection collection ) 173 { 174 return narrowSet( type, collection.iterator() ); 175 } 176 177 public static <T> Set<T> narrowIdentitySet( Class<T> type, Collection collection ) 178 { 179 return narrowIdentitySet( type, collection.iterator() ); 180 } 181 182 public static <T> Set<T> narrowSet( Class<T> type, Collection collection, boolean include ) 183 { 184 return narrowSet( type, collection.iterator(), include ); 185 } 186 187 public static <T> Set<T> narrowIdentitySet( Class<T> type, Collection collection, boolean include ) 188 { 189 return narrowIdentitySet( type, collection.iterator(), include ); 190 } 191 192 public static <T> Set<T> narrowSet( Class<T> type, Iterator iterator ) 193 { 194 return narrowSet( type, iterator, true ); 195 } 196 197 public static <T> Set<T> narrowIdentitySet( Class<T> type, Iterator iterator ) 198 { 199 return narrowIdentitySet( type, iterator, true ); 200 } 201 202 public static <T> Set<T> narrowSet( Class<T> type, Iterator iterator, boolean include ) 203 { 204 return narrowSetInternal( type, iterator, include, new HashSet<T>() ); 205 } 206 207 public static <T> Set<T> narrowIdentitySet( Class<T> type, Iterator iterator, boolean include ) 208 { 209 return narrowSetInternal( type, iterator, include, Util.<T>createIdentitySet() ); 210 } 211 212 private static <T> Set<T> narrowSetInternal( Class<T> type, Iterator iterator, boolean include, Set<T> set ) 213 { 214 while( iterator.hasNext() ) 215 { 216 Object o = iterator.next(); 217 218 if( type.isInstance( o ) == include ) 219 set.add( (T) o ); 220 } 221 222 return set; 223 } 224 225 public static <T> 
boolean contains( Class<T> type, Collection collection ) 226 { 227 return contains( type, collection.iterator() ); 228 } 229 230 public static <T> boolean contains( Class<T> type, Iterator iterator ) 231 { 232 while( iterator.hasNext() ) 233 { 234 Object o = iterator.next(); 235 236 if( type.isInstance( o ) ) 237 return true; 238 } 239 240 return false; 241 } 242 243 public static <T> Set<T> differenceIdentity( Set<T> lhs, Set<T> rhs ) 244 { 245 Set<T> diff = createIdentitySet( lhs ); 246 247 diff.removeAll( rhs ); 248 249 return diff; 250 } 251 252 public static synchronized String createUniqueIDWhichStartsWithAChar() 253 { 254 String value; 255 256 do 257 { 258 value = createUniqueID(); 259 } 260 while( Character.isDigit( value.charAt( 0 ) ) ); 261 262 return value; 263 } 264 265 public static synchronized String createUniqueID() 266 { 267 // creates a cryptographically secure random value 268 String value = UUID.randomUUID().toString(); 269 return value.toUpperCase().replaceAll( "-", "" ); 270 } 271 272 public static String createID( String rawID ) 273 { 274 return createID( rawID.getBytes() ); 275 } 276 277 /** 278 * Method CreateID returns a HEX hash of the given bytes with length 32 characters long. 
279 * 280 * @param bytes the bytes 281 * @return string 282 */ 283 public static String createID( byte[] bytes ) 284 { 285 try 286 { 287 return getHex( MessageDigest.getInstance( "MD5" ).digest( bytes ) ); 288 } 289 catch( NoSuchAlgorithmException exception ) 290 { 291 throw new RuntimeException( "unable to digest string" ); 292 } 293 } 294 295 public static String getHex( byte[] bytes ) 296 { 297 if( bytes == null ) 298 return null; 299 300 final StringBuilder hex = new StringBuilder( 2 * bytes.length ); 301 302 for( final byte b : bytes ) 303 hex.append( HEXES.charAt( ( b & 0xF0 ) >> 4 ) ).append( HEXES.charAt( b & 0x0F ) ); 304 305 return hex.toString(); 306 } 307 308 public static byte[] longToByteArray( long value ) 309 { 310 return new byte[]{ 311 (byte) ( value >> 56 ), 312 (byte) ( value >> 48 ), 313 (byte) ( value >> 40 ), 314 (byte) ( value >> 32 ), 315 (byte) ( value >> 24 ), 316 (byte) ( value >> 16 ), 317 (byte) ( value >> 8 ), 318 (byte) value 319 }; 320 } 321 322 public static byte[] intToByteArray( int value ) 323 { 324 return new byte[]{ 325 (byte) ( value >> 24 ), 326 (byte) ( value >> 16 ), 327 (byte) ( value >> 8 ), 328 (byte) value 329 }; 330 } 331 332 public static <T> T[] copy( T[] source ) 333 { 334 if( source == null ) 335 return null; 336 337 return Arrays.copyOf( source, source.length ); 338 } 339 340 public static String unique( String value, String delim ) 341 { 342 String[] split = value.split( delim ); 343 344 Set<String> values = new LinkedHashSet<String>(); 345 346 Collections.addAll( values, split ); 347 348 return join( values, delim ); 349 } 350 351 /** 352 * This method joins the values in the given list with the delim String value. 
353 * 354 * @param list 355 * @param delim 356 * @return String 357 */ 358 public static String join( int[] list, String delim ) 359 { 360 return join( list, delim, false ); 361 } 362 363 public static String join( int[] list, String delim, boolean printNull ) 364 { 365 StringBuffer buffer = new StringBuffer(); 366 int count = 0; 367 368 for( Object s : list ) 369 { 370 if( count != 0 ) 371 buffer.append( delim ); 372 373 if( printNull || s != null ) 374 buffer.append( s ); 375 376 count++; 377 } 378 379 return buffer.toString(); 380 } 381 382 public static String join( String delim, String... strings ) 383 { 384 return join( delim, false, strings ); 385 } 386 387 public static String join( String delim, boolean printNull, String... strings ) 388 { 389 return join( strings, delim, printNull ); 390 } 391 392 /** 393 * This method joins the values in the given list with the delim String value. 394 * 395 * @param list 396 * @param delim 397 * @return a String 398 */ 399 public static String join( Object[] list, String delim ) 400 { 401 return join( list, delim, false ); 402 } 403 404 public static String join( Object[] list, String delim, boolean printNull ) 405 { 406 return join( list, delim, printNull, 0 ); 407 } 408 409 public static String join( Object[] list, String delim, boolean printNull, int beginAt ) 410 { 411 return join( list, delim, printNull, beginAt, list.length - beginAt ); 412 } 413 414 public static String join( Object[] list, String delim, boolean printNull, int beginAt, int length ) 415 { 416 StringBuffer buffer = new StringBuffer(); 417 int count = 0; 418 419 for( int i = beginAt; i < beginAt + length; i++ ) 420 { 421 Object s = list[ i ]; 422 if( count != 0 ) 423 buffer.append( delim ); 424 425 if( printNull || s != null ) 426 buffer.append( s ); 427 428 count++; 429 } 430 431 return buffer.toString(); 432 } 433 434 public static String join( Iterable iterable, String delim, boolean printNull ) 435 { 436 int count = 0; 437 438 StringBuilder 
buffer = new StringBuilder(); 439 440 for( Object s : iterable ) 441 { 442 if( count != 0 ) 443 buffer.append( delim ); 444 445 if( printNull || s != null ) 446 buffer.append( s ); 447 448 count++; 449 } 450 451 return buffer.toString(); 452 } 453 454 /** 455 * This method joins each value in the collection with a tab character as the delimiter. 456 * 457 * @param collection 458 * @return a String 459 */ 460 public static String join( Collection collection ) 461 { 462 return join( collection, "\t" ); 463 } 464 465 /** 466 * This method joins each valuein the collection with the given delimiter. 467 * 468 * @param collection 469 * @param delim 470 * @return a String 471 */ 472 public static String join( Collection collection, String delim ) 473 { 474 return join( collection, delim, false ); 475 } 476 477 public static String join( Collection collection, String delim, boolean printNull ) 478 { 479 StringBuffer buffer = new StringBuffer(); 480 481 join( buffer, collection, delim, printNull ); 482 483 return buffer.toString(); 484 } 485 486 /** 487 * This method joins each value in the collection with the given delimiter. All results are appended to the 488 * given {@link StringBuffer} instance. 
489 * 490 * @param buffer 491 * @param collection 492 * @param delim 493 */ 494 public static void join( StringBuffer buffer, Collection collection, String delim ) 495 { 496 join( buffer, collection, delim, false ); 497 } 498 499 public static void join( StringBuffer buffer, Collection collection, String delim, boolean printNull ) 500 { 501 int count = 0; 502 503 for( Object s : collection ) 504 { 505 if( count != 0 ) 506 buffer.append( delim ); 507 508 if( printNull || s != null ) 509 buffer.append( s ); 510 511 count++; 512 } 513 } 514 515 public static <T> List<T> split( Class<T> type, String values ) 516 { 517 return split( type, ",", values ); 518 } 519 520 public static <T> List<T> split( Class<T> type, String delim, String values ) 521 { 522 List<T> results = new ArrayList<>(); 523 524 if( values == null ) 525 return results; 526 527 String[] split = values.split( delim ); 528 529 for( String value : split ) 530 results.add( Coercions.<T>coerce( value, type ) ); 531 532 return results; 533 } 534 535 public static String[] removeNulls( String... 
strings ) 536 { 537 List<String> list = new ArrayList<String>(); 538 539 for( String string : strings ) 540 { 541 if( string != null ) 542 list.add( string ); 543 } 544 545 return list.toArray( new String[ list.size() ] ); 546 } 547 548 public static Collection<String> quote( Collection<String> collection, String quote ) 549 { 550 List<String> list = new ArrayList<String>(); 551 552 for( String string : collection ) 553 list.add( quote + string + quote ); 554 555 return list; 556 } 557 558 public static String print( Collection collection, String delim ) 559 { 560 StringBuffer buffer = new StringBuffer(); 561 562 print( buffer, collection, delim ); 563 564 return buffer.toString(); 565 } 566 567 public static void print( StringBuffer buffer, Collection collection, String delim ) 568 { 569 int count = 0; 570 571 for( Object s : collection ) 572 { 573 if( count != 0 ) 574 buffer.append( delim ); 575 576 buffer.append( "[" ); 577 buffer.append( s ); 578 buffer.append( "]" ); 579 580 count++; 581 } 582 } 583 584 /** 585 * This method attempts to remove any username and password from the given url String. 586 * 587 * @param url 588 * @return a String 589 */ 590 public static String sanitizeUrl( String url ) 591 { 592 if( url == null ) 593 return null; 594 595 return url.replaceAll( "(?<=//).*:.*@", "" ); 596 } 597 598 /** 599 * This method attempts to remove duplicate consecutive forward slashes from the given url. 600 * 601 * @param url 602 * @return a String 603 */ 604 public static String normalizeUrl( String url ) 605 { 606 if( url == null ) 607 return null; 608 609 return url.replaceAll( "([^:]/)/{2,}", "$1/" ); 610 } 611 612 /** 613 * This method returns the {@link Object#toString()} of the given object, or an empty String if the object 614 * is null. 
615 * 616 * @param object 617 * @return a String 618 */ 619 public static String toNull( Object object ) 620 { 621 if( object == null ) 622 return ""; 623 624 return object.toString(); 625 } 626 627 /** 628 * This method truncates the given String value to the given size, but appends an ellipse ("...") if the 629 * String is larger than maxSize. 630 * 631 * @param string 632 * @param maxSize 633 * @return a String 634 */ 635 public static String truncate( String string, int maxSize ) 636 { 637 string = toNull( string ); 638 639 if( string.length() <= maxSize ) 640 return string; 641 642 return String.format( "%s...", string.subSequence( 0, maxSize - 3 ) ); 643 } 644 645 public static void printGraph( String filename, Graph graph ) 646 { 647 try 648 { 649 new File( filename ).getParentFile().mkdirs(); 650 Writer writer = new FileWriter( filename ); 651 652 try 653 { 654 printGraph( writer, graph ); 655 } 656 finally 657 { 658 writer.close(); 659 } 660 } 661 catch( IOException exception ) 662 { 663 LOG.error( "failed printing graph to {}, with exception: {}", filename, exception ); 664 } 665 } 666 667 @SuppressWarnings({"unchecked"}) 668 private static void printGraph( Writer writer, Graph graph ) 669 { 670 DOTExporter dot = new DOTExporter( new IntegerNameProvider(), object -> 671 { 672 if( object == null ) 673 return "none"; 674 675 return object.toString().replaceAll( "\"", "\'" ); 676 }, (EdgeNameProvider<Object>) object -> 677 { 678 if( object == null ) 679 return "none"; 680 681 return object.toString().replaceAll( "\"", "\'" ); 682 } 683 ); 684 685 dot.export( writer, graph ); 686 } 687 688 /** 689 * This method removes all nulls from the given List. 
690 * 691 * @param list 692 */ 693 @SuppressWarnings({"StatementWithEmptyBody"}) 694 public static void removeAllNulls( List list ) 695 { 696 while( list.remove( null ) ) 697 ; 698 } 699 700 public static void writeDOT( Writer writer, Graph graph, IntegerNameProvider vertexIdProvider, VertexNameProvider vertexNameProvider, EdgeNameProvider edgeNameProvider ) 701 { 702 new DOTExporter( vertexIdProvider, vertexNameProvider, edgeNameProvider ).export( writer, graph ); 703 } 704 705 public static void writeDOT( Writer writer, Graph graph, IntegerNameProvider vertexIdProvider, VertexNameProvider vertexNameProvider, EdgeNameProvider edgeNameProvider, 706 ComponentAttributeProvider vertexAttributeProvider, ComponentAttributeProvider edgeAttributeProvider ) 707 { 708 new DOTExporter( vertexIdProvider, vertexNameProvider, edgeNameProvider, vertexAttributeProvider, edgeAttributeProvider ).export( writer, graph ); 709 } 710 711 public static String asString( Object object ) 712 { 713 return object == null ? null : object.toString(); 714 } 715 716 public static boolean isEmpty( String string ) 717 { 718 return string == null || string.isEmpty(); 719 } 720 721 private static String[] findSplitName( String path ) 722 { 723 String separator = "/"; 724 725 if( path.lastIndexOf( "/" ) < path.lastIndexOf( "\\" ) ) 726 separator = "\\\\"; 727 728 String[] split = path.split( separator ); 729 730 path = split[ split.length - 1 ]; 731 732 path = path.substring( 0, path.lastIndexOf( '.' 
) ); // remove .jar 733 734 return path.split( "-(?=\\d)", 2 ); 735 } 736 737 public static String findVersion( String path ) 738 { 739 if( path == null || path.isEmpty() ) 740 return null; 741 742 String[] split = findSplitName( path ); 743 744 if( split.length == 2 ) 745 return split[ 1 ]; 746 747 return null; 748 } 749 750 public static String findName( String path ) 751 { 752 if( path == null || path.isEmpty() ) 753 return null; 754 755 String[] split = findSplitName( path ); 756 757 if( split.length == 0 ) 758 return null; 759 760 return split[ 0 ]; 761 } 762 763 public static long getSourceModified( Object confCopy, Iterator<Tap> values, long sinkModified ) throws IOException 764 { 765 long sourceModified = 0; 766 767 while( values.hasNext() ) 768 { 769 Tap source = values.next(); 770 771 if( source instanceof MultiSourceTap ) 772 return getSourceModified( confCopy, ( (MultiSourceTap) source ).getChildTaps(), sinkModified ); 773 774 sourceModified = source.getModifiedTime( confCopy ); 775 776 // source modified returns zero if does not exist 777 // this should minimize number of times we touch any file meta-data server 778 if( sourceModified == 0 && !source.resourceExists( confCopy ) ) 779 throw new FlowException( "source does not exist: " + source ); 780 781 if( sinkModified < sourceModified ) 782 return sourceModified; 783 } 784 785 return sourceModified; 786 } 787 788 public static long getSinkModified( Object config, Collection<Tap> sinks ) throws IOException 789 { 790 long sinkModified = Long.MAX_VALUE; 791 792 for( Tap sink : sinks ) 793 { 794 if( sink.isReplace() || sink.isUpdate() ) 795 sinkModified = -1L; 796 else 797 { 798 if( !sink.resourceExists( config ) ) 799 sinkModified = 0L; 800 else 801 sinkModified = Math.min( sinkModified, sink.getModifiedTime( config ) ); // return youngest mod date 802 } 803 } 804 805 return sinkModified; 806 } 807 808 public static String getTypeName( Type type ) 809 { 810 if( type == null ) 811 return null; 812 813 
return type instanceof Class ? ( (Class) type ).getCanonicalName() : type.toString(); 814 } 815 816 public static String getSimpleTypeName( Type type ) 817 { 818 if( type == null ) 819 return null; 820 821 return type instanceof Class ? ( (Class) type ).getSimpleName() : type.toString(); 822 } 823 824 public static String[] typeNames( Type[] types ) 825 { 826 String[] names = new String[ types.length ]; 827 828 for( int i = 0; i < types.length; i++ ) 829 names[ i ] = getTypeName( types[ i ] ); 830 831 return names; 832 } 833 834 public static String[] simpleTypeNames( Type[] types ) 835 { 836 String[] names = new String[ types.length ]; 837 838 for( int i = 0; i < types.length; i++ ) 839 names[ i ] = getSimpleTypeName( types[ i ] ); 840 841 return names; 842 } 843 844 public static boolean containsNull( Object[] values ) 845 { 846 for( Object value : values ) 847 { 848 if( value == null ) 849 return true; 850 } 851 852 return false; 853 } 854 855 public static void safeSleep( long durationMillis ) 856 { 857 try 858 { 859 Thread.sleep( durationMillis ); 860 } 861 catch( InterruptedException exception ) 862 { 863 // do nothing 864 } 865 } 866 867 public static void writePDF( String path ) 868 { 869 if( !HAS_DOT_EXEC ) 870 return; 871 872 // dot *.dot -Tpdf -O -Nshape=box 873 File file = new File( path ); 874 execProcess( file.getParentFile(), "dot", file.getName(), "-Tpdf", "-O" ); 875 } 876 877 static boolean hasDOT() 878 { 879 return execProcess( null, "which", "dot" ) == 0; 880 } 881 882 public static int execProcess( File parentFile, String... 
command ) 883 { 884 try 885 { 886 String commandLine = join( command, " " ); 887 888 LOG.debug( "command: {}", commandLine ); 889 890 Process process = Runtime.getRuntime().exec( commandLine, null, parentFile ); 891 892 int result = process.waitFor(); 893 894 BufferedReader reader = new BufferedReader( new InputStreamReader( process.getInputStream() ) ); 895 896 String line = reader.readLine(); 897 898 while( line != null ) 899 { 900 LOG.warn( "{} stdout returned: {}", command[ 0 ], line ); 901 line = reader.readLine(); 902 } 903 904 reader = new BufferedReader( new InputStreamReader( process.getErrorStream() ) ); 905 906 line = reader.readLine(); 907 908 while( line != null ) 909 { 910 LOG.warn( "{} stderr returned: {}", command[ 0 ], line ); 911 line = reader.readLine(); 912 } 913 914 return result; 915 } 916 catch( IOException exception ) 917 { 918 LOG.warn( "unable to exec " + command[ 0 ], exception ); 919 } 920 catch( InterruptedException exception ) 921 { 922 LOG.warn( "interrupted exec " + command[ 0 ], exception ); 923 } 924 925 return Integer.MIN_VALUE; 926 } 927 928 public static String formatDurationFromMillis( long duration ) 929 { 930 if( duration / 1000 / 60 / 60 / 24 > 0.0 ) 931 return formatDurationDHMSms( duration ); 932 if( duration / 1000 / 60 / 60 > 0.0 ) 933 return formatDurationHMSms( duration ); 934 else 935 return formatDurationMSms( duration ); 936 } 937 938 public static String formatDurationMSms( long duration ) 939 { 940 long ms = duration % 1000; 941 long durationSeconds = duration / 1000; 942 long seconds = durationSeconds % 60; 943 long minutes = durationSeconds / 60; 944 945 return String.format( "%02d:%02d.%03d", minutes, seconds, ms ); 946 } 947 948 public static String formatDurationHMSms( long duration ) 949 { 950 long ms = duration % 1000; 951 long durationSeconds = duration / 1000; 952 long seconds = durationSeconds % 60; 953 long minutes = ( durationSeconds / 60 ) % 60; 954 long hours = durationSeconds / 60 / 60; 955 956 
return String.format( "%02d:%02d:%02d.%03d", hours, minutes, seconds, ms ); 957 } 958 959 public static String formatDurationDHMSms( long duration ) 960 { 961 long ms = duration % 1000; 962 long durationSeconds = duration / 1000; 963 long seconds = durationSeconds % 60; 964 long minutes = ( durationSeconds / 60 ) % 60; 965 long hours = ( durationSeconds / 60 / 60 ) % 24; 966 long days = durationSeconds / 60 / 60 / 24; 967 968 return String.format( "%02d:%02d:%02d:%02d.%03d", days, hours, minutes, seconds, ms ); 969 } 970 971 /** 972 * Converts a given comma separated String of Exception names into a List of classes. 973 * ClassNotFound exceptions are ignored if no warningMessage is given, otherwise logged as a warning. 974 * 975 * @param classNames A comma separated String of Exception names. 976 * @return List of Exception classes. 977 */ 978 public static Set<Class<? extends Exception>> asClasses( String classNames, String warningMessage ) 979 { 980 Set<Class<? extends Exception>> exceptionClasses = new HashSet<Class<? extends Exception>>(); 981 String[] split = classNames.split( "," ); 982 983 // possibly user provided type, load from context 984 ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader(); 985 986 for( String className : split ) 987 { 988 if( className != null ) 989 className = className.trim(); 990 991 if( isEmpty( className ) ) 992 continue; 993 994 try 995 { 996 Class<? 
extends Exception> exceptionClass = contextClassLoader.loadClass( className ).asSubclass( Exception.class ); 997 998 exceptionClasses.add( exceptionClass ); 999 } 1000 catch( ClassNotFoundException exception ) 1001 { 1002 if( !Util.isEmpty( warningMessage ) ) 1003 LOG.warn( "{}: {}", warningMessage, className ); 1004 } 1005 } 1006 1007 return exceptionClasses; 1008 } 1009 1010 public static Boolean submitWithTimeout( Callable<Boolean> task, int timeout, TimeUnit timeUnit ) throws Exception 1011 { 1012 ExecutorService executor = Executors.newFixedThreadPool( 1 ); 1013 1014 Future<Boolean> future = executor.submit( task ); 1015 1016 executor.shutdown(); 1017 1018 try 1019 { 1020 return future.get( timeout, timeUnit ); 1021 } 1022 catch( TimeoutException exception ) 1023 { 1024 future.cancel( true ); 1025 } 1026 catch( ExecutionException exception ) 1027 { 1028 Throwable cause = exception.getCause(); 1029 1030 if( cause instanceof RuntimeException ) 1031 throw (RuntimeException) cause; 1032 1033 throw (Exception) cause; 1034 } 1035 1036 return null; 1037 } 1038 1039 public interface RetryOperator<T> 1040 { 1041 T operate() throws Exception; 1042 1043 boolean rethrow( Exception exception ); 1044 } 1045 1046 public static <T> T retry( Logger logger, int retries, int secondsDelay, String message, RetryOperator<T> operator ) throws Exception 1047 { 1048 Exception saved = null; 1049 1050 for( int i = 0; i < retries; i++ ) 1051 { 1052 try 1053 { 1054 return operator.operate(); 1055 } 1056 catch( Exception exception ) 1057 { 1058 if( operator.rethrow( exception ) ) 1059 { 1060 logger.warn( message + ", but not retrying", exception ); 1061 1062 throw exception; 1063 } 1064 1065 saved = exception; 1066 1067 logger.warn( message + ", attempt: " + ( i + 1 ), exception ); 1068 1069 try 1070 { 1071 Thread.sleep( secondsDelay * 1000 ); 1072 } 1073 catch( InterruptedException exception1 ) 1074 { 1075 // do nothing 1076 } 1077 } 1078 } 1079 1080 logger.warn( message + ", done 
retrying after attempts: " + retries, saved ); 1081 1082 throw saved; 1083 } 1084 1085 public static Object createProtectedObject( Class type, Object[] parameters, Class[] parameterTypes ) 1086 { 1087 try 1088 { 1089 Constructor constructor = type.getDeclaredConstructor( parameterTypes ); 1090 1091 constructor.setAccessible( true ); 1092 1093 return constructor.newInstance( parameters ); 1094 } 1095 catch( Exception exception ) 1096 { 1097 LOG.error( "unable to instantiate type: {}, with exception: {}", type.getName(), exception ); 1098 1099 throw new FlowException( "unable to instantiate type: " + type.getName(), exception ); 1100 } 1101 } 1102 1103 public static boolean hasClass( String typeString ) 1104 { 1105 try 1106 { 1107 Util.class.getClassLoader().loadClass( typeString ); 1108 1109 return true; 1110 } 1111 catch( ClassNotFoundException exception ) 1112 { 1113 return false; 1114 } 1115 } 1116 1117 public static <T> T newInstance( String className, Object... parameters ) 1118 { 1119 try 1120 { 1121 Class<T> type = (Class<T>) Util.class.getClassLoader().loadClass( className ); 1122 1123 return newInstance( type, parameters ); 1124 } 1125 catch( ClassNotFoundException exception ) 1126 { 1127 throw new CascadingException( "unable to load class: " + className, exception ); 1128 } 1129 } 1130 1131 public static <T> T newInstance( Class<T> target, Object... parameters ) 1132 { 1133 // using Expression makes sure that constructors using sub-types properly work, otherwise we get a 1134 // NoSuchMethodException. 
1135 Expression expr = new Expression( target, "new", parameters ); 1136 1137 try 1138 { 1139 return (T) expr.getValue(); 1140 } 1141 catch( Exception exception ) 1142 { 1143 throw new CascadingException( "unable to create new instance: " + target.getName() + "(" + Arrays.toString( parameters ) + ")", exception ); 1144 } 1145 } 1146 1147 public static Object invokeStaticMethod( String typeString, String methodName, Object[] parameters, Class[] parameterTypes ) 1148 { 1149 Class type = loadClass( typeString ); 1150 1151 return invokeStaticMethod( type, methodName, parameters, parameterTypes ); 1152 } 1153 1154 public static Class<?> loadClass( String typeString ) 1155 { 1156 try 1157 { 1158 return Thread.currentThread().getContextClassLoader().loadClass( typeString ); 1159 } 1160 catch( ClassNotFoundException exception ) 1161 { 1162 throw new CascadingException( "unable to load class: " + typeString, exception ); 1163 } 1164 } 1165 1166 public static Class<?> loadClassSafe( String typeString ) 1167 { 1168 try 1169 { 1170 return Thread.currentThread().getContextClassLoader().loadClass( typeString ); 1171 } 1172 catch( ClassNotFoundException exception ) 1173 { 1174 return null; 1175 } 1176 } 1177 1178 public static Object invokeStaticMethod( Class type, String methodName, Object[] parameters, Class[] parameterTypes ) 1179 { 1180 try 1181 { 1182 Method method = type.getDeclaredMethod( methodName, parameterTypes ); 1183 1184 method.setAccessible( true ); 1185 1186 return method.invoke( null, parameters ); 1187 } 1188 catch( Exception exception ) 1189 { 1190 throw new CascadingException( "unable to invoke static method: " + type.getName() + "." 
+ methodName, exception ); 1191 } 1192 } 1193 1194 public static boolean hasInstanceMethod( Object target, String methodName, Class[] parameterTypes ) 1195 { 1196 try 1197 { 1198 return target.getClass().getMethod( methodName, parameterTypes ) != null; 1199 } 1200 catch( NoSuchMethodException exception ) 1201 { 1202 return false; 1203 } 1204 } 1205 1206 public static Object invokeInstanceMethodSafe( Object target, String methodName, Object[] parameters, Class[] parameterTypes ) 1207 { 1208 try 1209 { 1210 return invokeInstanceMethod( target, methodName, parameters, parameterTypes ); 1211 } 1212 catch( Exception exception ) 1213 { 1214 return null; 1215 } 1216 } 1217 1218 public static Object invokeInstanceMethod( Object target, String methodName, Object[] parameters, Class[] parameterTypes ) 1219 { 1220 try 1221 { 1222 Method method = target.getClass().getMethod( methodName, parameterTypes ); 1223 1224 method.setAccessible( true ); 1225 1226 return method.invoke( target, parameters ); 1227 } 1228 catch( Exception exception ) 1229 { 1230 throw new CascadingException( "unable to invoke instance method: " + target.getClass().getName() + "." 
+ methodName, exception ); 1231 } 1232 } 1233 1234 public static <R> R returnInstanceFieldIfExistsSafe( Object target, String fieldName ) 1235 { 1236 try 1237 { 1238 return returnInstanceFieldIfExists( target, fieldName ); 1239 } 1240 catch( Exception exception ) 1241 { 1242 // do nothing 1243 return null; 1244 } 1245 } 1246 1247 public static Object invokeConstructor( String className, Object[] parameters, Class[] parameterTypes ) 1248 { 1249 try 1250 { 1251 Class type = Util.class.getClassLoader().loadClass( className ); 1252 1253 return invokeConstructor( type, parameters, parameterTypes ); 1254 } 1255 catch( ClassNotFoundException exception ) 1256 { 1257 throw new CascadingException( "unable to load class: " + className, exception ); 1258 } 1259 } 1260 1261 public static <T> T invokeConstructor( Class<T> target, Object[] parameters, Class[] parameterTypes ) 1262 { 1263 try 1264 { 1265 Constructor<T> constructor = target.getConstructor( parameterTypes ); 1266 1267 constructor.setAccessible( true ); 1268 1269 return constructor.newInstance( parameters ); 1270 } 1271 catch( Exception exception ) 1272 { 1273 throw new CascadingException( "unable to create new instance: " + target.getName() + "(" + Arrays.toString( parameters ) + ")", exception ); 1274 } 1275 } 1276 1277 public static <R> R returnInstanceFieldIfExists( Object target, String fieldName ) 1278 { 1279 try 1280 { 1281 Class<?> type = target.getClass(); 1282 Field field = getDeclaredField( fieldName, type ); 1283 1284 field.setAccessible( true ); 1285 1286 return (R) field.get( target ); 1287 } 1288 catch( Exception exception ) 1289 { 1290 throw new CascadingException( "unable to get instance field: " + target.getClass().getName() + "." 
+ fieldName, exception ); 1291 } 1292 } 1293 1294 public static <R> boolean setInstanceFieldIfExistsSafe( Object target, String fieldName, R value ) 1295 { 1296 try 1297 { 1298 setInstanceFieldIfExists( target, fieldName, value ); 1299 } 1300 catch( Exception exception ) 1301 { 1302 return false; 1303 } 1304 1305 return true; 1306 } 1307 1308 public static <R> void setInstanceFieldIfExists( Object target, String fieldName, R value ) 1309 { 1310 try 1311 { 1312 Class<?> type = target.getClass(); 1313 Field field = getDeclaredField( fieldName, type ); 1314 1315 field.setAccessible( true ); 1316 1317 field.set( target, value ); 1318 } 1319 catch( Exception exception ) 1320 { 1321 throw new CascadingException( "unable to set instance field: " + target.getClass().getName() + "." + fieldName, exception ); 1322 } 1323 } 1324 1325 private static Field getDeclaredField( String fieldName, Class<?> type ) 1326 { 1327 if( type == Object.class ) 1328 { 1329 if( LOG.isDebugEnabled() ) 1330 LOG.debug( "did not find {} field on {}", fieldName, type.getName() ); 1331 1332 return null; 1333 } 1334 1335 try 1336 { 1337 return type.getDeclaredField( fieldName ); 1338 } 1339 catch( NoSuchFieldException exception ) 1340 { 1341 return getDeclaredField( fieldName, type.getSuperclass() ); 1342 } 1343 } 1344 1345 public static String makePath( String prefix, String name ) 1346 { 1347 if( name == null || name.isEmpty() ) 1348 throw new IllegalArgumentException( "name may not be null or empty " ); 1349 1350 if( prefix == null || prefix.isEmpty() ) 1351 prefix = Long.toString( (long) ( Math.random() * 10000000000L ) ); 1352 1353 name = cleansePathName( name.substring( 0, name.length() < 25 ? 
name.length() : 25 ) ); 1354 1355 return prefix + "/" + name + "/"; 1356 } 1357 1358 public static String cleansePathName( String name ) 1359 { 1360 return name.replaceAll( "\\s+|\\*|\\+|/+", "_" ); 1361 } 1362 1363 public static Class findMainClass( Class defaultType, String packageExclude ) 1364 { 1365 StackTraceElement[] stackTrace = Thread.currentThread().getStackTrace(); 1366 1367 for( StackTraceElement stackTraceElement : stackTrace ) 1368 { 1369 if( stackTraceElement.getMethodName().equals( "main" ) && !stackTraceElement.getClassName().startsWith( packageExclude ) ) 1370 { 1371 try 1372 { 1373 LOG.info( "resolving application jar from found main method on: {}", stackTraceElement.getClassName() ); 1374 1375 return Thread.currentThread().getContextClassLoader().loadClass( stackTraceElement.getClassName() ); 1376 } 1377 catch( ClassNotFoundException exception ) 1378 { 1379 LOG.warn( "unable to load class while discovering application jar: {}", stackTraceElement.getClassName(), exception ); 1380 } 1381 } 1382 } 1383 1384 LOG.info( "using default application jar, may cause class not found exceptions on the cluster" ); 1385 1386 return defaultType; 1387 } 1388 1389 public static String findContainingJar( Class<?> type ) 1390 { 1391 ClassLoader classLoader = type.getClassLoader(); 1392 1393 String classFile = type.getName().replaceAll( "\\.", "/" ) + ".class"; 1394 1395 try 1396 { 1397 for( Enumeration<URL> iterator = classLoader.getResources( classFile ); iterator.hasMoreElements(); ) 1398 { 1399 URL url = iterator.nextElement(); 1400 1401 if( !"jar".equals( url.getProtocol() ) ) 1402 continue; 1403 1404 String path = url.getPath(); 1405 1406 if( path.startsWith( "file:" ) ) 1407 path = path.substring( "file:".length() ); 1408 1409 path = URLDecoder.decode( path, "UTF-8" ); 1410 1411 return path.replaceAll( "!.*$", "" ); 1412 } 1413 } 1414 catch( IOException exception ) 1415 { 1416 throw new CascadingException( exception ); 1417 } 1418 1419 return null; 1420 } 
/** Matches a single whitespace character; compiled once so {@link #containsWhitespace(String)} need not recompile it per call. */
private static final Pattern WHITESPACE_PATTERN = Pattern.compile( "\\s" );

/**
 * Tests whether the given string contains at least one whitespace character (regular expression class {@code \s}).
 *
 * @param string value to inspect, may be {@code null}
 * @return {@code true} if whitespace is present, {@code false} otherwise or if the value is {@code null}
 */
public static boolean containsWhitespace( String string )
  {
  return string != null && WHITESPACE_PATTERN.matcher( string ).find();
  }

/**
 * Extracts the host portion from a string shaped like {@code scheme://user:pass@hostname:port/stuff}, where
 * every part except the hostname is optional.
 * <p>
 * The user information is split off at the last {@code @} so credentials that themselves contain an {@code @}
 * do not leak into the result. A bracketed IPv6 literal such as {@code [::1]} is returned whole, brackets
 * included, instead of being cut at its first colon.
 *
 * @param uri value to parse
 * @return the hostname, or {@code null} if {@code isEmpty} reports the value as empty
 */
public static String parseHostname( String uri )
  {
  if( isEmpty( uri ) )
    return null;

  String result = uri;

  // scheme://... -- the scheme may be missing
  int schemeEnd = result.indexOf( "://" );

  if( schemeEnd != -1 )
    result = result.substring( schemeEnd + "://".length() );

  // user:pass@hostname:port/stuff
  int pathStart = result.indexOf( '/' );

  if( pathStart != -1 )
    result = result.substring( 0, pathStart );

  // user:pass@hostname:port -- the last '@' ends the user information
  int userInfoEnd = result.lastIndexOf( '@' );

  if( userInfoEnd != -1 )
    result = result.substring( userInfoEnd + 1 );

  // [ipv6-literal]:port -- colons inside the brackets belong to the address
  if( result.startsWith( "[" ) )
    {
    int literalEnd = result.indexOf( ']' );

    if( literalEnd != -1 )
      return result.substring( 0, literalEnd + 1 );
    }

  // hostname:port
  int portStart = result.indexOf( ':' );

  if( portStart != -1 )
    result = result.substring( 0, portStart );

  return result;
  }
} // closes the enclosing class, whose header lies above this excerpt