Python 中的 Kombu 类库

news/2024/9/21 22:06:33 标签: python, Celery, Kombu

Kombu 是一个用于 Python 的消息队列库,提供了高效、灵活的消息传递机制。它是 Celery 的核心组件之一,但也可以单独使用。Kombu 支持多种消息代理(如 RabbitMQ、Redis、Amazon SQS 等),并提供了消息生产者和消费者的功能。安装命令 pip install kombu redis

一.主要功能

1.消息队列

提供可靠的消息传递和队列机制,允许将消息从生产者发送到消费者。

2.消息代理支持

支持多种消息代理,如 RabbitMQ、Redis、Amazon SQS、MongoDB 等。

3.异步任务

可以用来实现异步任务处理,配合 Celery 使用时,可以构建分布式任务队列。

4.消息格式

支持多种消息格式,包括 JSON、YAML、pickle 等。

5.路由和交换

提供了高级的消息路由和交换功能,可以实现复杂的消息分发逻辑。

二.基本使用

1. 创建消息生产者

生产者负责向消息队列发送消息。

(1)Redis 消息代理

python">from kombu import Connection, Exchange, Producer, Queue

# 设置消息代理的连接URL(Redis)
broker_url = 'redis://localhost:6379/0'

# 创建连接
with Connection(broker_url) as conn:
    # 创建交换机和队列
    exchange = Exchange('my_exchange', type='direct')
    queue = Queue('my_queue', exchange, routing_key='my_key')

    # 创建生产者
    with Producer(conn) as producer:
        # 发送消息
        producer.publish(
            {'key': 'value'},
            exchange=exchange,
            routing_key='my_key',
            serializer='json'
        )
        print("Message sent.")

(2)RabbitMQ 消息代理

python">from kombu import Connection, Exchange, Producer, Queue

# 设置消息代理的连接URL
broker_url = 'amqp://guest:guest@localhost//'

# 创建连接
with Connection(broker_url) as conn:
    # 创建交换机和队列
    exchange = Exchange('my_exchange', type='direct')
    queue = Queue('my_queue', exchange, routing_key='my_key')

    # 创建生产者
    with Producer(conn) as producer:
        # 发送消息
        producer.publish(
            {'key': 'value'},
            exchange=exchange,
            routing_key='my_key',
            serializer='json'
        )
        print("Message sent.")

2. 创建消息消费者

消费者从消息队列中接收和处理消息。

(1)Redis 消息代理

python">from kombu import Connection, Exchange, Queue, Consumer

# 设置消息代理的连接URL(Redis)
broker_url = 'redis://localhost:6379/0'

def callback(body, message):
    print(f"Received message: {body}")
    message.ack()  # 确认消息已处理

# 创建连接
with Connection(broker_url) as conn:
    # 创建交换机和队列
    exchange = Exchange('my_exchange', type='direct')
    queue = Queue('my_queue', exchange, routing_key='my_key')

    # 创建消费者
    with Consumer(conn, [queue], callback=callback) as consumer:
        print("Waiting for messages...")
        # 运行消费者,等待消息
        while True:
            conn.drain_events()

(2)RabbitMQ 消息代理

python">from kombu import Connection, Exchange, Queue, Consumer

# 设置消息代理的连接URL
broker_url = 'amqp://guest:guest@localhost//'

def callback(body, message):
    print(f"Received message: {body}")
    message.ack()  # 确认消息已处理

# 创建连接
with Connection(broker_url) as conn:
    # 创建交换机和队列
    exchange = Exchange('my_exchange', type='direct')
    queue = Queue('my_queue', exchange, routing_key='my_key')

    # 创建消费者
    with Consumer(conn, [queue], callback=callback) as consumer:
        print("Waiting for messages...")
        # 运行消费者,等待消息
        while True:
            conn.drain_events()

3. 高级用法:消息路由

Kombu 支持复杂的消息路由配置,以下示例展示了如何使用路由功能将消息发送到不同的队列。

(1)Redis 消息代理

python">from kombu import Connection, Exchange, Producer, Queue

# 设置消息代理的连接URL(Redis)
broker_url = 'redis://localhost:6379/0'

# 创建交换机和队列
exchange = Exchange('my_exchange', type='direct')
queue1 = Queue('queue1', exchange, routing_key='key1')
queue2 = Queue('queue2', exchange, routing_key='key2')

def route_message(message):
    if message['type'] == 'type1':
        return 'key1'
    return 'key2'

# 创建连接
with Connection(broker_url) as conn:
    with Producer(conn) as producer:
        # 发送消息
        producer.publish(
            {'type': 'type1', 'data': 'value1'},
            exchange=exchange,
            routing_key=route_message({'type': 'type1'}),
            serializer='json'
        )
        print("Message routed and sent.")

(2)RabbitMQ 消息代理

python">from kombu import Connection, Exchange, Producer, Queue

# 设置消息代理的连接URL
broker_url = 'amqp://guest:guest@localhost//'

# 创建交换机和队列
exchange = Exchange('my_exchange', type='direct')
queue1 = Queue('queue1', exchange, routing_key='key1')
queue2 = Queue('queue2', exchange, routing_key='key2')

