利用SchedulerX提升LangChain脚本调度与效率

通过阿里云SchedulerX调度LangChain脚本,实现Prompt管理、资源优化、限流控制、失败重试及可视化监控,提升AI任务效率与稳定性。

原文标题:LangChain脚本如何调度及提效?

原文作者:阿里云开发者

冷月清谈:

本文介绍了如何通过阿里云的任务调度服务SchedulerX来管理和调度LangChain脚本,以提升AI任务的效率和可维护性。主要包括以下几个方面:

1. **脚本管理与调度:** 使用SchedulerX的脚本任务功能托管LangChain脚本,实现定时调度,并支持历史版本管理和回滚。
2. **Prompt管理:** 通过SchedulerX的任务参数动态管理Prompt,避免直接修改脚本,方便灵活调整。
3. **资源利用率:** 结合SchedulerX的ECS脚本任务和K8s任务两种模式,根据任务特性选择合适的运行时,提升资源利用率。
4. **限流控制:** 利用SchedulerX的应用级别限流功能,解决突发流量问题,保障核心任务的稳定运行。
5. **失败自动重试:** 开启SchedulerX的失败自动重试功能,降低因大模型限流或服务不可用导致的失败率。
6. **依赖编排:** 通过SchedulerX的可视化任务编排能力,处理LangChain脚本之间的依赖关系,甚至与其他类型任务进行编排。
7. **企业级可观测:** SchedulerX集成云产品,提供企业级可观测能力,包括调度大盘、监控报警和日志服务,方便监控任务状态和排查问题。

此外,文章还展望了AI任务调度未来的发展方向,包括AI任务管理、模型Failover、Tokens限流、AI任务批处理和AI可观测等方面。

怜星夜思:

1、SchedulerX的任务编排功能,除了文中的例子,你还能想到哪些LangChain应用场景可以通过编排实现更复杂的AI工作流?
2、文章提到了SchedulerX的限流控制可以避免突发流量打爆下游服务。除了文中的例子,你觉得在哪些LangChain应用中限流特别重要?为什么?
3、文章提到了Prompt管理,你有什么好的Prompt管理实践经验可以分享吗?

原文内容


概述

LangChain[1]是开源领域最流行的大模型编程开发框架,支持通过python/js语言快速构建AI应用。Dify[2]是开源的图形化大模型应用开发平台,可以通过可视化的画布拖拖拽拽快速构建AI agent/工作流。

通过任务调度系统托管AI任务,可以进行脚本版本管理、定时调度、提升资源利用率、限流控制、可运维、可观测。

由于篇幅有限,本文章主要介绍通过任务调度SchedulerX[3]进行LangChain脚本的管理和调度,Dify工作流调度将在下一篇介绍。

脚本管理及调度

AI任务有许多业务场景,需要定时调度,比如:

  • 风险监控:每分钟扫描风险数据,通过大模型分析是否有风险事件,并发出报警。
  • 数据分析:每天拉取金融数据,通过大模型进行数据分析,给出投资者建议。
  • 内容生成:每天帮我做工作总结,写日报。

LangChain任务基本上都是python脚本,可以使用SchedulerX的脚本任务[4]托管脚本,并进行定时配置

任务调度SchedulerX还支持脚本的历史版本,方便进行历史版本的对比和回滚:

Prompt管理

