001/*
002 * Copyright (c) 2016-2017 Chris K Wensel <chris@wensel.net>. All Rights Reserved.
003 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved.
004 *
005 * Project and contact information: http://www.cascading.org/
006 *
007 * This file is part of the Cascading project.
008 *
009 * Licensed under the Apache License, Version 2.0 (the "License");
010 * you may not use this file except in compliance with the License.
011 * You may obtain a copy of the License at
012 *
013 *     http://www.apache.org/licenses/LICENSE-2.0
014 *
015 * Unless required by applicable law or agreed to in writing, software
016 * distributed under the License is distributed on an "AS IS" BASIS,
017 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
018 * See the License for the specific language governing permissions and
019 * limitations under the License.
020 */
021
022package cascading.util;
023
024import java.beans.Expression;
025import java.io.BufferedReader;
026import java.io.File;
027import java.io.FileWriter;
028import java.io.IOException;
029import java.io.InputStreamReader;
030import java.io.Writer;
031import java.lang.reflect.Constructor;
032import java.lang.reflect.Field;
033import java.lang.reflect.Method;
034import java.lang.reflect.Type;
035import java.net.URL;
036import java.net.URLDecoder;
037import java.security.MessageDigest;
038import java.security.NoSuchAlgorithmException;
039import java.util.ArrayList;
040import java.util.Arrays;
041import java.util.Collection;
042import java.util.Collections;
043import java.util.Enumeration;
044import java.util.HashMap;
045import java.util.HashSet;
046import java.util.IdentityHashMap;
047import java.util.Iterator;
048import java.util.LinkedHashSet;
049import java.util.List;
050import java.util.Map;
051import java.util.Set;
052import java.util.TreeSet;
053import java.util.UUID;
054import java.util.concurrent.Callable;
055import java.util.concurrent.ExecutionException;
056import java.util.concurrent.ExecutorService;
057import java.util.concurrent.Executors;
058import java.util.concurrent.Future;
059import java.util.concurrent.TimeUnit;
060import java.util.concurrent.TimeoutException;
061import java.util.regex.Pattern;
062
063import cascading.CascadingException;
064import cascading.flow.FlowException;
065import cascading.tap.MultiSourceTap;
066import cascading.tap.Tap;
067import cascading.tuple.coerce.Coercions;
068import cascading.util.jgrapht.ComponentAttributeProvider;
069import cascading.util.jgrapht.DOTExporter;
070import cascading.util.jgrapht.EdgeNameProvider;
071import cascading.util.jgrapht.IntegerNameProvider;
072import cascading.util.jgrapht.VertexNameProvider;
073import org.jgrapht.Graph;
074import org.slf4j.Logger;
075import org.slf4j.LoggerFactory;
076
077/** Class Util provides reusable operations. */
078public class Util
079  {
080  /**
081   * On OS X only, and if the graphviz dot binary is installed, when true, dot will be invoked to convert the dot file
082   * to a pdf document.
083   */
084  public static final String CONVERT_DOT_TO_PDF = "util.dot.to.pdf.enabled";
085  public static int ID_LENGTH = 32;
086
087  private static final Logger LOG = LoggerFactory.getLogger( Util.class );
088  private static final String HEXES = "0123456789ABCDEF";
089
090  public static final boolean IS_OSX = System.getProperty( "os.name" ).toLowerCase().contains( "Mac OS X".toLowerCase() );
091  public static final boolean HAS_DOT_EXEC = IS_OSX && Boolean.getBoolean( CONVERT_DOT_TO_PDF ) && hasDOT();
092
093  public static <K, V> HashMap<K, V> createHashMap()
094    {
095    return new HashMap<K, V>();
096    }
097
098  public static <K, V> boolean reverseMap( Map<V, K> from, Map<K, V> to )
099    {
100    boolean dupes = false;
101
102    for( Map.Entry<V, K> entry : from.entrySet() )
103      dupes |= to.put( entry.getValue(), entry.getKey() ) != null;
104
105    return dupes;
106    }
107
108  public static <V> Set<V> createIdentitySet()
109    {
110    return Collections.<V>newSetFromMap( new IdentityHashMap() );
111    }
112
113  public static <V> Set<V> createIdentitySet( Collection<V> collection )
114    {
115    Set<V> identitySet = createIdentitySet();
116
117    if( collection != null )
118      identitySet.addAll( collection );
119
120    return identitySet;
121    }
122
123  public static <V> V getFirst( Collection<V> collection )
124    {
125    if( collection == null || collection.isEmpty() )
126      return null;
127
128    return collection.iterator().next();
129    }
130
131  public static <V> V getFirst( Iterator<V> iterator )
132    {
133    if( iterator == null || !iterator.hasNext() )
134      return null;
135
136    return iterator.next();
137    }
138
139  public static <V> V getLast( Iterator<V> iterator )
140    {
141    if( iterator == null || !iterator.hasNext() )
142      return null;
143
144    V v = iterator.next();
145
146    while( iterator.hasNext() )
147      v = iterator.next();
148
149    return v;
150    }
151
152  public static <T> List<T> asList( T t, T[] ts )
153    {
154    List<T> list = new ArrayList<>( 1 + ts.length );
155
156    list.add( t );
157    Collections.addAll( list, ts );
158
159    return list;
160    }
161
162  public static <N extends Number> N max( Collection<N> collection )
163    {
164    return new TreeSet<>( collection ).first();
165    }
166
167  public static <N extends Number> N min( Collection<N> collection )
168    {
169    return new TreeSet<>( collection ).last();
170    }
171
172  public static <T> Set<T> narrowSet( Class<T> type, Collection collection )
173    {
174    return narrowSet( type, collection.iterator() );
175    }
176
177  public static <T> Set<T> narrowIdentitySet( Class<T> type, Collection collection )
178    {
179    return narrowIdentitySet( type, collection.iterator() );
180    }
181
182  public static <T> Set<T> narrowSet( Class<T> type, Collection collection, boolean include )
183    {
184    return narrowSet( type, collection.iterator(), include );
185    }
186
187  public static <T> Set<T> narrowIdentitySet( Class<T> type, Collection collection, boolean include )
188    {
189    return narrowIdentitySet( type, collection.iterator(), include );
190    }
191
192  public static <T> Set<T> narrowSet( Class<T> type, Iterator iterator )
193    {
194    return narrowSet( type, iterator, true );
195    }
196
197  public static <T> Set<T> narrowIdentitySet( Class<T> type, Iterator iterator )
198    {
199    return narrowIdentitySet( type, iterator, true );
200    }
201
202  public static <T> Set<T> narrowSet( Class<T> type, Iterator iterator, boolean include )
203    {
204    return narrowSetInternal( type, iterator, include, new HashSet<T>() );
205    }
206
207  public static <T> Set<T> narrowIdentitySet( Class<T> type, Iterator iterator, boolean include )
208    {
209    return narrowSetInternal( type, iterator, include, Util.<T>createIdentitySet() );
210    }
211
212  private static <T> Set<T> narrowSetInternal( Class<T> type, Iterator iterator, boolean include, Set<T> set )
213    {
214    while( iterator.hasNext() )
215      {
216      Object o = iterator.next();
217
218      if( type.isInstance( o ) == include )
219        set.add( (T) o );
220      }
221
222    return set;
223    }
224
225  public static <T> boolean contains( Class<T> type, Collection collection )
226    {
227    return contains( type, collection.iterator() );
228    }
229
230  public static <T> boolean contains( Class<T> type, Iterator iterator )
231    {
232    while( iterator.hasNext() )
233      {
234      Object o = iterator.next();
235
236      if( type.isInstance( o ) )
237        return true;
238      }
239
240    return false;
241    }
242
243  public static <T> Set<T> differenceIdentity( Set<T> lhs, Set<T> rhs )
244    {
245    Set<T> diff = createIdentitySet( lhs );
246
247    diff.removeAll( rhs );
248
249    return diff;
250    }
251
252  public static synchronized String createUniqueIDWhichStartsWithAChar()
253    {
254    String value;
255
256    do
257      {
258      value = createUniqueID();
259      }
260    while( Character.isDigit( value.charAt( 0 ) ) );
261
262    return value;
263    }
264
265  public static synchronized String createUniqueID()
266    {
267    // creates a cryptographically secure random value
268    String value = UUID.randomUUID().toString();
269    return value.toUpperCase().replaceAll( "-", "" );
270    }
271
272  public static String createID( String rawID )
273    {
274    return createID( rawID.getBytes() );
275    }
276
277  /**
278   * Method CreateID returns a HEX hash of the given bytes with length 32 characters long.
279   *
280   * @param bytes the bytes
281   * @return string
282   */
283  public static String createID( byte[] bytes )
284    {
285    try
286      {
287      return getHex( MessageDigest.getInstance( "MD5" ).digest( bytes ) );
288      }
289    catch( NoSuchAlgorithmException exception )
290      {
291      throw new RuntimeException( "unable to digest string" );
292      }
293    }
294
295  public static String getHex( byte[] bytes )
296    {
297    if( bytes == null )
298      return null;
299
300    final StringBuilder hex = new StringBuilder( 2 * bytes.length );
301
302    for( final byte b : bytes )
303      hex.append( HEXES.charAt( ( b & 0xF0 ) >> 4 ) ).append( HEXES.charAt( b & 0x0F ) );
304
305    return hex.toString();
306    }
307
308  public static byte[] longToByteArray( long value )
309    {
310    return new byte[]{
311      (byte) ( value >> 56 ),
312      (byte) ( value >> 48 ),
313      (byte) ( value >> 40 ),
314      (byte) ( value >> 32 ),
315      (byte) ( value >> 24 ),
316      (byte) ( value >> 16 ),
317      (byte) ( value >> 8 ),
318      (byte) value
319    };
320    }
321
322  public static byte[] intToByteArray( int value )
323    {
324    return new byte[]{
325      (byte) ( value >> 24 ),
326      (byte) ( value >> 16 ),
327      (byte) ( value >> 8 ),
328      (byte) value
329    };
330    }
331
332  public static <T> T[] copy( T[] source )
333    {
334    if( source == null )
335      return null;
336
337    return Arrays.copyOf( source, source.length );
338    }
339
340  public static String unique( String value, String delim )
341    {
342    String[] split = value.split( delim );
343
344    Set<String> values = new LinkedHashSet<String>();
345
346    Collections.addAll( values, split );
347
348    return join( values, delim );
349    }
350
351  /**
352   * This method joins the values in the given list with the delim String value.
353   *
354   * @param list
355   * @param delim
356   * @return String
357   */
358  public static String join( int[] list, String delim )
359    {
360    return join( list, delim, false );
361    }
362
363  public static String join( int[] list, String delim, boolean printNull )
364    {
365    StringBuffer buffer = new StringBuffer();
366    int count = 0;
367
368    for( Object s : list )
369      {
370      if( count != 0 )
371        buffer.append( delim );
372
373      if( printNull || s != null )
374        buffer.append( s );
375
376      count++;
377      }
378
379    return buffer.toString();
380    }
381
382  public static String join( String delim, String... strings )
383    {
384    return join( delim, false, strings );
385    }
386
387  public static String join( String delim, boolean printNull, String... strings )
388    {
389    return join( strings, delim, printNull );
390    }
391
392  /**
393   * This method joins the values in the given list with the delim String value.
394   *
395   * @param list
396   * @param delim
397   * @return a String
398   */
399  public static String join( Object[] list, String delim )
400    {
401    return join( list, delim, false );
402    }
403
404  public static String join( Object[] list, String delim, boolean printNull )
405    {
406    return join( list, delim, printNull, 0 );
407    }
408
409  public static String join( Object[] list, String delim, boolean printNull, int beginAt )
410    {
411    return join( list, delim, printNull, beginAt, list.length - beginAt );
412    }
413
414  public static String join( Object[] list, String delim, boolean printNull, int beginAt, int length )
415    {
416    StringBuffer buffer = new StringBuffer();
417    int count = 0;
418
419    for( int i = beginAt; i < beginAt + length; i++ )
420      {
421      Object s = list[ i ];
422      if( count != 0 )
423        buffer.append( delim );
424
425      if( printNull || s != null )
426        buffer.append( s );
427
428      count++;
429      }
430
431    return buffer.toString();
432    }
433
434  public static String join( Iterable iterable, String delim, boolean printNull )
435    {
436    int count = 0;
437
438    StringBuilder buffer = new StringBuilder();
439
440    for( Object s : iterable )
441      {
442      if( count != 0 )
443        buffer.append( delim );
444
445      if( printNull || s != null )
446        buffer.append( s );
447
448      count++;
449      }
450
451    return buffer.toString();
452    }
453
454  /**
455   * This method joins each value in the collection with a tab character as the delimiter.
456   *
457   * @param collection
458   * @return a String
459   */
460  public static String join( Collection collection )
461    {
462    return join( collection, "\t" );
463    }
464
465  /**
466   * This method joins each valuein the collection with the given delimiter.
467   *
468   * @param collection
469   * @param delim
470   * @return a String
471   */
472  public static String join( Collection collection, String delim )
473    {
474    return join( collection, delim, false );
475    }
476
477  public static String join( Collection collection, String delim, boolean printNull )
478    {
479    StringBuffer buffer = new StringBuffer();
480
481    join( buffer, collection, delim, printNull );
482
483    return buffer.toString();
484    }
485
486  /**
487   * This method joins each value in the collection with the given delimiter. All results are appended to the
488   * given {@link StringBuffer} instance.
489   *
490   * @param buffer
491   * @param collection
492   * @param delim
493   */
494  public static void join( StringBuffer buffer, Collection collection, String delim )
495    {
496    join( buffer, collection, delim, false );
497    }
498
499  public static void join( StringBuffer buffer, Collection collection, String delim, boolean printNull )
500    {
501    int count = 0;
502
503    for( Object s : collection )
504      {
505      if( count != 0 )
506        buffer.append( delim );
507
508      if( printNull || s != null )
509        buffer.append( s );
510
511      count++;
512      }
513    }
514
515  public static <T> List<T> split( Class<T> type, String values )
516    {
517    return split( type, ",", values );
518    }
519
520  public static <T> List<T> split( Class<T> type, String delim, String values )
521    {
522    List<T> results = new ArrayList<>();
523
524    if( values == null )
525      return results;
526
527    String[] split = values.split( delim );
528
529    for( String value : split )
530      results.add( Coercions.<T>coerce( value, type ) );
531
532    return results;
533    }
534
535  public static String[] removeNulls( String... strings )
536    {
537    List<String> list = new ArrayList<String>();
538
539    for( String string : strings )
540      {
541      if( string != null )
542        list.add( string );
543      }
544
545    return list.toArray( new String[ list.size() ] );
546    }
547
548  public static Collection<String> quote( Collection<String> collection, String quote )
549    {
550    List<String> list = new ArrayList<String>();
551
552    for( String string : collection )
553      list.add( quote + string + quote );
554
555    return list;
556    }
557
558  public static String print( Collection collection, String delim )
559    {
560    StringBuffer buffer = new StringBuffer();
561
562    print( buffer, collection, delim );
563
564    return buffer.toString();
565    }
566
567  public static void print( StringBuffer buffer, Collection collection, String delim )
568    {
569    int count = 0;
570
571    for( Object s : collection )
572      {
573      if( count != 0 )
574        buffer.append( delim );
575
576      buffer.append( "[" );
577      buffer.append( s );
578      buffer.append( "]" );
579
580      count++;
581      }
582    }
583
584  /**
585   * This method attempts to remove any username and password from the given url String.
586   *
587   * @param url
588   * @return a String
589   */
590  public static String sanitizeUrl( String url )
591    {
592    if( url == null )
593      return null;
594
595    return url.replaceAll( "(?<=//).*:.*@", "" );
596    }
597
598  /**
599   * This method attempts to remove duplicate consecutive forward slashes from the given url.
600   *
601   * @param url
602   * @return a String
603   */
604  public static String normalizeUrl( String url )
605    {
606    if( url == null )
607      return null;
608
609    return url.replaceAll( "([^:]/)/{2,}", "$1/" );
610    }
611
612  /**
613   * This method returns the {@link Object#toString()} of the given object, or an empty String if the object
614   * is null.
615   *
616   * @param object
617   * @return a String
618   */
619  public static String toNull( Object object )
620    {
621    if( object == null )
622      return "";
623
624    return object.toString();
625    }
626
627  /**
628   * This method truncates the given String value to the given size, but appends an ellipse ("...") if the
629   * String is larger than maxSize.
630   *
631   * @param string
632   * @param maxSize
633   * @return a String
634   */
635  public static String truncate( String string, int maxSize )
636    {
637    string = toNull( string );
638
639    if( string.length() <= maxSize )
640      return string;
641
642    return String.format( "%s...", string.subSequence( 0, maxSize - 3 ) );
643    }
644
645  public static void printGraph( String filename, Graph graph )
646    {
647    try
648      {
649      new File( filename ).getParentFile().mkdirs();
650      Writer writer = new FileWriter( filename );
651
652      try
653        {
654        printGraph( writer, graph );
655        }
656      finally
657        {
658        writer.close();
659        }
660      }
661    catch( IOException exception )
662      {
663      LOG.error( "failed printing graph to {}, with exception: {}", filename, exception );
664      }
665    }
666
667  @SuppressWarnings({"unchecked"})
668  private static void printGraph( Writer writer, Graph graph )
669    {
670    DOTExporter dot = new DOTExporter( new IntegerNameProvider(), object ->
671    {
672    if( object == null )
673      return "none";
674
675    return object.toString().replaceAll( "\"", "\'" );
676    }, (EdgeNameProvider<Object>) object ->
677    {
678    if( object == null )
679      return "none";
680
681    return object.toString().replaceAll( "\"", "\'" );
682    }
683    );
684
685    dot.export( writer, graph );
686    }
687
688  /**
689   * This method removes all nulls from the given List.
690   *
691   * @param list
692   */
693  @SuppressWarnings({"StatementWithEmptyBody"})
694  public static void removeAllNulls( List list )
695    {
696    while( list.remove( null ) )
697      ;
698    }
699
700  public static void writeDOT( Writer writer, Graph graph, IntegerNameProvider vertexIdProvider, VertexNameProvider vertexNameProvider, EdgeNameProvider edgeNameProvider )
701    {
702    new DOTExporter( vertexIdProvider, vertexNameProvider, edgeNameProvider ).export( writer, graph );
703    }
704
705  public static void writeDOT( Writer writer, Graph graph, IntegerNameProvider vertexIdProvider, VertexNameProvider vertexNameProvider, EdgeNameProvider edgeNameProvider,
706                               ComponentAttributeProvider vertexAttributeProvider, ComponentAttributeProvider edgeAttributeProvider )
707    {
708    new DOTExporter( vertexIdProvider, vertexNameProvider, edgeNameProvider, vertexAttributeProvider, edgeAttributeProvider ).export( writer, graph );
709    }
710
711  public static String asString( Object object )
712    {
713    return object == null ? null : object.toString();
714    }
715
716  public static boolean isEmpty( String string )
717    {
718    return string == null || string.isEmpty();
719    }
720
721  private static String[] findSplitName( String path )
722    {
723    String separator = "/";
724
725    if( path.lastIndexOf( "/" ) < path.lastIndexOf( "\\" ) )
726      separator = "\\\\";
727
728    String[] split = path.split( separator );
729
730    path = split[ split.length - 1 ];
731
732    path = path.substring( 0, path.lastIndexOf( '.' ) ); // remove .jar
733
734    return path.split( "-(?=\\d)", 2 );
735    }
736
737  public static String findVersion( String path )
738    {
739    if( path == null || path.isEmpty() )
740      return null;
741
742    String[] split = findSplitName( path );
743
744    if( split.length == 2 )
745      return split[ 1 ];
746
747    return null;
748    }
749
750  public static String findName( String path )
751    {
752    if( path == null || path.isEmpty() )
753      return null;
754
755    String[] split = findSplitName( path );
756
757    if( split.length == 0 )
758      return null;
759
760    return split[ 0 ];
761    }
762
763  public static long getSourceModified( Object confCopy, Iterator<Tap> values, long sinkModified ) throws IOException
764    {
765    long sourceModified = 0;
766
767    while( values.hasNext() )
768      {
769      Tap source = values.next();
770
771      if( source instanceof MultiSourceTap )
772        return getSourceModified( confCopy, ( (MultiSourceTap) source ).getChildTaps(), sinkModified );
773
774      sourceModified = source.getModifiedTime( confCopy );
775
776      // source modified returns zero if does not exist
777      // this should minimize number of times we touch any file meta-data server
778      if( sourceModified == 0 && !source.resourceExists( confCopy ) )
779        throw new FlowException( "source does not exist: " + source );
780
781      if( sinkModified < sourceModified )
782        return sourceModified;
783      }
784
785    return sourceModified;
786    }
787
788  public static long getSinkModified( Object config, Collection<Tap> sinks ) throws IOException
789    {
790    long sinkModified = Long.MAX_VALUE;
791
792    for( Tap sink : sinks )
793      {
794      if( sink.isReplace() || sink.isUpdate() )
795        sinkModified = -1L;
796      else
797        {
798        if( !sink.resourceExists( config ) )
799          sinkModified = 0L;
800        else
801          sinkModified = Math.min( sinkModified, sink.getModifiedTime( config ) ); // return youngest mod date
802        }
803      }
804
805    return sinkModified;
806    }
807
808  public static String getTypeName( Type type )
809    {
810    if( type == null )
811      return null;
812
813    return type instanceof Class ? ( (Class) type ).getCanonicalName() : type.toString();
814    }
815
816  public static String getSimpleTypeName( Type type )
817    {
818    if( type == null )
819      return null;
820
821    return type instanceof Class ? ( (Class) type ).getSimpleName() : type.toString();
822    }
823
824  public static String[] typeNames( Type[] types )
825    {
826    String[] names = new String[ types.length ];
827
828    for( int i = 0; i < types.length; i++ )
829      names[ i ] = getTypeName( types[ i ] );
830
831    return names;
832    }
833
834  public static String[] simpleTypeNames( Type[] types )
835    {
836    String[] names = new String[ types.length ];
837
838    for( int i = 0; i < types.length; i++ )
839      names[ i ] = getSimpleTypeName( types[ i ] );
840
841    return names;
842    }
843
844  public static boolean containsNull( Object[] values )
845    {
846    for( Object value : values )
847      {
848      if( value == null )
849        return true;
850      }
851
852    return false;
853    }
854
855  public static void safeSleep( long durationMillis )
856    {
857    try
858      {
859      Thread.sleep( durationMillis );
860      }
861    catch( InterruptedException exception )
862      {
863      // do nothing
864      }
865    }
866
867  public static void writePDF( String path )
868    {
869    if( !HAS_DOT_EXEC )
870      return;
871
872    // dot *.dot -Tpdf -O -Nshape=box
873    File file = new File( path );
874    execProcess( file.getParentFile(), "dot", file.getName(), "-Tpdf", "-O" );
875    }
876
877  static boolean hasDOT()
878    {
879    return execProcess( null, "which", "dot" ) == 0;
880    }
881
882  public static int execProcess( File parentFile, String... command )
883    {
884    try
885      {
886      String commandLine = join( command, " " );
887
888      LOG.debug( "command: {}", commandLine );
889
890      Process process = Runtime.getRuntime().exec( commandLine, null, parentFile );
891
892      int result = process.waitFor();
893
894      BufferedReader reader = new BufferedReader( new InputStreamReader( process.getInputStream() ) );
895
896      String line = reader.readLine();
897
898      while( line != null )
899        {
900        LOG.warn( "{} stdout returned: {}", command[ 0 ], line );
901        line = reader.readLine();
902        }
903
904      reader = new BufferedReader( new InputStreamReader( process.getErrorStream() ) );
905
906      line = reader.readLine();
907
908      while( line != null )
909        {
910        LOG.warn( "{} stderr returned: {}", command[ 0 ], line );
911        line = reader.readLine();
912        }
913
914      return result;
915      }
916    catch( IOException exception )
917      {
918      LOG.warn( "unable to exec " + command[ 0 ], exception );
919      }
920    catch( InterruptedException exception )
921      {
922      LOG.warn( "interrupted exec " + command[ 0 ], exception );
923      }
924
925    return Integer.MIN_VALUE;
926    }
927
928  public static String formatDurationFromMillis( long duration )
929    {
930    if( duration / 1000 / 60 / 60 / 24 > 0.0 )
931      return formatDurationDHMSms( duration );
932    if( duration / 1000 / 60 / 60 > 0.0 )
933      return formatDurationHMSms( duration );
934    else
935      return formatDurationMSms( duration );
936    }
937
938  public static String formatDurationMSms( long duration )
939    {
940    long ms = duration % 1000;
941    long durationSeconds = duration / 1000;
942    long seconds = durationSeconds % 60;
943    long minutes = durationSeconds / 60;
944
945    return String.format( "%02d:%02d.%03d", minutes, seconds, ms );
946    }
947
948  public static String formatDurationHMSms( long duration )
949    {
950    long ms = duration % 1000;
951    long durationSeconds = duration / 1000;
952    long seconds = durationSeconds % 60;
953    long minutes = ( durationSeconds / 60 ) % 60;
954    long hours = durationSeconds / 60 / 60;
955
956    return String.format( "%02d:%02d:%02d.%03d", hours, minutes, seconds, ms );
957    }
958
959  public static String formatDurationDHMSms( long duration )
960    {
961    long ms = duration % 1000;
962    long durationSeconds = duration / 1000;
963    long seconds = durationSeconds % 60;
964    long minutes = ( durationSeconds / 60 ) % 60;
965    long hours = ( durationSeconds / 60 / 60 ) % 24;
966    long days = durationSeconds / 60 / 60 / 24;
967
968    return String.format( "%02d:%02d:%02d:%02d.%03d", days, hours, minutes, seconds, ms );
969    }
970
971  /**
972   * Converts a given comma separated String of Exception names into a List of classes.
973   * ClassNotFound exceptions are ignored if no warningMessage is given, otherwise logged as a warning.
974   *
975   * @param classNames A comma separated String of Exception names.
976   * @return List of Exception classes.
977   */
978  public static Set<Class<? extends Exception>> asClasses( String classNames, String warningMessage )
979    {
980    Set<Class<? extends Exception>> exceptionClasses = new HashSet<Class<? extends Exception>>();
981    String[] split = classNames.split( "," );
982
983    // possibly user provided type, load from context
984    ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
985
986    for( String className : split )
987      {
988      if( className != null )
989        className = className.trim();
990
991      if( isEmpty( className ) )
992        continue;
993
994      try
995        {
996        Class<? extends Exception> exceptionClass = contextClassLoader.loadClass( className ).asSubclass( Exception.class );
997
998        exceptionClasses.add( exceptionClass );
999        }
1000      catch( ClassNotFoundException exception )
1001        {
1002        if( !Util.isEmpty( warningMessage ) )
1003          LOG.warn( "{}: {}", warningMessage, className );
1004        }
1005      }
1006
1007    return exceptionClasses;
1008    }
1009
1010  public static Boolean submitWithTimeout( Callable<Boolean> task, int timeout, TimeUnit timeUnit ) throws Exception
1011    {
1012    ExecutorService executor = Executors.newFixedThreadPool( 1 );
1013
1014    Future<Boolean> future = executor.submit( task );
1015
1016    executor.shutdown();
1017
1018    try
1019      {
1020      return future.get( timeout, timeUnit );
1021      }
1022    catch( TimeoutException exception )
1023      {
1024      future.cancel( true );
1025      }
1026    catch( ExecutionException exception )
1027      {
1028      Throwable cause = exception.getCause();
1029
1030      if( cause instanceof RuntimeException )
1031        throw (RuntimeException) cause;
1032
1033      throw (Exception) cause;
1034      }
1035
1036    return null;
1037    }
1038
1039  public interface RetryOperator<T>
1040    {
1041    T operate() throws Exception;
1042
1043    boolean rethrow( Exception exception );
1044    }
1045
1046  public static <T> T retry( Logger logger, int retries, int secondsDelay, String message, RetryOperator<T> operator ) throws Exception
1047    {
1048    Exception saved = null;
1049
1050    for( int i = 0; i < retries; i++ )
1051      {
1052      try
1053        {
1054        return operator.operate();
1055        }
1056      catch( Exception exception )
1057        {
1058        if( operator.rethrow( exception ) )
1059          {
1060          logger.warn( message + ", but not retrying", exception );
1061
1062          throw exception;
1063          }
1064
1065        saved = exception;
1066
1067        logger.warn( message + ", attempt: " + ( i + 1 ), exception );
1068
1069        try
1070          {
1071          Thread.sleep( secondsDelay * 1000 );
1072          }
1073        catch( InterruptedException exception1 )
1074          {
1075          // do nothing
1076          }
1077        }
1078      }
1079
1080    logger.warn( message + ", done retrying after attempts: " + retries, saved );
1081
1082    throw saved;
1083    }
1084
1085  public static Object createProtectedObject( Class type, Object[] parameters, Class[] parameterTypes )
1086    {
1087    try
1088      {
1089      Constructor constructor = type.getDeclaredConstructor( parameterTypes );
1090
1091      constructor.setAccessible( true );
1092
1093      return constructor.newInstance( parameters );
1094      }
1095    catch( Exception exception )
1096      {
1097      LOG.error( "unable to instantiate type: {}, with exception: {}", type.getName(), exception );
1098
1099      throw new FlowException( "unable to instantiate type: " + type.getName(), exception );
1100      }
1101    }
1102
1103  public static boolean hasClass( String typeString )
1104    {
1105    try
1106      {
1107      Util.class.getClassLoader().loadClass( typeString );
1108
1109      return true;
1110      }
1111    catch( ClassNotFoundException exception )
1112      {
1113      return false;
1114      }
1115    }
1116
1117  public static <T> T newInstance( String className, Object... parameters )
1118    {
1119    try
1120      {
1121      Class<T> type = (Class<T>) Util.class.getClassLoader().loadClass( className );
1122
1123      return newInstance( type, parameters );
1124      }
1125    catch( ClassNotFoundException exception )
1126      {
1127      throw new CascadingException( "unable to load class: " + className, exception );
1128      }
1129    }
1130
1131  public static <T> T newInstance( Class<T> target, Object... parameters )
1132    {
1133    // using Expression makes sure that constructors using sub-types properly work, otherwise we get a
1134    // NoSuchMethodException.
1135    Expression expr = new Expression( target, "new", parameters );
1136
1137    try
1138      {
1139      return (T) expr.getValue();
1140      }
1141    catch( Exception exception )
1142      {
1143      throw new CascadingException( "unable to create new instance: " + target.getName() + "(" + Arrays.toString( parameters ) + ")", exception );
1144      }
1145    }
1146
1147  public static Object invokeStaticMethod( String typeString, String methodName, Object[] parameters, Class[] parameterTypes )
1148    {
1149    Class type = loadClass( typeString );
1150
1151    return invokeStaticMethod( type, methodName, parameters, parameterTypes );
1152    }
1153
1154  public static Class<?> loadClass( String typeString )
1155    {
1156    try
1157      {
1158      return Thread.currentThread().getContextClassLoader().loadClass( typeString );
1159      }
1160    catch( ClassNotFoundException exception )
1161      {
1162      throw new CascadingException( "unable to load class: " + typeString, exception );
1163      }
1164    }
1165
1166  public static Class<?> loadClassSafe( String typeString )
1167    {
1168    try
1169      {
1170      return Thread.currentThread().getContextClassLoader().loadClass( typeString );
1171      }
1172    catch( ClassNotFoundException exception )
1173      {
1174      return null;
1175      }
1176    }
1177
1178  public static Object invokeStaticMethod( Class type, String methodName, Object[] parameters, Class[] parameterTypes )
1179    {
1180    try
1181      {
1182      Method method = type.getDeclaredMethod( methodName, parameterTypes );
1183
1184      method.setAccessible( true );
1185
1186      return method.invoke( null, parameters );
1187      }
1188    catch( Exception exception )
1189      {
1190      throw new CascadingException( "unable to invoke static method: " + type.getName() + "." + methodName, exception );
1191      }
1192    }
1193
1194  public static boolean hasInstanceMethod( Object target, String methodName, Class[] parameterTypes )
1195    {
1196    try
1197      {
1198      return target.getClass().getMethod( methodName, parameterTypes ) != null;
1199      }
1200    catch( NoSuchMethodException exception )
1201      {
1202      return false;
1203      }
1204    }
1205
1206  public static Object invokeInstanceMethodSafe( Object target, String methodName, Object[] parameters, Class[] parameterTypes )
1207    {
1208    try
1209      {
1210      return invokeInstanceMethod( target, methodName, parameters, parameterTypes );
1211      }
1212    catch( Exception exception )
1213      {
1214      return null;
1215      }
1216    }
1217
1218  public static Object invokeInstanceMethod( Object target, String methodName, Object[] parameters, Class[] parameterTypes )
1219    {
1220    try
1221      {
1222      Method method = target.getClass().getMethod( methodName, parameterTypes );
1223
1224      method.setAccessible( true );
1225
1226      return method.invoke( target, parameters );
1227      }
1228    catch( Exception exception )
1229      {
1230      throw new CascadingException( "unable to invoke instance method: " + target.getClass().getName() + "." + methodName, exception );
1231      }
1232    }
1233
1234  public static <R> R returnInstanceFieldIfExistsSafe( Object target, String fieldName )
1235    {
1236    try
1237      {
1238      return returnInstanceFieldIfExists( target, fieldName );
1239      }
1240    catch( Exception exception )
1241      {
1242      // do nothing
1243      return null;
1244      }
1245    }
1246
1247  public static Object invokeConstructor( String className, Object[] parameters, Class[] parameterTypes )
1248    {
1249    try
1250      {
1251      Class type = Util.class.getClassLoader().loadClass( className );
1252
1253      return invokeConstructor( type, parameters, parameterTypes );
1254      }
1255    catch( ClassNotFoundException exception )
1256      {
1257      throw new CascadingException( "unable to load class: " + className, exception );
1258      }
1259    }
1260
1261  public static <T> T invokeConstructor( Class<T> target, Object[] parameters, Class[] parameterTypes )
1262    {
1263    try
1264      {
1265      Constructor<T> constructor = target.getConstructor( parameterTypes );
1266
1267      constructor.setAccessible( true );
1268
1269      return constructor.newInstance( parameters );
1270      }
1271    catch( Exception exception )
1272      {
1273      throw new CascadingException( "unable to create new instance: " + target.getName() + "(" + Arrays.toString( parameters ) + ")", exception );
1274      }
1275    }
1276
1277  public static <R> R returnInstanceFieldIfExists( Object target, String fieldName )
1278    {
1279    try
1280      {
1281      Class<?> type = target.getClass();
1282      Field field = getDeclaredField( fieldName, type );
1283
1284      field.setAccessible( true );
1285
1286      return (R) field.get( target );
1287      }
1288    catch( Exception exception )
1289      {
1290      throw new CascadingException( "unable to get instance field: " + target.getClass().getName() + "." + fieldName, exception );
1291      }
1292    }
1293
1294  public static <R> boolean setInstanceFieldIfExistsSafe( Object target, String fieldName, R value )
1295    {
1296    try
1297      {
1298      setInstanceFieldIfExists( target, fieldName, value );
1299      }
1300    catch( Exception exception )
1301      {
1302      return false;
1303      }
1304
1305    return true;
1306    }
1307
1308  public static <R> void setInstanceFieldIfExists( Object target, String fieldName, R value )
1309    {
1310    try
1311      {
1312      Class<?> type = target.getClass();
1313      Field field = getDeclaredField( fieldName, type );
1314
1315      field.setAccessible( true );
1316
1317      field.set( target, value );
1318      }
1319    catch( Exception exception )
1320      {
1321      throw new CascadingException( "unable to set instance field: " + target.getClass().getName() + "." + fieldName, exception );
1322      }
1323    }
1324
1325  private static Field getDeclaredField( String fieldName, Class<?> type )
1326    {
1327    if( type == Object.class )
1328      {
1329      if( LOG.isDebugEnabled() )
1330        LOG.debug( "did not find {} field on {}", fieldName, type.getName() );
1331
1332      return null;
1333      }
1334
1335    try
1336      {
1337      return type.getDeclaredField( fieldName );
1338      }
1339    catch( NoSuchFieldException exception )
1340      {
1341      return getDeclaredField( fieldName, type.getSuperclass() );
1342      }
1343    }
1344
1345  public static String makePath( String prefix, String name )
1346    {
1347    if( name == null || name.isEmpty() )
1348      throw new IllegalArgumentException( "name may not be null or empty " );
1349
1350    if( prefix == null || prefix.isEmpty() )
1351      prefix = Long.toString( (long) ( Math.random() * 10000000000L ) );
1352
1353    name = cleansePathName( name.substring( 0, name.length() < 25 ? name.length() : 25 ) );
1354
1355    return prefix + "/" + name + "/";
1356    }
1357
1358  public static String cleansePathName( String name )
1359    {
1360    return name.replaceAll( "\\s+|\\*|\\+|/+", "_" );
1361    }
1362
1363  public static Class findMainClass( Class defaultType, String packageExclude )
1364    {
1365    StackTraceElement[] stackTrace = Thread.currentThread().getStackTrace();
1366
1367    for( StackTraceElement stackTraceElement : stackTrace )
1368      {
1369      if( stackTraceElement.getMethodName().equals( "main" ) && !stackTraceElement.getClassName().startsWith( packageExclude ) )
1370        {
1371        try
1372          {
1373          LOG.info( "resolving application jar from found main method on: {}", stackTraceElement.getClassName() );
1374
1375          return Thread.currentThread().getContextClassLoader().loadClass( stackTraceElement.getClassName() );
1376          }
1377        catch( ClassNotFoundException exception )
1378          {
1379          LOG.warn( "unable to load class while discovering application jar: {}", stackTraceElement.getClassName(), exception );
1380          }
1381        }
1382      }
1383
1384    LOG.info( "using default application jar, may cause class not found exceptions on the cluster" );
1385
1386    return defaultType;
1387    }
1388
1389  public static String findContainingJar( Class<?> type )
1390    {
1391    ClassLoader classLoader = type.getClassLoader();
1392
1393    String classFile = type.getName().replaceAll( "\\.", "/" ) + ".class";
1394
1395    try
1396      {
1397      for( Enumeration<URL> iterator = classLoader.getResources( classFile ); iterator.hasMoreElements(); )
1398        {
1399        URL url = iterator.nextElement();
1400
1401        if( !"jar".equals( url.getProtocol() ) )
1402          continue;
1403
1404        String path = url.getPath();
1405
1406        if( path.startsWith( "file:" ) )
1407          path = path.substring( "file:".length() );
1408
1409        path = URLDecoder.decode( path, "UTF-8" );
1410
1411        return path.replaceAll( "!.*$", "" );
1412        }
1413      }
1414    catch( IOException exception )
1415      {
1416      throw new CascadingException( exception );
1417      }
1418
1419    return null;
1420    }
1421
1422  public static boolean containsWhitespace( String string )
1423    {
1424    return Pattern.compile( "\\s" ).matcher( string ).find();
1425    }
1426
1427  public static String parseHostname( String uri )
1428    {
1429    if( isEmpty( uri ) )
1430      return null;
1431
1432    String[] parts = uri.split( "://", 2 );
1433    String result;
1434
1435    // missing protocol
1436    result = parts[ parts.length - 1 ];
1437
1438    // user:pass@hostname:port/stuff
1439    parts = result.split( "/", 2 );
1440    result = parts[ 0 ];
1441
1442    // user:pass@hostname:port
1443    parts = result.split( "@", 2 );
1444    result = parts[ parts.length - 1 ];
1445
1446    // hostname:port
1447    parts = result.split( ":", 2 );
1448    result = parts[ 0 ];
1449
1450    return result;
1451    }
1452  }