compose-chain_parallel 模块深度解析
1. 引言
欢迎来到 compose-chain_parallel 模块的技术深度解析文档。本模块是 CloudWeGo Eino 框架中负责构建并行处理组件的关键部分,它允许开发者以简洁的方式在链式流程中嵌入并行执行的节点组,从而提升系统的处理效率和并发能力。
在本文档中,我们将深入探讨这个模块的设计理念、实现原理、核心组件、依赖关系以及使用场景,帮助你全面理解并有效地应用这一模块。
2. 问题空间与模块定位
2.1 问题背景
在构建复杂的 AI 应用和工作流时,我们经常会遇到需要同时执行多个相互独立操作的场景。例如:
- 多模型并发调用:同时向多个不同的大语言模型发送相同的查询,以便比较结果或进行投票
- 并行数据处理:同时对同一输入进行多种不同的转换或处理
- 多源信息检索:同时从多个不同的数据源或检索器中获取相关信息
在传统的串行处理方式中,这些操作需要依次执行,导致总处理时间等于所有操作时间之和。而通过并行处理,我们可以将总时间降低到最慢的那个操作的时间,显著提升系统的响应速度和吞吐量。
2.2 模块定位
compose-chain_parallel 模块正是为了解决上述问题而设计的。它是 Eino 框架中 compose-graph_engine 模块下的一个子模块,专门用于在链式(Chain)工作流中创建并行执行的节点组。
该模块的核心价值在于:
- 简化并行逻辑的构建:提供直观的 API,让开发者无需关心底层并发控制细节
- 与链式工作流无缝集成:作为 Chain 的一个组成部分,保持了整体工作流的连贯性
- 灵活的节点类型支持:支持各种类型的节点,包括模型、模板、工具、Lambda 函数等
3. 核心概念与心智模型
3.1 核心抽象
在理解 compose-chain_parallel 模块时,有几个核心概念需要掌握:
- Parallel:这是模块的核心结构体,代表一个并行执行的节点组容器。
- Output Key:每个添加到 Parallel 中的节点都需要一个唯一的输出键,用于在最终结果中标识该节点的输出。
- 节点对(nodeOptionsPair):包含实际的图节点和其配置选项的内部结构。
3.2 心智模型
理解 compose-chain_parallel 的一个有效心智模型是将其想象为**"分流器-收集器"**系统:
- 分流器:当工作流执行到 Parallel 节点时,输入数据会被复制并分发给 Parallel 中的所有子节点,就像水流被分配到多个平行的管道中一样。
- 并行处理:每个子节点在各自的"管道"中独立处理输入数据,互不干扰。
- 收集器:所有子节点执行完成后,它们的输出会被收集起来,组装成一个以 output key 为键的 map,然后传递给下一个节点。
这种设计使得并行逻辑的构建变得非常直观,开发者只需要关注每个子节点的功能,而无需担心并发控制、同步等底层细节。
4. 架构与数据流
4.1 模块架构
compose-chain_parallel 模块的架构相对简洁,主要由以下几个部分组成:
┌─────────────────────────────────────────────────────────────┐
│ Parallel │
│ ┌───────────────────────────────────────────────────────┐ │
│ │ nodes: []nodeOptionsPair │ │
│ │ ┌──────────────┐ ┌──────────────┐ ┌─────────────┐ │ │
│ │ │ Node 1 │ │ Node 2 │ │ Node N │ │ │
│ │ │ (outputKey1) │ │ (outputKey2) │ │ (outputKeyN)│ │ │
│ │ └──────────────┘ └──────────────┘ └─────────────┘ │ │
│ └───────────────────────────────────────────────────────┘ │
│ ┌───────────────────────────────────────────────────────┐ │
│ │ outputKeys: map[string]bool │ │
│ └───────────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────┘
4.2 构建时数据流
当构建包含 Parallel 的 Chain 时,数据流如下:
- 开发者创建一个 Parallel 实例
- 通过各种 Add* 方法向 Parallel 中添加节点,每个节点都指定一个唯一的 output key
- 将 Parallel 实例通过 Chain.AppendParallel() 方法添加到 Chain 中
- 在 AppendParallel 内部,系统会:
- 验证 Parallel 的有效性(非空、有足够的节点等)
- 为每个节点生成唯一的节点键
- 将所有节点添加到底层的 Graph 中
- 添加从起始节点到所有并行节点的边
- 更新 Chain 的 preNodeKeys 为所有并行节点的键
4.3 运行时数据流
在运行时,当执行到 Parallel 部分时:
- 前一个节点的输出作为输入传递给所有并行节点
- 所有并行节点同时开始执行,处理相同的输入
- 每个节点执行完毕后,将结果存储在自己的 output key 下
- 等待所有节点执行完毕
- 将所有节点的结果组装成一个 map[string]any,其中键是节点的 output key
- 将这个 map 作为输入传递给 Chain 中的下一个节点
5. 核心组件深度解析
5.1 Parallel 结构体
Parallel 是模块的核心结构体,定义如下:
type Parallel struct {
nodes []nodeOptionsPair
outputKeys map[string]bool
err error
}
组件职责:
nodes:存储所有添加到 Parallel 中的节点及其配置选项outputKeys:跟踪已使用的输出键,确保唯一性err:存储构建过程中可能发生的错误,实现错误传播
设计亮点:
- 使用构建器模式,所有 Add* 方法都返回 *Parallel,支持链式调用
- 延迟错误处理,错误会在添加到 Chain 时才被检查
- 内部使用 map 来确保输出键的唯一性
5.2 NewParallel 函数
func NewParallel() *Parallel {
return &Parallel{
outputKeys: make(map[string]bool),
}
}
这个函数简单但重要,它创建并初始化一个新的 Parallel 实例。注意它只初始化了 outputKeys,而 nodes 和 err 则保持它们的零值(nil),这是符合 Go 语言习惯的做法。
5.3 Add* 方法族
Parallel 提供了一系列 Add* 方法,用于添加不同类型的节点:
AddChatModel:添加聊天模型节点AddChatTemplate:添加聊天模板节点AddToolsNode:添加工具节点AddLambda:添加 Lambda 函数节点AddEmbedding:添加嵌入模型节点AddRetriever:添加检索器节点AddLoader:添加文档加载器节点AddIndexer:添加索引器节点AddDocumentTransformer:添加文档转换器节点AddGraph:添加子图/子链节点AddPassthrough:添加直通节点
所有这些方法都遵循相似的模式:
- 接收输出键、节点实例和可选配置
- 将节点转换为内部的 graphNode 格式
- 调用内部的 addNode 方法进行实际添加
- 返回 Parallel 实例以支持链式调用
这种设计提供了一致的 API 体验,同时确保了各种类型节点的正确处理。
5.4 addNode 内部方法
addNode 是 Parallel 的内部方法,实现了节点添加的核心逻辑:
func (p *Parallel) addNode(outputKey string, node *graphNode, options *graphAddNodeOpts) *Parallel {
// 检查是否已有错误
if p.err != nil {
return p
}
// 验证节点不为空
if node == nil {
p.err = fmt.Errorf("chain parallel add node invalid, node is nil")
return p
}
// 确保 outputKeys 已初始化
if p.outputKeys == nil {
p.outputKeys = make(map[string]bool)
}
// 检查输出键是否重复
if _, ok := p.outputKeys[outputKey]; ok {
p.err = fmt.Errorf("parallel add node err, duplicate output key= %s", outputKey)
return p
}
// 验证节点信息不为空
if node.nodeInfo == nil {
p.err = fmt.Errorf("chain parallel add node invalid, nodeInfo is nil")
return p
}
// 设置节点的输出键
node.nodeInfo.outputKey = outputKey
// 添加节点到列表
p.nodes = append(p.nodes, nodeOptionsPair{node, options})
p.outputKeys[outputKey] = true
return p
}
这个方法展示了几个重要的设计决策:
- 错误快速失败但延迟报告:一旦出现错误,后续操作都会被跳过,但错误会保存直到 Parallel 被添加到 Chain 时才报告
- 防御性编程:多处验证确保状态一致性,即使是在内部方法中
- 输出键唯一性保证:通过 map 来确保每个输出键只被使用一次
- 节点信息完整性:确保节点的 nodeInfo 存在并正确设置 outputKey
6. 与其他模块的关系
6.1 依赖关系
compose-chain_parallel 模块与以下模块有紧密的依赖关系:
- compose-graph_engine:Parallel 最终会被转换为 Graph 中的节点和边
- 各种组件模块:如 model、prompt、retriever 等,这些是可以添加到 Parallel 中的节点类型
- compose-chain:Parallel 主要是作为 Chain 的一部分使用的
6.2 被调用关系
主要被以下模块调用:
- compose-chain:通过 AppendParallel 方法将 Parallel 集成到 Chain 中
- 用户代码:开发者直接创建和配置 Parallel 实例
7. 设计决策与权衡
7.1 构建器模式 vs 函数选项模式
Parallel 采用了构建器模式(Builder Pattern)而不是函数选项模式(Functional Options Pattern)。
选择原因:
- 节点添加是一个累积过程,构建器模式更自然
- 支持流畅的链式调用,提高代码可读性
- 每个 Add* 方法都有明确的语义,比通用的选项函数更直观
权衡:
- 相比函数选项模式,构建器模式通常会产生更多的代码
- 但在这个场景下,不同类型节点的添加逻辑确实不同,构建器模式是更合适的选择
7.2 延迟错误处理
Parallel 采用了延迟错误处理策略:错误在发生时被保存,但不会立即返回,而是等到 Parallel 被添加到 Chain 时才被检查。
选择原因:
- 支持流畅的链式调用,不需要在每一步都检查错误
- 简化了用户代码,错误处理可以集中在一个地方
权衡:
- 错误发生的位置和报告的位置可能有距离,调试时可能稍微困难
- 一旦发生错误,后续的操作都会被静默跳过,直到错误被报告
7.3 输出键唯一性验证
Parallel 强制要求每个节点的输出键必须唯一。
选择原因:
- 避免结果覆盖,确保每个节点的输出都能被正确获取
- 简化下游节点的逻辑,它们可以确定每个键对应唯一的值
权衡:
- 增加了一定的约束,用户需要确保键的唯一性
- 但这是一个合理的约束,因为重复的键在并行执行场景下几乎总是一个错误
7.4 输入复用 vs 输入分区
Parallel 设计为将相同的输入传递给所有节点,而不是将输入分区给不同节点。
选择原因:
- 更符合常见的 AI 应用场景,如多模型比较、多源信息收集等
- 简化了使用模型,用户不需要考虑如何分区输入
权衡:
- 不适合需要将大数据集分区并行处理的场景
- 但这可以通过其他模式(如先分区再并行)来实现,不属于本模块的核心职责
8. 使用指南与示例
8.1 基本使用
以下是使用 compose-chain_parallel 的基本步骤:
// 1. 创建 Parallel 实例
parallel := compose.NewParallel()
// 2. 添加节点,每个节点指定一个唯一的输出键
parallel.
AddChatModel("model_a", modelA).
AddChatModel("model_b", modelB).
AddLambda("custom_process", myLambdaFunction)
// 3. 创建 Chain 并添加 Parallel
chain := compose.NewChain[InputType, OutputType]()
chain.
AppendSomeNode(...).
AppendParallel(parallel).
AppendAnotherNode(...)
// 4. 编译并使用 Chain
runnable, err := chain.Compile(ctx)
if err != nil {
// 处理错误
}
result, err := runnable.Invoke(ctx, input)
8.2 完整示例
让我们看一个更完整的示例,展示如何在实际场景中使用 Parallel:
func TestParallelExample(t *testing.T) {
// 创建两个不同的聊天模型
modelGpt := createChatModel("gpt-4")
modelClaude := createChatModel("claude-3")
// 创建一个并行组件,同时调用两个模型
parallel := compose.NewParallel()
parallel.
AddChatModel("gpt_result", modelGpt).
AddChatModel("claude_result", modelClaude).
AddLambda("timestamp", compose.InvokableLambda(
func(ctx context.Context, input any) (string, error) {
return time.Now().Format(time.RFC3339), nil
}))
// 创建一个处理并行结果的 Lambda
resultProcessor := compose.InvokableLambda(
func(ctx context.Context, results map[string]any) (string, error) {
gptResult := results["gpt_result"].(*schema.Message)
claudeResult := results["claude_result"].(*schema.Message)
timestamp := results["timestamp"].(string)
// 这里可以比较、聚合或选择结果
return fmt.Sprintf("[%s]\nGPT: %s\nClaude: %s",
timestamp, gptResult.Content, claudeResult.Content), nil
})
// 构建完整的链
chain := compose.NewChain[*schema.Message, string]()
chain.
AppendParallel(parallel).
AppendLambda(resultProcessor)
// 编译和运行
runnable, err := chain.Compile(context.Background())
if err != nil {
t.Fatalf("Failed to compile chain: %v", err)
}
input := schema.UserMessage("What is AI?")
result, err := runnable.Invoke(context.Background(), input)
if err != nil {
t.Fatalf("Failed to invoke chain: %v", err)
}
t.Logf("Result: %s", result)
}
这个示例展示了如何:
- 同时调用两个不同的模型
- 添加一个自定义的 Lambda 来提供额外信息(时间戳)
- 处理并行执行的结果,将它们组合成一个单一的输出
8.3 嵌套使用
Parallel 也可以与其他结构嵌套使用,例如在 Branch 中使用 Parallel,或者在 Parallel 中使用 Graph/Chain:
// 创建一个内部链
innerChain := compose.NewChain[map[string]any, string]()
innerChain.AppendLambda(someProcessingLambda)
// 创建并行组件,其中包含一个链
parallel := compose.NewParallel()
parallel.
AddChatModel("direct_model", model).
AddGraph("chain_result", innerChain)
// 在分支中使用并行
branch := compose.NewChainBranch(conditionFunc)
branch.
AddParallel("branch_a", parallelA).
AddParallel("branch_b", parallelB)
// 主链
mainChain := compose.NewChain[InputType, OutputType]()
mainChain.
AppendBranch(branch).
AppendSomeMoreNodes(...)
9. 注意事项与常见陷阱
9.1 输出键唯一性
最常见的错误是忘记确保输出键的唯一性。如果尝试添加具有相同输出键的两个节点,Parallel 会记录一个错误,该错误会在添加到 Chain 时暴露出来。
解决方法:
- 为每个节点使用清晰、描述性的键名
- 考虑在键名中包含节点类型,如 "gpt_response"、"retrieval_results" 等
9.2 节点数量限制
Parallel 至少需要两个节点才能正常工作。如果你尝试添加只有一个节点的 Parallel,AppendParallel 会返回错误。
解决方法:
- 确保在添加到 Chain 之前,Parallel 中至少有两个节点
- 如果只需要一个节点,直接添加到 Chain 中即可,不需要使用 Parallel
9.3 错误处理
由于 Parallel 采用延迟错误处理,有时很难确定错误发生的确切位置。
解决方法:
- 在构建 Parallel 时逐步测试,每添加几个节点就尝试将其添加到 Chain 中编译,看是否有错误
- 如果遇到错误,可以通过检查 Parallel.err 字段来获取更早的错误信息(虽然这是内部字段,但在调试时可以临时访问)
9.4 下游节点输入类型
使用 Parallel 后,下一个节点会收到一个 map[string]any 类型的输入,其中键是你在 Parallel 中指定的输出键。
常见陷阱:
- 忘记下游节点需要处理 map 类型的输入
- 对 map 中的值类型进行不正确的类型断言
解决方法:
- 确保下游节点能够处理 map[string]any 类型的输入
- 在类型断言时使用逗号-ok 语法来避免 panic
- 考虑使用 Lambda 节点来进行类型安全的转换
9.5 并行执行的独立性
Parallel 中的所有节点都是独立执行的,它们之间不能直接通信或共享状态。
解决方法:
- 如果节点之间需要协调,考虑使用不同的模式,如顺序执行或使用 Graph 进行更复杂的编排
- 所有需要共享的信息都应该通过输入传递,或者在后续的聚合节点中处理
9.6 性能考虑
虽然 Parallel 可以并行执行多个操作,但这并不总是意味着性能会线性提升。
注意事项:
- 如果节点受限于相同的资源(如 API 速率限制、数据库连接池),并行执行可能不会带来预期的性能提升
- 并行执行会增加内存使用,因为需要同时保存所有节点的输入和中间状态
- 如果节点执行时间差异很大,总执行时间将由最慢的节点决定
优化建议:
- 考虑为不同的节点设置适当的超时
- 如果某些节点明显比其他节点慢,可能需要重新考虑是否应该将它们放在同一个 Parallel 中
- 对于非常大量的并行节点,考虑分批处理或使用其他模式
10. 总结
compose-chain_parallel 模块是 Eino 框架中一个强大而灵活的组件,它简化了在链式工作流中添加并行执行逻辑的过程。通过提供直观的 API 和处理底层并发细节,它使开发者能够轻松构建更高效、更强大的 AI 应用。
在本模块的设计中,我们看到了几个关键原则:
- 简洁性:提供简单直观的 API,隐藏复杂的并发细节
- 灵活性:支持多种类型的节点,并与其他结构良好组合
- 安全性:进行适当的验证,防止常见错误
- 一致性:与框架的其他部分保持一致的设计风格和使用模式
当你需要在工作流中同时执行多个独立操作时,compose-chain_parallel 是一个值得考虑的强大工具。通过合理使用它,你可以显著提升应用的性能和响应速度,同时保持代码的清晰和可维护性。