同构图与异构图CYPHER-TASK设计与TASK锁机制

2024-02-25 06:32

本文主要是介绍同构图与异构图CYPHER-TASK设计与TASK锁机制,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

同构图与异构图CYPHER-TASK设计与TASK锁机制

  • 问题背景
    • CYPHER-TASK设计
      • 同构图
      • 异构图
      • check-point表结构设计
      • task-lock表结构设计
    • 任务模块解构
      • 数据分块
      • 任务状态回滚
      • 任务状态同步
      • 任务状态锁
    • 完整实现案例
      • 同构图
        • 节点TASK
        • 关系TASK
      • 异构图
        • 节点TASK
        • 关系TASK
    • 备注

问题背景

大规模重复并发执行写入操作会导致图数据库服务堆积大量的写入请求,导致服务性能下降甚至宕机。因此TASK锁机制设计非常重要,必须保证在同一时刻写入任务不可重复执行;检查点机制的设计保证了数据同步的一致性和完整性;TASK占用过多系统内存【尤其在处理大量数据时】图数据库服务会存在宕机风险,数据分块方案的设计很好的避免了这个问题。

CYPHER-TASK设计

同构图

  • 每个任务都需要获取锁然后执行数据构建逻辑,不管构建逻辑是否成功执行TASK结束时必须释放锁
  • [NODE-TASK]负责锁的node_check-point更新以及后续任务的rel_check_point同步
  • [REL-TASK]负责node_check-point的回滚和任务状态同步rel_check_point=node_check_point
# TASK执行流程
[NODE-TASK]->[REL-TASK]

异构图

  • 每个任务都需要获取锁然后执行数据构建逻辑,不管构建逻辑是否成功执行TASK结束时必须释放锁
  • [FROM-NODE-TASK]负责锁的node_check-point更新以及后续任务的rel_check_point同步
  • [TO-NODE-TASK]负责node_check-point的回滚
  • [REL-TASK]负责node_check-point的回滚和任务状态同步rel_check_point=node_check_point
# TASK执行流程
[FROM-NODE-TASK]->[TO-NODE-TASK]->[REL-TASK]

check-point表结构设计

任务状态表负责保存节点TASK和关系TASK的任务状态,在整个任务流上实现任务状态的传递,同时保证数据一致性和完整性。

CREATE TABLE `ONGDB_TASK_CHECK_POINT` (
`huid` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '自增主键',
`hcode` varchar(128) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NOT NULL COMMENT '代码:HGRAPHTASK(FromLabel)-[RelType]->(ToLabel)',
`from` varchar(256) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NOT NULL COMMENT '名称',
`relationship` varchar(128) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NOT NULL COMMENT '关联类型',
`to` varchar(64) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NOT NULL COMMENT 'MSTR_ORG的hcode',
`node_check_point` datetime DEFAULT '1900-01-01 00:00:00' COMMENT '节点可以获取检查点时间可更改,关系TASK可以获取检查点时间【一个完整的图数据DAG-TASK必须包含节点和关系构建TASK】',
`rel_check_point` datetime DEFAULT '1900-01-01 00:00:00' COMMENT '保存更新前node_check_point的值',
`description` varchar(256) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NOT NULL COMMENT '对该检查点任务的具体描述',
`overall_data_split_cypher` text CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci COMMENT '同步全量数据的CYPHER:数据分块方案脚本',
`overall_data_timezone_cypher` text CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci COMMENT '同步全量数据的CYPHER:不设置时间范围的同步脚本',
`hcreatetime` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
`hupdatetime` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
`create_by` varchar(24) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NOT NULL COMMENT '创建人',
`update_by` varchar(24) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NOT NULL COMMENT '更新人',
`hisvalid` int(11) NOT NULL DEFAULT '1' COMMENT '逻辑删除标记:0-无效;1-有效',
`src` varchar(32) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NOT NULL DEFAULT 'ART' COMMENT '数据来源标记',
PRIMARY KEY (`huid`) USING BTREE,
UNIQUE KEY `unique_key_02` (`hcode`) USING BTREE COMMENT '唯一索引',
UNIQUE KEY `unique_key_01` (`from`,`to`,`relationship`) USING BTREE COMMENT '唯一索引',
KEY `updateTime` (`hupdatetime`) USING BTREE,
KEY `name` (`from`) USING BTREE,
KEY `hisvalid` (`hisvalid`) USING BTREE,
KEY `type` (`relationship`) USING BTREE,
KEY `check_point` (`node_check_point`) USING BTREE
) ENGINE=InnoDB AUTO_INCREMENT=742715632 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci ROW_FORMAT=DYNAMIC COMMENT='ONgDB DAG TASK检查点记录表';

task-lock表结构设计

任务状态锁表负责对任务状态进行加锁操作,保证TASK运行时的唯一性。

任务模块解构

数据分块

控制加载到内存的数据量,避免占用过多堆内存保证图数据库可靠运行。

// 数据分块-从检查点开始按照指定数据块大小执行数据分块【设置一个默认分块,保证锁能顺利释放】
WITH apoc.coll.union(olab.ids.batch(min,max,10000),[[0,1]]) AS value

任务状态回滚

回滚到构建节点的任务状态,下一次构建节点关系时从回滚点开始操作【任务运行都从节点TASK开始】。

// 当操作失败的数据包数量大于0时,回滚node_check_point
WITH SUM(batch.failed) AS batchFailedSize,currentTime,rawCheckPoint
CALL apoc.do.case([batchFailedSize>0,'CALL apoc.load.jdbcUpdate(\'jdbc:mysql://test.amazonaws.com.cn:3306/analytics_graph_data?user=dev&password=test&useUnicode=true&characterEncoding=utf8&serverTimezone=UTC\',\'UPDATE ONGDB_TASK_CHECK_POINT SET node_check_point=rel_check_point WHERE hcode=?\',[\'HGRAPHTASK(HBondOrg)-[发行证券]->(HEventBond)\']) YIELD row RETURN row;'],'',{}) YIELD value WITH value,batchFailedSize,currentTime,rawCheckPoint

