Filebeat 是 Elastic Stack 里那个轻量级的日志采集器。最近读它的源码,发现几处设计模式用得很顺手,记一下:Registry + LazyInit、Observer、Strategy 和 Object Pool 这四个,正好对应插件化、解耦、可配置和性能优化几个常见诉求。

Registry + LazyInit

Filebeat 支持 log、stdin、redis、kafka 等多种 input 类型,要做到加一种类型不动核心代码,靠的就是注册表:每个 input 类型把自己的工厂函数登记进去,运行时再按配置里的 type 查出来用。延迟初始化的部分在于——只有配置里真正用到的 input 才会被实例化。

filebeat/input.New 中,通过 Registry 获取对应类型的工厂函数:

f, err = GetFactory(input.config.Type)

Registry 查找逻辑

func GetFactory(name string) (Factory, error) {
    if _, exists := registry[name]; !exists {
        return nil, fmt.Errorf("Error creating input. No such input type exist: '%v'", name)
    }
    return registry[name], nil
}

组件注册

各个 input 类型在初始化时自动注册到 registry:

func init() {
    err := input.Register("log", NewInput)
    if err != nil {
        panic(err)
    }
}

这样配置和实现就彻底分开了——加一种新的 input 类型只要在 initRegister 一下,核心的 New 逻辑一行都不用改。

Observer:事件总线

Filebeat 内部组件之间不直接互相调用,而是走一条事件总线(Event Bus):谁关心什么事件就订阅,谁产生了事件就发布出去,中间用 channel 传递。典型的观察者模式。

发布事件时遍历所有订阅者,只把它感兴趣的事件投进它的 channel:

func (b *bus) Publish(e Event) {
    b.RLock()
    defer b.RUnlock()
    logp.Debug("bus", "%s: %+v", b.name, e)

    // 遍历所有订阅者,发送感兴趣的事件
    for _, listener := range b.listeners {
        if listener.interested(e) {
            listener.channel <- e
        }
    }
}

订阅时创建一个带缓冲的 channel,并把 filter 一起记下来,这样发布端就能做选择性投递:

func (b *bus) Subscribe(filter ...string) Listener {
    listener := &listener{
        filter:  filter,
        bus:     b,
        channel: make(chan Event, 100), // 带缓冲的 channel
    }

    b.Lock()
    defer b.Unlock()
    b.listeners = append(b.listeners, listener)
    return listener
}

发布者和订阅者互不知道对方存在,靠 channel 异步传递,再加上 filter 做选择性订阅——这套组合让模块之间的耦合降到很低。

Strategy:Kafka 分区策略

Filebeat 往 Kafka 写数据时支持 random、round_robin、hash 几种分区方式。与其在写入逻辑里塞一堆 if-else,它把每种策略做成一个独立的构造函数,用一张 map 注册起来,按配置取用:

// 定义策略映射表
var partitioners = map[string]partitionBuilder{
    "random":      cfgRandomPartitioner,
    "round_robin": cfgRoundRobinPartitioner,
    "hash":        cfgHashPartitioner,
}

func initPartitionStrategy(config *Config) (Partitioner, error) {
    name := config.Partition

    // 根据配置选择策略
    mk := partitioners[name]
    if mk == nil {
        return nil, fmt.Errorf("unknown kafka partition mode %v", name)
    }

    // 构造具体的分区器
    partitioner, err := mk(config)
    // ...
}

加一种新的分区策略,往这张 map 里添一行就行,写入主流程完全不用动。

Object Pool:复用 ackChan

Filebeat 在确认(ACK)链路上会高频创建、丢弃 ackChan 这种短命对象,频繁分配会给 GC 带来压力。它的做法是用 sync.Pool 把这些对象池化复用。

定义对象池,New 负责在池空时兜底造一个新的:

var ackChanPool = sync.Pool{
    New: func() interface{} {
        return &ackChan{
            ch: make(chan batchAckMsg, 1),
        }
    },
}

从池里取出来后,把每个字段重新赋值,相当于"重置"成一个干净对象再用:

func newACKChan(seq uint, start, count int, states []clientState) *ackChan {
    ch := ackChanPool.Get().(*ackChan)
    ch.next = nil
    ch.seq = seq
    ch.start = start
    ch.count = count
    ch.states = states
    return ch
}

归还时把 next 置空再 Put 回去——这一步很关键,不清掉引用的话被池子持有的对象会顺带把它指向的东西也留住,造成内存泄漏:

func releaseACKChan(c *ackChan) {
    c.next = nil  // 清理引用,防止内存泄漏
    ackChanPool.Put(c)
}

sync.Pool 在这种高频创建销毁的短命对象上收益最明显,省下的是反复分配内存和触发 GC 的开销。

参考资源