Cascading 4.0 User Guide - Tuple Fields
Tuple Fields
As data stored in tuples moves through a pipe assembly, individual columns or fields are added, removed, or modified in place as they pass through each pipe.
Tuple fields are manipulated through the cascading.tuple.Fields class. Fields can be used either to declare a given set of fields (much as fields are declared in a class definition) or to select a set of fields from another tuple. For example, if you declare that a Tuple has the field names [first, last], you can later select [last] from the Tuple for use as an argument to an Operation. Equivalently, you can select the second position with the ordinal [1] or the last position with the ordinal [-1].
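The name and ordinal selection described above can be modeled in a few lines of plain Java. This is only a sketch of the idea, not the Cascading implementation: the FieldSelection class and its resolve and select methods are hypothetical names, and the tuple is represented as ordinary lists.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical plain-Java model; it does not use the Cascading API.
final class FieldSelection {
  // Map a possibly negative ordinal onto a position: -1 is the last position.
  static int resolve(int ordinal, int size) {
    int pos = ordinal < 0 ? size + ordinal : ordinal;
    if (pos < 0 || pos >= size)
      throw new IllegalArgumentException("ordinal out of range: " + ordinal);
    return pos;
  }

  // Select a value by field name.
  static Object select(List<String> names, List<Object> values, String name) {
    int pos = names.indexOf(name);
    if (pos < 0)
      throw new IllegalArgumentException("unknown field: " + name);
    return values.get(pos);
  }

  // Select a value by ordinal.
  static Object select(List<Object> values, int ordinal) {
    return values.get(resolve(ordinal, values.size()));
  }

  public static void main(String[] args) {
    List<String> names = Arrays.asList("first", "last");
    List<Object> values = Arrays.<Object>asList("Ada", "Lovelace");

    // All three selections refer to the same position in a two-field tuple.
    System.out.println(select(names, values, "last")); // Lovelace
    System.out.println(select(values, 1));             // Lovelace
    System.out.println(select(values, -1));            // Lovelace
  }
}
```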
Both Each and Every pipes process an input Tuple with the following steps:
- The pipe selects fields from the input Tuple as arguments to the child Operation (Function, Filter, Aggregator, or Buffer).
- The pipe receives results from the Operation.
- The pipe returns the outgoing Tuple, which can be a mix of incoming Tuple values and Operation results.
This is achieved by providing new Fields instances that select or declare the required field names or ordinals as appropriate. When selecting arguments for an Operation, or the outgoing fields of the pipe, Cascading provides Fields sets, a feature of the Fields class that provides a sort of wildcard tool for referencing sets of field values without declaring them specifically.
There are also cases where Fields sets can be used when fields should be declared. All these cases are described below.
To see Fields sets in action, see the Identity function documentation.
Field Sets
These predefined Fields sets are constant values on the Fields class. They can be used in many places where the Fields class is expected. They are:
- Fields.ALL
The cascading.tuple.Fields.ALL constant is a wildcard that represents all the currently available fields.
// incoming -> first, last, age
String expression = "first + \" \" + last";
Fields fields = new Fields( "full" );
ExpressionFunction full =
new ExpressionFunction( fields, expression, String.class );
assembly =
new Each( assembly, new Fields( "first", "last" ), full, Fields.ALL );
// outgoing -> first, last, age, full
- Fields.RESULTS
The cascading.tuple.Fields.RESULTS constant represents the field names of the current operation's return values. This Fields set can only be used as an output selector on a pipe, causing the pipe to output a tuple containing only the operation results.
// incoming -> first, last, age
String expression = "first + \" \" + last";
Fields fields = new Fields( "full" );
ExpressionFunction full =
new ExpressionFunction( fields, expression, String.class );
Fields firstLast = new Fields( "first", "last" );
assembly =
new Each( assembly, firstLast, full, Fields.RESULTS );
// outgoing -> full
- Fields.REPLACE
The cascading.tuple.Fields.REPLACE constant is used as an output selector to inline-replace values in the incoming tuple with the results of an operation. This convenient Fields set allows operations to overwrite the value stored in the specified field. The current operation must either specify the identical argument selector field names used by the pipe, or use the ARGS Fields set.
// incoming -> first, last, age
// coerce to int
Identity function = new Identity( Fields.ARGS, Integer.class );
Fields age = new Fields( "age" );
assembly = new Each( assembly, age, function, Fields.REPLACE );
// outgoing -> first, last, age
- Fields.SWAP
The cascading.tuple.Fields.SWAP constant is used as an output selector to swap the operation arguments with its results. The argument and result field names do not need to match, nor does the number of fields. This is useful when the operation arguments are no longer necessary and the result Fields and values should be appended to the remainder of the input field names and Tuple.
// incoming -> first, last, age
String expression = "first + \" \" + last";
Fields fields = new Fields( "full" );
ExpressionFunction full =
new ExpressionFunction( fields, expression, String.class );
Fields firstLast = new Fields( "first", "last" );
assembly = new Each( assembly, firstLast, full, Fields.SWAP );
// outgoing -> age, full
- Fields.ARGS
The cascading.tuple.Fields.ARGS constant is used to let a given operation inherit the field names of its argument Tuple. This Fields set is a convenience and is typically used when the Pipe output selector is RESULTS or REPLACE. It is specifically used by the Identity function when coercing values from Strings to primitive types.
// incoming -> first, last, age
// coerce to int
Identity function = new Identity( Fields.ARGS, Integer.class );
Fields age = new Fields( "age" );
assembly = new Each( assembly, age, function, Fields.REPLACE );
// outgoing -> first, last, age
- Fields.GROUP
The cascading.tuple.Fields.GROUP constant represents all the fields used as the grouping key in the most recent grouping. If no previous grouping exists in the pipe assembly, GROUP represents all the current field names.
// incoming -> first, last, age
assembly = new GroupBy( assembly, new Fields( "first", "last" ) );
FieldJoiner full = new FieldJoiner( new Fields( "full" ), " " );
assembly = new Each( assembly, Fields.GROUP, full, Fields.ALL );
// outgoing -> first, last, age, full
- Fields.VALUES
The cascading.tuple.Fields.VALUES constant represents all the fields not used as grouping fields in the previous grouping. That is, if you have fields a, b, and c, and group on a, Fields.VALUES resolves to b and c.
// incoming -> first, last, age
assembly = new GroupBy( assembly, new Fields( "age" ) );
FieldJoiner full = new FieldJoiner( new Fields( "full" ), " " );
assembly = new Each( assembly, Fields.VALUES, full, Fields.ALL );
// outgoing -> first, last, age, full
- Fields.UNKNOWN
The cascading.tuple.Fields.UNKNOWN constant is used when Fields must be declared, but neither the quantity nor the names of the fields are known. This allows for processing tuples of arbitrary length from an input source or an operation.
Use cascading.tuple.Fields.UNKNOWN with caution.
// incoming -> line
RegexSplitter function = new RegexSplitter( Fields.UNKNOWN, "\t" );
Fields fields = new Fields( "line" );
assembly =
new Each( assembly, fields, function, Fields.RESULTS );
// outgoing -> unknown
- Fields.NONE
The cascading.tuple.Fields.NONE constant is used to specify no fields. This constant is typically used as an argument selector for Operations that take no argument values, like cascading.operation.Insert.
// incoming -> first, last, age
Insert constant = new Insert( new Fields( "zip" ), "77373" );
assembly = new Each( assembly, Fields.NONE, constant, Fields.ALL );
// outgoing -> first, last, age, zip
The cascading.tuple.Fields.NONE constant can also be used when performing a join with CoGroup or HashJoin when a full cross product is desired across all data. Note, however, that this type of join degrades performance.
Field Algebra
The chart below shows common ways to merge input and result fields for the desired output fields. A few minutes with this chart may help clarify the discussion of fields, tuples, and pipes. Also see Each and Every Pipes for details on the different columns and their relationships to the Each and Every pipes and functions, aggregators, and buffers.
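As a rough companion to the field set descriptions above, the following plain-Java sketch models how an output selector combines incoming and result field names. It does not use the Cascading API: the FieldAlgebra class, the Selector enum, and the outgoing method are hypothetical names, and only the ALL, RESULTS, REPLACE, and SWAP selectors are modeled, following the behavior described in this section. Details such as name collisions are deliberately ignored.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical plain-Java model of output selectors; not the Cascading API.
final class FieldAlgebra {
  enum Selector { ALL, RESULTS, REPLACE, SWAP }

  static List<String> outgoing(List<String> incoming, List<String> arguments,
                               List<String> results, Selector selector) {
    List<String> out = new ArrayList<>();
    switch (selector) {
      case ALL:      // incoming fields followed by the operation results
        out.addAll(incoming);
        out.addAll(results);
        break;
      case RESULTS:  // only the operation results
        out.addAll(results);
        break;
      case REPLACE:  // values are replaced in place, so the names are unchanged
        if (!arguments.equals(results))
          throw new IllegalArgumentException(
              "REPLACE requires result names to match argument names");
        out.addAll(incoming);
        break;
      case SWAP:     // drop the arguments, then append the results
        out.addAll(incoming);
        out.removeAll(arguments);
        out.addAll(results);
        break;
    }
    return out;
  }

  public static void main(String[] args) {
    List<String> incoming = Arrays.asList("first", "last", "age");
    List<String> firstLast = Arrays.asList("first", "last");
    List<String> full = Arrays.asList("full");
    List<String> age = Arrays.asList("age");

    System.out.println(outgoing(incoming, firstLast, full, Selector.ALL));     // [first, last, age, full]
    System.out.println(outgoing(incoming, firstLast, full, Selector.RESULTS)); // [full]
    System.out.println(outgoing(incoming, age, age, Selector.REPLACE));        // [first, last, age]
    System.out.println(outgoing(incoming, firstLast, full, Selector.SWAP));    // [age, full]
  }
}
```

The four printed lists correspond to the outgoing comments in the ALL, RESULTS, REPLACE, and SWAP examples earlier in this section.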
Field Typing
The Fields class can hold type information for each field, and the Cascading planner can propagate that information from source Tap instances to downstream Operations through to sink Tap instances.
This allows for:
- Taps to read and store type information for external systems and applications
- Error detection during joins (detecting non-comparable types)
- Enforcement of canonical representations within the Tuple (preventing a field from switching arbitrarily between String and Integer types)
- Pluggable coercion from one type to another type, even if either is not a Java primitive
- Much improved data serialization and network performance between map and reduce nodes, or between Tez vertices (new in 3.1)
To declare types, simply pass type information to the Fields instance either through the constructor or via a fluent API.
Fields resultFields = new Fields( "count", Long.class ); // null is ok
Fields resultFields = new Fields( "count" ).applyTypes( long.class ); // null becomes 0
Note that the first example uses Long.class and the second uses long.class. Because Long is an object type, this tells Cascading that the field can hold a null value. If the field is declared as long (a primitive), null becomes zero.
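The difference between the two declarations can be illustrated with a small plain-Java model. This is a sketch of the null-handling rule stated above, not Cascading's own coercion code; the NullCoercion class and its toDeclaredType method are hypothetical names.

```java
// Hypothetical plain-Java model of null handling for long versus Long.
final class NullCoercion {
  // Return the value as it would be held for a field declared with the given type.
  static Object toDeclaredType(Object value, Class<?> declared) {
    if (value == null)
      // A primitive cannot hold null, so null becomes zero; an object type keeps null.
      return declared.isPrimitive() ? Long.valueOf(0L) : null;
    return Long.valueOf(((Number) value).longValue());
  }

  public static void main(String[] args) {
    System.out.println(toDeclaredType(null, Long.class)); // null
    System.out.println(toDeclaredType(null, long.class)); // 0
    System.out.println(toDeclaredType(42, long.class));   // 42
  }
}
```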
In practice, typed fields can only be used when they declare the results of an Operation or the input/output of a Scheme.
Pipe assembly = new Pipe( "assembly" );
// ...
Fields groupingFields = new Fields( "date" );
// note we do not pass the parent assembly Pipe in to SumBy or CountBy
Fields valueField = new Fields( "size" );
Fields sumField = new Fields( "total-size", long.class );
SumBy sumBy = new SumBy( valueField, sumField );
Fields countField = new Fields( "num-events" );
CountBy countBy = new CountBy( countField );
assembly = new AggregateBy( assembly, groupingFields, sumBy, countBy );
Here the type information serves two roles.
First, it allows a downstream consumer of the field value to know the type maintained in the tuple.
Second, the SumBy subassembly now has a simpler API and can get the type information it needs internally to perform the aggregation directly from the Fields instance.
Note that the TextDelimited and other Scheme classes should have any type information declared so it can be maintained by the Cascading planner. Custom Scheme types also have the opportunity to read type information from any field or data sources they represent so it can be passed to the planner during runtime.
Type Coercion
Type coercion is a means to convert one data type to another. For example, parsing the Java String "42" to the Integer 42 is a coercion, as is, more simply, converting a Long 42 to a Double 42.0. Cascading supports primitive type coercions natively through the cascading.tuple.coerce.Coercions class.
In practice, developers implicitly invoke coercions via the cascading.tuple.TupleEntry interface by requesting a Long or String representation of a field, via TupleEntry.getLong() or TupleEntry.getString(), respectively.
Coercions are also invoked when data is set on a Tuple, for example via TupleEntry.setLong() or TupleEntry.setString(). If the field was declared as an Integer and TupleEntry.setString( "someField", "42" ) is called, the value of "someField" is coerced into its canonical form, 42.
To create custom coercions, implement the cascading.tuple.type.CoercibleType interface. Instances of CoercibleType can be used as the Type accepted by the Fields API because CoercibleType extends java.lang.reflect.Type (which java.lang.Class also implements).
Cascading provides a cascading.tuple.type.DateType implementation to allow for coercions between date strings and the Long canonical type.
SimpleDateFormat dateFormat = new SimpleDateFormat( "dd/MMM/yyyy:HH:mm:ss:SSS Z" );
Date firstDate = new Date();
String stringFirstDate = dateFormat.format( firstDate );
CoercibleType coercible = new DateType( "dd/MMM/yyyy:HH:mm:ss:SSS Z", TimeZone.getDefault() );
// create the Fields, Tuple, and TupleEntry
Fields fields = new Fields( "dateString", "dateValue" ).applyTypes( coercible, long.class );
Tuple tuple = new Tuple( firstDate.getTime(), firstDate.getTime() );
TupleEntry results = new TupleEntry( fields, tuple );
// test the results
assert results.getObject( "dateString" ).equals( firstDate.getTime() );
assert results.getLong( "dateString" ) == firstDate.getTime();
assert results.getString( "dateString" ).equals( stringFirstDate );
assert !results.getString( "dateString" ).equals( results.getString( "dateValue" ) ); // not equals
Date secondDate = new Date( firstDate.getTime() + ( 60 * 1000 ) );
String stringSecondDate = dateFormat.format( secondDate );
results.setString( "dateString", stringSecondDate );
results.setLong( "dateValue", secondDate.getTime() );
assert !results.getObject( "dateString" ).equals( firstDate.getTime() ); // not equals
assert results.getObject( "dateString" ).equals( secondDate.getTime() ); // equals
In this example we declare the "dateString" field to be a DateType. DateType maintains the value of the field as a long internally, but if a String is set or requested, it is converted using the given SimpleDateFormat pattern and the given TimeZone.
In the case of a TextDelimited CSV file, where one column is a date value, DateType can be used to declare its format allowing TextDelimited to read and write the value as a String, but use the value internally (in the Tuple) as a long, which is much more efficient.
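The conversion between a date string and a long canonical value can be sketched with the JDK alone. The class below is a hypothetical stand-in written for illustration; it is not the DateType source, and the DateCoercionSketch, canonical, and asString names are assumptions. It uses Locale.ROOT and a fixed UTC zone so that the round trip does not depend on the local environment.

```java
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Locale;
import java.util.TimeZone;

// Hypothetical sketch of string <-> long date coercion; not Cascading's DateType.
final class DateCoercionSketch {
  private final String pattern;
  private final TimeZone zone;

  DateCoercionSketch(String pattern, TimeZone zone) {
    this.pattern = pattern;
    this.zone = zone;
  }

  // SimpleDateFormat is not thread-safe, so a new instance is built per call.
  private SimpleDateFormat format() {
    SimpleDateFormat format = new SimpleDateFormat(pattern, Locale.ROOT);
    format.setTimeZone(zone);
    return format;
  }

  // Parse a date string into the canonical form: milliseconds since the epoch.
  long canonical(String text) {
    try {
      return format().parse(text).getTime();
    } catch (ParseException exception) {
      throw new IllegalArgumentException("cannot parse date: " + text, exception);
    }
  }

  // Render the canonical long as a date string.
  String asString(long millis) {
    return format().format(new Date(millis));
  }

  public static void main(String[] args) {
    DateCoercionSketch date =
        new DateCoercionSketch("dd/MMM/yyyy:HH:mm:ss:SSS Z", TimeZone.getTimeZone("UTC"));

    long now = System.currentTimeMillis();
    String text = date.asString(now);

    // The pattern includes milliseconds, so the round trip is lossless.
    System.out.println(text + " -> " + date.canonical(text));
    System.out.println(date.canonical(text) == now);
  }
}
```

Keeping the long as the stored value and converting only at the edges mirrors the efficiency argument above: comparisons and serialization work on a primitive, and the string form is produced only when it is requested.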