Cascading 4.0 User Guide - Cookbook: Code Examples of Cascading Idioms

1. Introduction

1.1. What Is Cascading?

2. Diving into the APIs

2.1. Anatomy of a Word-Count Application

3. Cascading Basic Concepts

3.1. Terminology

3.3. Pipes

3.4. Platforms

3.6. Sink Modes

3.7. Flows

4. Tuple Fields

4.1. Field Sets

5. Pipe Assemblies

5.1. Each and Every Pipes

5.2. Merge

5.3. GroupBy

5.4. CoGroup

5.5. HashJoin

6. Flows

6.1. Creating Flows from Pipe Assemblies

7. Cascades

7.1. Creating a Cascade

8. Configuring

8.1. Introduction

9. Local Platform

9.1. Building an Application

10. The Apache Hadoop Platforms

10.1. What is Apache Hadoop?

11. Apache Hadoop MapReduce Platform

11.1. Configuring Applications

11.3. Building

12. Apache Tez Platform

12.1. Configuring Applications

12.2. Building

13. Using and Developing Operations

13.1. Introduction

13.2. Functions

13.3. Filters

13.4. Aggregators

13.5. Buffers

14. Custom Taps and Schemes

14.1. Introduction

14.2. Custom Taps

15. Advanced Processing

15.1. SubAssemblies

16. Built-In Operations

16.1. Identity Function

16.9. Assertions

16.11. Buffers

17. Built-in SubAssemblies

17.1. Optimized Aggregations

18. Cascading Best Practices

18.1. Unit Testing

19. Extending Cascading

19.1. Scripting

20. Cookbook: Code Examples of Cascading Idioms

20.1. Tuples and Fields

20.5. API Usage

21. The Cascading Process Planner

21.1. FlowConnector

21.3. RuleRegistry

Cookbook: Code Examples of Cascading Idioms

This chapter demonstrates some common idioms used in Cascading applications.

Tuples and Fields

Copy a Tuple instance
Tuple original = new Tuple( "john", "doe" );

// call copy constructor
Tuple copy = new Tuple( original );

assert copy.getObject( 0 ).equals( "john" );
assert copy.getObject( 1 ).equals( "doe" );
Nest a Tuple instance within a Tuple
Tuple parent = new Tuple();
parent.add( new Tuple( "john", "doe" ) );

assert ( (Tuple) parent.getObject( 0 ) ).getObject( 0 ).equals( "john" );
assert ( (Tuple) parent.getObject( 0 ) ).getObject( 1 ).equals( "doe" );
Build a longer Fields instance
Fields first = new Fields( "first" );
Fields middle = new Fields( "middle" );
Fields last = new Fields( "last" );

Fields full = first.append( middle ).append( last );
Remove a field from a longer Fields instance
Fields full = new Fields( "first", "middle", "last" );

Fields firstLast = full.subtract( new Fields( "middle" ) );

Stream Shaping

Split (branch) a Tuple stream
Pipe pipe = new Pipe( "head" );
pipe = new Each( pipe, new SomeFunction() );
// ...

// split left with the branch name 'lhs'
Pipe lhs = new Pipe( "lhs", pipe );
lhs = new Each( lhs, new SomeFunction() );
// ...

// split right with the branch name 'rhs'
Pipe rhs = new Pipe( "rhs", pipe );
rhs = new Each( rhs, new SomeFunction() );
// ...
Copy a field value
Fields argument = new Fields( "field" );
Identity identity = new Identity( new Fields( "copy" ) );

// identity copies the incoming argument to the result tuple
pipe = new Each( pipe, argument, identity, Fields.ALL );
Discard (drop) a field
// incoming -> "keepField", "dropField"
pipe = new Discard( pipe, new Fields( "dropField" ) );
// outgoing -> "keepField"
Retain (keep) a field
// incoming -> "keepField", "dropField"
pipe = new Retain( pipe, new Fields( "keepField" ) );
// outgoing -> "keepField"
Rename a field
// a simple SubAssembly that uses Identity internally
pipe = new Rename( pipe, new Fields( "from" ), new Fields( "to" ) );
Coerce field values from Strings to primitives
Fields fields = new Fields( "longField", "booleanField" );
Class types[] = new Class[]{long.class, boolean.class};

