Skip to content

长耗时异步优化使用说明

长耗时异步优化使用说明

服务架构

1. 负载均衡的nginx上分别配置同步以及异步的下载地址
2. worker2 上用supervisor启动gm_task
3. worker4 上用nginx+uwsgi部署gm_async_task异步微服务
4. worker3 上部署具体业务工程来调用

调用过程

1. gm_service中
    1. 创建任务, 并将任务存到数据库. async_task_srv.create(...)
    2. 发送任务, 通过任务处理函数名路径来发送
        ```
            task_queue = get_task_queue() # 得到队列, 本质是获取celery实例
            task = async_task_srv.get(user_task_id=task_id) # 从数据库中那出预先创建的任务, 创建与发送分开时候会需要
            task_queue.send_task(task['task_name'], args=[task['task_id']], queue=conf.gm_task_queue) # task['task_name']实际为处理的函数名(完整路径, 用.连接), args是任务处理函数函数参数, queue是指定的队列, 开发环境可以再local中自行配置自己的队列
        ```
2. gm_task 中
    1. 导入task装饰器, from celery.task import task
    2. 编写任务处理函数
        ```
            @task(acks_late=True)
            def batch_create_order(task_id):
                task_data = async_task_srv.start(task_id) # 规范, 开始任务
                pass
                async_task_srv.finish(task_id) # 规范, 完成任务
        ```
3. gm_async_task 中 包含所有对任务的操作
    1. task/add 添加任务 --> async_task_srv.create()
    2. task/start 启动任务 --> async_task_srv.start()
    3. task/finish 完成任务  --> async_task_srv.finish()
    4. task/get 获取任务  --> async_task_srv.get()
    5. task/update 更新任务结果, 状态等等  --> async_task_srv.update()
    6. task/list 批量获取任务  --> async_task_srv.list()
    7. task/hide 隐藏任务  -->async_task_srv.clear_show()

4. 导出以及其他操作区别点
    1. 导入或者处理操作中, gm_task会通过访问一个无需登录的接口来处理, 自行设置auth_key校验权限
    2. 导出操作中, gm_task会调用业务工程中具体的业务导出脚本来实现导出, subprocess.call('python3 tool/celery_tasks/xxxx.py'), 后期考虑如何解耦路径写死在代码中

灰度过程

1. worker3上灰度业务工程
2. worker2上灰度任务处理工程
    1. pull代码
    2. sudo supervisorctl reload 重启supervisord, 配置会重新读取, 所有工程也会重新启动; 配置未改变可跳过此步骤
    3. sudo supervisorctl restart gm_task 重新加载gm_task工程, 配置不会重新读取
    4. ps -ef | grep 'celery worker' 查看是否超过4个worker, 如果超过需要执行步骤4
    5. ps aux | grep 'celery worker' | awk '{print $2}' | xargs kill 停止多余的worker

优化思路(仅供参考)

1. @do_cprofile('from/to/xxx.prof') 在接口入口函数处使用, 该工具在station工程中的common.util下
2. gprof2dot -f pstats xxx.prof | dot -Tpng -o xxx.png 生成耗时分析图, 需先pip install gprof2dot
3.  +------------------------------+  耗时分析图的每个节点含义
    |        function name         |
    | total time % ( self time % ) |
    |         total calls          |
    +------------------------------+
4. 定位到耗时最多的某个函数, 验证数据库是否有索引, 无索引则加索引后重复以上步骤. 如果sql语句过于复杂可以使用dev机器上的SQLadvisor来获取优化意见; 注意验证线上环境和开发环境索引要保持一致
5. 定位到调用次数最多的某个函数, 验证是否将操作数据库步骤放在循环中, 如果是考虑提出到循环之外做批量操作
6. 以上优化做完, 仍然无法解决问题, 则考虑将改接口做异步优化.

数据库

