Event-Based Queue in C# with RabbitMQ

Today I want to cover something I could not find much information about online: creating an event-based queue consumer with RabbitMQ in C#. Most solutions use the QueueingBasicConsumer class from the RabbitMQ client inside a while loop (or some other loop) that continually checks for messages published to a queue. When a message arrives, it is pulled from the queue and processed. One thing to remember about this approach is that nothing else is pulled from the queue until the current message has finished processing. One way around this is to run the processing logic in a task so that it does not block the loop and messages can still be pulled from the queue. In a different post I will cover how to pull from a queue using tasks and a while loop.
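
As a rough illustration of the difference between the two looping styles, here is a minimal sketch that uses an in-memory BlockingCollection as a stand-in for the queue. It is not RabbitMQ code, and the class and method names (LoopingConsumerSketch, ConsumeSequentially, ConsumeWithTasks) are made up for this example.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

public static class LoopingConsumerSketch
{
    // Blocking variant: the next message is not taken until the handler returns.
    public static void ConsumeSequentially(BlockingCollection<string> queue, Action<string> handler)
    {
        string message;
        // TryTake waits for a message and returns false once the queue is completed and empty.
        while (queue.TryTake(out message, Timeout.Infinite))
        {
            handler(message);
        }
    }

    // Task variant: each message is handed to a task so the loop can keep pulling.
    public static void ConsumeWithTasks(BlockingCollection<string> queue, Action<string> handler)
    {
        var running = new List<Task>();
        string message;
        while (queue.TryTake(out message, Timeout.Infinite))
        {
            var current = message; // copy for the closure; 'message' is reused by the loop
            running.Add(Task.Run(() => handler(current)));
        }

        // Wait for the in-flight handlers before returning.
        Task.WaitAll(running.ToArray());
    }
}
```

Both methods return once the producer has called CompleteAdding() and the queue has drained. With the task variant the handlers may run concurrently and in any order, so this only suits messages that can be processed independently.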

Now on to today’s content: using the EventingBasicConsumer class of the RabbitMQ client. One word of caution: EventingBasicConsumer is still marked as experimental, but I have been using it in production without any issues for a few months now. Please note that the code below is just a sample class. Feel free to use it and extend it for your own needs, and note that some members might be missing or incomplete since this is only an example.

public class Consumer
{
    /// <summary>
    /// Gets or sets the model.
    /// </summary>
    /// <value>The model.</value>
    private IModel Model { get; set; }

    /// <summary>
    /// Gets or sets the connection to rabbit
    /// </summary>
    /// <value>The connection to rabbit</value>
    public IConnection Connection { get; set; }

    /// <summary>
    /// Gets or sets the name of the queue.
    /// </summary>
    /// <value>The name of the queue.</value>
    public string QueueName { get; set; }

    /// <summary>
    /// Read a message from the queue.
    /// </summary>
    /// <param name="onDequeue">The action to take when receiving a message</param>
    /// <param name="onError">If an error occurs, provide an action to take.</param>
    /// <param name="exchangeName">Name of the exchange.</param>
    /// <param name="queueName">Name of the queue.</param>
    /// <param name="routingKeyName">Name of the routing key.</param>
    public void ReadFromQueue(Action<string, Consumer, ulong> onDequeue, Action<Exception, Consumer, ulong> onError, string exchangeName, string queueName, string routingKeyName)
    {
        BindToQueue(exchangeName, queueName, routingKeyName);

        var consumer = new EventingBasicConsumer { Model = Model };
        // Receive the message from the queue and act on that message
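        consumer.Received += (o, e) =>
            {
                try
                {
                    // Decode as UTF-8 so non-ASCII characters are preserved
                    var queuedMessage = Encoding.UTF8.GetString(e.Body);
                    onDequeue.Invoke(queuedMessage, this, e.DeliveryTag);
                }
                catch (Exception ex)
                {
                    // Report processing failures through the supplied error callback
                    onError.Invoke(ex, this, e.DeliveryTag);
                }
            };

        // If the consumer shuts down, reconnect to rabbit and begin reading from the queue again.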
        consumer.Shutdown += (o, e) =>
            {
                ConnectToRabbitMq();
                ReadFromQueue(onDequeue, onError, exchangeName, queueName, routingKeyName);
            };

        Model.BasicConsume(queueName, false, consumer);
    }

    /// <summary>
    /// Bind to a queue.
    /// </summary>
    /// <param name="exchangeName">Name of the exchange.</param>
    /// <param name="queueName">Name of the queue.</param>
    /// <param name="routingKeyName">Name of the routing key.</param>
    private void BindToQueue(string exchangeName, string queueName, string routingKeyName)
    {
        const bool durable = true, autoDelete = false, exclusive = false;

        Model.BasicQos(0, 1, false);

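        // Replicate the queue to all hosts. Queue arguments are optional.
        // Note: on RabbitMQ 3.0 and later, mirroring is configured through policies rather than the x-ha-policy argument.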
        IDictionary queueArgs = new Dictionary<string, object>
                {
                    {"x-ha-policy", "all"}
                };
        QueueName = Model.QueueDeclare(queueName, durable, exclusive, autoDelete, queueArgs);
        Model.QueueBind(queueName, exchangeName, routingKeyName, null);
    }

    /// <summary>
    /// Connect to rabbit mq.
    /// </summary>
    /// <returns><c>true</c> if a connection to RabbitMQ has been made, <c>false</c> otherwise</returns>
    public bool ConnectToRabbitMq()
    {
        int attempts = 0;
        // make 3 attempts to connect to RabbitMQ just in case an interruption occurs during the connection
        while (attempts < 3)
        {
            attempts++;

            try
            {
                var connectionFactory = new ConnectionFactory
                {
                    HostName = Hostname,
                    UserName = Username,
                    Password = Password,
                    RequestedHeartbeat = 60
                };
                Connection = connectionFactory.CreateConnection();

                // Create the model 
                CreateModel();

                return true;
            }
            catch (System.IO.EndOfStreamException)
            {
                // Handle Connection Exception Here, then fall through so the loop can retry
            }
            catch (BrokerUnreachableException)
            {
                // Handle Connection Exception Here, then fall through so the loop can retry
            }

            // wait before trying again
            Thread.Sleep(1000);
        }

        if (Connection != null)
            Connection.Dispose();

        return false;
    }

    /// <summary>
    /// Create a model.
    /// </summary>
    private void CreateModel()
    {
        Model = Connection.CreateModel();

        // When AutoClose is true, the last channel to close will also cause the connection to close. 
        // If it is set to true before any channel is created, the connection will close then and there.
        Connection.AutoClose = true;

        // Configure the quality of service for the model. Each argument means:
        // BasicQos(0 = "no limit on the prefetch size in bytes", 1 = "send me at most one unacknowledged message at a time", false = "do not apply the setting globally")
        Model.BasicQos(0, 1, false);

        const bool durable = true, exchangeAutoDelete = true, queueAutoDelete = false, exclusive = false;

        // If an exchange name has been provided, create a new durable exchange and have it auto-delete itself.
        if (!string.IsNullOrWhiteSpace(ExchangeName))
            Model.ExchangeDeclare(ExchangeName, ExchangeType, durable, exchangeAutoDelete, null);

        // replicate the queue to all hosts. Queue arguments are optional
        IDictionary queueArgs = new Dictionary<string, object>
                {
                    {"x-ha-policy", "all"}
                };
        Model.QueueDeclare(QueueName, durable, exclusive, queueAutoDelete, queueArgs);
        Model.QueueBind(QueueName, ExchangeName, RoutingKeyName, null);
    }
}

The key method in the sample class above is ReadFromQueue. The EventingBasicConsumer class allows us to skip the while loop: we can call ReadFromQueue whenever we want to start consuming messages from a queue, and our other logic will not be blocked. All you have to do is set up a method to be called when a message is pulled from the queue (see the ProcessMessage method in the example below) and a method to be called if an error occurs (see the RaiseException method in the example below). These two methods can be passed into ReadFromQueue as shown below.

private void ProcessQueue()
{
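     // QueueName is set up front because ConnectToRabbitMq declares the queue
     var consumer = new Consumer { QueueName = "QueueName" };
     // Connect first so the consumer has a model to consume with
     if (!consumer.ConnectToRabbitMq())
         return;
     consumer.ReadFromQueue(ProcessMessage, RaiseException, "ExchangeName", "QueueName", string.Empty);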
}

private void ProcessMessage(string message, Consumer consumer, ulong deliveryTag)
{
     // logic to process your message goes here
}

private void RaiseException(Exception ex, Consumer consumer, ulong deliveryTag)
{
    // Handle any exceptions raised while processing your message
}

Now I must warn you: connections to rabbit can be reused, and should be reused if you are going to publish the message to another queue after you are done processing it. If you do not reuse the connection you used to consume messages, you will cause a connection reset and the consumer.Shutdown event will be triggered in the ReadFromQueue method above. If you have not already acknowledged your message, that same message will then be redelivered and processed by your code a second time, which can cause issues if you are not careful. The other thing to remember is that you cannot reuse a Model, so always make sure you re-create the model when publishing to a different queue.
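
To make that advice a little more concrete, here is a minimal sketch of forwarding a processed message over the same connection with a freshly created model, then acknowledging the original delivery. It assumes a RabbitMQ.Client version that still exposes the IModel API (6.x and earlier). The class, method, and parameter names are hypothetical, and because the sample Consumer class keeps its Model private, you would need to expose the consuming model or an acknowledge method yourself.

```csharp
using System.Text;
using RabbitMQ.Client;

// Hypothetical helper, not part of the sample Consumer class above.
public static class ForwardingSketch
{
    public static void ForwardAndAck(
        IConnection connection,   // the same connection that is used for consuming
        IModel consumingModel,    // the model the message was delivered on
        ulong deliveryTag,        // delivery tag handed to the onDequeue callback
        string targetExchange,    // assumed name of the exchange to publish to
        string targetRoutingKey,  // assumed routing key for the target queue
        string processedMessage)
    {
        // Create a new model for publishing instead of reusing the consuming one.
        using (var publishModel = connection.CreateModel())
        {
            var body = Encoding.UTF8.GetBytes(processedMessage);

            // Passing null for the properties publishes with default message properties.
            publishModel.BasicPublish(targetExchange, targetRoutingKey, null, body);
        }

        // Acknowledge only after the publish call has returned, so a failure
        // before this point leaves the message unacknowledged and eligible for redelivery.
        consumingModel.BasicAck(deliveryTag, false);
    }
}
```

Acknowledging last is a deliberate choice in this sketch: it trades the risk of losing a message for the risk of processing it twice, which is the duplicate-processing scenario described above.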

Hope this helps someone use the EventingBasicConsumer class of the C# RabbitMQ client. It took me a little while to find information on Google about it, so I’m hoping this will make it easier for someone else in the future, or even future me.
