异步同步保证方案
同步脚本
支持过滤条件
参数 | 类型 | 是否可选 | 意义 |
---|---|---|---|
--stations | list[str] | 是 | 过滤站点 |
--groups | list[int] | 是 | 过滤group |
--begin | date | 是 | 过滤开始日期 |
--end | date | 是 | 过滤截止日期 |
--keys | list | 是 | 过滤单据 |
--batch | int | 是 | 如果有批量操作的话,使用这个参数控制单次批量操作数量 |
--sleep | int | 是 | 如果有批量操作的话,使用这个参数控制批量处理间隔 |
--runid | str | 否 | 因为有可能部署多个脚本并发,这个字段区别当前脚本是哪个,由运维部署时指定 |
-u | str | 否 | 操作人 |
过滤优先级
keys 优先级最高,传了就只过滤keys
stations 和 groups 过滤二选一,全过滤报错
begin 和 end 如果过滤必须同时传
脚本位置
各个业务工程的 /tools/data_sync/
目录
日志
位置 /data/logs/data_sync/
格式
- access 日志({script_name}_sync_access.log)
[time req_id runid] levelname time_cost succes_sync_keys | error_sync_keys
-
app 日志({script_name}_sync_app.log)
[time req_id runid] levelname
和接口的app日志一样, req_id 和对应的 access 日志一致
并且在 脚本开始的时候会打一条
[time req_id runid] levelname <START>
结束的时候打一条
[time req_id runid] levelname <END>
运行频率
1min 一次
逻辑
- 没有传参数的情况下,从上次校验写入mongo的表里获得上次同步失败的单据,同步
- 有传过滤参数的情况下,按过滤参数过滤出来的单据同步
- 将同步结果按格式写入mongo表
如果直接调用了业务接口,需要评审
执行脚本完需要把执行结果统计写入mongo数据库,具体格式见文档最下方
性能要求:
- 脚本必须能并行运行
- 并行至少要能基于不同客户并行
- 如果相同客户不同单可以并行最好
- 需要评估出当前业务数据量下,怎样的并行程度能支持分钟级同步完成。
校验脚本
支持过滤条件
参数 | 类型 | 是否可选 | 意义 |
---|---|---|---|
--batch | int | 是 | 大量的查询通过这个参数来控制单次查询的量 |
--sleep | int | 是 | 批量查询的间隔,配合batch使用 |
脚本位置
各个业务工程的 /tools/data_sync/ 目录
日志
位置 /data/logs/data_sync/
格式
-
access 日志({script_name}_monitor_access.log)
[time req_id ] levelname time_cost succes_sync_keys | error_sync_keys
-
app 日志({script_name}_monitor_app.log)
[time req_id ] levelname
和接口的app日志一样, req_id 和对应的 access 日志一致
并且在 脚本开始的时候会打一条
[time req_id ] levelname <START>
结束的时候打一条
[time req_id ] levelname <END>
运行频率
1min 一次
逻辑
- 从上次校验写入mongo的表里获得上次同步失败的单据,校验当前是否同步成功,
- 校验 next_monitor_from 后到当前时间 1min 前的数据同步是否一致
- 把以上两步校验的结果按以下格式写入mongo
表结构
- 采购写入(inventory-purchase-sync_monitor)
- 订单写入(base-xnn_core_product_2-sync_monitor)
- ES 写入(base-xnn_core_product_2-sync_monitor)
- 操作日志写入(base-other-sync_monitor)
- 分拣写入(inventory-sorting-sync_monitor)
- 进销存写入(inventory-inventory-sync_monitor)
mongo 数据分为两种: 同步脚本写入的数据格式:
{
"type": "sync" M str 表示此条记录是同步结果统计
"name": "purchase_task" M str 表示此脚本同步的是什么,例如同步的是采购任务
"runid": "blabla" M str id
"last_run_time": ISODate("...") M datetime 最后执行脚本时间
"success_sync_count": M int 最近一次执行同步成功数量
"error_sync_count": M int 最近一次执行同步失败数量
"last_time_cost": M float 上一次执行耗时(秒)
}
校验脚本写入的数据格式:
{
"type": "monitor" M str 表示此条记录是校验结果统计
"name": "purchase_task" M str 表示此条记录校验的是什么,例如校验的是采购任务
"unsync_keys": [ M list 未同步单据
{
"key": M 未同步单据号
"modify_time": M datetime 编辑时间
"reason": M str 未同步原因
... 还可以加别的业务认为需要的字段,例如,station_id,group_id等,不做硬性规定
}
]
"first_unsync_time": ISODate("...") M datetime 未正确同步单据里的最早的修改时间(取unsync_keys里的单据最早的修改时间),如果unsync_keys为空first_unsync_time=next_monitor_from
"next_monitor_from" ISODate("...") M datetime 下一次校验起始时间(记录的时间点为当前校验的所有单据的最晚修改时间,如果脚本本次执行没有校验任何单据,next_monitor_from=过滤单据截止时间(也就是脚本执行1min前))
"last_run_time": ISODate("...") M datetime 最后执行脚本时间
"last_time_cost": M float 上一次执行耗时(秒)
}
全量校验的规范(单独给出)
支持过滤条件
参数 | 类型 | 是否可选 | 意义 |
---|---|---|---|
--stations | list[str] | 是 | 过滤站点 |
--groups | list[int] | 是 | 过滤group |
--days | int | 是 | 过滤近多少天的数据 |
--begin | date | 是 | 过滤开始日期 |
--end | date | 是 | 过滤截止日期 |
--keys | list | 是 | 过滤单据 |
--batch | int | 是 | 如果有批量操作的话,使用这个参数控制单次批量操作数量 |
--sleep | int | 是 | 如果有批量操作的话,使用这个参数控制批量处理间隔 |
过滤优先级
keys 优先级最高,传了就只过滤keys
stations 和 groups 过滤二选一,全过滤报错
begin 和 end 如果过滤必须同时传
脚本位置
各个业务工程的 /tools/data_sync/
目录
日志
位置 /data/logs/data_sync/
格式
-
access 日志({script_name}_fullcheck_access.log)
[time req_id ] levelname time_cost succes_sync_keys | error_sync_keys
-
app 日志({script_name}_fullcheck_app.log)
[time req_id ] levelname
和接口的app日志一样, req_id 和对应的 access 日志一致
并且在 脚本开始的时候会打一条
[time req_id ] levelname <START>
结束的时候打一条
[time req_id ] levelname <END>
表结构
- 采购写入(inventory-purchase-sync_fullcheck)
- 订单写入(base-xnn_core_product_2-sync_fullcheck)
- ES 写入(base-xnn_core_product_2-sync_fullcheck)
- 操作日志写入(base-other-sync_fullcheck)
- 分拣写入(inventory-sorting-sync_fullcheck)
- 进销存写入(inventory-inventory-sync_fullcheck)
写入的数据格式:
{
"type": "fullcheck" M str 表示此条记录是校验结果统计
"name": "purchase_task" M str 表示此条记录校验的是什么,例如校验的是采购任务
"unsync_keys": [ M list 未同步单据
{
"key": M 未同步单据号
"modify_time": M datetime 编辑时间
"reason": M str 未同步原因
... 还可以加别的业务认为需要的字段,例如,station_id,group_id等,不做硬性规定
}
]
"last_run_time": ISODate("...") M datetime 最后执行脚本时间
"last_time_cost": M float 上一次执行耗时(秒)
}