Skip to content

LangChain.js 模块分析 #1: Runnable 接口与实现(含流程图)

模块: langchain-core/runnables 文件: libs/langchain-core/src/runnables/分析日期: 2026-02-13 核心文件: base.ts (约 2800 行)


目录

  1. 核心概念
  2. RunnableInterface 接口
  3. Runnable 抽象类
  4. 核心实现类
  5. 完整执行流程图
  6. 配置系统
  7. 流式处理
  8. 追踪与调试
  9. 设计模式
  10. 使用示例
  11. 最佳实践

1. 核心概念

1.1 Runnable 是什么?

Runnable 是 LangChain.js 中最核心的抽象,代表一个"可执行单元"。

所有可执行组件(LLM、工具、链、Agent、检索器等)都实现了 Runnable 接口,这意味着:

  • 统一的执行模型
  • 统一的错误处理
  • 统一的配置方式
  • 统一的追踪和监控
  • 可以自由组合和嵌套

1.2 核心特性

  1. 可组合性: Runnable 可以嵌套组合,形成复杂的处理链
  2. 类型安全: 完整的 TypeScript 泛型支持
  3. 可流式: 支持异步迭代器输出
  4. 可批量: 支持批量执行
  5. 可配置: 运行时配置注入
  6. 可追踪: 内置 LangSmith 追踪支持
  7. 可序列化: 所有 Runnable 可序列化和反序列化

1.3 设计哲学

一切皆 Runnable,一切皆可组合

这种设计让 LangChain.js 具有极强的灵活性:

  • 任何两个 Runnable 可以用 pipe 连接
  • 任何 Runnable 可以包装、增强、重试
  • 任何 Runnable 可以批量执行
  • 任何 Runnable 可以流式输出

2. RunnableInterface 接口

2.1 接口定义

typescript
export interface RunnableInterface<
  RunInput,
  RunOutput,
  CallOptions extends RunnableConfig = RunnableConfig
> extends SerializableInterface {
  lc_serializable: boolean;

  // 核心方法
  invoke(input: RunInput, options?: Partial<CallOptions>): Promise<RunOutput>;

  batch(inputs: RunInput[], options?, batchOptions?): Promise<RunOutput[]>;

  stream(input: RunInput, options?): Promise<IterableReadableStreamInterface<RunOutput>>;

  transform(generator: AsyncGenerator<RunInput>, options?): AsyncGenerator<RunOutput>;

  getName(suffix?: string): string;
}

2.2 接口流程图

mermaid
graph TD
    A[用户调用] --> B[invoke]
    B --> C[执行逻辑]
    C --> D[返回结果]
    D --> E[用户接收]

    A --> F[batch调用]
    F --> G[并行执行]
    G --> H[批量返回结果]
    H --> E

    A --> I[stream调用]
    I --> J[创建迭代器]
    J --> K[流式输出]
    K --> L[实时返回]
    L --> E

    A --> M[transform调用]
    M --> N[转换生成器]
    N --> O[逐个处理]
    O --> P[异步生成输出]
    P --> E

    style A fill:#e1f5ff
    style B fill:#fff4e6
    style C fill:#d1fae5
    style D fill:#82c91a
    style E fill:#f59e0b
    style F fill:#8ec5fc
    style G fill:#8ec5fc
    style H fill:#f59e0b
    style I fill:#e1f5ff
    style J fill:#fff4e6
    style K fill:#fff4e6
    style L fill:#fff4e6
    style M fill:#e1f5ff
    style N fill:#fff4e6
    style O fill:#fff4e6
    style P fill:#fff4e6

3. Runnable 抽象类

3.1 类结构

typescript
export abstract class Runnable<
  RunInput,
  RunOutput,
  CallOptions extends RunnableConfig = RunnableConfig
