Querremos este tipo de entrega y procesamiento cuando necesitamos entregar mediante el productor y consumir y procesar un único mensaje, en plan, «oye, te tengo que mandar una serie de cartas, necesitas tenerlas todas, ordenadas o no, por lo que te las voy a mandar, pero tienes que avisarme que la has recibido bien y la has puesto a buen recaudo antes de enviarte la siguiente», por ejemplo, imaginad que dicho mensaje implica una transacción financiera, no querremos que nos carguen en nuestra cuenta dos veces o más dicha transacción.

Además, es importante que el consumidor confirme la recepción del mensaje antes de que el productor envíe otro mensaje. Esto se puede hacer mediante la API de Kafka.

Con estas medidas, y algo más de configuración tanto en el broker productor como en el consumidor, se puede garantizar que los eventos se envían y se reciben una única vez, de forma segura y fiable.

Aquí tienes algunos consejos para implementar transacciones exactly-once en Apache Kafka:

1) Utiliza el modo de entrega "exactly-once", via configuración. A día de hoy está restringido a la primera base de la pirámide Kafka, la de los productores y los consumidores.
  
2) Confirmación de la recepción de los mensajes por parte del consumidor de manera programática.


3) Utiliza un ID de grupo único para el productor y el consumidor. Esto es muy importante.

4) Utiliza una cola de retención para almacenar los mensajes que no se hayan podido entregar.

5) Captura excepciones en todo momento, la confirmación de excepciones puede requerir por tu parte bien guardar en un sistema que te garantice atomicidad relacional, puede ser distribuido incluso (spark delta lake), incluso un sistema de ficheros donde guardar logs, antes de comunicar al productor que todo ha ido bien mediante un commit sync o async.

6) Podemos tener más de un productor y más de un consumidor para hacer entrega y commit exactly-once, solo hay que tener en cuenta un par de cosas. 

Productor: Al enviar un mensaje, el productor debe especificar la partición a la que desea enviar el mensaje. Esto se puede lograr configurando la clave del mensaje o utilizando un particionador personalizado para determinar la partición de destino.

Consumidor: Para que un consumidor lea de una partición específica, debe suscribirse a esa partición en particular. Al conectarse al broker líder de esa partición, el consumidor podrá leer los mensajes de esa partición en particular.

Al asegurarse de que tanto el productor como el consumidor estén configurados para trabajar con una partición específica, se garantiza que el offset de la partición no sea compartido por otros consumidores en el mismo grupo, lo que contribuye a lograr una entrega exactamente una vez en Kafka.

Para habilitar el modo de entrega «exactly-once» en Apache Kafka, es necesario configurar el productor y el consumidor.

En el productor, debes configurar la propiedad «acks» en «all». Esto indica a Kafka que el productor solo debe enviar el siguiente mensaje cuando todos los brokers hayan confirmado la recepción del mensaje actual.

En el consumidor, debes configurar la propiedad «enable.auto.commit» en «false». Esto evitará que el consumidor confirme automáticamente la recepción de los mensajes.

Esto puede parecer contradictorio en un principio, pero no lo es. La propiedad «enable.auto.commit» controla si el consumidor confirma automáticamente la recepción de los mensajes. Si esta propiedad está configurada en «true», el consumidor confirmará automáticamente la recepción de los mensajes cuando llegue al final de un grupo de mensajes.

En el modo de entrega «exactly-once», el consumidor debe confirmar manualmente la recepción de los mensajes. Para ello, debe llamar al método commit() de la API de Kafka.

Por lo tanto, para garantizar que los mensajes se entreguen exactamente una vez en el modo de entrega «exactly-once», el consumidor debe configurar la propiedad «enable.auto.commit» en «false». Luego, debe confirmar manualmente la recepción de los mensajes cuando haya procesado el mensaje correctamente.

En resumen, la diferencia entre confirmar automáticamente la recepción de los mensajes y confirmar manualmente la recepción de los mensajes es la siguiente:

* **Confirmación automática**
* El consumidor confirma la recepción de los mensajes cuando llega al final de un grupo de mensajes.
* **Confirmación manual**
* El consumidor confirma la recepción de los mensajes cuando ha procesado el mensaje correctamente.

En el modo de entrega «exactly-once», el consumidor debe utilizar la confirmación manual.

