Go on.......
接下来我们来实现ElasticGraphObj类,上面已经得到包含有相关需求的素材list,那么该类的目的是根据得到的material list可以找到相应的设计graph
3. ElasticGraphObj类
- 3.1 __init__方法
def __init__(self, index_name, index_type, ip="127.0.0.1"):
    """Store index settings and open an Elasticsearch connection.

    :param index_name: name of the ES index
    :param index_type: document type within the index
    :param ip: Elasticsearch host address
    """
    self.index_name = index_name
    self.index_type = index_type
    # Anonymous connection (no credentials configured)
    self.esObj = Elasticsearch([ip])
    # With credentials, use instead:
    # self.esObj = Elasticsearch([ip],http_auth=('elastic', 'password'),port=9200)
- 3.2 create_index方法
def create_index(self, index_name='stylepatternshow', index_type='stylepatternshow_type'):
    """Create the 'stylepatternshow' index with its mapping unless it already exists.

    :param index_name: unused — the method reads self.index_name instead
    :param index_type: unused — the method reads self.index_type instead
    :return: None
    """
    # Field mapping for the design-graph documents.
    properties = {
        "id": {
            "type": "long",
            "index": "false"
        },
        "serial": {
            # text fields are analyzed (split into tokens); keyword fields
            # would be stored verbatim. "index": "false" -> not searchable.
            "type": "text",
            "index": "false"
        },
        "taskType": {
            "type": "long",
            "index": "false"
        },
        "taskParam": {
            # Searchable: holds the space-separated material serial string.
            "type": "text",
            "index": True
        },
        "createTime": {
            "type": "date",
            "format": "yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||epoch_millis"
        }
    }
    _index_mappings = {"mappings": {self.index_type: {"properties": properties}}}
    if self.esObj.indices.exists(index=self.index_name) is not True:
        res = self.esObj.indices.create(index=self.index_name, body=_index_mappings)
        print('index is create->', res)
    else:
        print('index is already exist')
- 3.3 bulk_Index_Data方法
def bulk_Index_Data(self, jsonFile):
    """Bulk-load qualifying design-graph records from a JSON dump into ES.

    Only records with status == 1 and taskType 500/501 whose taskParam
    carries a main_groups section with at least one layer serial are
    indexed.
    :param jsonFile: path to the JSON dump file
    :return: None
    """
    with open(jsonFile, 'r') as load_f:
        load_json = json.load(load_f)
    ACTIONS = []
    doc_id = 1
    for record in load_json:
        if record['status'] != 1:
            continue
        if record['taskType'] not in (500, 501):
            continue
        taskParam_dict = json.loads(record['taskParam'])
        if 'main_groups' not in taskParam_dict:
            continue
        layers_list = taskParam_dict['main_groups'][0]['layers']
        serial_set = self.get_singel_serial_set(layers_list)
        # Deliberately build a space-separated serial string so that ES's
        # standard analyzer can split it back into individual tokens.
        serial_str = ''.join(serial + ' ' for serial in serial_set)
        if serial_str == "":
            continue
        ACTIONS.append({
            "_index": self.index_name,
            "_type": self.index_type,
            "_id": doc_id,  # _id could also be left for ES to auto-generate
            "_source": {
                "id": record['id'],
                "serial": record['serial'],
                "taskType": record['taskType'],
                "taskParam": serial_str,
                "createTime": record['createTime'],
            }
        })
        doc_id += 1
    success, _ = bulk(self.esObj, ACTIONS, index=self.index_name, raise_on_error=True)
    print('Performed %d actions' % success)
# Build the set of material serials contained in a layers list.
def get_singel_serial_set(self, layers_list):
    """Return the set of 'serial' values found in dict entries of layers_list.

    Non-dict entries are ignored. Dict entries lacking a 'serial' key are
    skipped as well (the original raised KeyError on them).
    (Method name typo "singel" kept for backward compatibility.)

    :param layers_list: list of layer entries parsed from taskParam
    :return: set of serial strings (may be empty)
    """
    return {layer['serial'] for layer in layers_list
            if isinstance(layer, dict) and 'serial' in layer}
- 3.4 get_Data_By_Body方法
def get_Data_By_Body(self, material_serial_list):
    """Return the set of design-graph ids whose taskParam matches any of
    the given material serials (union over one match query per serial).

    :param material_serial_list: iterable of material serial strings
    :return: set of graph ids
    """
    graph_id_set = set()
    for serial in material_serial_list:
        query = {"query": {"match": {"taskParam": serial}}}
        graph_id_set.update(self.get_graph_id_list(query))
    return graph_id_set
