Skip to content

异步同步保证方案

同步脚本

支持过滤条件

参数 类型 是否可选 意义
--stations list[str] 过滤站点
--groups list[int] 过滤group
--begin date 过滤开始日期
--end date 过滤截止日期
--keys list 过滤单据
--batch int 如果有批量操作的话,使用这个参数控制单次批量操作数量
--sleep int 如果有批量操作的话,使用这个参数控制批量处理间隔
--runid str 因为有可能部署多个脚本并发,这个字段区别当前脚本是哪个,由运维部署时指定
-u str 操作人

过滤优先级

keys 优先级最高,传了就只过滤keys
stations 和 groups 过滤二选一,全过滤报错
begin 和 end 如果过滤必须同时传

脚本位置

各个业务工程的 /tools/data_sync/ 目录

日志

位置 /data/logs/data_sync/

格式

  • access 日志({script_name}_sync_access.log)

[time req_id runid] levelname time_cost succes_sync_keys | error_sync_keys

  • app 日志({script_name}_sync_app.log)

    [time req_id runid] levelname

    和接口的app日志一样, req_id 和对应的 access 日志一致

    并且在 脚本开始的时候会打一条

    [time req_id runid] levelname <START>

    结束的时候打一条

    [time req_id runid] levelname <END>

运行频率

1min 一次

逻辑

  1. 没有传参数的情况下,从上次校验写入mongo的表里获得上次同步失败的单据,同步
  2. 有传过滤参数的情况下,按过滤参数过滤出来的单据同步
  3. 将同步结果按格式写入mongo表

如果直接调用了业务接口,需要评审

执行脚本完需要把执行结果统计写入mongo数据库,具体格式见文档最下方

性能要求:

  1. 脚本必须能并行运行
  2. 并行至少要能基于不同客户并行
  3. 如果相同客户不同单可以并行最好
  4. 需要评估出当前业务数据量下,怎样的并行程度能支持分钟级同步完成。

校验脚本

支持过滤条件

参数 类型 是否可选 意义
--batch int 大量的查询通过这个参数来控制单次查询的量
--sleep int 批量查询的间隔,配合batch使用

脚本位置

各个业务工程的 /tools/data_sync/ 目录

日志

位置 /data/logs/data_sync/

格式

  • access 日志({script_name}_monitor_access.log)

    [time req_id ] levelname time_cost succes_sync_keys | error_sync_keys

  • app 日志({script_name}_monitor_app.log)

    [time req_id ] levelname

    和接口的app日志一样, req_id 和对应的 access 日志一致

    并且在 脚本开始的时候会打一条

    [time req_id ] levelname <START>

    结束的时候打一条

    [time req_id ] levelname <END>

运行频率

1min 一次

逻辑

  1. 从上次校验写入mongo的表里获得上次同步失败的单据,校验当前是否同步成功,
  2. 校验 next_monitor_from 后到当前时间 1min 前的数据同步是否一致
  3. 把以上两步校验的结果按以下格式写入mongo

表结构

  • 采购写入(inventory-purchase-sync_monitor)
  • 订单写入(base-xnn_core_product_2-sync_monitor)
  • ES 写入(base-xnn_core_product_2-sync_monitor)
  • 操作日志写入(base-other-sync_monitor)
  • 分拣写入(inventory-sorting-sync_monitor)
  • 进销存写入(inventory-inventory-sync_monitor)

mongo 数据分为两种: 同步脚本写入的数据格式:

{
    "type": "sync" M str 表示此条记录是同步结果统计
    "name": "purchase_task" M str 表示此脚本同步的是什么,例如同步的是采购任务
    "runid": "blabla" M str id
    "last_run_time": ISODate("...") M datetime 最后执行脚本时间
    "success_sync_count": M int 最近一次执行同步成功数量
    "error_sync_count": M int 最近一次执行同步失败数量
    "last_time_cost": M float 上一次执行耗时(秒)
}

校验脚本写入的数据格式:

{
    "type": "monitor" M str 表示此条记录是校验结果统计
    "name": "purchase_task" M str 表示此条记录校验的是什么,例如校验的是采购任务
    "unsync_keys": [ M list 未同步单据
        {
            "key": M 未同步单据号
            "modify_time": M datetime 编辑时间
            "reason": M str 未同步原因
            ... 还可以加别的业务认为需要的字段,例如,station_id,group_id等,不做硬性规定
        }
    ]
    "first_unsync_time": ISODate("...") M datetime 未正确同步单据里的最早的修改时间(取unsync_keys里的单据最早的修改时间),如果unsync_keys为空first_unsync_time=next_monitor_from
    "next_monitor_from" ISODate("...") M datetime 下一次校验起始时间(记录的时间点为当前校验的所有单据的最晚修改时间,如果脚本本次执行没有校验任何单据,next_monitor_from=过滤单据截止时间(也就是脚本执行1min前))
    "last_run_time": ISODate("...") M datetime 最后执行脚本时间
    "last_time_cost": M float 上一次执行耗时(秒)
}

全量校验的规范(单独给出)

支持过滤条件

参数 类型 是否可选 意义
--stations list[str] 过滤站点
--groups list[int] 过滤group
--days int 过滤近多少天的数据
--begin date 过滤开始日期
--end date 过滤截止日期
--keys list 过滤单据
--batch int 如果有批量操作的话,使用这个参数控制单次批量操作数量
--sleep int 如果有批量操作的话,使用这个参数控制批量处理间隔

过滤优先级

keys 优先级最高,传了就只过滤keys
stations 和 groups 过滤二选一,全过滤报错
begin 和 end 如果过滤必须同时传

脚本位置

各个业务工程的 /tools/data_sync/ 目录

日志

位置 /data/logs/data_sync/

格式

  • access 日志({script_name}_fullcheck_access.log)

    [time req_id ] levelname time_cost succes_sync_keys | error_sync_keys

  • app 日志({script_name}_fullcheck_app.log)

    [time req_id ] levelname

    和接口的app日志一样, req_id 和对应的 access 日志一致

    并且在 脚本开始的时候会打一条

    [time req_id ] levelname <START>

    结束的时候打一条

    [time req_id ] levelname <END>

表结构

  • 采购写入(inventory-purchase-sync_fullcheck)
  • 订单写入(base-xnn_core_product_2-sync_fullcheck)
  • ES 写入(base-xnn_core_product_2-sync_fullcheck)
  • 操作日志写入(base-other-sync_fullcheck)
  • 分拣写入(inventory-sorting-sync_fullcheck)
  • 进销存写入(inventory-inventory-sync_fullcheck)

写入的数据格式:

{
    "type": "fullcheck" M str 表示此条记录是校验结果统计
    "name": "purchase_task" M str 表示此条记录校验的是什么,例如校验的是采购任务
    "unsync_keys": [ M list 未同步单据
        {
            "key": M 未同步单据号
            "modify_time": M datetime 编辑时间
            "reason": M str 未同步原因
            ... 还可以加别的业务认为需要的字段,例如,station_id,group_id等,不做硬性规定
        }
    ]
    "last_run_time": ISODate("...") M datetime 最后执行脚本时间
    "last_time_cost": M float 上一次执行耗时(秒)
}