001/*
002 * Copyright (c) 2016 Chris K Wensel <chris@wensel.net>. All Rights Reserved.
003 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved.
004 *
005 * Project and contact information: http://www.cascading.org/
006 *
007 * This file is part of the Cascading project.
008 *
009 * Licensed under the Apache License, Version 2.0 (the "License");
010 * you may not use this file except in compliance with the License.
011 * You may obtain a copy of the License at
012 *
013 *     http://www.apache.org/licenses/LICENSE-2.0
014 *
015 * Unless required by applicable law or agreed to in writing, software
016 * distributed under the License is distributed on an "AS IS" BASIS,
017 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
018 * See the License for the specific language governing permissions and
019 * limitations under the License.
020 */
021
022package cascading.flow.tez.stream.element;
023
024import cascading.CascadingException;
025import cascading.flow.FlowProcess;
026import cascading.flow.hadoop.stream.HadoopGroupGate;
027import cascading.flow.stream.duct.Duct;
028import cascading.flow.stream.element.InputSource;
029import cascading.flow.stream.graph.IORole;
030import cascading.pipe.Pipe;
031import cascading.pipe.Splice;
032import cascading.tuple.Tuple;
033import cascading.util.SortedListMultiMap;
034import org.apache.hadoop.mapred.OutputCollector;
035import org.apache.tez.runtime.api.LogicalInput;
036import org.apache.tez.runtime.api.LogicalOutput;
037import org.apache.tez.runtime.library.output.OrderedPartitionedKVOutput;
038import org.slf4j.Logger;
039import org.slf4j.LoggerFactory;
040
041/**
042 *
043 */
044public abstract class TezGroupGate extends HadoopGroupGate implements InputSource
045  {
046  private static final Logger LOG = LoggerFactory.getLogger( TezGroupGate.class );
047
048  protected OrderedPartitionedKVOutput logicalOutput;
049  protected SortedListMultiMap<Integer, LogicalInput> logicalInputs;
050
051  public TezGroupGate( FlowProcess flowProcess, Splice splice, IORole role, LogicalOutput logicalOutput )
052    {
053    super( flowProcess, splice, role );
054
055    if( logicalOutput == null )
056      throw new IllegalArgumentException( "output must not be null" );
057
058    this.logicalOutput = (OrderedPartitionedKVOutput) logicalOutput;
059    }
060
061  public TezGroupGate( FlowProcess flowProcess, Splice splice, IORole role, SortedListMultiMap<Integer, LogicalInput> logicalInputs )
062    {
063    super( flowProcess, splice, role );
064
065    if( logicalInputs == null || logicalInputs.getKeys().size() == 0 )
066      throw new IllegalArgumentException( "inputs must not be null or empty" );
067
068    this.logicalInputs = logicalInputs;
069    }
070
071  @Override
072  public void initialize()
073    {
074    super.initialize();
075
076    if( role == IORole.sink )
077      return;
078
079    initComparators();
080    }
081
082  @Override
083  public void prepare()
084    {
085    try
086      {
087      if( logicalInputs != null )
088        {
089        for( LogicalInput logicalInput : logicalInputs.getValues() )
090          {
091          LOG.info( "calling {}#start() on: {} {}, for {} inputs", logicalInput.getClass().getSimpleName(), getSplice(), Pipe.id( getSplice() ), logicalInputs.getValues().size() );
092
093          logicalInput.start();
094          }
095        }
096
097      if( logicalOutput != null )
098        {
099        LOG.info( "calling {}#start() on: {} {}", logicalOutput.getClass().getSimpleName(), getSplice(), Pipe.id( getSplice() ) );
100
101        logicalOutput.start();
102        }
103      }
104    catch( Exception exception )
105      {
106      throw new CascadingException( "unable to start input/output", exception );
107      }
108
109    super.prepare();
110    }
111
112  @Override
113  public void run( Object input ) throws Throwable
114    {
115    Throwable throwable = reduce();
116
117    if( throwable != null )
118      throw throwable;
119    }
120
121  protected abstract Throwable reduce() throws Exception;
122
123  @Override
124  protected void wrapGroupingAndCollect( Duct previous, int ordinal, Tuple valuesTuple, Tuple groupKey ) throws java.io.IOException
125    {
126    collector.collect( groupKey, valuesTuple );
127    }
128
129  @Override
130  protected OutputCollector createOutputCollector()
131    {
132    return new OldOutputCollector( logicalOutput );
133    }
134  }