Contexte métier : Traitement de fichiers volumineux, reformatage et sauvegarde Excel ou BD
Nous allons profiter de cette dernière semaine pour revoir certains concepts java si besoin et bien entendu en apprendre de nouveau.
Au programme de la semaine (je sais que vous avez votre fête d’entreprise le jeudi soir ;)) :
Découverte des bases de l’architecture Hexagonale et comparaison avec MVC
Lien vers une synthèse rapide avec un exemple simpe avec Spring Batch
Dans le monde professionnel, de nombreuses opérations ne peuvent pas être faites en temps réel : traiter des millions de lignes comptables la nuit, générer des relevés bancaires pour tous les clients le week-end, importer un catalogue produit de 2 millions d’articles, ou encore reformater des fichiers volumineux pour les envoyer à des applications COBOL mainframe.
Ces traitements partagent des caractéristiques communes :
Analogie simple : Imaginez une usine d’embouteillage. La chaîne de production lit des bouteilles vides (lecture), les remplit et les étiquette (traitement), puis les met en cartons (écriture). Si une bouteille est cassée, on la met de côté sans arrêter toute la chaîne. À la fin, on sait combien de bouteilles ont été produites et combien ont été rejetées. Spring Batch, c’est le chef d’usine de vos traitements de données.
On pourrait se demander : pourquoi ne pas écrire une simple boucle while qui lit le fichier ligne par ligne et l’écrit ? C’est possible, mais dès que les fichiers deviennent volumineux ou que le besoin évolue, on rencontre des problèmes.
while
OutOfMemoryError
Les applications COBOL, encore très présentes dans les banques, assurances et administrations (sinon vous ne seriez pas là ;), consomment des fichiers à longueur fixe : chaque champ occupe exactement N caractères, complété par des espaces si besoin. C’est la notion de “layout” COBOL.
Exemple d’un enregistrement COBOL pour un client :
DUPONT JEAN 0123456789FR75001
Spring Batch est parfaitement adapté à ce besoin : lire un fichier CSV moderne, transformer chaque champ pour respecter la longueur exacte, et écrire un fichier à longueur fixe.
Avant d’écrire la moindre ligne de code, il faut comprendre le vocabulaire de Spring Batch. Voici la hiérarchie des concepts, du plus grand au plus petit :
JobLauncher └── Job (le traitement complet) ├── Step 1 (lire et valider le fichier) ├── Step 2 (transformer et écrire) └── Step 3 (archiver et notifier) └── Chunk (groupe de N enregistrements traités ensemble) ├── ItemReader (lit 1 enregistrement) ├── ItemProcessor (transforme 1 enregistrement) └── ItemWriter (écrit N enregistrements d'un coup)
Un Job est l’unité de traitement de plus haut niveau. C’est “le travail à faire”, défini une fois, exécutable plusieurs fois. Chaque exécution d’un Job crée une JobInstance unique (identifiée par ses paramètres), et chaque tentative d’exécution d’une JobInstance est une JobExecution. Je sais que cela semble un peu long comme chemin mais cela se justifie programmatiquement parlant.
Job "ImporterClients" ├── JobInstance (params: fichier=""clients_2024-01-15.csv") │ ├── JobExecution #1 : FAILED (erreur réseau à 60%) │ └── JobExecution #2 : COMPLETED (reprise depuis 60%) └── JobInstance (params: fichier="clients_2024-01-16.csv") └── JobExecution #1 : COMPLETED
Un Step est une étape indépendante d’un Job. Un Step peut être :
Chaque Step a sa propre StepExecution qui stocke les métriques : nombre de lignes lues, traitées, écrites, ignorées ou en erreur. On a pas besoin de gérer tout ça.
C’est le cœur de Spring Batch. Le principe est simple mais puissant.
Chunk size = 3 Read → [A] Process → [A'] Read → [B] Process → [B'] Read → [C] Process → [C'] Write → [A', B', C'] ← une seule transaction pour 3 éléments car le chunk = 3 ← commit → checkpoint sauvegardé Read → [D] ...
Pourquoi chunk et pas enregistrement par enregistrement ? Ouvrir/fermer une transaction pour chaque ligne serait catastrophique pour les performances. En groupant N lignes par transaction, on divise le nombre de transactions par N. Pour 1 million de lignes avec chunk=1000, c’est 1000 transactions au lieu de 1 million ou plus !
Le JobRepository est la base de données interne de Spring Batch. Il stocke automatiquement toutes les informations sur les exécutions :
StepExecution
Par défaut en développement, Spring Batch utilise une base H2 en mémoire que nous avons l’habitude d’utiliser pour nos tests dans les applications Spring Boot. En production, on configure une base PostgreSQL, MySQL ou Oracle.
Le JobLauncher est le composant qui lance un Job avec des paramètres donnés. C’est le point d’entrée.
// lancement simple jobLauncher.run(monJob, new JobParameters()); // avec paramètres JobParameters params = new JobParametersBuilder() .addString("fichier", "clients_2024-01-15.csv") .addDate("date", new Date()) .toJobParameters(); jobLauncher.run(monJob, params); // ici on associe le Job et les paramètres
┌─────────────────────────────────────────────────────────────┐ │ Spring Batch │ │ │ │ JobLauncher ──────► Job ──────► Step 1 ──────► Step 2 │ │ │ │ │ │ │ │ │ │ [Chunk] [Tasklet] │ │ │ │ Reader │ │ │ │ Processor │ │ │ │ Writer │ │ │ │ │ │ └────────────────┴──────► JobRepository │ │ (PostgreSQL / H2) │ └─────────────────────────────────────────────────────────────┘
l’idéal, pour comprendre Spring Batch, rien de mieux que le code avec un exemple concret.
<parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>3.2.0</version> </parent> <dependencies> <!-- Spring Batch : inclut spring-batch-core et spring-batch-infrastructure --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-batch</artifactId> </dependency> <!-- Base de données pour le JobRepository --> <!-- En développement : H2 en mémoire --> <dependency> <groupId>com.h2database</groupId> <artifactId>h2</artifactId> <scope>runtime</scope> </dependency> <!-- En production : PostgreSQL --> <dependency> <groupId>org.postgresql</groupId> <artifactId>postgresql</artifactId> <scope>runtime</scope> </dependency> <!-- Pour les tests --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> <dependency> <groupId>org.springframework.batch</groupId> <artifactId>spring-batch-test</artifactId> <scope>test</scope> </dependency> </dependencies>
# ============ Base de données du JobRepository ============ # En développement : H2 en mémoire (données perdues au redémarrage) spring.datasource.url=jdbc:h2:mem:batchdb;DB_CLOSE_DELAY=-1 spring.datasource.driver-class-name=org.drivers.H2Driver spring.datasource.username=sa spring.datasource.password= # Spring Batch crée automatiquement ses tables (BATCH_JOB_INSTANCE, etc.) spring.batch.jdbc.initialize-schema=always # NE PAS lancer les Jobs automatiquement au démarrage # On veut contrôler quand les Jobs s'exécutent spring.batch.job.enabled=false # ============ Logs ============ logging.level.org.springframework.batch=INFO
ou en YAML si vous préférez :
# ============ Base de données du JobRepository ============ spring: datasource: url: jdbc:h2:mem:batchdb;DB_CLOSE_DELAY=-1 driver-class-name: org.drivers.H2Driver username: sa password: "" batch: jdbc: initialize-schema: always job: enabled: false # ============ Logs ============ logging: level: org.springframework.batch: INFO
spring.batch.job.enabled=false est crucial ! Par défaut, Spring Boot lance tous les Jobs trouvés dans le contexte au démarrage de l’application. C’est pratique pour les démos mais dangereux en production : on veut déclencher les Jobs manuellement ou via un planificateur.
spring.batch.job.enabled=false
src/main/java/com/monapp/batch/ ├── BatchApplication.java ← Point d'entrée Spring Boot comme d'habitude ├── config/ │ └── BatchConfig.java ← Configuration des Jobs et Steps ├── model/ │ └── Client.java ← Modèle de données ├── reader/ │ └── ClientCsvReader.java ← Lecteur de fichier CSV, TXT, JSON, ou autres... ├── processor/ │ └── ClientProcessor.java ← Transformation des données ├── writer/ │ └── ClientFixedWriter.java ← Écriture du fichier longueur fixe └── listener/ └── JobNotificationListener.java ← Suivi d'exécution src/main/resources/ ├── input/ │ └── clients.csv ← Fichier d'entrée csv │ └── clients.xls ← Fichier d'entrée xls │ └── clients.txt ← Fichier d'entrée txt └── output/ ← Répertoire de sortie
Un Job se configure dans une classe annotée @Configuration. On utilise les builders fournis par Spring Batch.
@Configuration
@Configuration public class BatchConfig { // Spring Batch 5 (Spring Boot 3) injecte automatiquement ces beans // On peut ultiliser cette annoation ici lorsque l'on ne fait pas de tests @Autowired private JobRepository jobRepository; @Autowired private PlatformTransactionManager transactionManager; // ---- Définition du Job ---- @Bean public Job importClientsJob(Step etapeLecture, Step etapeArchivage) { return new JobBuilder("importClientsJob", jobRepository) .start(etapeLecture) // Première étape .next(etapeArchivage) // Deuxième étape (si la première réussit) .build(); } // ---- Définition d'un Step Chunk ---- @Bean public Step etapeLecture( ItemReader<Client> reader, ItemProcessor<Client, ClientCobol> processor, ItemWriter<ClientCobol> writer) { return new StepBuilder("etapeLecture", jobRepository) .<Client, ClientCobol>chunk(100, transactionManager) // <InputType, OutputType>chunk(tailleChunk, transactionManager) .reader(reader) .processor(processor) .writer(writer) .build(); } // ---- Définition d'un Step Tasklet ---- @Bean public Step etapeArchivage() { return new StepBuilder("etapeArchivage", jobRepository) .tasklet((contribution, chunkContext) -> { // Code simple à exécuter une seule fois System.out.println("Archivage du fichier traité..."); // Logique d'archivage ici return RepeatStatus.FINISHED; // Dire à Spring Batch que c'est terminé }, transactionManager) .build(); } }
RepeatStatus.FINISHED par rapport à RepeatStatus.CONTINUABLE : Une Tasklet peut tourner en boucle ! Si vous retournez CONTINUABLE, Spring Batch rappellera votre tasklet en boucle jusqu’à ce qu’elle retourne FINISHED. Utile pour des sondages actifs.
RepeatStatus.FINISHED
RepeatStatus.CONTINUABLE
CONTINUABLE
FINISHED
@Service public class JobLanceur { @Autowired private JobLauncher jobLauncher; @Autowired private Job importClientsJob; public void lancerImport(String nomFichier) throws Exception { // Les JobParameters identifient l'exécution de façon unique // Si vous relancez avec les mêmes paramètres, Spring Batch reprend // là où il s'était arrêté (ou refuse si déjà COMPLETED) JobParameters params = new JobParametersBuilder() .addString("fichierEntree", nomFichier) .addLocalDateTime("lancementLe", LocalDateTime.now()) // pour unicité .toJobParameters(); JobExecution execution = jobLauncher.run(importClientsJob, params); System.out.println("Statut : " + execution.getStatus()); System.out.println("Début : " + execution.getStartTime()); System.out.println("Fin : " + execution.getEndTime()); } }
Les Tasklets sont parfaites pour des tâches de préparation ou de clôture :
Tasklets
// Tasklet pour vérifier qu'un fichier existe avant de commencer ce qui est normal @Bean public Step verifierFichier(@Value("${batch.input.file}") String cheminFichier) { return new StepBuilder("verifierFichier", jobRepository) .tasklet((contribution, chunkContext) -> { File fichier = new File(cheminFichier); if (!fichier.exists()) { throw new IllegalStateException( "Fichier introuvable : " + cheminFichier ); } if (fichier.length() == 0) { throw new IllegalStateException("Fichier vide !"); } System.out.println("Fichier vérifié : " + fichier.length() + " octets"); return RepeatStatus.FINISHED; }, transactionManager) .build(); } // Tasklet pour nettoyer le répertoire de sortie avant écriture @Bean public Step nettoyerSortie(@Value("${batch.output.dir}") String repertoireSortie) { return new StepBuilder("nettoyerSortie", jobRepository) .tasklet((contribution, chunkContext) -> { Path sortie = Paths.get(repertoireSortie); if (Files.exists(sortie)) { Files.walk(sortie) .sorted(Comparator.reverseOrder()) .map(Path::toFile) .forEach(File::delete); } Files.createDirectories(sortie); return RepeatStatus.FINISHED; }, transactionManager) .build(); }
Un ItemReader lit un seul enregistrement à la fois et retourne null quand il n’y a plus rien à lire. Spring Batch appelle en boucle la méthode read() jusqu’à obtenir null.
null
read()
public interface ItemReader<T> { T read() throws Exception; // Retourne null = fin des données }
Spring Batch fournit de nombreux ItemReader prêts à l’emploi. On les construit avec des builders ou des factories.
Pour information, voici un tableau complet des principales classes de Spring Batch pour lire et transformer différents types de fichiers, selon leur extension ou leur format. Ces classes héritent généralement de **ItemReader** ou **ItemStreamReader** :
C’est le reader le plus utilisé pour les fichiers texte. Il lit le fichier ligne par ligne et convertit chaque ligne en objet Java.
Le modèle de données :
// La classe qui représente une ligne du fichier CSV public class ClientCsv { private String nom; private String prenom; private String telephone; private String email; private String codePostal; private String ville; private String codePays; // Constructeur vide indispensable pour le mapping public ClientCsv() {} // getters et setters... }
Exemple de fichier CSV d’entrée (clients.csv) :
nom;prenom;telephone;email;codePostal;ville;codePays Dupont;Jean;0123456789;jean.dupont@mail.com;75001;Paris;FR Martin;Marie;0987654321;marie.martin@mail.com;69001;Lyon;FR Schmidt;Hans;+4930123456;hans.schmidt@mail.de;10115;Berlin;DE
Configuration du FlatFileItemReader :
@Bean public FlatFileItemReader<ClientCsv> clientCsvReader( @Value("${batch.input.file}") Resource fichierEntree) { return new FlatFileItemReaderBuilder<ClientCsv>() .name("clientCsvReader") // Nom unique (pour la reprise) .resource(fichierEntree) // Fichier à lire .encoding("UTF-8") // Encodage .linesToSkip(1) // Sauter la ligne d'en-tête des noms de colonnes .delimited() // Mode délimité (CSV) .delimiter(";") // Séparateur .names("nom", "prenom", "telephone", "email", "codePostal", "ville", "codePays") // Noms des colonnes .targetType(ClientCsv.class) // Classe cible .build(); }
Comment fonctionne le mapping ? FlatFileItemReader découpe chaque ligne selon le délimiteur, puis utilise un BeanWrapperFieldSetMapper qui injecte chaque valeur dans le champ Java correspondant par nom. Le nom “nom” dans .names(...) correspond au setter setNom(String) de la classe ClientCsv.
FlatFileItemReader
BeanWrapperFieldSetMapper
.names(...)
setNom(String)
ClientCsv
Attention, car parfois, le mapping automatique ne suffit pas (types complexes, champs calculés). On peut alors fournir un FieldSetMapper personnalisé.
FieldSetMapper
@Bean public FlatFileItemReader<ClientCsv> clientCsvReaderAvancé(Resource fichier) { return new FlatFileItemReaderBuilder<ClientCsv>() .name("clientCsvReaderAvancé") .resource(fichier) .linesToSkip(1) .delimited() .delimiter(";") .names("nom", "prenom", "telephone", "email", "codePostal", "ville", "codePays") .fieldSetMapper(fieldSet -> { // Mapping manuel : on a le contrôle total ClientCsv client = new ClientCsv(); client.setNom(fieldSet.readString("nom").trim()); client.setPrenom(fieldSet.readString("prenom").trim()); // Nettoyage du téléphone : garder seulement les chiffres String tel = fieldSet.readString("telephone").replaceAll("[^0-9]", ""); client.setTelephone(tel); client.setEmail(fieldSet.readString("email").toLowerCase().trim()); client.setCodePostal(fieldSet.readString("codePostal").trim()); client.setVille(fieldSet.readString("ville").trim()); client.setCodePays(fieldSet.readString("codePays").toUpperCase().trim()); return client; }) .build(); }
Les fichiers réels contiennent souvent des lignes vides, des commentaires, ou des séparateurs. Spring Batch permet de les filtrer :
@Bean public FlatFileItemReader<ClientCsv> readerAvecFiltres(Resource fichier) { FlatFileItemReader<ClientCsv> reader = new FlatFileItemReader<>(); reader.setResource(fichier); reader.setLinesToSkip(1); // en-tête // Ignorer les lignes vides et les commentaires (#) reader.setSkippedLinesCallback(line -> { // Cette méthode est appelée pour chaque ligne sautée System.out.println("Ligne ignorée : " + line); }); // Filtrer les lignes à ignorer pendant la lecture DefaultLineMapper<ClientCsv> lineMapper = new DefaultLineMapper<>(); DelimitedLineTokenizer tokenizer = new DelimitedLineTokenizer(";"); tokenizer.setNames("nom", "prenom", "telephone", "email", "codePostal", "ville", "codePays"); lineMapper.setLineTokenizer(tokenizer); lineMapper.setFieldSetMapper(new BeanWrapperFieldSetMapper<>() {{ setTargetType(ClientCsv.class); }}); reader.setLineMapper(lineMapper); // Ignorer les lignes commençant par # ou vides reader.setRecordSeparatorPolicy(new DefaultRecordSeparatorPolicy()); return reader; }
Pour lire des fichiers COBOL existants à longueur fixe (l’inverse de ce qu’on cherche à produire).
@Bean public FlatFileItemReader<ClientCobol> readerFichierFixe(Resource fichier) { return new FlatFileItemReaderBuilder<ClientCobol>() .name("readerFichierFixe") .resource(fichier) // Mode "fixed length" au lieu de "delimited" .fixedLength() .columns( new Range(1, 10), // Colonnes 1-10 : nom new Range(11, 30), // Colonnes 11-30 : prénom new Range(31, 40), // Colonnes 31-40 : téléphone new Range(41, 42), // Colonnes 41-42 : code pays new Range(43, 47) // Colonnes 43-47 : code postal ) .names("nom", "prenom", "telephone", "codePays", "codePostal") .targetType(ClientCobol.class) .build(); }
Pour lire directement depuis une base de données.
@Bean public JdbcCursorItemReader<Client> clientDbReader(DataSource dataSource) { return new JdbcCursorItemReaderBuilder<Client>() .name("clientDbReader") .dataSource(dataSource) .sql("SELECT nom, prenom, telephone, email, code_postal, ville " + "FROM clients WHERE actif = true ORDER BY id") .rowMapper((rs, rowNum) -> { Client c = new Client(); c.setNom(rs.getString("nom")); c.setPrenom(rs.getString("prenom")); c.setTelephone(rs.getString("telephone")); c.setEmail(rs.getString("email")); c.setCodePostal(rs.getString("code_postal")); c.setVille(rs.getString("ville")); return c; }) .fetchSize(1000) // Nombre de lignes récupérées par lot depuis la BDD .build(); }
fetchSize : Sans ce paramètre, le driver JDBC charge toutes les lignes d’un coup en mémoire. Avec fetchSize=1000, il charge 1000 lignes à la fois, ce qui est bien plus efficace pour de grands volumes.
fetchSize
fetchSize=1000
Quand les données sont réparties sur plusieurs fichiers (exemple: un par jour ou par région).
@Bean public MultiResourceItemReader<ClientCsv> multiReaderClients( @Value("classpath:input/clients_*.csv") Resource[] fichiers) { return new MultiResourceItemReaderBuilder<ClientCsv>() .name("multiReaderClients") .resources(fichiers) // Tous les fichiers correspondant au pattern .delegate(clientCsvReader(null)) // Le reader pour chaque fichier individuel .build(); }
L’ItemProcessor est optionnel mais central dans les batchs de transformation. Il reçoit un enregistrement à la fois, le transforme, et retourne le résultat (qui peut être d’un type différent comme un objet java par exemple).
public interface ItemProcessor<I, O> { O process(I item) throws Exception; // Si on retourne null, l'item est filtré (pas écrit) }
C’est ici que réside toute la logique métier de transformation : nettoyage, enrichissement, validation, calcul, reformatage.
Retourner null = filtrer l’enregistrement. Si votre processor retourne null pour un enregistrement, celui-ci ne sera pas transmis à l’ItemWriter. C’est le mécanisme de filtrage natif de Spring Batch.
@Component public class ClientValidationProcessor implements ItemProcessor<ClientCsv, ClientCsv> { private static final Logger log = LoggerFactory.getLogger(ClientValidationProcessor.class); @Override public ClientCsv process(ClientCsv client) throws Exception { // --- Validation --- if (client.getNom() == null || client.getNom().isBlank()) { log.warn("Client ignoré : nom vide → {}", client); return null; // Filtrer cet enregistrement } if (!client.getEmail().contains("@")) { log.warn("Client ignoré : email invalide → {}", client.getEmail()); return null; // Filtrer } // --- Nettoyage / Normalisation --- client.setNom(client.getNom().trim().toUpperCase()); client.setPrenom(capitaliser(client.getPrenom().trim())); client.setEmail(client.getEmail().toLowerCase().trim()); // Nettoyer le téléphone String tel = client.getTelephone().replaceAll("[^0-9+]", ""); client.setTelephone(tel); return client; // Retourner l'enregistrement transformé } private String capitaliser(String s) { if (s == null || s.isEmpty()) return s; return Character.toUpperCase(s.charAt(0)) + s.substring(1).toLowerCase(); } }
Dans notre cas COBOL, on va convertir d’un ClientCsv (données brutes) vers un ClientCobol (format prêt pour COBOL), même si je sais que cela ne sera sans pas le cas.
ClientCobol
// Classe de sortie : représente un enregistrement COBOL public class ClientCobol { private String nomFixe; // 20 caractères private String prenomFixe; // 20 caractères private String telephoneFixe; // 15 caractères private String emailFixe; // 50 caractères private String codePostalFixe; // 5 caractères private String villeFixe; // 30 caractères private String codePaysFixe; // 2 caractères // Total = 142 caractères par enregistrement // getters, setters, toString... } @Component public class ClientToCoblProcessor implements ItemProcessor<ClientCsv, ClientCobol> { @Override public ClientCobol process(ClientCsv client) { ClientCobol cobol = new ClientCobol(); // Chaque champ est formaté à une taille exacte cobol.setNomFixe(formaterChamp(client.getNom(), 20)); cobol.setPrenomFixe(formaterChamp(client.getPrenom(), 20)); cobol.setTelephoneFixe(formaterChamp(client.getTelephone(), 15)); cobol.setEmailFixe(formaterChamp(client.getEmail(), 50)); cobol.setCodePostalFixe(formaterChamp(client.getCodePostal(), 5)); cobol.setVilleFixe(formaterChamp(client.getVille(), 30)); cobol.setCodePaysFixe(formaterChamp(client.getCodePays(), 2)); return cobol; } /** * Formate une chaîne à une longueur exacte : * - Tronque si trop long * - Complète avec des espaces à droite si trop court * - Retourne des espaces si null */ private String formaterChamp(String valeur, int longueur) { if (valeur == null) valeur = ""; // Tronquer si trop long if (valeur.length() > longueur) { return valeur.substring(0, longueur); } // Compléter avec des espaces à droite si trop court return String.format("%-" + longueur + "s", valeur); } }
String.format("%-Ns", valeur) : Le % indique un formatage, - signifie aligné à gauche, N est la longueur, s est pour string. Résultat : la chaîne est complétée par des espaces à droite jusqu’à N caractères.
String.format("%-Ns", valeur)
%
-
N
s
On peut chaîner plusieurs processors avec CompositeItemProcessor. Chaque processor reçoit le résultat du précédent. C’est le pattern Chaîne de responsabilité appliqué aux batchs comme le ChainFilter pour les servlet ou contrôleurs.
CompositeItemProcessor
@Bean public CompositeItemProcessor<ClientCsv, ClientCobol> processorChaine( ClientValidationProcessor validateur, ClientNettoyageProcessor nettoyeur, ClientToCoblProcessor convertisseur) { return new CompositeItemProcessorBuilder<ClientCsv, ClientCobol>() .delegates( validateur, // 1. Valider et filtrer les mauvais enregistrements nettoyeur, // 2. Nettoyer et normaliser les données convertisseur // 3. Convertir vers le format COBOL ) .build(); }
Le flux est :
ClientCsv → [validateur] → ClientCsv (ou null) → [nettoyeur] → ClientCsv → [convertisseur] → ClientCobol
Si un processor retourne null, les suivants ne sont pas appelés (l’enregistrement est filtré).
Les processors peuvent interroger une base de données ou un cache pour enrichir les données :
@Component public class ClientEnrichissementProcessor implements ItemProcessor<ClientCsv, ClientCsv> { @Autowired private RegionRepository regionRepository; // Repository Spring Data // Cache simple pour éviter de requêter la BDD pour chaque enregistrement private final Map<String, String> cacheRegions = new HashMap<>(); @Override public ClientCsv process(ClientCsv client) { // Trouver la région à partir du code postal String codePostal = client.getCodePostal(); String region = cacheRegions.computeIfAbsent( codePostal, cp -> regionRepository.findRegionByCodePostal(cp) .orElse("INCONNU") ); client.setRegion(region); return client; } }
Contrairement au Reader et au Processor qui travaillent un enregistrement à la fois, l’ItemWriter reçoit une liste (le chunk complet) et écrit tout d’un coup. C’est ce qui rend l’écriture efficace.
public interface ItemWriter<T> { void write(Chunk<? extends T> items) throws Exception; // Reçoit tous les éléments du chunk }
@Bean public FlatFileItemWriter<ClientCobol> clientCobolWriter( @Value("${batch.output.file}") Resource fichierSortie) { return new FlatFileItemWriterBuilder<ClientCobol>() .name("clientCobolWriter") .resource(fichierSortie) .encoding("ISO-8859-1") // Encodage souvent utilisé par les mainframes .lineAggregator(item -> { // Assembler une ligne à partir de l'objet ClientCobol // Chaque champ a déjà la bonne longueur grâce au processor return item.getNomFixe() + item.getPrenomFixe() + item.getTelephoneFixe() + item.getEmailFixe() + item.getCodePostalFixe() + item.getVilleFixe() + item.getCodePaysFixe(); }) .build(); }
Pour des formats plus complexes, on peut utiliser FormatterLineAggregator qui utilise String.format.
FormatterLineAggregator
String.format
@Bean public FlatFileItemWriter<ClientCobol> clientCobolWriterFormatte(Resource fichierSortie) { // Définit l'ordre des champs à extraire de l'objet BeanWrapperFieldExtractor<ClientCobol> extractor = new BeanWrapperFieldExtractor<>(); extractor.setNames(new String[]{ "nomFixe", "prenomFixe", "telephoneFixe", "emailFixe", "codePostalFixe", "villeFixe", "codePaysFixe" }); // Définit le format de chaque champ (comme String.format) FormatterLineAggregator<ClientCobol> aggregator = new FormatterLineAggregator<>(); aggregator.setFieldExtractor(extractor); // %s pour chaque champ : ils sont déjà à la bonne longueur aggregator.setFormat("%s%s%s%s%s%s%s"); return new FlatFileItemWriterBuilder<ClientCobol>() .name("clientCobolWriterFormatte") .resource(fichierSortie) .lineAggregator(aggregator) .build(); }
Les fichiers destinés aux mainframes ont souvent un enregistrement d’en-tête (header) et un enregistrement de contrôle (trailer).
@Bean public FlatFileItemWriter<ClientCobol> writerAvecHeaderTrailer(Resource fichierSortie) { return new FlatFileItemWriterBuilder<ClientCobol>() .name("writerAvecHeaderTrailer") .resource(fichierSortie) .encoding("ISO-8859-1") // En-tête : une ligne fixe au début du fichier .headerCallback(writer -> { writer.write("DEBUT-FICHIER-CLIENTS " + LocalDate.now().format( DateTimeFormatter.ofPattern("yyyyMMdd") ) + " "); // Header formaté à 40 caractères }) // Pied de page : ligne de contrôle à la fin .footerCallback(writer -> { writer.write("FIN-FICHIER-CLIENTS " + LocalDate.now().format( DateTimeFormatter.ofPattern("yyyyMMdd") ) + " "); }) .lineAggregator(item -> item.getNomFixe() + item.getPrenomFixe() + item.getTelephoneFixe() + item.getEmailFixe() + item.getCodePostalFixe() + item.getVilleFixe() + item.getCodePaysFixe() ) .build(); }
@Bean public JdbcBatchItemWriter<ClientCobol> clientDbWriter(DataSource dataSource) { return new JdbcBatchItemWriterBuilder<ClientCobol>() .dataSource(dataSource) .sql("INSERT INTO clients_cobol " + "(nom, prenom, telephone, email, code_postal, ville, code_pays) " + "VALUES (:nomFixe, :prenomFixe, :telephoneFixe, " + ":emailFixe, :codePostalFixe, :villeFixe, :codePaysFixe)") // Utilise les noms de propriétés de l'objet comme paramètres nommés .beanMapped() .build(); }
Parfois, on veut écrire simultanément dans un fichier ET en base de données…
@Bean public CompositeItemWriter<ClientCobol> writerComposite( FlatFileItemWriter<ClientCobol> fichierWriter, JdbcBatchItemWriter<ClientCobol> dbWriter) { return new CompositeItemWriterBuilder<ClientCobol>() .delegates(fichierWriter, dbWriter) // Écrit dans les deux .build(); }
Les applications COBOL définissent leurs structures de données via des copybooks (fichiers .cpy). Chaque champ a une longueur fixe, un type (alphanumérique PIC X, numérique PIC 9, etc.) et une position précise. Idem, pour information, vous ne le ferez sans doute pas.
.cpy
PIC X
PIC 9
Exemple de copybook COBOL :
01 ENREG-CLIENT. 05 NOM-CLIENT PIC X(20). <- 20 caractères alpha 05 PRENOM-CLIENT PIC X(20). <- 20 caractères alpha 05 TELEPHONE PIC X(15). <- 15 caractères 05 EMAIL PIC X(50). <- 50 caractères 05 CODE-POSTAL PIC X(5). <- 5 caractères 05 VILLE PIC X(30). <- 30 caractères 05 CODE-PAYS PIC X(2). <- 2 caractères TOTAL = 142 caractères / enregistrement
Règles fondamentales :
Créons une classe utilitaire réutilisable pour tout le formatage :
public final class CobolFormatter { private CobolFormatter() {} // Pas d'instanciation /** * Formate une chaîne alphanumérique (PIC X(n)) : * - Null ou vide → n espaces * - Trop court → complété d'espaces à droite * - Trop long → tronqué à droite */ public static String picX(String valeur, int longueur) { if (valeur == null) valeur = ""; valeur = valeur.trim(); // Supprimer les espaces en début/fin if (valeur.length() > longueur) { return valeur.substring(0, longueur); } // %-Ns : aligné gauche, complété d'espaces return String.format("%-" + longueur + "s", valeur); } /** * Formate un entier numérique (PIC 9(n)) : * - Null ou négatif → n zéros * - Complété de zéros à gauche * - Trop grand → tronqué à gauche (les chiffres les moins significatifs) */ public static String pic9(Long valeur, int longueur) { if (valeur == null || valeur < 0) { return "0".repeat(longueur); } String s = String.valueOf(valeur); if (s.length() > longueur) { // Tronquer à gauche : garder les N derniers chiffres return s.substring(s.length() - longueur); } // %0Nd : complété de zéros à gauche return String.format("%0" + longueur + "d", valeur); } /** * Formate un décimal (PIC 9(n)V9(d)) : * - Partie entière sur nEntier chiffres, partie décimale sur nDec chiffres * - Pas de point décimal dans le fichier (convention COBOL) * Exemple : 1234.56 avec n=8, d=2 → "000123456" */ public static String pic9V(BigDecimal valeur, int nEntier, int nDecimales) { if (valeur == null) valeur = BigDecimal.ZERO; // Convertir en entier en décalant la virgule BigDecimal facteur = BigDecimal.TEN.pow(nDecimales); long entier = valeur.multiply(facteur).longValue(); return pic9(entier, nEntier + nDecimales); } /** * Formate une date (PIC X(8) au format AAAAMMJJ) */ public static String dateAaaamjj(LocalDate date) { if (date == null) return " "; // 8 espaces return date.format(DateTimeFormatter.ofPattern("yyyyMMdd")); } /** * Formate une date (PIC X(10) au format AAAA-MM-JJ) */ public static String dateIso(LocalDate date) { if (date == null) return " "; // 10 espaces return date.format(DateTimeFormatter.ISO_LOCAL_DATE); } /** * Valide qu'une chaîne a exactement la longueur attendue. * Utile pour les tests unitaires. */ public static void verifierLongueur(String champ, String nomChamp, int longueurAttendue) { if (champ.length() != longueurAttendue) { throw new IllegalStateException( "Champ '" + nomChamp + "' : longueur " + champ.length() + " au lieu de " + longueurAttendue + " → [" + champ + "]" ); } } }
public class EnregistrementCobol { // Schéma : total 142 caractères private String nom; // PIC X(20) - colonnes 1-20 private String prenom; // PIC X(20) - colonnes 21-40 private String telephone; // PIC X(15) - colonnes 41-55 private String email; // PIC X(50) - colonnes 56-105 private String codePostal; // PIC X(5) - colonnes 106-110 private String ville; // PIC X(30) - colonnes 111-140 private String codePays; // PIC X(2) - colonnes 141-142 /** * Construit la ligne COBOL complète (142 caractères exacts). * Vérifie la longueur totale pour garantir l'intégrité. */ public String toLigneCobol() { String ligne = nom + prenom + telephone + email + codePostal + ville + codePays; // Vérification de sécurité assert ligne.length() == 142 : "Enregistrement COBOL invalide : " + ligne.length() + " chars (attendu 142)"; return ligne; } // getters et setters... }
@Component public class CsvToCobolProcessor implements ItemProcessor<ClientCsv, EnregistrementCobol> { private static final Logger log = LoggerFactory.getLogger(CsvToCobolProcessor.class); // Compteur pour les enregistrements filtrés (pour le rapport final) private final AtomicInteger nbFiltres = new AtomicInteger(0); @Override public EnregistrementCobol process(ClientCsv client) { // Validation préalable : si les données sont vraiment inexploitables, on filtre if (client.getNom() == null || client.getNom().isBlank()) { log.warn("Enregistrement filtré (nom vide) : {}", client); nbFiltres.incrementAndGet(); return null; // Filtré → pas écrit } EnregistrementCobol enreg = new EnregistrementCobol(); // Formatage de chaque champ selon le schéma COBOL // CobolFormatter garantit la longueur exacte enreg.setNom(CobolFormatter.picX(client.getNom(), 20)); enreg.setPrenom(CobolFormatter.picX(client.getPrenom(), 20)); enreg.setTelephone(CobolFormatter.picX(nettoyerTelephone(client.getTelephone()), 15)); enreg.setEmail(CobolFormatter.picX(client.getEmail(), 50)); enreg.setCodePostal(CobolFormatter.picX(client.getCodePostal(), 5)); enreg.setVille(CobolFormatter.picX(client.getVille(), 30)); enreg.setCodePays(CobolFormatter.picX(client.getCodePays(), 2)); // Vérification que la ligne fait bien 142 caractères String ligne = enreg.toLigneCobol(); if (ligne.length() != 142) { log.error("ERREUR CRITIQUE : enregistrement de {} chars ! Client : {}", ligne.length(), client.getNom()); throw new IllegalStateException("Longueur d'enregistrement incorrecte : " + ligne.length()); } return enreg; } private String nettoyerTelephone(String tel) { if (tel == null) return ""; // Garder seulement les chiffres et le + return tel.replaceAll("[^0-9+]", ""); } public int getNbFiltres() { return nbFiltres.get(); } }
@Bean public FlatFileItemWriter<EnregistrementCobol> cobolFileWriter( @Value("${batch.output.file}") Resource fichierSortie) { return new FlatFileItemWriterBuilder<EnregistrementCobol>() .name("cobolFileWriter") .resource(fichierSortie) .encoding("ISO-8859-1") // Latin-1 : compatible avec les mainframes européens .shouldDeleteIfExists(true) // Effacer le fichier précédent si existant .headerCallback(writer -> { // Header COBOL : 142 espaces (enregistrement vide de contrôle) // Ou une ligne d'identification standard writer.write(CobolFormatter.picX("DEBUT", 142)); }) .footerCallback(writer -> { writer.write(CobolFormatter.picX("FIN", 142)); }) .lineAggregator(enreg -> { String ligne = enreg.toLigneCobol(); // Double vérification au moment de l'écriture if (ligne.length() != 142) { throw new IllegalStateException( "Ligne COBOL invalide : " + ligne.length() + " caractères" ); } return ligne; }) .build(); }
Dans le monde réel, les fichiers d’entrée sont imparfaits :
Si on laisse Spring Batch planter à la première erreur, le batch n’ira jamais au bout.
Spring Batch propose trois stratégies pour gérer les erreurs :
@Bean public Step etapeAvecSkip(ItemReader<ClientCsv> reader, ItemProcessor<ClientCsv, EnregistrementCobol> processor, ItemWriter<EnregistrementCobol> writer) { return new StepBuilder("etapeAvecSkip", jobRepository) .<ClientCsv, EnregistrementCobol>chunk(100, transactionManager) .reader(reader) .processor(processor) .writer(writer) // Configuration du Skip .faultTolerant() // Active la tolérance aux pannes .skip(FlatFileParseException.class) // Ignorer les lignes non parsables .skip(ValidationException.class) // Ignorer les erreurs de validation .skipLimit(50) // Maximum 50 erreurs tolérées, au-delà le Job échoue .build(); }
skipLimit est crucial : sans limite, un fichier entièrement corrompu serait “traité” avec 0 enregistrement écrits mais aucune erreur signalée. La limite force le batch à échouer si trop d’erreurs surviennent, ce qui alerte l’équipe.
skipLimit
@Component public class ClientSkipListener implements SkipListener<ClientCsv, EnregistrementCobol> { private static final Logger log = LoggerFactory.getLogger(ClientSkipListener.class); // Appelé quand un enregistrement est ignoré pendant la LECTURE @Override public void onSkipInRead(Throwable t) { log.error("Ligne ignorée en lecture : {}", t.getMessage()); } // Appelé quand un enregistrement est ignoré pendant le TRAITEMENT @Override public void onSkipInProcess(ClientCsv item, Throwable t) { log.error("Enregistrement ignoré en traitement : {} → {}", item.getNom() + " " + item.getPrenom(), t.getMessage()); } // Appelé quand un enregistrement est ignoré pendant l'ÉCRITURE @Override public void onSkipInWrite(EnregistrementCobol item, Throwable t) { log.error("Enregistrement ignoré en écriture : {} → {}", item.getNom(), t.getMessage()); } }
Enregistrement du listener dans le Step :
.faultTolerant() .skip(FlatFileParseException.class) .skipLimit(50) .listener(clientSkipListener) // Enregistrer le listener
Le retry est utile pour les erreurs transitoires (timeout réseau, BDD temporairement surchargée) :
@Bean public Step etapeAvecRetry(ItemReader<ClientCsv> reader, ItemProcessor<ClientCsv, EnregistrementCobol> processor, ItemWriter<EnregistrementCobol> writer) { return new StepBuilder("etapeAvecRetry", jobRepository) .<ClientCsv, EnregistrementCobol>chunk(100, transactionManager) .reader(reader) .processor(processor) .writer(writer) .faultTolerant() .retry(TransientDataAccessException.class) // Réessayer pour les erreurs BDD transitoires .retry(ResourceAccessException.class) // Réessayer pour les erreurs réseau .retryLimit(3) // Maximum 3 tentatives par enregistrement .build(); }
.faultTolerant() // Retry pour les erreurs transitoires (réseau, BDD temporaire) .retry(TransientDataAccessException.class) .retryLimit(3) // Skip pour les erreurs définitives (données invalides) .skip(ValidationException.class) .skip(FlatFileParseException.class) .skipLimit(100) .listener(skipListener)
Les listeners permettent d’intercepter les événements du cycle de vie d’un Job ou d’un Step pour ajouter de la logique transversale : logging, notifications, métriques, envoi d’email à la fin du traitement.
@Component public class JobRapportListener implements JobExecutionListener { private static final Logger log = LoggerFactory.getLogger(JobRapportListener.class); // Appelé AVANT le démarrage du Job @Override public void beforeJob(JobExecution jobExecution) { log.info("=== DÉBUT DU JOB : {} ===", jobExecution.getJobInstance().getJobName()); log.info("Paramètres : {}", jobExecution.getJobParameters()); log.info("Démarré à : {}", LocalDateTime.now()); } // Appelé APRÈS la fin du Job (qu'il ait réussi ou échoué) @Override public void afterJob(JobExecution jobExecution) { log.info("=== FIN DU JOB : {} ===", jobExecution.getJobInstance().getJobName()); log.info("Statut final : {}", jobExecution.getStatus()); log.info("Terminé à : {}", LocalDateTime.now()); // Afficher les statistiques de chaque Step for (StepExecution step : jobExecution.getStepExecutions()) { log.info("Step '{}' : lus={}, traités={}, écrits={}, ignorés={}, erreurs={}", step.getStepName(), step.getReadCount(), step.getProcessSkipCount() == 0 ? step.getWriteCount() : "N/A", step.getWriteCount(), step.getSkipCount(), step.getReadSkipCount() + step.getWriteSkipCount() ); } // Envoyer une notification selon le statut if (jobExecution.getStatus() == BatchStatus.COMPLETED) { log.info("Job terminé avec succès !"); // envoyerEmailSucces(jobExecution); } else if (jobExecution.getStatus() == BatchStatus.FAILED) { log.error("Job échoué ! Vérifier les logs pour les détails."); // envoyerEmailEchec(jobExecution); } } }
Enregistrement dans le Job :
@Bean public Job importJob(Step etapePrincipale, JobRapportListener listener) { return new JobBuilder("importJob", jobRepository) .listener(listener) // Enregistrer le listener .start(etapePrincipale) .build(); }
@Component public class StepProgressListener implements StepExecutionListener { private static final Logger log = LoggerFactory.getLogger(StepProgressListener.class); @Override public void beforeStep(StepExecution stepExecution) { log.info("--- Début step : {} ---", stepExecution.getStepName()); } @Override public ExitStatus afterStep(StepExecution stepExecution) { log.info("--- Fin step : {} | Lus: {} | Écrits: {} | Ignorés: {} ---", stepExecution.getStepName(), stepExecution.getReadCount(), stepExecution.getWriteCount(), stepExecution.getSkipCount() ); // On peut modifier l'ExitStatus pour influencer le flux du Job // Par exemple : si trop de données ignorées, marquer comme WARNING if (stepExecution.getSkipCount() > stepExecution.getWriteCount() * 0.1) { log.warn("Plus de 10% d'enregistrements ignorés !"); return new ExitStatus("COMPLETED_WITH_WARNINGS"); } return stepExecution.getExitStatus(); // Retourner le statut normal } }
@Component public class ProgressChunkListener implements ChunkListener { private static final Logger log = LoggerFactory.getLogger(ProgressChunkListener.class); private int chunkCount = 0; @Override public void beforeChunk(ChunkContext context) { // Appelé avant chaque chunk } @Override public void afterChunk(ChunkContext context) { chunkCount++; StepExecution step = context.getStepContext().getStepExecution(); int total = step.getWriteCount(); // Afficher la progression tous les 10 chunks if (chunkCount % 10 == 0) { log.info("Progression : {} enregistrements écrits...", total); } } @Override public void afterChunkError(ChunkContext context) { log.warn("Erreur dans le chunk #{}", chunkCount); } }
Les JobParameters jouent deux rôles essentiels :
// Créer des paramètres JobParameters params = new JobParametersBuilder() .addString("fichierEntree", "/data/clients_2024-01-15.csv") .addString("fichierSortie", "/data/output/clients_cobol_2024-01-15.dat") .addLocalDate("dateTraitement", LocalDate.now()) .addLong("timestamp", System.currentTimeMillis()) // Pour l'unicité .toJobParameters();
@Component @StepScope // IMPORTANT : scope Step, recréé pour chaque exécution public class FichierEntreeReader { @Value("#{jobParameters['fichierEntree']}") private String cheminFichier; // Le reader est créé avec le bon chemin au moment de l'exécution du Step @Bean @StepScope public FlatFileItemReader<ClientCsv> readerDynamique( @Value("#{jobParameters['fichierEntree']}") String chemin) { return new FlatFileItemReaderBuilder<ClientCsv>() .name("readerDynamique") .resource(new FileSystemResource(chemin)) .linesToSkip(1) .delimited().delimiter(";") .names("nom", "prenom", "telephone", "email", "codePostal", "ville", "codePays") .targetType(ClientCsv.class) .build(); } }
@StepScope est crucial quand on utilise #{jobParameters[...]} ! Sans @StepScope, le bean est créé au démarrage de l’application, avant que les paramètres soient connus. Avec @StepScope, le bean est créé au moment de l’exécution du Step, quand les paramètres sont disponibles.
@StepScope
#{jobParameters[...]}
C’est l’une des fonctionnalités les plus précieuses de Spring Batch. Si un Job échoue à mi-chemin, il peut reprendre exactement là où il s’était arrêté.
// Lancer un job JobExecution execution1 = jobLauncher.run(monJob, params); // → Suppose qu'il échoue après 50 000 enregistrements // Relancer avec les MÊMES paramètres = reprise depuis le dernier checkpoint JobExecution execution2 = jobLauncher.run(monJob, params); // → Reprend depuis le chunk 501 (si chunkSize=100 et 50 000 lus)
Comment ça marche ? Le JobRepository stocke le nombre d’enregistrements lus (readCount) pour chaque StepExecution. Au redémarrage, le FlatFileItemReader sait qu’il doit sauter les N premiers enregistrements déjà traités.
readCount
Par défaut, Spring Batch refuse de relancer un Job avec les mêmes paramètres si son statut est COMPLETED (pour éviter le double traitement). Pour forcer le relancement :
COMPLETED
@Bean public Job monJob() { return new JobBuilder("monJob", jobRepository) .incrementer(new RunIdIncrementer()) // Ajoute un ID unique à chaque lancement .start(etapePrincipale()) .build(); } // Ou en passant un timestamp comme paramètre JobParameters params = new JobParametersBuilder() .addString("fichier", "clients.csv") .addLong("run.id", System.currentTimeMillis()) // Toujours unique .toJobParameters();
On peut définir des flux conditionnels : si le Step 1 réussit → aller au Step 2, sinon → aller au Step de gestion d’erreur.
@Bean public Job jobConditiohennel(Step etapeValidation, Step etapeTraitement, Step etapeGestionErreur, Step etapeNotification) { return new JobBuilder("jobConditionnel", jobRepository) .start(etapeValidation) .on("COMPLETED").to(etapeTraitement) // Si validation OK → traitement .on("FAILED").to(etapeGestionErreur) // Si validation KO → gestion erreur .from(etapeTraitement) .on("*").to(etapeNotification) // Après traitement (quoi qu'il arrive) → notification .from(etapeGestionErreur) .on("*").end(BatchStatus.FAILED) // Après gestion erreur → terminer en FAILED .end() .build(); }
Les codes de sortie (ExitStatus) sont des chaînes de caractères. Les standards sont COMPLETED, FAILED, STOPPED, mais on peut définir les siens :
ExitStatus
FAILED
STOPPED
// Dans un StepExecutionListener.afterStep() if (step.getSkipCount() > 0) { return new ExitStatus("COMPLETED_WITH_WARNINGS"); } return ExitStatus.COMPLETED; // Dans le Job, on peut tester cette valeur personnalisée .on("COMPLETED_WITH_WARNINGS").to(etapeAlerte)
Pour des décisions plus complexes, on utilise un JobExecutionDecider :
JobExecutionDecider
@Component public class FichierVideDecider implements JobExecutionDecider { @Value("${batch.input.file}") private String cheminFichier; @Override public FlowExecutionStatus decide(JobExecution jobExecution, StepExecution stepExecution) { File fichier = new File(cheminFichier); if (!fichier.exists() || fichier.length() == 0) { return new FlowExecutionStatus("FICHIER_VIDE"); } return FlowExecutionStatus.COMPLETED; } } // Utilisation dans le Job @Bean public Job jobAvecDecision(FichierVideDecider decider, Step etapeTraitement, Step etapeFichierVide) { return new JobBuilder("jobAvecDecision", jobRepository) .start(decider) .on("FICHIER_VIDE").to(etapeFichierVide).end() .on("COMPLETED").to(etapeTraitement) .end() .build(); }
Pour traiter des fichiers très volumineux encore plus vite, Spring Batch permet de diviser le travail en partitions traitées en parallèle par plusieurs threads.
Analogie : Au lieu d’un seul convoyeur qui traite 1 million de lignes, on a 8 convoyeurs qui en traitent 125 000 chacun en même temps. Le temps total est divisé par ~8.
@Configuration public class PartitionConfig { @Bean public Step stepMaitre(Partitioner partitioner, Step stepEsclave, TaskExecutor taskExecutor) { return new StepBuilder("stepMaitre", jobRepository) .partitioner("stepEsclave", partitioner) .step(stepEsclave) // Step à exécuter en parallèle .gridSize(8) // Nombre de partitions (= nb de threads) .taskExecutor(taskExecutor) // Exécuteur de threads .build(); } // Divise le fichier en N plages de lignes @Bean public Partitioner fichierPartitioner( @Value("${batch.input.file}") Resource fichier, @Value("${batch.partition.size:8}") int nbPartitions) { MultiResourcePartitioner partitioner = new MultiResourcePartitioner(); // Pour plusieurs fichiers : un fichier par partition // Pour un seul gros fichier : diviser par plages de lignes return gridSize -> { Map<String, ExecutionContext> partitions = new HashMap<>(); // Calculer les plages // ... (logique de partitionnement) return partitions; }; } @Bean public TaskExecutor taskExecutor() { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); executor.setCorePoolSize(8); executor.setMaxPoolSize(8); executor.setQueueCapacity(0); executor.setThreadNamePrefix("batch-partition-"); executor.initialize(); return executor; } }
Une approche plus simple pour paralléliser est le traitement multi-threadé d’un seul Step :
@Bean public Step stepMultiThreade(ItemReader<ClientCsv> reader, ItemProcessor<ClientCsv, EnregistrementCobol> processor, ItemWriter<EnregistrementCobol> writer) { return new StepBuilder("stepMultiThreade", jobRepository) .<ClientCsv, EnregistrementCobol>chunk(100, transactionManager) .reader(reader) .processor(processor) .writer(writer) .taskExecutor(new SimpleAsyncTaskExecutor("batch-thread-")) // Multi-thread ! .throttleLimit(4) // Maximum 4 threads simultanés .build(); }
Attention avec le multi-threading ! Le FlatFileItemReader n’est pas thread-safe par défaut. Pour un Step multi-threadé, utilisez un reader SynchronizedItemStreamReader ou lisez depuis une base de données avec un JdbcPagingItemReader qui est thread-safe.
SynchronizedItemStreamReader
JdbcPagingItemReader
@Bean @StepScope public SynchronizedItemStreamReader<ClientCsv> synchronizedReader( @Value("#{jobParameters['fichierEntree']}") String chemin) { FlatFileItemReader<ClientCsv> innerReader = new FlatFileItemReaderBuilder<ClientCsv>() .name("innerReader") .resource(new FileSystemResource(chemin)) .linesToSkip(1) .delimited().delimiter(";") .names("nom", "prenom", "telephone", "email", "codePostal", "ville", "codePays") .targetType(ClientCsv.class) .build(); // Wrapping du reader dans un synchronized wrapper return new SynchronizedItemStreamReader<>(innerReader); }
La taille de chunk est un paramètre critique pour les performances. Voici les recommandations :
Chunk trop petit (ex: 1) → Trop de transactions → Lent Chunk optimal (ex: 100-1000) → Bon compromis Chunk trop grand (ex: 100000) → Trop de mémoire → OutOfMemoryError → En cas d'erreur, on recommence un grand chunk
En pratique :
@Value("${batch.chunk.size:1000}") private int chunkSize; // ... dans la config du Step : .<ClientCsv, EnregistrementCobol>chunk(chunkSize, transactionManager)
# PostgreSQL pour le JobRepository en production spring.datasource.url=jdbc:postgresql://localhost:5432/batchdb spring.datasource.username=batch_user spring.datasource.password=secret spring.jpa.database-platform=org.hibernate.dialect.PostgreSQLDialect # Spring Batch crée les tables si elles n'existent pas spring.batch.jdbc.initialize-schema=embedded # "embedded" = seulement pour H2/HSQL, "always" = toujours, "never" = jamais
En production, les tables du JobRepository (BATCH_JOB_INSTANCE, BATCH_JOB_EXECUTION, BATCH_STEP_EXECUTION, etc.) doivent être créées manuellement avec les scripts SQL fournis par Spring Batch dans le JAR.
BATCH_JOB_INSTANCE
BATCH_JOB_EXECUTION
BATCH_STEP_EXECUTION
@Component public class BatchScheduler { @Autowired private JobLauncher jobLauncher; @Autowired private Job importClientsJob; // Lancer le job tous les jours à 2h du matin @Scheduled(cron = "0 0 2 * * ?") public void lancerImportNocturne() throws Exception { String dateDuJour = LocalDate.now().format(DateTimeFormatter.ISO_LOCAL_DATE); String nomFichier = "/data/input/clients_" + dateDuJour + ".csv"; JobParameters params = new JobParametersBuilder() .addString("fichierEntree", nomFichier) .addLong("timestamp", System.currentTimeMillis()) .toJobParameters(); log.info("Lancement batch nocturne pour le fichier : {}", nomFichier); jobLauncher.run(importClientsJob, params); } }
Ne pas oublier d’activer le scheduling dans la classe de lancement de Spring Boot :
@SpringBootApplication @EnableScheduling // Active @Scheduled public class BatchApplication { public static void main(String[] args) { SpringApplication.run(BatchApplication.class, args); } }
@SpringBatchTest @SpringBootTest class ImportStepTest { @Autowired private JobLauncherTestUtils jobLauncherTestUtils; @Autowired private JobRepositoryTestUtils jobRepositoryTestUtils; @AfterEach void tearDown() { jobRepositoryTestUtils.removeJobExecutions(); } @Test void testEtapeLecture() throws Exception { // Lancer seulement le Step à tester, pas tout le Job JobExecution execution = jobLauncherTestUtils.launchStep( "etapeLecture", new JobParametersBuilder() .addString("fichierEntree", "classpath:test/clients_test.csv") .toJobParameters() ); assertEquals(BatchStatus.COMPLETED, execution.getStatus()); StepExecution step = execution.getStepExecutions().iterator().next(); assertEquals(5, step.getReadCount()); // 5 lignes lues assertEquals(5, step.getWriteCount()); // 5 lignes écrites assertEquals(0, step.getSkipCount()); // Aucune ignorée } }
Objectif de ce TP, lire des données depuis un fichier plat et Excel et les sauvegarder en base de données H2.
Lien vers le TP complet
Objectif de ce TP, lire des données depuis un fichier CSV et créer un fichier adapté pour Cobol.
Lien vers l’énoncé complet du TP
Vous allez construire CobolPipeline, un batch Spring Boot qui :
;
Fichier d’entrée (extrait) :
matricule;nom;prenom;service;salaire;dateEmbauche;codeAgence EMP001;Dupont;Jean;INFORMATIQUE;45000.00;2018-03-15;AG001 EMP002;Martin;Marie;COMPTABILITE;38500.50;2020-07-01;AG002 ...
Fichier de sortie COBOL (schéma) :
Champ Longueur Type Position matricule 8 PIC X(8) 1-8 nom 25 PIC X(25) 9-33 prenom 25 PIC X(25) 34-58 service 20 PIC X(20) 59-78 salaire 10 PIC 9(8)V9(2) 79-88 (sans virgule) dateEmbauche 8 PIC X(8) 89-96 (AAAAMMJJ) codeAgence 5 PIC X(5) 97-101 TOTAL 101 caractères/enregistrement
Fin du cours – Bon batch