001/* 002 * Copyright (c) 2007-2015 Concurrent, Inc. All Rights Reserved. 003 * 004 * Project and contact information: http://www.cascading.org/ 005 * 006 * This file is part of the Cascading project. 007 * 008 * Licensed under the Apache License, Version 2.0 (the "License"); 009 * you may not use this file except in compliance with the License. 010 * You may obtain a copy of the License at 011 * 012 * http://www.apache.org/licenses/LICENSE-2.0 013 * 014 * Unless required by applicable law or agreed to in writing, software 015 * distributed under the License is distributed on an "AS IS" BASIS, 016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 017 * See the License for the specific language governing permissions and 018 * limitations under the License. 019 */ 020 021package cascading.util; 022 023import java.beans.Expression; 024import java.io.BufferedReader; 025import java.io.File; 026import java.io.FileWriter; 027import java.io.IOException; 028import java.io.InputStreamReader; 029import java.io.Writer; 030import java.lang.reflect.Constructor; 031import java.lang.reflect.Field; 032import java.lang.reflect.Method; 033import java.lang.reflect.Type; 034import java.net.URL; 035import java.net.URLDecoder; 036import java.security.MessageDigest; 037import java.security.NoSuchAlgorithmException; 038import java.util.ArrayList; 039import java.util.Arrays; 040import java.util.Collection; 041import java.util.Collections; 042import java.util.Enumeration; 043import java.util.HashMap; 044import java.util.HashSet; 045import java.util.IdentityHashMap; 046import java.util.Iterator; 047import java.util.LinkedHashSet; 048import java.util.List; 049import java.util.Map; 050import java.util.Set; 051import java.util.TreeSet; 052import java.util.UUID; 053import java.util.concurrent.Callable; 054import java.util.concurrent.ExecutionException; 055import java.util.concurrent.ExecutorService; 056import java.util.concurrent.Executors; 057import java.util.concurrent.Future; 058import java.util.concurrent.TimeUnit; 059import java.util.concurrent.TimeoutException; 060import java.util.regex.Pattern; 061 062import cascading.CascadingException; 063import cascading.flow.FlowException; 064import cascading.tap.MultiSourceTap; 065import cascading.tap.Tap; 066import cascading.tuple.coerce.Coercions; 067import cascading.util.jgrapht.ComponentAttributeProvider; 068import cascading.util.jgrapht.DOTExporter; 069import cascading.util.jgrapht.EdgeNameProvider; 070import cascading.util.jgrapht.IntegerNameProvider; 071import cascading.util.jgrapht.VertexNameProvider; 072import org.jgrapht.DirectedGraph; 073import org.jgrapht.graph.SimpleDirectedGraph; 074import org.slf4j.Logger; 075import org.slf4j.LoggerFactory; 076 077/** Class Util provides reusable operations. */ 078public class Util 079 { 080 /** 081 * On OS X only, and if the graphviz dot binary is installed, when true, dot will be invoked to convert the dot file 082 * to a pdf document. 083 */ 084 public static final String CONVERT_DOT_TO_PDF = "util.dot.to.pdf.enabled"; 085 public static int ID_LENGTH = 32; 086 087 private static final Logger LOG = LoggerFactory.getLogger( Util.class ); 088 private static final String HEXES = "0123456789ABCDEF"; 089 090 public static final boolean IS_OSX = System.getProperty( "os.name" ).toLowerCase().contains( "Mac OS X".toLowerCase() ); 091 public static final boolean HAS_DOT_EXEC = IS_OSX && Boolean.getBoolean( CONVERT_DOT_TO_PDF ) && hasDOT(); 092 093 public static <K, V> HashMap<K, V> createHashMap() 094 { 095 return new HashMap<K, V>(); 096 } 097 098 public static <K, V> boolean reverseMap( Map<V, K> from, Map<K, V> to ) 099 { 100 boolean dupes = false; 101 102 for( Map.Entry<V, K> entry : from.entrySet() ) 103 dupes |= to.put( entry.getValue(), entry.getKey() ) != null; 104 105 return dupes; 106 } 107 108 public static <V> Set<V> createIdentitySet() 109 { 110 return Collections.<V>newSetFromMap( new IdentityHashMap() ); 111 } 112 113 public static <V> Set<V> createIdentitySet( Collection<V> collection ) 114 { 115 Set<V> identitySet = createIdentitySet(); 116 117 if( collection != null ) 118 identitySet.addAll( collection ); 119 120 return identitySet; 121 } 122 123 public static <V> V getFirst( Collection<V> collection ) 124 { 125 if( collection == null || collection.isEmpty() ) 126 return null; 127 128 return collection.iterator().next(); 129 } 130 131 public static <N extends Number> N max( Collection<N> collection ) 132 { 133 return new TreeSet<>( collection ).first(); 134 } 135 136 public static <N extends Number> N min( Collection<N> collection ) 137 { 138 return new TreeSet<>( collection ).last(); 139 } 140 141 public static <T> Set<T> narrowSet( Class<T> type, Collection collection ) 142 { 143 return narrowSet( type, collection.iterator() ); 144 } 145 146 public static <T> Set<T> narrowSet( Class<T> type, Collection collection, boolean include ) 147 { 148 return narrowSet( type, collection.iterator(), include ); 149 } 150 151 public static <T> Set<T> narrowSet( Class<T> type, Iterator iterator ) 152 { 153 return narrowSet( type, iterator, true ); 154 } 155 156 public static <T> Set<T> narrowSet( Class<T> type, Iterator iterator, boolean include ) 157 { 158 Set<T> set = new HashSet<>(); 159 160 while( iterator.hasNext() ) 161 { 162 Object o = iterator.next(); 163 164 if( type.isInstance( o ) == include ) 165 set.add( (T) o ); 166 } 167 168 return set; 169 } 170 171 public static <T> boolean contains( Class<T> type, Collection collection ) 172 { 173 return contains( type, collection.iterator() ); 174 } 175 176 public static <T> boolean contains( Class<T> type, Iterator iterator ) 177 { 178 while( iterator.hasNext() ) 179 { 180 Object o = iterator.next(); 181 182 if( type.isInstance( o ) ) 183 return true; 184 } 185 186 return false; 187 } 188 189 public static <T> Set<T> differenceIdentity( Set<T> lhs, Set<T> rhs ) 190 { 191 Set<T> diff = createIdentitySet( lhs ); 192 193 diff.removeAll( rhs ); 194 195 return diff; 196 } 197 198 public static synchronized String createUniqueIDWhichStartsWithAChar() 199 { 200 String value; 201 202 do 203 { 204 value = createUniqueID(); 205 } 206 while( Character.isDigit( value.charAt( 0 ) ) ); 207 208 return value; 209 } 210 211 public static synchronized String createUniqueID() 212 { 213 // creates a cryptographically secure random value 214 String value = UUID.randomUUID().toString(); 215 return value.toUpperCase().replaceAll( "-", "" ); 216 } 217 218 public static String createID( String rawID ) 219 { 220 return createID( rawID.getBytes() ); 221 } 222 223 /** 224 * Method CreateID returns a HEX hash of the given bytes with length 32 characters long. 225 * 226 * @param bytes the bytes 227 * @return string 228 */ 229 public static String createID( byte[] bytes ) 230 { 231 try 232 { 233 return getHex( MessageDigest.getInstance( "MD5" ).digest( bytes ) ); 234 } 235 catch( NoSuchAlgorithmException exception ) 236 { 237 throw new RuntimeException( "unable to digest string" ); 238 } 239 } 240 241 public static String getHex( byte[] bytes ) 242 { 243 if( bytes == null ) 244 return null; 245 246 final StringBuilder hex = new StringBuilder( 2 * bytes.length ); 247 248 for( final byte b : bytes ) 249 hex.append( HEXES.charAt( ( b & 0xF0 ) >> 4 ) ).append( HEXES.charAt( b & 0x0F ) ); 250 251 return hex.toString(); 252 } 253 254 public static byte[] longToByteArray( long value ) 255 { 256 return new byte[]{ 257 (byte) ( value >> 56 ), 258 (byte) ( value >> 48 ), 259 (byte) ( value >> 40 ), 260 (byte) ( value >> 32 ), 261 (byte) ( value >> 24 ), 262 (byte) ( value >> 16 ), 263 (byte) ( value >> 8 ), 264 (byte) value 265 }; 266 } 267 268 public static byte[] intToByteArray( int value ) 269 { 270 return new byte[]{ 271 (byte) ( value >> 24 ), 272 (byte) ( value >> 16 ), 273 (byte) ( value >> 8 ), 274 (byte) value 275 }; 276 } 277 278 public static <T> T[] copy( T[] source ) 279 { 280 if( source == null ) 281 return null; 282 283 return Arrays.copyOf( source, source.length ); 284 } 285 286 public static String unique( String value, String delim ) 287 { 288 String[] split = value.split( delim ); 289 290 Set<String> values = new LinkedHashSet<String>(); 291 292 Collections.addAll( values, split ); 293 294 return join( values, delim ); 295 } 296 297 /** 298 * This method joins the values in the given list with the delim String value. 299 * 300 * @param list 301 * @param delim 302 * @return String 303 */ 304 public static String join( int[] list, String delim ) 305 { 306 return join( list, delim, false ); 307 } 308 309 public static String join( int[] list, String delim, boolean printNull ) 310 { 311 StringBuffer buffer = new StringBuffer(); 312 int count = 0; 313 314 for( Object s : list ) 315 { 316 if( count != 0 ) 317 buffer.append( delim ); 318 319 if( printNull || s != null ) 320 buffer.append( s ); 321 322 count++; 323 } 324 325 return buffer.toString(); 326 } 327 328 public static String join( String delim, String... strings ) 329 { 330 return join( delim, false, strings ); 331 } 332 333 public static String join( String delim, boolean printNull, String... strings ) 334 { 335 return join( strings, delim, printNull ); 336 } 337 338 /** 339 * This method joins the values in the given list with the delim String value. 340 * 341 * @param list 342 * @param delim 343 * @return a String 344 */ 345 public static String join( Object[] list, String delim ) 346 { 347 return join( list, delim, false ); 348 } 349 350 public static String join( Object[] list, String delim, boolean printNull ) 351 { 352 return join( list, delim, printNull, 0 ); 353 } 354 355 public static String join( Object[] list, String delim, boolean printNull, int beginAt ) 356 { 357 return join( list, delim, printNull, beginAt, list.length - beginAt ); 358 } 359 360 public static String join( Object[] list, String delim, boolean printNull, int beginAt, int length ) 361 { 362 StringBuffer buffer = new StringBuffer(); 363 int count = 0; 364 365 for( int i = beginAt; i < beginAt + length; i++ ) 366 { 367 Object s = list[ i ]; 368 if( count != 0 ) 369 buffer.append( delim ); 370 371 if( printNull || s != null ) 372 buffer.append( s ); 373 374 count++; 375 } 376 377 return buffer.toString(); 378 } 379 380 public static String join( Iterable iterable, String delim, boolean printNull ) 381 { 382 int count = 0; 383 384 StringBuilder buffer = new StringBuilder(); 385 386 for( Object s : iterable ) 387 { 388 if( count != 0 ) 389 buffer.append( delim ); 390 391 if( printNull || s != null ) 392 buffer.append( s ); 393 394 count++; 395 } 396 397 return buffer.toString(); 398 } 399 400 /** 401 * This method joins each value in the collection with a tab character as the delimiter. 402 * 403 * @param collection 404 * @return a String 405 */ 406 public static String join( Collection collection ) 407 { 408 return join( collection, "\t" ); 409 } 410 411 /** 412 * This method joins each valuein the collection with the given delimiter. 413 * 414 * @param collection 415 * @param delim 416 * @return a String 417 */ 418 public static String join( Collection collection, String delim ) 419 { 420 return join( collection, delim, false ); 421 } 422 423 public static String join( Collection collection, String delim, boolean printNull ) 424 { 425 StringBuffer buffer = new StringBuffer(); 426 427 join( buffer, collection, delim, printNull ); 428 429 return buffer.toString(); 430 } 431 432 /** 433 * This method joins each value in the collection with the given delimiter. All results are appended to the 434 * given {@link StringBuffer} instance. 435 * 436 * @param buffer 437 * @param collection 438 * @param delim 439 */ 440 public static void join( StringBuffer buffer, Collection collection, String delim ) 441 { 442 join( buffer, collection, delim, false ); 443 } 444 445 public static void join( StringBuffer buffer, Collection collection, String delim, boolean printNull ) 446 { 447 int count = 0; 448 449 for( Object s : collection ) 450 { 451 if( count != 0 ) 452 buffer.append( delim ); 453 454 if( printNull || s != null ) 455 buffer.append( s ); 456 457 count++; 458 } 459 } 460 461 public static <T> List<T> split( Class<T> type, String values ) 462 { 463 return split( type, ",", values ); 464 } 465 466 public static <T> List<T> split( Class<T> type, String delim, String values ) 467 { 468 String[] split = values.split( delim ); 469 470 List<T> results = new ArrayList<>(); 471 472 for( String value : split ) 473 results.add( Coercions.<T>coerce( value, type ) ); 474 475 return results; 476 } 477 478 public static String[] removeNulls( String... strings ) 479 { 480 List<String> list = new ArrayList<String>(); 481 482 for( String string : strings ) 483 { 484 if( string != null ) 485 list.add( string ); 486 } 487 488 return list.toArray( new String[ list.size() ] ); 489 } 490 491 public static Collection<String> quote( Collection<String> collection, String quote ) 492 { 493 List<String> list = new ArrayList<String>(); 494 495 for( String string : collection ) 496 list.add( quote + string + quote ); 497 498 return list; 499 } 500 501 public static String print( Collection collection, String delim ) 502 { 503 StringBuffer buffer = new StringBuffer(); 504 505 print( buffer, collection, delim ); 506 507 return buffer.toString(); 508 } 509 510 public static void print( StringBuffer buffer, Collection collection, String delim ) 511 { 512 int count = 0; 513 514 for( Object s : collection ) 515 { 516 if( count != 0 ) 517 buffer.append( delim ); 518 519 buffer.append( "[" ); 520 buffer.append( s ); 521 buffer.append( "]" ); 522 523 count++; 524 } 525 } 526 527 /** 528 * This method attempts to remove any username and password from the given url String. 529 * 530 * @param url 531 * @return a String 532 */ 533 public static String sanitizeUrl( String url ) 534 { 535 if( url == null ) 536 return null; 537 538 return url.replaceAll( "(?<=//).*:.*@", "" ); 539 } 540 541 /** 542 * This method attempts to remove duplicate consecutive forward slashes from the given url. 543 * 544 * @param url 545 * @return a String 546 */ 547 public static String normalizeUrl( String url ) 548 { 549 if( url == null ) 550 return null; 551 552 return url.replaceAll( "([^:]/)/{2,}", "$1/" ); 553 } 554 555 /** 556 * This method returns the {@link Object#toString()} of the given object, or an empty String if the object 557 * is null. 558 * 559 * @param object 560 * @return a String 561 */ 562 public static String toNull( Object object ) 563 { 564 if( object == null ) 565 return ""; 566 567 return object.toString(); 568 } 569 570 /** 571 * This method truncates the given String value to the given size, but appends an ellipse ("...") if the 572 * String is larger than maxSize. 573 * 574 * @param string 575 * @param maxSize 576 * @return a String 577 */ 578 public static String truncate( String string, int maxSize ) 579 { 580 string = toNull( string ); 581 582 if( string.length() <= maxSize ) 583 return string; 584 585 return String.format( "%s...", string.subSequence( 0, maxSize - 3 ) ); 586 } 587 588 public static void printGraph( String filename, SimpleDirectedGraph graph ) 589 { 590 try 591 { 592 new File( filename ).getParentFile().mkdirs(); 593 Writer writer = new FileWriter( filename ); 594 595 try 596 { 597 printGraph( writer, graph ); 598 } 599 finally 600 { 601 writer.close(); 602 } 603 } 604 catch( IOException exception ) 605 { 606 LOG.error( "failed printing graph to {}, with exception: {}", filename, exception ); 607 } 608 } 609 610 @SuppressWarnings({"unchecked"}) 611 private static void printGraph( Writer writer, SimpleDirectedGraph graph ) 612 { 613 DOTExporter dot = new DOTExporter( new IntegerNameProvider(), new VertexNameProvider() 614 { 615 public String getVertexName( Object object ) 616 { 617 if( object == null ) 618 return "none"; 619 620 return object.toString().replaceAll( "\"", "\'" ); 621 } 622 }, new EdgeNameProvider<Object>() 623 { 624 public String getEdgeName( Object object ) 625 { 626 if( object == null ) 627 return "none"; 628 629 return object.toString().replaceAll( "\"", "\'" ); 630 } 631 } 632 ); 633 634 dot.export( writer, graph ); 635 } 636 637 /** 638 * This method removes all nulls from the given List. 639 * 640 * @param list 641 */ 642 @SuppressWarnings({"StatementWithEmptyBody"}) 643 public static void removeAllNulls( List list ) 644 { 645 while( list.remove( null ) ) 646 ; 647 } 648 649 public static void writeDOT( Writer writer, DirectedGraph graph, IntegerNameProvider vertexIdProvider, VertexNameProvider vertexNameProvider, EdgeNameProvider edgeNameProvider ) 650 { 651 new DOTExporter( vertexIdProvider, vertexNameProvider, edgeNameProvider ).export( writer, graph ); 652 } 653 654 public static void writeDOT( Writer writer, DirectedGraph graph, IntegerNameProvider vertexIdProvider, VertexNameProvider vertexNameProvider, EdgeNameProvider edgeNameProvider, 655 ComponentAttributeProvider vertexAttributeProvider, ComponentAttributeProvider edgeAttributeProvider ) 656 { 657 new DOTExporter( vertexIdProvider, vertexNameProvider, edgeNameProvider, vertexAttributeProvider, edgeAttributeProvider ).export( writer, graph ); 658 } 659 660 public static boolean isEmpty( String string ) 661 { 662 return string == null || string.isEmpty(); 663 } 664 665 private static String[] findSplitName( String path ) 666 { 667 String separator = "/"; 668 669 if( path.lastIndexOf( "/" ) < path.lastIndexOf( "\\" ) ) 670 separator = "\\\\"; 671 672 String[] split = path.split( separator ); 673 674 path = split[ split.length - 1 ]; 675 676 path = path.substring( 0, path.lastIndexOf( '.' ) ); // remove .jar 677 678 return path.split( "-(?=\\d)", 2 ); 679 } 680 681 public static String findVersion( String path ) 682 { 683 if( path == null || path.isEmpty() ) 684 return null; 685 686 String[] split = findSplitName( path ); 687 688 if( split.length == 2 ) 689 return split[ 1 ]; 690 691 return null; 692 } 693 694 public static String findName( String path ) 695 { 696 if( path == null || path.isEmpty() ) 697 return null; 698 699 String[] split = findSplitName( path ); 700 701 if( split.length == 0 ) 702 return null; 703 704 return split[ 0 ]; 705 } 706 707 public static long getSourceModified( Object confCopy, Iterator<Tap> values, long sinkModified ) throws IOException 708 { 709 long sourceModified = 0; 710 711 while( values.hasNext() ) 712 { 713 Tap source = values.next(); 714 715 if( source instanceof MultiSourceTap ) 716 return getSourceModified( confCopy, ( (MultiSourceTap) source ).getChildTaps(), sinkModified ); 717 718 sourceModified = source.getModifiedTime( confCopy ); 719 720 // source modified returns zero if does not exist 721 // this should minimize number of times we touch any file meta-data server 722 if( sourceModified == 0 && !source.resourceExists( confCopy ) ) 723 throw new FlowException( "source does not exist: " + source ); 724 725 if( sinkModified < sourceModified ) 726 return sourceModified; 727 } 728 729 return sourceModified; 730 } 731 732 public static long getSinkModified( Object config, Collection<Tap> sinks ) throws IOException 733 { 734 long sinkModified = Long.MAX_VALUE; 735 736 for( Tap sink : sinks ) 737 { 738 if( sink.isReplace() || sink.isUpdate() ) 739 sinkModified = -1L; 740 else 741 { 742 if( !sink.resourceExists( config ) ) 743 sinkModified = 0L; 744 else 745 sinkModified = Math.min( sinkModified, sink.getModifiedTime( config ) ); // return youngest mod date 746 } 747 } 748 return sinkModified; 749 } 750 751 public static String getTypeName( Type type ) 752 { 753 if( type == null ) 754 return null; 755 756 return type instanceof Class ? ( (Class) type ).getCanonicalName() : type.toString(); 757 } 758 759 public static String getSimpleTypeName( Type type ) 760 { 761 if( type == null ) 762 return null; 763 764 return type instanceof Class ? ( (Class) type ).getSimpleName() : type.toString(); 765 } 766 767 public static String[] typeNames( Type[] types ) 768 { 769 String[] names = new String[ types.length ]; 770 771 for( int i = 0; i < types.length; i++ ) 772 names[ i ] = getTypeName( types[ i ] ); 773 774 return names; 775 } 776 777 public static String[] simpleTypeNames( Type[] types ) 778 { 779 String[] names = new String[ types.length ]; 780 781 for( int i = 0; i < types.length; i++ ) 782 names[ i ] = getSimpleTypeName( types[ i ] ); 783 784 return names; 785 } 786 787 public static boolean containsNull( Object[] values ) 788 { 789 for( Object value : values ) 790 { 791 if( value == null ) 792 return true; 793 } 794 795 return false; 796 } 797 798 public static void safeSleep( long durationMillis ) 799 { 800 try 801 { 802 Thread.sleep( durationMillis ); 803 } 804 catch( InterruptedException exception ) 805 { 806 // do nothing 807 } 808 } 809 810 public static void writePDF( String path ) 811 { 812 if( !HAS_DOT_EXEC ) 813 return; 814 815 // dot *.dot -Tpdf -O -Nshape=box 816 File file = new File( path ); 817 execProcess( file.getParentFile(), "dot", file.getName(), "-Tpdf", "-O" ); 818 } 819 820 static boolean hasDOT() 821 { 822 return execProcess( null, "which", "dot" ) == 0; 823 } 824 825 public static int execProcess( File parentFile, String... command ) 826 { 827 try 828 { 829 String commandLine = join( command, " " ); 830 831 LOG.debug( "command: {}", commandLine ); 832 833 Process process = Runtime.getRuntime().exec( commandLine, null, parentFile ); 834 835 int result = process.waitFor(); 836 837 BufferedReader reader = new BufferedReader( new InputStreamReader( process.getInputStream() ) ); 838 839 String line = reader.readLine(); 840 841 while( line != null ) 842 { 843 LOG.warn( "{} stdout returned: {}", command[ 0 ], line ); 844 line = reader.readLine(); 845 } 846 847 reader = new BufferedReader( new InputStreamReader( process.getErrorStream() ) ); 848 849 line = reader.readLine(); 850 851 while( line != null ) 852 { 853 LOG.warn( "{} stderr returned: {}", command[ 0 ], line ); 854 line = reader.readLine(); 855 } 856 857 return result; 858 } 859 catch( IOException exception ) 860 { 861 LOG.warn( "unable to exec " + command[ 0 ], exception ); 862 } 863 catch( InterruptedException exception ) 864 { 865 LOG.warn( "interrupted exec " + command[ 0 ], exception ); 866 } 867 868 return Integer.MIN_VALUE; 869 } 870 871 public static String formatDurationFromMillis( long duration ) 872 { 873 if( duration / 1000 / 60 / 60 / 24 > 0.0 ) 874 return formatDurationDHMSms( duration ); 875 if( duration / 1000 / 60 / 60 > 0.0 ) 876 return formatDurationHMSms( duration ); 877 else 878 return formatDurationMSms( duration ); 879 } 880 881 public static String formatDurationMSms( long duration ) 882 { 883 long ms = duration % 1000; 884 long durationSeconds = duration / 1000; 885 long seconds = durationSeconds % 60; 886 long minutes = durationSeconds / 60; 887 888 return String.format( "%02d:%02d.%03d", minutes, seconds, ms ); 889 } 890 891 public static String formatDurationHMSms( long duration ) 892 { 893 long ms = duration % 1000; 894 long durationSeconds = duration / 1000; 895 long seconds = durationSeconds % 60; 896 long minutes = ( durationSeconds / 60 ) % 60; 897 long hours = durationSeconds / 60 / 60; 898 899 return String.format( "%02d:%02d:%02d.%03d", hours, minutes, seconds, ms ); 900 } 901 902 public static String formatDurationDHMSms( long duration ) 903 { 904 long ms = duration % 1000; 905 long durationSeconds = duration / 1000; 906 long seconds = durationSeconds % 60; 907 long minutes = ( durationSeconds / 60 ) % 60; 908 long hours = ( durationSeconds / 60 / 60 ) % 24; 909 long days = durationSeconds / 60 / 60 / 24; 910 911 return String.format( "%02d:%02d:%02d:%02d.%03d", days, hours, minutes, seconds, ms ); 912 } 913 914 /** 915 * Converts a given comma separated String of Exception names into a List of classes. 916 * ClassNotFound exceptions are ignored if no warningMessage is given, otherwise logged as a warning. 917 * 918 * @param classNames A comma separated String of Exception names. 919 * @return List of Exception classes. 920 */ 921 public static Set<Class<? extends Exception>> asClasses( String classNames, String warningMessage ) 922 { 923 Set<Class<? extends Exception>> exceptionClasses = new HashSet<Class<? extends Exception>>(); 924 String[] split = classNames.split( "," ); 925 926 // possibly user provided type, load from context 927 ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader(); 928 929 for( String className : split ) 930 { 931 if( className != null ) 932 className = className.trim(); 933 934 if( isEmpty( className ) ) 935 continue; 936 937 try 938 { 939 Class<? extends Exception> exceptionClass = contextClassLoader.loadClass( className ).asSubclass( Exception.class ); 940 941 exceptionClasses.add( exceptionClass ); 942 } 943 catch( ClassNotFoundException exception ) 944 { 945 if( !Util.isEmpty( warningMessage ) ) 946 LOG.warn( "{}: {}", warningMessage, className ); 947 } 948 } 949 950 return exceptionClasses; 951 } 952 953 public static Boolean submitWithTimeout( Callable<Boolean> task, int timeout, TimeUnit timeUnit ) throws Exception 954 { 955 ExecutorService executor = Executors.newFixedThreadPool( 1 ); 956 957 Future<Boolean> future = executor.submit( task ); 958 959 executor.shutdown(); 960 961 try 962 { 963 return future.get( timeout, timeUnit ); 964 } 965 catch( TimeoutException exception ) 966 { 967 future.cancel( true ); 968 } 969 catch( ExecutionException exception ) 970 { 971 Throwable cause = exception.getCause(); 972 973 if( cause instanceof RuntimeException ) 974 throw (RuntimeException) cause; 975 976 throw (Exception) cause; 977 } 978 979 return null; 980 } 981 982 public interface RetryOperator<T> 983 { 984 T operate() throws Exception; 985 986 boolean rethrow( Exception exception ); 987 } 988 989 public static <T> T retry( Logger logger, int retries, int secondsDelay, String message, RetryOperator<T> operator ) throws Exception 990 { 991 Exception saved = null; 992 993 for( int i = 0; i < retries; i++ ) 994 { 995 try 996 { 997 return operator.operate(); 998 } 999 catch( Exception exception ) 1000 { 1001 if( operator.rethrow( exception ) ) 1002 { 1003 logger.warn( message + ", but not retrying", exception ); 1004 1005 throw exception; 1006 } 1007 1008 saved = exception; 1009 1010 logger.warn( message + ", attempt: " + ( i + 1 ), exception ); 1011 1012 try 1013 { 1014 Thread.sleep( secondsDelay * 1000 ); 1015 } 1016 catch( InterruptedException exception1 ) 1017 { 1018 // do nothing 1019 } 1020 } 1021 } 1022 1023 logger.warn( message + ", done retrying after attempts: " + retries, saved ); 1024 1025 throw saved; 1026 } 1027 1028 public static Object createProtectedObject( Class type, Object[] parameters, Class[] parameterTypes ) 1029 { 1030 try 1031 { 1032 Constructor constructor = type.getDeclaredConstructor( parameterTypes ); 1033 1034 constructor.setAccessible( true ); 1035 1036 return constructor.newInstance( parameters ); 1037 } 1038 catch( Exception exception ) 1039 { 1040 LOG.error( "unable to instantiate type: {}, with exception: {}", type.getName(), exception ); 1041 1042 throw new FlowException( "unable to instantiate type: " + type.getName(), exception ); 1043 } 1044 } 1045 1046 public static boolean hasClass( String typeString ) 1047 { 1048 try 1049 { 1050 Util.class.getClassLoader().loadClass( typeString ); 1051 1052 return true; 1053 } 1054 catch( ClassNotFoundException exception ) 1055 { 1056 return false; 1057 } 1058 } 1059 1060 public static <T> T newInstance( String className, Object... parameters ) 1061 { 1062 try 1063 { 1064 Class<T> type = (Class<T>) Util.class.getClassLoader().loadClass( className ); 1065 1066 return newInstance( type, parameters ); 1067 } 1068 catch( ClassNotFoundException exception ) 1069 { 1070 throw new CascadingException( "unable to load class: " + className, exception ); 1071 } 1072 } 1073 1074 public static <T> T newInstance( Class<T> target, Object... parameters ) 1075 { 1076 // using Expression makes sure that constructors using sub-types properly work, otherwise we get a 1077 // NoSuchMethodException. 1078 Expression expr = new Expression( target, "new", parameters ); 1079 1080 try 1081 { 1082 return (T) expr.getValue(); 1083 } 1084 catch( Exception exception ) 1085 { 1086 throw new CascadingException( "unable to create new instance: " + target.getName() + "(" + Arrays.toString( parameters ) + ")", exception ); 1087 } 1088 } 1089 1090 public static Object invokeStaticMethod( String typeString, String methodName, Object[] parameters, Class[] parameterTypes ) 1091 { 1092 Class type = loadClass( typeString ); 1093 1094 return invokeStaticMethod( type, methodName, parameters, parameterTypes ); 1095 } 1096 1097 public static Class<?> loadClass( String typeString ) 1098 { 1099 try 1100 { 1101 return Thread.currentThread().getContextClassLoader().loadClass( typeString ); 1102 } 1103 catch( ClassNotFoundException exception ) 1104 { 1105 throw new CascadingException( "unable to load class: " + typeString, exception ); 1106 } 1107 } 1108 1109 public static Class<?> loadClassSafe( String typeString ) 1110 { 1111 try 1112 { 1113 return Thread.currentThread().getContextClassLoader().loadClass( typeString ); 1114 } 1115 catch( ClassNotFoundException exception ) 1116 { 1117 return null; 1118 } 1119 } 1120 1121 public static Object invokeStaticMethod( Class type, String methodName, Object[] parameters, Class[] parameterTypes ) 1122 { 1123 try 1124 { 1125 Method method = type.getDeclaredMethod( methodName, parameterTypes ); 1126 1127 method.setAccessible( true ); 1128 1129 return method.invoke( null, parameters ); 1130 } 1131 catch( Exception exception ) 1132 { 1133 throw new CascadingException( "unable to invoke static method: " + type.getName() + "." + methodName, exception ); 1134 } 1135 } 1136 1137 public static boolean hasInstanceMethod( Object target, String methodName, Class[] parameterTypes ) 1138 { 1139 try 1140 { 1141 return target.getClass().getMethod( methodName, parameterTypes ) != null; 1142 } 1143 catch( NoSuchMethodException exception ) 1144 { 1145 return false; 1146 } 1147 } 1148 1149 public static Object invokeInstanceMethodSafe( Object target, String methodName, Object[] parameters, Class[] parameterTypes ) 1150 { 1151 try 1152 { 1153 return invokeInstanceMethod( target, methodName, parameters, parameterTypes ); 1154 } 1155 catch( Exception exception ) 1156 { 1157 return null; 1158 } 1159 } 1160 1161 public static Object invokeInstanceMethod( Object target, String methodName, Object[] parameters, Class[] parameterTypes ) 1162 { 1163 try 1164 { 1165 Method method = target.getClass().getMethod( methodName, parameterTypes ); 1166 1167 method.setAccessible( true ); 1168 1169 return method.invoke( target, parameters ); 1170 } 1171 catch( Exception exception ) 1172 { 1173 throw new CascadingException( "unable to invoke instance method: " + target.getClass().getName() + "." + methodName, exception ); 1174 } 1175 } 1176 1177 public static <R> R returnInstanceFieldIfExistsSafe( Object target, String fieldName ) 1178 { 1179 try 1180 { 1181 return returnInstanceFieldIfExists( target, fieldName ); 1182 } 1183 catch( Exception exception ) 1184 { 1185 // do nothing 1186 return null; 1187 } 1188 } 1189 1190 public static Object invokeConstructor( String className, Object[] parameters, Class[] parameterTypes ) 1191 { 1192 try 1193 { 1194 Class type = Util.class.getClassLoader().loadClass( className ); 1195 1196 return invokeConstructor( type, parameters, parameterTypes ); 1197 } 1198 catch( ClassNotFoundException exception ) 1199 { 1200 throw new CascadingException( "unable to load class: " + className, exception ); 1201 } 1202 } 1203 1204 public static <T> T invokeConstructor( Class<T> target, Object[] parameters, Class[] parameterTypes ) 1205 { 1206 try 1207 { 1208 Constructor<T> constructor = target.getConstructor( parameterTypes ); 1209 1210 constructor.setAccessible( true ); 1211 1212 return constructor.newInstance( parameters ); 1213 } 1214 catch( Exception exception ) 1215 { 1216 throw new CascadingException( "unable to create new instance: " + target.getName() + "(" + Arrays.toString( parameters ) + ")", exception ); 1217 } 1218 } 1219 1220 public static <R> R returnInstanceFieldIfExists( Object target, String fieldName ) 1221 { 1222 try 1223 { 1224 Class<?> type = target.getClass(); 1225 Field field = getDeclaredField( fieldName, type ); 1226 1227 field.setAccessible( true ); 1228 1229 return (R) field.get( target ); 1230 } 1231 catch( Exception exception ) 1232 { 1233 throw new CascadingException( "unable to get instance field: " + target.getClass().getName() + "." + fieldName, exception ); 1234 } 1235 } 1236 1237 public static <R> boolean setInstanceFieldIfExistsSafe( Object target, String fieldName, R value ) 1238 { 1239 try 1240 { 1241 setInstanceFieldIfExists( target, fieldName, value ); 1242 } 1243 catch( Exception exception ) 1244 { 1245 return false; 1246 } 1247 1248 return true; 1249 } 1250 1251 public static <R> void setInstanceFieldIfExists( Object target, String fieldName, R value ) 1252 { 1253 try 1254 { 1255 Class<?> type = target.getClass(); 1256 Field field = getDeclaredField( fieldName, type ); 1257 1258 field.setAccessible( true ); 1259 1260 field.set( target, value ); 1261 } 1262 catch( Exception exception ) 1263 { 1264 throw new CascadingException( "unable to set instance field: " + target.getClass().getName() + "." + fieldName, exception ); 1265 } 1266 } 1267 1268 private static Field getDeclaredField( String fieldName, Class<?> type ) 1269 { 1270 if( type == Object.class ) 1271 { 1272 if( LOG.isDebugEnabled() ) 1273 LOG.debug( "did not find {} field on {}", fieldName, type.getName() ); 1274 1275 return null; 1276 } 1277 1278 try 1279 { 1280 return type.getDeclaredField( fieldName ); 1281 } 1282 catch( NoSuchFieldException exception ) 1283 { 1284 return getDeclaredField( fieldName, type.getSuperclass() ); 1285 } 1286 } 1287 1288 public static String makePath( String prefix, String name ) 1289 { 1290 if( name == null || name.isEmpty() ) 1291 throw new IllegalArgumentException( "name may not be null or empty " ); 1292 1293 if( prefix == null || prefix.isEmpty() ) 1294 prefix = Long.toString( (long) ( Math.random() * 10000000000L ) ); 1295 1296 name = cleansePathName( name.substring( 0, name.length() < 25 ? name.length() : 25 ) ); 1297 1298 return prefix + "/" + name + "/"; 1299 } 1300 1301 public static String cleansePathName( String name ) 1302 { 1303 return name.replaceAll( "\\s+|\\*|\\+|/+", "_" ); 1304 } 1305 1306 public static Class findMainClass( Class defaultType, String packageExclude ) 1307 { 1308 StackTraceElement[] stackTrace = Thread.currentThread().getStackTrace(); 1309 1310 for( StackTraceElement stackTraceElement : stackTrace ) 1311 { 1312 if( stackTraceElement.getMethodName().equals( "main" ) && !stackTraceElement.getClassName().startsWith( packageExclude ) ) 1313 { 1314 try 1315 { 1316 LOG.info( "resolving application jar from found main method on: {}", stackTraceElement.getClassName() ); 1317 1318 return Thread.currentThread().getContextClassLoader().loadClass( stackTraceElement.getClassName() ); 1319 } 1320 catch( ClassNotFoundException exception ) 1321 { 1322 LOG.warn( "unable to load class while discovering application jar: {}", stackTraceElement.getClassName(), exception ); 1323 } 1324 } 1325 } 1326 1327 LOG.info( "using default application jar, may cause class not found exceptions on the cluster" ); 1328 1329 return defaultType; 1330 } 1331 1332 public static String findContainingJar( Class<?> type ) 1333 { 1334 ClassLoader classLoader = type.getClassLoader(); 1335 1336 String classFile = type.getName().replaceAll( "\\.", "/" ) + ".class"; 1337 1338 try 1339 { 1340 for( Enumeration<URL> iterator = classLoader.getResources( classFile ); iterator.hasMoreElements(); ) 1341 { 1342 URL url = iterator.nextElement(); 1343 1344 if( !"jar".equals( url.getProtocol() ) ) 1345 continue; 1346 1347 String path = url.getPath(); 1348 1349 if( path.startsWith( "file:" ) ) 1350 path = path.substring( "file:".length() ); 1351 1352 path = URLDecoder.decode( path, "UTF-8" ); 1353 1354 return path.replaceAll( "!.*$", "" ); 1355 } 1356 } 1357 catch( IOException exception ) 1358 { 1359 throw new CascadingException( exception ); 1360 } 1361 1362 return null; 1363 } 1364 1365 public static boolean containsWhitespace( String string ) 1366 { 1367 return Pattern.compile( "\\s" ).matcher( string ).find(); 1368 } 1369 }