rabbitMq------客户端模块

news/2024/10/3 14:16:24 标签: rabbitmq, ruby, 分布式

提示:文章写完后,目录可以自动生成,如何生成可参考右边的帮助文档

文章目录

  • 前言
  • 消费者模块
  • 信道管理模块
    • 管理的字段
    • 提供的接口
  • 信道内存管理
  • 连接管理类


前言

在RabbitMQ中,提供服务的是信道,因此在客⼾端的实现中,弱化了Client客⼾端的概念。在RabbitMQ中并不会向⽤⼾展⽰⽹络通信的概念出来,⽽是以⼀种提供服务的形式来体现。实现思想类似于普通的功能接⼝封装,⼀个接⼝实现⼀个功能,接⼝内部完成向客⼾端请求的过程,但是对外并不需要体现出客⼾端与服务端通信的概念,⽤⼾需要什么服务就调⽤什么接⼝就⾏。


消费者模块

客户端这块对于消费者模块是不需要管理的,当进行消息订阅的时候,就会创建出一个消费者,而这消费者的作用就是:
• 描述当前信道订阅了哪个队列的消息。
• 描述了收到消息后该如何对这条消息进⾏处理。
• 描述收到消息后是否需要进⾏确认回复。

using ConsumerCallBack = std::function<void(const std::string&,BasicProperties *,const std::string &)>;
    struct Consumer
    {
        using ptr = std::shared_ptr<Consumer>;
        std::string _qname; //消费者订阅的队列名称
        std::string _ctag;  //消费者标识
        bool _auto_ack;  //自动应答标志
        ConsumerCallBack _cb;   
    }

信道管理模块

同样的客户端也有信道,其功能与服务端几乎一致,只不过客户端的信道是为用户提供服务的,而服务器的信道是为客户端的请求提供服务的。也可以理解是⽤⼾通过客⼾端channel的接⼝调⽤来向服务端发送对应请求,获取请求的服务。

管理的字段

客户端的信道需要管理的字段中有一个哈希表,是请求id和通用响应结构的映射。
因为在muduo库中发送和接收都是异步的,例如我们声明一个交换机,这个请求可能还在发送缓冲区,并没有发送,我们此时如果去给这个交换机推送消息就会出问题。因此需要我们⾃⼰在收到响应后,通过判断是否是等待的指定响应来进⾏同步。

class Channel
    {
    private:
        std::string _cid;
        muduo::net::TcpConnectionPtr _conn;
        ProtobufCodecPtr _codec; 
        Consumer::ptr _consumer;

        //由于muduo库的发送和接收都是异步的,例如我们声明一个交换机,这个请求可能还在发送缓冲区,并没有发送,我们此时如果去给这个交换机推送消息就会出问题。因此需要我们⾃⼰在收到响应后,通过判断是否是等待的指定响应来进⾏同步
        std::mutex _mutex;
        std::condition_variable _cv;
        std::unordered_map<std::string,basicCommonResponsePtr> _basic_resp;
    }

提供的接口

客户端的channel和服务端的接口都是几乎一致的,客户端的接口中是组织请求,向服务端发起请求,服务端的接口是接收请求进行业务处理。

但客户端中的channel提供了打开信道和关闭信道这俩个接口,他们是向服务端发起请求,在服务器上创建信道。

//这两个接口是向服务端发送请求,在服务端创建对应信道
 bool openChannel(){
     //组织请求
     openChannelRequest req;
     std::string rid = UUIDHelper::uuid();
     req.set_rid(rid);
     req.set_cid(_cid);

     //发送
     _codec->send(_conn,req);
     basicCommonResponsePtr resp = waitResponse(rid);
     return resp->ok();

 }
 void closeChannel(){
     //组织请求
     closeChannelRequest req;
     std::string rid = UUIDHelper::uuid();
     req.set_rid(rid);
     req.set_cid(_cid);

     //发送
     _codec->send(_conn,req);
     waitResponse(rid);
     return;
 }

其中每个接口中都需要调用waitResponse,用来同步等待响应。

basicCommonResponsePtr waitResponse(std::string &rid)
 {
     std::unique_lock<std::mutex> lock(_mutex);
     _cv.wait(lock,[&rid,this](){
         return _basic_resp.find(rid) != _basic_resp.end();
     });

     basicCommonResponsePtr basic_resp = _basic_resp[rid];
     _basic_resp.erase(rid);
     return basic_resp;
 }

信道内存管理

对信道的一个总的管理类。
有一个哈希表,是信道ID和信道对象的映射。
他提供了三个接口,创建信道/删除信道/获取指定信道。

class ChannelManager
    {
    private:
        std::mutex _mutex;
        std::unordered_map<std::string,Channel::ptr> _channels;
    }

连接管理类

在客⼾端这边,RabbitMQ弱化了客⼾端的概念,因为⽤⼾所需的服务都是通过信道来提供的,因此操作思想转换为先创建连接,通过连接创建信道,通过信道提供服务这⼀流程。
这个模块同样是针对muduo库客⼾端连接的⼆次封装,向⽤⼾提供创建channel信道的接⼝,创建信道后,可以通过信道来获取指定服务。

class Connection {
    public:
        using ptr = std::shared_ptr<Connection>;
private:
        muduo::CountDownLatch _latch;//实现同步的
        muduo::net::TcpConnectionPtr _conn;//客户端对应的连接
        muduo::net::TcpClient _client;//客户端
        ProtobufDispatcher _dispatcher;//请求分发器
        ProtobufCodecPtr _codec;//协议处理器

