Distribute with Hazelcast, Persist into HBase

In this article, I will implement a solution for a Big Data scenario.

I will use HBase as a persistence layer, and Hazelcast as a distributed cache.

The resulting project will be a “getting started” sample for anyone who wants to use HBase as persistent storage for their Hazelcast application.

The Scenario

Suppose you have (or hope to have):

  • “User” data with billions of records. -> Big Data
  • People will access the data from your web application: query it, search it… -> Real-time access
  • Some records will be accessed more frequently. -> Cache them in memory and serve them faster.
  • Columns can be added and removed, with no strict schema. -> Sparse data

Given the main requirements, the solution “NoSQL + Distributed Cache” fits our scenario.

I will persist the user data to HBase:

A NoSQL key-value datastore based on Hadoop technology, specialized for Big Data requirements.

It is modeled after Google’s Bigtable and is used by Yahoo! and Facebook.

Chicago Data Summit Presentation >>

Apache HBase Reference Guide >>

Facebook’s New Real-Time Messaging System: HBase To Store 135+ Billion Messages A Month >>

I will cache and distribute the data with Hazelcast. Learn more about Hazelcast: https://hazelcast.com/docs/2.0/manual/single_html/#Introduction

HBase Setup

HBase is intended to be used in a cluster, but it has a standalone mode that you can use for development purposes.

For HBase setup follow: http://hbase.apache.org/book.html#quickstart

If you use Ubuntu, you may encounter some problems.

Check this: http://hbase.apache.org/book.html#os

Although Windows is not recommended for production, you can still try HBase on Windows.

Check: http://hbase.apache.org/cygwin.html

Hazelcast Setup

Hazelcast is dead simple to use. Just download it and add hazelcast.jar to your classpath.

If you are new to Hazelcast have a look at: https://hazelcast.com/docs/2.0/manual/single_html/#GettingStarted
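
A minimal smoke test, assuming Hazelcast 2.x and its static default-instance API: the first Hazelcast call starts a node in your JVM and gives you a distributed map.

import com.hazelcast.core.Hazelcast;

import java.util.Map;

public class HelloHazelcast {
    public static void main(String[] args) {
        // The first Hazelcast call starts the default node in this JVM.
        Map<String, String> map = Hazelcast.getMap("test");
        map.put("key", "value");
        System.out.println(map.get("key")); // prints "value"
        Hazelcast.shutdownAll(); // stop the node
    }
}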

Project Setup

Create a Maven Java project with the following dependencies:


<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
 
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.hazelcast</groupId> <!-- any groupId/version will do -->
    <artifactId>hazelcast_hbase</artifactId>
    <version>1.0-SNAPSHOT</version>

    <dependencies>
        <dependency>
            <artifactId>hazelcast</artifactId>
            <groupId>com.hazelcast</groupId>
            <version>2.0.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase</artifactId>
            <version>0.92.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-core</artifactId>
            <version>1.0.2</version>
        </dependency>
    </dependencies>

</project>

Create a User POJO:

package com.hazelcast.hbase;


import java.io.Serializable;

public class User implements Serializable {
    private String name;
    private String location;
    private Integer age;
    private String details;

    public String getDetails() {
        return details;
    }

    public void setDetails(String details) {
        this.details = details;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public String getLocation() {
        return location;
    }

    public void setLocation(String location) {
        this.location = location;
    }

    public Integer getAge() {
        return age;
    }

    public void setAge(Integer age) {
        this.age = age;
    }

    @Override
    public String toString() {
        return "User{" +
                "name='" + name + ''' +
                ", location='" + location + ''' +
                ", age=" + age +
                ", details='" + details + ''' +
                '}';
    }
}

Create the user table in HBase:

Run HBase:

HBASE_DIR> ./bin/start-hbase.sh

At this point it is a good idea to check the logs, to make sure HBase installed and started properly.

Then open the HBase shell:

HBASE_DIR> ./bin/hbase shell

Create the user table:

hbase(main):008:0> create 'user', 'cf_basic', 'cf_text'

Here I should say more about ‘cf_basic’ and ‘cf_text’. These are column families.

Columns in the same family are stored together on disk and share the same storage settings.

For example, if you want a certain type of data (e.g. images) to be compressed, put those columns in the same column family so you can define a single storage rule for them.

Here we have two column families: ‘cf_basic’ is for simple types (numbers, strings) and ‘cf_text’ is for long text columns.
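
For reference, here is a rough sketch of the same table creation done from Java with the HBase admin API. The class name and the GZ compression on cf_text are only illustrative assumptions; the shell command above defines no compression.

package com.hazelcast.hbase;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.io.hfile.Compression;

import java.io.IOException;

public class CreateUserTable {
    public static void main(String[] args) throws IOException {
        Configuration conf = HBaseConfiguration.create();
        HBaseAdmin admin = new HBaseAdmin(conf);

        HTableDescriptor table = new HTableDescriptor("user");
        table.addFamily(new HColumnDescriptor("cf_basic"));

        // Storage rules are defined per column family; compressing the
        // long-text family is only an example of such a rule.
        HColumnDescriptor cfText = new HColumnDescriptor("cf_text");
        cfText.setCompressionType(Compression.Algorithm.GZ);
        table.addFamily(cfText);

        admin.createTable(table);
    }
}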

Notice that we have defined nothing about schema or column types.

In the HBase presentation linked above, you may recall that Todd uses the term “datastore” rather than “database” when describing HBase.

HBase (and other key-value stores) is more like a persisted HashMap than a database.

You gain scalability but lose complex queries, as the sketch below illustrates.
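
To make that concrete, here is a minimal sketch of the access patterns HBase does support: point lookups by row key and range scans over the sorted key space (the row keys are hypothetical).

package com.hazelcast.hbase;

import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes;

import java.io.IOException;

public class AccessPatterns {
    public static void main(String[] args) throws IOException {
        HTable table = new HTable("user");

        // Point lookup by row key: the "persisted HashMap" style of access.
        Result row = table.get(new Get(Bytes.toBytes("u-5")));
        System.out.println(row);

        // Range scan over the sorted key space; there are no joins or ad-hoc SQL queries.
        ResultScanner scanner = table.getScanner(new Scan(Bytes.toBytes("u-0"), Bytes.toBytes("u-9")));
        for (Result r : scanner) {
            System.out.println(r);
        }
        scanner.close();
    }
}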

Create HBaseMapStore

This is the class that Hazelcast will call on each map operation.

package com.hazelcast.hbase;

import com.hazelcast.core.MapStore;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes;

import java.io.IOException;
import java.util.*;


public class HBaseMapStore implements MapStore<String, User> {

    // Called by Hazelcast when the requested key is not found in memory.
    @Override
    public synchronized User load(String key) {
        User user = new User();
        HTable htable = HBaseService.getInstance().getHtable();
        try {
            Get get = new Get(Bytes.toBytes(key));
            Result r = htable.get(get);
            if(r.isEmpty())
                return null;
            byte[] bname = r.getValue(Bytes.toBytes("cf_basic"), Bytes.toBytes("name"));
            byte[] blocation = r.getValue(Bytes.toBytes("cf_basic"), Bytes.toBytes("location"));
            byte[] bage = r.getValue(Bytes.toBytes("cf_basic"), Bytes.toBytes("age"));
            byte[] bdetails = r.getValue(Bytes.toBytes("cf_text"), Bytes.toBytes("details"));
            user.setName(Bytes.toString(bname));
            user.setLocation(Bytes.toString(blocation));
            user.setDetails(Bytes.toString(bdetails));
            user.setAge(Bytes.toInt(bage));
        } catch (IOException e) {
            e.printStackTrace();
        }

        return user;
    }


    // Called by Hazelcast to persist a single entry.
    @Override
    public synchronized void store(String key, User user) {
        HTable htable = HBaseService.getInstance().getHtable();
        Put put = new Put(Bytes.toBytes(key));
        put.add(Bytes.toBytes("cf_basic"), Bytes.toBytes("name"), Bytes.toBytes(user.getName()));
        put.add(Bytes.toBytes("cf_basic"), Bytes.toBytes("age"), Bytes.toBytes(user.getAge()));
        put.add(Bytes.toBytes("cf_basic"), Bytes.toBytes("location"), Bytes.toBytes(user.getLocation()));
        put.add(Bytes.toBytes("cf_text"), Bytes.toBytes("details"), Bytes.toBytes(user.getDetails()));
        try {
            htable.put(put);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    // Batch version of store(); used by Hazelcast in write-behind mode.
    @Override
    public synchronized void storeAll(Map<String, User> userMap) {
        HTable htable = HBaseService.getInstance().getHtable();
        List<Row> rlist = new LinkedList<Row>();
        try {
            for (String key : userMap.keySet()) {
                Put put = new Put(Bytes.toBytes(key));
                User user = userMap.get(key);
                put.add(Bytes.toBytes("cf_basic"), Bytes.toBytes("name"), Bytes.toBytes(user.getName()));
                put.add(Bytes.toBytes("cf_basic"), Bytes.toBytes("age"), Bytes.toBytes(user.getAge()));
                put.add(Bytes.toBytes("cf_basic"), Bytes.toBytes("location"), Bytes.toBytes(user.getLocation()));
                put.add(Bytes.toBytes("cf_text"), Bytes.toBytes("details"), Bytes.toBytes(user.getDetails()));
                rlist.add(put);
            }
            htable.batch(rlist);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    // Called by Hazelcast when an entry is removed from the map.
    @Override
    public synchronized void delete(String key) {
        HTable htable = HBaseService.getInstance().getHtable();
        Delete delete = new Delete(Bytes.toBytes(key));
        try {
            htable.delete(delete);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    // Batch version of delete(); used by Hazelcast in write-behind mode.
    @Override
    public synchronized void deleteAll(Collection<String> keys) {
        HTable htable = HBaseService.getInstance().getHtable();
        List<Row> rlist = new LinkedList<Row>();
        try {
            for (String key : keys) {
                Delete delete = new Delete(Bytes.toBytes(key));
                rlist.add(delete);
            }
            htable.batch(rlist);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }


    @Override
    public Map<String, User> loadAll(Collection<String> strings) {
        // will not be implemented
        return null;
    }

    @Override
    public Set<String> loadAllKeys() {
        // will not be implemented
        return null;
    }
}

And a singleton service for obtaining the HBase table:

package com.hazelcast.hbase;

import org.apache.hadoop.hbase.client.HTable;

import java.io.IOException;


public class HBaseService {
    private static HBaseService ourInstance = new HBaseService();
    private HTable htable;


    public static HBaseService getInstance() {
        return ourInstance;
    }

    public HTable getHtable() {
        return htable;
    }

    public void setHtable(HTable htable) {
        this.htable = htable;
    }

    private HBaseService() {
        try {
            // Uses the HBase configuration found on the classpath.
            htable = new HTable("user");
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

On map.get(), Hazelcast will look in HBase if it cannot find the key in memory. Similarly, when you put an entry into the map, Hazelcast will persist it to HBase.

Why haven’t we implemented loadAll? The loadAll and loadAllKeys methods are used to fill the Hazelcast map from the database initially. As we expect millions of records, it is not feasible to load the whole database into memory, so we left them unimplemented (returning null from loadAllKeys skips the initial load).

Unfortunately, HTable is not thread safe, so you have to handle concurrency yourself; that is why the MapStore methods above are synchronized.
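
One alternative is HTablePool, which ships with this HBase version: each operation borrows its own table instance instead of synchronizing on a single shared HTable. A sketch, assuming the same ‘user’ table (class and method names are hypothetical):

package com.hazelcast.hbase;

import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.HTablePool;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.util.Bytes;

import java.io.IOException;

public class PooledLoadExample {
    // One pool shared by all threads; at most 10 cached HTable instances per table.
    private static final HTablePool pool = new HTablePool(HBaseConfiguration.create(), 10);

    public Result load(String key) throws IOException {
        HTableInterface table = pool.getTable("user");
        try {
            return table.get(new Get(Bytes.toBytes(key)));
        } finally {
            pool.putTable(table); // return the borrowed instance to the pool
        }
    }
}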

Configure Hazelcast

Here is the hazelcast.xml that we put on the classpath.

<hazelcast xsi:schemaLocation="http://www.hazelcast.com/schema/config hazelcast-config-2.0.xsd"
           xmlns="http://www.hazelcast.com/schema/config"
           xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
    <group>
        <name>dev</name>
        <password>dev-pass</password>
    </group>
    <management-center enabled="false">http://localhost:8080/mancenter</management-center>
    <network>
        <port auto-increment="true">5701</port>
        <join>
            <multicast enabled="true">
                <multicast-group>224.2.2.3</multicast-group>
                <multicast-port>54327</multicast-port>
            </multicast>
            <tcp-ip enabled="false">
                <interface>127.0.0.1</interface>
            </tcp-ip>
        </join>
        <interfaces enabled="false">
            <interface>10.10.1.*</interface>
        </interfaces>
    </network>
    <executor-service>
        <core-pool-size>16</core-pool-size>
        <max-pool-size>64</max-pool-size>
        <keep-alive-seconds>60</keep-alive-seconds>
    </executor-service>
    <queue name="default">
        <max-size-per-jvm>0</max-size-per-jvm>
        <backing-map-ref>default</backing-map-ref>
    </queue>
    <map name="default">
        <backup-count>1</backup-count>
        <time-to-live-seconds>0</time-to-live-seconds>
        <max-idle-seconds>0</max-idle-seconds>
        <eviction-policy>LRU</eviction-policy>
        <max-size policy="cluster_wide_map_size">1000</max-size>
        <eviction-percentage>25</eviction-percentage>
        <merge-policy>hz.ADD_NEW_ENTRY</merge-policy>
        <map-store enabled="true">
            <class-name>com.hazelcast.hbase.HBaseMapStore</class-name>
            <write-delay-seconds>5</write-delay-seconds>
        </map-store>
    </map>
</hazelcast>

The first difference from the default configuration is that I have added a map-store declaration to the map config.

Second, I have enabled eviction on the map. By enabling eviction you can use Hazelcast as a distributed cache: Hazelcast evicts (removes) entries when the map grows beyond its maximum size. To enable eviction, set eviction-policy to LRU (or LFU) and set max-size, as in the fragment below. For more information about Hazelcast eviction see: http://www.enesakar.com/post/45187349900/eviction-and-hazelcast
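
The eviction-related lines from the map configuration above:

<map name="default">
    ...
    <eviction-policy>LRU</eviction-policy>
    <max-size policy="cluster_wide_map_size">1000</max-size>
    <eviction-percentage>25</eviction-percentage>
    ...
</map>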

Run The Code

Now let’s test it.

package com.hazelcast.hbase;

import com.hazelcast.core.Hazelcast;
import com.hazelcast.core.IMap;

public class Main {
    public static void main(String[] args) {
        IMap<String, User> map = Hazelcast.getMap("map");
        System.out.println( map.get("u-6") ); // loaded from HBase if persisted in a previous run
        User user = new User();
        user.setName("Enes Akar");
        user.setAge(29);
        user.setLocation("Istanbul");
        user.setDetails("software developer .....");
        map.put("u-5",user);
        User user2 = new User();
        user2.setName("Mehmet Dogan");
        user2.setAge(29);
        user2.setLocation("Istanbul");
        user2.setDetails("software developer .....");
        map.put("u-6",user2);
        System.out.println( map.get("u-5") );
        System.out.println( map.get("u-6") );
        map.remove("u-5");
        System.out.println( map.get("u-5") );
    }

}

And see the records in the database:

hbase(main):055:0> get 'user', 'u-6'
COLUMN                CELL
 cf_basic:age         timestamp=1334320415281, value=\x00\x00\x00\x1D
 cf_basic:location    timestamp=1334320415281, value=Istanbul
 cf_basic:name        timestamp=1334320415281, value=Mehmet Dogan
 cf_text:details      timestamp=1334320415281, value=software developer .....
4 row(s) in 0.0150 seconds

Write-Through and Write-Behind

The default map-store configuration is a write-through cache: records are persisted to the datastore synchronously.

If you set write-delay-seconds in hazelcast.xml to a positive value, the behavior becomes a write-behind cache.

Entries are then persisted asynchronously, n seconds after they are added.

The deleteAll and storeAll methods implemented in the MapStore are used in write-behind mode, where operations are batched.
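
This behavior is controlled by the map-store section of the configuration above: a write-delay-seconds of 0 means write-through, while the value 5 used here means write-behind.

<map-store enabled="true">
    <class-name>com.hazelcast.hbase.HBaseMapStore</class-name>
    <!-- 0 = write-through (synchronous); > 0 = write-behind (asynchronous) -->
    <write-delay-seconds>5</write-delay-seconds>
</map-store>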

POJO Mapping

If you do not want to map your objects manually, you can use Kundera.

It is a JPA-compliant ORM for Big Data.

https://github.com/impetus-opensource/Kundera

Source Code

You can find the example project code at:
https://github.com/enesakar/hazelcast_hbase