任务状态同步

关系TASK-CHECK-POINT和节点TASK-CHECK-POINT状态同步。

// batchFailedSize>0则任务状态回滚【当任意一个批量构建关系的任务失败时回滚任务状态】【回滚:设置node_check_point等于当前的rel_check_point】
// batchFailedSize<=0【成功执行则节点TASK与关系TASK状态同步】正常执行则更新rel_check_point=node_check_point
WITH SUM(batch.failed) AS batchFailedSize,rawCheckPoint
CALL apoc.do.case([batchFailedSize>0,'CALL apoc.load.jdbcUpdate(\'jdbc:mysql://test.amazonaws.com.cn:3306/analytics_graph_data?user=dev&password=test&useUnicode=true&characterEncoding=utf8&serverTimezone=UTC\',\'UPDATE ONGDB_TASK_CHECK_POINT SET node_check_point=? WHERE hcode=?\',[$rawCheckPoint,\'HGRAPHTASK(HBondOrg)-[发行证券]->(HEventBond)\']) YIELD row RETURN row'],'CALL apoc.load.jdbcUpdate(\'jdbc:mysql://test.amazonaws.com.cn:3306/analytics_graph_data?user=dev&password=test&useUnicode=true&characterEncoding=utf8&serverTimezone=UTC\',\'UPDATE ONGDB_TASK_CHECK_POINT SET rel_check_point=node_check_point WHERE hcode=?\',[\'HGRAPHTASK(HBondOrg)-[发行证券]->(HEventBond)\']) YIELD row RETURN row',{rawCheckPoint:rawCheckPoint}) YIELD value WITH value,batchFailedSize,rawCheckPoint

任务状态锁

【图数据构建任务状态锁】【保证某一时刻关系的DAG中TASK运行的唯一性】。

  • 获取锁
// 获取任务锁并锁定任务
CALL apoc.load.jdbcUpdate('jdbc:mysql://test.amazonaws.com.cn:3306/analytics_graph_data?user=dev&password=test&useUnicode=true&characterEncoding=utf8&serverTimezone=UTC','UPDATE ONGDB_TASK_CHECK_POINT_LOCK updateLock INNER JOIN (SELECT task_lock,hcode FROM ONGDB_TASK_CHECK_POINT_LOCK selectLock WHERE task_lock=0 AND hcode=?) selectLock ON updateLock.hcode=selectLock.hcode SET updateLock.task_lock=1',['HGRAPHTASK(HBondOrg)-[发行证券]->(HEventBond)']) YIELD row AS lock WHERE lock.count>0 WITH lock
  • 释放锁
// 释放锁【TASK结束运行释放锁操作】【数据分块处设置一个默认分块,保证释放锁操作顺利执行】
CALL apoc.load.jdbcUpdate('jdbc:mysql://test.amazonaws.com.cn:3306/analytics_graph_data?user=dev&password=test&useUnicode=true&characterEncoding=utf8&serverTimezone=UTC','UPDATE ONGDB_TASK_CHECK_POINT_LOCK SET task_lock=0 WHERE hcode=?',['HGRAPHTASK(HBondOrg)-[发行证券]->(HEventBond)']) YIELD row AS releaseLock RETURN releaseLock,value,batchFailedSize,rawCheckPoint;

完整实现案例

同构图

