透过Kratos源码,强化Go语言学习(三)_并发启动 & 优雅关闭


Kratos 一套轻量级 Go 微服务框架,包含大量微服务相关框架及工具。致力于提供完整的微服务研发体验,整合相关框架及工具后,微服务治理相关部分可对整体业务开发周期无感,从而更加聚焦于业务交付

🏷️ 一、源码概览app.go

1、AppInfo 接口

// AppInfo is application context value.
type AppInfo interface {
	ID() string
	Name() string
	Version() string
	Metadata() map[string]string
	Endpoint() []string
}
  • AppInfo 接口定义了应用程序的核心元信息规范,它抽象了一个微服务应用必须提供的基本信息
  • ID(): 返回应用实例的唯一标识符,用于在服务注册中心中区分同一服务的不同实例
  • Name(): 返回服务名称,用于服务发现和路由
  • Version(): 返回应用版本,支持版本管理和灰度发布
  • Metadata(): 返回服务的元数据键值对,可包含环境、区域、权重等信息
  • Endpoint(): 返回服务的所有端点地址,包括 HTTP 和 gRPC 服务地址

AppInfo 接口在框架中连接应用层和基础设施层,也体现了 “接口优先、抽象清晰"的设计理念

2、App 结构体

App 负责应用程序组件的生命周期管理;实现了AppInfo接口;

type App struct {
	opts     options
	ctx      context.Context
	cancel   context.CancelFunc
	mu       sync.Mutex
	instance *registry.ServiceInstance
}

核心方法有:

1、创建*App ;前面文章提到的Option 配置在此处应用;

func New(opts ...Option) *App

2、Run() :负责执行 OnStart,启动所有服务 和服务器注册

func (a *App) Run() error

3、Stop() :负责执行 OnStop,服务注销 和取消应用上下文

func (a *App) Stop() (err error) {
	sctx := NewContext(a.ctx, a)
	for _, fn := range a.opts.beforeStop {
		err = fn(sctx)
	}

	a.mu.Lock()
	instance := a.instance
	a.mu.Unlock()
	if a.opts.registrar != nil && instance != nil {
		ctx, cancel := context.WithTimeout(NewContext(a.ctx, a), a.opts.registrarTimeout)
		defer cancel()
		if err = a.opts.registrar.Deregister(ctx, instance); err != nil {
			return err
		}
	}
	if a.cancel != nil {
		a.cancel()
	}
	return err
}

🚀 二、并发启动机制

1. 核心工具:errgroup + WaitGroup

Run 方法中使用了两个关键工具:

eg, ctx := errgroup.WithContext(sctx)  // 统一错误管理和取消
wg := sync.WaitGroup{}                 // 确保启动顺序

errgroup 的作用

  • 管理多个 goroutine 的生命周期
  • 任何一个 goroutine 出错,自动取消其他 goroutine
  • 统一收集和返回错误

2. 服务器并发启动策略

关键代码在 app.go#L95-L108

for _, srv := range a.opts.servers {
    server := srv
    // 为每个服务器创建停止监听器
    eg.Go(func() error {
        <-ctx.Done() // 等待停止信号
        stopCtx := context.WithoutCancel(octx)
        if a.opts.stopTimeout > 0 {
            var cancel context.CancelFunc
            stopCtx, cancel = context.WithTimeout(stopCtx, a.opts.stopTimeout)
            defer cancel()
        }
        return server.Stop(stopCtx)
    })
    
    // 启动服务器
    wg.Add(1)
    eg.Go(func() error {
        wg.Done() // 标记该服务器已开始启动
        return server.Start(octx)
    })
}

启动顺序控制

wg.Wait() // 等待所有服务器开始启动

这确保了:

  • 所有服务器同时启动,提高效率
  • 服务注册在服务器启动后进行
  • 每个服务器都有对应的停止监听器

3. 服务注册时机控制

