使用 Elasticsearch 查看和分析数据时,通常会看到可视化效果以及监视和警报解决方案,这些解决方案利用了在远程/受监视系统上生成的时间戳。但是,使用远程生成的时间戳可能会有风险。
如果远程事件的发生与到达 Elasticsearch 的事件之间存在延迟,或者如果远程系统上的时间设置不正确,则重要事件可能会在雷达的扫描之外而不被发现。因此,在将文档摄取到 Elasticsearch 中时,存储每个文档的摄取时间以及监视每个事件到达 Elasticsearch 集群需要多长时间通常会很有帮助。大于正常的摄取滞后时间可能表明摄取过程存在问题或远程系统上的时间设置存在问题。
在此博客中,我们将展示如何在集合处理器到达文档时将接收节点(ingest node)与 set processor 一起使用,以将接收时间戳添加到文档中。此时间戳可用于可视化,监视和警报。
此外,我们将展示如何使用脚本处理器(script processor)来计算摄取延迟。此滞后时间是远程/受监视系统上发生事件的时间戳与相应文档到达Elasticsearch 集群的时间之间的差。这可用于确保摄取过程不会花费太长时间,并且可用于检测远程时间戳记是否设置不正确。
添加摄取时间戳并计算摄取滞后
下面我们给出一个摄取管道的示例,该管道添加了一个称为 “ingest_time” 的摄取时间戳。 它还计算远程事件时间戳和事件到达 Elasticsearch 的时间之间的延迟,并将其存储在名为 “lag_in_seconds” 的字段中。
“ingest_time”字段用于两个目的:
- 可用作Kibana可视化中的时间字段以及用于监视和警报
- 用于滞后计算。
请注意,我们假设每个文档都有一个名为 “event_timestamp” 的字段,该字段与远程/受监视系统上每个事件的发生时间相对应。 事件时间戳记字段的名称可能与你的数据不同,应该相应地进行修改。
我们向 Elasticsearch 编写管道如下:
PUT _ingest/pipeline/calculate_lag
{
"description": "Add an ingest timestamp and calculate ingest lag",
"processors": [
{
"set": {
"field": "_source.ingest_time",
"value": "{{_ingest.timestamp}}"
}
},
{
"script": {
"lang": "painless",
"source": """
if(ctx.containsKey("ingest_time") && ctx.containsKey("event_timestamp")) {
ctx['lag_in_seconds'] = ChronoUnit.MILLIS.between(ZonedDateTime.parse(ctx['event_timestamp']), ZonedDateTime.parse(ctx['ingest_time']))/1000;
}
"""
}
}
]
}
然后,我们可以使用模拟 API 测试管道:
POST _ingest/pipeline/calculate_lag/_simulate
{
"docs": [
{
"_source": {
"event_timestamp": "2019-11-07T20:39:00.000Z"
}
}
]
}
响应应该类似于以下内容,其中包括 “lag_in_seconds” 和 “ingest_time” 字段:
{
"docs" : [
{
"doc" : {
"_index" : "_index",
"_type" : "_doc",
"_id" : "_id",
"_source" : {
"lag_in_seconds" : 19395388,
"ingest_time" : "2020-06-19T08:15:28.551500Z",
"event_timestamp" : "2019-11-07T20:39:00.000Z"
},
"_ingest" : {
"timestamp" : "2020-06-19T08:15:28.5515Z"
}
}
}
]
}
最后,我们可以使用管道将实际文档写入 Elasticsearch:
PUT test_index/_doc/1?pipeline=calculate_lag
{
"event_timestamp": "2019-11-07T20:39:00.000Z",
"other_field": "whatever"
}
我们可以检索文档:
GET test_index/_doc/1
{
"_index" : "test_index",
"_type" : "_doc",
"_id" : "1",
"_version" : 1,
"_seq_no" : 0,
"_primary_term" : 1,
"found" : true,
"_source" : {
"lag_in_seconds" : 19395747,
"ingest_time" : "2020-06-19T08:21:27.383917Z",
"event_timestamp" : "2019-11-07T20:39:00.000Z",
"other_field" : "whatever"
}
}
在索引设置中指定管道
在生产部署中使用摄取管道时,最好将管道应用于索引设置,而不是在PUT URL中指定管道。 这可以通过将index.default_pipeline添加到索引设置中来完成,如下所示:
PUT test_index/_settings
{
"index.default_pipeline": "calculate_lag"
}
现在,发送到test_index的所有文档都将通过 calculate_lag 管道,而无需在URL中添加 ?pipeline = calculate_lag。 我们可以使用下面的 PUT 命令来验证它是否正常工作。
PUT test_index/_doc/2
{
"event_timestamp": "2019-11-07T20:39:00.000Z",
"other_field": "This is a new doc"
}
执行以下命令以查看我们刚刚摄取的文档。
GET test_index/_doc/2
应该返回如下所示的文档:
{
"_index" : "test_index",
"_type" : "_doc",
"_id" : "2",
"_version" : 1,
"_seq_no" : 1,
"_primary_term" : 1,
"found" : true,
"_source" : {
"lag_in_seconds" : 19396161,
"ingest_time" : "2020-06-19T08:28:21.767628Z",
"event_timestamp" : "2019-11-07T20:39:00.000Z",
"other_field" : "This is a new doc"
}
}
如何使用 ingest lag
大于预期的摄取延迟可能表示摄取过程存在问题。因此,如果滞后超过某个阈值,则可以将其用于触发警报。或者,可以将延迟输入机器学习作业中,以检测摄取处理时间中的意外偏差。
或者,可以执行分析以检测是否有任何主机比其他主机具有更大的延迟,这可能表明它的时钟设置可能有问题。此外,可以创建 Kibana 仪表板以显示历史和当前滞后值的图形表示。
如果延迟大于预期,则应调查延迟的原因。如果是由于远程系统上的时钟设置不正确引起的,则应正确设置远程时钟。如果延迟是由于缓慢的摄取过程引起的,则应调查并调整摄取过程以使其达到预期效果。如何使用滞后的可能性仅受您的需求限制。
使用摄取时间戳
在查看 Kibana 中的可视化效果或观察异常时,我们经常考虑发生在最后一天或最后一周的事件。但是,如果我们依赖于远程生成的事件时间戳而不是摄取时间戳,那么摄取过程的任何延迟都可能导致某些文档永远不会被查看或监视。例如,如果某个事件是昨天发生的,但今天才刚到达集群,则该事件不会显示在今天的事件的仪表板中,因为它的远程生成的时间戳记是从昨天开始的。此外,当我们昨天查看仪表板时,它将无法使用,因为它尚未存储在 Elasticsearch 中(因为它仅在今天才到达)。
另一方面,如果我们使用摄取时间戳来可视化数据并设置警报,则可以确保无论事件何时发生,我们都在考虑将最新事件到达 Elasticsearch。这将确保即使临时备份了摄取过程,也不会错过事件。
使用摄取时间戳的另一个优势与以下事实有关:事件时间戳可能被恶意或不正确地设置为错误。例如,假设我们正在监视一个远程系统,黑客将其时钟设置为1980年代的某个日期。如果我们依赖于远程生成的事件时间戳,则可能会错过恶意用户在该系统上执行的所有活动,除非我们专门查找存储于1980年代的事件。另一方面,如果我们依赖于接收时间戳,则可以保证无论远程系统为每个事件提供的时间戳如何,我们都将考虑最近到达 Elasticsearch 集群的所有事件。
结论
在本博客中,我们介绍了如何使用摄取处理器来存储摄取时间并计算摄取过程的滞后,以及为什么要这样做。 此外,我们概述了使用摄取时间进行监视和可视化的优势,以及使用远程事件时间戳的风险。