/*
 * Copyright (c) 2016-2017 Chris K Wensel <chris@wensel.net>. All Rights Reserved.
 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved.
 *
 * Project and contact information: http://www.cascading.org/
 *
 * This file is part of the Cascading project.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package cascading.flow.tez;

import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

import cascading.CascadingException;
import cascading.flow.FlowProcess;
import cascading.flow.FlowSession;
import cascading.flow.hadoop.MapRed;
import cascading.flow.hadoop.util.HadoopUtil;
import cascading.tap.Tap;
import cascading.tuple.TupleEntryCollector;
import cascading.tuple.TupleEntryIterator;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.mapreduce.processor.MRTaskReporter;
import org.apache.tez.runtime.api.ProcessorContext;
import org.apache.tez.runtime.api.Writer;

/**
 * Class Hadoop2TezFlowProcess is an implementation of {@link cascading.flow.FlowProcess} for Hadoop running
 * on Apache Tez. Use this interface to get direct access to the Tez configuration and the MapReduce-compatible
 * {@link Reporter} interface.
 * <p>
 * Be warned that coupling to this implementation will cause custom {@link cascading.operation.Operation}s to
 * fail if they are executed on a system other than Hadoop.
 *
 * @see cascading.flow.FlowSession
 */
public class Hadoop2TezFlowProcess extends FlowProcess<TezConfiguration> implements MapRed
  {
  /** Field configuration - the Tez configuration backing this process */
  final TezConfiguration configuration;
  /** Field context - the Tez processor context; null when running outside a Tez task (e.g. client side) */
  private ProcessorContext context;
  /** Field writer - optional Tez output writer, injected by the runtime */
  private Writer writer;

  /** Constructor Hadoop2TezFlowProcess creates a process backed by a fresh, default {@link TezConfiguration}. */
  public Hadoop2TezFlowProcess()
    {
    this.configuration = new TezConfiguration();
    }

  /**
   * Constructor Hadoop2TezFlowProcess creates a process backed by the given configuration, with no task context.
   *
   * @param configuration of type TezConfiguration
   */
  public Hadoop2TezFlowProcess( TezConfiguration configuration )
    {
    this.configuration = configuration;
    }

  /**
   * Constructor Hadoop2TezFlowProcess creates a process bound to a running Tez task.
   *
   * @param flowSession   of type FlowSession
   * @param context       of type ProcessorContext, the Tez task context
   * @param configuration of type TezConfiguration
   */
  public Hadoop2TezFlowProcess( FlowSession flowSession, ProcessorContext context, TezConfiguration configuration )
    {
    super( flowSession );
    this.context = context;
    this.configuration = configuration;
    }

  /**
   * Copy constructor, retains the given process' context but substitutes the given configuration.
   *
   * @param flowProcess   of type Hadoop2TezFlowProcess
   * @param configuration of type TezConfiguration
   */
  public Hadoop2TezFlowProcess( Hadoop2TezFlowProcess flowProcess, TezConfiguration configuration )
    {
    super( flowProcess );
    this.context = flowProcess.context;
    this.configuration = configuration;
    }

  /**
   * Method getContext returns the Tez processor context, or null if not running inside a Tez task.
   *
   * @return the context (type ProcessorContext) of this Hadoop2TezFlowProcess object.
   */
  public ProcessorContext getContext()
    {
    return context;
    }

  public void setWriter( Writer writer )
    {
    this.writer = writer;
    }

  @Override
  public FlowProcess copyWith( TezConfiguration configuration )
    {
    return new Hadoop2TezFlowProcess( this, configuration );
    }

  /**
   * Method getConfiguration returns the configuration of this Hadoop2TezFlowProcess object.
   *
   * @return the configuration (type TezConfiguration) of this Hadoop2TezFlowProcess object.
   */
  public TezConfiguration getConfiguration()
    {
    return configuration;
    }

  @Override
  public TezConfiguration getConfig()
    {
    return configuration;
    }

  @Override
  public TezConfiguration getConfigCopy()
    {
    return new TezConfiguration( configuration );
    }

  /**
   * Method getCurrentSliceNum returns the task number of this task. Task 0 is the first task.
   *
   * @return int
   */
  @Override
  public int getCurrentSliceNum()
    {
    if( context != null )
      return context.getTaskIndex(); // index of this task across all tasks of the vertex

    // fall back to the MapReduce-era property when no processor context is available; may be unset
    return getConfiguration().getInt( "mapred.task.partition", 0 );
    }

  @Override
  public int getNumProcessSlices()
    {
    return 0; // not tracked on Tez; no vertex parallelism is surfaced here
    }

  /**
   * Method getReporter returns the reporter of this Hadoop2TezFlowProcess object, or {@link Reporter#NULL}
   * when running without a task context.
   *
   * @return the reporter (type Reporter) of this Hadoop2TezFlowProcess object.
   */
  @Override
  public Reporter getReporter()
    {
    if( context == null )
      return Reporter.NULL;

    return new MRTaskReporter( context );
    }

  @Override
  public Object getProperty( String key )
    {
    return configuration.get( key );
    }

  @Override
  public Collection<String> getPropertyKeys()
    {
    Set<String> keys = new HashSet<>();

    for( Map.Entry<String, String> entry : configuration )
      keys.add( entry.getKey() );

    return Collections.unmodifiableSet( keys );
    }

  /**
   * Method newInstance loads the named class via this class' classloader and instantiates it, passing the
   * current configuration to {@link ReflectionUtils} so {@code Configurable} instances are configured.
   *
   * @param className of type String, may be null or empty
   * @return a new instance, or null if className is null or empty
   */
  @Override
  public Object newInstance( String className )
    {
    if( className == null || className.isEmpty() )
      return null;

    try
      {
      Class<?> type = Hadoop2TezFlowProcess.class.getClassLoader().loadClass( className );

      return ReflectionUtils.newInstance( type, configuration );
      }
    catch( ClassNotFoundException exception )
      {
      throw new CascadingException( "unable to load class: " + className, exception );
      }
    }

  @Override
  public void keepAlive()
    {
    // unsupported
    }

  @Override
  public void increment( Enum counter, long amount )
    {
    if( context != null )
      context.getCounters().findCounter( counter ).increment( amount );
    }

  @Override
  public void increment( String group, String counter, long amount )
    {
    if( context != null )
      context.getCounters().findCounter( group, counter ).increment( amount );
    }

  @Override
  public long getCounterValue( Enum counter )
    {
    if( context == null )
      return 0;

    return context.getCounters().findCounter( counter ).getValue();
    }

  @Override
  public long getCounterValue( String group, String counter )
    {
    if( context == null )
      return 0;

    return context.getCounters().findCounter( group, counter ).getValue();
    }

  @Override
  public void setStatus( String status )
    {
    // unsupported
    }

  @Override
  public boolean isCounterStatusInitialized()
    {
    if( context == null )
      return false;

    return context.getCounters() != null;
    }

  @Override
  public TupleEntryIterator openTapForRead( Tap tap ) throws IOException
    {
    return tap.openForRead( this );
    }

  @Override
  public TupleEntryCollector openTapForWrite( Tap tap ) throws IOException
    {
    return tap.openForWrite( this, null ); // do not honor sinkmode as this may be opened across tasks
    }

  /**
   * Method openTrapForWrite opens the given trap Tap for writing, embedding the current step and node
   * numbers into the part-file name template so trap files from different nodes do not collide.
   *
   * @param trap of type Tap
   * @return TupleEntryCollector
   * @throws IOException when the trap cannot be opened
   */
  @Override
  public TupleEntryCollector openTrapForWrite( Tap trap ) throws IOException
    {
    TezConfiguration jobConf = new TezConfiguration( getConfiguration() );

    int stepNum = jobConf.getInt( "cascading.flow.step.num", 0 );
    int nodeNum = jobConf.getInt( "cascading.flow.node.num", 0 );

    String partname = String.format( "-%05d-%05d-", stepNum, nodeNum );

    jobConf.set( "cascading.tapcollector.partname", "%s%spart" + partname + "%05d" );

    return trap.openForWrite( new Hadoop2TezFlowProcess( this, jobConf ), null ); // do not honor sinkmode as this may be opened across tasks
    }

  @Override
  public TupleEntryCollector openSystemIntermediateForWrite() throws IOException
    {
    // not implemented on Tez; intermediate outputs are wired through the DAG, not opened directly
    return null;
    }

  @Override
  public <C> C copyConfig( C config )
    {
    return HadoopUtil.copyJobConf( config );
    }

  @Override
  public <C> Map<String, String> diffConfigIntoMap( C defaultConfig, C updatedConfig )
    {
    return HadoopUtil.getConfig( (Configuration) defaultConfig, (Configuration) updatedConfig );
    }

  @Override
  public TezConfiguration mergeMapIntoConfig( TezConfiguration defaultConfig, Map<String, String> map )
    {
    return HadoopUtil.mergeConf( new TezConfiguration( defaultConfig ), map, true );
    }
  }