Cascading 4.0 User Guide - Custom Taps and Schemes
Custom Taps and Schemes
Introduction
Cascading is designed to be easily configured and enhanced by developers. In addition to creating custom Operations, developers can create custom Tap and Scheme classes that let applications connect to external systems or read/write data to proprietary formats.
A Tap represents something physical, like a file or a database table. Accordingly, Tap implementations are responsible for life-cycle issues around the resource they represent, such as tests for resource existence or deletion of a resource (e.g., dropping a remote SQL table).
A Scheme represents a format or representation — such as a text format for a file, the columns in a table, etc. Schemes are used to convert between the source data’s native format and a cascading.tuple.Tuple instance.
Creating custom taps and schemes is an involved process and requires knowledge of the underlying platform. Not only do Tap and Scheme classes manage the access to a resource or the format of the resource, they must also selectively hide details of the underlying platform to retain platform independence.
For examples of how to implement a custom Tap and Scheme for different platforms, see the Cascading Extensions page for links to the source code of similar extensions.
Custom Taps
All custom Tap classes must subclass the cascading.tap.Tap abstract class and implement the required methods. The method getIdentifier() must return a String that uniquely identifies the resource the Tap instance is managing. Any two Tap instances with the same fully-qualified identifier value are considered equal.
A Tap can set any custom properties the underlying platform requires, via the methods sourceConfInit() (for when it is used as a source tap) and sinkConfInit() (for when it is used as a sink tap). These two methods may be called more than once with new configuration objects and should be idempotent.
Tuples are always read (sourced) from the openForRead() method via a TupleEntryIterator — i.e., openForRead() is always called in the same process that reads the data. The Tap returns a TupleEntryIterator that is responsible for iterating across the resource, returning a TupleEntry instance (and Tuple instance) for each "record" in the resource. TupleEntryIterator.close() is always called when no more entries will be read. See TupleEntrySchemeIterator in the Javadoc for more information.
On some platforms, openForRead() is called with a pre-instantiated Input type. Typically this Input type should be used instead of instantiating a new instance of the appropriate type.
Similarly, when writing (sinking) Tuple data, a Tap always sinks data through the openForWrite() method via a TupleEntryCollector. Here again, openForWrite() is always called in the process in which data will be written. The Tap returns a TupleEntryCollector that accepts and stores any number of TupleEntry or Tuple instances for each record that is processed or created by a given Flow. TupleEntryCollector.close() is always called when no more entries will be written. See TupleEntrySchemeCollector in the Javadoc for more information.
Again, on some platforms, openForWrite() will be called with a pre-instantiated Output type. Typically this Output type should be used instead of instantiating a new instance of the appropriate type.
Both the TupleEntrySchemeIterator and TupleEntrySchemeCollector should be used to hold any state or resources necessary to communicate with any remote services. For example, when connecting to a SQL database, any JDBC drivers should be created in the constructor and cleaned up in close().
Note that the Tap is not responsible for reading or writing data to the Input or Output type. This is delegated to the Scheme passed to the constructor of the Tap. Consequently, the Scheme is responsible for configuring the Input and Output types that it will be reading and writing.
Important: Consider how a Tap and Scheme implementation that is not based on HDFS should behave in the face of a failure. The Tap class has a few methods to help clean up after a failed Flow.
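To make this concrete, the following is a minimal sketch of a custom Tap for the local platform, loosely modeled on cascading.tap.local.FileTap and parameterized on Properties, InputStream, and OutputStream. The class name and the file-backed resource are illustrative only, and the TupleEntrySchemeIterator and TupleEntrySchemeCollector constructors shown follow the historical three-argument signatures; verify them against the 4.0 Javadoc.

import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Properties;

import cascading.flow.FlowProcess;
import cascading.scheme.Scheme;
import cascading.tap.SinkMode;
import cascading.tap.Tap;
import cascading.tuple.TupleEntryCollector;
import cascading.tuple.TupleEntryIterator;
import cascading.tuple.TupleEntrySchemeCollector;
import cascading.tuple.TupleEntrySchemeIterator;

// Hypothetical Tap over a single local file, for illustration only.
public class ExampleFileTap extends Tap<Properties, InputStream, OutputStream> {
  private final String path; // the physical resource this Tap manages

  public ExampleFileTap(Scheme<Properties, InputStream, OutputStream, ?, ?> scheme, String path, SinkMode sinkMode) {
    super(scheme, sinkMode);
    this.path = path;
  }

  @Override
  public String getIdentifier() {
    return path; // Taps with equal fully-qualified identifiers are considered equal
  }

  @Override
  public TupleEntryIterator openForRead(FlowProcess<? extends Properties> flowProcess, InputStream input) throws IOException {
    // prefer the pre-instantiated Input when the platform supplies one
    if (input == null)
      input = new FileInputStream(path);

    // reading and parsing records is delegated to the Scheme; any connections
    // or drivers a remote system needs would be opened here and released by
    // the returned iterator's close() method
    return new TupleEntrySchemeIterator<Properties, InputStream>(flowProcess, getScheme(), input);
  }

  @Override
  public TupleEntryCollector openForWrite(FlowProcess<? extends Properties> flowProcess, OutputStream output) throws IOException {
    // prefer the pre-instantiated Output when the platform supplies one
    if (output == null)
      output = new FileOutputStream(path);

    return new TupleEntrySchemeCollector<Properties, OutputStream>(flowProcess, getScheme(), output);
  }

  @Override
  public boolean createResource(Properties conf) throws IOException {
    return new File(path).createNewFile();
  }

  @Override
  public boolean deleteResource(Properties conf) throws IOException {
    return new File(path).delete();
  }

  @Override
  public boolean resourceExists(Properties conf) throws IOException {
    return new File(path).exists();
  }

  @Override
  public long getModifiedTime(Properties conf) throws IOException {
    return new File(path).lastModified();
  }
}

Note that every read and write path funnels through the Scheme; the Tap only locates the resource and manages its life cycle.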
Custom Schemes
All custom Scheme classes must subclass the cascading.scheme.Scheme abstract class and implement the required methods.
A Scheme is ultimately responsible for sourcing and sinking Tuples of data. Consequently, a Scheme must be programmed to automatically present the correct Fields during sourcing and to accept specific Fields during sinking. Therefore, the constructors on the base Scheme type must be given the source and sink Fields.
The TextLine Scheme, for example, can be configured to source different Fields than it sinks. The TextDelimited Scheme, on the other hand, forces the source and sink Fields to be the same.
The retrieveSourceFields() and retrieveSinkFields() methods (implemented, for example, by the TextDelimited class) allow a custom Scheme to fetch its source and sink Fields immediately before the Cascading planner is invoked. For example, the Scheme could fetch Fields from the header of a file. Conversely, the presentSourceFields() and presentSinkFields() methods (also implemented by TextDelimited) notify the Scheme of the Fields that the planner expects the Scheme to handle. For example, the Scheme could use the presented Fields to write the field names as a header.
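As a sketch, a custom Scheme that discovers its source Fields from a file header immediately before planning might override retrieveSourceFields() along the following lines. The readHeaderFields() helper is hypothetical, and the override signature shown follows the Scheme base class; TextDelimited implements a similar pattern when configured to skip headers.

// Inside a custom Scheme subclass for the local platform (Config is Properties).
// readHeaderFields() is a hypothetical helper that opens the resource via the
// given Tap and parses its first line into a Fields instance.
@Override
public Fields retrieveSourceFields(FlowProcess<? extends Properties> flowProcess, Tap tap) {
  if (!getSourceFields().isUnknown())
    return getSourceFields(); // fields were declared explicitly; keep them

  setSourceFields(readHeaderFields(flowProcess, tap));

  return getSourceFields();
}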
A Scheme can set any custom properties that the underlying platform requires, via the sourceConfInit() and sinkConfInit() methods. These methods may be called more than once with new configuration objects and should be idempotent.
A Scheme is always sourced via its source() method, and is always sunk to via its sink() method. The methods on a Scheme are invoked in the following sequence:
- The sourcePrepare() or sinkPrepare() method is called.
- The source() or sink() method is called for each record that is read or written.
- After all values have been read or written, the sourceCleanup() or sinkCleanup() method is called.
The *Prepare() methods allow a Scheme to initialize any necessary state. For example, a Scheme could create a new java.util.regex.Matcher instance for use against all record reads. Conversely, the *Cleanup() methods allow for releasing any resources. These methods are always called in the same process space as their associated source() and sink() calls. On a clustered platform, this is usually on the cluster side.
Calls to *ConfInit() are usually on the client side.
Schemes are also responsible for converting canonical data types to types supported by the system that the Scheme is wrapping, and vice versa. See the section on Field Typing for more information.
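Putting the sequence together, below is a minimal sketch of a single-field, line-oriented Scheme for the local platform, assuming java.io streams as the Input and Output types (roughly in the spirit of cascading.scheme.local.TextLine). Per-process state, such as the reader and writer here, belongs in the SourceCall or SinkCall context rather than in instance fields, since a Scheme instance may be serialized and reused.

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.nio.charset.StandardCharsets;
import java.util.Properties;

import cascading.flow.FlowProcess;
import cascading.scheme.Scheme;
import cascading.scheme.SinkCall;
import cascading.scheme.SourceCall;
import cascading.tap.Tap;
import cascading.tuple.Fields;

// Hypothetical single-field, line-oriented Scheme, for illustration only.
public class ExampleLineScheme
    extends Scheme<Properties, InputStream, OutputStream, BufferedReader, PrintWriter> {

  public ExampleLineScheme() {
    // this Scheme sources and sinks the same single "line" field
    super(new Fields("line", String.class), new Fields("line", String.class));
  }

  @Override
  public void sourceConfInit(FlowProcess<? extends Properties> flowProcess,
      Tap<Properties, InputStream, OutputStream> tap, Properties conf) {
    // nothing to configure for plain java.io streams; must be idempotent
  }

  @Override
  public void sinkConfInit(FlowProcess<? extends Properties> flowProcess,
      Tap<Properties, InputStream, OutputStream> tap, Properties conf) {
    // nothing to configure for plain java.io streams; must be idempotent
  }

  @Override
  public void sourcePrepare(FlowProcess<? extends Properties> flowProcess,
      SourceCall<BufferedReader, InputStream> sourceCall) throws IOException {
    // per-process state lives in the context, not in Scheme fields
    sourceCall.setContext(
        new BufferedReader(new InputStreamReader(sourceCall.getInput(), StandardCharsets.UTF_8)));
  }

  @Override
  public boolean source(FlowProcess<? extends Properties> flowProcess,
      SourceCall<BufferedReader, InputStream> sourceCall) throws IOException {
    String line = sourceCall.getContext().readLine();

    if (line == null)
      return false; // no more records

    sourceCall.getIncomingEntry().setString(0, line); // populate the "line" field

    return true;
  }

  @Override
  public void sourceCleanup(FlowProcess<? extends Properties> flowProcess,
      SourceCall<BufferedReader, InputStream> sourceCall) throws IOException {
    sourceCall.getContext().close(); // release per-process resources
  }

  @Override
  public void sinkPrepare(FlowProcess<? extends Properties> flowProcess,
      SinkCall<PrintWriter, OutputStream> sinkCall) throws IOException {
    sinkCall.setContext(
        new PrintWriter(new OutputStreamWriter(sinkCall.getOutput(), StandardCharsets.UTF_8)));
  }

  @Override
  public void sink(FlowProcess<? extends Properties> flowProcess,
      SinkCall<PrintWriter, OutputStream> sinkCall) throws IOException {
    sinkCall.getContext().println(sinkCall.getOutgoingEntry().getString(0));
  }

  @Override
  public void sinkCleanup(FlowProcess<? extends Properties> flowProcess,
      SinkCall<PrintWriter, OutputStream> sinkCall) throws IOException {
    sinkCall.getContext().flush(); // the Tap's collector closes the underlying stream
  }
}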
Taps with File and Nonfile Resources
Tap classes that are meant to manage resources that are files should implement the cascading.tap.type.FileType interface. This interface allows a user to directly access a Tap to manipulate the remote filesystem or to retrieve child resources (files in subdirectories).
Nonfile resources should not be managed by a Tap that implements the cascading.tap.type.FileType interface. For example, a "JDBCTap" implementation does not treat tables as directories with child files.
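For example, client code can test for the interface before attempting any directory-style access. The helper below is a sketch that assumes the local platform's Properties configuration; see the FileType Javadoc for the full method set.

import java.io.IOException;
import java.util.Properties;

import cascading.flow.FlowProcess;
import cascading.tap.Tap;
import cascading.tap.type.FileType;

// Hypothetical helper: only Taps implementing FileType support child listings.
public class TapUtil {
  public static String[] listChildren(Tap<Properties, ?, ?> tap,
      FlowProcess<? extends Properties> flowProcess) throws IOException {
    if (!(tap instanceof FileType))
      return new String[0]; // e.g. a JDBC-style Tap: tables are not directories

    @SuppressWarnings("unchecked")
    FileType<Properties> fileType = (FileType<Properties>) tap;

    return fileType.getChildIdentifiers(flowProcess); // child resources, e.g. files in subdirectories
  }
}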
Tap Life-Cycle Methods
The Tap interface has a number of life-cycle methods that allow the underlying resource to be managed by the Tap instance.
- Existence
  These methods test for the existence of, or delete, the resource managed by the Tap.
  - resourceExists() - called frequently; should be idempotent
  - deleteResource() - called only if SinkMode is SinkMode.REPLACE, or from within Cascading when the resource is stale
- Initialization
  These methods allow for client-side initialization of a remote resource that will be shared across parallelized instances on the cluster side. For example, a database table could be created, if it does not exist, so that data may be written to it from the cluster. Note that this is not the same as initializing a JDBC Driver on the client side and sharing it with the cluster; Driver initialization must happen within openForRead() or openForWrite().
  - prepareResourceForRead() - initialize any shared remote resource for reading
  - prepareResourceForWrite() - initialize any shared remote resource for writing
- Reading and Writing
  These methods may be called on the client or cluster side. Either method can be invoked by Cascading during execution or by a developer wishing direct access to the underlying data managed by the Tap instance. One of these methods must be called to read or write data. These methods are described in more detail above.
  - openForRead()
  - openForWrite()
- Transactional
  These methods notify a given Tap instance if the parent Flow was successful or if there was a failure. They are called on the client side so that any remote shared resources can be cleaned up or any changes written can be committed or persisted. They are only invoked if the Tap instance is used as a sink. A sketch using the initialization and transactional hooks follows this list.
  - commitResource() - commit the values written to the resource
  - rollbackResource() - revert the resource to its original state
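As an illustration, a database-backed Tap (continuing the earlier hypothetical JDBC example) might use the initialization and transactional hooks roughly as follows. The helper methods are invented for this sketch; the overridden signatures are those defined on the Tap base class.

// Inside a hypothetical JDBC-backed Tap subclass (cf. ExampleFileTap above).
// ensureTableExists(), promoteStagingTable(), and dropStagingTable() are
// invented helpers standing in for real database calls.

@Override
public boolean prepareResourceForWrite(Properties conf) throws IOException {
  // client side, before any cluster-side writers call openForWrite():
  // create the target table if missing so parallel writers can insert into it
  return ensureTableExists(conf);
}

@Override
public boolean commitResource(Properties conf) throws IOException {
  // client side, after a successful Flow: make the written data visible
  return promoteStagingTable(conf);
}

@Override
public boolean rollbackResource(Properties conf) throws IOException {
  // client side, after a failed Flow: discard anything partially written
  return dropStagingTable(conf);
}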