        AsyncWorker::ptr _worker;
        ChannelManager::ptr _channel_manager;
  }

在客户端这边,会收到两种响应,一种是基础响应,一种是消息推送响应。
我们需要注册对于这两种类型响应的回调函数。

基础响应,在基础响应中有一个cid字段,根据cid获取指定的信道对象,
调用信道对象中的putBasicResponse接口。
也就是往信道的hashMap中添加响应值。

void basicResponse(const muduo::net::TcpConnectionPtr& conn, const basicCommonResponsePtr& message, muduo::Timestamp) {
 //1. 找到信道
 Channel::ptr channel = _channel_manager->get(message->cid());
 if (channel.get() == nullptr) {
     DLOG("未找到信道信息!");
     return;
 }
 //2. 将得到的响应对象,添加到信道的基础响应hash_map中
 channel->putBasicResponse(message);
}

消息推送,在收到消息推送的响应后,需要更具响应中的rid,获取指定的信道对象,然后封装一个任务,这个任务就是调用信道中的consume接口,这个接口就是执行消费者中设置的回调函数。我们把这个任务放入到线程池中。

void consumeResponse(const muduo::net::TcpConnectionPtr& conn, const basicConsumerResponsePtr& message, muduo::Timestamp){
//1. 找到信道
  Channel::ptr channel = _channel_manager->get(message->cid());
  if (channel.get() == nullptr) {
      DLOG("未找到信道信息!");
      return;
  }
  //2. 封装异步任务(消息处理任务),抛入线程池
  _worker->pool.push([channel, message](){
      channel->consume(message);
  });
}

连接管理提供了两个接口,打开信道和关闭信道。这是给用户提供的,用来创建一个信道,通过信道进行服务的调用。

 Channel::ptr openChannel() {
    Channel::ptr channel = _channel_manager->create(_conn, _codec);
    bool ret = channel->openChannel();
    if (ret == false) {
        DLOG("打开信道失败!");
        return Channel::ptr();
    }
    return channel;
}

void closeChannel(const Channel::ptr &channel) {
    channel->closeChannel();
    _channel_manager->remove(channel->cid());
}

http://www.niftyadmin.cn/n/5688506.html

相关文章

数据结构--包装类简单认识泛型

目录 1 包装类 1.1 基本数据类型和对应的包装类 1.2 装箱和拆箱&#xff0c;自动装箱和自动拆箱 2 什么是泛型 3 引出泛型 3.1 语法 4 泛型类的使用 4.1 语法 4.2 示例 5 泛型的上界 5.1 语法 5.2 示例 5.3 复杂示例 8 泛型方法 8.1 定义语法 8.2 示例 总结 1 …

C语言_内存函数

内存函数是 C 标准库中的一组函数&#xff0c;用于管理和操作内存。使用时需要包含头文件<string.h>。 1. memcpy的使用和模拟实现 函数形式如下&#xff1a; void* memcpy(void* destination, const void* source, size_tnum);函数解析和注意事项&#xff1a; memcp…

二、创建drf纯净项目

1)创建项目 django-admin startproject api2&#xff09;创建app django-admin startproject api_app3)修改settings.py注释掉一些没用的配置 INSTALLED_APPS [# django.contrib.admin,# django.contrib.auth,# django.contrib.contenttypes,# django.contrib.sessions,# d…

VScode 自定义代码配色方案

vscode是一款高度自定义配置的编辑器, 我们来看看如何使用它自定义配色吧 首先自定义代码配色是什么呢? 看看我的代码界面 简而言之, 就是给你的代码的不同语义(类名, 函数名, 关键字, 变量)等设置不同的颜色, 使得代码的可读性变强. 其实很多主题已经给出了定制好的配色方案…

FastGPT的使用

fastGPT的介绍&#xff1a; fastGPT其实和chatGPT差不多 但是好处是可以自行搭建&#xff0c;而且很方便 链接&#xff1a;https://cloud.fastgpt.cn/app/list 首先我们可以根据红框点击&#xff0c;创建一个简易的对话引导 这个机器人就非常的简易&#xff0c;只能完成一些翻…

学生Schutte情绪智力测评

情绪智力(Emotional Intelligence,EI)作为衡量个体在情绪处理、理解和运用方面的能力,近年来在心理学和教育领域得到了广泛关注。Schutte情绪智力量表(Schutte Self-report Emotional Intelligence Scale,SSEIS)作为其中一种常用的测评工具,为高中学生提供了全面评估自…

CMake所学

向大佬lyf学习&#xff0c;先把其8服务器中所授fine 文章目录 前言一、CMakeList.txt 命令1.1 最外层CMakeLists1.1.1 cmake_minimum_required&#xff08;&#xff09;1.1.2 project&#xff08;&#xff09;1.1.3 set&#xff08;&#xff09;1.1.4 add_subdirectory&#xf…

51单片机LED点阵屏

目录 一、点阵屏介绍 ​编辑1、显示原理 2、开发板引脚对应关系 3、点阵屏的内部结构图 4、74HC595介绍 二、LED点阵屏显示图形 1、绘制笑脸 2、编写程序 三、LED点阵屏显示动画 1、利用字模提取软件&#xff0c;获取动画像素数组 2、编写程序 一、点阵屏介绍 LED点阵…