本文主要研究一下golang的zap的ZapKafkaWriter
package logger import ( "errors" "sync" "sync/atomic" "syscall" ) // ZapKafkaWriter is a zap WriteSyncer (io.Writer) that writes messages to Kafka type ZapKafkaWriter struct { kp *KafkaProducer ce *CloudEvents closed int32 // Nonzero if closing, must access atomically pendingWg sync.WaitGroup // WaitGroup for pending messages closeMut sync.Mutex } // newZapKafkaWriter returns a kafka io.writer instance func newZapKafkaWriter( kpCfg ProducerConfiguration, cloudEvents *CloudEvents, ceCfg CloudEventsConfiguration) (*ZapKafkaWriter, error) { // create an async producer kp, err := newKafkaProducer(kpCfg, cloudEvents, ceCfg) if err != nil { return nil, err } zw := &ZapKafkaWriter{ kp: kp, ce: cloudEvents, } return zw, nil }
ZapKafkaWriter定义了KafkaProducer、CloudEvents、closed、pendingWg、closeMut属性,其newZapKafkaWriter方法根据ProducerConfiguration、cloudEvents、CloudEventsConfiguration来创建KafkaProducer,然后根据KafkaProducer来创建ZapKafkaWriter
// Sync satisfies zapcore.WriteSyncer interface, zapcore.AddSync works as well func (zw *ZapKafkaWriter) Sync() error { return nil } // Write sends byte slices to Kafka ignoring error responses (Thread-safe) // Write might block if the Input() channel of the AsyncProducer is full func (zw *ZapKafkaWriter) Write(msg []byte) (int, error) { if zw.Closed() { return 0, syscall.EINVAL } if zw.kp.producer == nil { return 0, errors.New("No producer defined") } zw.pendingWg.Add(1) defer zw.pendingWg.Done() err := zw.kp.sendMessage(msg) return len(msg), err } // Closed returns true if the writer is closed, false otherwise (Thread-safe) func (zw *ZapKafkaWriter) Closed() bool { return atomic.LoadInt32(&zw.closed) != 0 } // Close must be called when the writer is no longer needed (Thread-safe) func (zw *ZapKafkaWriter) Close() (err error) { zw.closeMut.Lock() defer zw.closeMut.Unlock() if zw.Closed() { return syscall.EINVAL } atomic.StoreInt32(&zw.closed, 1) zw.pendingWg.Wait() return nil }
ZapKafkaWriter实现了zapcore.WriteSyncer接口,其Write方法使用KafkaProducer发送消息,其Sync方法目前不做任何操作,它还提供了Close方法,也就是也实现了Sink接口
WriteSyncer内嵌了io.Writer接口,定义了Sync方法;Sink接口内嵌了zapcore.WriteSyncer及io.Closer接口;ZapKafkaWriter实现Sink接口及zapcore.WriteSyncer接口,其Write方法直接将data通过kafka发送出去。