Introducción.

Qué necesitaría hacer si tuviera que crear una libreria que me asegurara operaciones idempotentes at most once en una arquitectura de microservicios que siga el patrón CQRS/ES?

Un ejemplo, pongamos que queremos una operación que se ejecute una sola vez, por ejemplo sacar dinero del banco. Que implicaría algo asi?

Un usuario autentificado, a través de un cajero, por ejemplo, hace una peticion PUT a un servidor web, a algo llamado ServiceController.

ServiceController será básicamente un servidor web que acepte peticiones REST, algunas idempotentes para tratar de escribir en el cluster de escritura y dejar preparada la lectura para las operaciones GET.

Necesitariamos que esa llamada idempotente se ejecutara también con auto retry. Hay veces que hay problemas de acceso al cluster web porque está saturado de peticiones y hay que esperar un tiempo razonable para volver a solicitar la ejecución. Lo ideal sería ejecutar asíncronamente dichas peticiones, fire and forget. Algo de código, sin usar la ejecución asíncrona mediante Futures, pero es algo trivial:

Llamadas idempotentes con auto retry

En la parte del controlador:
La idea es invocar a un método en un do while, de manera que retornaremos un string en caso que podamos hacer la llamada externa. En caso contrario, capturamos la excepcion, la guardamos en una lista y volvemos a ejecutar, mientras tengamos numero de intentos. En caso de que el numero de intentos sea superior al permitido, devolvemos las excepciones para arriba, a ser posible, sin duplicados.

public string Execute(string queryString)
{
    var remainingTries = maxRequestTries;  
    var exceptions = new List<Exception>();

    do 
    {
        --remainingTries;
        try 
        {
            return ExecuteSingle(queryString);
        }
        catch (Exception e) 
        {
            exceptions.Add(e);
        }
    }
    while (remainingTries > 0)

    var uniqueExceptions = exceptions.Distinct(new ExceptionEqualityComparer());

    if (uniqueExceptions.Count()) == 1)
        throw uniqueExceptions.First();

    return new AggregateException("Could not process request", uniqueExceptions);
}


private string ExecuteSingle(string queryString)
{
    try
    {
        var response = httpClient.GetAsync(queryString).Result;
        if (response.IsSuccessStatusCode)
        {
            return response.Content.ReadAsStringAsync().Result;
        }
        if (response.StatusCode == System.Net.HttpStatusCode.Unauthorized)
        {
            throw new InvalidKeyException("No valid API key provided.");
        }
        if (response.StatusCode >= System.Net.HttpStatusCode.InternalServerError)
        {
            throw new Exception("There is a problem with the ButterCMS service");
        }
    }
    catch (TaskCanceledException taskException)
    {
        if (!taskException.CancellationToken.IsCancellationReques‌​ted)
        {
            throw new Exception("Timeout expired trying to reach the ButterCMS service.");
        }
        throw taskException;
    }
    catch (HttpRequestException httpException)
    {
        throw httpException;
    }
    catch (Exception ex)
    {
        throw ex;
    }
    return string.Empty;
}

// Esto es código c#, habría que usar el equivalente java.
private class ExceptionEqualityComparer : IEqualityComparer<Exception>
{
    public bool Equals(Exception e1, Exception e2)
    {
        if (e2 == null && e1 == null)
            return true;
        else if (e1 == null | e2 == null)
            return false;
        else if (e1.GetType().Name.Equals(e2.GetType().Name) && e1.Message.Equals(e2.Message))
            return true;
        else
            return false;
    }

    public int GetHashCode(Exception e)
    {
        return (e.GetType().Name + e.Message).GetHashCode();
    }
}

Este código muestra el concepto de llamada asíncrona fire and forget que tendría una invocacion PUT que lanzaría la ejecución.

Una invocación real, teniendo en cuenta que queremos invocar a un microservicio complejo, uno que tiene que llamar a varios
microservicios coordinados y en orden, en el que unos tendrán que escribir en una base de datos, es decir,
escribir en el cluster de escritura usando el patrón CQRS/ES, otros tendrán que leer para proporcionar un dato necesario para el
siguiente, tenemos que tener en cuenta necesitamos un

