Cascading 4.0 User Guide - Built-In Operations

Built-In Operations

Identity Function

The cascading.operation.Identity function is used to "shape" a tuple stream.

Here are some common patterns that illustrate how Cascading "field algebra" works. (Note that, in actual practice, some of these example tasks might be better performed with helper subassemblies such as Rename, Retain, and Discard.)

Discard unused fields

Here the arguments of the Identity function are passed through as results because Fields.ARGS is given as the field declaration.

// incoming -> "ip", "time", "method", "event", "status", "size"

Identity identity = new Identity( Fields.ARGS );
Fields ipMethod = new Fields( "ip", "method" );
pipe =
  new Each( pipe, ipMethod, identity, Fields.RESULTS );

// outgoing -> "ip", "method"

In practice you can omit the field declaration, as Fields.ARGS is the default declaration for the Identity function. You can also omit Fields.RESULTS, as it is the default for the Each pipe. Thus, simpler code yields the same result:

// incoming -> "ip", "time", "method", "event", "status", "size"

pipe = new Each( pipe, new Fields( "ip", "method" ), new Identity() );

// outgoing -> "ip", "method"

Rename all fields

Here Identity renames the incoming arguments. Since Fields.RESULTS is implied, the incoming Tuple is replaced by the selected arguments and given new field names as declared on Identity.

// incoming -> "ip", "method"

Identity identity = new Identity( new Fields( "address", "request" ) );
pipe = new Each( pipe, new Fields( "ip", "method" ), identity );

// outgoing -> "address", "request"

In the example above, incoming fields other than "ip" and "method" would cause no problem, because all the extra fields would be discarded. In the next example, however, any extra incoming fields would cause the Cascading query planner to fail.

// incoming -> "ip", "method"

Identity identity = new Identity( new Fields( "address", "request" ) );
pipe = new Each( pipe, Fields.ALL, identity );

// outgoing -> "address", "request"

Since Fields.ALL is the default argument selector for the Each pipe, it can be left out as shown below. As with the previous example, this fails unless there are exactly two fields in the tuples of the incoming stream.

// incoming -> "ip", "method"

Identity identity = new Identity( new Fields( "address", "request" ) );
pipe = new Each( pipe, identity );

// outgoing -> "address", "request"

Rename a single field

Here we rename a single field and return it, along with an input Tuple field, as the result. All other fields are dropped.

// incoming -> "ip", "time", "method", "event", "status", "size"

Fields fieldSelector = new Fields( "address", "method" );
Identity identity = new Identity( new Fields( "address" ) );
pipe = new Each( pipe, new Fields( "ip" ), identity, fieldSelector );

// outgoing -> "address", "method"

Coerce values to specific primitive types

Here we replace the Tuple String values "status" and "size" with int and long values, respectively. All other fields are dropped.

// incoming -> "ip", "time", "method", "event", "status", "size"

Identity identity = new Identity( Integer.TYPE, Long.TYPE );
pipe = new Each( pipe, new Fields( "status", "size" ), identity );

// outgoing -> "status", "size"

Or we can replace just the Tuple String value "status" with an int, while keeping all the other values in the output Tuple.

// incoming -> "ip", "time", "method", "event", "status", "size"

Identity identity = new Identity( Integer.TYPE );
pipe =
  new Each( pipe, new Fields( "status" ), identity, Fields.REPLACE );

// outgoing -> "ip", "time", "method", "event", "status", "size"

Debug Function

The cascading.operation.Debug function is a utility function (actually, it’s a Filter) that prints the current argument Tuple to either stdout or stderr. Used with one of the DebugLevel enum values (NONE, DEFAULT, or VERBOSE), different debug levels can be embedded in a pipe assembly.

Nothing from the Debug output displays on the client side. Debug is only useful when testing in an IDE or if the remote logs are readily available.

Sample and Limit Functions

The Sample and Limit functions are used to limit the number of tuples that pass through a pipe assembly.

Sample

The cascading.operation.filter.Sample filter specifies a percentage of tuples that can pass through a pipe assembly.

Limit

The cascading.operation.filter.Limit filter sets a maximum number of tuples that can pass through a pipe assembly.

Insert Function

The cascading.operation.Insert function enables insertion of constant values into the tuple stream. This function is most useful in either of the following cases:

  • A tuple stream is split and one of the branches needs an identifying value.

  • A missing parameter or value, such as a date String for the current date, needs to be inserted.

Text Functions

Cascading includes a number of text functions in the cascading.operation.text package.

DateFormatter

The cascading.operation.text.DateFormatter function is used to convert a datestamp to a formatted String. This function expects a long value representing the number of milliseconds since January 1, 1970, 00:00:00 GMT/UTC, and formats the output using java.text.SimpleDateFormat syntax.

// "ts" -> 1188604863000

DateFormatter formatter =
  new DateFormatter( new Fields( "date" ), "dd/MMM/yyyy" );
pipe = new Each( pipe, new Fields( "ts" ), formatter );

// outgoing -> "date" -> 31/Aug/2007

The example above converts a long timestamp ("ts") to a date String.
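Because the pattern follows java.text.SimpleDateFormat, the formatted String depends on the time zone and locale applied to the formatter. The following plain-Java sketch does not use Cascading, and its class and method names are illustrative only. It formats the timestamp from the example in two zones; which zone DateFormatter applies when none is given should be confirmed in its Javadoc.

```java
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Locale;
import java.util.TimeZone;

// Illustrative helper, not part of Cascading.
final class DateFormatSketch {
    // Formats epoch milliseconds with a SimpleDateFormat pattern in an explicit time zone.
    static String format(long epochMillis, String pattern, String zoneId) {
        SimpleDateFormat format = new SimpleDateFormat(pattern, Locale.US);
        format.setTimeZone(TimeZone.getTimeZone(zoneId));
        return format.format(new Date(epochMillis));
    }

    public static void main(String[] args) {
        long ts = 1188604863000L;
        System.out.println(format(ts, "dd/MMM/yyyy", "UTC"));
        System.out.println(format(ts, "dd/MMM/yyyy", "America/Los_Angeles"));
    }
}
```

In UTC this timestamp falls on 01/Sep/2007; in a zone several hours behind UTC, such as America/Los_Angeles, it falls on 31/Aug/2007.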

DateParser

The cascading.operation.text.DateParser function is used to convert a text date-and-time String to a timestamp, using the java.text.SimpleDateFormat syntax. The timestamp is a long value representing the number of milliseconds since January 1, 1970, 00:00:00 GMT/UTC. By default, the output is a field with the name "ts" (for timestamp), but this can be overridden by passing a declared Fields value.

// "time" -> 01/Sep/2007:00:01:03 +0000

DateParser dateParser = new DateParser( "dd/MMM/yyyy:HH:mm:ss Z" );
pipe = new Each( pipe, new Fields( "time" ), dateParser );

// outgoing -> "ts" -> 1188604863000

In the example above, an Apache log-style date-time field is converted into a long timestamp in UTC.
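The conversion itself can be checked with java.text.SimpleDateFormat directly. The sketch below is plain Java with illustrative names, not Cascading code. It shows that an explicit offset in the text (the Z pattern letter) fixes the instant regardless of the JVM's default time zone.

```java
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Locale;

// Illustrative helper, not part of Cascading.
final class DateParseSketch {
    // Parses text with a SimpleDateFormat pattern and returns epoch milliseconds.
    static long parse(String text, String pattern) {
        try {
            return new SimpleDateFormat(pattern, Locale.US).parse(text).getTime();
        } catch (ParseException e) {
            throw new IllegalArgumentException("unparseable date: " + text, e);
        }
    }

    public static void main(String[] args) {
        String pattern = "dd/MMM/yyyy:HH:mm:ss Z";
        // Both strings name the same instant, written with different offsets.
        System.out.println(parse("01/Sep/2007:00:01:03 +0000", pattern));
        System.out.println(parse("31/Aug/2007:17:01:03 -0700", pattern));
    }
}
```

Both calls print 1188604863000.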

FieldJoiner

The cascading.operation.text.FieldJoiner function joins all the values in a Tuple with a specified delimiter and places the result into a new field. (For the opposite effect, use the RegexSplitter function that is described below.)

FieldFormatter

The cascading.operation.text.FieldFormatter function formats Tuple values with a given String format and places the result into a new field. The java.util.Formatter class is used internally to create a new formatted String.
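Since the format String follows java.util.Formatter syntax, a template can be tried out in plain Java before it is handed to FieldFormatter. The sketch below is only an illustration of the format syntax; the class name and the sample template are assumptions, and no Cascading classes are involved.

```java
import java.util.Formatter;
import java.util.Locale;

// Illustrative helper, not part of Cascading.
final class FieldFormatSketch {
    // Applies a java.util.Formatter template to the given values.
    static String format(String template, Object... values) {
        try (Formatter formatter = new Formatter(Locale.ROOT)) {
            return formatter.format(template, values).toString();
        }
    }

    public static void main(String[] args) {
        System.out.println(format("%s request was %d bytes", "GET", 1282652));
    }
}
```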

Regular Expression Operations

RegexSplitter

The cascading.operation.regex.RegexSplitter function splits an argument value based on a regex pattern String. (For the opposite effect, use the FieldJoiner function that is described above.)

Internally, this function uses java.util.regex.Pattern.split(), and it behaves accordingly. By default, it splits on the TAB character ("\t"). If it is known that a determinate number of values will emerge from this function, it can declare field names. In this case, if the splitter encounters more split values than field names, the remaining values are discarded. For more information, see java.util.regex.Pattern.split( input, limit ).
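The effect of the limit argument is easiest to see with java.util.regex.Pattern.split() on its own. The following plain-Java sketch uses illustrative names and does not call RegexSplitter; how RegexSplitter maps the resulting entries onto declared fields is best confirmed in its Javadoc.

```java
import java.util.Arrays;
import java.util.regex.Pattern;

// Illustrative helper, not part of Cascading.
final class SplitLimitSketch {
    private static final Pattern TAB = Pattern.compile("\t");

    // Splits on TAB, passing the limit straight through to Pattern.split().
    static String[] split(String input, int limit) {
        return TAB.split(input, limit);
    }

    public static void main(String[] args) {
        // limit 0: as many entries as needed, trailing empty strings dropped
        System.out.println(Arrays.toString(split("a\tb\t\t", 0)));
        // negative limit: trailing empty strings kept
        System.out.println(Arrays.toString(split("a\tb\t\t", -1)));
        // positive limit n: at most n entries, the last holding the rest of the input
        System.out.println(Arrays.toString(split("a\tb\tc\td", 2)));
    }
}
```

With a positive limit, Pattern.split() does not drop the surplus input: the final array entry contains everything after the last split that was performed, delimiters included.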

RegexParser

The cascading.operation.regex.RegexParser function is used to extract a regex-matched value from an incoming argument value. If the regular expression is sufficiently complex, an int array may be provided to specify which regex groups should be returned in which field names.

// incoming -> "line"

String regex =
  "^([^ ]*) +[^ ]* +[^ ]* +\\[([^]]*)\\] +" +
    "\\\"([^ ]*) ([^ ]*) [^ ]*\\\" ([^ ]*) ([^ ]*).*$";
Fields fieldDeclaration =
  new Fields( "ip", "time", "method", "event", "status", "size" );
int[] groups = {1, 2, 3, 4, 5, 6};
RegexParser parser = new RegexParser( fieldDeclaration, regex, groups );
assembly = new Each( assembly, new Fields( "line" ), parser );

// outgoing -> "ip", "time", "method", "event", "status", "size"

In the example above, a line from an Apache access log is parsed into its component parts. Note that the int[] groups array starts at 1, not 0. Group 0 is the whole match, so if group 0 is listed first, the first field is a copy of "line" and not "ip".
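The group numbering comes from java.util.regex, so it can be explored without a Flow. The sketch below applies the same regex to a made-up access-log line; the sample line, the class name, and the helper method are assumptions for illustration.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Illustrative helper, not part of Cascading.
final class GroupIndexSketch {
    // Hypothetical log line, invented for this example.
    static final String SAMPLE =
        "68.46.103.112 - - [01/Sep/2007:00:01:03 +0000] "
            + "\"GET /index.html HTTP/1.1\" 200 1282652";

    static final Pattern APACHE = Pattern.compile(
        "^([^ ]*) +[^ ]* +[^ ]* +\\[([^]]*)\\] +"
            + "\\\"([^ ]*) ([^ ]*) [^ ]*\\\" ([^ ]*) ([^ ]*).*$");

    // Returns the requested capture groups of the first match, or null if nothing matches.
    static String[] groups(String line, int... indexes) {
        Matcher matcher = APACHE.matcher(line);
        if (!matcher.find()) {
            return null;
        }
        String[] values = new String[indexes.length];
        for (int i = 0; i < indexes.length; i++) {
            values[i] = matcher.group(indexes[i]);
        }
        return values;
    }

    public static void main(String[] args) {
        System.out.println(groups(SAMPLE, 0)[0]); // the entire line
        System.out.println(groups(SAMPLE, 1)[0]); // the IP address
    }
}
```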

RegexReplace

The cascading.operation.regex.RegexReplace function is used to replace a regex-matched value with a specified replacement value. It can operate in a "replace all" or "replace first" mode. For more information, see the Javadoc for the java.util.regex.Matcher.replaceAll() and java.util.regex.Matcher.replaceFirst() methods.

// incoming -> "line"

RegexReplace replace =
  new RegexReplace( new Fields( "clean-line" ), "\\s+", " ", true );
assembly = new Each( assembly, new Fields( "line" ), replace );

// outgoing -> "clean-line"

In the example above, each run of adjacent whitespace characters is replaced with a single space character.
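The difference between the two modes follows java.util.regex.Matcher.replaceAll() and replaceFirst(). A plain-Java sketch, with an illustrative class name and sample input, makes the contrast concrete:

```java
import java.util.regex.Pattern;

// Illustrative helper, not part of Cascading.
final class ReplaceModeSketch {
    private static final Pattern WHITESPACE = Pattern.compile("\\s+");

    // Replaces every whitespace run, or only the first one, depending on the flag.
    static String replace(String value, String replacement, boolean replaceAll) {
        return replaceAll
            ? WHITESPACE.matcher(value).replaceAll(replacement)
            : WHITESPACE.matcher(value).replaceFirst(replacement);
    }

    public static void main(String[] args) {
        String line = "GET  /index.html \t HTTP/1.1";
        System.out.println(replace(line, " ", true));
        System.out.println(replace(line, " ", false));
    }
}
```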

RegexFilter

The cascading.operation.regex.RegexFilter function filters a Tuple stream based on a specified regex value. By default, tuples that match the given pattern are kept, and tuples that do not match are filtered out. This can be reversed by setting "removeMatch" to true.

Also, by default, the whole argument Tuple is matched against the given regex String, with its values joined by TAB characters. If "matchEachElement" is set to true, the pattern is applied to each Tuple value individually. For more information, see the java.util.regex.Matcher.find() method.

// incoming -> "ip", "time", "method", "event", "status", "size"

Filter filter = new RegexFilter( "^68\\..*" );
assembly = new Each( assembly, new Fields( "ip" ), filter );

// outgoing -> "ip", "time", "method", "event", "status", "size"

The above keeps all lines in which "68." appears at the start of the IP address.
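Because matching is described in terms of java.util.regex.Matcher.find(), a pattern succeeds if it occurs anywhere in the value unless it is anchored. The following plain-Java sketch (illustrative names, no Cascading classes) contrasts find() with matches() to show why the leading ^ in the example matters.

```java
import java.util.regex.Pattern;

// Illustrative helper, not part of Cascading.
final class FindVersusMatchesSketch {
    // True if the regex occurs anywhere in the value.
    static boolean find(String regex, String value) {
        return Pattern.compile(regex).matcher(value).find();
    }

    // True only if the regex matches the entire value.
    static boolean matches(String regex, String value) {
        return Pattern.compile(regex).matcher(value).matches();
    }

    public static void main(String[] args) {
        System.out.println(find("^68\\..*", "68.46.103.112"));  // anchored, starts with "68."
        System.out.println(find("^68\\..*", "168.46.103.112")); // anchored, does not start with "68."
        System.out.println(find("68\\.", "168.46.103.112"));    // unanchored, "68." occurs inside
    }
}
```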

RegexGenerator

The cascading.operation.regex.RegexGenerator function generates a new Tuple for every String (found in an input tuple) that matches a specified regex pattern.

// incoming -> "line"

String regex = "(?<!\\pL)(?=\\pL)[^ ]*(?<=\\pL)(?!\\pL)";
Function function = new RegexGenerator( new Fields( "word" ), regex );
assembly = new Each( assembly, new Fields( "line" ), function );

// outgoing -> "word"

In the above example, each "line" in a document is parsed into individual words, and each word is stored in the "word" field of its own result Tuple.
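The word pattern can be exercised with a plain java.util.regex.Matcher loop. This is a sketch of the matching only, under the assumption that one result is produced per successive match; the class and method names are illustrative.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Illustrative helper, not part of Cascading.
final class WordMatchSketch {
    private static final Pattern WORD =
        Pattern.compile("(?<!\\pL)(?=\\pL)[^ ]*(?<=\\pL)(?!\\pL)");

    // Collects every successive match of the word pattern, in order.
    static List<String> words(String line) {
        List<String> words = new ArrayList<>();
        Matcher matcher = WORD.matcher(line);
        while (matcher.find()) {
            words.add(matcher.group());
        }
        return words;
    }

    public static void main(String[] args) {
        System.out.println(words("the cat saw the dog"));
        System.out.println(words("Hello, world!"));
    }
}
```

Repeated words appear once per occurrence, and the lookarounds trim punctuation that touches a word.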

RegexSplitGenerator

The cascading.operation.regex.RegexSplitGenerator function generates a new Tuple for every split on the incoming argument value delimited by the given pattern String.

The behavior is similar to that of the RegexSplitter function, except that (assuming multiple matches) RegexSplitter generates a single Tuple that may contain multiple values, whereas RegexSplitGenerator, like RegexGenerator, emits multiple Tuples, each of which contains only one value.

Java Expression Operations

As of 3.1, all expression operations have moved to the cascading-expression-3.x.y.jar JAR.

Cascading provides some support for dynamically-compiled Java expressions to be used in either Functions or Filters. This capability is provided by the Janino-embedded Java compiler, which compiles the expressions into byte code for optimal processing speed. Janino is documented in detail on its website: http://www.janino.net/.

This capability allows an Operation to evaluate a suitable one-line Java expression, such as a + 3 * 2 or a < 7, where the variable values (here, a) are passed in as Tuple fields. The result of the Operation thus depends on the evaluated result of the expression: in the first example, a Number, and in the second, a Boolean value.

ExpressionFunction

The function cascading.operation.expression.ExpressionFunction dynamically compiles a String expression when executed, assigning argument Tuple values to variables in the expression.

// incoming -> "ip", "time", "method", "event", "status", "size"

String exp =
  "\"this \" + method + \" request was \" + size + \" bytes\"";
Fields fields = new Fields( "pretty" );
// String.class: converts all field values to a String
ExpressionFunction function =
  new ExpressionFunction( fields, exp, String.class );

assembly =
  new Each( assembly, new Fields( "method", "size" ), function );

// outgoing -> "pretty" = "this GET request was 1282652 bytes"

Above, we return a new String value that contains values from the current Tuple.

ExpressionFilter

The filter cascading.operation.expression.ExpressionFilter evaluates a Boolean expression, assigning argument Tuple values to variables in the expression. If the expression returns true, the Tuple is removed from the stream.

// incoming -> "ip", "time", "method", "event", "status", "size"

ExpressionFilter filter =
  new ExpressionFilter( "status != 200", Integer.TYPE );

assembly = new Each( assembly, new Fields( "status" ), filter );

// outgoing -> "ip", "time", "method", "event", "status", "size"

In this example, every line in the Apache log that does not have a status of "200" is filtered out. ExpressionFilter coerces the value into the specified type if necessary to make the comparison. In this case, ExpressionFilter coerces the status String into an int.
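The polarity is easy to get backwards: the expression describes what to remove, not what to keep. java.util.Collection.removeIf() follows the same convention, so a plain-Java sketch (illustrative names, no Cascading or Janino involved) can serve as a mental model.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Illustrative helper, not part of Cascading.
final class RemoveOnTrueSketch {
    // Removes every status for which "status != 200" is true, keeping only 200s.
    static List<Integer> keepOk(List<Integer> statuses) {
        List<Integer> kept = new ArrayList<>(statuses);
        kept.removeIf(status -> status != 200);
        return kept;
    }

    public static void main(String[] args) {
        System.out.println(keepOk(Arrays.asList(200, 404, 200, 500)));
    }
}
```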

Along with ExpressionFilter and ExpressionFunction, either cascading.operation.expression.ScriptFilter or cascading.operation.expression.ScriptFunction can be used when the logic requires multiple lines of Java code rather than a single expression.

See the relevant Javadoc for details on usage.

XML Operations

To use XML Operations in a Cascading application, include the cascading-xml-x.y.z.jar in the project. When using the TagSoupParser operation, this module requires the TagSoup library, which provides support for HTML and XML "tidying." More information is available at the TagSoup website: http://home.ccil.org/~cowan/XML/tagsoup/.

XPathParser

The cascading.operation.xml.XPathParser function uses one or more XPath expressions, passed into the constructor, to extract one or more node values from an XML document contained in the passed Tuple argument, and places the result(s) into one or more new fields in the current Tuple. In this way, it effectively parses an XML document into a table of fields, creating one Tuple field value for every given XPath expression. Each Node is converted to a String containing an XML document. If the returned result of an XPath expression is a NodeList, only the first Node is used for the field value and the rest are ignored. If only the text values are required, search on the text() nodes; to handle multiple NodeList values, consider using XPathGenerator.
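An XPath expression can be tried against sample XML with the JDK's javax.xml.xpath API before it is passed to XPathParser. The sketch below is plain Java with illustrative names and sample XML; it shows a node-set with two members and the text of the first one, and it makes no claim about how XPathParser serializes a Node.

```java
import java.io.StringReader;
import javax.xml.parsers.DocumentBuilderFactory;
import javax.xml.xpath.XPath;
import javax.xml.xpath.XPathConstants;
import javax.xml.xpath.XPathFactory;
import org.w3c.dom.Document;
import org.w3c.dom.NodeList;
import org.xml.sax.InputSource;

// Illustrative helper, not part of Cascading.
final class XPathFirstNodeSketch {
    private static Document parse(String xml) throws Exception {
        return DocumentBuilderFactory.newInstance().newDocumentBuilder()
            .parse(new InputSource(new StringReader(xml)));
    }

    // Number of nodes selected by the expression.
    static int count(String xml, String expression) {
        try {
            XPath xpath = XPathFactory.newInstance().newXPath();
            NodeList nodes =
                (NodeList) xpath.evaluate(expression, parse(xml), XPathConstants.NODESET);
            return nodes.getLength();
        } catch (Exception e) {
            throw new IllegalArgumentException(e);
        }
    }

    // String value of the result; for a node-set this is the first node's text.
    static String firstText(String xml, String expression) {
        try {
            return XPathFactory.newInstance().newXPath().evaluate(expression, parse(xml));
        } catch (Exception e) {
            throw new IllegalArgumentException(e);
        }
    }

    public static void main(String[] args) {
        String xml = "<doc><item>a</item><item>b</item></doc>";
        System.out.println(count(xml, "/doc/item"));
        System.out.println(firstText(xml, "/doc/item/text()"));
    }
}
```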

XPathGenerator

Similar to XPathParser, the cascading.operation.xml.XPathGenerator function generates a new Tuple for every Node returned by the given XPath expression from the XML in the current Tuple.

XPathFilter

The filter cascading.operation.xml.XPathFilter removes a Tuple if the specified XPath expression returns false. Set the removeMatch parameter to true to reverse the filter, i.e., to remove those Tuples where the XPath expression returns true.

TagSoupParser

The cascading.operation.xml.TagSoupParser function uses the TagSoup library to convert incoming HTML to clean XHTML. Use the setFeature( feature, value ) method to set TagSoup-specific features, which are documented on the TagSoup website.

Assertions

Cascading stream assertions are used to build robust reusable pipe assemblies. If desired, they can be planned out of a Flow instance at runtime. For more information, see Stream Assertions. The following documentation covers the assertions available in the core library.

AssertEquals

The cascading.operation.assertion.AssertEquals assertion specifies that the number of values given on the constructor is equal to the number of argument Tuple values, and that each constructor value .equals() its corresponding argument value.

AssertNotEquals

The cascading.operation.assertion.AssertNotEquals assertion specifies that the number of values given on the constructor is equal to the number of argument Tuple values, and that each constructor value is not equal to its corresponding argument value.

AssertEqualsAll

The cascading.operation.assertion.AssertEqualsAll assertion specifies that every value in the argument Tuple .equals() the single value given on the constructor.

AssertExpression

The cascading.operation.assertion.AssertExpression assertion dynamically resolves a given Java expression (see Java Expression Operations) using argument Tuple values. Any Tuple that returns true for the given expression passes the assertion.

AssertMatches

The cascading.operation.assertion.AssertMatches assertion matches the given regular expression pattern String against the entire argument Tuple. The comparison is made possible by concatenating all the fields of the Tuple, separated by the TAB character (\t). If a match is found, the Tuple passes the assertion.

AssertMatchesAll

The cascading.operation.assertion.AssertMatchesAll assertion matches the given regular expression pattern String against each argument Tuple value individually.

AssertNotNull

The cascading.operation.assertion.AssertNotNull assertion specifies that every position or field in the argument Tuple is not null.

AssertNull

The cascading.operation.assertion.AssertNull assertion specifies that every position or field in the argument Tuple is null.

AssertSizeEquals

The cascading.operation.assertion.AssertSizeEquals assertion specifies a size for the current Tuple in the tuple stream. Size here is the number of fields in the Tuple, as returned by Tuple.size(). Note that some or all fields may be null.

AssertSizeLessThan

The cascading.operation.assertion.AssertSizeLessThan assertion specifies that the current Tuple in the stream has a size less than (<) the given size. Size here is the number of fields in the Tuple, as returned by Tuple.size(). Note that some or all fields may be null.

AssertSizeMoreThan

The cascading.operation.assertion.AssertSizeMoreThan assertion specifies that the current Tuple in the stream has a size greater than (>) a given size. Size here is the number of fields in the Tuple, as returned by Tuple.size(). Note that some or all fields may be null.

AssertGroupSizeEquals

The cascading.operation.assertion.AssertGroupSizeEquals group assertion specifies that the number of items in the current grouping is equal to (==) the given size. If a pattern String is given, the assertion is applied only to grouping keys that match the regular expression; multiple key values are delimited by a TAB character for the match.

AssertGroupSizeLessThan

The cascading.operation.assertion.AssertGroupSizeLessThan group assertion specifies that the number of items in the current grouping is less than (<) the given size. If a pattern String is given, the assertion is applied only to grouping keys that match the regular expression; multiple key values are delimited by a TAB character for the match.

AssertGroupSizeMoreThan

The cascading.operation.assertion.AssertGroupSizeMoreThan group assertion specifies that the number of items in the current grouping is greater than (>) the given size. If a pattern String is given, the assertion is applied only to grouping keys that match the regular expression; multiple key values are delimited by a TAB character for the match.

Logical Filter Operators

The logical Filter operators allow you to combine multiple filters to run in a single Pipe, instead of chaining multiple Pipes together to get the same logical result.

And

The cascading.operation.filter.And Filter performs a logical "and" on the results of the constructor-provided Filter instances. If Filter#isRemove() returns true for all of the given instances, this filter returns true.

Or

The cascading.operation.filter.Or Filter performs a logical "or" on the results of the constructor-provided Filter instances. If Filter#isRemove() returns true for any of the given instances, this filter returns true.

Not

The cascading.operation.filter.Not Filter performs a logical "not" (negation) on the results of the constructor-provided Filter instance. If Filter#isRemove() returns true for the given instance, this filter returns false. Conversely, if Filter#isRemove() returns false for the given instance, this filter returns true.

Xor

The cascading.operation.filter.Xor Filter performs a logical "xor" (exclusive or) on the results of the constructor-provided Filter instances. Xor can only be applied to two instances at a time. If Filter#isRemove() returns true for both instances, or false for both, this filter returns false; otherwise it returns true.

Example 1. Combining filters
// incoming -> "ip", "time", "method", "event", "status", "size"

FilterNull filterNull = new FilterNull();
RegexFilter regexFilter = new RegexFilter( "(GET|HEAD|POST)" );

And andFilter = new And( filterNull, regexFilter );

assembly = new Each( assembly, new Fields( "method" ), andFilter );

// outgoing -> "ip", "time", "method", "event", "status", "size"

The example above performs a logical "and" on the results of the two filters within a single Pipe. A Tuple is removed only if both filters would remove it; if either filter would keep the Tuple, it passes through.
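The isRemove() rules for the four operators can be written down as a small truth-table sketch in plain Java. Here each boolean stands in for the value one Filter's isRemove() would return; the class and method names are illustrative and nothing below calls Cascading.

```java
// Illustrative helper, not part of Cascading.
final class LogicalRemoveSketch {
    // And: remove only if every combined filter says remove.
    static boolean and(boolean... removes) {
        for (boolean remove : removes) {
            if (!remove) {
                return false;
            }
        }
        return true;
    }

    // Or: remove if any combined filter says remove.
    static boolean or(boolean... removes) {
        for (boolean remove : removes) {
            if (remove) {
                return true;
            }
        }
        return false;
    }

    // Not: invert the single filter's decision.
    static boolean not(boolean remove) {
        return !remove;
    }

    // Xor: remove if exactly one of the two filters says remove.
    static boolean xor(boolean left, boolean right) {
        return left != right;
    }

    public static void main(String[] args) {
        System.out.println(and(true, false)); // false: the tuple is kept
        System.out.println(or(true, false));  // true: the tuple is removed
    }
}
```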

Buffers

The FirstNBuffer Buffer is provided as an optimized means to determine the top N elements in a grouping.

FirstNBuffer

The cascading.operation.buffer.FirstNBuffer Buffer returns the first N tuples seen in a given grouping. Unlike the cascading.pipe.assembly.FirstBy AggregateBy and cascading.operation.aggregator.First Aggregator, FirstNBuffer will stop iterating the available tuples when the top N condition is met. FirstNBuffer is used by cascading.pipe.assembly.Unique.
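The early-exit behavior can be pictured with an ordinary java.util.Iterator. The sketch below is a stand-in written for illustration (the names are assumptions and it is not the FirstNBuffer implementation): it stops pulling values as soon as N have been collected, leaving the rest of the iterator untouched.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

// Illustrative helper, not part of Cascading.
final class FirstNSketch {
    // Collects at most n values and stops iterating once n have been seen.
    static <T> List<T> firstN(Iterator<T> values, int n) {
        List<T> result = new ArrayList<>();
        while (result.size() < n && values.hasNext()) {
            result.add(values.next());
        }
        return result;
    }

    public static void main(String[] args) {
        Iterator<Integer> group = Arrays.asList(1, 2, 3, 4, 5).iterator();
        System.out.println(firstN(group, 2));
        // The iterator was not drained: the next value is still available.
        System.out.println(group.next());
    }
}
```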