共计 4477 个字符,预计需要花费 12 分钟才能阅读完成。
导读 | prometheus 一般都是采用 pull 方式获取数据,但是有一些情况下,不方便配置 exporter,就希望能通过 push 的方式上传指标数据。 |
简介
prometheus 一般都是采用 pull 方式获取数据,但是有一些情况下,不方便配置 exporter,就希望能通过 push 的方式上传指标数据。
1、可以采用 pushgateway 的方式,推送到 pushgateway,然后 prometheus 通过 pushgateway 拉取数据。
2、在新版本中增加了一个参数:–enable-feature=remote-write-receiver,允许远程通过接口 /api/v1/write,直接写数据到 prometheus 里面。
pushgateway 在高并发的情况下还是比较消耗资源的,特别是开启一致性检查,高并发写入的时候特别慢。
第二种方式少了一层转发,速度应该比较快。
接口
可以通过 prometheus 的 http 接口 /api/v1/write 提交数据,这个接口的数据格式有有要求:
使用 POST 方式提交
需要经过 protobuf 编码,依赖 github.com/gogo/protobuf/proto
可以使用 snappy 进行压缩,依赖 github.com/golang/snappy
步骤:
收集指标名称,时间戳,值和标签
将数据转换成 prometheus 需要的数据格式
使用 proto 对数据进行编码,并用 snappy 进行压缩
通过 httpClient 提交数据
package prome | |
import ( | |
"bufio" | |
"bytes" | |
"context" | |
"io" | |
"io/ioutil" | |
"net/http" | |
"net/url" | |
"regexp" | |
"time" | |
"github.com/gogo/protobuf/proto" | |
"github.com/golang/snappy" | |
"github.com/opentracing-contrib/go-stdlib/nethttp" | |
opentracing "github.com/opentracing/opentracing-go" | |
"github.com/pkg/errors" | |
"github.com/prometheus/common/model" | |
"github.com/prometheus/prometheus/pkg/labels" | |
"github.com/prometheus/prometheus/prompb" | |
) | |
type RecoverableError struct {error} | |
type HttpClient struct { | |
url *url.URL | |
Client *http.Client | |
timeout time.Duration | |
} | |
var MetricNameRE = regexp.MustCompile(`^[a-zA-Z_:][a-zA-Z0-9_:]*$`) | |
type MetricPoint struct { | |
Metric string `json:"metric"` // 指标名称 | |
TagsMap map[string]string `json:"tags"` // 数据标签 | |
Time int64 `json:"time"` // 时间戳,单位是秒 | |
Value float64 `json:"value"` // 内部字段,最终转换之后的 float64 数值 | |
} | |
func (c *HttpClient) remoteWritePost(req []byte) error {httpReq, err := http.NewRequest("POST", c.url.String(), bytes.NewReader(req)) | |
if err != nil {return err} | |
httpReq.Header.Add("Content-Encoding", "snappy") | |
httpReq.Header.Set("Content-Type", "application/x-protobuf") | |
httpReq.Header.Set("User-Agent", "opcai") | |
httpReq.Header.Set("X-Prometheus-Remote-Write-Version", "0.1.0") | |
ctx, cancel := context.WithTimeout(context.Background(), c.timeout) | |
defer cancel() | |
httpReq = httpReq.WithContext(ctx) | |
if parentSpan := opentracing.SpanFromContext(ctx); parentSpan != nil { | |
var ht *nethttp.Tracer | |
httpReq, ht = nethttp.TraceRequest(parentSpan.Tracer(), | |
httpReq, | |
nethttp.OperationName("Remote Store"), | |
nethttp.ClientTrace(false), | |
) | |
defer ht.Finish()} | |
httpResp, err := c.Client.Do(httpReq) | |
if err != nil {// Errors from Client.Do are from (for example) network errors, so are | |
// recoverable. | |
return RecoverableError{err} | |
} | |
defer func() {io.Copy(ioutil.Discard, httpResp.Body) | |
httpResp.Body.Close()}() | |
if httpResp.StatusCode/100 != 2 {scanner := bufio.NewScanner(io.LimitReader(httpResp.Body, 512)) | |
line := "" | |
if scanner.Scan() {line = scanner.Text() | |
} | |
err = errors.Errorf("server returned HTTP status %s: %s", httpResp.Status, line) | |
} | |
if httpResp.StatusCode/100 == 5 {return RecoverableError{err} | |
} | |
return err | |
} | |
func buildWriteRequest(samples []*prompb.TimeSeries) ([]byte, error) { | |
req := &prompb.WriteRequest{Timeseries: samples,} | |
data, err := proto.Marshal(req) | |
if err != nil {return nil, err} | |
compressed := snappy.Encode(nil, data) | |
return compressed, nil | |
} | |
type sample struct { | |
labels labels.Labels | |
t int64 | |
v float64 | |
} | |
const (LABEL_NAME = "__name__") | |
func convertOne(item *MetricPoint) (*prompb.TimeSeries, error) {pt := prompb.TimeSeries{} | |
pt.Samples = []prompb.Sample{{}} | |
s := sample{} | |
s.t = item.Time | |
s.v = item.Value | |
// name | |
if !MetricNameRE.MatchString(item.Metric) {return &pt, errors.New("invalid metrics name") | |
} | |
nameLs := labels.Label{ | |
Name: LABEL_NAME, | |
Value: item.Metric, | |
} | |
s.labels = append(s.labels, nameLs) | |
for k, v := range item.TagsMap {if model.LabelNameRE.MatchString(k) { | |
ls := labels.Label{ | |
Name: k, | |
Value: v, | |
} | |
s.labels = append(s.labels, ls) | |
} | |
} | |
pt.Labels = labelsToLabelsProto(s.labels, pt.Labels) | |
// 时间赋值问题, 使用毫秒时间戳 | |
tsMs := time.Unix(s.t, 0).UnixNano() / 1e6 | |
pt.Samples[0].Timestamp = tsMs | |
pt.Samples[0].Value = s.v | |
return &pt, nil | |
} | |
func labelsToLabelsProto(labels labels.Labels, buf []*prompb.Label) []*prompb.Label {result := buf[:0] | |
if cap(buf) | |
测试 | |
prometheus 启动的时候记得加参数 --enable-feature=remote-write-receiver | |
package prome | |
import ( | |
"testing" | |
"time" | |
) | |
func TestRemoteWrite(t *testing.T) {c, err := NewClient("http://localhost:9090/api/v1/write", 10*time.Second) | |
if err != nil {t.Fatal(err) | |
} | |
metrics := []MetricPoint{ | |
{Metric: "opcai1", | |
TagsMap: map[string]string{"env": "testing", "op": "opcai"}, | |
Time: time.Now().Add(-1 * time.Minute).Unix(), | |
Value: 1}, | |
{Metric: "opcai2", | |
TagsMap: map[string]string{"env": "testing", "op": "opcai"}, | |
Time: time.Now().Add(-2 * time.Minute).Unix(), | |
Value: 2}, | |
{Metric: "opcai3", | |
TagsMap: map[string]string{"env": "testing", "op": "opcai"}, | |
Time: time.Now().Unix(), | |
Value: 3}, | |
{Metric: "opcai4", | |
TagsMap: map[string]string{"env": "testing", "op": "opcai"}, | |
Time: time.Now().Unix(), | |
Value: 4}, | |
} | |
err = c.RemoteWrite(metrics) | |
if err != nil {t.Fatal(err) | |
} | |
t.Log("end...") | |
} | |
使用 go test 进行测试 | |
go test -v | |
总结 | |
这个方法也是在看夜莺 v5 的代码的时候发现的,刚好有需要统一收集 redis 的监控指标,刚好可以用上,之前用 pushgateway 写的实在是慢。 | |
阿里云 2 核 2G 服务器 3M 带宽 61 元 1 年,有高配 | |
腾讯云新客低至 82 元 / 年,老客户 99 元 / 年 | |
代金券:在阿里云专用满减优惠券 | |
正文完
星哥玩云-微信公众号
