001/*
002 * Copyright (c) 2007-2016 Concurrent, Inc. All Rights Reserved.
003 *
004 * Project and contact information: http://www.cascading.org/
005 *
006 * This file is part of the Cascading project.
007 *
008 * Licensed under the Apache License, Version 2.0 (the "License");
009 * you may not use this file except in compliance with the License.
010 * You may obtain a copy of the License at
011 *
012 *     http://www.apache.org/licenses/LICENSE-2.0
013 *
014 * Unless required by applicable law or agreed to in writing, software
015 * distributed under the License is distributed on an "AS IS" BASIS,
016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017 * See the License for the specific language governing permissions and
018 * limitations under the License.
019 */
020
021package cascading.operation.aggregator;
022
023import java.beans.ConstructorProperties;
024import java.util.Collection;
025import java.util.Collections;
026import java.util.HashSet;
027
028import cascading.flow.FlowProcess;
029import cascading.operation.Aggregator;
030import cascading.operation.AggregatorCall;
031import cascading.operation.BaseOperation;
032import cascading.operation.OperationCall;
033import cascading.tuple.Fields;
034import cascading.tuple.Tuple;
035import cascading.tuple.TupleEntry;
036
037/** Class ExtentBase is the base class for First and Last. */
038public abstract class ExtentBase extends BaseOperation<Tuple[]> implements Aggregator<Tuple[]>
039  {
040  /** Field ignoreTuples */
041  private final Collection<Tuple> ignoreTuples;
042
043  @ConstructorProperties({"fieldDeclaration"})
044  protected ExtentBase( Fields fieldDeclaration )
045    {
046    super( fieldDeclaration );
047    this.ignoreTuples = null;
048    }
049
050  @ConstructorProperties({"numArgs", "fieldDeclaration"})
051  protected ExtentBase( int numArgs, Fields fieldDeclaration )
052    {
053    super( numArgs, fieldDeclaration );
054    ignoreTuples = null;
055    }
056
057  @ConstructorProperties({"fieldDeclaration", "ignoreTuples"})
058  protected ExtentBase( Fields fieldDeclaration, Tuple... ignoreTuples )
059    {
060    super( fieldDeclaration );
061    this.ignoreTuples = new HashSet<Tuple>();
062    Collections.addAll( this.ignoreTuples, ignoreTuples );
063    }
064
065  public Collection<Tuple> getIgnoreTuples()
066    {
067    return Collections.unmodifiableCollection( ignoreTuples );
068    }
069
070  @Override
071  public void prepare( FlowProcess flowProcess, OperationCall<Tuple[]> operationCall )
072    {
073    operationCall.setContext( new Tuple[ 1 ] );
074    }
075
076  @Override
077  public void start( FlowProcess flowProcess, AggregatorCall<Tuple[]> aggregatorCall )
078    {
079    aggregatorCall.getContext()[ 0 ] = null;
080    }
081
082  @Override
083  public void aggregate( FlowProcess flowProcess, AggregatorCall<Tuple[]> aggregatorCall )
084    {
085    if( ignoreTuples != null && ignoreTuples.contains( aggregatorCall.getArguments().getTuple() ) )
086      return;
087
088    performOperation( aggregatorCall.getContext(), aggregatorCall.getArguments() );
089    }
090
091  protected abstract void performOperation( Tuple[] context, TupleEntry entry );
092
093  @Override
094  public void complete( FlowProcess flowProcess, AggregatorCall<Tuple[]> aggregatorCall )
095    {
096    if( aggregatorCall.getContext()[ 0 ] != null )
097      aggregatorCall.getOutputCollector().add( getResult( aggregatorCall ) );
098    }
099
100  protected Tuple getResult( AggregatorCall<Tuple[]> aggregatorCall )
101    {
102    return aggregatorCall.getContext()[ 0 ];
103    }
104
105  @Override
106  public boolean equals( Object object )
107    {
108    if( this == object )
109      return true;
110    if( !( object instanceof ExtentBase ) )
111      return false;
112    if( !super.equals( object ) )
113      return false;
114
115    ExtentBase that = (ExtentBase) object;
116
117    if( ignoreTuples != null ? !ignoreTuples.equals( that.ignoreTuples ) : that.ignoreTuples != null )
118      return false;
119
120    return true;
121    }
122
123  @Override
124  public int hashCode()
125    {
126    int result = super.hashCode();
127    result = 31 * result + ( ignoreTuples != null ? ignoreTuples.hashCode() : 0 );
128    return result;
129    }
130  }