def get_graph_id_list(self, doc):
    """Scroll through every hit for *doc* and return their source ids.

    scan() streams all hits via the scroll API, so large result sets are
    handled without manual pagination.
    :param doc: ES query body, e.g. {"query": {"match": {...}}}
    :return: list of graph ids
    """
    scanResp = scan(client=self.esObj, query=doc, scroll="10m", preserve_order=False)
    # The original kept a manual hit counter feeding a commented-out debug
    # print; dropped as dead code.
    return [resp['_source']['id'] for resp in scanResp]
=========================================================================================
上面已经实现了设计图类的编写,能够根据素材list得到包含这些material的每张设计图id,调用如下:
esGraph = ElasticGraphObj('stylepatternshow', 'stylepatternshow_type', ip='127.0.0.1')
if esGraph.esObj.indices.exists(index=esGraph.index_name) is not True:
esGraph.create_index()
esGraph.bulk_Index_Data('./design_stylepatternshow.json')
graph_id_set = esGraph.get_Data_By_Body(material_serial_list)
任务告一段落了.......
=========================================================================================
下面我贴出所有源代码:
#coding=utf-8
import ast
import json

from elasticsearch import Elasticsearch
from elasticsearch.helpers import bulk
from elasticsearch.helpers import scan
class ElasticMaterialObj:
    """Elasticsearch wrapper for the material ("resource") index.

    Bulk-loads tagged material records from a JSON dump and retrieves the
    serials of materials matching content / color / skill tag queries.
    """

    def __init__(self, index_name, index_type, ip="127.0.0.1"):
        """
        :param index_name: ES index name
        :param index_type: ES document type
        :param ip: Elasticsearch host address
        """
        self.index_name = index_name
        self.index_type = index_type
        # Anonymous connection (no credentials configured)
        self.esObj = Elasticsearch([ip])
        # With credentials, use instead:
        # self.esObj = Elasticsearch([ip],http_auth=('elastic', 'password'),port=9200)

    def create_index(self, index_name='resource', index_type='resource_type'):
        """Create the material index with its mapping if it does not exist.

        NOTE(review): the index_name/index_type parameters are never used —
        the method reads self.index_name/self.index_type instead; kept for
        backward compatibility.
        """
        _index_mappings = {
            "mappings": {
                self.index_type: {
                    "properties": {
                        "id": {
                            "type": "long",
                            "index": "false"
                        },
                        "serial": {
                            # keyword fields are not analyzed (text would be
                            # tokenized); "index": "false" -> not searchable
                            "type": "keyword",
                            "index": "false"
                        },
                        "tags": {
                            "type": "object",
                            "properties": {
                                "content": {"type": "keyword", "index": True},
                                "dominant_color_name": {"type": "keyword", "index": True},
                                "skill": {"type": "keyword", "index": True},
                            }
                        },
                        "hasTag": {
                            "type": "long",
                            "index": True
                        },
                        "status": {
                            "type": "long",
                            "index": True
                        },
                        "createTime": {
                            "type": "date",
                            "format": "yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||epoch_millis"
                        },
                        "updateTime": {
                            "type": "date",
                            "format": "yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||epoch_millis"
                        }
                    }
                }
            }
        }
        if not self.esObj.indices.exists(index=self.index_name):
            res = self.esObj.indices.create(index=self.index_name, body=_index_mappings)
            print(res)
        else:
            print('index is already exist')

    def bulk_Index_Data(self, jsonFile):
        """Bulk-index qualifying material records from a JSON dump into ES.

        Only records with type == 'MATERIAL', hasTag == 1 and status == 1
        that yield at least one content tag are indexed.
        :param jsonFile: path to the JSON dump file
        :return: None
        """
        with open(jsonFile, 'r') as load_f:
            load_json = json.load(load_f)
        ACTIONS = []
        i = 1
        for line in load_json:
            if line['type'] == 'MATERIAL' and line['hasTag'] == 1 and line['status'] == 1:
                # BUGFIX/security: 'tags' arrives as a Python-literal string
                # from the DB dump; the original used eval(), which executes
                # arbitrary code. literal_eval parses the same literals safely.
                tags_dict = ast.literal_eval(line['tags'])
                content, dominant_color_name, skill = self.get_data_from_dict(tags_dict)
                if not content:
                    continue
                action = {
                    "_index": self.index_name,
                    "_type": self.index_type,
                    "_id": i,  # _id could also be left for ES to auto-generate
                    "_source": {
                        "id": line['id'],
                        "serial": line['serial'],
                        # Only the first extracted content tag is indexed
                        "tags.content": content[0],
                        "tags.dominant_color_name": dominant_color_name,
                        "tags.skill": skill,
                        "hasTag": line['hasTag'],
                        "status": line['status'],
                        "createTime": line['createTime'],
                        "updateTime": line['updateTime'],
                    }
                }
                i += 1
                ACTIONS.append(action)
        success, _ = bulk(self.esObj, ACTIONS, index=self.index_name, raise_on_error=True)
        print('Performed %d actions' % success)

    def get_Data_By_Body(self, content, color, skill, match_num):
        """Query materials by tag criteria and return their serials.

        :param content: content tag value (e.g. a flower name)
        :param color: dominant color name
        :param skill: drawing technique
        :param match_num: minimum number of the three should-clauses that
            must match (3 == all criteria required)
        :return: list of matching material serials
        """
        doc_combine = {
            "query": {
                "bool": {
                    "should": [
                        {"match": {"tags.content": content}},
                        {"match": {"tags.dominant_color_name": color}},
                        {"match": {"tags.skill": skill}}
                    ],
                    "minimum_should_match": match_num
                }
            }
        }
        return self.get_single_serial(doc_combine)

    def get_single_serial(self, doc):
        """Scroll through every hit for *doc* and return their serials."""
        scanResp = scan(client=self.esObj, query=doc, scroll="10m", preserve_order=False)
        return [resp['_source']['serial'] for resp in scanResp]

    def delete_Index_Data(self):
        """Delete every document in the index (the index itself remains)."""
        self.esObj.delete_by_query(index=self.index_name, body={"query": {"match_all": {}}})
        print('---delete index OK----')

    def get_data_from_dict(self, tags_dict):
        """Extract (content_list, dominant_color_name, skill) from a tags dict.

        Missing keys yield [] / "" / "" respectively.
        :param tags_dict: parsed tags dictionary of one material record
        :return: tuple (list, str, str)
        """
        content = []
        dominant_color_name = ""
        skill = ""
        if 'content' in tags_dict:
            content = self.get_content_list(tags_dict)
        if 'dominant_color_name' in tags_dict:
            dominant_color_name = tags_dict['dominant_color_name']
        if 'skill' in tags_dict:
            skill = tags_dict['skill']
        return content, dominant_color_name, skill

    def get_content_list(self, tags_dict):
        """Split the comma-separated content tag into a list of type names.

        Each item looks like "花-百合" (category-type); the part after '-'
        is kept. A type of "其它" ("other") is replaced by the 'other' tag
        value when present, otherwise dropped.

        BUGFIX: the original definition was missing the *self* parameter,
        so every self.get_content_list(...) call raised TypeError.
        """
        content_list = []
        for split_row in tags_dict['content'].split(','):
            parts = split_row.split('-')
            # Guard entries without a '-' separator (original raised IndexError)
            type_str = parts[1] if len(parts) > 1 else parts[0]
            if type_str == '其它':
                if 'other' in tags_dict:
                    content_list.append(tags_dict['other'])
            else:
                content_list.append(type_str)
        return content_list
