Distribute with Hazelcast, Persist into HBase
In this article, I will implement a solution for a Big Data scenario.
I will use HBase as the persistence layer and Hazelcast as a distributed cache.
The resulting project is a "getting started" sample for anyone who wants to use HBase as persistent storage behind a Hazelcast application.
The Scenario
Suppose you have (or hope to have):
- "User" data with billions of records. -> Big Data
- People will reach the data from your web application; they will query it, search it... -> Real-time Access
- Some records will be reached more frequently. -> Cache them in memory, serve them faster.
- You can add/remove columns; there is no strict schema. -> Sparse data
Given these requirements, a "NoSQL datastore + distributed cache" solution fits our scenario.
I will persist the user data to HBase:
a NoSQL key-value datastore built on Hadoop technology and specialized for Big Data requirements.
It is modeled after Google's Bigtable and is used by Yahoo and Facebook.
Chicago Data Summit Presentation >>
Apache HBase Reference Guide >>
Facebook’s New Real-Time Messaging System: HBase To Store 135+ Billion Messages A Month >>
I will cache and distribute the data with Hazelcast. Learn more about Hazelcast: https://hazelcast.com/docs/2.0/manual/single_html/#Introduction
HBase Setup
HBase is intended to run in a cluster, but it also has a standalone mode that you can use for development purposes.
For HBase setup follow: http://hbase.apache.org/book.html#quickstart
If you use Ubuntu, you may run into OS-specific problems.
Check this: http://hbase.apache.org/book.html#os
Although Windows is not recommended for production, you can still try HBase on Windows.
Check: http://hbase.apache.org/cygwin.html
Hazelcast Setup
Hazelcast is dead simple to use: just download it and add hazelcast.jar to your classpath.
If you are new to Hazelcast have a look at: https://hazelcast.com/docs/2.0/manual/single_html/#GettingStarted
Project Setup
Create a Maven Java project with the following dependencies (a groupId and version are added here so the pom is valid):

<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.hazelcast</groupId>
    <artifactId>hazelcast_hbase</artifactId>
    <version>1.0-SNAPSHOT</version>
    <dependencies>
        <dependency>
            <groupId>com.hazelcast</groupId>
            <artifactId>hazelcast</artifactId>
            <version>2.0.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase</artifactId>
            <version>0.92.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-core</artifactId>
            <version>1.0.2</version>
        </dependency>
    </dependencies>
</project>
Create a User pojo:
package com.hazelcast.hbase;

import java.io.Serializable;

public class User implements Serializable {

    private String name;
    private String location;
    private Integer age;
    private String details;

    public String getDetails() {
        return details;
    }

    public void setDetails(String details) {
        this.details = details;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public String getLocation() {
        return location;
    }

    public void setLocation(String location) {
        this.location = location;
    }

    public Integer getAge() {
        return age;
    }

    public void setAge(Integer age) {
        this.age = age;
    }

    @Override
    public String toString() {
        return "User{" +
                "name='" + name + '\'' +
                ", location='" + location + '\'' +
                ", age=" + age +
                ", details='" + details + '\'' +
                '}';
    }
}
Create the user table in HBase:
Start HBase:

HBASE_DIR> ./bin/start-hbase.sh

It is a good idea to check the logs here, to be sure everything installed and started properly.

Then open the HBase shell:

HBASE_DIR> ./bin/hbase shell

Create the user table:

hbase(main):008:0> create 'user', 'cf_basic', 'cf_text'
Here I should say more about 'cf_basic' and 'cf_text': these are column families.
Columns in the same family are stored together on disk with the same storage specifications.
For example, if you want some type of data (e.g. images) to be compressed, put those columns in the same column family so you can define one storage rule for all of them.
Here we have two column families: 'cf_basic' is for simple types (numbers, strings) and 'cf_text' is for long text columns.
Notice that we have done nothing about schema, column types etc.
In the HBase intro video, you will recall that Todd uses the term "datastore" instead of "database" when defining HBase.
HBase (and other key-value stores) is more like a persisted HashMap than a database.
You gain scalability but lose complex queries.
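To make the "persisted HashMap" analogy concrete, here is a tiny self-contained Java sketch (no HBase involved; the SparseTable class and its row keys are made up for illustration) that models HBase's logical layout as nested maps: row key -> column family -> qualifier -> value. Different rows can carry different columns with no schema change, which is the sparseness described above:

```java
import java.util.HashMap;
import java.util.Map;

// Toy model of HBase's logical layout: rowKey -> family -> qualifier -> value.
// It illustrates the schema-less, sparse data model, not how HBase stores bytes.
public class SparseTable {
    private final Map<String, Map<String, Map<String, String>>> rows = new HashMap<>();

    public void put(String rowKey, String family, String qualifier, String value) {
        rows.computeIfAbsent(rowKey, k -> new HashMap<>())
            .computeIfAbsent(family, k -> new HashMap<>())
            .put(qualifier, value);
    }

    public String get(String rowKey, String family, String qualifier) {
        Map<String, Map<String, String>> families = rows.get(rowKey);
        if (families == null) return null;
        Map<String, String> qualifiers = families.get(family);
        return qualifiers == null ? null : qualifiers.get(qualifier);
    }

    public static void main(String[] args) {
        SparseTable user = new SparseTable();
        // Two rows with different column sets -- no schema change needed.
        user.put("row-1", "cf_basic", "name", "Alice");
        user.put("row-1", "cf_text", "details", "long text ...");
        user.put("row-2", "cf_basic", "name", "Bob");
        user.put("row-2", "cf_basic", "location", "Istanbul"); // only row-2 has it
        System.out.println(user.get("row-2", "cf_basic", "location")); // prints "Istanbul"
        System.out.println(user.get("row-1", "cf_basic", "location")); // prints "null"
    }
}
```

There is no "add column" step anywhere: a column exists for a row simply because a value was put under it, just like keys in a HashMap.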
Create HBaseMapStore
This is the class Hazelcast calls on each map operation:
package com.hazelcast.hbase;

import com.hazelcast.core.MapStore;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes;

import java.io.IOException;
import java.util.*;

public class HBaseMapStore implements MapStore<String, User> {

    @Override
    public synchronized User load(String key) {
        User user = new User();
        HTable htable = HBaseService.getInstance().getHtable();
        try {
            Get get = new Get(Bytes.toBytes(key));
            Result r = htable.get(get);
            if (r.isEmpty()) {
                return null;
            }
            byte[] bname = r.getValue(Bytes.toBytes("cf_basic"), Bytes.toBytes("name"));
            byte[] blocation = r.getValue(Bytes.toBytes("cf_basic"), Bytes.toBytes("location"));
            byte[] bage = r.getValue(Bytes.toBytes("cf_basic"), Bytes.toBytes("age"));
            byte[] bdetails = r.getValue(Bytes.toBytes("cf_text"), Bytes.toBytes("details"));
            user.setName(Bytes.toString(bname));
            user.setLocation(Bytes.toString(blocation));
            user.setDetails(Bytes.toString(bdetails));
            user.setAge(Bytes.toInt(bage));
        } catch (IOException e) {
            e.printStackTrace();
        }
        return user;
    }

    @Override
    public synchronized void store(String key, User user) {
        HTable htable = HBaseService.getInstance().getHtable();
        Put put = new Put(Bytes.toBytes(key));
        put.add(Bytes.toBytes("cf_basic"), Bytes.toBytes("name"), Bytes.toBytes(user.getName()));
        put.add(Bytes.toBytes("cf_basic"), Bytes.toBytes("age"), Bytes.toBytes(user.getAge()));
        put.add(Bytes.toBytes("cf_basic"), Bytes.toBytes("location"), Bytes.toBytes(user.getLocation()));
        put.add(Bytes.toBytes("cf_text"), Bytes.toBytes("details"), Bytes.toBytes(user.getDetails()));
        try {
            htable.put(put);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    @Override
    public synchronized void storeAll(Map<String, User> userMap) {
        HTable htable = HBaseService.getInstance().getHtable();
        List<Row> rlist = new LinkedList<Row>();
        try {
            for (String key : userMap.keySet()) {
                Put put = new Put(Bytes.toBytes(key));
                User user = userMap.get(key);
                put.add(Bytes.toBytes("cf_basic"), Bytes.toBytes("name"), Bytes.toBytes(user.getName()));
                put.add(Bytes.toBytes("cf_basic"), Bytes.toBytes("age"), Bytes.toBytes(user.getAge()));
                put.add(Bytes.toBytes("cf_basic"), Bytes.toBytes("location"), Bytes.toBytes(user.getLocation()));
                put.add(Bytes.toBytes("cf_text"), Bytes.toBytes("details"), Bytes.toBytes(user.getDetails()));
                rlist.add(put);
            }
            htable.batch(rlist);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    @Override
    public synchronized void delete(String key) {
        HTable htable = HBaseService.getInstance().getHtable();
        Delete delete = new Delete(Bytes.toBytes(key));
        try {
            htable.delete(delete);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    @Override
    public synchronized void deleteAll(Collection<String> keys) {
        HTable htable = HBaseService.getInstance().getHtable();
        List<Row> rlist = new LinkedList<Row>();
        try {
            for (String key : keys) {
                Delete delete = new Delete(Bytes.toBytes(key));
                rlist.add(delete);
            }
            htable.batch(rlist);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    @Override
    public Map<String, User> loadAll(Collection<String> keys) {
        // will not be implemented
        return null;
    }

    @Override
    public Set<String> loadAllKeys() {
        // will not be implemented
        return null;
    }
}
And here is a singleton service for obtaining the HBase table:
package com.hazelcast.hbase;

import org.apache.hadoop.hbase.client.HTable;

import java.io.IOException;

public class HBaseService {

    private static HBaseService ourInstance = new HBaseService();

    private HTable htable;

    private HBaseService() {
        try {
            htable = new HTable("user");
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public static HBaseService getInstance() {
        return ourInstance;
    }

    public HTable getHtable() {
        return htable;
    }

    public void setHtable(HTable htable) {
        this.htable = htable;
    }
}
On map.get, Hazelcast will look in HBase if it cannot find the key in memory. Similarly, when you put an entry into the map, Hazelcast will persist it to HBase.
Why haven't we implemented loadAll? The loadAll and loadAllKeys methods exist to fill the Hazelcast map from the database at startup. Since we expect millions of records, it is not feasible to load the whole database into memory, so we left them empty.
Unfortunately, HTable is not thread-safe, so you have to handle concurrency yourself; that is why the MapStore methods above are synchronized.
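If you would rather not synchronize every MapStore method, a common alternative is to confine each thread to its own HTable instance via ThreadLocal. Constructing a real HTable needs a running cluster, so this sketch uses a hypothetical TableHandle class purely to demonstrate the confinement pattern:

```java
// Sketch of per-thread resource confinement. TableHandle stands in for HTable;
// with a real HTable you would create it in the ThreadLocal initializer instead.
public class ThreadConfined {

    static class TableHandle {
        final long ownerThreadId = Thread.currentThread().getId();
    }

    // Each thread that calls table() lazily gets its own private instance.
    private static final ThreadLocal<TableHandle> TABLE =
            ThreadLocal.withInitial(TableHandle::new);

    public static TableHandle table() {
        return TABLE.get();
    }

    public static void main(String[] args) throws Exception {
        TableHandle mine = table();
        final TableHandle[] other = new TableHandle[1];
        Thread t = new Thread(() -> other[0] = table());
        t.start();
        t.join();
        // Each thread saw a distinct instance, so the handle is never shared.
        System.out.println(mine != other[0]); // prints "true"
    }
}
```

The trade-off is one open table per worker thread instead of lock contention on a single shared table.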
Configure Hazelcast
Here is the hazelcast.xml that we put on the classpath:

<hazelcast>
    <group>
        <name>dev</name>
        <password>dev-pass</password>
    </group>
    <management-center enabled="false">http://localhost:8080/mancenter</management-center>
    <network>
        <port auto-increment="true">5701</port>
        <join>
            <multicast enabled="true">
                <multicast-group>224.2.2.3</multicast-group>
                <multicast-port>54327</multicast-port>
            </multicast>
            <tcp-ip enabled="false">
                <interface>127.0.0.1</interface>
            </tcp-ip>
        </join>
        <interfaces enabled="false">
            <interface>10.10.1.*</interface>
        </interfaces>
    </network>
    <executor-service>
        <core-pool-size>16</core-pool-size>
        <max-pool-size>64</max-pool-size>
        <keep-alive-seconds>60</keep-alive-seconds>
    </executor-service>
    <map name="default">
        <eviction-policy>LRU</eviction-policy>
        <max-size>10000</max-size>
        <map-store enabled="true">
            <class-name>com.hazelcast.hbase.HBaseMapStore</class-name>
            <write-delay-seconds>0</write-delay-seconds>
        </map-store>
    </map>
</hazelcast>
The first difference from the default configuration is that I have added a map-store declaration to the map config.
Secondly, I have enabled eviction on the map. Enabling eviction is what lets you use Hazelcast as a distributed cache: Hazelcast evicts (removes) entries once the map grows past the limit. To enable eviction, set eviction-policy to LRU (or LFU) and set max-size. For more information about Hazelcast eviction see: http://www.enesakar.com/post/45187349900/eviction-and-hazelcast
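The intuition behind the LRU policy can be sketched in plain Java with an access-ordered LinkedHashMap that drops the least recently used entry once a maximum size is exceeded. (Hazelcast's real eviction is distributed and configurable; this LruCache class is only a local illustration.)

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Minimal local LRU cache: an access-ordered map that evicts the eldest
// (least recently used) entry when capacity is exceeded.
public class LruCache<K, V> extends LinkedHashMap<K, V> {
    private final int maxSize;

    public LruCache(int maxSize) {
        super(16, 0.75f, true); // true = access order, not insertion order
        this.maxSize = maxSize;
    }

    @Override
    protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
        return size() > maxSize; // called after each put; true evicts eldest
    }

    public static void main(String[] args) {
        LruCache<String, String> cache = new LruCache<>(2);
        cache.put("u-1", "a");
        cache.put("u-2", "b");
        cache.get("u-1");       // touch u-1, so u-2 becomes least recently used
        cache.put("u-3", "c");  // over capacity: evicts u-2
        System.out.println(cache.keySet()); // prints [u-1, u-3]
    }
}
```

Notice that in our setup an evicted entry is not lost: a later map.get simply reloads it from HBase through the MapStore.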
Run The Code
Now let’s test it.
package com.hazelcast.hbase;

import com.hazelcast.core.Hazelcast;
import com.hazelcast.core.IMap;

public class Main {
    public static void main(String[] args) {
        IMap<String, User> map = Hazelcast.getMap("map");
        System.out.println(map.get("user-6"));

        User user = new User();
        user.setName("Enes Akar");
        user.setAge(29);
        user.setLocation("Istanbul");
        user.setDetails("software developer .....");
        map.put("u-5", user);

        User user2 = new User();
        user2.setName("Mehmet Dogan");
        user2.setAge(29);
        user2.setLocation("Istanbul");
        user2.setDetails("software developer .....");
        map.put("u-6", user2);

        System.out.println(map.get("u-5"));
        System.out.println(map.get("u-6"));

        map.remove("u-5");
        System.out.println(map.get("u-5"));
    }
}
And see the records in database:
hbase(main):055:0> get 'user', 'u-6'
COLUMN               CELL
 cf_basic:age        timestamp=1334320415281, value=\x00\x00\x00\x1D
 cf_basic:location   timestamp=1334320415281, value=Istanbul
 cf_basic:name       timestamp=1334320415281, value=Mehmet Dogan
 cf_text:details     timestamp=1334320415281, value=software developer .....
4 row(s) in 0.0150 seconds
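Note the age cell above: the int 29 is stored as the four big-endian bytes \x00\x00\x00\x1D, which is what Bytes.toBytes(int) produces and Bytes.toInt reverses in the MapStore. You can verify the encoding with the JDK alone (the IntBytes class here is a stand-in for illustration, not part of HBase):

```java
import java.nio.ByteBuffer;

// Shows the 4-byte big-endian int encoding used for the age column
// (equivalent to HBase's Bytes.toBytes(int) / Bytes.toInt, but stdlib-only).
public class IntBytes {

    public static byte[] encode(int value) {
        return ByteBuffer.allocate(4).putInt(value).array(); // big-endian by default
    }

    public static int decode(byte[] bytes) {
        return ByteBuffer.wrap(bytes).getInt();
    }

    public static void main(String[] args) {
        byte[] b = encode(29);
        System.out.printf("%02x %02x %02x %02x%n", b[0], b[1], b[2], b[3]); // 00 00 00 1d
        System.out.println(decode(new byte[]{0x00, 0x00, 0x00, 0x1d})); // prints 29
    }
}
```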
Write-Through and Write-Behind
The default configuration of map-store is write-through cache: records are synchronously persisted to datastore.
If you set write-delay-seconds in hazelcast.xml to a positive value n, the behaviour becomes write-behind:
entries are persisted to the datastore n seconds after they are added.
The storeAll and deleteAll methods implemented in the MapStore are used in write-behind mode, where the delayed operations are flushed in batches.
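The write-behind idea can be sketched as a buffer that collects dirty entries and later flushes them as one batch, which is exactly where storeAll earns its keep. Below is a minimal single-node sketch with a plain Map standing in for HBase (the WriteBehindBuffer class is illustrative, not Hazelcast's implementation, and the timer is replaced by an explicit flush call):

```java
import java.util.HashMap;
import java.util.Map;

// Toy write-behind buffer: puts land in a dirty buffer, and flush() persists
// them in one batch -- in Hazelcast, the flush fires write-delay-seconds later.
public class WriteBehindBuffer<K, V> {
    private final Map<K, V> backingStore;          // stands in for HBase
    private final Map<K, V> dirty = new HashMap<>();

    public WriteBehindBuffer(Map<K, V> backingStore) {
        this.backingStore = backingStore;
    }

    public void put(K key, V value) {
        dirty.put(key, value);                     // not persisted yet
    }

    public void flush() {
        backingStore.putAll(dirty);                // one batch write == storeAll
        dirty.clear();
    }

    public static void main(String[] args) {
        Map<String, String> hbase = new HashMap<>();
        WriteBehindBuffer<String, String> buf = new WriteBehindBuffer<>(hbase);
        buf.put("u-5", "some user");
        System.out.println(hbase.containsKey("u-5")); // prints "false"
        buf.flush(); // what write-behind does after the delay elapses
        System.out.println(hbase.containsKey("u-5")); // prints "true"
    }
}
```

The trade-off is visible even in the toy: between put and flush, the datastore lags behind the cache, so a node crash in that window can lose the buffered writes.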
POJO Mapping
If you do not want to map your objects manually, you can use Kundera,
a JPA-compliant ORM for Big Data:
https://github.com/impetus-opensource/Kundera
Source Code
The example project code is available at:
https://github.com/enesakar/hazelcast_hbase