El problema

Tanto en Azure Service Bus como en Azure Event Hub admiten metadatos en sus eventos o mensajes. En este caso vamos a centrarnos en Azure Service Bus y en las colas con sesión.

Solución

A continuación os pongo el código para enviar un mensaje a una cola así como la IaC.

Creamos el Sercice Bus:

az servicebus namespace create --resource-group myresourcegroup 
--name mynamespace --location westus --sku Standard

Creamos la cola con sesión:

az servicebus queue create --resource-group myresourcegroup 
--namespace-name mynamespace --name myqueue --enable-session true

Creamos nuestro docker-compose.yml:

az servicebus queue create --version: '3.4'

services:
 
  pub-microservice:
    image: ${DOCKER_REGISTRY-}publisher
    build:
      context: .
      dockerfile: PubSubDarpSample.Publisher/Dockerfile
    stdin_open: true
    tty: true

  pub-microservice-dapr:
    image: "daprio/daprd:1.7.0"
    network_mode: "service:pub-microservice"
    depends_on:
      - pub-microservice

Creamos nuestro docker-compose.overide.yml:

version: '3.4'

services:

  pub-microservice:
   environment:
      - ASPNETCORE_ENVIRONMENT=Development
   ports:
      - "60001:50001"

  pub-microservice-dapr:
    command: ["./daprd",
      "-app-id", "publisher",
      "-app-port", "80",
      "-components-path", "/components"      
      ]
    volumes:
      - "./dapr/components/:/components"

El componente de dapr:

apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: azurequeueoutputorder
spec:
  type: bindings.azure.servicebusqueues
  version: v1
  metadata:
  - name: connectionString # Required when not using Azure Authentication.
    value: "Endpoint=sb://[namespace].servicebus.windows.net/;SharedAccessKeyName=[policy];SharedAccessKey=[key]"
  - name: queueName
    value: myqueue

Program.cs:

using PubSubDarpSample.Publisher;

var pubService = new MessageService();

await pubService.SendEventMessages(2, "azurequeueoutputorder");

MessageService.cs:

using System.Text.Json;
using Dapr.Client;
using PubSubDaprSample.Contracts;

namespace PubSubDarpSample.Publisher;

public class MessageService
{
    private readonly DaprClient _client;

    public MessageService()
    {
        var options = new JsonSerializerOptions
        {
            PropertyNameCaseInsensitive = false
        };
        _client = new DaprClientBuilder().UseJsonSerializationOptions(options).Build();
    }

    public async Task SendEventMessages(int numberOfActions, string outputbinding)
    {
        var objects = new List<SomeObject>();

        for (var i = 0; i < numberOfActions; i++)
        {
            var obj = createSomeObject();
            objects.Add(obj);
        }

        await PublishMessages(objects, outputbinding);
    }

    public async Task SendEventMessagesOrder(int numberOfActions, string outputbinding)
    {
        var objects = new List<SomeObject>();

        for (var i = 0; i < numberOfActions; i++)
        {
            var obj = createSomeObject();
            objects.Add(obj);
        }

        await PublishMessages(objects, outputbinding);
    }

    private SomeObject createSomeObject()
    {
        var guid = Guid.NewGuid().ToString();
        return new SomeObject
        {
            PropA = "Test A: " + guid,
            PropB = "Test B: " + guid
        };
    }

    private async Task PublishMessages(List<SomeObject> menssages, string outputbinding)
    {
        var id = new Guid().ToString();
        var metadataPubSub = new Dictionary<string, string>();
        metadataPubSub.Add("PartitionKey", id);
        metadataPubSub.Add("SubcriptionId", id);
        foreach (var menssage in menssages)
            await _client.InvokeBindingAsync(outputbinding+ "2", "create", menssage, metadataPubSub);

    }
}

En este punto lo más importante es tener en cuenta como se ponen los metadatos con el objeto metadataPubsub, el de partición sobra debido a que nuestra cola no lo admite, pero no da error si lo ponemos, lo he dejado a modo de ejemplo.

SomeObject.cs:

namespace PubSubDaprSample.Contracts;

public class SomeObject
{
    public string PropA { get; set; }
    public string PropB { get; set; }
}
Un problema conocido con Input Bindig:

En este enlace podreis ver un Issue abierto en GitHub con respecto al consumo de colas con sesión. Espero que o bien tenga solución o sea una feature que se implemente en un futuro cercano… miestras tanto seguir este enlace para leer su evolución.

modifica el siguiente ejemplo:

Si modificas el siguiente ejemplo, podrás tener 100% funcional todo lo que he explicado con anterioridad: jmfloreszazo/AzStorageQueueBindingDapr