001/* 002 * Copyright (c) 2016-2018 Chris K Wensel <chris@wensel.net>. All Rights Reserved. 003 * 004 * Project and contact information: http://www.cascading.org/ 005 * 006 * This file is part of the Cascading project. 007 * 008 * Licensed under the Apache License, Version 2.0 (the "License"); 009 * you may not use this file except in compliance with the License. 010 * You may obtain a copy of the License at 011 * 012 * http://www.apache.org/licenses/LICENSE-2.0 013 * 014 * Unless required by applicable law or agreed to in writing, software 015 * distributed under the License is distributed on an "AS IS" BASIS, 016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 017 * See the License for the specific language governing permissions and 018 * limitations under the License. 019 */ 020 021package cascading.operation.filter; 022 023import java.beans.ConstructorProperties; 024 025import cascading.flow.FlowProcess; 026import cascading.flow.stream.StopDataNotificationException; 027import cascading.operation.BaseOperation; 028import cascading.operation.Filter; 029import cascading.operation.FilterCall; 030import cascading.operation.OperationCall; 031import cascading.util.Traceable; 032 033/** 034 * Class Stop is a {@link Filter} class that will force the current pipeline to stop reading data. 035 * <p> 036 * When the nested filter returns {@code true} on {@link Filter#isRemove(FlowProcess, FilterCall)}, a 037 * {@link StopDataNotificationException} will be thrown. 038 * <p> 039 * Currently this only works reliably using the LocalFlowConnector. 040 */ 041public class Stop extends BaseOperation implements Filter 042 { 043 /** Field filter */ 044 private final Filter filter; 045 046 /** 047 * Constructor Not creates a new Not instance. 048 * 049 * @param filter of type Filter 050 */ 051 @ConstructorProperties({"filter"}) 052 public Stop( Filter filter ) 053 { 054 this.filter = filter; 055 056 if( filter == null ) 057 throw new IllegalArgumentException( "filter may not be null" ); 058 } 059 060 public Filter getFilter() 061 { 062 return filter; 063 } 064 065 @Override 066 public void prepare( FlowProcess flowProcess, OperationCall operationCall ) 067 { 068 filter.prepare( flowProcess, operationCall ); 069 } 070 071 @Override 072 public boolean isRemove( FlowProcess flowProcess, FilterCall filterCall ) 073 { 074 if( filter.isRemove( flowProcess, filterCall ) ) 075 throw new StopDataNotificationException( "data stopped on filter: " + getFilterString() ); 076 077 return false; 078 } 079 080 protected String getFilterString() 081 { 082 String string = filter.toString(); 083 084 if( filter instanceof Traceable ) 085 string += " @ " + ( (Traceable) filter ).getTrace(); 086 087 return string; 088 } 089 090 @Override 091 public void cleanup( FlowProcess flowProcess, OperationCall operationCall ) 092 { 093 filter.cleanup( flowProcess, operationCall ); 094 } 095 096 @Override 097 public boolean equals( Object object ) 098 { 099 if( this == object ) 100 return true; 101 if( !( object instanceof Stop ) ) 102 return false; 103 if( !super.equals( object ) ) 104 return false; 105 106 Stop not = (Stop) object; 107 108 if( filter != null ? !filter.equals( not.filter ) : not.filter != null ) 109 return false; 110 111 return true; 112 } 113 114 @Override 115 public int hashCode() 116 { 117 int result = super.hashCode(); 118 result = 31 * result + ( filter != null ? filter.hashCode() : 0 ); 119 return result; 120 } 121 }