AMQP client library in Rust, with a clean, futures-based API

amqp-rs/lapin

A Rust AMQP client library.

This project follows the AMQP 0.9.1 specification and targets RabbitMQ in particular.

Features

  • unstable: enable access to the experimental reconnection features
  • codegen: force code generation (defaults to pregenerated sources)
  • vendored-openssl: use a vendored OpenSSL instead of the system one (only applies to the openssl backend)
  • verbose-errors: enable more verbose errors in the AMQP parser

TLS backends

  • native-tls
  • openssl
  • rustls (default)

Rustls certificates store

  • rustls-native-certs (default)
  • rustls-webpki-roots-certs

Warning about crypto backends for rustls

A crypto implementation must be enabled in rustls through feature flags. We mirror what rustls does: there is one feature flag per implementation, and the default is the same as in rustls. Available options are:

  • rustls--aws_lc_rs (default)
  • rustls--ring
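
As a rough illustration of how the flags above combine, here is a Cargo.toml sketch. The feature names are the ones listed in this README; the version string is a placeholder, and the exact way the features depend on each other is an assumption that should be checked against the crate's own Cargo.toml. Disabling default features may also switch off defaults that are not listed here.

```toml
[dependencies]
# Version "*" is a placeholder; pin it to the release you actually use.

# Option 1: native-tls instead of the default rustls backend.
lapin = { version = "*", default-features = false, features = ["native-tls"] }

# Option 2 (use instead of option 1): rustls with the ring crypto implementation.
# Assumption: the certificate-store feature pulls in the rustls backend on its own,
# so the umbrella `rustls` feature (and its default crypto choice) can be left out.
# lapin = { version = "*", default-features = false, features = ["rustls-native-certs", "rustls--ring"] }
```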

Integration with third-party runtimes

Lapin can run on the runtime of your choice: pass it in through ConnectionProperties.

The executor is configured through executor-trait.

The reactor is configured through reactor-trait.

There are implementations for tokio, async-std and others.
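
To make the idea concrete, here is a minimal, standard-library-only sketch of the pattern: the library codes against a small trait, and the caller picks the implementation by putting it into a properties object. Everything in it (`Spawn`, `ThreadSpawner`, `InlineSpawner`, `Properties`, `run_background_job`) is hypothetical and made up for illustration; it is not the executor-trait or reactor-trait API, and it is not how lapin is implemented.

```rust
use std::sync::mpsc;
use std::sync::Arc;
use std::thread;

/// Hypothetical stand-in for an executor abstraction (NOT the executor-trait API).
pub trait Spawn: Send + Sync {
    fn spawn(&self, job: Box<dyn FnOnce() + Send + 'static>);
}

/// Runs each job on its own OS thread.
pub struct ThreadSpawner;

impl Spawn for ThreadSpawner {
    fn spawn(&self, job: Box<dyn FnOnce() + Send + 'static>) {
        thread::spawn(job);
    }
}

/// Runs each job immediately on the caller's thread.
pub struct InlineSpawner;

impl Spawn for InlineSpawner {
    fn spawn(&self, job: Box<dyn FnOnce() + Send + 'static>) {
        job();
    }
}

/// Hypothetical properties object that carries the chosen runtime.
#[derive(Clone)]
pub struct Properties {
    spawner: Arc<dyn Spawn>,
}

impl Properties {
    pub fn with_spawner(spawner: impl Spawn + 'static) -> Self {
        Self { spawner: Arc::new(spawner) }
    }
}

/// Library-side code: it only sees the trait, never a concrete runtime.
pub fn run_background_job(props: &Properties, input: u32) -> u32 {
    let (tx, rx) = mpsc::channel();
    props.spawner.spawn(Box::new(move || {
        // The receiver stays alive until `recv` returns, so a send error is not expected.
        let _ = tx.send(input * 2);
    }));
    rx.recv().expect("job dropped without sending a result")
}
```

Calling `run_background_job(&Properties::with_spawner(ThreadSpawner), 21)` and the same call with `InlineSpawner` go through identical library code; only the value handed to the properties object changes, which is the point of the trait indirection.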

Example

use futures_lite::stream::StreamExt;
use lapin::{
    options::*, publisher_confirm::Confirmation, types::FieldTable, BasicProperties, Connection,
    ConnectionProperties, Result,
};
use tracing::info;

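fn main() -> Result<()> {
    // Default to `info`-level logging when RUST_LOG is not set.
    // Note: from the 2024 edition onward, `set_var` must be called inside an `unsafe` block.
    if std::env::var("RUST_LOG").is_err() {
        std::env::set_var("RUST_LOG", "info");
    }

    tracing_subscriber::fmt::init();

    // Broker address; `%2f` is the percent-encoded default vhost `/`.
    let addr = std::env::var("AMQP_ADDR").unwrap_or_else(|_| "amqp://127.0.0.1:5672/%2f".into());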

    async_global_executor::block_on(async {
        let conn = Connection::connect(
            &addr,
            ConnectionProperties::default(),
        )
        .await?;

        info!("CONNECTED");

        // Two channels on one connection: `channel_a` publishes, `channel_b` consumes.
        let channel_a = conn.create_channel().await?;
        let channel_b = conn.create_channel().await?;

        let queue = channel_a
            .queue_declare(
                "hello",
                QueueDeclareOptions::default(),
                FieldTable::default(),
            )
            .await?;

        info!(?queue, "Declared queue");

        let mut consumer = channel_b
            .basic_consume(
                "hello",
                "my_consumer",
                BasicConsumeOptions::default(),
                FieldTable::default(),
            )
            .await?;
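        // Consume in a background task and acknowledge every delivery.
        // `detach` lets the task keep running after its handle is dropped.
        async_global_executor::spawn(async move {
            info!("will consume");
            while let Some(delivery) = consumer.next().await {
                let delivery = delivery.expect("error in consumer");
                delivery
                    .ack(BasicAckOptions::default())
                    .await
                    .expect("ack");
            }
        }).detach();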

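        let payload = b"Hello world!";

        // Publish forever to the default exchange, using the queue name as routing key.
        // The first `.await` sends the message; the second waits for its confirmation.
        loop {
            let confirm = channel_a
                .basic_publish(
                    "",
                    "hello",
                    BasicPublishOptions::default(),
                    payload,
                    BasicProperties::default(),
                )
                .await?
                .await?;
            // Publisher confirms were never enabled on this channel,
            // so there is nothing to confirm.
            assert_eq!(confirm, Confirmation::NotRequested);
        }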
    })
}
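
The default address in the example ends in `%2f` because the vhost is the path segment of an AMQP URI, so the default vhost `/` has to be percent-encoded. The sketch below shows one way to build such a URI with only the standard library. The helper names `encode_vhost` and `amqp_uri` are hypothetical and not part of lapin; credentials and query parameters are left out to keep it short.

```rust
/// Percent-encode a vhost name for use as the path segment of an AMQP URI.
/// Unreserved characters (RFC 3986) pass through; every other byte becomes %xx.
pub fn encode_vhost(vhost: &str) -> String {
    let mut out = String::with_capacity(vhost.len());
    for byte in vhost.bytes() {
        match byte {
            b'A'..=b'Z' | b'a'..=b'z' | b'0'..=b'9' | b'-' | b'.' | b'_' | b'~' => {
                out.push(byte as char)
            }
            // Lowercase hex digits, matching the `%2f` used in the example above.
            _ => out.push_str(&format!("%{:02x}", byte)),
        }
    }
    out
}

/// Build an `amqp://host:port/vhost` URI (hypothetical helper, no credentials).
pub fn amqp_uri(host: &str, port: u16, vhost: &str) -> String {
    format!("amqp://{}:{}/{}", host, port, encode_vhost(vhost))
}
```

With these helpers, `amqp_uri("127.0.0.1", 5672, "/")` produces the same string as the example's fallback address, and a vhost such as `my vhost` is encoded as `my%20vhost`.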