Use case
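Sometimes the business needs to run several Elasticsearch operations on the same document one after another, for example an insert followed by an update, or a delete followed by an insert, with only a very short interval between them. Logstash submits events in batches and Elasticsearch processes requests in asynchronous thread pools, so the usual setup cannot guarantee transactional behavior: the operations may not take effect in the order they were issued.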
Optimistic locking in Elasticsearch
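Elasticsearch keeps a version number for every stored document. By default, every index, update, or delete of a document increases its version by 1. An operation can also carry an expected version: Elasticsearch checks it first, and if the document's current version is not the expected one, the operation is rejected with a version conflict. With external versioning (version_type external), the check is different: the operation is accepted only if the supplied version is greater than the current one, and the supplied value becomes the document's new version.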
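After a document is deleted, its version information is kept for 1 minute by default (controlled by the index setting index.gc_deletes, default 60s).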
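The rules above can be made concrete with a small simulation. This is a hypothetical, simplified in-memory model written for illustration, not Elasticsearch code: it covers only the behaviors described here (a version that grows by 1 per write, an equality check for internal versioning, a greater-than check plus stored value for external versioning, and a 60-second tombstone for deleted documents). It ignores shards, sequence numbers, and not-found responses, and the names `ToyIndex`, `Record`, and `VersionConflict` are invented for this sketch.

```python
from dataclasses import dataclass
from typing import Dict, Optional

# Assumption: index.gc_deletes is left at its 60 s default.
GC_DELETES_SECONDS = 60.0


class VersionConflict(Exception):
    """A write rejected by the version check (Elasticsearch answers HTTP 409)."""


@dataclass
class Record:
    version: int
    source: Optional[dict]  # None marks a deleted document (tombstone)
    deleted_at: Optional[float] = None


class ToyIndex:
    """Simplified in-memory model of per-document versions; not Elasticsearch code."""

    def __init__(self) -> None:
        self.records: Dict[str, Record] = {}

    def _lookup(self, doc_id: str, now: float) -> Optional[Record]:
        rec = self.records.get(doc_id)
        if rec is not None and rec.source is None:
            if now - rec.deleted_at > GC_DELETES_SECONDS:
                del self.records[doc_id]  # tombstone expired: its version is forgotten
                return None
        return rec

    @staticmethod
    def _check(rec: Optional[Record], version: Optional[int], version_type: str) -> None:
        if version is None:
            return  # no expected version supplied: nothing to check
        if version_type == "internal":
            # the current version must equal the expected version
            if rec is None or rec.version != version:
                found = rec.version if rec is not None else None
                raise VersionConflict(f"expected {version}, found {found}")
        elif version_type == "external":
            # the supplied version must be greater than the current version
            if rec is not None and rec.version >= version:
                raise VersionConflict(f"{version} is not greater than {rec.version}")
        else:
            raise ValueError(f"unsupported version_type: {version_type!r}")

    @staticmethod
    def _next_version(rec: Optional[Record], version: Optional[int], version_type: str) -> int:
        if version is not None and version_type == "external":
            return version  # external versioning stores the supplied value
        return 1 if rec is None else rec.version + 1  # otherwise +1 per write

    def index(self, doc_id: str, source: dict, now: float,
              version: Optional[int] = None, version_type: str = "internal") -> int:
        rec = self._lookup(doc_id, now)
        self._check(rec, version, version_type)
        new_version = self._next_version(rec, version, version_type)
        self.records[doc_id] = Record(new_version, dict(source))
        return new_version

    def delete(self, doc_id: str, now: float,
               version: Optional[int] = None, version_type: str = "internal") -> int:
        rec = self._lookup(doc_id, now)
        self._check(rec, version, version_type)
        new_version = self._next_version(rec, version, version_type)
        self.records[doc_id] = Record(new_version, None, deleted_at=now)  # keep a tombstone
        return new_version
```

In this model, a delete with version_type external and version 1000, followed within 60 seconds by an index that expects version 1000, succeeds and leaves the document at version 1001. The same index issued after the tombstone has expired raises `VersionConflict`, because the deleted document's version is no longer known.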
Logstash configuration example
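Add a dataVersion field to both of the two closely spaced requests. The field can be a timestamp, and both requests must carry the same value. In the configuration below, the delete uses external versioning, so it stores dataVersion as the version of the deleted document; the index request sends the same value with the default internal check, so it succeeds only once the delete has been applied, and only while the deleted document's version is still retained.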
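On the producer side, the two requests might be built as JSON messages like this. It is a hypothetical sketch: the field names `id`, `action`, and `dataVersion` come from the Logstash configuration, while the function name `make_event_pair`, the action value `"index"` (any value other than `"delete"` takes the index branch), the millisecond timestamp, and the payload fields are assumptions. Sending the strings to the `synclogs` topic with a Kafka client is left out.

```python
import json
import time
from typing import List, Optional


def make_event_pair(doc_id: str, new_doc: dict,
                    data_version: Optional[int] = None) -> List[str]:
    """Build a delete event and an index event that share one dataVersion value."""
    if data_version is None:
        data_version = int(time.time() * 1000)  # assumed: millisecond timestamp
    delete_event = {"id": doc_id, "action": "delete", "dataVersion": data_version}
    # the index event carries the new document body plus the routing fields
    index_event = dict(new_doc, id=doc_id, action="index", dataVersion=data_version)
    return [json.dumps(delete_event), json.dumps(index_event)]
```

The Logstash configuration that follows reads these messages with the json codec and routes each one by its `action` and `dataVersion` fields.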
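input {
  kafka {
    bootstrap_servers => "192.168.x.x:9092,192.168.x.x:9092,192.168.x.x:9092"
    topics => "synclogs"
    group_id => "logstash-sync"
    consumer_threads => "1"
    max_partition_fetch_bytes => "5242880"
    codec => "json"
  }
}
output {
  # Events that carry dataVersion are written with a version check
  if [dataVersion] {
    if [action] and [action] == "delete" {
      elasticsearch {
        hosts => ["192.168.x.x:9200","192.168.x.x:9200"]
        index => "sync_test"
        document_type => "test"
        action => "delete"
        codec => "json"
        document_id => "%{id}"
        # External versioning: the delete is applied only if dataVersion is
        # greater than the current version, and dataVersion becomes the
        # version kept for the deleted document
        version => "%{dataVersion}"
        version_type => "external"
        # retry_on_conflict only affects update actions
        retry_on_conflict => 4
      }
    } else {
      elasticsearch {
        hosts => ["192.168.x.x:9200","192.168.x.x:9200"]
        index => "sync_test"
        document_type => "test"
        action => "index"
        codec => "json"
        document_id => "%{id}"
        # No version_type, so Elasticsearch uses internal versioning: the write
        # succeeds only if the current version equals dataVersion, i.e. after
        # the paired delete with the same dataVersion has been applied.
        # Elasticsearch 7.0 and later reject this form and expect
        # if_seq_no/if_primary_term instead.
        version => "%{dataVersion}"
        # retry_on_conflict only affects update actions
        retry_on_conflict => 4
      }
    }
  } else {
    # Events without dataVersion are indexed without any version check
    elasticsearch {
      hosts => ["192.168.x.x:9200","192.168.x.x:9200"]
      index => "sync_test"
      document_type => "test"
      action => "index"
      codec => "json"
      document_id => "%{id}"
    }
  }
}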
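To check which branch an event takes, the conditionals of the output section can be mirrored in a few lines of Python. This is a hand-written, hypothetical mirror for reasoning and testing, not how Logstash evaluates its configuration; `route` and `_present` are invented names, and `_present` only approximates a Logstash `if [field]` test as "the field exists and is not null/false". It assumes every event has an `id` field.

```python
from typing import Any, Dict


def _present(event: Dict[str, Any], name: str) -> bool:
    # approximation of `if [field]`: the field exists and is not null/false
    value = event.get(name)
    return value is not None and value is not False


def route(event: Dict[str, Any]) -> Dict[str, Any]:
    """Return the elasticsearch output options the config above would select."""
    opts = {"index": "sync_test", "document_type": "test",
            "document_id": str(event["id"])}  # document_id => "%{id}"
    if _present(event, "dataVersion"):
        version = str(event["dataVersion"])  # version => "%{dataVersion}"
        if _present(event, "action") and event["action"] == "delete":
            return dict(opts, action="delete", version=version, version_type="external")
        # no version_type in this branch, so Elasticsearch applies internal versioning
        return dict(opts, action="index", version=version)
    return dict(opts, action="index")  # no dataVersion: plain index, no version check
```

One consequence of the configuration as written: an event whose action is delete but which has no dataVersion falls into the last branch and is indexed rather than deleted.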