001/* 002 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved. 003 * 004 * Project and contact information: http://www.cascading.org/ 005 * 006 * This file is part of the Cascading project. 007 * 008 * Licensed under the Apache License, Version 2.0 (the "License"); 009 * you may not use this file except in compliance with the License. 010 * You may obtain a copy of the License at 011 * 012 * http://www.apache.org/licenses/LICENSE-2.0 013 * 014 * Unless required by applicable law or agreed to in writing, software 015 * distributed under the License is distributed on an "AS IS" BASIS, 016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 017 * See the License for the specific language governing permissions and 018 * limitations under the License. 019 */ 020 021package cascading.flow.hadoop.stream; 022 023import java.util.Collection; 024import java.util.HashSet; 025import java.util.Map; 026import java.util.Set; 027 028import cascading.flow.FlowProcess; 029import cascading.flow.stream.element.MemoryHashJoinGate; 030import cascading.pipe.HashJoin; 031import cascading.provider.FactoryLoader; 032import cascading.tuple.Tuple; 033import cascading.tuple.collect.Spillable; 034import cascading.tuple.collect.SpillableTupleList; 035import cascading.tuple.collect.TupleMapFactory; 036import cascading.tuple.hadoop.collect.HadoopTupleMapFactory; 037import org.apache.hadoop.conf.Configuration; 038import org.slf4j.Logger; 039import org.slf4j.LoggerFactory; 040 041import static cascading.tuple.collect.TupleMapFactory.TUPLE_MAP_FACTORY; 042 043/** 044 * 045 */ 046public class HadoopMemoryJoinGate extends MemoryHashJoinGate 047 { 048 private static final Logger LOG = LoggerFactory.getLogger( HadoopMemoryJoinGate.class ); 049 050 public enum Spill 051 { 052 Num_Spills_Written, Num_Spills_Read, Num_Tuples_Spilled, Duration_Millis_Written 053 } 054 055 private class SpillListener implements Spillable.SpillListener 056 { 057 private final FlowProcess<? extends Configuration> flowProcess; 058 059 public SpillListener( FlowProcess<? extends Configuration> flowProcess ) 060 { 061 this.flowProcess = flowProcess; 062 } 063 064 @Override 065 public void notifyWriteSpillBegin( Spillable spillable, int spillSize, String spillReason ) 066 { 067 int numFiles = spillable.spillCount(); 068 069 if( numFiles % 10 == 0 ) 070 { 071 LOG.info( "spilling grouping: {}, num times: {}, with reason: {}", 072 new Object[]{spillable.getGrouping().print(), numFiles + 1, spillReason} ); 073 074 Runtime runtime = Runtime.getRuntime(); 075 long freeMem = runtime.freeMemory() / 1024 / 1024; 076 long maxMem = runtime.maxMemory() / 1024 / 1024; 077 long totalMem = runtime.totalMemory() / 1024 / 1024; 078 079 LOG.info( "mem on spill (mb), free: " + freeMem + ", total: " + totalMem + ", max: " + maxMem ); 080 } 081 082 LOG.info( "spilling {} tuples in list to file number {}", spillSize, numFiles + 1 ); 083 084 flowProcess.increment( Spill.Num_Spills_Written, 1 ); 085 flowProcess.increment( Spill.Num_Tuples_Spilled, spillSize ); 086 } 087 088 @Override 089 public void notifyWriteSpillEnd( SpillableTupleList spillableTupleList, long duration ) 090 { 091 flowProcess.increment( Spill.Duration_Millis_Written, duration ); 092 } 093 094 @Override 095 public void notifyReadSpillBegin( Spillable spillable ) 096 { 097 flowProcess.increment( Spill.Num_Spills_Read, 1 ); 098 } 099 } 100 101 private final SpillListener spillListener; 102 private TupleMapFactory<Configuration> tupleMapFactory; 103 104 public HadoopMemoryJoinGate( FlowProcess<? extends Configuration> flowProcess, HashJoin join ) 105 { 106 super( flowProcess, join ); 107 108 this.spillListener = new SpillListener( flowProcess ); 109 110 FactoryLoader loader = FactoryLoader.getInstance(); 111 112 this.tupleMapFactory = loader.loadFactoryFrom( flowProcess, TUPLE_MAP_FACTORY, HadoopTupleMapFactory.class ); 113 } 114 115 @Override 116 protected Set<Tuple> createKeySet() 117 { 118 return new HashSet<Tuple>(); // does not need to be synchronized, or ordered 119 } 120 121 @Override 122 protected Map<Tuple, Collection<Tuple>> createTupleMap() 123 { 124 Map<Tuple, Collection<Tuple>> map = tupleMapFactory.create( flowProcess ); 125 126 if( map instanceof Spillable ) 127 ( (Spillable) map ).setSpillListener( spillListener ); 128 129 return map; 130 } 131 132 @Override 133 protected void waitOnLatch() 134 { 135 // do nothing 136 } 137 138 @Override 139 protected void countDownLatch() 140 { 141 // do nothing 142 } 143 }