一、什么是监视器?
watcher是一个用于elasticsearch的插件,它能够根据数据的变化提供警报和通知,通过使用 Watcher 监视数据中的更改或异常,并执行必要的响应操作。
从5.0版本以后,watcher就成为了x-pack的一部分,也就是说安装了x-pack,watcher就已经安装了。
为了使用ES中监视器Watcher的特性,必须获取一个包含该特性的证书。
ES各版本特性说明:https://www.elastic.co/subscriptions
典型使用场景:
监控社交媒体,作为检测面向用户的自动化系统(如 ATM 或票务系统)故障的另一种方式。 当某个区域的推文和帖子数量超过重要阈值时,通知服务技术人员。
监控您的基础设施,随着时间的推移跟踪磁盘使用情况。 当任何服务器在接下来的几天内可能用完可用空间时,请打开服务台票证。
跟踪网络活动以检测恶意活动,并主动更改防火墙配置以拒绝恶意用户。
监控 Elasticsearch,如果节点离开集群或查询吞吐量超出预期范围,立即通知系统管理员。
跟踪应用程序响应时间,如果页面加载时间超过 SLA 超过 5 分钟,请打开服务台票证。 如果超过 SLA 一个小时,请呼叫值班管理员。
采用监视器Watcher,你可以监控索引中的数据指标,集群服务器的磁盘使用情况,网络活动情况,ES集群的状态,程序的相应时间等属性,并根据监控到的情况做出相应的动作。
这些场景的特点是,相关数据或数据变化可以通过定期的 Elasticsearch 查询来识别,并可以根据条件检查查询结果。
如果条件为真 — 发送电子邮件、通知第三方系统或存储查询结果,则执行一个或多个操作。
二、监视器是如何工作的?
1、Schedule 调度计划
用于运行查询和检查条件的调度。
2、Query 查询
作为条件的输入运行的查询。监视器支持完整的 Elasticsearch 查询语言,包括聚合。
3、Condition 条件
决定是否执行动作的条件。 您可以使用简单的条件(始终为真),或者将脚本用于更复杂的场景。
4、Actions 行动
一项或多项操作,例如发送电子邮件、通过 Webhook 将数据推送到 3rd 方系统或索引查询结果。
Watcher执行过程说明:
在监视器Watcher执行上下文中将输入数据作为有效负载加载。这使得数据可用于执行过程中的所有后续步骤。这个步骤由Watcher的输入控制。
计算监视器Watcher条件Condition,以确定是否继续处理Watcher。如果满足条件(计算结果为 true) ,则处理进入下一步。如果不满足(计算结果为 false) ,则停止执行Watcher。
将转换应用于监视有效负载(如果需要)。
当添加满足且Watcher不被限制,则执行Watcher动作。
三、如何创建一个监视器Watcher
1、Trigger
确定何时检查Watcher。 Watcher中必须有触发器。
2、Input
将数据加载到Watcher负载中。 如果未指定输入,则加载空负载。
3、Condition
控制是否执行监视操作Actions。 如果未指定条件,则条件默认为始终。
4、Transform
处理监视负载数据使其为监视操作Actions做好准备。
可以在监视级别定义转换或定义特定于操作的转换。 数字转换是可选配置。
5、Actions
指定满足监视条件时发生的情况。
使用示例:
{ "trigger": { "schedule": { "interval": "60s" #一分钟执行一次 } }, "input": { "search": { "request": { "search_type": "query_then_fetch", "indices": [ "chegva.com_log*" #查询的索引名称 ], "rest_total_hits_as_int": true, "body": { "query": { "bool": { "minimum_should_match": 1, "must_not": { "term": { "agentinfo.agent_id": "agent001" #排除某项 } }, "should": [ { "match_phrase": { "error.code": "AGENT_NODE" } }, { "match_phrase": { "error.code": "AGENT_COLD_START_TIMEOUT" } }, ...... #error.code满足条件为or { "match_phrase": { "error.code": "AGENT_LOST_CONNECT" } } ], "filter": { "range": { "@timestamp": { "from": "{{ctx.trigger.scheduled_time}}||-1m", #监控一分钟内的数据 "to": "{{ctx.trigger.triggered_time}}" } } } } } } } } }, "condition": { "compare": { "ctx.payload.hits.total": { "gt": 0 #产生的数据量大于0 } } }, "actions": { "test_issue": { "webhook": { "scheme": "http", "host": "110.112.119.x", #报警配置的nginx服务器,将报警消息代理到报警群机器人 "port": 8888, "method": "post", "params": {}, "headers": {}, "body": """{"msgtype": "text", "text": { "content": "{{#ctx.payload.hits.hits}} agentnum:{{_source.agentinfo.agents}}\n agent_version:{{_source.agentinfo.agent_version}}\n ... {{/ctx.payload.hits.hits}}"}}""" } } } }
参考: