001/*
002 * Copyright (c) 2007-2016 Concurrent, Inc. All Rights Reserved.
003 *
004 * Project and contact information: http://www.cascading.org/
005 *
006 * This file is part of the Cascading project.
007 *
008 * Licensed under the Apache License, Version 2.0 (the "License");
009 * you may not use this file except in compliance with the License.
010 * You may obtain a copy of the License at
011 *
012 *     http://www.apache.org/licenses/LICENSE-2.0
013 *
014 * Unless required by applicable law or agreed to in writing, software
015 * distributed under the License is distributed on an "AS IS" BASIS,
016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017 * See the License for the specific language governing permissions and
018 * limitations under the License.
019 */
020
021package cascading.operation.function;
022
023import java.beans.ConstructorProperties;
024import java.util.Arrays;
025
026import cascading.flow.FlowProcess;
027import cascading.management.annotation.Property;
028import cascading.management.annotation.PropertyDescription;
029import cascading.management.annotation.Visibility;
030import cascading.operation.BaseOperation;
031import cascading.operation.Function;
032import cascading.operation.FunctionCall;
033import cascading.tuple.Fields;
034import cascading.tuple.Tuple;
035import cascading.tuple.TupleEntry;
036import cascading.tuple.TupleEntryCollector;
037import cascading.util.Util;
038import org.slf4j.Logger;
039import org.slf4j.LoggerFactory;
040
041/**
042 * Class UnGroup is a {@link Function} that will 'un-group' data from a given dataset.
043 * <p/>
044 * That is, for the given field positions, this function will emit a new Tuple for every value. For example:
045 * <p/>
046 * <pre>
047 * A, x, y
048 * B, x, z
049 * C, y, z
050 * </pre>
051 * <p/>
052 * to:
053 * <p/>
054 * <pre>
055 * A, x
056 * A, y
057 * B, x
058 * B, z
059 * C, y
060 * C, z
061 * </pre>
062 */
063public class UnGroup extends BaseOperation implements Function
064  {
065  /** Field LOG */
066  private static final Logger LOG = LoggerFactory.getLogger( UnGroup.class );
067
068  /** Field groupFieldSelector */
069  private Fields groupFieldSelector;
070  /** Field resultFieldSelectors */
071  private Fields[] resultFieldSelectors;
072  /** Field size */
073  private int size = 1;
074
075  /**
076   * Constructor UnGroup creates a new UnGroup instance.
077   *
078   * @param groupSelector  of type Fields
079   * @param valueSelectors of type Fields[]
080   */
081  @ConstructorProperties({"groupSelector", "valueSelectors"})
082  public UnGroup( Fields groupSelector, Fields[] valueSelectors )
083    {
084    if( valueSelectors == null || valueSelectors.length == 1 )
085      throw new IllegalArgumentException( "value selectors may not be empty" );
086
087    int size = valueSelectors[ 0 ].size();
088
089    for( int i = 1; i < valueSelectors.length; i++ )
090      {
091      if( valueSelectors[ 0 ].size() != valueSelectors[ i ].size() )
092        throw new IllegalArgumentException( "all value selectors must be the same size" );
093
094      size = valueSelectors[ i ].size();
095      }
096
097    this.numArgs = groupSelector.size() + size * valueSelectors.length;
098    this.groupFieldSelector = groupSelector;
099    this.resultFieldSelectors = Arrays.copyOf( valueSelectors, valueSelectors.length );
100    this.fieldDeclaration = Fields.size( groupSelector.size() + size );
101    }
102
103  /**
104   * Constructor UnGroup creates a new UnGroup instance.
105   *
106   * @param fieldDeclaration of type Fields
107   * @param groupSelector    of type Fields
108   * @param valueSelectors   of type Fields[]
109   */
110  @ConstructorProperties({"fieldDeclaration", "groupSelector", "valueSelectors"})
111  public UnGroup( Fields fieldDeclaration, Fields groupSelector, Fields[] valueSelectors )
112    {
113    super( fieldDeclaration );
114
115    if( valueSelectors == null || valueSelectors.length == 1 )
116      throw new IllegalArgumentException( "value selectors may not be empty" );
117
118    numArgs = groupSelector.size();
119    int selectorSize = -1;
120
121    for( Fields resultFieldSelector : valueSelectors )
122      {
123      numArgs += resultFieldSelector.size();
124      int fieldSize = groupSelector.size() + resultFieldSelector.size();
125
126      if( selectorSize != -1 && selectorSize != resultFieldSelector.size() )
127        throw new IllegalArgumentException( "all value selectors must be the same size, and this size plus group selector size must equal the declared field size" );
128
129      selectorSize = resultFieldSelector.size();
130
131      if( fieldDeclaration.size() != fieldSize )
132        throw new IllegalArgumentException( "all value selectors must be the same size, and this size plus group selector size must equal the declared field size" );
133      }
134
135    this.groupFieldSelector = groupSelector;
136    this.resultFieldSelectors = Arrays.copyOf( valueSelectors, valueSelectors.length );
137    }
138
139  /**
140   * Constructor UnGroup creates a new UnGroup instance. Where the numValues argument specifies the number
141   * of values to include.
142   *
143   * @param fieldDeclaration of type Fields
144   * @param groupSelector    of type Fields
145   * @param numValues        of type int
146   */
147  @ConstructorProperties({"fieldDeclaration", "groupSelector", "numValues"})
148  public UnGroup( Fields fieldDeclaration, Fields groupSelector, int numValues )
149    {
150    super( fieldDeclaration );
151    this.groupFieldSelector = groupSelector;
152    this.size = numValues;
153    }
154
155  @Property(name = "ungroupFieldSelector", visibility = Visibility.PRIVATE)
156  @PropertyDescription("The fields to un-group.")
157  public Fields getGroupFieldSelector()
158    {
159    return groupFieldSelector;
160    }
161
162  @Property(name = "resultFieldSelectors", visibility = Visibility.PRIVATE)
163  @PropertyDescription("The result field selectors.")
164  public Fields[] getResultFieldSelectors()
165    {
166    return Util.copy( resultFieldSelectors );
167    }
168
169  public int getSize()
170    {
171    return size;
172    }
173
174  @Override
175  public void operate( FlowProcess flowProcess, FunctionCall functionCall )
176    {
177    if( resultFieldSelectors != null )
178      useResultSelectors( functionCall.getArguments(), functionCall.getOutputCollector() );
179    else
180      useSize( functionCall.getArguments(), functionCall.getOutputCollector() );
181    }
182
183  private void useSize( TupleEntry input, TupleEntryCollector outputCollector )
184    {
185    LOG.debug( "using size: {}", size );
186
187    Tuple tuple = new Tuple( input.getTuple() ); // make clone
188    Tuple group = tuple.remove( input.getFields(), groupFieldSelector );
189
190    for( int i = 0; i < tuple.size(); i = i + size )
191      {
192      Tuple result = new Tuple( group );
193      result.addAll( tuple.get( Fields.offsetSelector( size, i ).getPos() ) );
194
195      outputCollector.add( result );
196      }
197    }
198
199  private void useResultSelectors( TupleEntry input, TupleEntryCollector outputCollector )
200    {
201    LOG.debug( "using result selectors: {}", resultFieldSelectors.length );
202
203    for( Fields resultFieldSelector : resultFieldSelectors )
204      {
205      Tuple group = input.selectTupleCopy( groupFieldSelector ); // need a mutable copy
206
207      input.selectInto( resultFieldSelector, group );
208
209      outputCollector.add( group );
210      }
211    }
212
213  @Override
214  public boolean equals( Object object )
215    {
216    if( this == object )
217      return true;
218    if( !( object instanceof UnGroup ) )
219      return false;
220    if( !super.equals( object ) )
221      return false;
222
223    UnGroup unGroup = (UnGroup) object;
224
225    if( size != unGroup.size )
226      return false;
227    if( groupFieldSelector != null ? !groupFieldSelector.equals( unGroup.groupFieldSelector ) : unGroup.groupFieldSelector != null )
228      return false;
229    if( !Arrays.equals( resultFieldSelectors, unGroup.resultFieldSelectors ) )
230      return false;
231
232    return true;
233    }
234
235  @Override
236  public int hashCode()
237    {
238    int result = super.hashCode();
239    result = 31 * result + ( groupFieldSelector != null ? groupFieldSelector.hashCode() : 0 );
240    result = 31 * result + ( resultFieldSelectors != null ? Arrays.hashCode( resultFieldSelectors ) : 0 );
241    result = 31 * result + size;
242    return result;
243    }
244  }