Bulk Update With Spring Data MongoDB Reactive
To update documents in a MongoDB collection, we often issue individual update requests. When the volume of data is large, this can lead to performance issues and excessive consumption of hardware resources.
In this tutorial, we will implement a solution to efficiently enrich and update a large amount of data using Spring Data MongoDB Reactive.
Before reading further, if you are not familiar with the Spring reactive stack and MongoDB, I suggest you check the resources section.
1. EIP content enricher
The Content Enricher Enterprise Integration Pattern appends information from an external source to an existing message. It uses information inside the incoming message to perform the enrichment operation.
We will implement a simplified version of the EIP:
Input message: represented by a MongoDB document.
Enricher: our application.
Resource: a call to a RESTful API.
Output message: we keep only the enriched document.
1.1. Integration flow
The application will read the address documents, add the product and save the enriched documents to the MongoDB database.
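As a preview, the flow can be sketched with plain Reactor before we wire in MongoDB and WebClient. The class and method names below are placeholders for this sketch only; the real implementation follows in section 3.
import java.util.HashMap;
import java.util.Map;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

// Conceptual sketch of the content enricher flow (placeholder types and a stubbed resource).
public class EnricherSketch {

    public static void main(String[] args) {
        Flux<Map<String, Object>> addresses = Flux.just(new HashMap<>(Map.of("city", "Lille"))); // input messages
        addresses
                .flatMap(address -> fetchProduct()           // resource: a REST call in the real flow
                        .map(product -> {
                            address.put("product", product); // enricher: append data to the message
                            return address;
                        }))
                .doOnNext(System.out::println)               // output: only the enriched message is kept
                .blockLast();                                // in the real flow: bulk write to MongoDB
    }

    private static Mono<Map<String, Object>> fetchProduct() {
        return Mono.just(Map.of("provider", "STARLINK"));    // stub standing in for the product API
    }
}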
2. Project setup
2.2. Generation
We generate the project skeleton from Spring Initializr.
2.3. Structure
.
│ .gitignore
│ docker-compose.yml
│ pom.xml
│ README.adoc
├───data
│ ├───mongodb
│ │ address.ndjson
│ └───product
│ db.json
└───src
├───main
│ ├───java
│ │ └───com
│ │ └───maoudia
│ │ └───tutorial
│ │ Application.java
│ │ AppProperties.java
│ │ CollectionService.java
│ │ NetworkConfig.java
│ └───resources
│ application.yml
└───test
└───java
└───com
└───maoudia
└───tutorial
CollectionServiceTest.java
2.4. Containers
Download the data directory to the root of the project.
We use docker-compose to create the containers needed for this tutorial.
services:
  mongodb: (1)
    container_name: maoudia-mongodb
    image: mongo:7.0.2
    environment:
      - MONGO_INITDB_DATABASE=test
      - MONGO_INITDB_ROOT_USERNAME=admin
      - MONGO_INITDB_ROOT_PASSWORD=password
    networks:
      - mongodb-network
    ports:
      - 15015:27017
    volumes:
      - ./data/mongodb:/data/mongodb
  mongo-express: (2)
    container_name: maoudia-mongo-express
    image: mongo-express:1.0.0
    depends_on:
      - mongodb
    networks:
      - mongodb-network
    environment:
      - ME_CONFIG_MONGODB_SERVER=maoudia-mongodb
      - ME_CONFIG_MONGODB_ADMINUSERNAME=admin
      - ME_CONFIG_MONGODB_ADMINPASSWORD=password
      - ME_CONFIG_BASICAUTH_USERNAME=admin
      - ME_CONFIG_BASICAUTH_PASSWORD=password
    ports:
      - 1515:8081
    volumes:
      - ./data/mongodb:/data/mongodb
  product-api: (3)
    container_name: maoudia-product-api
    image: clue/json-server:latest
    ports:
      - 1519:80
    volumes:
      - ./data/product/db.json:/data/db.json
networks:
  mongodb-network:
    driver: bridge
1 | MongoDB, initialized with the test database. |
2 | Mongo Express, a MongoDB administration interface. |
3 | Product API, configured from the db.json file. |
We start up the services:
docker-compose up -d
2.5. Data initialization
We use a JSON document from the French address database.
{
"id": "59350",
"type": "municipality",
"name": "Lille",
"postcode": [
"59000",
"59800",
"59260",
"59777",
"59160"
],
"citycode": "59350",
"x": 703219.96,
"y": 7059335.72,
"lon": 3.045433,
"lat": 50.630992,
"population": 234475,
"city": "Lille",
"context": "59, Nord, Hauts-de-France",
"importance": 0.56333
}
We import the address collection:
mongoimport --uri "mongodb://admin:password@localhost:15015" --authenticationDatabase=admin --db test --collection address ./data/mongodb/address.ndjson
Alternatively, we can use Mongo Express, which is available at http://localhost:1515.
The product represents a satellite internet offer.
{
"id": 1,
"available": true,
"company": "SPACEX",
"provider": "STARLINK",
"type": "SATELLITE"
}
The Product API is available at http://localhost:1519.
3. Application
3.1. Configuration
We change the file extension from application.properties to application.yml.
app:
  buffer-max-size: 500
  bulk-size: 100
  collection-name: address
  enriching-key: product
  enriching-uri: http://localhost:1519/products/1
spring:
  main:
    web-application-type: none
  data:
    mongodb:
      database: test
      uri: mongodb://admin:password@localhost:15015
---
spring.config.activate.on-profile: dev
logging:
  level:
    org.mongodb.driver: debug
---
spring.config.activate.on-profile: test
app:
  bulk-size: 2
We declare a record that holds the application configuration properties.
@ConfigurationProperties(prefix = "app")
@Validated
public record AppProperties(
        @DefaultValue("128")
        @Positive
        int bulkSize,
        @DefaultValue("1024")
        @Positive
        int bufferMaxSize,
        @NotBlank
        String collectionName,
        @NotBlank
        String enrichingKey,
        @NotNull
        URI enrichingUri
) {
}
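Note that the kebab-case keys of application.yml (for example buffer-max-size) are mapped onto the camelCase record components through relaxed binding. The following standalone sketch illustrates that binding with Spring Boot's Binder; the MockEnvironment setup and the property values are assumptions for illustration, not part of the tutorial code.
import org.springframework.boot.context.properties.bind.Binder;
import org.springframework.mock.env.MockEnvironment;

// Minimal binding sketch (assumed values), showing relaxed binding of kebab-case keys to the record.
public class AppPropertiesBindingSketch {

    public static void main(String[] args) {
        MockEnvironment environment = new MockEnvironment()
                .withProperty("app.bulk-size", "100")
                .withProperty("app.buffer-max-size", "500")
                .withProperty("app.collection-name", "address")
                .withProperty("app.enriching-key", "product")
                .withProperty("app.enriching-uri", "http://localhost:1519/products/1");

        AppProperties properties = Binder.get(environment)
                .bind("app", AppProperties.class) // constructor binding of the record components
                .get();

        System.out.println(properties); // AppProperties[bulkSize=100, bufferMaxSize=500, ...]
    }
}
Bean validation annotations such as @Positive are not enforced by this plain Binder call; they are applied when Spring Boot binds the properties at startup.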
We create a @Bean for Spring's non-blocking HTTP client, WebClient.
@Configuration
public class NetworkConfig {

    @Bean
    public WebClient client() {
        return WebClient.create();
    }
}
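WebClient.create() relies on the default Reactor Netty settings. If the enrichment API turns out to be slow or flaky, a variant with an explicit response timeout could be used instead; the 5-second value below is an assumption, not part of the tutorial.
import java.time.Duration;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.http.client.reactive.ReactorClientHttpConnector;
import org.springframework.web.reactive.function.client.WebClient;
import reactor.netty.http.client.HttpClient;

@Configuration
public class NetworkConfig {

    @Bean
    public WebClient client() {
        // Assumed timeout: fail a request if the product API does not answer within 5 seconds.
        HttpClient httpClient = HttpClient.create().responseTimeout(Duration.ofSeconds(5));
        return WebClient.builder()
                .clientConnector(new ReactorClientHttpConnector(httpClient))
                .build();
    }
}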
3.2. Implementation
We create a @Service that contains the application business logic.
@Service
public class CollectionService {

    private final AppProperties properties;
    private final ReactiveMongoTemplate template;
    private final WebClient client;

    public CollectionService(AppProperties properties,
                             ReactiveMongoTemplate template,
                             WebClient client) {
        this.properties = properties;
        this.template = template;
        this.client = client;
    }

    public Flux<BulkWriteResult> enrichAll(String collectionName, String enrichingKey, URI enrichingUri) {
        return template.findAll(Document.class, collectionName) (1)
                .onBackpressureBuffer(properties.bufferMaxSize()) (2)
                .flatMap(document -> enrich(document, enrichingKey, enrichingUri)) (3)
                .map(CollectionService::toReplaceOneModel) (4)
                .window(properties.bulkSize()) (5)
                .flatMap(replaceOneModelFlux -> bulkWrite(replaceOneModelFlux, collectionName)); (6)
    }
}
1 | Creates a stream of documents from the collection. |
2 | Limits the number of documents buffered in memory when the consumer is slower than the producer.
If the maximum buffer size is exceeded, an IllegalStateException is thrown. |
3 | Enriches each document asynchronously with the external one. |
4 | Creates a ReplaceOneModel from the document. |
5 | Groups documents into streams of fixed size; the last stream can be smaller (see the sketch after this list). |
6 | Calls bulk write function. |
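To make the batching behaviour of window() (callout 5) concrete, here is a small standalone Reactor sketch, independent of the tutorial code:
import reactor.core.publisher.Flux;

// 10 elements with a window size of 4 produce batches of 4, 4 and 2: the last batch is smaller.
public class WindowSketch {

    public static void main(String[] args) {
        Flux.range(1, 10)
                .window(4)                  // Flux<Flux<Integer>>: one inner flux per batch
                .flatMap(Flux::collectList) // collect each inner flux into a list
                .doOnNext(batch -> System.out.println("batch: " + batch))
                .blockLast();
    }
}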
We create document enrichment functions.
private Publisher<Document> enrich(Document document, String enrichingKey, URI enrichingUri) { (1)
    return getEnrichingDocument(enrichingUri)
            .map(enrichingDocument -> {
                document.put(enrichingKey, enrichingDocument);
                document.put("updatedAt", new Date());
                return document;
            });
}

private Mono<Document> getEnrichingDocument(URI enrichingUri) { (2)
    return client.get()
            .uri(enrichingUri)
            .retrieve()
            .bodyToMono(Document.class);
}
1 | Adds the document retrieved from the HTTP call to the root of the document being enriched, under the key passed as a parameter. |
2 | Retrieves a document from a URI (a hardened variant with retries is sketched below). |
MongoDB converts and stores dates in UTC by default.
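Every document triggers an HTTP call, so a transient failure of the product API would fail the whole stream. If that is a concern, getEnrichingDocument could be hardened with a per-attempt timeout and a bounded retry; the values below are assumptions, not part of the original code.
// Requires reactor.util.retry.Retry and java.time.Duration.
private Mono<Document> getEnrichingDocument(URI enrichingUri) {
    return client.get()
            .uri(enrichingUri)
            .retrieve()
            .bodyToMono(Document.class)
            .timeout(Duration.ofSeconds(2))                       // assumed per-attempt timeout
            .retryWhen(Retry.backoff(3, Duration.ofMillis(200))); // assumed policy: up to 3 retries with backoff
}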
private static final ReplaceOptions REPLACE_OPTIONS = new ReplaceOptions(); (1)

private static ReplaceOneModel<Document> toReplaceOneModel(Document document) {
    return new ReplaceOneModel<>(
            Filters.eq("_id", document.get("_id")), (2)
            document, (3)
            REPLACE_OPTIONS
    );
}
1 | Instantiates the default replacement options. |
2 | Filter matching the document by its identifier. |
3 | Replacement content: the complete enriched document. |
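Replacing the whole document sends the full payload back to MongoDB on every write. Since only the enriching key and the timestamp change, a lighter alternative (an option, not what this tutorial uses) is a partial update with UpdateOneModel and $set; the bulk write step would then accept WriteModel<Document> instead of ReplaceOneModel<Document>.
// Alternative sketch using com.mongodb.client.model.UpdateOneModel, Updates and Filters.
private static UpdateOneModel<Document> toUpdateOneModel(Document document, String enrichingKey) {
    return new UpdateOneModel<>(
            Filters.eq("_id", document.get("_id")),                        // match by identifier
            Updates.combine(
                    Updates.set(enrichingKey, document.get(enrichingKey)), // send only the enriched field
                    Updates.set("updatedAt", document.get("updatedAt"))    // and the timestamp
            )
    );
}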
private static final BulkWriteOptions BULK_WRITE_OPTIONS = new BulkWriteOptions().ordered(false); (1)

private Flux<BulkWriteResult> bulkWrite(Flux<ReplaceOneModel<Document>> replaceOneModelFlux, String collectionName) {
    return replaceOneModelFlux.collectList() (2)
            .flatMapMany(replaceOneModels -> template.getCollection(collectionName) (3)
                    .flatMapMany(collection -> collection.bulkWrite(replaceOneModels, BULK_WRITE_OPTIONS))); (4)
}
1 | Instantiates the write options with ordered operations disabled. |
2 | Collects the stream into a list. |
3 | Retrieves the collection passed as a parameter. |
4 | Bulk writes the documents into the MongoDB collection. |
Multi-document transactions have been supported on replica sets since MongoDB 4.0 (and on sharded clusters since 4.2).
If transactions are enabled, we can wrap each bulk write in one to make the batch atomic.
We implement the following interfaces:
CommandLineRunner: runs the enrichment command at application startup.
ExitCodeGenerator: manages the application's system exit code.
@SpringBootApplication(exclude = MongoReactiveRepositoriesAutoConfiguration.class) (1)
@ConfigurationPropertiesScan("com.maoudia.tutorial") (2)
public class Application implements CommandLineRunner, ExitCodeGenerator {

    private static final Logger LOGGER = LoggerFactory.getLogger(Application.class);

    private final AppProperties properties;
    private final CollectionService service;
    private int exitCode = 255;

    public static void main(String[] args) {
        System.exit(SpringApplication.exit(SpringApplication.run(Application.class, args)));
    }

    public Application(AppProperties properties, CollectionService service) {
        this.properties = properties;
        this.service = service;
    }

    @Override
    public void run(final String... args) {
        service.enrichAll(properties.collectionName(), properties.enrichingKey(), properties.enrichingUri())
                .doOnSubscribe(unused -> LOGGER.info("------------------< Starting Collection Enriching Command >-------------------")) (3)
                .doOnNext(bulkWriteResult -> LOGGER.info("Bulk write result with {} modified document(s)", bulkWriteResult.getModifiedCount()))
                .doOnError(throwable -> {
                    exitCode = 1;
                    LOGGER.error("Collection enriching failed due to: {}", throwable.getMessage(), throwable);
                })
                .doOnComplete(() -> exitCode = 0)
                .doOnTerminate(() -> LOGGER.info("------------------< Collection Enriching Command Finished >------------------"))
                .blockLast(); (4)
    }

    @Override
    public int getExitCode() {
        return exitCode;
    }
}
1 | Disables the auto-configuration of repositories, as we only use ReactiveMongoTemplate. |
2 | Enables scanning for and detection of beans annotated with @ConfigurationProperties. |
3 | Subscribing to the stream triggers the processing. |
4 | Without a running web server, we block on the Publisher in order to trigger the processing and wait until the end of the execution. |
3.3. Demo
We launch the application:
mvn spring-boot:run
Output:
...
2023-11-10T02:02:58.673+01:00 INFO 84802 --- [ main] com.maoudia.tutorial.Application : Started Application in 0.831 seconds (process running for 0.992)
2023-11-10T02:02:58.725+01:00 INFO 84802 --- [ main] com.maoudia.tutorial.Application : ------------------< Starting Collection Enriching Command >-------------------
2023-11-10T02:02:59.186+01:00 INFO 84802 --- [ntLoopGroup-2-4] com.maoudia.tutorial.Application : Bulk write result with 100 modified document(s)
2023-11-10T02:02:59.244+01:00 INFO 84802 --- [ntLoopGroup-2-5] com.maoudia.tutorial.Application : Bulk write result with 100 modified document(s)
2023-11-10T02:02:59.290+01:00 INFO 84802 --- [ntLoopGroup-2-5] com.maoudia.tutorial.Application : Bulk write result with 100 modified document(s)
2023-11-10T02:02:59.357+01:00 INFO 84802 --- [ntLoopGroup-2-3] com.maoudia.tutorial.Application : Bulk write result with 100 modified document(s)
2023-11-10T02:02:59.438+01:00 INFO 84802 --- [ntLoopGroup-2-3] com.maoudia.tutorial.Application : Bulk write result with 100 modified document(s)
2023-11-10T02:02:59.503+01:00 INFO 84802 --- [ntLoopGroup-2-5] com.maoudia.tutorial.Application : Bulk write result with 100 modified document(s)
2023-11-10T02:02:59.578+01:00 INFO 84802 --- [ntLoopGroup-2-5] com.maoudia.tutorial.Application : Bulk write result with 100 modified document(s)
2023-11-10T02:02:59.632+01:00 INFO 84802 --- [ntLoopGroup-2-5] com.maoudia.tutorial.Application : Bulk write result with 100 modified document(s)
2023-11-10T02:02:59.727+01:00 INFO 84802 --- [ntLoopGroup-2-3] com.maoudia.tutorial.Application : Bulk write result with 100 modified document(s)
2023-11-10T02:02:59.776+01:00 INFO 84802 --- [ntLoopGroup-2-5] com.maoudia.tutorial.Application : Bulk write result with 100 modified document(s)
2023-11-10T02:02:59.776+01:00 INFO 84802 --- [ntLoopGroup-2-5] com.maoudia.tutorial.Application : ------------------< Collection Enriching Command Finished >------------------
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 7.282 s
[INFO] Finished at: 2023-11-10T02:03:03+01:00
[INFO] ------------------------------------------------------------------------
3.4. VisualVM report
VisualVM is a lightweight profiling tool. We use it to get an overview of the threads launched by the application.
There are two groups of threads executing operations in parallel; each group forms an event loop:
MongoDB requests are executed by nioEventLoopGroup.
HTTP requests are executed by reactor-http-nio.
4. Integration tests
We use JUnit 5 and the Testcontainers MongoDB module for the integration tests. This gives feedback close to the real behaviour of the application, which essentially performs read/write operations.
To keep this tutorial short, we will only write one test.
@Profile("test")
@SpringBootTest
@Testcontainers (1)
class CollectionServiceTest {
@Container
public static GenericContainer<?> jsonServerContainer = new GenericContainer<>("clue/json-server:latest")
.withExposedPorts(80)
.withFileSystemBind("./data/product/db.json", "/data/db.json", BindMode.READ_ONLY)
.waitingFor(Wait.forHttp("/").forStatusCode(200).forPort(80))
.withReuse(true); (2)
@Container
private static final MongoDBContainer mongoDBContainer = new MongoDBContainer("mongo:7.0.2");
@DynamicPropertySource
private static void setProperties(DynamicPropertyRegistry registry) {
registry.add("spring.data.mongodb.uri", mongoDBContainer::getReplicaSetUrl); (3)
registry.add("app.enriching-uri", () -> "http://" + jsonServerContainer.getHost() + ":" + jsonServerContainer.getMappedPort(80) + "/products/1");
}
@Autowired
private AppProperties properties;
@Autowired
private CollectionService command;
@Autowired
private ReactiveMongoTemplate template;
@Test
void multipleBulkWriteResultsAreReturned() {
Document givenDocument1 = new Document();
givenDocument1.put("_id", "628ea3edb5110304e5e814f6");
givenDocument1.put("type", "municipality");
Document givenDocument2 = new Document();
givenDocument2.put("_id", "628ea3edb5110304e5e814f7");
givenDocument2.put("type", "street");
Document givenDocument3 = new Document();
givenDocument3.put("_id", "628ea3edb5110304e5e814f8");
givenDocument3.put("type", "housenumber");
template.insert(Arrays.asList(givenDocument1, givenDocument2, givenDocument3), properties.collectionName()).blockLast();
BulkWriteResult expectedBulkWriteResult1 = BulkWriteResult.acknowledged(WriteRequest.Type.REPLACE, 2, 2, Collections.emptyList(), Collections.emptyList());
BulkWriteResult expectedBulkWriteResult2 = BulkWriteResult.acknowledged(WriteRequest.Type.REPLACE, 1, 1, Collections.emptyList(), Collections.emptyList());
command.enrichAll( properties.collectionName(), properties.enrichingKey() , properties.enrichingUri())
.as(StepVerifier::create) (4)
.expectNext(expectedBulkWriteResult1)
.expectNext(expectedBulkWriteResult2)
.verifyComplete();
}
}
1 | Adds the Testcontainers JUnit 5 extension. |
2 | Starts a MongoDB container. |
3 | Sets up the application with the container's URI. |
4 | Uses StepVerifier from Reactor Test to assert the output stream. |
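Beyond the BulkWriteResult counts, we could also assert that the documents were actually enriched. A possible extra check, appended at the end of the test method (not in the original test):
// Every stored document should now carry the enriching key.
template.findAll(Document.class, properties.collectionName())
        .as(StepVerifier::create)
        .thenConsumeWhile(document -> document.containsKey(properties.enrichingKey()))
        .verifyComplete();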
We launch the integration tests:
mvn test -Dspring.profiles.active=test
Test results:
...
[INFO] Tests run: 1, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 6.098 s - in com.maoudia.tutorial.CollectionServiceTest
[INFO]
[INFO] Results:
[INFO]
[INFO] Tests run: 1, Failures: 0, Errors: 0, Skipped: 0
[INFO]
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 11.539 s
[INFO] Finished at: 2023-11-10T02:06:45+01:00
[INFO] ------------------------------------------------------------------------
5. Conclusion
In this tutorial, we implemented a complete solution to efficiently enrich and update a MongoDB collection. We also saw how to write integration tests with JUnit 5 and Testcontainers.
The complete source code is available on GitHub.
In the next chapter of the MongoDB Reactive CLI series, we will add new features and use Picocli to facilitate interactions with the application.