Copyright notice: this is an original article by the blogger and may not be reproduced without the blogger's permission. https://blog.csdn.net/weixin_39198406/article/details/84790635
When it comes to bulk-importing data into neo4j, these options probably come to mind:
- the import tool
- LOAD CSV
- neo4j driver for python/java…
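The first two only work when the data already exists as files on the file system.
But what if your data arrives continuously as a stream?
In that case you would use Python or Java to import the nodes in real time.
You may have tried py2neo from Python and found that importing nodes is very slow.
Here is the fix: batch the writes, and MERGE in a way that does not create duplicates.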
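import time

from py2neo import Graph

# Assumption: a local Neo4j instance with default connection settings;
# pass your own URI and credentials to Graph(...) as needed.
graph = Graph()

# MERGE each node on its own first, then MERGE the relationship, so that
# existing nodes are reused instead of being created again.
# Note: {param} is the legacy parameter syntax; Neo4j 4.0+ requires $param.
statement_c = """MERGE (node1:Person {person_name:{person_name}})
MERGE (node2:Company {company_name:{company_name}})
MERGE (node1)<-[:Query {visit_time: {visit_time}}]-(node2)"""

BATCH_SIZE = 1000


def add_names(items, tx):
    # Queue one statement per row, then send the queued statements to the server.
    for data in items:
        tx.append(statement_c, data)
    tx.process()


def flush(items):
    # Write one batch inside a single transaction.
    tx = graph.begin()
    add_names(items, tx)
    tx.commit()


def main():
    with open("./raw.csv", "r") as f:
        content = f.readlines()
    items = []
    for index, c in enumerate(content):
        print(">>> {}".format(index))
        c = c.strip()
        person_name, company_name, visit_time = c.split(",")
        data = {
            "person_name": person_name,
            "company_name": company_name,
            "visit_time": visit_time,
        }
        items.append(data)
        # Commit once a full batch has accumulated.
        if len(items) >= BATCH_SIZE:
            flush(items)
            items = []
    # Commit the rows left over after the last full batch.
    if items:
        flush(items)


if __name__ == '__main__':
    s = time.time()
    main()
    e = time.time()
    print("Elapsed: {}s".format(e - s))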
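I won't walk through the code in detail; if you have questions, leave a comment below.
The MERGE statements in the script follow this answer: https://stackoverflow.com/questions/35381968/cypher-node-already-exists-issue-with-merge
The Python code follows the py2neo documentation: https://py2neo.org/2.0/cypher.html#py2neo.cypher.CypherTransaction.process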
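The motivating case in this post is data that arrives as a stream, while the script reads a file. As a rough sketch of how the same batching idea could apply to any iterable source, the helper below groups rows into lists of at most `batch_size` and hands each list to a writer. The names `batched`, `import_stream` and `write_batch` are illustrative assumptions, not part of py2neo.

```python
from itertools import islice


def batched(rows, batch_size=1000):
    """Yield lists of at most batch_size rows from any iterable."""
    iterator = iter(rows)
    while True:
        batch = list(islice(iterator, batch_size))
        if not batch:
            return
        yield batch


def import_stream(rows, write_batch, batch_size=1000):
    """Pass each batch to write_batch and return the number of batches written."""
    count = 0
    for batch in batched(rows, batch_size):
        # write_batch is a hypothetical callable, e.g. one that opens a
        # transaction, queues a statement per row, and commits.
        write_batch(batch)
        count += 1
    return count
```

The final, partially filled batch is still written when the source ends. For a stream that never ends, this sketch only flushes when a batch fills up, so a time-based flush would have to be added separately.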
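A different way to batch, not the one used in the script, is to send the whole batch as a single list parameter and let Cypher iterate over it with UNWIND. The sketch below assumes a Neo4j version that accepts the `$rows` parameter syntax and a `run` callable that takes a statement and a parameter dict, such as py2neo's `graph.run`. `UNWIND_STATEMENT` and `write_batch_unwind` are illustrative names.

```python
# Assumption: each row is a dict with person_name, company_name and
# visit_time keys, matching the dicts built in the import script.
UNWIND_STATEMENT = """UNWIND $rows AS row
MERGE (node1:Person {person_name: row.person_name})
MERGE (node2:Company {company_name: row.company_name})
MERGE (node1)<-[:Query {visit_time: row.visit_time}]-(node2)"""


def write_batch_unwind(run, rows):
    """Send one statement for the whole batch and return the row count."""
    rows = list(rows)
    if not rows:
        # Nothing to write, so skip the round trip entirely.
        return 0
    run(UNWIND_STATEMENT, {"rows": rows})
    return len(rows)
```

With py2neo this might be called as `write_batch_unwind(graph.run, items)`. Whether it beats queuing one statement per row depends on the server and driver versions, so it is worth timing both on your own data.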
Neo4j python driver 1.6: https://neo4j.com/docs/api/python-driver/1.7-preview/index.html?highlight=import
py2neo 2.0: https://py2neo.org/2.0/cypher.html#py2neo.cypher.CypherTransaction.process
py2neo v4: https://py2neo.org/v4/