大模型学习笔记:基于OpenAI API的机票预订对话系统实现

2026年01月13日21 次阅读0 人喜欢
AILLMTypeScriptNode.jsOpenAI对话系统流式处理

大模型学习笔记:基于OpenAI API的机票预订对话系统实现

项目概述

最近在学习大模型相关的开发,创建了一个AI学习代码笔记仓库。本文记录了一个基于OpenAI API实现的机票预订对话系统,该系统通过结构化的XML标签实现了多轮对话的信息收集和确认流程。

技术栈

  • Node.js + TypeScript: 项目主要使用TypeScript开发
  • OpenAI SDK: 使用官方OpenAI SDK进行API调用
  • SiliconFlow: 作为OpenAI API的代理服务,支持多种模型
  • Vue 3: 使用Vue的响应式系统管理对话状态
  • eventsource-parser: 用于解析SSE格式的流式响应

核心功能实现

1. OpenAI Provider封装

首先封装了OpenAI客户端的创建逻辑,支持通过环境变量配置API密钥和基础URL:

typescript 复制代码
export const createOpenAI = () => new OpenAI({
  apiKey: process.env.SILICONFLOW_API_KEY,
  baseURL: process.env.SILICONFLOW_BASE_URL,
});

支持多种模型配置,包括DeepSeek和Qwen模型。

2. 结构化对话提示词设计

系统使用XML标签来结构化对话流程:

  • <question>: AI向用户提问
  • <answer>: 用户回答
  • <finish>: 对话结束标记

提示词设计要点:

  • 明确角色定义:AI是机票预订助手
  • 流程规范:必须收集完整信息后才能确认
  • 确认机制:所有信息必须向用户确认后才能完成预订

3. 两种响应模式

非流式响应(Await模式)

typescript 复制代码
export async function orderFlyTickAwait() {
  const openai = createOpenAI();
  // 获取用户输入
  const question = await input({ message: orderFlyTickPrompt.value[1].content as string });
  orderFlyTickPrompt.value.push({ role: "user", content: `<answer>${question}</answer>` });

  while (true) {
    const response = await openai.chat.completions.create({
      model: defaultModel,
      messages: orderFlyTickPrompt.value,
    });
    const content = response.choices[0].message.content;
    if (content) {
      if (content.includes('<question>')) {
        const question = content.match(/<question>(.*?)<\/question>/)?.[1];
        const finish = content.match(/<finish>(.*?)<\/finish>/)?.[1];
        if (question) {
          orderFlyTickPrompt.value.push({ role: "assistant", content: `<question>${question}</question>` });
          const newAnswer = readlineSync.question(question);
          orderFlyTickPrompt.value.push({ role: "user", content: `<answer>${newAnswer}</answer>` });
        }
        if (finish) {
          orderFlyTickPrompt.value.push({ role: "assistant", content: `<finish>${finish}</finish>` });
          console.log(finish);
          break
        }
      }
    }
  }
}

特点:

  • 等待完整响应后再处理
  • 使用正则表达式解析XML标签
  • 适合需要完整响应的场景

流式响应(Stream模式)

在流式模式下,我们需要实时处理分块到达的数据。最初的实现可能需要手动缓冲和解析,但为了更好的代码复用和可维护性,我们实现了一个专门的流式XML解析器。

4. XML标签解析工具

实现了一个通用的标签解析工具 parseTag.ts,核心功能:

typescript 复制代码
export const clearTag = (content: string, tagName: string, options?: ParseTagOptions) => {
  let buffer = '';
  let startTagName = '<' + tagName + '>';
  let endTagName = '</' + tagName + '>';
  
  if (content.startsWith(startTagName)) {
    // 去除开头的标签
    buffer = content.substring(startTagName.length);
    
    // 处理不完整的结束标签
    const lastTagIndex = buffer.lastIndexOf('<');
    if (lastTagIndex !== -1) {
      const potentialTag = buffer.substring(lastTagIndex);
      const isIncomplete = potentialTag.length < endTagName.length ||
        (potentialTag.length >= endTagName.length &&
          potentialTag.substring(0, endTagName.length) !== endTagName);
      if (isIncomplete) {
        buffer = buffer.substring(0, lastTagIndex);
      }
    }
    
    options?.onMessage?.(buffer);
  }
  
  return content.replace(startTagName, '').replace(endTagName, '');
}

