快速掌握 GO 之 RabbitMQ

06-01 1011阅读

更多个人笔记见:

github个人笔记仓库

gitee 个人笔记仓库

个人学习,学习过程中还会不断补充~ (后续会更新在github和 gitee上)

文章目录

    • 作用
    • 经典例子
        • 生产者(发送端)
        • 消费者(接收端)

          作用

          类似一个“中间过渡器”,应对突发流量导致数据库连接池耗尽或者请求导致服务崩溃

          • 流量洪峰​​:促销活动时,前置 Nginx 将请求写入 RabbitMQ,后端服务按能力消费
          • 容灾恢复​​:数据库故障期间,消息持久化在队列;恢复后继续消费 (消费指的是 Mysql 取出数据然后存起来)
          • 将任务分发到多个消费者实例,确保高负载下任务均匀分配。这就可以实现负载均衡 (比如多个 worker 处理帖子审核)

            需要考虑如果用户的申请不是很多情况下,多引入一层 RabbitMQ 其实会导致实际的速度变慢(毕竟多加了一层)

            经典例子

            GO 语言相关库:go get -u github.com/streadway/amqp

            docker 快速部署 rabbitMQ:docker run -d --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3-management

            • 5672:AMQP 端口
            • 15672:管理界面端口,访问 http://localhost:15672 ( 默认用户/密码:guest/guest)
              生产者(发送端)

              创建 producer文件夹下创建producer.go ,然后单独 go run(同时 go run 后面的消费者记得)

              package main
              import (
              	"fmt"
              	"log"
              	"time"
              	"github.com/streadway/amqp"
              )
              // 统一错误输出
              func failOnError(err error, msg string) {
              	if err != nil {
              		log.Fatalf("%s: %s", msg, err)
              	}
              }
              func main() {
              	// 连接 RabbitMQ
              	conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
              	failOnError(err, "Failed to connect to RabbitMQ")
              	defer conn.Close() //关闭连接
              	ch, err := conn.Channel() //建立通道,通过 conn 建立的,可以调用 amqp 中的函数
              	failOnError(err, "Failed to open a channel")
              	defer ch.Close()
              	// 声明队列
              	q, err := ch.QueueDeclare(
              		"post_queue", // 指定创建或引用的队列名称
              		false,        // 持久化  false 表示队列不会持久化到磁盘,重启 RabbitMQ 后会丢失。true 的话重启后就还在
              		false,        // 自动删除   设置为 false 表示队列不会自动删除,如果 true,最后一个消费者断开后队列删除
              		false,        // 独占   设置为 true 表示该队列只供一个消费者使用,当连接关闭后,队列会自动删除。false表示队列可以被多个连接使用
              		false,        // 无等待  false 表示需要服务器确认队列创建,true表示客户端不会等待服务器的确认响应,如果操作失败也不会收到错误通知
              		nil,          // 额外参数  额外参数可以用来设置队列的特殊属性,如消息TTL、队列最大长度、死信队列等
              	)
              	failOnError(err, "Failed to declare a queue")
              	// 设置定时器,每5秒发送一次消息
              	ticker := time.NewTicker(1 * time.Second)
              	defer ticker.Stop()
              	// 创建一个函数用于发送消息,这样循环调用函数就是发送多次消息
              	sendMessage := func(msgContent string) {
              		err = ch.Publish(
              			"",     // 交换机名称   这里是默认交换机,能够将消息直接路由到与路由键同名的队列
              			q.Name, // 路由键   也就是队列名称,路由键应该与目标队列名称一致,消息才能被正确路由
              			false,  // mandatory标志  false 表示消息无法路由到队列,则消息会被丢弃  如果是 true 就是当消息不能路由到队列时,RabbitMQ会返回一个Basic.Return命令给生产者
              			false,  // immediate 标志   false 表示如果队列中没有消费者,消息会被存入队列等待消费, true表示当没有消费者能够立即消费该消息时,消息不会入队而是被丢弃
              			amqp.Publishing{ //消息内容和性质
              				ContentType: "text/plain",       //制定为 MIME 类型
              				Body:        []byte(msgContent), //转换为字节类型
              			})
              		if err != nil {
              			log.Printf("Failed to publish a message: %s", err)
              			return
              		}
              		log.Printf(" [x] Sent %s", msgContent)
              	}
              	count := 1
              	log.Println("Starting periodic message sending. Press Ctrl+C to exit.")
              	// 等待定时器触发,定期发送消息
              	for range ticker.C {
              		sendMessage(fmt.Sprintf("Hello, RabbitMQ! Message #%d", count))
              		count++
              	}
              }
              
              • 这里我将函数设置为每间隔 1s 就发送消息,同时记录数据
              • 如果运行后,隔一段时间再启动消费者,或者说运行中途关闭消费者,过一段时间再启动消费者,会发现中间发出的信号也会打印出来,这说明实际上是有存储在 RabbitMQ 中的(运行的时候,关闭后存储就需要看上面的设置了)
                消费者(接收端)

                consumer 文件夹下创建 consumer.go 然后单独一个终端 go run

                package main
                import (
                	"log"
                	"github.com/streadway/amqp"
                )
                func failOnError(err error, msg string) {
                	if err != nil {
                		log.Fatalf("%s: %s", msg, err)
                	}
                }
                func main() {
                	//建立连接
                	conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
                	failOnError(err, "Failed to connect to RabbitMQ")
                	defer conn.Close()
                	//连接 channel
                	ch, err := conn.Channel()
                	failOnError(err, "Failed to open a channel")
                	defer ch.Close()
                	q, err := ch.QueueDeclare("post_queue", false, false, false, false, nil)
                	failOnError(err, "Failed to declare a queue")
                	msgs, err := ch.Consume(
                		q.Name, // 队列
                		"",     // 消费者标签
                		true,   // 自动确认
                		false,  // 独占
                		false,  // 无本地
                		false,  // 无等待
                		nil,    // 额外参数
                	)
                	failOnError(err, "Failed to register a consumer")
                	forever := make(chan bool)
                	go func() {
                		for d := range msgs {
                			log.Printf("Received: %s", d.Body)
                		}
                	}()
                	log.Printf(" [*] Waiting for messages. To exit press CTRL+C")
                	
免责声明:我们致力于保护作者版权,注重分享,被刊用文章因无法核实真实出处,未能及时与作者取得联系,或有版权异议的,请联系管理员,我们会立即处理! 部分文章是来自自研大数据AI进行生成,内容摘自(百度百科,百度知道,头条百科,中国民法典,刑法,牛津词典,新华词典,汉语词典,国家院校,科普平台)等数据,内容仅供学习参考,不准确地方联系删除处理! 图片声明:本站部分配图来自人工智能系统AI生成,觅知网授权图片,PxHere摄影无版权图库和百度,360,搜狗等多加搜索引擎自动关键词搜索配图,如有侵权的图片,请第一时间联系我们。

目录[+]

取消
微信二维码
微信二维码
支付宝二维码