Cascading 4.0 User Guide - Tuple Fields

Tuple Fields

As data stored in tuples moves through a pipe assembly, individual columns or fields are added, removed, or modified in place as they pass through each pipe.

Tuple fields are manipulated through the cascading.tuple.Fields class. Fields can be used either to declare a set of fields (much as fields are declared in a class definition) or to select a set of fields from another tuple. For example, if you declare that a Tuple has the field names [first, last], you can later select [last] from the Tuple as an argument to an Operation. Equivalently, you can select the second position with the ordinal [1], or the last position with the ordinal [-1].
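The name-or-ordinal selection rule can be illustrated with a small, self-contained Java sketch. It does not use Cascading: FieldSelectionSketch and its methods are hypothetical, and a tuple is modelled as a plain List whose positions line up with a list of declared field names. Negative ordinals count back from the end, so -1 selects the last field.

```java
import java.util.List;

// Hypothetical sketch, not Cascading's Fields class: resolves a field name or an
// ordinal to a tuple position. A tuple is modelled as a List of values whose
// positions line up with the list of declared field names.
final class FieldSelectionSketch {
  private FieldSelectionSketch() {}

  // Resolve a field name against the declared field names.
  static int positionOf(List<String> declared, String name) {
    int pos = declared.indexOf(name);
    if (pos < 0)
      throw new IllegalArgumentException("field not found: " + name);
    return pos;
  }

  // Resolve an ordinal; negative ordinals count back from the end (-1 is the last field).
  static int positionOf(List<String> declared, int ordinal) {
    int pos = ordinal < 0 ? declared.size() + ordinal : ordinal;
    if (pos < 0 || pos >= declared.size())
      throw new IndexOutOfBoundsException("ordinal out of range: " + ordinal);
    return pos;
  }

  // Select one value by field name.
  static Object select(List<String> declared, List<?> tuple, String name) {
    return tuple.get(positionOf(declared, name));
  }

  // Select one value by ordinal.
  static Object select(List<String> declared, List<?> tuple, int ordinal) {
    return tuple.get(positionOf(declared, ordinal));
  }
}
```

For a tuple declared as [first, last], selecting "last", 1, or -1 all return the same value.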

Both Each and Every pipes process an input Tuple with the following steps:

  1. The pipe selects fields from the input Tuple as arguments to the child Operation (Function, Filter, Aggregator, or Buffer).

  2. The pipe receives results from the Operation.

  3. The pipe returns output Tuples, which can contain a mix of incoming Tuple values and Operation results.

This is achieved by providing new Fields instances that select or declare the required field names or ordinals as appropriate. For selecting the arguments of an Operation, or the outgoing fields of a pipe, Cascading provides Fields sets: predefined constants on the Fields class that act as wildcards, referencing sets of field values without naming them explicitly.
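The three steps listed above can be modelled roughly as follows. This is a hypothetical sketch, not Cascading's implementation: EachStepsSketch, its OutputSelector enum, and the map-based tuple (an insertion-ordered LinkedHashMap from field name to value) are assumptions made for illustration, and only the ALL and RESULTS output selectors are modelled.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Hypothetical model of the three Each steps; not Cascading's implementation.
// A tuple is an insertion-ordered map from field name to value.
final class EachStepsSketch {
  enum OutputSelector { ALL, RESULTS }

  private EachStepsSketch() {}

  static Map<String, Object> each(
      Map<String, Object> incoming,
      List<String> argumentSelector,
      Function<Map<String, Object>, Map<String, Object>> operation,
      OutputSelector outputSelector) {
    // 1. Select the argument fields from the incoming tuple.
    Map<String, Object> arguments = new LinkedHashMap<>();
    for (String name : argumentSelector) {
      if (!incoming.containsKey(name))
        throw new IllegalArgumentException("unknown argument field: " + name);
      arguments.put(name, incoming.get(name));
    }

    // 2. Receive the results from the operation.
    Map<String, Object> results = operation.apply(arguments);

    // 3. Build the outgoing tuple according to the output selector.
    Map<String, Object> outgoing = new LinkedHashMap<>();
    if (outputSelector == OutputSelector.ALL)
      outgoing.putAll(incoming);
    for (Map.Entry<String, Object> result : results.entrySet()) {
      if (outgoing.containsKey(result.getKey()))
        throw new IllegalStateException("duplicate field: " + result.getKey());
      outgoing.put(result.getKey(), result.getValue());
    }
    return outgoing;
  }
}
```

Rejecting a result name that already exists under ALL is a choice made in this sketch, so that incoming values are never silently overwritten.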

Fields sets can also be used in some cases where fields are declared. All these cases are described below.

For more examples of Fields sets in action, see the Identity function documentation.

Field Sets

These predefined Fields sets are constant values on the Fields class. They can be used in many places where the Fields class is expected. They are:

Fields.ALL

The cascading.tuple.Fields.ALL constant is a wildcard that represents all currently available fields.

// incoming -> first, last, age

String expression = "first + \" \" + last";
Fields fields = new Fields( "full" );
ExpressionFunction full =
  new ExpressionFunction( fields, expression, String.class );

assembly =
  new Each( assembly, new Fields( "first", "last" ), full, Fields.ALL );

// outgoing -> first, last, age, full

Fields.RESULTS

The cascading.tuple.Fields.RESULTS constant represents the field names of the values returned by the current operation. This Fields set can only be used as an output selector on a pipe, causing the pipe to output a tuple containing only the operation results.

// incoming -> first, last, age

String expression = "first + \" \" + last";
Fields fields = new Fields( "full" );
ExpressionFunction full =
  new ExpressionFunction( fields, expression, String.class );

Fields firstLast = new Fields( "first", "last" );
assembly =
  new Each( assembly, firstLast, full, Fields.RESULTS );

// outgoing -> full

Fields.REPLACE

The cascading.tuple.Fields.REPLACE constant is used as an output selector to replace values in the incoming tuple, in place, with the results of an operation. This convenient Fields set allows an operation to overwrite the values stored in the selected fields. For this to work, the operation must either declare the same field names as the pipe's argument selector, or declare the ARGS Fields set.

// incoming -> first, last, age

// coerce to int
Identity function = new Identity( Fields.ARGS, Integer.class );

Fields age = new Fields( "age" );
assembly = new Each( assembly, age, function, Fields.REPLACE );

// outgoing -> first, last, age
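The in-place behavior of REPLACE, including the requirement that the results carry the argument field names, can be sketched in plain Java. ReplaceSketch is hypothetical and is not part of Cascading; the tuple is again modelled as a LinkedHashMap, whose putAll keeps the original field order when existing keys are overwritten.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Hypothetical model of an Each pipe with a REPLACE output selector; not Cascading's code.
final class ReplaceSketch {
  private ReplaceSketch() {}

  static Map<String, Object> replace(
      Map<String, Object> incoming,
      List<String> argumentSelector,
      Function<Map<String, Object>, Map<String, Object>> operation) {
    // Select the arguments.
    Map<String, Object> arguments = new LinkedHashMap<>();
    for (String name : argumentSelector) {
      if (!incoming.containsKey(name))
        throw new IllegalArgumentException("unknown argument field: " + name);
      arguments.put(name, incoming.get(name));
    }

    Map<String, Object> results = operation.apply(arguments);

    // REPLACE requires the results to declare exactly the argument field names.
    if (!results.keySet().equals(arguments.keySet()))
      throw new IllegalStateException("results must declare " + arguments.keySet());

    // Overwrite in place: existing keys keep their position in a LinkedHashMap.
    Map<String, Object> outgoing = new LinkedHashMap<>(incoming);
    outgoing.putAll(results);
    return outgoing;
  }
}
```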

Fields.SWAP

The cascading.tuple.Fields.SWAP constant is used as an output selector to swap the operation arguments with its results. The argument and result fields need not have the same names or the same number of fields. This is useful when the operation arguments are no longer needed and the result fields and values should be appended to the remaining input fields and Tuple values.

// incoming -> first, last, age

String expression = "first + \" \" + last";
Fields fields = new Fields( "full" );
ExpressionFunction full =
  new ExpressionFunction( fields, expression, String.class );

Fields firstLast = new Fields( "first", "last" );
assembly = new Each( assembly, firstLast, full, Fields.SWAP );

// outgoing -> age, full
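A comparable plain-Java sketch of SWAP is shown below. SwapSketch is hypothetical: it removes the argument fields from a map-based tuple and then appends the operation results, so the argument and result fields can differ in both names and count.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Hypothetical model of an Each pipe with a SWAP output selector; not Cascading's code.
final class SwapSketch {
  private SwapSketch() {}

  static Map<String, Object> swap(
      Map<String, Object> incoming,
      List<String> argumentSelector,
      Function<Map<String, Object>, Map<String, Object>> operation) {
    // Select the arguments.
    Map<String, Object> arguments = new LinkedHashMap<>();
    for (String name : argumentSelector) {
      if (!incoming.containsKey(name))
        throw new IllegalArgumentException("unknown argument field: " + name);
      arguments.put(name, incoming.get(name));
    }

    Map<String, Object> results = operation.apply(arguments);

    // Drop the argument fields; the remainder keeps its original order.
    Map<String, Object> outgoing = new LinkedHashMap<>(incoming);
    outgoing.keySet().removeAll(argumentSelector);

    // Append the results; their names and count need not match the arguments.
    for (Map.Entry<String, Object> result : results.entrySet()) {
      if (outgoing.containsKey(result.getKey()))
        throw new IllegalStateException("duplicate field: " + result.getKey());
      outgoing.put(result.getKey(), result.getValue());
    }
    return outgoing;
  }
}
```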

Fields.ARGS

The cascading.tuple.Fields.ARGS constant lets an operation inherit the field names of its argument Tuple. This Fields set is a convenience and is typically used when the pipe output selector is RESULTS or REPLACE. For example, it is used with the Identity function when coercing values from Strings to primitive types.

// incoming -> first, last, age

// coerce to int
Identity function = new Identity( Fields.ARGS, Integer.class );

Fields age = new Fields( "age" );
assembly = new Each( assembly, age, function, Fields.REPLACE );

// outgoing -> first, last, age

Fields.GROUP

The cascading.tuple.Fields.GROUP constant represents all the fields used as the grouping key in the most recent grouping. If no previous grouping exists in the pipe assembly, GROUP represents all the current field names.

// incoming -> first, last, age

assembly = new GroupBy( assembly, new Fields( "first", "last" ) );

FieldJoiner full = new FieldJoiner( new Fields( "full" ), " " );

assembly = new Each( assembly, Fields.GROUP, full, Fields.ALL );

// outgoing -> first, last, age, full

Fields.VALUES

The cascading.tuple.Fields.VALUES constant represents all the fields not used as grouping fields in the most recent grouping. That is, if you have fields a, b, and c, and group on a, Fields.VALUES resolves to b and c.

// incoming -> first, last, age

assembly = new GroupBy( assembly, new Fields( "age" ) );

FieldJoiner full = new FieldJoiner( new Fields( "full" ), " " );

assembly = new Each( assembly, Fields.VALUES, full, Fields.ALL );

// outgoing -> first, last, age, full
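How GROUP and VALUES resolve against the current field names can be sketched as simple list operations. GroupValuesSketch is hypothetical and not part of Cascading. A null grouping key models the case where the assembly has no previous grouping; the text above defines that case only for GROUP, so the sketch simply refuses it for VALUES.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

// Hypothetical sketch of how GROUP and VALUES resolve; not Cascading's resolver.
final class GroupValuesSketch {
  private GroupValuesSketch() {}

  // GROUP: the most recent grouping key, or all current fields if there was no grouping (null).
  static List<String> group(List<String> current, List<String> groupingKey) {
    return new ArrayList<>(groupingKey == null ? current : groupingKey);
  }

  // VALUES: every current field that is not part of the grouping key.
  static List<String> values(List<String> current, List<String> groupingKey) {
    Objects.requireNonNull(groupingKey, "this sketch requires a previous grouping");
    List<String> values = new ArrayList<>(current);
    values.removeAll(groupingKey);
    return values;
  }
}
```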

Fields.UNKNOWN

The cascading.tuple.Fields.UNKNOWN constant is used when Fields must be declared, but neither the number nor the names of the fields are known. This allows for processing tuples of arbitrary length from an input source or an operation.

Use cascading.tuple.Fields.UNKNOWN with caution.

// incoming -> line

RegexSplitter function = new RegexSplitter( Fields.UNKNOWN, "\t" );

Fields fields = new Fields( "line" );
assembly =
  new Each( assembly, fields, function, Fields.RESULTS );

// outgoing -> unknown
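UNKNOWN exists because the width of the results may only be known at runtime. The hypothetical sketch below (plain Java, not RegexSplitter) splits a line on tab characters, so each input line can yield a different number of values, which can then only be addressed by position.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch, not RegexSplitter: produces results whose width is only known at runtime.
final class UnknownFieldsSketch {
  private UnknownFieldsSketch() {}

  // Split one line on tab characters. The limit of -1 keeps trailing empty values,
  // so "a\t" yields two values rather than one.
  static List<String> splitOnTabs(String line) {
    return Arrays.asList(line.split("\t", -1));
  }
}
```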

Fields.NONE

The cascading.tuple.Fields.NONE constant is used to specify no fields. It is typically used as the argument selector for Operations that need no argument values, like cascading.operation.Insert.

// incoming -> first, last, age

Insert constant = new Insert( new Fields( "zip" ), "77373" );

assembly = new Each( assembly, Fields.NONE, constant, Fields.ALL );

// outgoing -> first, last, age, zip

The cascading.tuple.Fields.NONE constant can also be used as the join key with CoGroup or HashJoin when a full cross product across all data is desired. Use this with care: a cross product pairs every tuple on one side with every tuple on the other, so the output grows multiplicatively and performance degrades.
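The cost of a full cross product is easy to see in a sketch. CrossJoinSketch is hypothetical and joins two in-memory lists with an empty join key, meaning every left value matches every right value; n left tuples and m right tuples yield n * m output tuples.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch of a join on an empty (NONE) key; not Cascading's CoGroup or HashJoin.
// Every left value matches every right value, so the output has left.size() * right.size() rows.
final class CrossJoinSketch {
  private CrossJoinSketch() {}

  static List<List<Object>> crossJoin(List<?> left, List<?> right) {
    List<List<Object>> joined = new ArrayList<>();
    for (Object l : left)
      for (Object r : right)
        joined.add(Arrays.asList(l, r)); // one output row per (left, right) pair
    return joined;
  }
}
```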

Field Algebra

The chart below shows common ways to merge input and result fields for the desired output fields. A few minutes with this chart may help clarify the discussion of fields, tuples, and pipes. Also see Each and Every Pipes for details on the different columns and their relationships to the Each and Every pipes and functions, aggregators, and buffers.

[Figure: field algebra chart]

Field Typing

The Fields class can hold type information for each field, and the Cascading planner can propagate that information from source Tap instances to downstream Operations through to sink Tap instances.

This allows for:

  • Taps to read and store type information for external systems and applications

  • Error detection during joins (detecting non-comparable types)

  • Enforcement of canonical representations within the Tuple (prevent a field from switching arbitrarily between String and Integer types)

  • Pluggable coercion from one type to another type, even if either is not a Java primitive

  • Much improved data serialization and network performance between map and reduce nodes, or between Tez vertices (new in 3.1)

To declare types, simply pass type information to the Fields instance either through the constructor or via a fluent API.

Example 1. Constructor
Fields resultFields = new Fields( "count", Long.class ); // null is ok

Example 2. Fluent
Fields resultFields = new Fields( "count" ).applyTypes( long.class ); // null becomes 0

Note that the first example uses Long.class and the second uses long.class. Because Long is an object type, declaring it tells Cascading that the field may hold null. If the field is declared as the primitive long, a null value becomes zero.
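The null-handling difference between long.class and Long.class can be sketched as follows. PrimitiveNullSketch is hypothetical and only models the behavior described above: a primitive long type turns null into 0, while the boxed Long type preserves null.

```java
// Hypothetical sketch of the null handling described above; not Cascading code.
final class PrimitiveNullSketch {
  private PrimitiveNullSketch() {}

  // Return the value as it would be held for a field declared with the given type.
  static Long canonical(Object value, Class<?> declaredType) {
    if (declaredType != long.class && declaredType != Long.class)
      throw new IllegalArgumentException("sketch only handles long and Long: " + declaredType);
    if (value == null)
      return declaredType == long.class ? Long.valueOf(0L) : null; // primitive: null becomes 0
    return ((Number) value).longValue();
  }
}
```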

In practice, typed fields can only be used when they declare the results of an Operation or the input/output of a Scheme.

Example 3. Declaring typed results
Pipe assembly = new Pipe( "assembly" );

// ...
Fields groupingFields = new Fields( "date" );

// note we do not pass the parent assembly Pipe in
Fields valueField = new Fields( "size" );
Fields sumField = new Fields( "total-size", long.class );
SumBy sumBy = new SumBy( valueField, sumField );

Fields countField = new Fields( "num-events" );
CountBy countBy = new CountBy( countField );

assembly = new AggregateBy( assembly, groupingFields, sumBy, countBy );

Here the type information serves two roles.

First, it allows a downstream consumer of the field value to know the type maintained in the tuple.

Second, the SumBy subassembly now has a simpler API and can get the type information it needs internally to perform the aggregation directly from the Fields instance.

Note that TextDelimited and other Scheme classes should have any type information declared on their fields so that the Cascading planner can maintain it. Custom Scheme types can also read type information from the fields or data sources they represent, so that it can be passed to the planner during runtime.

Type Coercion

Type coercion is a means of converting one data type to another. For example, parsing the Java String "42" into the Integer 42 is a coercion, as is, more simply, converting the Long 42 into the Double 42.0. Cascading supports primitive type coercions natively through the cascading.tuple.coerce.Coercions class.

In practice, developers invoke coercions implicitly through the cascading.tuple.TupleEntry class, for example by requesting a Long or String representation of a field via TupleEntry.getLong() or TupleEntry.getString(), respectively.

Coercions are also invoked when data is set on a Tuple, for example via TupleEntry.setLong() or TupleEntry.setString(). If the field was declared as an Integer and TupleEntry.setString( "someField", "42" ) is called, the value of "someField" is coerced into its canonical form, the Integer 42.
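The coercions described above can be approximated with a small plain-Java sketch. CoercionSketch and TypedEntrySketch are hypothetical stand-ins for the Coercions class and TupleEntry and cover only Integer, Long, Double, and String; they mirror the described behavior of storing a value in its declared canonical type when it is set as a String.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch, not cascading.tuple.coerce.Coercions: coerces a value to a
// declared canonical type. Only Integer, Long, Double, and String are handled.
final class CoercionSketch {
  private CoercionSketch() {}

  static Object coerce(Object value, Class<?> to) {
    if (value == null || to.isInstance(value))
      return value; // already in the requested type
    if (to == String.class)
      return value.toString();
    if (to == Integer.class)
      return value instanceof Number ? ((Number) value).intValue() : Integer.parseInt(value.toString().trim());
    if (to == Long.class)
      return value instanceof Number ? ((Number) value).longValue() : Long.parseLong(value.toString().trim());
    if (to == Double.class)
      return value instanceof Number ? ((Number) value).doubleValue() : Double.parseDouble(value.toString().trim());
    throw new IllegalArgumentException("no coercion to " + to.getName());
  }
}

// Hypothetical stand-in for TupleEntry: values are stored in their declared canonical type.
final class TypedEntrySketch {
  private final Map<String, Class<?>> types;
  private final Map<String, Object> values = new HashMap<>();

  TypedEntrySketch(Map<String, Class<?>> types) {
    this.types = types;
  }

  // Setting a String coerces it into the field's declared type.
  void setString(String field, String value) {
    values.put(field, CoercionSketch.coerce(value, typeOf(field)));
  }

  Object getObject(String field) {
    return values.get(field);
  }

  // Requesting a String coerces the canonical value back to a String.
  String getString(String field) {
    return (String) CoercionSketch.coerce(values.get(field), String.class);
  }

  private Class<?> typeOf(String field) {
    Class<?> type = types.get(field);
    if (type == null)
      throw new IllegalArgumentException("undeclared field: " + field);
    return type;
  }
}
```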

To create custom coercions, implement the cascading.tuple.type.CoercibleType interface. Because CoercibleType extends java.lang.reflect.Type (which java.lang.Class also implements), CoercibleType instances can be used wherever the Fields API accepts a Type.

Cascading provides a cascading.tuple.type.DateType implementation to allow for coercions between date strings and the Long canonical type.

Example 4. Date type
SimpleDateFormat dateFormat = new SimpleDateFormat( "dd/MMM/yyyy:HH:mm:ss:SSS Z" );
Date firstDate = new Date();
String stringFirstDate = dateFormat.format( firstDate );

CoercibleType coercible = new DateType( "dd/MMM/yyyy:HH:mm:ss:SSS Z", TimeZone.getDefault() );

// create the Fields, Tuple, and TupleEntry
Fields fields = new Fields( "dateString", "dateValue" ).applyTypes( coercible, long.class );
Tuple tuple = new Tuple( firstDate.getTime(), firstDate.getTime() );
TupleEntry results = new TupleEntry( fields, tuple );

// test the results
assert results.getObject( "dateString" ).equals( firstDate.getTime() );
assert results.getLong( "dateString" ) == firstDate.getTime();
assert results.getString( "dateString" ).equals( stringFirstDate );
assert !results.getString( "dateString" ).equals( results.getString( "dateValue" ) ); // not equals

Date secondDate = new Date( firstDate.getTime() + ( 60 * 1000 ) );
String stringSecondDate = dateFormat.format( secondDate );

results.setString( "dateString", stringSecondDate );
results.setLong( "dateValue", secondDate.getTime() );

assert !results.getObject( "dateString" ).equals( firstDate.getTime() ); // not equals
assert results.getObject( "dateString" ).equals( secondDate.getTime() ); // equals

In this example, the "dateString" field is declared as a DateType. DateType maintains the field value internally as a long, but if a String is set or requested, the value is converted using the given SimpleDateFormat pattern in the given TimeZone.

In the case of a TextDelimited CSV file, where one column is a date value, DateType can be used to declare its format allowing TextDelimited to read and write the value as a String, but use the value internally (in the Tuple) as a long, which is much more efficient.
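The DateType idea (a long canonical value, with String conversion only at the edges) can be sketched without Cascading. DateTypeSketch is hypothetical. It uses java.time.format.DateTimeFormatter instead of SimpleDateFormat, because DateTimeFormatter is immutable and thread-safe, and it assumes the pattern contains a zone offset (Z) so that parsing can recover the exact instant.

```java
import java.time.Instant;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Locale;

// Hypothetical sketch of the DateType idea, not Cascading's DateType: the canonical
// value is epoch milliseconds (a long), and Strings are converted only at the edges.
// Assumes the pattern includes a zone offset (e.g. "Z") so parsing recovers the instant.
final class DateTypeSketch {
  private final DateTimeFormatter formatter;
  private final ZoneId zone;

  DateTypeSketch(String pattern, ZoneId zone) {
    this.formatter = DateTimeFormatter.ofPattern(pattern, Locale.US); // immutable, thread-safe
    this.zone = zone;
  }

  // String -> canonical long (epoch milliseconds)
  long toCanonical(String text) {
    return ZonedDateTime.parse(text, formatter).toInstant().toEpochMilli();
  }

  // canonical long -> String, rendered in the configured zone
  String toText(long epochMillis) {
    return Instant.ofEpochMilli(epochMillis).atZone(zone).format(formatter);
  }
}
```

With the pattern from Example 4 and UTC, the instant 1700000000123 milliseconds is rendered as "14/Nov/2023:22:13:20:123 +0000" and parses back to the same long.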