001/*
002 * Copyright (c) 2016 Chris K Wensel <chris@wensel.net>. All Rights Reserved.
003 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved.
004 *
005 * Project and contact information: http://www.cascading.org/
006 *
007 * This file is part of the Cascading project.
008 *
009 * Licensed under the Apache License, Version 2.0 (the "License");
010 * you may not use this file except in compliance with the License.
011 * You may obtain a copy of the License at
012 *
013 *     http://www.apache.org/licenses/LICENSE-2.0
014 *
015 * Unless required by applicable law or agreed to in writing, software
016 * distributed under the License is distributed on an "AS IS" BASIS,
017 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
018 * See the License for the specific language governing permissions and
019 * limitations under the License.
020 */
021
022package cascading.flow.tez.stream.element;
023
024import java.io.IOException;
025import java.util.Collection;
026import java.util.HashSet;
027import java.util.Set;
028
029import cascading.CascadingException;
030import cascading.flow.FlowProcess;
031import cascading.flow.SliceCounters;
032import cascading.flow.planner.Scope;
033import cascading.flow.stream.duct.Duct;
034import cascading.flow.stream.duct.DuctException;
035import cascading.flow.stream.element.InputSource;
036import cascading.flow.stream.element.SpliceGate;
037import cascading.flow.stream.graph.IORole;
038import cascading.flow.stream.graph.StreamGraph;
039import cascading.pipe.Pipe;
040import cascading.pipe.Splice;
041import cascading.tap.hadoop.util.MeasuredOutputCollector;
042import cascading.tuple.Tuple;
043import cascading.tuple.TupleEntry;
044import cascading.tuple.io.KeyTuple;
045import cascading.tuple.io.ValueTuple;
046import cascading.tuple.util.Resettable1;
047import cascading.util.SortedListMultiMap;
048import cascading.util.Util;
049import org.apache.hadoop.mapred.OutputCollector;
050import org.apache.tez.runtime.api.LogicalInput;
051import org.apache.tez.runtime.api.LogicalOutput;
052import org.apache.tez.runtime.library.api.KeyValueReader;
053import org.slf4j.Logger;
054import org.slf4j.LoggerFactory;
055
056/**
057 *
058 */
059public class TezMergeGate extends SpliceGate<TupleEntry, TupleEntry> implements InputSource
060  {
061  private static final Logger LOG = LoggerFactory.getLogger( TezMergeGate.class );
062
063  protected Collection<LogicalOutput> logicalOutputs;
064  protected SortedListMultiMap<Integer, LogicalInput> logicalInputs;
065
066  private MeasuredOutputCollector collector;
067  private TupleEntry valueEntry;
068
069  private final Resettable1<Tuple> keyTuple = new KeyTuple();
070
071  public TezMergeGate( FlowProcess flowProcess, Splice splice, IORole role, Collection<LogicalOutput> logicalOutputs )
072    {
073    super( flowProcess, splice, role );
074
075    if( logicalOutputs == null || logicalOutputs.isEmpty() )
076      throw new IllegalArgumentException( "output must not be null or empty" );
077
078    this.logicalOutputs = logicalOutputs;
079    }
080
081  public TezMergeGate( FlowProcess flowProcess, Splice splice, IORole role, SortedListMultiMap<Integer, LogicalInput> logicalInputs )
082    {
083    super( flowProcess, splice, role );
084
085    if( logicalInputs == null || logicalInputs.getKeys().size() == 0 )
086      throw new IllegalArgumentException( "inputs must not be null or empty" );
087
088    Set<LogicalInput> inputs = new HashSet<>( logicalInputs.getValues() );
089
090    if( inputs.size() != 1 )
091      throw new IllegalArgumentException( "only supports a single input" );
092
093    this.logicalInputs = logicalInputs;
094    }
095
096  @Override
097  public void initialize()
098    {
099    super.initialize();
100
101    Scope outgoingScope = Util.getFirst( outgoingScopes );
102    valueEntry = new TupleEntry( outgoingScope.getOutValuesFields(), true );
103    }
104
105  @Override
106  public void bind( StreamGraph streamGraph )
107    {
108    if( role != IORole.sink )
109      next = getNextFor( streamGraph );
110    }
111
112  @Override
113  public void prepare()
114    {
115    try
116      {
117      if( logicalInputs != null )
118        {
119        for( LogicalInput logicalInput : logicalInputs.getValues() )
120          {
121          LOG.info( "calling {}#start() on: {} {}, for {} inputs", logicalInput.getClass().getSimpleName(), getSplice(), Pipe.id( getSplice() ), logicalInputs.getValues().size() );
122
123          logicalInput.start();
124          }
125        }
126
127      if( logicalOutputs != null )
128        {
129        for( LogicalOutput logicalOutput : logicalOutputs )
130          {
131          LOG.info( "calling {}#start() on: {} {}", logicalOutput.getClass().getSimpleName(), getSplice(), Pipe.id( getSplice() ) );
132
133          logicalOutput.start();
134          }
135        }
136      }
137    catch( Exception exception )
138      {
139      throw new CascadingException( "unable to start input/output", exception );
140      }
141
142    if( role != IORole.source )
143      collector = new MeasuredOutputCollector( flowProcess, SliceCounters.Write_Duration, createOutputCollector() );
144
145    super.prepare();
146    }
147
148  @Override
149  public void start( Duct previous )
150    {
151    if( next != null )
152      super.start( previous );
153    }
154
155  @Override
156  public void receive( Duct previous, int ordinal, TupleEntry incomingEntry )
157    {
158    try
159      {
160      keyTuple.reset( incomingEntry.getTuple() );
161
162      collector.collect( keyTuple, ValueTuple.NULL );
163      flowProcess.increment( SliceCounters.Tuples_Written, 1 );
164      }
165    catch( OutOfMemoryError error )
166      {
167      handleReThrowableException( "out of memory, try increasing task memory allocation", error );
168      }
169    catch( CascadingException exception )
170      {
171      handleException( exception, incomingEntry );
172      }
173    catch( Throwable throwable )
174      {
175      handleException( new DuctException( "internal error: " + incomingEntry.getTuple().print(), throwable ), incomingEntry );
176      }
177    }
178
179  @Override
180  public void complete( Duct previous )
181    {
182    if( next != null )
183      super.complete( previous );
184    }
185
186  @Override
187  public void run( Object input ) throws Throwable
188    {
189    Throwable throwable = map();
190
191    if( throwable != null )
192      throw throwable;
193    }
194
195  protected Throwable map() throws Exception
196    {
197    Throwable localThrowable = null;
198
199    try
200      {
201      start( this );
202
203      // if multiple ordinals, an input could be duplicated if sourcing multiple paths
204      LogicalInput logicalInput = Util.getFirst( logicalInputs.getValues() );
205
206      KeyValueReader reader = (KeyValueReader) logicalInput.getReader();
207
208      while( reader.next() )
209        {
210        Tuple currentKey = (Tuple) reader.getCurrentKey();
211
212        valueEntry.setTuple( currentKey );
213        next.receive( this, 0, valueEntry );
214        }
215
216      complete( this );
217      }
218    catch( Throwable throwable )
219      {
220      if( !( throwable instanceof OutOfMemoryError ) )
221        LOG.error( "caught throwable", throwable );
222
223      return throwable;
224      }
225
226    return localThrowable;
227    }
228
229  protected OutputCollector createOutputCollector()
230    {
231    if( logicalOutputs.size() == 1 )
232      return new OldOutputCollector( Util.getFirst( logicalOutputs ) );
233
234    final OutputCollector[] collectors = new OutputCollector[ logicalOutputs.size() ];
235
236    int count = 0;
237    for( LogicalOutput logicalOutput : logicalOutputs )
238      collectors[ count++ ] = new OldOutputCollector( logicalOutput );
239
240    return new OutputCollector()
241      {
242      @Override
243      public void collect( Object key, Object value ) throws IOException
244        {
245        for( OutputCollector outputCollector : collectors )
246          outputCollector.collect( key, value );
247        }
248      };
249    }
250  }