店长为了了解RabbitMQ,于是做了一个小demo,本文不对一些基础概念展开讲解(我也还没很熟==。)

RabbitMQ基于 AMQP(高级消息队列协议)标准,支持多种消息协议,如 STOMP、MQTT 等。RabbitMQ 广泛用于企业级应用中,提供可靠、灵活的消息传递服务。它的主要作用有:

  • 消息队列允许生产者和消费者独立运行,不需要直接交互。生产者将消息发送到队列,消费者从队列中读取消息,从而降低了系统间的依赖性。

  • 异步消息传递,生产者不需要等待消费者处理完消息即可继续发送新消息,提高了系统的吞吐量。

  • 通过控制消息的生产和消费速率,消息队列可以平衡系统的负载,防止某些部分因过载而崩溃。

  • 在某些场景下,消息队列可以保证消息的顺序性,确保消息按照发送的顺序被消费。


使用RabbitMQ对高并发请求削峰处理原理很简单,将高并发请求转发到MQ,服务端从MQ以限定的速率消费,同时可以限制队列的长度,超过则拒绝请求,然后将对接口的请求从消费端限流。

在生产者端模拟向MQ写入请求:


//发送请求
func sendRequest(ch *amqp.Channel, i int) {
  body, err := http.Get("http://localhost:8081/rabbit")
  if err != nil {
    return
  }
  respBody, err := io.ReadAll(body.Body)
  if err != nil {
    fmt.Println("Error reading response body:", err)
    return
  }
  err = ch.Publish(
    "",              // exchange
    "request_queue", // routing key
    false,           // mandatory
    false,           // immediate
    amqp.Publishing{
      DeliveryMode: amqp.Persistent,
      ContentType:  "text/plain",
      Body:         respBody,
    })
  if err != nil {
    fmt.Println("Failed to publish message:", err)
  } else {
    fmt.Printf("Sent request: %s\n", body)
  }
}
//将请求发送到MQ
func SendRequestsToQueue(requests int, wg *sync.WaitGroup) {
  defer wg.Done()

  conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
  if err != nil {
    panic(err)
  }
  defer conn.Close()

  ch, err := conn.Channel()
  if err != nil {
    panic(err)
  }
  defer ch.Close()

  _, err = ch.QueueDeclare(
    "request_queue", // name
    true,            // durable
    false,           // delete when unused
    false,           // exclusive
    false,           // no-wait
    nil,             // arguments
  )
  if err != nil {
    panic(err)
  }

  for i := 0; i < requests; i++ {
    sendRequest(ch, i)
  }
}

消费者从MQ取出请求并响应

func Consumer() {
  conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
  if err != nil {
    panic(err)
  }
  defer conn.Close()

  ch, err := conn.Channel()
  if err != nil {
    panic(err)
  }
  defer ch.Close()

  q, err := ch.QueueDeclare(
    "request_queue", // name
    true,            // durable
    false,           // delete when unused
    false,           // exclusive
    false,           // no-wait
    nil,             // arguments
  )
  if err != nil {
    panic(err)
  }
  // 消费端限流
  err = ch.Qos(
    1,     // prefetchCount, 预取值,消费者一次从队列中获取的消息数量
    0,     // prefetchSize,预取的大小,这里设置为0,表示不限制
    false, // global,是否全局生效,设置为false,仅对当前频道生效
  )

  msgs, err := ch.Consume(
    q.Name, // queue
    "",     // consumer
    false,  // auto-ack
    false,  // exclusive
    false,  // no-local
    false,  // no-wait
    nil,    // args
  )
  if err != nil {
    panic(err)
  }

  forever := make(chan bool)

  go func() {
    for d := range msgs {
      fmt.Println("Received a request: ", string(d.Body))
      d.Ack(false)
    }
  }()

  fmt.Println(" [*] Waiting for messages. To exit press CTRL+C")
  <-forever
}

这里并发创建10000个请求,看一下效果


(全部写入完成后开始消费,实际生产中可以并发进行)