原文链接:https://streamsets.com/blog/visualizing-analyzing-salesforce-data-neo4j/
图形数据库通过节点、边、属性来表示和存储数据,允许快速、轻松地检索可能难以在传统关系数据库中建模的复杂层次结构。Neo4j是一个广泛部署在社区中的开源图形数据库; 在本博客文章中,我将向您展示如何使用StreamSets Data Collector(SDC)从Salesforce读取案例数据,并使用Neo4j的JDBC驱动将数据加载到图形数据库中。
Salesforce案例
Salesforce Service Cloud提供客户支持即服务。核心概念就是这种情况 - 类似于其他系统中的票证。案例可以与帐户,该帐户的联系人以及其他几个标准和自定义Salesforce对象相关联。持续分析服务云数据是提供卓越客户服务的关键,通过将服务云数据与其他数据(例如Sales Cloud中的机会)相结合,我们可以通过识别客户提交的服务票据来帮助支持销售团队重复业务。让我们看看我们如何从Salesforce中提取所需的所有信息,并使用SDC将其写入Neo4j。
从Salesforce阅读案例
自版本2.2.0.0以来随SDC一起提供的Salesforce Origin允许您通过SOQL查询和Salesforce SOAP或Bulk API 读取数据。我们可以使用关系查询在单个SOQL查询中检索案例及其关联的帐户和联系人:
SELECT Id, Account.Id, Account.Name, CaseNumber,
Contact.Id, Contact.Name, Contact.Account.Id,
Contact.Account.Name, Status, Product__c,
Subject, Priority, Description, IsClosed,
OwnerId
FROM Case
WHERE Id > '${OFFSET}'
ORDER BY Id
在单个查询中,我们检索ID大于给定偏移量的所有案例,其关联帐户的ID和名称,其联系人的ID和名称以及其联系人的帐户详细信息(以防它们与案例的关联帐户不同) )。我们还检索关联的产品代码,这是Salesforce Developer Edition中的自定义字段。
Salesforce源执行关系查询并生成具有字段层次结构的记录:
将数据写入Neo4j
由于我们不需要将Salesforce中的数据转换为Neo4j,因此我们可以构建一个非常简单的管道:
由于Neo4j与关系数据库非常不同,因此它不使用SQL。相反,我们通过Cypher与图形数据库进行交互。特别是,我们可以使用Cypher MERGE子句“插入”数据。
目前,JDBC Producer Destination 不允许我们编写任意语句 - 它使用SQL的INSERT进行硬编码。该JDBC查询执行更灵活,所以我们在这里使用它。但请注意,执行程序并非真正设计为在数据路径中工作 - 它们未针对大量数据进行优化,因此性能可能不是您期望从SDC获得的。
我们可以将执行程序配置 jdbc:neo4j:bolt://localhost
为JDBC URL以访问localhost上的Neo4j,并使用SDC的表达式语言 将查询直接放入JDBC Executor的配置中以填充该语句。这是一个非常简单的示例,如果数据库中还没有匹配的节点,则只创建一个案例节点:
MERGE (case:Case {
CaseId: "${record:value('/Id')}",
CaseNumber: "${record:value('/CaseNumber')}",
Status: "${record:value('/Status')}",
Subject: "${record:value('/Subject')}",
Priority: "${record:value('/Priority')}",
Description: "${record:value('/Description')}",
IsClosed: ${record:value('/IsClosed')}
})
虽然使用相同数据多次运行此语句只会导致数据库中的单个节点,但如果MERGE调用之间的任何属性发生更改,它仍会创建重复节点,因此最好合并唯一记录ID和SET其余属性:
MERGE (case:Case { CaseId: "${record:value('/Id')}" })
ON CREATE SET case.CaseNumber = "${record:value('/CaseNumber')}",
case.Status = "${record:value('/Status')}",
case.Subject = "${record:value('/Subject')}",
case.Priority = "${record:value('/Priority')}",
case.Description = "${record:value('/Description')}",
case.IsClosed = ${record:value('/IsClosed')}
但是,我们可以做更多的事情!由于我们有相关的帐户和联系人数据,我们可以在一个语句中使用多个MERGE子句来处理案例,帐户,联系人及其之间的关系:
// 'Create' should be read as "Create, if it does not already exist"
// in each case!
MERGE (case:Case { CaseId: "${record:value('/Id')}" })
ON CREATE SET case.CaseNumber = "${record:value('/CaseNumber')}",
case.Status = "${record:value('/Status')}",
case.Subject = "${record:value('/Subject')}",
case.Priority = "${record:value('/Priority')}",
case.Description = "${record:value('/Description')}",
case.IsClosed = ${record:value('/IsClosed')}
// Create the corresponding account
MERGE (account1:Account { AccountId:"${record:value('/Account/Id')}" })
ON CREATE SET account1.Name = "${record:value('/Account/Name')}"
// Create the corresponding contact
MERGE (contact:Contact { ContactId:"${record:value('/Contact/Id')}" })
ON CREATE SET contact.Name = "${record:value('/Contact/Name')}"
// Create the contact's account
MERGE (account2:Account { AccountId:"${record:value('/Contact/Account/Id')}" })
ON CREATE SET account2.Name = "${record:value('/Contact/Account/Name')}"
// Create the corresponding product
MERGE (product:Product { Code:"${record:value('/Product__c')}" })
// Create a relationship between the case and its account
MERGE (case)-[:associatedWith]->(account1)
// Create a relationship between the case and its contact
MERGE (case)-[:associatedWith]->(contact)
// Create a relationship between the contact and its account
MERGE (account2)-[:isParent]->(contact)
// Create a relationship between the case and its product
MERGE (case)-[:concerns]->(product)
现在应该明白为什么我们使用MERGE而不是CREATE,即使是在单次运行数据时:尽管我们在处理查询结果时只遇到一次每个案例,但我们可能会遇到任意次数的关联对象。我们不想创建一大堆重复节点!
在运行管道之前,我们可以做出的一个可选优化是告诉Neo4j Id
Salesforce对象上的字段(以及产品的代码属性)是唯一键。这告诉Neo4j创建相关索引并提高性能:
CREATE CONSTRAINT ON (c:Case) ASSERT c.CaseId IS UNIQUE;
CREATE CONSTRAINT ON (a:Account) ASSERT a.AccountId IS UNIQUE;
CREATE CONSTRAINT ON (p:Product) ASSERT p.Code IS UNIQUE;
CREATE CONSTRAINT ON (c:Contact) ASSERT c.ContactId IS UNIQUE;
使用标准样本数据针对Salesforce Developer Edition运行管道只需几秒钟,并在Neo4j的浏览器界面中实现以下可视化:
我们不仅可以与可视化,检查节点和以下关系进行交互,还可以使用Cypher查询来分析数据。例如,让我们看看所有具有“新”案例的帐户:
MATCH (c:Case)-[r:associatedWith]-(a:Account)
WHERE c.Status = "New"
RETURN c, r, a
我们还可以使用表格输出构建Cypher查询。例如,帐户名称和每个帐户的已关闭案例数量:
MATCH (c:Case)-[r:associatedWith]-(a:Account)
WHERE c.IsClosed
RETURN a.Name as accountName, COUNT(a) as numberOfCases
ORDER BY numberOfCases DESC
我们可以继续以不同的方式探索数据,例如,“谁是与特定产品打开的案例的联系人?”。
从Salesforce读Opportunity数据
SELECT Id, Account.Id, Account.Name, Amount, ExpectedRevenue,
IsClosed, Name, StageName
FROM Opportunity
WHERE Id > '${OFFSET}'
ORDER BY Id
注意 - 我们检索帐户ID和名称,以便我们可以将机会链接到图表中的现有帐户节点。
Cypher声明遵循与案例相同的模式:
MERGE (oppty:Opportunity { OpportunityId: "${record:value('/Id')}" })
ON CREATE SET oppty.Amount = ${record:value('/Amount')},
oppty.ExpectedRevenue = ${record:value('/ExpectedRevenue')},
oppty.IsClosed = ${record:value('/IsClosed')},
oppty.Name = "${record:value('/Name')}",
oppty.StageName = "${record:value('/StageName')}"
MERGE (account:Account { AccountId:"${record:value('/Account/Id')}" })
ON CREATE SET account.Name = "${record:value('/Account/Name')}"
MERGE (oppty)-[:associatedWith]->(account)
运行管道会增加图表的机会,以及一些新帐户 - 这些是没有相关案例的潜在客户或客户:
显示开放案例的账户,按预期机会收入排序:
MATCH (c:Case)-[r1:associatedWith]->(a:Account)<-[r2:associatedWith]-(o:Opportunity)
WHERE c.Status <> "Closed"
WITH a, SUM(o.ExpectedRevenue) AS totalExpectedRevenue
ORDER BY totalExpectedRevenue DESC
RETURN a.Name, totalExpectedRevenue
来自联合石油天然气公司的550万美元。
结论
Salesforce Origin允许您轻松地从Salesforce检索数据,将相关对象建模为字段层次结构。JDBC Executor支持针对任何支持JDBC的数据存储的任意查询,即使那些不会使用SQL的数据存储,但请记住,它没有针对大型数据集进行优化。如果您是Neo4j用户,请立即下载StreamSets Data Collector - 它是开源的,可以从各种数据源中读取。当您需要将数据加载到图形数据库中时,请尝试SDC,并告诉我们您在评论中的表现!