ServiceController tendrá un ServiceCoordinator que contendrá uno o varios ServiceXXX_Handler para gestionar toda la operativa
distribuida entre el cluster de escritura y el de lectura. ServiceCoordinator sería una implementación Saga para gestionar
la invocacion asíncrona ordenada de cada uno de los handlers que contienen la lógica para comunicarme con el cluster de
lectura y de escritura siguiendo CQRS/ES y será capaz de hacer rollback distribuido hacia atrás en caso de tener que hacerlo, aunque existe otra manera de hacerlo, por ejemplo, el contenedor de eventos puede tener la lógica para indicar al cluster de escritura que haga rollback, luego tendría que lanzar los eventos desde el manejado de eventos de la parte de la escritura para que así la actualización en la parte de lecturas se generara de manera natural.

Usaré motores de eventos para transmitir el mensaje que indiquen una operación y estado u otra.


Cada uno de estos Service_Handlers, controlan la invocacion asíncrona para acceder a la base de datos de escritura
junto con la comunicacion asíncrona mediante mensajeria que comunica el cluster junto al handler.

ServiceXXX_Handler será un manejador que controlará la evolucion de los estados que habrá para que ese servicio permita una
operacion idempotente at most once, es decir, quiero que si se vuelve a ejecutar una operación identica,
el resultado de esa operación sea siempre el mismo, y querré que dicha operacion sea ejecutada como mucho una vez, es decir,
la actualizacion del estado tanto en la parte de las escrituras como en la parte de las escrituras se haga una sola vez.

Creo que algo así podría conseguir transacciones ACID en un entorno distribuido de cada servicio. Lento, pero se podría conseguir, aunque hay que demostrarlo.

Es posible que si la ejecución no es rápida, pudiéramos obtener lecturas sucias.


Necesito que la ejecución de cada parte sea lo más rápida posible.

ServiceXXX_Handler recogerá inicialmente el identificador del usuario, y devuelve la respuesta REST del servidor con el
identificador del recurso creado.

Con esos dos valores, guardaremos el identificador REST con un estado, algo así como inicializado-recurso-rest.

Hay que contar que toda esta operativa implica invocar a un servidor web, este servidor web tendrá un ServiceXXXImpl,
con capacidad para interactuar con el productor y el consumidor del motor de eventos.
ServiceXXXImpl podrá invocar al gateway de comandos (ServiceXXX_CommandGW_Producer), pushear el mensaje, recibir el
mensaje (ServiceXXX_CommandGW_Consumer), como el servicio de escritura de comandos está subscrito a ese topic,
podrá recibir ese mensaje, deserializarlo, e invocar al repositorio capaz de interactuar con el cluster de escritura,
recibir una respuesta de la base de datos, que puede ser que todo ha ido bien, que no ha ido bien, que ha habido una
excepcion.

En este punto, si algo ha ido mal, el servicio de escritura de comandos tiene que volver a invocar al método un número
máximo de veces.

Con suerte, en alguna de esas invocaciones se habrá hecho la insercion.

Si después de haber intentado N veces escribir en la BD fracasamos, capturamos la excepción, escribiendo en el log.

Luego tenemos que invocar a un gateway de mensajeria desde el servicio de escritura de comandos para indicar al
ServiceXXX_Handler el resultado de la escritura, éste actualizará el estado de esta peticion web.

Esto describe la parte de escribir en el cluster de escritura, actualizando la máquina de estados adecuadamente.

ServiceXXX_Gateway
–>ServiceXXX_CommandGW_Producer. Para hablar con ServiceXXX_CommandHandler. le indica el mensaje que tiene que escribir en el cluster de escrituras.
–>ServiceXXX_CommandGW_Consumer. Parecido al anterior solo que al revés.

Ahora, una vez terminada la parte de la escritura, ServiceXXX_Handler emitiré un evento (EventStore), para el cluster de
lectura indicando que la escritura es exitosa y tengo que actualizar el cluster de lectura y en consecuencia, actualizar
las proyecciones para hacer las lecturas. Es decir, usando un ReadGateway, haremos push mediante un ReadGWProducer
con el evento de la escritura, algo que indique “usuario userID ha sacado tanto dinero de la cuenta XXX con fecha YYY. Saldo actual: ZZZ”.

Actualizaremos el estado adecuadamente en el ServiceXXX_Handler, queremos que ServiceXXX_Handler
mantenga un estado mediante una máquina de estados finita. A continuación, en cuanto dicho mensaje sea introducido en
el topic, un consumidor ReadGWConsumer recibirá dicho mensaje. Este consumidor será parte de la lógica de una
proyeccion de lectura, podremos tener tantas proyeccciones de lectura como necesitemos.

Al recibir ese mensaje, un Servicio ReadHandler, invocará a algún servicio adecuado que trabajará con ese mensaje para
actualizar la base de datos de lecturas. Una vez que todo estos pasos están realizados, y solo en este punto, enviaremos
un mensaje al ServiceXXX_Handler para que actualice la maquina de estados e indique la operacion PUT creada inicialmente
que tiene actualizar el estado adecuadamente.

De esta manera, si llegara una nueva petición, exactamente igual que la anterior, del mismo usuario, con el mismo mensaje
a ejecutar, directamente el CommandHandler podría decidir no hacer nada o hacer algo en funcion de si el CommandGW
ha podido enviar el mensaje al topic mediante CommandGWProducer, si el CommandGWConsumer ha recibido el mensaje para
escribir en el cluster de escritura…

A lo mejor, no es necesario tanto envío de mensajes entre el manejador del servicio, el manejador de escrituras,
el almacen de eventos y el manejador de lecturas. Todo dependerá de las necesidades del proyecto.

donde están los puntos débiles, qué puede fallar?

La comunicación con los distintos subsistemas como las colas de mensajería, la interacción con la base de datos, 
los fallos de red que introduzcan pausas de tiempo indeterminado, la falta de espacio en disco, el fallo en 
los discos duros en los distintos subsistemas, puede conllevar a un montón de errores lógicos que habrá que 
tenerlos en cuenta creando tests unitarios duros de verdad. 

cómo evitar que falle?

tests, tests, codificación que tenga en cuenta todo tipo de excepciones, para tratar de reintentar cuando se pueda. 
Se podrá hacer reintento si por ejemplo hay problemas de estabilidad de red, por ejemplo, que esté muy saturada la red. 
Para enviar los mensajes asíncronos a través del topic, considerar deshabilitar el autocommit, capturar la excepción 
al pushear y enviar el mismo mensaje a una cola Dead Letter Queue (DLQ). 

No todos los sistemas de mensajería soportan este concepto, ZeroMQ puede que no lo soporte. Investigarlo. 
Esta misma técnica debería servir para los casos en que el hardware dedicado al intercambio de mensajes falle, 
pero hay que tener en cuenta que debería estar en un nodo distinto al de la comunicacion y debe estar bien provisionado. 

Al mismo tiempo, y por redundancia, escribir estos eventos de escritura fallidos en un log distribuido. 
Si el sistema DLQ basado en topics está indispuesto, uno alternativo basado en procesar ese fichero de log debería 
tomar el control. 

Estos dos sistemas DLQ deberían estar gestionados por algo basado en Apache Camel, un framework de integración de tareas,
algo que gestione el estado de salud del DLQ primario, por ejemplo preguntado al cluster si dicho topic está operativo 
sin problemas. 

Si encuentra problems, tratará de arreglarlo (por definir) mientras lanza el proceso secundario que gestione el 
procesamiento de log con los eventos fallidos. 
Robustez, hay que procurar que sea lo más robusto posible. A la vez que escalable y tenga siempre alta disponibilidad. 

Cuáles son los componentes?

De abajo a arriba, tenemos un Controller, ServiceXXXCommandHandler, ServiceXXXQueryHandler, EventStore, ServiceXXXHandler, 
ServiceXXX_GWHandler, ServiceXXXDeadLetterQueue.

ServiceXXXCommandHandler.

Es el encargado de gestionar la comunicacion asíncrona con ServiceXXXHandler y de gestionar el acceso a datos para las escrituras en 
un cluster de datos.

