Aller au contenu

Formation Spring Batch — Traitement par lots en Java

Contexte métier : Traitement de fichiers volumineux, reformatage et sauvegarde Excel ou BD

Nous allons profiter de cette dernière semaine pour revoir certains concepts java si besoin et bien entendu en apprendre de nouveau.

Au programme de la semaine (je sais que vous avez votre fête d’entreprise le jeudi soir ;)) :


Table des matières

  1. Pourquoi Spring Batch ?
  2. Architecture et concepts fondamentaux
  3. Mise en place du projet
  4. Le Job et les Steps
  5. Lire des données : les ItemReader
  6. Transformer des données : les ItemProcessor
  7. Écrire des données : les ItemWriter
  8. Reformatage à taille fixe pour COBOL
  9. Gestion des erreurs, Skip et Retry
  10. Listeners et suivi d’exécution
  11. JobParameters et relance de Jobs
  12. Steps conditionnelles et flux avancés
  13. Partitioning et performance
  14. Bonnes pratiques et surveillance
  15. TP Lecture fichier plat et Excel
  16. TP Pipeline COBOL-READY

Lien vers une synthèse rapide avec un exemple simpe avec Spring Batch

1. Pourquoi Spring Batch ?

1.1 Le contexte du traitement par lots

Dans le monde professionnel, de nombreuses opérations ne peuvent pas être faites en temps réel : traiter des millions de lignes comptables la nuit, générer des relevés bancaires pour tous les clients le week-end, importer un catalogue produit de 2 millions d’articles, ou encore reformater des fichiers volumineux pour les envoyer à des applications COBOL mainframe.

Ces traitements partagent des caractéristiques communes :

Analogie simple : Imaginez une usine d’embouteillage. La chaîne de production lit des bouteilles vides (lecture), les remplit et les étiquette (traitement), puis les met en cartons (écriture). Si une bouteille est cassée, on la met de côté sans arrêter toute la chaîne. À la fin, on sait combien de bouteilles ont été produites et combien ont été rejetées. Spring Batch, c’est le chef d’usine de vos traitements de données.

1.2 Pourquoi pas simplement une boucle Java ?

On pourrait se demander : pourquoi ne pas écrire une simple boucle while qui lit le fichier ligne par ligne et l’écrit ? C’est possible, mais dès que les fichiers deviennent volumineux ou que le besoin évolue, on rencontre des problèmes.

Problème Solution artisanale Spring Batch
Fichier de 10 millions de lignes Tout en mémoire → OutOfMemoryError Chunk-oriented : on traite N lignes à la fois
Erreur à la ligne 500 000 Tout recommencer Reprise depuis le dernier checkpoint
Plusieurs fichiers en parallèle Code complexe de threads Partitioning natif
Savoir combien ont réussi/échoué À coder soi-même Métadonnées automatiques en base
Planifier l’exécution Débrouille Intégration native avec Spring Scheduler, Quartz

1.3 Le contexte COBOL

Les applications COBOL, encore très présentes dans les banques, assurances et administrations (sinon vous ne seriez pas là ;), consomment des fichiers à longueur fixe : chaque champ occupe exactement N caractères, complété par des espaces si besoin. C’est la notion de “layout” COBOL.

Exemple d’un enregistrement COBOL pour un client :

DUPONT     JEAN               0123456789FR75001    

Spring Batch est parfaitement adapté à ce besoin : lire un fichier CSV moderne, transformer chaque champ pour respecter la longueur exacte, et écrire un fichier à longueur fixe.


2. Architecture et concepts fondamentaux

2.1 Les briques de base

Avant d’écrire la moindre ligne de code, il faut comprendre le vocabulaire de Spring Batch. Voici la hiérarchie des concepts, du plus grand au plus petit :

