获取数据
可以通过爬虫或者导入数据到ES中来获取数据源
我这里采用的方式是:从 MySQL 中批量读取数据并导入到 ES 中。
如果是大批量的数据,可以使用多线程、分页来进行导入ES集群,同时要注意一些脏数据的处理
# -*- coding: utf-8 -*-
import pymysql
from elasticsearch import Elasticsearch
def get_db_data():
    """Fetch every row of the ``details`` table from the local MySQL database.

    Returns:
        The rows exactly as produced by ``cursor.fetchall()`` (one sequence
        per row, columns in table order).

    Raises:
        Whatever ``pymysql`` raises on connection or query failure; the
        connection is always closed before the exception propagates.
    """
    # Keyword arguments: PyMySQL >= 1.0 no longer accepts positional
    # host/user/password/database arguments.
    db = pymysql.connect(
        host="localhost",
        user="root",
        password="root123",
        database="echart",
        charset='utf8',
    )
    try:
        # The cursor context manager closes the cursor when the block exits.
        with db.cursor() as cursor:
            cursor.execute("SELECT * FROM details")
            return cursor.fetchall()
    finally:
        # Release the connection even when the query fails.
        db.close()
def insert_data_to_es():
    """Copy every row returned by ``get_db_data()`` into the ``echart`` index.

    Each row is indexed as one document with a running zero-based ``id``
    and the first three columns stored as ``updatetime``, ``provience``
    and ``city``. Any failure (database or Elasticsearch) stops the import
    and is reported on stdout instead of being raised.
    """
    es = Elasticsearch()
    try:
        for i, row in enumerate(get_db_data()):
            print(row)
            es.index(index='echart', doc_type='test-type', body={
                'id': i,
                'updatetime': row[0],
                # NOTE(review): 'provience' looks like a misspelling of
                # 'province', but it is the stored field name, so it is
                # kept unchanged here.
                'provience': row[1],
                'city': row[2]
            })
    except Exception as exc:
        # Catch Exception rather than using a bare except, so Ctrl-C and
        # SystemExit still work, and show the real cause of the failure.
        print("Error: unable to fetch data:", exc)
# Run the MySQL -> Elasticsearch import only when executed as a script.
if __name__ == "__main__":
    insert_data_to_es()
启动ElasticSearch
通过 ElasticSearch-head 面板,可以查看到已创建的 echart 索引
Flask路由获取ES数据
from elasticsearch import Elasticsearch
class elasticSearch():
    """Small convenience wrapper around an ``Elasticsearch`` client for one index.

    Attributes are set in ``__init__``:
        es: the underlying ``Elasticsearch`` client.
        index_type: document type passed as ``doc_type`` when indexing.
        index_name: name of the index every method operates on.
    """

    def __init__(self, index_type: str, index_name: str, ip="127.0.0.1"):
        """Create the client and remember the target index.

        Args:
            index_type: document type used when indexing documents.
            index_name: name of the index to operate on.
            ip: host of the Elasticsearch node; port 9200 is used.
        """
        # Honour the ``ip`` argument instead of a hard-coded host.
        # For a secured cluster, pass credentials as well, e.g.
        # Elasticsearch([ip], http_auth=('elastic', 'password'), port=9200).
        self.es = Elasticsearch("{}:9200".format(ip))
        self.index_type = index_type
        self.index_name = index_name

    def create_index(self):
        """Recreate the index: delete it if it exists, then create it."""
        if self.es.indices.exists(index=self.index_name):
            self.es.indices.delete(index=self.index_name)
        # ignore=400 keeps the call from raising on an HTTP 400 response.
        self.es.indices.create(index=self.index_name, ignore=400)

    def delete_index(self):
        """Delete the index on a best-effort basis; failures are ignored."""
        try:
            self.es.indices.delete(index=self.index_name)
        except Exception:
            # Deliberately best-effort (e.g. the index does not exist), but
            # do not swallow KeyboardInterrupt/SystemExit as a bare except
            # would.
            pass

    def get_doc(self, uid):
        """Return the raw ``get`` response for the document with id ``uid``."""
        return self.es.get(index=self.index_name, id=uid)

    def insert_one(self, doc: dict):
        """Index a single document ``doc`` into the configured index."""
        self.es.index(index=self.index_name,
                      doc_type=self.index_type, body=doc)

    def insert_array(self, docs: list):
        """Index every document in ``docs``, one request per document."""
        for doc in docs:
            self.insert_one(doc)

    def search(self, query, count: int = 30):
        """Run a ``match`` query for ``query`` against ``city.keyword``.

        Args:
            query: value to match against the ``city.keyword`` field.
            count: maximum number of hits to return.

        Returns:
            The raw search response returned by the client.
        """
        dsl = {
            "query": {
                "bool": {
                    "must": [{
                        "match": {
                            "city.keyword": query
                        }
                    }],
                    "must_not": [],
                    "should": []
                }
            },
            "from": 0,
            # Single source of truth for the page size: previously the body
            # said 10 while a separate ``size=count`` argument was also sent.
            "size": count,
            "sort": [],
            "aggs": {}
        }
        return self.es.search(index=self.index_name, body=dsl)
# 引入Flask库
from flask import Flask, request
from utils import elasticSearch
import jsonify
import json
# Instantiate the Flask application object
app = Flask(__name__)
# Route for the site root
@app.route('/')
def index():
    """View function for ``/``: return a fixed plain-text greeting."""
    greeting = 'Hello World, this is my first flask web app!'
    return greeting
@app.route("/getEs/<query>")
def get_es(query):
    """Search the ``echart`` index for ``query`` and return the hits as JSON.

    The response body is a JSON array holding the ``_source`` of each hit.
    """
    client = elasticSearch(index_name='echart', index_type='test-type')
    hits = client.search(query)['hits']['hits']
    payload = json.dumps([hit['_source'] for hit in hits])
    return app.response_class(payload, content_type='application/json')
if __name__ == '__main__':
    # host selects the bind address, port the listening port; debug=True
    # turns on debug mode so code changes are picked up without manually
    # restarting the server.
    # NOTE(review): debug mode is enabled while binding to 0.0.0.0 (all
    # interfaces) - confirm this is only ever run on a trusted network.
    app.run(debug=True, port=5000, host='0.0.0.0')
    # Alternatively, call app.run() with no arguments to use the defaults.