Para la comunicación asíncrona, quiero usar una cola de mensajes ultrarápida (zeromq?), tendré para ello un 
productor de mensajes para enviar mensajes/eventos con el resultado de la interaccion con la base de datos con ServiceXXXHandler. 
Tendré tambien un consumidor para recibir los mensajes de ServiceXXXHandler.

Para dicha comunicacion asíncrona orientada a eventos, tendré un ServiceXXX_GWCommandHandler que contendrá a un 
productor y a un consumidor de mensajes. 
El productor de mensajes estará conectado al consumidor del ServiceXXX_GWHandler, así como el consumidor de 
mensajes estará conectado al productor del ServiceXXX_GWHandler mediante sus respectivos topics.

Para gestionar la interaccion con la base de datos de escrituras, tendré un ServiceXXX_DataCommand, que hará 
las operaciones de inserción, borrado y actualizacion como métodos públicos. 
Necesitaré también métodos privados para buscar. No estarán expuestos. 

EventStore

Es el encargado de guardar los eventos agregados de solo lectura producidos mediante los comandos guardados 
anteriormente. 

ServiceXXXHandler podrá escribir en él una vez que haya recibido la confirmación por parte de ServiceXXXCommandHandler 
de que ha podido consolidar el dato.

Es de solo lectura para ServiceXXXQueryHandler, es su fuente de la verdad. 
Tiene que ser durable, una base de datos orientada a eventos. EventStore?, kafka? rabbitMQ? 

ServiceXXXQueryHandler

Es el encargado de crear las proyecciones o lecturas sobre los datos previamente guardados en el cluster de escritura 
y a su vez guardados como eventos en el EventStore.

ServiceXXXHandler

Es el encargado de gestionar la máquina de estados finita que gestionará el estado de la comunicación asíncrona con 
ServiceXXXCommandHandler, ServiceXXXQueryHandler y EventStore. 
Según vaya invocando a uno u otro, el estado evolucionará. 
Dicho estado debe guardar el identificador de cada operación, ya sea de escritura o de lectura. 
Dicho identificador debe ser suministrado por una capa superior a este Handler.

Como implemento esta máquina de estados finitos?

En principio, necesito que mantener la informacion dada por el identificador de la operacion idempotente sea la 
clave de un hashmap, donde el valor sea el estado actual de la máquina de estados. 
Tan simple como eso, aunque, si por ejemplo hubiera que guardar el estado de las peticiones long running en disco... 

Posibles implementaciones de la máquina de estado:

https://www.adictosaltrabajo.com/2011/08/12/state-machine/

https://projects.spring.io/spring-statemachine/

ServiceXXXDeadLetterQueue

Encargado de gestionar el concepto dead letter queue, tanto en ServiceXXX_GWCommandHandler como en ServiceXXX_GWHandler. 
Si no es capaz de escribir exitosamente en su topic respectivo, capturaremos su excepcion, escribiremos en un log y 
pushearemos el mensaje en otro topic para que un consumidor dlq vuelva a reintroducir el mensaje para reprocesamiento. 
Cuando se procesa exitosamente, se hace commit en el topic.

Este proyecto podría valer si fuera con kafka:

https://github.com/alonsoir/kafka-deadletter

O éste otro si fuera con Rabbit:

https://github.com/julian-eggers/spring-rabbitmq-dead-letter-queue-example

Aunque la comunicacion entre servicios sería ideal que fuese lo más rápida posible. Algo como 0mq.

https://zguide.zeromq.org

https://raw.githubusercontent.com/imatix/zguide/master/images/fig25.png

Hay una implementación oficial java, jeromq:

https://github.com/zeromq/jeromq 

Y otra basada en wrappers JNI, jzmq:

https://github.com/zeromq/jzmq

Y otra más, jczmq:

https://zeromq.org/languages/java/#jczmq

Por qué hay tres???

Es buena idea usar 0mq para intercomunicacion entre servicios?