def route_message(message):
    if message['type'] == 'type1':
        return 'key1'
    return 'key2'

# 创建连接
with Connection(broker_url) as conn:
    with Producer(conn) as producer:
        # 发送消息
        producer.publish(
            {'type': 'type1', 'data': 'value1'},
            exchange=exchange,
            routing_key=route_message({'type': 'type1'}),
            serializer='json'
        )
        print("Message routed and sent.")

Celery__204">4. 结合 Celery 使用

Kombu 通常与 Celery 一起使用来处理异步任务。简单理解,KombuCelery 的依赖库,Celery 需要 Kombu 来访问消息队列系统。同时 Celery 扩展了 Kombu 的功能,提供了一个高级的任务队列系统。Celery 使用 Kombu 来处理与消息代理之间的连接、消息发送、消息接收等操作。

(1)Redis 消息代理

python">from celery import Celery

# 配置 Celery 使用 Redis 作为消息代理(通过 Kombu 处理)
app = Celery('tasks', broker='redis://localhost:6379/0')

@app.task
def add(x, y):
    return x + y

在 Dify 中默认消息代理使用 Redis,如下所示:

(2)RabbitMQ 消息代理

python">from celery import Celery

# 配置 Celery 使用 RabbitMQ 作为消息代理(通过 Kombu 处理)
app = Celery('tasks', broker='amqp://guest:guest@localhost//')

@app.task
def add(x, y):
    return x + y

Kombu 是一个强大的消息传递库,提供了多种消息代理的支持,并能实现复杂的消息队列和路由功能。它支持多种消息格式和高级功能,如交换机、队列、路由等。基础用法 包括创建生产者和消费者,通过消息代理发送和接收消息。高级用法 包括消息路由、与 Celery 集成等,用于构建分布式系统和异步任务处理。

参考文献

[1] https://github.com/celery/kombu

[2] https://docs.celeryq.dev/projects/kombu/en/stable/

[3] 消息队列 Kombu 之 基本架构:https://www.cnblogs.com/rossiXYZ/p/14454761.html

[4] Kombu 库用法详解(连接、连接池、生产者、消费者):https://blog.csdn.net/weixin_44799217/article/details/128490325

NLP工程化(星球号)


http://www.niftyadmin.cn/n/5669532.html

相关文章

C++中数组可以开多大

C中数组可以开多大 - 糖豆爸爸 - 博客园 (cnblogs.com) 一般OI题的时空限制 时/空限制:1s/64MB或 时/空限制:2s/128MB 以128MB为例,128M131072KB134217728字节 下面才是正确的大小: int4字节 char1字节 long long8字节 开intin…

光伏电站的方案报告包含哪些内容

光伏电站的方案报告是一个综合性的文档,它详细描述了光伏电站项目的规划、设计、施工、运营及维护等各个环节。一个完整的光伏电站方案报告通常包含以下内容: 一、项目概述 项目背景:介绍光伏电站建设的背景、目的和意义,包括能…

Springboot常见问题(bean找不到)

如图错误显示userMapper bean没有找到。 解决方案: mapper包位置有问题:因为SpringBoot默认的包扫描机制会扫描启动类所在的包同级文件和子包下的文件。注解问题: 比如没有加mapper注解 然而无论是UserMapper所在的包位置还是Mapper注解都是…

初步认识C++模版

前言 在C语言中,我们知道函数的形参需要指定类型,但是在C中,我们可以模版实现各种类型参数的通用函数。 1. 泛型编程 我们通过函数重载实现多种类型的同一作用的函数。如交换函数: void Swap(int& left, int& right) …

ChatGPT搭上langchain的知识库RAG应用,效果超预期

最近利用LangchainChatGPT实现了上传文档实现个人知识库应用的能力,效果比想象得要好。文末大家可以体验一下效果~~ 给大家大致介绍下实现方式,参考了Langchain chatchat。 一、LangchainChatGPT 1、概述 LangChain 是一个强大的框架,可以…

为人机交互保持预见性丨基于G32A1445的T-BOX应用方案

T-BOX是一种集成了通信、计算和控制功能的车载信息处理终端,通过车辆与云端、移动网络等进行数据交互,用于车、人、外部环境的互联互通,支持车辆定位、车载通信、远程控制、故障诊断、数据传输、紧急呼叫等功能,帮助车辆实现更加智…

基于神经网络的光线追踪

基于神经网络的光线追踪(Neural Network-based Ray Tracing)结合了光线追踪算法与神经网络的强大能力,用于加速光线追踪渲染过程,提升图像质量,并降低计算资源消耗。这种方法主要用于计算机图形学和渲染领域&#xff0…

网页聊天——测试报告——Selenium自动化测试

一,项目概括 1.1 项目名称 网页聊天 1.2 测试时间 2024.9 1.3 编写目的 对编写的网页聊天项目进行软件测试活动,揭示潜在问题,总结测试经验 二,测试计划 2.1 测试环境与配置 服务器:云服务器 ubuntu_22 PC机&am…