> extends Serializable {
  protected lc_runnable = true;
  name?: string;

  // 抽象方法,必须由子类实现
  abstract invoke(input: RunInput, options?: Partial<CallOptions>): Promise<RunOutput>;

  // 提供默认实现的方法
  async batch(inputs, options?, batchOptions?): Promise<RunOutput[]>;
  async stream(input, options?): Promise<IterableReadableStreamInterface<RunOutput>>;
  async transform(generator: AsyncGenerator<RunInput>, options?): AsyncGenerator<RunOutput>;

  // 配置方法
  withConfig(config: Partial<CallOptions>): Runnable<RunInput, RunOutput>;
  withRetry(fields?): RunnableRetry<RunInput, RunOutput>;
  withFallbacks(fields): RunnableWithFallbacks<RunInput, RunOutput>;
  bind(kwargs): Runnable<RunInput, RunOutput>;

  // 流式方法
  async *streamLog(input, options?): AsyncGenerator<RunLog>;
  async *streamEvents(input, options?): AsyncGenerator<StreamEvent>;
}

3.2 Runnable 类流程图

mermaid
graph TB
    Start([创建Runnable实例]) --> Initialize[初始化]

    Initialize --> |A[基础设置]|
    |A --> A1[设置name]
    |A --> A2[设置lc_runnable]

    Initialize --> Configure[配置管理]

    Configure --> |B[应用配置]|
    |B --> B1[默认配置]
    |B --> B2[组件级配置]
    |B --> B3[运行时配置]

    Initialize --> Methods[核心方法]

    Methods --> |C[invoke - 抽象方法]|
    Methods --> |C1[检查配置]
    Methods --> |C2[准备回调管理]
    Methods --> |C3[准备运行环境]
    Methods --> |C4[调用具体实现]
    Methods --> |C5[返回结果]

    Methods --> |D[batch - 默认实现]|
    Methods --> |D1[获取选项列表]
    Methods --> |D2[应用并发限制]
    Methods --> |D3[并行调用invoke N次]
    Methods --> |D4[收集所有结果]
    Methods --> |D5[处理异常]

    Methods --> |E[stream - 默认实现]|
    Methods --> |E1[创建异步迭代器]
    Methods --> |E2[在迭代器中调用invoke]
    Methods --> |E3[逐个yield结果]
    Methods --> |E4[清理迭代器]

    Methods --> |F[transform - 默认实现]|
    Methods --> |F1[迭代输入生成器]
    Methods --> |F2[对每个输入调用invoke]
    Methods --> |F3[yield输出]

    Methods --> |G[增强方法]|
    Methods --> |G1[withConfig - 绑定配置]
    Methods --> |G2[withRetry - 添加重试]
    Methods --> |G3[withFallbacks - 添加降级]
    Methods --> |G4[bind - 绑定参数]

    Methods --> |H[流式增强]|
    Methods --> |H1[streamLog - 日志流]
    Methods --> |H2[streamEvents - 事件流]

    Start --> End([Runnable实例就绪])

    style A fill:#e1f5ff
    style B fill:#fff4e6
    style C fill:#d1fae5
    style D fill:#fff4e6
    style E fill:#f59e0b
    style F fill:#8ec5fc
    style G fill:#8ec5fc
    style H fill:#8ec5fc

4. 核心实现类

4.1 RunnableSequence - 顺序执行流程图

mermaid
graph TD
    Input[输入数据] --> Node1[Runnable1]
    Input --> Node2[Runnable2]
    Input --> Node3[Runnable3]

    Node1 --> |处理1|
    Node2 --> |处理2|
    Node3 --> |处理3|

    Node1 --> Output1[输出1]
    Node2 --> Output2[输出2]
    Node3 --> Output3[输出3]

    Output1 --> Merge[合并输出]
    Output2 --> Merge
    Output3 --> Merge
    Merge --> Final[最终输出]

    style Input fill:#e1f5ff
    style Node1 fill:#fff4e6
    style Node2 fill:#fff4e6
    style Node3 fill:#fff4e6
    style Output1 fill:#d1fae5
    style Output2 fill:#d1fae5
    style Output3 fill:#d1fae5
    style Merge fill:#82c91a
    style Final fill:#f59e0b

实现:

typescript
export class RunnableSequence<
  RunInput,
  RunOutput
