/*
 * Copyright (c) 2016-2017 Chris K Wensel <chris@wensel.net>. All Rights Reserved.
 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved.
 *
 * Project and contact information: http://www.cascading.org/
 *
 * This file is part of the Cascading project.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package cascading.operation.function;

import java.beans.ConstructorProperties;
import java.util.Arrays;

import cascading.flow.FlowProcess;
import cascading.management.annotation.Property;
import cascading.management.annotation.PropertyDescription;
import cascading.management.annotation.Visibility;
import cascading.operation.BaseOperation;
import cascading.operation.Function;
import cascading.operation.FunctionCall;
import cascading.tuple.Fields;
import cascading.tuple.Tuple;
import cascading.tuple.TupleEntry;
import cascading.tuple.TupleEntryCollector;
import cascading.util.Util;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Class UnGroup is a {@link Function} that will 'un-group' data from a given dataset.
 * <p>
 * That is, for the given field positions, this function will emit a new Tuple for every value.
 * For example:
 * <pre>
 * A, x, y
 * B, x, z
 * C, y, z
 * </pre>
 * <p>
 * to:
 * <pre>
 * A, x
 * A, y
 * B, x
 * B, z
 * C, y
 * C, z
 * </pre>
 * <p>
 * An instance works in one of two modes, chosen by the constructor used: either a set of explicit
 * value selectors is applied to every argument tuple, or the non-group values are walked in
 * consecutive runs of a fixed {@link #getSize() size}.
 */
