001    /*
002     * Copyright (c) 2007-2014 Concurrent, Inc. All Rights Reserved.
003     *
004     * Project and contact information: http://www.cascading.org/
005     *
006     * This file is part of the Cascading project.
007     *
008     * Licensed under the Apache License, Version 2.0 (the "License");
009     * you may not use this file except in compliance with the License.
010     * You may obtain a copy of the License at
011     *
012     *     http://www.apache.org/licenses/LICENSE-2.0
013     *
014     * Unless required by applicable law or agreed to in writing, software
015     * distributed under the License is distributed on an "AS IS" BASIS,
016     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017     * See the License for the specific language governing permissions and
018     * limitations under the License.
019     */
020    
021    package cascading.flow.hadoop.planner;
022    
023    import java.net.URI;
024    import java.util.Collections;
025    import java.util.Comparator;
026    import java.util.HashMap;
027    import java.util.HashSet;
028    import java.util.Iterator;
029    import java.util.List;
030    import java.util.Map;
031    import java.util.Properties;
032    import java.util.Set;
033    import java.util.TreeSet;
034    
035    import cascading.flow.FlowConnector;
036    import cascading.flow.FlowDef;
037    import cascading.flow.FlowElement;
038    import cascading.flow.hadoop.HadoopFlow;
039    import cascading.flow.hadoop.util.HadoopUtil;
040    import cascading.flow.planner.ElementGraph;
041    import cascading.flow.planner.ElementGraphs;
042    import cascading.flow.planner.FlowPlanner;
043    import cascading.flow.planner.FlowStepGraph;
044    import cascading.flow.planner.PlatformInfo;
045    import cascading.flow.planner.Scope;
046    import cascading.pipe.CoGroup;
047    import cascading.pipe.Every;
048    import cascading.pipe.Group;
049    import cascading.pipe.Pipe;
050    import cascading.property.AppProps;
051    import cascading.property.PropertyUtil;
052    import cascading.tap.Tap;
053    import cascading.tap.hadoop.Hfs;
054    import cascading.tap.hadoop.util.TempHfs;
055    import cascading.util.Util;
056    import org.apache.hadoop.mapred.JobConf;
057    import org.jgrapht.GraphPath;
058    import org.jgrapht.Graphs;
059    import org.slf4j.Logger;
060    import org.slf4j.LoggerFactory;
061    
062    import static cascading.flow.planner.ElementGraphs.getAllShortestPathsBetween;
063    
064    /**
065     * Class HadoopPlanner is the core Hadoop MapReduce planner.
066     * <p/>
067     * Notes:
068     * <p/>
069     * <strong>Custom JobConf properties</strong><br/>
070     * A custom JobConf instance can be passed to this planner by calling {@link #copyJobConf(java.util.Map, org.apache.hadoop.mapred.JobConf)}
071     * on a map properties object before constructing a new {@link cascading.flow.hadoop.HadoopFlowConnector}.
072     * <p/>
073     * A better practice would be to set Hadoop properties directly on the map properties object handed to the FlowConnector.
074     * All values in the map will be passed to a new default JobConf instance to be used as defaults for all resulting
075     * Flow instances.
076     * <p/>
077     * For example, {@code properties.set("mapred.child.java.opts","-Xmx512m");} would convince Hadoop
078     * to spawn all child jvms with a heap of 512MB.
079     */
080    public class HadoopPlanner extends FlowPlanner<HadoopFlow, JobConf>
081      {
082      /** Field LOG */
083      private static final Logger LOG = LoggerFactory.getLogger( HadoopPlanner.class );
084    
085      /** Field jobConf */
086      private JobConf jobConf;
087      /** Field intermediateSchemeClass */
088      private Class intermediateSchemeClass;
089    
090      /**
091       * Method copyJobConf adds the given JobConf values to the given properties object. Use this method to pass
092       * custom default Hadoop JobConf properties to Hadoop.
093       *
094       * @param properties of type Map
095       * @param jobConf    of type JobConf
096       */
097      public static void copyJobConf( Map<Object, Object> properties, JobConf jobConf )
098        {
099        for( Map.Entry<String, String> entry : jobConf )
100          properties.put( entry.getKey(), entry.getValue() );
101        }
102    
103      /**
104       * Method createJobConf returns a new JobConf instance using the values in the given properties argument.
105       *
106       * @param properties of type Map
107       * @return a JobConf instance
108       */
109      public static JobConf createJobConf( Map<Object, Object> properties )
110        {
111        JobConf conf = new JobConf();
112    
113        copyProperties( conf, properties );
114    
115        return conf;
116        }
117    
118      /**
119       * Method copyProperties adds the given Map values to the given JobConf object.
120       *
121       * @param jobConf    of type JobConf
122       * @param properties of type Map
123       */
124      public static void copyProperties( JobConf jobConf, Map<Object, Object> properties )
125        {
126        if( properties instanceof Properties )
127          {
128          Properties props = (Properties) properties;
129          Set<String> keys = props.stringPropertyNames();
130    
131          for( String key : keys )
132            jobConf.set( key, props.getProperty( key ) );
133          }
134        else
135          {
136          for( Map.Entry<Object, Object> entry : properties.entrySet() )
137            {
138            if( entry.getValue() != null )
139              jobConf.set( entry.getKey().toString(), entry.getValue().toString() );
140            }
141          }
142        }
143    
144      /**
145       * Method setNormalizeHeterogeneousSources adds the given doNormalize boolean to the given properties object.
146       * Use this method if additional jobs should be planned in to handle incompatible InputFormat classes.
147       * <p/>
148       * Normalization is off by default and should only be enabled by advanced users. Typically this will decrease
149       * application performance.
150       *
151       * @param properties  of type Map
152       * @param doNormalize of type boolean
153       */
154      @Deprecated
155      public static void setNormalizeHeterogeneousSources( Map<Object, Object> properties, boolean doNormalize )
156        {
157        properties.put( "cascading.multimapreduceplanner.normalizesources", Boolean.toString( doNormalize ) );
158        }
159    
160      /**
161       * Method getNormalizeHeterogeneousSources returns if this planner will normalize heterogeneous input sources.
162       *
163       * @param properties of type Map
164       * @return a boolean
165       */
166      @Deprecated
167      public static boolean getNormalizeHeterogeneousSources( Map<Object, Object> properties )
168        {
169        return Boolean.parseBoolean( PropertyUtil.getProperty( properties, "cascading.multimapreduceplanner.normalizesources", "false" ) );
170        }
171    
172      /**
173       * Method setCollapseAdjacentTaps enables/disables an optimization that will identify if a sink tap and an intermediate tap
174       * are equivalent field wise, and discard the intermediate tap for the sink tap to minimize the number of MR jobs.
175       * <p/>
176       * Note that some Scheme types may lose type information if the planner cannot detect field types. This could result
177       * in type mismatch errors during joins.
178       *
179       * @param properties
180       * @param collapseAdjacent
181       */
182      public static void setCollapseAdjacentTaps( Map<Object, Object> properties, boolean collapseAdjacent )
183        {
184        properties.put( "cascading.multimapreduceplanner.collapseadjacentaps", Boolean.toString( collapseAdjacent ) );
185        }
186    
187      public static boolean getCollapseAdjacentTaps( Map<Object, Object> properties )
188        {
189        return Boolean.parseBoolean( PropertyUtil.getProperty( properties, "cascading.multimapreduceplanner.collapseadjacentaps", "true" ) );
190        }
191    
192      @Override
193      public JobConf getConfig()
194        {
195        return jobConf;
196        }
197    
198      @Override
199      public PlatformInfo getPlatformInfo()
200        {
201        return HadoopUtil.getPlatformInfo();
202        }
203    
204      @Override
205      public void initialize( FlowConnector flowConnector, Map<Object, Object> properties )
206        {
207        super.initialize( flowConnector, properties );
208    
209        jobConf = HadoopUtil.createJobConf( properties, createJobConf( properties ) );
210        intermediateSchemeClass = flowConnector.getIntermediateSchemeClass( properties );
211    
212        Class type = AppProps.getApplicationJarClass( properties );
213        if( jobConf.getJar() == null && type != null )
214          jobConf.setJarByClass( type );
215    
216        String path = AppProps.getApplicationJarPath( properties );
217        if( jobConf.getJar() == null && path != null )
218          jobConf.setJar( path );
219    
220        if( jobConf.getJar() == null )
221          jobConf.setJarByClass( HadoopUtil.findMainClass( HadoopPlanner.class ) );
222    
223        AppProps.setApplicationJarPath( properties, jobConf.getJar() );
224    
225        LOG.info( "using application jar: {}", jobConf.getJar() );
226        }
227    
228      @Override
229      protected HadoopFlow createFlow( FlowDef flowDef )
230        {
231        return new HadoopFlow( getPlatformInfo(), getProperties(), getConfig(), flowDef );
232        }
233    
234      @Override
235      public HadoopFlow buildFlow( FlowDef flowDef )
236        {
237        ElementGraph elementGraph = null;
238    
239        try
240          {
241          // generic
242          verifyAllTaps( flowDef );
243    
244          HadoopFlow flow = createFlow( flowDef );
245    
246          Pipe[] tails = resolveTails( flowDef, flow );
247    
248          verifyAssembly( flowDef, tails );
249    
250          elementGraph = createElementGraph( flowDef, tails );
251    
252          // rules
253          failOnLoneGroupAssertion( elementGraph );
254          failOnMissingGroup( elementGraph );
255          failOnMisusedBuffer( elementGraph );
256          failOnGroupEverySplit( elementGraph );
257    
258          // m/r specific
259          handleWarnEquivalentPaths( elementGraph );
260          handleSplit( elementGraph );
261          handleJobPartitioning( elementGraph );
262          handleJoins( elementGraph );
263          handleNonSafeOperations( elementGraph );
264    
265          if( getNormalizeHeterogeneousSources( properties ) )
266            handleHeterogeneousSources( elementGraph );
267    
268          // generic
269          elementGraph.removeUnnecessaryPipes(); // groups must be added before removing pipes
270          elementGraph.resolveFields();
271    
272          elementGraph = flow.updateSchemes( elementGraph );
273    
274          // m/r specific
275          if( getCollapseAdjacentTaps( properties ) )
276            handleAdjacentTaps( elementGraph );
277    
278          FlowStepGraph flowStepGraph = new HadoopStepGraph( flowDef.getName(), elementGraph );
279    
280          flow.initialize( elementGraph, flowStepGraph );
281    
282          return flow;
283          }
284        catch( Exception exception )
285          {
286          throw handleExceptionDuringPlanning( exception, elementGraph );
287          }
288        }
289    
290      private void handleWarnEquivalentPaths( ElementGraph elementGraph )
291        {
292        List<CoGroup> coGroups = elementGraph.findAllCoGroups();
293    
294        for( CoGroup coGroup : coGroups )
295          {
296          List<GraphPath<FlowElement, Scope>> graphPaths = elementGraph.getAllShortestPathsTo( coGroup );
297    
298          List<List<FlowElement>> paths = ElementGraphs.asPathList( graphPaths );
299    
300          if( !areEquivalentPaths( elementGraph, paths ) )
301            continue;
302    
303          LOG.warn( "found equivalent paths from: {} to: {}", paths.get( 0 ).get( 1 ), coGroup );
304    
305          // in order to remove dupe paths, we need to verify there isn't any branching
306          }
307        }
308    
309      private boolean areEquivalentPaths( ElementGraph elementGraph, List<List<FlowElement>> paths )
310        {
311        int length = sameLength( paths );
312    
313        if( length == -1 )
314          return false;
315    
316        Set<FlowElement> elements = new TreeSet<FlowElement>( new EquivalenceComparator( elementGraph ) );
317    
318        for( int i = 0; i < length; i++ )
319          {
320          elements.clear();
321    
322          for( List<FlowElement> path : paths )
323            elements.add( path.get( i ) );
324    
325          if( elements.size() != 1 )
326            return false;
327          }
328    
329        return true;
330        }
331    
332      private class EquivalenceComparator implements Comparator<FlowElement>
333        {
334        private final ElementGraph elementGraph;
335    
336        public EquivalenceComparator( ElementGraph elementGraph )
337          {
338          this.elementGraph = elementGraph;
339          }
340    
341        @Override
342        public int compare( FlowElement lhs, FlowElement rhs )
343          {
344          boolean areEquivalent = lhs.isEquivalentTo( rhs );
345          boolean sameIncoming = elementGraph.inDegreeOf( lhs ) == elementGraph.inDegreeOf( rhs );
346          boolean sameOutgoing = elementGraph.outDegreeOf( lhs ) == elementGraph.outDegreeOf( rhs );
347    
348          if( areEquivalent && sameIncoming && sameOutgoing )
349            return 0;
350    
351          return System.identityHashCode( lhs ) - System.identityHashCode( rhs );
352          }
353        }
354    
355      private int sameLength( List<List<FlowElement>> paths )
356        {
357        int lastSize = paths.get( 0 ).size();
358    
359        for( int i = 1; i < paths.size(); i++ )
360          {
361          if( paths.get( i ).size() != lastSize )
362            return -1;
363          }
364    
365        return lastSize;
366        }
367    
368      /**
369       * optimized for this case
370       * <pre>
371       *         e - t           e1 - e - t
372       * t - e1 -       -- > t -
373       *         e - t           e1 - e - t
374       * </pre>
375       * <p/>
376       * this should run in two map/red jobs, not 3. needs to be a flag on e1 to prevent this
377       * <p/>
378       * <pre>
379       *        g - t                 g - t
380       * g - e -       --> g - e - t -
381       *        g - t                 g - t
382       * </pre>
383       * <p/>
384       * <pre>
385       *             - e - e                            e - e
386       * t - e1 - e2         - g  --> t - e1 - e2 - t -       - g
387       *             - e - e                            e - e
388       * </pre>
389       *
390       * @param elementGraph
391       */
392      private void handleSplit( ElementGraph elementGraph )
393        {
394        // if there was a graph change, iterate paths again.
395        while( !internalSplit( elementGraph ) )
396          ;
397        }
398    
399      private boolean internalSplit( ElementGraph elementGraph )
400        {
401        List<GraphPath<FlowElement, Scope>> paths = elementGraph.getAllShortestPathsBetweenExtents();
402    
403        for( GraphPath<FlowElement, Scope> path : paths )
404          {
405          List<FlowElement> flowElements = Graphs.getPathVertexList( path );
406          Set<Pipe> tapInsertions = new HashSet<Pipe>();
407          FlowElement lastInsertable = null;
408    
409          for( int i = 0; i < flowElements.size(); i++ )
410            {
411            FlowElement flowElement = flowElements.get( i );
412    
413            if( flowElement instanceof ElementGraph.Extent ) // is an extent: head or tail
414              continue;
415    
416            // if Tap, Group, or Every - we insert the tap here
417            if( flowElement instanceof Tap || flowElement instanceof Group || flowElement instanceof Every )
418              lastInsertable = flowElement;
419    
420            // support splits on Pipe unless the previous is a Tap
421            if( flowElement.getClass() == Pipe.class && flowElements.get( i - 1 ) instanceof Tap )
422              continue;
423    
424            if( flowElement instanceof Tap )
425              continue;
426    
427            if( elementGraph.outDegreeOf( flowElement ) <= 1 )
428              continue;
429    
430            // we are at the root of a split here
431    
432            // do any split paths converge on a single Group?
433            int maxPaths = elementGraph.getMaxNumPathsBetweenElementAndGroupingMergeJoin( flowElement );
434            if( maxPaths <= 1 && lastInsertable instanceof Tap )
435              continue;
436    
437            tapInsertions.add( (Pipe) flowElement );
438            }
439    
440          for( Pipe pipe : tapInsertions )
441            insertTempTapAfter( elementGraph, pipe );
442    
443          if( !tapInsertions.isEmpty() )
444            return false;
445          }
446    
447        return true;
448        }
449    
450      /**
451       * will collapse adjacent and equivalent taps.
452       * equivalence is based on the tap adjacent taps using the same filesystem
453       * and the sink being symmetrical, and having the same fields as the temp tap.
454       * <p/>
455       * <p/>
456       * must be run after fields are resolved so temp taps have fully defined scheme instances.
457       *
458       * @param elementGraph
459       */
460      private void handleAdjacentTaps( ElementGraph elementGraph )
461        {
462        // if there was a graph change, iterate paths again.
463        while( !internalAdjacentTaps( elementGraph ) )
464          ;
465        }
466    
467      private boolean internalAdjacentTaps( ElementGraph elementGraph )
468        {
469        List<Tap> taps = elementGraph.findAllTaps();
470    
471        for( Tap tap : taps )
472          {
473          if( !( tap instanceof TempHfs ) )
474            continue;
475    
476          for( FlowElement successor : elementGraph.getAllSuccessors( tap ) )
477            {
478            if( !( successor instanceof Hfs ) )
479              continue;
480    
481            Hfs successorTap = (Hfs) successor;
482    
483            // does this scheme source what it sinks
484            if( !successorTap.getScheme().isSymmetrical() )
485              continue;
486    
487            URI tempURIScheme = getDefaultURIScheme( tap ); // temp uses default fs
488            URI successorURIScheme = getURIScheme( successorTap );
489    
490            if( !tempURIScheme.equals( successorURIScheme ) )
491              continue;
492    
493            // safe, both are symmetrical
494            // should be called after fields are resolved
495            if( !tap.getSourceFields().equals( successorTap.getSourceFields() ) )
496              continue;
497    
498            elementGraph.replaceElementWith( tap, successor );
499    
500            return false;
501            }
502          }
503    
504        return true;
505        }
506    
507      private URI getDefaultURIScheme( Tap tap )
508        {
509        return ( (Hfs) tap ).getDefaultFileSystemURIScheme( jobConf );
510        }
511    
512      private URI getURIScheme( Tap tap )
513        {
514        return ( (Hfs) tap ).getURIScheme( jobConf );
515        }
516    
517      private void handleHeterogeneousSources( ElementGraph elementGraph )
518        {
519        while( !internalHeterogeneousSources( elementGraph ) )
520          ;
521        }
522    
523      private boolean internalHeterogeneousSources( ElementGraph elementGraph )
524        {
525        // find all Groups
526        List<Group> groups = elementGraph.findAllMergeJoinGroups();
527    
528        // compare group sources
529        Map<Group, Set<Tap>> normalizeGroups = new HashMap<Group, Set<Tap>>();
530    
531        for( Group group : groups )
532          {
533          Set<Tap> taps = new HashSet<Tap>();
534    
535          // iterate each shortest path to current group finding each tap sourcing the merge/join
536          for( GraphPath<FlowElement, Scope> path : elementGraph.getAllShortestPathsTo( group ) )
537            {
538            List<FlowElement> flowElements = Graphs.getPathVertexList( path ); // last element is group
539            Collections.reverse( flowElements ); // first element is group
540    
541            for( FlowElement previousElement : flowElements )
542              {
543              if( previousElement instanceof Tap )
544                {
545                taps.add( (Tap) previousElement );
546                break; // stop finding taps in this path
547                }
548              }
549            }
550    
551          if( taps.size() == 1 )
552            continue;
553    
554          Iterator<Tap> iterator = taps.iterator();
555          Tap commonTap = iterator.next();
556    
557          while( iterator.hasNext() )
558            {
559            Tap tap = iterator.next();
560    
561            // making assumption hadoop can handle multiple filesytems, but not multiple inputformats
562            // in the same job
563            // possibly could test for common input format
564            if( getSchemeClass( tap ) != getSchemeClass( commonTap ) )
565              {
566              normalizeGroups.put( group, taps );
567              break;
568              }
569            }
570          }
571    
572        // if incompatible, insert Tap after its join/merge pipe
573        for( Group group : normalizeGroups.keySet() )
574          {
575          Set<Tap> taps = normalizeGroups.get( group );
576    
577          for( Tap tap : taps )
578            {
579            if( tap instanceof TempHfs || getSchemeClass( tap ).equals( intermediateSchemeClass ) ) // we normalize to TempHfs
580              continue;
581    
582            // handle case where there is a split on a pipe between the tap and group
583            for( GraphPath<FlowElement, Scope> path : getAllShortestPathsBetween( elementGraph, tap, group ) )
584              {
585              List<FlowElement> flowElements = Graphs.getPathVertexList( path ); // shortest path tap -> group
586              Collections.reverse( flowElements ); // group -> tap
587    
588              FlowElement flowElement = flowElements.get( 1 );
589    
590              if( flowElement instanceof TempHfs )
591                continue;
592    
593              LOG.warn( "inserting step to normalize incompatible sources: {}", tap );
594    
595              insertTempTapAfter( elementGraph, (Pipe) flowElement );
596    
597              return false;
598              }
599            }
600          }
601    
602        return normalizeGroups.isEmpty();
603        }
604    
605      @Override
606      protected Tap makeTempTap( String prefix, String name )
607        {
608        // must give Taps unique names
609        return new TempHfs( jobConf, Util.makePath( prefix, name ), intermediateSchemeClass, prefix == null );
610        }
611    
612      private Class getSchemeClass( Tap tap )
613        {
614        if( tap instanceof TempHfs )
615          return ( (TempHfs) tap ).getSchemeClass();
616        else
617          return tap.getScheme().getClass();
618        }
619      }