go语言之路

消息队列之rabbit mq

消息队列之rabbit mq

本文来自我的中间件教程

https://github.com/gocloudcoder/gopher-road/tree/main/middlewares

参考链接:

http://rabbitmq.mr-ping.com/

在此之前我们必须理解几个概念。

  • 什么是中间件?
  • 什么是单体架构以及什么是分布式架构?
  • 什么是同步调用?什么是异步调用?
  • 什么是消息队列?

中间件

中间件是介于应用系统和系统软件之间的一类软件,它使用系统软件所提供的基础服务(功能),衔接网络上应用系统的各个部分或不同的应用,能够达到资源共享、功能共享的目的。

典型如mysql,redis就是中间件。通俗来讲,中间件是一个非业务类型的技术类组件。

再举个生动形象的例子,比如我开了一个炸鸡店,向用户售卖炸鸡(业务层),我难道要和养鸡场的老板去商讨鸡的价格么(底层)。

显然为了更有效的进货,我会去优选选择一个专门整合鸡场资源的第三方代理商(中间件)去批量购买鸡。因为我并不需要从养鸡场买鸡,然后杀鸡。为了更快更有效的赚钱(提高开发效率),我只需要购买已经处理好的鸡,直接炸好给用户就行。

那我做的事情就很简单,选择一个优质鸡肉的鸡场代理商,而不用考虑到底是公鸡还是母鸡,不用考虑鸡是吃什么长大的。

鸡场代理商做的事情也很简单,与养鸡场老板协商,购买鸡,然后统一处理好鸡,并向商家拟定一个价格(接口)。

整个过程都井然有序,各有各的职责,分工明确,效率更高。

这就是中间件的作用。

单体架构和分布式架构

单体架构

单体架构将所有的服务混合运行在同一个服务端口中。

当某个模块需要修改功能时,需要将所有的模块组件重新编译,打包,上线。

图片来自csdn

链接:https://blog.csdn.net/supingemail/article/details/80076009

img

当服务越来越大,模块间的依赖将会越来越强,耦合性将会越来越大。逐渐的随着业务增长,将会成为一座巨大的“shit”。

这时候,微服务就起到重要的作用了。

分布式架构

微服务架构是目前广为关注的一种分布式架构。

图片来自csdn

链接:https://blog.csdn.net/supingemail/article/details/80076009

img

将不同的业务拆分成不同的服务,使用不同的数据表,各个服务间不相互依赖。并独立运行在不同的服务端口。

好处就是低耦合,功能改变时只需要将相应的服务进行升级即可。并且项目团队更利于管理,出现问题更容易定位。

同步调用和异步调用

同步调用

同步调用是一种阻塞式调用,一段代码调用另一端代码时,必须等待这段代码执行结束并返回结果后,代码才能继续执行下去。

同步调用可以认为是打电话。比如我要和鸡场第三方代理商买鸡,我必须要先打通电话才能进一步商讨购买鸡的数量。如果没有打通,我就一直等待电话打通(进程阻塞)。

异步调用

异步调用是一种非阻塞式调用,一段异步代码还未执行完,可以继续执行下一段代码逻辑,当代码执行完以后,通过回调函数返回继续执行相应的逻辑,而不耽误其他代码的执行。

异步调用可以认为是发短信。我不用管代理商手机是否是开机装态还是关机状态,我直接发短信告诉代理商我要购买鸡的数量。

消息队列

前面介绍了这么多基础,终于可以说说消息队列了。

队列

当然首先是介绍队列的概念了。

它是先进先出的一种数据结构。

可以理解为有不同的商家向代理商购买鸡,当然是谁先买鸡就先把鸡给谁了。

消息队列

img

我们把它称为MQ(Message Queue)。

消息队列在上述的比喻中,可以认为是一个电信服务商。首先需要明白的一点是我们发信息不是直接把信息发送给另一个人,而需要经过电信服务商(移动,电信,联通)的转送。

有不同的卖鸡的商家向鸡场代理商发短信商讨买鸡的事情,发完短信之后,信息发送到了电信服务商那里,电信服务商把信息存在一个消息队列中,然后再把信息转送给鸡场代理商。这当然遵循的原则是先进先出了。所以鸡场代理商接收到信息的时间也不一样,先收到谁的消息就先把鸡卖给谁!

消息队列的特性

解耦,在不同业务间必须要通过协议去进行沟通(如需要同步数据库等),我们通过一个消息队列进行沟通,低耦合。

削峰,在高并发的时候,消息队列可以起到减压的作用。因为它是异步调用,不会阻塞。

提高系统的拓展性,消息队列可以搭建集群。

rabbit mq

介绍了这么多,我们终于大概清楚了消息队列。

常用的消息队列有

  • rabbit mq
  • active mq
  • rocker mq
  • kafka

本篇主要介绍rabbit mq的使用。

介绍

rabbit mq是一个消息代理:它接受并转发消息。你可以把它想象成一个邮局:当你把你想要邮寄的邮件放进一个邮箱时,你可以确定邮差先生或女士最终会把邮件送到你的收件人那里。在这个比喻中,rabbit mq是一个邮箱、一个邮局和一个邮递员。

rabbit mq和邮局的主要区别在于它不处理纸张,而是接受、存储和转发二进制数据块——消息。

它是一个生产消费模型。生产即是发送消息,消费即接受消息。

支持多个协议,常用的协议是AMQP。

