NodlAndHodl

Learning Rust - RabbitMq and Examples

07 Apr 2025

How to Connect to a Queue

Here is an example using lapin crate with tokio on how to create a connection to rabbitmq and establish a queue .

use lapin::{options::*, types::FieldTable, BasicProperties, Connection, ConnectionProperties};
use tokio::task;
use futures_util::stream::StreamExt;

const QUEUE_NAME: &str = "rust_queue";

#[tokio::main]
async fn main() {
    let amqp_addr = "amqp://localhost:5672/%2f";
    
    // Connect to RabbitMQ
    let connection = Connection::connect(amqp_addr, ConnectionProperties::default())
        .await
        .expect("Failed to connect to RabbitMQ");
    
    let channel = connection.create_channel().await.expect("Failed to create channel");

    // Declare queue
    channel.queue_declare(
        QUEUE_NAME,
        QueueDeclareOptions::default(),
        FieldTable::default(),
    ).await.expect("Queue declaration failed");
}

As you can see we’ve established the connection using default properties, and the connection. Then using our channel we create the queue. Now what about a consumer?

In this example we can clone our existing channel but normally you would have a separate process doing this.

    // Spawn a consumer in a separate task
    let consumer_channel = channel.clone();
    task::spawn(async move {
        let consumer = consumer_channel
            .basic_consume(
                QUEUE_NAME,
                "rust_consumer",
                BasicConsumeOptions::default(),
                FieldTable::default(),
            )
            .await
            .expect("Failed to start consumer");

        println!("Waiting for messages...");
        consumer
            .for_each(|delivery| async {
                if let Ok(delivery) = delivery {
                    let message = String::from_utf8_lossy(&delivery.data);
                    println!("Received message: {}", message);
                    consumer_channel
                        .basic_ack(delivery.delivery_tag, BasicAckOptions::default())
                        .await
                        .expect("Failed to ack message");
                }
            })
            .await;
    });

And then to publish to our queue using the lapin crate we can do something like this

    // Publish a message
    let payload = b"Hello from Rust!";
    channel
        .basic_publish(
            "",
            QUEUE_NAME,
            BasicPublishOptions::default(),
            payload.to_vec(),
            BasicProperties::default(),
        )
        .await
        .expect("Failed to publish message");

Handling Failures

This is a common situation where maybe something like network connection is inconsistent or the database connection is down, or a process restart. These need to be thought through and handled appropriately.

Here are creating a simulated failure using the same crates.

use lapin::{options::*, types::FieldTable, BasicProperties, Connection, ConnectionProperties};
use tokio::task;
use futures_util::stream::StreamExt;

const QUEUE_NAME: &str = "rust_queue";

#[tokio::main]
async fn main() {
    let amqp_addr = "amqp://localhost:5672/%2f";
    let connection = Connection::connect(amqp_addr, ConnectionProperties::default())
        .await
        .expect("Failed to connect to RabbitMQ");
    let channel = connection.create_channel().await.expect("Failed to create channel");

    // Declare queue
    channel
        .queue_declare(QUEUE_NAME, QueueDeclareOptions::default(), FieldTable::default())
        .await
        .expect("Queue declaration failed");

    // Spawn consumer task
    let consumer_channel = channel.clone();
    task::spawn(async move {
        let consumer = consumer_channel
            .basic_consume(QUEUE_NAME, "rust_consumer", BasicConsumeOptions::default(), FieldTable::default())
            .await
            .expect("Failed to start consumer");

        println!("Waiting for messages...");
        consumer
            .for_each(|delivery| async {
                if let Ok(delivery) = delivery {
                    let message = String::from_utf8_lossy(&delivery.data);
                    println!("Received message: {}", message);

                    let processing_result: Result<(), &'static str> = simulate_processing(&message);

                    match processing_result {
                        Ok(_) => {
                            println!("Processing succeeded. Acknowledging message.");
                            consumer_channel
                                .basic_ack(delivery.delivery_tag, BasicAckOptions::default())
                                .await
                                .expect("Failed to ack message");
                        }
                        Err(err) => {
                            println!("Processing failed: {}. Rejecting message.", err);
                            consumer_channel
                                .basic_nack(delivery.delivery_tag, false, true) // Requeue the message
                                .await
                                .expect("Failed to nack message");
                        }
                    }
                }
            })
            .await;
    });

    // Publish a message
    let payload = b"Hello from Rust!";
    channel
        .basic_publish(
            "",
            QUEUE_NAME,
            BasicPublishOptions::default(),
            payload.to_vec(),
            BasicProperties::default(),
        )
        .await
        .expect("Failed to publish message");

    println!("Message published!");

    tokio::time::sleep(std::time::Duration::from_secs(5)).await;
}

// Simulate message processing with a failure scenario
fn simulate_processing(message: &str) -> Result<(), &'static str> {
    if message.contains("fail") {
        Err("Simulated processing failure")
    } else {
        Ok(())
    }
}

Pretty basic situation here and things can definitely evolve to suit your needs but you can see where we use the basic_ack and our message would be removed from the queue. Otherwise we send basic_nack(requeue=true) and our message will go back to be retried. At some point if the message persists to fail we need to using something like a DeadLetterQueue to handle multiple failures.

That’s a bit more complex and would look something like this

    const MAIN_QUEUE: &str = "rust_queue";
    const DLX_QUEUE: &str = "dead_letter_queue";
    const DLX_EXCHANGE: &str = "dlx_exchange";
    // Declare the Dead Letter Exchange (DLX)
    channel.exchange_declare(
        DLX_EXCHANGE,
        lapin::ExchangeKind::Direct,
        ExchangeDeclareOptions::default(),
        FieldTable::default(),
    ).await.expect("Failed to declare DLX exchange");

    // Declare the Dead Letter Queue
    channel.queue_declare(
        DLX_QUEUE,
        QueueDeclareOptions::default(),
        FieldTable::default(),
    ).await.expect("Failed to declare DLX queue");

    // Bind DLQ to DLX
    channel.queue_bind(
        DLX_QUEUE,
        DLX_EXCHANGE,
        DLX_QUEUE,
        QueueBindOptions::default(),
        FieldTable::default(),
    ).await.expect("Failed to bind DLQ");

    // Configure the main queue to send rejected messages to the DLX
    let mut args = FieldTable::default();
    args.insert("x-dead-letter-exchange".into(), DLX_EXCHANGE.into()); 

This way we can handle failures independently and are not stuck in a retry with prior use of basic_nack(requeue=true).

Next topic I will discuss is the use of the different types of exchanges. Here we are using an exchange with a single consumer. I will go over in more detail the different types we can set up and how they work and why next post. Thanks for reading!