/*
 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved.
 *
 * Project and contact information: http://www.cascading.org/
 *
 * This file is part of the Cascading project.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package cascading.flow.tez;

import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

import cascading.CascadingException;
import cascading.flow.FlowProcess;
import cascading.flow.FlowSession;
import cascading.flow.hadoop.MapRed;
import cascading.flow.hadoop.util.HadoopUtil;
import cascading.tap.Tap;
import cascading.tuple.TupleEntryCollector;
import cascading.tuple.TupleEntryIterator;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.mapreduce.processor.MRTaskReporter;
import org.apache.tez.runtime.api.ProcessorContext;
import org.apache.tez.runtime.api.Writer;

/**
 * Class Hadoop2TezFlowProcess is an implementation of {@link cascading.flow.FlowProcess} backed by a
 * {@link TezConfiguration}. Use this class to get direct access to the current {@link TezConfiguration},
 * the Tez {@link ProcessorContext} and a Hadoop {@link Reporter}.
 * <p>
 * Be warned that coupling to this implementation will cause custom {@link cascading.operation.Operation}s to
 * fail if they are executed on a platform that does not supply this FlowProcess implementation.
 * <p>
 * When no {@link ProcessorContext} has been supplied, counter updates are silently dropped, counter reads
 * return zero, and {@link #getReporter()} returns {@link Reporter#NULL}.
 *
 * @see cascading.flow.FlowSession
 */
public class Hadoop2TezFlowProcess extends FlowProcess<TezConfiguration> implements MapRed
  {
  /** Field configuration, the configuration this process reads properties from */
  final TezConfiguration configuration;
  /** Field context, the Tez processor context; remains null when constructed without one */
  private ProcessorContext context;
  /**
   * Field writer, assigned via {@link #setWriter(Writer)}.
   * NOTE(review): never read within this class and not carried over by the copy constructor -- confirm intent.
   */
  private Writer writer;

  /** Creates an instance backed by a new, default {@link TezConfiguration} and no processor context. */
  public Hadoop2TezFlowProcess()
    {
    this.configuration = new TezConfiguration();
    }

  /**
   * Creates an instance backed by the given configuration and no processor context.
   *
   * @param configuration the configuration to expose; it is stored as given, not copied
   */
  public Hadoop2TezFlowProcess( TezConfiguration configuration )
    {
    this.configuration = configuration;
    }

  /**
   * Creates an instance bound to the given session, processor context and configuration.
   *
   * @param flowSession   the current flow session
   * @param context       the Tez processor context used for counters and reporting
   * @param configuration the configuration to expose; it is stored as given, not copied
   */
  public Hadoop2TezFlowProcess( FlowSession flowSession, ProcessorContext context, TezConfiguration configuration )
    {
    super( flowSession );
    this.context = context;
    this.configuration = configuration;
    }

  /**
   * Creates an instance that shares the given process' parent state and processor context, but exposes a
   * different configuration.
   *
   * @param flowProcess   the process to derive from
   * @param configuration the configuration the new instance will expose
   */
  public Hadoop2TezFlowProcess( Hadoop2TezFlowProcess flowProcess, TezConfiguration configuration )
    {
    super( flowProcess );
    this.context = flowProcess.context;
    this.configuration = configuration;
    }

  /**
   * Method getContext returns the Tez processor context of this Hadoop2TezFlowProcess object.
   *
   * @return the context (type ProcessorContext), or null if none was supplied at construction
   */
  public ProcessorContext getContext()
    {
    return context;
    }

  /**
   * Method setWriter sets the writer of this Hadoop2TezFlowProcess object.
   *
   * @param writer the Tez writer to retain
   */
  public void setWriter( Writer writer )
    {
    this.writer = writer;
    }

  /**
   * Returns a new Hadoop2TezFlowProcess sharing this instance's processor context, using the given
   * configuration.
   *
   * @param configuration the configuration for the returned process
   * @return a new Hadoop2TezFlowProcess instance
   */
  @Override
  public FlowProcess copyWith( TezConfiguration configuration )
    {
    return new Hadoop2TezFlowProcess( this, configuration );
    }

  /**
   * Method getConfiguration returns the configuration of this Hadoop2TezFlowProcess object.
   *
   * @return the configuration (type TezConfiguration) of this Hadoop2TezFlowProcess object.
   */
  public TezConfiguration getConfiguration()
    {
    return configuration;
    }

  /**
   * Returns the same instance as {@link #getConfiguration()}; no copy is made.
   *
   * @return the live configuration of this object
   */
  @Override
  public TezConfiguration getConfig()
    {
    return configuration;
    }

  /**
   * Returns a new {@link TezConfiguration} constructed from the current configuration.
   *
   * @return a copy of the configuration
   */
  @Override
  public TezConfiguration getConfigCopy()
    {
    return new TezConfiguration( configuration );
    }

  /**
   * Method getCurrentSliceNum returns the value of the "mapred.task.partition" property, or 0 if it is not set.
   *
   * @return int
   */
  @Override
  public int getCurrentSliceNum()
    {
    return getConfiguration().getInt( "mapred.task.partition", 0 ); // TODO: likely incorrect
    }

  /**
   * Method getNumProcessSlices is not implemented for this platform and always returns 0.
   *
   * @return 0
   */
  @Override
  public int getNumProcessSlices()
    {
    return 0;
    }

  /**
   * Method getReporter returns the reporter of this Hadoop2TezFlowProcess object.
   * <p>
   * A new {@link MRTaskReporter} wrapping the processor context is created on every call.
   *
   * @return {@link Reporter#NULL} if there is no processor context, otherwise a reporter wrapping it
   */
  @Override
  public Reporter getReporter()
    {
    if( context == null )
      return Reporter.NULL;

    return new MRTaskReporter( context );
    }

  /**
   * Looks up the given key in the configuration.
   *
   * @param key the property name
   * @return the value returned by the configuration for the key
   */
  @Override
  public Object getProperty( String key )
    {
    return configuration.get( key );
    }

  /**
   * Collects the key of every entry in the configuration.
   *
   * @return an unmodifiable set of the property names present when this method was called
   */
  @Override
  public Collection<String> getPropertyKeys()
    {
    Set<String> keys = new HashSet<String>();

    for( Map.Entry<String, String> entry : configuration )
      keys.add( entry.getKey() );

    return Collections.unmodifiableSet( keys );
    }

  /**
   * Loads the named class with the class loader of this class and instantiates it through
   * {@link ReflectionUtils#newInstance(Class, Configuration)}, passing the current configuration.
   *
   * @param className the fully qualified name of the class to instantiate
   * @return the new instance, or null if className is null or empty
   * @throws CascadingException if the class cannot be found
   */
  @Override
  public Object newInstance( String className )
    {
    if( className == null || className.isEmpty() )
      return null;

    try
      {
      Class<?> type = Hadoop2TezFlowProcess.class.getClassLoader().loadClass( className );

      return ReflectionUtils.newInstance( type, configuration );
      }
    catch( ClassNotFoundException exception )
      {
      throw new CascadingException( "unable to load class: " + className, exception );
      }
    }

  /** This implementation does nothing. */
  @Override
  public void keepAlive()
    {
    // unsupported
    }

  /**
   * Increments the given counter by the given amount. Does nothing when there is no processor context.
   *
   * @param counter the counter to increment
   * @param amount  the amount to add
   */
  @Override
  public void increment( Enum counter, long amount )
    {
    if( context != null )
      context.getCounters().findCounter( counter ).increment( amount );
    }

  /**
   * Increments the named counter by the given amount. Does nothing when there is no processor context.
   *
   * @param group   the counter group name
   * @param counter the counter name
   * @param amount  the amount to add
   */
  @Override
  public void increment( String group, String counter, long amount )
    {
    if( context != null )
      context.getCounters().findCounter( group, counter ).increment( amount );
    }

  /**
   * Returns the current value of the given counter.
   *
   * @param counter the counter to read
   * @return the counter value, or 0 when there is no processor context
   */
  @Override
  public long getCounterValue( Enum counter )
    {
    if( context == null )
      return 0;

    return context.getCounters().findCounter( counter ).getValue();
    }

  /**
   * Returns the current value of the named counter.
   *
   * @param group   the counter group name
   * @param counter the counter name
   * @return the counter value, or 0 when there is no processor context
   */
  @Override
  public long getCounterValue( String group, String counter )
    {
    if( context == null )
      return 0;

    return context.getCounters().findCounter( group, counter ).getValue();
    }

  /**
   * This implementation does nothing; the given status is ignored.
   *
   * @param status ignored
   */
  @Override
  public void setStatus( String status )
    {
    // unsupported
    }

  /**
   * Reports whether counters can be used.
   *
   * @return true only if a processor context is present and it returns non-null counters
   */
  @Override
  public boolean isCounterStatusInitialized()
    {
    if( context == null )
      return false;

    return context.getCounters() != null;
    }

  /**
   * Opens the given tap for reading with this process.
   *
   * @param tap the tap to read from
   * @return an iterator over the tap's tuples
   * @throws IOException if the tap fails to open
   */
  @Override
  public TupleEntryIterator openTapForRead( Tap tap ) throws IOException
    {
    return tap.openForRead( this );
    }

  /**
   * Opens the given tap for writing with this process, passing null as the output.
   *
   * @param tap the tap to write to
   * @return a collector writing to the tap
   * @throws IOException if the tap fails to open
   */
  @Override
  public TupleEntryCollector openTapForWrite( Tap tap ) throws IOException
    {
    return tap.openForWrite( this, null ); // do not honor sinkmode as this may be opened across tasks
    }

  /**
   * Opens the given trap for writing using a copy of the current configuration in which
   * "cascading.tapcollector.partname" embeds the zero padded values of "cascading.flow.step.num" and
   * "cascading.flow.node.num" (each defaulting to 0).
   *
   * @param trap the trap tap to write to
   * @return a collector writing to the trap
   * @throws IOException if the trap fails to open
   */
  @Override
  public TupleEntryCollector openTrapForWrite( Tap trap ) throws IOException
    {
    // work on a copy so the part name override does not leak into this process' configuration
    TezConfiguration jobConf = new TezConfiguration( getConfiguration() );

    int stepNum = jobConf.getInt( "cascading.flow.step.num", 0 );
    int nodeNum = jobConf.getInt( "cascading.flow.node.num", 0 );

    String partname = String.format( "-%05d-%05d-", stepNum, nodeNum );

    // the remaining format specifiers are left in place, presumably resolved by the tap collector -- verify
    jobConf.set( "cascading.tapcollector.partname", "%s%spart" + partname + "%05d" );

    return trap.openForWrite( new Hadoop2TezFlowProcess( this, jobConf ), null ); // do not honor sinkmode as this may be opened across tasks
    }

  /**
   * Not implemented for this platform; always returns null.
   *
   * @return null
   * @throws IOException never thrown by this implementation
   */
  @Override
  public TupleEntryCollector openSystemIntermediateForWrite() throws IOException
    {
    return null;
/*
    return new TupleEntryCollector( Fields.size( 2 ) )
    {
    @Override
    protected void collect( TupleEntry tupleEntry )
      {
      try
        {
        getOutputCollector().collect( tupleEntry.getObject( 0 ), tupleEntry.getObject( 1 ) );
        }
      catch( IOException exception )
        {
        throw new CascadingException( "failed collecting key and value", exception );
        }
      }
    };
*/
    }

  /**
   * Copies the given configuration object by delegating to {@link HadoopUtil#copyJobConf}.
   *
   * @param config the configuration to copy
   * @return the copy produced by HadoopUtil
   */
  @Override
  public <C> C copyConfig( C config )
    {
    return HadoopUtil.copyJobConf( config );
    }

  /**
   * Computes the difference between two configurations by delegating to {@link HadoopUtil#getConfig}.
   * Both arguments are cast to {@link Configuration}.
   *
   * @param defaultConfig the baseline configuration
   * @param updatedConfig the modified configuration
   * @return the map produced by HadoopUtil
   */
  @Override
  public <C> Map<String, String> diffConfigIntoMap( C defaultConfig, C updatedConfig )
    {
    return HadoopUtil.getConfig( (Configuration) defaultConfig, (Configuration) updatedConfig );
    }

  /**
   * Merges the given map into a new copy of the given configuration by delegating to
   * {@link HadoopUtil#mergeConf}; defaultConfig itself is not passed to the merge.
   *
   * @param defaultConfig the configuration to start from
   * @param map           the properties to merge in
   * @return the merged configuration
   */
  @Override
  public TezConfiguration mergeMapIntoConfig( TezConfiguration defaultConfig, Map<String, String> map )
    {
    return HadoopUtil.mergeConf( new TezConfiguration( defaultConfig ), map, true );
    }
  }