Además, el productor y el consumidor deben utilizar el mismo ID de grupo. Esto permite a Kafka sincronizar los mensajes entre el productor y el consumidor.

En el archivo de configuración del productor, puedes configurar la propiedad «acks» de la siguiente manera:

 acks=all

En el archivo de configuración del consumidor, puedes configurar la propiedad «enable.auto.commit» de la siguiente manera:

 enable.auto.commit=false

También debes configurar la propiedad «group.id» para que el productor y el consumidor utilicen el mismo ID de grupo. Esto se puede hacer en el archivo de configuración del productor y del consumidor.

En el archivo de configuración del productor, debes configurar la propiedad «group.id» de la siguiente manera:

 group.id=my-group

En el archivo de configuración del consumidor, debes configurar la propiedad «group.id» de la siguiente manera:

 group.id=my-group

Una vez que hayas configurado el productor y el consumidor, podrás enviar y recibir mensajes en modo de entrega «exactly-once».

Aquí tienes un ejemplo de cómo configurar el productor y el consumidor para el modo de entrega «exactly-once»:

Productor

    producer.properties

    acks=all

    group.id=my-group

Consumidor

    consumer.properties

    enable.auto.commit=false

    group.id=my-group

Con esta configuración, el productor solo enviará el siguiente mensaje cuando todos los brokers hayan confirmado la recepción del mensaje actual. El consumidor no confirmará automáticamente la recepción de los mensajes, y utilizará el ID de grupo «my-group» para sincronizar los mensajes con el productor.

Literalmente el consumidor debe recibir el mensaje, hacer lo que sea con él, y finalmente hacer commit programáticamente, algo así:

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class Consumer {

        public static void main(String[] args) {

            // Crea el consumidor
            KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProperties());

            // Suscribe el consumidor al topico
            consumer.subscribe(Collections.singletonList("my-topic"));

            while (true) {

                // Obtiene el siguiente mensaje
                ConsumerRecord<String, String> record = consumer.poll(100);

                // Procesa el mensaje
                // ...

                // Confirma la recepcion del mensaje
                consumer.commitSync();
            }
        }

        private static Properties consumerProperties() {

            Properties properties = new Properties();
            properties.put("bootstrap.servers", "localhost:9092");
            properties.put("group.id", "my-group");
            properties.put("enable.auto.commit", "false");
            properties.put("auto.offset.reset", "earliest");

            return properties;
        }
}

También puedes utilizar el método commitAsync() para confirmar la recepción del mensaje de forma asíncrona.

La confirmación de recepción de un mensaje en Apache Kafka puede realizarse de forma síncrona o asíncrona.

La confirmación síncrona significa que el consumidor espera a que el productor confirme la recepción del mensaje antes de continuar procesando el siguiente mensaje. La confirmación asíncrona significa que el consumidor no espera a que el productor confirme la recepción del mensaje.

En términos de confianza, la confirmación síncrona es más fiable que la confirmación asíncrona. Esto se debe a que la confirmación síncrona garantiza que el productor ha confirmado la recepción del mensaje antes de que el consumidor continúe procesando el siguiente mensaje.

En cambio, la confirmación asíncrona no garantiza que el productor haya confirmado la recepción del mensaje antes de que el consumidor continúe procesando el siguiente mensaje. Esto significa que existe la posibilidad de que el mensaje se pierda o se entregue más de una vez.

Por lo tanto, la confirmación síncrona es la opción más fiable si es necesario garantizar que los mensajes se entreguen exactamente una vez.

Confirmación síncrona

Pros:
    Más fiable
    Garantiza que el mensaje se ha entregado
Contras:
    Bastante menos eficiente
    Puede provocar bloqueos

Confirmación asíncrona

Pros:
    Más eficiente
    No provoca bloqueos
Contras:
    Menos fiable

Existe la posibilidad de que el mensaje se pierda o se entregue más de una vez

En última instancia, la decisión de utilizar la confirmación síncrona o asíncrona depende de los requisitos específicos de la aplicación. Si es necesario garantizar que los mensajes se entreguen exactamente una vez, la confirmación síncrona es la opción más fiable. Sin embargo, si la eficiencia es un factor importante, la confirmación asíncrona puede ser una mejor opción.

Si te has fijado, para asegurar exactly-once, una de las cosas que hay que hacer es que tanto productor como consumidor tengan el mismo group-id, pero que pasaría si el productor estuviera en otro grupo distinto al del consumidor?

