共计 6781 个字符,预计需要花费 17 分钟才能阅读完成。
这篇文章介绍了如何利用 Apache Flink 的内置指标系统以及如何使用 Prometheus 来高效地监控流式应用程序。

为什么选择 Prometheus?
随着深入地了解 Prometheus,你会发现一些非常好的功能:
- 服务发现使配置更加容易。Prometheus 支持 consul,etcd,kubernetes 以及各家公有云厂商自动发现。对于监控目标动态发现,这点特别契合 Cloud 时代,应用动态扩缩的特点。我们无法想象,在 Cloud 时代,需要运维不断更改配置。
- 开源社区建立了数百个 exporter。基本上涵盖了所有基础设施和主流中间件。
- 工具库可从您的应用程序获取自定义指标。基本上主流开发语言都有对应的工具库。
- 它是 CNCF 旗下的 OSS,是继 Kubernetes 之后的第二个毕业项目。Kubernetes 已经与 Promethues 深度结合,并在其所有服务中公开了 Prometheus 指标。
- Pushgateway,Alermanager 等组件,基本上涵盖了一个完整的监控生命周期。
- Flink 官方已经提供了对接 Prometheus 的 jar 包,很方便就可以集成。由于本系列文章重点在 Flink on Kubernetes,因此我们所有的操作都是基于这点展开。
部署 Prometheus
对 k8s 不熟悉的同学,可以查阅 k8s 相关文档。由于部署不是本博客的重点,所以我们直接贴出 yaml 文件:
apiVersion: v1 | |
kind: ServiceAccount | |
metadata: | |
name: monitor | |
namespace: kube-system | |
labels: | |
kubernetes.io/cluster-service: "true" | |
addonmanager.kubernetes.io/mode: Reconcile | |
apiVersion: rbac.authorization.k8s.io/v1 | |
kind: ClusterRole | |
metadata: | |
name: monitor | |
labels: | |
kubernetes.io/cluster-service: "true" | |
addonmanager.kubernetes.io/mode: Reconcile | |
rules: | |
- apiGroups: | |
- "" | |
resources: | |
- pods | |
verbs: | |
- get | |
- list | |
- watch | |
apiVersion: rbac.authorization.k8s.io/v1 | |
kind: ClusterRoleBinding | |
metadata: | |
name: monitor | |
labels: | |
kubernetes.io/cluster-service: "true" | |
addonmanager.kubernetes.io/mode: Reconcile | |
roleRef: | |
apiGroup: rbac.authorization.k8s.io | |
kind: ClusterRole | |
name: monitor | |
subjects: | |
- kind: ServiceAccount | |
name: monitor | |
namespace: kube-system | |
apiVersion: v1 | |
kind: ConfigMap | |
metadata: | |
labels: | |
app: monitor | |
name: monitor | |
namespace: kube-system | |
data: | |
prometheus.yml: |- | |
global: | |
scrape_interval: 10s | |
evaluation_interval: 10s | |
scrape_configs: | |
- job_name: kubernetes-pods | |
kubernetes_sd_configs: | |
- role: pod | |
relabel_configs: | |
- action: keep | |
regex: true | |
source_labels: | |
- __meta_kubernetes_pod_annotation_prometheus_io_scrape | |
- action: replace | |
regex: (.+) | |
source_labels: | |
- __meta_kubernetes_pod_annotation_prometheus_io_path | |
target_label: __metrics_path__ | |
- action: replace | |
regex: ([^:]+)(?::d+)?;(d+) | |
replacement: $1:$2 | |
source_labels: | |
- __address__ | |
- __meta_kubernetes_pod_annotation_prometheus_io_port | |
target_label: __address__ | |
- action: labelmap | |
regex: __meta_kubernetes_pod_label_(.+) | |
- action: replace | |
source_labels: | |
- __meta_kubernetes_namespace | |
target_label: kubernetes_namespace | |
- action: replace | |
source_labels: | |
- __meta_kubernetes_pod_name | |
target_label: kubernetes_pod_name | |
apiVersion: apps/v1 | |
kind: StatefulSet | |
metadata: | |
labels: | |
app: monitor | |
name: monitor | |
namespace: kube-system | |
spec: | |
serviceName: monitor | |
selector: | |
matchLabels: | |
app: monitor | |
replicas: 1 | |
template: | |
metadata: | |
labels: | |
app: monitor | |
spec: | |
containers: | |
- args: | |
- --config.file=/etc/prometheus/prometheus.yml | |
- --storage.tsdb.path=/data/prometheus | |
- --storage.tsdb.retention.time=10d | |
image: prom/prometheus:v2.19.0 | |
imagePullPolicy: IfNotPresent | |
name: prometheus | |
ports: | |
- containerPort: 9090 | |
protocol: TCP | |
readinessProbe: | |
httpGet: | |
path: /-/ready | |
port: 9090 | |
initialDelaySeconds: 30 | |
timeoutSeconds: 30 | |
livenessProbe: | |
httpGet: | |
path: /-/healthy | |
port: 9090 | |
initialDelaySeconds: 30 | |
timeoutSeconds: 30 | |
resources: | |
limits: | |
cpu: 1000m | |
memory: 2018Mi | |
requests: | |
cpu: 1000m | |
memory: 2018Mi | |
volumeMounts: | |
- mountPath: /etc/prometheus | |
name: config-volume | |
- mountPath: /data | |
name: monitor-persistent-storage | |
restartPolicy: Always | |
priorityClassName: system-cluster-critical | |
serviceAccountName: monitor | |
initContainers: | |
- name: "init-chown-data" | |
image: "busybox:latest" | |
imagePullPolicy: "IfNotPresent" | |
command: ["chown", "-R", "65534:65534", "/data"] | |
volumeMounts: | |
- name: monitor-persistent-storage | |
mountPath: /data | |
subPath: "" | |
volumes: | |
- configMap: | |
defaultMode: 420 | |
name: monitor | |
name: config-volume | |
volumeClaimTemplates: | |
- metadata: | |
name: monitor-persistent-storage | |
namespace: kube-system | |
spec: | |
accessModes: | |
- ReadWriteOnce | |
resources: | |
requests: | |
storage: 20Gi | |
storageClassName: gp2 | |
apiVersion: v1 | |
kind: Service | |
metadata: | |
annotations: | |
service.beta.kubernetes.io/aws-load-balancer-type: nlb | |
labels: | |
app: monitor | |
name: monitor | |
namespace: kube-system | |
spec: | |
ports: | |
- name: http | |
port: 9090 | |
protocol: TCP | |
targetPort: 9090 | |
selector: | |
app: monitor | |
type: LoadBalancer |
这里我们简单说下,由于我们想利用 Prometheus 的 Kubernetes 的服务发现的方式,所以需要 RBAC 授权,授权 prometheus 实例对集群中的 pod 有一些读取权限。
为什么我们要使用自动发现的方式那?
相比配置文件的方式,自动发现更加灵活。尤其是当你使用的是 flink on native kubernetes,整个 job manager 和 task manager 是根据作业的提交自动创建的,这种动态性,显然是配置文件无法满足的。
由于我们的集群在 eks 上,所以大家在使用其他云的时候,需要略做调整。
定制镜像
这里我们基本上使用上一篇文章介绍的 demo 上,增加监控相关,所以 Dockerfile 如下:
FROM flink | |
COPY /plugins/metrics-prometheus/flink-metrics-prometheus-1.11.0.jar /opt/flink/lib | |
RUN mkdir -p $FLINK_HOME/usrlib | |
COPY ./examples/streaming/WordCount.jar $FLINK_HOME/usrlib/my-flink-job.jar |
Flink 的 Classpath 位于 /opt/flink/lib,所以插件的 jar 包需要放到该目录下。
作业提交
由于我们的 Pod 必须增加一定的标识,从而让 Prometheus 实例可以发现。所以提交命令稍作更改,如下:
./bin/flink run-application -p 8 -t kubernetes-application | |
-Dkubernetes.cluster-id=my-first-cluster | |
-Dtaskmanager.memory.process.size=2048m | |
-Dkubernetes.taskmanager.cpu=2 | |
-Dtaskmanager.numberOfTaskSlots=4 | |
-Dkubernetes.container.image=iyacontrol/flink-world-count:v0.0.2 | |
-Dkubernetes.container.image.pull-policy=Always | |
-Dkubernetes.namespace=stream | |
-Dkubernetes.jobmanager.service-account=flink | |
-Dkubernetes.rest-service.exposed.type=LoadBalancer | |
-Dkubernetes.rest-service.annotations=service.beta.kubernetes.io/aws-load-balancer-type:nlb,service.beta.kubernetes.io/aws-load-balancer-internal:true | |
-Dkubernetes.jobmanager.annotations=prometheus.io/scrape:true,prometheus.io/port:9249 | |
-Dkubernetes.taskmanager.annotations=prometheus.io/scrape:true,prometheus.io/port:9249 | |
-Dmetrics.reporters=prom | |
-Dmetrics.reporter.prom.class=org.apache.flink.metrics.prometheus.PrometheusReporter | |
local:///opt/flink/usrlib/my-flink-job.jar |
- 给 jobmanager 和 taskmanager 增加了 annotations
- 增加了 metrcis 相关的配置,指定使用 prometheus reporter
关于 prometheus reporter:
参数:
- port – 可选, Prometheus 导出器监听的端口,默认为 9249。为了能够在一台主机上运行报告程序的多个实例(例如,当一个 TaskManager 与 JobManager 并置时),建议使用这样的端口范围 9250-9260。
- filterLabelValueCharacters – 可选, 指定是否过滤标签值字符。如果启用,则将删除所有不匹配 [a-zA-Z0-9:_] 的字符,否则将不删除任何字符。禁用此选项之前,请确保您的标签值符合 Prometheus 要求。
效果
提交任务后,我们看下实际效果。
首先查看 Prometheus 是否发现了我们的 Pod。