0mq parece más una librería para construir un nuevo rabbitmq, pero el caso es que si yo quiero sólo tener pub/sub, podría valer. 
El caso es que no tiene concepto como tal sobre DLQ, es algo que tendrías que tratar de hacer por tu cuenta, aunque, si se gana 
tanto en velocidad y rendimiento, puede merecer la pena. La pregunta que habría que hacerse es, realmente necesitas que tus 
mensajes viajen super rápido? tienes que enviar millones de mensajes por segundo? realmente tienes que hacerlo?. 
Todo es cuestión de responder con sinceridad a estas preguntas.

Algo más???

Necesitaré también un ServiceXXX_GWHandler que contendrá un productor y un consumidor ultrarápidos para comunicarme con 
ServiceXXXCommandHandler. 

El productor mandará mensajes que escuchará el consumidor de ServiceXXX_GWHandler que pertenece a ServiceXXXCommandHandler.
El consumidor recibirá mensajes del productor de ServiceXXX_GWHandler que pertenece por igual a ServiceXXXCommandHandler.

Necesitaré también un ServiceXXXEventStore para escribir el agregado resultado de interactuar con el cluster de escrituras. 
Tiene que escribir en dicho EventStore, nada más.

Como voy a usar el identificador del recurso creado por el servidor web al hacer PUT, si ese identificador lo tendré de vuelta al ejecutar por primera vez? o sea, no lo tendré para cuando toque actualizar la maquina de estados!! 

Se me escapa algo?

Links

<pre class="wp-block-syntaxhighlighter-code">https://unpocodejava.com/2013/02/25/un-poco-de-zeromq/

https://stackoverflow.com/questions/33159703/single-or-multiple-topic-stream-per-aggregate-root-event-in-kafka?noredirect=1&lq=1

https://stackoverflow.com/questions/23453286/how-to-model-bank-transfer-in-cqrs

<a class="m-story" href="https://medium.com/memobank/choosing-an-architecture-85750e1e5a03" target="_blank" data-width="720" data-border="1" data-collapsed="">View at Medium.com</a>

<a class="m-story" href="https://medium.com/airbnb-engineering/avoiding-double-payments-in-a-distributed-payments-system-2981f6b070bb" target="_blank" data-width="720" data-border="1" data-collapsed="">View at Medium.com</a>

https://github.com/iluwatar/java-design-patterns

https://www.google.com/search?client=safari&rls=en&q=cqrs/es+banking&ie=UTF-8&oe=UTF-8

<blockquote class="wp-embedded-content" data-secret="AYRZDO3bPE"><a href="https://sylvainleroy.com/2020/09/24/answer-to-the-article-cqrs-is-an-anti-pattern-for-ddd/">Answer to the article CQRS Is an Anti-Pattern for DDD</a></blockquote><iframe class="wp-embedded-content" sandbox="allow-scripts" security="restricted" style="position: absolute; clip: rect(1px, 1px, 1px, 1px);" title="&#8220;Answer to the article CQRS Is an Anti-Pattern for DDD&#8221; &#8212; Cloud, Code Quality and Software Modernization Blog" src="https://sylvainleroy.com/2020/09/24/answer-to-the-article-cqrs-is-an-anti-pattern-for-ddd/embed/#?secret=AYRZDO3bPE" data-secret="AYRZDO3bPE" width="600" height="338" frameborder="0" marginwidth="0" marginheight="0" scrolling="no"></iframe>

http://awesome-scalability.com

https://en.wikipedia.org/wiki/Dead_letter_queue

https://www.confluent.io/blog/kafka-connect-deep-dive-error-handling-dead-letter-queues/

https://hackernoon.com/idempotency-apis-and-retries-34b161f64cb4

https://stackoverflow.com/questions/4837794/iequalitycomparer-interface-in-java

https://www.confluent.io/blog/kafka-connect-deep-dive-error-handling-dead-letter-queues/</pre>

Trabajo en progreso.

2 thoughts on “Sobre componentes de una arquitectura de microservicios siguiendo el patrón CQRS/ES.

Leave a Reply

Fill in your details below or click an icon to log in:

WordPress.com Logo

You are commenting using your WordPress.com account. Log Out /  Change )

Twitter picture

You are commenting using your Twitter account. Log Out /  Change )

Facebook photo

You are commenting using your Facebook account. Log Out /  Change )

Connecting to %s