1. Introduction
In the first part of this article we implemented many integration tests covering the most common external services, including databases and REST APIs.
We also looked at the exact setup that is needed to create an efficient integration tests suite for your application, using Gradle, Spock and Spring Boot.
This part continues the exploration of external services that are often integrated in common backend microservices: Kafka (consumers and producers), AWS SQS, storage (Google Cloud) and the SMTP protocol for sending emails.
Finally, we will attempt to organise the entire configuration into what can be considered a comprehensive integration testing framework, leveraging the capabilities offered by Spock extensions.
2. Kafka Consumer testing
Nowadays, many applications consume messages from Kafka or another message bus. Setting up and maintaining your own Kafka cluster for integration test purposes is cumbersome, but once again Testcontainers comes to the rescue, this time with its Kafka module.
Let’s implement a Kafka consumer in the application that will read messages from the given topic and handle them. We can later create test scenarios to ensure the handling logic is correct.
Kafka consumer
To write a Kafka consumer, an additional Spring dependency is required. Let's add the Testcontainers Kafka module as well.
implementation("org.springframework.kafka:spring-kafka:$springKafkaVersion") itestImplementation("org.testcontainers:kafka:$testcontainersVersion")
Assuming the topic contains user events for registering or deleting user entries, the UserRepository from the previous article needs a new delete method.
private companion object {
    ...
    const val DELETE_USER_QUERY = """DELETE FROM users where id = :id"""
}

override fun delete(id: Long) {
    jdbcTemplate.update(DELETE_USER_QUERY, mapOf("id" to id))
}
Now, a model class is required to represent the events.
data class UserEvent(
    val id: Long,
    val operation: Operation,
    val name: String
) {
    fun toUser() = User(id, name)
}

enum class Operation {
    REGISTER, DELETE
}
Finally, thanks to the spring-kafka library, we have a very simple yet powerful way of creating the Kafka consumer.
@Component
class UserListener(private val repository: UserRepository) {

    @KafkaListener(topics = ["\${topics.users}"])
    fun handleUserEvent(event: UserEvent) {
        when (event.operation) {
            Operation.REGISTER -> repository.addOrUpdate(event.toUser())
            Operation.DELETE -> repository.delete(event.id)
        }
    }
}
The @KafkaListener annotation guarantees that the Kafka consumer will be started in a separate thread, handling UserEvent events from the topics.users Kafka topic. Ensure the topic placeholder gets the correct value in the application.yml config file:
topics:
  users: public.user.events
Last, but not least, is the Kafka configuration. Spring provides a bunch of different properties for Kafka, but only the few most needed will be used.
spring:
  kafka:
    bootstrap-servers: ${KAFKA_BOOTSTRAPSERVERS}
    consumer:
      group-id: spring-it
      auto-offset-reset: earliest
      enable-auto-commit: false
      key-deserializer: "org.apache.kafka.common.serialization.StringDeserializer"
      value-deserializer: "org.springframework.kafka.support.serializer.JsonDeserializer"
      properties:
        spring.json.trusted.packages: "pl.kurczyna.springit"
The following are the most important:

- kafka.bootstrap-servers -> in short, it serves as the Kafka cluster URL; the application can take it, for instance, from the KAFKA_BOOTSTRAPSERVERS environment variable
- kafka.consumer.group-id -> the group id of the consumer, usually the application name
- kafka.consumer.key-deserializer and kafka.consumer.value-deserializer -> key and value deserializers for our events; in this case, keys are simple Strings and values are in JSON format
Adding this configuration lets Spring auto-configure Kafka, so the consumer (@KafkaListener) can now work properly.
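To make the message format concrete, a registration event on the topic would look something like this on the wire (a hypothetical JSON payload matching the UserEvent class):

{
  "id": 42,
  "operation": "REGISTER",
  "name": "Jane Doe"
}

The JsonDeserializer maps this payload onto UserEvent, provided the class resides in one of the configured trusted packages.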
Integration Test
Testing scenarios
Two testing scenarios can be outlined for the Kafka user consumer:
- User registration
  - Prepare registration event
  - Send the event to Kafka (using utility Kafka producer)
  - Verify that the user has eventually been stored in the DB
"Eventually" is the key word here: the Kafka consumer operates asynchronously, so there is no precise way to determine exactly when it will act (see the polling sketch right after the scenarios).
- User deletion
  - Insert the user to the db
  - Prepare deletion event
  - Send the event to Kafka (using utility Kafka producer)
  - Verify that the user has eventually been deleted from the DB
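In both cases, the core idiom for verifying asynchronous behaviour in Spock is PollingConditions. A minimal sketch (assuming the dbTestClient helper from part one and a user id from the test context; the full tests follow below):

import spock.util.concurrent.PollingConditions

// Retries the assertion until it passes or the 5-second timeout elapses
new PollingConditions(timeout: 5).eventually {
    assert dbTestClient.getUserById(id) != null
}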
Test setup
Similar to the WireMock case, the aim is to decouple the Testcontainers Kafka specifics from the tests. Hence, a KafkaMock utility class will be created to manage everything related to Kafka:
- Defining the docker container to be used
- Starting and stopping the docker container
- Exposing the bootstrapServers URL that Kafka is running on
class KafkaMock {

    private static final Logger log = LoggerFactory.getLogger(KafkaMock.class)
    private static final String CONFLUENT_PLATFORM_VERSION = '7.4.0'
    private static final DockerImageName KAFKA_IMAGE = DockerImageName
            .parse('confluentinc/cp-kafka')
            .withTag(CONFLUENT_PLATFORM_VERSION)

    private static KafkaContainer container
    static String bootstrapServers

    static void start() {
        log.info("Starting Kafka Container Mock")
        container = new KafkaContainer(KAFKA_IMAGE)
        container.start()
        bootstrapServers = container.getBootstrapServers()
    }

    static void stop() {
        log.info("Stopping Kafka Mock")
        container.stop()
    }
}
Now, we have to manage our mock in the IntegrationTestBase:
def setupSpec() {
    ...
    KafkaMock.start() // Start Kafka container
}

def cleanupSpec() {
    ...
    KafkaMock.stop() // Stop Kafka container
}
The TestPropertySourceUtils setup also has to be updated to expose the kafka.bootstrapServers property, which can then be used in application-itest.yml:
...
String[] properties = ["wiremock.stripePort=$stripePort",
                       "kafka.bootstrapServers=${KafkaMock.bootstrapServers}"]
...
kafka:
  bootstrap-servers: ${kafka.bootstrapServers}
  consumer:
    group-id: spring-it-itest
  producer:
    retries: 1
    acks: all
    value-serializer: "org.springframework.kafka.support.serializer.JsonSerializer"
    key-serializer: "org.apache.kafka.common.serialization.StringSerializer"
As you can see, in the integration test scope our application now talks to a Kafka container that we spin up using Testcontainers.
Two other interesting properties in the config above are:
- consumer.group-id -> it's a good practice to override this value for integration tests
- the producer section -> to send test Kafka events, we need a simple Kafka producer. You can use the KafkaTemplate Spring utility for that; it is auto-configured by Spring when the necessary properties are provided.
Then, it can be autowired in the IntegrationTestBase class so you can use it in the tests.
@Autowired
KafkaTemplate<String, UserEvent> kafkaUsersProducer
Test implementation
class ITestUserEventsKafka extends IntegrationTestBase {

    @Value("\${topics.users}")
    String usersTopic

    def "should store user in db when registration event comes"() {
        given: 'There is a user event'
        Long id = nextLong()
        String name = 'John Lundstram'
        UserEvent event = new UserEvent(id, Operation.REGISTER, name)

        when: 'The event is sent to Kafka'
        kafkaUsersProducer.send(usersTopic, id.toString(), event)

        then: 'Within 5 seconds the user is registered in the db. Note that we test asynchronous code'
        new PollingConditions(timeout: 5).eventually {
            User inDb = dbTestClient.getUserById(id)
            inDb.id == id
            inDb.name == name
        }
    }

    def "should delete user from db when delete event comes"() {
        given: 'There is already one user in the DB'
        Long id = nextLong()
        String name = 'John Lundstram'
        dbTestClient.insertUser(id: id, name: name)

        and: 'There is a delete user event'
        UserEvent event = new UserEvent(id, Operation.DELETE, name)

        when: 'The event is sent to Kafka'
        kafkaUsersProducer.send(usersTopic, id.toString(), event)

        then: 'Within 5 seconds the user is deleted from the db. Note that we test asynchronous code'
        new PollingConditions(timeout: 5).eventually {
            User inDb = dbTestClient.getUserById(id)
            inDb == null
        }
    }
}
Let's examine the specifics of this test.
First, the value of the topics.users property is injected into the usersTopic field; it determines the designated topic when sending the event.
Secondly, KafkaTemplate is used to send the event:
kafkaUsersProducer.send(usersTopic, id.toString(), event)
by providing the topic name, event key and event value.
Lastly, we verify that the user is inserted (or deleted) using the DbTestClient you already know.
However, the most crucial aspect is performing this verification within a PollingConditions.eventually() block. This is the mechanism that allows testing asynchronous behaviour: Spock will run the verification block repeatedly until it succeeds or the timeout elapses. In this scenario, 5 seconds is a pretty safe bet – the logic is relatively simple, so it is reasonable to expect that event handling will finish within 5 seconds.
3. Kafka Producer testing
Similarly to testing Kafka consumers, the application may produce some events, so we might want to test if those events are sent properly. This can be done quite easily by enhancing our setup a bit.
Kafka producer
Let's imagine a broadcast event needs to be published every time a user is registered. We can implement a UserBroadcast service that publishes the relevant events to Kafka.
@Component
class UserBroadcast(
    @Value("\${topics.broadcast}") private val broadcastTopic: String,
    private val kafkaUsersProducer: KafkaTemplate<String, BroadcastEvent>
) {
    fun broadcastUserRegistration(user: User) {
        kafkaUsersProducer.send(broadcastTopic, user.id.toString(), BroadcastEvent.fromUserEvent(user))
    }
}
As you remember, KafkaTemplate needs additional producer configuration, so it is advisable to add it to the application.yml file:
kafka:
  producer:
    retries: 1
    acks: all
    value-serializer: "org.springframework.kafka.support.serializer.JsonSerializer"
    key-serializer: "org.apache.kafka.common.serialization.StringSerializer"
topics:
  broadcast: public.user.broadcast
The producer config is exactly the same as the one defined in the itest profile, so there is no need to override it anymore; we can remove the producer section from application-itest.yml.
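After that cleanup, the Kafka section of application-itest.yml could shrink to something like this (a sketch derived from the itest config shown earlier):

kafka:
  bootstrap-servers: ${kafka.bootstrapServers}
  consumer:
    group-id: spring-it-itest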
Let's also add BroadcastEvent to our model:
data class BroadcastEvent(
    val userId: Long,
    val timestamp: Instant
) {
    companion object {
        fun fromUserEvent(user: User) = BroadcastEvent(user.id, Instant.now())
    }
}
Now, you can inject the UserBroadcast component into the UserController and publish the broadcast event every time a user is registered.
class UserController(private val repository: UserRepository, private val userBroadcast: UserBroadcast) {
    ...
    @PostMapping
    fun addOrUpdate(@RequestBody user: User): ResponseEntity<Unit> {
        repository.addOrUpdate(user)
        userBroadcast.broadcastUserRegistration(user)
        return ResponseEntity.status(HttpStatus.CREATED).build()
    }
    ...
}
Integration Test
Testing scenario
Our testing scenario is relatively simple, although it will require a clever trick to verify. Let's enhance the scenario from the controller test (ITestUsers) we defined before:
Add new user:

- Call the POST /api/users endpoint
- Verify the response status
- Verify that the user is added to the DB
- Verify that the broadcast event has been published
Test setup
But how do we verify that Kafka events were published? Numerous methods are available, but my approach involves intercepting the events produced by our application, storing them in memory, and subsequently verifying that they were properly sent. To do that, you can create another Kafka consumer, in the itest scope, that imitates a downstream consumer of our UserBroadcast events.
@Component
@Profile('itest')
class KafkaTestConsumer {

    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaTestConsumer.class)

    @KafkaListener(topics = ['\${topics.broadcast}'])
    static void receiveBroadcastMessage(ConsumerRecord<?, ?> consumerRecord) {
        LOGGER.info('Received broadcast kafka message: {}', consumerRecord.value())
        KafkaMock.recordMessage(consumerRecord.topic(), consumerRecord.value())
    }

    static <T> List<T> getTopicMessages(String topic) {
        KafkaMock.messages[topic].collect { it as T }
    }
}
@Profile('itest') means that the component will only be created in the itest scope and will not affect our application in production. As you know, @KafkaListener runs the consumer in a separate thread, and every time an event arrives we record it in our KafkaMock via KafkaMock.recordMessage(consumerRecord.topic(), consumerRecord.value()). The getTopicMessages method is exposed for test purposes, to fetch all messages produced to the topic in question.
KafkaMock will store those messages in a ConcurrentHashMap in memory.
class KafkaMock {
    ...
    static ConcurrentHashMap<String, List<Object>> messages = new ConcurrentHashMap<>()
    ...
    static void recordMessage(String topic, Object message) {
        // computeIfAbsent keeps the read-modify-write atomic on the concurrent map
        messages.computeIfAbsent(topic) { new CopyOnWriteArrayList<Object>() }.add(message)
    }
}
Now, let’s adjust the user registration test case.
Test implementation
def "should add new user"() { ... and: 'There is a new user in the DB' User inDb = dbTestClient.getUserById(id) inDb == user and: "Broadcast event for user with id $id has been sent" new PollingConditions(timeout: 5).eventually { def messages = KafkaTestConsumer.<BroadcastEvent>getTopicMessages(broadcastTopic).findAll() { it.userId == id } assert messages.size() == 1 } }
As a result, another and: section was added, containing a PollingConditions.eventually block. This verification step checks that the test consumer received precisely one broadcast message on the broadcastTopic for the user (identified by its id) in question.
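For completeness, broadcastTopic can be injected into the test class the same way usersTopic was injected earlier (assuming it is declared in ITestUsers or the base class):

@Value("\${topics.broadcast}")
String broadcastTopic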
4. Storage (Google Cloud)
Another common pattern in contemporary microservices is integration with cloud services such as Amazon Web Services (AWS) or Google Cloud.
One widely adopted service is cloud storage. This chapter puts the emphasis on Google Cloud Storage; however, AWS S3 testing is very similar.
Pictures Controller and Service
Let's imagine there is a need to provide picture-upload functionality in the system that you build. To accomplish this, you might implement the following controller and service for uploading pictures to Google Cloud Storage.
@RestController
@RequestMapping("/api/pictures")
class PicturesController(private val picturesService: PicturesService) {

    @PostMapping
    fun uploadPicture(
        @RequestPart("picture", required = true) picture: MultipartFile
    ): ResponseEntity<Unit> {
        picturesService.uploadPicture(picture)
        return ResponseEntity.ok().build()
    }
}
interface PicturesService {
    fun uploadPicture(picture: MultipartFile)
}

@Service
class DefaultPicturesService(
    @Value("\${gcs.bucket}") private val bucketName: String,
    val storage: Storage
) : PicturesService {

    override fun uploadPicture(picture: MultipartFile) {
        val blobId = BlobId.of(bucketName, picture.originalFilename)
        val blobInfo = BlobInfo.newBuilder(blobId).apply {
            setContentType(picture.contentType)
        }.build()
        val content = picture.bytes
        storage.writer(blobInfo).use { writer ->
            writer.write(ByteBuffer.wrap(content, 0, content.size))
        }
    }
}
The setup requires declaring additional dependencies:
dependencyManagement { // Google Cloud dependency management for Spring
    imports {
        mavenBom("com.google.cloud:spring-cloud-gcp-dependencies:$springCloudGcpVersion")
    }
}
...
implementation("com.google.cloud:spring-cloud-gcp-starter-storage") // GCS dependency
And a GCS bucket configuration in the application.yml file:
gcs:
  bucket: gcs://pictures
Integration Test
Testing scenario
Assume the following testing scenario:
- There is a picture to be uploaded to GCS
- The upload endpoint is called, with the file to upload as a multipart body
- The response is successful
- The file is uploaded to the GCS bucket
Test setup
The last step is the most essential one; therefore, you need a way of setting up a mocked Google Cloud Storage bucket for the test and of verifying its content afterwards.
Unfortunately, Testcontainers does not provide a dedicated module for Google Cloud Storage, so you might opt for using a GenericContainer and set up a mocked GCS yourself. Fortunately, there are numerous open-source tools available for the task, one of which is Fake GCS Server. It provides an emulator of the Google Cloud Storage API that the standard Java SDK can talk to.
Let’s use it in practice.
Firstly, create a GcsMock that runs the Fake GCS Server image in a GenericContainer:
class GcsMock {

    private static def container
    static def gcsPort

    static void start() {
        container = new GenericContainer<>("fsouza/fake-gcs-server:latest")
                .withExposedPorts(4443)
                .withCreateContainerCmdModifier {
                    it.withEntrypoint("/bin/fake-gcs-server", "-scheme", "http")
                }
        container.start()
        gcsPort = container.getFirstMappedPort()
        updateExternalUrlWithContainerUrl(container.getHost(), gcsPort)
    }

    static void stop() {
        container.stop()
    }

    static String[] propertiesToRegister() {
        return ["gcs.port=$gcsPort"]
    }

    private static def updateExternalUrlWithContainerUrl(String host, Integer port) throws Exception {
        def fakeGcsExternalUrl = "http://" + host + ":" + port
        def modifyExternalUrlRequestUri = fakeGcsExternalUrl + "/_internal/config"
        def updateExternalUrlJson = new JSONObject()
        updateExternalUrlJson.put("externalUrl", fakeGcsExternalUrl)

        def request = HttpRequest.newBuilder()
                .uri(URI.create(modifyExternalUrlRequestUri))
                .header(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE)
                .PUT(HttpRequest.BodyPublishers.ofString(updateExternalUrlJson.toString()))
                .build()
        def response = HttpClient.newBuilder().build()
                .send(request, HttpResponse.BodyHandlers.discarding())

        if (response.statusCode() != HttpStatus.OK.value()) {
            throw new RuntimeException("Error updating fake-gcs-server with external URL, " +
                    "response status code ${response.statusCode()} != 200")
        }
    }
}
The start and stop methods are already familiar; they manage the testcontainer lifecycle. You should also expose the port the GcsMock is running on as an application property. All this is done in the setupSpec and cleanupSpec methods of the IntegrationTestBase:
def setupSpec() {
    stripeServer = new WireMockServer(stripePort)
    stripeServer.start()
    KafkaMock.start()
    GcsMock.start()
}

def cleanupSpec() {
    stripeServer.stop()
    KafkaMock.stop()
    GcsMock.stop()
}
...
String[] properties = [
        "wiremock.stripePort=$stripePort",
        "kafka.bootstrapServers=${KafkaMock.bootstrapServers}",
        "gcs.port=${GcsMock.gcsPort}"
]
With the GCS container in place, the application needs to "know" where to find it, so the Storage Spring bean has to be configured accordingly:
@TestConfiguration
class GcsITestConfiguration {

    @Value("\${gcs.port}")
    private String gcsContainerPort

    @Bean
    @Primary
    Storage gcsStorage() {
        return StorageOptions.newBuilder()
                .setHost("http://localhost:$gcsContainerPort")
                .setProjectId("itest-project")
                .setCredentials(NoCredentials.getInstance())
                .build()
                .getService()
    }
}
The gcsContainerPort is injected from the gcs.port application property, which points at the GCS testcontainer; as you can see, the host and credentials of the Storage service are overridden accordingly. The @Primary annotation instructs Spring Boot to give this @Bean definition the highest priority, while the @TestConfiguration annotation means that the configuration class will only be applied in the test scope.
One more important step completes the setup: importing the test configuration class for all the tests by adding the relevant @Import annotation to the IntegrationTestBase:
@Import(GcsITestConfiguration)
@ContextConfiguration(initializers = PropertyInitializer)
abstract class IntegrationTestBase extends Specification {
Now, you're almost ready to implement the integration test scenario defined before. First, override the bucket name for the itest profile in application-itest.yml, and add another utility component that will allow interacting with Google Cloud Storage inside the tests.
gcs:
  bucket: itest-pictures
@Component
class StorageTestClient {

    @Autowired
    Storage storageClient

    @Value("\${gcs.bucket}")
    private String bucketName

    def createBucket() {
        def bucketInfo = BucketInfo.newBuilder(bucketName).build()
        storageClient.create(bucketInfo)
    }

    def deleteBucket() {
        storageClient.list(bucketName).streamAll().forEach { it ->
            storageClient.delete(it.blobId)
        }
        storageClient.delete(bucketName)
    }

    def getAllBlobs() {
        return storageClient.list(bucketName).values
    }
}
It provides methods for creating and deleting buckets, but also for querying all objects (blobs) in the bucket, which are going to be used later.
The last step is to create the bucket before each test and clean it up afterwards:
@Autowired
StorageTestClient storageClient
...
def setup() {
    dbTestClient = new DbTestClient(template)
    stripeMock = new StripeMock(stripeServer)
    storageClient.createBucket()
}

def cleanup() {
    stripeServer.resetAll()
    storageClient.deleteBucket()
}
Test implementation
class ITestPictures extends IntegrationTestBase {

    def "should upload picture to GCS"() {
        given: 'There is a picture to be uploaded'
        def picture = new ClassPathResource("pictures/yosemite.png")

        when: 'Upload endpoint is called'
        def body = buildMultipartBody(picture)
        def result = restTemplate.postForEntity('/api/pictures', body, Unit.class)

        then: 'Response is successful'
        result.statusCode == OK

        and: 'Picture is uploaded to GCS'
        def blobs = storageClient.getAllBlobs()
        blobs.size() == 1
        blobs[0].blobId != null
        blobs[0].name == 'yosemite.png'
    }

    private def buildMultipartBody(Resource picture) {
        def body = new LinkedMultiValueMap<>()
        body.add("picture", new HttpEntity<>(picture))
        return body
    }
}
The end result is clean and concise. Using a multipart request body to send the actual file with the HTTP request allows for verifying whether it has been uploaded successfully to the Google Cloud Storage bucket.
Please bear in mind that the file pictures/yosemite.png has to be on the classpath, for instance in the src/itest/resources directory.
5. Sending Emails
Another widely adopted feature is sending emails using the Simple Mail Transfer Protocol (SMTP). This chapter presents how it can be effectively tested.
Email sender
Below is an illustrative example implementation of the email sending component in Spring Boot.
data class Email(
    val sender: String,
    val recipient: String,
    val subject: String,
    val content: String
)

interface EmailService {
    fun sendEmail(email: Email)
}

@Service
class SmtpEmailService(private val javaMailSender: JavaMailSender) : EmailService {

    override fun sendEmail(email: Email) {
        val mimeMessage = javaMailSender.createMimeMessage()
        MimeMessageHelper(mimeMessage, true).apply {
            setText(email.content, true)
            setTo(email.recipient)
            setFrom(email.sender)
            setSubject(email.subject)
        }
        javaMailSender.send(mimeMessage)
    }
}
The Email class represents the email object with its essential data. The service uses Spring's JavaMailSender for sending emails via SMTP. It has to be properly configured, and Spring offers a convenient way of doing that: specifying the required configuration properties in the application.yml file.
mail:
  host: smtp.gmail.com
  port: 587
  username: <username>
  password: <password>
  properties:
    mail.smtp.auth: true
    mail.smtp.starttls.enable: true
We need to declare additional dependencies for the application as well:
implementation("org.springframework.boot:spring-boot-starter-mail") implementation("com.sun.mail:jakarta.mail:$jakartaVersion")
Integration Test
Testing scenario
The testing scenario is straightforward:
- There is an email to be sent
- SmtpEmailService's sendEmail method is called
- The email is delivered to the particular recipient, identified by the email address
- Verify that the delivered email contains all the requisite data
Test setup
To be able to perform the verification from the last bullet point of the testing scenario, you have to inspect the data (emails) being sent over SMTP. One of the tools that allows that is GreenMail. It's an open-source, intuitive and easy-to-use test suite of email servers for testing purposes.
Let’s add it to our test suite:
itestImplementation("com.icegreen:greenmail:$greenmailVersion") itestImplementation("com.icegreen:greenmail-junit5:$greenmailVersion")
static int greenMailPort = findAvailableTcpPort()
static String greenMailUser = UUID.randomUUID().toString()
static String greenMailPassword = UUID.randomUUID().toString()
static GreenMail greenMail

def setupSpec() {
    ...
    greenMail = new GreenMail(ServerSetupTest.SMTP.port(greenMailPort))
            .withConfiguration(
                    GreenMailConfiguration.aConfig()
                            .withUser(greenMailUser, greenMailPassword)
            )
}

def setup() {
    ...
    greenMail.start()
}

def cleanup() {
    ...
    greenMail.stop()
}
...
static class PropertyInitializer implements ApplicationContextInitializer<ConfigurableApplicationContext> {
    @Override
    void initialize(ConfigurableApplicationContext applicationContext) {
        String[] properties = [
                ...
                "greenmail.port=${greenMailPort}",
                "greenmail.user=${greenMailUser}",
                "greenmail.password=${greenMailPassword}"
        ]
        ...
Configuration:
mail:
  host: localhost
  port: ${greenmail.port}
  username: ${greenmail.user}
  password: ${greenmail.password}
  properties:
    mail.smtp.auth: true
    mail.smtp.starttls.enable: true
The code above shows the instantiation and lifecycle management of the GreenMail object. Once it's operational, you can adjust the application's SMTP configuration on the itest profile so that GreenMail is used in lieu of an actual SMTP server.
Test implementation
class ITestEmailService extends IntegrationTestBase {

    @Autowired
    EmailService emailService

    def "should send e-mail"() {
        given: 'There is an email to be sent'
        def email = new Email('anakin@skywalker.com', 'luke@skywalker.com', 'Luke!', 'I am your father')

        when: 'The email is sent'
        emailService.sendEmail(email)

        then: 'Greenmail receives one email message'
        def messages = greenMail.getReceivedMessages()
        messages.size() == 1
        with(messages.first()) {
            getHeader(it, 'From') == email.sender
            getHeader(it, 'To') == email.recipient
            getHeader(it, 'Subject') == email.subject
            def body = GreenMailUtil.getBody(it)
            body.contains(email.content)
        }
    }

    private static def getHeader(MimeMessage message, String header) {
        message.allHeaders.find { it.name == header }.value
    }
}
In the verification block of the above test, the GreenMail method getReceivedMessages is used. It returns an array of MimeMessage objects representing the emails intercepted by GreenMail. To assert the presence of the desired data within the email body, the utility method GreenMailUtil.getBody() is invoked, and various email headers can be verified as well.
6. AWS SQS queues
As an illustrative example of the AWS services, let us closely examine the Simple Queue Service (SQS). It can serve diverse purposes and constitutes a common integration point of many applications.
Email sending events
AWS SES enables clients to track email sending at a granular level by setting up email sending events. These can be published to various destinations, one of which is an SQS queue (via SNS).
Assume that your application needs to consume and handle those events.
Let's write a simple consumer.
SqsEmailEventReceiver
First, more dependencies need to be defined:
implementation(platform("io.awspring.cloud:spring-cloud-aws-dependencies:$awsCloudVersion"))
implementation("io.awspring.cloud:spring-cloud-aws-starter-sqs")
Conveniently, nothing more is needed to implement an SQS message receiver in Spring Boot:
data class EmailEvent(
    val id: Long,
    val type: String
)

@Service
class SqsEmailEventReceiver {

    private companion object {
        val log: Logger = LoggerFactory.getLogger(SqsEmailEventReceiver::class.java)
    }

    @SqsListener(value = ["\${sqs.queue-name}"])
    fun handle(event: EmailEvent) {
        log.info("Handling email event: $event finished with success")
    }
}
The EmailEvent class represents an event object from SES; in real life it's usually much more complex, but this simple representation suffices for example purposes. SqsEmailEventReceiver provides a single handle method, decorated with the @SqsListener annotation, which instructs Spring Boot to spawn a separate thread consuming the queue passed as a parameter.
To keep things simple, the handling logic just logs the received event.
The sqs.queue-name property has to be defined in the application.yml file:
sqs:
  queue-name: email-events-queue
Integration Test
Testing scenario
Assume the following testing scenario:
- Email event is sent to the SQS queue
- Event is being processed
- Eventually the event is successfully consumed (it’s not visible in the queue anymore)
Test setup
When dealing with AWS services, it is advisable to use the Testcontainers LocalStack module, a fully functional local AWS cloud stack with built-in support for SQS, among other services.
Let's add a dependency to the `localstack` module:
itestImplementation("org.testcontainers:localstack:$localstackVersion")
Now, we’re ready to spin up the LocalStack docker container with SQS support:
private static DockerImageName localstackImage = DockerImageName.parse("localstack/localstack")
public static LocalStackContainer localstack = new LocalStackContainer(localstackImage).withServices(SQS)
...
def setupSpec() {
    ...
    localstack.start()
}

def cleanupSpec() {
    ...
    localstack.stop()
}
As in most of the previous examples, the application needs to connect to the mocked SQS server in the itest scope. To do that, let's override the SqsAsyncClient bean definition to point to the LocalStack container. Add the following @TestConfiguration class to the IntegrationTestBase:
@TestConfiguration
static class AwsITestConfiguration {

    @Bean
    @Primary
    SqsAsyncClient amazonSQSAsync() {
        return SqsAsyncClient.builder()
                .region(Region.of(localstack.getRegion()))
                .credentialsProvider(StaticCredentialsProvider.create(
                        AwsBasicCredentials.create(localstack.accessKey, localstack.secretKey)))
                .endpointOverride(localstack.getEndpointOverride(SQS))
                .build()
    }
}
What's new in the example above are the AWS credentials and region that need to be specified. Luckily, localstack is equipped with suitable methods to retrieve those properties.
Last but not least, we need a utility component to facilitate interaction with the SQS queue. Let's introduce SqsTestClient for that purpose:
@Component
class SqsTestClient {

    @Autowired
    private SqsAsyncClient sqsClient

    @Value("\${sqs.queue-name}")
    private String queueName

    def createQueue() {
        sqsClient.createQueue(CreateQueueRequest.builder().queueName(queueName).build() as CreateQueueRequest)
    }

    def sendEvent(String body) {
        sqsClient.sendMessage(getMessageRequest(body))
    }

    def processingEventsSize() {
        parseInt(getQueueAttributes().get("ApproximateNumberOfMessagesNotVisible"))
    }

    def isQueueEmpty() {
        def attributes = getQueueAttributes()
        return parseInt(attributes.get("ApproximateNumberOfMessagesNotVisible")) +
                parseInt(attributes.get("ApproximateNumberOfMessages")) +
                parseInt(attributes.get("ApproximateNumberOfMessagesDelayed")) == 0
    }

    private SendMessageRequest getMessageRequest(String body) {
        SendMessageRequest.builder().queueUrl(queueName).messageBody(body).build() as SendMessageRequest
    }

    private Map<String, String> getQueueAttributes() {
        def request = GetQueueAttributesRequest
                .builder()
                .queueUrl(queueName)
                .attributeNames(ALL)
                .build() as GetQueueAttributesRequest
        sqsClient.getQueueAttributes(request).get().attributesAsStrings()
    }
}
The presented class utilises the SqsAsyncClient component to communicate with SQS (remember that it is configured to point at the LocalStack container). The AWS SDK is used to implement methods convenient for the test implementation, such as:

- createQueue
- isQueueEmpty
- processingEventsSize
The sqs.queue-name property has to be defined in the application-itest.yml configuration file:
sqs:
  queue-name: itest-email-events-queue
Moreover, SqsTestClient needs to be injected in the IntegrationTestBase:
@Autowired
SqsTestClient sqsTestClient
That is all the setup necessary for the SQS queue consumer integration test.
Test implementation
class ITestSqsEmailEventReceiver extends IntegrationTestBase {

    def "should consume SQS event"() {
        given: 'There is an email event'
        String event = /{ "id": 13, "type": "Marked as spam" }/

        when: 'An event is sent to the queue'
        sqsTestClient.sendEvent(event)

        then: 'Event is being processed'
        new PollingConditions(timeout: 5).eventually {
            sqsTestClient.processingEventsSize() == 1
        }

        and: 'Event is successfully consumed'
        new PollingConditions(timeout: 5).eventually {
            sqsTestClient.isQueueEmpty()
        }
    }
}
As always, the exact implementation is straightforward when all of the necessary pieces are in place. After the event is sent to the queue, we assert that it is successfully consumed by verifying that the queue is eventually empty – this means that no exception has been thrown in the process. Of course, if the handling logic were more sophisticated, you would be able to verify other conditions related to the database or application state, as sketched below.
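For instance, assuming a hypothetical handler that persisted the events and a matching helper on dbTestClient (neither exists in the sample project), the final assertion could poll the database instead:

and: 'The event has been persisted'
new PollingConditions(timeout: 5).eventually {
    // getEmailEventById is an illustrative helper, not part of the sample project
    assert dbTestClient.getEmailEventById(13L) != null
}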
7. Optimisations
In the preceding chapters of this article, we examined numerous patterns that can form a robust foundation for integration testing. Nevertheless, there remains room for improvement. As you probably noticed, we did not avoid certain logical duplication and boilerplate code. Among the repetitive tasks, the lifecycle management of our mocks stands out, as it is executed in a similar manner for the majority of them. Furthermore, evaluating the overall execution time of the integration test suite reveals potential for optimisation:
BUILD SUCCESSFUL in 2m
6 actionable tasks: 3 executed, 3 up-to-date

real    2m0.707s
user    0m1.280s
sys     0m0.257s
A total of 12 test cases distributed across 8 test classes were executed, taking two minutes. That feels like something we can improve.
Let us explore one of the potential ways to address the aforementioned challenges.
Spock Extensions
Spock comes with a powerful extension mechanism, which allows you to hook into a specification’s lifecycle to enrich or alter its behaviour. While there exists a range of built-in extensions with valuable functionalities, this chapter will focus on Spock’s custom extensions.
Spock offers two distinct types of custom extensions: global extensions and annotation-driven local extensions – we will make use of both of them.
For a comprehensive understanding of Spock extensions, the documentation provides further insights. All you need to know for now is that they let us run custom code for our mocks at specific moments during the integration test suite run.
Objectives
There are multiple objectives that we will try to achieve:
- Improve tests execution time
- Make mock services easily pluggable
- Mitigate redundancy and boilerplate code
- Consolidate mock configuration within a user-friendly framework
- Decouple test logic from the configuration
Implementation
To achieve the aforementioned objectives, it would be a good idea to create a common interface (Groovy trait) for all the mocks in our suite.
trait Mock {

    abstract void start()

    abstract void stop()

    void cleanup() {
    }

    /**
     * Inlined properties that should be registered in the application context
     * format: ["propertyA=valueA", "propertyB=valueB"]
     */
    String[] propertiesToRegister() {
        return []
    }
}
Each mock within the suite has to implement this trait. The start, stop and cleanup methods are simple lifecycle management hooks, while propertiesToRegister defines the test properties that each mock exposes and that need to be added to the application context. Note that both cleanup and propertiesToRegister have empty default implementations.
Let's examine MailMock as an illustrative implementation of the Mock trait.
class MailMock implements Mock {

    static int greenMailPort = findAvailableTcpPort()
    static String greenMailUser = UUID.randomUUID().toString()
    static String greenMailPassword = UUID.randomUUID().toString()
    static GreenMail greenMail

    @Override
    void start() {
        greenMail = new GreenMail(ServerSetupTest.SMTP.port(greenMailPort))
                .withConfiguration(
                        GreenMailConfiguration.aConfig()
                                .withUser(greenMailUser, greenMailPassword)
                )
        greenMail.start()
    }

    @Override
    void stop() {
        greenMail.stop()
    }

    @Override
    String[] propertiesToRegister() {
        return [
                "greenmail.port=${greenMailPort}",
                "greenmail.user=${greenMailUser}",
                "greenmail.password=${greenMailPassword}"
        ]
    }

    static MimeMessage[] getReceivedMessages() {
        return greenMail.getReceivedMessages()
    }
}
Other examples can be found in this package.
The next step is to create a utility class that gathers all mocks and manages their lifecycle collectively.
class MockEnvironment {

    private static boolean started
    private static List<Mock> mocks

    static init(List<Mock> mocks) {
        this.mocks = mocks
    }

    static start() {
        if (!started) {
            started = true
            mocks.each { it.start() }
        }
    }

    static stop() {
        if (started) {
            mocks.each { it.stop() }
        }
    }

    static cleanup() {
        mocks.each { it.cleanup() }
    }

    /**
     * Could be added to application context like this:
     * @Override
     * void initialize(ConfigurableApplicationContext applicationContext) {
     *     TestPropertySourceUtils.addInlinedPropertiesToEnvironment(
     *         applicationContext,
     *         MockEnvironment.propertiesToRegister()
     *     )
     * }
     */
    static String[] propertiesToRegister() {
        mocks.collect { it.propertiesToRegister() }.flatten()
    }
}
It lets the user initialise the list of mocks to be managed in the test suite.
Annotation-driven local extension
Let’s create a Spock extension dedicated to initiating all mocks and registering essential properties prior to the first test execution.
To make mocks easily pluggable we can take advantage of a custom annotation.
@Retention(RetentionPolicy.RUNTIME)
@Target([ElementType.TYPE, ElementType.FIELD, ElementType.METHOD])
@ExtensionAnnotation(MocksExtension.class)
@interface Mocks {
    Service[] services()
}
enum Service {
    GCS(new GcsMock()),
    KAFKA(new KafkaMock()),
    MAIL(new MailMock()),
    SQS(new SqsMock()),
    STRIPE(new StripeMock())

    public final Mock service

    private Service(Mock service) {
        this.service = service
    }
}
The Service enumeration represents each of the mocks, while the Mocks annotation accepts a single parameter – services – configuring which services are going to be used.
Finally, it's time to implement the actual extension:
class MocksExtension implements IAnnotationDrivenExtension<Mocks> {

    @Override
    void visitSpecAnnotation(Mocks annotation, SpecInfo spec) {
        MockEnvironment.init(annotation.services().collect { it.service })

        spec.addSharedInitializerInterceptor(new IMethodInterceptor() {
            @Override
            void intercept(IMethodInvocation invocation) throws Throwable {
                MockEnvironment.start()
                invocation.proceed() // continue with the regular shared initializer
            }
        })

        spec.addCleanupInterceptor(new IMethodInterceptor() {
            @Override
            void intercept(IMethodInvocation invocation) throws Throwable {
                invocation.proceed() // let the regular cleanup run first
                MockEnvironment.cleanup()
            }
        })
    }
}
The extension leverages two pivotal hooks: addSharedInitializerInterceptor, which runs before the shared initializer of the annotated specification, and addCleanupInterceptor, which wraps the cleanup phase of the specification (note the invocation.proceed() calls, which keep the intercepted lifecycle methods running). You can read more about Spock interceptors in the documentation.
Furthermore, the initialization of all mocks is orchestrated through the MockEnvironment.init() method, with the services parameter extracted from the Mocks annotation.
This is how the extension is used in practice:
@Mocks(services = [GCS, KAFKA, MAIL, SQS, STRIPE])
class IntegrationTestBase extends Specification {
...
Impressively, this single line of code is everything required to manage the lifecycle of all the services we need for the tests.
Global extension
Finally, once all tests complete, the test infrastructure should be shut down. As annotation-driven extensions lack a dedicated hook executed after all tests, we resort to a global extension for this purpose.
class MockEnvironmentGlobalExtension implements IGlobalExtension {

    @Override
    void stop() {
        MockEnvironment.stop()
    }
}
The stop() method precisely fulfils the requirement, ensuring all mocks are stopped upon the conclusion of the entire test suite.
To activate the global extension, we need to add its fully qualified name to the META-INF/services/org.spockframework.runtime.extension.IGlobalExtension file:
pl.kurczyna.springit.extensions.MockEnvironmentGlobalExtension
Conclusion
Having both extensions in place, the final form of the IntegrationTestBase class is notably more concise:
@SpringBootTest(webEnvironment = RANDOM_PORT)
@ActiveProfiles('itest')
@Import(GcsITestConfiguration)
@ContextConfiguration(initializers = PropertyInitializer)
@Mocks(services = [GCS, KAFKA, MAIL, SQS, STRIPE])
class IntegrationTestBase extends Specification {

    @LocalServerPort
    int appPort

    @Autowired
    TestRestTemplate restTemplate

    @Autowired
    KafkaTemplate<String, UserEvent> kafkaUsersProducer

    @Autowired
    StorageTestClient storageClient

    @Autowired
    SqsTestClient sqsTestClient

    @Autowired
    DbTestClient dbTestClient

    def setup() {
        sqsTestClient.createQueue()
    }

    static class PropertyInitializer implements ApplicationContextInitializer<ConfigurableApplicationContext> {
        @Override
        void initialize(ConfigurableApplicationContext applicationContext) {
            TestPropertySourceUtils.addInlinedPropertiesToEnvironment(
                    applicationContext,
                    MockEnvironment.propertiesToRegister()
            )
        }
    }

    @TestConfiguration
    static class AwsITestConfiguration {
        @Bean
        @Primary
        SqsAsyncClient amazonSQSAsync() {
            return SqsAsyncClient.builder()
                    .region(Region.of(SqsMock.getRegion()))
                    .credentialsProvider(SqsMock.getCredentials())
                    .endpointOverride(SqsMock.getEndpointOverride())
                    .build()
        }
    }
}
Additionally, because the mocks are started only once, before the first test executes, the total execution time decreased significantly:
BUILD SUCCESSFUL in 39s
6 actionable tasks: 3 executed, 3 up-to-date

real    0m39.873s
user    0m1.228s
sys     0m0.223s
Ultimately, we have effectively realised all the predefined objectives. Spock extensions have proven to be a robust mechanism for systematically organising and consolidating the integration test setup.
8. Afterword
This article concludes the comprehensive guide to integration testing of JVM backend services, with a predominant focus on Spring Boot, Kotlin, and Spock. In this part, we looked at more complex integration patterns, including Kafka, email sending, Google Cloud Storage and AWS Simple Queue Service.
Finally, we wrapped up with the chapter that introduced you to the Spock Extensions, effectively encapsulating the test infrastructure, streamlining the setup, and significantly improving test execution time.
I firmly believe that by adopting the approach outlined in this and the preceding part of the article, one can develop a robust and highly efficient integration testing framework. This framework can be seamlessly extended to incorporate many external services beyond those covered here.
Useful links
- Part One of this article – https://www.schibsted.pl/blog/integration-testing-deep-dive-part-i/
- My previous article about Spock – https://www.schibsted.pl/blog/testing-java-kotlin-code-spock
- Repository with all the examples from this article – https://github.com/patrykkurczyna/spring-integration-tests
- Testcontainers – https://testcontainers.com/
- Spock Extensions – https://spockframework.org/spock/docs/2.0/extensions.html
- Green Mail – https://greenmail-mail-test.github.io/greenmail/