然后查看具体的 metrics,是否被准确抓取。

指标已经收集,后续大家就可以选择 grafana 绘图了。或是增加相应的报警规则。例如:

总结
当然除了 Prometheus 主动发现 Pod,然后定期抓取 metrcis 的方式,flink 也支持向 PushGateway 主动 push metrcis。
Flink 通过 Reporter 来向外部系统提供 metrcis。通过在 conf/flink-conf.yaml 中配置一个或多个 Reporter,可以将 metrcis 公开给外部系统。这些 Reporter 在启动时将在每个作业和任务管理器上实例化。
所有 Reporter 都必须至少具有 class 或 factory.class 属性。可以 / 应该使用哪个属性取决于 Reporter 的实现。有关更多信息,请参见各个 Reporter 配置部分。一些 Reporter 允许指定报告间隔。
指定多个 Reporter 的示例配置:
metrics.reporters: my_jmx_reporter,my_other_reporter | |
metrics.reporter.my_jmx_reporter.factory.class: org.apache.flink.metrics.jmx.JMXReporterFactory | |
metrics.reporter.my_jmx_reporter.port: 9020-9040 | |
metrics.reporter.my_jmx_reporter.scope.variables.excludes:job_id;task_attempt_num | |
metrics.reporter.my_other_reporter.class: org.apache.flink.metrics.graphite.GraphiteReporter | |
metrics.reporter.my_other_reporter.host: 192.168.1.1 | |
metrics.reporter.my_other_reporter.port: 10000 |
启动 Flink 时,必须可以访问包含 reporter 的 jar。支持 factory.class 属性的 reporter 可以作为插件加载。否则,必须将 jar 放在 /lib 文件夹中。
你可以通过实现 org.apache.flink.metrics.reporter.MetricReporter 接口来编写自己的 Reporter。
如果 reporter 定期发送报告,则还必须实现 Scheduled 接口。通过额外实现 MetricReporterFactory,你的 reporter 也可以作为插件加载。
好啦!今天的分享到这里就结束了,希望大家持续关注马哥教育官网,每天都会有大量 优质内容与大家分享!声明:文章转载于网络,版权归原作者所有,如有侵权请及时联系删除!
