Skip to content

数据一致性2 规范

实时业务系统

实时业务系统在开发过程中,需要满足的条件。

  1. 用户输入数据(a)持久化
  2. 目标数据(b)的可重入性

细节参看前面一致性方案 -> 基本方案

消息订阅接口

业务上必须要考虑的点

  1. 消息乱序到达
  2. 消息重入
  3. 接口性能

其他点交给补偿脚本处理

校对与补偿脚本

脚本这里列明了具体脚本满足的规范

一致性校对脚本

  1. 参数

    参数 类型 是否可选 意义
    --batch int 大量的查询通过这个参数来控制单次查询的量
    --sleep int 批量查询的间隔,配合batch使用
  2. 逻辑

    校验 1min 前的数据是否一致,将校验结果写入监控表sync_monitor中(实例部署见“脚本运行监控”),监控记录格式:

    ```
    {
        "type": "monitor" M str 表示此条记录是校验结果统计
        "name": "purchase_task" M str 表示此条记录校验的是什么,例如校验的是采购任务
        "unsync_count": 1 M int 未同步计数
        "last_success_time": ISODate("...") M datetime 最后已同步的时间(连续)比如五个订单同步成功了1245,订单3失败了,这边记录的就是订单2的编辑时间(注意,不是校验脚本跑的时间!)
        "last_check_time": ISODate("...") M datetime 最后检查时间
        "last_time_cost": M float 上一次执行耗时(秒)
    }
    ```
    
  3. 日志

    • access 日志({script_name}_monitor_access.log)

      [time req_id ] levelname time_cost succes_sync_keys | error_sync_keys

    • app 日志({script_name}_monitor_app.log)

      [time req_id ] levelname

      和接口的app日志一样, req_id 和对应的 access 日志一致

      并且在 脚本开始的时候会打一条

      [time req_id ] levelname <START>

      结束的时候打一条

      [time req_id ] levelname <END>

错误修复脚本

  1. 参数

    参数 类型 是否可选 意义
    --stations list[str] 过滤站点
    --groups list[int] 过滤group
    --begin date 过滤开始日期
    --end date 过滤截止日期
    --keys list 过滤单据
    --batch int 如果有批量操作的话,使用这个参数控制单次批量操作数量
    --sleep int 如果有批量操作的话,使用这个参数控制批量处理间隔
    --runid str 因为有可能部署多个脚本并发,这个字段区别当前脚本是哪个,由运维部署时指定
    -u str 操作人

    说明: 1. keys 优先级最高,传了就只过滤keys 1. stations 和 groups 过滤二选一,全过滤报错 1. begin 和 end 如果过滤必须同时传

  2. 逻辑

    同步 last_success_time 到 1min 前没有同步的数据,last_success_time 从校验监控表中获取。

    默认写入目标数据时直接连接DB写入,如果通过调用业务接口的方式写入,考虑到在紧急状态下的性能压力,需要专门评审。

    执行脚本完需要把执行结果统计写入监控表sync_monitor中(实例部署见“脚本运行监控”),监控记录表格式:

    {
        "type": "sync" M str 表示此条记录是同步结果统计
        "name": "purchase_task" M str 表示此脚本同步的是什么,例如同步的是采购任务
        "runid": "blabla" M str id
        "last_run_time": ISODate("...") M datetime 最后执行脚本时间
        "success_sync_count": M int 最近一次执行同步成功数量
        "error_sync_count": M int 最近一次执行同步失败数量
        "last_time_cost": M float 最近一次执行耗时(秒)
    }
    
  3. 日志

    • access 日志({script_name}_sync_access.log)

      [time req_id runid] levelname time_cost succes_sync_keys | error_sync_keys

    • app 日志({script_name}_sync_app.log)

      [time req_id runid] levelname

      和接口的app日志一样, req_id 和对应的 access 日志一致

      并且在 脚本开始的时候会打一条

      [time req_id runid] levelname <START>

      结束的时候打一条

      [time req_id runid] levelname <END>

  4. 性能要求

    1. 脚本必须能并行运行
    2. 并行至少要能基于不同客户并行
    3. 如果相同客户不同单可以并行最好
    4. 需要评估出当前业务数据量下,怎样的并行程度能支持分钟级同步完成。

脚本部署

  1. 脚本在各自工程中的位置

    1. /tools/data_sync/xxx_monitor.py
    2. /tools/data_sync/xxx_sync.py
  2. 日志位置

    1. /data/logs/data_sync/{script_name}_monitor_access.log
    2. /data/logs/data_sync/{script_name}_monitor_app.log
    3. /data/logs/data_sync/{script_name}_sync_access.log
    4. /data/logs/data_sync/{script_name}_sync_app.log
  3. 运行频率

    正常为 1min 一次,可调整。

  4. 校验数据库

    校验表部署位置:<实例>.<库名>.<表名>

    • 采购库:inventory.purchase.sync_monitor
    • 订单库:base.xnn_core_product_2.sync_monitor
    • 操作日志:base.other.sync_monitor
    • 分拣库:inventory.sorting.sync_monitor
    • 进销存:inventory.inventory.sync_monitor

    monitor和sync的监控数据都写入同一个sync_monitor中,分别用不同的记录类型。

脚本监控

针对sync_monitor表中的两种记录类型做监控。

违规等级

严重