Prompt(提示词)对于AI任务来说非常重要,为了得到好的效果,可能需要经常修改Prompt,将Prompt写在脚本中会非常麻烦。我们可以通过SchedulerX的任务参数来管理Prompt,在LangChain脚本中通过SchedulerX提供的系统参数(#{schedulerx.jobParameters})动态获取任务参数,来代替Prompt或者PromptTemplate参数。

定时调度获取Prompt

Prompt写法
1. 通过SchedulerX控制台编写脚本
from langchain_community.llms import Tongyi
from langchain.prompts import PromptTemplate
from langchain.chains import LLMChain

llm = Tongyi(model=“qwen-plus”)
question = “#{schedulerx.jobParameters}”
print(“question:” + question)

results = llm.invoke(question)
print(results)

2. 配置任务参数
PromptTemplate写法
1. 通过SchedulerX控制台编写脚本
from langchain_community.llms import Tongyi
from langchain.prompts import PromptTemplate
from langchain.chains import LLMChain

llm = Tongyi(model=“qwen-plus”)
prompt = PromptTemplate(template=“请帮我解答这个问题:{question}”)

chain = LLMChain(llm=llm, prompt=prompt)
question = “#{schedulerx.jobParameters}”
print(“question:” + question)

results = chain.invoke(question)
print(results)

2. 配置任务参数

API调度动态传递Prompt

SchedulerX也支持通过控制台手动运行或者API调度,动态设置新的Prompt,以上面PromptTemplate写法为例,通过控制台手动运行任务,动态传递任务参数,该任务参数会覆盖任务配置中的静态任务参数。

提升资源利用率

SchedulerX执行脚本,当前支持两种模式(未来会支持更多的运行时):

  • 脚本任务:在ECS上部署schedulerx-agent,每次执行fork一个子进程执行脚本,适合任务数比较多、调度频繁、资源消耗少的场景。
  • K8s任务[5]:在K8s上部署schedulerx-agent,每次执行弹一个Pod执行脚本,适合任务数不多、调度不频繁、资源消耗大的场景。

两种运行时适合不同的场景,结合起来使用,可以提升资源利用率。

如上图所示,通过ECS执行脚本以及通过K8s执行脚本,主要区别总结下表:

限流控制

业务场景:比如有一堆离线任务,每天0点之后执行,处理上一天的数据,核心任务必须在早上9点上班前全部跑完。业务同学可能会把任务的调度时间都设置成同一时刻,比如每天00:30执行。

当大量任务同时调度的时候,会把ECS资源打满。虽然用K8s跑脚本可以解决一部分问题,但是突增的流量一样会把下游(比如数据库)打满。所以针对这种突增流量的场景,最佳解决方案是使用限流。通过限流控制解决定时调度不均特别是突发流量的场景,其实也是一种提升资源利用率的解决方案。

如上图所示,任务调度SchedulerX支持应用级别的限流控制:

1. 每个应用会有2个队列,一个是优先级排队队列,可以把任务按照优先级在队列中排队,保证核心任务优先跑完。任务的优先级仅在自己的应用下生效,不会和其他应用产生冲突。
2. 另一个是并发数队列,控制这个应用的并发数,不同应用的并发数彼此不受干扰。
3. 当并发队列中某个任务运行完成,有空闲槽位后,会从排队队列头部取出任务,放到并发队列中,开始执行任务。

失败自动重试

当前大模型调用不是很稳定,大家平时和大模型聊天,可能会经常遇到token限流了,或者是后端服务异常了。这个时候我们只要过一会重新尝试下就好了。

任务调度SchedulerX自带任务失败自动重试功能,可以通过控制台动态配置,经过我们验证,使用失败重试功能,LangChain脚本因为后端大模型限流或者服务不可用导致的失败率大大降低,成功率可以提升至少一个9。

依赖编排

SchedulerX提供可视化任务编排能力,如果你的LangChain脚本有依赖关系,可以进行任务编排。甚至是不同任务类型的任务,都可以进行编排。

如上图所示:

1. 先通过Shell脚本,去大数据平台拉取数据。
2. 通过Java代码实现,做商家数据和用户数据清洗。
3. 通过LangChain实现,把清洗好的数据用大模型做数据分析。
4. 最后,再通过Python脚本生成报表。

企业级可观测

任务调度SchedulerX默认集成了各种云产品,提供企业级可观测能力,包括但不限于如下功能。

调度大盘

调度大盘可以看到任务执行的总体情况,支持按照命名空间和应用过滤筛选。

监控报警

任务如果执行失败了,需要快速响应处理,否则容易产生故障。SchedulerX支持应用级别报警,也能精细到每个任务级别,如下图所示是任务级别报警配置。

  • 联系人管理:支持联系人和联系人组管理,支持同步云监控联系人。
  • 报警方式:失败报警、超时报警、成功通知。
  • 报警渠道:邮件、webhook、短信、电话。

日志服务

当任务执行失败了,需要查看任务运行的日志分析问题。只要接入schedulerx-agent运行脚本,默认就集成了日志服务,可以看到脚本运行的所有标准输出和异常。

1. 配置如下脚本
2. 任务参数配置如下
3. 运行一次,查看日志

未来展望

在AI时代,AI任务调度面临着新的机遇和挑战,我们总结了一些常见的需求如下:

  • AI任务管理:可以通过任务调度配置prompt模版、模型类型、输出格式等参数,通过控制台可以动态调整。
  • 模型Failover:通过任务调度系统托管各种模型,如果某个模型调用失败,可以自动重试其他的模型,进一步提升任务执行的成功率。
  • Tokens限流:每个任务返回消耗的tokens,任务调度系统能做到token级别的限流,防止触发下游大模型的API限流。
  • AI任务批处理:AI任务执行时间比较长,特别是推理型模型时间更加长,通过任务调度系统进行任务拆分及分布式处理,加快任务执行速度。
  • AI可观测:可以看到每个任务的执行耗时、消耗的tokens、输入和输出。如果是工作流,可以看到每个node级别的耗时、tokens消耗、输入和输出。

如果您有AI任务调度方面的其他诉求,欢迎联系我们,钉钉群号:23103656

参考链接:
[1]https://python.langchain.com/docs/introduction/
[2]https://dify.ai/zh
[3]https://www.aliyun.com/ntms/middleware/schedulerx
[4]https://help.aliyun.com/zh/mse/user-guide/script-task
[5]https://help.aliyun.com/zh/mse/user-guide/k8s-task

其实可以更进一步,把模型也放进去。现在不是流行A/B测试吗?可以这么玩:

1. 准备多个LangChain模型: 多个模型,每个模型都针对特定领域或任务进行了优化。
2. 流量分配: 根据预设的策略(比如按比例随机分配),将用户请求分发到不同的模型。
3. 动态路由: SchedulerX可以根据模型的响应时间、成功率等指标,动态调整流量分配策略,将更多流量导向表现更好的模型。
4. 效果评估: 定期分析各个模型的表现数据,用于模型优化和选择。

这样,我们就可以在一个可控的环境下,持续优化我们的AI模型,提升AI应用的效果。

分享一个我常用的Prompt模板管理方法:

1. 版本控制: 将Prompt模板存储在Git等版本控制系统中,方便追踪修改历史和回滚。
2. 参数化: 将Prompt中的可变部分参数化,方便根据不同场景动态调整。
3. 分类管理: 按照应用场景对Prompt进行分类管理,方便查找和复用。
4. 注释说明: 为每个Prompt添加详细的注释说明,包括适用场景、输入参数、输出格式等。

通过这些方法,可以有效地管理Prompt,提高开发效率和模型效果。

我的经验是,Prompt的版本控制非常重要。因为Prompt的细微改动,都可能对结果产生很大的影响。如果Prompt管理不善,很容易出现“改坏了,又不知道怎么改回去”的情况。所以,一定要养成良好的版本控制习惯,每次修改Prompt都提交一个commit,方便回溯。

其实有些看似不重要的应用,也需要限流。比如,一个简单的LangChain聊天机器人,如果被恶意攻击,短时间内收到大量垃圾消息,可能会导致机器人反应迟钝,甚至崩溃。虽然这看起来不是什么大问题,但也会影响用户体验,甚至损害品牌形象。所以,即使是小型应用,也应该考虑限流。

我一般会维护一个Prompt的“黑名单”。就是那些经过测试,效果很差,或者容易引起模型出错的Prompt。把这些Prompt记录下来,避免以后重复踩坑。这其实也是一种Prompt管理,可以有效提高模型的稳定性和安全性。

我考虑到一个在电商领域的应用:

1. 用户行为分析: 首先,用一个脚本任务分析用户的浏览、购买历史等行为数据。
2. 产品推荐: 然后,利用LangChain,基于用户行为分析结果,生成个性化的产品推荐文案。
3. 定向推送: 接着,通过API接口,将这些推荐文案推送给对应的用户。
4. 效果评估: 最后,再用一个脚本任务,收集用户对推荐的反馈数据,用于优化推荐模型。

这样,就把用户行为分析、内容生成和营销推广串联起来了,形成一个闭环的AI工作流。

我认为在API接口集成的LangChain应用中,限流特别重要。例如,一个基于LangChain的智能客服系统,如果同时有大量用户发起咨询,瞬间涌入的请求可能会压垮下游的大模型服务或者知识库系统,导致客服系统瘫痪。通过限流,可以平滑请求流量,保证系统的稳定运行。

SchedulerX的任务编排功能确实强大,除了文章中提到的数据处理和分析的例子,我想到一个场景:

可以编排一个自动化内容创作流程。比如:
1. 抓取热门话题: 第一个任务使用爬虫脚本从社交媒体或新闻网站抓取当前的热门话题。
2. 生成文章大纲: 第二个任务使用LangChain,基于热门话题生成文章大纲。
3. 撰写文章内容: 第三个任务再次使用LangChain,根据生成的大纲,撰写文章内容。
4. 发布到多个平台: 第四个任务使用脚本将文章发布到不同的自媒体平台。

这个流程可以完全自动化,并且可以根据话题热度动态调整内容,非常适合内容营销的场景。

我想到的是内容审核类应用。很多平台需要用AI来审核用户上传的内容,比如图片、视频、文本等等。如果恶意用户大量上传违规内容,触发AI审核,可能会导致系统资源耗尽,影响正常用户的体验。所以,对这类应用来说,限流是必不可少的安全措施。