Mise À Jour En Masse Avec Spring Data MongoDB Reactive
Afin de mettre à jour les documents d’une collection MongoDB, on passe souvent par des requêtes de mise à jour, si le volume des données est conséquent, cela peut conduire à des problèmes de performance et une surconsommation des ressources matérielles.
On va implémenter une solution pour enrichir et mettre à jour efficacement un grand volume de données en utilisant Spring Data MongoDB Reactive.
Avant de continuer la lecture, si vous n’êtes pas familier avec la pile réactive de Spring et MongoDB, je suggère de consulter la section ressources.
1. EIP enrichisseur de contenu
Le modèle d’intégration d’entreprise enrichisseur de contenu permet d’ajouter des informations à un message existant depuis une source externe. Il utilise les informations contenues dans le message entrant pour effectuer l’opération d’enrichissement.
On va implémenter une version simplifiée de l'EIP :
Message en entrée : représenté par un document MongoDB.
Enrichisseur : notre application.
Ressource : appel à une API RESTful.
Message en sortie : nous ne conserverons que le document enrichi.
1.1. Flux d’intégration
L’application va lire les documents adresse, ajouter le produit et sauvegarder les documents enrichis dans la base MongoDB.
2. Configuration du projet
2.2. Génération
On génère le squelette du projet depuis Spring Initializr
.
2.3. Structure
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
.
│ .gitignore
│ docker-compose.yml
│ pom.xml
│ README.adoc
├───data
│ ├───mongodb
│ │ address.ndjson
│ └───product
│ db.json
└───src
├───main
│ ├───java
│ │ └───com
│ │ └───maoudia
│ │ └───tutorial
│ │ Application.java
│ │ AppProperties.java
│ │ CollectionService.java
│ │ NetworkConfig.java
│ └───resources
│ application.yml
└───test
└───java
└───com
└───maoudia
└───tutorial
CollectionServiceTest.java
2.4. Conteneurs
On télécharge le dossier data
vers la racine du projet.
On utilise docker-compose
pour créer les conteneurs nécessaires pour ce tutoriel.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
services:
mongodb: (1)
container_name: maoudia-mongodb
image: mongo:7.0.2
environment:
- MONGO_INITDB_DATABASE=test
- MONGO_INITDB_ROOT_USERNAME=admin
- MONGO_INITDB_ROOT_PASSWORD=password
networks:
- mongodb-network
ports:
- 15015:27017
volumes:
- ./data/mongodb:/data/mongodb
mongo-express: (2)
container_name: maoudia-mongo-express
image: mongo-express:1.0.0
depends_on:
- mongodb
networks:
- mongodb-network
environment:
- ME_CONFIG_MONGODB_SERVER=maoudia-mongodb
- ME_CONFIG_MONGODB_ADMINUSERNAME=admin
- ME_CONFIG_MONGODB_ADMINPASSWORD=password
- ME_CONFIG_BASICAUTH_USERNAME=admin
- ME_CONFIG_BASICAUTH_PASSWORD=password
ports:
- 1515:8081
volumes:
- ./data/mongodb:/data/mongodb
product-api: (3)
container_name: maoudia-product-api
image: clue/json-server:latest
ports:
- 1519:80
volumes:
- ./data/product/db.json:/data/db.json
networks:
mongodb-network:
driver: bridge
1 | MongoDB initialisé avec la base de données test . |
2 | MongoExpress est une interface d’administration MongoDB. |
3 | L’API produit est configurée depuis le fichier db.json . |
On démarre les services :
1
docker-compose up -d
2.5. Initialisation des données
On utilise un document JSON issu de la base d’adresses française.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
{
"id": "59350",
"type": "municipality",
"name": "Lille",
"postcode": [
"59000",
"59800",
"59260",
"59777",
"59160"
],
"citycode": "59350",
"x": 703219.96,
"y": 7059335.72,
"lon": 3.045433,
"lat": 50.630992,
"population": 234475,
"city": "Lille",
"context": "59, Nord, Hauts-de-France",
"importance": 0.56333
}
On importe la collection d’adresses :
1
mongoimport --uri "mongodb://admin:password@localhost:15015" --authenticationDatabase=admin --db test --collection address ./data/mongodb/address.ndjson
Ou :
On utilise MongoExpress qui est accessible sur http://localhost:1515
.
Le produit représente une offre d’internet par satellite.
1
2
3
4
5
6
7
{
"id": 1,
"available": true,
"company": "SPACEX",
"provider": "STARLINK",
"type": "SATELLITE"
}
L’API produit est accessible sur http://localhost:1519
.
3. Application
3.1. Configuration
On change l’extension du fichier de application.properties
vers application.yml
.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
app:
buffer-max-size: 500
bulk-size: 100
collection-name: address
enriching-key: product
enriching-uri: http://localhost:1519/products/1
spring:
main:
web-application-type: none
data:
mongodb:
database: test
uri: mongodb://admin:password@localhost:15015
---
spring.config.activate.on-profile: dev
logging:
level:
org.mongodb.driver: debug
---
spring.config.activate.on-profile: test
app:
bulk-size: 2
On déclare une classe qui va contenir les propriétés de configuration de l’application.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
@ConfigurationProperties(prefix = "app")
@Validated
public record AppProperties(
@DefaultValue("128")
@Positive
int bulkSize,
@DefaultValue("1024")
@Positive
int bufferMaxSize,
@NotBlank
String collectionName,
@NotBlank
String enrichingKey,
@NotNull
URI enrichingUri
) {
}
On crée un @Bean
du client HTTP non bloquant de Spring.
1
2
3
4
5
6
7
8
9
@Configuration
public class NetworkConfig {
@Bean
public WebClient client() {
return WebClient.create();
}
}
3.2. Implémentation
On crée le @Service
qui va contenir la logique métier de l’application.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
@Service
public class CollectionService {
private final AppProperties properties;
private final ReactiveMongoTemplate template;
private final WebClient client;
public CollectionService(AppProperties properties,
ReactiveMongoTemplate template,
WebClient client) {
this.properties = properties;
this.template = template;
this.client = client;
}
public Flux<BulkWriteResult> enrichAll(String collectionName, String enrichingKey, String enrichingUri) {
return template.findAll(Document.class, collectionName) (1)
.onBackpressureBuffer(properties.bufferMaxSize()) (2)
.flatMap(document -> enrich(document, enrichingKey, enrichingUri)) (3)
.map(CollectionService::toReplaceOneModel) (4)
.window(properties.bulkSize()) (5)
.flatMap(replaceOneModelFlux -> bulkWrite(replaceOneModelFlux, collectionName)); (6)
}
}
1 | Crée un flux de documents à partir de la collection. |
2 | Limite le nombre maximum de documents chargés dans la RAM en cas de consommation plus lente que la production.
Si la taille maximale du tampon est dépassée, une IllegalStateException est levée. |
3 | Enrichie le document avec le document externe d’une façon asynchrone. |
4 | Crée un ReplaceOneModel à partir du document. |
5 | Regroupe les documents en flux de taille fixe. Le dernier flux peut être de taille inférieure. |
6 | Appel la fonction d’écriture en masse. |
La propriété de configuration |
On crée les fonctions d’enrichissement de document.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
private Publisher<Document> enrich(Document document, String enrichingKey, String enrichingUri) { (1)
return getEnrichingDocument(enrichingUri)
.map(enrichingDocument -> {
document.put(enrichingKey, enrichingDocument);
document.put("updatedAt", new Date());
return document;
});
}
private Mono<Document> getEnrichingDocument(String enrichingUri) { (2)
return client.get()
.uri(URI.create(enrichingUri))
.retrieve()
.bodyToMono(Document.class);
}
1 | Ajoute le document récupéré depuis l’appel HTTP à la racine du document à enrichir avec la clef passée en paramètre. |
2 | Récupère le document depuis l'URI. |
MongoDB convertie et stocke les dates en UTC par défaut. |
1
2
3
4
5
6
7
8
private static final ReplaceOptions REPLACE_OPTIONS = new ReplaceOptions(); (1)
private static ReplaceOneModel<Document> toReplaceOneModel (Document document) {
return new ReplaceOneModel<>(
Filters.eq("_id", document.get("_id")), (2)
document, (3)
REPLACE_OPTIONS
);
}
1 | Instancie la configuration de remplacement par défaut. |
2 | Le filtre permet la correspondance par identifiant document. |
3 | Le contenu à remplacer, représente l’intégralité du document enrichi. |
1
2
3
4
5
6
private static final BulkWriteOptions BULK_WRITE_OPTIONS = new BulkWriteOptions().ordered(false); (1)
private Flux<BulkWriteResult> bulkWrite(Flux<ReplaceOneModel<Document>> updateOneModelFlux, String collectionName) {
return updateOneModelFlux.collectList() (2)
.flatMapMany(unused -> template.getCollection(collectionName) (3)
.flatMapMany(collection -> collection.bulkWrite(updateOneModels, BULK_WRITE_OPTIONS))); (4)
}
1 | Instancie les options d’écritures en désactivant l’ordre des opérations. |
2 | Collecte le flux dans une liste. |
3 | Récupère la collection passée en paramètre. |
4 | Écrit en masse les documents dans la collection MongoDB. |
Les transactions sont supportées sur les Replicaset depuis MongoDB 4.2.
Si les transactions sont activées, on peut utiliser |
On implémente les interfaces suivantes :
CommandLineRunner
: exécute la commande d’enrichissement au démarrage de l’application.ExitCodeGenerator
: gère le code de sortie système.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
@SpringBootApplication(exclude = MongoReactiveRepositoriesAutoConfiguration.class) (1)
@ConfigurationPropertiesScan("com.maoudia.tutorial") (2)
public class Application implements CommandLineRunner, ExitCodeGenerator {
private static final Logger LOGGER = LoggerFactory.getLogger(Application.class);
private final AppProperties properties;
private final CollectionService service;
private int exitCode = 255;
public static void main(String[] args) {
System.exit(SpringApplication.exit(SpringApplication.run(Application.class, args)));
}
public Application(AppProperties properties, CollectionService service) {
this.properties = properties;
this.service = service;
}
@Override
public void run(final String... args) {
service.enrichAll(properties.collectionName(), properties.enrichingKey(), properties.enrichingUri())
.doOnSubscribe(unused -> LOGGER.info("------------------< Staring Collection Enriching Command >-------------------")) (3)
.doOnNext(bulkWriteResult -> LOGGER.info("Bulk write result with {} modified document(s)", bulkWriteResult.getModifiedCount()))
.doOnError(throwable -> {
exitCode = 1;
LOGGER.error("Collection enriching failed due to : {}", throwable.getMessage(), throwable);
})
.doOnComplete(() -> exitCode = 0)
.doOnTerminate(() -> LOGGER.info("------------------< Collection Enriching Command Finished >------------------"))
.blockLast(); (4)
}
@Override
public int getExitCode() {
return exitCode;
}
}
1 | Désactive l’auto-configuration des repositories, car on utilise MongoReactiveTemplate seulement. |
2 | Permet de scanner et détecter les beans qui portent l’annotation @ConfigProperties . |
3 | L’inscription au flux déclenche le traitement. |
4 | Sans un serveur web en fonctionnement, nous devons nous abonner indéfiniment au Publisher afin de déclencher
et attendre la fin de l’exécution. |
3.3. Démo
On lance l’application :
1
mvn spring-boot:run
Sortie :
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
...
2023-11-10T02:02:58.673+01:00 INFO 84802 --- [ main] com.maoudia.tutorial.Application : Started Application in 0.831 seconds (process running for 0.992)
2023-11-10T02:02:58.725+01:00 INFO 84802 --- [ main] com.maoudia.tutorial.Application : ------------------< Staring Collection Enriching Command >-------------------
2023-11-10T02:02:59.186+01:00 INFO 84802 --- [ntLoopGroup-2-4] com.maoudia.tutorial.Application : Bulk write result with 100 modified document(s)
2023-11-10T02:02:59.244+01:00 INFO 84802 --- [ntLoopGroup-2-5] com.maoudia.tutorial.Application : Bulk write result with 100 modified document(s)
2023-11-10T02:02:59.290+01:00 INFO 84802 --- [ntLoopGroup-2-5] com.maoudia.tutorial.Application : Bulk write result with 100 modified document(s)
2023-11-10T02:02:59.357+01:00 INFO 84802 --- [ntLoopGroup-2-3] com.maoudia.tutorial.Application : Bulk write result with 100 modified document(s)
2023-11-10T02:02:59.438+01:00 INFO 84802 --- [ntLoopGroup-2-3] com.maoudia.tutorial.Application : Bulk write result with 100 modified document(s)
2023-11-10T02:02:59.503+01:00 INFO 84802 --- [ntLoopGroup-2-5] com.maoudia.tutorial.Application : Bulk write result with 100 modified document(s)
2023-11-10T02:02:59.578+01:00 INFO 84802 --- [ntLoopGroup-2-5] com.maoudia.tutorial.Application : Bulk write result with 100 modified document(s)
2023-11-10T02:02:59.632+01:00 INFO 84802 --- [ntLoopGroup-2-5] com.maoudia.tutorial.Application : Bulk write result with 100 modified document(s)
2023-11-10T02:02:59.727+01:00 INFO 84802 --- [ntLoopGroup-2-3] com.maoudia.tutorial.Application : Bulk write result with 100 modified document(s)
2023-11-10T02:02:59.776+01:00 INFO 84802 --- [ntLoopGroup-2-5] com.maoudia.tutorial.Application : Bulk write result with 100 modified document(s)
2023-11-10T02:02:59.776+01:00 INFO 84802 --- [ntLoopGroup-2-5] com.maoudia.tutorial.Application : ------------------< Collection Enriching Command Finished >------------------
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 7.282 s
[INFO] Finished at: 2023-11-10T02:03:03+01:00
[INFO] ------------------------------------------------------------------------
3.4. Rapport VisuelVM
VisualVM est un outil de profilage léger. On l’utilise pour avoir une vue d’ensemble sur les threads qui sont lancés par l’application.
On observe deux groupes de threads qui exécutent les opérations en parallèle, chaque groupe forme une l'event loop.
Les requêtes MongoDB sont exécutées par
nioEventLoopGroup
.Les requêtes HTTP sont exécutées par
reactor-http-nio
.
4. Tests d’intégration
On utilise JUnit 5 et le module Testcontainers MongoDB pour les tests d’intégration. Cela permet d’avoir un retour proche du comportement réel de l’application qui fait essentiellement des opérations de lecture/écriture.
Pour que ce tutoriel reste court, on va se contenter d’écrire qu’un seul test.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
@Profile("test")
@SpringBootTest
@Testcontainers (1)
class CollectionServiceTest {
@Container
public static GenericContainer<?> jsonServerContainer = new GenericContainer<>("clue/json-server:latest")
.withExposedPorts(80)
.withFileSystemBind("./data/product/db.json", "/data/db.json", BindMode.READ_ONLY)
.waitingFor(Wait.forHttp("/").forStatusCode(200).forPort(80))
.withReuse(true); (2)
@Container
private static final MongoDBContainer mongoDBContainer = new MongoDBContainer("mongo:7.0.2");
@DynamicPropertySource
private static void setProperties(DynamicPropertyRegistry registry) {
registry.add("spring.data.mongodb.uri", mongoDBContainer::getReplicaSetUrl); (3)
registry.add("app.enriching-uri", () -> "http://" + jsonServerContainer.getHost() + ":" + jsonServerContainer.getMappedPort(80) + "/products/1");
}
@Autowired
private AppProperties properties;
@Autowired
private CollectionService command;
@Autowired
private ReactiveMongoTemplate template;
@Test
void multipleBulkWriteResultsAreReturned() {
Document givenDocument1 = new Document();
givenDocument1.put("_id", "628ea3edb5110304e5e814f6");
givenDocument1.put("type", "municipality");
Document givenDocument2 = new Document();
givenDocument2.put("_id", "628ea3edb5110304e5e814f7");
givenDocument2.put("type", "street");
Document givenDocument3 = new Document();
givenDocument3.put("_id", "628ea3edb5110304e5e814f8");
givenDocument3.put("type", "housenumber");
template.insert(Arrays.asList(givenDocument1, givenDocument2, givenDocument3), properties.collectionName()).blockLast();
BulkWriteResult expectedBulkWriteResult1 = BulkWriteResult.acknowledged(WriteRequest.Type.REPLACE, 2, 2, Collections.emptyList(),
Collections.emptyList());
BulkWriteResult expectedBulkWriteResult2 = BulkWriteResult.acknowledged(WriteRequest.Type.REPLACE, 1, 1, Collections.emptyList(),
Collections.emptyList());
command.enrichAll( properties.collectionName(), properties.enrichingKey() , properties.enrichingUri())
.as(StepVerifier::create) (4)
.expectNext(expectedBulkWriteResult1)
.expectNext(expectedBulkWriteResult2)
.verifyComplete();
}
}
1 | Ajoute l’extension Junit 5 de TestContainers. |
2 | Démarre un conteneur MongoDB. |
3 | Configure l’application avec l’URI du conteneur. |
4 | Utilise StepVerifier de Reactor Test pour faire des assertions sur le flux en sortie. |
On lance les tests d’intégration :
1
mvn test -Dspring.profiles.active=test
Résultats des tests :
1
2
3
4
5
6
7
8
9
10
11
12
13
...
[INFO] Tests run: 1, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 6.098 s - in com.maoudia.tutorial.CollectionServiceTest
[INFO]
[INFO] Results:
[INFO]
[INFO] Tests run: 1, Failures: 0, Errors: 0, Skipped: 0
[INFO]
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 11.539 s
[INFO] Finished at: 2023-11-10T02:06:45+01:00
[INFO] ------------------------------------------------------------------------
5. Conclusion
Dans ce tutoriel, on a réussi à implémenter une solution complète pour enrichir et mettre à jour efficacement une collection MongoDB. De plus, on a vu comment écrire des tests d’intégration avec JUnit 5 et Testcontainers.
Le code source complet est disponible sur Github.
Dans le prochain chapitre de la série MongoDB Reactive CLI, on ajoutera de nouvelles fonctionnalités et utilisera Picocli afin de faciliter les interactions avec l’application.