class ElasticGraphObj:
    """Elasticsearch wrapper for the design-graph ("stylepatternshow") index.

    Bulk-loads design-graph records and retrieves the ids of graphs whose
    taskParam contains given material serials.
    """

    def __init__(self, index_name, index_type, ip="127.0.0.1"):
        """
        :param index_name: ES index name
        :param index_type: ES document type
        :param ip: Elasticsearch host address
        """
        self.index_name = index_name
        self.index_type = index_type
        # Anonymous connection (no credentials configured)
        self.esObj = Elasticsearch([ip])
        # With credentials, use instead:
        # self.esObj = Elasticsearch([ip],http_auth=('elastic', 'password'),port=9200)

    def create_index(self, index_name='stylepatternshow', index_type='stylepatternshow_type'):
        """Create the design-graph index with its mapping if it does not exist.

        NOTE(review): the index_name/index_type parameters are never used —
        the method reads self.index_name/self.index_type instead; kept for
        backward compatibility.
        """
        _index_mappings = {
            "mappings": {
                self.index_type: {
                    "properties": {
                        "id": {
                            "type": "long",
                            "index": "false"
                        },
                        "serial": {
                            # text fields are analyzed (split on whitespace by
                            # the standard analyzer); "index": "false" -> not
                            # searchable
                            "type": "text",
                            "index": "false"
                        },
                        "taskType": {
                            "type": "long",
                            "index": "false"
                        },
                        "taskParam": {
                            # Searchable: space-separated material serial string
                            "type": "text",
                            "index": True
                        },
                        "createTime": {
                            "type": "date",
                            "format": "yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||epoch_millis"
                        }
                    }
                }
            }
        }
        if not self.esObj.indices.exists(index=self.index_name):
            res = self.esObj.indices.create(index=self.index_name, body=_index_mappings)
            print('index is create->', res)
        else:
            print('index is already exist')

    # Build the set of material serials contained in a layers list.
    def get_singel_serial_set(self, layers_list):
        """Return the set of 'serial' values found in dict entries of layers_list.

        Non-dict entries are ignored. Dict entries lacking a 'serial' key
        are skipped as well (the original raised KeyError on them).
        (Method name typo "singel" kept for backward compatibility.)
        """
        return {layer['serial'] for layer in layers_list
                if isinstance(layer, dict) and 'serial' in layer}

    def bulk_Index_Data(self, jsonFile):
        """Bulk-index qualifying design-graph records from a JSON dump into ES.

        Only records with status == 1 and taskType 500/501 whose taskParam
        carries a main_groups section with at least one layer serial are
        indexed.
        :param jsonFile: path to the JSON dump file
        :return: None
        """
        with open(jsonFile, 'r') as load_f:
            load_json = json.load(load_f)
        ACTIONS = []
        i = 1
        for line in load_json:
            if line['status'] != 1 or line['taskType'] not in (500, 501):
                continue
            taskParam_dict = json.loads(line['taskParam'])
            if 'main_groups' not in taskParam_dict:
                continue
            layers_list = taskParam_dict['main_groups'][0]['layers']
            serial_set = self.get_singel_serial_set(layers_list)
            # Space-separate the serials so ES's standard analyzer can split
            # them back into individual searchable tokens.
            serial_str = ' '.join(serial_set)
            if not serial_str:
                continue
            ACTIONS.append({
                "_index": self.index_name,
                "_type": self.index_type,
                "_id": i,  # _id could also be left for ES to auto-generate
                "_source": {
                    "id": line['id'],
                    "serial": line['serial'],
                    "taskType": line['taskType'],
                    "taskParam": serial_str,
                    "createTime": line['createTime'],
                }
            })
            i += 1
        success, _ = bulk(self.esObj, ACTIONS, index=self.index_name, raise_on_error=True)
        print('Performed %d actions' % success)

    def get_Data_By_Body(self, material_serial_list):
        """Return the set of design-graph ids whose taskParam matches any of
        the given material serials (union over one match query per serial).

        :param material_serial_list: iterable of material serial strings
        :return: set of graph ids
        """
        graph_id_set = set()
        for serial in material_serial_list:
            doc_graph = {"query": {"match": {"taskParam": serial}}}
            graph_id_set.update(self.get_graph_id_list(doc_graph))
        return graph_id_set

    def get_graph_id_list(self, doc):
        """Scroll through every hit for *doc* and return their source ids.

        :param doc: ES query body, e.g. {"query": {"match": {...}}}
        :return: list of graph ids
        """
        scanResp = scan(client=self.esObj, query=doc, scroll="10m", preserve_order=False)
        return [resp['_source']['id'] for resp in scanResp]
