引言

在分布式环境中,许多服务依赖项中的一些必然会失败。Hystrix是一个库,通过添加延迟容忍和容错逻辑,帮助你控制这些分布式服务之间的交互。Hystrix通过隔离服务之间的访问点、停止级联失败和提供回退选项来实现这一点,所有这些都可以提高系统的整体弹性。

复杂分布式体系结构中的应用程序有许多依赖项,每个依赖项在某些时候都不可避免地会失败。如果主机应用程序没有与这些外部故障隔离,那么它有可能被他们拖垮。例如,对于一个依赖于30个服务的应用程序,每个服务都有99.99%的正常运行时间,你可以期望如下:99.9930  =  99.7% 可用。也就是说一亿个请求的0.03% = 3000000 会失败。如果一切正常,那么每个月有2个小时服务是不可用的。

当服务一切正常时,看起来是这样的:

当其中有一个节点宕机,会影响用户的请求:

在高流量的情况下,一个后端依赖项的延迟可能导致所有服务器上的所有资源在数秒内饱和(PS:意味着后续再有请求将无法立即提供服务):


Hystrix如何实现它的目标

  1. 使用HystrixCommand或者HystrixObservableCommand包装所有对外部系统的调用。

  2. 为每一个依赖项维护一个小的线程池,如果线程池满了会立刻拒绝请求而不是排队。

  3. 调用的结果有:成功、失败、超时、拒绝。

  4. 在一段时间内,如果服务的错误百分比超过了一个阈值,就会触发断路器来停止对特定服务的所有请求。

  5. 当请求失败、被拒绝、超时或者短路,执行回退逻辑。

  6. 近实时监控指标和配置变化。

以上内容参考:废物大师兄https://www.cnblogs.com/cjsblog/p/9391819.html

go实现Hystrix断路器

案例介绍

在这个实践案例中,主要实现通过Hystrix为API反向代理的微服务调用添加熔断保护和资源隔离以及负载均衡能力,以保护API网关的稳定运行。

实现

新建hystrix.go文件,首先定义HystrixHandler的struct,表明它可以处理HTTP请求。

import (
  "errors"
  "fmt"
  "github.com/afex/hystrix-go/hystrix"
  "github.com/hashicorp/consul/api"
  "github.com/longjoy/micro-go-book/common/discover"
  "github.com/longjoy/micro-go-book/common/loadbalance"
  "log"
  "net/http"
  "net/http/httputil"
  "strings"
  "sync"
)

type HystrixHandler struct {
    hystrixs         map[string]bool           //记录Hystrix是否已经配置
    hystrixMutex     *sync.Mutex               //互斥锁
    discoveryClient  discover.DiscoveryClient  //服务发现客户端
    loadbalance      loadbalance.LoadBalance   //负载均衡
    logger           *log.Logger               //日志
}

//init HystrixHandler

func NewHystrixHandler(discoveryClient discover.DiscoveryClient,
    loadbalance loadbalance.LoadBalance, logger *log.Logger) *HystrixHandler {
    
    return &HystrixHandler{
        discoveryClient:  discoveryClient,
        logger:           logger,
        hystrixs:         make(map[string]bool),
        loadbalance:      loadbalance,
        hystrixMutex:     *sync.Mutex{},
    }
}

接着实现ServerHTTP接口,将反向代理的逻辑使用hystrix.Do包裹起来。Do方法是同步方式、Go方法是异步方式。本例使用同步方式,代码如下:

func (hH *HystrixHandler) ServerHTTP(rw http.ResponseWriter,req *http.Request) {
  
    reqPath := req.URL.Path    //获取请求路径
    if reqPath == "" {
        return
    }
    
    pathArray := strings.Split(reqPath, "/")
    serviceName := pathArray[1]    //获取服务名称
    
    if serviceName == "" {
        rw.WriteHeader(404)
        return
    }
    
    if _, ok := hH.hystrixs[serviceName]; !ok {
        hH.hystrixMutex.Lock()    //如果服务未配置Hystrix,则上锁准备进行配置。因为这里使用的是普通的map而不是原子安全的map,不加锁可能会有并发写冲突问题
        //再判断一次,防止在上锁瞬间修改掉map,由于高并发服务访问,map读写速度是很快的,可能存在判断逻辑通过后,上锁之前又被修改的情况,所以先加锁再判断一次(个人理解)
        if _, ok := hH.hystrixs[serviceName]; !ok{
            hystrix.ConfigureCommand(serviceName, hystrix.CommandConfig{
              //进行hystrix命令自定义
            })
            hystrixHandler.hystrixs[serviceName] = true
        }
        hystrixHandler.hystrixMutex.Unlock()
     }
     //将反向代理的逻辑使用Do封装(有三个参数,后面两个均为函数,第一个为逻辑处理函数,第二个为保底处理函数)
     err := hystrix.Do(serviceName, func() err {
     //调用DiscoveryClient 查询serviceName的服务实例列表(服务发现)
     instances := hystrixHandler.discoveryClient.DiscoverServices(serviceName, hystrixHandler.logger)
     instanceList := make([]*api.AgentService, len(instances))
     //将instances中的每个元素转换为*api.AgentService类型,并存储在instanceList的相应位置
     for i := 0; i < len(instances); i++ {
        instanceList[i] = instances[i].(*api.AgentService)
    }
    //通过负载均衡选择一个服务实例(随机法)
        selectInstance, er := hH.loadbalance.SelectService(instanceList)        
        if err != nil {
          return loadbalance.ErrNoInstances
        }
        
        //创建director
        director := func(req  *http.Request){
            //重新组织请求路径
            desPath := strings.Join(pathArray[2:], "/")
            hH.logger.Println("service id", selectInstance.ID)
            
            //设置代理服务地址信息,将原本请求代理到serviceInstance
            req.URL.Scheme = "http"
            req.URL.Host = fmt.Sprintf("%s:%d", selectInstance.Address, selectInstance.Port)
            req.URL.Path = "/" + desPath
        }
        var proxyError error
        //返回代理异常,用于记录hystrix.Do执行失败
        errHandler := func(ew http.ResponseWriter, er *http.Request, err error) {
        proxyError = err
        }
        
        //进行代理转发,使用httputil中的反向代理方法
        proxy := &httputil.ReverseProxy{Director: director, ErrorHandler: errHandler}
        proxy.ServeHTTP(rw, req)
        //将执行异常返回hystrix
        return proxyError
     }, func(e error) error {
         //hystrix.Do执行失败
        hH.logger.Println("hystrix error: ", e.Error())
        return errors.New("fallback excute")
     })
     if err != nil {
        rw.WriteHeader(500)
        rw.Write([]byte(err.Error()))
  }
}

其中,随机法负载均衡SelectService实现方法代码如下:

// 随机负载均衡
func (loadBalance *RandomLoadBalance) SelectService(services []*api.AgentService) (*api.AgentService, error) {

  if services == nil || len(services) == 0 {
    return nil, ErrNoInstances
  }

  return services[rand.Intn(len(services))], nil
}