Queries and Aggregations w/ Scala, part 2

As we saw in part 1, it took 2+ minutes, on my laptop, to run an aggregation across 6+ million crime cases.

However, since Hazelcast is a cluster library, it only makes sense to take advantage of that. Let’s run this on a 3-node cluster. That will require a few code changes to CrimeNode:

  1. First, we want to enable parallel parsing, so each node can parse the data and populate the locally owned partitions. This will avoid excessive network traffic.
  2. We also want to enable adding more nodes on demand, so we can expand our cluster compute capability, if needed.

To achieve this, we define a node as bi-modal at startup. Either a node is part of the initial cluster formation, where the CSV is parsed, or it’s joining an existing cluster and the data is already in the cluster.

New code

Here’s the new version:

package crime

import java.io.InputStreamReader
import java.util.zip.ZipFile

import scala.collection.JavaConverters._
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

import com.hazelcast.Scala._
import com.hazelcast.cluster.ClusterState
import com.hazelcast.config.Config
import com.hazelcast.core.HazelcastInstance
import com.hazelcast.core.IMap
import com.hazelcast.logging.ILogger

object CrimeNode {

  def main(args: Array[String]): Unit = {
    val initial = args match {
      case Array(clusterSize, filename) =>
        Some(clusterSize.toInt -> new ZipFile(filename))
      case _ =>
        None
    }
    val hz = newInstance(initial.map(_._1))
    implicit val logger = hz.getLoggingService.getLogger(getClass)
    val crimeCases = getChicagoCrimes(hz)
    initial.foreach {
      case (clusterSize, csvFile) =>
        val cdl = hz.getCountDownLatch(s"${crimeCases.getName}:csvProcessing")
        cdl.trySetCount(clusterSize)
        hz.getCluster.changeClusterState(ClusterState.FROZEN)
        populateMap(hz, crimeCases, csvFile)
        cdl.countDown()
        if (cdl.getCount == 0) {
          cdl.destroy()
          hz.getCluster.changeClusterState(ClusterState.ACTIVE)
        } else if (!cdl.await(5.minutes)) {
          sys.error("Something must have happened, it shouldn't take 5+ minutes to finish parsing")
        }
    }
    logger.info("Ready!")
  }

  def populateMap(
    hz: HazelcastInstance,
    chicagoCrimeCases: IMap[Int, chicago.CrimeCase],
    csvFile: ZipFile)(implicit logger: ILogger): Unit = {

    val isLocal: (Int => Boolean) = {
      val localMember = hz.getCluster.getLocalMember
      val ps = hz.getPartitionService
      id => ps.getPartition(id).getOwner eq localMember
    }

    val zipEntry = csvFile.entries().nextElement()
    val zipInput = csvFile.getInputStream(zipEntry)
    val crimeReader = new InputStreamReader(zipInput)
    try {
      val asyncMap = chicagoCrimeCases.async
      val allCases = chicago.CrimeCSVParser.parse(crimeReader)
      val localCases = allCases.filter { case (id, _) => isLocal(id) }
      val finalCount = localCases.foldLeft(0) {
        case (lastCount, (id, crimeCase)) =>
          if (lastCount == 0) logger.info(s"Now processing ${csvFile.getName}")
          asyncMap.set(id, crimeCase).failed.foreach { e =>
            logger.warning(s"Failed to insert case $id", e)
          }
          val thisCount = lastCount + 1
          if (thisCount % 25000 == 0) logger.info(f"processed $thisCount%,d records")
          thisCount
      }
      logger.info(f"Done parsing CSV, total $finalCount%,d processed")
    } finally {
      crimeReader.close()
    }
  }

  def getChicagoCrimes(hz: HazelcastInstance) = hz.getMap[Int, chicago.CrimeCase]("chicago-crimes")

  private def newInstance(initClusterSize: Option[Int]): HazelcastInstance = {
    val conf = new Config

    conf.getGroupConfig.setName("crime-cluster")

    initClusterSize.foreach(conf.setInitialMinClusterSize)

    conf.getMapConfig("default").setBackupCount(0)

    serialization.Defaults.register(conf.getSerializationConfig)
    serialization.DynamicExecution.register(conf.getSerializationConfig)

    conf.newInstance
  }

}

A few things have changed:

  1. At startup, the node either joins an existing cluster (no parameters passed), or it’s part of the initial cluster formation, in which case the cluster size and zip file name are expected.
  2. While parsing, we freeze the cluster to prevent new nodes from joining. This avoids partition rebalancing, which would change partition ownership mid-parse and invalidate each node’s filter for locally owned records.
  3. We also disable data backups (i.e. replicas), since we want to conserve memory for this example.
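The "parse everything, keep only what you own" idea behind populateMap can be sketched without Hazelcast at all. This is only an illustration: partitionOf and ownerOf are hypothetical stand-ins for the partition service, the modulo-based assignment and the 3-node cluster are assumptions, and 271 is simply Hazelcast's default partition count.

```scala
object OwnershipSketch {
  // Assumptions: 271 partitions (Hazelcast's default) and a fixed 3-node cluster.
  val PartitionCount = 271
  val NodeCount = 3

  // Hypothetical stand-in for the partition service: id -> partition.
  // Real Hazelcast hashes the serialized key; a plain modulo is enough here.
  def partitionOf(id: Int): Int = math.abs(id % PartitionCount)

  // Hypothetical, static partition -> owner table (frozen cluster, no migration).
  def ownerOf(partition: Int): Int = partition % NodeCount

  // What each node does: look at ALL ids, keep only the locally owned ones.
  def localIds(node: Int, allIds: Seq[Int]): Seq[Int] =
    allIds.filter(id => ownerOf(partitionOf(id)) == node)

  def main(args: Array[String]): Unit = {
    val allIds = 1 to 10000
    val perNode = (0 until NodeCount).map(n => n -> localIds(n, allIds))
    perNode.foreach { case (n, ids) => println(s"node $n keeps ${ids.size} ids") }
    // Every id is kept by exactly one node, as long as ownership does not change.
    println(s"total kept: ${perNode.map(_._2.size).sum} of ${allIds.size}")
  }
}
```

Each id ends up on exactly one node only because the owner table stays fixed while the nodes are filtering. If ownership moved in the middle of the run, a record could be skipped by every node or inserted remotely, which is why the cluster is frozen during parsing.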

Ok, starting 3 nodes and loading the CSV file now takes about 1.5 minutes on my setup of two laptops and an older desktop PC. Not surprising, since we still have to parse the full CSV file on each node, but we do gain a little.

Running on 3 nodes

I’m skipping the imports, configuration, and connection logic (identical to part 1), and showing the query steps here again:

scala> val crimeCases = crime.CrimeNode.getChicagoCrimes(hz)
crimeCases: com.hazelcast.core.IMap[Int,crime.chicago.CrimeCase] = IMap{name='chicago-crimes'}

scala> val crimesEntries2001_2015 = crimeCases.filter(where("year") < 2016)

scala> val crimes2001_2015 = crimesEntries2001_2015.map(_.value)
crimes2001_2015: com.hazelcast.Scala.dds.DDS[crime.chicago.CrimeCase] = com.hazelcast.Scala.dds.MapDDS@7a65a360

scala> val casesByTypeAndYear = crimes2001_2015.groupBy(cc => cc.primaryType -> cc.year)
casesByTypeAndYear: com.hazelcast.Scala.dds.GroupDDS[(String, Short),crime.chicago.CrimeCase] = com.hazelcast.Scala.dds.MapGroupDDS@720f56e2

scala> val (countByTypeAndYear, execTime) = timeThis { casesByTypeAndYear.count() }
countByTypeAndYear: scala.collection.Map[(String, Short),Int] = Map((STALKING,2009) -> 167, (MOTOR VEHICLE THEFT,2014) -> 9902, (THEFT,2015) -> 57219, (OBSCENITY,2014) -> 36, (WEAPONS VIOLATION,2008) -> 3877, (CRIMINAL TRESPASS,2008) -> 12310, (STALKING,2008) -> 190, (KIDNAPPING,2009) -> 293, (OBSCENITY,2006) -> 17, (HOMICIDE,2007) -> 448, (BATTERY,2005) -> 83964, (MOTOR VEHICLE THEFT,2008) -> 18881, (BATTERY,2010) -> 65400, (PUBLIC INDECENCY,2015) -> 14, (BURGLARY,2002) -> 25623, (WEAPONS VIOLATION,2004) -> 4297, (ASSAULT,2002) -> 31521, (INTERFERENCE WITH PUBLIC OFFICER,2005) -> 615, (DECEPTIVE PRACTICE,2010) -> 12377, (OFFENSE INVOLVING CHILDREN,2007) -> 2854, (OFFENSE INVOLVING CHILDREN,2015) -> 2193, (HUMAN TRAFFICKING,2013) -> 2, (OTHER OFFENSE,2009) -> 25601, (NON-CRIMINAL,2012) ...
scala> println(s"Took ${execTime.toSeconds} secs")
Took 34 secs

Wow, 34 seconds on 3 nodes as opposed to 155 seconds on a single node, a better than linear improvement.

So, simply adding more compute and memory resources is probably the single most dramatic way to improve your performance. And it’s linearly scalable.
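For the curious, here is the arithmetic behind "better than linear", as a small sketch. The two timings are the ones reported above; the speedup and efficiency helpers are my own names, not part of any API.

```scala
object SpeedupSketch {
  // Timings reported in this post, in seconds.
  val singleNodeSecs = 155.0
  val threeNodeSecs = 34.0
  val nodes = 3

  // Speedup = old time / new time; efficiency = speedup per node.
  def speedup(before: Double, after: Double): Double = before / after
  def efficiency(before: Double, after: Double, n: Int): Double =
    speedup(before, after) / n

  def main(args: Array[String]): Unit = {
    println(f"speedup:    ${speedup(singleNodeSecs, threeNodeSecs)}%.2fx")
    println(f"efficiency: ${efficiency(singleNodeSecs, threeNodeSecs, nodes)}%.2f per node")
  }
}
```

An efficiency above 1.0 is what "better than linear" means here. Keep in mind that the single-node run was on one laptop while the 3-node run used two laptops and an older desktop, so the comparison is indicative rather than a controlled benchmark.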

Serialization

However, 34 seconds is still not good, so let’s see if we can further improve this.

One obvious thing to fix is to add custom serialization. So far we’ve relied on Java’s Serializable, which is used by default and is automatic for case classes (here CrimeCase).

Instead, let’s hand write one and see what that gets us:

package crime

import java.time.Instant
import java.time.ZonedDateTime

import com.hazelcast.Scala.serialization.SerializerEnum
import com.hazelcast.nio.{ ObjectDataInput, ObjectDataOutput }

import crime.chicago.CrimeCase

object HazelcastSerializers extends SerializerEnum {

  val ChicagoCrimeCaseSer = new StreamSerializer[chicago.CrimeCase] {
    def write(out: ObjectDataOutput, obj: CrimeCase): Unit = {
      out.writeLong(obj.time.toInstant.toEpochMilli)
      out.writeUTF(obj.block)
      out.writeUTF(obj.primaryType)
      out.writeUTF(obj.description)
      out.writeUTF(obj.location)
      out.writeShort(obj.year)
    }
    def read(inp: ObjectDataInput): CrimeCase =
      new CrimeCase(
        time = ZonedDateTime.ofInstant(Instant.ofEpochMilli(inp.readLong), chicago.TimeZone),
        block = inp.readUTF,
        primaryType = inp.readUTF,
        description = inp.readUTF,
        location = inp.readUTF,
        year = inp.readShort
      )
  }

}

And then we remember to register it for both CrimeNode and the client configuration:

HazelcastSerializers.register(conf.getSerializationConfig)

Let’s see how much difference this makes:

scala> println(s"Took ${execTime.toSeconds} secs")
Took 7 secs

Holy Hazelnut, Batman

If this doesn’t illustrate why it is important to not rely on Java’s Serializable, but instead use custom serialization, I don’t know what will. That improved throughput by a factor of almost 5.
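To get a feel for where the difference comes from, here is a minimal, Hazelcast-free sketch that compares the two approaches using only the JDK. Incident is a hypothetical record loosely modelled on CrimeCase (not the real class), the sample values are made up, and the sketch only compares payload size, not CPU time.

```scala
import java.io.{ ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream, ObjectOutputStream }

object SerializationSketch {
  // Hypothetical record, loosely modelled on CrimeCase (not the real class).
  case class Incident(timeMillis: Long, block: String, primaryType: String, year: Short)

  // Default route: java.io.Serializable, which case classes get for free.
  def javaSerialize(i: Incident): Array[Byte] = {
    val bytes = new ByteArrayOutputStream
    val out = new ObjectOutputStream(bytes)
    out.writeObject(i)
    out.close()
    bytes.toByteArray
  }

  // Hand-written route: only the field values, in a fixed order.
  def customSerialize(i: Incident): Array[Byte] = {
    val bytes = new ByteArrayOutputStream
    val out = new DataOutputStream(bytes)
    out.writeLong(i.timeMillis)
    out.writeUTF(i.block)
    out.writeUTF(i.primaryType)
    out.writeShort(i.year)
    out.close()
    bytes.toByteArray
  }

  // Reads the fields back in the same order they were written.
  def customDeserialize(data: Array[Byte]): Incident = {
    val inp = new DataInputStream(new ByteArrayInputStream(data))
    Incident(inp.readLong(), inp.readUTF(), inp.readUTF(), inp.readShort())
  }

  def main(args: Array[String]): Unit = {
    // Made-up sample values.
    val sample = Incident(1104537600000L, "001XX N STATE ST", "THEFT", 2005.toShort)
    println(s"java.io.Serializable: ${javaSerialize(sample).length} bytes")
    println(s"hand-written:         ${customSerialize(sample).length} bytes")
  }
}
```

The hand-written form carries nothing but the field values, while Java serialization also writes class metadata into the stream. Multiply that by 6+ million entries and the effect on an aggregation is easy to imagine.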

Can we do anything else?

Bypassing serialization

Hazelcast by default stores everything in BINARY format, because it’s much faster when sending the bytes on the network. For distributed aggregations, not so much, since every value has to be deserialized before it can be inspected. So let’s see if we can squeeze a little more out by using OBJECT format, which keeps the values as objects and bypasses that deserialization step completely.

We’ll make this change in the newInstance method:

private def newInstance(initClusterSize: Option[Int]): HazelcastInstance = {
  val conf = new Config

  conf.getGroupConfig.setName("crime-cluster")

  initClusterSize.foreach(conf.setInitialMinClusterSize)

  conf.getMapConfig("default")
    .setBackupCount(0)
    .setInMemoryFormat(InMemoryFormat.OBJECT) // <- adding this (needs import com.hazelcast.config.InMemoryFormat)

  serialization.Defaults.register(conf.getSerializationConfig)
  serialization.DynamicExecution.register(conf.getSerializationConfig)
  HazelcastSerializers.register(conf.getSerializationConfig)

  conf.newInstance
}

Run again, and…

scala> println(s"Took ${execTime.toSeconds} secs")
Took 4 secs

From 7 to 4 seconds, another 40% improvement, not bad at all.

Keep in mind that OBJECT will perform even better, compared to BINARY, when the values are larger and more complex, since it is bypassing the deserialization step. In this case, the CrimeCase class is small and compact, and BINARY, with a custom serializer, still performs quite well.
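The difference between the two formats can be sketched with two plain Scala maps, one holding bytes and one holding objects. This is a toy model under my own assumptions, not how Hazelcast stores data internally: Incident, encode, decode and the two aggregate functions are hypothetical names, and the sample data is made up.

```scala
import java.io.{ ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream }

object InMemoryFormatSketch {
  // Hypothetical value type, not the real CrimeCase.
  case class Incident(primaryType: String, year: Int)

  def encode(i: Incident): Array[Byte] = {
    val bytes = new ByteArrayOutputStream
    val out = new DataOutputStream(bytes)
    out.writeUTF(i.primaryType)
    out.writeInt(i.year)
    out.close()
    bytes.toByteArray
  }

  def decode(data: Array[Byte]): Incident = {
    val inp = new DataInputStream(new ByteArrayInputStream(data))
    Incident(inp.readUTF(), inp.readInt())
  }

  private def countByType(values: List[Incident]): Map[String, Int] =
    values.groupBy(_.primaryType).map { case (t, xs) => t -> xs.size }

  // BINARY-like store: values live as bytes, so aggregating decodes each one.
  // Returns the counts and the number of decode calls that were needed.
  def aggregateBinary(store: Map[Int, Array[Byte]]): (Map[String, Int], Int) = {
    var decodes = 0
    val values = store.values.toList.map { bytes => decodes += 1; decode(bytes) }
    (countByType(values), decodes)
  }

  // OBJECT-like store: values live as objects, so no decoding is needed.
  def aggregateObject(store: Map[Int, Incident]): (Map[String, Int], Int) =
    (countByType(store.values.toList), 0)

  // Made-up sample data, stored both ways.
  val objectStore: Map[Int, Incident] = Map(
    1 -> Incident("THEFT", 2005),
    2 -> Incident("BATTERY", 2005),
    3 -> Incident("THEFT", 2006))
  val binaryStore: Map[Int, Array[Byte]] =
    objectStore.map { case (id, inc) => id -> encode(inc) }

  def main(args: Array[String]): Unit = {
    println(s"BINARY-like: ${aggregateBinary(binaryStore)}")
    println(s"OBJECT-like: ${aggregateObject(objectStore)}")
  }
}
```

Both routes produce the same counts; the byte-based one just pays one decode per entry on every aggregation. The flip side, as mentioned above, is that bytes are the cheaper form when values have to travel over the network.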

At this point we’ve probably hit the limit of what we can achieve in terms of optimizations, but since Hazelcast is a dynamically scalable cluster, it’s really easy to add more nodes and achieve linearly scalable improvements.

Indexing

On a final note, I’d like to do a different query and show one more performance optimization.

Let’s look at the number of crimes, per type, for the year 2005:

scala> timeThis { crimeCases.filter(where("year") = 2005).map(_.value).groupBy(_.primaryType).count() }
res8: (scala.collection.Map[String,Int], scala.concurrent.duration.Duration) = (Map(WEAPONS VIOLATION -> 4106, LIQUOR LAW VIOLATION -> 1005, OFFENSE INVOLVING CHILDREN -> 2871, CRIM SEXUAL ASSAULT -> 1530, NARCOTICS -> 56234, PUBLIC INDECENCY -> 4, CRIMINAL DAMAGE -> 54548, RITUALISM -> 2, DECEPTIVE PRACTICE -> 13540, OBSCENITY -> 19, ASSAULT -> 27066, MOTOR VEHICLE THEFT -> 22497, ARSON -> 691, THEFT -> 85685, INTIMIDATION -> 258, BATTERY -> 83964, HOMICIDE -> 453, ROBBERY -> 16047, PROSTITUTION -> 6124, SEX OFFENSE -> 1801, STALKING -> 192, CRIMINAL TRESPASS -> 16655, INTERFERENCE WITH PUBLIC OFFICER -> 615, GAMBLING -> 1078, OTHER OFFENSE -> 28027, OTHER NARCOTIC VIOLATION -> 9, PUBLIC PEACE VIOLATION -> 2730, BURGLARY -> 25504, KIDNAPPING -> 389),1604 milliseconds)

That took 1,604 milliseconds, not too bad. However, behind the scenes the engine was still processing the entire data set, because we don’t have any indexing. One of the reasons the filter call looks a little different (where("year") = 2005 instead of the more natural _.year == 2005) is that it tells Hazelcast explicitly which fields are being filtered, thus allowing the use of indexes, if they exist. So far, we haven’t defined any. But we can add one on-the-fly and rerun the query:

scala> crimeCases.addIndex("year", true) // second argument: ordered index

scala> timeThis { crimeCases.filter(where("year") = 2005).map(_.value).groupBy(_.primaryType).count() }
res10: (scala.collection.Map[String,Int], scala.concurrent.duration.Duration) = (Map(WEAPONS VIOLATION -> 4106, LIQUOR LAW VIOLATION -> 1005, CRIM SEXUAL ASSAULT -> 1530, OFFENSE INVOLVING CHILDREN -> 2871, NARCOTICS -> 56234, PUBLIC INDECENCY -> 4, CRIMINAL DAMAGE -> 54548, RITUALISM -> 2, DECEPTIVE PRACTICE -> 13540, OBSCENITY -> 19, ASSAULT -> 27066, MOTOR VEHICLE THEFT -> 22497, ARSON -> 691, THEFT -> 85685, INTIMIDATION -> 258, BATTERY -> 83964, HOMICIDE -> 453, ROBBERY -> 16047, PROSTITUTION -> 6124, SEX OFFENSE -> 1801, STALKING -> 192, CRIMINAL TRESPASS -> 16655, INTERFERENCE WITH PUBLIC OFFICER -> 615, GAMBLING -> 1078, OTHER OFFENSE -> 28027, OTHER NARCOTIC VIOLATION -> 9, PUBLIC PEACE VIOLATION -> 2730, BURGLARY -> 25504, KIDNAPPING -> 389),360 milliseconds)

An index on year brings this particular query down from 1,604 to 360 milliseconds, a nice ~75% improvement.
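Why an index helps can be shown with a small, Hazelcast-free sketch: a full scan has to test every entry, while a lookup through an ordered year index only touches the matches. Everything here is hypothetical and simplified (Incident, scan, buildIndex, lookup and the sample data are my own); it is not how Hazelcast implements its indexes.

```scala
import scala.collection.immutable.TreeMap

object IndexSketch {
  // Hypothetical value type, not the real CrimeCase.
  case class Incident(primaryType: String, year: Int)

  // Without an index: every entry is visited and tested.
  // Returns the matches and the number of entries visited.
  def scan(data: Map[Int, Incident], year: Int): (List[Incident], Int) = {
    var visited = 0
    val hits = data.values.toList.filter { inc => visited += 1; inc.year == year }
    (hits, visited)
  }

  // A simple ordered index: year -> ids of the entries with that year.
  def buildIndex(data: Map[Int, Incident]): TreeMap[Int, List[Int]] = {
    val idsByYear = data.toList.groupBy { case (_, inc) => inc.year }
      .map { case (year, entries) => year -> entries.map(_._1) }
    TreeMap(idsByYear.toSeq: _*)
  }

  // With the index: go straight to the matching ids and visit only those.
  def lookup(data: Map[Int, Incident], index: TreeMap[Int, List[Int]], year: Int): (List[Incident], Int) = {
    val ids = index.getOrElse(year, Nil)
    (ids.map(data), ids.size)
  }

  // Made-up sample data.
  val sample: Map[Int, Incident] = Map(
    1 -> Incident("THEFT", 2005),
    2 -> Incident("BATTERY", 2005),
    3 -> Incident("THEFT", 2006),
    4 -> Incident("ARSON", 2004))

  def main(args: Array[String]): Unit = {
    val index = buildIndex(sample)
    println(s"scan visited ${scan(sample, 2005)._2} entries")
    println(s"index lookup visited ${lookup(sample, index, 2005)._2} entries")
  }
}
```

With four entries the saving is trivial, but the ratio is the point: the scan cost grows with the whole data set, the indexed lookup only with the number of matches.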

Conclusion

I hope this has been an informative two-part blog post, not only on the query and aggregation capabilities of the Scala API for Hazelcast, but also on some of the techniques that can be applied to improve performance.