蓝牙耳机怎么连接手机 蓝牙耳机怎
539 2023-04-03 04:34:19
RabbitMQ是采用Erlang编程语言实现的高级消息队列协议AMQP(Advanced Message Queuing Protocol)的开源消息队列中间件
消息队列中间件的作用:
总体上看RabbitMQ是一个生产者和消费者模型,用于实现消息的接收、存储、转发
Producer(生产者):消息的生产方,投递方。
Consumer(消费者):消息的消费者。
RabbitMQ Broker(RabbitMQ代理):RabbitMQ服务节点。在单机环境中,就是代表RabbitMQ服务器。
Queue(队列):在RabbitMQ中Queue是存储消息数据的唯一形式。
Binding(绑定):在RabbitMQ中的Binding是Exchange将message路由给Queue所需遵循的规则。如果要指定“交换机E将消息路由给队列Q”,那么Q就需要与E进行绑定。绑定操作需要定义一个可选的路由键(routing key)属性给某些类型的交换机。路由键的意义在于从发送给交换机的众多消息中选择出某些消息,将其路由给绑定的队列。
RoutingKey(路由键):消息投递给交换机,通常会指定一个RoutingKey,通过这个路由键来明确消息的路由规则。RoutingKey 通常是生产者和消费者有协商一致的key策略,消费者就可以合法从生产者手中获取数据。这个RoutingKey主要当Exchange交换机模式为设定为direct和topic模式的时候使用,fanout模式不使用RoutingKey。
Exchange(交换机):生产者将消息发送给交换机,再由交换机将消息路由到对应的队列中。交换机有四种类型:fanout、direct、topic、headers
交换机有四种类型:fanout、direct、topic、headers
可以把fanout理解为扇形交换机
其将发送带该类型交换机的消息路由到所有与该交换机绑定的队列中,如同一个扇形一样扩散给各个队列
fanout类型的交换机会忽略RoutingKey的存在,将消息直接广播到绑定的所有队列中
可以把direct理解为直连交换机
其根据消息携带的RoutingKey将消息投递到相应的队列中
direct类型的交换机(exchange)是RabbitMQ Broker的默认类型,它有一个特别的属性对一些简单的应用来说是非常有用的,在使用这个类型的Exchange时,可以不必指定routing key的名字,在此类型下创建的Queue有一个默认的routing key,这个routing key一般同Queue同名可以把topic理解为主题交换机
topic交换机在RoutingKey和BindKey匹配规则上更加灵活,同样是将消息路由到RoutingKey和BindKey相匹配的队列中,但是匹配规则有如下特点:
.
分割的字符串,例如:go.log.info
、java.log.error
.
分割的字符串,但是在BindKey中可以使用两种特殊字符*
和#
用于匹配一个单词,#
用于匹配多规格单词(零个或多个单词)RoutingKey和BindKey是一种“模糊匹配”,那么一个消息可能会被发送到一个或者多个队列中
无法匹配的消息将会被丢弃或者返回给生产者
可以把headers理解为头交换机
headers类型的交换机使用的不是很多
关于headers exchange比较容易理解的解释是:
有时消息的路由操作会涉及到多个属性,此时使用消息头就比用路由键更容易表达,头交换机(headers exchange)就是为此而生的。头交换机使用多个消息属性来代替路由键建立路由规则。通过判断消息头的值能否与指定的绑定相匹配来确立路由规则。 我们可以绑定一个队列到头交换机上,并给他们之间的绑定使用多个用于匹配的头(header)。这个案例中,消息代理得从应用开发者那儿取到更多一段信息,换句话说,它需要考虑某条消息(message)是需要部分匹配还是全部匹配。上边说的“更多一段消息”就是"x-match"参数。当"x-match"设置为“any”时,消息头的任意一个值被匹配就可以满足条件,而当"x-match"设置为“all”的时候,就需要消息头的所有值都匹配成功。 头交换机可以视为直连交换机的另一种表现形式。头交换机能够像直连交换机一样工作,不同之处在于头交换机的路由规则是建立在头属性值之上,而不是路由键。路由键必须是一个字符串,而头属性值则没有这个约束,它们甚至可以是整数或者哈希值(字典)等。
直接安装management版本即可,搜索镜像
$ docker search rabbitmq:management
拉取镜像
$ docker pull rabbitmq:managementmanagement: Pulling from library/rabbitmq7b1a6ab2e44d: Pull complete 37f453d83d8f: Pull complete e64e769bc4fd: Pull complete c288a913222f: Pull complete 12addf9c8bf9: Pull complete eaeb088e057d: Pull complete b63d48599313: Pull complete 05c99d3d2a57: Pull complete 43665bfbc3f9: Pull complete f14c7d7911b1: Pull complete Digest: sha256:4c4b66ad5ec40b2c27943b9804d307bf31c17c8537cd0cd107236200a9cd2814Status: Downloaded newer image for rabbitmq:managementdocker.io/library/rabbitmq:management
启动RabbitMQ容器
$ docker run -d -p 15672:15672 -p 5672:5672 -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=admin --name rabbitmq --hostname=rabbitmqhostone rabbitmq:management
参数含义如下:
访问http://ip:15672是RabbitMQ的webUI界面,默认用户名密码为guest/guest
推荐依赖包:github.com/streadway/amqp
demo的目录结构如下:
$ tree.├── consumer│ └── consumer.go├── go.mod├── go.sum├── lib│ ├── common-func.go│ └── error.go├── producer│ └── producer.go├── task│ └── task.go└── worker └── worker.go
// RabbitMQConn 获取rabbitMQ Broker连接func RabbitMQConn() (conn *amqp.Connection, err error) {var (user string = "admin"pwd string = "admin"host string = "xx.xx.xx.xx"port string = "5672")url := "amqp://" + user + ":" + pwd + "@" + host + ":" + port + "/"// 新建连接conn, err = amqp.Dial(url)return}
简单队列模式是RabbitMQ的常规用法,简单理解就是消息生产者发送消息给一个队列,然后消息的消费者从队列中读取消息
当多个消费者订阅同一个队列的时候,队列中的消息是平均分摊给多个消费者处理
首先定义一个消息的生产者producer:
type simpleDemo struct {Name string `json:"name"`Addr string `json:"addr"`}func main() {// 连接rabbitMQ服务器conn, err := lib.RabbitMQConn()lib.ErrorHandle(err, lib.ErrConnectRabbit)defer conn.Close()// 新建一个channelch, err := conn.Channel()lib.ErrorHandle(err, lib.ErrOpenChannel)defer ch.Close()// 声明或者创建一个队列来保存消息q, err := ch.QueueDeclare("simple:queue", // namefalse, // durablefalse, // delete when unusedfalse, // exclusivefalse, // no-waitnil, // argument)lib.ErrorHandle(err, lib.ErrDeclareQueue)data := simpleDemo{Name: "Tom",Addr: "Shanghai",}dataBytes, err := json.Marshal(data)lib.ErrorHandle(err, lib.ErrMarshalJSON)err = ch.Publish("", // exchangeq.Name, // routing keyfalse, // mandatoryfalse, // immediateamqp.Publishing{ContentType: "text/plain",Body: dataBytes,},)lib.ErrorHandle(err, lib.ErrPublishMsg)log.Printf(" [x] Sent %s", dataBytes)}
定义消息消费者consumer:
func main() {conn, err := lib.RabbitMQConn()lib.ErrorHandle(err, lib.ErrConnectRabbit)defer conn.Close()ch, err := conn.Channel()lib.ErrorHandle(err, lib.ErrOpenChannel)defer ch.Close()q, err := ch.QueueDeclare("simple:queue", // namefalse, // durablefalse, // delete when unusedfalse, // exclusivefalse, // no-waitnil, // args)lib.ErrorHandle(err, lib.ErrDeclareQueue)// 定义一个消费者msgs, err := ch.Consume(q.Name, // queue"", // consumertrue, // auto-ackfalse, // exclusivefalse, // no-localfalse, // no-waitnil, // args)lib.ErrorHandle(err, lib.ErrRegisterConsumer)go func() {for d := range msgs {log.Printf("Received a message: %s", d.Body)}}()log.Printf("[*] Waiting for messages. To exit press CTRL+C")select {}}
我们开启一个生产者和一个消费者,运行结果为:
$ go run producer.go2022/05/19 16:06:25 [x] Sent {"name":"Tom","addr":"Shanghai"}
$ go run consumer.go2022/05/19 16:06:33 [*] Waiting for messages. To exit press CTRL+C2022/05/19 16:06:33 Received a message: {"name":"Tom","addr":"Shanghai"}
工作队列也被称为任务队列
任务队列是为了避免等待执行一些耗时的任务,而是将需要执行的任务封装为消息发送给工作队列,后台运行的工作进程将任务消息取出来并执行相关任务
多个后台工作进程同时进行,他们之间共享任务(抢占)
定义一个任务生产者,用于生产任务消息
func bodyFrom(args []string) string {var s stringif (len(args) < 2) || os.Args[1] == "" {s = "no task"} else {s = strings.Join(args[1:], " ")}return s}func main() {conn, err := lib.RabbitMQConn()lib.ErrorHandle(err, lib.ErrConnectRabbit)defer conn.Close()ch, err := conn.Channel()lib.ErrorHandle(err, lib.ErrOpenChannel)defer ch.Close()q, err := ch.QueueDeclare("task:queue", // namefalse, // durablefalse, // delete when unusedfalse, // exclusivefalse, // no-waitnil, // args)lib.ErrorHandle(err, lib.ErrDeclareQueue)body := bodyFrom(os.Args)err = ch.Publish("",q.Name,false,false,amqp.Publishing{ContentType: "text/plain",DeliveryMode: amqp.Persistent,Body: []byte(body),},)lib.ErrorHandle(err, lib.ErrPublishMsg)log.Printf("sent %s", body)}
定义worker:
func main() {conn, err := lib.RabbitMQConn()lib.ErrorHandle(err, lib.ErrConnectRabbit)defer conn.Close()ch, err := conn.Channel()lib.ErrorHandle(err, lib.ErrOpenChannel)defer ch.Close()q, err := ch.QueueDeclare("task:queue",false,false,false,false,nil)lib.ErrorHandle(err, lib.ErrDeclareQueue)// 将预取计数器设置为1// 在并行处理中将消息分配给不同的工作进程err = ch.Qos(1, // prefetch count0, // prefetch sizefalse, // global)lib.ErrorHandle(err, lib.ErrSetQoS)msgs, err := ch.Consume(q.Name,"",false,false,false,false,nil,)lib.ErrorHandle(err, lib.ErrRegisterConsumer)forever := make(chan bool)go func() {for d := range msgs {log.Printf("Received a message: %s", d.Body)log.Printf("Done")d.Ack(false)}}()log.Printf(" [*] Waiting for messages. To exit press CTRL+C")<-forever}
我们开启两个worker,运行结果为:
$ go run task.go hello world2022/05/19 16:03:31 sent hello world$ go run task.go hello golang2022/05/19 16:03:53 sent hello golang$ go run task.go hello rabbitmq2022/05/19 16:03:59 sent hello rabbitmq
$ go run worker.go2022/05/19 16:03:44 [*] Waiting for messages. To exit press CTRL+C2022/05/19 16:03:44 Received a message: hello world2022/05/19 16:03:44 Done2022/05/19 16:03:53 Received a message: hello golang2022/05/19 16:03:53 Done
$ go run worker.go2022/05/19 16:03:47 [*] Waiting for messages. To exit press CTRL+C2022/05/19 16:03:59 Received a message: hello rabbitmq2022/05/19 16:03:59 Done
同时可以查看一下RabbitMQ的webUI,看看我们的工作队列情况
参考:
https://developer.aliyun.com/article/769883
https://blog.csdn.net/qq_42402854/article/details/124820511
https://www.jianshu.com/p/179467f5cc85
https://www.rabbitmq.com