长耗时异步优化使用说明
长耗时异步优化使用说明
服务架构
1. 负载均衡的nginx上分别配置同步以及异步的下载地址
2. worker2 上用supervisor启动gm_task
3. worker4 上用nginx+uwsgi部署gm_async_task异步微服务
4. worker3 上部署具体业务工程来调用
调用过程
1. gm_service中
1. 创建任务, 并将任务存到数据库. async_task_srv.create(...)
2. 发送任务, 通过任务处理函数名路径来发送
```
task_queue = get_task_queue() # 得到队列, 本质是获取celery实例
task = async_task_srv.get(user_task_id=task_id) # 从数据库中那出预先创建的任务, 创建与发送分开时候会需要
task_queue.send_task(task['task_name'], args=[task['task_id']], queue=conf.gm_task_queue) # task['task_name']实际为处理的函数名(完整路径, 用.连接), args是任务处理函数函数参数, queue是指定的队列, 开发环境可以再local中自行配置自己的队列
```
2. gm_task 中
1. 导入task装饰器, from celery.task import task
2. 编写任务处理函数
```
@task(acks_late=True)
def batch_create_order(task_id):
task_data = async_task_srv.start(task_id) # 规范, 开始任务
pass
async_task_srv.finish(task_id) # 规范, 完成任务
```
3. gm_async_task 中 包含所有对任务的操作
1. task/add 添加任务 --> async_task_srv.create()
2. task/start 启动任务 --> async_task_srv.start()
3. task/finish 完成任务 --> async_task_srv.finish()
4. task/get 获取任务 --> async_task_srv.get()
5. task/update 更新任务结果, 状态等等 --> async_task_srv.update()
6. task/list 批量获取任务 --> async_task_srv.list()
7. task/hide 隐藏任务 -->async_task_srv.clear_show()
4. 导出以及其他操作区别点
1. 导入或者处理操作中, gm_task会通过访问一个无需登录的接口来处理, 自行设置auth_key校验权限
2. 导出操作中, gm_task会调用业务工程中具体的业务导出脚本来实现导出, subprocess.call('python3 tool/celery_tasks/xxxx.py'), 后期考虑如何解耦路径写死在代码中
灰度过程
1. worker3上灰度业务工程
2. worker2上灰度任务处理工程
1. pull代码
2. sudo supervisorctl reload 重启supervisord, 配置会重新读取, 所有工程也会重新启动; 配置未改变可跳过此步骤
3. sudo supervisorctl restart gm_task 重新加载gm_task工程, 配置不会重新读取
4. ps -ef | grep 'celery worker' 查看是否超过4个worker, 如果超过需要执行步骤4
5. ps aux | grep 'celery worker' | awk '{print $2}' | xargs kill 停止多余的worker
优化思路(仅供参考)
1. @do_cprofile('from/to/xxx.prof') 在接口入口函数处使用, 该工具在station工程中的common.util下
2. gprof2dot -f pstats xxx.prof | dot -Tpng -o xxx.png 生成耗时分析图, 需先pip install gprof2dot
3. +------------------------------+ 耗时分析图的每个节点含义
| function name |
| total time % ( self time % ) |
| total calls |
+------------------------------+
4. 定位到耗时最多的某个函数, 验证数据库是否有索引, 无索引则加索引后重复以上步骤. 如果sql语句过于复杂可以使用dev机器上的SQLadvisor来获取优化意见; 注意验证线上环境和开发环境索引要保持一致
5. 定位到调用次数最多的某个函数, 验证是否将操作数据库步骤放在循环中, 如果是考虑提出到循环之外做批量操作
6. 以上优化做完, 仍然无法解决问题, 则考虑将改接口做异步优化.
数据库
表名: tbl_async_task
id bigint not null 任务id
user_task_id varchar(64) 用户任务id
name varchar(64) not null 任务名称(任务处理函数名称)
station_id varchar(16) not null 站点id
input text not null 业务请求输入数据
result json 业务处理结果
status tinyint(4) not null 任务状态, 1-已经提交, 2-正在执行, 3-已经完成
repeat tinyint(1) 是否可重复
message_show tinyint(1) 是否展示
run_count int(11) not null 执行次数
creat_time datetime not null 创建时间
modify_time datetime not null 修改时间
start_time datetime 任务开始时间
end_time datetime 任务结束时间
business_id int not null 任务类型id
business_name str not null 业务名称(前端展示所用)
type int not null 分类(0-导入, 1-导出, 2-处理)
progress int 任务进度(0-100)
gm_service工程
/task/get 单个任务查看
接口描述: 单个任务查看
Method: Get
响应:
code M int 0为成功, 其它为失败
msg M str 错误提示信息
data M json 返回数据容器
{
task_id M int 任务id
user_task_id M str 用户任务id
task_name M str 任务名称
status M int 任务状态, 1-已经提交, 2-正在执行, 3-已经完成, 4-部分成功, 停止执行, 5-执行失败(全部失败), 停止执行
reason M str 失败原因
progress M int 任务进度(1-100之间)
type M int 任务类型(0-导入, 1-导出, 2-处理)
result M json 返回数据容器
}
/task/list 任务列表查询
接口描述: 批量任务查看
Method: Get
响应:
code M int 0为成功, 其它为失败
msg M str 错误提示信息
data M json 返回数据容器
{
finish M bool True表示全部完成, 反之未完成
tasks M list 任务列表
[{
task_id M int 任务id
user_task_id M str 用户任务id
task_name M str 任务名称
status M int 任务状态, 1-已经提交, 2-正在执行, 3-已经完成, 4-部分成功, 停止执行, 5-执行失败(全部失败), 停止执行
reason M str 失败原因
progress M int 任务进度(1-100之间)
type M int 任务类型(0-导入, 1-导出, 2-处理)
result M json 返回数据容器
}]
}
/task/clear_show 清空已经完成任务
接口: 不展示已经完成的任务
Method: Post
请求:
task_ids O list 任务id列表(task_ids与user_task_ids二者必有其一)
user_task_ids O list 用户自定义id列表
响应:
code M int 0为成功, 其它为失败
msg M str 错误提示信息
data M null 返回数据容器
以上三个接口, 均为对外接口(前端调用)
/station/sales_analysis/orderdetail 订单列表导出(智能判断接口, 默认是大于20单走异步)
Method: Get
请求:
---- params ----
query_type M int 查询类型(1-下单日期, 2-运营周期, 3-收货日期)
start_date C date 下单开始日期(查询类型为1时候必须存在)
end_date C date 下单结束日期(查询类型为1时候必须存在)
format O str 用友格式(yongyou)
time_config_id C str 运营周期id(查询类型为2时候必须存在)
cycle_start_time C datetime 运营开始时间(查询类型为2时候必须存在)
cycle_end_time C datetime 运营结束时间(查询类型为2时候必须存在)
status O int 订单状态
search_text O str 搜索字段
search_area O str 地理标签
响应:
code M int 0为成功, 其它为失败
msg M str 错误提示信息
data M json 返回数据容器
{
async M boolean 0表示同步, 1表示异步
task_url C str 任务结果返回地址(异步返回)
link C str 业务数据结果(同步返回)
}
/station/order/batch/submit 批量提交订单(异步接口)
接口描述: 异步批量提交订单
Method: POST
请求:
---- params ----
task_id M str 任务id
time_config_id M str 运营周期id
order_data M list 订单数据
file_name M str 上传文件名
响应:
code M int 0为成功, 其它为失败
msg M str 错误提示信息
data M json 返回数据容器
{
task_url M str 任务结果返回地址
}