关键特性:

  • 支持流式数据的不完整标签处理
  • 提供回调机制(onMessage、onFinish、onChunk)
  • 自动清理标签,提取纯文本内容

5. 流式XML解析器(StreamXMLParser)

为了解决流式响应中XML标签解析的复杂性,我们实现了一个专门的流式XML解析器 StreamXMLParser

设计思路

流式解析的挑战在于:

  1. XML标签可能被分割到多个数据块中
  2. 需要在标签完整前就能输出部分内容(渐进式输出)
  3. 需要准确管理解析状态,处理多种标签类型

核心实现

typescript 复制代码
export class StreamXMLParser {
  private buffer = "";
  private state: "IDLE" | "QUESTION" | "ANSWER" | "FINISH" = "IDLE";
  private questionPrintedLen = 0;

  constructor(private handlers: {
    onQuestionChunk?: (text: string) => void;
    onQuestionFinish?: (text: string) => void;
    onAnswerFinish?: (text: string) => void;
    onFinish?: (text: string) => void;
  }) { }

  push(chunk: string) {
    this.buffer += chunk;
    this.parse();
  }
}

状态机设计

  • IDLE: 空闲状态,等待标签开始
  • QUESTION: 正在解析问题标签
  • ANSWER: 正在解析答案标签
  • FINISH: 正在解析完成标签

IDLE状态处理

在IDLE状态,解析器会查找所有可能的标签开始位置,选择最早出现的标签:

typescript 复制代码
if (this.state === "IDLE") {
  const qStart = this.buffer.indexOf("<question>");
  const aStart = this.buffer.indexOf("<answer>");
  const fStart = this.buffer.indexOf("<finish>");

  const next = Math.min(
    ...[qStart, aStart, fStart].filter(i => i !== -1)
  );

  if (next === Infinity) return;

  if (next === qStart) {
    this.buffer = this.buffer.slice(qStart + 10);
    this.state = "QUESTION";
  } else if (next === aStart) {
    this.buffer = this.buffer.slice(aStart + 8);
    this.state = "ANSWER";
  } else if (next === fStart) {
    this.buffer = this.buffer.slice(fStart + 8);
    this.state = "FINISH";
  }
}

QUESTION状态处理(渐进式输出)

这是最复杂的部分,需要支持在标签完整前就能输出内容:

typescript 复制代码
if (this.state === "QUESTION") {
  const end = this.buffer.indexOf("</question>");

  if (end === -1) {
    // 标签未完整,尝试输出新增部分
    const delta = this.buffer.slice(this.questionPrintedLen);
    const lt = delta.indexOf("<");

    if (lt !== -1) {
      // 发现新的标签开始,只输出安全部分
      const safe = delta.slice(0, lt);
      if (safe) {
        this.handlers.onQuestionChunk?.(safe);
        this.questionPrintedLen += safe.length;
      }
      return; // 等更多 chunk
    }

    if (delta) {
      this.handlers.onQuestionChunk?.(delta);
      this.questionPrintedLen = this.buffer.length;
    }
    return;
  }

  // 标签完整,调用完成回调
  const content = this.buffer.slice(0, end);
  this.handlers.onQuestionFinish?.(content);

  this.buffer = this.buffer.slice(end + 11);
  this.questionPrintedLen = 0;
  this.state = "IDLE";
  continue;
}

关键点

  • 使用 questionPrintedLen 追踪已输出的内容长度
  • 当检测到新的 < 符号时,停止输出,避免输出不完整的标签
  • 只有在标签完整时才调用 onQuestionFinish

使用示例

typescript 复制代码
export async function orderFlyTickStreamXML() {
  const openai = createOpenAI();
  const welcomeMessage = clearTag(orderFlyTickPrompt.value[1].content as string, 'question');
  const question = await input({ message: welcomeMessage });
  orderFlyTickPrompt.value.push({ role: "user", content: `<answer>${question}</answer>` });

  while (true) {
    const response = await openai.chat.completions.create({
      model: QWEN_MODEL,
      messages: orderFlyTickPrompt.value,
      stream: true,
    });

    const parser = new StreamXMLParser({
      onQuestionChunk: (t) => process.stdout.write(t),
      onQuestionFinish: async (question) => {
        console.log("\n? 问题:", question);

        orderFlyTickPrompt.value.push({
          role: "assistant",
          content: `<question>${question}</question>`
        });

        const answer = readlineSync.question("> ");
        orderFlyTickPrompt.value.push({
          role: "user",
          content: `<answer>${answer}</answer>`
        });
      },
      onFinish: (msg) => {
        console.log("✅", msg);
      }
    });

    for await (const chunk of response) {
      parser.push(chunk.choices[0].delta.content ?? "");
    }
  }
}

工作流程

  1. 创建解析器,配置各种回调函数
  2. 从OpenAI API接收数据块
  3. 将每个数据块推送给解析器
  4. 问题内容实时显示在终端(通过 onQuestionChunk
  5. 问题完整后触发 onQuestionFinish,等待用户输入
  6. 继续下一轮对话,直到完成

优势

  • 实时性:用户可以看到问题的实时生成过程
  • 结构化:使用XML标签清晰分隔不同类型的内容
  • 可靠性:状态机设计保证了解析的正确性
  • 可扩展:可以轻松扩展支持更多标签类型

6. SSE格式流式处理

在尝试了JSON Lines格式后,我们发现SSE(Server-Sent Events)格式更适合流式处理。SSE格式有明确的事件边界,使用专门的解析库可以很好地处理流式数据。

SSE格式的优势

  1. 明确的事件边界:通过空行分隔事件,易于识别
  2. 标准格式:SSE是Web标准,有成熟的解析库
  3. 流式友好:即使数据被分割,也能正确识别事件边界
  4. 易于扩展:可以轻松添加新的事件类型

提示词设计

在提示词中明确要求模型按照SSE格式输出:

复制代码
你必须严格按照 SSE(Server-Sent Events)格式输出。

规则:
1. 每个事件必须包含 event: 和 data:
2. 每一行必须以换行符 \n 结尾
3. 每个事件结束后,必须输出一个额外的空行(即两个连续的 \n)
4. 禁止输出任何 event / data 之外的文本
5. 禁止解释、注释、前后缀

正确示例(注意空行):

event: question
data: 你从哪个城市出发呢?

event: finish
data: 预定成功

使用 eventsource-parser 解析

使用 eventsource-parser 库可以很好地处理流式SSE数据:

typescript 复制代码
import { createParser } from 'eventsource-parser';

export async function orderFlyTickStreamSSE() {
  const openai = createOpenAI();
  const response = await openai.chat.completions.create({
    model: QWEN_MODEL,
    messages: orderFlyTickPromptSSE.value,
    stream: true,
  });

  let inData = false;
  let dataBuffer = "";
  let pendingQuestion: string | undefined = undefined;

  const parser = createParser({
    onEvent: (event) => {
      if (event.event === 'question') {
        const question = event.data;
        orderFlyTickPromptSSE.value.push({ 
          role: "assistant", 
          content: `event: question\ndata: ${question}` 
        });
        pendingQuestion = question;
      }
      if (event.event === 'finish') {
        const finish = event.data;
        orderFlyTickPromptSSE.value.push({ 
          role: "assistant", 
          content: `event: finish\ndata: ${finish}` 
        });
        isFinish = true;
      }
    },
    onError: event => {
      console.error('error', event)
    }
  });

  for await (const chunk of response) {
    const text = chunk.choices[0].delta?.content;
    if (text) {
      parser.feed(text);
      dataBuffer += text;

      // 检测是否进入data部分,用于实时显示
      if (dataBuffer.includes("data: ")) {
        inData = true;
        dataBuffer = "";
      }

      // 实时显示data部分的内容
      if (inData) {
        process.stdout.write(text);
      }
    }
    
    if (pendingQuestion || isFinish) {
      dataBuffer = "";
      parser.reset();
      break;
    }
  }

  if (pendingQuestion) {
    const answer = await input({ message: pendingQuestion });
    orderFlyTickPromptSSE.value.push({ 
      role: "user", 
      content: `event: answer\ndata: ${answer}` 
    });
  }
}

工作流程

  1. 创建解析器:使用 createParser 创建SSE解析器,配置事件回调
  2. 流式接收:从OpenAI API接收数据块
  3. 解析处理:将每个数据块通过 parser.feed() 推送给解析器
  4. 事件触发:当解析器识别到完整事件时,触发 onEvent 回调
  5. 实时显示:在检测到 data: 后,实时显示内容
  6. 交互处理:问题完整后等待用户输入,继续下一轮对话

与JSON Lines的对比

特性 JSON Lines SSE格式
事件边界识别 需要按行分割,手动解析 通过空行自动识别
解析库支持 需要手动实现 有成熟的 eventsource-parser
流式处理 需要手动缓冲和拼接 解析器自动处理
格式规范 依赖模型严格遵循 标准格式,易于识别
实现复杂度 较高 较低

经验总结

  1. SSE格式更适合流式处理:有明确的事件边界,解析库成熟
  2. 使用专业解析库eventsource-parser 能够很好地处理不完整的数据块
  3. 提示词设计重要:明确要求格式,包括空行等细节
  4. 实时显示策略:可以通过检测 data: 关键字来实时显示内容

7. 对话状态管理

使用Vue的响应式系统管理对话历史:

typescript 复制代码
export const orderFlyTickPrompt = ref<ChatCompletionMessageParam[]>([
  {
    role: "system", 
    content: `你是一个飞机票预定的助手...`
  },
  { 
    role: "assistant", 
    content: `<question>你好,我是飞机票预定助手,请问有什么可以帮你的吗?</question>` 
  }
]);

watchEffect(() => {
  fs.writeFileSync(filePath, JSON.stringify(orderFlyTickPrompt.value, null, 2));
});

特点:

  • 自动持久化对话历史到JSON文件
  • 响应式更新,便于调试和恢复

学习收获

  1. 结构化提示词设计: 通过XML标签实现结构化的对话流程,使AI能够按照预定流程工作

  2. 流式处理技巧: 学会了如何处理流式响应中的不完整数据,特别是XML标签的边界处理

  3. 状态机设计: 通过状态机模式管理复杂的解析流程,提高了代码的可维护性和可靠性

  4. 渐进式输出: 理解了如何在流式处理中实现渐进式输出,提升用户体验

  5. SSE格式处理: 学会了使用SSE格式和专门的解析库来处理流式结构化数据

  6. 多轮对话管理: 理解了如何在多轮对话中维护上下文,确保信息收集的完整性

  7. 错误处理: 在流式处理中需要考虑各种边界情况,如不完整的标签、网络中断等

技术难点与解决方案

难点1: 流式数据中的不完整标签处理

问题: 在流式响应中,XML标签可能被分割到多个chunk中,需要正确识别和处理。

解决方案:

  • 使用buffer累积数据
  • 检测最后一个<的位置,判断标签是否完整
  • 对于不完整的标签,暂时不处理,等待更多数据
  • 使用状态机管理解析状态,确保状态转换的正确性

难点2: 渐进式输出的实现

问题: 如何在XML标签完整前就能输出部分内容,同时避免输出不完整的标签。

解决方案:

  • 使用 questionPrintedLen 追踪已输出的内容长度
  • 检测新数据中的 < 符号,如果发现可能是新标签的开始,只输出安全部分
  • 只有在标签完整时才触发完成回调

难点3: 对话流程控制

问题: 如何确保AI按照预定流程(收集信息→确认→完成)执行。

解决方案:

  • 在system prompt中明确流程要求
  • 使用结构化标签强制AI按照格式输出
  • 在代码中解析标签,控制对话流程

难点4: 流式结构化数据解析

问题: 如何在流式响应中解析结构化数据(JSON、SSE等),处理数据被分割的情况。

解决方案:

  • 方案一(XML): 实现自定义解析器,使用状态机管理解析状态
  • 方案二(SSE): 使用成熟的解析库(如 eventsource-parser),自动处理不完整数据
  • 方案三(JSON Lines): 使用buffer累积数据,按行分割后解析

踩坑记录

踩坑1: DeepSeek模型在Stream模式下的思考过程

问题描述:
使用OpenAI SDK调用DeepSeek模型时,如果开启了stream: true模式,模型一定会输出思考过程(reasoning),delta.reasoning_content字段会有值,且无法通过参数关闭

具体表现:

typescript 复制代码
const response = await openai.chat.completions.create({
  model: 'deepseek-chat', // DeepSeek模型
  messages: orderFlyTickPrompt.value,
  stream: true,
});

for await (const chunk of response) {
  const delta = chunk.choices[0].delta;
  console.log('reasoning_content:', delta.reasoning_content); // 总是有值
  console.log('content:', delta.content); // 实际回复内容
}

影响:

  1. 流式响应中会先输出思考过程,然后才输出实际内容
  2. 增加了响应时间和token消耗
  3. 如果只关注delta.content,可能会忽略思考过程的数据流
  4. 无法通过API参数(如reasoning: false)来关闭思考过程

解决方案:

  1. 方案一: 如果不需要思考过程,在流式处理时只关注delta.content字段,忽略delta.reasoning_content
  2. 方案二: 如果确实需要思考过程,可以分别处理两个字段,将思考过程和实际内容分开展示
  3. 方案三: 对于不需要思考过程的场景,使用非流式模式(stream: false),或者切换到其他不支持思考过程的模型(如Qwen)

代码示例:

typescript 复制代码
for await (const chunk of response) {
  const delta = chunk.choices[0].delta;
  
  // 只处理实际内容,忽略思考过程
  if (delta.content) {
    buffer += delta.content;
    process.stdout.write(delta.content);
  }
  
  // 如果需要记录思考过程,可以单独处理
  if (delta.reasoning_content) {
    // 思考过程处理逻辑
    console.debug('Reasoning:', delta.reasoning_content);
  }
}

注意事项:

  • 这是DeepSeek模型的特性,不是bug
  • 思考过程有助于理解模型的推理逻辑,但会增加响应时间
  • 在生产环境中,如果对响应速度有要求,建议使用非流式模式或切换模型

踩坑2: JSON Lines(NDJSON)格式在流式响应中的误解

问题描述:
在尝试使用JSON Lines(NDJSON)格式进行流式输出时,我误以为在提示词中要求模型按照NDJSON格式输出,开启stream后每个chunk能够返回完整的JSON对象。实际上,流式响应仍然是按token返回的,每个chunk只包含部分内容,无法直接解析为完整的JSON。

具体表现:
在提示词中明确要求:

复制代码
你必须遵守以下协议进行输出:
- 使用 JSON Lines(NDJSON)协议
- 每一行必须是一个完整 JSON
- 每生成一个最小语义单位(一个字或一个词),就立刻输出一行 JSON
- 不允许重复之前的内容
- 不允许输出未完成的 JSON
- 每一行以 \n 结尾

期望的输出格式:

json 复制代码
{"event":"question","delta":"你"}\n
{"event":"question","delta":"从"}\n
{"event":"question","delta":"哪"}\n
{"event":"question","delta":"出发"}\n
{"event":"question","delta":"?"}\n
{"event":"question","done":true}\n

实际代码尝试:

typescript 复制代码
export async function orderFlyTickStreamJSONEventStream() {
  const response = await openai.chat.completions.create({
    model: process.env.OPEN_ROUTER_GPT as string,
    messages: orderFlyTickPromptJSONEventStream.value,
    stream: true,
  });
  
  for await (const chunk of response) {
    const content = chunk.choices[0].delta.content;
    if (content) {
      try {
        // 期望每个chunk是完整的JSON,但实际上只是部分token
        const json = JSON.parse(content);
        console.log(json, 'json');
      } catch (error) {
        console.error(error, 'error', chunk);
        // 频繁报错:Unexpected token in JSON
      }
    }
  }
}

实际情况:

  1. 流式响应仍然是按token粒度返回的,每个chunk可能只包含几个字符
  2. 即使模型"理解"了NDJSON格式要求,输出时仍然是一个token一个token地生成
  3. 单个chunk可能只包含 {"event" 这样的片段,无法直接解析为JSON
  4. JSON对象可能被分割到多个chunk中,需要手动缓冲和拼接

根本原因:

  • 流式响应的本质是按token返回,这是API层面的行为,不受提示词控制
  • 模型虽然可以"理解"格式要求,但生成过程仍然是逐token的
  • 提示词只能影响模型输出的内容结构,不能改变API的返回方式

解决方案:

  1. 方案一: 使用buffer累积数据,按行分割后解析完整的JSON对象

    typescript 复制代码
    let buffer = '';
    for await (const chunk of response) {
      const content = chunk.choices[0].delta.content;
      if (content) {
        buffer += content;
        // 按行分割
        const lines = buffer.split('\n');
        // 保留最后一行(可能不完整)
        buffer = lines.pop() || '';
        
        // 解析完整的行
        for (const line of lines) {
          if (line.trim()) {
            try {
              const json = JSON.parse(line);
              // 处理完整的JSON对象
            } catch (e) {
              // 忽略解析错误
            }
          }
        }
      }
    }
  2. 方案二: 使用类似StreamXMLParser的方式,实现一个StreamJSONParser,通过状态机管理JSON解析状态

  3. 方案三: 放弃在流式模式下使用NDJSON,改用非流式模式,或者使用XML标签(更容易处理不完整的情况)

  4. 方案四(推荐): 使用SSE格式,配合 eventsource-parser 库,可以很好地处理流式结构化数据

经验总结:

  • 流式响应的chunk粒度是API层面的特性,无法通过提示词改变
  • 如果需要在流式模式下处理结构化数据,必须实现相应的解析器来处理不完整的数据块
  • XML标签比JSON更适合流式处理,因为标签的开始和结束更容易识别
  • SSE格式配合专业解析库是最佳选择,有明确的事件边界,解析库成熟可靠
  • 对于需要完整JSON对象的场景,非流式模式可能更合适

注意事项:

  • 不要期望模型能够"控制"API的返回粒度
  • 流式处理结构化数据时,必须考虑数据可能被分割的情况
  • 提示词只能影响内容,不能改变API行为
  • 优先考虑使用标准格式(如SSE)和成熟的解析库

后续计划

  1. 扩展更多业务场景(酒店预订、餐厅预订等)
  2. 优化流式处理的性能
  3. 添加错误重试机制
  4. 实现对话历史的持久化和恢复
  5. 探索LangChain等框架的高级功能
  6. 优化StreamXMLParser,支持更多标签类型和错误恢复
  7. 完善SSE格式处理,支持更多事件类型

总结

通过这个项目,深入理解了:

  • OpenAI API的使用方式
  • 流式和非流式响应的处理差异
  • 流式XML解析器的设计和实现
  • SSE格式在流式处理中的优势
  • 状态机模式在解析器中的应用
  • 结构化提示词的设计方法
  • 多轮对话系统的实现思路
  • DeepSeek模型在流式模式下的特殊行为
  • 流式响应的本质:按token返回,不受提示词控制
  • SSE格式配合专业解析库是处理流式结构化数据的最佳实践

这些经验为后续开发更复杂的AI应用打下了基础。


项目地址: ai-learn

相关代码文件:

加载评论中...