How to Implement Exponential Backoff in Rabbitmq Using AMQP in Node.js

August 23, 2022

Exponential Backoff in Rabbitmq

Please make sure to read first, why we need the Exponential Backoff in event driven systems.

Nodejs Code using AMQP

Message Class

Lets first wrap our message,

'use strict';

class Message {
    constructor(channel, message) {
        this._channel = channel;
        this._message = message;
    }

    async ack() {
        await this._channel.ack(this._message);
    }

    async nack() {
        await this._channel.nack(this._message);
    }

    get properties() {
        return this._message.properties;
    }

    get content() {
        return this._message.content;
    }
}

module.exports = Message;

Rabbitmq Controller Class

Lets define our controller class, which will create connections and manage queues.

'use strict';

const amqp = require('amqp-connection-manager');
const Message = require('./message');

const QUEUE_DELAY = 'x-gyanblog-delay';
const QUEUE_TOTAL_DELAY = 'x-gyanblog-total-delay';
const QUEUE_TOTAL_RETRIES = 'x-gyanblog-total-retires';

function randomIntFromInterval(min, max) {
    return Math.floor(Math.random() * (max - min + 1) + min);
}

class Controller {
    init(config) {
        this.config = config;
        
        this._queueName = config.queueName;
        this._retrySettings = config.retry;
        this._prefetch = config.prefetch;
        this._retryOnException = config.retryOnException;

        this._connection_string = config.connection_string;

        return this.connect();
    }

    connect() {
        if (!this._queueName) {
            return Promise.reject(new Error('No queue name was specified'));
        }
        this._queueName = `gyanblog_${this._queueName}`;
        this.retryQueueName = `gyanblog_${this._queueName}_retry`;
        this.inputExchangeName = `snitch_exchange_${this._queueName}`;
        this.retryExchangeName = `snitch_exchange_${this.queueName}_retry`;
        return Promise.resolve()
            .then(() => {
                return amqp.connect(this._connection_string, {
                    connectionOptions: {frameMax: 0x10000}, heartbeatIntervalInSeconds: 60
                });
            })
            .then((conn) => {
                console.log('info', 'Connected to rabbitmq');
                //Try and open a channel
                return conn.createChannel({json: true,
                    setup: ((channel) => {
                        console.log('info', 'input channel created, creating exchange', this.inputExchangeName);
                        return channel.assertExchange(this.inputExchangeName, 'direct', {durable: true})
                            .then(async () => {
                                console.log('info', 'Lets create the input queue', this._queueName);
                                await channel.assertQueue(this._queueName, {
                                    durable: true,
                                    persistent: true,
                                    maxLength: 1000000
                                });
                            })
                            .then(async () => {
                                console.log('info', 'Bind queue and exchange', this._queueName, this.inputExchangeName);
                                //Bind our queue to the earlier created exchange, the exchange will receive message and put them in the queue for us
                                await channel.bindQueue(this._queueName, this.inputExchangeName, '');
                            })
                            .then(async () => {
                                console.log('info', 'Create retry exchange', this.retryExchangeName);
                                //Lets create a dead letter and exponential backoff exchange
                                await channel.assertExchange(this.retryExchangeName, 'direct', {
                                    durable: true
                                });
                            })
                            .then(async () => {
                                console.log('info', 'Lets create the retry queue with dead letter delivery to', this.retryQueueName, this.inputExchangeName);
                                //Create a another queue with an dead-letter delivery to our main exchange. We will expect messages to be put here with an expiry
                                // and will expire and then retired later from main queue
                                await channel.assertQueue(this.retryQueueName, {
                                    durable: true,
                                    persistent: true,
                                    deadLetterExchange: this.inputExchangeName,
                                    maxLength: 1000000
                                });
                            })
                            .then(async () => {
                                console.log('info', 'Bind', this.retryQueueName, this.retryExchangeName, '');
                                await channel.bindQueue(this.retryQueueName, this.retryExchangeName, '');
                            })
                            .then(async () => {
                                console.log('info', `Starting queue consumer for queue: ${this._queueName}`);
                                channel.prefetch(this._prefetch);
                                
                                console.log('info', 'wait for the input to arrive on', this._queueName);
                                await channel.consume(this._queueName, this._consumeMessage.bind(this), {noAck: false});
                            });
                    })
                });
            })
            .then((channel) => {
                this._channel = channel;
                channel.on('disconnect', (err) => {
                    console.log('warn', err);
                });
            });
    }

    async _publishToExchange(message) {
        await this._channel.publish(this.inputExchangeName, '', message, {persistent: true})
            .then((response) => {
                if (!response) {
                    console.log('warn', 'cannot publish message; channel\'s buffer is full');
                    //Lets wait for drain event and send this message back into the queue
                    return new Promise((resolve) => {
                        this._channel._channel.once('drain', () => {
                            console.log('debug', 'Channel was successfully drained, so we can accept more data.');
                            resolve();
                        });
                    });
                }
                return response;
            });
    }

    async publishMessage(message) {
        await this._publishToExchange(message);
    }

    _consumeMessage(message) {
        let _message = new Message(this._channel, message);
        
        console.log('Message received');
        return new Promise((resolve, reject) => {
            try {
                const messageContent = JSON.parse(message.content);
                if (messageContent.op == 'error') {
                    return reject(new Error('My error'));
                }
                console.log(messageContent);
            } catch(error) {
                reject(error);
            }
            resolve();
        })
        .then(() => {
            _message.ack()
        })
        .catch((err) => {
            console.log('error', err);
            if (this._retryOnException) {
                return this._pushToDelayedQueue(message);
            }
            return _message.nack();
        });
    }

    _pushToDelayedQueue(message) {
        if (this._channel === null) {
            return;
        }
        //We will not nack this but instead it ack and put it in the retry later queue
        let expiration = 0;
        let totalDelay = 0;
        let totalRetries = 0;

        let config = this._retrySettings;

        //Sometimes these headers are missing and needs to be checked if they exists or not.
        if (message.properties.headers !== undefined) {
            expiration = message.properties.headers[QUEUE_DELAY] || expiration;
            totalDelay = message.properties.headers[QUEUE_TOTAL_DELAY] || totalDelay;
            totalRetries = message.properties.headers[QUEUE_TOTAL_RETRIES] || totalRetries;
        }
        if (expiration === 0) {
            expiration = config.initialWait;
        } else {
            if ((config.maximumWait !== -1 && totalDelay > config.maximumWait) ||
                (config.maxRetries !== -1 && totalRetries >= config.maxRetries)) {
                let object = JSON.parse(message.content.toString());
                console.log('warn', 'Rejecting task as it has between retried too beyond maximum attempts', object.id);
                return this._channel.ack(message);
            }
            expiration *= config.factor;
            expiration = Math.floor(expiration);
        }

        let randomizedValue = 0;
        if (config.randomizeBy > 0) {
            randomizedValue = randomIntFromInterval(0, config.randomizeBy);
        }

        let nextTotalDelay = totalDelay + expiration;
        if (config.maximumWaitCeil !== -1 && expiration > config.maximumWaitCeil) {
            expiration = config.maximumWaitCeil;
            nextTotalDelay = config.maximumWaitCeil;
        }

        console.log('expiration', expiration);
        //Lets send it to the retry exchange
        return this._channel.publish(this.retryExchangeName, '', JSON.parse(message.content), {
            expiration: expiration + randomizedValue,
            headers: {
                QUEUE_DELAY: expiration,
                QUEUE_TOTAL_DELAY: nextTotalDelay,
                QUEUE_TOTAL_RETRIES: totalRetries + 1
            }
        })
            .then(() => this._channel.ack(message));
    }
}

module.exports = Controller;

Main Runner Test Code

const RabbiqmqController = require('./rabbitmq_controller');

const config = {
    connection_string: 'amqp://guest:guest@localhost:5672',
    queueName: "test",
    retry: {
        factor: 1.2,
        initialWait: 5000,
        maximumWait: -1,
        randomizeBy: 2000,
        maxRetries: -1,
        maximumWaitCeil: -1
    },
    prefetch: 1,
    retryOnException: true
};

rabbitmqController = new RabbiqmqController();
rabbitmqController.init(config)
    .then(() => {

    })
    .catch((error) => {
        console.error(error);
    });

The code is self explanatory. We are just creating exchange/queues and retry queues. And, starting the consumer by default in this code. You might want to do something else.

Bonus

Docker-compose file for starting rabbitmq container:

version: '3.3'
services:
  rabbitmq:
    image: rabbitmq:3-management
    hostname: "mynode"
    ports:
       - "15672:15672"
       - "5672:5672"
    environment:
       RABBITMQ_DEFAULT_USER: "guest"
       RABBITMQ_DEFAULT_PASS: "guest"
       RABBITMQ_NODENAME: "mynode"
    volumes:
      - ./data:/var/lib/rabbitmq

Thanks for reading.


Similar Posts

Latest Posts