RabbitMQ : messages prioritaires

2020-10-07 | Benoît Petitcollot

image article

RabbitMQ : messages prioritaires

Dans cet article, je vous présente un cas d'utilisation de messages rabbitMQ avec différents niveaux de priorité.

Nous verrons à travers un exemple de consumer en javascript qu'il ne suffit pas d'ajouter un attribut priority sur un message pour qu'il soit effectivement traité prioritairement.

Cas d'utilisation

On a une application web qui permet à un utilisateur d'envoyer une newsletter à un nombre assez important d'utilisateurs, disons par exemple 10.000. Chaque email est personnalisé ("Bonjour, <Nom> <Prénom>") donc on doit envoyer ces email un par un. On utilise un service SMTP limité à 200 envois par minute. L'envoi à nos 10.000 contacts va donc prendre 50 minutes. Pour ne pas bloquer l'envoyeur sur le formulaire d'envoi, on va publier un message rabbit pour chaque email et écrire un consumer chargé d'envoyer les emails. Comme on veut réutiliser des éléments du design du site dans les emails, on écrit le consumer en javascript.

Maintenant, un utilisateur demande à réinitialiser son mot de passe pendant que la newsletter est en cours d'envoi. Si on publie simplement un message rabbit dans la queue, celui-ci ne sera consommé qu'après les messages déjà présents et l'utilisateur va attendre de longues minutes avec de recevoir son email. C'est là qu'entre en jeu le paramètre priority.

Priorité des messages dans une queue

Pour commencer, il faut savoir qu'un message ne pourra être délivré prioritairement que par une queue paramétrée pour gérer les messages prioritaires. La queue doit être créée avec un argument x-max-priority qui définit le niveau maximal de priorité des messages dans la queue. C'est un entier entre 0 et 255. Attention, ce paramètre ne peut pas être modifié donc si votre queue existe déjà, il vous faudra la supprimer et la recréer avec le bon paramétrage.

Par exemple en mettant x-max-priority = 3, on aura quatre niveaux de priorité : 0 = pas urgent du tout, 1 = pas urgent, 2 = urgent, 3 = très urgent.

création d'une queue avec priorité

queue avec priorité

On peut maintenant publier un message prioritaire dans cette queue, comme on le fait d'habitude, simplement en ajoutant la propriété priority

  • si on ne met pas de priorité, la priorité sera de 0 (la plus basse)
  • si on dépasse la priorité maximale de la queue, la queue considère que le message a la priorité maximale (la valeur de x-max-priority)

Voici ce que ça donne avec l'interface rabbitMQ manager.

Publication des messages :

publication d'un message avec priorité 0

publication d'un message sans priorité

publication d'un message avec priorité 10

publication d'un message avec priorité 1

publication d'un message avec priorité 3

Récupération des messages :

récupération des messages dans l'ordre de priorité

On constate que les messages sortent bien dans l'ordre attendu : les plus prioritaires en premiers puis, pour une même priorité, par ordre d'entrée dans la queue.

Consommer les messages en tenant compte de leur priorité

Mais ce n'est pas tout ! Si on se contente de ce paramétrage (qui est correct) et qu'on consomme les messages, par exemple comme ceci...

async function sendMail(rabbitMessage){
  // envoi de l'email via le service SMTP...
}

await channel.consume('mailer', async mailMessage => {
  await sendMail(mailMessage);
});

on va attendre l'email de réinitialisation de mot de passe un certain temps...

La doc de rabbitMQ prévient : "It's important to understand how consumers work when working with priority queues."

En effet, un consumer consomme d'un seul coup autant de messages que le lui permettent les limites de capacité du réseau ! Autant dire que nos 10.000 messages de newsletter sont consommés en une seule fois. Le message prioritaire qui arrive ensuite devra attendre que le consumer ait fini de traiter les 10.000 messages pour être consommé à son tour. En fait, on n'a pas laissé à notre queue l'occasion de gérer la priorité des messages.

Pour résoudre ce problème, on va indiquer le nombre de messages qu'un consumer sera autorisé à consommer en une seule fois. C'est l'option prefetch.

channel.prefetch(3, false);
await channel.consume('mailer', async mailMessage => {
  await sendMail(mailMessage);
});

Ainsi notre consumer consommera les messages trois par trois, ce qui correspond à un délai de traitement d'environ une seconde pour notre serveur SMTP. Après ce delai, le consumer retourne vers la queue et si des messages prioritaires ont été ajoutés, ils seront bien traités en premiers.

Pour finir

Je n'ai pas abordé la gestion des erreurs. Si vous utilisez des queues de retry, n'oubliez pas de leur mettre aussi une propriété x-max-priority. En pratique, si le taux d'erreur dans le traitement des messages est assez faible, ce paramètre n'est pas indispensable. Vous pouvez publier un message avec priorité dans une queue non prioritaire. La priorité du message sera bien transmise quand il reviendra dans la queue principale.

Documentation

RabbitMQ : Priority Queue Support

RabbitMQ : Consumer Prefetch

RabbitMQ