阿里云-云小站(无限量代金券发放中)
【腾讯云】云服务器、云数据库、COS、CDN、短信等热卖云产品特惠抢购

InfluxDB meta文件解析

212次阅读
没有评论

共计 6902 个字符,预计需要花费 18 分钟才能阅读完成。

操作系统:CentOS7.3.1611_x64

Go 语言版本:1.8.3 linux/amd64

InfluxDB 版本:1.1.0

influxdb 默认配置:

/etc/influxdb/influxdb.confmeta

默认配置:

[meta]
  dir = “/var/lib/influxdb/meta”
  retention-autocreate = true
  logging-enabled = true

dir

meta 数据存放目录,默认值:/var/lib/influxdb/meta

meta 数据文件默认路径:/var/lib/influxdb/meta/meta.db

retention-autocreate

用于控制默认存储策略,数据库创建时,会自动生成 autogen 的存储策略,默认值:true

logging-enabled

是否开启 meta 日志,默认值:true

meta 文件的 dump 和 load

源码路径:github.com/influxdata/influxdb/services/meta/client.go

meta 文件 dump

// snapshot will save the current meta data to disk
func snapshot(path string, data *Data) error {
    file := filepath.Join(path, metaFile)
    tmpFile := file + “tmp”

    f, err := os.Create(tmpFile)
    if err != nil {
        return err
    }
    defer f.Close()

    var d []byte
    if b, err := data.MarshalBinary(); err != nil {
        return err
    } else {
        d = b
    }

    if _, err := f.Write(d); err != nil {
        return err
    }

    if err = f.Sync(); err != nil {
        return err
    }

    //close file handle before renaming to support Windows
    if err = f.Close(); err != nil {
        return err
    }

    return renameFile(tmpFile, file)
}

snapshot 可以通过以下两种方式触发:

1、当执行 Client.Open 函数时会进行 snapshot 操作;

2、执行 meta 文件更新时通过 commit 函数进行 snapshot 操作;

在 InfluxDB 中程序中,通过 NewServer 函数创建 MetaClient 变量(meta.NewClient),然后执行 MetaClient.Open()进行初始化;

后续会通过 Server.Open 函数(run/server.go)启动各项服务,如果有 meta 文件的更新操作,通过 commit 函数进行 snapshot 操作;

meta 文件 load

// Load will save the current meta data from disk
func (c *Client) Load() error {
    file := filepath.Join(c.path, metaFile)

    f, err := os.Open(file)
    if err != nil {
        if os.IsNotExist(err) {
            return nil
        }
        return err
    }
    defer f.Close()

    data, err := ioutil.ReadAll(f)
    if err != nil {
        return err
    }

    if err := c.cacheData.UnmarshalBinary(data); err != nil {
        return err
    }
    return nil
}

Client.Open()中会执行 Load 操作,NewServer 时会自动加载。

meta 文件内容编解码
源码路径:github.com/influxdata/influxdb/services/meta/data.go

meta 数据 encode:

// MarshalBinary encodes the metadata to a binary format.
func (data *Data) MarshalBinary() ([]byte, error) {
    return proto.Marshal(data.marshal())
}

meta 数据 decode:

// UnmarshalBinary decodes the object from a binary format.
func (data *Data) UnmarshalBinary(buf []byte) error {
    var pb internal.Data
    if err := proto.Unmarshal(buf, &pb); err != nil {
        return err
    }
    data.unmarshal(&pb)
    return nil
}

proto 路径:github.com/gogo/protobuf/proto

meta 文件结构定义
源码路径:github.com/influxdata/influxdb/services/meta/data.go

meta 文件存储的就是 meta.Data 的数据,结构定义如下:

// Data represents the top level collection of all metadata.
type Data struct {
    Term      uint64 // associated raft term
    Index    uint64 // associated raft index
    ClusterID uint64
    Databases []DatabaseInfo
    Users    []UserInfo

    MaxShardGroupID uint64
    MaxShardID      uint64
}

Term:暂时不知道干什么用的。

Index:从源码看这个应该是类似版本号的东西,初始化为 1,执行 commit 操作是会增加。如果为 1,会立即执行持久化操作(在 Open 函数中操作)。

ClusterID:是 InfluxDB 集群相关内容;

Databases:用于存储数据库信息;

Users:用于存储数据库用户信息;

DatabaseInfo 定义:

// DatabaseInfo represents information about a database in the system.
type DatabaseInfo struct {
    Name                  string
    DefaultRetentionPolicy string
    RetentionPolicies      []RetentionPolicyInfo
    ContinuousQueries      []ContinuousQueryInfo
}

RetentionPolicyInfo 定义:

// RetentionPolicyInfo represents metadata about a retention policy.
type RetentionPolicyInfo struct {
    Name              string
    ReplicaN          int
    Duration          time.Duration
    ShardGroupDuration time.Duration
    ShardGroups        []ShardGroupInfo
    Subscriptions      []SubscriptionInfo
}

ShardGroupInfo 定义:

// ShardGroupInfo represents metadata about a shard group. The DeletedAt field is important
// because it makes it clear that a ShardGroup has been marked as deleted, and allow the system
// to be sure that a ShardGroup is not simply missing. If the DeletedAt is set, the system can
// safely delete any associated shards.
type ShardGroupInfo struct {
    ID          uint64
    StartTime  time.Time
    EndTime    time.Time
    DeletedAt  time.Time
    Shards      []ShardInfo
    TruncatedAt time.Time
}