JobLauncher
    └── Job  (le traitement complet)
          ├── Step 1  (lire et valider le fichier)
          ├── Step 2  (transformer et écrire)
          └── Step 3  (archiver et notifier)
                └── Chunk  (groupe de N enregistrements traités ensemble)
                      ├── ItemReader   (lit 1 enregistrement)
                      ├── ItemProcessor (transforme 1 enregistrement)
                      └── ItemWriter   (écrit N enregistrements d'un coup)

2.2 Le Job

Un Job est l’unité de traitement de plus haut niveau. C’est “le travail à faire”, défini une fois, exécutable plusieurs fois. Chaque exécution d’un Job crée une JobInstance unique (identifiée par ses paramètres), et chaque tentative d’exécution d’une JobInstance est une JobExecution. Je sais que cela semble un peu long comme chemin mais cela se justifie programmatiquement parlant.

Job "ImporterClients"
  ├── JobInstance (params: fichier=""clients_2024-01-15.csv")
  │     ├── JobExecution #1 : FAILED (erreur réseau à 60%)
  │     └── JobExecution #2 : COMPLETED (reprise depuis 60%)
  └── JobInstance (params: fichier="clients_2024-01-16.csv")
        └── JobExecution #1 : COMPLETED

2.3 Le Step

Un Step est une étape indépendante d’un Job. Un Step peut être :

Chaque Step a sa propre StepExecution qui stocke les métriques : nombre de lignes lues, traitées, écrites, ignorées ou en erreur. On a pas besoin de gérer tout ça.

2.4 Le traitement orienté Chunk

C’est le cœur de Spring Batch. Le principe est simple mais puissant.

  1. L’ItemReader lit un enregistrement à la fois
  2. L’ItemProcessor transforme un enregistrement à la fois
  3. Quand le nombre d’enregistrements lus atteint la taille du chunk (exemple: 100), l’ItemWriter écrit les 100 enregistrements d’un coup dans une transaction
  4. Si l’écriture échoue, on ne recommence que ce chunk de 100, pas tout depuis le début…
Chunk size = 3

Read  → [A]
Process → [A']
Read  → [B]
Process → [B']
Read  → [C]
Process → [C']
Write → [A', B', C']  ← une seule transaction pour 3 éléments car le chunk = 3
                      ← commit → checkpoint sauvegardé

Read  → [D]
...

Pourquoi chunk et pas enregistrement par enregistrement ? Ouvrir/fermer une transaction pour chaque ligne serait catastrophique pour les performances. En groupant N lignes par transaction, on divise le nombre de transactions par N. Pour 1 million de lignes avec chunk=1000, c’est 1000 transactions au lieu de 1 million ou plus !

2.5 Le JobRepository : la mémoire de Spring Batch

Le JobRepository est la base de données interne de Spring Batch. Il stocke automatiquement toutes les informations sur les exécutions :

Par défaut en développement, Spring Batch utilise une base H2 en mémoire que nous avons l’habitude d’utiliser pour nos tests dans les applications Spring Boot. En production, on configure une base PostgreSQL, MySQL ou Oracle.

2.6 Le JobLauncher

Le JobLauncher est le composant qui lance un Job avec des paramètres donnés. C’est le point d’entrée.

// lancement simple
jobLauncher.run(monJob, new JobParameters());

// avec paramètres
JobParameters params = new JobParametersBuilder()
    .addString("fichier", "clients_2024-01-15.csv")
    .addDate("date", new Date())
    .toJobParameters();
jobLauncher.run(monJob, params); // ici on associe le Job et les paramètres

2.7 Vue d’ensemble en schéma

┌─────────────────────────────────────────────────────────────┐
│                         Spring Batch                        │
│                                                             │
│  JobLauncher ──────► Job ──────► Step 1 ──────► Step 2      │
│      │                │              │               │      │
│      │                │         [Chunk]          [Tasklet]  │
│      │                │         Reader                      │
│      │                │         Processor                   │
│      │                │         Writer                      │
│      │                │                                     │
│      └────────────────┴──────► JobRepository                │
│                                (PostgreSQL / H2)            │
└─────────────────────────────────────────────────────────────┘

3. Mise en place du projet

l’idéal, pour comprendre Spring Batch, rien de mieux que le code avec un exemple concret.

3.1 Dépendances Maven

<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>3.2.0</version>
</parent>

<dependencies>
    <!-- Spring Batch : inclut spring-batch-core et spring-batch-infrastructure -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-batch</artifactId>
    </dependency>

    <!-- Base de données pour le JobRepository -->
    <!-- En développement : H2 en mémoire -->
    <dependency>
        <groupId>com.h2database</groupId>
        <artifactId>h2</artifactId>
        <scope>runtime</scope>
    </dependency>

    <!-- En production : PostgreSQL -->
    <dependency>
        <groupId>org.postgresql</groupId>
        <artifactId>postgresql</artifactId>
        <scope>runtime</scope>
    </dependency>

    <!-- Pour les tests -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>org.springframework.batch</groupId>
        <artifactId>spring-batch-test</artifactId>
        <scope>test</scope>
    </dependency>
</dependencies>

3.2 Configuration application.properties

# ============ Base de données du JobRepository ============
# En développement : H2 en mémoire (données perdues au redémarrage)
spring.datasource.url=jdbc:h2:mem:batchdb;DB_CLOSE_DELAY=-1
spring.datasource.driver-class-name=org.drivers.H2Driver
spring.datasource.username=sa
spring.datasource.password=

# Spring Batch crée automatiquement ses tables (BATCH_JOB_INSTANCE, etc.)
spring.batch.jdbc.initialize-schema=always

# NE PAS lancer les Jobs automatiquement au démarrage
# On veut contrôler quand les Jobs s'exécutent
spring.batch.job.enabled=false

# ============ Logs ============
logging.level.org.springframework.batch=INFO

ou en YAML si vous préférez :

# ============ Base de données du JobRepository ============
spring:
  datasource:
    url: jdbc:h2:mem:batchdb;DB_CLOSE_DELAY=-1
    driver-class-name: org.drivers.H2Driver
    username: sa
    password: ""

  batch:
    jdbc:
      initialize-schema: always
    job:
      enabled: false

# ============ Logs ============
logging:
  level:
    org.springframework.batch: INFO

spring.batch.job.enabled=false est crucial ! Par défaut, Spring Boot lance tous les Jobs trouvés dans le contexte au démarrage de l’application. C’est pratique pour les démos mais dangereux en production : on veut déclencher les Jobs manuellement ou via un planificateur.

3.3 Structure de projet

src/main/java/com/monapp/batch/
├── BatchApplication.java          ← Point d'entrée Spring Boot comme d'habitude
├── config/
│   └── BatchConfig.java           ← Configuration des Jobs et Steps
├── model/
│   └── Client.java                ← Modèle de données
├── reader/
│   └── ClientCsvReader.java       ← Lecteur de fichier CSV, TXT, JSON, ou autres... 
├── processor/
│   └── ClientProcessor.java       ← Transformation des données
├── writer/
│   └── ClientFixedWriter.java     ← Écriture du fichier longueur fixe
└── listener/
    └── JobNotificationListener.java  ← Suivi d'exécution

src/main/resources/
├── input/
│   └── clients.csv                ← Fichier d'entrée csv
│   └── clients.xls                ← Fichier d'entrée xls
│   └── clients.txt                ← Fichier d'entrée txt
└── output/                        ← Répertoire de sortie

4. Le Job et les Steps

4.1 Créer un Job simple

Un Job se configure dans une classe annotée @Configuration. On utilise les builders fournis par Spring Batch.

@Configuration
public class BatchConfig {

    // Spring Batch 5 (Spring Boot 3) injecte automatiquement ces beans
    // On peut ultiliser cette annoation ici lorsque l'on ne fait pas de tests
    @Autowired
    private JobRepository jobRepository;

    @Autowired
    private PlatformTransactionManager transactionManager;

    // ---- Définition du Job ----
    @Bean
    public Job importClientsJob(Step etapeLecture, Step etapeArchivage) {
        return new JobBuilder("importClientsJob", jobRepository)
            .start(etapeLecture)          // Première étape
            .next(etapeArchivage)          // Deuxième étape (si la première réussit)
            .build();
    }

    // ---- Définition d'un Step Chunk ----
    @Bean
    public Step etapeLecture(
            ItemReader<Client> reader,
            ItemProcessor<Client, ClientCobol> processor,
            ItemWriter<ClientCobol> writer) {

        return new StepBuilder("etapeLecture", jobRepository)
            .<Client, ClientCobol>chunk(100, transactionManager)
            // <InputType, OutputType>chunk(tailleChunk, transactionManager)
            .reader(reader)
            .processor(processor)
            .writer(writer)
            .build();
    }

    // ---- Définition d'un Step Tasklet ----
    @Bean
    public Step etapeArchivage() {
        return new StepBuilder("etapeArchivage", jobRepository)
            .tasklet((contribution, chunkContext) -> {
                // Code simple à exécuter une seule fois
                System.out.println("Archivage du fichier traité...");
                // Logique d'archivage ici
                return RepeatStatus.FINISHED;  // Dire à Spring Batch que c'est terminé
            }, transactionManager)
            .build();
    }
}

RepeatStatus.FINISHED par rapport à RepeatStatus.CONTINUABLE : Une Tasklet peut tourner en boucle ! Si vous retournez CONTINUABLE, Spring Batch rappellera votre tasklet en boucle jusqu’à ce qu’elle retourne FINISHED. Utile pour des sondages actifs.

4.2 Lancer un Job

@Service
public class JobLanceur {

    @Autowired
    private JobLauncher jobLauncher;

    @Autowired
    private Job importClientsJob;

    public void lancerImport(String nomFichier) throws Exception {
        // Les JobParameters identifient l'exécution de façon unique
        // Si vous relancez avec les mêmes paramètres, Spring Batch reprend
        // là où il s'était arrêté (ou refuse si déjà COMPLETED)
        JobParameters params = new JobParametersBuilder()
            .addString("fichierEntree", nomFichier)
            .addLocalDateTime("lancementLe", LocalDateTime.now()) // pour unicité
            .toJobParameters();

        JobExecution execution = jobLauncher.run(importClientsJob, params);

        System.out.println("Statut : " + execution.getStatus());
        System.out.println("Début : " + execution.getStartTime());
        System.out.println("Fin   : " + execution.getEndTime());
    }
}

4.3 Tasklet : les cas d’usage

Les Tasklets sont parfaites pour des tâches de préparation ou de clôture :

// Tasklet pour vérifier qu'un fichier existe avant de commencer ce qui est normal
@Bean
public Step verifierFichier(@Value("${batch.input.file}") String cheminFichier) {
    return new StepBuilder("verifierFichier", jobRepository)
        .tasklet((contribution, chunkContext) -> {
            File fichier = new File(cheminFichier);
            if (!fichier.exists()) {
                throw new IllegalStateException(
                    "Fichier introuvable : " + cheminFichier
                );
            }
            if (fichier.length() == 0) {
                throw new IllegalStateException("Fichier vide !");
            }
            System.out.println("Fichier vérifié : " + fichier.length() + " octets");
            return RepeatStatus.FINISHED;
        }, transactionManager)
        .build();
}

// Tasklet pour nettoyer le répertoire de sortie avant écriture
@Bean
public Step nettoyerSortie(@Value("${batch.output.dir}") String repertoireSortie) {
    return new StepBuilder("nettoyerSortie", jobRepository)
        .tasklet((contribution, chunkContext) -> {
            Path sortie = Paths.get(repertoireSortie);
            if (Files.exists(sortie)) {
                Files.walk(sortie)
                     .sorted(Comparator.reverseOrder())
                     .map(Path::toFile)
                     .forEach(File::delete);
            }
            Files.createDirectories(sortie);
            return RepeatStatus.FINISHED;
        }, transactionManager)
        .build();
}

5. Lire des données : les ItemReader

5.1 Comprendre les ItemReader

Un ItemReader lit un seul enregistrement à la fois et retourne null quand il n’y a plus rien à lire. Spring Batch appelle en boucle la méthode read() jusqu’à obtenir null.

public interface ItemReader<T> {
    T read() throws Exception;
    // Retourne null = fin des données
}

Spring Batch fournit de nombreux ItemReader prêts à l’emploi. On les construit avec des builders ou des factories.

Pour information, voici un tableau complet des principales classes de Spring Batch pour lire et transformer différents types de fichiers, selon leur extension ou leur format. Ces classes héritent généralement de **ItemReader** ou **ItemStreamReader** :

Type de fichier Classe Spring Batch Description Dépendances/Remarques
CSV FlatFileItemReader Lit des fichiers CSV (délimités par des virgules, points-virgules, etc.). Utilise LineMapper et FieldSetMapper pour mapper les lignes vers des objets.
Fichiers texte délimités (TXT, custom) FlatFileItemReader Lit des fichiers texte avec un délimiteur personnalisé (tabulation, pipe, etc.). Identique à CSV, mais avec un délimiteur différent.
XML StaxEventItemReader Lit des fichiers XML en utilisant StAX (Streaming API for XML). Nécessite un Unmarshaller (JAXB, Jackson, etc.) pour mapper le XML vers des objets.
JSON JsonItemReader Lit des fichiers JSON (array ou objets). Utilise Jackson ou Gson pour désérialiser le JSON.
JSON (lignes) JsonItemReader (mode streaming) Lit des fichiers JSON où chaque ligne est un objet JSON (format JSON Lines). Idéal pour les gros fichiers.
Excel (XLSX, XLS) PoiItemReader (via Apache POI) Lit des fichiers Excel (XLSX ou XLS). Nécessite la dépendance org.apache.poi:poi-ooxml pour XLSX.
Base de données JdbcCursorItemReader Lit des données depuis une base de données via JDBC. Utilise une requête SQL et un RowMapper pour mapper les lignes vers des objets.
JPA/Hibernate JpaPagingItemReader Lit des données via JPA (avec pagination). Utilise EntityManager et une requête JPQL.
MongoDB MongoItemReader Lit des documents depuis une collection MongoDB. Nécessite Spring Data MongoDB.
Fichiers binaires ResourceItemReader Lit des fichiers binaires (ex: PDF, images) comme des ressources. Retourne un Resource (Spring) ou un tableau d’octets.
Avro AvroItemReader Lit des fichiers Avro (format binaire utilisé dans Hadoop). Nécessite la dépendance org.apache.avro:avro.
Parquet ParquetItemReader Lit des fichiers Parquet (format colonne pour Big Data). Nécessite des dépendances comme org.apache.parquet:parquet-avro.
Fichiers fixes (Fixed Width) FlatFileItemReader + FixedLengthTokenizer Lit des fichiers où chaque champ a une largeur fixe. Utilise FixedLengthTokenizer pour parser les lignes.

5.2 FlatFileItemReader : lire un fichier CSV

C’est le reader le plus utilisé pour les fichiers texte. Il lit le fichier ligne par ligne et convertit chaque ligne en objet Java.

Le modèle de données :

// La classe qui représente une ligne du fichier CSV
public class ClientCsv {
    private String nom;
    private String prenom;
    private String telephone;
    private String email;
    private String codePostal;
    private String ville;
    private String codePays;

    // Constructeur vide indispensable pour le mapping
    public ClientCsv() {}

    // getters et setters...
}

Exemple de fichier CSV d’entrée (clients.csv) :

nom;prenom;telephone;email;codePostal;ville;codePays
Dupont;Jean;0123456789;jean.dupont@mail.com;75001;Paris;FR
Martin;Marie;0987654321;marie.martin@mail.com;69001;Lyon;FR
Schmidt;Hans;+4930123456;hans.schmidt@mail.de;10115;Berlin;DE

Configuration du FlatFileItemReader :

@Bean
public FlatFileItemReader<ClientCsv> clientCsvReader(
        @Value("${batch.input.file}") Resource fichierEntree) {

    return new FlatFileItemReaderBuilder<ClientCsv>()
        .name("clientCsvReader")         // Nom unique (pour la reprise)
        .resource(fichierEntree)         // Fichier à lire
        .encoding("UTF-8")               // Encodage
        .linesToSkip(1)                  // Sauter la ligne d'en-tête des noms de colonnes
        .delimited()                     // Mode délimité (CSV)
            .delimiter(";")              // Séparateur
            .names("nom", "prenom", "telephone", "email",
                   "codePostal", "ville", "codePays")  // Noms des colonnes
        .targetType(ClientCsv.class)     // Classe cible
        .build();
}

Comment fonctionne le mapping ? FlatFileItemReader découpe chaque ligne selon le délimiteur, puis utilise un BeanWrapperFieldSetMapper qui injecte chaque valeur dans le champ Java correspondant par nom. Le nom “nom” dans .names(...) correspond au setter setNom(String) de la classe ClientCsv.

5.3 FlatFileItemReader avec mapping personnalisé

Attention, car parfois, le mapping automatique ne suffit pas (types complexes, champs calculés). On peut alors fournir un FieldSetMapper personnalisé.

@Bean
public FlatFileItemReader<ClientCsv> clientCsvReaderAvancé(Resource fichier) {

    return new FlatFileItemReaderBuilder<ClientCsv>()
        .name("clientCsvReaderAvancé")
        .resource(fichier)
        .linesToSkip(1)
        .delimited()
            .delimiter(";")
            .names("nom", "prenom", "telephone", "email", "codePostal", "ville", "codePays")
        .fieldSetMapper(fieldSet -> {
            // Mapping manuel : on a le contrôle total
            ClientCsv client = new ClientCsv();
            client.setNom(fieldSet.readString("nom").trim());
            client.setPrenom(fieldSet.readString("prenom").trim());

            // Nettoyage du téléphone : garder seulement les chiffres
            String tel = fieldSet.readString("telephone").replaceAll("[^0-9]", "");
            client.setTelephone(tel);

            client.setEmail(fieldSet.readString("email").toLowerCase().trim());
            client.setCodePostal(fieldSet.readString("codePostal").trim());
            client.setVille(fieldSet.readString("ville").trim());
            client.setCodePays(fieldSet.readString("codePays").toUpperCase().trim());
            return client;
        })
        .build();
}

5.4 Gérer les lignes de commentaire ou vides

Les fichiers réels contiennent souvent des lignes vides, des commentaires, ou des séparateurs. Spring Batch permet de les filtrer :

@Bean
public FlatFileItemReader<ClientCsv> readerAvecFiltres(Resource fichier) {
    FlatFileItemReader<ClientCsv> reader = new FlatFileItemReader<>();
    reader.setResource(fichier);
    reader.setLinesToSkip(1); // en-tête

    // Ignorer les lignes vides et les commentaires (#)
    reader.setSkippedLinesCallback(line -> {
        // Cette méthode est appelée pour chaque ligne sautée
        System.out.println("Ligne ignorée : " + line);
    });

    // Filtrer les lignes à ignorer pendant la lecture
    DefaultLineMapper<ClientCsv> lineMapper = new DefaultLineMapper<>();
    DelimitedLineTokenizer tokenizer = new DelimitedLineTokenizer(";");
    tokenizer.setNames("nom", "prenom", "telephone", "email", "codePostal", "ville", "codePays");
    lineMapper.setLineTokenizer(tokenizer);
    lineMapper.setFieldSetMapper(new BeanWrapperFieldSetMapper<>() {{
        setTargetType(ClientCsv.class);
    }});
    reader.setLineMapper(lineMapper);

    // Ignorer les lignes commençant par # ou vides
    reader.setRecordSeparatorPolicy(new DefaultRecordSeparatorPolicy());

    return reader;
}

5.5 FlatFileItemReader : fichier à longueur fixe (entrée)

Pour lire des fichiers COBOL existants à longueur fixe (l’inverse de ce qu’on cherche à produire).

@Bean
public FlatFileItemReader<ClientCobol> readerFichierFixe(Resource fichier) {
    return new FlatFileItemReaderBuilder<ClientCobol>()
        .name("readerFichierFixe")
        .resource(fichier)
        // Mode "fixed length" au lieu de "delimited"
        .fixedLength()
            .columns(
                new Range(1, 10),   // Colonnes 1-10 : nom
                new Range(11, 30),  // Colonnes 11-30 : prénom
                new Range(31, 40),  // Colonnes 31-40 : téléphone
                new Range(41, 42),  // Colonnes 41-42 : code pays
                new Range(43, 47)   // Colonnes 43-47 : code postal
            )
            .names("nom", "prenom", "telephone", "codePays", "codePostal")
        .targetType(ClientCobol.class)
        .build();
}

5.6 JdbcCursorItemReader : lire depuis une base de données

Pour lire directement depuis une base de données.

@Bean
public JdbcCursorItemReader<Client> clientDbReader(DataSource dataSource) {
    return new JdbcCursorItemReaderBuilder<Client>()
        .name("clientDbReader")
        .dataSource(dataSource)
        .sql("SELECT nom, prenom, telephone, email, code_postal, ville " +
             "FROM clients WHERE actif = true ORDER BY id")
        .rowMapper((rs, rowNum) -> {
            Client c = new Client();
            c.setNom(rs.getString("nom"));
            c.setPrenom(rs.getString("prenom"));
            c.setTelephone(rs.getString("telephone"));
            c.setEmail(rs.getString("email"));
            c.setCodePostal(rs.getString("code_postal"));
            c.setVille(rs.getString("ville"));
            return c;
        })
        .fetchSize(1000)   // Nombre de lignes récupérées par lot depuis la BDD
        .build();
}

fetchSize : Sans ce paramètre, le driver JDBC charge toutes les lignes d’un coup en mémoire. Avec fetchSize=1000, il charge 1000 lignes à la fois, ce qui est bien plus efficace pour de grands volumes.

5.7 MultiResourceItemReader : lire plusieurs fichiers

Quand les données sont réparties sur plusieurs fichiers (exemple: un par jour ou par région).

@Bean
public MultiResourceItemReader<ClientCsv> multiReaderClients(
        @Value("classpath:input/clients_*.csv") Resource[] fichiers) {

    return new MultiResourceItemReaderBuilder<ClientCsv>()
        .name("multiReaderClients")
        .resources(fichiers)           // Tous les fichiers correspondant au pattern
        .delegate(clientCsvReader(null)) // Le reader pour chaque fichier individuel
        .build();
}

6. Transformer des données : les ItemProcessor

6.1 Le rôle de l’ItemProcessor

L’ItemProcessor est optionnel mais central dans les batchs de transformation. Il reçoit un enregistrement à la fois, le transforme, et retourne le résultat (qui peut être d’un type différent comme un objet java par exemple).

public interface ItemProcessor<I, O> {
    O process(I item) throws Exception;
    // Si on retourne null, l'item est filtré (pas écrit)
}

C’est ici que réside toute la logique métier de transformation : nettoyage, enrichissement, validation, calcul, reformatage.

Retourner null = filtrer l’enregistrement. Si votre processor retourne null pour un enregistrement, celui-ci ne sera pas transmis à l’ItemWriter. C’est le mécanisme de filtrage natif de Spring Batch.

6.2 Processor simple : validation et transformation

@Component
public class ClientValidationProcessor implements ItemProcessor<ClientCsv, ClientCsv> {

    private static final Logger log = LoggerFactory.getLogger(ClientValidationProcessor.class);

    @Override
    public ClientCsv process(ClientCsv client) throws Exception {

        // --- Validation ---
        if (client.getNom() == null || client.getNom().isBlank()) {
            log.warn("Client ignoré : nom vide → {}", client);
            return null;  // Filtrer cet enregistrement
        }

        if (!client.getEmail().contains("@")) {
            log.warn("Client ignoré : email invalide → {}", client.getEmail());
            return null;  // Filtrer
        }

        // --- Nettoyage / Normalisation ---
        client.setNom(client.getNom().trim().toUpperCase());
        client.setPrenom(capitaliser(client.getPrenom().trim()));
        client.setEmail(client.getEmail().toLowerCase().trim());

        // Nettoyer le téléphone
        String tel = client.getTelephone().replaceAll("[^0-9+]", "");
        client.setTelephone(tel);

        return client;  // Retourner l'enregistrement transformé
    }

    private String capitaliser(String s) {
        if (s == null || s.isEmpty()) return s;
        return Character.toUpperCase(s.charAt(0)) +
               s.substring(1).toLowerCase();
    }
}

6.3 Processor de conversion de type

Dans notre cas COBOL, on va convertir d’un ClientCsv (données brutes) vers un ClientCobol (format prêt pour COBOL), même si je sais que cela ne sera sans pas le cas.

// Classe de sortie : représente un enregistrement COBOL
public class ClientCobol {
    private String nomFixe;        // 20 caractères
    private String prenomFixe;     // 20 caractères
    private String telephoneFixe;  // 15 caractères
    private String emailFixe;      // 50 caractères
    private String codePostalFixe; // 5 caractères
    private String villeFixe;      // 30 caractères
    private String codePaysFixe;   // 2 caractères

    // Total = 142 caractères par enregistrement
    // getters, setters, toString...
}

@Component
public class ClientToCoblProcessor implements ItemProcessor<ClientCsv, ClientCobol> {

    @Override
    public ClientCobol process(ClientCsv client) {
        ClientCobol cobol = new ClientCobol();

        // Chaque champ est formaté à une taille exacte
        cobol.setNomFixe(formaterChamp(client.getNom(), 20));
        cobol.setPrenomFixe(formaterChamp(client.getPrenom(), 20));
        cobol.setTelephoneFixe(formaterChamp(client.getTelephone(), 15));
        cobol.setEmailFixe(formaterChamp(client.getEmail(), 50));
        cobol.setCodePostalFixe(formaterChamp(client.getCodePostal(), 5));
        cobol.setVilleFixe(formaterChamp(client.getVille(), 30));
        cobol.setCodePaysFixe(formaterChamp(client.getCodePays(), 2));

        return cobol;
    }

    /**
     * Formate une chaîne à une longueur exacte :
     * - Tronque si trop long
     * - Complète avec des espaces à droite si trop court
     * - Retourne des espaces si null
     */
    private String formaterChamp(String valeur, int longueur) {
        if (valeur == null) valeur = "";
        // Tronquer si trop long
        if (valeur.length() > longueur) {
            return valeur.substring(0, longueur);
        }
        // Compléter avec des espaces à droite si trop court
        return String.format("%-" + longueur + "s", valeur);
    }
}

String.format("%-Ns", valeur) : Le % indique un formatage, - signifie aligné à gauche, N est la longueur, s est pour string. Résultat : la chaîne est complétée par des espaces à droite jusqu’à N caractères.

6.4 CompositeItemProcessor : chaîner plusieurs processors

On peut chaîner plusieurs processors avec CompositeItemProcessor. Chaque processor reçoit le résultat du précédent. C’est le pattern Chaîne de responsabilité appliqué aux batchs comme le ChainFilter pour les servlet ou contrôleurs.

@Bean
public CompositeItemProcessor<ClientCsv, ClientCobol> processorChaine(
        ClientValidationProcessor validateur,
        ClientNettoyageProcessor nettoyeur,
        ClientToCoblProcessor convertisseur) {

    return new CompositeItemProcessorBuilder<ClientCsv, ClientCobol>()
        .delegates(
            validateur,    // 1. Valider et filtrer les mauvais enregistrements
            nettoyeur,     // 2. Nettoyer et normaliser les données
            convertisseur  // 3. Convertir vers le format COBOL
        )
        .build();
}

Le flux est :

ClientCsv → [validateur] → ClientCsv (ou null) → [nettoyeur] → ClientCsv → [convertisseur] → ClientCobol

Si un processor retourne null, les suivants ne sont pas appelés (l’enregistrement est filtré).

6.5 Enrichissement depuis une base de données

Les processors peuvent interroger une base de données ou un cache pour enrichir les données :

@Component
public class ClientEnrichissementProcessor implements ItemProcessor<ClientCsv, ClientCsv> {

    @Autowired
    private RegionRepository regionRepository;  // Repository Spring Data

    // Cache simple pour éviter de requêter la BDD pour chaque enregistrement
    private final Map<String, String> cacheRegions = new HashMap<>();

    @Override
    public ClientCsv process(ClientCsv client) {
        // Trouver la région à partir du code postal
        String codePostal = client.getCodePostal();
        String region = cacheRegions.computeIfAbsent(
            codePostal,
            cp -> regionRepository.findRegionByCodePostal(cp)
                                  .orElse("INCONNU")
        );
        client.setRegion(region);
        return client;
    }
}

7. Écrire des données : les ItemWriter

7.1 Le rôle de l’ItemWriter

Contrairement au Reader et au Processor qui travaillent un enregistrement à la fois, l’ItemWriter reçoit une liste (le chunk complet) et écrit tout d’un coup. C’est ce qui rend l’écriture efficace.

public interface ItemWriter<T> {
    void write(Chunk<? extends T> items) throws Exception;
    // Reçoit tous les éléments du chunk
}

7.2 FlatFileItemWriter : écrire un fichier texte

@Bean
public FlatFileItemWriter<ClientCobol> clientCobolWriter(
        @Value("${batch.output.file}") Resource fichierSortie) {

    return new FlatFileItemWriterBuilder<ClientCobol>()
        .name("clientCobolWriter")
        .resource(fichierSortie)
        .encoding("ISO-8859-1")  // Encodage souvent utilisé par les mainframes
        .lineAggregator(item -> {
            // Assembler une ligne à partir de l'objet ClientCobol
            // Chaque champ a déjà la bonne longueur grâce au processor
            return item.getNomFixe()
                + item.getPrenomFixe()
                + item.getTelephoneFixe()
                + item.getEmailFixe()
                + item.getCodePostalFixe()
                + item.getVilleFixe()
                + item.getCodePaysFixe();
        })
        .build();
}

7.3 FlatFileItemWriter avec FormatterLineAggregator

Pour des formats plus complexes, on peut utiliser FormatterLineAggregator qui utilise String.format.

@Bean
public FlatFileItemWriter<ClientCobol> clientCobolWriterFormatte(Resource fichierSortie) {

    // Définit l'ordre des champs à extraire de l'objet
    BeanWrapperFieldExtractor<ClientCobol> extractor = new BeanWrapperFieldExtractor<>();
    extractor.setNames(new String[]{
        "nomFixe", "prenomFixe", "telephoneFixe",
        "emailFixe", "codePostalFixe", "villeFixe", "codePaysFixe"
    });

    // Définit le format de chaque champ (comme String.format)
    FormatterLineAggregator<ClientCobol> aggregator = new FormatterLineAggregator<>();
    aggregator.setFieldExtractor(extractor);
    // %s pour chaque champ : ils sont déjà à la bonne longueur
    aggregator.setFormat("%s%s%s%s%s%s%s");

    return new FlatFileItemWriterBuilder<ClientCobol>()
        .name("clientCobolWriterFormatte")
        .resource(fichierSortie)
        .lineAggregator(aggregator)
        .build();
}

7.4 FlatFileItemWriter : écriture avec en-tête et pied de page

Les fichiers destinés aux mainframes ont souvent un enregistrement d’en-tête (header) et un enregistrement de contrôle (trailer).

@Bean
public FlatFileItemWriter<ClientCobol> writerAvecHeaderTrailer(Resource fichierSortie) {

    return new FlatFileItemWriterBuilder<ClientCobol>()
        .name("writerAvecHeaderTrailer")
        .resource(fichierSortie)
        .encoding("ISO-8859-1")
        // En-tête : une ligne fixe au début du fichier
        .headerCallback(writer -> {
            writer.write("DEBUT-FICHIER-CLIENTS   " + LocalDate.now().format(
                DateTimeFormatter.ofPattern("yyyyMMdd")
            ) + "          ");  // Header formaté à 40 caractères
        })
        // Pied de page : ligne de contrôle à la fin
        .footerCallback(writer -> {
            writer.write("FIN-FICHIER-CLIENTS     " + LocalDate.now().format(
                DateTimeFormatter.ofPattern("yyyyMMdd")
            ) + "          ");
        })
        .lineAggregator(item ->
            item.getNomFixe() + item.getPrenomFixe() + item.getTelephoneFixe()
            + item.getEmailFixe() + item.getCodePostalFixe() + item.getVilleFixe()
            + item.getCodePaysFixe()
        )
        .build();
}

7.5 JdbcBatchItemWriter : écrire en base de données

@Bean
public JdbcBatchItemWriter<ClientCobol> clientDbWriter(DataSource dataSource) {
    return new JdbcBatchItemWriterBuilder<ClientCobol>()
        .dataSource(dataSource)
        .sql("INSERT INTO clients_cobol " +
             "(nom, prenom, telephone, email, code_postal, ville, code_pays) " +
             "VALUES (:nomFixe, :prenomFixe, :telephoneFixe, " +
             ":emailFixe, :codePostalFixe, :villeFixe, :codePaysFixe)")
        // Utilise les noms de propriétés de l'objet comme paramètres nommés
        .beanMapped()
        .build();
}

7.6 CompositeItemWriter : écrire vers plusieurs destinations

Parfois, on veut écrire simultanément dans un fichier ET en base de données…

@Bean
public CompositeItemWriter<ClientCobol> writerComposite(
        FlatFileItemWriter<ClientCobol> fichierWriter,
        JdbcBatchItemWriter<ClientCobol> dbWriter) {

    return new CompositeItemWriterBuilder<ClientCobol>()
        .delegates(fichierWriter, dbWriter)  // Écrit dans les deux
        .build();
}

8. Reformatage à taille fixe pour COBOL

8.1 Les layouts COBOL : comprendre le besoin

Les applications COBOL définissent leurs structures de données via des copybooks (fichiers .cpy). Chaque champ a une longueur fixe, un type (alphanumérique PIC X, numérique PIC 9, etc.) et une position précise. Idem, pour information, vous ne le ferez sans doute pas.

Exemple de copybook COBOL :

       01 ENREG-CLIENT.
          05 NOM-CLIENT      PIC X(20).   <- 20 caractères alpha
          05 PRENOM-CLIENT   PIC X(20).   <- 20 caractères alpha
          05 TELEPHONE       PIC X(15).   <- 15 caractères
          05 EMAIL           PIC X(50).   <- 50 caractères
          05 CODE-POSTAL     PIC X(5).    <- 5 caractères
          05 VILLE           PIC X(30).   <- 30 caractères
          05 CODE-PAYS       PIC X(2).    <- 2 caractères
                                          TOTAL = 142 caractères / enregistrement

Règles fondamentales :

8.2 Utilitaire de formatage COBOL

Créons une classe utilitaire réutilisable pour tout le formatage :

public final class CobolFormatter {

    private CobolFormatter() {}  // Pas d'instanciation

    /**
     * Formate une chaîne alphanumérique (PIC X(n)) :
     * - Null ou vide → n espaces
     * - Trop court → complété d'espaces à droite
     * - Trop long → tronqué à droite
     */
    public static String picX(String valeur, int longueur) {
        if (valeur == null) valeur = "";
        valeur = valeur.trim();  // Supprimer les espaces en début/fin
        if (valeur.length() > longueur) {
            return valeur.substring(0, longueur);
        }
        // %-Ns : aligné gauche, complété d'espaces
        return String.format("%-" + longueur + "s", valeur);
    }

    /**
     * Formate un entier numérique (PIC 9(n)) :
     * - Null ou négatif → n zéros
     * - Complété de zéros à gauche
     * - Trop grand → tronqué à gauche (les chiffres les moins significatifs)
     */
    public static String pic9(Long valeur, int longueur) {
        if (valeur == null || valeur < 0) {
            return "0".repeat(longueur);
        }
        String s = String.valueOf(valeur);
        if (s.length() > longueur) {
            // Tronquer à gauche : garder les N derniers chiffres
            return s.substring(s.length() - longueur);
        }
        // %0Nd : complété de zéros à gauche
        return String.format("%0" + longueur + "d", valeur);
    }

    /**
     * Formate un décimal (PIC 9(n)V9(d)) :
     * - Partie entière sur nEntier chiffres, partie décimale sur nDec chiffres
     * - Pas de point décimal dans le fichier (convention COBOL)
     * Exemple : 1234.56 avec n=8, d=2 → "000123456"
     */
    public static String pic9V(BigDecimal valeur, int nEntier, int nDecimales) {
        if (valeur == null) valeur = BigDecimal.ZERO;
        // Convertir en entier en décalant la virgule
        BigDecimal facteur = BigDecimal.TEN.pow(nDecimales);
        long entier = valeur.multiply(facteur).longValue();
        return pic9(entier, nEntier + nDecimales);
    }

    /**
     * Formate une date (PIC X(8) au format AAAAMMJJ)
     */
    public static String dateAaaamjj(LocalDate date) {
        if (date == null) return "        ";  // 8 espaces
        return date.format(DateTimeFormatter.ofPattern("yyyyMMdd"));
    }

    /**
     * Formate une date (PIC X(10) au format AAAA-MM-JJ)
     */
    public static String dateIso(LocalDate date) {
        if (date == null) return "          ";  // 10 espaces
        return date.format(DateTimeFormatter.ISO_LOCAL_DATE);
    }

    /**
     * Valide qu'une chaîne a exactement la longueur attendue.
     * Utile pour les tests unitaires.
     */
    public static void verifierLongueur(String champ, String nomChamp, int longueurAttendue) {
        if (champ.length() != longueurAttendue) {
            throw new IllegalStateException(
                "Champ '" + nomChamp + "' : longueur " + champ.length() +
                " au lieu de " + longueurAttendue + " → [" + champ + "]"
            );
        }
    }
}

8.3 Modèle de données COBOL complet

public class EnregistrementCobol {

    // Schéma : total 142 caractères
    private String nom;          // PIC X(20) - colonnes 1-20
    private String prenom;       // PIC X(20) - colonnes 21-40
    private String telephone;    // PIC X(15) - colonnes 41-55
    private String email;        // PIC X(50) - colonnes 56-105
    private String codePostal;   // PIC X(5)  - colonnes 106-110
    private String ville;        // PIC X(30) - colonnes 111-140
    private String codePays;     // PIC X(2)  - colonnes 141-142

    /**
     * Construit la ligne COBOL complète (142 caractères exacts).
     * Vérifie la longueur totale pour garantir l'intégrité.
     */
    public String toLigneCobol() {
        String ligne = nom + prenom + telephone + email
                     + codePostal + ville + codePays;

        // Vérification de sécurité
        assert ligne.length() == 142 :
            "Enregistrement COBOL invalide : " + ligne.length() + " chars (attendu 142)";

        return ligne;
    }

    // getters et setters...
}

8.4 Processor de reformatage COBOL complet

@Component
public class CsvToCobolProcessor implements ItemProcessor<ClientCsv, EnregistrementCobol> {

    private static final Logger log = LoggerFactory.getLogger(CsvToCobolProcessor.class);

    // Compteur pour les enregistrements filtrés (pour le rapport final)
    private final AtomicInteger nbFiltres = new AtomicInteger(0);

    @Override
    public EnregistrementCobol process(ClientCsv client) {

        // Validation préalable : si les données sont vraiment inexploitables, on filtre
        if (client.getNom() == null || client.getNom().isBlank()) {
            log.warn("Enregistrement filtré (nom vide) : {}", client);
            nbFiltres.incrementAndGet();
            return null;  // Filtré → pas écrit
        }

        EnregistrementCobol enreg = new EnregistrementCobol();

        // Formatage de chaque champ selon le schéma COBOL
        // CobolFormatter garantit la longueur exacte
        enreg.setNom(CobolFormatter.picX(client.getNom(), 20));
        enreg.setPrenom(CobolFormatter.picX(client.getPrenom(), 20));
        enreg.setTelephone(CobolFormatter.picX(nettoyerTelephone(client.getTelephone()), 15));
        enreg.setEmail(CobolFormatter.picX(client.getEmail(), 50));
        enreg.setCodePostal(CobolFormatter.picX(client.getCodePostal(), 5));
        enreg.setVille(CobolFormatter.picX(client.getVille(), 30));
        enreg.setCodePays(CobolFormatter.picX(client.getCodePays(), 2));

        // Vérification que la ligne fait bien 142 caractères
        String ligne = enreg.toLigneCobol();
        if (ligne.length() != 142) {
            log.error("ERREUR CRITIQUE : enregistrement de {} chars ! Client : {}",
                      ligne.length(), client.getNom());
            throw new IllegalStateException("Longueur d'enregistrement incorrecte : " + ligne.length());
        }

        return enreg;
    }

    private String nettoyerTelephone(String tel) {
        if (tel == null) return "";
        // Garder seulement les chiffres et le +
        return tel.replaceAll("[^0-9+]", "");
    }

    public int getNbFiltres() {
        return nbFiltres.get();
    }
}

8.5 Writer COBOL avec contrôle total

@Bean
public FlatFileItemWriter<EnregistrementCobol> cobolFileWriter(
        @Value("${batch.output.file}") Resource fichierSortie) {

    return new FlatFileItemWriterBuilder<EnregistrementCobol>()
        .name("cobolFileWriter")
        .resource(fichierSortie)
        .encoding("ISO-8859-1")  // Latin-1 : compatible avec les mainframes européens
        .shouldDeleteIfExists(true)  // Effacer le fichier précédent si existant
        .headerCallback(writer -> {
            // Header COBOL : 142 espaces (enregistrement vide de contrôle)
            // Ou une ligne d'identification standard
            writer.write(CobolFormatter.picX("DEBUT", 142));
        })
        .footerCallback(writer -> {
            writer.write(CobolFormatter.picX("FIN", 142));
        })
        .lineAggregator(enreg -> {
            String ligne = enreg.toLigneCobol();
            // Double vérification au moment de l'écriture
            if (ligne.length() != 142) {
                throw new IllegalStateException(
                    "Ligne COBOL invalide : " + ligne.length() + " caractères"
                );
            }
            return ligne;
        })
        .build();
}

9. Gestion des erreurs, Skip et Retry

9.1 Le problème des données réelles

Dans le monde réel, les fichiers d’entrée sont imparfaits :

Si on laisse Spring Batch planter à la première erreur, le batch n’ira jamais au bout.

Spring Batch propose trois stratégies pour gérer les erreurs :

9.2 Skip : ignorer les erreurs

@Bean
public Step etapeAvecSkip(ItemReader<ClientCsv> reader,
                           ItemProcessor<ClientCsv, EnregistrementCobol> processor,
                           ItemWriter<EnregistrementCobol> writer) {

    return new StepBuilder("etapeAvecSkip", jobRepository)
        .<ClientCsv, EnregistrementCobol>chunk(100, transactionManager)
        .reader(reader)
        .processor(processor)
        .writer(writer)

        // Configuration du Skip
        .faultTolerant()  // Active la tolérance aux pannes
            .skip(FlatFileParseException.class)  // Ignorer les lignes non parsables
            .skip(ValidationException.class)     // Ignorer les erreurs de validation
            .skipLimit(50)  // Maximum 50 erreurs tolérées, au-delà le Job échoue

        .build();
}

skipLimit est crucial : sans limite, un fichier entièrement corrompu serait “traité” avec 0 enregistrement écrits mais aucune erreur signalée. La limite force le batch à échouer si trop d’erreurs surviennent, ce qui alerte l’équipe.

9.3 SkipListener : journaliser les enregistrements ignorés

@Component
public class ClientSkipListener implements SkipListener<ClientCsv, EnregistrementCobol> {

    private static final Logger log = LoggerFactory.getLogger(ClientSkipListener.class);

    // Appelé quand un enregistrement est ignoré pendant la LECTURE
    @Override
    public void onSkipInRead(Throwable t) {
        log.error("Ligne ignorée en lecture : {}", t.getMessage());
    }

    // Appelé quand un enregistrement est ignoré pendant le TRAITEMENT
    @Override
    public void onSkipInProcess(ClientCsv item, Throwable t) {
        log.error("Enregistrement ignoré en traitement : {} → {}",
                  item.getNom() + " " + item.getPrenom(), t.getMessage());
    }

    // Appelé quand un enregistrement est ignoré pendant l'ÉCRITURE
    @Override
    public void onSkipInWrite(EnregistrementCobol item, Throwable t) {
        log.error("Enregistrement ignoré en écriture : {} → {}",
                  item.getNom(), t.getMessage());
    }
}

Enregistrement du listener dans le Step :

.faultTolerant()
    .skip(FlatFileParseException.class)
    .skipLimit(50)
    .listener(clientSkipListener)  // Enregistrer le listener

9.4 Retry : réessayer automatiquement

Le retry est utile pour les erreurs transitoires (timeout réseau, BDD temporairement surchargée) :

@Bean
public Step etapeAvecRetry(ItemReader<ClientCsv> reader,
                            ItemProcessor<ClientCsv, EnregistrementCobol> processor,
                            ItemWriter<EnregistrementCobol> writer) {

    return new StepBuilder("etapeAvecRetry", jobRepository)
        .<ClientCsv, EnregistrementCobol>chunk(100, transactionManager)
        .reader(reader)
        .processor(processor)
        .writer(writer)
        .faultTolerant()
            .retry(TransientDataAccessException.class)  // Réessayer pour les erreurs BDD transitoires
            .retry(ResourceAccessException.class)        // Réessayer pour les erreurs réseau
            .retryLimit(3)   // Maximum 3 tentatives par enregistrement
        .build();
}

9.5 Combiner Skip et Retry

.faultTolerant()
    // Retry pour les erreurs transitoires (réseau, BDD temporaire)
    .retry(TransientDataAccessException.class)
    .retryLimit(3)
    // Skip pour les erreurs définitives (données invalides)
    .skip(ValidationException.class)
    .skip(FlatFileParseException.class)
    .skipLimit(100)
    .listener(skipListener)

10. Listeners et suivi d’exécution

10.1 Pourquoi des listeners ?

Les listeners permettent d’intercepter les événements du cycle de vie d’un Job ou d’un Step pour ajouter de la logique transversale : logging, notifications, métriques, envoi d’email à la fin du traitement.

10.2 JobExecutionListener

@Component
public class JobRapportListener implements JobExecutionListener {

    private static final Logger log = LoggerFactory.getLogger(JobRapportListener.class);

    // Appelé AVANT le démarrage du Job
    @Override
    public void beforeJob(JobExecution jobExecution) {
        log.info("=== DÉBUT DU JOB : {} ===", jobExecution.getJobInstance().getJobName());
        log.info("Paramètres : {}", jobExecution.getJobParameters());
        log.info("Démarré à : {}", LocalDateTime.now());
    }

    // Appelé APRÈS la fin du Job (qu'il ait réussi ou échoué)
    @Override
    public void afterJob(JobExecution jobExecution) {
        log.info("=== FIN DU JOB : {} ===", jobExecution.getJobInstance().getJobName());
        log.info("Statut final : {}", jobExecution.getStatus());
        log.info("Terminé à : {}", LocalDateTime.now());

        // Afficher les statistiques de chaque Step
        for (StepExecution step : jobExecution.getStepExecutions()) {
            log.info("Step '{}' : lus={}, traités={}, écrits={}, ignorés={}, erreurs={}",
                step.getStepName(),
                step.getReadCount(),
                step.getProcessSkipCount() == 0 ? step.getWriteCount() : "N/A",
                step.getWriteCount(),
                step.getSkipCount(),
                step.getReadSkipCount() + step.getWriteSkipCount()
            );
        }

        // Envoyer une notification selon le statut
        if (jobExecution.getStatus() == BatchStatus.COMPLETED) {
            log.info("Job terminé avec succès !");
            // envoyerEmailSucces(jobExecution);
        } else if (jobExecution.getStatus() == BatchStatus.FAILED) {
            log.error("Job échoué ! Vérifier les logs pour les détails.");
            // envoyerEmailEchec(jobExecution);
        }
    }
}

Enregistrement dans le Job :

@Bean
public Job importJob(Step etapePrincipale, JobRapportListener listener) {
    return new JobBuilder("importJob", jobRepository)
        .listener(listener)  // Enregistrer le listener
        .start(etapePrincipale)
        .build();
}

10.3 StepExecutionListener

@Component
public class StepProgressListener implements StepExecutionListener {

    private static final Logger log = LoggerFactory.getLogger(StepProgressListener.class);

    @Override
    public void beforeStep(StepExecution stepExecution) {
        log.info("--- Début step : {} ---", stepExecution.getStepName());
    }

    @Override
    public ExitStatus afterStep(StepExecution stepExecution) {
        log.info("--- Fin step : {} | Lus: {} | Écrits: {} | Ignorés: {} ---",
            stepExecution.getStepName(),
            stepExecution.getReadCount(),
            stepExecution.getWriteCount(),
            stepExecution.getSkipCount()
        );

        // On peut modifier l'ExitStatus pour influencer le flux du Job
        // Par exemple : si trop de données ignorées, marquer comme WARNING
        if (stepExecution.getSkipCount() > stepExecution.getWriteCount() * 0.1) {
            log.warn("Plus de 10% d'enregistrements ignorés !");
            return new ExitStatus("COMPLETED_WITH_WARNINGS");
        }

        return stepExecution.getExitStatus();  // Retourner le statut normal
    }
}

10.4 ChunkListener : suivi de la progression

@Component
public class ProgressChunkListener implements ChunkListener {

    private static final Logger log = LoggerFactory.getLogger(ProgressChunkListener.class);
    private int chunkCount = 0;

    @Override
    public void beforeChunk(ChunkContext context) {
        // Appelé avant chaque chunk
    }

    @Override
    public void afterChunk(ChunkContext context) {
        chunkCount++;
        StepExecution step = context.getStepContext().getStepExecution();
        int total = step.getWriteCount();
        // Afficher la progression tous les 10 chunks
        if (chunkCount % 10 == 0) {
            log.info("Progression : {} enregistrements écrits...", total);
        }
    }

    @Override
    public void afterChunkError(ChunkContext context) {
        log.warn("Erreur dans le chunk #{}", chunkCount);
    }
}

11. JobParameters et relance de Jobs

11.1 L’importance des JobParameters

Les JobParameters jouent deux rôles essentiels :

  1. Identifier une exécution unique (deux Jobs avec les mêmes paramètres sont considérés comme la même instance)
  2. Passer des paramètres de configuration au Job (nom de fichier, date de traitement, etc.)
// Créer des paramètres
JobParameters params = new JobParametersBuilder()
    .addString("fichierEntree", "/data/clients_2024-01-15.csv")
    .addString("fichierSortie", "/data/output/clients_cobol_2024-01-15.dat")
    .addLocalDate("dateTraitement", LocalDate.now())
    .addLong("timestamp", System.currentTimeMillis())  // Pour l'unicité
    .toJobParameters();

11.2 Accéder aux JobParameters depuis un bean

@Component
@StepScope  // IMPORTANT : scope Step, recréé pour chaque exécution
public class FichierEntreeReader {

    @Value("#{jobParameters['fichierEntree']}")
    private String cheminFichier;

    // Le reader est créé avec le bon chemin au moment de l'exécution du Step
    @Bean
    @StepScope
    public FlatFileItemReader<ClientCsv> readerDynamique(
            @Value("#{jobParameters['fichierEntree']}") String chemin) {

        return new FlatFileItemReaderBuilder<ClientCsv>()
            .name("readerDynamique")
            .resource(new FileSystemResource(chemin))
            .linesToSkip(1)
            .delimited().delimiter(";")
            .names("nom", "prenom", "telephone", "email", "codePostal", "ville", "codePays")
            .targetType(ClientCsv.class)
            .build();
    }
}

@StepScope est crucial quand on utilise #{jobParameters[...]} ! Sans @StepScope, le bean est créé au démarrage de l’application, avant que les paramètres soient connus. Avec @StepScope, le bean est créé au moment de l’exécution du Step, quand les paramètres sont disponibles.

11.3 Reprise après échec

C’est l’une des fonctionnalités les plus précieuses de Spring Batch. Si un Job échoue à mi-chemin, il peut reprendre exactement là où il s’était arrêté.

// Lancer un job
JobExecution execution1 = jobLauncher.run(monJob, params);
// → Suppose qu'il échoue après 50 000 enregistrements

// Relancer avec les MÊMES paramètres = reprise depuis le dernier checkpoint
JobExecution execution2 = jobLauncher.run(monJob, params);
// → Reprend depuis le chunk 501 (si chunkSize=100 et 50 000 lus)

Comment ça marche ? Le JobRepository stocke le nombre d’enregistrements lus (readCount) pour chaque StepExecution. Au redémarrage, le FlatFileItemReader sait qu’il doit sauter les N premiers enregistrements déjà traités.

11.4 Forcer le relancement d’un Job déjà COMPLETED

Par défaut, Spring Batch refuse de relancer un Job avec les mêmes paramètres si son statut est COMPLETED (pour éviter le double traitement). Pour forcer le relancement :

@Bean
public Job monJob() {
    return new JobBuilder("monJob", jobRepository)
        .incrementer(new RunIdIncrementer())  // Ajoute un ID unique à chaque lancement
        .start(etapePrincipale())
        .build();
}

// Ou en passant un timestamp comme paramètre
JobParameters params = new JobParametersBuilder()
    .addString("fichier", "clients.csv")
    .addLong("run.id", System.currentTimeMillis())  // Toujours unique
    .toJobParameters();

12. Steps conditionnelles et flux avancés

12.1 Exécution conditionnelle des Steps

On peut définir des flux conditionnels : si le Step 1 réussit → aller au Step 2, sinon → aller au Step de gestion d’erreur.

@Bean
public Job jobConditiohennel(Step etapeValidation, Step etapeTraitement,
                              Step etapeGestionErreur, Step etapeNotification) {

    return new JobBuilder("jobConditionnel", jobRepository)
        .start(etapeValidation)
            .on("COMPLETED").to(etapeTraitement)   // Si validation OK → traitement
            .on("FAILED").to(etapeGestionErreur)   // Si validation KO → gestion erreur
        .from(etapeTraitement)
            .on("*").to(etapeNotification)          // Après traitement (quoi qu'il arrive) → notification
        .from(etapeGestionErreur)
            .on("*").end(BatchStatus.FAILED)        // Après gestion erreur → terminer en FAILED
        .end()
        .build();
}

Les codes de sortie (ExitStatus) sont des chaînes de caractères. Les standards sont COMPLETED, FAILED, STOPPED, mais on peut définir les siens :

// Dans un StepExecutionListener.afterStep()
if (step.getSkipCount() > 0) {
    return new ExitStatus("COMPLETED_WITH_WARNINGS");
}
return ExitStatus.COMPLETED;

// Dans le Job, on peut tester cette valeur personnalisée
.on("COMPLETED_WITH_WARNINGS").to(etapeAlerte)

12.2 Décision : Flow avec JobExecutionDecider

Pour des décisions plus complexes, on utilise un JobExecutionDecider :

@Component
public class FichierVideDecider implements JobExecutionDecider {

    @Value("${batch.input.file}")
    private String cheminFichier;

    @Override
    public FlowExecutionStatus decide(JobExecution jobExecution,
                                      StepExecution stepExecution) {
        File fichier = new File(cheminFichier);
        if (!fichier.exists() || fichier.length() == 0) {
            return new FlowExecutionStatus("FICHIER_VIDE");
        }
        return FlowExecutionStatus.COMPLETED;
    }
}

// Utilisation dans le Job
@Bean
public Job jobAvecDecision(FichierVideDecider decider,
                            Step etapeTraitement, Step etapeFichierVide) {
    return new JobBuilder("jobAvecDecision", jobRepository)
        .start(decider)
            .on("FICHIER_VIDE").to(etapeFichierVide).end()
            .on("COMPLETED").to(etapeTraitement)
        .end()
        .build();
}

13. Partitioning et performance

13.1 Le traitement parallèle avec Partitioning

Pour traiter des fichiers très volumineux encore plus vite, Spring Batch permet de diviser le travail en partitions traitées en parallèle par plusieurs threads.

Analogie : Au lieu d’un seul convoyeur qui traite 1 million de lignes, on a 8 convoyeurs qui en traitent 125 000 chacun en même temps. Le temps total est divisé par ~8.

@Configuration
public class PartitionConfig {

    @Bean
    public Step stepMaitre(Partitioner partitioner, Step stepEsclave,
                            TaskExecutor taskExecutor) {
        return new StepBuilder("stepMaitre", jobRepository)
            .partitioner("stepEsclave", partitioner)
            .step(stepEsclave)          // Step à exécuter en parallèle
            .gridSize(8)                // Nombre de partitions (= nb de threads)
            .taskExecutor(taskExecutor) // Exécuteur de threads
            .build();
    }

    // Divise le fichier en N plages de lignes
    @Bean
    public Partitioner fichierPartitioner(
            @Value("${batch.input.file}") Resource fichier,
            @Value("${batch.partition.size:8}") int nbPartitions) {

        MultiResourcePartitioner partitioner = new MultiResourcePartitioner();
        // Pour plusieurs fichiers : un fichier par partition
        // Pour un seul gros fichier : diviser par plages de lignes

        return gridSize -> {
            Map<String, ExecutionContext> partitions = new HashMap<>();
            // Calculer les plages
            // ... (logique de partitionnement)
            return partitions;
        };
    }

    @Bean
    public TaskExecutor taskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(8);
        executor.setMaxPoolSize(8);
        executor.setQueueCapacity(0);
        executor.setThreadNamePrefix("batch-partition-");
        executor.initialize();
        return executor;
    }
}

13.2 Traitement multi-threadé simple (AsyncItemProcessor)

Une approche plus simple pour paralléliser est le traitement multi-threadé d’un seul Step :

@Bean
public Step stepMultiThreade(ItemReader<ClientCsv> reader,
                               ItemProcessor<ClientCsv, EnregistrementCobol> processor,
                               ItemWriter<EnregistrementCobol> writer) {

    return new StepBuilder("stepMultiThreade", jobRepository)
        .<ClientCsv, EnregistrementCobol>chunk(100, transactionManager)
        .reader(reader)
        .processor(processor)
        .writer(writer)
        .taskExecutor(new SimpleAsyncTaskExecutor("batch-thread-"))  // Multi-thread !
        .throttleLimit(4)  // Maximum 4 threads simultanés
        .build();
}

Attention avec le multi-threading ! Le FlatFileItemReader n’est pas thread-safe par défaut. Pour un Step multi-threadé, utilisez un reader SynchronizedItemStreamReader ou lisez depuis une base de données avec un JdbcPagingItemReader qui est thread-safe.

13.3 SynchronizedItemStreamReader : rendre un reader thread-safe

@Bean
@StepScope
public SynchronizedItemStreamReader<ClientCsv> synchronizedReader(
        @Value("#{jobParameters['fichierEntree']}") String chemin) {

    FlatFileItemReader<ClientCsv> innerReader = new FlatFileItemReaderBuilder<ClientCsv>()
        .name("innerReader")
        .resource(new FileSystemResource(chemin))
        .linesToSkip(1)
        .delimited().delimiter(";")
        .names("nom", "prenom", "telephone", "email", "codePostal", "ville", "codePays")
        .targetType(ClientCsv.class)
        .build();

    // Wrapping du reader dans un synchronized wrapper
    return new SynchronizedItemStreamReader<>(innerReader);
}

14. Bonnes pratiques et surveillance

14.1 Taille de chunk optimale

La taille de chunk est un paramètre critique pour les performances. Voici les recommandations :

Chunk trop petit (ex: 1)  → Trop de transactions → Lent
Chunk optimal (ex: 100-1000) → Bon compromis
Chunk trop grand (ex: 100000) → Trop de mémoire → OutOfMemoryError
                              → En cas d'erreur, on recommence un grand chunk

En pratique :

@Value("${batch.chunk.size:1000}")
private int chunkSize;

// ... dans la config du Step :
.<ClientCsv, EnregistrementCobol>chunk(chunkSize, transactionManager)

14.2 Configuration du JobRepository en production

# PostgreSQL pour le JobRepository en production
spring.datasource.url=jdbc:postgresql://localhost:5432/batchdb
spring.datasource.username=batch_user
spring.datasource.password=secret
spring.jpa.database-platform=org.hibernate.dialect.PostgreSQLDialect

# Spring Batch crée les tables si elles n'existent pas
spring.batch.jdbc.initialize-schema=embedded
# "embedded" = seulement pour H2/HSQL, "always" = toujours, "never" = jamais

En production, les tables du JobRepository (BATCH_JOB_INSTANCE, BATCH_JOB_EXECUTION, BATCH_STEP_EXECUTION, etc.) doivent être créées manuellement avec les scripts SQL fournis par Spring Batch dans le JAR.

14.3 Planification des Jobs avec @Scheduled

@Component
public class BatchScheduler {

    @Autowired
    private JobLauncher jobLauncher;

    @Autowired
    private Job importClientsJob;

    // Lancer le job tous les jours à 2h du matin
    @Scheduled(cron = "0 0 2 * * ?")
    public void lancerImportNocturne() throws Exception {
        String dateDuJour = LocalDate.now().format(DateTimeFormatter.ISO_LOCAL_DATE);
        String nomFichier = "/data/input/clients_" + dateDuJour + ".csv";

        JobParameters params = new JobParametersBuilder()
            .addString("fichierEntree", nomFichier)
            .addLong("timestamp", System.currentTimeMillis())
            .toJobParameters();

        log.info("Lancement batch nocturne pour le fichier : {}", nomFichier);
        jobLauncher.run(importClientsJob, params);
    }
}

Ne pas oublier d’activer le scheduling dans la classe de lancement de Spring Boot :

@SpringBootApplication
@EnableScheduling  // Active @Scheduled
public class BatchApplication {
    public static void main(String[] args) {
        SpringApplication.run(BatchApplication.class, args);
    }
}

14.4 Tests unitaires des Steps

@SpringBatchTest
@SpringBootTest
class ImportStepTest {

    @Autowired
    private JobLauncherTestUtils jobLauncherTestUtils;

    @Autowired
    private JobRepositoryTestUtils jobRepositoryTestUtils;

    @AfterEach
    void tearDown() {
        jobRepositoryTestUtils.removeJobExecutions();
    }

    @Test
    void testEtapeLecture() throws Exception {
        // Lancer seulement le Step à tester, pas tout le Job
        JobExecution execution = jobLauncherTestUtils.launchStep(
            "etapeLecture",
            new JobParametersBuilder()
                .addString("fichierEntree", "classpath:test/clients_test.csv")
                .toJobParameters()
        );

        assertEquals(BatchStatus.COMPLETED, execution.getStatus());

        StepExecution step = execution.getStepExecutions().iterator().next();
        assertEquals(5, step.getReadCount());    // 5 lignes lues
        assertEquals(5, step.getWriteCount());   // 5 lignes écrites
        assertEquals(0, step.getSkipCount());    // Aucune ignorée
    }
}

15. TP Lecture fichier plat et Excel

Objectif de ce TP, lire des données depuis un fichier plat et Excel et les sauvegarder en base de données H2.

Lien vers le TP complet


16. TP Pipeline COBOL-READY

Objectif de ce TP, lire des données depuis un fichier CSV et créer un fichier adapté pour Cobol.

Lien vers l’énoncé complet du TP

Aperçu du TP

Vous allez construire CobolPipeline, un batch Spring Boot qui :

  1. Lit un fichier CSV volumineux (100 000+ lignes, séparateur ;) contenant des données d’employés
  2. Valide et nettoie les données (gestion des erreurs avec Skip)
  3. Reformate chaque champ à la longueur exacte requise par le schéma COBOL cible
  4. Écrit le fichier de sortie à longueur fixe compatible mainframe
  5. Génère un rapport de traitement (nombre de lignes traitées, ignorées, temps d’exécution)

Fichier d’entrée (extrait) :

matricule;nom;prenom;service;salaire;dateEmbauche;codeAgence
EMP001;Dupont;Jean;INFORMATIQUE;45000.00;2018-03-15;AG001
EMP002;Martin;Marie;COMPTABILITE;38500.50;2020-07-01;AG002
...

Fichier de sortie COBOL (schéma) :

Champ         Longueur  Type       Position
matricule     8         PIC X(8)   1-8
nom           25        PIC X(25)  9-33
prenom        25        PIC X(25)  34-58
service       20        PIC X(20)  59-78
salaire       10        PIC 9(8)V9(2) 79-88  (sans virgule)
dateEmbauche  8         PIC X(8)   89-96   (AAAAMMJJ)
codeAgence    5         PIC X(5)   97-101
TOTAL         101 caractères/enregistrement

Fin du cours – Bon batch