RabbitMQ + Spring Boot + Python的使用过程

article/2024/5/23 2:30:22

需求:后端执行Pytorch框架下的模型,对输入图像的评估,得到一个分数。

首先,实现Java和Python的交互,参考以下资料:

spring boot 项目实现调用python工程的方法_springboot中可以用python吗-CSDN博客

有五种方法:

  1. 执行 Python 脚本:以终端cmd的方式运行.py文件。
  2. Jython:一个 Python 的 Java 实现。(只支持 Python 2.x)
  3. 使用 Web 服务:将 Python 脚本或应用封装为一个 Web 服务,然后通过 HTTP 请求进行交互。
  4. 使用消息队列:实现 Java 和 Python 之间进行异步通信。优点:支持高并发,解耦合。
  5. 使用 gRPC 或 Thrift :使用 gRPC 或 Apache Thrift 进行跨语言的 RPC(远程过程调用)。

尝试一:使用 Runtime 执行 Python 脚本

注:ProcessBuilder 不支持第三方库的 Python 脚本运行。

以下为简单实现的例子,没有考虑执行失败情况。

 public double getModel(String imageDir) throws Exception {Process process = Runtime.getRuntime().exec("绝对地址\\envs\\VEnet\\python.exe 绝对地址\\model.py 绝对地址\\example.jpg");BufferedReader in = new BufferedReader(new InputStreamReader(process.getInputStream(), "GBK"));String line = null;Double score = 0.0; // 记录得分while ((line = in.readLine()) != null) {System.out.println(line);score = Double.parseDouble(line);}in.close();int re = process.waitFor(); // re表示Python执行的结果return score;

其中,re 用来让主线程等待子线程进行完毕。最终 re 为0或者1,表示子线程是否执行成功。我使用的是conda的虚拟环境,所以用了虚拟环境的python.exe所在的绝对路径。

此外,还可以执行 conda activate 虚拟环境名 && python ...\Model.py ...\example.jpg

此外,应该也可以把Python默认的系统路径,从base环境改成虚拟环境,从而直接执行 python ...\Model.py ...\example.jpg。(未尝试)

然而,这种方法不具备高并发的性能,每次请求都需要配置Python环境(Pytorch)、下载模型,耗费了很多没必要的资源。且耦合度高。所以考虑另外高并发的方法。

尝试二:使用消息队列

1.选择RabbitMQ

主要有四种实现方式,RabbitMQ、Apache Kafka和ActiveMQ,资料是:

一文讲清RabbitMQ、Apache Kafka、ActiveMQ_activemq kafaka-CSDN博客

消息中间件(MQ)对比:RabbitMQ、Kafka、ActiveMQ 和 RocketMQ_mq对比-CSDN博客

  1. 信息传递模式
    1. RabbitMQ和ActiveMQ使用传统消息模型,非常适合需要严格排序和可靠交付消息的应用程序。
    2. Kafka使用发布/订阅消息模型,更适合流数据场景,需要实时处理数据。
  2. 性能
    1. RabbitMQ被设计为可靠的消息系统,这意味着它优先考虑消息传递而不是性能。RabbitMQ可以处理中等消息速率,适用于需要严格排序和可靠传递消息的应用程序。
    2. Kafka被设计为高性能系统,可以处理大量数据并具有低延迟。Kafka通过使用分布式架构和优化顺序I/O来实现这种性能。
    3. ActiveMQ也被设计为高性能系统,可以处理高消息速率。ActiveMQ通过使用异步架构和优化消息批处理来实现这种性能。

开始以为是流数据场景,尝试了Kafka。后来意识到,还需要把模型的输出值返回回来,所以应该是可靠通信。然后说RabbitMQ简单易用,适合初学者,所以果断采用RabbitMQ。

2. 学习

6种消息模型

  1. 基本消息模型:
    1. 1个生产者,1个消费者。
    2. 有消息确认机制(ACK) 
  2. work消息模型:
    1. 和简单队列模式基本一样,不过有一点不同,该模式有多个消费者在监听队列。
    2. 以轮询的方式将消息发给多个消费者确保一条消息只会被一个消费者消费。
    3. 任务分发默认使用的是公平队列调度的原则。
    4. 不需要设置队列和交换机的绑定,因为这个模式会将队列绑定到默认的交换机 。
  3. Publish/subscribe(发布订阅模式):交换器类型是 Fanout
    1. 和上面2种模式默认提供交换机不同的是,该模式需要显示声明交换机
    2. 生产者:声明Exchange,不再声明Queue。 发送消息到Exchange,不再发送到Queue。即,生产者没有将消息直接发送给队列,而是发送给exchange(交换机、转发器)
    3. 交换机:将消息转发给与自己绑定的所有队列,实现一个消息被多个消费者消费。
    4. 消费者监听指定的队列获得消息。每个队列可以有多个消费者监听,同样也是以轮询的机制发给消费者。所以,多个消费端监听同一个队列不会重复消费消息。
    5. 注:Exchange(交换机)只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与Exchange绑定,或者没有符合路由规则的队列,那么消息会丢失!
    6. 这个模式需要设置队列和交换机的绑定。
  4. Routing 路由模型:交换机类型是 Direct
    1. 交换机:接收生产者的消息,然后把消息递交给与路由键(routing key)完全匹配的队列
    2. 消费者所在队列:可以指定多个路由键(routing key)
    3. 生产者:发送消息时需要声明路由键(routing key)
  5. Topics 通配符模式:交换机类型是 Topics
    1. 每个消费者监听自己的队列,并且设置带统配符的routingkey,
    2. 生产者将消息发给broker
    3. 交换机根据routingkey来转发消息到指定的队列。
    4. Broker:消息队列服务进程,此进程包括两个部分:Exchange和Queue。
  6. RPC 模型:
    1. 基本概念:Callback queue 回调队列、Correlation id 关联标识

4种交换机

  1. Direct Exchange:定向(路由)。交换机通过路由键(routing key)把消息路由到指定的队列。相当于路由键是队列的名字。发送消息时,需要设置路由键。
  2. Topic Exchange:交换机通过通配符匹配路由键和绑定键的关系。把消息交给符合路由模式(routing pattern) 的队列
  3. Fanout Exchange:交换机把消息广播给路由机绑定的所有队列
  4. Headers Exchange:交换机通过消息的headers属性的键值对(key/value)来确定消息的路由。每个队列有一组键值对。当发送消息时,需要在headers属性中设置一组键值对。如果消息的headers中包含了指定的键值对,则该消息将被路由到该队列中。
    1. x-match= all:当消息的所有键值对与绑定的键值对匹配时,才会将消息路由到绑定的队列。这相当于“与”逻辑。如果绑定中没有任何键值对,则所有消息都会被路由到与该绑定相关联的队列。
    2. x-match= any:当消息中的至少一个键值对与绑定的键值对匹配时,就会将消息路由到绑定的队列。这相当于“或”逻辑。如果绑定中没有任何键值对,则没有消息会被路由到与该绑定相关联的队列。

rabbitmq RPC 交换机 rabbitmq几种交换机_mob64ca14122c74的技术博客_51CTO博客 详细解释了交换机。

消息队列可以解决什么问题呢?

  • 业务解耦:A系统需要耦合B、C、D系统,在消息队列之前可以通过共享数据、接口调用等方式来实现业务,现在可以通过消息中间件进行解耦。

  • 削峰填谷:在互联网经常会出现流量突然飙升的情况,以前很多时候就是通过性能优化、加服务器等方式,可以通过消息中间件缓存相关任务,然后按计划的进行处理。

  • 异步:可以通过消息推送及短信发送进行说明,业务平台并不关注具体消息的发送细则,完全可以通过消息队列的方式,直接下发任务,由任务消费者进行处理。

以上都来这两篇文章: 

RabbitMQ介绍 + python操作 - dongye95 - 博客园 (cnblogs.com) 写的很好,面试前看这个。

Python角度介绍RabbitMQ。

还介绍了高级特性:过期时间、消息确认、持久化、死信队列、延迟队列。

rabbitmq RPC 交换机 rabbitmq几种交换机_mob64ca14122c74的技术博客_51CTO博客 

Spring Boot角度介绍RabbitMQ。

还介绍了消息持久化、延迟发送(TTL机制和rabbitmq插件两种方式)、可靠性发送与接收。

RabbitTemplate在Spring中的所有方法:

Rabbittemplate所有方法.简介.-CSDN博客

RabbitMQ在Python中的常见错误:

RabbitMq使用中常见错误小结_pika.exceptions.probableauthenticationerror: conne-CSDN博客

pika库的错误我基本都遇到了,可以加深对代码的理解。

3. 准备工作

3.1 安装和配置RabbitMQ

RabbitMQ安装教程(非常详细)从零基础入门到精通,看完这一篇就够了_rabbitmq安装详细教程-CSDN博客

注:15672端口是图形化界面的,而RabbitMQ服务仍然是在默认端口5672上。如果想用新增用户登入图形化界面,需要给新增的用户添加管理员权限。

此外,这篇文章提到:web管理界面把消息内容序列化了(因为它默认使用的还是jdk的序列化的默认序列化器),所以他介绍了如何把web管理界面的默认序列化器更改为json类型的序列化器。这样,我们在web管理界面看消息会更直观。

3.2 Spring Boot端的准备

在Spring Boot引入maven相关依赖:

SpringBoot学习之路---使用RabbitTemplate操作RabbitMq_rabbittemplate用法-CSDN博客

然后,SpringBoot会自动帮我们注入RabbitTemplate。

在yml文件配置 RabbitMQ 的信息:

rabbitmq:username: xxxpassword: xxxaddresses: 127.0.0.1:5672

3.3 Python端的准备

在Python端直接 conda install pika

3.4 模式的思考

尝试简单模式:Spring发送消息。Python接收消息,处理消息,再发送消息。二者都有监听器。这种做法只适用于单线程。在多线程中,如果只使用一个队列,那么不能保证多线程的数据的准确传输;如果每个线程都创建一个队列,那么会造成资源浪费。

后来查阅资料,发现RPC模式非常合适。

2.RPC模式

Spring端

参考了:

RabbitMQ学习整理————基于RabbitMQ实现RPC_spring mvc rabbit mq 如何事项rpc-CSDN博客

他采用了rabbitTemplate.sendAndReceive方法,该方法有三个参数:第一个是交换机(exchange)的名字,第二个是路由键(我感觉就是队列的意思)的名字,第三个则为消息的内容。(注:RabbitMQ中所有的消息都要先通过交换机,空字符串表示使用默认的交换机)

以下我的Spring Boot端的代码:

@Service
public class GetModelRabbitMQService {@Autowiredprivate AmqpTemplate rabbitTemplate;private static final String s = "image2Model";public String sendMessage(String imageDir) {// 设置correlationIdString corrId = UUID.randomUUID().toString();MessageProperties messageProperties = MessagePropertiesBuilder.newInstance().setCorrelationId(corrId).build();Message message = new Message(imageDir.getBytes(StandardCharsets.UTF_8), messageProperties);System.out.println("Spring要发出咯~~"+message);Message response = rabbitTemplate.sendAndReceive("", s, message);if(response == null){System.out.println("没有收到哟~~~");return null;}else{String res = new String(response.getBody(), StandardCharsets.UTF_8);System.out.println("Spring收到咯~~"+res);return res;}}
}

其中,客户端在等待回调队列里的数据时,如果有消息出现,它会检查 correlation_id 属性。如果此属性的值与请求匹配,就返回给应用。所以,能从回调队列中得到数据,就说明id一致。

Python端

参考了:

python对RabbitMQ的简单使用_python rabbitmq-CSDN博客

他实现了简单模式、发布订阅模式和RPC模式,我参考了RPC模式,以下是我的代码

if __name__ == '__main__':# 连接到RabbitMQ服务器user_info = pika.PlainCredentials('xxx', 'xxx')  # 用户名和密码connection = pika.BlockingConnection(pika.ConnectionParameters('127.0.0.1', 5672, '/', user_info))channel = connection.channel()# 声明持久化队列queue_name = 'image2Model'channel.queue_declare(queue=queue_name ,durable=True) # durable=True,声明队列是持久化。# 清空队列,简单模式的持久化需要,RPC模式不需要# channel.queue_purge(queue=queue_name)def on_request(ch, method, props, body):body = body.decode()print('body decode后:', body)response = str(Train(body))ch.basic_publish(exchange='',routing_key=props.reply_to,  # props.reply_to 把消息发送到用来返回消息的queueproperties=pika.BasicProperties(correlation_id=props.correlation_id),body=response,)ch.basic_ack(delivery_tag=method.delivery_tag)channel.basic_qos(prefetch_count=1)  # 一次处理一个队列# auto_ack指定为True,表示消息接收到后自动给消息发送方回复确认,已收到消息channel.basic_consume(queue=queue_name, on_message_callback=on_request)print(' 开始监听. To exit press CTRL+C')channel.start_consuming()

交换机默认是持久的,队列中的消息属性默认是持久的,即 properties=pika.BasicProperties(delivery_mode=2),其中,1表示非持久。所以,我采用了都设置为持久的方法,即在声明queue的时候,设置queue为持久的,不然无法运行。以下讲到了具体的持久化操作(需要重启服务):RabbitMQ基础学习_rabbitmq channel.basic_qos-CSDN博客

如果auto_ack设置为True,需要手动给消息发送方回复确认。这样,如果程序没有成功运行,可以返回一个错误信息给到客户端。默认是False,即自动回复,然后从内存或者硬盘中删除。 在简单模式中,如果设置了持久化和手动回复,而没有手动回复(只进行了重新发送其他数据,没有再进行手动回复)。每次Python服务启动,都会把之前队列的消息再重新读取和处理。


http://www.ngui.cc/article/show-2038901.html

相关文章

我与C++的爱恋:隐式类型转换

​ ​ 🔥个人主页:guoguoqiang. 🔥专栏:我与C的爱恋 朋友们大家好,本篇内容我们来介绍初始化列表,隐式类型转换以及explicit的内容 一、初始化列表 1.1 构造函数体赋值 在创建对象时,编译器…

etcd campaign

1. 引言 本文主要讲解使用etcd进行选举的流程,以及对应的缺陷和使用场景 2. etcd选举流程 流程如以代码所示,流程为: clientv3.New 创建client与etcd server建立连接 concurrency.NewSession 创建选举的session,一般会配置ses…

月球地形数据介绍(LOLA)

月球地形数据介绍 LOLA介绍LOLA数据的处理与发布数据类型和格式投影坐标系SIMPLE CYLINDRICALPOLAR STEREOGRAPHIC 数据下载与浏览 LOLA介绍 目前最新的月球地形高程数据来源于美国2009年发射的LRO探测器。 “月球勘测轨道器”(Lunar Reconnaissance Orbiter,LRO)…

git基础教程(52).git/config的’remote’和’branch’部分的理解

文章目录 .git/config文件简介‘remote’部分‘branch’部分.git/config文件简介 在使用Git进行版本控制时,每个仓库都会有一个名为.git的隐藏文件夹,其中包含了Git的配置信息和版本记录等重要数据。其中,.git/config文件是每个仓库的配置文件,记录了与远程仓库的连接和本…

深入理解高并发超卖一系列问题与解决方案(近7万字详解,跳槽涨薪必备宝藏珍藏级分享)

破除困境带你飞 能遇上高并发的,基本都是有点规模的公司,小公司基本都是CRUD。 想去一线城市跳槽,想去有高并发的公司,但是没有高并发经验,没有高并发的经验,就去不了高并发的公司,去不了这样的…

app创建

项目 app 用户管理 app 订单管理 app 后台管理 app 网站管理 开发简洁,一个项目下创建一个app manage.py创建 python3.9 manage.py startapp app01 python3.9 manage.py startapp app02 python3.9 manage.py startapp app03 python3.11 manage.py …

SQL load direct path load index 无效的原因

Index (unique) state changing to unusable status after data loading using SQL*Loader. The steps below are executed to load the data: 1/ disable constraint--如果不是单独建的index, 对应index会drop掉的 2/ load data using SQL*Loader 3/ remove duplic…

使用eNSP配置OSPF多区域实验

一、实验拓扑 二、实验要求 1、R4为ISP,其上只配置IP地址;R4与其他所直连设备间均使用公有IP; 2、R3-R5、R6、R7为MGRE环境,R3为中心站点; 3、整个OSPF环境IP基于172.16.0.0/16划分;除了R12有两个环回&…

K8s 部署 Redis 6.2.3 集群

一、安装规划 组件replicas类型Redis6StatefulSetredis-trib1Deployment 使用 k8s 版本为:v1.18.0 。 本次使用 OpenEBS 来作为存储引擎,OpenEBS 是一个开源的、可扩展的存储平台,它提供了一种简单的方式来创建和管理持久化存储卷。它支持…

oracle 清空回收站

参考官方文档 select * from user_recyclebin; select * from dba_recyclebin; ---清除回收站中当前用户下的对象 purge recyclebin; ---清除回收站中所有的对象 purge dba_recyclebin; ---清除回收站中指定用户的表 PURGE TABLE owner.table_name; ---清除回收站中指…