001    /*
002     * Copyright (c) 2007-2015 Concurrent, Inc. All Rights Reserved.
003     *
004     * Project and contact information: http://www.cascading.org/
005     *
006     * This file is part of the Cascading project.
007     *
008     * Licensed under the Apache License, Version 2.0 (the "License");
009     * you may not use this file except in compliance with the License.
010     * You may obtain a copy of the License at
011     *
012     *     http://www.apache.org/licenses/LICENSE-2.0
013     *
014     * Unless required by applicable law or agreed to in writing, software
015     * distributed under the License is distributed on an "AS IS" BASIS,
016     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017     * See the License for the specific language governing permissions and
018     * limitations under the License.
019     */
020    
021    package cascading.operation.function;
022    
023    import java.beans.ConstructorProperties;
024    import java.util.Arrays;
025    
026    import cascading.flow.FlowProcess;
027    import cascading.management.annotation.Property;
028    import cascading.management.annotation.PropertyDescription;
029    import cascading.management.annotation.Visibility;
030    import cascading.operation.BaseOperation;
031    import cascading.operation.Function;
032    import cascading.operation.FunctionCall;
033    import cascading.tuple.Fields;
034    import cascading.tuple.Tuple;
035    import cascading.tuple.TupleEntry;
036    import cascading.tuple.TupleEntryCollector;
037    import cascading.util.Util;
038    import org.slf4j.Logger;
039    import org.slf4j.LoggerFactory;
040    
041    /**
042     * Class UnGroup is a {@link Function} that will 'un-group' data from a given dataset.
043     * <p/>
044     * That is, for the given field positions, this function will emit a new Tuple for every value. For example:
045     * <p/>
046     * <pre>
047     * A, x, y
048     * B, x, z
049     * C, y, z
050     * </pre>
051     * <p/>
052     * to:
053     * <p/>
054     * <pre>
055     * A, x
056     * A, y
057     * B, x
058     * B, z
059     * C, y
060     * C, z
061     * </pre>
062     */
063    public class UnGroup extends BaseOperation implements Function
064      {
065      /** Field LOG */
066      private static final Logger LOG = LoggerFactory.getLogger( UnGroup.class );
067    
068      /** Field groupFieldSelector */
069      private Fields groupFieldSelector;
070      /** Field resultFieldSelectors */
071      private Fields[] resultFieldSelectors;
072      /** Field size */
073      private int size = 1;
074    
075      /**
076       * Constructor UnGroup creates a new UnGroup instance.
077       *
078       * @param groupSelector  of type Fields
079       * @param valueSelectors of type Fields[]
080       */
081      @ConstructorProperties({"groupSelector", "valueSelectors"})
082      public UnGroup( Fields groupSelector, Fields[] valueSelectors )
083        {
084        if( valueSelectors == null || valueSelectors.length == 1 )
085          throw new IllegalArgumentException( "value selectors may not be empty" );
086    
087        int size = valueSelectors[ 0 ].size();
088    
089        for( int i = 1; i < valueSelectors.length; i++ )
090          {
091          if( valueSelectors[ 0 ].size() != valueSelectors[ i ].size() )
092            throw new IllegalArgumentException( "all value selectors must be the same size" );
093    
094          size = valueSelectors[ i ].size();
095          }
096    
097        this.numArgs = groupSelector.size() + size * valueSelectors.length;
098        this.groupFieldSelector = groupSelector;
099        this.resultFieldSelectors = Arrays.copyOf( valueSelectors, valueSelectors.length );
100        this.fieldDeclaration = Fields.size( groupSelector.size() + size );
101        }
102    
103      /**
104       * Constructor UnGroup creates a new UnGroup instance.
105       *
106       * @param fieldDeclaration of type Fields
107       * @param groupSelector    of type Fields
108       * @param valueSelectors   of type Fields[]
109       */
110      @ConstructorProperties({"fieldDeclaration", "groupSelector", "valueSelectors"})
111      public UnGroup( Fields fieldDeclaration, Fields groupSelector, Fields[] valueSelectors )
112        {
113        super( fieldDeclaration );
114    
115        if( valueSelectors == null || valueSelectors.length == 1 )
116          throw new IllegalArgumentException( "value selectors may not be empty" );
117    
118        numArgs = groupSelector.size();
119        int selectorSize = -1;
120    
121        for( Fields resultFieldSelector : valueSelectors )
122          {
123          numArgs += resultFieldSelector.size();
124          int fieldSize = groupSelector.size() + resultFieldSelector.size();
125    
126          if( selectorSize != -1 && selectorSize != resultFieldSelector.size() )
127            throw new IllegalArgumentException( "all value selectors must be the same size, and this size plus group selector size must equal the declared field size" );
128    
129          selectorSize = resultFieldSelector.size();
130    
131          if( fieldDeclaration.size() != fieldSize )
132            throw new IllegalArgumentException( "all value selectors must be the same size, and this size plus group selector size must equal the declared field size" );
133          }
134    
135        this.groupFieldSelector = groupSelector;
136        this.resultFieldSelectors = Arrays.copyOf( valueSelectors, valueSelectors.length );
137        }
138    
139      /**
140       * Constructor UnGroup creates a new UnGroup instance. Where the numValues argument specifies the number
141       * of values to include.
142       *
143       * @param fieldDeclaration of type Fields
144       * @param groupSelector    of type Fields
145       * @param numValues        of type int
146       */
147      @ConstructorProperties({"fieldDeclaration", "groupSelector", "numValues"})
148      public UnGroup( Fields fieldDeclaration, Fields groupSelector, int numValues )
149        {
150        super( fieldDeclaration );
151        this.groupFieldSelector = groupSelector;
152        this.size = numValues;
153        }
154    
155      @Property(name = "ungroupFieldSelector", visibility = Visibility.PRIVATE)
156      @PropertyDescription("The fields to un-group.")
157      public Fields getGroupFieldSelector()
158        {
159        return groupFieldSelector;
160        }
161    
162      @Property(name = "resultFieldSelectors", visibility = Visibility.PRIVATE)
163      @PropertyDescription("The result field selectors.")
164      public Fields[] getResultFieldSelectors()
165        {
166        return Util.copy( resultFieldSelectors );
167        }
168    
169      public int getSize()
170        {
171        return size;
172        }
173    
174      @Override
175      public void operate( FlowProcess flowProcess, FunctionCall functionCall )
176        {
177        if( resultFieldSelectors != null )
178          useResultSelectors( functionCall.getArguments(), functionCall.getOutputCollector() );
179        else
180          useSize( functionCall.getArguments(), functionCall.getOutputCollector() );
181        }
182    
183      private void useSize( TupleEntry input, TupleEntryCollector outputCollector )
184        {
185        LOG.debug( "using size: {}", size );
186    
187        Tuple tuple = new Tuple( input.getTuple() ); // make clone
188        Tuple group = tuple.remove( input.getFields(), groupFieldSelector );
189    
190        for( int i = 0; i < tuple.size(); i = i + size )
191          {
192          Tuple result = new Tuple( group );
193          result.addAll( tuple.get( Fields.offsetSelector( size, i ).getPos() ) );
194    
195          outputCollector.add( result );
196          }
197        }
198    
199      private void useResultSelectors( TupleEntry input, TupleEntryCollector outputCollector )
200        {
201        LOG.debug( "using result selectors: {}", resultFieldSelectors.length );
202    
203        for( Fields resultFieldSelector : resultFieldSelectors )
204          {
205          Tuple group = input.selectTupleCopy( groupFieldSelector ); // need a mutable copy
206    
207          input.selectInto( resultFieldSelector, group );
208    
209          outputCollector.add( group );
210          }
211        }
212    
213      @Override
214      public boolean equals( Object object )
215        {
216        if( this == object )
217          return true;
218        if( !( object instanceof UnGroup ) )
219          return false;
220        if( !super.equals( object ) )
221          return false;
222    
223        UnGroup unGroup = (UnGroup) object;
224    
225        if( size != unGroup.size )
226          return false;
227        if( groupFieldSelector != null ? !groupFieldSelector.equals( unGroup.groupFieldSelector ) : unGroup.groupFieldSelector != null )
228          return false;
229        if( !Arrays.equals( resultFieldSelectors, unGroup.resultFieldSelectors ) )
230          return false;
231    
232        return true;
233        }
234    
235      @Override
236      public int hashCode()
237        {
238        int result = super.hashCode();
239        result = 31 * result + ( groupFieldSelector != null ? groupFieldSelector.hashCode() : 0 );
240        result = 31 * result + ( resultFieldSelectors != null ? Arrays.hashCode( resultFieldSelectors ) : 0 );
241        result = 31 * result + size;
242        return result;
243        }
244      }