本文主要研究一下promtail的Client
loki/pkg/promtail/client/client.go
// Client pushes entries to Loki and can be stopped.
type Client interface {
	// EntryHandler is embedded, so every Client also provides Handle.
	api.EntryHandler
	// Stop stops the goroutine sending batches of entries.
	Stop()
}
Client接口内嵌了api.EntryHandler接口,定义了Stop方法
loki/pkg/promtail/api/types.go
// EntryHandler is something that can "handle" entries.
type EntryHandler interface {
	// Handle is given one entry at a time: the label set attached to it, its
	// timestamp and the entry text. What a non-nil error means is up to the
	// implementation.
	Handle(labels model.LabelSet, time time.Time, entry string) error
}
EntryHandler接口定义了Handle方法
loki/pkg/promtail/client/client.go
// Client for pushing logs in snappy-compressed protos over HTTP. type client struct { logger log.Logger cfg Config client *http.Client quit chan struct{} once sync.Once entries chan entry wg sync.WaitGroup externalLabels model.LabelSet } // Handle implement EntryHandler; adds a new line to the next batch; send is async. func (c *client) Handle(ls model.LabelSet, t time.Time, s string) error { if len(c.externalLabels) > 0 { ls = c.externalLabels.Merge(ls) } // Get the tenant ID in case it has been overridden while processing // the pipeline stages, then remove the special label tenantID := c.getTenantID(ls) if _, ok := ls[ReservedLabelTenantID]; ok { // Clone the label set to not manipulate the input one ls = ls.Clone() delete(ls, ReservedLabelTenantID) } c.entries <- entry{tenantID, ls, logproto.Entry{ Timestamp: t, Line: s, }} return nil } // Stop the client. func (c *client) Stop() { c.once.Do(func() { close(c.quit) }) c.wg.Wait() }
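client定义了logger、cfg、client、quit、once、entries、wg、externalLabels属性;它实现了Client接口的Handle、Stop方法;Handle方法先将externalLabels合并到LabelSet,再通过getTenantID获取tenantID;如果LabelSet包含ReservedLabelTenantID,则先执行ls.Clone()复制一份,再从副本中移除该label(避免修改传入的LabelSet),之后构造entry发送到c.entries这个channel;Stop方法通过sync.Once保证只执行一次close(c.quit),然后执行c.wg.Wait()等待(run方法退出时会执行c.wg.Done())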
loki/pkg/promtail/client/client.go
func (c *client) run() { batches := map[string]*batch{} // Given the client handles multiple batches (1 per tenant) and each batch // can be created at a different point in time, we look for batches whose // max wait time has been reached every 10 times per BatchWait, so that the // maximum delay we have sending batches is 10% of the max waiting time. // We apply a cap of 10ms to the ticker, to avoid too frequent checks in // case the BatchWait is very low. minWaitCheckFrequency := 10 * time.Millisecond maxWaitCheckFrequency := c.cfg.BatchWait / 10 if maxWaitCheckFrequency < minWaitCheckFrequency { maxWaitCheckFrequency = minWaitCheckFrequency } maxWaitCheck := time.NewTicker(maxWaitCheckFrequency) defer func() { // Send all pending batches for tenantID, batch := range batches { c.sendBatch(tenantID, batch) } c.wg.Done() }() for { select { case <-c.quit: return case e := <-c.entries: batch, ok := batches[e.tenantID] // If the batch doesn't exist yet, we create a new one with the entry if !ok { batches[e.tenantID] = newBatch(e) break } // If adding the entry to the batch will increase the size over the max // size allowed, we do send the current batch and then create a new one if batch.sizeBytesAfter(e) > c.cfg.BatchSize { c.sendBatch(e.tenantID, batch) batches[e.tenantID] = newBatch(e) break } // The max size of the batch isn't reached, so we can add the entry batch.add(e) case <-maxWaitCheck.C: // Send all batches whose max wait time has been reached for tenantID, batch := range batches { if batch.age() < c.cfg.BatchWait { continue } c.sendBatch(tenantID, batch) delete(batches, tenantID) } } } }
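client的run方法创建time.NewTicker(maxWaitCheckFrequency),然后进入for循环:如果从c.entries读取到了数据,当该tenant还没有batch时通过newBatch(e)创建;当加入该entry后大小会超过c.cfg.BatchSize时,先执行c.sendBatch发送当前batch,再通过newBatch(e)创建新的batch;否则执行batch.add(e)。如果是maxWaitCheck触发了则遍历batches,对age已达到c.cfg.BatchWait的batch执行c.sendBatch(tenantID, batch)及delete;最后quit的时候,还有defer方法遍历batches执行c.sendBatch(tenantID, batch)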
loki/pkg/promtail/client/client.go
func (c *client) sendBatch(tenantID string, batch *batch) { buf, entriesCount, err := batch.encode() if err != nil { level.Error(c.logger).Log("msg", "error encoding batch", "error", err) return } bufBytes := float64(len(buf)) encodedBytes.WithLabelValues(c.cfg.URL.Host).Add(bufBytes) ctx := context.Background() backoff := util.NewBackoff(ctx, c.cfg.BackoffConfig) var status int for backoff.Ongoing() { start := time.Now() status, err = c.send(ctx, tenantID, buf) requestDuration.WithLabelValues(strconv.Itoa(status), c.cfg.URL.Host).Observe(time.Since(start).Seconds()) if err == nil { sentBytes.WithLabelValues(c.cfg.URL.Host).Add(bufBytes) sentEntries.WithLabelValues(c.cfg.URL.Host).Add(float64(entriesCount)) for _, s := range batch.streams { lbls, err := parser.ParseMetric(s.Labels) if err != nil { // is this possible? level.Warn(c.logger).Log("msg", "error converting stream label string to label.Labels, cannot update lagging metric", "error", err) return } var lblSet model.LabelSet for i := range lbls { if lbls[i].Name == LatencyLabel { lblSet = model.LabelSet{ model.LabelName(HostLabel): model.LabelValue(c.cfg.URL.Host), model.LabelName(LatencyLabel): model.LabelValue(lbls[i].Value), } } } if lblSet != nil { streamLag.With(lblSet).Set(time.Since(s.Entries[len(s.Entries)-1].Timestamp).Seconds()) } } return } // Only retry 429s, 500s and connection-level errors. if status > 0 && status != 429 && status/100 != 5 { break } level.Warn(c.logger).Log("msg", "error sending batch, will retry", "status", status, "error", err) batchRetries.WithLabelValues(c.cfg.URL.Host).Inc() backoff.Wait() } if err != nil { level.Error(c.logger).Log("msg", "final error sending batch", "status", status, "error", err) droppedBytes.WithLabelValues(c.cfg.URL.Host).Add(bufBytes) droppedEntries.WithLabelValues(c.cfg.URL.Host).Add(float64(entriesCount)) } }
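sendBatch方法先通过batch.encode()编码为buf,然后通过c.send(ctx, tenantID, buf)进行发送;发送失败时,只有429、5xx以及连接级别的错误才会按c.cfg.BackoffConfig进行重试,其他错误直接结束重试;如果最终仍然失败,则记录error日志并更新droppedBytes、droppedEntries指标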
loki/pkg/promtail/client/client.go
func (c *client) send(ctx context.Context, tenantID string, buf []byte) (int, error) { ctx, cancel := context.WithTimeout(ctx, c.cfg.Timeout) defer cancel() req, err := http.NewRequest("POST", c.cfg.URL.String(), bytes.NewReader(buf)) if err != nil { return -1, err } req = req.WithContext(ctx) req.Header.Set("Content-Type", contentType) req.Header.Set("User-Agent", UserAgent) // If the tenant ID is not empty promtail is running in multi-tenant mode, so // we should send it to Loki if tenantID != "" { req.Header.Set("X-Scope-OrgID", tenantID) } resp, err := c.client.Do(req) if err != nil { return -1, err } defer helpers.LogError("closing response body", resp.Body.Close) if resp.StatusCode/100 != 2 { scanner := bufio.NewScanner(io.LimitReader(resp.Body, maxErrMsgLen)) line := "" if scanner.Scan() { line = scanner.Text() } err = fmt.Errorf("server returned HTTP status %s (%d): %s", resp.Status, resp.StatusCode, line) } return resp.StatusCode, err }
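send方法执行一个POST的http请求发送到c.cfg.URL.String(),超时时间为c.cfg.Timeout;如果tenantID不为空,则设置X-Scope-OrgID这个header;如果响应的状态码不是2xx,则读取响应体的第一行(最多maxErrMsgLen字节)构造error返回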
promtail的client定义了logger、cfg、client、quit、once、entries、wg、externalLabels属性;它实现了Client接口的Handle、Stop方法;Handle方法构造entry发送到c.entries这个channel;Stop方法执行close(c.quit)并通过c.wg.Wait()等待;然后它还有一个run方法将entry按tenantID添加到对应的batch,在batch大小将超过BatchSize或等待时间达到BatchWait时,将batch通过http的POST请求发送到指定的地址。