wg.Wait() // 等待所有服务器启动
if a.opts.registrar != nil {
    rctx, rcancel := context.WithTimeout(ctx, a.opts.registrarTimeout)
    defer rcancel()
    if err = a.opts.registrar.Register(rctx, instance); err != nil {
        return err
    }
}

设计巧思:先让服务器监听端口,再注册到注册中心,避免"服务已注册但尚未就绪"的问题。

🛑 二、优雅关闭机制

1. 信号监听与传播

c := make(chan os.Signal, 1)
signal.Notify(c, a.opts.sigs...) // 默认监听 SIGTERM, SIGQUIT, SIGINT
eg.Go(func() error {
    select {
    case <-ctx.Done():
        return nil
    case <-c:           // 收到退出信号
        return a.Stop() // 触发优雅关闭
    }
})

信号处理流程

  1. 监听操作系统信号(如 kill 命令、Ctrl+C)
  2. 收到信号后调用 a.Stop()
  3. 通过 errgroup 的 context 取消机制通知所有 goroutine

2. 多阶段关闭流程

Stop 方法实现了分阶段优雅关闭

func (a *App) Stop() (err error) {
    sctx := NewContext(a.ctx, a)
    
    // 阶段1:执行 beforeStop 钩子(如停止接收新请求)
    for _, fn := range a.opts.beforeStop {
        err = fn(sctx)
    }

    // 阶段2:从注册中心注销服务
    a.mu.Lock()
    instance := a.instance
    a.mu.Unlock()
    if a.opts.registrar != nil && instance != nil {
        ctx, cancel := context.WithTimeout(NewContext(a.ctx, a), a.opts.registrarTimeout)
        defer cancel()
        if err = a.opts.registrar.Deregister(ctx, instance); err != nil {
            return err
        }
    }
    
    // 阶段3:取消上下文,触发所有服务器停止
    if a.cancel != nil {
        a.cancel()
    }
    return err
}

3. 超时控制机制

每个服务器的停止都有超时保护:

stopCtx := context.WithoutCancel(octx)
if a.opts.stopTimeout > 0 {
    var cancel context.CancelFunc
    stopCtx, cancel = context.WithTimeout(stopCtx, a.opts.stopTimeout)
    defer cancel()
}
return server.Stop(stopCtx)

超时策略

  • 如果设置了 stopTimeout,强制在超时后停止
  • 防止某个服务器卡死导致整个应用无法关闭

🔄 三、完整的并发控制流程图

启动阶段:
┌─────────────────┐    ┌──────────────────┐    ┌─────────────────┐
│  beforeStart    │ -> │ 所有服务器并发启动  │ -> │   服务注册       │
│  钩子执行       │    │ (errgroup管理)    │    │                │
└─────────────────┘    └──────────────────┘    └─────────────────┘
                       ┌──────────────────┐
                       │  afterStart 钩子  │
                       └──────────────────┘
                       ┌──────────────────┐
                       │   信号监听        │
                       └──────────────────┘

关闭阶段:
┌─────────────────┐    ┌──────────────────┐    ┌─────────────────┐
│  收到退出信号    │ -> │  beforeStop钩子   │ -> │  服务注销       │
└─────────────────┘    └──────────────────┘    └─────────────────┘
                                              ┌─────────────────┐
                                              │ context取消      │
                                              │ 触发所有服务器停止 │
                                              └─────────────────┘
                                              ┌─────────────────┐
                                              │ afterStop钩子    │
                                              └─────────────────┘

总结

Kratos 的并发启动和优雅关闭实现体现了生产级微服务框架的精良设计

🔸 启动快速且安全:并发启动 + 顺序控制
🔸 关闭优雅且可靠:分阶段关闭 + 超时保护
🔸 错误处理完善:统一管理 + 快速失败
🔸 并发控制精准:errgroup + WaitGroup + Context

这种设计确保了在高并发、分布式环境下的稳定性和可靠性

wx

关注公众号

©2017-2023 鲁ICP备17023316号-1 Powered by Hugo