店长为了了解RabbitMQ,于是做了一个小demo,本文不对一些基础概念展开讲解(我也还没很熟==。)
RabbitMQ基于 AMQP(高级消息队列协议)标准,支持多种消息协议,如 STOMP、MQTT 等。RabbitMQ 广泛用于企业级应用中,提供可靠、灵活的消息传递服务。它的主要作用有:
消息队列允许生产者和消费者独立运行,不需要直接交互。生产者将消息发送到队列,消费者从队列中读取消息,从而降低了系统间的依赖性。
异步消息传递,生产者不需要等待消费者处理完消息即可继续发送新消息,提高了系统的吞吐量。
通过控制消息的生产和消费速率,消息队列可以平衡系统的负载,防止某些部分因过载而崩溃。
在某些场景下,消息队列可以保证消息的顺序性,确保消息按照发送的顺序被消费。
使用RabbitMQ对高并发请求削峰处理原理很简单,将高并发请求转发到MQ,服务端从MQ以限定的速率消费,同时可以限制队列的长度,超过则拒绝请求,然后将对接口的请求从消费端限流。
在生产者端模拟向MQ写入请求:
//发送请求
func sendRequest(ch *amqp.Channel, i int) {
body, err := http.Get("http://localhost:8081/rabbit")
if err != nil {
return
}
respBody, err := io.ReadAll(body.Body)
if err != nil {
fmt.Println("Error reading response body:", err)
return
}
err = ch.Publish(
"", // exchange
"request_queue", // routing key
false, // mandatory
false, // immediate
amqp.Publishing{
DeliveryMode: amqp.Persistent,
ContentType: "text/plain",
Body: respBody,
})
if err != nil {
fmt.Println("Failed to publish message:", err)
} else {
fmt.Printf("Sent request: %s\n", body)
}
}
//将请求发送到MQ
func SendRequestsToQueue(requests int, wg *sync.WaitGroup) {
defer wg.Done()
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
if err != nil {
panic(err)
}
defer conn.Close()
ch, err := conn.Channel()
if err != nil {
panic(err)
}
defer ch.Close()
_, err = ch.QueueDeclare(
"request_queue", // name
true, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
nil, // arguments
)
if err != nil {
panic(err)
}
for i := 0; i < requests; i++ {
sendRequest(ch, i)
}
}
消费者从MQ取出请求并响应
func Consumer() {
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
if err != nil {
panic(err)
}
defer conn.Close()
ch, err := conn.Channel()
if err != nil {
panic(err)
}
defer ch.Close()
q, err := ch.QueueDeclare(
"request_queue", // name
true, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
nil, // arguments
)
if err != nil {
panic(err)
}
// 消费端限流
err = ch.Qos(
1, // prefetchCount, 预取值,消费者一次从队列中获取的消息数量
0, // prefetchSize,预取的大小,这里设置为0,表示不限制
false, // global,是否全局生效,设置为false,仅对当前频道生效
)
msgs, err := ch.Consume(
q.Name, // queue
"", // consumer
false, // auto-ack
false, // exclusive
false, // no-local
false, // no-wait
nil, // args
)
if err != nil {
panic(err)
}
forever := make(chan bool)
go func() {
for d := range msgs {
fmt.Println("Received a request: ", string(d.Body))
d.Ack(false)
}
}()
fmt.Println(" [*] Waiting for messages. To exit press CTRL+C")
<-forever
}
这里并发创建10000个请求,看一下效果
(全部写入完成后开始消费,实际生产中可以并发进行)
评论