目前COVID-19有很多的地方发表数据。在今天的文章中,我们使用Elastic Stack来分析相关的数据,并对数据进行分析。我们将使用Kibana来对所有的数据进行可视化分析。特别指出的是:我们将使用Elasticsearch的processors来对我们的数据进行解析。
数据来源
我们将使用在地址https://www.datawrapper.de/_/Gnfyw/下载我们想要的数据。我们点击链接Get the data来下载我们想要的数据。
我们下载后的数据就像是下面的:
上面的数据是一个csv格式的文件。我们把下载后的数据存入到一个我们喜欢的目录中,并命令为covid19.csv 。针对CSV格式的数据导入,我们可以采用Logstash来把它导入到Elasticsearch中。具体如何操作,我们可以参照我之前的文章:
当然我们也可以直接通过Kibana提供的便利,直接把数据导入到Elasticsearch中:
针对我们今天的这个COVID-19数据,我们将使用Filebeat来对它处理。
安装
Elasticsearch及Kibana
如果你还没有安装自己的Elasticsearch及Kibana,那么你可以参考我之前的文章“Elastic:菜鸟上手指南”安装自己的Elasticsearch及Kibana。安装好我们的Elasticsearch及Kibana后,启动它们。
Filebeat
我们打Kibana:
虽然csv部署log范畴,但是在上面的logs里含有针对各个操作系统的filebeat的安装指南。我们点击上面的Add log data按钮:
我们点击System logs:
在上面我们选择我们的操作系统,并按照上面的安装指令来完成和我们Elasticsearch版本相匹配的Filbeat的安装。我们可以不启动相应的模块,只做相应的安装即可。
配置Filebeat及导入数据到Elasticsearch
为了能够把我们的数据导入到Elasticsearch中,我们可以采用Filebeat。为此,我们必须来配置filebeat。我们首先来创建一个定制的filebeat_covid19.yml配置文件,并把这个文件存于和我们上面的covid19.csv同一个目录中:
filebeat_covid19.yml
filebeat.inputs:
- type: log
paths:
- /Users/liuxg/data/covid19/covid19.csv
exclude_lines: ['^Lat']
output.elasticsearch:
hosts: ["http://localhost:9200"]
index: covid19
setup.ilm.enabled: false
setup.template.name: covid19
setup.template.pattern: covid19
在上面我们定义了一个type为log的filebeat.inputs。我们定了我们的log的路径。你需要根据自己的实际路径来修改上面的路径。我们定义了数据的index为covid19的索引。值得注意的是,由于csv文件的第一行是数据的header,我们需要去掉这一行。为此,我们采用了exclude_lines: ['^Lat']来去掉第一行。
等我们定义好上面的配置后,我们运行如下的命令:
./filebeat -e -c ~/data/covid19/filebeat_covid19.yml
上面的命令将把我们的数据导入到Elasticsearch中。Filebeat的registry文件存储Filebeat用于跟踪上次读取位置的状态和位置信息。如果由于某种原因,我们想重复对这个csv文件的处理,我们可以删除如下的目录:
- data/registry 针对 .tar.gz and .tgz 归档文件安装
- /var/lib/filebeat/registry 针对 DEB 及 RPM 安装包
- c:\ProgramData\filebeat\registry 针对 Windows zip 文件
如果上面的命令成功后,我们可以在Kibana中查看新生产的covid19索引:
如果可以对数据进行查询:
GET covid19/_search
在上面,它显示了message字段。显然这个数据是最原始的数据,它并不能让我们很方便地对这个数据进行分析。那么我们该如何处理呢?我们可以通过Elasticsearch的ingest node提供的强大的processors来帮我们处理这个数据。
利用Processors来加工数据
去掉无用的字段
在我们的文档里,我们可以看到有很多我们并不想要的字段,比如ecs, host,log等等。我们想把这些字段去掉,那么我们该如何做呢?我们可以通过定义一个pipleline来帮我们处理。为此,我们定义一个如下的pipeline:
PUT _ingest/pipeline/covid19_parser
{
"processors": [
{
"remove": {
"field": ["log", "input", "ecs", "host", "agent"],
"if": "ctx.log != null && ctx.input != null && ctx.ecs != null && ctx.host != null && ctx.agent != null"
}
}
]
}
上面的pipeline定义了一个叫做remove的processor。它检查log,input, ecs, host及agent都不为空的情况下,删除字段log, input,ecs, host及agent。我们在Kibana中执行上面的命令。
为了能够使得我们的pipleline起作用,我们通过如下指令来执行:
POST covid19/_update_by_query?pipeline=covid19_parser
当我们执行完上面的指令后,我们重新查看我们的文档:
在上面我们可以看出来,所有的我们不想要的字段都已经被去掉了。
替换引号
我们可以看到导入的message数据为:
"""37.1232245,-78.4927721,"Virginia, US",Virginia,",",US,221,0,0"""
显然,这里的数据有很多的引号"字符,我们想把这些字符替换为符号'。为此,我们需要gsub processors来帮我们处理。我重新修改我们的pipeline:
PUT _ingest/pipeline/covid19_parser
{
"processors": [
{
"remove": {
"field": ["log", "input", "ecs", "host", "agent"],
"if": "ctx.log != null && ctx.input != null && ctx.ecs != null && ctx.host != null && ctx.agent != null"
}
},
{
"gsub": {
"field": "message",
"pattern": "\"",
"replacement": "'"
}
}
]
}
在Kibana中运行上面的指令,并同时执行:
POST covid19/_update_by_query?pipeline=covid19_parser
经过上面的pipeline的处理后,我们重新来查看我们的文档:
从上面的显示中,我们也看出来我们已经成功都去掉了引号。我们的message的信息如下:
"37.1232245,-78.4927721,'Virginia, US',Virginia,',',US,221,0,0"
解析信息
在上面我们已经很成功地把我们的信息转换为我们所希望的数据类型。接下来我们来使用grok来解析我们的数据。grok的数据解析,基本上是一种正则解析的方法。我们首先使用Kibana所提供的Grok Debugger来帮助我们分析数据。我们将使用如下的grok pattern来解析我们的message:
%{NUMBER:lat:float},%{NUMBER:lon:float},'%{DATA:address}',%{DATA:city},',',%{DATA:country},%{NUMBER:infected:int},%{NUMBER:death:int}
我们点击Grok Debugger,并把我们的相应的文档拷入到相应的输入框中,并用上面的grok pattern来解析数据。上面显示,它可以帮我们成功地解析我们想要的数据。显然这个被解析的信息更适合我们做数据的分析。为此,我们需要重新修改pipeline:
PUT _ingest/pipeline/covid19_parser
{
"processors": [
{
"remove": {
"field": ["log", "input", "ecs", "host", "agent"],
"if": "ctx.log != null && ctx.input != null && ctx.ecs != null && ctx.host != null && ctx.agent != null"
}
},
{
"gsub": {
"field": "message",
"pattern": "\"",
"replacement": "'"
}
},
{
"grok": {
"field": "message",
"patterns": [
"%{NUMBER:lat:float},%{NUMBER:lon:float},'%{DATA:address}',%{DATA:city},',',%{DATA:country},%{NUMBER:infected:int},%{NUMBER:death:int}"
]
}
}
]
}
我们运行上面的pipeline,并使用如下的命令来重新对数据进行分析:
POST covid19/_update_by_query?pipeline=covid19_parser
我们重新来查看文档:
在上面我们可以看到新增加的country,infected,address等等的字段。
添加location字段
在上面我们可以看到lon及lat字段。这些字段是文档的经纬度信息。这些信息并不能为我们所使用,因为首先他们是分散的,并不处于一个通过叫做location的字段中。为此,我们需要创建一个新的location字段。为此我们更新pipeline为:
PUT _ingest/pipeline/covid19_parser
{
"processors": [
{
"remove": {
"field": ["log", "input", "ecs", "host", "agent"],
"if": "ctx.log != null && ctx.input != null && ctx.ecs != null && ctx.host != null && ctx.agent != null"
}
},
{
"gsub": {
"field": "message",
"pattern": "\"",
"replacement": "'"
}
},
{
"grok": {
"field": "message",
"patterns": [
"%{NUMBER:lat:float},%{NUMBER:lon:float},'%{DATA:address}',%{DATA:city},',',%{DATA:country},%{NUMBER:infected:int},%{NUMBER:death:int}"
]
}
},
{
"set": {
"field": "location.lat",
"value": "{{lat}}"
}
},
{
"set": {
"field": "location.lon",
"value": "{{lon}}"
}
}
]
}
在上面我们设置了一个叫做location.lat及location.lon的两个字段。它们的值分别是{{lat}}及{{lon}}。我们执行上面的命令。
由于location是一个新增加的字段,在默认的情况下,它的两个字段都会被Elasticsearch设置为text的类型。为了能够让我们的数据在地图中进行显示,它必须是一个geo_point的数据类型。为此,我们必须通过如下命令来设置它的数据类型:
PUT covid19/_mapping
{
"properties": {
"location": {
"type": "geo_point"
}
}
}
执行上面的指令,我们再使用如下的命令来对我们的数据重新进行处理:
POST covid19/_update_by_query?pipeline=covid19_parser
等执行完上面的命令后,我们重新来查看我们的文档:
从上面我们可以看到一个叫做location的新字段。它含有lon及lat两个字段。我们同时也可以查看covid19的mapping。
GET covid19/_mapping
我们可以发现location的数据类型为:
"location" : {
"type" : "geo_point"
},
它显示location的数据类型是对的。
到目前为止,我们已经成功地把数据导入到Elasticsearch中。我们接下来针对covid19来进行数据分析。
展示并分析数据
创建index pattern
为了对covid19进行分析,我们必须创建一个index pattern:
点击上面的Create index pattern按钮:
点击Next step:
点击Create index pattern:
分析数据
创建Maps visualization
我们打开Visualization:
点击Create visualization:
选择Maps:
选择Documents:
点击Save & close:
我们把上面的Visualization保存为covid-1。
找出感染和死亡最多的国家
同样地,我们创建一个叫做Horizontal Bar的
点击Horizontal Bar:
选择covid-19:
我们保存该Visualization为covid-2。
同样地,我们得到死亡最多的前十个国家:
我们保存该Visualization为covid-3。
找出感染人数最多的地区
我们可以选择一个pie统计:
我们保存上面的Visualization为covid-3。
创建Dashboard
根据之前的Visualization,我们来创建一个Dashboard:
我们把之前创建的Visualization一个一个地添加进来,并形成我们最终的Dashboard:
参考: