本文主要研究一下klog的Flush
k8s.io/klog/v2@v2.4.0/klog.go
// Flush flushes all pending log I/O.
// It simply delegates to logging.lockAndFlushAll.
func Flush() {
	logging.lockAndFlushAll()
}
Flush方法执行的是logging.lockAndFlushAll()
k8s.io/klog/v2@v2.4.0/klog.go
// init sets up the defaults and runs flushDaemon.
func init() {
	logging.stderrThreshold = errorLog // Default stderrThreshold is ERROR.
	logging.setVState(0, nil, false)
	logging.logDir = ""
	logging.logFile = ""
	logging.logFileMaxSizeMB = 1800
	logging.toStderr = true
	logging.alsoToStderr = false
	logging.skipHeaders = false
	logging.addDirHeader = false
	logging.skipLogHeaders = false
	logging.oneOutput = false
	// The flush loop runs in its own goroutine so init itself does not block.
	go logging.flushDaemon()
}
klog的init方法在设置完各项默认值之后,启动一个goroutine异步执行logging.flushDaemon()
k8s.io/klog/v2@v2.4.0/klog.go
// flushDaemon periodically flushes the log file buffers.
func (l *loggingT) flushDaemon() {
	// Each value received from the flushInterval ticker triggers one locked
	// flush; the loop body contains no break or return.
	for range time.NewTicker(flushInterval).C {
		l.lockAndFlushAll()
	}
}
flushDaemon方法通过for range遍历time.NewTicker(flushInterval)的channel,每收到一次tick就执行一次l.lockAndFlushAll()
k8s.io/klog/v2@v2.4.0/klog.go
// lockAndFlushAll is like flushAll but locks l.mu first.
// The lock is released once flushAll has returned.
func (l *loggingT) lockAndFlushAll() {
	l.mu.Lock()
	l.flushAll()
	l.mu.Unlock()
}
lockAndFlushAll先获取l.mu锁,再执行flushAll,执行完毕后释放锁
k8s.io/klog/v2@v2.4.0/klog.go
// Severity levels in ascending order: iota makes infoLog 0 and fatalLog 3.
const (
	infoLog severity = iota
	warningLog
	errorLog
	fatalLog
	numSeverity = 4
)

// flushAll flushes all the logs and attempts to "sync" their data to disk.
// l.mu is held.
func (l *loggingT) flushAll() {
	// Flush from fatal down, in case there's trouble flushing.
	for s := fatalLog; s >= infoLog; s-- {
		file := l.file[s]
		// Severities whose l.file entry is nil are skipped.
		if file != nil {
			file.Flush() // ignore error
			file.Sync()  // ignore error
		}
	}
}
flushAll方法从fatalLog开始递减到infoLog级别,对不为nil的l.file[s]挨个执行Flush及Sync方法,并忽略它们返回的error
k8s.io/klog/v2@v2.4.0/klog.go
// flushSyncWriter is the interface satisfied by logging destinations.
// It adds Flush and Sync to the embedded io.Writer.
type flushSyncWriter interface {
	Flush() error
	Sync() error
	io.Writer
}

// Writer declares the single Write method.
// NOTE(review): presumably the standard library's io.Writer definition,
// quoted here for reference — confirm.
type Writer interface {
	Write(p []byte) (n int, err error)
}
flushSyncWriter接口定义了Flush、Sync方法,内嵌了io.Writer接口
k8s.io/klog/v2@v2.4.0/klog.go
// redirectBuffer is used to set an alternate destination for the logs
type redirectBuffer struct {
	w io.Writer // destination that Write forwards to
}

// Sync is a no-op; it always returns nil.
func (rb *redirectBuffer) Sync() error {
	return nil
}

// Flush is a no-op; it always returns nil.
func (rb *redirectBuffer) Flush() error {
	return nil
}

// Write forwards bytes to rb.w and returns its result unchanged.
func (rb *redirectBuffer) Write(bytes []byte) (n int, err error) {
	return rb.w.Write(bytes)
}
redirectBuffer持有一个io.Writer类型的字段w(是具名字段而非内嵌),其Write方法委托给w.Write来写;其Sync及Flush方法都为空操作,直接返回nil
k8s.io/klog/v2@v2.4.0/klog.go
// syncBuffer joins a bufio.Writer to its underlying file, providing access to the
// file's Sync method and providing a wrapper for the Write method that provides log
// file rotation. There are conflicting methods, so the file cannot be embedded.
// l.mu is held for all its methods.
type syncBuffer struct {
	logger *loggingT
	*bufio.Writer
	file     *os.File
	sev      severity
	nbytes   uint64 // The number of bytes written to this file
	maxbytes uint64 // The max number of bytes this syncBuffer.file can hold before cleaning up.
}

// Sync calls Sync on the underlying file and returns its error.
func (sb *syncBuffer) Sync() error {
	return sb.file.Sync()
}

