Cascading 4.0 User Guide - Built-In Operations
Built-In Operations
Identity Function
The cascading.operation.Identity function is used to "shape" a tuple stream.
Here are some common patterns that illustrate how Cascading "field algebra" works. (Note that, in actual practice, some of these example tasks might be better performed with helper subassemblies such as Rename, Retain, and Discard.)
- Discard unused fields
-
Here the arguments of the Identity function are passed through as results because Fields.ARGS is given as the field declaration.
// incoming -> "ip", "time", "method", "event", "status", "size"
Identity identity = new Identity( Fields.ARGS );
Fields ipMethod = new Fields( "ip", "method" );
pipe = new Each( pipe, ipMethod, identity, Fields.RESULTS );
// outgoing -> "ip", "method"
In practice you can omit the field declaration, as Fields.ARGS is the default declaration for the Identity function. You can also omit Fields.RESULTS, as it is the default for the Each pipe. Thus, simpler code yields the same result:
// incoming -> "ip", "time", "method", "event", "status", "size"
pipe = new Each( pipe, new Fields( "ip", "method" ), new Identity() );
// outgoing -> "ip", "method"
- Rename all fields
-
Here Identity renames the incoming arguments. Since Fields.RESULTS is implied, the incoming Tuple is replaced by the selected arguments and given new field names as declared on Identity.
// incoming -> "ip", "method"
Identity identity = new Identity( new Fields( "address", "request" ) );
pipe = new Each( pipe, new Fields( "ip", "method" ), identity );
// outgoing -> "address", "request"
In the example above, if there were more fields than "ip" and "method," it would work fine — all the extra fields would be discarded. But if the same were true for the next example, the Cascading query planner would fail.
// incoming -> "ip", "method"
Identity identity = new Identity( new Fields( "address", "request" ) );
pipe = new Each( pipe, Fields.ALL, identity );
// outgoing -> "address", "request"
Since Fields.ALL is the default argument selector for the Each pipe, it can be left out as shown below. Again, the above and below examples will fail unless there are exactly two fields in the tuples of the incoming stream.
// incoming -> "ip", "method"
Identity identity = new Identity( new Fields( "address", "request" ) );
pipe = new Each( pipe, identity );
// outgoing -> "address", "request"
- Rename a single field
-
Here we rename a single field and return it, along with an input Tuple field, as the result. All other fields are dropped.
// incoming -> "ip", "time", "method", "event", "status", "size"
Fields fieldSelector = new Fields( "address", "method" );
Identity identity = new Identity( new Fields( "address" ) );
pipe = new Each( pipe, new Fields( "ip" ), identity, fieldSelector );
// outgoing -> "address", "method"
- Coerce values to specific primitive types
-
Here we replace the Tuple String values "status" and "size" with int and long values, respectively. All other fields are dropped.
// incoming -> "ip", "time", "method", "event", "status", "size"
Identity identity = new Identity( Integer.TYPE, Long.TYPE );
pipe = new Each( pipe, new Fields( "status", "size" ), identity );
// outgoing -> "status", "size"
Or we can replace just the Tuple String value "status" with an int, while keeping all the other values in the output Tuple.
// incoming -> "ip", "time", "method", "event", "status", "size"
Identity identity = new Identity( Integer.TYPE );
pipe = new Each( pipe, new Fields( "status" ), identity, Fields.REPLACE );
// outgoing -> "ip", "time", "method", "event", "status", "size"
Debug Function
The cascading.operation.Debug function is a utility function (actually, it’s a Filter) that prints the current argument Tuple to either stdout or stderr. Used with one of the DebugLevel enum values (NONE, DEFAULT, or VERBOSE), different debug levels can be embedded in a pipe assembly.
Nothing from the Debug output displays on the client side. Debug is only useful when testing in an IDE or if the remote logs are readily available.
Sample and Limit Functions
The Sample and Limit functions are used to limit the number of tuples that pass through a pipe assembly.
- Sample
-
The cascading.operation.filter.Sample filter specifies a percentage of tuples that can pass through a pipe assembly.
- Limit
-
The cascading.operation.filter.Limit sets a maximum number of tuples that can pass through a pipe assembly.
Insert Function
The cascading.operation.Insert function enables insertion of constant values into the tuple stream. This function is most useful in either of the following cases:
- A tuple stream is split and one of the branches needs an identifying value.
- A missing parameter or value, such as a date String for the current date, needs to be inserted.
Text Functions
Cascading includes a number of text functions in the cascading.operation.text package.
- DateFormatter
-
The cascading.operation.text.DateFormatter function is used to convert a datestamp to a formatted String. This function expects a long value representing the number of milliseconds since January 1, 1970, 00:00:00 GMT/UTC, and formats the output using java.text.SimpleDateFormat syntax.
// "ts" -> 1188604863000
DateFormatter formatter = new DateFormatter( new Fields( "date" ), "dd/MMM/yyyy" );
pipe = new Each( pipe, new Fields( "ts" ), formatter );
// outgoing -> "date" -> 31/Aug/2007
The example above converts a long timestamp ("ts") to a date String.
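The formatting step itself can be tried outside Cascading with the JDK class that DateFormatter's syntax comes from. The following is a minimal sketch in plain Java, not the DateFormatter implementation; the class and method names are hypothetical, and the time zone is passed explicitly because this section does not state which zone DateFormatter applies. It shows that the same timestamp renders as a different calendar date depending on the zone.

```java
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Locale;
import java.util.TimeZone;

// Hypothetical helper, plain JDK only: formats epoch milliseconds with a
// SimpleDateFormat pattern in an explicitly chosen time zone.
final class DateFormatSketch {
    static String format(long epochMillis, String pattern, String zoneId) {
        SimpleDateFormat format = new SimpleDateFormat(pattern, Locale.US);
        format.setTimeZone(TimeZone.getTimeZone(zoneId));
        return format.format(new Date(epochMillis));
    }

    public static void main(String[] args) {
        long ts = 1188604863000L; // 2007-09-01T00:01:03 UTC
        System.out.println(format(ts, "dd/MMM/yyyy", "UTC"));                 // 01/Sep/2007
        System.out.println(format(ts, "dd/MMM/yyyy", "America/Los_Angeles")); // 31/Aug/2007
    }
}
```

If a formatted date looks one day off, the time zone used for formatting is the first thing to check.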
- DateParser
-
The cascading.operation.text.DateParser function is used to convert a text date-and-time String to a timestamp, using the java.text.SimpleDateFormat syntax. The timestamp is a long value representing the number of milliseconds since January 1, 1970, 00:00:00 GMT/UTC. By default, the output is a field with the name "ts" (for timestamp), but this can be overridden by passing a declared Fields value.
// "time" -> 01/Sep/2007:00:01:03 +0000
DateParser dateParser = new DateParser( "dd/MMM/yyyy:HH:mm:ss Z" );
pipe = new Each( pipe, new Fields( "time" ), dateParser );
// outgoing -> "ts" -> 1188604863000
In the example above, an Apache log-style date-time field is converted into a long timestamp in UTC.
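The same pattern String can be checked directly against java.text.SimpleDateFormat before it is handed to DateParser. This is a small sketch in plain Java under that assumption; the class and method names are hypothetical and no Cascading classes are involved.

```java
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Locale;

// Hypothetical helper, plain JDK only: parses a date-time String into
// milliseconds since the epoch using a SimpleDateFormat pattern.
final class DateParseSketch {
    static long parse(String text, String pattern) {
        try {
            // The "Z" pattern letter reads the zone offset (+0000) from the text itself.
            return new SimpleDateFormat(pattern, Locale.US).parse(text).getTime();
        } catch (ParseException exception) {
            throw new IllegalArgumentException("unparseable date: " + text, exception);
        }
    }

    public static void main(String[] args) {
        // prints 1188604863000
        System.out.println(parse("01/Sep/2007:00:01:03 +0000", "dd/MMM/yyyy:HH:mm:ss Z"));
    }
}
```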
- FieldJoiner
-
The cascading.operation.text.FieldJoiner function joins all the values in a Tuple with a specified delimiter and places the result into a new field. (For the opposite effect, use the RegexSplitter function that is described below.)
- FieldFormatter
-
The cascading.operation.text.FieldFormatter function formats Tuple values with a given String format and places the result into a new field. The java.util.Formatter class is used internally to create a new formatted String.
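As a rough illustration of what joining and formatting produce, the sketch below uses only the JDK: a delimiter join, and java.util.Formatter with a format String. The class and method names are hypothetical, and the code stands in for the idea only; it is not how FieldJoiner or FieldFormatter are implemented.

```java
import java.util.Arrays;
import java.util.Formatter;
import java.util.List;
import java.util.Locale;
import java.util.stream.Collectors;

// Hypothetical helpers, plain JDK only.
final class JoinFormatSketch {
    // Joins all values with the delimiter, converting each value to a String first.
    static String join(String delimiter, List<?> values) {
        return values.stream().map(String::valueOf).collect(Collectors.joining(delimiter));
    }

    // Formats the values with a java.util.Formatter format String.
    static String format(String pattern, Object... values) {
        try (Formatter formatter = new Formatter(Locale.ROOT)) {
            return formatter.format(pattern, values).toString();
        }
    }

    public static void main(String[] args) {
        System.out.println(join("-", Arrays.asList("2007", "09", "01")));         // 2007-09-01
        System.out.println(format("%s request was %d bytes", "GET", 1282652));  // GET request was 1282652 bytes
    }
}
```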
Regular Expression Operations
- RegexSplitter
-
The cascading.operation.regex.RegexSplitter function splits an argument value based on a regex pattern String. (For the opposite effect, use the FieldJoiner function that is described above.)
Internally, this function uses java.util.regex.Pattern.split(), and it behaves accordingly. By default, it splits on the TAB character ("\t"). If it is known that a determinate number of values will emerge from this function, it can declare field names. In this case, if the splitter encounters more split values than field names, the remaining values are discarded. For more information, see java.util.regex.Pattern.split( input, limit ).
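Because the splitting behavior follows java.util.regex.Pattern.split(), it can help to see what that method does with a limit and with trailing empty values. The sketch below is plain Java with a hypothetical helper class; it shows the JDK method only, not RegexSplitter's handling of declared field names.

```java
import java.util.Arrays;
import java.util.regex.Pattern;

// Hypothetical helper, plain JDK only: a thin wrapper over Pattern.split( input, limit ).
final class SplitSketch {
    static String[] split(String regex, String value, int limit) {
        return Pattern.compile(regex).split(value, limit);
    }

    public static void main(String[] args) {
        // limit 0: split as often as possible, trailing empty strings are dropped
        System.out.println(Arrays.toString(split("\t", "a\tb\tc", 0))); // [a, b, c]
        System.out.println(split("\t", "a\tb\t\t", 0).length);          // 2
        // negative limit: trailing empty strings are kept
        System.out.println(split("\t", "a\tb\t\t", -1).length);         // 4
        // positive limit: at most that many values, the last one holds the remainder
        System.out.println(split("\t", "a\tb\tc", 2).length);           // 2
    }
}
```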
- RegexParser
-
The cascading.operation.regex.RegexParser function is used to extract a regex-matched value from an incoming argument value. If the regular expression is sufficiently complex, an int array may be provided to specify which regex groups should be returned in which field names.
// incoming -> "line"
String regex = "^([^ ]*) +[^ ]* +[^ ]* +\\[([^]]*)\\] +" +
  "\\\"([^ ]*) ([^ ]*) [^ ]*\\\" ([^ ]*) ([^ ]*).*$";
Fields fieldDeclaration = new Fields( "ip", "time", "method", "event", "status", "size" );
int[] groups = {1, 2, 3, 4, 5, 6};
RegexParser parser = new RegexParser( fieldDeclaration, regex, groups );
assembly = new Each( assembly, new Fields( "line" ), parser );
// outgoing -> "ip", "time", "method", "event", "status", "size"
In the example above, a line from an Apache access log is parsed into its component parts. Note that the int[] groups array starts at 1, not 0. Group 0 is the whole match; if it were included as the first field, that field would hold a copy of "line" and not "ip".
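The group numbering comes straight from java.util.regex. The following plain-Java sketch, with a hypothetical helper class and a deliberately shortened log line and pattern (both invented for illustration), shows the difference between group 0 and the numbered capture groups.

```java
import java.util.Arrays;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper, plain JDK only: returns the requested capture groups
// of the first match, or null when the pattern does not match.
final class GroupSketch {
    // Shortened sample data for illustration, not a full Apache log line.
    static final String LINE = "68.46.103.112 \"GET /index.html\"";
    static final String REGEX = "^([^ ]*) \"([^ ]*) ([^ ]*)\"$";

    static String[] groups(String regex, String line, int... groupIndexes) {
        Matcher matcher = Pattern.compile(regex).matcher(line);
        if (!matcher.find()) {
            return null;
        }
        String[] values = new String[groupIndexes.length];
        for (int i = 0; i < groupIndexes.length; i++) {
            values[i] = matcher.group(groupIndexes[i]);
        }
        return values;
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(groups(REGEX, LINE, 1, 2, 3))); // [68.46.103.112, GET, /index.html]
        System.out.println(groups(REGEX, LINE, 0)[0].equals(LINE));        // true
    }
}
```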
- RegexReplace
-
The cascading.operation.regex.RegexReplace function is used to replace a regex-matched value with a specified replacement value. It can operate in a "replace all" or "replace first" mode. For more information, see the Javadoc for the java.util.regex.Matcher.replaceAll() and java.util.regex.Matcher.replaceFirst() methods.
// incoming -> "line"
RegexReplace replace = new RegexReplace( new Fields( "clean-line" ), "\\s+", " ", true );
assembly = new Each( assembly, new Fields( "line" ), replace );
// outgoing -> "clean-line"
In the example above, every run of adjacent whitespace characters is replaced with a single space character.
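The two modes correspond to the two JDK Matcher methods mentioned above. A minimal plain-Java sketch of the difference follows; the helper class and its boolean parameter are hypothetical names chosen for illustration.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper, plain JDK only: contrasts Matcher.replaceAll()
// with Matcher.replaceFirst() on the same input.
final class ReplaceSketch {
    static String replace(String value, String regex, String replacement, boolean replaceAll) {
        Matcher matcher = Pattern.compile(regex).matcher(value);
        return replaceAll ? matcher.replaceAll(replacement) : matcher.replaceFirst(replacement);
    }

    public static void main(String[] args) {
        String line = "a  b \t c";
        System.out.println(replace(line, "\\s+", " ", true)); // a b c
        // replace-first mode only collapses the first whitespace run
        System.out.println(replace(line, "\\s+", " ", false).equals("a b \t c")); // true
    }
}
```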
- RegexFilter
-
The cascading.operation.regex.RegexFilter filter filters a Tuple stream based on a specified regex value. By default, tuples that match the given pattern are kept, and tuples that do not match are filtered out. This can be reversed by setting "removeMatch" to true.
Also, by default, the values of the whole Tuple, joined by TAB characters, are matched against the given regex String. If "matchEachElement" is set to true, the pattern is applied to each Tuple value individually. For more information, see the java.util.regex.Matcher.find() method.
// incoming -> "ip", "time", "method", "event", "status", "size"
Filter filter = new RegexFilter( "^68\\..*" );
assembly = new Each( assembly, new Fields( "ip" ), filter );
// outgoing -> "ip", "time", "method", "event", "status", "size"
The above keeps all lines in which "68." appears at the start of the IP address.
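The keep-or-remove decision can be sketched in plain Java around Matcher.find(). The class below is a hypothetical stand-in rather than RegexFilter itself: it returns true when a value should be removed and takes a removeMatch flag that mirrors the option described above.

```java
import java.util.regex.Pattern;

// Hypothetical helper, plain JDK only: true means "remove this value".
final class RegexFilterSketch {
    static boolean isRemove(String regex, boolean removeMatch, String value) {
        boolean found = Pattern.compile(regex).matcher(value).find();
        // default (removeMatch == false): remove values that do NOT match
        // reversed (removeMatch == true): remove values that DO match
        return found == removeMatch;
    }

    public static void main(String[] args) {
        String regex = "^68\\..*";
        System.out.println(isRemove(regex, false, "68.46.103.112")); // false, kept
        // "192.168.0.1" contains "68." but not at the start, so the anchored pattern fails
        System.out.println(isRemove(regex, false, "192.168.0.1"));   // true, removed
        System.out.println(isRemove(regex, true, "68.46.103.112"));  // true, removed
    }
}
```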
- RegexGenerator
-
The cascading.operation.regex.RegexGenerator function generates a new Tuple for every String (found in an input tuple) that matches a specified regex pattern.
// incoming -> "line"
String regex = "(?<!\\pL)(?=\\pL)[^ ]*(?<=\\pL)(?!\\pL)";
Function function = new RegexGenerator( new Fields( "word" ), regex );
assembly = new Each( assembly, new Fields( "line" ), function );
// outgoing -> "word"
In the above example, each "line" in a document is parsed into individual words, and each word is stored in the "word" field of its own result Tuple.
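To see which Strings that pattern yields, the matching loop can be run on its own with java.util.regex. This is a plain-Java sketch with a hypothetical helper class; each returned element corresponds to what would become one result Tuple.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper, plain JDK only: collects every match of the pattern.
final class WordMatchSketch {
    static final String WORD_REGEX = "(?<!\\pL)(?=\\pL)[^ ]*(?<=\\pL)(?!\\pL)";

    static List<String> matches(String regex, String line) {
        List<String> results = new ArrayList<>();
        Matcher matcher = Pattern.compile(regex).matcher(line);
        while (matcher.find()) {
            results.add(matcher.group());
        }
        return results;
    }

    public static void main(String[] args) {
        // The lookarounds trim the trailing comma from "quick,"
        System.out.println(matches(WORD_REGEX, "the quick, brown fox")); // [the, quick, brown, fox]
    }
}
```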
- RegexSplitGenerator
-
The cascading.operation.regex.RegexSplitGenerator function generates a new Tuple for every split on the incoming argument value delimited by the given pattern String.
The behavior is similar to the RegexSplitter function. The difference is in the shape of the result: given multiple split values, RegexSplitter generates a single Tuple that may contain multiple values, whereas RegexSplitGenerator, like RegexGenerator, emits multiple Tuples, each of which contains only one value.
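The two result shapes can be pictured with nested lists, where each inner list stands for one Tuple. The sketch below is plain Java with hypothetical names and models only the shape of the output, not the Cascading operations themselves.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical helpers, plain JDK only: each inner List models one result Tuple.
final class SplitShapeSketch {
    // One Tuple holding every split value (the RegexSplitter shape).
    static List<List<String>> asSingleTuple(String regex, String value) {
        return Collections.singletonList(Arrays.asList(value.split(regex)));
    }

    // One Tuple per split value (the RegexSplitGenerator shape).
    static List<List<String>> asManyTuples(String regex, String value) {
        List<List<String>> tuples = new ArrayList<>();
        for (String part : value.split(regex)) {
            tuples.add(Collections.singletonList(part));
        }
        return tuples;
    }

    public static void main(String[] args) {
        System.out.println(asSingleTuple("\t", "a\tb\tc")); // [[a, b, c]]
        System.out.println(asManyTuples("\t", "a\tb\tc"));  // [[a], [b], [c]]
    }
}
```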
Java Expression Operations
As of 3.1, all expression operations have moved to the cascading-expression-3.x.y.jar JAR.
Cascading provides some support for dynamically-compiled Java expressions to be used in either Functions or Filters. This capability is provided by the Janino-embedded Java compiler, which compiles the expressions into byte code for optimal processing speed. Janino is documented in detail on its website: http://www.janino.net/.
This capability allows an Operation to evaluate a suitable one-line Java expression, such as a + 3 * 2 or a < 7, where the variable values (here, a) are passed in as Tuple fields. The result of the Operation thus depends on the evaluated result of the expression: in the first example, a Number, and in the second, a Boolean value.
- ExpressionFunction
-
The function cascading.operation.expression.ExpressionFunction dynamically compiles a String expression when executed, assigning argument Tuple values to variables in the expression.
// incoming -> "ip", "time", "method", "event", "status", "size"
String exp = "\"this \" + method + \" request was \" + size + \" bytes\"";
Fields fields = new Fields( "pretty" );
ExpressionFunction function = new ExpressionFunction( fields, exp, String.class ); // (1)
assembly = new Each( assembly, new Fields( "method", "size" ), function );
// outgoing -> "pretty" = "this GET request was 1282652 bytes"
(1) Converts all field values to a String.
Above, we return a new String value that contains values from the current Tuple.
- ExpressionFilter
-
The filter cascading.operation.expression.ExpressionFilter evaluates a Boolean expression, assigning argument Tuple values to variables in the expression. If the expression returns true, the Tuple is removed from the stream.
// incoming -> "ip", "time", "method", "event", "status", "size"
ExpressionFilter filter = new ExpressionFilter( "status != 200", Integer.TYPE );
assembly = new Each( assembly, new Fields( "status" ), filter );
// outgoing -> "ip", "time", "method", "event", "status", "size"
In this example, every line in the Apache log that does not have a status of "200" is filtered out. ExpressionFilter coerces the value into the specified type if necessary to make the comparison. In this case, ExpressionFilter coerces the status String into an int.
Along with ExpressionFilter and ExpressionFunction, either cascading.operation.expression.ScriptFilter or cascading.operation.expression.ScriptFunction can be used when an expression must be invoked in multiple lines of Java code.
See the relevant Javadoc for details on usage.
XML Operations
To use XML Operations in a Cascading application, include the cascading-xml-x.y.z.jar in the project. When using the TagSoupParser operation, this module requires the TagSoup library, which provides support for HTML and XML "tidying." More information is available at the TagSoup website: http://home.ccil.org/~cowan/XML/tagsoup/.
- XPathParser
-
The cascading.operation.xml.XPathParser function uses one or more XPath expressions, passed into the constructor, to extract one or more node values from an XML document contained in the passed Tuple argument, and places the result(s) into one or more new fields in the current Tuple. In this way, it effectively parses an XML document into a table of fields, creating one Tuple field value for every given XPath expression. Each Node is converted to a String containing an XML document. If only the text values are required, search on the text() nodes. If the returned result of an XPath expression is a NodeList, only the first Node is used for the field value and the rest are ignored; consider using XPathGenerator to handle multiple NodeList values.
- XPathGenerator
-
Similar to XPathParser, the cascading.operation.xml.XPathGenerator function generates a new Tuple for every Node returned by the given XPath expression from the XML in the current Tuple.
- XPathFilter
-
The filter cascading.operation.xml.XPathFilter removes a Tuple if the specified XPath expression returns false. Set the removeMatch parameter to true if the filter should be reversed, i.e., to remove those Tuples where the XPath expression returns true.
- TagSoupParser
-
The cascading.operation.xml.TagSoupParser function uses the TagSoup library to convert incoming HTML to clean XHTML. Use the setFeature( feature, value ) method to set TagSoup-specific features, which are documented on the TagSoup website.
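The "first Node only" versus "one result per Node" distinction between XPathParser and XPathGenerator can be illustrated with the JDK's own javax.xml.xpath API. The sketch below is plain Java with hypothetical names. It returns text content for brevity, whereas the text above notes that XPathParser converts the Node to a String containing an XML document, so treat it as an illustration of NodeList handling only.

```java
import java.io.StringReader;
import java.util.ArrayList;
import java.util.List;
import javax.xml.xpath.XPath;
import javax.xml.xpath.XPathConstants;
import javax.xml.xpath.XPathExpressionException;
import javax.xml.xpath.XPathFactory;
import org.w3c.dom.NodeList;
import org.xml.sax.InputSource;

// Hypothetical helpers, plain JDK only.
final class XPathSketch {
    // One value per matching Node (the XPathGenerator idea).
    static List<String> textOfAll(String xml, String expression) {
        try {
            XPath xpath = XPathFactory.newInstance().newXPath();
            NodeList nodes = (NodeList) xpath.evaluate(
                expression, new InputSource(new StringReader(xml)), XPathConstants.NODESET);
            List<String> values = new ArrayList<>();
            for (int i = 0; i < nodes.getLength(); i++) {
                values.add(nodes.item(i).getTextContent());
            }
            return values;
        } catch (XPathExpressionException exception) {
            throw new IllegalArgumentException("bad XPath or XML: " + expression, exception);
        }
    }

    // Only the first matching Node, the rest ignored (the XPathParser idea).
    static String textOfFirst(String xml, String expression) {
        List<String> all = textOfAll(xml, expression);
        return all.isEmpty() ? null : all.get(0);
    }

    public static void main(String[] args) {
        String xml = "<doc><item>a</item><item>b</item></doc>";
        System.out.println(textOfAll(xml, "/doc/item"));   // [a, b]
        System.out.println(textOfFirst(xml, "/doc/item")); // a
    }
}
```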
Assertions
Cascading stream assertions are used to build robust reusable pipe assemblies. If desired, they can be planned out of a Flow instance at runtime. For more information, see Stream Assertions. The following documentation covers the assertions available in the core library.
- AssertEquals
-
The cascading.operation.assertion.AssertEquals assertion specifies that the number of values given on the constructor is equal to the number of argument Tuple values, and that each constructor value .equals() its corresponding argument value.
- AssertNotEquals
-
The cascading.operation.assertion.AssertNotEquals assertion specifies that the number of values given on the constructor is equal to the number of argument Tuple values, and that each constructor value is not equal to its corresponding argument value.
- AssertEqualsAll
-
The cascading.operation.assertion.AssertEqualsAll assertion specifies that every value in the argument Tuple .equals() the single value given on the constructor.
- AssertExpression
-
The cascading.operation.assertion.AssertExpression assertion dynamically resolves a given Java expression (see Java Expression Operations) using argument Tuple values. Any Tuple that returns true for the given expression passes the assertion.
- AssertMatches
-
The cascading.operation.assertion.AssertMatches assertion matches the given regular expression pattern String against the entire argument Tuple. The comparison is made possible by concatenating all the fields of the Tuple, separated by the TAB character (\t). If a match is found, the Tuple passes the assertion.
- AssertMatchesAll
-
The cascading.operation.assertion.AssertMatchesAll assertion matches the given regular expression pattern String against each argument Tuple value individually.
- AssertNotNull
-
The cascading.operation.assertion.AssertNotNull assertion specifies that every position or field in the argument Tuple is not null.
- AssertNull
-
The cascading.operation.assertion.AssertNull assertion specifies that every position or field in the argument Tuple is null.
- AssertSizeEquals
-
The cascading.operation.assertion.AssertSizeEquals assertion specifies a size for the current Tuple in the tuple stream. Size here is the number of fields in the Tuple, as returned by Tuple.size(). Note that some or all fields may be null.
- AssertSizeLessThan
-
The cascading.operation.assertion.AssertSizeLessThan assertion specifies that the current Tuple in the stream has a size less than (<) the given size. Size here is the number of fields in the Tuple, as returned by Tuple.size(). Note that some or all fields may be null.
- AssertSizeMoreThan
-
The cascading.operation.assertion.AssertSizeMoreThan assertion specifies that the current Tuple in the stream has a size greater than (>) a given size. Size here is the number of fields in the Tuple, as returned by Tuple.size(). Note that some or all fields may be null.
- AssertGroupSizeEquals
-
The cascading.operation.assertion.AssertGroupSizeEquals group assertion specifies that the number of items in the current grouping is equal to (==) the given size. If a pattern String is given, only grouping keys that match the regular expression will have this assertion applied, where multiple key values are delimited by a TAB character.
- AssertGroupSizeLessThan
-
The cascading.operation.assertion.AssertGroupSizeLessThan group assertion specifies that the number of items in the current grouping is less than (<) the given size. If a pattern String is given, only grouping keys that match the regular expression will have this assertion applied, where multiple key values are delimited by a TAB character.
- AssertGroupSizeMoreThan
-
The cascading.operation.assertion.AssertGroupSizeMoreThan group assertion specifies that the number of items in the current grouping is greater than (>) the given size. If a pattern String is given, only grouping keys that match the regular expression will have this assertion applied, where multiple key values are delimited by a TAB character.
Logical Filter Operators
The logical Filter operators allow you to combine multiple filters to run in a single Pipe, instead of chaining multiple Pipes together to get the same logical result.
- And
-
The cascading.operation.filter.And Filter performs a logical "and" on the results of the constructor-provided Filter instances. If Filter#isRemove() returns true for all of the given instances, this filter returns true.
- Or
-
The cascading.operation.filter.Or Filter performs a logical "or" on the results of the constructor-provided Filter instances. If Filter#isRemove() returns true for any of the given instances, this filter returns true.
- Not
-
The cascading.operation.filter.Not Filter performs a logical "not" (negation) on the results of the constructor-provided Filter instance. If Filter#isRemove() returns true for the given instance, this filter returns false. Conversely, if Filter#isRemove() returns false for the given instance, this filter returns true.
- Xor
-
The cascading.operation.filter.Xor Filter performs a logical "xor" (exclusive or) on the results of the constructor-provided Filter instances. Xor can only be applied to two instances at a time. If Filter#isRemove() returns true for both instances, or returns false for both, this filter returns false; otherwise it returns true.
// incoming -> "ip", "time", "method", "event", "status", "size"
FilterNull filterNull = new FilterNull();
RegexFilter regexFilter = new RegexFilter( "(GET|HEAD|POST)" );
And andFilter = new And( filterNull, regexFilter );
assembly = new Each( assembly, new Fields( "method" ), andFilter );
// outgoing -> "ip", "time", "method", "event", "status", "size"
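The example above performs a logical "and" on the two filters. Because And returns true only when Filter#isRemove() returns true for all of its filters, a Tuple is removed only if both filters would remove it; a Tuple that either filter would keep passes through this one Pipe.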
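The truth tables of the four operators can be checked with a small plain-Java model in which each java.util.function.Predicate stands in for Filter#isRemove(), so true means "remove". All names below are hypothetical, and the treatment of a null value by the regex stand-in (no match, so remove) is an assumption made for the sketch, not a statement about RegexFilter.

```java
import java.util.function.Predicate;
import java.util.regex.Pattern;

// Hypothetical model, plain JDK only: a Predicate returning true means "remove".
final class LogicalFilterSketch {
    static final Pattern METHOD = Pattern.compile("(GET|HEAD|POST)");
    // Stand-in for a null filter: remove null values.
    static final Predicate<String> REMOVE_NULL = value -> value == null;
    // Stand-in for a regex filter: remove values that do not match (null assumed not to match).
    static final Predicate<String> REMOVE_NON_MATCH =
        value -> value == null || !METHOD.matcher(value).find();

    @SafeVarargs
    static Predicate<String> and(Predicate<String>... filters) {
        return value -> {
            for (Predicate<String> filter : filters) {
                if (!filter.test(value)) return false; // one "keep" vetoes removal
            }
            return true;
        };
    }

    @SafeVarargs
    static Predicate<String> or(Predicate<String>... filters) {
        return value -> {
            for (Predicate<String> filter : filters) {
                if (filter.test(value)) return true; // one "remove" is enough
            }
            return false;
        };
    }

    static Predicate<String> not(Predicate<String> filter) {
        return value -> !filter.test(value);
    }

    static Predicate<String> xor(Predicate<String> left, Predicate<String> right) {
        return value -> left.test(value) != right.test(value);
    }

    public static void main(String[] args) {
        System.out.println(and(REMOVE_NULL, REMOVE_NON_MATCH).test("PUT")); // false: not null, so kept
        System.out.println(or(REMOVE_NULL, REMOVE_NON_MATCH).test("PUT"));  // true: no match, so removed
        System.out.println(and(REMOVE_NULL, REMOVE_NON_MATCH).test(null));  // true: both would remove
    }
}
```

In this model, a non-null value that fails the regex is removed only under "or", which is the combination to reach for when a Tuple must satisfy every filter to pass.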
Buffers
The FirstNBuffer Buffer is provided as an optimized means to determine the top N elements in a grouping.
- FirstNBuffer
-
The cascading.operation.buffer.FirstNBuffer Buffer returns the first N tuples seen in a given grouping. Unlike the cascading.pipe.assembly.FirstBy AggregateBy and cascading.operation.aggregator.First Aggregator, FirstNBuffer will stop iterating the available tuples when the top N condition is met. FirstNBuffer is used by cascading.pipe.assembly.Unique.
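The early-stop behavior described above can be sketched in plain Java over an Iterator: once N values have been collected, the loop ends and the rest of the grouping is never read. The class and method names are hypothetical, and this is an illustration of the idea rather than the FirstNBuffer source.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

// Hypothetical helper, plain JDK only: takes at most n values and stops iterating.
final class FirstNSketch {
    static <T> List<T> firstN(Iterator<T> group, int n) {
        List<T> results = new ArrayList<>();
        // The size check comes first, so hasNext()/next() are not called once n is reached.
        while (results.size() < n && group.hasNext()) {
            results.add(group.next());
        }
        return results;
    }

    public static void main(String[] args) {
        Iterator<String> group = Arrays.asList("a", "b", "c", "d").iterator();
        System.out.println(firstN(group, 2)); // [a, b]
        System.out.println(group.next());     // c, the remainder was left unread
    }
}
```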