Clustered Vert.x JUnit Test

By default, the Vert.x JUnit test framework will provide a non-clustered instance of Vert.x for you to execute tests against. For the most part that works fine. However, there are times when you need the clustered version like when testing with shared maps or cluster-wide locks.

We can easily change this default behavior by creating a Supplier as a parameter to the RunTestOnContext class.

In the JUnit test class, create a supplier like this:

   private Supplier<Vertx> supplier = () -> {
    CountDownLatch latch = new CountDownLatch(1);
    AtomicReference<Vertx> vertx = new AtomicReference<>();
    Vertx.clusteredVertx(new VertxOptions(), handler -> {
      if (handler.succeeded()) {
        vertx.set(handler.result());
        latch.countDown();
      } else {
        throw new RuntimeException("Unable to create clustered Vertx");
      }
    });
    try {
      latch.await();
    } catch (InterruptedException e) {
      throw new RuntimeException(e);
    }
    return vertx.get();
  };

This will create a clustered instance of Vert.x asynchronously, wait for the result and then return it to the caller of the Supplier.

Now create a Rule in you test class that uses this Supplier when creating the test context.

@Rule
public RunTestOnContext rule = new RunTestOnContext(supplier);

Now you can create a test case that uses the clustered Vert.x like this.

@Test
public void testSharedData(TestContext context) {

  //rule.vertx() will be clustered
  SharedData sd = rule.vertx().sharedData();

  sd.<String, String>getClusterWideMap("testmap", mapAsyncResult -> {
    if (mapAsyncResult.succeeded()) {
      AsyncMap<String, String> map = mapAsyncResult.result();
      map.put("key", "value", putAsyncResult -> {
        context.assertTrue(putAsyncResult.succeeded());
        context.async().complete();
      });
    } else {
      context.assertTrue(false);
    }
  });

}

 

Insert date in MongoDB collection with Vert.x 3

The out-of-box Vert.x event bus can only transport a few basic types as well as Buffer and JsonObject. There is no Date object among these types. So, to send a date across the event bus, it has to be converted to a long or a string.

It is often not ideal to save dates in Mongo as string or long as sorting and other computations may not happen as you would expect with dates. Other non-Vert.x Mongo clients may also expect that dates be stored in the native format.

Mongo has an ISODate object that can be used to store dates. In order to utilize that feature we have to tell Mongo that the string we are sending should be interpreted as an ISODate. Mongo includes a directive called ‘$date’ that can do this for us. All we have to do is supply the date we want to insert in ISO format.

In Vert.x we would need to create a JSON document that looks something like this:

{"birthDate" : {"$date": "2016-01-27T22:57:34.127Z"}}

This would tell Mongo to store an ISODate represented by the string in the ‘birthDate’ attribute.

So to prepare a document and save it to a Mongo collection we would do the following.

MongoClient mongoClient = MongoClient.createShared(vertx, config());

JsonObject document = new JsonObject();
document.put("name", "Kim Hilton");
document.put("birthDate", new JsonObject().put("$date", "2016-01-27T22:57:34.127Z"));

mongoClient.save("people", document, mongoHandler -> {
if (mongoHandler.succeeded()) [.. some happy code..]];
else if (mongoHandler.failed()) [.. some error handler..];
});

Once the save is complete, you can check your collection with the Mongo client and see that the birthDate was saved as an ISODate.

Here’s a working example in Java: MongoDateExampleVerticle

And some test cases that pass: MongoDateTest

Read date from MongoDB collection with Vert.x 3

MongoDB is able to store dates natively as ISODates. Since Vert.x is not able to transport an ISODate object across the event bus, they are returned as ISO strings within a special JsonObject.

When reading an ISODate from a collection, it will be returned in a JSON object that looks something like this:

{"birthDate" : {"$date" : "2016-01-27T22:57:34.132Z"}}

So, to access the date string you would need to do this:

String sDate = json.getJsonObject("birthDate").getString("$date");

Here’s an example where we are reading a document from a Mongo collection and replying to an event bus consumer.

MongoClient mongoClient = MongoClient.createShared(vertx, config());


JsonObject query = new JsonObject().put("name", person.getString("name"));

mongoClient.findOne(COLLECTION_NAME, query, null, mongoHandler -> {
if (mongoHandler.succeeded()) {

  JsonObject found = mongoHandler.result();

  JsonObject reply = new JsonObject().put("name", found.getString("name"));
  if (found.containsKey("birthDate")) {
    reply.put("birthDate", found.getJsonObject("birthDate").getString("$date"));
  }

  handler.reply(reply);
} else if (mongoHandler.failed()) handler.fail(-1, mongoHandler.cause().getMessage());
});

Here is a working example of reading a date from Mongo: MongoDateExampleVerticle (look for the “person.read” consumer)

Here is a test case: MongoDateTest

Schedule an event in Vert.x using vertx-cron

The vertx-cron module allows you to schedule events based on a cron specification. Vertx has setTimer() and setPeriodic() to help you schedule future or repetitive tasks but they aren’t as useful when the scheduling requirement becomes complicated. Like if you wanted to run some task every minute from 9am to 5pm on weekdays.

The vertx-cron module make it easy. Just add the Maven dependency to your pom.

<dependency>
    <groupId>com.diabolicallabs</groupId>
    <artifactId>vertx-cron</artifactId>
    <version>3.2.1.1</version>
</dependency>

Now you can schedule an event by sending a JSON object like the following to the ‘cron.schedule’ address.

{
    "cron_expression": "0 0 16 1/1 * ? *",
    "time_zone": "US/Hawaii"
    "address": "stock.quotes.list",
    "message": {
        "ticker": "RHT"
    },
    "action": "send",
    "result_address": "stock.quotes.persist"
}

This will send the message {“ticker”:”RHT”} to the “stock.quotes.list” consumer every day at 4pm in the Hawaii timezone. When “stock.quotes.list” responds, the response will be forwarded to “stock.quotes.persist”

It may be important to specify the timezone if you intend to have a Vert.x cluster with nodes in various parts of the world. This will ensure that the events fire at the same time no matter where the vertical is deployed.

There is also an RxJava Observer that makes it even easier. If you are using Vert.x Rx you can make use of the CronObservable class like the following.

Scheduler scheduler = RxHelper.scheduler(vertx);
CronObservable.cronspec(scheduler, "0 0 16 1/1 * ? *", "US/Eastern")
  .take(5) //If you only want it to hit 5 times, add this, remove for continuous emission
  .subscribe(
    timestamped -> {
      //Perform the scheduled activity here
    },
    fault -> {
      //If there is some kind of fault, handle here
    }
  );

First you need a Vert.x scheduler from the RxHelper. Then call the static method cronspec with the scheduler, the cron specification and an option timezone. That will return an Observable over a Timestamp. You can then use the Observable in any way you normally would.

Once you subscribe, a timestamp will be emitted each time the cron schedule hits, in the case every day at 4pm in the US Eastern timezone.