Written by Jan Gurda
EM Payments Gateway Kraków
Published September 10, 2015

Mocking Amazon SQS with ElasticMQ

This article presents our experience with using ElasticMQ as a substitute for Amazon Web Services’ Simple Queue Service (SQS) in black-box testing of a single microservice application.

These kinds of tests are usually called service tests or module tests and are an extremely important part of application development. Some people even drop the concept of the test pyramid and tend to have a number of service tests comparable to the number of unit tests (read more here). I like this idea.

Higher-level tests give us more confidence that a particular component works correctly. A microservice architecture requires special focus on writing good and reliable module tests. What’s more, these tests should be relatively fast and executed with every CI build; their role is to make sure that the service responds appropriately to a given input and adheres to the contract with downstream services. Because this kind of test focuses on a single service (which is actually an independent application in the world of microservices), the developer has to simulate the surrounding downstream applications.

The pictures below present a schematic view of the real services versus a single-service test. This kind of component aggregation is a very common pattern in our system. The table presents the relation between the real and mock components used in both scenarios.

 

Figure 1: Real services connectivity

Figure 2: Service test component diagram

Table: real and mock components used in both scenarios

What’s worth mentioning is that the whole service test, together with all simulated downstream systems, should be executed inside a single JVM process. This speeds up execution and reduces the possibility of a resource conflict on the Continuous Integration server.

Simple Queue Service in use

In our project we use REST for synchronous communication between services.

There are plenty of very good tools that can simulate an HTTP endpoint. Two products we evaluated are MockServer (http://www.mock-server.com/) and WireMock (http://wiremock.org/). There’s also a very useful SaaS solution, Mocky (http://www.mocky.io/). I prefer WireMock and believe it is currently the most flexible library, but the MockServer developers also keep adding new features.

Synchronous HTTP communication, however, cannot be applied everywhere. For some use cases we realized that asynchronous queue communication fits better. Because we heavily utilize the goodies of Amazon Web Services, we decided to go with Amazon Simple Queue Service (SQS) as our queue solution. Amazon’s marketing says SQS is:

“… fast, reliable, scalable, fully managed message queuing service. SQS makes it simple and cost-effective to decouple the components of a cloud application. You can use SQS to transmit any volume of data, at any level of throughput, without losing messages or requiring other services to be always available.”

After some time spent on using SQS we came to the conclusion that it’s a really reliable, easy to use, and easy to maintain solution at a reasonable price.

With the very first service utilizing SQS we faced a tough question: how do we construct and run service tests? Using real SQS queues for this purpose has some significant drawbacks. First of all: cost. Our applications’ builds run a bunch of service tests, and Amazon charges us not only for messages sent and received but also for data transfer (see the pricing here). Usually the SQS cost is a very small part of the total AWS bill, but for startups or private initiatives every cent counts.

Another problem we experienced was interference between our tests. One test published a message and another one could delete it, because the Continuous Integration server may run a few builds simultaneously. That was not acceptable, since our tests had become non-deterministic. A potential solution to this problem was to create a separate queue for each test, but this just sounds ridiculous.

Fairly quickly we found ElasticMQ (https://github.com/adamw/elasticmq) and realized that it is compatible with the SQS protocol and works well with the official AWS Java API, which made it suitable for simulating Amazon SQS in our service tests.

The first version of the tests using ElasticMQ was quite ugly. In every test class we had code that started ElasticMQ and created the necessary queues. There was a lot of code duplication. We wanted to extract the setup code and make ElasticMQ startup and configuration more declarative. JUnit Rules (https://github.com/junit-team/junit/wiki/Rules) helped us achieve our target of simple test code.

To present our way of running service tests with ElasticMQ, I will go through a sample application created especially for this article. The whole source code listed in this article is available here.

For simplicity in some places I use Lombok annotations (https://projectlombok.org/) to avoid boilerplate code in listings.

SQS queue abstraction

We started our refactoring by wrapping the AmazonSQSClient class in our own abstraction:

 

package pl.schibsted.spid.elasticmq.util;

import java.util.List;

import com.amazonaws.services.sqs.AmazonSQSClient;
import com.amazonaws.services.sqs.model.Message;
import com.amazonaws.services.sqs.model.PurgeQueueRequest;
import com.amazonaws.services.sqs.model.ReceiveMessageRequest;
import com.amazonaws.services.sqs.model.ReceiveMessageResult;
import com.amazonaws.services.sqs.model.SendMessageRequest;
import lombok.AllArgsConstructor;

@AllArgsConstructor
public class SqsQueue {

    private final AmazonSQSClient client;

    private final String queueUrl;

    public void send(Message toSend) {
        SendMessageRequest sendMessageRequest = new SendMessageRequest(queueUrl, toSend.getBody());
        sendMessageRequest.setMessageAttributes(toSend.getMessageAttributes());
        client.sendMessage(sendMessageRequest);
    }

    public List<Message> read(int maxMessages) {
        ReceiveMessageRequest request = new ReceiveMessageRequest(queueUrl);
        request.setMaxNumberOfMessages(maxMessages);
        ReceiveMessageResult receiveMessage = client.receiveMessage(request);
        return receiveMessage.getMessages();
    }

    public void purge() {
        client.purgeQueue(new PurgeQueueRequest(queueUrl));
    }

}

The SqsQueue class uses the AWS Java API; its responsibility is to wrap the construction of send/receive/purge requests and their invocation on AmazonSQSClient. It exposes the following operations:

  • send – puts given message to queue identified by queueUrl
  • read – tries to retrieve up to “maxMessages” from queue identified by queueUrl
  • purge – removes all messages from queue identified by queueUrl
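To make the contract of these three operations concrete, here is a minimal JDK-only sketch. The SimpleQueue interface and the InMemoryQueue class are hypothetical names invented for this illustration and are not part of the sample project. The sketch works on plain String bodies and deliberately ignores SQS behaviour such as visibility timeouts and message attributes.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Hypothetical interface mirroring the three SqsQueue operations. */
interface SimpleQueue {
    void send(String body);

    List<String> read(int maxMessages);

    void purge();
}

/** In-memory stand-in (an assumption for illustration, not an SQS emulation). */
final class InMemoryQueue implements SimpleQueue {

    private final List<String> messages = new ArrayList<>();

    @Override
    public synchronized void send(String body) {
        // Append the message at the end of the queue.
        messages.add(body);
    }

    @Override
    public synchronized List<String> read(int maxMessages) {
        if (maxMessages < 0) {
            throw new IllegalArgumentException("maxMessages must not be negative");
        }
        // Return at most maxMessages entries without removing them.
        int count = Math.min(maxMessages, messages.size());
        return Collections.unmodifiableList(new ArrayList<>(messages.subList(0, count)));
    }

    @Override
    public synchronized void purge() {
        // Drop everything that is currently stored.
        messages.clear();
    }
}
```

With three messages sent, read(2) returns the first two and read(10) returns all three; after purge() any read returns an empty list.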

Rule configuration

To set up ElasticMQ we need a list of queues that must be created on startup and the TCP port that ElasticMQ will occupy. For that purpose we created a configuration class. It’s just a simple POJO:

package pl.schibsted.spid.elasticmq.util;

import java.util.Set;

import lombok.Builder;
import lombok.Getter;
import lombok.Singular;

@Builder
@Getter
public class SqsRuleConfiguration {

    private int port;

    @Singular
    private Set<String> queues;
}
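For readers who do not use Lombok, the following is a rough hand-written approximation of the builder and getters that the annotations provide. The class name PlainSqsRuleConfiguration is hypothetical, and the sketch is simplified: Lombok generates more than is shown here (for example a plural method that adds a whole collection).

```java
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.Set;

/** Hand-written approximation of SqsRuleConfiguration; the class name is hypothetical. */
final class PlainSqsRuleConfiguration {

    private final int port;
    private final Set<String> queues;

    private PlainSqsRuleConfiguration(int port, Set<String> queues) {
        this.port = port;
        this.queues = queues;
    }

    static Builder builder() {
        return new Builder();
    }

    int getPort() {
        return port;
    }

    Set<String> getQueues() {
        return queues;
    }

    static final class Builder {

        private int port;
        private final Set<String> queues = new LinkedHashSet<>();

        Builder port(int port) {
            this.port = port;
            return this;
        }

        // Counterpart of the singular method derived from the "queues" field.
        Builder queue(String queueName) {
            queues.add(queueName);
            return this;
        }

        PlainSqsRuleConfiguration build() {
            // Copy the set so later builder calls cannot change a built object.
            return new PlainSqsRuleConfiguration(port,
                    Collections.unmodifiableSet(new LinkedHashSet<>(queues)));
        }
    }
}
```

Usage then reads the same way as in the test later in this article: PlainSqsRuleConfiguration.builder().queue("sample-queue").port(8888).build().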

Test rule

The rule itself is not very complicated either: it simply starts an ElasticMQ server on the given port, creates the queues specified in the configuration, and creates an SqsQueue wrapper for each of them:

package pl.schibsted.spid.elasticmq.util;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.services.sqs.AmazonSQSClient;
import org.elasticmq.NodeAddress;
import org.elasticmq.rest.sqs.SQSRestServer;
import org.elasticmq.rest.sqs.SQSRestServerBuilder;
import org.junit.rules.TestRule;
import org.junit.runner.Description;
import org.junit.runners.model.Statement;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SqsRule implements TestRule {

    private static final Logger LOGGER = LoggerFactory.getLogger(SqsRule.class);

    // Reference to ElasticMQ server
    private static SQSRestServer server;
    private Map<String, SqsQueue> queues = new ConcurrentHashMap<>();
    private SqsRuleConfiguration configuration;

    public SqsRule(SqsRuleConfiguration configuration) {
        this.configuration = configuration;
    }

    @Override
    public Statement apply(Statement childStatement, Description description) {
        return new Statement() {
            @Override
            public void evaluate() throws Throwable {
                try {
                    setup();
                    childStatement.evaluate();
                } finally {
                    shutdown();
                }
            }
        };
    }

    private synchronized void setup() {
        // Start ElasticMQ in embedded mode.
        server = SQSRestServerBuilder.withPort(configuration.getPort())
                .withServerAddress(new NodeAddress("http", "localhost", configuration.getPort(), "")).start();
        LOGGER.info("SQS server started on port " + configuration.getPort());
        for (String queueName : configuration.getQueues()) {
            // Use standard ElasticMQ credentials ("x", "x")
            AmazonSQSClient amazonSQSClient = new AmazonSQSClient(new BasicAWSCredentials("x", "x"));
            // ElasticMQ is running on the same machine as integration test
            String endpoint = "http://localhost:" + configuration.getPort();
            amazonSQSClient.setEndpoint(endpoint);
            // Create queue with given name
            amazonSQSClient.createQueue(queueName);
            // Queue URL in ElasticMQ is http://host:port/queue/{queue_name}
            queues.put(queueName, new SqsQueue(amazonSQSClient, endpoint + "/queue/" + queueName));
        }
    }

    private synchronized void shutdown() {
        server.stopAndWait();
    }

    public SqsQueue getQueue(String queueName) {
        return queues.get(queueName);
    }

    public void purgeAllQueues() {
        // Remove all messages from every queue created by this rule
        for (String queueName : queues.keySet()) {
            queues.get(queueName).purge();
        }
    }
}
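The important part of the rule is the try/finally wrapper in apply(): the server is shut down even when the test throws. The JDK-only sketch below isolates that pattern. Step and ResourceRuleSketch are hypothetical stand-ins for JUnit’s Statement and TestRule, used here only so the example runs without JUnit on the classpath; the recorded event names are likewise invented.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for JUnit's Statement: a unit of test work that may throw. */
interface Step {
    void evaluate() throws Exception;
}

/** Hypothetical stand-in for a rule that manages an external resource. */
final class ResourceRuleSketch {

    // Records what happened and in which order, for demonstration only.
    final List<String> events = new ArrayList<>();

    Step apply(Step test) {
        return () -> {
            try {
                events.add("setup");     // e.g. start the embedded server
                test.evaluate();         // run the wrapped test
            } finally {
                events.add("shutdown");  // always runs, even if the test failed
            }
        };
    }
}
```

If the wrapped step throws, the exception still propagates to the caller, but the recorded events end with "shutdown", which is the guarantee the real rule relies on to release the port.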

Rule in action

Now I’d like to show how the presented rule can be used to verify the behavior of a very simple Dropwizard application (for a tutorial see this link). The application exposes an HTTP endpoint called “ping”. When a request is received, it transforms the given argument to uppercase and puts the message in a queue. The application behaves correctly when, after invocation of the “ping” endpoint:

  • HTTP result status is 204 (No content)
  • There is only one message available in queue
  • Body of that message is a request string transformed to uppercase.

Let’s start with the application configuration. It simply contains the URL of the queue the message will be sent to and the Amazon credentials.

package pl.schibsted.spid.elasticmq.server;

import com.fasterxml.jackson.annotation.JsonProperty;
import io.dropwizard.Configuration;
import org.hibernate.validator.constraints.NotEmpty;

public class ElasticMqRuleSampleApplicationConfiguration extends Configuration {

    @NotEmpty
    private String queueUrl;
    @NotEmpty
    private String awsAccessKey;
    @NotEmpty
    private String awsSecretKey;

    @JsonProperty
    public String getQueueUrl() {
        return queueUrl;
    }

    @JsonProperty
    public String getAwsAccessKey() {
        return awsAccessKey;
    }

    @JsonProperty
    public String getAwsSecretKey() {
        return awsSecretKey;
    }
}

Test configuration (test.yml file) points to “sample-queue” and provides AWS credentials (“x”, “x” for ElasticMQ):

queueUrl: http://localhost:8888/queue/sample-queue
awsAccessKey: x
awsSecretKey: x
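The queueUrl value has to agree with the port and queue name given to the rule. As a small illustration, the helper below builds the URL using the layout stated in the rule’s comment (http://host:port/queue/{queue_name}). The ElasticMqUrls class is a hypothetical helper, not part of the sample project, and the layout is taken from this article rather than from any ElasticMQ guarantee.

```java
/** Hypothetical helper; the URL layout follows the comment in SqsRule. */
final class ElasticMqUrls {

    private ElasticMqUrls() {
        // Utility class, not meant to be instantiated.
    }

    static String queueUrl(String host, int port, String queueName) {
        // Assumed layout: http://host:port/queue/{queue_name}
        return "http://" + host + ":" + port + "/queue/" + queueName;
    }
}
```

Calling ElasticMqUrls.queueUrl("localhost", 8888, "sample-queue") yields the same string as the queueUrl entry in test.yml.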

The REST resource itself exposes only one method: “ping”. It consumes JSON and does not return anything (which translates to an HTTP 204 status). In the resource constructor we instantiate a new AmazonSQSClient with the given credentials. Please remember that resource classes are used by multiple threads concurrently and should be thread safe.

package pl.schibsted.spid.elasticmq.resources;

import javax.validation.constraints.NotNull;
import javax.ws.rs.Consumes;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.core.MediaType;

import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.services.sqs.AmazonSQSClient;
import pl.schibsted.spid.elasticmq.server.ElasticMqRuleSampleApplicationConfiguration;

@Path("/ping")
@Consumes(MediaType.APPLICATION_JSON)
public class PingResource {

    private final AmazonSQSClient client;
    private final String queueUrl;

    public PingResource(ElasticMqRuleSampleApplicationConfiguration configuration) {
        // Instantiate AmazonSQSClient
        this.client = new AmazonSQSClient(new BasicAWSCredentials(configuration.getAwsAccessKey(), configuration.getAwsSecretKey()));
        this.queueUrl = configuration.getQueueUrl();
    }

    @POST
    public void ping(@NotNull String pingBody) {
        String toSend = processPingBody(pingBody);
        // sendMessage method is thread safe.
        // Send message with given body to queue.
        client.sendMessage(queueUrl, toSend);
    }

    private String processPingBody(String pingBody) {
        // Simulate very complicated message processing.
        return pingBody.toUpperCase();
    }
}
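The thread-safety remark can be illustrated without Dropwizard or the AWS SDK. In the sketch below, ThreadSafePingSketch is a hypothetical resource whose only state is a final reference to a thread-safe collaborator; a ConcurrentLinkedQueue stands in for the SQS client, which is an assumption made purely for this example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/** Hypothetical resource: all state is final and the collaborator is thread safe. */
final class ThreadSafePingSketch {

    private final Queue<String> sink; // stands in for the SQS client

    ThreadSafePingSketch(Queue<String> sink) {
        this.sink = sink;
    }

    void ping(String body) {
        // Only local variables and a thread-safe collaborator are touched here.
        sink.add(body.toUpperCase(Locale.ROOT));
    }

    /** Calls ping() from several threads and returns everything that was sent. */
    static List<String> pingConcurrently(int threads, int callsPerThread) {
        Queue<String> sink = new ConcurrentLinkedQueue<>();
        ThreadSafePingSketch resource = new ThreadSafePingSketch(sink);
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int t = 0; t < threads; t++) {
            pool.execute(() -> {
                for (int i = 0; i < callsPerThread; i++) {
                    resource.ping("ping");
                }
            });
        }
        pool.shutdown();
        try {
            if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("workers did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return new ArrayList<>(sink);
    }
}
```

Because the resource keeps no mutable fields of its own, every concurrent call ends up in the sink; a resource that accumulated request data in an ordinary field would not give that guarantee.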

The main application class simply glues it all together:

package pl.schibsted.spid.elasticmq.server;

import io.dropwizard.Application;
import io.dropwizard.setup.Environment;
import pl.schibsted.spid.elasticmq.resources.PingResource;

public class ElasticMqRuleSampleApplication extends Application<ElasticMqRuleSampleApplicationConfiguration> {

    public static void main(String[] args) throws Exception {
        new ElasticMqRuleSampleApplication().run(args);
    }

    @Override
    public void run(ElasticMqRuleSampleApplicationConfiguration configuration, Environment environment) throws Exception {
        PingResource resource = new PingResource(configuration);
        environment.jersey().register(resource);
    }

}

The service test verifying the behavior of the “ping” resource looks as follows:

package pl.schibsted.spid.elasticmq.resources;

import java.util.List;

import javax.ws.rs.client.Client;
import javax.ws.rs.client.ClientBuilder;
import javax.ws.rs.client.Entity;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.Response.Status;

import com.amazonaws.services.sqs.model.Message;
import io.dropwizard.testing.junit.DropwizardAppRule;
import org.junit.After;
import org.junit.ClassRule;
import org.junit.Test;
import pl.schibsted.spid.elasticmq.server.ElasticMqRuleSampleApplication;
import pl.schibsted.spid.elasticmq.server.ElasticMqRuleSampleApplicationConfiguration;
import pl.schibsted.spid.elasticmq.util.SqsRule;
import pl.schibsted.spid.elasticmq.util.SqsRuleConfiguration;

import static org.junit.Assert.assertEquals;

public class ITestPingResource {

    @ClassRule
    public static DropwizardAppRule<ElasticMqRuleSampleApplicationConfiguration> app =
            new DropwizardAppRule<>(ElasticMqRuleSampleApplication.class,
                    ITestPingResource.class.getClassLoader().getResource("test.yml").getPath());

    @ClassRule
    public static SqsRule sqs = new SqsRule(SqsRuleConfiguration.builder()
            .queue("sample-queue").port(8888).build());

    private Client client = ClientBuilder.newClient();

    @After
    public void tearDown() {
        sqs.purgeAllQueues();
    }

    @Test
    public void shouldPublishProcessedRequestPayload() throws Exception {
        // given
        String toSend = "abcdefgh";
        // when
        Response response = client
                .target("http://127.0.0.1:" + app.getLocalPort() + "/ping")
                .request().post(Entity.json(toSend));
        // then
        assertEquals(Status.NO_CONTENT.getStatusCode(), response.getStatus());
        List<Message> messagesFromQueue = sqs.getQueue("sample-queue").read(10);
        assertEquals(1, messagesFromQueue.size());
        assertEquals("ABCDEFGH", messagesFromQueue.get(0).getBody());
    }
}

The two @ClassRule annotations set up Dropwizard (using test.yml as the configuration file) and ElasticMQ. The SqsRule constructor accepts a configuration with the queue to create (“sample-queue”) and port 8888; compare with the test.yml file. I use a basic Jersey client to reach the “ping” endpoint and parse the HTTP response. The three assertions in the “then” block verify the assumptions defined in the previous section.
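For readers who want to try the given/when/then flow without any third-party dependency, here is a JDK-only imitation. It is not the article’s setup: the JDK’s built-in com.sun.net.httpserver.HttpServer replaces Dropwizard, an in-memory BlockingQueue replaces ElasticMQ, and PingServiceSketch with all of its methods is a hypothetical name. It only shows the shape of the check: call the endpoint, then inspect the queue.

```java
import com.sun.net.httpserver.HttpServer;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.net.HttpURLConnection;
import java.net.InetSocketAddress;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.util.Locale;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

/** JDK-only imitation of the service under test; every name here is hypothetical. */
final class PingServiceSketch {

    // Stands in for the ElasticMQ queue that the real test reads from.
    final BlockingQueue<String> queue = new LinkedBlockingQueue<>();
    private final HttpServer server;

    private PingServiceSketch(HttpServer server) {
        this.server = server;
    }

    /** Starts the service on a free local port chosen by the operating system. */
    static PingServiceSketch start() {
        try {
            HttpServer server = HttpServer.create(new InetSocketAddress("127.0.0.1", 0), 0);
            PingServiceSketch service = new PingServiceSketch(server);
            server.createContext("/ping", exchange -> {
                try (InputStream in = exchange.getRequestBody()) {
                    // Read the whole request body.
                    ByteArrayOutputStream buffer = new ByteArrayOutputStream();
                    byte[] chunk = new byte[1024];
                    for (int n; (n = in.read(chunk)) != -1; ) {
                        buffer.write(chunk, 0, n);
                    }
                    String body = new String(buffer.toByteArray(), StandardCharsets.UTF_8);
                    // "Publish" the processed payload before answering.
                    service.queue.add(body.toUpperCase(Locale.ROOT));
                }
                exchange.sendResponseHeaders(204, -1); // 204 No Content, no response body
                exchange.close();
            });
            server.start();
            return service;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    int port() {
        return server.getAddress().getPort();
    }

    void stop() {
        server.stop(0);
    }

    /** The "when" step: POSTs the body to /ping and returns the HTTP status code. */
    int ping(String body) {
        try {
            URI uri = URI.create("http://127.0.0.1:" + port() + "/ping");
            HttpURLConnection connection = (HttpURLConnection) uri.toURL().openConnection();
            connection.setRequestMethod("POST");
            connection.setDoOutput(true);
            try (OutputStream out = connection.getOutputStream()) {
                out.write(body.getBytes(StandardCharsets.UTF_8));
            }
            int status = connection.getResponseCode();
            connection.disconnect();
            return status;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

A check built on it mirrors the three assertions above: start the service, call ping("abcdefgh"), verify that the status is 204, that the queue holds exactly one message, and that its body is "ABCDEFGH", then call stop().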

I hope this article gave you at least a general overview of how Amazon Simple Queue Service can be simulated in service tests. The advantages of the presented approach are simplicity, decoupling from a third-party service (needed especially when the CI server does not allow Internet access), and cost control. Please feel free to use it in your projects.
