001/* 002 * Copyright (c) 2016 Chris K Wensel <chris@wensel.net>. All Rights Reserved. 003 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved. 004 * 005 * Project and contact information: http://www.cascading.org/ 006 * 007 * This file is part of the Cascading project. 008 * 009 * Licensed under the Apache License, Version 2.0 (the "License"); 010 * you may not use this file except in compliance with the License. 011 * You may obtain a copy of the License at 012 * 013 * http://www.apache.org/licenses/LICENSE-2.0 014 * 015 * Unless required by applicable law or agreed to in writing, software 016 * distributed under the License is distributed on an "AS IS" BASIS, 017 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 018 * See the License for the specific language governing permissions and 019 * limitations under the License. 020 */ 021 022package cascading.flow.tez.stream.element; 023 024import cascading.CascadingException; 025import cascading.flow.FlowProcess; 026import cascading.flow.hadoop.stream.HadoopGroupGate; 027import cascading.flow.stream.duct.Duct; 028import cascading.flow.stream.element.InputSource; 029import cascading.flow.stream.graph.IORole; 030import cascading.pipe.Pipe; 031import cascading.pipe.Splice; 032import cascading.tuple.Tuple; 033import cascading.util.SortedListMultiMap; 034import org.apache.hadoop.mapred.OutputCollector; 035import org.apache.tez.runtime.api.LogicalInput; 036import org.apache.tez.runtime.api.LogicalOutput; 037import org.apache.tez.runtime.library.output.OrderedPartitionedKVOutput; 038import org.slf4j.Logger; 039import org.slf4j.LoggerFactory; 040 041/** 042 * 043 */ 044public abstract class TezGroupGate extends HadoopGroupGate implements InputSource 045 { 046 private static final Logger LOG = LoggerFactory.getLogger( TezGroupGate.class ); 047 048 protected OrderedPartitionedKVOutput logicalOutput; 049 protected SortedListMultiMap<Integer, LogicalInput> logicalInputs; 050 051 public TezGroupGate( FlowProcess flowProcess, Splice splice, IORole role, LogicalOutput logicalOutput ) 052 { 053 super( flowProcess, splice, role ); 054 055 if( logicalOutput == null ) 056 throw new IllegalArgumentException( "output must not be null" ); 057 058 this.logicalOutput = (OrderedPartitionedKVOutput) logicalOutput; 059 } 060 061 public TezGroupGate( FlowProcess flowProcess, Splice splice, IORole role, SortedListMultiMap<Integer, LogicalInput> logicalInputs ) 062 { 063 super( flowProcess, splice, role ); 064 065 if( logicalInputs == null || logicalInputs.getKeys().size() == 0 ) 066 throw new IllegalArgumentException( "inputs must not be null or empty" ); 067 068 this.logicalInputs = logicalInputs; 069 } 070 071 @Override 072 public void initialize() 073 { 074 super.initialize(); 075 076 if( role == IORole.sink ) 077 return; 078 079 initComparators(); 080 } 081 082 @Override 083 public void prepare() 084 { 085 try 086 { 087 if( logicalInputs != null ) 088 { 089 for( LogicalInput logicalInput : logicalInputs.getValues() ) 090 { 091 LOG.info( "calling {}#start() on: {} {}, for {} inputs", logicalInput.getClass().getSimpleName(), getSplice(), Pipe.id( getSplice() ), logicalInputs.getValues().size() ); 092 093 logicalInput.start(); 094 } 095 } 096 097 if( logicalOutput != null ) 098 { 099 LOG.info( "calling {}#start() on: {} {}", logicalOutput.getClass().getSimpleName(), getSplice(), Pipe.id( getSplice() ) ); 100 101 logicalOutput.start(); 102 } 103 } 104 catch( Exception exception ) 105 { 106 throw new CascadingException( "unable to start input/output", exception ); 107 } 108 109 super.prepare(); 110 } 111 112 @Override 113 public void run( Object input ) throws Throwable 114 { 115 Throwable throwable = reduce(); 116 117 if( throwable != null ) 118 throw throwable; 119 } 120 121 protected abstract Throwable reduce() throws Exception; 122 123 @Override 124 protected void wrapGroupingAndCollect( Duct previous, int ordinal, Tuple valuesTuple, Tuple groupKey ) throws java.io.IOException 125 { 126 collector.collect( groupKey, valuesTuple ); 127 } 128 129 @Override 130 protected OutputCollector createOutputCollector() 131 { 132 return new OldOutputCollector( logicalOutput ); 133 } 134 }