1、创建连接对象
from elasticsearch import RequestsHttpConnection, Elasticsearch
class MyConnection(RequestsHttpConnection):
    """Requests-backed ES connection that honours per-client proxy settings.

    The stock connection class does not accept a ``proxies`` kwarg, so we
    pop it off before delegating to the parent constructor and then install
    it on the underlying ``requests`` session.
    """

    def __init__(self, *args, **kwargs):
        # Remove 'proxies' before the parent sees it; default to no proxies.
        session_proxies = kwargs.pop('proxies', {})
        super(MyConnection, self).__init__(*args, **kwargs)
        self.session.proxies = session_proxies
# Build the Elasticsearch client using the proxy-aware connection class.
# NOTE(review): "http://ip:host" is a placeholder — presumably "ip:port";
# confirm before deploying.
esclient_dns = Elasticsearch(
    ["http://ip:host"],
    connection_class=MyConnection,
    proxies={'http': None},   # explicitly disable any HTTP proxy
    timeout=600,              # generous timeout for slow bulk operations
    max_retries=10,
    retry_on_timeout=True,
)
2、建立索引,并设置索引
说明:如果在没有索引的情况下往ES库中插入文档(document),系统会自动创建索引(文档中有索引信息)。但我们有时需要进行一些设置,比如索引包含几个碎片(shards),几份复制(replicas),还有文档中有的字段(field)的数据类型等,这些设置需要在文档插入之前完成;下面是创建并设置索引的代码
# Request body for index creation: 4 primary shards, no replicas, and a
# strict "dns_info" mapping so documents carrying unknown fields are
# rejected instead of silently extending the schema.
CREATE_BODY = {
    "settings": {
        "number_of_shards": 4,    # four primary shards
        "number_of_replicas": 0,  # zero replica copies
    },
    "mappings": {
        "dns_info": {             # document type is dns_info
            # "strict" raises an error when an unmapped field appears.
            "dynamic": "strict",
            "properties": {
                "first_seen": {
                    "format": "YYYY-MM-dd'T'HH:mm:ss.SSS'Z'",
                    "type": "date",
                },
                "last_seen": {
                    "format": "YYYY-MM-dd'T'HH:mm:ss.SSS'Z'",
                    "type": "date",
                },
                # not_analyzed: stored verbatim, no full-text analysis.
                "id": {
                    "index": "not_analyzed",
                    "type": "string",
                },
                "source": {
                    "index": "not_analyzed",
                    "type": "string",
                },
                "ip_type": {"type": "integer"},
                "created_by": {"type": "integer"},
                "ips": {"type": "ip"},
                "domain_names": {"type": "string"},
            },
        },
        "_default_": {
            # Default mapping template: also reject unknown fields.
            "dynamic": "strict",
        },
    },
}
header = {'Content-Type': 'application/json'}
# BUG FIX: the URL must be a quoted string literal — the original
# `DNS_API = http://ip:host/dns_test_v2` was a SyntaxError.
# dns_test_v2 is the index name; "ip:host" is a placeholder for host:port.
DNS_API = "http://ip:host/dns_test_v2"
try:
    # PUT <host>/<index> with the settings/mappings body creates the index.
    resp = requests.put(DNS_API, headers=header, data=json.dumps(CREATE_BODY))
    if resp.status_code == 200:
        print(u"成功建立索引")
except Exception as e:
    # Best-effort script: report the failure and continue.
    print(e, u"建立索引失败")
3、将文档插入索引
- 批量插入
这里使用ES的_bulk API做批量插入,效率较高,适用一次性录入多条数据的情况,ES写入一条数据相对mongoDB来说要慢许多,Python中bulk函数在helpers中;先导入helpers,bulk_gen()把所有的文档变为一个可迭代对象,bulk接受一个es对象和一个可迭代对象,还接受一个表示操作类型的参数_op_type(默认为index)。
from elasticsearch import helpers
# Example: a single bulk "action" document looks like the following.
# _index/_type/_id say where the document is stored; _source is the body.
{
"_type": "dns_info",
"_id": "17.248.158.174p31-caldav.fe.apple-dns.net",
"_index": "dns_test_v2",
"_source":
{
"domain_name": "wdfmbank.com",
"first_seen": "2018-01-29T05:43:39.000Z",
"domain_names": [
"wlodekkam6.wdfmbank.com"
],
"last_seen": "2018-01-29T05:43:39.000Z"
}
}
# 从文件中读取多条action放入迭代器
def bulk_gen(file_path):
    """Yield one bulk action per line of *file_path*.

    Each line is parsed as a JSON document. On an I/O error the exception
    is printed and the generator simply stops yielding (best-effort
    behaviour, matching how the surrounding script handles failures).
    """
    try:
        with open(file_path, 'r') as handle:
            for raw_line in handle:
                yield json.loads(raw_line)
    except IOError as err:
        print(err)
# Illustration only: in real code, wrap this in a function with proper
# exception handling.
actions = bulk_gen(file_name)
helpers.bulk(esclient_dns, actions)  # bulk-index all generated actions
- 插入单个文档
适用插入数据量少的情况,每次插入一条数据
"""
说明:index方法的四个参数
index: 索引名,此例中为 subdomain_test
doc_type: 类型名,此例中为 domain_info
id: 插入文档的id
body: 文档正文,字典格式,此例中body为
"""
body = {
"domain_name": "wdfmbank.com",
"first_seen": "2018-01-29T05:43:39.000Z",
"domain_names": [
"wlodekkam6.wdfmbank.com"
],
"last_seen": "2018-01-29T05:43:39.000Z"
}
def indexing(file_path):
    """Index documents from *file_path* one at a time.

    Every line must be a JSON action containing ``_index``, ``_type``,
    ``_id`` and ``_source``. The metadata keys are popped off and passed
    to ``esclient_dns.index`` while ``_source`` supplies the document body.
    Any failure is printed and aborts the remaining lines (best-effort,
    as elsewhere in this script).
    """
    try:
        with open(file_path, 'r') as source_file:
            for raw_line in source_file:
                doc = json.loads(raw_line)
                doc_type = doc.pop('_type')
                doc_id = doc.pop('_id')
                index_name = doc.pop('_index')
                result = esclient_dns.index(
                    index=index_name,
                    doc_type=doc_type,
                    id=doc_id,
                    body=doc['_source'],
                )
                print(result)
    except Exception as e:
        print(e)