En arquitecturas distribuidas con Service Bus (como Azure Service Bus, RabbitMQ o NServiceBus), es común recibir mensajes que provocan cambios en nuestro modelo de dominio. Pero… ¿qué ocurre cuando ese modelo ya fue modificado por otro proceso?

Este artículo te guía sobre cómo actuar ante errores de concurrencia al reprocesar mensajes, especialmente cuando sigues prácticas de Domain-Driven Design (DDD) y usas concurrencia optimista con una base de datos relacional.

El problema

Supón que llega un mensaje PedidoConfirmado desde tu bus de mensajes, y quieres cambiar el estado de ese Pedido. Pero al guardar, obtienes un error de concurrencia porque otro proceso ya lo modificó.

Este error no es de lógica, ni de negocio, es de concurrencia.

Diagrama del problema

¿Qué puedes hacer?

Hay 3 estrategias habituales:

1. Reprocesar el mensaje automáticamente

Si falla por concurrencia, lo vuelves a intentar desde cero hasta que pase.

public async Task ProcesarMensajeAsync(Guid pedidoId)
{
    int reintentos = 0;
    const int maxReintentos = 5;

    while (reintentos < maxReintentos)
    {
        try
        {
            using var db = new AppDbContext();
            var pedido = await db.Pedidos.FindAsync(pedidoId);

            if (pedido == null) return;

            pedido.Confirmar(); // lógica del dominio
            await db.SaveChangesAsync();

            Console.WriteLine("Procesado correctamente.");
            return;
        }
        catch (DbUpdateConcurrencyException)
        {
            reintentos++;
            Console.WriteLine($"Conflicto de concurrencia. Reintento #{reintentos}...");
            await Task.Delay(200);
        }
    }

    Console.WriteLine("No se pudo procesar tras varios intentos.");
    // Aquí podrías usar un fallback: enviar a dead-letter queue, notificar, etc.
}

2. Fallback completo

Si tras varios intentos no se puede procesar, se aplica una lógica alternativa (fallback), como mover el mensaje a una dead-letter queue.

public async Task ProcesarMensajeAsync(Guid pedidoId)
{
    int reintentos = 0;
    const int maxReintentos = 5;

    while (reintentos < maxReintentos)
    {
        try
        {
            using var db = new AppDbContext();

            // Cargar entidad
            var pedido = await db.Pedidos.FindAsync(pedidoId);
            if (pedido == null)
            {
                Console.WriteLine("Pedido no encontrado.");
                return;
            }

            // Aplicar lógica de dominio
            pedido.Confirmar();

            // Guardar cambios
            await db.SaveChangesAsync();

            Console.WriteLine("Procesado correctamente.");
            return; // éxito → salir del bucle
        }
        catch (DbUpdateConcurrencyException)
        {
            reintentos++;
            Console.WriteLine($"Conflicto de concurrencia. Reintento #{reintentos}...");
            await Task.Delay(200); // backoff simple
        }
        catch (Exception ex)
        {
            // Otros errores: negocio, conexión, etc.
            Console.WriteLine($"Error inesperado: {ex.Message}");
            return;
        }
    }

    // Fallback tras agotar los reintentos
    Console.WriteLine("No se pudo procesar tras varios intentos. Ejecutando fallback...");

    await EjecutarFallbackAsync(pedidoId);
}

private async Task EjecutarFallbackAsync(Guid pedidoId)
{
    // Ejemplo: mover a dead-letter, loguear o notificar
    Console.WriteLine($"Pedido {pedidoId} movido a Dead Letter Queue o notificación de error enviada.");
    // Aquí puedes publicar un evento, enviar a una cola, registrar, etc.
}

3. Fallback parcial

En vez de reprocesar todo el mensaje, solo reintentas la operación crítica que falló (por ejemplo, el guardado). Esta suele ser más eficiente.

public async Task ProcesarMensajeAsync(Guid pedidoId)
{
    // Cargar entidad y aplicar lógica de dominio una vez
    using var db = new AppDbContext();
    var pedido = await db.Pedidos.FindAsync(pedidoId);

    if (pedido == null)
    {
        Console.WriteLine("Pedido no encontrado.");
        return;
    }

    pedido.Confirmar(); // ✅ Se asume que Confirmar() es idempotente o que sólo se ejecuta una vez

    // Reintentar sólo el SaveChanges (operación crítica)
    int intentos = 0;
    const int maxIntentos = 3;
    bool guardado = false;

    while (intentos < maxIntentos && !guardado)
    {
        try
        {
            await db.SaveChangesAsync();
            Console.WriteLine("Cambios guardados correctamente.");
            guardado = true;
        }
        catch (DbUpdateConcurrencyException)
        {
            intentos++;
            Console.WriteLine($"Conflicto de concurrencia al guardar. Reintento #{intentos}...");
            await Task.Delay(200);

            // Recargar estado actual y reintentar aplicar la parte crítica
            db.Entry(pedido).Reload(); // recarga entidad desde base de datos
            pedido.Confirmar(); // vuelve a aplicar lógica si es necesario
        }
    }

    if (!guardado)
    {
        Console.WriteLine("No se pudo guardar tras varios intentos. Ejecutando fallback parcial...");
        await EjecutarFallbackParcialAsync(pedidoId);
    }
}

private async Task EjecutarFallbackParcialAsync(Guid pedidoId)
{
    // Aquí puedes registrar el fallo, enviar a DLQ, emitir una alerta o evento compensatorio
    Console.WriteLine($"Fallback parcial activado para Pedido {pedidoId}.");
}

Comparativa de Estrategias de Reprocesamiento

Estrategia Nombre Qué se reintenta Cuándo se aplica Código asociado
E1 Reintento completo Todo: leer, aplicar lógica, guardar Cuando el mensaje es idempotente o se puede ejecutar varias veces sin efectos no deseados ProcesarMensajeAsync (reintento completo)
E2 Fallback completo Todo (como la 1), pero si falla tras X intentos → fallback Cuando no quieres perder el mensaje si el reintento no basta ProcesarMensajeAsync + EjecutarFallbackAsync
E3 Fallback parcial Solo la operación crítica (SaveChangesAsync) Cuando ya aplicaste lógica del dominio y solo necesitas persistirla ProcesarMensajeAsync + EjecutarFallbackParcialAsync

¿Qué estrategia aplicar según el contexto?

Escenario Estrategía recomendada Justificación
El mensaje es idempotente (puede reprocesarse sin efectos no deseados) Reintento completo (E1) Puedes repetir toda la lógica sin riesgo, incluso múltiples veces. Ideal para handlers simples.
La lógica de dominio es compleja o tiene efectos secundarios (envíos, llamadas externas, acumulaciones) Fallback parcial (E3) Mejor no repetir toda la lógica. Reintentas sólo el guardado para minimizar efectos colaterales.
Reintentos agotados tras varios intentos fallidos de concurrencia Fallback completo (E2) + DLQ / Log Es preferible fallar de forma controlada antes que atascar el sistema. Mueve el mensaje para revisión manual o alerta.
El mensaje depende de un estado anterior que ya cambió No reprocesar / Idempotencia + registro A veces es mejor ignorar el mensaje si ya se cumplió su objetivo por otro flujo. Registra el intento y no reproceses.
Existen dependencias externas que no deben duplicarse (pagos, integraciones, emails) Fallback parcial o lógica compensatoria Protege los efectos externos; evita reprocesar handlers enteros y enfócate en guardar el resultado local.

Alternativas adicionales que puede ofrecer Service Bus (o similares)

Es un problema tan conocido que muchas veces podrás optar por soluciones que nos proporciona el fabricante.

1. Message De-Duplication / Idempotency Tokens (Service Bus feature o diseño)