> extends Runnable<RunInput, RunOutput> {
  steps: RunnableLike<RunInput, RunOutput>[];

  async invoke(input, options?) {
    let result = input;
    for (const step of this.steps) {
      result = await step.invoke(result, options);
    }
    return result;
  }
}

4.2 RunnableParallel - 并行执行流程图

mermaid
graph TD
    Input[输入数据] --> Split{分发输入}

    Split --> |Node1[Runnable1]|
    Split --> |Node2[Runnable2]|
    Split --> |Node3[Runnable3]|

    Node1 --> Exec1[并行执行]
    Node2 --> Exec2[并行执行]
    Node3 --> Exec3[并行执行]

    Exec1 --> Output1[输出1]
    Exec2 --> Output2[输出2]
    Exec3 --> Output3[输出3]

    Exec1 --> Wait1[等待]
    Exec2 --> Wait2[等待]
    Exec3 --> Wait3[等待]

    Wait1 --> Collect[收集结果]
    Wait2 --> Collect
    Wait3 --> Collect

    Collect --> Merge[合并结果]
    Merge --> Final[最终输出]

    style Input fill:#e1f5ff
    style Split fill:#82c91a
    style Node1 fill:#fff4e6
    style Node2 fill:#fff4e6
    style Node3 fill:#fff4e6
    style Exec1 fill:#d1fae5
    style Exec2 fill:#d1fae5
    style Exec3 fill:#d1fae5
    style Output1 fill:#d1fae5
    style Output2 fill:#d1fae5
    style Output3 fill:#d1fae5
    style Wait1 fill:#82c91a
    style Wait2 fill:#82c91a
    style Wait3 fill:#82c91a
    style Collect fill:#f59e0b
    style Merge fill:#8ec5fc
    style Final fill:#f59e0b

实现:

typescript
export class RunnableParallel<RunInput> extends RunnableMap<RunInput> {
  async invoke(input, options?) {
    const entries = await Promise.all(
      Object.entries(this.steps).map(async ([key, runnable]) => {
        const output = await runnable.invoke(input, options);
        return [key, output];
      })
    );
    return Object.fromEntries(entries);
  }
}

4.3 RunnableBranch - 条件分支流程图

mermaid
graph TD
    Input[输入数据] --> Condition[条件判断]

    Condition --> |Case1[条件1]|
    Condition --> |Case2[条件2]|
    Condition --> |Case3[条件3]|
    Condition --> |Default[默认]

    Case1 --> Node1[分支1]
    Case2 --> Node2[分支2]
    Case3 --> Node3[分支3]
    Default --> NodeDefault[默认分支]

    Node1 --> Output1[输出1]
    Node2 --> Output2[输出2]
    Node3 --> Output3[输出3]
    NodeDefault --> OutputDefault[默认输出]

    Output1 --> Merge[合并输出]
    Output2 --> Merge
    Output3 --> Merge
    OutputDefault --> Merge

    Merge --> Final[最终输出]

    style Input fill:#e1f5ff
    style Condition fill:#82c91a
    style Case1 fill:#fff4e6
    style Case2 fill:#fff4e6
    style Case3 fill:#fff4e6
    style NodeDefault fill:#fff4e6
    style Node1 fill:#d1fae5
    style Node2 fill:#d1fae5
    style Node3 fill:#d1fae5
    style OutputDefault fill:#d1fae5
    style Merge fill:#8ec5fc
    style Final fill:#f59e0b

实现:

typescript
export class RunnableBranch<RunInput, RunOutput> extends Runnable<RunInput, RunOutput> {
  constructor(
    condition: (input: RunInput) => string,
    branches: Record<string, Runnable<RunInput, RunOutput>>
  );

  async invoke(input, options?) {
    const branchName = this.condition(input);
    const runnable = this.branches[branchName];
    return runnable.invoke(input, options);
  }
}

4.4 RunnableRetry - 重试机制流程图

