Vert.x Cluster

Marut Signh recently published a blog post entitled, “Vert.x Cluster”. In the post, Marut gives a short tutorial on embedding Vert.x into your Hazelcast IMDG project.


Vert.x is an extremely simple event based, non blocking, library for distributed computing that can be easily embedded in any Java framework of your choice.

For sometime now I have been exploring Vert.x. I was looking for a vert.x cluster sample but I did not find any decent example so I decided to share that with community.

I modified one of the sample available from Vert.x examples and I will explain core parts of it here.

First the cluster has to be configured.

Config hazelcastConfig = new Config();

    Config hazelcastConfig = new Config();
    hazelcastConfig.getNetworkConfig().getJoin().getTcpIpConfig().addMember("127.0.0.1").setEnabled(true);
    hazelcastConfig.getNetworkConfig().getJoin().getMulticastConfig().setEnabled(false);
    
    ClusterManager mgr = new HazelcastClusterManager(hazelcastConfig);
    VertxOptions options = new VertxOptions().setClusterManager(mgr);
    Vertx.clusteredVertx(options, res -> {
        if (res.succeeded()) {
            vertx = res.result();
            deploy(vertx,context,mode);
        }
    });

This code can be part of your main function. Next crucial step is to include a cluster config file called “cluster.xml”. This should be on classpath of your application so it can be put inside src/main/resources I am going to test this application on my local machine so I am using TCP discovery rather than Multicast for my application. Vert.x underlying uses hazelcast for all it’s clustering capabilities. Hazelcast also comes up with a special configuration for AWS. So if you are going to deploy your application on AWS then that is the configuration you should use.

<?xml version="1.0" encoding="UTF-8"?>
    <hazelcast xsi_schemaLocation="https://hazelcast.com/schema/config hazelcast-config-3.2.xsd"
               
               xmlns_xsi="http://www.w3.org/2001/XMLSchema-instance">
        <properties>
           .....
            <property name="hazelcast.wait.seconds.before.join">0</property>
        </properties>
    
        <group>
            <name>dev</name>
            <password>dev-pass</password>
        </group>
        <management-center enabled="false">http://localhost:8080/mancenter</management-center>
        <network>
            <port auto-increment="true" port-count="10000">5701</port>
            <outbound-ports>
                <!--
                Allowed port range when connecting to other nodes.
                0 or * means use system provided port.
                -->
                <ports>0</ports>
            </outbound-ports>
            <join>
                <!--<multicast enabled="false">-->
                <!--<multicast-group>224.2.2.3</multicast-group>-->
                <!--<multicast-port>54327</multicast-port>-->
                <!--</multicast>-->
                <tcp-ip enabled="true">
                    <interface>192.168.1.8</interface>
                </tcp-ip>
                <aws enabled="false">
                    .....
                </aws>
            </join>
            <interfaces enabled="false">
                <interface>10.10.1.*</interface>
            </interfaces>        
        </network>
        <partition-group enabled="false"/>
        <executor-service name="default">
            <pool-size>16</pool-size>
            <!--Queue capacity. 0 means Integer.MAX_VALUE.-->
            <queue-capacity>0</queue-capacity>
        </executor-service>
        <map name="__vertx.subs">
    
            <!--
                Number of backups. If 1 is set as the backup-count for example,
                then all entries of the map will be copied to another JVM for
                fail-safety. 0 means no backup.
            -->
            <backup-count>1</backup-count>
    
            <time-to-live-seconds>0</time-to-live-seconds>
            <max-idle-seconds>0</max-idle-seconds>
            <!--
                Valid values are:
                NONE (no eviction),
                LRU (Least Recently Used),
                LFU (Least Frequently Used).
                NONE is the default.
            -->
            <eviction-policy>NONE</eviction-policy>
            <!--
                Maximum size of the map. When max size is reached,
                map is evicted based on the policy defined.
                Any integer between 0 and Integer.MAX_VALUE. 0 means
                Integer.MAX_VALUE. Default is 0.
            -->
            <max-size policy="PER_NODE">0</max-size>
            <!--
                When max. size is reached, specified percentage of
                the map will be evicted. Any integer between 0 and 100.
                If 25 is set for example, 25% of the entries will
                get evicted.
            -->
            <eviction-percentage>25</eviction-percentage>
            <merge-policy>
          com.hazelcast.map.merge.LatestUpdateMapMergePolicy
    </merge-policy>
        </map>
    
        <!-- Used internally in Vert.x to implement async locks -->
        <semaphore name="__vertx.*">
            <initial-permits>1</initial-permits>
        </semaphore>
    
    </hazelcast>

I have included only important parts and left others from configuration Once we are done with cluster configuration all we have to do is create a verticle and deploy it.

public class ServerVerticle extends AbstractVerticle {
    
        int port;
        public ServerVerticle(int port){
            super();
            this.port = port;
        }
        @Override
        public void start() throws Exception {
            super.start();
            HttpServer server = vertx.createHttpServer();
            server.requestHandler(req -> {
                if (req.method() == HttpMethod.GET) {
                    req.response().setChunked(true);
    
                    if (req.path().equals("/products")) {
                        vertx.eventBus().<String>send(SpringDemoVerticle.ALL_PRODUCTS_ADDRESS, "", result -> {
                            if (result.succeeded()) {
                                req.response().setStatusCode(200).write(result.result().body()).end();
                            } else {
                                req.response().setStatusCode(500).write(result.cause().toString()).end();
                            }
                        });
                    } else {
                        req.response().setStatusCode(200).write("Hello from vert.x").end();
                    }
    
                } else {
                    // We only support GET for now
                    req.response().setStatusCode(405).end();
                }
            });
    
           server.listen(port);
        }
    }

Great…let’s deploy this verticle.

Once configuration has been done you need to deploy verticles in cluster

Vertx.clusteredVertx(options, res -> {
        if (res.succeeded()) {
            Vertx vertx = res.result();
           //You should deploy verticles only when cluster has been initialized
            vertx.deployVerticle(new ServerVerticle(Integer.parseInt(args[0])));
        } else {
        }
    });

You start the application by giving a port let’s say at port 9000 and start another instance at different port let’s say 9005 and Voila can see both of them start communicating. Complete source code can be found at https://github.com/singhmarut/vertx-cluster.git Happy Coding !!