// convert to given type
pipe = new Coerce( pipe, fields, types );
Insert constant values into a stream
Fields fields = new Fields( "constant1", "constant2" );
Insert function = new Insert( fields, "value1", "value2" );

pipe = new Each( pipe, function, Fields.ALL );

Common Operations

Parse a String date/time value
// convert string date/time field to a long
// milliseconds "timestamp" value
String format = "yyyy:MM:dd:HH:mm:ss.SSS";
DateParser parser = new DateParser( new Fields( "ts" ), format );

pipe = new Each( pipe, new Fields( "datetime" ), parser, Fields.ALL );
Format a timestamp to a date/time value
// convert a long milliseconds "timestamp" value to a string
String format = "HH:mm:ss.SSS";
DateFormatter formatter =
  new DateFormatter( new Fields( "datetime" ), format );

pipe = new Each( pipe, new Fields( "ts" ), formatter, Fields.ALL );

Stream Ordering

Remove duplicate tuples in a stream
// remove all duplicates from the stream
pipe = new Unique( pipe, Fields.ALL );
Create a list of unique values
// narrow stream to just ips
pipe = new Retain( pipe, new Fields( "ip" ) );
// find all unique 'ip' values
pipe = new Unique( pipe, new Fields( "ip" ) );
Find first occurrence in time of a unique value
// group on all unique 'ip' values
// secondary sort on 'datetime', natural order is in ascending order
pipe = new GroupBy( pipe, new Fields( "ip" ), new Fields( "datetime" ) );
// take the first 'ip' tuple in the group which has the
// oldest 'datetime' value
pipe = new Every( pipe, Fields.ALL, new First(), Fields.RESULTS );

API Usage

Pass properties to a custom Operation
// set property on Flow
Properties properties = new Properties();
properties.put( "key", "value" );
FlowConnector flowConnector = new Hadoop2MR1FlowConnector( properties );
// ...

// get the property from within an Operation (Function, Filter, etc)
String value = (String) flowProcess.getProperty( "key" );
Bind multiple sources and sinks to a Flow
Pipe headLeft = new Pipe( "headLeft" );
// do something interesting

Pipe headRight = new Pipe( "headRight" );
// do something interesting

// merge the two input streams
Pipe merged = new GroupBy( headLeft, headRight, new Fields( "common" ) );
// ...

// branch the merged stream
Pipe tailLeft = new Pipe( "tailLeft", merged );
// filter out values to the left
tailLeft = new Each( tailLeft, new SomeFilter() );

Pipe tailRight = new Pipe( "tailRight", merged );
// filter out values to the right
tailRight = new Each( tailRight, new SomeFilter() );

// source taps
Scheme inLeftScheme =
  new TextDelimited( new Fields( "some-fields" ) );
Tap sourceLeft = new Hfs( inLeftScheme, "some/path" );

Scheme inRightScheme =
  new TextDelimited( new Fields( "some-fields" ) );
Tap sourceRight = new Hfs( inRightScheme, "some/path" );

// sink taps
Scheme outLeftScheme =
  new TextDelimited( new Fields( "some-fields" ) );
Tap sinkLeft = new Hfs( outLeftScheme, "some/path" );

Scheme outRightScheme =
  new TextDelimited( new Fields( "some-fields" ) );
Tap sinkRight = new Hfs( outRightScheme, "some/path" );

FlowDef flowDef = new FlowDef()
  .setName( "flow-name" );

// bind source taps to Pipe heads
flowDef
  .addSource( headLeft, sourceLeft )
  .addSource( headRight, sourceRight );

// bind sink taps to Pipe tails
flowDef
  .addSink( tailLeft, sinkLeft )
  .addTailSink( tailRight, sinkRight );

// ALTERNATIVELY ...

// add named source taps
// the head pipe name to bind to
flowDef
  .addSource( "headLeft", sourceLeft )    // headLeft.getName()
  .addSource( "headRight", sourceRight ); // headRight.getName()

// add named sink taps
flowDef
  .addSink( "tailLeft", sinkLeft )    // tailLeft.getName()
  .addSink( "tailRight", sinkRight ); // tailRight.getName()

// add tails -- heads are reachable from the tails
flowDef
  .addTail( tailLeft )
  .addTail( tailRight );

// set property on Flow
FlowConnector flowConnector = new Hadoop2MR1FlowConnector();

Flow flow = flowConnector.connect( flowDef );