Stream processing is a paradigm for on-the-fly processing of unbounded data feeds. In the era of fast data, stream processing engines (SPEs) are getting more attention every day and have become a fundamental component of data processing pipelines. They usually run in distributed settings to cope with the flood of incoming data. Since distributed environments can suffer from various types of failures, the reliability of processing is a critical concern for the usefulness of SPEs. For this reason, SPEs allow users to tune the processing guarantee that is provided for a streaming computation as a whole.
In this blog post, we take a look at the processing guarantees that are widely supported by mainstream SPEs. We first elaborate on their semantics, and then discuss how they are realized in Hazelcast Jet.
We describe the data processing guarantees that SPEs offer in terms of three modes: at-most-once, at-least-once, and exactly-once.
In at-most-once mode, the streaming runtime does not employ any special mechanism to guarantee processing of the incoming items. At-most-once processing means that an item is processed by the system at most once, and it can be considered a best-effort strategy. It allows items to be dropped in case of a failure. For instance, if an SPE host fails before processing an item that has been ingested from a source, the item can be lost when the application recovers from the failure.
At-most-once processing can be preferred when occasional loss of items is acceptable. For instance, a popular video streaming service that displays the most-viewed videos of the last 30 seconds on its homepage can tolerate a few dropped clicks.
The at-least-once processing guarantee ensures that each item is processed at least once, and no item is dropped. It requires the runtime to track each item flowing through the application. Items are processed only once when there is no failure in the system. However, in case of failures or other contingencies, an item that has already been processed can be retransmitted from the source and processed again, leading to inaccurate results. This mode does not require any mechanism to prevent duplicate processing of items. It generally accepts possible duplicate processing in exchange for lower latency. Therefore, it is a good fit for use cases where item loss must be prevented and latency matters more than strict correctness. Consider the scenario where an e-commerce website analyzes the actions of its customers in real time, and sends a discount notification immediately after a product is removed from the basket if the customer is likely to purchase the product at a lower price. If customers do not complain about duplicate notifications, this application can make use of at-least-once processing semantics.
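The difference between the two modes described so far comes down to the order of two steps: acknowledging an item and processing it. The following Python sketch simulates a single worker that crashes once between those two steps. The function `run`, its parameters, and the offset-based acknowledgement are illustrative assumptions, not the API of any particular SPE.

```python
def run(items, ack_before_processing, crash_at):
    """Simulate one crash and one recovery of a single-threaded consumer.

    items: the input list; the index of an item is its offset.
    ack_before_processing: True models at-most-once, False models at-least-once.
    crash_at: offset at which the worker crashes once, between the two steps.
    Returns the list of items that were actually processed.
    """
    processed = []
    committed = 0      # next offset to read after a restart
    crashed = False
    offset = 0
    while offset < len(items):
        item = items[offset]
        if ack_before_processing:
            committed = offset + 1        # step 1: acknowledge
            if offset == crash_at and not crashed:
                crashed = True
                offset = committed        # restart from the committed offset
                continue                  # the item is never processed
            processed.append(item)        # step 2: process
        else:
            processed.append(item)        # step 1: process
            if offset == crash_at and not crashed:
                crashed = True
                offset = committed        # restart from the committed offset
                continue                  # the item will be read again
            committed = offset + 1        # step 2: acknowledge
        offset += 1
    return processed


lossy = run(["a", "b", "c"], ack_before_processing=True, crash_at=1)
duplicating = run(["a", "b", "c"], ack_before_processing=False, crash_at=1)
```

With `ack_before_processing=True` the item at the crash offset is lost, and with `False` it is processed twice. No item is ever lost in the second case, which is the at-least-once guarantee.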
There is a lot of confusion, misunderstanding, and discussion about the “exactly-once” processing guarantee. One can naively think that “exactly-once” semantics ensure that each item is processed exactly once by the SPE, even in the presence of failures. However, such a guarantee is not possible for arbitrary user code, because user code can be executed partially in case of a failure. Similarly, an operation on an external system cannot be guaranteed to be triggered exactly once. The reason is that at-least-once processing is a prerequisite for exactly-once: an item is re-submitted if a failure occurs before its processing is acknowledged. This approach causes duplicate processing if a failure occurs after an item is processed but before it is acknowledged.
Exactly-once processing is a good fit for billing and revenue-related use cases. SPEs need to clarify the semantics of the exactly-once processing guarantee in order to offer useful solutions. In practice, exactly-once processing is implemented in such a way that, although some items can be processed multiple times in case of failures, the result of the duplicate processing is the same as if the processing had been done only once (i.e., idempotency). For this reason, this processing guarantee is also called effectively-once.
An SPE needs to provide end-to-end support to achieve exactly-once semantics. The guarantee is broken if any part of the system is unable to provide it. Therefore, SPEs implement mechanisms to support exactly-once semantics both for processing within the system boundary and for side effects that occur on external systems. Ingestion of an item, updates to the computation state, and any side effect that is triggered on an external system, including emitting output to downstream systems, must take effect together. Two common approaches are employed under the hood:
In one approach, re-submitted items are checked with a deduplication mechanism to prevent duplicate processing. When an item is processed, changes in the computation state and acknowledgement of the item are committed together, which means that a processed item whose changes are persisted will not be processed again if the item is re-submitted because of a failure.
The second approach is to take a snapshot of the whole computation state. In case of a failure, SPEs restore the computation state from the snapshot, rewind the input sources to the positions that belong to the snapshot, and replay the items.
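The first approach, deduplication with a combined commit, can be sketched as follows. This is a minimal in-memory illustration, not how any specific SPE stores its state: the class `DedupProcessor`, the `durable` record, and the `crash_before_commit` flag are hypothetical names, and a single Python assignment stands in for a transactional write.

```python
import copy


class DedupProcessor:
    """Counts items per key and remembers which item ids were acknowledged."""

    def __init__(self):
        # The aggregate and the acknowledged ids live in one record,
        # so they can only be committed together.
        self.durable = {"count_by_key": {}, "acked_ids": set()}

    def process(self, item_id, key, crash_before_commit=False):
        """Return True if the item changed the durable state."""
        if item_id in self.durable["acked_ids"]:
            return False                  # re-submitted item, skip it
        draft = copy.deepcopy(self.durable)
        draft["count_by_key"][key] = draft["count_by_key"].get(key, 0) + 1
        draft["acked_ids"].add(item_id)
        if crash_before_commit:
            return False                  # neither the count nor the ack survives
        self.durable = draft              # state change and ack commit as one step
        return True


p = DedupProcessor()
p.process(1, "x")
p.process(1, "x")                            # duplicate delivery, ignored
p.process(2, "x", crash_before_commit=True)  # failure before the commit
p.process(2, "x")                            # re-submission is processed normally
```

Because the count and the acknowledgement are written in the same step, a re-submitted item is either fully applied already and skipped, or not applied at all and processed from scratch.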
With these two approaches, exactly-once semantics are preserved as long as a streaming computation occurs solely within the system boundaries of the SPE. However, streaming tasks usually contact external systems as well, for instance, to flush output to a distributed database or a file system. If a single item is re-processed after a failure, the same output can be written to the file system multiple times. Again, idempotency is used to extend exactly-once semantics to cover the interactions with external systems.
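To make the role of idempotency concrete, the sketch below sends the same replayed output item to two kinds of sinks. Both sinks are plain Python containers standing in for a file and a key-value store, and the function names and the sample key are made up for this example.

```python
def write_to_log(log, key, value):
    """Appending sink: every call adds a new line, so replays are visible."""
    log.append(f"{key}={value}")


def write_to_table(table, key, value):
    """Keyed sink: writing the same key and value again changes nothing."""
    table[key] = value


# The same output item is emitted twice because of a replay after a failure.
outputs = [("clicks:video-7", 42), ("clicks:video-7", 42)]

log, table = [], {}
for key, value in outputs:
    write_to_log(log, key, value)
    write_to_table(table, key, value)
```

After the loop, `log` holds two identical lines while `table` holds a single entry. Note that the keyed write only hides the replay if the replayed item carries the same key and value as the original, which in turn requires the computation to be deterministic.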
How does Hazelcast Jet realize processing guarantees?
Hazelcast Jet follows the second approach described above, and utilizes the Chandy-Lamport distributed snapshotting algorithm to offer fault tolerance and processing guarantees. It periodically takes snapshots of the accumulated processor state. Snapshots are synchronized with the input sources, and source offsets are put into the snapshots along with the processor state. A given snapshot represents the state of the computation after processing the input items up to the recorded source offsets. In case of a node failure, Jet restarts the job on the remaining nodes using the last successful snapshot. The state of the computation is restored from the snapshot and the input sources are rewound to the offsets recorded in the snapshot. Then, processing resumes from that point. Hazelcast Jet stores snapshots in Hazelcast IMaps and does not have any external dependency for data storage.
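The restore-and-rewind cycle described above can be illustrated with a small single-process simulation. This is only a sketch under simplifying assumptions: one source, a running sum as the processor state, and a snapshot held in a local variable instead of an IMap. The names `run_with_snapshots`, `snapshot_every`, and `crash_after` are hypothetical and do not correspond to Jet APIs.

```python
def run_with_snapshots(items, snapshot_every, crash_after):
    """Sum the items while taking periodic snapshots, with one simulated crash.

    snapshot_every: a snapshot is taken after every N-th item.
    crash_after: the job fails once, right after this many items were read.
    Returns (final_state, trace), where trace lists every processing call.
    """
    total = 0            # processor state
    offset = 0           # source offset of the next item to read
    snapshot = (0, 0)    # (processor state, source offset) stored together
    crashed = False
    trace = []           # stands in for a visible side effect, e.g. a log line
    while offset < len(items):
        total += items[offset]
        trace.append(items[offset])
        offset += 1
        if offset % snapshot_every == 0:
            snapshot = (total, offset)
        if offset == crash_after and not crashed:
            crashed = True
            total, offset = snapshot     # restore the state, rewind the source
    return total, trace


state, trace = run_with_snapshots([1, 2, 3, 4, 5], snapshot_every=2, crash_after=3)
```

The returned state equals the sum of a failure-free run, because the state and the source offset are rolled back together. The trace, however, shows that the item read between the last snapshot and the crash is processed a second time, which matters as soon as processing has side effects outside the snapshot.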
Hazelcast Jet allows processing guarantees to be configured on a per-job basis. For instance, if a Jet job ingests input items from a non-replayable source and disables snapshotting, then the underlying processing guarantee becomes at-most-once. The snapshotting feature can be enabled for both the at-least-once and exactly-once processing guarantees. When the job is tuned for the at-least-once processing guarantee, the snapshotting algorithm skips some internal steps to reduce the latency overhead. However, some items whose processing results are already included in the snapshot might be processed again after the job is restarted from the snapshot. Finally, jobs can be tuned for the exactly-once processing guarantee as well. In this mode, the snapshotting algorithm ensures that an item that was processed before a snapshot was taken will not be processed again if the job is restarted from that snapshot.
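One way to picture the difference between the two snapshotting modes is barrier alignment in a processor with two inputs. The sketch below assumes that the step skipped in at-least-once mode is waiting for the snapshot barrier on every input before reading further from an input that has already delivered its barrier. That is an assumption made for illustration, and the names `BARRIER` and `snapshot_state` are hypothetical.

```python
from collections import deque

BARRIER = object()   # snapshot barrier marker injected by the sources


def snapshot_state(input_a, input_b, align):
    """Read two inputs round-robin until the barrier arrived on both.

    Each input must contain one BARRIER.
    align=True: stop reading an input once its barrier arrived (exactly-once).
    align=False: keep reading every input (at-least-once).
    Returns the items that are part of the state when the snapshot is taken.
    """
    queues = [deque(input_a), deque(input_b)]
    barrier_seen = [False, False]
    state = []
    while not all(barrier_seen):
        for i, queue in enumerate(queues):
            if not queue:
                continue
            if align and barrier_seen[i]:
                continue             # blocked until the other barrier arrives
            item = queue.popleft()
            if item is BARRIER:
                barrier_seen[i] = True
            else:
                state.append(item)
    return state


input_a = ["a1", BARRIER, "a2", "a3"]
input_b = ["b1", "b2", "b3", BARRIER]
aligned = snapshot_state(input_a, input_b, align=True)
unaligned = snapshot_state(input_a, input_b, align=False)
```

In the aligned run, only items that precede a barrier end up in the snapshot. In the unaligned run, `a2` and `a3` are already part of the snapshot although they follow the barrier on their input, so they are processed a second time when the source is rewound to the barrier position after a restart. The unaligned run never blocks an input, which is where the lower latency comes from.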
The snapshotting approach described above provides exactly-once semantics for the computation state and input streams. However, we also need to take into account the side effects that occur in external systems. Consider the scenario where a Jet processor prints a log line for each input item. It can happen that this processor prints a log line for an input item, but a Jet node fails before the next snapshot is taken. If the computation is restarted, this processor is invoked for the same item again and prints the same log line. Similarly, failures can cause sink processors to dispatch the same output item to the downstream systems multiple times. In order to use the exactly-once processing guarantee, a Jet job must use replayable sources and idempotent sinks. Hazelcast Jet version 0.5 contains replayable Hazelcast IMap and Kafka source implementations, and an idempotent Hazelcast IMap sink implementation.
In this blog post, we presented the processing guarantees that are offered by popular SPEs and elaborated on their semantics. We also examined some misconceptions, ambiguities, and challenges around the exactly-once processing guarantee, and briefly described how Hazelcast Jet implements these guarantees on top of its in-memory snapshotting feature. There are many other things to consider about exactly-once semantics that are not covered in this post.
A few blog posts about the exactly-once semantics:
https://fpj.me/2017/07/04/no-consensus-in-exactly-once/
http://the-paper-trail.org/blog/exactly-not-atomic-broadcast-still-impossible-kafka/
https://bravenewgeek.com/you-cannot-have-exactly-once-delivery-redux/
http://data.alishoker.com/2017/07/notes-on-exactly-once-semantics-in.html