Jet 0.5 is Released

Hazelcast Jet 0.5 is now publicly available and, with many new features, is in many ways our biggest release yet. In case this is the first time you are hearing about us, Hazelcast Jet is a distributed computing platform for fast processing of big data sets, both streaming and batch. It’s the latest open source project from Hazelcast.

Introducing Pipeline API

For the first two major Jet releases, the main options for building Jet applications were the DAG API and the distributed implementation of java.util.stream. The DAG API, while powerful, is quite verbose and more imperative than declarative. It requires a very good understanding of Jet’s execution model and architecture and could be considered quite low-level. java.util.stream, on the other hand, is declarative, but it was designed mostly for local, same-JVM processing rather than as a distributed computation API, and it lacks many of the constructs of distributed data processing, such as joins and forks. Despite the name, it’s also designed as a batch processing API rather than for stream processing. Our vision was always to provide a powerful high-level API of our own, and we are happy to release the first version of this API with 0.5.

A simple word count can be expressed as follows:

Pipeline p = Pipeline.create();
p.drawFrom(Sources.<Long, String>map("lines"))
    .flatMap(e -> traverseArray(delimiter.split(e.getValue().toLowerCase())))
    .filter(word -> !word.isEmpty())
    .groupBy(wholeItem(), counting())
    .drainTo(Sinks.map("counts"));
jet.newJob(p).join();
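
For readers new to Jet, it may help to see what this pipeline computes. The following is a minimal single-JVM sketch using plain java.util.stream, not Jet code. The class name LocalWordCount and the \W+ delimiter pattern are assumptions made for illustration; the listing above does not show how delimiter is defined.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Function;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

// Local, non-distributed equivalent of the word count pipeline (illustrative only).
final class LocalWordCount {
    // Assumed delimiter: split on runs of non-word characters.
    private static final Pattern DELIMITER = Pattern.compile("\\W+");

    static Map<String, Long> count(List<String> lines) {
        return lines.stream()
                // flatMap: one line becomes many lower-cased words
                .flatMap(line -> Arrays.stream(DELIMITER.split(line.toLowerCase())))
                // filter: drop the empty strings that split() can produce
                .filter(word -> !word.isEmpty())
                // groupBy(wholeItem(), counting()): count occurrences of each word
                .collect(Collectors.groupingBy(Function.identity(), TreeMap::new, Collectors.counting()));
    }

    public static void main(String[] args) {
        // prints {distributed=1, fast=1, is=2, jet=2}
        System.out.println(count(Arrays.asList("Jet is fast", "jet is distributed")));
    }
}
```

The shape of the two programs is the same; the difference is that the Jet pipeline reads from and writes to distributed maps and runs across the cluster.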

A more complex three-way join can also be expressed much more simply than before:

Pipeline p = Pipeline.create();
// The stream to be enriched: trades
ComputeStage<Trade> trades = p.drawFrom(Sources.<Trade>list("trades"));
// The enriching streams: products and brokers
ComputeStage<Entry<Integer, Product>> prodEntries = p.drawFrom(Sources.<Integer, Product>map("products"));
ComputeStage<Entry<Integer, Broker>> brokEntries = p.drawFrom(Sources.<Integer, Broker>map("brokers"));
// Join the trade stream with the product and broker streams
ComputeStage<Tuple3<Trade, Product, Broker>> joined = trades.hashJoin(
        prodEntries, joinMapEntries(Trade::productId),
        brokEntries, joinMapEntries(Trade::brokerId)
);
// Transform the tuples of the hash join output into map entries
// and store them in the output map
joined.map(t -> entry(t.f0().id(), t))
      .drainTo(Sinks.map("results"));
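
Conceptually, a hash join keeps the enriching data sets in hash tables and looks up each item of the primary stream by key. The sketch below shows that idea in plain Java on a single JVM. It is not how Jet implements hashJoin. The Trade and Enriched classes are hypothetical stand-ins, products and brokers are reduced to strings, and returning null for a missing key is an assumption of this sketch.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Single-JVM illustration of enriching a stream by key lookups (illustrative only).
final class HashJoinSketch {
    // Hypothetical stand-in for the Trade class used above.
    static final class Trade {
        final int id, productId, brokerId;
        Trade(int id, int productId, int brokerId) {
            this.id = id; this.productId = productId; this.brokerId = brokerId;
        }
    }

    // Stand-in for Tuple3<Trade, Product, Broker>.
    static final class Enriched {
        final Trade trade; final String product; final String broker;
        Enriched(Trade trade, String product, String broker) {
            this.trade = trade; this.product = product; this.broker = broker;
        }
        @Override public String toString() { return trade.id + ":" + product + "/" + broker; }
    }

    static List<Enriched> hashJoin(List<Trade> trades,
                                   Map<Integer, String> products,
                                   Map<Integer, String> brokers) {
        List<Enriched> joined = new ArrayList<>();
        for (Trade t : trades) {
            // One hash lookup per enriching data set; a missing key yields null here.
            joined.add(new Enriched(t, products.get(t.productId), brokers.get(t.brokerId)));
        }
        return joined;
    }

    public static void main(String[] args) {
        Map<Integer, String> products = new HashMap<>();
        products.put(1, "laptop");
        Map<Integer, String> brokers = new HashMap<>();
        brokers.put(7, "alice");
        List<Trade> trades = Arrays.asList(new Trade(100, 1, 7), new Trade(101, 2, 7));
        // prints [100:laptop/alice, 101:null/alice]
        System.out.println(hashJoin(trades, products, brokers));
    }
}
```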

Currently, the Pipeline API is mainly aimed at batch processing and stream-enrichment use cases. It supports most batch operations, such as map, flat map, filter, grouping, co-grouping and hash join, and it works with all sources and sinks. We will be adding support for windowing to the Pipeline API in the next release of Jet. Please see the reference manual and code samples for a more detailed introduction and examples.

Fault Tolerant Stream Processing

Fault tolerance is an important concept in stream processing, where jobs run without a definite end and node failures can cause disruption. With 0.5, Jet introduces a simple way to do fault-tolerant streaming computation that does not rely on any external system or storage and instead uses the distributed in-memory storage provided by Hazelcast.

Starting with 0.5, a Jet job is restarted automatically when a node leaves the cluster, and by using in-memory snapshots it can resume where it left off. The following processing modes are available:

  • Exactly-once: When a job is restarted it will be resumed from the latest available snapshot. Guarantees that items which have been processed already will not be processed again after the restart.
  • At-least-once: Similar to exactly-once, but with the relaxed guarantee that items can be processed multiple times after a restart.
  • None: Job can still be restarted, but as no snapshots are taken, messages might be lost or processed again.
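
The difference between the three modes can be illustrated with a toy model. The sketch below is plain Java, not Jet code, and it does not reflect how Jet takes snapshots internally. It sums the numbers 1..n, crashes once, and restarts. For at-least-once we assume the saved source offset lags the saved state by one item, and for none we assume the state is lost while the source keeps its position. All names and parameters are hypothetical.

```java
// Toy model of restart behaviour under the three processing modes (illustrative only).
final class SnapshotSketch {
    enum Guarantee { EXACTLY_ONCE, AT_LEAST_ONCE, NONE }

    /** Sums items 1..n with one simulated crash after crashAfter items. */
    static long run(Guarantee guarantee, int n, int snapshotEvery, int crashAfter) {
        long sum = 0;          // processor state
        int offset = 0;        // number of items the source has emitted
        long savedSum = 0;     // state stored in the latest snapshot
        int savedOffset = 0;   // source offset stored in the latest snapshot
        boolean crashed = false;

        while (offset < n) {
            offset++;
            sum += offset;     // the item's value equals its position
            if (guarantee != Guarantee.NONE && offset % snapshotEvery == 0) {
                savedSum = sum;
                // Assumption: in at-least-once mode the saved offset lags the state by one item.
                savedOffset = guarantee == Guarantee.EXACTLY_ONCE ? offset : offset - 1;
            }
            if (!crashed && offset == crashAfter) {
                crashed = true;
                if (guarantee == Guarantee.NONE) {
                    sum = 0;               // state is lost, earlier items are never replayed
                } else {
                    sum = savedSum;        // restore state and rewind the source
                    offset = savedOffset;
                }
            }
        }
        return sum;
    }

    public static void main(String[] args) {
        // The correct total of 1..10 is 55.
        System.out.println(run(Guarantee.EXACTLY_ONCE, 10, 4, 6));  // prints 55
        System.out.println(run(Guarantee.AT_LEAST_ONCE, 10, 4, 6)); // prints 59 (item 4 counted twice)
        System.out.println(run(Guarantee.NONE, 10, 4, 6));          // prints 34 (items 1..6 lost)
    }
}
```

In this model, exactly-once restores state and source position consistently, at-least-once replays an item that the restored state already contains, and none loses everything processed before the crash.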

Currently, snapshotting is only supported with streaming DAGs with Event Journal or Kafka sources. Both sliding window and session window processors support snapshotting.

A comprehensive example showcasing this feature can be found in the code samples repository, and a detailed explanation of how fault tolerance works is available in the reference manual. We will also be publishing a blog post detailing the design behind this feature in the coming weeks.

Hazelcast Map Event Journal

Hazelcast IMDG 3.9 introduced the event journal for Hazelcast Map and Cache. The journal records an event for every change that happens on the map, which allows the distributed map itself to be used as a source of events in a streaming job.

For example, if we have a map where the keys are stock tickers and the values are prices, we can build a stream of price update events from updates to this map and use these events to implement windowed aggregations that model how the price changes over time. We can build a source that maps the added and updated events of the prices map to PriceUpdateEvent objects as follows:

Vertex streamMap = dag.newVertex("stream-map",
   SourceProcessors.<String, Integer, PriceUpdateEvent>streamMapP("prices",
      e -> e.getType() == EntryEventType.ADDED || e.getType() == EntryEventType.UPDATED,
      e -> new PriceUpdateEvent(e.getKey(), e.getNewValue()), true));

An example using this approach can be found in the code samples repository.
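
To make the windowed aggregation idea concrete, here is a small plain-Java sketch that filters a list of change events and averages the price of one ticker per tumbling window. It runs on a single JVM and uses no Jet or Hazelcast classes. The MapEvent class, its timestampMs field and the window length are hypothetical, chosen only for illustration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

// Single-JVM illustration of a tumbling-window average over map change events.
final class JournalWindowSketch {
    enum EventType { ADDED, UPDATED, REMOVED }

    // Hypothetical stand-in for a journal entry: one record per change to the map.
    static final class MapEvent {
        final EventType type; final String key; final int newValue; final long timestampMs;
        MapEvent(EventType type, String key, int newValue, long timestampMs) {
            this.type = type; this.key = key; this.newValue = newValue; this.timestampMs = timestampMs;
        }
    }

    /** Average price of one ticker per window; the map key is the window start in milliseconds. */
    static Map<Long, Double> averagePerWindow(List<MapEvent> journal, String ticker, long windowMs) {
        return journal.stream()
                // same predicate as the streamMapP example: keep added and updated entries
                .filter(e -> e.type == EventType.ADDED || e.type == EventType.UPDATED)
                .filter(e -> e.key.equals(ticker))
                // assign each event to the window that contains its timestamp
                .collect(Collectors.groupingBy(
                        e -> e.timestampMs / windowMs * windowMs,
                        TreeMap::new,
                        Collectors.averagingInt(e -> e.newValue)));
    }

    public static void main(String[] args) {
        List<MapEvent> journal = Arrays.asList(
                new MapEvent(EventType.ADDED, "ACME", 10, 0),
                new MapEvent(EventType.UPDATED, "ACME", 20, 500),
                new MapEvent(EventType.UPDATED, "ACME", 30, 1200),
                new MapEvent(EventType.REMOVED, "ACME", 0, 1500),
                new MapEvent(EventType.ADDED, "OTHER", 99, 100));
        // prints {0=15.0, 1000=30.0}
        System.out.println(averagePerWindow(journal, "ACME", 1000));
    }
}
```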

Hazelcast Map Predicate and Projections

We have made improvements to the Hazelcast Map source and have added support for predicates and projections. Using predicates and projections can increase performance when reading from the IMap, as the items which are filtered out and the fields which are not projected will not need to be deserialized or transmitted over the network. It also makes it possible to make use of indexes when reading from an IMap for additional performance. It’s possible to use these when reading from within the same Jet cluster, or from a remote Hazelcast or Jet cluster.

For example, you can now easily dump the filtered and formatted output of a map to a file like this:

// The source map; the pipeline below refers to it by name
IMap<String, Integer> sourceMap = instance.getMap("prices");
Pipeline p = Pipeline.create();
// Keep only non-zero prices and format each entry as "key:value"
p.drawFrom(Sources.<String, Integer, String>map("prices",
    e -> e.getValue() != 0, e -> e.getKey() + ":" + e.getValue()))
 .drainTo(Sinks.files("output"));

instance.newJob(p).join();

Custom source support for java.util.stream

For those who prefer the java.util.stream API, we have also introduced a feature that allows developers to use custom sources with java.util.stream. A word count using HDFS as the source can be expressed as follows:

IMap<String, Long> counts = DistributedStream
    .<String>fromSource(jetInstance, HdfsProcessors.readHdfsP(jobConfig, (k, v) -> v.toString()))
    .flatMap(line -> Arrays.stream(delimiter.split(line.toLowerCase())))
    .filter(word -> !word.isEmpty())
    .collect(DistributedCollectors.toIMap("counts", w -> w, w -> 1L, (left, right) -> left + right));

Update to Hazelcast 3.9

Jet now uses a shaded version of the recently released Hazelcast 3.9, which means all the new features of 3.9 are available to Jet users.

Final Remarks

We will be exploring several of the new features in depth during the coming weeks in a series of blog posts. In addition, a webinar introducing Jet 0.5 is scheduled for November 14. We are looking forward to receiving feedback about this release. You can reach us via the hazelcast-jet Google group or through GitHub.