001    /*
002     * Copyright (c) 2007-2015 Concurrent, Inc. All Rights Reserved.
003     *
004     * Project and contact information: http://www.cascading.org/
005     *
006     * This file is part of the Cascading project.
007     *
008     * Licensed under the Apache License, Version 2.0 (the "License");
009     * you may not use this file except in compliance with the License.
010     * You may obtain a copy of the License at
011     *
012     *     http://www.apache.org/licenses/LICENSE-2.0
013     *
014     * Unless required by applicable law or agreed to in writing, software
015     * distributed under the License is distributed on an "AS IS" BASIS,
016     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017     * See the License for the specific language governing permissions and
018     * limitations under the License.
019     */
020    
021    package cascading.operation.assertion;
022    
023    import java.beans.ConstructorProperties;
024    import java.util.regex.Matcher;
025    import java.util.regex.Pattern;
026    
027    import cascading.flow.FlowProcess;
028    import cascading.management.annotation.Property;
029    import cascading.management.annotation.PropertyDescription;
030    import cascading.management.annotation.Visibility;
031    import cascading.operation.GroupAssertion;
032    import cascading.operation.GroupAssertionCall;
033    import cascading.operation.OperationCall;
034    import cascading.tuple.Tuple;
035    import cascading.tuple.TupleEntry;
036    import org.slf4j.Logger;
037    import org.slf4j.LoggerFactory;
038    
039    /**
040     *
041     */
042    public abstract class AssertGroupBase extends BaseAssertion<AssertGroupBase.Context> implements GroupAssertion<AssertGroupBase.Context>
043      {
044      /** Field LOG */
045      private static final Logger LOG = LoggerFactory.getLogger( AssertGroupBase.class );
046    
047      /** Field patternString */
048      protected String patternString;
049    
050      /** Field size */
051      protected final long size;
052    
053      public static class Context
054        {
055        Pattern pattern;
056        Long count;
057        String fields;
058        String group;
059    
060        public Context set( long count, String fields, String group )
061          {
062          this.count = count;
063          this.fields = fields;
064          this.group = group;
065    
066          return this;
067          }
068    
069        public Context reset()
070          {
071          count = null;
072          fields = null;
073          group = null;
074    
075          return this;
076          }
077        }
078    
079      @ConstructorProperties({"message", "size"})
080      public AssertGroupBase( String message, long size )
081        {
082        super( message );
083        this.size = size;
084        }
085    
086      @ConstructorProperties({"message", "patternString", "size"})
087      protected AssertGroupBase( String message, String patternString, long size )
088        {
089        super( message );
090        this.patternString = patternString;
091        this.size = size;
092        }
093    
094      @Property(name = "patternString", visibility = Visibility.PRIVATE)
095      @PropertyDescription("The regular expression pattern string.")
096      public String getPatternString()
097        {
098        return patternString;
099        }
100    
101      public long getSize()
102        {
103        return size;
104        }
105    
106      private Pattern getPattern()
107        {
108        Pattern pattern;
109    
110        if( patternString == null )
111          pattern = Pattern.compile( ".*" );
112        else
113          pattern = Pattern.compile( patternString );
114    
115        return pattern;
116        }
117    
118      private boolean matchWholeTuple( Tuple input, Pattern pattern )
119        {
120        if( patternString == null )
121          return true;
122    
123        Matcher matcher = pattern.matcher( input.toString( "\t", false ) );
124    
125        LOG.debug( "pattern: {}, matches: {}", pattern, matcher.matches() );
126    
127        return matcher.matches();
128        }
129    
130      @Override
131      public void prepare( FlowProcess flowProcess, OperationCall<Context> operationCall )
132        {
133        operationCall.setContext( new Context() );
134        operationCall.getContext().pattern = getPattern();
135        }
136    
137      @Override
138      public void start( FlowProcess flowProcess, GroupAssertionCall<Context> assertionCall )
139        {
140        TupleEntry groupEntry = assertionCall.getGroup();
141        Context context = assertionCall.getContext();
142    
143        // didn't match, so skip
144        if( !matchWholeTuple( groupEntry.getTuple(), context.pattern ) )
145          context.reset();
146        else
147          context.set( 0L, groupEntry.getFields().print(), groupEntry.getTuple().print() );
148        }
149    
150      @Override
151      public void aggregate( FlowProcess flowProcess, GroupAssertionCall<Context> assertionCall )
152        {
153        Long groupSize = assertionCall.getContext().count;
154    
155        // didn't match, so skip
156        if( groupSize != null )
157          assertionCall.getContext().count += 1L;
158        }
159    
160      @Override
161      public void doAssert( FlowProcess flowProcess, GroupAssertionCall<Context> assertionCall )
162        {
163        Context context = assertionCall.getContext();
164        Long groupSize = context.count;
165    
166        if( groupSize == null ) // didn't match, so skip
167          return;
168    
169        if( assertFails( groupSize ) )
170          {
171          if( patternString == null )
172            fail( groupSize, size, context.fields, context.group );
173          else
174            fail( patternString, groupSize, size, context.fields, context.group );
175          }
176        }
177    
178      protected abstract boolean assertFails( Long groupSize );
179    
180      @Override
181      public boolean equals( Object object )
182        {
183        if( this == object )
184          return true;
185        if( !( object instanceof AssertGroupBase ) )
186          return false;
187        if( !super.equals( object ) )
188          return false;
189    
190        AssertGroupBase that = (AssertGroupBase) object;
191    
192        if( size != that.size )
193          return false;
194        if( patternString != null ? !patternString.equals( that.patternString ) : that.patternString != null )
195          return false;
196    
197        return true;
198        }
199    
200      @Override
201      public int hashCode()
202        {
203        int result = super.hashCode();
204        result = 31 * result + ( patternString != null ? patternString.hashCode() : 0 );
205        result = 31 * result + (int) ( size ^ size >>> 32 );
206        return result;
207        }
208      }