AMQP 0-9-1的工作过程如下图:消息(message)被发布者(publisher)发送给交换机(exchange),交换机常常被比喻成邮局或者邮箱。然后交换机将收到的消息根据路由规则分发给绑定的队列(queue)。最后AMQP代理会将消息投递给订阅了此队列的消费者,或者消费者按照需求自行获取。

enter image description here

rabbit mq入门之hello world

在下图中,“ P”是我们的生产者,“ C”是我们的消费者。中间的框是一个队列——rabbit mq代表消费者保存的消息缓冲区。

(P) -> [|||] -> (C)

我们做的Demo就是通过P将hello world发送到消息队列中,再从消息队列中把消息发送给C。

P可以认为是一个服务端,C可以认为是一个客户端。

首先安装rabbit mq软件到linux操作系统上。

上篇我们介绍了Docker,所以我们直接使用Docker来运行rabbit mq容器。

docker run -d --restart=always --name myrabbitmq -p 5672:5672  rabbitmq
# -d 表示后台运行
# --restart=always 表示自动重启
# --name myrabbitmq 指定容器名字
# -p 5672:5672 暴露容器端口号5672,并映射到宿主机端口5672
# rabbitmq 使用官方镜像rabbitmq

image-20210310193558444

接下来我们安装golang rabbitmq客户端 amqp

go get github.com/streadway/amqp

发送消息

  • 连接到rabbitmq服务器
  • 创建一个通道
  • 声明队列
  • 将消息发送到队列中
package main
import (
    "fmt"
    "github.com/streadway/amqp"
    "log"
)
func main() {
    // 连接rabbitmq服务器
    conn, err := amqp.Dial("amqp://guest:guest@nj-jay.com:5672")
    failOnError(err, "连接rabbitmq失败")
    defer conn.Close()
    //创建一个通道
    ch, err := conn.Channel()
    defer ch.Close()
    failOnError(err, "创建ch通道失败")
    //声明要发送到的队列 "hello"
    q, err := ch.QueueDeclare(
        "hello",
        false,
        false,
        false,
        false,
        nil,
    )
    //消息
    message := "hello world"
    //将消息发送到声明的队列
    err = ch.Publish(
        "",
        q.Name,
        false,
        false,
        amqp.Publishing{
            ContentType: "text/plain",
            Body: []byte(message),
        },
    )
    failOnError(err, "发送失败")
    fmt.Println("发送成功")
}
func failOnError(err error, msg string) {
    if err != nil {
        log.Fatalf("%s: %s", msg, err)
    }
}

每当我们运行该代码,运行rabbitmq的容器将会接收到相应。

使用docker logs -f myrabbitmq就可以查看

image-20210310195251559

每运行一次,将建立连接,然后运行完毕连接断开。

接下来,我们应该要从rabbitmq中间件中获取消息。

接收消息

  • 连接rabbitmq服务器
  • 创建通道
  • 声明队列
  • 从队列中拿到消息
package main

import (
    "fmt"
    "github.com/streadway/amqp"
    "log"
)
func main() {
    // 连接到rabbitmq中
    conn, err := amqp.Dial("amqp://guest:guest@nj-jay.com:5672")
    failOnError(err, "Failed to connect to RabbitMQ")
    defer conn.Close()
    //创建一个通道用于与发送消息的通道连接
    ch, err := conn.Channel()
    failOnError(err, "创建通道失败")
    //声明队列
    q, err := ch.QueueDeclare(
        "hello",
        false,
        false,
        false,
        false,
        nil,
    )
    failOnError(err, "Failed to declare a queue")
    //创建获取消息的通道
    msgs, err := ch.Consume(
        q.Name,
        "",
        true,
        false,
        false,
        false,
        nil,
    )
    failOnError(err, "注册获取消息的通道失败")
    //使用goroutine获取消息,并一直阻塞在这里,不让通道退出
    exit := make(chan bool, 1)
    go func() {
        for m := range msgs {
            fmt.Println(string(m.Body))
        }
    }()
    <- exit
}
func failOnError(err error, msg string) {
    if err != nil {
        log.Fatalf("%s: %s", msg, err)
    }
}

我们运行接收消息的程序,再运行发送消息的命令。可以看到正常接收到消息。

或者我们先运行发送消息,再运行接收消息,依然可以正常接收到消息。邮箱的机制保证了消息传递的可靠性。

image-20210310202348798

rabbit mq重大意义

在一个项目中,通过这个中间件,我们可以用多种语言同时开发。

如支付服务采用java,查询业务采用Go,修改业务采用python。

并能保证消息消息传递和接收的可靠性。

并且是异步调用的,不仅效率高,而且高并发性能好,在微服务架构中非常的适合。

因此在开发大型项目中,rabbit mq经常使用。服务可以跨机器,可以使用集群。

后续rabbit mq的教程

正在陆续更新,敬请期待

链接:https://github.com/gocloudcoder/gopher-road/tree/main/middlewares

学会用编程思维解决工作中的问题--Go操作Excel篇

上一篇

存钱会上瘾的--让存钱成为一种习惯

下一篇

你也可能喜欢

发表评论

您的电子邮件地址不会被公开。 必填项已用 * 标注

提示:点击验证后方可评论!

插入图片

个人微信公众号

we-tuiguang

qq交流群

群号:1046260719

微信扫一扫

微信扫一扫