共计 13914 个字符,预计需要花费 35 分钟才能阅读完成。
公司的 Riak 版本是 2.0.4,目前已根据 CMDB 三级业务部署了十几套集群,大部分是跨机房部署。监控采集分为两个大的维度,第一个维度是单机,也就是「IP: 端口」;第二个维度是集群,也就是所有节点指标的统计结果。本文主要介绍采集的指标和采集程序。
一、采集的指标
1、吞吐量指标
1.1 单机
采集方法:
/usr/sbin/riak-admin status
指标 | 功能 |
---|---|
node_gets | 某节点前一分钟处理的 GET 请求数量,包括该节点上非本地虚拟节点处理的 GET 请求 |
node_puts | 某节点前一分钟处理的 PUT 请求数量,包括该节点上非本地虚拟节点处理的 PUT 请求 |
1.2 集群
指标 | 功能 | 统计方法 |
---|---|---|
node_gets_total | 集群前一分钟处理的 GET 请求数量 | SUM(node_gets) |
node_puts_total | 集群前一分钟处理的 PUT 请求数量 | SUM(node_puts) |
2、延迟指标
2.1 单机
采集方法:
/usr/sbin/riak-admin status
指标 | 功能 |
---|---|
node_get_fsm_time_mean | 客户端发起 GET 请求到收到响应时间间隔的均值 |
node_get_fsm_time_median | 客户端发起 GET 请求到收到响应时间间隔的中值 |
node_get_fsm_time_95 | 客户端发起 GET 请求到收到响应时间间隔的 95 百分位值 |
node_get_fsm_time_100 | 客户端发起 GET 请求到收到响应时间间隔的 100 百分位值 |
node_put_fsm_time_mean | 客户端发起 PUT 请求到收到响应时间间隔的均值 |
node_put_fsm_time_median | 客户端发起 PUT 请求到收到响应时间间隔的中值 |
node_put_fsm_time_95 | 客户端发起 PUT 请求到收到响应时间间隔的 95 百分位值 |
node_put_fsm_time_100 | 客户端发起 PUT 请求到收到响应时间间隔的 100 百分位值 |
2.2 集群
指标 | 功能 | 统计方法 |
---|---|---|
node_get_fsm_time_mean_avg | 客户端发起 GET 请求到收到响应时间间隔的均值 | AVG(node_get_fsm_time_mean) |
node_put_fsm_time_mean_avg | 客户端发起 PUT 请求到收到响应时间间隔的均值 | AVG(node_put_fsm_time_mean) |
3、Erlang 资源使用情况指标(单机)
采集方法:
/usr/sbin/riak-admin status
指标 | 功能 |
---|---|
sys_process_count | Erlang 进程的数量 |
memory_processes | 分配给 Erlang 进程的内存总量(单位 bytes) |
memory_processes_used | Erlang 进程使用的内存总量(单位 bytes) |
4、Riak 负荷 / 健康指标
4.1 单机
采集方法:
/usr/sbin/riak-admin status
指标 | 功能 |
---|---|
read_repairs | 某节点前一分钟处理的读取修复操作数量 |
node_get_fsm_siblings_mean | 某节点前一分钟所有 GET 操作处理的兄弟数据数量均值 |
node_get_fsm_siblings_median | 某节点前一分钟所有 GET 操作处理的兄弟数据数量中值 |
node_get_fsm_siblings_95 | 某节点前一分钟所有 GET 操作处理的兄弟数据数量 95 百分位值 |
node_get_fsm_siblings_100 | 某节点前一分钟所有 GET 操作处理的兄弟数据数量 100 百分位值 |
node_get_fsm_objsize_mean | 某节点前一分钟流经 GET_FSM 的对象大小均值 |
node_get_fsm_objsize_median | 某节点前一分钟流经 GET_FSM 的对象大小中值 |
node_get_fsm_objsize_95 | 某节点前一分钟流经 GET_FSM 的对象大小 95 百分位值 |
node_get_fsm_objsize_100 | 某节点前一分钟流经 GET_FSM 的对象大小 100 百分位值 |
4.2 集群
指标 | 功能 | 统计方法 |
---|---|---|
read_repairs_total | 集群前一分钟处理的读取修复操作数量 | SUM(read_repairs) |
node_get_fsm_siblings_mean_avg | 集群前一分钟所有 GET 操作处理的兄弟数据数量均值 | AVG(node_get_fsm_siblings_mean) |
node_get_fsm_objsize_mean_avg | 集群前一分钟流经 GET_FSM 的对象大小均值 | AVG(node_get_fsm_objsize_mean) |
5、其他
5.1 LevelDB 合并错误(单机)
采集方法:
find /data1/riak/data/leveldb -name "LOG" -exec grep -l 'Compaction error' {} \; | wc -l
5.2 LevelDB 读取块操作错误(单机)
采集方法:
/usr/sbin/riak-admin status
指标 | 功能 |
---|---|
leveldb_read_block_error | LevelDB 读取块操作错误数量 |
5.3 节点存活状态(单机)
采集方法:
/usr/sbin/riak-admin member-status | grep `ifconfig | grep "inet addr:10" | awk -F':' '{print $2}' | awk '{print $1}'`
输出如下,valid 表示节点正常
valid 9.0% -- 'riak@10.1.80.114'
5.4 Riak Error Log(单机)
Riak 日志路径:/data1/riak/logs
采集文件:/data1/riak/logs/*
采集时间段:最近一分钟
采集内容:最近一分钟发生的错误数
采集示例:grep error -rn /data1/riak/logs | wc -l
说明:这个采集需要程序处理下逻辑,在此不给出完整的采集方法
二、采集程序
1、Riak 监控系统设计
DBA 通过前台页面根据 CMDB 三级业务添加 / 卸载 Riak 集群监控,根据 CMDB 的 ip 添加 Riak 单机监控(单机属于集群,不能单独存在,可增量添加单机监控),填写 ip 和端口,配置阈值、负责人等信息
1)数据库设计
mysql> use riakMonitor
show tabReading table information for completion of table and column names
You can turn off this feature to get a quicker startup with -A
Database changed
mysql> show tables;
+---------------------------+
| Tables_in_riakMonitor |
+---------------------------+
| riakClusterConf |
| riakClusterDisplay |
| riakClusterStatus |
| riakClusterStatusTemplate |
| riakSingleConf |
| riakSingleDisplay |
| riakSingleStatus |
| riakSingleStatusTemplate |
+---------------------------+
8 rows in set (0.00 sec)
Template 表作为历史库表模板,历史库按月分库,按 ip 分表
2) 单机 Agent 设计
- Agent 会通过自动调度平台下发到目标机器,Crond 周期是 1 分钟,直接上报到 mysql 数据库。运行时间超过 45s 会被调度平台 kill
- 如果检测不到 riak 或者命令出错则会发送 rtx 告警给 admins + dba, 系统错误会发送给 admins
3) 集群汇聚设计
- 集群数据根据节点 agent 上报数据在 50s 的时候 select 出当前一分钟的数据计算汇聚入库
- 程序每分钟都会清除 clusterStatus 的数据,如果 agent 在本分钟上报心跳异常或者上报时间不在集群程序运行前(50s),cluster 则不会统计该 ip 数据,但平均值计算时的除数会算上该 ip(+1)
- 集群计算同时会写进历史库,并创建历史表
4) CGI 接口设计(NodeJs)
- 异步接收 agent 上报的数据,根据 redis 的 ip 列表转换成 ip1
- 如果 redis 获取的 ip1 不存在 singleConf 表中则会拒绝上报,返回 3003 错误
- 上报成功会入 singleStatus 和历史库,并创建历史表
5) 代码列表
CGI : /data/riakMonitor # daemon
agent: /home/opd/script/riakMonitor # crond
analyzer: /opdData/opdOnline/script/kmc/riakMonitor/analyzer # crond
1、从 CMDB 更新 single/cluster conf 数据
2、同步 conf 和 display
3、解析 status 数据到 display
4、异常数据写入
5、告警
riakTool: /opdData/opdOnline/script/kmc/riakMonitor/riakTool # daemon
每分钟第 50s 运行一次
1、获取监控集群和集群的 ip,计算结果并汇聚
2、操作 redis,将集群数据入历史库
2、采集程序部分代码 (单机,python2.4)
1) 采集指标函数
def getRiakMeta():
thisFuncName = str(sys._getframe().f_code.co_name)
cmdStr = "/usr/sbin/riak-admin status
"
cmdCode, cmdStdout, cmdStderr = getCmdResult(cmdStr)
if 0 != cmdCode:
msgTxt = "[%s] %s failed" % (thisFuncName, cmdStr)
logger.error(msgTxt)
sendRtx(MYCONF.riakAdmins, thisFuncName+" %s Fail:" % cmdStr)
return 1
data["node_gets"] = data["node_puts"] = data["node_get_fsm_time_mean"] = data["node_get_fsm_time_median"] = 0
data["node_get_fsm_time_95"] = data["node_get_fsm_time_100"] = data["node_put_fsm_time_mean"] = 0
data["node_put_fsm_time_median"] = data["node_put_fsm_time_95"] = data["node_put_fsm_time_100"] = 0
data["sys_process_count"] = data["memory_processes"] = data["memory_processes_used"] = 0
data["read_repairs"] = data["node_get_fsm_siblings_mean"] = data["node_get_fsm_siblings_median"] = 0
data["node_get_fsm_siblings_95"] = data["node_get_fsm_siblings_100"] = data["node_get_fsm_objsize_mean"] = 0
data["node_get_fsm_objsize_median"] = data["node_get_fsm_objsize_95"] = data["node_get_fsm_objsize_100"] = 0
data["leveldb_read_block_error"] = 0
riakItemInfo = cmdStdout.split('\n')
for each in riakItemInfo:
eachInfo = each.split(" : ")
if 2 == len(eachInfo):
itemKey = eachInfo[0]
itemValue = eachInfo[1].replace('<<"', '').replace('">>', '')
if itemKey in data:
logger.debug("%s:%s" % (itemKey, itemValue))
try:
data[itemKey] = str(round(float(itemValue), 2))
except ValueError:
data[itemKey] = itemValue
except:
raise
cmdStr = """ find /data1/riak/data/leveldb -name"LOG"-exec grep -l'Compaction error'{} \; | wc -l """
cmdCode, cmdStdout, cmdStderr = getCmdResult(cmdStr)
if 0 != cmdCode:
msgTxt = "[%s] %s failed" % (thisFuncName, cmdStr)
logger.error(msgTxt)
sendRtx(MYCONF.riakAdmins, thisFuncName+" Fail:" + msgTxt)
return 1
data["leveldb_compaction_error"] = cmdStdout #不用转 int
cmdStr = "/usr/sbin/riak-admin member-status | grep %s" % data["mainIp"]
cmdCode, cmdStdout, cmdStderr = getCmdResult(cmdStr)
logger.debug(cmdStdout)
if 0 != cmdCode:
msgTxt = "[%s] %s failed" % (thisFuncName, cmdStr)
logger.error(msgTxt)
sendRtx(MYCONF.riakAdmins, thisFuncName+" Fail:" + msgTxt)
return 1
if cmdStdout.strip().startswith('valid'):
data["is_active"] = 1
else:
data["is_active"] = 0
data["riak_error_log"] = 0
riakLogPath = "/data1/riak/logs/"
if not os.path.isdir(riakLogPath):
msgTxt = "[%s] %s not exists" % (thisFuncName, riakLogPath)
logger.error(msgTxt)
sendRtx(MYCONF.riakAdmins, thisFuncName+" Fail:" + msgTxt)
return 1
riakLogInfo = os.listdir(riakLogPath)
reportTimeSec = time.mktime(time.strptime(data["report_time"], "%Y-%m-%d %H:%M:%S"))
for each in riakLogInfo:
logger.debug("fileName: "+each)
eachFile = os.path.join(riakLogPath, each)
if os.path.isfile(eachFile):
try:
eachFd = open(eachFile, 'r')
except IOError, e:
msgTxt = "I/O error({}): {}".format(e.errno, e.strerror)
logger.error(msgTxt)
sendRtx(MYCONF.riakAdmins, thisFuncName+" Fail:" + msgTxt)
return 1
else:
for eachLine in eachFd: #从头读,怕文件太大撑爆内存
if "error" in eachLine: #2016-03-20 04:57:09.704 [info] <0.19012.49>@riak_kv_index_h
eachInfo = eachLine.split(' ')
try:
eachTimeStr = "%s %s" % (eachInfo[0], eachInfo[1][:-4])
eachTimeSec = time.mktime(time.strptime(eachTimeStr, "%Y-%m-%d %H:%M:%S"))
if reportTimeSec - 60 <= eachTimeSec < reportTimeSec:
logger.debug(eachLine)
data["riak_error_log"] += 1
elif eachTimeSec >= reportTimeSec:
break
except:
msgTxt = "file(%s) format wrong " % eachFile
logger.error(msgTxt)
break
#sendRtx(MYCONF.riakAdmins, thisFuncName+"Fail:" + msgTxt)
#eachFile.close()
#return 1
eachFd.close()
return 0
2) 上报和失败重传函数
def report2server(content, retry):
'''上报到入库程序,根据 ip 求余获取优先的 server,如果上报失败会遍历 server 列表'''
thisFuncName = ""
try:
thisFuncName = str(sys._getframe().f_code.co_name)
pos = data["ip"] % len(MYCONF.reportServer)
serverKeys = MYCONF.reportServer.keys()
serverKeys.sort()
serverKeys = serverKeys[pos:] + serverKeys[:pos]
for serverId in serverKeys:
cmdStr = "/usr/bin/curl -s --connect-timeout %d -m %d -d'%s&reTry=%d'%s" %(MYCONF.curlConnectTimeout, MYCONF.curlMaxTimeout, content, retry, MYCONF.reportServer[serverId])
cmdCode, cmdStdout, cmdStderr = getCmdResult(cmdStr)
logger.info(cmdStr + "\ncmdCode:" + str(cmdCode) + "\n" + cmdStdout + cmdStderr)
if 0 == cmdCode:
return 0
return 1
except:
exceptmsg = StringIO.StringIO()
traceback.print_exc(file=exceptmsg)
msgTxt = exceptmsg.getvalue()
sendRtx(MYCONF.admins, thisFuncName + " Fail:" + msgTxt)
return 1
def reportScheduler(reportRecord=0):
'''reportRecord = 0 表示上报 data 中采集的新数据,reportRecord = 1 表示从 reportFailFile 里面获取最新的一条数据上报到 server,然后需要处理 reportFailFile'''
thisFuncName = ""
try:
thisFuncName = str(sys._getframe().f_code.co_name)
if 1 == reportRecord: # 从上报失败文件中获取最后一条数据,上报之
if not reportFail.has_section("index"): #这里不要去 add_section("index") 该谁 add 谁 add 去
return 0
if not reportFail.has_option("index", "index") or "" == reportFail.get("index", "index").strip():
return 0
indexVec = MYCONF.splitRe.split(reportFail.get("index", "index").strip())
index = indexVec[-1]
if "" == index:
msgTxt = reportFail.get("index", "index").strip()
sendRtx(MYCONF.admins, thisFuncName + "[系统错误] index.index 末尾有多余的逗号 " + msgTxt)
return 1
if not reportFail.has_option("content", index + "_c") or not reportFail.has_option("content", index + "_t"):
# _c 是内容 _t 是重试次数
msgTxt = "content sector 缺少 %s_c 或 %s_t" %(index, index)
sendRtx(MYCONF.admins, thisFuncName + "[系统错误] " + msgTxt)
return 1
content = reportFail.get("content", index + "_c")
retry = reportFail.getint("content", index + "_t")
retry += 1
code = report2server(content, retry)
if 0 == code: # 发送成功
indexVec.remove(index)
if indexVec:
reportFail.set("index", "index", ",".join(indexVec))
else:
reportFail.set("index", "index", "")
reportFail.remove_option("content", index + "_c")
reportFail.remove_option("content", index + "_t")
elif retry > MYCONF.maxRetry: # 重发失败,且超过最大重试次数
indexVec.remove(index)
if indexVec:
reportFail.set("index", "index", ",".join(indexVec))
else:
reportFail.set("index", "index", "")
reportFail.remove_option("content", index + "_c")
reportFail.remove_option("content", index + "_t")
else: # 重发失败,更新 _t (retry) 字段
reportFail.set("content", index + "_t", retry)
else: # 发送新数据
index = data["report_time"].replace(" ", "").replace("-", "").replace(":", "")
content = urllib.urlencode(data)
retry = 0
code = report2server(content, retry)
if 0 == code:
return 0
if not reportFail.has_section("index"):
reportFail.add_section("index")
reportFail.set("index", "index", index)
reportFail.add_section("content")
reportFail.set("content", index + "_c", content)
reportFail.set("content", index + "_t", retry)
else:
indexVec = MYCONF.splitRe.split(reportFail.get("index", "index").strip())
indexVec.append(index)
if len(indexVec) > MYCONF.maxFailRecord: # 超过最大 fail record 数
reportFail.set("index", "index", ",".join(indexVec[len(indexVec) - MYCONF.maxFailRecord:]))
reportFail.set("content", index + "_c", content)
reportFail.set("content", index + "_t", retry)
for i in range(0, len(indexVec) - MYCONF.maxFailRecord):
delIndex = indexVec[i]
reportFail.remove_option("content", delIndex + "_c")
reportFail.remove_option("content", delIndex + "_t")
else:
reportFail.set("index", "index", ",".join(indexVec))
reportFail.set("content", index + "_c", content)
reportFail.set("content", index + "_t", retry)
return 0
except:
exceptmsg = StringIO.StringIO()
traceback.print_exc(file=exceptmsg)
msgTxt = exceptmsg.getvalue()
sendRtx(MYCONF.admins, thisFuncName + " Fail:" + msgTxt)
return 1
3) 获取 shell 命令输出函数
def getCmdResult(cmdStr):
'''获取 shell 命令的返回码,标准输出,标准错误'''
#child = subprocess.Popen(cmdStr, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, close_fds=True)
#cmdStdout, cmdStderr = child.communicate()
#cmdCode = child.wait()
#return (cmdCode, cmdStdout, cmdStderr)
thisFuncName = str(sys._getframe().f_code.co_name)
nowTime = int(time.time())
tmpstdout = os.path.join(MYCONF.basePath, "cmd.stdout.%d.tmp" % nowTime)
tmpstderr = os.path.join(MYCONF.basePath, "cmd.stderr.%d.tmp" % nowTime)
if "debug" == MYCONF.role:
msgTxt = "[%d]Run Cmd: %s" % (nowTime, cmdStr)
logger.debug(msgTxt)
cmdStr = "(%s) 1>%s 2>%s" %(cmdStr, tmpstdout, tmpstderr)
cmdCode = os.system(cmdStr) >> 8
cdmStdout = cmdStderr = ""
try:
outfd = open(tmpstdout)
cmdStdout = outfd.read()
errfd = open(tmpstderr)
cmdStderr = errfd.read()
except:
exceptmsg = StringIO.StringIO()
traceback.print_exc(file=exceptmsg)
msgTxt = exceptmsg.getvalue()
sendRtx(MYCONF.admins, thisFuncName + " Fail:" + msgTxt)
cmdCode = 110
else:
outfd.close()
errfd.close()
os.remove(tmpstderr)
os.remove(tmpstdout)
return (cmdCode, cmdStdout, cmdStderr)
4) 读 / 写 Cache 函数
def readLastCache():
global lastCache
lastCache = ConfigParser.ConfigParser()
if not os.path.isfile(MYCONF.lastCacheFile):
try:
fd = open(MYCONF.lastCacheFile, "w")
except IOError, e:
logger.error("I/O error({}): {}".format(e.errno, e.strerror))
return 1
else:
fd.close()
lastCache.readfp(open(MYCONF.lastCacheFile), "rb")
return 0
def writeCache():
thisFuncName = ""
try:
thisFuncName = str(sys._getframe().f_code.co_name)
lastCache.write(open(MYCONF.lastCacheFile, 'w'))
return 0
except:
exceptmsg = StringIO.StringIO()
traceback.print_exc(file=exceptmsg)
msgTxt = exceptmsg.getvalue()
logger.error(msgTxt)
return 1
5) 读 / 写失败记录
def readFailRecord():
global reportFail
reportFail = ConfigParser.ConfigParser()
if not os.path.isfile(MYCONF.lastReportFailFile):
try:
fd = open(MYCONF.lastReportFailFile, "w")
except IOError, e:
logger.error("I/O error({}): {}".format(e.errno, e.strerror))
return 1
else:
fd.close()
reportFail.readfp(open(MYCONF.lastReportFailFile), "rb")
return 0
def writeFailRecord():
thisFuncName = ""
try:
thisFuncName = str(sys._getframe().f_code.co_name)
reportFail.write(open(MYCONF.lastReportFailFile, 'w'))
return 0
except:
exceptmsg = StringIO.StringIO()
traceback.print_exc(file=exceptmsg)
msgTxt = exceptmsg.getvalue()
logger.error(msgTxt)
return 1
6)main 函数
def main():
data["osType"] = 0 # 0 表示 linux
data["version"] = MYCONF.version # 当前程序的自定义版本号
data["report_time"] = time.strftime("%Y-%m-%d %H:%M:00") #上报时间,由于目前基础监控是分钟级监控粒度,因此秒取 00
initLog()
logger.info('='*80)
if 0 == checkLastPid() and 0 == readLastCache() and 0 == getLoginIp():
readFailRecord() # 读取早迁采集周期上报失败,需要重传的数据
reportScheduler(reportRecord=1) #从 fail record 中选取最近的一条信息上报给服务器
if 0 == getRiakMeta():
reportScheduler(reportRecord=0)
writeFailRecord()
writeCache()
logger.info('='*80)
logging.shutdown()
return
3、添加 / 卸载监控
1) 添加监控
添加监控需要先添加集群(不支持先添加 IP),添加集群会默认把所有 IP 都添加监控(前台将在 clusterConf 新增记录, 并在 singleConf 增加对应的 ip 记录,然后调用调度平台,检测 ip 是否已经安装)如果该集群在 CMDB 里面新增 Ip,则需要手动添加监控(前台提供新增监控节点,插入 singleConf)
2) 卸载监控
(1) 卸载监控可以卸载整个集群的监控(将 clusterConf needMonitor 置 0,同步将 singleConf 的 needMonitor 都置 0,然后调用
调度平台 卸载集群下的所有机器,如果该 ip 存在其他集群并且需要监控,则不用调用 调度平台 卸载)也可以卸载单个节点的监控 (前台将 singleConf 的 needMonitor 置 0,调用 调度平台,同样判断 ip 是否存在其他集群)(2) 添加卸载监控部由前台调用
调度平台 接口,并修改数据库(插入数据或者更新 need_monitor) (3) Single/cluster dislplay 表会同步 conf 表的数据,只保留 need_monitor= 1 的数据
4、CMDB 数据同步
后台一直同步 CMDB 的数据和 conf 表的数据,如果不在 CMDB 的则需要删掉 conf 里面的数据,不管 needMonitor 的值为多少。删除三级业务的话只需要删除 clusterConf 表对应的记录,single 会自动同步外键(尝试调用
调度平台卸载接口,卸载掉被删除的三级业务 ID 下面的所有已安装监控的 IP)
5、前台展示
1)集群状态展示
2) 单机节点状态展示
本文永久更新链接地址:http://www.linuxidc.com/Linux/2016-11/136829.htm