// Write writes p through the embedded bufio.Writer and adds the number of
// bytes written to sb.nbytes. A rotation failure or a write error is passed
// to sb.logger.exit.
func (sb *syncBuffer) Write(p []byte) (n int, err error) {
	// Rotate first when this write would bring the file to maxbytes or beyond.
	if sb.nbytes+uint64(len(p)) >= sb.maxbytes {
		if err := sb.rotateFile(time.Now(), false); err != nil {
			sb.logger.exit(err)
		}
	}
	n, err = sb.Writer.Write(p)
	sb.nbytes += uint64(n)
	if err != nil {
		sb.logger.exit(err)
	}
	return
}
syncBuffer定义了logger、file、sev、nbytes、maxbytes属性,内嵌了*bufio.Writer;其Sync方法执行的是*os.File.Sync;其Flush方法由内嵌的*bufio.Writer提升而来,执行的是*bufio.Writer.Flush
/usr/local/go/src/bufio/bufio.go
// Writer holds a byte buffer in front of the io.Writer wr.
type Writer struct {
	err error     // returned by Flush without writing once it is non-nil
	buf []byte    // buffered data
	n   int       // number of bytes currently held in buf
	wr  io.Writer // underlying writer that Flush writes to
}

// Flush writes any buffered data to the underlying io.Writer.
func (b *Writer) Flush() error {
	// An error recorded by an earlier call is returned without writing.
	if b.err != nil {
		return b.err
	}
	// Nothing buffered: nothing to do.
	if b.n == 0 {
		return nil
	}
	n, err := b.wr.Write(b.buf[0:b.n])
	// A short write without an error is reported as io.ErrShortWrite.
	if n < b.n && err == nil {
		err = io.ErrShortWrite
	}
	if err != nil {
		// On a partial write, move the unwritten tail to the front of buf.
		if n > 0 && n < b.n {
			copy(b.buf[0:b.n-n], b.buf[n:b.n])
		}
		b.n -= n
		b.err = err
		return err
	}
	b.n = 0
	return nil
}
*bufio.Writer.Flush方法把缓冲区中已缓冲的数据通过底层io.Writer的Write方法写出
// rotateFile closes the syncBuffer's file and starts a new one.
// The startup argument indicates whether this is the initial startup of klog.
// If startup is true, existing files are opened for appending instead of truncated.
// It returns the error from create, from Stat, or from writing the header.
func (sb *syncBuffer) rotateFile(now time.Time, startup bool) error {
	// Flush buffered data and close the current file; neither result is checked.
	if sb.file != nil {
		sb.Flush()
		sb.file.Close()
	}
	var err error
	sb.file, _, err = create(severityName[sb.sev], now, startup)
	if err != nil {
		return err
	}
	if startup {
		fileInfo, err := sb.file.Stat()
		if err != nil {
			return fmt.Errorf("file stat could not get fileinfo: %v", err)
		}
		// init file size
		sb.nbytes = uint64(fileInfo.Size())
	} else {
		sb.nbytes = 0
	}
	// Point the embedded bufio.Writer at the new file.
	sb.Writer = bufio.NewWriterSize(sb.file, bufferSize)

	if sb.logger.skipLogHeaders {
		return nil
	}

	// Write header.
	var buf bytes.Buffer
	fmt.Fprintf(&buf, "Log file created at: %s\n", now.Format("2006/01/02 15:04:05"))
	fmt.Fprintf(&buf, "Running on machine: %s\n", host)
	fmt.Fprintf(&buf, "Binary: Built with %s %s for %s/%s\n", runtime.Compiler, runtime.Version(), runtime.GOOS, runtime.GOARCH)
	fmt.Fprintf(&buf, "Log line format: [IWEF]mmdd hh:mm:ss.uuuuuu threadid file:line] msg\n")
	// The header is written directly to the file, not through the buffered
	// Writer, and its size is counted in nbytes.
	n, err := sb.file.Write(buf.Bytes())
	sb.nbytes += uint64(n)
	return err
}
syncBuffer.rotateFile方法会设置其Writer为bufio.NewWriterSize(sb.file, bufferSize),底层writer为syncBuffer的file
klog的init方法异步协程执行logging.flushDaemon(),它内部执行的是l.lockAndFlushAll();Flush方法执行的同样是l.lockAndFlushAll();l.lockAndFlushAll()方法使用lock执行flushAll;flushAll方法从fatalLog开始递减到infoLog级别挨个执行l.file[s]的Flush及Sync方法;对于redirectBuffer,其Flush及Sync方法为空操作;对于syncBuffer,其Sync方法执行的是*os.File.Sync;其Flush方法执行的是*bufio.Writer.Flush,*bufio.Writer.Flush方法执行的是底层io.Writer的Write方法,即syncBuffer的file的Write方法。