mermaid
graph TD
    Input[输入数据] --> Exec[尝试执行]

    Exec --> Check{检查结果}
    Check --> |Success[成功]|
    Check --> |Failure[失败]

    Success --> Output[返回结果]

    Failure --> Count{计数尝试}
    Count --> Compare{是否达到上限}

    Compare --> |Continue[继续重试]|
    Compare --> |Stop[停止重试]

    Continue --> Wait[等待延迟]
    Wait --> Retry{重新执行}
    Retry --> Exec

    Stop --> Final{返回错误]

    Exec --> Callback{调用失败回调}
    Callback --> Wait2[等待]

    Wait2 --> Continue2{继续重试}
    Continue2 --> Exec

    Output --> FinalOutput[最终输出或错误]

    style Input fill:#e1f5ff
    style Exec fill:#fff4e6
    style Check fill:#d1fae5
    style Success fill:#82c91a
    style Failure fill:#f59e0b
    style Count fill:#fff4e6
    style Compare fill:#82c91a
    style Continue fill:#8ec5fc
    style Stop fill:#f59e0b
    style Wait fill:#d1fae5
    style Wait2 fill:#d1fae5
    style Retry fill:#fff4e6
    style Callback fill:#8ec5fc
    style Continue2 fill:#8ec5fc
    style FinalOutput fill:#f59e0b
    style Final fill:#82c91a

实现:

typescript
export class RunnableRetry<RunInput, RunOutput> extends Runnable<RunInput, RunOutput> {
  bound: Runnable<RunInput, RunOutput>;
  maxAttemptNumber: number;
  onFailedAttempt?: (error, input, attempt) => void;

  async invoke(input, options?) {
    let lastError: Error | undefined;
    for (let attempt = 1; attempt <= this.maxAttemptNumber; attempt++) {
      try {
        return await this.bound.invoke(input, options);
      } catch (error) {
        lastError = error;
        if (this.onFailedAttempt) {
          await this.onFailedAttempt(error, input, attempt);
        }
        // 延迟后重试
        await new Promise(resolve => setTimeout(resolve, 1000 * attempt));
      }
    }
    throw lastError;
  }
}

4.5 RunnableEach - 批量处理流程图

mermaid
graph TD
    Input[输入数组] --> Loop[循环处理]

    Loop --> |Item1[项目1]|
    Loop --> |Item2[项目2]|
    Loop --> |Item3[项目3]|
    Loop --> |...

    Item1 --> Exec[执行Runnable]
    Item2 --> Exec
    Item3 --> Exec

    Exec --> Collect1[收集结果1]
    Exec --> Collect2[收集结果2]
    Exec --> Collect3[收集结果3]

    Collect1 --> Array[结果数组]
    Collect2 --> Array
    Collect3 --> Array

    Array --> Output[返回数组]

    style Input fill:#e1f5ff
    style Loop fill:#82c91a
    style Item1 fill:#fff4e6
    style Item2 fill:#fff4e6
    style Item3 fill:#fff4e6
    style Exec fill:#d1fae5
    style Collect1 fill:#d1fae5
    style Collect2 fill:#d1fae5
    style Collect3 fill:#d1fae5
    style Array fill:#8ec5fc
    style Output fill:#f59e0b

实现:

typescript
export class RunnableEach<RunInput extends Iterable<any>, RunOutput> extends Runnable<RunInput, RunOutput[]> {
  bound: Runnable<any, RunOutput>;

  async invoke(input, options?) {
    const results: RunOutput[] = [];
    for (const item of input) {
      const result = await this.bound.invoke(item, options);
      results.push(result);
    }
    return results;
  }
}

5. 完整执行流程图

5.1 Runnable 完整执行流程

