微服务:注册中心设计与 golang 实现
1. 为什么引入服务注册发现
在微服务架构中,调用一个微服务时,如何找到正确的服务地址是最基础问题。
在由单体架构转向微服务架构,进行服务拆分的早期,我们通常将服务调用的域名写死到代码或配置文件中,然后通过 Host 配置或 DNS 域名解析进行路由寻址。如果某个服务有多个实例,还会加入负载均衡 (Nginx、F5)。
但人工维护慢慢会出现瓶颈和问题:新增服务或服务扩容,所有依赖需要新增修改配置;某台服务器挂了还要手动摘流量;服务上下线变更时效慢;人工配置错误或漏配;RPC 类型服务不能满足等等。
这时你会想如果能让服务自动化完成配置(注册)和查找(发现)就好了,于是乎服务注册发现就应运而生。
可以看出,所有服务提供者在上下线时都会告知服务注册中心,服务消费者要查找服务直接从注册中心拉取。一切都变得更加美好,那么服务注册中心该如何实现呢?简单!优秀的开源项目已有一大把,大名鼎鼎的 Zookeeper、Eureka,还有后期之秀 Consul、Nacos、Etcd,当然有些算是分布式 KV 存储,要实现服务注册发现仍需些额外工作。如何技术选型,是 AP 模式更好还是 CP 模式更好?今天先抛开这些开源项目,我们亲自动手来实现一个服务注册中心,深入理解其设计原理,逐行代码分析与实践。PS:本文项目代码参考 bilibili discover 开源项目进行改造。
2. 注册中心实现原理
1)设计思想
首先进行功能需求分析,作为服务注册中心,要实现如下基本功能:
- 服务注册:接受来自服务提交的注册信息,并保存起来
- 服务下线:接受服务的主动下线请求,并将服务从注册信息表中删除
- 服务获取:调用方从注册中心拉取服务信息
- 服务续约:服务健康检查,服务通过心跳保持(主动续约)告知注册中心服务可用
- 服务剔除:注册中心将长时间不续约的服务实例从注册信息表中删除
2)构造注册表
服务中心首先要维护一个服务地址注册信息列表(简称注册表)。通俗理解注册表就像手机通讯录,记录了所有联系人(服务)的电话(服务地址),通过联系人姓名(服务名称)即可找到。
那么如何存储注册表呢?最普遍认知想到存数据库(Redis 这种内存数据库),Zookeeper、Etcd 本身作为分布式 KV 存储天然具有成为注册中心的优势,但这些都会引入新组件,要考虑其稳定性及性能。那么我们可以直接将注册信息存到内存中,这时候你会想如果服务挂了内存数据丢了怎么办?这个问题后面我们会想办法解决。
首先构建一个注册表 Registry 数据结构,定义如下:
type Registry struct { apps map[string]*Application lock sync.RWMutex }
- apps 记录应用服务 Application 的信息,使用 map 结构,key 为应用服务的唯一标识,值为应用服务结构类型
- lock 读写锁,保障并发读写安全
应用服务 Application结构如下:
type Application struct { appid string instances map[string]*Instance latestTimestamp int64 lock sync.RWMutex }
- appid 记录应用服务唯一标识
- lock 读写锁,保障并发读写安全
- latestTimestamp 记录更新时间
- instances 记录服务实例 Instance 的信息,使用 map 结构,key 为实例的 hostname (唯一标识),值为实例结构类型
服务实例 Instance 的结构如下:
type Instance struct { Env string `json:"env"` AppId string `json:"appid"` Hostname string `json:"hostname"` Addrs []string `json:"addrs"` Version string `json:"version"` Status uint32 `json:"status"` RegTimestamp int64 `json:"reg_timestamp"` UpTimestamp int64 `json:"up_timestamp"` RenewTimestamp int64 `json:"renew_timestamp"` DirtyTimestamp int64 `json:"dirty_timestamp"` LatestTimestamp int64 `json:"latest_timestamp"` }
- Env 服务环境标识,如 online、dev、test
- AppId 应用服务的唯一标识
- Hostname 服务实例的唯一标识
- Addrs 服务实例的地址,可以是 http 或 rpc 地址,多个地址可以维护数组
- Version 服务实例版本
- Status 服务实例状态,用于控制上下线
- xxTimestamp 依次记录服务实例注册时间戳,上线时间戳,最近续约时间戳,脏时间戳(后面解释),最后更新时间戳
注册表及相关依赖的结构体构建完成了,梳理一下所有概念和关系。注册表 Registry 中存放多个应用服务 Application,每个应用服务又会有多个服务实例 Instance,服务实例中存储服务的具体地址和其他信息。
3)服务注册
功能目标:接受来自服务提交的注册信息,并保存到注册表中。先初始化注册表 NewRegistry() ,根据提交信息构建实例 NewInstance(),然后进行注册写入。
func NewRegistry() *Registry { registry := &Registry{ apps: make(map[string]*Application), } return registry } func NewInstance(req *RequestRegister) *Instance { now := time.Now().UnixNano() instance := &Instance{ Env: req.Env, AppId: req.AppId, Hostname: req.Hostname, Addrs: req.Addrs, Version: req.Version, Status: req.Status, RegTimestamp: now, UpTimestamp: now, RenewTimestamp: now, DirtyTimestamp: now, LatestTimestamp: now, } return instance } r := NewRegistry() instance := NewInstance(&req) r.Register(instance, req.LatestTimestamp)
注册时,先从 apps 中查找是否已注册过,根据唯一标识 key = appid + env 确定。如果没有注册过,先新建应用 app,然后将 instance 加入到 app 中,最后 app 放入注册表中。这里分别使用了读锁和写锁,保障数据安全同时,尽量减少锁时间和锁抢占影响。
func (r *Registry) Register(instance *Instance, latestTimestamp int64) (*Application, *errcode.Error) { key := getKey(instance.AppId, instance.Env) r.lock.RLock() app, ok := r.apps[key] r.lock.RUnlock() if !ok { //new app app = NewApplication(instance.AppId) } //add instance _, isNew := app.AddInstance(instance, latestTimestamp) if isNew { //todo } //add into registry apps r.lock.Lock() r.apps[key] = app r.lock.Unlock() return app, nil }
新建应用服务 app,初始化 instances
func NewApplication(appid string) *Application { return &Application{ appid: appid, instances: make(map[string]*Instance), } }
将服务主机实例 instance 加入应用 app 中,注意判断是否已存在,存在根据脏时间戳 DirtyTimestamp 比对,是否进行替换,添加实例信息,更新最新时间 latestTimestamp ,并返回实例。
func (app *Application) AddInstance(in *Instance, latestTimestamp int64) (*Instance, bool) { app.lock.Lock() defer app.lock.Unlock() appIns, ok := app.instances[in.Hostname] if ok { //exist in.UpTimestamp = appIns.UpTimestamp //dirtytimestamp if in.DirtyTimestamp < appIns.DirtyTimestamp { log.Println("register exist dirty timestamp") in = appIns } } //add or update instances app.instances[in.Hostname] = in app.upLatestTimestamp(latestTimestamp) returnIns := new(Instance) *returnIns = *in return returnIns, !ok }
返回 !ok (isNew)表明,本次服务注册时,实例为新增还是替换,用来维护服务健康信息(后面会再次提到)。
服务注册完成了,编写测试用例看下效果。
var req = &model.RequestRegister{AppId: "com.xx.testapp", Hostname: "myhost", Addrs: []string{"http://testapp.xx.com/myhost"}, Status: 1} func TestRegister(t *testing.T) { r := model.NewRegistry() instance := model.NewInstance(req) app, _ := r.Register(instance, req.LatestTimestamp) t.Log(app) }
4. 服务发现
功能目标:查找已注册的服务获取信息,可以指定条件查找,也可以全量查找。这里以指定过滤条件 appid 、env 和 status 为例。
r := model.NewRegistry() fetchData, err := r.Fetch(req.Env, req.AppId, req.Status, 0) 根据 appid 和 env 组合成 key,然后从注册表的 apps 中获取应用 app,然后通过 app 获取服务实例 GetInstance() func (r *Registry) Fetch(env, appid string, status uint32, latestTime int64) (*FetchData, *errcode.Error) { app, ok := r.getApplication(appid, env) if !ok { return nil, errcode.NotFound } return app.GetInstance(status, latestTime) } func (r *Registry) getApplication(appid, env string) (*Application, bool) { key := getKey(appid, env) r.lock.RLock() app, ok := r.apps[key] r.lock.RUnlock() return app, ok }
根据 app 获取所有应用实例,并用 status 过滤,这里对返回结果 instances 中的 Addr 进行了拷贝返回一个新的切片。
func (app *Application) GetInstance(status uint32, latestTime int64) (*FetchData, *errcode.Error) { app.lock.RLock() defer app.lock.RUnlock() if latestTime >= app.latestTimestamp { return nil, errcode.NotModified } fetchData := FetchData{ Instances: make([]*Instance, 0), LatestTimestamp: app.latestTimestamp, } var exists bool for _, instance := range app.instances { if status&instance.Status > 0 { exists = true newInstance := copyInstance(instance) fetchData.Instances = append(fetchData.Instances, newInstance) } } if !exists { return nil, errcode.NotFound } return &fetchData, nil } //deep copy func copyInstance(src *Instance) *Instance { dst := new(Instance) *dst = *src //copy addrs dst.Addrs = make([]string, len(src.Addrs)) for i, addr := range src.Addrs { dst.Addrs[i] = addr } return dst }
编写测试用例,先注册再获取,看到可以正常获取到信息。
5)服务下线
功能目标:接受服务的下线请求,并将服务从注册信息列表中删除。通过传入 env, appid, hostname 三要素信息进行对应服务实例的取消。
r := model.NewRegistry() r.Cancel(req.Env, req.AppId, req.Hostname, 0)
根据 appid 和 env 找到对象的 app,然后删除 app 中对应的 hostname。如果 hostname 后 app.instances 为空,那么将 app 从注册表中清除。
func (r *Registry) Cancel(env, appid, hostname string, latestTimestamp int64) (*Instance, *errcode.Error) { log.Println("action cancel...") //find app app, ok := r.getApplication(appid, env) if !ok { return nil, errcode.NotFound } instance, ok, insLen := app.Cancel(hostname, latestTimestamp) if !ok { return nil, errcode.NotFound } //if instances is empty, delete app from apps if insLen == 0 { r.lock.Lock() delete(r.apps, getKey(appid, env)) r.lock.Unlock() } return instance, nil } func (app *Application) Cancel(hostname string, latestTimestamp int64) (*Instance, bool, int) { newInstance := new(Instance) app.lock.Lock() defer app.lock.Unlock() appIn, ok := app.instances[hostname] if !ok { return nil, ok, 0 } //delete hostname delete(app.instances, hostname) appIn.LatestTimestamp = latestTimestamp app.upLatestTimestamp(latestTimestamp) *newInstance = *appIn return newInstance, true, len(app.instances) }
编写测试用例先注册,再取消,然后获取信息,发现 404 not found。
6) 服务续约
功能目标:实现服务的健康检查机制,服务注册后,如果没有取消,那么就应该在注册表中,可以随时查到,如果某个服务实例挂了,能否自动的从注册表中删除,保障注册表中的服务实例都是正常的。
通常有两种方式做法:注册中心(服务端)主动探活,通过请求指定接口得到正常响应来确认;服务实例(客户端)主动上报,调用续约接口进行续约,续约设有时效 TTL (time to live)。两种方式各有优缺点,大家可以思考一下,不同的注册中心也采用了不同的方式,这里选型第二种方案。
r := model.NewRegistry() r.Renew(req.Env, req.AppId, req.Hostname) 根据 appid 和 env 找到对象的 app,再根据 hostname 找到对应主机实例,更新其 RenewTimestamp 为当前时间。 func (r *Registry) Renew(env, appid, hostname string) (*Instance, *errcode.Error) { app, ok := r.getApplication(appid, env) if !ok { return nil, errcode.NotFound } in, ok := app.Renew(hostname) if !ok { return nil, errcode.NotFound } return in, nil } func (app *Application) Renew(hostname string) (*Instance, bool) { app.lock.Lock() defer app.lock.Unlock() appIn, ok := app.instances[hostname] if !ok { return nil, ok } appIn.RenewTimestamp = time.Now().UnixNano() return copyInstance(appIn), true }
7)服务剔除
功能目标:既然有服务定期续约,那么对应的如果服务没有续约呢?服务如果下线可以使用 Cancel 进行取消,但如果服务因为网络故障或挂了导致不能提供服务,那么可以通过检查它是否按时续约来判断,把 TTL 达到阈值的服务实例剔除(Cancel),实现服务的被动下线。
首先在新建注册表时开启一个定时任务,新启一个 goroutine 来实现。
func NewRegistry() *Registry { go r.evictTask() }
配置定时检查的时间间隔,默认 60 秒,通过 Tick 定时器开启 evict。
func (r *Registry) evictTask() { ticker := time.Tick(configs.CheckEvictInterval) for { select { case <-ticker: r.evict() } } }
遍历注册表的所有 apps,然后再遍历其中的 instances,如果当前时间减去实例上一次续约时间 instance.RenewTimestamp 达到阈值(默认 90 秒),那么将其加入过期队列中。这里并没有直接将过期队列所有实例都取消,考虑 GC 以及 本地时间漂移的因素,设定了一个剔除的上限 evictionLimit,随机剔除一些过期实例。
func (r *Registry) evict() { now := time.Now().UnixNano() var expiredInstances []*Instance apps := r.getAllApplications() var registryLen int for _, app := range apps { registryLen += app.GetInstanceLen() allInstances := app.GetAllInstances() for _, instance := range allInstances { if now-instance.RenewTimestamp > int64(configs.InstanceExpireDuration) { expiredInstances = append(expiredInstances, instance) } } } evictionLimit := registryLen - int(float64(registryLen)*configs.SelfProtectThreshold) expiredLen := len(expiredInstances) if expiredLen > evictionLimit { expiredLen = evictionLimit } if expiredLen == 0 { return } for i := 0; i < expiredLen; i++ { j := i + rand.Intn(len(expiredInstances)-i) expiredInstances[i], expiredInstances[j] = expiredInstances[j], expiredInstances[i] expiredInstance := expiredInstances[i] r.Cancel(expiredInstance.Env, expiredInstance.AppId, expiredInstance.Hostname, now) } }
剔除上限数量,是通过当前注册表大小(注册表所有 instances 实例数)减去 触发自我保护机制的阈值(当前注册表大小 * 保护自我机制比例值),保护机制稍后会具体解释。
剔除过期时,采用了 Knuth-Shuffle 算法,也叫公平洗牌算法来实现随机剔除。当然如果 expiredLen <= evictionLimit,随机剔除的意义不大,如果前者大于后者,随机剔除能最大程度保障,剔除的实例均匀分散到所有应用实例中,降低某服务被全部清空的风险。公平洗牌算法实现也比较简单,循环遍历过期列表,将当前数与特定随机数交换,和我们打牌时两两交换洗牌过程类似,它实现了 O(n) 的时间复杂度,由 Knuth 发明。
8)自我保护
功能目标:既然服务会定期剔除超时未续约的服务,那么假设一种情况,网络一段时间发生了异常,所有服务都没成功续约,这时注册中心是否将所有服务全部剔除?当然不行!所以,我们需要一个自我保护的机制防止此类事情的发生。
怎么设计自我保护机制呢?按短时间内失败的比例达到某特定阈值就开启保护,保护模式下不进行服务剔除。所以我们需要一个统计模块,续约成功 +1。默认情况下,服务剔除每 60 秒执行一次,服务续约每 30 秒执行一次,那么一个服务实例在检查时应该有 2 次续约。
type Guard struct { renewCount int64 lastRenewCount int64 needRenewCount int64 threshold int64 lock sync.RWMutex }
- renewCount 记录所有服务续约次数,每执行一次 renew 加 1
- lastRenewCount 记录上一次检查周期(默认 60 秒)服务续约统计次数
- needRenewCount 记录一个周期总计需要的续约数,按一次续约 30 秒,一周期 60 秒,一个实例就需要 2 次,所以服务注册时 + 2,服务取消时 - 2
- threshold 通过 needRenewCount 和阈值比例 (0.85)确定触发自我保护的值
func (gd *Guard) incrNeed() { gd.lock.Lock() defer gd.lock.Unlock() gd.needRenewCount += int64(configs.CheckEvictInterval / configs.RenewInterval) gd.threshold = int64(float64(gd.needRenewCount) * configs.SelfProtectThreshold) } func (gd *Guard) decrNeed() { gd.lock.Lock() defer gd.lock.Unlock() gd.needRenewCount -= int64(configs.CheckEvictInterval / configs.RenewInterval) gd.threshold = int64(float64(gd.needRenewCount) * configs.SelfProtectThreshold) } func (gd *Guard) setNeed(count int64) { gd.lock.Lock() defer gd.lock.Unlock() gd.needRenewCount = count * int64(configs.CheckEvictInterval/configs.RenewInterval) gd.threshold = int64(float64(gd.needRenewCount) * configs.SelfProtectThreshold) } func (gd *Guard) incrCount() { atomic.AddInt64(&gd.renewCount, 1) }
在注册表中增加 Guard 模块并初始化,在服务注册成功,服务取消,服务续约时操作统计。
type Registry struct { gd *Guard } func NewRegistry() *Registry { r := &Registry{ gd: new(Guard), } } func (r *Registry) Register(...) { if isNew { r.gd.incrNeed() } } func (r *Registry) Cancel(...) { r.gd.decrNeed() } func (r *Registry) Renew(...) { r.gd.incrCount() }
在服务剔除前进行上一周期计数统计,并判断是否达到自我保护开启状态。
func (gd *Guard) storeLastCount() { atomic.StoreInt64(&gd.lastRenewCount, atomic.SwapInt64(&gd.needRenewCount, 0)) } func (gd *Guard) selfProtectStatus() bool { return atomic.LoadInt64(&gd.lastRenewCount) < atomic.LoadInt64(&gd.threshold) }
如果开启自我保护,那么续约时间超过阈值(默认90 秒)忽略不会剔除。但如果续约时间超过最大阈值(默认3600 秒),那么不管是否开启保护都要剔除。因为自我保护只是保护短时间由于网络原因未续约的服务,长时间未续约大概率已经有问题了。
func (r *Registry) evictTask() { case <-ticker: r.gd.storeLastCount() r.evict() } } func (r *Registry) evict() { delta := now - instance.RenewTimestamp if !protectStatus && delta > int64(configs.InstanceExpireDuration) || delta > int64(configs.InstanceMaxExpireDuration) { expiredInstances = append(expiredInstances, instance) } }
思考下,服务续约比例未达到 85% 就会触发自我保护,还记不记得在服务剔除那块有一个剔除数量上限不能超过 15%,这里就 match 了,否则还没来得及进入自我保护程序就把服务都剔除了。
最后增加一个定时器,如果超过一定时间(15 分钟),重新计算下当前实例数,重置保护阈值,降低脏数据风险。
func (r *Registry) evictTask() { resetTicker := time.Tick(configs.ResetGuardNeedCountInterval) for { select { case <-resetTicker: var count int64 for _, app := range r.getAllApplications() { count += int64(app.GetInstanceLen()) } r.gd.setNeed(count) } } }
9)注册中心对外提供服务
目前注册中心基本功能已实现,需要对外提供服务了,我们采用 gin 来实现一个 web 服务,接受 http 请求进行服务的注册、查找、续约、下线操作,这样保障注册中心可以方便的接受来自任何语言客户端请求。
func main() { //init config c := flag.String("c", "", "config file path") flag.Parse() config, err := configs.LoadConfig(*c) if err != nil { log.Println("load config error:", err) return } //global discovery global.Discovery = model.NewDiscovery(config) //init router and start server router := api.InitRouter() srv := &http.Server{ Addr: config.HttpServer, Handler: router, } go func() { if err := srv.ListenAndServe(); err != nil && err != http.ErrServerClosed { log.Fatalf("listen:%s\n", err) } }() }
增加一个 discovery 结构,并开启一个全局变量 global.Discovery ,该结构中维护注册表 Registry,然后就可以根据注册表实现各种操作了。
type Discovery struct { config *configs.GlobalConfig protected bool Registry *Registry } func NewDiscovery(config *configs.GlobalConfig) *Discovery { dis := &Discovery{ protected: false, config: config, Registry: NewRegistry(), //init registry } return dis } //init discovery var Discovery *model.Discovery
api.InitRouter() 绑定 url 路由和 Handler,以注册为例,接受请求入参,调用 global.Discovery.Registry.Register() 进行注册,成功返回。
router.POST("api/register", handler.RegisterHandler) func RegisterHandler(c *gin.Context) { var req model.RequestRegister if e := c.ShouldBindJSON(&req); e != nil { err := errcode.ParamError c.JSON(http.StatusOK, gin.H{ "code": err.Code(), "message": err.Error(), }) return } //bind instance instance := model.NewInstance(&req) if instance.Status == 0 || instance.Status > 2 { err := errcode.ParamError c.JSON(http.StatusOK, gin.H{ "code": err.Code(), "message": err.Error(), }) return } //dirtytime if req.DirtyTimestamp > 0 { instance.DirtyTimestamp = req.DirtyTimestamp } global.Discovery.Registry.Register(instance, req.LatestTimestamp) c.JSON(http.StatusOK, gin.H{ "code": 200, "message": "", "data": "", }) }
接着要实现平滑重启,在 main 启动时增加接收信号后关闭服务。
func main() { //... //graceful restart quit := make(chan os.Signal) signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM, syscall.SIGHUP, syscall.SIGQUIT) <-quit log.Println("shutdown discovery server...") ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() if err := srv.Shutdown(ctx); err != nil { log.Fatal("server shutdown error:", err) } select { case <-ctx.Done(): log.Println("timeout of 5 seconds") } log.Println("server exiting") }
3. 总结与问题
至此,一个单机版的注册中心就可以工作了,但生产环境单点肯定是不能容忍的,因此有必要实现一个注册中心集群。那么是否部署多个注册中心实例就可以了,当然 .... 不行!这只能保障有多个注册中心节点,而每个节点中维护自己的注册表,那么就需要进行注册表数据同步。多节点数据同步又会涉及著名的一致性问题,这时 Paxos、Raft、ZAB、Gossip 等算法名词涌现,而我们将使用 P2P(Peer to Peer)对等网络协议来实现。关于集群设计与实现我们将在后续文章中展开。
相关代码:https://github.com/wangshizebin/service_discovery
下一章:微服务:链路追踪
微服务:链路追踪设计与 golang 实现:一、链路追踪:微服务架构是将单个应用程序被划分成各种小而连接的服务,每一个服务完成一个单一的业务功能,相互之间保持独立和解耦,每个服务都可以独立演进。相对于传统的单体服务,微服务具有隔离性、技术异构性、可扩展性以及简化部署等优点。链路追踪系统用来追踪每一个请求的完整调用链路,帮助技术人员准确地定位异常服务、发现性能瓶颈、梳理调用链路以及预估系统容量。