El problema
Tanto en Azure Service Bus como en Azure Event Hub admiten metadatos en sus eventos o mensajes. En este caso vamos a centrarnos en Azure Service Bus y en las colas con sesión.
Solución
A continuación os pongo el código para enviar un mensaje a una cola así como la IaC.
Creamos el Sercice Bus:
az servicebus namespace create --resource-group myresourcegroup
--name mynamespace --location westus --sku Standard
Creamos la cola con sesión:
az servicebus queue create --resource-group myresourcegroup
--namespace-name mynamespace --name myqueue --enable-session true
Creamos nuestro docker-compose.yml:
az servicebus queue create --version: '3.4'
services:
pub-microservice:
image: ${DOCKER_REGISTRY-}publisher
build:
context: .
dockerfile: PubSubDarpSample.Publisher/Dockerfile
stdin_open: true
tty: true
pub-microservice-dapr:
image: "daprio/daprd:1.7.0"
network_mode: "service:pub-microservice"
depends_on:
- pub-microservice
Creamos nuestro docker-compose.overide.yml:
version: '3.4'
services:
pub-microservice:
environment:
- ASPNETCORE_ENVIRONMENT=Development
ports:
- "60001:50001"
pub-microservice-dapr:
command: ["./daprd",
"-app-id", "publisher",
"-app-port", "80",
"-components-path", "/components"
]
volumes:
- "./dapr/components/:/components"
El componente de dapr:
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
name: azurequeueoutputorder
spec:
type: bindings.azure.servicebusqueues
version: v1
metadata:
- name: connectionString # Required when not using Azure Authentication.
value: "Endpoint=sb://[namespace].servicebus.windows.net/;SharedAccessKeyName=[policy];SharedAccessKey=[key]"
- name: queueName
value: myqueue
Program.cs:
using PubSubDarpSample.Publisher;
var pubService = new MessageService();
await pubService.SendEventMessages(2, "azurequeueoutputorder");
MessageService.cs:
using System.Text.Json;
using Dapr.Client;
using PubSubDaprSample.Contracts;
namespace PubSubDarpSample.Publisher;
public class MessageService
{
private readonly DaprClient _client;
public MessageService()
{
var options = new JsonSerializerOptions
{
PropertyNameCaseInsensitive = false
};
_client = new DaprClientBuilder().UseJsonSerializationOptions(options).Build();
}
public async Task SendEventMessages(int numberOfActions, string outputbinding)
{
var objects = new List<SomeObject>();
for (var i = 0; i < numberOfActions; i++)
{
var obj = createSomeObject();
objects.Add(obj);
}
await PublishMessages(objects, outputbinding);
}
public async Task SendEventMessagesOrder(int numberOfActions, string outputbinding)
{
var objects = new List<SomeObject>();
for (var i = 0; i < numberOfActions; i++)
{
var obj = createSomeObject();
objects.Add(obj);
}
await PublishMessages(objects, outputbinding);
}
private SomeObject createSomeObject()
{
var guid = Guid.NewGuid().ToString();
return new SomeObject
{
PropA = "Test A: " + guid,
PropB = "Test B: " + guid
};
}
private async Task PublishMessages(List<SomeObject> menssages, string outputbinding)
{
var id = new Guid().ToString();
var metadataPubSub = new Dictionary<string, string>();
metadataPubSub.Add("PartitionKey", id);
metadataPubSub.Add("SubcriptionId", id);
foreach (var menssage in menssages)
await _client.InvokeBindingAsync(outputbinding+ "2", "create", menssage, metadataPubSub);
}
}
En este punto lo más importante es tener en cuenta como se ponen los metadatos con el objeto metadataPubsub, el de partición sobra debido a que nuestra cola no lo admite, pero no da error si lo ponemos, lo he dejado a modo de ejemplo.
SomeObject.cs:
namespace PubSubDaprSample.Contracts;
public class SomeObject
{
public string PropA { get; set; }
public string PropB { get; set; }
}
Un problema conocido con Input Bindig:
En este enlace podreis ver un Issue abierto en GitHub con respecto al consumo de colas con sesión. Espero que o bien tenga solución o sea una feature que se implemente en un futuro cercano… miestras tanto seguir este enlace para leer su evolución.
modifica el siguiente ejemplo:
Si modificas el siguiente ejemplo, podrás tener 100% funcional todo lo que he explicado con anterioridad: jmfloreszazo/AzStorageQueueBindingDapr