001/*
002 * Copyright (c) 2016-2018 Chris K Wensel <chris@wensel.net>. All Rights Reserved.
003 *
004 * Project and contact information: http://www.cascading.org/
005 *
006 * This file is part of the Cascading project.
007 *
008 * Licensed under the Apache License, Version 2.0 (the "License");
009 * you may not use this file except in compliance with the License.
010 * You may obtain a copy of the License at
011 *
012 *     http://www.apache.org/licenses/LICENSE-2.0
013 *
014 * Unless required by applicable law or agreed to in writing, software
015 * distributed under the License is distributed on an "AS IS" BASIS,
016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017 * See the License for the specific language governing permissions and
018 * limitations under the License.
019 */
020
021package cascading.operation.filter;
022
023import java.beans.ConstructorProperties;
024
025import cascading.flow.FlowProcess;
026import cascading.flow.stream.StopDataNotificationException;
027import cascading.operation.BaseOperation;
028import cascading.operation.Filter;
029import cascading.operation.FilterCall;
030import cascading.operation.OperationCall;
031import cascading.util.Traceable;
032
033/**
034 * Class Stop is a {@link Filter} class that will force the current pipeline to stop reading data.
035 * <p>
036 * When the nested filter returns {@code true} on {@link Filter#isRemove(FlowProcess, FilterCall)}, a
037 * {@link StopDataNotificationException} will be thrown.
038 * <p>
039 * Currently this only works reliably using the LocalFlowConnector.
040 */
041public class Stop extends BaseOperation implements Filter
042  {
043  /** Field filter */
044  private final Filter filter;
045
046  /**
047   * Constructor Not creates a new Not instance.
048   *
049   * @param filter of type Filter
050   */
051  @ConstructorProperties({"filter"})
052  public Stop( Filter filter )
053    {
054    this.filter = filter;
055
056    if( filter == null )
057      throw new IllegalArgumentException( "filter may not be null" );
058    }
059
060  public Filter getFilter()
061    {
062    return filter;
063    }
064
065  @Override
066  public void prepare( FlowProcess flowProcess, OperationCall operationCall )
067    {
068    filter.prepare( flowProcess, operationCall );
069    }
070
071  @Override
072  public boolean isRemove( FlowProcess flowProcess, FilterCall filterCall )
073    {
074    if( filter.isRemove( flowProcess, filterCall ) )
075      throw new StopDataNotificationException( "data stopped on filter: " + getFilterString() );
076
077    return false;
078    }
079
080  protected String getFilterString()
081    {
082    String string = filter.toString();
083
084    if( filter instanceof Traceable )
085      string += " @ " + ( (Traceable) filter ).getTrace();
086
087    return string;
088    }
089
090  @Override
091  public void cleanup( FlowProcess flowProcess, OperationCall operationCall )
092    {
093    filter.cleanup( flowProcess, operationCall );
094    }
095
096  @Override
097  public boolean equals( Object object )
098    {
099    if( this == object )
100      return true;
101    if( !( object instanceof Stop ) )
102      return false;
103    if( !super.equals( object ) )
104      return false;
105
106    Stop not = (Stop) object;
107
108    if( filter != null ? !filter.equals( not.filter ) : not.filter != null )
109      return false;
110
111    return true;
112    }
113
114  @Override
115  public int hashCode()
116    {
117    int result = super.hashCode();
118    result = 31 * result + ( filter != null ? filter.hashCode() : 0 );
119    return result;
120    }
121  }