podría ocurrir lo siguiente:

El productor podría enviar el mismo mensaje varias veces, lo que podría dar lugar a la entrega de mensajes más de una vez.
El consumidor podría perder mensajes, ya que el productor no podría saber si el consumidor ya ha procesado un mensaje.
En concreto, si el productor envía un mensaje al topico my-topic, el consumidor no recibirá el mensaje de control que indica que el mensaje ha sido enviado correctamente. Esto significa que el consumidor no podrá realizar un seguimiento del estado del mensaje y podría procesarlo más de una vez.

Para evitar estos problemas, es importante que el productor y el consumidor tengan el mismo group.id. Esto garantiza que el productor y el consumidor puedan coordinarse para asegurarse de que los mensajes solo se envían una vez y que los consumidores no pierden mensajes.

Sin embargo, hay algunas situaciones en las que puede ser necesario que el productor y el consumidor tengan diferentes group.id. Por ejemplo, si desea que el productor y el consumidor procesen mensajes diferentes del mismo topico, o si desea que el productor y el consumidor procesen mensajes del mismo topico en diferentes momentos.

En estos casos, es importante que tomes medidas para evitar los problemas descritos anteriormente. Por ejemplo, puede utilizar un tópic de control para que el productor pueda informar al consumidor de que el mensaje ha sido enviado correctamente.

En resumen, es conveniente y necesario que el productor y el consumidor tengan el mismo group.id para conseguir la entrega de mensajes exactamente una vez. Sin embargo, hay algunas situaciones en las que puede ser necesario que el productor y el consumidor tengan diferentes group.id. En estos casos, es importante que tome medidas para evitar los problemas descritos anteriormente. En mi opinión, si quieres que el mismo productor pueda enviar distintos tipos de mensajes, asi como con los consumidores puedan recibir diferentes linajes de mensajes, y al mismo tiempo quieres procesamiento exactly-once, creo que estás rompiendo el principio KISS. Procurad que vuestro haga una cosa únicamente, y que la haga bien.

El concepto de idempotencia.

En la literatura vamos a encontrar que es necesario para conseguir exactly-once que el productor necesita que se introduzca en su payload una propiedad que indique que dicho mensaje sea idempotente.
Qué significa idempotente en este contexto?

En el contexto de Apache Kafka, un mensaje idempotente es un mensaje que se puede enviar varias veces sin que tenga ningún efecto adicional. En otras palabras, si se envía un mensaje idempotente dos veces, el resultado es el mismo que si se hubiera enviado una sola vez. Se puede ver así, imagina que le envío una carta a mi novia una fecha en concreto, pues la lectura de la misma por parte de mi novia va a tener el mismo efecto en ella cada vez que la lea. Espero! 🙂

Para que un mensaje sea idempotente en Apache Kafka, debe cumplir los siguientes requisitos:

  • El mensaje debe tener un identificador único. Este identificador se utiliza para identificar el mensaje y evitar que se entregue dos veces.
  • El mensaje debe ser atómico. Esto significa que el mensaje se debe entregar en su totalidad o no se debe entregar.

No es que el broker productor kafka genere un identificador, si no que asigna offsets únicos a los mensajes dentro de cada partición. Este enfoque de control de offsets es fundamental para la consistencia y la integridad de los datos en Kafka, permitiendo a los consumidores procesar los mensajes de manera confiable y evitar duplicados o pérdida de datos. Recordad que tanto productor como consumidor están asignados al mismo group-id, y si hay más de un par productor/consumidor, a la misma partición. Entonces el identificador de la partición también forma del identificador. 

La propiedad que se incluye en el payload del mensaje para indicar que es idempotente es la propiedad enable.idempotence. Esta propiedad debe establecerse en true para que el mensaje se considere idempotente.

Cuando el productor envía un mensaje idempotente, Kafka añade un identificador único al mensaje. Este identificador se utiliza para identificar el mensaje y evitar que se entregue dos veces.

Además, Kafka garantiza que el mensaje se entregue en su totalidad o no se entregue. Esto se consigue mediante la utilización de un algoritmo de consenso.

De esta manera, Kafka garantiza que los mensajes idempotentes se entregan exactamente una vez.

Aquí tienes un ejemplo de cómo crear un mensaje idempotente en Apache Kafka, primera aproximación:

import org.apache.kafka.clients.producer.ProducerRecord;

public class Producer {

    public static void main(String[] args) {

        // Crea el productor
        KafkaProducer<String, String> producer = new KafkaProducer<>(producerProperties());

        // Crea el mensaje
        String key = "my-key";
        String value = "my-value";
        ProducerRecord<String, String> record = new ProducerRecord<>("my-topic", key, value);

        // Establece la propiedad idempotente
        record.headers().add("enable.idempotence", "true");

        // Envía el mensaje
        producer.send(record);
    }

    private static Properties producerProperties() {

        Properties properties = new Properties();
        properties.put("bootstrap.servers", "localhost:9092");
        properties.put("acks", "all");
        properties.put("enable.idempotence", "true");

        return properties;
    }
}

En este ejemplo, el mensaje tiene un identificador único, que es el valor de la clave del mensaje. Además, la propiedad enable.idempotence se establece en true para indicar que el mensaje es idempotente. En teoría, debería ser suficiente poner en el lado del productor esta configuración, Kafka usará el mecanismo anteriormente descrito basado en la gestión del offset dentro del grupo/partición para obtener un identificador único que identifique al mensaje cuando lo envíes, luego, en la práctica, es posible que necesitemos hacer como introducir en el payload un verdadero identificador único que sirva para identificar cada mensaje enviado de productor a consumidor.

Por ejemplo, si el mensaje contiene datos sensibles, puedes añadir un identificador único para identificar el mensaje en caso de que se pierda o se entregue más de una vez.

Si decides añadir tu propio identificador al mensaje, debes asegurarte de que el identificador sea único. Esto se puede hacer, por ejemplo, generando un UUID aleatorio o utilizando un número de secuencia incremental.

Además, debes asegurarte de que el identificador esté incluido en el payload del mensaje. Esto se debe a que Kafka utiliza el payload del mensaje para identificar el mensaje.

Aquí tienes un ejemplo de cómo añadir tu propio identificador al mensaje:

import org.apache.kafka.clients.producer.ProducerRecord;

public class Producer {

    public static void main(String[] args) {

        // Crea el productor
        KafkaProducer<String, String> producer = new KafkaProducer<>(producerProperties());

        // Crea el mensaje
        String key = "my-key";
        String value = "my-value";
        String myId = UUID.randomUUID().toString();
        ProducerRecord<String, String> record = new ProducerRecord<>("my-topic", key, value, myId);

        // Establece la propiedad idempotente
        record.headers().add("enable.idempotence", "true");

        // Envía el mensaje
        producer.send(record);
    }

    private static Properties producerProperties() {

        Properties properties = new Properties();
        properties.put("bootstrap.servers", "localhost:9092");
        properties.put("acks", "all");
        properties.put("enable.idempotence", "true");

        return properties;
    }
}

En este ejemplo, el mensaje tiene un identificador único, que es el valor del UUID generado aleatoriamente. Además, la propiedad enable.idempotence se establece en true para indicar que el mensaje es idempotente.

Si es necesario garantizar que los mensajes se entreguen exactamente una vez, es recomendable enviar un identificador único. Esto se debe a que el identificador único se utiliza para identificar el mensaje y evitar que se entregue dos veces.

Si no es necesario garantizar que los mensajes se entreguen exactamente una vez, no es necesario enviar un identificador único. Sin embargo, es posible que quieras enviar un identificador único para fines de depuración o para identificar el mensaje en caso de que se pierda o se entregue más de una vez.

En el ejemplo que has proporcionado, el identificador único es el valor del UUID generado aleatoriamente. Este identificador es único y no es sensible. Por lo tanto, es una buena opción para enviar como identificador único.

Sin embargo, si el mensaje contiene datos sensibles, es posible que quieras utilizar un identificador único que no sea sensible. Por ejemplo, puedes utilizar un número de secuencia incremental que no esté almacenado en el propio mensaje.

En última instancia, la decisión de enviar o no un identificador único depende de los requisitos específicos de la aplicación.

Ahora, ese myId lo debo gestionar en el lado del consumidor, guardándolo en un sgbd y hacer commit en dicha base de datos, por lo que, podría pasar que antes de hacer commitSync con el productor Kafka, he hecho commit en la base de datos relacional o lo que sea que tengas que guarda transacciones atómicas. Ves el mismo problema que yo? hago commit en la base de datos, invoco a commitSync, operación que no tiene ningún valor de vuelta que indique que se ha hecho el commit, por lo que no se si efectivamente desde el lado del consumidor el commit en el lado del productor se ha hecho. Si no ocurre, debería capturar la excepción, deshacer el commit en la base de datos relacional del mensaje identificado e introducir el mensaje en un topic especializado para volver a procesar dicho mensaje. Dicho topic especializado lo consume un consumidor, otro distinto.

Es posible que el productor envíe el mensaje al consumidor antes de que el consumidor haya confirmado la recepción del mensaje. En este caso, el consumidor podría almacenar el identificador único del mensaje en la base de datos y hacer el commit de la transacción. Sin embargo, si el productor no confirma la recepción del mensaje, el consumidor tendrá que deshacer el commit de la transacción en la base de datos.

Para evitar este problema, puedes utilizar el método poll() del consumidor para obtener el siguiente mensaje antes de confirmar la recepción del mensaje anterior. De esta manera, el consumidor estará seguro de que el productor ha enviado el mensaje antes de confirmar su recepción.

Si el consumidor no puede obtener el siguiente mensaje antes de confirmar la recepción del mensaje anterior, puede utilizar el método seek() para posicionarse en el mensaje anterior. De esta manera, el consumidor podrá confirmar la recepción del mensaje anterior y luego obtener el siguiente mensaje.

Si el productor no confirma la recepción del mensaje, el consumidor debe capturar la excepción y deshacer el commit de la transacción en la base de datos. También puede introducir el mensaje en un topic especializado para volver a procesarlo.

El topic especializado lo consume un consumidor, otro distinto, no necesariamente. El consumidor que consume el topic especializado puede ser el mismo consumidor que consumió el mensaje originalmente. Sin embargo, es posible que quieras utilizar un consumidor distinto para evitar que se duplique el trabajo.

Aquí tienes un ejemplo de cómo gestionar el identificador único en el lado del consumidor:

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class Consumer {

    public static void main(String[] args) {

        // Crea el consumidor
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProperties());

        // Suscribe el consumidor al topico
        consumer.subscribe(Collections.singletonList("my-topic"));

        while (true) {

            // Obtiene el siguiente mensaje
            ConsumerRecord<String, String> record = consumer.poll(100);

            // Procesa el mensaje
            // ...

            // Confirma la recepcion del mensaje
            try {
                consumer.commitSync();
            } catch (Exception e) {
                // Deshace el commit en la base de datos
                // ...

                // Introduce el mensaje en un topic especializado
                // ...
            }
        }
    }

    private static Properties consumerProperties() {

        Properties properties = new Properties();
        properties.put("bootstrap.servers", "localhost:9092");
        properties.put("group.id", "my-group");
        properties.put("enable.auto.commit", "false");
        properties.put("auto.offset.reset", "earliest");

        return properties;
    }
}

En este ejemplo, el consumidor utiliza el método poll() para obtener el siguiente mensaje antes de confirmar la recepción del mensaje anterior. Si el consumidor no puede obtener el siguiente mensaje antes de confirmar la recepción del mensaje anterior, utiliza el método seek() para posicionarse en el mensaje anterior.

También utiliza el método try-catch para capturar la excepción que se produce si el productor no confirma la recepción del mensaje. En este caso, el consumidor deshace el commit en la base de datos e introduce el mensaje en un topic especializado.

Ahora, Qué pasa si al tratar de hacer push en el topic para los retry también falla? debería guardarlo en un sistema de ficheros con una entrada en un fichero de texto? pues, por qué no? obviamente estoy hablando de casos muy especiales, pero que en un sistema en producción puede ocurrir y en mi opinión, se debe pensar en todos los casos especiales donde pueda fallar. Hay que pensar en los fallos catastróficos que puedan ocurrir en el cluster kafka, en el sistema de ficheros que pueda ocurrir en la máquina anfitrión que sostenga el sistema completo o parte del sistema. Mientras más robusto, mejor.

Hay varias maneras de guardar los mensajes en un sistema de archivos. Una manera sencilla es utilizar el método write() de la clase FileWriter. Por ejemplo:

import java.io.File;
import java.io.FileWriter;
import java.io.IOException;

public class Consumer {

