Vert.x Testing — Mock Services with Mockito

Testing specific logic in a microservice can be difficult due to dependencies on other services. You may start out thinking it will be simple to test some function, then realize that you need to add a dependency. Then that dependency leads to others.

Mocking services can reduce the number of dependencies required to complete your test. That helps you focus your test on the code that you wrote. It also makes it much easier to run regression tests in your CI/CD pipeline.

The Mockito Framework can help us easily mock objects and services in our Vert.x tests.

Create Maven POM

Let’s start out by modifying a Maven POM to support JUnit testing with Mockito. 

Here at the Lab, we like to declare the versions of imported libraries in a properties section and also use a POM import to get all of the Vert.x dependencies, as shown below.

<project>
    ...
    <properties>
        <vertx.version>3.6.3</vertx.version>
        <junit.version>4.12</junit.version>
        <mockito.version>2.24.5</mockito.version>
        <java.version>1.8</java.version>
    </properties>

    <!-- Import the vert.x dependencies BOM -->
    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>io.vertx</groupId>
                <artifactId>vertx-dependencies</artifactId>
                <version>${vertx.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>
    ...
</project>

Dependencies

Now let’s add the dependencies we’ll need for this example.

<project>
    ...
    <properties>
        <vertx.version>3.6.3</vertx.version>
        <junit.version>4.12</junit.version>
        <mockito.version>2.24.5</mockito.version>
        <java.version>1.8</java.version>
    </properties>
    ...
    <dependencies>
        <dependency>
            <groupId>io.vertx</groupId>
            <artifactId>vertx-core</artifactId>
        </dependency>

        <!-- The following two dependencies enable the generation of service proxies -->
        <dependency>
            <groupId>io.vertx</groupId>
            <artifactId>vertx-service-proxy</artifactId>
        </dependency>
        <dependency>
            <groupId>io.vertx</groupId>
            <artifactId>vertx-codegen</artifactId>
            <scope>provided</scope>
        </dependency>

        <!-- The following dependencies enable vert.x junit and testing with mockito -->
        <dependency>
            <groupId>io.vertx</groupId>
            <artifactId>vertx-unit</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>${junit.version}</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.mockito</groupId>
            <artifactId>mockito-core</artifactId>
            <version>${mockito.version}</version>
            <scope>test</scope>
        </dependency>
    </dependencies>
    ...
</project>

We need vertx-core to include the basic Vert.x library support. There is no need to specify the version here because we imported the dependencies POM previously.

The vertx-service-proxy is needed as we want to generate service proxies to mock in this example.

To generate the code needed for the service proxy we need to mention vertx-codegen. Its scope is provided as we only need this dependency at build time. It is not needed at run time and isn’t included in the jar.

JUnit testing is supported by including vertx-unit. We’ll also need to include junit.

Finally, we will include the mockito-core dependency so we can mock our Vert.x services.

Create Simple Service to Mock

Let’s create a very simple service that we can use to test mocking a regular service as well as a proxy to that service.

package com.diabolicallabs.example;

import io.vertx.codegen.annotations.GenIgnore;
import io.vertx.codegen.annotations.ProxyGen;
import io.vertx.codegen.annotations.VertxGen;
import io.vertx.core.AsyncResult;
import io.vertx.core.Handler;
import io.vertx.core.Vertx;

@ProxyGen
@VertxGen
public interface Service {

  /**
   * This is the default address that the service proxy will 
   * listen to for events on the Event Bus
   */
  String DEFAULT_ADDRESS = "service.example";

  /**
   * Creates a service instance
   * @return An instance of the default service implementation
   */
  @GenIgnore
  static Service create() {
    return new ServiceImpl();
  }

  /**
   * Creates a proxy for the service that listens for events on the Event Bus
   * @param vertx An instance of Vertx
   * @param address This is the address the service will listen to on the Event Bus
   * @return An instance of the service proxy
   */
  @GenIgnore
  static Service createProxy(Vertx vertx, String address) {
    return new ServiceVertxEBProxy(vertx, address);
  }

  /**
   * Repeats the text passed by the caller
   * @param text The text to repeat
   * @param handler This handler will be called with a copy of the text passed
   */
  void repeat(String text, Handler<AsyncResult<String>> handler);

}

The only function of the service is to repeat the passed text back to the caller. The result will be returned asynchronously using an AsyncResult.

The service definition also has a couple of helper methods to create a default instance of the service as well as a proxy to that service.

Service Implementation

package com.diabolicallabs.example;

import io.vertx.core.AsyncResult;
import io.vertx.core.Future;
import io.vertx.core.Handler;

public class ServiceImpl implements Service {

  @Override
  public void repeat(String text, Handler<AsyncResult<String>> handler) {
    handler.handle(Future.succeededFuture(text));
  }

}

The default service implementation merely returns the text passed to it. Later we will override this behavior with a mock service to prove that Mockito is working.


Create Verticle to call the Service

Next we will create a Verticle to call the service and the proxy to the service. The services defined here are the ones that we will mock in the test. In real life, mocked services will probably be ones that you added through dependencies that you may not have control over.

package com.diabolicallabs.example;

import io.vertx.core.AbstractVerticle;
import io.vertx.core.Context;
import io.vertx.core.Future;
import io.vertx.core.Vertx;
import io.vertx.serviceproxy.ServiceBinder;

public class Verticle extends AbstractVerticle {

  private Service exampleService;       //Default implementation of Service
  private Service exampleServiceProxy;  //A proxy to the default implementation of Service

  @Override
  public void init(Vertx vertx, Context context) {

    //Create an implementation of Service
    exampleService = Service.create();
    //Create a proxy to an implementation of Service
    exampleServiceProxy = Service.createProxy(vertx, Service.DEFAULT_ADDRESS);

    super.init(vertx, context);

  }

  @Override
  public void start(Future<Void> startFuture) throws Exception {

    //Bind the default implementation of Service to the Event Bus at the default address
    new ServiceBinder(vertx)
      .setAddress(Service.DEFAULT_ADDRESS)
      .register(Service.class, new ServiceImpl());

    //Create a consumer that will call the repeat method of the service and reply with the result
    //This allows us to demonstrate how a service can be mocked during testing
    vertx.eventBus().<String>consumer("example.service.repeat", handler -> {
      String text = handler.body();
      exampleService.repeat(text, stringAsyncResult -> {
        if (stringAsyncResult.succeeded()) {
          handler.reply(stringAsyncResult.result());
        } else {
          handler.fail(-1, stringAsyncResult.cause().getMessage());
        }
      });
    });

    //Create a consumer that will call the repeat method of the service proxy and reply with the result
    //This allows us to demonstrate how a proxy to a service can be mocked during testing
    vertx.eventBus().<String>consumer("example.service.proxy.repeat", handler -> {
      String text = handler.body();
      exampleServiceProxy.repeat(text, stringAsyncResult -> {
        if (stringAsyncResult.succeeded()) {
          handler.reply(stringAsyncResult.result());
        } else {
          handler.fail(-1, stringAsyncResult.cause().getMessage());
        }
      });
    });

    startFuture.complete();
  }

}

We define two private variables to hold an instance of the service and a proxy to the service. Take note of the variable names here as they will have to match the names of our mocks in the test. These instances are created in the init method, which Vert.x calls when the Verticle is deployed.

It is important to note that Mockito will inject the mock instance of the service when the service is instantiated. Unfortunately, Vert.x will then call the init method which will then overwrite the variables with the actual service and proxy. So, the mock will be lost. We shall account for this in the test definition.

The service implementation is then bound to an address on the Event Bus with a ServiceBinder in the start method.

We will then create two EventBus consumers that will listen at example.service.repeat and example.service.proxy.repeat. They will call the service’s repeat() method and then reply with the result.

Create Service Test

First let’s create a test for calling the service without using a proxy. The test class is the same as a regular Vert.x JUnit test with a couple of additions for Mockito.

@RunWith(io.vertx.ext.unit.junit.VertxUnitRunner.class)
public class ServiceTest {

  @Mock
  private Service exampleService;

  @InjectMocks
  Verticle exampleVerticle;

  @Rule
  public RunTestOnContext rule = new RunTestOnContext();

  @Rule
  public MockitoRule mockitoRule = MockitoJUnit.rule();
  
  ...
}

Although Mockito has its own @RunWith Runner class, it won’t work in combination with the Vert.x Runner. So, we’ll use the VertxUnitRunner and add a @Rule for Mockito which will provide the same functions as the Runner.

The @Mock annotation is for the exampleService that is mentioned in the Verticle. The variable name here in the test class needs to match the variable name in the class you are going to inject the mock into. In this case we are injecting the mock into the Verticle. It doesn’t have to be in a Verticle though. You can mock most any class in your solution.

The @InjectMocks annotation will inject the mock mentioned above into the example Verticle.

The next two @Rule rules are needed for Vert.x and Mockito.

Now we need to create a before method to deploy our Verticle and set up how we want the mock to behave during testing.

@Before
public void before(TestContext context) {

  Async async = context.async();

  rule.vertx().deployVerticle(exampleVerticle, stringAsyncResult -> {

    /*
     * Mockito will inject the mocks into the Verticle when it is created. Unfortunately,
     * Vert.x will call the init and start methods after that when the Verticle is
     * deployed. So, we need to re-set the services back to the mocks after deployment
     * with reflection.
     */
    try {
      Field exampleServiceField = Verticle.class.getDeclaredField("exampleService");
      exampleServiceField.setAccessible(true);
      exampleServiceField.set(exampleVerticle, exampleService);
    } catch (NoSuchFieldException | IllegalAccessException e) {
      throw new RuntimeException(e);
    }

    Mockito.doAnswer((Answer<Void>) invocation -> {
      String parameter = invocation.getArgument(0);
      Handler<AsyncResult<String>> handler = (Handler<AsyncResult<String>>) invocation.getArguments()[1];

      System.out.println("Mocked 'repeat' method received parameter: " + parameter);

      switch (parameter) {
        case "goat":
          handler.handle(Future.succeededFuture("kao"));
          break;
        case "eel":
          handler.handle(Future.succeededFuture("puhi"));
          break;
        default:
          handler.handle(Future.failedFuture(new RuntimeException(parameter)));
      }
      return null;
    }).when(exampleService).repeat(Mockito.any(String.class), Mockito.any(Handler.class));

    async.complete();
  });

}

Since we are going to deploy the Verticle asynchronously, we need to get an instance of Async.

Next we will deploy the example Verticle that was defined with the @InjectMocks annotation.

As mentioned before, Mockito will inject the mocks just as soon as the Verticle instance is created. After that, Vert.x will call the init() and start() methods which will probably cause the mocks to be overwritten with the real services.

We can set the mocks back by using reflection to set the variables in the Verticle instance. To do this we will allow private access to variables and then set the variable in the Verticle to the mocks defined in the test class.
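The reflection trick itself is plain Java. Here is a minimal, self-contained sketch of the same technique, with a hypothetical Holder class standing in for the Verticle, that overwrites a private field by name:

```java
import java.lang.reflect.Field;

public class ReflectionReset {

  // Toy stand-in for the Verticle whose private field we want to overwrite
  static class Holder {
    private String service = "real";
    String getService() { return service; }
  }

  // Overwrite a private field by name -- the same trick used to restore the mocks
  static void inject(Object target, String fieldName, Object value) throws Exception {
    Field field = target.getClass().getDeclaredField(fieldName);
    field.setAccessible(true); // allow access to the private field
    field.set(target, value);
  }

  public static void main(String[] args) throws Exception {
    Holder holder = new Holder();
    inject(holder, "service", "mock");
    System.out.println(holder.getService()); // prints "mock"
  }
}
```

In the test we do exactly this, with the Verticle as the target and the Mockito mock as the value.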

Now we’re ready to set the behavior of the mocked service. We’ll follow the format mentioned below.

Mockito.doAnswer((Answer<Something>) invocation -> {
  return new Something();
}).when(mockedService).methodName(...parameter definitions...);

The .doAnswer() lambda will be called when the methodName() is called on the mocked object with the defined number of parameters. The lambda will have an invocation instance that will have the actual parameters passed in the method call. The .doAnswer() method will return an Answer of a generic type. 

Let’s have another look at our service and mocked behavior.

Here is the repeat() method we want to mock.

void repeat(String text, Handler<AsyncResult<String>> handler);

.. and here is our mocked behavior. 

Mockito.doAnswer((Answer<Void>) invocation -> {
  String parameter = invocation.getArgument(0);
  Handler<AsyncResult<String>> handler = (Handler<AsyncResult<String>>) invocation.getArguments()[1];

  System.out.println("Mocked 'repeat' method received parameter: " + parameter);

  switch (parameter) {
    case "goat":
      handler.handle(Future.succeededFuture("kao"));
      break;
    case "eel":
      handler.handle(Future.succeededFuture("puhi"));
      break;
    default:
      handler.handle(Future.failedFuture(new RuntimeException(parameter)));
  }
  return null;
}).when(exampleService).repeat(Mockito.any(String.class), Mockito.any(Handler.class));

The last line defines that this block of code will execute when repeat() is called with a String and a Handler. That matches the definition of repeat() mentioned just above. The second parameter is a Handler which we can call in the invocation block to provide the response just like in the actual service implementation.

The block at the top provides the Answer. In this case the repeat method returns void, so we defined the Answer<Void> type on the invocation.

Inside the block we will get the first two parameters and save them. The first one is the text being passed in and the second is the handler we can call with the answer.

Then there is a switch statement which will translate a couple of words to Hawaiian. The non-mocked service just returns the passed text.

At the end we will return null. In a synchronous service, this is where we would provide the answer rather than calling the handler.
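To illustrate the difference between the two styles, here is a tiny stdlib sketch. It is not Vert.x code: Consumer stands in for Vert.x’s Handler<AsyncResult<String>>, and both methods are hypothetical illustrations.

```java
import java.util.function.Consumer;

public class SyncVsAsync {

  // Synchronous style: the result is the return value,
  // so a mock's Answer would simply return it
  static String repeatSync(String text) {
    return text;
  }

  // Asynchronous style: the result is delivered through a handler,
  // so a mock's Answer calls the handler and returns null
  static void repeatAsync(String text, Consumer<String> handler) {
    handler.accept(text);
  }

  public static void main(String[] args) {
    System.out.println(repeatSync("goat"));          // prints "goat"
    repeatAsync("eel", System.out::println);         // prints "eel"
  }
}
```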

Create a Test Case

@Test
public void testMockedServiceMethodWithGoat(TestContext context) {

  Async async = context.async();
  rule.vertx().eventBus().send("example.service.repeat", "goat", result -> {
    context.assertTrue(result.succeeded());
    //The normal service just repeats the parameter back as a reply. The mocked service
    //will return the parameter translated to Hawaiian. If we get back Hawaiian, then
    //the mocked service was called.
    context.assertEquals("kao", result.result().body());
    async.complete();
  });
}

Here we are sending the value “goat” over the EventBus to the example.service.repeat address. The consumer in our Verticle will receive that and call the repeat() method of our service. If the mocked service is working, we should get “kao” back rather than “goat”.

References

Example on GitHub

Vert.x Documentation

Mockito Documentation


BPM in a Microservice World — Part 3

Many BPM practitioners are used to utilizing a software suite that has some sort of Process Manager component that has control of the transaction as it progresses through activities. The process is generally authored and visualized graphically in BPMN or BPEL. When applying BPM in the microservice world we don’t have that visibility or control.

A microservice architecture, more or less, forms a web where many services can call each other in an ad hoc manner. Such an architecture is rarely designed visually like we are used to in BPM. That will likely change as the MSA tools and frameworks mature but for now, each service is relatively independent and less attention is given to how the entire solution behaves as a whole.

Business processes that are realized with a traditional architecture or a microservice architecture can still benefit from the practice of Process Management. There still can be resource constraints, rework, SLA violations, lack of auditing, etc. The problem is that we can’t easily see and understand what is happening visually as we would with a traditional solution.

To solve this, we can apply a concept called Process Mining. We can actually create the kind of process diagrams we are used to in BPM by collecting event logs from the MSA. We then apply various algorithms to the events that can then discover a process diagram.

The logs can be in any format however there is a standard called XES that can be used to represent the data needed to produce process diagrams. Generally we need to know the resources that were involved in an activity, its start and stop time, as well as some kind of identifier that can be used to correlate related activities. The identifier is the hard part since you won’t likely want to force microservice designers to accommodate this need. There are some creative ways to impute such an identifier by the proximity of execution time along with some other datum.

Once the logs are accumulated, they can be transformed to XES format such that they can be imported into existing process mining tools for analysis. I’ve used two such tools. There is an open source tool called ProM and a commercial tool called Disco. ProM isn’t very easy to learn, but once you do, it is quite powerful. It can produce a BPMN diagram that you can then import into your traditional BPM Suite so that process simulation can be done against the transaction logs.

In doing this you may find that the solution could benefit from more instances of a particular microservice. You may see that there are many messages traveling through just a few services and perhaps they can be broken down more. You may also find that human resources are causing a backlog. Perhaps transactions that originate in Europe are being processed in the US and could benefit from having a node in the cluster local to the originator.

This is all stuff that we traditionally do in process optimization. By applying Process Mining, we can now do the same with processes running over microservices.

BPM in a Microservice World — Part 2

Back in the early days of “workflow” we had control of the transaction, usually a document, from the start of the process to the end. As IT evolved into the SOA/ESB era, we had a little bit less control but for the most part the process engine orchestrated everything.

There were frequent hand-offs to message queues but normally the message would come back to the process engine so it would continue to orchestrate the process.

The microservice world is different. Instead of having a process engine or an ESB controlling a small number of large services, we have many small services that can potentially send and receive messages or respond to events from any of the other services.

It’s more like a web. One initiating message or event to a particular service could affect the exchange of many hundreds of messages between the microservices before the initial request is considered complete. That can make BPM practitioners a bit uneasy due to the loss of control.

We may not have control any longer but we still can have visibility into the process. We can still apply our usual patterns for SLA and exception management, and human and compensating workflows. This can be accomplished through what I call a “tracking” process.

I have a process running today that interacts with microservices written with the microservices framework, Vert.x.

Vert.x includes an Event Bus and a cluster manager, among other features. A Vert.x cluster is made up of one to many nodes. A microservice is packaged as a jar module that includes a number of what they call Verticles (British spelling I guess). The verticles are deployed to any number of Vert.x nodes.

Once the verticles are deployed, the Event Bus manages the flow of messages and responses throughout the cluster. This all happens asynchronously, so there is no way for us to control that flow from the process manager.

We can still create a process in BPMN that looks like the traditional process. Here is an example.

Opportunity Intake Process

This is a simplified version of a real process that’s been running for a couple of years on Vert.x. It receives business opportunities from an outside source. Once one is received, we need to save it locally. Then we run it through a machine learning classifier to see if it is the type of opportunity the client might be interested in. If it is, then a human needs to have a look at it. Otherwise, it is rejected.

We receive thousands of these every day. Due to the parallel nature of Vert.x we are able to spawn many requests over the cluster and get this work done quickly.

The persistence part is quite performant so we don’t need many instances of that verticle in the Vert.x cluster. The classification part is slow and requires more resources. So, we have many instances of that verticle over the cluster.

The process above looks like a traditional process but in fact, we are not in control of the transaction here. In each activity, we are sending a message using the Vert.x Event Bus and then waiting until an event happens at a future time. Once that event is received we move on to the next activity which does the same.

Unfortunately, the classification activity doesn’t always complete in a timely manner. In this example we added a boundary timer so that if the classification takes too long, we notify a user and then terminate the process.

The activities that involve microservices in the main process are modeled as subprocesses. Here is an example of the Persist Opportunity subprocess.

Persist Message that calls Vert.x


The first activity is a custom work item handler I created for Vert.x. It will send a message to the Vert.x cluster using the Event Bus.

That message may cause a number of other services to be called within Vert.x. We don’t care about that, all we need to know is when it’s all finished. I created a customization for Vert.x so that the process manager will be sent a signal when a particular Vert.x service is complete. When that happens the Catch Signal will be executed. At that point, control will be returned to the calling process which can move on to the next activity.

So, there you go! We can model processes as we are accustomed to even though we are not in control of the transaction as it moves through the various microservices. You can definitely use this pattern to combine microservice activities with traditional ones and apply our usual process management patterns to all of it.

BPM in a Microservice World — Part 1

First some background.

Generally when the topic of BPM comes up we think of the BPM software suite. There’s another side to BPM though, and that’s the practice of Business Process Management which doesn’t require any software at all.

Traditionally the BPM Practice has focused on continuous process improvement. There are various methodologies but it generally comes down to this:

  • Collect metrics on the existing process
  • Analyze those metrics
  • Propose an optimization
  • Simulate the optimization with the collected metrics
  • Institute the validated optimization
  • Do it all again

There’s nothing wrong with that. We’ve occasionally had good results with continuous improvement for processes that are core to a business.

A good candidate would be a fee-for-service health insurance claim process. It’s a process that’s been around for decades and will likely be around for additional decades. It’s also high volume, so even the smallest improvement can have a major impact.

The unfortunate truth about applying the practice in this way, is that it is very time consuming even with the best software. It can take at least weeks, if not months or years, to get anything meaningful done.

That may be acceptable for a core process, but for most other processes that lead time is just not acceptable. Even if a program or project is approved with estimates made by the most experienced practitioners, it will likely be considered a failure after the inevitable missed deadlines and cost overruns.

We need to provide obvious value much faster. By fast, I mean within hours for something critical, to no more than a few weeks for something significant.

Unfortunately, many of the BPM software suites have become unwieldy behemoths that include a variety of sophisticated functions in a very complicated package. They are generally architected such that the workflow transaction (request) is under complete control of the process engine. This can’t work in a MSA where the various microservices can send messages to one another in an ad hoc manner. We need to change our viewpoint from being an orchestrator of services to a monitor of milestones.

We can still apply BPM patterns like SLA management to processes that include microservices. The process diagrams will look basically the same. The difference is that instead of controlling the movement of a transaction through activities, we will be monitoring milestones as the transaction moves through services. I’ll call them “tracking processes.”

If the services or message bus can ping events to the process manager at certain times, we can create processes that wait for these signals before proceeding to the next milestone. In the case of SLA management, if the SLA deadline is approaching and we haven’t made progress toward a particular milestone, we can start a new workflow to deal with it. It could perhaps include a human task for someone to investigate the delay, or send a message to the client notifying them of a delay.
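To make the tracking idea concrete, here is a minimal, hypothetical sketch in plain Java (no BPM suite or Vert.x assumed): record milestone events as they arrive, and flag a milestone as at risk when the SLA deadline is near and the event still hasn’t shown up.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.HashMap;
import java.util.Map;

public class SlaTracker {

  private final Map<String, Instant> milestones = new HashMap<>();
  private final Instant started = Instant.now();

  // Record that a milestone event arrived from the microservices
  public void record(String milestone) {
    milestones.put(milestone, Instant.now());
  }

  // True if the milestone has not been reached and we are inside the
  // warning window before the SLA deadline
  public boolean atRisk(String milestone, Duration sla, Duration warningWindow) {
    if (milestones.containsKey(milestone)) return false;
    Instant deadline = started.plus(sla);
    return Instant.now().isAfter(deadline.minus(warningWindow));
  }

  public static void main(String[] args) {
    SlaTracker tracker = new SlaTracker();
    tracker.record("opportunity.persisted");
    // Deadline is an hour away, so the unreached milestone is not yet at risk
    System.out.println(tracker.atRisk("opportunity.classified",
        Duration.ofHours(1), Duration.ofMinutes(5))); // prints "false"
  }
}
```

A real tracking process would hang the escalation workflow off the atRisk check instead of just returning a boolean.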

In the next post we’ll see how to design a workflow that tracks milestones rather than orchestrating activities.

BPM in a Microservice World — Part 0

Business Process Management (BPM) enabling software has been around for decades having started as document centric workflow. It’s progressed into the Service Oriented Architecture (SOA) age to become an orchestrator of services and human tasks.

Now we see that the Service Architecture is evolving to include a relatively new concept called Microservice Architecture (MSA). That architecture along with enabling technologies like Cloud Services and Application Containers is allowing us to apply process management practices to solutions in a much more lightweight and agile way.

In this series of posts, we will see how to apply BPM principles to solutions that are implemented in a MSA as well as how to integrate workflows into the microservices themselves.

Part 1


Clustered Vert.x JUnit Test

By default, the Vert.x JUnit test framework will provide a non-clustered instance of Vert.x for you to execute tests against. For the most part that works fine. However, there are times when you need the clustered version like when testing with shared maps or cluster-wide locks.

We can easily change this default behavior by passing a Supplier as a parameter to the RunTestOnContext constructor.

In the JUnit test class, create a supplier like this:

private Supplier<Vertx> supplier = () -> {
  CountDownLatch latch = new CountDownLatch(1);
  AtomicReference<Vertx> vertx = new AtomicReference<>();
  Vertx.clusteredVertx(new VertxOptions(), handler -> {
    if (handler.succeeded()) {
      vertx.set(handler.result());
      latch.countDown();
    } else {
      throw new RuntimeException("Unable to create clustered Vertx");
    }
  });
  try {
    latch.await();
  } catch (InterruptedException e) {
    throw new RuntimeException(e);
  }
  return vertx.get();
};

This will create a clustered instance of Vert.x asynchronously, wait for the result and then return it to the caller of the Supplier.
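The latch technique here is plain java.util.concurrent and is worth seeing in isolation. Here is a stripped-down sketch in which a background thread stands in for the clusteredVertx callback; the method name and result value are hypothetical.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;

public class AsyncToSync {

  // Block the calling thread until an asynchronous callback delivers a value --
  // the same latch pattern the clustered-Vertx Supplier uses
  static String waitForAsyncResult() throws InterruptedException {
    CountDownLatch latch = new CountDownLatch(1);
    AtomicReference<String> result = new AtomicReference<>();

    // Simulate the async callback arriving on another thread
    new Thread(() -> {
      result.set("clustered-vertx");
      latch.countDown(); // release the waiting thread
    }).start();

    latch.await(); // wait until countDown() has been called
    return result.get();
  }

  public static void main(String[] args) throws InterruptedException {
    System.out.println(waitForAsyncResult()); // prints "clustered-vertx"
  }
}
```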

Now create a Rule in your test class that uses this Supplier when creating the test context.

@Rule
public RunTestOnContext rule = new RunTestOnContext(supplier);

Now you can create a test case that uses the clustered Vert.x like this.

@Test
public void testSharedData(TestContext context) {

  //rule.vertx() will be clustered
  SharedData sd = rule.vertx().sharedData();

  sd.<String, String>getClusterWideMap("testmap", mapAsyncResult -> {
    if (mapAsyncResult.succeeded()) {
      AsyncMap<String, String> map = mapAsyncResult.result();
      map.put("key", "value", putAsyncResult -> {
        context.assertTrue(putAsyncResult.succeeded());
        context.async().complete();
      });
    } else {
      context.assertTrue(false);
    }
  });

}


Insert date in MongoDB collection with Vert.x 3

The out-of-box Vert.x event bus can only transport a few basic types as well as Buffer and JsonObject. There is no Date object among these types. So, to send a date across the event bus, it has to be converted to a long or a string.

It is often not ideal to save dates in Mongo as string or long as sorting and other computations may not happen as you would expect with dates. Other non-Vert.x Mongo clients may also expect that dates be stored in the native format.

Mongo has an ISODate object that can be used to store dates. In order to utilize that feature we have to tell Mongo that the string we are sending should be interpreted as an ISODate. Mongo includes a directive called ‘$date’ that can do this for us. All we have to do is supply the date we want to insert in ISO format.

In Vert.x we would need to create a JSON document that looks something like this:

{"birthDate" : {"$date": "2016-01-27T22:57:34.127Z"}}

This would tell Mongo to store an ISODate represented by the string in the ‘birthDate’ attribute.
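The ISO string itself can be produced with the standard java.time API; this small sketch (the helper method is our own suggestion, not something Vert.x requires) renders a java.util.Date in the format the $date directive expects:

```java
import java.time.Instant;
import java.time.format.DateTimeFormatter;
import java.util.Date;

public class IsoDateExample {

  // Render a java.util.Date as the ISO-8601 string the $date directive expects
  static String toIsoString(Date date) {
    return DateTimeFormatter.ISO_INSTANT.format(date.toInstant());
  }

  public static void main(String[] args) {
    Date date = Date.from(Instant.parse("2016-01-27T22:57:34.127Z"));
    System.out.println(toIsoString(date)); // prints "2016-01-27T22:57:34.127Z"
  }
}
```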

So to prepare a document and save it to a Mongo collection we would do the following.

MongoClient mongoClient = MongoClient.createShared(vertx, config());

JsonObject document = new JsonObject();
document.put("name", "Kim Hilton");
document.put("birthDate", new JsonObject().put("$date", "2016-01-27T22:57:34.127Z"));

mongoClient.save("people", document, mongoHandler -> {
  if (mongoHandler.succeeded()) {
    // ... some happy code ...
  } else {
    // ... some error handler ...
  }
});

Once the save is complete, you can check your collection with the Mongo client and see that the birthDate was saved as an ISODate.

Here’s a working example in Java: MongoDateExampleVerticle

And some test cases that pass: MongoDateTest

Read date from MongoDB collection with Vert.x 3

MongoDB is able to store dates natively as ISODates. Since Vert.x is not able to transport an ISODate object across the event bus, they are returned as ISO strings within a special JsonObject.

When reading an ISODate from a collection, it will be returned in a JSON object that looks something like this:

{"birthDate" : {"$date" : "2016-01-27T22:57:34.132Z"}}

So, to access the date string you would need to do this:

String sDate = json.getJsonObject("birthDate").getString("$date");
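If you need a real date object rather than a string, the extracted value parses directly with the standard java.time API. A minimal sketch (the helper is our own illustration):

```java
import java.time.Instant;

public class IsoDateParse {

  // Parse the ISO string from the $date wrapper back into an Instant
  static Instant fromIsoString(String iso) {
    return Instant.parse(iso);
  }

  public static void main(String[] args) {
    Instant birthDate = fromIsoString("2016-01-27T22:57:34.132Z");
    System.out.println(birthDate); // prints "2016-01-27T22:57:34.132Z"
  }
}
```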

Here’s an example where we are reading a document from a Mongo collection and replying to an event bus consumer.

MongoClient mongoClient = MongoClient.createShared(vertx, config());

JsonObject query = new JsonObject().put("name", person.getString("name"));

mongoClient.findOne(COLLECTION_NAME, query, null, mongoHandler -> {
  if (mongoHandler.succeeded()) {

    JsonObject found = mongoHandler.result();

    JsonObject reply = new JsonObject().put("name", found.getString("name"));
    if (found.containsKey("birthDate")) {
      reply.put("birthDate", found.getJsonObject("birthDate").getString("$date"));
    }

    handler.reply(reply);
  } else if (mongoHandler.failed()) {
    handler.fail(-1, mongoHandler.cause().getMessage());
  }
});

Here is a working example of reading a date from Mongo: MongoDateExampleVerticle (look for the “person.read” consumer)

Here is a test case: MongoDateTest

Schedule an event in Vert.x using vertx-cron

The vertx-cron module allows you to schedule events based on a cron specification. Vertx has setTimer() and setPeriodic() to help you schedule future or repetitive tasks but they aren’t as useful when the scheduling requirement becomes complicated. Like if you wanted to run some task every minute from 9am to 5pm on weekdays.

The vertx-cron module makes it easy. Just add the Maven dependency to your POM.

<dependency>
    <groupId>com.diabolicallabs</groupId>
    <artifactId>vertx-cron</artifactId>
    <version>3.2.1.1</version>
</dependency>

Now you can schedule an event by sending a JSON object like the following to the ‘cron.schedule’ address.

{
    "cron_expression": "0 0 16 1/1 * ? *",
    "time_zone": "US/Hawaii",
    "address": "stock.quotes.list",
    "message": {
        "ticker": "RHT"
    },
    "action": "send",
    "result_address": "stock.quotes.persist"
}

This will send the message {"ticker":"RHT"} to the “stock.quotes.list” consumer every day at 4pm in the Hawaii timezone. When “stock.quotes.list” responds, the response will be forwarded to “stock.quotes.persist”.

It may be important to specify the timezone if you intend to have a Vert.x cluster with nodes in various parts of the world. This will ensure that the events fire at the same time no matter where the verticle is deployed.
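To see why the timezone matters, here is a small java.time sketch (independent of vertx-cron; the method is our own illustration) that computes the next 4pm occurrence in a given zone, which is what the schedule "0 0 16 1/1 * ? *" describes:

```java
import java.time.ZoneId;
import java.time.ZonedDateTime;

public class NextFourPm {

  // Compute the next time it will be 4pm in the given zone's local clock
  static ZonedDateTime nextFourPm(ZonedDateTime now) {
    ZonedDateTime next = now.withHour(16).withMinute(0).withSecond(0).withNano(0);
    // If 4pm has already passed today, the next occurrence is tomorrow
    return next.isAfter(now) ? next : next.plusDays(1);
  }

  public static void main(String[] args) {
    ZonedDateTime now = ZonedDateTime.now(ZoneId.of("US/Hawaii"));
    System.out.println(nextFourPm(now));
  }
}
```

Two nodes evaluating this in different local zones would land on different absolute instants; pinning the zone in the cron specification avoids that.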

There is also an RxJava Observer that makes it even easier. If you are using Vert.x Rx you can make use of the CronObservable class like the following.

Scheduler scheduler = RxHelper.scheduler(vertx);
CronObservable.cronspec(scheduler, "0 0 16 1/1 * ? *", "US/Eastern")
  .take(5) //If you only want it to hit 5 times, add this, remove for continuous emission
  .subscribe(
    timestamped -> {
      //Perform the scheduled activity here
    },
    fault -> {
      //If there is some kind of fault, handle here
    }
  );

First you need a Vert.x scheduler from the RxHelper. Then call the static method cronspec with the scheduler, the cron specification and an optional timezone. That will return an Observable over a Timestamp. You can then use the Observable in any way you normally would.

Once you subscribe, a timestamp will be emitted each time the cron schedule hits, in this case every day at 4pm in the US Eastern timezone.