Ways to Enrich Your Event Stream with Hazelcast Jet

In Hazelcast Jet 0.7 you have several options to enrich your data stream, varying in simplicity, flexibility and performance characteristics. This article shows you how to pick the right one.

What is Data Enrichment?

The main purpose of Hazelcast Jet is to process infinite distributed streams of events. An almost universal first processing step is attaching to each event all the data we have on it — enriching it. For example, the event may contain the ID of the user that initiated it: we want to look up that user and attach their data to the event. Once we have all the data, we can proceed to computation steps that reason about the events and derive interesting information from them.

Let’s say our events are tweets:

public class Tweet implements Serializable {
    private final long userId;
    private final String message;
    private User user;

    public Tweet(long userId, String message) {
        this.userId = userId;
        this.message = message;
    }

    public long userId() { return userId; }
    public String message() { return message; }
    public User user() { return user; }
    public Tweet setUser(User user) { this.user = user; return this; }
}

For simplicity we’ll use the IMap event journal as our infinite source of tweets. Assuming the map name is “tweet”, this is how you define such a data source:

StreamSource<Tweet> tweetSource = Sources.mapJournal(
    "tweet", mapPutEvents(), mapEventNewValue(), START_FROM_OLDEST);

We also have a table of users stored in a Hazelcast IMap:

public class User implements Serializable {
    private final long id;

    public User(long id) { this.id = id; }
    public long id() { return id; }
}
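
To have something to look up, the map must be populated. For example, assuming a running JetInstance named jet:

// The map name "user" matches the one used in the pipelines below
IMap<Long, User> userMap = jet.getMap("user");
userMap.put(1L, new User(1L));
userMap.put(2L, new User(2L));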

The events come in with the userId set, but the user field is null. We want to fill in that field by looking the user up in the user table.

Technique 1: The Hash Join

One way to look at data enrichment is as a JOIN operation: we are joining the event stream with a table. Hazelcast Jet defines a join operation tailored to this purpose: the hash join. It allows you to extract the foreign key from the event stream and join it with a table that you ingest as another stream, using the table’s primary key.

This is how we can enrich our stream:

Pipeline p = Pipeline.create();
StreamStage<Tweet> tweets = p.drawFrom(Sources.mapJournal(
        "tweet", mapPutEvents(), mapEventNewValue(), START_FROM_OLDEST));
BatchStage<Entry<Long, User>> users = p.drawFrom(Sources.map("user"));
StreamStage<Tweet> enriched = tweets.hashJoin(
        users,
        JoinClause.joinMapEntries(Tweet::userId),
        Tweet::setUser);
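
The snippet stops at the enrichment stage. To actually run it, attach a sink and submit the job; a minimal sketch, with the sink list name being our arbitrary choice:

// Drain the enriched stream into an IList and start the job
enriched.drainTo(Sinks.list("enriched-tweets"));
jet.newJob(p);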

The hash join transform has very high throughput. It replicates the entire contents of the users stream on each computing cluster member, storing it in hashtables. It can then enrich events in parallel, with a single-thread throughput of about 10,000,000 events per second.

The downside of the hash join is that it won’t see any changes to the user table once the computing job has started.

Technique 2: Look Up and Transform

Instead of hash-joining we can enrich our tweet stream by looking up directly from the source, in our case the user map:

Pipeline p = Pipeline.create();
StreamStage<Tweet> tweets = p.drawFrom(Sources.mapJournal(
        "tweet", mapPutEvents(), mapEventNewValue(), START_FROM_OLDEST));
StreamStage<Tweet> enriched = tweets
        .groupingKey(Tweet::userId)
        .mapUsingIMap("user", Tweet::setUser);

Jet will now immediately see all the updates you perform on the user map.
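
For example, an update like this one, made while the job is running, is reflected in all subsequent lookups:

// Visible to the running job as soon as the put completes
jet.getMap("user").put(42L, new User(42L));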

IMap is a distributed data structure: each cluster member holds a piece of it. In general this means you must make a network request to fetch a value. If you let the Jet cluster hold your IMap, however, it can arrange for data-local lookup by sending each event to the member that holds its user. Jet uses the same partitioning scheme as Hazelcast IMDG, so by setting the groupingKey on the stream you have told Jet how to partition the stream in just the right way to make each user lookup local. This turns the network request-response cycle into a one-way send and eliminates request-response latency.

Another trick you can use is replacing the IMap with a ReplicatedMap. This is a non-partitioned, fully replicated data structure so all the data is present on all members. The performance of using it is thus quite similar to the hash join, but the data is dynamic, just like with the IMap.

If your cluster machines have enough RAM to hold full replicas of the lookup data, consider using a ReplicatedMap. However, if the map plays other roles in your system, keep in mind that IMap has far more features, which makes it the better general-purpose choice.
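
To illustrate, here is one way to wire up a ReplicatedMap lookup with the generic mapUsingContext transform introduced in the next section, reusing the tweets stage from above. This is just a sketch; since ReplicatedMap.get() is a local, non-blocking call, the default cooperative mode is fine:

// The context object is the ReplicatedMap itself
ContextFactory<ReplicatedMap<Long, User>> userMapContext = ContextFactory
        .withCreateFn(jet -> jet.getHazelcastInstance()
                                .<Long, User>getReplicatedMap("user"));

StreamStage<Tweet> enriched = tweets.mapUsingContext(
        userMapContext,
        (map, tweet) -> tweet.setUser(map.get(tweet.userId())));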

Technique 3: Look Up from an External System

Our first technique, the hash join, allows you to ingest the lookup data from any data source. We used the IMap as an example, but it could also be regular files, a JDBC database or any other storage system. Our second technique is special-cased to support IMap and ReplicatedMap. So, what do you do when your data source is an external system and you need to observe the updates that happen while the Jet job is running?

For this we expose the abstraction that underpins mapUsingIMap. It's called mapUsingContext, and to use it you define a context object that manages the connection to the outside system. Your mapping function gets two parameters: the context object and the input item.

We’ll show you an example that connects to an HTTP service to get the enriching data. We’ll use the HTTP client that comes bundled with JDK 11. Jet wants you to provide a factory that creates the context objects. Here’s how we defined it:

private static ContextFactory<HttpClient> httpClient() {
    return ContextFactory
            .withCreateFn(jet -> HttpClient.newHttpClient())
            .nonCooperative()
            .shareLocally();
}

createFn is the function that returns the context object. It receives a JetInstance parameter which it can use, for example, to acquire an IMap. Since we connect to an external system, we ignore the parameter.

Most client objects must be explicitly destroyed after use. In that case you can chain, for example, withDestroyFn(client -> client.close()) onto the factory. The JDK HTTP client doesn't expose such a cleanup method, so we skip this step.
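
For a client that does need cleanup, the factory might look like this. MyDbClient is a hypothetical class standing in for your real client:

private static ContextFactory<MyDbClient> dbClient() {
    return ContextFactory
            .withCreateFn(jet -> new MyDbClient("db.example.com"))
            // Called for each context object when the job ends
            .withDestroyFn(MyDbClient::close)
            .nonCooperative();
}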

Hazelcast Jet uses cooperative multithreading by default, but our HTTP client makes blocking calls. That’s why we must declare our operation as non-cooperative by calling nonCooperative().

Finally, our HTTP client is thread-safe and we can reuse the same instance from all the Jet worker threads running on the same machine, so we call shareLocally().

Since we wanted to show you a self-contained example, the code below starts its own mock HTTP server. We chose the JBoss Undertow server since it's very simple to set up. Our dependency is io.undertow:undertow-core:2.0.13.Final. Our mock HTTP service responds with the position in the alphabet (a = 1) of the first character following the leading slash of the URL path. We use j, e, t as the input data, so we expect the pairs (j, 10), (e, 5) and (t, 20) as output. The job delivers its output into a Hazelcast IList and, when it's done, we print the list's contents using output.forEach(System.out::println).

import com.hazelcast.jet.Jet;
import com.hazelcast.jet.datamodel.Tuple2;
import com.hazelcast.jet.pipeline.ContextFactory;
import com.hazelcast.jet.pipeline.Pipeline;
import com.hazelcast.jet.pipeline.Sinks;
import com.hazelcast.jet.pipeline.Sources;
import io.undertow.Undertow;
import io.undertow.server.HttpServerExchange;

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse.BodyHandlers;

import static com.hazelcast.jet.datamodel.Tuple2.tuple2;
import static io.undertow.util.Headers.CONTENT_TYPE;
import static java.util.Arrays.asList;

public class HttpEnrichment {
    public static void main(String[] args) {
        System.setProperty("hazelcast.logging.type", "log4j");

        var mockServer = mockHttpServer();
        mockServer.start();
        var jet = Jet.newJetInstance();
        try {
            var input = jet.<String>getList("input");
            input.addAll(asList("j", "e", "t"));
            var output = jet.<Tuple2<String, String>>getList("output");

            var p = Pipeline.create();
            p.drawFrom(Sources.list(input))
             .mapUsingContext(httpClient(), (httpc, item) -> {
                 try {
                     // Blocking GET; the body is one line with the value
                     String response = httpc
                        .send(GET("http://localhost:8008/" + item),
                              BodyHandlers.ofLines())
                        .body().findFirst().orElse("");
                     return tuple2(item, response);
                 } catch (Exception e) {
                     // Wrap the checked exceptions thrown by send()
                     throw new RuntimeException(e);
                 }
             })
             .drainTo(Sinks.list(output));

            jet.newJob(p).join();
            output.forEach(System.out::println);
        } finally {
            Jet.shutdownAll();
            mockServer.stop();
        }
    }

    private static ContextFactory<HttpClient> httpClient() {
        return ContextFactory
                .withCreateFn(jet -> HttpClient.newHttpClient())
                .nonCooperative()
                .shareLocally();
    }

    private static HttpRequest GET(String uri) {
        return HttpRequest.newBuilder().uri(URI.create(uri)).build();
    }

    private static Undertow mockHttpServer() {
        return Undertow.builder()
                       .addHttpListener(8008, "localhost")
                       .setHandler(HttpEnrichment::handleRequest)
                       .build();
    }

    private static void handleRequest(HttpServerExchange exchange) {
        exchange.getResponseHeaders().put(CONTENT_TYPE, "text/plain");
        String requestPath = exchange.getRequestPath();
        exchange.getResponseSender().send(
            String.valueOf(requestPath.charAt(1) - 'a' + 1));
    }
}

Final Thoughts

Whatever computation you're about to perform on your data stream, enrichment is the universal preparatory step that brings in your pre-existing knowledge about the events. Hazelcast Jet has strong support for this task, allowing you to choose the approach that gives you the best performance. For jobs with a limited lifespan, where the enriching data can stay frozen, the hash join is a very good choice because it's the fastest.

In jobs of unlimited lifespan, you can keep your enriching data fresh by performing a direct lookup each time. If you store the data in the Jet cluster, you can achieve great throughput by looking it up from an IMap or ReplicatedMap, and Jet offers a very convenient API for the lookup.

If your data is in an external system, you can prepare a context object that holds the open connections and perform the lookup and transformation with custom code.

Download Hazelcast Jet now >