    public static void main(String[] args) {

        // ...

        // Obtiene el siguiente mensaje
        ConsumerRecord<String, String> record = consumer.poll(100);

        // Procesa el mensaje
        // ...

        // Confirma la recepcion del mensaje
        try {
            consumer.commitSync();
        } catch (Exception e) {
            // Deshace el commit en la base de datos
            // ...

            // Introduce el mensaje en un topic especializado
            // ...

            // Guarda el mensaje en un sistema de archivos
            try {
                File file = new File("/tmp/retry.txt");
                FileWriter writer = new FileWriter(file, true);
                writer.write(record.value() + "\n");
                writer.close();
            } catch (IOException ex) {
                // ...
            }
        }
    }
}

Este código escribe el valor del mensaje en un archivo de texto llamado /tmp/retry.txt. El archivo se abre en modo append para que se puedan añadir nuevas entradas al archivo sin sobrescribir las entradas existentes.

Otra manera de guardar los mensajes en un sistema de archivos es utilizar una base de datos NoSQL. Por ejemplo, puedes utilizar MongoDB para guardar los mensajes en una colección.

La elección de la manera de guardar los mensajes depende de tus necesidades específicas. Si necesitas una solución sencilla, puedes utilizar el método write() de la clase FileWriter. Si necesitas una solución más compleja, puedes utilizar una base de datos NoSQL.

Ahora, finalmente, en el caso de que muchas cosas vayan mal a la hora de consumir el mensaje idempotente, como puede ser que el cluster Kafka esté sobrecargado en disco, la máquina física que haga de anfitrión del sistema on premise o de parte del cluster está rota, caída, quemada, destruida, etc, tendremos que procurar robustecer el código para que en algún lado aparezca el mensaje idempotente que se ha tratado de consumir.

Por cierto, este código es de la versión 3.6.0, pero estas técnicas son aplicables a cualquier versión presente y futura.

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class Consumer {

    public static void main(String[] args) {

    // Declara las variables e, e2 y ex en el ámbito del bloque while
    Exception e;
    Exception e2;
    Exception ex;

    // Crea el consumidor
    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProperties());

    // Suscribe el consumidor al topico
    consumer.subscribe(Collections.singletonList("my-topic"));

    while (true) {

        // Obtiene el siguiente mensaje
        ConsumerRecord<String, String> record = consumer.poll(100);

        // Procesa el mensaje
        // ...

        // Confirma la recepcion del mensaje
        try {
            consumer.commitSync();
        } catch (Exception e) {
            // Deshace el commit en la base de datos
            // ...

            // Introduce el mensaje en el topic especializado
            try {
                // Crea el productor para el topic-retry
                KafkaProducer<String, String> producer = new KafkaProducer<>(producerProperties());

                // Envía el mensaje al topic-retry
                producer.send(new ProducerRecord<>("retry-topic", record.key(), record.value()));

                // Cierra el productor
                producer.close();
            } catch (Exception e2) {
                // Guarda el mensaje en un sistema de archivos
                try {
                    File file = new File("/tmp/retry.txt");
                    FileWriter writer = new FileWriter(file, true);
                    writer.write(record.value() + "\n");
                    writer.close();
                } catch (IOException ex) {
                    // ...
                }
            }
        }

        // Si no se ha podido confirmar la recepción del mensaje, se posiciona en el mensaje anterior
        if (e != null || e2 != null || ex != null) {
            consumer.seek(record.offset() - 1);
        }
    }
    }

    private static Properties consumerProperties() {

        Properties properties = new Properties();
        properties.put("bootstrap.servers", "localhost:9092");
        properties.put("group.id", "my-group");
        properties.put("enable.auto.commit", "false");
        properties.put("auto.offset.reset", "earliest");

        return properties;
    }

    private static Properties producerProperties() {

        Properties properties = new Properties();
        properties.put("bootstrap.servers", "localhost:9092");
        properties.put("acks", "all");

        return properties;
    }
}

Esta modificación es la más completa, ya que permite tener en cuenta cualquier excepción que se produzca durante el procesamiento del mensaje.

Para otro momento, trataré de explicar como conseguir exactly-once ordenado. Esta publicación ya ha quedado muy larga.

Creo que por ahora es suficiente, me despido de todos ustedes, feliz entrada de año 2023.

Un comentario en “Acerca de la entrega y procesamiento de mensajes exactly-once con Apache Kafka

Deja un comentario