节点TASK
// ===========================获取锁并执行TASK===========================
// 获取任务锁并锁定任务
CALL apoc.load.jdbcUpdate('jdbc:mysql://test.amazonaws.com.cn:3306/analytics_graph_data?user=dev&password=test&useUnicode=true&characterEncoding=utf8&serverTimezone=UTC','UPDATE ONGDB_TASK_CHECK_POINT_LOCK updateLock INNER JOIN (SELECT task_lock,hcode FROM ONGDB_TASK_CHECK_POINT_LOCK selectLock WHERE task_lock=0 AND hcode=?) selectLock ON updateLock.hcode=selectLock.hcode SET updateLock.task_lock=1',['HGRAPHTASK(HORGGuaranteeV001)-[担保]->(HORGGuaranteeV001)']) YIELD row AS lock WHERE lock.count>0 WITH lock
// 获取检查点时间【跑全量数据时修改CHECK_POINT的时间点为最早的一个时间即可】【数据量高于堆内存限制则必须使用数据分块方案】
CALL apoc.load.jdbc('jdbc:mysql://test.amazonaws.com.cn:3306/analytics_graph_data?user=dev&password=test&useUnicode=true&characterEncoding=utf8&serverTimezone=UTC','SELECT DATE_FORMAT(node_check_point,\'%Y-%m-%d %H:%i:%s\') AS check_point,DATE_FORMAT(NOW(),\'%Y-%m-%d %H:%i:%s\') AS currentTime FROM ONGDB_TASK_CHECK_POINT WHERE hcode=?',['HGRAPHTASK(HORGGuaranteeV001)-[担保]->(HORGGuaranteeV001)']) YIELD row WITH apoc.text.join(['\'',row.check_point,'\''], '') AS check_point,row.currentTime AS currentTime,row.check_point AS rawCheckPoint
// 数据分块-从数据库获取检查点之后最大最小自增ID
CALL apoc.load.jdbc('jdbc:mysql://test.amazonaws.com.cn:3306/analytics_graph_data?user=dev&password=test&useUnicode=true&characterEncoding=utf8&serverTimezone=UTC','SELECT MIN(huid) AS min,MAX(huid) AS max FROM HORGGuaranteeV001 WHERE hupdatetime>=?',[rawCheckPoint]) YIELD row WITH row.min AS min,row.max AS max,check_point,currentTime,rawCheckPoint
// 数据分块-从检查点开始按照指定数据块大小执行数据分块【设置一个默认分块,保证锁能顺利释放】
WITH apoc.coll.union(olab.ids.batch(min,max,10000),[[0,1]]) AS value,check_point,currentTime,rawCheckPoint
UNWIND value AS bactIdList
WITH bactIdList[0] AS batchMin,bactIdList[1] AS batchMax,check_point,currentTime,rawCheckPoint
// 定义SQL获取数据方式
WITH REPLACE('CALL apoc.load.jdbc(\'jdbc:mysql://test.amazonaws.com.cn:3306/analytics_graph_data?user=dev&password=test&useUnicode=true&characterEncoding=utf8&serverTimezone=UTC\', \'SELECT
hcode,name,credit_code,label,CONVERT(DATE_FORMAT(hcreatetime,\\\'%Y%m%d%H%i%S\\\'),UNSIGNED INTEGER) AS hcreatetime,CONVERT(DATE_FORMAT(hupdatetime,\\\'%Y%m%d%H%i%S\\\'),UNSIGNED INTEGER) AS hupdatetime,hisvalid,create_by,update_by FROM HORGGuaranteeV001 WHERE hupdatetime>=? AND huid>=? AND huid<=?\',[check_point,batchMin,batchMax])','check_point,batchMin,batchMax',check_point+','+batchMin+','+batchMax) AS sqlData,currentTime,rawCheckPoint
// 批量迭代执行节点构建
CALL apoc.periodic.iterate(sqlData,'MERGE (n:HORGGuaranteeV001 {hcode:row.hcode}) SET n+=row WITH n,row CALL apoc.create.addLabels(n,apoc.convert.fromJsonList(row.label)) YIELD node RETURN node', {parallel:false,batchSize:10000,iterateList:true}) YIELD batches,total,timeTaken,committedOperations,failedOperations,failedBatches,retries,errorMessages,batch,operations
WITH batches,total,timeTaken,committedOperations,failedOperations,failedBatches,retries,errorMessages,batch,operations,currentTime,rawCheckPoint
// 当操作失败的数据包数量小于1时【即操作全部执行成功】则更新检查点【更新node_check_point为系统时间】【rel_check_point设置为更新前node_check_point的值】
WITH SUM(batch.failed) AS batchFailedSize,currentTime,rawCheckPoint
CALL apoc.do.case([batchFailedSize<1,'CALL apoc.load.jdbcUpdate(\'jdbc:mysql://test.amazonaws.com.cn:3306/analytics_graph_data?user=dev&password=test&useUnicode=true&characterEncoding=utf8&serverTimezone=UTC\',\'UPDATE ONGDB_TASK_CHECK_POINT SET node_check_point=?,rel_check_point=? WHERE hcode=?\',[$currentTime,$rawCheckPoint,\'HGRAPHTASK(HORGGuaranteeV001)-[担保]->(HORGGuaranteeV001)\']) YIELD row RETURN row;'],'',{currentTime:currentTime,rawCheckPoint:rawCheckPoint}) YIELD value WITH value,batchFailedSize,currentTime,rawCheckPoint
// 释放锁【TASK结束运行释放锁操作】【数据分块处设置一个默认分块,保证释放锁操作顺利执行】
CALL apoc.load.jdbcUpdate('jdbc:mysql://test.amazonaws.com.cn:3306/analytics_graph_data?user=dev&password=test&useUnicode=true&characterEncoding=utf8&serverTimezone=UTC','UPDATE ONGDB_TASK_CHECK_POINT_LOCK SET task_lock=0 WHERE hcode=?',['HGRAPHTASK(HORGGuaranteeV001)-[担保]->(HORGGuaranteeV001)']) YIELD row AS releaseLock RETURN releaseLock,value,batchFailedSize,currentTime,rawCheckPoint;
关系TASK
// ===========================获取锁并执行TASK===========================
// 获取任务锁并锁定任务
CALL apoc.load.jdbcUpdate('jdbc:mysql://test.amazonaws.com.cn:3306/analytics_graph_data?user=dev&password=test&useUnicode=true&characterEncoding=utf8&serverTimezone=UTC','UPDATE ONGDB_TASK_CHECK_POINT_LOCK updateLock INNER JOIN (SELECT task_lock,hcode FROM ONGDB_TASK_CHECK_POINT_LOCK selectLock WHERE task_lock=0 AND hcode=?) selectLock ON updateLock.hcode=selectLock.hcode SET updateLock.task_lock=1',['HGRAPHTASK(HORGGuaranteeV001)-[担保]->(HORGGuaranteeV001)']) YIELD row AS lock WHERE lock.count>0 WITH lock
// 获取检查点时间【跑全量数据时修改CHECK_POINT的时间点为最早的一个时间即可】【数据量高于堆内存限制则必须使用数据分块方案】
CALL apoc.load.jdbc('jdbc:mysql://test.amazonaws.com.cn:3306/analytics_graph_data?user=dev&password=test&useUnicode=true&characterEncoding=utf8&serverTimezone=UTC','SELECT DATE_FORMAT(rel_check_point,\'%Y-%m-%d %H:%i:%s\') AS check_point FROM ONGDB_TASK_CHECK_POINT WHERE hcode=?',['HGRAPHTASK(HORGGuaranteeV001)-[担保]->(HORGGuaranteeV001)']) YIELD row WITH apoc.text.join(['\'',row.check_point,'\''], '') AS check_point,row.check_point AS rawCheckPoint
// 数据分块-从数据库获取检查点之后最大最小自增ID
CALL apoc.load.jdbc('jdbc:mysql://test.amazonaws.com.cn:3306/analytics_graph_data?user=dev&password=test&useUnicode=true&characterEncoding=utf8&serverTimezone=UTC','SELECT MIN(huid) AS min,MAX(huid) AS max FROM HORGGuarantee_GuarV001 WHERE hupdatetime>=?',[rawCheckPoint]) YIELD row WITH row.min AS min,row.max AS max,check_point,rawCheckPoint
// 数据分块-从检查点开始按照指定数据块大小执行数据分块【设置一个默认分块,保证锁能顺利释放】
WITH apoc.coll.union(olab.ids.batch(min,max,10000),[[0,1]]) AS value,check_point,rawCheckPoint
UNWIND value AS bactIdList
WITH bactIdList[0] AS batchMin,bactIdList[1] AS batchMax,check_point,rawCheckPoint
// 定义SQL获取数据方式
WITH REPLACE('CALL apoc.load.jdbc(\'jdbc:mysql://test.amazonaws.com.cn:3306/analytics_graph_data?user=dev&password=test&useUnicode=true&characterEncoding=utf8&serverTimezone=UTC\', \'SELECT `from`,`to`,guarantee_detail,guarantee_detail_size,CONVERT(DATE_FORMAT(hcreatetime,\\\'%Y%m%d%H%i%S\\\'),UNSIGNED INTEGER) AS hcreatetime,CONVERT(DATE_FORMAT(hupdatetime,\\\'%Y%m%d%H%i%S\\\'),UNSIGNED INTEGER) AS hupdatetime,hisvalid,create_by,update_by FROM HORGGuarantee_GuarV001 WHERE hupdatetime>=? AND huid>=? AND huid<=?\',[check_point,batchMin,batchMax])','check_point,batchMin,batchMax',check_point+','+batchMin+','+batchMax) AS sqlData,rawCheckPoint
// 批量迭代执行节点构建
CALL apoc.periodic.iterate(sqlData,'MATCH (from:HORGGuaranteeV001 {hcode:row.from}),(to:HORGGuaranteeV001 {hcode:row.to}) MERGE (from)-[r:担保]->(to) SET r+={guarantee_detail_size:row.guarantee_detail_size,guarantee_detail:row.guarantee_detail,hupdatetime:row.hupdatetime,hcreatetime:row.hcreatetime,hisvalid:row.hisvalid,create_by:row.create_by,update_by:row.update_by}', {parallel:false,batchSize:10000,iterateList:true}) YIELD batches,total,timeTaken,committedOperations,failedOperations,failedBatches,retries,errorMessages,batch,operations WITH batches,total,timeTaken,committedOperations,failedOperations,failedBatches,retries,errorMessages,batch,operations,rawCheckPoint
// batchFailedSize>0则任务状态回滚【当任意一个批量构建关系的任务失败时回滚任务状态】【回滚:设置node_check_point等于当前的rel_check_point】
// batchFailedSize<=0【成功执行则节点TASK与关系TASK状态同步】正常执行则更新rel_check_point=node_check_point
WITH SUM(batch.failed) AS batchFailedSize,rawCheckPoint
CALL apoc.do.case([batchFailedSize>0,'CALL apoc.load.jdbcUpdate(\'jdbc:mysql://test.amazonaws.com.cn:3306/analytics_graph_data?user=dev&password=test&useUnicode=true&characterEncoding=utf8&serverTimezone=UTC\',\'UPDATE ONGDB_TASK_CHECK_POINT SET node_check_point=? WHERE hcode=?\',[$rawCheckPoint,\'HGRAPHTASK(HORGGuaranteeV001)-[担保]->(HORGGuaranteeV001)\']) YIELD row RETURN row'],'CALL apoc.load.jdbcUpdate(\'jdbc:mysql://test.amazonaws.com.cn:3306/analytics_graph_data?user=dev&password=test&useUnicode=true&characterEncoding=utf8&serverTimezone=UTC\',\'UPDATE ONGDB_TASK_CHECK_POINT SET rel_check_point=node_check_point WHERE hcode=?\',[\'HGRAPHTASK(HORGGuaranteeV001)-[担保]->(HORGGuaranteeV001)\']) YIELD row RETURN row',{rawCheckPoint:rawCheckPoint}) YIELD value WITH value,batchFailedSize,rawCheckPoint
// 释放锁【TASK结束运行释放锁操作】【数据分块处设置一个默认分块,保证释放锁操作顺利执行】
CALL apoc.load.jdbcUpdate('jdbc:mysql://test.amazonaws.com.cn:3306/analytics_graph_data?user=dev&password=test&useUnicode=true&characterEncoding=utf8&serverTimezone=UTC','UPDATE ONGDB_TASK_CHECK_POINT_LOCK SET task_lock=0 WHERE hcode=?',['HGRAPHTASK(HORGGuaranteeV001)-[担保]->(HORGGuaranteeV001)']) YIELD row AS releaseLock RETURN releaseLock,value,batchFailedSize,rawCheckPoint;

异构图

异构图的TASK执行流程:(FROM-TASK)->(TO-TASK)->(REL-TASK),其中(TO-TASK)只负责node_check_point的回滚,不负责node_check_point的更新操作和任务状态同步操作。

节点TASK
  • FROM节点TASK
// ===========================获取锁并执行TASK===========================
// 获取任务锁并锁定任务
CALL apoc.load.jdbcUpdate('jdbc:mysql://test.amazonaws.com.cn:3306/analytics_graph_data?user=dev&password=test&useUnicode=true&characterEncoding=utf8&serverTimezone=UTC','UPDATE ONGDB_TASK_CHECK_POINT_LOCK updateLock INNER JOIN (SELECT task_lock,hcode FROM ONGDB_TASK_CHECK_POINT_LOCK selectLock WHERE task_lock=0 AND hcode=?) selectLock ON updateLock.hcode=selectLock.hcode SET updateLock.task_lock=1',['HGRAPHTASK(HBondOrg)-[发行证券]->(HEventBond)']) YIELD row AS lock WHERE lock.count>0 WITH lock
// 获取检查点时间【跑全量数据时修改CHECK_POINT的时间点为最早的一个时间即可】【数据量高于堆内存限制则必须使用数据分块方案】
CALL apoc.load.jdbc('jdbc:mysql://test.amazonaws.com.cn:3306/analytics_graph_data?user=dev&password=test&useUnicode=true&characterEncoding=utf8&serverTimezone=UTC','SELECT DATE_FORMAT(node_check_point,\'%Y-%m-%d %H:%i:%s\') AS check_point,DATE_FORMAT(NOW(),\'%Y-%m-%d %H:%i:%s\') AS currentTime FROM ONGDB_TASK_CHECK_POINT WHERE hcode=?',['HGRAPHTASK(HBondOrg)-[发行证券]->(HEventBond)']) YIELD row WITH apoc.text.join(['\'',row.check_point,'\''], '') AS check_point,row.currentTime AS currentTime,row.check_point AS rawCheckPoint
// 数据分块-从数据库获取检查点之后最大最小自增ID
CALL apoc.load.jdbc('jdbc:mysql://test.amazonaws.com.cn:3306/analytics_graph_data?user=dev&password=test&useUnicode=true&characterEncoding=utf8&serverTimezone=UTC','SELECT MIN(huid) AS min,MAX(huid) AS max FROM HBondOrg WHERE hupdatetime>=? AND type=?',[rawCheckPoint,'发行证券']) YIELD row WITH row.min AS min,row.max AS max,check_point,currentTime,rawCheckPoint
// 数据分块-从检查点开始按照指定数据块大小执行数据分块【设置一个默认分块,保证锁能顺利释放】
WITH apoc.coll.union(olab.ids.batch(min,max,10000),[[0,1]]) AS value,check_point,currentTime,rawCheckPoint
UNWIND value AS bactIdList
WITH bactIdList[0] AS batchMin,bactIdList[1] AS batchMax,check_point,currentTime,rawCheckPoint
// 定义SQL获取数据方式
WITH REPLACE('CALL apoc.load.jdbc(\'jdbc:mysql://test.amazonaws.com.cn:3306/analytics_graph_data?user=dev&password=test&useUnicode=true&characterEncoding=utf8&serverTimezone=UTC\', \'SELECT org_hcode AS hcode,org_name AS name,credit_code,label,CONVERT(DATE_FORMAT(hcreatetime,\\\'%Y%m%d%H%i%S\\\'),UNSIGNED INTEGER) AS hcreatetime,CONVERT(DATE_FORMAT(hupdatetime,\\\'%Y%m%d%H%i%S\\\'),UNSIGNED INTEGER) AS hupdatetime,hisvalid,create_by,update_by FROM HBondOrg WHERE hupdatetime>=? AND huid>=? AND huid<=? AND type=?\',[check_point,batchMin,batchMax,\'发行证券\'])','check_point,batchMin,batchMax',check_point+','+batchMin+','+batchMax) AS sqlData,currentTime,rawCheckPoint
// 批量迭代执行节点构建
CALL apoc.periodic.iterate(sqlData,'MERGE (n:HBondOrg {hcode:row.hcode}) SET n+=row WITH n,row CALL apoc.create.addLabels(n,apoc.convert.fromJsonList(row.label)) YIELD node RETURN node', {parallel:false,batchSize:10000,iterateList:true}) YIELD batches,total,timeTaken,committedOperations,failedOperations,failedBatches,retries,errorMessages,batch,operations
WITH batches,total,timeTaken,committedOperations,failedOperations,failedBatches,retries,errorMessages,batch,operations,currentTime,rawCheckPoint
// 当操作失败的数据包数量小于1时【即操作全部执行成功】则更新检查点【更新node_check_point为系统时间】【rel_check_point设置为更新前node_check_point的值】
WITH SUM(batch.failed) AS batchFailedSize,currentTime,rawCheckPoint
CALL apoc.do.case([batchFailedSize<1,'CALL apoc.load.jdbcUpdate(\'jdbc:mysql://test.amazonaws.com.cn:3306/analytics_graph_data?user=dev&password=test&useUnicode=true&characterEncoding=utf8&serverTimezone=UTC\',\'UPDATE ONGDB_TASK_CHECK_POINT SET node_check_point=?,rel_check_point=? WHERE hcode=?\',[$currentTime,$rawCheckPoint,\'HGRAPHTASK(HBondOrg)-[发行证券]->(HEventBond)\']) YIELD row RETURN row;'],'',{currentTime:currentTime,rawCheckPoint:rawCheckPoint}) YIELD value WITH value,batchFailedSize,currentTime,rawCheckPoint
// 释放锁【TASK结束运行释放锁操作】【数据分块处设置一个默认分块,保证释放锁操作顺利执行】
CALL apoc.load.jdbcUpdate('jdbc:mysql://test.amazonaws.com.cn:3306/analytics_graph_data?user=dev&password=test&useUnicode=true&characterEncoding=utf8&serverTimezone=UTC','UPDATE ONGDB_TASK_CHECK_POINT_LOCK SET task_lock=0 WHERE hcode=?',['HGRAPHTASK(HBondOrg)-[发行证券]->(HEventBond)']) YIELD row AS releaseLock RETURN releaseLock,value,batchFailedSize,currentTime,rawCheckPoint;
  • TO节点TASK
// ===========================获取锁并执行TASK===========================
// 获取任务锁并锁定任务
CALL apoc.load.jdbcUpdate('jdbc:mysql://test.amazonaws.com.cn:3306/analytics_graph_data?user=dev&password=test&useUnicode=true&characterEncoding=utf8&serverTimezone=UTC','UPDATE ONGDB_TASK_CHECK_POINT_LOCK updateLock INNER JOIN (SELECT task_lock,hcode FROM ONGDB_TASK_CHECK_POINT_LOCK selectLock WHERE task_lock=0 AND hcode=?) selectLock ON updateLock.hcode=selectLock.hcode SET updateLock.task_lock=1',['HGRAPHTASK(HBondOrg)-[发行证券]->(HEventBond)']) YIELD row AS lock WHERE lock.count>0 WITH lock
// 获取检查点时间【跑全量数据时修改CHECK_POINT的时间点为最早的一个时间即可】【数据量高于堆内存限制则必须使用数据分块方案】
CALL apoc.load.jdbc('jdbc:mysql://test.amazonaws.com.cn:3306/analytics_graph_data?user=dev&password=test&useUnicode=true&characterEncoding=utf8&serverTimezone=UTC','SELECT DATE_FORMAT(node_check_point,\'%Y-%m-%d %H:%i:%s\') AS check_point,DATE_FORMAT(NOW(),\'%Y-%m-%d %H:%i:%s\') AS currentTime FROM ONGDB_TASK_CHECK_POINT WHERE hcode=?',['HGRAPHTASK(HBondOrg)-[发行证券]->(HEventBond)']) YIELD row WITH apoc.text.join(['\'',row.check_point,'\''], '') AS check_point,row.currentTime AS currentTime,row.check_point AS rawCheckPoint
// 数据分块-从数据库获取检查点之后最大最小自增ID
CALL apoc.load.jdbc('jdbc:mysql://test.amazonaws.com.cn:3306/analytics_graph_data?user=dev&password=test&useUnicode=true&characterEncoding=utf8&serverTimezone=UTC','SELECT MIN(huid) AS min,MAX(huid) AS max FROM HBondOrg WHERE hupdatetime>=? AND type=?',[rawCheckPoint,'发行证券']) YIELD row WITH row.min AS min,row.max AS max,check_point,currentTime,rawCheckPoint
// 数据分块-从检查点开始按照指定数据块大小执行数据分块【设置一个默认分块,保证锁能顺利释放】
WITH apoc.coll.union(olab.ids.batch(min,max,10000),[[0,1]]) AS value,check_point,currentTime,rawCheckPoint
UNWIND value AS bactIdList
WITH bactIdList[0] AS batchMin,bactIdList[1] AS batchMax,check_point,currentTime,rawCheckPoint
// 定义SQL获取数据方式
WITH REPLACE('CALL apoc.load.jdbc(\'jdbc:mysql://test.amazonaws.com.cn:3306/analytics_graph_data?user=dev&password=test&useUnicode=true&characterEncoding=utf8&serverTimezone=UTC\', \'SELECT hcode,name,data_source,CONVERT(DATE_FORMAT(hcreatetime,\\\'%Y%m%d%H%i%S\\\'),UNSIGNED INTEGER) AS hcreatetime,CONVERT(DATE_FORMAT(hupdatetime,\\\'%Y%m%d%H%i%S\\\'),UNSIGNED INTEGER) AS hupdatetime,hisvalid,create_by,update_by FROM HBondOrg WHERE hupdatetime>=? AND huid>=? AND huid<=? AND type=?\',[check_point,batchMin,batchMax,\'发行证券\'])','check_point,batchMin,batchMax',check_point+','+batchMin+','+batchMax) AS sqlData,currentTime,rawCheckPoint
// 批量迭代执行节点构建
CALL apoc.periodic.iterate(sqlData,'MERGE (n:HEventBond {hcode:row.hcode}) SET n+=row', {parallel:false,batchSize:10000,iterateList:true}) YIELD batches,total,timeTaken,committedOperations,failedOperations,failedBatches,retries,errorMessages,batch,operations
WITH batches,total,timeTaken,committedOperations,failedOperations,failedBatches,retries,errorMessages,batch,operations,currentTime,rawCheckPoint
// 当操作失败的数据包数量大于0时,回滚node_check_point
WITH SUM(batch.failed) AS batchFailedSize,currentTime,rawCheckPoint
CALL apoc.do.case([batchFailedSize>0,'CALL apoc.load.jdbcUpdate(\'jdbc:mysql://test.amazonaws.com.cn:3306/analytics_graph_data?user=dev&password=test&useUnicode=true&characterEncoding=utf8&serverTimezone=UTC\',\'UPDATE ONGDB_TASK_CHECK_POINT SET node_check_point=rel_check_point WHERE hcode=?\',[\'HGRAPHTASK(HBondOrg)-[发行证券]->(HEventBond)\']) YIELD row RETURN row;'],'',{}) YIELD value WITH value,batchFailedSize,currentTime,rawCheckPoint
// 释放锁【TASK结束运行释放锁操作】【数据分块处设置一个默认分块,保证释放锁操作顺利执行】
CALL apoc.load.jdbcUpdate('jdbc:mysql://test.amazonaws.com.cn:3306/analytics_graph_data?user=dev&password=test&useUnicode=true&characterEncoding=utf8&serverTimezone=UTC','UPDATE ONGDB_TASK_CHECK_POINT_LOCK SET task_lock=0 WHERE hcode=?',['HGRAPHTASK(HBondOrg)-[发行证券]->(HEventBond)']) YIELD row AS releaseLock RETURN releaseLock,value,batchFailedSize,currentTime,rawCheckPoint;
关系TASK
// ===========================获取锁并执行TASK===========================
// 获取任务锁并锁定任务
CALL apoc.load.jdbcUpdate('jdbc:mysql://test.amazonaws.com.cn:3306/analytics_graph_data?user=dev&password=test&useUnicode=true&characterEncoding=utf8&serverTimezone=UTC','UPDATE ONGDB_TASK_CHECK_POINT_LOCK updateLock INNER JOIN (SELECT task_lock,hcode FROM ONGDB_TASK_CHECK_POINT_LOCK selectLock WHERE task_lock=0 AND hcode=?) selectLock ON updateLock.hcode=selectLock.hcode SET updateLock.task_lock=1',['HGRAPHTASK(HBondOrg)-[发行证券]->(HEventBond)']) YIELD row AS lock WHERE lock.count>0 WITH lock
// 获取检查点时间【跑全量数据时修改CHECK_POINT的时间点为最早的一个时间即可】【数据量高于堆内存限制则必须使用数据分块方案】
CALL apoc.load.jdbc('jdbc:mysql://test.amazonaws.com.cn:3306/analytics_graph_data?user=dev&password=test&useUnicode=true&characterEncoding=utf8&serverTimezone=UTC','SELECT DATE_FORMAT(rel_check_point,\'%Y-%m-%d %H:%i:%s\') AS check_point FROM ONGDB_TASK_CHECK_POINT WHERE hcode=?',['HGRAPHTASK(HBondOrg)-[发行证券]->(HEventBond)']) YIELD row WITH apoc.text.join(['\'',row.check_point,'\''], '') AS check_point,row.check_point AS rawCheckPoint
// 数据分块-从数据库获取检查点之后最大最小自增ID
CALL apoc.load.jdbc('jdbc:mysql://test.amazonaws.com.cn:3306/analytics_graph_data?user=dev&password=test&useUnicode=true&characterEncoding=utf8&serverTimezone=UTC','SELECT MIN(huid) AS min,MAX(huid) AS max FROM HBondOrg WHERE hupdatetime>=? AND type=?',[rawCheckPoint,'发行证券']) YIELD row WITH row.min AS min,row.max AS max,check_point,rawCheckPoint
// 数据分块-从检查点开始按照指定数据块大小执行数据分块【设置一个默认分块,保证锁能顺利释放】
WITH apoc.coll.union(olab.ids.batch(min,max,10000),[[0,1]]) AS value,check_point,rawCheckPoint
UNWIND value AS bactIdList
WITH bactIdList[0] AS batchMin,bactIdList[1] AS batchMax,check_point,rawCheckPoint
// 定义SQL获取数据方式
WITH REPLACE('CALL apoc.load.jdbc(\'jdbc:mysql://test.amazonaws.com.cn:3306/analytics_graph_data?user=dev&password=test&useUnicode=true&characterEncoding=utf8&serverTimezone=UTC\', \'SELECT hcode AS `to`,org_hcode AS `from`,data_source,CONVERT(DATE_FORMAT(hcreatetime,\\\'%Y%m%d%H%i%S\\\'),UNSIGNED INTEGER) AS hcreatetime,CONVERT(DATE_FORMAT(hupdatetime,\\\'%Y%m%d%H%i%S\\\'),UNSIGNED INTEGER) AS hupdatetime,hisvalid,create_by,update_by FROM HBondOrg WHERE hupdatetime>=? AND huid>=? AND huid<=? AND type=?\',[check_point,batchMin,batchMax,\'发行证券\'])','check_point,batchMin,batchMax',check_point+','+batchMin+','+batchMax) AS sqlData,rawCheckPoint
// 批量迭代执行节点构建
CALL apoc.periodic.iterate(sqlData,'MATCH (from:HBondOrg {hcode:row.org_hcode}),(to:HEventBond {hcode:row.hcode}) MERGE (from)-[r:发行证券]->(to) SET r+=olab.reset.map(row,[\'from\',\'to\'])', {parallel:false,batchSize:10000,iterateList:true}) YIELD batches,total,timeTaken,committedOperations,failedOperations,failedBatches,retries,errorMessages,batch,operations WITH batches,total,timeTaken,committedOperations,failedOperations,failedBatches,retries,errorMessages,batch,operations,rawCheckPoint
// batchFailedSize>0则任务状态回滚【当任意一个批量构建关系的任务失败时回滚任务状态】【回滚:设置node_check_point等于当前的rel_check_point】
// batchFailedSize<=0【成功执行则节点TASK与关系TASK状态同步】正常执行则更新rel_check_point=node_check_point
WITH SUM(batch.failed) AS batchFailedSize,rawCheckPoint
CALL apoc.do.case([batchFailedSize>0,'CALL apoc.load.jdbcUpdate(\'jdbc:mysql://test.amazonaws.com.cn:3306/analytics_graph_data?user=dev&password=test&useUnicode=true&characterEncoding=utf8&serverTimezone=UTC\',\'UPDATE ONGDB_TASK_CHECK_POINT SET node_check_point=? WHERE hcode=?\',[$rawCheckPoint,\'HGRAPHTASK(HBondOrg)-[发行证券]->(HEventBond)\']) YIELD row RETURN row'],'CALL apoc.load.jdbcUpdate(\'jdbc:mysql://test.amazonaws.com.cn:3306/analytics_graph_data?user=dev&password=test&useUnicode=true&characterEncoding=utf8&serverTimezone=UTC\',\'UPDATE ONGDB_TASK_CHECK_POINT SET rel_check_point=node_check_point WHERE hcode=?\',[\'HGRAPHTASK(HBondOrg)-[发行证券]->(HEventBond)\']) YIELD row RETURN row',{rawCheckPoint:rawCheckPoint}) YIELD value WITH value,batchFailedSize,rawCheckPoint
// 释放锁【TASK结束运行释放锁操作】【数据分块处设置一个默认分块,保证释放锁操作顺利执行】
CALL apoc.load.jdbcUpdate('jdbc:mysql://test.amazonaws.com.cn:3306/analytics_graph_data?user=dev&password=test&useUnicode=true&characterEncoding=utf8&serverTimezone=UTC','UPDATE ONGDB_TASK_CHECK_POINT_LOCK SET task_lock=0 WHERE hcode=?',['HGRAPHTASK(HBondOrg)-[发行证券]->(HEventBond)']) YIELD row AS releaseLock RETURN releaseLock,value,batchFailedSize,rawCheckPoint;

备注

通过对图数据任务的解构分析,可以设想CYPHER脚本的自动化生成完全是可以实现的。进一步设计实现一个自动化抽取图数据的系统是非常有价值的。从图数据schema设计,到数据模型推荐和图数据自动抽取,可以大大解放数据工程师的精力。

# 图数据库相关插件包下载
https://github.com/neo4j-contrib/neo4j-apoc-procedures
https://github.com/ongdb-contrib/ongdb-lab-apoc
  • 相关文章
    基于check-point实现图数据构建任务
    基于check-point机制的任务状态回滚和数据分块任务

这篇关于同构图与异构图CYPHER-TASK设计与TASK锁机制的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/744652

相关文章

Java中的xxl-job调度器线程池工作机制

《Java中的xxl-job调度器线程池工作机制》xxl-job通过快慢线程池分离短时与长时任务,动态降级超时任务至慢池,结合异步触发和资源隔离机制,提升高频调度的性能与稳定性,支撑高并发场景下的可靠... 目录⚙️ 一、调度器线程池的核心设计 二、线程池的工作流程 三、线程池配置参数与优化 四、总结:线程

Android ClassLoader加载机制详解

《AndroidClassLoader加载机制详解》Android的ClassLoader负责加载.dex文件,基于双亲委派模型,支持热修复和插件化,需注意类冲突、内存泄漏和兼容性问题,本文给大家介... 目录一、ClassLoader概述1.1 类加载的基本概念1.2 android与Java Class

Spring事务传播机制最佳实践

《Spring事务传播机制最佳实践》Spring的事务传播机制为我们提供了优雅的解决方案,本文将带您深入理解这一机制,掌握不同场景下的最佳实践,感兴趣的朋友一起看看吧... 目录1. 什么是事务传播行为2. Spring支持的七种事务传播行为2.1 REQUIRED(默认)2.2 SUPPORTS2

MySQL中的锁机制详解之全局锁,表级锁,行级锁

《MySQL中的锁机制详解之全局锁,表级锁,行级锁》MySQL锁机制通过全局、表级、行级锁控制并发,保障数据一致性与隔离性,全局锁适用于全库备份,表级锁适合读多写少场景,行级锁(InnoDB)实现高并... 目录一、锁机制基础:从并发问题到锁分类1.1 并发访问的三大问题1.2 锁的核心作用1.3 锁粒度分

Redis的持久化之RDB和AOF机制详解

《Redis的持久化之RDB和AOF机制详解》:本文主要介绍Redis的持久化之RDB和AOF机制,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录概述RDB(Redis Database)核心原理触发方式手动触发自动触发AOF(Append-Only File)核

PostgreSQL中MVCC 机制的实现

《PostgreSQL中MVCC机制的实现》本文主要介绍了PostgreSQL中MVCC机制的实现,通过多版本数据存储、快照隔离和事务ID管理实现高并发读写,具有一定的参考价值,感兴趣的可以了解一下... 目录一 MVCC 基本原理python1.1 MVCC 核心概念1.2 与传统锁机制对比二 Postg

Maven 配置中的 <mirror>绕过 HTTP 阻断机制的方法

《Maven配置中的<mirror>绕过HTTP阻断机制的方法》:本文主要介绍Maven配置中的<mirror>绕过HTTP阻断机制的方法,本文给大家分享问题原因及解决方案,感兴趣的朋友一... 目录一、问题场景:升级 Maven 后构建失败二、解决方案:通过 <mirror> 配置覆盖默认行为1. 配置示

Redis过期删除机制与内存淘汰策略的解析指南

《Redis过期删除机制与内存淘汰策略的解析指南》在使用Redis构建缓存系统时,很多开发者只设置了EXPIRE但却忽略了背后Redis的过期删除机制与内存淘汰策略,下面小编就来和大家详细介绍一下... 目录1、简述2、Redis http://www.chinasem.cn的过期删除策略(Key Expir

Go语言中Recover机制的使用

《Go语言中Recover机制的使用》Go语言的recover机制通过defer函数捕获panic,实现异常恢复与程序稳定性,具有一定的参考价值,感兴趣的可以了解一下... 目录引言Recover 的基本概念基本代码示例简单的 Recover 示例嵌套函数中的 Recover项目场景中的应用Web 服务器中

MyBatis设计SQL返回布尔值(Boolean)的常见方法

《MyBatis设计SQL返回布尔值(Boolean)的常见方法》这篇文章主要为大家详细介绍了MyBatis设计SQL返回布尔值(Boolean)的几种常见方法,文中的示例代码讲解详细,感兴趣的小伙伴... 目录方案一:使用COUNT查询存在性(推荐)方案二:条件表达式直接返回布尔方案三:存在性检查(EXI