表名: tbl_async_task
    id                      bigint           not null       任务id
    user_task_id            varchar(64)                     用户任务id
    name                    varchar(64)      not null       任务名称(任务处理函数名称)
    station_id              varchar(16)      not null       站点id
    input                   text             not null       业务请求输入数据
    result                  json                            业务处理结果
    status                  tinyint(4)       not null       任务状态, 1-已经提交, 2-正在执行, 3-已经完成
    repeat                  tinyint(1)                      是否可重复
    message_show            tinyint(1)                      是否展示
    run_count               int(11)          not null       执行次数
    creat_time              datetime         not null       创建时间
    modify_time             datetime         not null       修改时间
    start_time              datetime                        任务开始时间
    end_time                datetime                        任务结束时间
    business_id             int              not null       任务类型id
    business_name           str              not null       业务名称(前端展示所用)
    type                    int              not null       分类(0-导入, 1-导出, 2-处理)
    progress                int                             任务进度(0-100)

gm_service工程

/task/get 单个任务查看

接口描述: 单个任务查看
Method: Get
响应:
    code                    M       int             0为成功, 其它为失败
    msg                     M       str             错误提示信息
    data                    M       json            返回数据容器
    {
        task_id         M       int             任务id
        user_task_id    M       str             用户任务id
        task_name       M       str             任务名称
        status          M       int             任务状态, 1-已经提交, 2-正在执行, 3-已经完成, 4-部分成功, 停止执行, 5-执行失败(全部失败), 停止执行
        reason          M       str             失败原因
        progress        M       int             任务进度(1-100之间)
        type            M       int             任务类型(0-导入, 1-导出, 2-处理)
        result          M       json            返回数据容器
    }

/task/list 任务列表查询

接口描述: 批量任务查看
Method: Get
响应:
    code                    M       int             0为成功, 其它为失败
    msg                     M       str             错误提示信息
    data                    M       json            返回数据容器
    {
        finish              M       bool            True表示全部完成, 反之未完成
        tasks               M       list            任务列表
        [{
            task_id         M       int             任务id
            user_task_id    M       str             用户任务id
            task_name       M       str             任务名称
            status          M       int             任务状态, 1-已经提交, 2-正在执行, 3-已经完成, 4-部分成功, 停止执行, 5-执行失败(全部失败), 停止执行
            reason          M       str             失败原因
            progress        M       int             任务进度(1-100之间)
            type            M       int             任务类型(0-导入, 1-导出, 2-处理)
            result          M       json            返回数据容器
        }]
    }

/task/clear_show 清空已经完成任务

接口: 不展示已经完成的任务
Method: Post
请求:
    task_ids            O       list            任务id列表(task_ids与user_task_ids二者必有其一)
    user_task_ids       O       list            用户自定义id列表
响应:
    code                M       int             0为成功, 其它为失败
    msg                 M       str             错误提示信息
    data                M       null            返回数据容器

以上三个接口, 均为对外接口(前端调用)


/station/sales_analysis/orderdetail 订单列表导出(智能判断接口, 默认是大于20单走异步)

Method: Get
请求:
    ---- params ----
    query_type          M       int             查询类型(1-下单日期, 2-运营周期, 3-收货日期)
    start_date          C       date            下单开始日期(查询类型为1时候必须存在)
    end_date            C       date            下单结束日期(查询类型为1时候必须存在)
    format              O       str             用友格式(yongyou)
    time_config_id      C       str             运营周期id(查询类型为2时候必须存在)
    cycle_start_time    C       datetime        运营开始时间(查询类型为2时候必须存在)
    cycle_end_time      C       datetime        运营结束时间(查询类型为2时候必须存在)
    status              O       int             订单状态
    search_text         O       str             搜索字段
    search_area         O       str             地理标签
响应:
    code                M       int             0为成功, 其它为失败
    msg                 M       str             错误提示信息
    data                M       json            返回数据容器
    {
        async           M       boolean         0表示同步, 1表示异步
        task_url        C       str             任务结果返回地址(异步返回)
        link            C       str             业务数据结果(同步返回)
    }

/station/order/batch/submit 批量提交订单(异步接口)

接口描述:   异步批量提交订单
Method: POST
请求:
    ---- params ----
    task_id             M       str             任务id
    time_config_id      M       str             运营周期id
    order_data          M       list            订单数据
    file_name           M       str             上传文件名
响应:
    code                M       int             0为成功, 其它为失败
    msg                 M       str             错误提示信息
    data                M       json            返回数据容器
    {
        task_url        M       str             任务结果返回地址
    }