Skip to content

异步消息方案

涉及接口

thrift

  1. order_update_details 更新商品
  2. create_order 新建订单
  3. order_rm_details 删除商品

station 接口

  1. /station/order/update/deliver_time 修改订单配送时间

  2. /station/task/purchase/create_from_order 新建采购任务

    Method
        POST
    请求
        order_id    O string    订单号
        sku_id    O   string    SKUID
        plan_purchase_amount   M   float    计划采购数
    响应
        code    M   int     返回码,0表示成功,其他表示错误
        msg     M   string  错误信息
    逻辑
        1. 新加order_id参数,逻辑上兼容订单来源的请求
  1. /station/task/purchase/edit 编辑采购任务
    Method
        POST
    请求
        order_id    O string    订单号
        sku_id    O   string    SKUID
        plan_amount_delta   M   float    计划采购差值
    响应
        code    M   int     返回码,0表示成功,其他表示错误
        msg     M   string  错误信息
    逻辑
        1. 新接口,处理编辑订单内商品的消息
  1. /station/task/purchase/delete_order
    Method
        POST
    请求
        order_id    O string    订单号
    响应
        code    M   int     返回码,0表示成功,其他表示错误
        msg     M   string  错误信息
    逻辑
        1. 新接口,处理编辑订单内删除商品的消息

pypi库mq_lib

mq_lib.make_channel(db_host, db_port, db_user, db_password, rabbitmq_url)

配置mq_lib连接,
成功返回一个mq_lib.Channel类,失败抛出异常
这个函数应该全局运行一次,而后使用mq_lib.Channel的实例来操作消息
db_host 数据库url
db_port 数据库端口
db_user 数据库账户
db_password 数据库密码
rabbitmq_url rabbitmq的url

class Channel(object):

"""
mq连接类
使用示例:
channel = Channel('localhost', 3306, 'user', 'password', 'amqp://localhost')
msg = channel.publish('order', 'hello world', 'order_id')
channel
"""

def __init__(self):
    """
    初始化Channel类
    """

def publish(topic, msg_body, key):
    """
    发布消息,必须在主流程前运行,否则可能会导致主流程和队列不一致

    topic 主题名
    msg_body 消息内容
    key 唯一标识

    正确返回mq_lib.Msg消息对象,错误抛出异常
    """

def send(msg):
    """
    发送消息

    msg 为publish返回的Msg对象

    正确返回成功发送条数,错误抛出异常
    """

def send_batch(msgs):
    """
    批量发送消息

    msgs 为Msg对象列表

    正确返回成功发送条数,错误抛出异常
    """

class Msg(object): """ 消息类 """ def init(self, topic, body, key): """ topic 主题名 msg_body 消息内容 key 唯一标识 """

tbl_msg表

CREATE TABLE `tbl_msg` (
  `id` int(11) unsigned NOT NULL AUTO_INCREMENT,
  `topic` varchar(16) NOT NULL DEFAULT '' COMMENT '主题',
  `body` text NOT NULL COMMENT '正文',
  `key` varchar(64) NOT NULL DEFAULT '' COMMENT '唯一标识',
  `seq` int(11) NOT NULL COMMENT '序列号',
  `create_time` datetime NOT NULL,
  `modify_time` datetime NOT NULL,
  `status` int(11) NOT NULL COMMENT '状态(1操作前,2操作成功,3发送成功)',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci;

tbl_push表

CREATE TABLE `tbl_push` (
  `id` int(11) unsigned NOT NULL AUTO_INCREMENT,
  `subscription_id` int(11) NOT NULL COMMENT '订阅',
  `msg_id` int(11) NOT NULL COMMENT '消息ID',
  `key` varchar(64) NOT NULL DEFAULT '' COMMENT '唯一标识',
  `seq` int(11) NOT NULL COMMENT '序列号',
  `push_count` int(11) NOT NULL COMMENT '推送次数',
  `create_time` datetime NOT NULL,
  `modify_time` datetime NOT NULL,
  `status` int(11) NOT NULL COMMENT '状态(1等待处理,2已处理,3处理失败)',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT ChannelRSET=utf8mb4 COLLATE=utf8mb4_unicode_ci;

tbl_subscription表

CREATE TABLE `tbl_subscription` (
  `id` int(11) unsigned NOT NULL AUTO_INCREMENT,
  `topic` varchar(16) NOT NULL DEFAULT '' COMMENT '主题',
  `endpoint` varchar(1024) NOT NULL DEFAULT '' COMMENT '处理接口',
  `name` varchar(16) NOT NULL DEFAULT '' COMMENT '名字',
  `status` int(11) NOT NULL COMMENT '状态(1删除,2激活)',
  `create_time` datetime NOT NULL,
  `modify_time` datetime NOT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci;

工作流程

CMQ工作流