Cascading 4.0 User Guide - Cascading Best Practices
Cascading Best Practices
Unit Testing
Discrete testing of each Operation, pipe assembly, and application is a must.
The cascading.CascadingTestCase class provides a number of static helper methods.
When testing custom Operations, use the invokeFunction(), invokeFilter(), invokeAggregator(), and invokeBuffer() methods.
When testing Flows, use the validateLength() methods. There are several overloads, and together they offer great flexibility: they read the sink tap, validate that it has the correct length and Tuple size, and check whether the values match a given regular expression pattern.
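To make those checks concrete, here is a minimal plain-Java sketch of what a validateLength()-style assertion verifies. SinkValidation.validate() is a hypothetical helper, not the CascadingTestCase implementation; it assumes the sink has already been read into a list of tuples, and matching the pattern against the tab-joined values of each tuple is also an assumption.

```java
import java.util.List;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

// Hypothetical stand-in for the checks described above; not the CascadingTestCase code.
final class SinkValidation
  {
  private SinkValidation()
    {
    }

  // Throws AssertionError unless there are exactly `length` tuples, each holding
  // `size` values and, when a pattern is given, matching that pattern.
  static void validate( List<List<Object>> tuples, int length, int size, Pattern pattern )
    {
    if( tuples.size() != length )
      throw new AssertionError( "expected " + length + " tuples, found " + tuples.size() );

    for( List<Object> tuple : tuples )
      {
      if( tuple.size() != size )
        throw new AssertionError( "expected tuple size " + size + ", found " + tuple.size() );

      // Assumption: the pattern is matched against the tab-delimited tuple values.
      String line = tuple.stream()
        .map( value -> String.valueOf( value ) )
        .collect( Collectors.joining( "\t" ) );

      if( pattern != null && !pattern.matcher( line ).matches() )
        throw new AssertionError( "tuple does not match " + pattern + ": " + line );
      }
    }
  }
```

In a real test you would call the CascadingTestCase methods directly; the sketch only shows that length, Tuple size, and content are three separate checks.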
It is also possible to write tests that are independent of the underlying platform. Such a unit test should subclass cascading.PlatformTestCase, located in the cascading-platform-x.y.z-tests.jar file.
Every platform to be tested against should also be added to the CLASSPATH. PlatformTestCase searches the CLASSPATH for all available platforms and runs each test in the subclass against every platform it finds.
See the Cascading platform unit tests for examples, all of which follow the naming convention *PlatformTest.java.
Maven users must add the tests classifier to these dependencies. The cascading-platform project contains no main code, only tests, so it can only be retrieved via the tests classifier.
Flow Granularity
Although one large Flow may perform slightly better, a more modular and flexible approach is advisable: create medium-sized Flows with well-defined responsibilities, and pass all the resulting interdependent Flows to a Cascade, which sequences and executes them as a single unit.
Similarly, using the TextDelimited Scheme (or any custom format designed for long-term archival) between Flow instances allows you to hand off intermediate data to other systems for reporting or QA purposes, incurring a minimal performance penalty while remaining compatible with other tools.
Visit http://cascading.org/extensions/ for a list of available file formats suitable for data archival or debugging.
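A Cascade derives run order from the taps its Flows share: a Flow that reads a tap another Flow writes must run after it. The plain-Java sketch below models only that sequencing step with a topological sort (Kahn's algorithm). FlowSpec, CascadeOrder, and the tap names are hypothetical; in a real application you would pass the Flow instances to a CascadeConnector instead.

```java
import java.util.*;

// Hypothetical description of a Flow: a unique name plus the tap identifiers it reads and writes.
final class FlowSpec
  {
  final String name;
  final Set<String> sources;
  final Set<String> sinks;

  FlowSpec( String name, Set<String> sources, Set<String> sinks )
    {
    this.name = name;
    this.sources = sources;
    this.sinks = sinks;
    }
  }

final class CascadeOrder
  {
  private CascadeOrder()
    {
    }

  // Orders flows so that every producer of a tap runs before its consumers.
  static List<String> order( List<FlowSpec> flows )
    {
    Map<String, List<FlowSpec>> successors = new HashMap<>();
    Map<String, Integer> inDegree = new LinkedHashMap<>();

    for( FlowSpec flow : flows )
      inDegree.put( flow.name, 0 );

    // Add an edge producer -> consumer whenever a sink of one flow is a source of another.
    for( FlowSpec producer : flows )
      for( FlowSpec consumer : flows )
        if( producer != consumer && !Collections.disjoint( producer.sinks, consumer.sources ) )
          {
          successors.computeIfAbsent( producer.name, k -> new ArrayList<>() ).add( consumer );
          inDegree.merge( consumer.name, 1, Integer::sum );
          }

    // Start with flows that depend on nothing, then release consumers as producers finish.
    Deque<String> ready = new ArrayDeque<>();
    inDegree.forEach( ( name, degree ) -> { if( degree == 0 ) ready.add( name ); } );

    List<String> result = new ArrayList<>();

    while( !ready.isEmpty() )
      {
      String name = ready.poll();
      result.add( name );

      for( FlowSpec next : successors.getOrDefault( name, Collections.emptyList() ) )
        if( inDegree.merge( next.name, -1, Integer::sum ) == 0 )
          ready.add( next.name );
      }

    if( result.size() != flows.size() )
      throw new IllegalStateException( "flows contain a cycle" );

    return result;
    }
  }
```

For example, a "clean" Flow that writes the tap a "report" Flow reads is ordered first, regardless of the order in which the two are listed.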
SubAssemblies, not Factories
When developing your applications, use SubAssembly subclasses, not "factory" methods. The resulting code is much easier to read and test.
Object constructors are already "factories", so there is little reason to build frameworks that duplicate what a constructor does. There are exceptional cases in which a SubAssembly is not an option, but in practice they are rare.
Logical Responsibilities for SubAssemblies
SubAssemblies provide a convenient means to co-locate similar or related responsibilities in a single place. For example, you might use a ParsingSubAssembly and a RulesSubAssembly: the first is responsible solely for parsing incoming Tuple streams (log files, for example), and the second applies rules to decide whether a given Tuple should be discarded or marked as bad.
Additionally, in your unit tests you can create a TestAssertionsSubAssembly that simply inlines various ValueAssertions and GroupAssertions. The practice of inlining assertions directly in your SubAssemblies is also important, but sometimes it makes sense to have more tests outside of the business logic.
Java Operators in Field Names
Some Operations in Cascading (e.g., ExpressionFunction and ExpressionFilter) compile and apply Java expressions during processing. In these expressions, the Operation’s argument field names are used as variable names.
For this reason, create field names without characters that cause compilation errors when used in an expression. For example, "first-name" is a valid Cascading field name, but it could produce the expression first-name.trim(), which does not compile because Java reads it as first minus name.trim().
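One way to catch such names early is to check them against Java's identifier rules. This sketch uses the JDK's javax.lang.model.SourceVersion; FieldNameCheck is a hypothetical helper, not part of Cascading, and it assumes field names are used verbatim as variable names, as described above.

```java
import javax.lang.model.SourceVersion;

// Hypothetical helper: can this field name be used directly as a Java variable name?
final class FieldNameCheck
  {
  private FieldNameCheck()
    {
    }

  static boolean isExpressionSafe( String fieldName )
    {
    // isIdentifier() checks the syntax; isKeyword() rejects reserved words and literals such as "class".
    return SourceVersion.isIdentifier( fieldName ) && !SourceVersion.isKeyword( fieldName );
    }
  }
```

With this check, "firstname" and "first_name" pass, while "first-name" and the keyword "class" are rejected.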
Debugging Planner Failures
The FlowConnector sometimes fails when attempting to plan a Flow. If the error message given by PlannerException is vague, use the PlannerException.writeDOT() method to export a representation of the internal pipe assembly.
DOT files can be opened with Graphviz or OmniGraffle. The plans in these files are only partial, but the graphic renderings show you where the Cascading planner failed.
Note that you can also create a DOT file from a Flow by using Flow.writeDOT() to better understand how the planner has modified your business logic.
If these methods do not provide enough insight, use the much improved debugging framework introduced with the Cascading 3.0 planner.
When running tests, set the following:
-Dtest.traceplan.enabled=true
If you are on Mac OS X and have installed Graphviz, DOT files can be converted to PDF automatically. To enable this feature, set:
-Dutil.dot.to.pdf.enabled=true
Optionally, for stand-alone applications, statistics and tracing can be enabled selectively when the following properties are passed to the FlowConnector:
- cascading.planner.stats.path: outputs detailed statistics about the execution time of the planner
- cascading.planner.plan.path: basic planner information
- cascading.planner.plan.transforms.path: detailed information about each rule
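A minimal sketch of assembling those properties in Java, assuming a plain java.util.Properties object is how you configure your FlowConnector. The output directories are hypothetical placeholders, and PlannerTraceProperties is an illustrative name.

```java
import java.util.Properties;

// Builds the planner tracing properties listed above; the directories are placeholders.
final class PlannerTraceProperties
  {
  private PlannerTraceProperties()
    {
    }

  static Properties create( String baseDir )
    {
    Properties properties = new Properties();

    properties.setProperty( "cascading.planner.stats.path", baseDir + "/stats" );
    properties.setProperty( "cascading.planner.plan.path", baseDir + "/plan" );
    properties.setProperty( "cascading.planner.plan.transforms.path", baseDir + "/transforms" );

    // Pass the result to the FlowConnector for your platform,
    // for example new LocalFlowConnector( properties ).
    return properties;
    }
  }
```

Enabling only the paths you need keeps planner output small, since the transforms trace in particular records every rule.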
Optimizing Joins
Key Value Cardinality
When joining two streams via a CoGroup pipe, try to put the larger stream in the leftmost argument to the CoGroup.
The reason for this is that joining multiple streams requires some accumulation of values associated with the current key before the join operator can begin, but the leftmost stream is not accumulated. Consequently, this technique should improve the performance of most joins.
The issue isn’t file size specifically, but key-to-value cardinality. If one side of the join has a one-to-one or one-to-few key-to-value cardinality, that side should be rightmost. If one side has a one-to-many relationship, it should be leftmost so that its values aren’t loaded into memory.
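The following plain-Java sketch models why the side with many values per key belongs on the left. It is a simplified inner join over inputs already sorted by key, not Cascading's CoGroup implementation: for each key it buffers the right-hand values in memory and streams the left-hand values past them, so memory grows only with the right side's per-key cardinality. GroupedJoin, Result, and join() are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model of a grouped inner join; not Cascading's CoGroup implementation.
final class GroupedJoin
  {
  private GroupedJoin()
    {
    }

  // Joined rows plus the largest number of right-hand values held in memory at once.
  static final class Result
    {
    final List<String> rows = new ArrayList<>();
    int maxBuffered;
    }

  // Both inputs are {key, value} pairs already sorted by key, as they would be after grouping.
  static Result join( List<String[]> left, List<String[]> right )
    {
    Result result = new Result();
    int i = 0;
    int j = 0;

    while( i < left.size() && j < right.size() )
      {
      String key = left.get( i )[ 0 ];
      int cmp = key.compareTo( right.get( j )[ 0 ] );

      if( cmp < 0 ) // left key has no match on the right; skip it
        {
        i++;
        continue;
        }

      if( cmp > 0 ) // right key has no match on the left; skip it
        {
        j++;
        continue;
        }

      // Accumulate every right-hand value for the current key in memory.
      List<String> buffer = new ArrayList<>();

      while( j < right.size() && right.get( j )[ 0 ].equals( key ) )
        buffer.add( right.get( j++ )[ 1 ] );

      result.maxBuffered = Math.max( result.maxBuffered, buffer.size() );

      // Stream the left-hand values one at a time; they are never buffered.
      while( i < left.size() && left.get( i )[ 0 ].equals( key ) )
        {
        String leftValue = left.get( i++ )[ 1 ];

        for( String rightValue : buffer )
          result.rows.add( key + "\t" + leftValue + "\t" + rightValue );
        }
      }

    return result;
    }
  }
```

With a click log (many values per user) on the left and a user table (one value per user) on the right, at most one user value is buffered per key; swapping the sides buffers every click for a user instead.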
Declare Type Information
When type information is declared on Fields, Cascading (as of 3.1) leverages it to improve serialization performance during CoGroup and GroupBy operations.
With declared type information, Cascading is no longer required to store type information with every tuple value during serialization. This type information also helps enforce the canonical type stored in the tuple.
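The saving can be pictured with a deliberately simplified plain-Java model. It is not Cascading's serialization format: the untyped writer prefixes every value with a one-byte type tag so a reader can decode it, while the typed writer relies on the declared field types (as with new Fields( "count", long.class )) and writes only the values. TupleBytes and the tag values are hypothetical.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Simplified model of tuple serialization with and without per-value type tags.
final class TupleBytes
  {
  private static final int STRING_TAG = 1; // hypothetical tag values
  private static final int LONG_TAG = 2;

  private TupleBytes()
    {
    }

  // Serializes (name, count) tuples and returns the number of bytes written.
  static int size( String[] names, long[] counts, boolean typesDeclared )
    {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();

    try( DataOutputStream out = new DataOutputStream( bytes ) )
      {
      for( int i = 0; i < names.length; i++ )
        {
        if( !typesDeclared )
          out.writeByte( STRING_TAG ); // without declared types, the reader learns the type from the stream

        out.writeUTF( names[ i ] );

        if( !typesDeclared )
          out.writeByte( LONG_TAG );

        out.writeLong( counts[ i ] );
        }
      }
    catch( IOException exception )
      {
      throw new UncheckedIOException( exception );
      }

    return bytes.size();
    }
  }
```

In this model the typed form is smaller by one byte per value; the real saving in Cascading depends on its own serialization and is not measured here.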
Debugging Streams
When creating complex assemblies, it’s safe to embed Debug Operations at appropriate debug levels as needed. To avoid wasting resources, have the planner remove the Debug Operations at runtime for production and staging runs.
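In Cascading this is usually expressed along the lines of new Each( assembly, DebugLevel.VERBOSE, new Debug() ), with flowDef.setDebugLevel( DebugLevel.NONE ) telling the planner to drop the Debug Operations; check the Debug Function section and Javadoc for the exact signatures. The plain-Java sketch below models only the selection rule, under the assumption that a Debug step survives planning when its level is at or below the configured level. DebugPlanning, Level, Step, and plan() are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of how a planner might strip Debug steps by level.
final class DebugPlanning
  {
  private DebugPlanning()
    {
    }

  // Mirrors the Cascading debug level names; declaration order defines "more verbose".
  enum Level { NONE, DEFAULT, VERBOSE }

  // A pipeline step; debugLevel is null for ordinary (non-Debug) operations.
  static final class Step
    {
    final String name;
    final Level debugLevel;

    Step( String name, Level debugLevel )
      {
      this.name = name;
      this.debugLevel = debugLevel;
      }
    }

  // Keeps every ordinary step and only the Debug steps at or below the configured level.
  static List<String> plan( List<Step> steps, Level configured )
    {
    List<String> kept = new ArrayList<>();

    for( Step step : steps )
      if( step.debugLevel == null || step.debugLevel.compareTo( configured ) <= 0 )
        kept.add( step.name );

    return kept;
    }
  }
```

Configuring NONE for production keeps only the business logic, while a staging run at DEFAULT retains the less chatty Debug steps.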
Handling Good and Bad Data
Corrupt data often enters raw data streams. For instance, bad content may be fetched from the web via a crawler upstream, or a bug may have leaked into a browser widget that sends user behavior information back for analysis. Whatever the cause, it’s a good practice to define a set of rules for identifying and managing questionable records.
It is tempting to simply throw an exception and have a Trap capture the offending Tuple. However, Traps were not designed as a filtering mechanism, and consequently much valuable information is lost when Traps are used in this situation.
Instead of Traps, use Filters. Create a SubAssembly that applies rules to the stream by setting a Boolean field that marks each Tuple as good or bad. After all the rules are applied, split the stream on the value of that Boolean field. Consider also setting a reason field that states why the Tuple was marked bad.
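The mark-then-split pattern can be modeled in plain Java as follows. This is a sketch, not Cascading code: in an assembly, the marking would be a Function inside the rules SubAssembly, and the split would be two branches, each filtering on the Boolean field. GoodBadSplit, Marked, and the two sample rules are hypothetical.

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Plain-Java model of marking records good or bad and then splitting the stream.
final class GoodBadSplit
  {
  private GoodBadSplit()
    {
    }

  // A record plus the two marker fields set by the rules.
  static final class Marked
    {
    final String line;
    final boolean good;
    final String reason; // null when the record is good

    Marked( String line, boolean good, String reason )
      {
      this.line = line;
      this.good = good;
      this.reason = reason;
      }
    }

  // Hypothetical rules: returns why the line is bad, or null if every rule passes.
  static String firstViolation( String line )
    {
    if( line.trim().isEmpty() )
      return "empty line";

    if( line.split( "\t", -1 ).length != 3 )
      return "expected 3 tab-separated fields";

    return null;
    }

  static Marked mark( String line )
    {
    String reason = firstViolation( line );
    return new Marked( line, reason == null, reason );
    }

  // Splits on the Boolean marker: key true holds good records, key false holds bad ones.
  static Map<Boolean, List<Marked>> split( List<String> lines )
    {
    return lines.stream()
      .map( GoodBadSplit::mark )
      .collect( Collectors.partitioningBy( marked -> marked.good ) );
    }
  }
```

Unlike a Trap, the bad branch keeps the full record together with its reason, so it can be written to its own sink for later review.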
Maintaining State in Operations
When creating custom Operations (Function, Filter, Aggregator, or Buffer), do not store the Operation state in class fields.
For example, if implementing an Aggregator as a custom "counter," do not create a field named "count" and increment it on every Aggregator.aggregate() call.
There is no guarantee that your Operation will be called from a single thread in a JVM. Also, future versions of Hadoop or Cascading local mode might execute the same Operation from multiple threads.
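Cascading’s own mechanism is to keep per-call state in a context object that is set and read through the OperationCall (setContext() and getContext()) rather than in instance fields. The plain-Java sketch below models only that idea: CountingOperation and Context are hypothetical names, the operation instance holds no mutable fields, and each group gets its own context, so several threads can safely share one instance. Storing a count field on the operation itself is exactly the pattern to avoid.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical model of an Aggregator-style counter whose state lives in a per-group context.
final class CountingOperation
  {
  // Per-group state: created in start(), updated in aggregate(), read in complete().
  static final class Context
    {
    long count;
    }

  Context start()
    {
    return new Context();
    }

  void aggregate( Context context, String value )
    {
    context.count++; // mutate the context, never a field of the operation
    }

  long complete( Context context )
    {
    return context.count;
    }

  // Aggregates each group on a pool thread while all threads share one operation instance.
  static Map<String, Long> countGroups( Map<String, List<String>> groups )
    {
    CountingOperation operation = new CountingOperation();
    ExecutorService pool = Executors.newFixedThreadPool( 4 );

    try
      {
      Map<String, Future<Long>> futures = new TreeMap<>();

      for( Map.Entry<String, List<String>> group : groups.entrySet() )
        futures.put( group.getKey(), pool.submit( () ->
          {
          Context context = operation.start();

          for( String value : group.getValue() )
            operation.aggregate( context, value );

          return operation.complete( context );
          } ) );

      Map<String, Long> counts = new TreeMap<>();

      for( Map.Entry<String, Future<Long>> entry : futures.entrySet() )
        counts.put( entry.getKey(), entry.getValue().get() );

      return counts;
      }
    catch( InterruptedException exception )
      {
      Thread.currentThread().interrupt();
      throw new IllegalStateException( exception );
      }
    catch( ExecutionException exception )
      {
      throw new IllegalStateException( exception.getCause() );
      }
    finally
      {
      pool.shutdown();
      }
    }
  }
```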
Fields Constants
Instead of interspersing String field names throughout the code, create an interface that holds a constant value for each field name:
public static final Fields FIRST_NAME = new Fields( "firstname", String.class );
public static final Fields LAST_NAME = new Fields( "lastname", String.class );
Using the Fields class, instead of String, allows for building more complex constants:
public static final Fields FULL_NAME = FIRST_NAME.append( LAST_NAME );
Always declare the canonical type for each field. When building more complex Fields instances from predefined constant Fields, the type information is retained.
Checking the Source Code
When in doubt, look at the Cascading source code. If a Cascading feature is not documented in this User Guide or the Javadoc, its source code or unit tests will show you what to do or expect.
Maven users should configure their builds to pull *-sources.jar and *-javadoc.jar files so that their IDE can navigate seamlessly between their own code and the Cascading source.