Cascading 4.0 User Guide - Cascading Best Practices

Cascading Best Practices

Unit Testing

Discrete testing of each Operation, pipe assembly, and application is a must.

The cascading.CascadingTestCase class provides a number of static helper methods.

When testing custom Operations, use the invokeFunction(), invokeFilter(), invokeAggregator(), and invokeBuffer() methods.
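Cascading's invoke helpers need Cascading on the classpath, and their exact overloads are best checked in the CascadingTestCase Javadoc. The sketch below is a stdlib-only model of the same idea. It assumes an operation is simply something that receives an argument tuple and emits output tuples to a collector. The test calls the operation directly, with no Flow or Tap involved, and asserts on what was collected. TupleFunction, SPLIT_WORDS, and invoke() are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Stdlib-only model of the pattern behind CascadingTestCase.invokeFunction():
// call an operation directly with one argument tuple and collect what it emits.
// TupleFunction and SPLIT_WORDS are hypothetical stand-ins, not Cascading types.
final class InvokeFunctionSketch {
  interface TupleFunction {
    void operate(List<Object> arguments, Consumer<List<Object>> outputCollector);
  }

  // Hypothetical custom function under test: emits one single-value tuple per word.
  static final TupleFunction SPLIT_WORDS = (arguments, output) -> {
    String line = (String) arguments.get(0);
    for (String word : line.trim().split("\\s+")) {
      if (!word.isEmpty()) {
        output.accept(List.of(word));
      }
    }
  };

  // The "invoke" helper: no Flow, no Taps, just the operation and a collector.
  static List<List<Object>> invoke(TupleFunction function, List<Object> arguments) {
    List<List<Object>> collected = new ArrayList<>();
    function.operate(arguments, collected::add);
    return collected;
  }
}
```

Because the operation is exercised on its own, a failing assertion points directly at the operation's logic rather than at the surrounding assembly.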

When testing Flows, use the validateLength() methods. There are quite a few of them, and collectively they offer great flexibility. All of them read the sink tap, validate that it is the correct length and has the correct Tuple size, and check to see whether the values match a given regular expression pattern.
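Here is a stdlib-only model of the three checks a validateLength() call performs. It is not Cascading's implementation. It assumes the sink contents are already available in memory as tab-delimited lines rather than read from a sink Tap, and it matches the optional pattern against each whole line. ValidateLengthSketch is a hypothetical name.

```java
import java.util.List;
import java.util.regex.Pattern;

// Stdlib-only model of the checks described for validateLength():
// expected number of tuples, expected tuple size, and an optional regex.
// Reads in-memory tab-delimited lines instead of a sink Tap (an assumption).
final class ValidateLengthSketch {
  static void validate(List<String> sinkLines, int expectedLength, int expectedTupleSize, Pattern pattern) {
    if (sinkLines.size() != expectedLength) {
      throw new AssertionError("expected " + expectedLength + " tuples, found " + sinkLines.size());
    }
    for (String line : sinkLines) {
      String[] values = line.split("\t", -1); // -1 keeps trailing empty values
      if (values.length != expectedTupleSize) {
        throw new AssertionError("wrong tuple size in line: " + line);
      }
      if (pattern != null && !pattern.matcher(line).matches()) {
        throw new AssertionError("line does not match " + pattern + ": " + line);
      }
    }
  }
}
```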

It is also possible to write tests that are independent of the underlying platform. Such tests should subclass cascading.PlatformTestCase, located in the cascading-platform-x.y.z-tests.jar file.

Every platform to be tested against should also be added to the CLASSPATH. PlatformTestCase searches the CLASSPATH for all available platforms and runs each test in the subclass against every platform it finds.

See the Cascading platform unit tests for examples, all of which adhere to the naming convention of *PlatformTest.java.

For Maven users, be sure to add the tests classifier to any dependencies. Note that the cascading-platform project contains only tests and no main code, so it must be retrieved via the tests classifier.

Flow Granularity

Although one large Flow may perform slightly better, a more modular and flexible approach is advisable: create medium-sized Flows with well-defined responsibilities, and pass all of the resulting interdependent Flows to a Cascade, which sequences and executes them as a single unit.
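A Cascade derives the run order of its Flows from their data dependencies, so a modular set of Flows does not need to be sequenced by hand. The sketch below models that idea in plain Java. It assumes each Flow is described only by a name and the resource identifiers it reads and writes, and it applies Kahn's topological sort. It is not Cascading's own scheduling code, and FlowSpec and the identifiers in the example are hypothetical.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Deque;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Stdlib-only model of Cascade sequencing: a Flow that reads a resource another
// Flow writes must run after it. FlowSpec is hypothetical; this is Kahn's
// topological sort, not Cascading's planner code.
final class CascadeOrderSketch {
  static final class FlowSpec {
    final String name;
    final Set<String> sources;
    final Set<String> sinks;

    FlowSpec(String name, Set<String> sources, Set<String> sinks) {
      this.name = name;
      this.sources = sources;
      this.sinks = sinks;
    }
  }

  static List<String> executionOrder(List<FlowSpec> flows) {
    Map<String, Set<String>> consumers = new HashMap<>();
    Map<String, Integer> inDegree = new LinkedHashMap<>();
    for (FlowSpec flow : flows) inDegree.put(flow.name, 0);

    // Edge producer -> consumer whenever a sink of one is a source of the other.
    for (FlowSpec producer : flows) {
      for (FlowSpec consumer : flows) {
        if (producer != consumer && !Collections.disjoint(producer.sinks, consumer.sources)
            && consumers.computeIfAbsent(producer.name, k -> new HashSet<>()).add(consumer.name)) {
          inDegree.merge(consumer.name, 1, Integer::sum);
        }
      }
    }

    // Repeatedly run any Flow whose inputs are all produced already.
    Deque<String> ready = new ArrayDeque<>();
    inDegree.forEach((name, degree) -> { if (degree == 0) ready.add(name); });
    List<String> order = new ArrayList<>();
    while (!ready.isEmpty()) {
      String next = ready.poll();
      order.add(next);
      for (String consumer : consumers.getOrDefault(next, Collections.emptySet())) {
        if (inDegree.merge(consumer, -1, Integer::sum) == 0) ready.add(consumer);
      }
    }
    if (order.size() != flows.size()) throw new IllegalStateException("Flows form a cycle");
    return order;
  }
}
```

For example, with a parse Flow that writes "parsed" and report and archive Flows that read it, executionOrder() puts parse first regardless of the order in which the Flows were listed.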

Similarly, using the TextDelimited Scheme (or any custom format designed for long-term archival) between Flow instances allows you to hand off intermediate data to other systems for reporting or QA purposes, incurring a minimal performance penalty while remaining compatible with other tools.

Visit http://cascading.org/extensions/ for a list of available file formats suitable for data archival or debugging.

SubAssemblies, not Factories

When developing your applications, use SubAssembly subclasses, not "factory" methods. The resulting code is much easier to read and test.

Object constructors already act as "factories", so there is little reason to build frameworks that duplicate what a constructor does. There are exceptional cases in which a SubAssembly is not an option, but in practice they are rare.

Logical Responsibilities for SubAssemblies

SubAssemblies provide a very convenient means to co-locate similar or related responsibilities into a single place. For example, it’s simple to use a ParsingSubAssembly and a RulesSubAssembly, where the first is responsible solely for parsing incoming Tuple streams (log files for example), and the second applies rules to decide whether a given Tuple should be discarded or marked as bad.

Additionally, in your unit tests you can create a TestAssertionsSubAssembly that simply inlines various ValueAssertions and GroupAssertions. The practice of inlining assertions directly in your SubAssemblies is also important, but sometimes it makes sense to have more tests outside of the business logic.

Java Operators in Field Names

There are a few Operations in Cascading (e.g., ExpressionFunction and ExpressionFilter) that compile and apply Java expressions as needed during processing. In these expressions, Operation argument field names are used as variable names in the expression.

For this reason, avoid field names containing characters that cause compilation errors when used in an expression. For example, "first-name" is a valid field name for use with Cascading, but it might produce the expression first-name.trim(), which Java parses as first - name.trim() and which therefore fails to compile.
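One way to catch such names early is a small check run over your field-name constants. The sketch below is a hypothetical helper, not part of Cascading. It uses javax.lang.model.SourceVersion to test whether a name is a legal Java identifier that is not a keyword, and offers a simple rewrite that replaces illegal characters with underscores. Whether underscores suit your naming conventions is an assumption to revisit.

```java
import javax.lang.model.SourceVersion;

// Hypothetical helper: checks whether a field name can double as a Java
// variable name in expression-based operations such as ExpressionFunction.
final class FieldNameCheck {
  static boolean isExpressionSafe(String fieldName) {
    // isIdentifier() also accepts keywords, so exclude those explicitly.
    return SourceVersion.isIdentifier(fieldName) && !SourceVersion.isKeyword(fieldName);
  }

  // Replaces each char that is illegal in a Java identifier with '_'.
  // Works char by char, so supplementary (surrogate-pair) characters are not handled.
  static String toExpressionSafe(String fieldName) {
    StringBuilder safe = new StringBuilder();
    for (int i = 0; i < fieldName.length(); i++) {
      char c = fieldName.charAt(i);
      boolean legal = i == 0 ? Character.isJavaIdentifierStart(c) : Character.isJavaIdentifierPart(c);
      safe.append(legal ? c : '_');
    }
    String candidate = safe.toString();
    return SourceVersion.isKeyword(candidate) ? candidate + "_" : candidate;
  }
}
```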

Debugging Planner Failures

The FlowConnector sometimes fails when attempting to plan a Flow. If the error message given by PlannerException is vague, use the PlannerException.writeDOT() method to export a representation of the internal pipe assembly.

DOT files can be opened with Graphviz or OmniGraffle. The plans in these files are only partial, but the graphic renderings show you where the Cascading planner failed.

Note that you can also create a DOT file from a Flow by using Flow.writeDOT() to better understand how the planner has modified your business logic.

If these methods do not provide enough insight, turn to the much improved debugging framework in the planner introduced with Cascading 3.0.

When running tests, set the following:

-Dtest.traceplan.enabled=true

If you are on Mac OS X and have installed Graphviz, DOT files can be converted to PDF automatically. To enable this feature, set:

-Dutil.dot.to.pdf.enabled=true

Optionally, for stand-alone applications, statistics and tracing can be enabled selectively by passing the following properties to the FlowConnector:

  • cascading.planner.stats.path - outputs detailed statistics about the planner's execution time

  • cascading.planner.plan.path - basic planner information

  • cascading.planner.plan.transforms.path - detailed information about each rule
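These properties can be collected in a java.util.Properties instance before the FlowConnector is constructed. In this sketch the property names come from the list above, while the output directories under baseDir are arbitrary, hypothetical choices. The test.traceplan.enabled and util.dot.to.pdf.enabled flags are JVM system properties for test runs and are left to the command line.

```java
import java.util.Properties;

// Collects the planner tracing properties listed above.
// The sub-directory names under baseDir are hypothetical; any writable path works.
final class PlannerTraceProperties {
  static Properties create(String baseDir) {
    Properties properties = new Properties();
    properties.setProperty("cascading.planner.stats.path", baseDir + "/stats");
    properties.setProperty("cascading.planner.plan.path", baseDir + "/plan");
    properties.setProperty("cascading.planner.plan.transforms.path", baseDir + "/transforms");
    return properties;
  }
}
```

Assuming the local platform, the result would then be passed to the connector, for example new LocalFlowConnector( properties ). FlowConnector subclasses typically accept a properties map in their constructors.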

Optimizing Joins

Key Value Cardinality

When joining two streams via a CoGroup pipe, try to put the larger stream in the leftmost argument to the CoGroup.

The reason for this is that joining multiple streams requires some accumulation of values associated with the current key before the join operator can begin, but the leftmost stream is not accumulated. Consequently, this technique should improve the performance of most joins.

The issue isn’t file size specifically, but key-to-value cardinality. If one side of the join has a one-to-one or one-to-few key-to-value cardinality, that side should be rightmost. If one side has a one-to-many relationship, it should be leftmost so that its values aren’t loaded into memory.
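The heuristic can be made concrete by measuring, for each side, the largest number of values that share one key, because a non-leftmost side must hold a key's values in memory. The stdlib-only sketch below assumes you can sample the join keys of each stream as strings. JoinOrderSketch and its methods are hypothetical. Using the maximum rather than an average is a deliberate choice that guards against the worst key.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Stdlib-only sketch of the cardinality heuristic: the side with the larger
// per-key fan-out goes leftmost so its values are streamed, not accumulated.
final class JoinOrderSketch {
  // Largest number of values sharing a single key in a sample of join keys.
  static int maxValuesPerKey(List<String> keys) {
    Map<String, Integer> counts = new HashMap<>();
    for (String key : keys) counts.merge(key, 1, Integer::sum);
    return counts.values().stream().mapToInt(Integer::intValue).max().orElse(0);
  }

  // Returns "A" if stream A should be the leftmost CoGroup argument, else "B".
  static String leftmost(List<String> keysA, List<String> keysB) {
    return maxValuesPerKey(keysA) >= maxValuesPerKey(keysB) ? "A" : "B";
  }
}
```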

Declare Type Information

When Fields are declared with type information, Cascading (as of version 3.1) leverages it to improve serialization performance during CoGroup and GroupBy operations.

With declared types, Cascading is no longer required to store type information with every tuple value during serialization. The type information also helps enforce the canonical type stored in the tuple.
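This is a rough illustration of why per-value type information costs space. The sketch encodes the same (name, age) rows twice with java.io.DataOutputStream: once with a one-byte type tag before every value, and once relying on types both sides already know. It is not Cascading's serialization format, and the tag values are hypothetical. It only shows that the overhead grows with the number of values.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Illustration only, NOT Cascading's wire format: compares rows of (String, int)
// written with a type tag before every value against rows written using
// types that reader and writer have agreed on in advance.
final class TypeTagSketch {
  static final int STRING_TAG = 1; // hypothetical tag values
  static final int INT_TAG = 2;

  static int encodedSize(String[] names, int[] ages, boolean writeTypeTags) {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (DataOutputStream out = new DataOutputStream(bytes)) {
      for (int i = 0; i < names.length; i++) {
        if (writeTypeTags) out.writeByte(STRING_TAG);
        out.writeUTF(names[i]);
        if (writeTypeTags) out.writeByte(INT_TAG);
        out.writeInt(ages[i]);
      }
    } catch (IOException e) {
      throw new UncheckedIOException(e); // not expected for an in-memory stream
    }
    return bytes.size();
  }
}
```

For the two rows ("ann", 30) and ("bob", 40), the tagged encoding takes 22 bytes and the untagged one 18: one extra byte per value.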

Debugging Streams

When creating complex assemblies, it’s safe to embed Debug Operations at appropriate debug levels as needed. To avoid wasting resources, use the planner to remove the Debug Operations at runtime for production and staging runs.
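The stdlib-only sketch below models the removal step. Each stage is either ordinary or a Debug stage tagged with a level, and planning against a configured level keeps only the Debug stages at or below it. The Level enum is loosely modeled on Cascading's DebugLevel values (NONE, DEFAULT, VERBOSE). Stage and plan() are hypothetical and stand in for what the planner does to a real pipe assembly.

```java
import java.util.ArrayList;
import java.util.List;

// Stdlib-only model of planner-side Debug removal. Level loosely mirrors
// Cascading's DebugLevel; Stage and plan() are hypothetical.
final class DebugPlanSketch {
  enum Level { NONE, DEFAULT, VERBOSE }

  static final class Stage {
    final String name;
    final Level debugLevel; // null means "not a Debug stage"; Debug stages use DEFAULT or VERBOSE

    Stage(String name, Level debugLevel) {
      this.name = name;
      this.debugLevel = debugLevel;
    }
  }

  // Keeps ordinary stages, plus Debug stages whose level does not exceed the configured one.
  static List<String> plan(List<Stage> assembly, Level configured) {
    List<String> planned = new ArrayList<>();
    for (Stage stage : assembly) {
      if (stage.debugLevel == null || stage.debugLevel.compareTo(configured) <= 0) {
        planned.add(stage.name);
      }
    }
    return planned;
  }
}
```

Configuring NONE for production strips every Debug stage, while VERBOSE during development keeps them all, without touching the assembly code.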

Handling Good and Bad Data

Corrupt data often enters raw data streams. For instance, bad content may be fetched from the web via a crawler upstream, or a bug may have leaked into a browser widget that sends user behavior information back for analysis. Whatever the cause, it’s a good practice to define a set of rules for identifying and managing questionable records.

It is tempting to simply throw an exception and have a Trap capture the offending Tuple. However, Traps were not designed as a filtering mechanism, and consequently much valuable information is lost when Traps are used in this situation.

Instead of Traps, use Filters. Create a SubAssembly that applies rules to the stream by setting a Boolean field that marks each Tuple as good or bad. After all of the rules are applied, split the stream on the value of that Boolean field. Consider also setting a reason field that states why the Tuple was marked bad.
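The following stdlib-only sketch shows the mark-then-split approach. It assumes records are comma-separated strings and uses two hypothetical rules. Each rule returns a reason when a record is bad, the first failing reason is kept, and the stream is then partitioned on the Boolean good field. In a real application the same logic would live in a SubAssembly of Cascading Operations rather than in java.util.stream.

```java
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

// Stdlib-only model of "mark, then split" instead of throwing into a Trap.
// Records are comma-separated strings; the rules and field layout are hypothetical.
final class GoodBadSplitSketch {
  static final class Marked {
    final String record;
    final boolean good;   // the Boolean field the stream is split on
    final String reason;  // why the record was marked bad, or null if good

    Marked(String record, boolean good, String reason) {
      this.record = record;
      this.good = good;
      this.reason = reason;
    }
  }

  // Each rule returns a reason when the record is bad, or null when it passes.
  static final List<Function<String, String>> RULES = List.of(
      record -> record.trim().isEmpty() ? "empty record" : null,
      record -> record.split(",", -1).length != 3 ? "expected 3 fields" : null);

  // Applies the rules in order and records the first failing reason.
  static Marked mark(String record) {
    for (Function<String, String> rule : RULES) {
      String reason = rule.apply(record);
      if (reason != null) return new Marked(record, false, reason);
    }
    return new Marked(record, true, null);
  }

  // Splits on the Boolean field: key true holds good records, key false holds bad ones.
  static Map<Boolean, List<Marked>> split(List<String> records) {
    return records.stream()
        .map(GoodBadSplitSketch::mark)
        .collect(Collectors.partitioningBy(marked -> marked.good));
  }
}
```

Unlike a Trap, the bad branch keeps the full record together with its reason, so it can be written to its own sink for review.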

Maintaining State in Operations

When creating custom Operations (Function, Filter, Aggregator, or Buffer), do not store the Operation state in class fields.

For example, if implementing an Aggregator as a custom "counter," do not create a field named "count" and increment it on every Aggregator.aggregate() call.

There is no guarantee that your Operation will be called from a single thread in a JVM. Also, future versions of Hadoop or Cascading local mode might execute the same Operation from multiple threads.
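The sketch below models the safe pattern in plain Java. All mutable state lives in a per-group context object, and the aggregator itself has no mutable fields, so several threads can share one instance. In Cascading, this role is typically played by the context held by the operation call (for example, set up in start() via setContext()). CountContext and CountAggregator are hypothetical stand-ins.

```java
// Stdlib-only model of keeping Aggregator state out of instance fields.
// CountContext and CountAggregator are hypothetical, not Cascading types.
final class ContextStateSketch {
  // Mutable per-group state, created fresh for each group.
  static final class CountContext {
    long count;
  }

  // The aggregator has no mutable fields, so one instance can be shared safely.
  static final class CountAggregator {
    CountContext start() { return new CountContext(); }
    void aggregate(CountContext context) { context.count++; }
    long complete(CountContext context) { return context.count; }
  }

  // Counts two groups on two threads against ONE shared aggregator instance.
  static long[] countInParallel(CountAggregator shared, int sizeA, int sizeB) {
    long[] results = new long[2];
    Thread first = new Thread(() -> results[0] = countGroup(shared, sizeA));
    Thread second = new Thread(() -> results[1] = countGroup(shared, sizeB));
    first.start();
    second.start();
    try {
      first.join();
      second.join();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException("interrupted while counting", e);
    }
    return results;
  }

  private static long countGroup(CountAggregator aggregator, int size) {
    CountContext context = aggregator.start();
    for (int i = 0; i < size; i++) aggregator.aggregate(context);
    return aggregator.complete(context);
  }
}
```

Had count been a field on CountAggregator, both threads would increment the same variable and neither result would be reliable.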

Fields Constants

Instead of interspersing String field names throughout the code, create an interface that holds a constant value for each field name:

public static Fields FIRST_NAME = new Fields( "firstname", String.class );
public static Fields LAST_NAME = new Fields( "lastname", String.class );

Using the Fields class, instead of String, allows for building more complex constants:

public static Fields FULL_NAME = FIRST_NAME.append( LAST_NAME );

Always declare the canonical type for each field. When building more complex Fields instances from predefined constant Fields, the type information is retained.

Checking the Source Code

When in doubt, look at the Cascading source code. If a feature of Cascading is not documented in this User Guide or the Javadoc, its source code or unit tests will show clearly what to do or expect.

Maven users should configure their builds to pull the *-sources.jar and *-javadoc.jar files so that the IDE can navigate seamlessly between their own code and the Cascading source.