def main():
    """Build both indexes (when missing), load the JSON dumps, and run the
    two-stage query: material serials first, then the design-graph ids
    that contain them."""
    material_es = ElasticMaterialObj('resource', 'resource_type', ip='127.0.0.1')
    if material_es.esObj.indices.exists(index=material_es.index_name) is not True:
        material_es.create_index()
        material_es.bulk_Index_Data('./design_resource.json')
    # Stage 1: serials of materials matching all three tag criteria.
    material_serial_list = material_es.get_Data_By_Body('月季', '黄色', '水墨画', 3)
    graph_es = ElasticGraphObj('stylepatternshow', 'stylepatternshow_type', ip='127.0.0.1')
    if graph_es.esObj.indices.exists(index=graph_es.index_name) is not True:
        graph_es.create_index()
        graph_es.bulk_Index_Data('./design_stylepatternshow.json')
    # Stage 2: design graphs containing any of those materials.
    graph_id_set = graph_es.get_Data_By_Body(material_serial_list)
    print(graph_id_set)
    print(len(graph_id_set))


if __name__ == '__main__':
    main()
接下来,有个问题,我这里的数据是提前从ssh穿透的阿里云服务器download下来的,要是数据库数据有变,我只能再一次去download。因此需要连接上ssh穿透的阿里云服务器,实时下载数据。这个操作我找了好多资料,国内都没有,最后在google上找到了解决方法。详见python下建立elasticsearch索引实现大数据搜索——之连接ssh穿透的阿里云rdb服务器(四)