mermaid
graph TD
    Start[开始执行] --> Config[配置解析]

    Config --> Validate[验证配置]
    Validate --> |Valid[有效]|
    Validate --> |Invalid[无效]

    Invalid --> Error[返回错误]
    Valid --> Prepare[准备执行]

    Prepare --> |A[检查配置]|
    Prepare --> |B[准备环境]|
    Prepare --> |C[准备回调]|

    A --> Merge[合并所有配置]
    B --> Merge
    C --> Merge

    Merge --> Select{选择执行模式}

    Select --> |Mode1[invoke]|
    Select --> |Mode2[batch]|
    Select --> |Mode3[stream]|
    Select --> |Mode4[transform]|

    Mode1 --> Exec1[执行invoke]
    Mode2 --> Exec2[执行batch]
    Mode3 --> Exec3[执行stream]
    Mode4 --> Exec4[执行transform]

    Exec1 --> RunCore[核心运行]
    Exec2 --> RunCore
    Exec3 --> RunCore
    Exec4 --> RunCore

    RunCore --> |A[调用具体实现]|
    RunCore --> |B[错误处理]|
    RunCore --> |C[记录日志]|
    RunCore --> |D[追踪事件]|

    A --> Invoke{调用invoke方法}
    B --> TryCatch{try-catch错误}
    C --> Logging[日志系统]
    D --> Tracing[追踪系统]

    Invoke --> Result[获取结果]
    TryCatch --> |Success[成功]|
    TryCatch --> |Retry[重试]|

    Success --> Result
    Retry --> |Retry[重试]|
    Retry --> |Fallback[降级]|

    Result --> Output{返回输出}
    Retry --> Error{返回错误}
    Fallback --> Output

    Logging --> RunCore
    Tracing --> RunCore

    Output --> Final[最终返回]

    style Start fill:#e1f5ff
    style Config fill:#fff4e6
    style Validate fill:#d1fae5
    style Prepare fill:#d1fae5
    style Merge fill:#82c91a
    style Select fill:#8ec5fc
    style Exec1 fill:#fff4e6
    style Exec2 fill:#fff4e6
    style Exec3 fill:#fff4e6
    style Exec4 fill:#fff4e6
    style RunCore fill:#d1fae5
    style Mode1 fill:#fff4e6
    style Mode2 fill:#fff4e6
    style Mode3 fill:#fff4e6
    style Mode4 fill:#fff4e6
    style Invoke fill:#e1f5ff
    style TryCatch fill:#f59e0b
    style Result fill:#82c91a
    style Retry fill:#8ec5fc
    style Fallback fill:#8ec5fc
    style Error fill:#f59e0b
    style Logging fill:#82c91a
    style Tracing fill:#8ec5fc
    style Output fill:#f59e0b
    style Final fill:#f59e0b

6. 配置系统

6.1 配置合并流程图

mermaid
graph TD
    Input[用户调用] --> Configs{多层配置}

    Configs --> |Default[默认配置]|
    Configs --> |Component[组件配置]|
    Configs --> |Runtime[运行时配置]|

    Default --> Merge1[第一层合并]
    Component --> Merge2[第二层合并]
    Runtime --> Merge3[第三层合并]

    Merge1 --> FinalConfig[最终配置]
    Merge2 --> Merge3
    Merge3 --> FinalConfig

    FinalConfig --> Execute[使用配置执行]

    Execute --> Output[返回结果]

    style Input fill:#e1f5ff
    style Configs fill:#82c91a
    style Default fill:#d1fae5
    style Component fill:#fff4e6
    style Runtime fill:#fff4e6
    style Merge1 fill:#fff4e6
    style Merge2 fill:#fff4e6
    style Merge3 fill:#fff4e6
    style FinalConfig fill:#8ec5fc
    style Execute fill:#d1fae5
    style Output fill:#f59e0b

配置优先级:

运行时配置 > 组件级配置 > 默认配置

6.2 配置系统流程

