How To Guide for Reading Apache Avro Files into Hazelcast IMDG

Introduction

Apache Avro is primarily a data transfer mechanism. It was developed as part of the Apache Hadoop project and is similar to Thrift, Protocol Buffers, etc. Data is serialized into a compact binary format suitable for transmission across a network and/or storage to a persistence layer (e.g. disk, DB, etc.). The data within the Avro format is discrete, finite, and packed into a defined structure. Because of this structure, it is referred to as a message (i.e. an Avro message). For object-oriented programming languages, Avro messages are generally serialized to and from objects. As this paper is focused on reading these messages into Hazelcast IMDG, we will focus on deserializing the messages into Java objects, but it’s important to note that other platforms may handle the process very differently. Platform independence is a primary motivator for the development of Avro.

An important feature of Avro is that the schema of the contained message structure travels with the data itself; in an Avro data file it is written once, in the file header. This allows for a tight packing of data, as any describing metadata lives in the schema and does not have to be repeated, perhaps many times, within the data.

When sending Avro messages across a network to another receiving system, it is usually preferable to send individual messages as they are generated, or in micro-batches, to implement an event-driven architecture. When dealing with file-based Avro messages, however, it is much more common to have large batches of messages contained in one file; there can be hundreds of thousands of messages.

For event-based Avro architectures, Hazelcast Jet is the preferred method for handling ingestion into the Hazelcast platform. That will be the subject of a following Avro ‘how to’ guide. This paper details how to ingest file-based Avro messages.

The Avro Schema

As mentioned previously, the schema for the data within an Avro file is contained in the file and is read by the receiving system to determine how to deserialize the messages. Though the Avro file is mostly binary data, the schema is written at the head of the file in JSON format. It is possible to open the file in a text editor and simply copy the JSON out, but the Avro group has built several tool sets that make this process manageable and repeatable.

Generic vs Specific Record Parsing

The first decision to be made when parsing Avro is whether you will use a generic or a specific record type. From a Java perspective, each type produces an object containing that message’s values. The difference is whether you want type checking while parsing into a known class type, or the schema flexibility of parsing into a generic record, with your application handling the type checking and validation. For strongly typed languages such as Java, specific record parsing is more intuitive and straightforward; the mechanics of parsing the data are the same, however. For this paper, we will use specific record types because we don’t want to get bogged down in validation code and we want to demonstrate how to generate the Java classes from the schema using the Avro-provided tools.
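To make the trade-off concrete, here is a minimal sketch of the two styles. It is an analogy built from plain JDK types, not Avro's API: a Map stands in for a generic record, and the hypothetical TypedUser class stands in for a generated specific record. All names in it are illustrative assumptions.

```java
import java.util.HashMap;
import java.util.Map;

// Analogy only: plain JDK types standing in for Avro's generic and specific records.
class RecordStyleSketch {

    // "Specific" style: a known class with typed accessors, checked at compile time.
    static final class TypedUser {
        private final String name;
        private final Integer favoriteNumber; // nullable, like the ["int", "null"] union

        TypedUser(String name, Integer favoriteNumber) {
            this.name = name;
            this.favoriteNumber = favoriteNumber;
        }

        String getName() { return name; }
        Integer getFavoriteNumber() { return favoriteNumber; }
    }

    // "Generic" style: fields are looked up by name and come back as Object,
    // so the application has to check the type itself.
    static Integer readFavoriteNumber(Map<String, Object> genericRecord) {
        Object value = genericRecord.get("favorite_number");
        if (value == null) {
            return null;
        }
        if (!(value instanceof Integer)) {
            throw new IllegalStateException("favorite_number is not an int: " + value);
        }
        return (Integer) value;
    }

    public static void main(String[] args) {
        Map<String, Object> generic = new HashMap<>();
        generic.put("name", "Alyssa");
        generic.put("favorite_number", 256);
        System.out.println(readFavoriteNumber(generic));

        TypedUser typed = new TypedUser("Alyssa", 256);
        System.out.println(typed.getFavoriteNumber());
    }
}
```

In the generic style a schema change shows up only at runtime, in the validation code. In the specific style it shows up when the classes are regenerated and the application is recompiled.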

Generating Java Classes

As mentioned previously, we will be using a specific record parser for demonstration. To get started, we need a valid Avro file of the data type that will be parsed, the main Avro implementation jar, and the Avro tools jar.

At the time of this writing, Avro 1.8.2 is the current version, so we download avro-1.8.2.jar and avro-tools-1.8.2.jar from avro.apache.org.

Generating the corresponding Java classes is done in two steps.

Step 1 – Extract the schema into a JSON text file

In order to create the class definitions for parsing specific records, we first need to have a schema definition file in JSON format. The easiest way to do that is using Avro tools. Avro schema files have the extension avsc. The Avro toolset has a number of uses; getschema and compile are the two options we will use. getschema prints the schema, and compile generates the class source code from the schema.

To generate the schema file, use:

java -jar avro-tools-1.8.2.jar getschema example.avro > example.avsc

This will produce a schema file, example.avsc in this example.

For further exploration, this can also be done from Java using the Avro Java package; an example is included in the appendix of this paper.

Example Schema:

{"namespace": "example.avro",
 "type": "record",
 "name": "User",
 "fields": [
     {"name": "name", "type": "string"},
     {"name": "favorite_number",  "type": ["int", "null"]},
     {"name": "favorite_color", "type": ["string", "null"]}
 ]
}

Step 2 – Generate Java Class Files from JSON Schema

Now that we have the schema file, we can generate the entire Java class structure. We will use the Avro tools again, this time to generate the required classes. A few things to note here: 1) This example is very simple and generates a simple class; in practice, the class structure can be very complex and deep. 2) The namespace JSON element determines the package (name space) of the generated classes. You might be tempted to adjust this element to fit your preferred project name space; however, this will likely cause errors during parsing, as the data will contain the old name space in numerous places. This will usually cause a “NoClassDefFound” exception.

To generate the Java source file(s), use:

java -jar avro-tools-1.8.2.jar compile schema example.avsc ./tmp

Note: The generated class is not included here as it is quite large and contains many methods used strictly by the Avro parser.

At this point, you should find the source for your generated class(es) in the ./tmp directory as noted above. You will need to either copy/import them into your IDE project, or compile them into a jar file and include it on your class path.

From here, you no longer need the schema file for operation; all of the remaining Java-based operations rely on the generated classes for object definition.

Reading Messages from Avro Files

Now that we have generated class definitions, we can use those and the Avro application jar file in our application to read and parse Avro message files.

The basic logic of reading an Avro file is fairly simple:

  1. Create an Avro DatumReader based on the top level class we generated. This is the parser that understands our class structure and hence the file data structure.
  2. Create an Avro DataFileReader. This reader understands the structure of the general Avro file format. Its constructor takes a File object that points to our Avro data file, and the DatumReader that parses the individual messages.
  3. Use the DataFileReader to read each message contained in the data file until all the messages are consumed.

Note that the class we generated using the Avro tools is named User (as defined in the schema).

File Reader Example:

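import java.io.File;
import org.apache.avro.file.DataFileReader;
import org.apache.avro.io.DatumReader;
import org.apache.avro.specific.SpecificDatumReader;
import example.avro.User; // generated from example.avsc

File file = new File("fully qualified filename");

// Parser for the individual messages, typed to the generated User class
DatumReader<User> userDatumReader = new SpecificDatumReader<>(User.class);

User user = null; // See note below about this technique

// try-with-resources closes the file when done; opening and reading can throw IOException
try (DataFileReader<User> dataFileReader = new DataFileReader<>(file, userDatumReader)) {
    while (dataFileReader.hasNext()) {
        user = dataFileReader.next(user);
        System.out.println(user);
    }
}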

Note: Avro provides a ‘next’ method that accepts an existing object and reuses it on each call, which avoids creating potentially thousands of complex objects. This is demonstrated above with the parameter to the ‘next’ call. This is often useful because, as noted previously, the Avro-generated classes contain an abundance of methods and fields that are only used by the Avro parsing methods, so it is common, and best practice, to immediately map the data to a new, more compact class. Care should be taken if you store or pass along the generated object directly: each iteration returns the same reference, so every read overwrites the data from the previous one.

At this point, we are reading and parsing Avro messages. Now we must store them in IMDG.
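The reuse pitfall described in the note can be shown without Avro at all. The following is a minimal sketch using hypothetical stand-in classes (ReusableUser, SmallUser, and the static next helper are assumptions made for illustration, not Avro types): keeping the reused object stores the same reference repeatedly, while copying into a compact class keeps each message distinct.

```java
import java.util.ArrayList;
import java.util.List;

class ReuseSketch {

    // Hypothetical stand-in for an Avro-generated class: mutable, refilled on each read.
    static final class ReusableUser {
        String name;
    }

    // Hypothetical compact class that copies only the data we want to keep.
    static final class SmallUser {
        final String name;

        SmallUser(ReusableUser source) {
            this.name = source.name;
        }
    }

    // Stand-in for next(reuse): overwrites the fields of the object passed in.
    static ReusableUser next(ReusableUser reuse, String nameFromFile) {
        ReusableUser target = (reuse != null) ? reuse : new ReusableUser();
        target.name = nameFromFile;
        return target;
    }

    // Keeping the reused object itself: every list entry is the same reference.
    static List<ReusableUser> collectByReference(String... names) {
        List<ReusableUser> kept = new ArrayList<>();
        ReusableUser user = null;
        for (String n : names) {
            user = next(user, n);
            kept.add(user);
        }
        return kept;
    }

    // Copying into a compact object on each iteration keeps every message distinct.
    static List<SmallUser> collectByCopy(String... names) {
        List<SmallUser> kept = new ArrayList<>();
        ReusableUser user = null;
        for (String n : names) {
            user = next(user, n);
            kept.add(new SmallUser(user));
        }
        return kept;
    }

    public static void main(String[] args) {
        System.out.println(collectByReference("Alyssa", "Ben").get(0).name); // prints "Ben"
        System.out.println(collectByCopy("Alyssa", "Ben").get(0).name);      // prints "Alyssa"
    }
}
```

The first call prints the last name read because both list entries point at the one reused object.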

Putting Messages into a Hazelcast Map

Now that our application is reading and parsing Avro messages, all that is left is to put them into our Hazelcast IMDG. Again, this is a fairly straightforward process, but there are a few decisions to make, depending on our architecture, message volume, and potentially message size.

Because Avro-based applications tend to pack many messages into a single Avro file and we generally don’t know how many are present beforehand, we are faced with the prospect of having an unknown but potentially very large number of messages to process. Individual object ‘puts’ within the read loop are tempting because that is the simplest and most direct way of getting them to IMDG. However, the overhead of doing single ‘puts’ can be large when compared to ‘putAll’ operations of some batch size. In this scenario, greater ingest performance can be gained by batching some number of parsed messages together and using ‘putAll’.

A local HashMap works well for this with the optimum batch size being determined through testing. For example, adding to the above example:

Batch Reader/Put Example:

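import java.io.File;
import java.util.HashMap;
import java.util.Map;
import com.hazelcast.client.HazelcastClient;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.core.IMap; // Hazelcast 3.x; com.hazelcast.map.IMap in 4.x and later
import org.apache.avro.file.DataFileReader;
import org.apache.avro.io.DatumReader;
import org.apache.avro.specific.SpecificDatumReader;

final int BATCH_SIZE = 10_000; // placeholder value; tune through testing

// New Hazelcast Objects
// The String key type is an assumption; use whatever key type fits your data.
// SmallerUserClass must be serializable by Hazelcast (e.g. implement java.io.Serializable).
HazelcastInstance hzClient = HazelcastClient.newHazelcastInstance();
IMap<String, SmallerUserClass> hzMap = hzClient.getMap("myMap");
Map<String, SmallerUserClass> localMap = new HashMap<>();

File file = new File("fully qualified filename");

DatumReader<User> userDatumReader = new SpecificDatumReader<>(User.class);

User user = null; // See note above about this technique

try (DataFileReader<User> dataFileReader = new DataFileReader<>(file, userDatumReader)) {
    while (dataFileReader.hasNext()) {
        user = dataFileReader.next(user);
        // Assumption: the user's name is unique and serves as the map key
        String key = user.getName().toString();
        // Put a new object into the HashMap instead of the reused Avro object
        localMap.put(key, new SmallerUserClass(user));
        if (localMap.size() >= BATCH_SIZE) {
            hzMap.putAll(localMap); // probably spawn a thread here
            localMap.clear();
        }
    }
}

// Flush the final partial batch left over after the loop
if (!localMap.isEmpty()) {
    hzMap.putAll(localMap);
}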

Voila! We are now loading Avro messages into our Hazelcast IMDG.
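The batching pattern can also be pulled out into a small helper that is independent of Avro and Hazelcast. The following is a minimal sketch under stated assumptions: the class name BatchLoaderSketch and the method loadInBatches are hypothetical, and the target is typed as java.util.Map. Because a Hazelcast IMap implements java.util.Map, it could be passed as the target. Note the flush after the loop: without it, the last partial batch would never reach the target.

```java
import java.util.HashMap;
import java.util.Map;

class BatchLoaderSketch {

    // Copies entries from source into target using putAll in batches of batchSize.
    // Returns the number of putAll calls that were made.
    static <K, V> int loadInBatches(Iterable<Map.Entry<K, V>> source,
                                    Map<K, V> target,
                                    int batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        Map<K, V> batch = new HashMap<>();
        int flushes = 0;
        for (Map.Entry<K, V> entry : source) {
            batch.put(entry.getKey(), entry.getValue());
            // Checking the batch's own size stays correct even when keys repeat
            if (batch.size() >= batchSize) {
                target.putAll(batch);
                batch.clear();
                flushes++;
            }
        }
        // Flush whatever is left over after the last full batch
        if (!batch.isEmpty()) {
            target.putAll(batch);
            flushes++;
        }
        return flushes;
    }
}
```

With 25 entries and a batch size of 10, this makes three putAll calls: two full batches and one final batch of five.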

Conclusion

As we’ve seen, there’s not really a lot of logic or coding that needs to be done for a basic method of reading and loading Avro messages into Hazelcast. There are some caveats, of course. Avro files come in all shapes and sizes, and depending on your particular circumstances, major performance gains can be achieved through multi-threading. With a single large file, offloading the actual putAll invocation to another thread will greatly help throughput: the bottleneck is usually reading the messages, so there is no need to make the reader wait on the ‘putAll’. Sometimes you will have many files with a smaller number of messages per file; in this case, using parallel threads to read and load simultaneously will improve performance. Performance testing with various settings should always be done to discover the optimum configuration.
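One way to offload the putAll call for the single-large-file case is a single-thread executor that receives completed batches while the reader keeps going. This is a sketch under stated assumptions, not a tuned implementation: the class name AsyncBatchLoaderSketch and the method load are hypothetical, the target is typed as java.util.Map (which a Hazelcast IMap implements), and the number of in-flight batches is unbounded, which a production loader would want to cap.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class AsyncBatchLoaderSketch {

    // Reads entries on the calling thread and hands full batches to a writer thread,
    // so the reader never waits on putAll.
    static <K, V> void load(Iterable<Map.Entry<K, V>> source,
                            Map<K, V> target,
                            int batchSize)
            throws InterruptedException, ExecutionException {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        ExecutorService writer = Executors.newSingleThreadExecutor();
        List<Future<?>> pending = new ArrayList<>();
        try {
            Map<K, V> batch = new HashMap<>();
            for (Map.Entry<K, V> entry : source) {
                batch.put(entry.getKey(), entry.getValue());
                if (batch.size() >= batchSize) {
                    final Map<K, V> full = batch;
                    pending.add(writer.submit(() -> target.putAll(full)));
                    // Start a fresh map; clearing the one in flight would race with the writer
                    batch = new HashMap<>();
                }
            }
            if (!batch.isEmpty()) {
                final Map<K, V> last = batch;
                pending.add(writer.submit(() -> target.putAll(last)));
            }
            // Wait for every batch and surface any failure from the writer thread
            for (Future<?> f : pending) {
                f.get();
            }
        } finally {
            writer.shutdown();
        }
    }
}
```

Each batch gets its own map, so the values placed in it must be independent copies rather than the reused Avro object. For the many-small-files case, the same idea extends to a thread pool with one read-and-load task per file.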

Credits

The above example schema was reused from the Apache Avro documentation: https://avro.apache.org/docs/1.8.2/gettingstartedjava.html#download_install

Appendix

Java – Get Schema from File

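import java.io.File;
import java.io.IOException;
import org.apache.avro.Schema;
import org.apache.avro.file.DataFileReader;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DatumReader;

public static Schema getAvroSchemaFromFile(String FQFilename) {

    // Default to example.avro at the root of the class path
    if (FQFilename == null) {
        FQFilename = Thread.currentThread().getContextClassLoader().getResource("")
                .getPath() + "example.avro";
    }

    DatumReader<GenericRecord> datumReader = new GenericDatumReader<>();

    // try-with-resources closes the reader and avoids a NullPointerException
    // during cleanup if the file cannot be opened
    try (DataFileReader<GenericRecord> dataFileReader =
            new DataFileReader<>(new File(FQFilename), datumReader)) {
        // The schema is read from the file header
        return dataFileReader.getSchema();
    } catch (IOException e) {
        throw new RuntimeException(e);
    }
}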