ShardInfo 定义:

// ShardInfo represents metadata about a shard.
type ShardInfo struct {
    ID    uint64
    Owners []ShardOwner
}

ShardOwner 定义:

// ShardOwner represents a node that owns a shard.
type ShardOwner struct {
    NodeID uint64
}

ShardOwner 主要用于集群,其中 NodeId 用于标识集群的节点 ID,在 InfluxDB 1.1 社区版本中集群已经不支持了,该字段无效。

SubscriptionInfo 定义:

// SubscriptionInfo hold the subscription information
type SubscriptionInfo struct {
    Name        string
    Mode        string
    Destinations []string
}

ContinuousQueryInfo 定义:

// ContinuousQueryInfo represents metadata about a continuous query.
type ContinuousQueryInfo struct {
    Name  string
    Query string
}

UserInfo 定义:

// UserInfo represents metadata about a user in the system.
type UserInfo struct {
    Name      string
    Hash      string
    Admin      bool
    Privileges map[string]influxql.Privilege
}

其它
meta 文件解析示例代码:

package main

import (
    “os”
    “fmt”
    “io/ioutil”
    “github.com/influxdata/influxdb/services/meta”
)

func Load(metaFile string) error {
    cacheData:= &meta.Data{
            Index: 1,
        }
    //file := filepath.Join(c.path, metaFile)

    f, err := os.Open(metaFile)
    if err != nil {
        if os.IsNotExist(err) {
            return nil
        }
        return err
    }
    defer f.Close()

    data, err := ioutil.ReadAll(f)
    if err != nil {
        return err
    }

    if err := cacheData.UnmarshalBinary(data); err != nil {
        return err
    }
    //fmt.Println(data)
    //fmt.Println(“=======================”)

    fmt.Println(“Term      :”,cacheData.Term)
    fmt.Println(“Index      :”,cacheData.Index)
    fmt.Println(“Databases :”)
    //fmt.Println(cacheData.Databases)

    for k,dbInfo := range cacheData.Databases {
        //fmt.Println(k,dbInfo)
        fmt.Println(“k =”,k)
        fmt.Println(dbInfo.Name,dbInfo.DefaultRetentionPolicy)
        for _,rPolicy := range dbInfo.RetentionPolicies {
            //fmt.Println(rPolicy)
            fmt.Println(rPolicy.Name,rPolicy.ReplicaN,rPolicy.Duration,rPolicy.ShardGroupDuration)
            fmt.Println(“————-ShardGroups—————“)
            //fmt.Println(rPolicy.ShardGroups)
            for shardIdx,shardGroup := range rPolicy.ShardGroups {
                //fmt.Println(shardGroup)
                fmt.Println(“shardIdx =”,shardIdx)
                fmt.Println(“ID          :”,shardGroup.ID)
                fmt.Println(“StartTime  :”,shardGroup.StartTime)
                fmt.Println(“EndTime    :”,shardGroup.EndTime)
                fmt.Println(“DeletedAt  :”,shardGroup.DeletedAt)
                //fmt.Println(“Shards      :”,shardGroup.Shards)
                fmt.Printf(“Shards      :”)
                for _,shard := range shardGroup.Shards {
                    fmt.Println(shard.ID,shard.Owners)
                }

                fmt.Println(“TruncatedAt :”,shardGroup.TruncatedAt)
                //fmt.Println(shardGroup.ID,shardGroup.StartTime,shardGroup.EndTime)
                // DeletedAt,Shards  ,      TruncatedAt
            }
            //fmt.Println(rPolicy.Subscriptions)
            fmt.Println(“————–Subscriptions—————-“)
            for subsIdx,subInfo := range rPolicy.Subscriptions {
                //fmt.Println(subInfo)
                fmt.Println(“subsIdx =”,subsIdx)
                fmt.Println(“Name :”,subInfo.Name)
                fmt.Println(“Mode :”,subInfo.Mode)
                fmt.Println(“Destinations :”,subInfo.Destinations)
            }

        }
        fmt.Println(“=======================”)
    }

    fmt.Println(“Users :”)
    fmt.Println(cacheData.Users)
    fmt.Println(cacheData.MaxShardGroupID)
    fmt.Println(cacheData.MaxShardID)
    return nil
}

func main() {
    argsWithProg := os.Args
    if(len(argsWithProg) < 2) {
        fmt.Println(“usage : “,argsWithProg[0],” configFile”)
        return
    }
    metaFile := os.Args[1]

    fmt.Println(argsWithProg)
    fmt.Println(metaFile)

    Load(metaFile)
}

好,就这些了,希望对你有帮助。

本文永久更新链接地址:http://www.linuxidc.com/Linux/2018-01/150314.htm

正文完
星哥玩云-微信公众号
post-qrcode
 0
星锅
版权声明:本站原创文章,由 星锅 于2022-01-22发表,共计6902字。
转载说明:除特殊说明外本站文章皆由CC-4.0协议发布,转载请注明出处。
【腾讯云】推广者专属福利,新客户无门槛领取总价值高达2860元代金券,每种代金券限量500张,先到先得。
阿里云-最新活动爆款每日限量供应
评论(没有评论)
验证码
【腾讯云】云服务器、云数据库、COS、CDN、短信等云产品特惠热卖中