public class UnGroup extends BaseOperation implements Function
  {
  /** Field LOG */
  private static final Logger LOG = LoggerFactory.getLogger( UnGroup.class );

  /** Field groupFieldSelector */
  private Fields groupFieldSelector;
  /** Field resultFieldSelectors, left {@code null} when the instance was created with a value count */
  private Fields[] resultFieldSelectors;
  /** Field size, the number of values per emitted tuple when no result selectors are set */
  private int size = 1;

  /**
   * Constructor UnGroup creates a new UnGroup instance.
   * <p>
   * The declared result fields are sized as the group selector size plus the (common) size of the
   * value selectors. The given selector array is copied, later changes to it are not observed.
   *
   * @param groupSelector  of type Fields
   * @param valueSelectors of type Fields[]
   * @throws IllegalArgumentException if {@code valueSelectors} is {@code null} or empty, or if the
   *                                  value selectors are not all the same size
   */
  @ConstructorProperties({"groupSelector", "valueSelectors"})
  public UnGroup( Fields groupSelector, Fields[] valueSelectors )
    {
    // reject only the null/empty case named by the message; element 0 is dereferenced just below
    if( valueSelectors == null || valueSelectors.length == 0 )
      throw new IllegalArgumentException( "value selectors may not be empty" );

    int selectorSize = valueSelectors[ 0 ].size();

    for( int i = 1; i < valueSelectors.length; i++ )
      {
      if( selectorSize != valueSelectors[ i ].size() )
        throw new IllegalArgumentException( "all value selectors must be the same size" );
      }

    this.numArgs = groupSelector.size() + selectorSize * valueSelectors.length;
    this.groupFieldSelector = groupSelector;
    this.resultFieldSelectors = Arrays.copyOf( valueSelectors, valueSelectors.length );
    this.fieldDeclaration = Fields.size( groupSelector.size() + selectorSize );
    }

  /**
   * Constructor UnGroup creates a new UnGroup instance.
   * <p>
   * Every value selector must have the same size, and that size plus the group selector size must
   * equal the size of the declared fields. The given selector array is copied.
   *
   * @param fieldDeclaration of type Fields
   * @param groupSelector    of type Fields
   * @param valueSelectors   of type Fields[]
   * @throws IllegalArgumentException if {@code valueSelectors} is {@code null} or empty, or if the
   *                                  selector sizes do not agree with each other and with the
   *                                  declared field size
   */
  @ConstructorProperties({"fieldDeclaration", "groupSelector", "valueSelectors"})
  public UnGroup( Fields fieldDeclaration, Fields groupSelector, Fields[] valueSelectors )
    {
    super( fieldDeclaration );

    // an empty array would otherwise skip every check in the loop below and be accepted silently
    if( valueSelectors == null || valueSelectors.length == 0 )
      throw new IllegalArgumentException( "value selectors may not be empty" );

    String sizeMessage = "all value selectors must be the same size, and this size plus group selector size must equal the declared field size";

    numArgs = groupSelector.size();
    int selectorSize = -1; // -1 marks "no selector seen yet"

    for( Fields resultFieldSelector : valueSelectors )
      {
      numArgs += resultFieldSelector.size();
      int fieldSize = groupSelector.size() + resultFieldSelector.size();

      if( selectorSize != -1 && selectorSize != resultFieldSelector.size() )
        throw new IllegalArgumentException( sizeMessage );

      selectorSize = resultFieldSelector.size();

      if( fieldDeclaration.size() != fieldSize )
        throw new IllegalArgumentException( sizeMessage );
      }

    this.groupFieldSelector = groupSelector;
    this.resultFieldSelectors = Arrays.copyOf( valueSelectors, valueSelectors.length );
    }

  /**
   * Constructor UnGroup creates a new UnGroup instance. Where the numValues argument specifies the number
   * of values to include.
   *
   * @param fieldDeclaration of type Fields
   * @param groupSelector    of type Fields
   * @param numValues        of type int, must be greater than zero
   * @throws IllegalArgumentException if {@code numValues} is less than one
   */
  @ConstructorProperties({"fieldDeclaration", "groupSelector", "numValues"})
  public UnGroup( Fields fieldDeclaration, Fields groupSelector, int numValues )
    {
    super( fieldDeclaration );

    // numValues is the loop step when walking the values; zero would never advance
    if( numValues < 1 )
      throw new IllegalArgumentException( "numValues must be greater than zero, got: " + numValues );

    this.groupFieldSelector = groupSelector;
    this.size = numValues;
    }

  /**
   * Method getGroupFieldSelector returns the selector for the group fields repeated on every emitted tuple.
   *
   * @return the group field selector given at construction
   */
  @Property(name = "ungroupFieldSelector", visibility = Visibility.PRIVATE)
  @PropertyDescription("The fields to un-group.")
  public Fields getGroupFieldSelector()
    {
    return groupFieldSelector;
    }

  /**
   * Method getResultFieldSelectors returns the value selectors, passed through {@link Util#copy} so the
   * internal array is not handed out directly.
   *
   * @return the result of copying the internal selector array
   */
  @Property(name = "resultFieldSelectors", visibility = Visibility.PRIVATE)
  @PropertyDescription("The result field selectors.")
  public Fields[] getResultFieldSelectors()
    {
    return Util.copy( resultFieldSelectors );
    }

  /**
   * Method getSize returns the number of values per emitted tuple used when no result selectors are set.
   *
   * @return the value count, {@code 1} unless set through the numValues constructor
   */
  public int getSize()
    {
    return size;
    }

  /**
   * Emits the un-grouped tuples for the current arguments, using the result selectors when they were
   * provided and the fixed value count otherwise.
   *
   * @param flowProcess  of type FlowProcess, not used by this operation
   * @param functionCall of type FunctionCall, supplies the arguments and the output collector
   */
  @Override
  public void operate( FlowProcess flowProcess, FunctionCall functionCall )
    {
    if( resultFieldSelectors != null )
      useResultSelectors( functionCall.getArguments(), functionCall.getOutputCollector() );
    else
      useSize( functionCall.getArguments(), functionCall.getOutputCollector() );
    }

  /**
   * Removes the group values from a copy of the argument tuple, then emits one tuple per consecutive
   * run of {@code size} remaining values, each prefixed with the group values.
   * <p>
   * NOTE(review): assumes the number of non-group values is a multiple of {@code size} - confirm how
   * a trailing partial run should be handled.
   */
  private void useSize( TupleEntry input, TupleEntryCollector outputCollector )
    {
    LOG.debug( "using size: {}", size );

    Tuple tuple = new Tuple( input.getTuple() ); // make clone
    Tuple group = tuple.remove( input.getFields(), groupFieldSelector );

    for( int i = 0; i < tuple.size(); i = i + size )
      {
      Tuple result = new Tuple( group );
      result.addAll( tuple.get( Fields.offsetSelector( size, i ).getPos() ) );

      outputCollector.add( result );
      }
    }

  /**
   * Emits one tuple per result selector: a fresh copy of the group values with the values picked by
   * that selector appended.
   */
  private void useResultSelectors( TupleEntry input, TupleEntryCollector outputCollector )
    {
    LOG.debug( "using result selectors: {}", resultFieldSelectors.length );

    for( Fields resultFieldSelector : resultFieldSelectors )
      {
      Tuple group = input.selectTupleCopy( groupFieldSelector ); // need a mutable copy

      input.selectInto( resultFieldSelector, group );

      outputCollector.add( group );
      }
    }

  @Override
  public boolean equals( Object object )
    {
    if( this == object )
      return true;
    if( !( object instanceof UnGroup ) )
      return false;
    if( !super.equals( object ) )
      return false;

    UnGroup unGroup = (UnGroup) object;

    if( size != unGroup.size )
      return false;
    if( groupFieldSelector != null ? !groupFieldSelector.equals( unGroup.groupFieldSelector ) : unGroup.groupFieldSelector != null )
      return false;
    if( !Arrays.equals( resultFieldSelectors, unGroup.resultFieldSelectors ) )
      return false;

    return true;
    }

  @Override
  public int hashCode()
    {
    int result = super.hashCode();
    result = 31 * result + ( groupFieldSelector != null ? groupFieldSelector.hashCode() : 0 );
    result = 31 * result + ( resultFieldSelectors != null ? Arrays.hashCode( resultFieldSelectors ) : 0 );
    result = 31 * result + size;
    return result;
    }
  }