mermaid
graph TD
    Start[配置系统] --> Config1[创建默认配置]

    Config1 --> |A[langchain配置]|
    Config1 --> |B[组件配置]|
    Config1 --> |C[环境变量]|

    A --> |A1[模型参数]|
    A --> |A2[系统设置]|
    A --> |A3[追踪配置]|

    B --> |B1[组件参数]|
    B --> |B2[超时设置]|

    C --> |C1[环境变量读取]|
    C --> |C2[动态配置]|

    A1 --> Merge1[合并基础配置]
    A2 --> Merge2
    A3 --> Merge1

    B1 --> Merge2
    B2 --> Merge1

    C1 --> Merge3
    C2 --> Merge1

    Merge1 --> MergeBase[基础合并]
    Merge2 --> Merge1
    Merge3 --> Merge1

    MergeBase --> Override[运行时配置]

    Override --> Final[最终配置]

    Final --> Validate[验证配置]
    Validate --> Valid{有效}

    Valid --> Use[使用配置执行]

    style Start fill:#e1f5ff
    style Config1 fill:#fff4e6
    style A fill:#d1fae5
    style B fill:#fff4e6
    style C fill:#fff4e6
    style A1 fill:#fff4e6
    style A2 fill:#fff4e6
    style A3 fill:#fff4e6
    style B1 fill:#fff4e6
    style B2 fill:#fff4e6
    style C1 fill:#fff4e6
    style C2 fill:#fff4e6
    style Merge1 fill:#fff4e6
    style Merge2 fill:#fff4e6
    style Merge3 fill:#fff4e6
    style MergeBase fill:#8ec5fc
    style Override fill:#82c91a
    style Validate fill:#82c91a
    style Use fill:#f59e0b

7. 流式处理

7.1 流式处理流程图

mermaid
graph TD
    Input[输入] --> Stream[创建流]

    Stream --> AsyncIterator[异步迭代器]
    AsyncIterator --> |Item1[项目1]|
    AsyncIterator --> |Item2[项目2]|

    Item1 --> Process1[处理1]
    Item2 --> Process2[处理2]

    Process1 --> Yield1[yield输出1]
    Process2 --> Yield2[yield输出2]

    Yield1 --> Consumer[消费者接收]
    Yield2 --> Consumer

    Consumer --> |Async[异步处理]|
    Consumer --> |Sync[同步处理]|

    Async --> |Event[触发事件]|
    Sync --> |Event2[触发事件2]|

    Event --> Log[记录日志]
    Event2 --> Log

    Log --> StreamEnd[流结束]
    Log --> StreamEnd

    StreamEnd --> Final[完成]

    style Input fill:#e1f5ff
    style Stream fill:#82c91a
    style AsyncIterator fill:#fff4e6
    style Item1 fill:#d1fae5
    style Item2 fill:#d1fae5
    style Process1 fill:#fff4e6
    style Process2 fill:#fff4e6
    style Yield1 fill:#e1f5ff
    style Yield2 fill:#e1f5ff
    style Consumer fill:#d1fae5
    style Async fill:#d1fae5
    style Sync fill:#d1fae5
    style Event fill:#8ec5fc
    style Event2 fill:#8ec5fc
    style Log fill:#82c91a
    style Log2 fill:#82c91a
    style StreamEnd fill:#f59e0b
    style Final fill:#f59e0b

7.2 流式类型

typescript
// 异步迭代器
for await (const chunk of stream) {
  console.log(chunk);
}

// 转换为同步数组
const chunks = [];
for await (const chunk of stream) {
  chunks.push(chunk);
}

8. 追踪与调试

8.1 追踪系统流程图

mermaid
graph TD
    Start[开始执行] --> Trace{追踪初始化}

    Trace --> |A[创建Tracer]|
    Trace --> |B[获取配置]|
    Trace --> |C[设置回调]|

    A --> Register[注册追踪器]
    B --> Register
    C --> Register

    Register --> Run{运行任务}

    Run --> |A[记录开始]|
    Run --> |B[记录中间]|
    Run --> |C[记录结束]|

    A --> EventStart{触发start事件}
    B --> EventEnd{触发end事件}
    C --> EventError{触发error事件}

    A --> |D[传递给LLM]|
    B --> |D[记录输出]|
    C --> |D[记录错误]|

    D --> |D1[new_token事件]|
    D --> |D2[完成事件]|
    D --> |D3[错误事件]|

    EventStart --> LangSmith[LangSmith追踪]
    EventEnd --> LangSmith
    EventError --> LangSmith

    D1 --> Log{记录到日志}
    D2 --> Finalize[最终化]
    D3 --> Finalize

    Finalize --> Complete[完成]

    style Start fill:#e1f5ff
    style Trace fill:#82c91a
    style A fill:#fff4e6
    style B fill:#fff4e6
    style C fill:#fff4e6
    style Register fill:#8ec5fc
    style Run fill:#d1fae5
    style A fill:#e1f5ff
    style B fill:#fff4e6
    style C fill:#fff4e6
    style D fill:#d1fae5
    style EventStart fill:#f59e0b
    style EventEnd fill:#f59e0b
    style EventError fill:#f59e0b
    style LangSmith fill:#82c91a
    style Log fill:#fff4e6
    style D1 fill:#e1f5ff
    style D2 fill:#d1fae5
    style D3 fill:#e1f5ff
    style Finalize fill:#8ec5fc
    style Complete fill:#f59e0b

8.2 StreamEvent 数据结构

typescript
interface StreamEvent {
  event: "start" | "end" | "error" | "new_token" | "metadata";
  data: any;
  name: string;
  tags: string[];
  runId: string;
  timestamp: number;
}

9. 设计模式

9.1 策略模式

mermaid
graph TD
    Input[输入] --> Strategy[选择策略]

    Strategy --> |A[顺序执行]|
    Strategy --> |B[并行执行]|
    Strategy --> |C[条件分支]|

    A --> Sequence[RunnableSequence]
    B --> Parallel[RunnableParallel]
    C --> Branch[RunnableBranch]

    Sequence --> |Step1[步骤1]|
    Sequence --> |Step2[步骤2]|
    Sequence --> |Step3[步骤3]|

    Step1 --> Output[最终输出]
    Step2 --> Output
    Step3 --> Output

    Parallel --> |Exec1[并行1]|
    Parallel --> |Exec2[并行2]|
    Parallel --> |Exec3[并行3]|

    Exec1 --> Collect[收集结果]
    Exec2 --> Collect
    Exec3 --> Collect

    Collect --> Merge[合并结果]
    Merge --> Output

    Branch --> Condition{条件判断}
    Condition --> |Branch1[分支1]|
    Condition --> |Branch2[分支2]|
    Condition --> |BranchDefault[默认]|

    Branch1 --> Exec1[执行分支1]
    Branch2 --> Exec2[执行分支2]
    BranchDefault --> ExecDefault[执行默认]

    Exec1 --> Output
    Exec2 --> Output
    ExecDefault --> Output

    style Input fill:#e1f5ff
    style Strategy fill:#82c91a
    style Sequence fill:#fff4e6
    style Step1 fill:#d1fae5
    style Step2 fill:#d1fae5
    style Step3 fill:#d1fae5
    style Parallel fill:#fff4e6
    style Exec1 fill:#8ec5fc
    style Exec2 fill:#8ec5fc
    style Exec3 fill:#8ec5fc
    style Collect fill:#d1fae5
    style Condition fill:#f59e0b
    style Branch1 fill:#fff4e6
    style Branch2 fill:#fff4e6
    style BranchDefault fill:#fff4e6
    style Output fill:#f59e0b

9.2 装饰器模式

mermaid
graph TD
    Input[Runnable实例] --> Wrapper[装饰器]

    Wrapper --> |A[Config装饰器]|
    Wrapper --> |B[Retry装饰器]|
    Wrapper --> |C[Fallback装饰器]|

    A --> Configured[配置绑定]
    B --> Retrying[重试机制]
    C --> Fallbacks[降级机制]

    Configured --> Use[使用配置]
    Retrying --> |Success[成功]|
    Retrying --> |Retry[继续重试]|
    Retrying --> |Failure[失败]|

    Fallbacks --> |Primary[主要]|
    Fallbacks --> |Backup1[备选1]|
    Fallbacks --> |Backup2[备选2]|

    Use --> Run[运行]
    Success --> Output[返回结果]
    Retry --> Run
    Failure --> Fallbacks

    Fallbacks --> |Try1[尝试主要]|
    Fallbacks --> |Try2[尝试备选1]|
    Fallbacks --> |Try3[尝试备选2]|

    Try1 --> |Success1[成功1]|
    Try2 --> |Success2[成功2]|
    Try3 --> |Error[全部失败]|

    Success1 --> Output
    Success2 --> Output
    Error --> Error[返回错误]

    style Input fill:#e1f5ff
    style Wrapper fill:#82c91a
    style A fill:#fff4e6
    style B fill:#fff4e6
    style C fill:#fff4e6
    style Configured fill:#8ec5fc
    style Retrying fill:#f59e0b
    style Fallbacks fill:#8ec5fc
    style Use fill:#d1fae5
    style Run fill:#e1f5ff
    style Success fill:#82c91a
    style Retry fill:#8ec5fc
    style Success1 fill:#82c91a
    style Success2 fill:#82c91a
    style Error fill:#f59e0b
    style Output fill:#f59e0b