Evitar procesamiento duplicado en lugar de reintentarlo bien

Cómo ayuda:
Azure Service Bus permite configurar una window de duplicación (via MessageId) para evitar que se entreguen dos veces los mismos mensajes en un intervalo dado.

Ventajas:

  • Reduce reintentos innecesarios si el mensaje ya fue procesado.

  • Útil junto con una tabla de ProcessedMessages o IdempotencyKeys

if (await db.ProcessedMessages.AnyAsync(x => x.Id == message.Id)) return; // ya procesado

2. Session-based Message Handling (Azure Service Bus feature)

Serializar el procesamiento de mensajes relacionados entre sí

Cómo ayuda:

Azure Service Bus permite usar sessions, donde todos los mensajes con la misma SessionId se procesan en orden y uno por uno (lock exclusivo), evitando conflictos de concurrencia por diseño.

Útil para:

  • Mensajes de una misma entidad de agregado (ej. PedidoId).

  • Garantizar orden y exclusividad sin lock a nivel de base de datos.

3. Deferring messages (Service Bus feature)

Posponer el mensaje hasta que sea más seguro procesarlo

Cómo ayuda:

Puedes usar DeferMessage() para posponer la entrega del mensaje si detectas que el estado actual del sistema no es favorable (por ejemplo, está siendo modificado por otro proceso).

Esto requiere:

  • Mantener el SequenceNumber del mensaje para poder recuperarlo.

  • Polling o trigger posterior para volver a procesarlo.

4. Lock Renewal / Peek-Lock personalizado

Tener más tiempo para reintentar antes de liberar el mensaje al Bus

Cómo ayuda:

Si necesitas más tiempo para manejar conflictos de concurrencia, puedes renovar el peek-lock de un mensaje manualmente, evitando que se reentregue prematuramente.

Esto es avanzado pero útil en procesos largos.

5. Redirección a colas específicas por tipo de error

Clasificar errores y redirigir el mensaje según la causa

En lugar de una única Dead Letter Queue, puedes crear colas de:

  • concurrency-errors

  • validation-errors

  • business-conflicts

Y reenviar ahí los mensajes para su posterior análisis y reprocesamiento selectivo.

Nivel aplicación (más allá del bus)

6. Command Scheduling

Reprogramar el mensaje para ejecutarse más adelante si hay conflicto

En lugar de reprocesarlo en caliente, se puede re-agendar el comando o evento para dentro de 1-5 minutos. Es útil si el conflicto se debe a operaciones simultáneas temporales.

7. Event Sourcing con versionado fuerte

Evitar guardar snapshots intermedios y simplemente apilar eventos

Si estás usando Event Sourcing puro, puedes evitar muchos conflictos al guardar solo eventos inmutables, y manejar versiones a nivel de evento sin sobrescribir estado.

Conclusión

​Los errores por concurrencia no son excepciones de negocio ni fallos lógicos: son una manifestación natural de trabajar en sistemas distribuidos modernos. Intentar tratarlos como bugs o ignorarlos puede llevar a inconsistencias silenciosas o reprocesamientos innecesarios.

Por eso es clave entender que:

  • No todos los mensajes deben ser reprocesados automáticamente.

  • No toda lógica de dominio puede ejecutarse varias veces sin consecuencias.

  • Y no todas las estrategias valen por igual en todos los casos.

Una buena arquitectura no elimina los problemas de concurrencia, pero los vuelve previsibles y manejables.

Ya sea que elijas reintentar, aplicar un fallback, usar sesiones de Azure Service Bus, programar el reintento más adelante o registrar la idempotencia, lo esencial es que tu sistema:

  • Sea resiliente a carreras y modificaciones concurrentes.

  • Evite efectos colaterales no deseados.

  • Y tenga trazabilidad clara de lo que ocurrió y por qué.

La verdadera robustez no está en evitar fallos, sino en saber responder con elegancia cuando ocurren.