阿里云-云小站(无限量代金券发放中)
【腾讯云】云服务器、云数据库、COS、CDN、短信等热卖云产品特惠抢购

如何远程写入prometheus存储

33次阅读
没有评论

共计 4477 个字符,预计需要花费 12 分钟才能阅读完成。

导读 prometheus 一般都是采用 pull 方式获取数据,但是有一些情况下,不方便配置 exporter,就希望能通过 push 的方式上传指标数据。
简介

prometheus 一般都是采用 pull 方式获取数据,但是有一些情况下,不方便配置 exporter,就希望能通过 push 的方式上传指标数据。

1、可以采用 pushgateway 的方式,推送到 pushgateway,然后 prometheus 通过 pushgateway 拉取数据。

2、在新版本中增加了一个参数:–enable-feature=remote-write-receiver,允许远程通过接口 /api/v1/write,直接写数据到 prometheus 里面。

pushgateway 在高并发的情况下还是比较消耗资源的,特别是开启一致性检查,高并发写入的时候特别慢。

第二种方式少了一层转发,速度应该比较快。
如何远程写入 prometheus 存储

接口

可以通过 prometheus 的 http 接口 /api/v1/write 提交数据,这个接口的数据格式有有要求:
使用 POST 方式提交
需要经过 protobuf 编码,依赖 github.com/gogo/protobuf/proto
可以使用 snappy 进行压缩,依赖 github.com/golang/snappy

步骤:

收集指标名称,时间戳,值和标签
将数据转换成 prometheus 需要的数据格式
使用 proto 对数据进行编码,并用 snappy 进行压缩
通过 httpClient 提交数据

package prome 
 
import ( 
    "bufio" 
    "bytes" 
    "context" 
    "io" 
    "io/ioutil" 
    "net/http" 
    "net/url" 
    "regexp" 
    "time" 
 
    "github.com/gogo/protobuf/proto" 
    "github.com/golang/snappy" 
    "github.com/opentracing-contrib/go-stdlib/nethttp" 
    opentracing "github.com/opentracing/opentracing-go" 
    "github.com/pkg/errors" 
    "github.com/prometheus/common/model" 
    "github.com/prometheus/prometheus/pkg/labels" 
    "github.com/prometheus/prometheus/prompb" 
) 
 
type RecoverableError struct {error} 
 
type HttpClient struct { 
    url     *url.URL 
    Client  *http.Client 
    timeout time.Duration 
} 
 
var MetricNameRE = regexp.MustCompile(`^[a-zA-Z_:][a-zA-Z0-9_:]*$`) 
 
type MetricPoint struct { 
    Metric  string            `json:"metric"` // 指标名称 
    TagsMap map[string]string `json:"tags"`   // 数据标签 
    Time    int64             `json:"time"`   // 时间戳,单位是秒 
    Value   float64           `json:"value"`  // 内部字段,最终转换之后的 float64 数值 
} 
 
func (c *HttpClient) remoteWritePost(req []byte) error {httpReq, err := http.NewRequest("POST", c.url.String(), bytes.NewReader(req)) 
    if err != nil {return err} 
    httpReq.Header.Add("Content-Encoding", "snappy") 
    httpReq.Header.Set("Content-Type", "application/x-protobuf") 
    httpReq.Header.Set("User-Agent", "opcai") 
    httpReq.Header.Set("X-Prometheus-Remote-Write-Version", "0.1.0") 
    ctx, cancel := context.WithTimeout(context.Background(), c.timeout) 
    defer cancel() 
 
    httpReq = httpReq.WithContext(ctx) 
 
    if parentSpan := opentracing.SpanFromContext(ctx); parentSpan != nil { 
        var ht *nethttp.Tracer 
        httpReq, ht = nethttp.TraceRequest(parentSpan.Tracer(), 
            httpReq, 
            nethttp.OperationName("Remote Store"), 
            nethttp.ClientTrace(false), 
        ) 
        defer ht.Finish()} 
 
    httpResp, err := c.Client.Do(httpReq) 
    if err != nil {// Errors from Client.Do are from (for example) network errors, so are 
        // recoverable. 
        return RecoverableError{err} 
    } 
    defer func() {io.Copy(ioutil.Discard, httpResp.Body) 
        httpResp.Body.Close()}() 
 
    if httpResp.StatusCode/100 != 2 {scanner := bufio.NewScanner(io.LimitReader(httpResp.Body, 512)) 
        line := "" 
        if scanner.Scan() {line = scanner.Text() 
        } 
        err = errors.Errorf("server returned HTTP status %s: %s", httpResp.Status, line) 
    } 
    if httpResp.StatusCode/100 == 5 {return RecoverableError{err} 
    } 
    return err 
} 
 
func buildWriteRequest(samples []*prompb.TimeSeries) ([]byte, error) { 
 
    req := &prompb.WriteRequest{Timeseries: samples,} 
    data, err := proto.Marshal(req) 
    if err != nil {return nil, err} 
    compressed := snappy.Encode(nil, data) 
    return compressed, nil 
} 
 
type sample struct { 
    labels labels.Labels 
    t      int64 
    v      float64 
} 
 
const (LABEL_NAME = "__name__") 
 
func convertOne(item *MetricPoint) (*prompb.TimeSeries, error) {pt := prompb.TimeSeries{} 
    pt.Samples = []prompb.Sample{{}} 
    s := sample{} 
    s.t = item.Time 
    s.v = item.Value 
    // name 
    if !MetricNameRE.MatchString(item.Metric) {return &pt, errors.New("invalid metrics name") 
    } 
    nameLs := labels.Label{ 
        Name:  LABEL_NAME, 
        Value: item.Metric, 
    } 
    s.labels = append(s.labels, nameLs) 
    for k, v := range item.TagsMap {if model.LabelNameRE.MatchString(k) { 
            ls := labels.Label{ 
                Name:  k, 
                Value: v, 
            } 
            s.labels = append(s.labels, ls) 
        } 
    } 
 
    pt.Labels = labelsToLabelsProto(s.labels, pt.Labels) 
    // 时间赋值问题, 使用毫秒时间戳 
    tsMs := time.Unix(s.t, 0).UnixNano() / 1e6 
    pt.Samples[0].Timestamp = tsMs 
    pt.Samples[0].Value = s.v 
    return &pt, nil 
} 
 
func labelsToLabelsProto(labels labels.Labels, buf []*prompb.Label) []*prompb.Label {result := buf[:0] 
    if cap(buf) 
测试

prometheus 启动的时候记得加参数 --enable-feature=remote-write-receiver

package prome 
 
import ( 
    "testing" 
    "time" 
) 
 
func TestRemoteWrite(t *testing.T) {c, err := NewClient("http://localhost:9090/api/v1/write", 10*time.Second) 
    if err != nil {t.Fatal(err) 
    } 
    metrics := []MetricPoint{ 
        {Metric: "opcai1", 
            TagsMap: map[string]string{"env": "testing", "op": "opcai"}, 
            Time:    time.Now().Add(-1 * time.Minute).Unix(), 
            Value:   1}, 
        {Metric: "opcai2", 
            TagsMap: map[string]string{"env": "testing", "op": "opcai"}, 
            Time:    time.Now().Add(-2 * time.Minute).Unix(), 
            Value:   2}, 
        {Metric: "opcai3", 
            TagsMap: map[string]string{"env": "testing", "op": "opcai"}, 
            Time:    time.Now().Unix(), 
            Value:   3}, 
        {Metric: "opcai4", 
            TagsMap: map[string]string{"env": "testing", "op": "opcai"}, 
            Time:    time.Now().Unix(), 
            Value:   4}, 
    } 
    err = c.RemoteWrite(metrics) 
    if err != nil {t.Fatal(err) 
    } 
    t.Log("end...") 
}

使用 go test 进行测试

go test -v
总结

这个方法也是在看夜莺 v5 的代码的时候发现的,刚好有需要统一收集 redis 的监控指标,刚好可以用上,之前用 pushgateway 写的实在是慢。

阿里云 2 核 2G 服务器 3M 带宽 61 元 1 年,有高配

腾讯云新客低至 82 元 / 年,老客户 99 元 / 年

代金券:在阿里云专用满减优惠券

正文完
星哥说事-微信公众号
post-qrcode
 0
星锅
版权声明:本站原创文章,由 星锅 于2024-07-25发表,共计4477字。
转载说明:除特殊说明外本站文章皆由CC-4.0协议发布,转载请注明出处。
【腾讯云】推广者专属福利,新客户无门槛领取总价值高达2860元代金券,每种代金券限量500张,先到先得。
阿里云-最新活动爆款每日限量供应
评论(没有评论)
验证码
【腾讯云】云服务器、云数据库、COS、CDN、短信等云产品特惠热卖中