10. 使用示例

10.1 简单链式调用

typescript
import { ChatOpenAI } from "@langchain/openai";
import { PromptTemplate } from "@langchain/core";

const prompt = PromptTemplate.fromTemplate(
  "Tell me a joke about {topic}"
);

const llm = new ChatOpenAI({ model: "gpt-4" });

const chain = prompt.pipe(llm);

const result = await chain.invoke({ topic: "programming" });

流程图:

mermaid
graph LR
    Input[输入: topic=programming] --> Prompt[模板处理]
    Prompt --> LLM[调用GPT-4]
    LLM --> Output[返回笑话]

10.2 复杂工作流

typescript
const chain = new RunnableSequence([
  loadDocument,
  splitIntoChunks,
  RunnableParallel.from({
    summary: summarizeChain,
    analysis: analyzeChain
  })
]);

流程图:

mermaid
graph TD
    Input[文档] --> Load[加载文档]
    Load --> Split[分割文档]

    Split --> Chunk1[块1]
    Split --> Chunk2[块2]
    Split --> Chunk3[块3]

    Chunk1 --> |Sum[总结]|
    Chunk2 --> |Ana[分析]|
    Chunk3 --> |Facts[提取事实]|

    Sum --> Summary[摘要]
    Ana --> Analysis[分析]
    Facts --> FactList[事实列表]

    Summary --> Merge[合并结果]
    Analysis --> Merge
    FactList --> Merge

    Merge --> Output[最终输出]

    style Input fill:#e1f5ff
    style Load fill:#fff4e6
    style Split fill:#82c91a
    style Chunk1 fill:#d1fae5
    style Chunk2 fill:#d1fae5
    style Chunk3 fill:#d1fae5
    style Sum fill:#8ec5fc
    style Ana fill:#8ec5fc
    style Facts fill:#8ec5fc
    style Summary fill:#fff4e6
    style Analysis fill:#fff4e6
    style FactList fill:#fff4e6
    style Merge fill:#f59e0b
    style Output fill:#f59e0b

11. 最佳实践

11.1 性能优化

  1. 使用批量调用
typescript
// ❌ 多次调用
const results = [];
for (const input of inputs) {
  results.push(await llm.invoke(input));
}

// ✅ 批量调用
const results = await llm.batch(inputs);
  1. 使用并发
typescript
const results = await Promise.all([
  chain1.invoke(input),
  chain2.invoke(input),
  chain3.invoke(input)
]);
  1. 使用流式输出
typescript
for await (const chunk of stream) {
  // 实时处理每个 chunk
}

11.2 错误处理

  1. 使用重试
typescript
const resilientLlm = llm.withRetry({
  stopAfterAttempt: 3
});
  1. 使用降级
typescript
const fallbackLlm = llm.withFallbacks([backup1, backup2]);

总结

Runnable 接口是 LangChain.js 的核心抽象,提供了:

  1. 统一的执行模型: 所有组件共享相同的接口
  2. 强大的组合能力: pipe、sequence、parallel、branch
  3. 丰富的增强功能: retry、fallback、config
  4. 完整的流式支持: stream、transform
  5. 内置追踪: LangSmith 集成
  6. 类型安全: 完整的 TypeScript 泛型支持

关键设计原则:

  • 一切皆 Runnable
  • 一切皆可组合
  • 一切皆可配置
  • 一切皆可追踪

这种设计让 LangChain.js 具有极强的灵活性和扩展性,是构建复杂 AI 应用的理想框架。


✅ 模块 1 分析完成: Runnable 接口与实现(含完整流程图)📊 总进度: 1/6 (16.7%)