一、什么是监视器?
watcher是一个用于elasticsearch的插件,它能够根据数据的变化提供警报和通知,通过使用 Watcher 监视数据中的更改或异常,并执行必要的响应操作。
从5.0版本以后,watcher就成为了x-pack的一部分,也就是说安装了x-pack,watcher就已经安装了。
为了使用ES中监视器Watcher的特性,必须获取一个包含该特性的证书。
ES各版本特性说明:https://www.elastic.co/subscriptions
典型使用场景:
监控社交媒体,作为检测面向用户的自动化系统(如 ATM 或票务系统)故障的另一种方式。 当某个区域的推文和帖子数量超过重要阈值时,通知服务技术人员。
监控您的基础设施,随着时间的推移跟踪磁盘使用情况。 当任何服务器在接下来的几天内可能用完可用空间时,请打开服务台票证。
跟踪网络活动以检测恶意活动,并主动更改防火墙配置以拒绝恶意用户。
监控 Elasticsearch,如果节点离开集群或查询吞吐量超出预期范围,立即通知系统管理员。
跟踪应用程序响应时间,如果页面加载时间超过 SLA 超过 5 分钟,请打开服务台票证。 如果超过 SLA 一个小时,请呼叫值班管理员。
采用监视器Watcher,你可以监控索引中的数据指标,集群服务器的磁盘使用情况,网络活动情况,ES集群的状态,程序的相应时间等属性,并根据监控到的情况做出相应的动作。
这些场景的特点是,相关数据或数据变化可以通过定期的 Elasticsearch 查询来识别,并可以根据条件检查查询结果。
如果条件为真 — 发送电子邮件、通知第三方系统或存储查询结果,则执行一个或多个操作。
二、监视器是如何工作的?
1、Schedule 调度计划
用于运行查询和检查条件的调度。
2、Query 查询
作为条件的输入运行的查询。监视器支持完整的 Elasticsearch 查询语言,包括聚合。
3、Condition 条件
决定是否执行动作的条件。 您可以使用简单的条件(始终为真),或者将脚本用于更复杂的场景。
4、Actions 行动
一项或多项操作,例如发送电子邮件、通过 Webhook 将数据推送到 3rd 方系统或索引查询结果。
Watcher执行过程说明:
在监视器Watcher执行上下文中将输入数据作为有效负载加载。这使得数据可用于执行过程中的所有后续步骤。这个步骤由Watcher的输入控制。
计算监视器Watcher条件Condition,以确定是否继续处理Watcher。如果满足条件(计算结果为 true) ,则处理进入下一步。如果不满足(计算结果为 false) ,则停止执行Watcher。
将转换应用于监视有效负载(如果需要)。
当添加满足且Watcher不被限制,则执行Watcher动作。
三、如何创建一个监视器Watcher
1、Trigger
确定何时检查Watcher。 Watcher中必须有触发器。
2、Input
将数据加载到Watcher负载中。 如果未指定输入,则加载空负载。
3、Condition
控制是否执行监视操作Actions。 如果未指定条件,则条件默认为始终。
4、Transform
处理监视负载数据使其为监视操作Actions做好准备。
可以在监视级别定义转换或定义特定于操作的转换。 数字转换是可选配置。
5、Actions
指定满足监视条件时发生的情况。
使用示例:
{
"trigger": {
"schedule": {
"interval": "60s" #一分钟执行一次
}
},
"input": {
"search": {
"request": {
"search_type": "query_then_fetch",
"indices": [
"chegva.com_log*" #查询的索引名称
],
"rest_total_hits_as_int": true,
"body": {
"query": {
"bool": {
"minimum_should_match": 1,
"must_not": {
"term": {
"agentinfo.agent_id": "agent001" #排除某项
}
},
"should": [
{
"match_phrase": {
"error.code": "AGENT_NODE"
}
},
{
"match_phrase": {
"error.code": "AGENT_COLD_START_TIMEOUT"
}
},
...... #error.code满足条件为or
{
"match_phrase": {
"error.code": "AGENT_LOST_CONNECT"
}
}
],
"filter": {
"range": {
"@timestamp": {
"from": "{{ctx.trigger.scheduled_time}}||-1m", #监控一分钟内的数据
"to": "{{ctx.trigger.triggered_time}}"
}
}
}
}
}
}
}
}
},
"condition": {
"compare": {
"ctx.payload.hits.total": {
"gt": 0 #产生的数据量大于0
}
}
},
"actions": {
"test_issue": {
"webhook": {
"scheme": "http",
"host": "110.112.119.x", #报警配置的nginx服务器,将报警消息代理到报警群机器人
"port": 8888,
"method": "post",
"params": {},
"headers": {},
"body": """{"msgtype": "text", "text": { "content": "{{#ctx.payload.hits.hits}} agentnum:{{_source.agentinfo.agents}}\n agent_version:{{_source.agentinfo.agent_version}}\n ... {{/ctx.payload.hits.hits}}"}}"""
}
}
}
}参考:
