大模型学习笔记:基于OpenAI API的机票预订对话系统实现
大模型学习笔记:基于OpenAI API的机票预订对话系统实现
项目概述
最近在学习大模型相关的开发,创建了一个AI学习代码笔记仓库。本文记录了一个基于OpenAI API实现的机票预订对话系统,该系统通过结构化的XML标签实现了多轮对话的信息收集和确认流程。
技术栈
- Node.js + TypeScript: 项目主要使用TypeScript开发
- OpenAI SDK: 使用官方OpenAI SDK进行API调用
- SiliconFlow: 作为OpenAI API的代理服务,支持多种模型
- Vue 3: 使用Vue的响应式系统管理对话状态
- eventsource-parser: 用于解析SSE格式的流式响应
核心功能实现
1. OpenAI Provider封装
首先封装了OpenAI客户端的创建逻辑,支持通过环境变量配置API密钥和基础URL:
typescript
export const createOpenAI = () => new OpenAI({
apiKey: process.env.SILICONFLOW_API_KEY,
baseURL: process.env.SILICONFLOW_BASE_URL,
});
支持多种模型配置,包括DeepSeek和Qwen模型。
2. 结构化对话提示词设计
系统使用XML标签来结构化对话流程:
<question>: AI向用户提问<answer>: 用户回答<finish>: 对话结束标记
提示词设计要点:
- 明确角色定义:AI是机票预订助手
- 流程规范:必须收集完整信息后才能确认
- 确认机制:所有信息必须向用户确认后才能完成预订
3. 两种响应模式
非流式响应(Await模式)
typescript
export async function orderFlyTickAwait() {
const openai = createOpenAI();
// 获取用户输入
const question = await input({ message: orderFlyTickPrompt.value[1].content as string });
orderFlyTickPrompt.value.push({ role: "user", content: `<answer>${question}</answer>` });
while (true) {
const response = await openai.chat.completions.create({
model: defaultModel,
messages: orderFlyTickPrompt.value,
});
const content = response.choices[0].message.content;
if (content) {
if (content.includes('<question>')) {
const question = content.match(/<question>(.*?)<\/question>/)?.[1];
const finish = content.match(/<finish>(.*?)<\/finish>/)?.[1];
if (question) {
orderFlyTickPrompt.value.push({ role: "assistant", content: `<question>${question}</question>` });
const newAnswer = readlineSync.question(question);
orderFlyTickPrompt.value.push({ role: "user", content: `<answer>${newAnswer}</answer>` });
}
if (finish) {
orderFlyTickPrompt.value.push({ role: "assistant", content: `<finish>${finish}</finish>` });
console.log(finish);
break
}
}
}
}
}
特点:
- 等待完整响应后再处理
- 使用正则表达式解析XML标签
- 适合需要完整响应的场景
流式响应(Stream模式)
在流式模式下,我们需要实时处理分块到达的数据。最初的实现可能需要手动缓冲和解析,但为了更好的代码复用和可维护性,我们实现了一个专门的流式XML解析器。
4. XML标签解析工具
实现了一个通用的标签解析工具 parseTag.ts,核心功能:
typescript
export const clearTag = (content: string, tagName: string, options?: ParseTagOptions) => {
let buffer = '';
let startTagName = '<' + tagName + '>';
let endTagName = '</' + tagName + '>';
if (content.startsWith(startTagName)) {
// 去除开头的标签
buffer = content.substring(startTagName.length);
// 处理不完整的结束标签
const lastTagIndex = buffer.lastIndexOf('<');
if (lastTagIndex !== -1) {
const potentialTag = buffer.substring(lastTagIndex);
const isIncomplete = potentialTag.length < endTagName.length ||
(potentialTag.length >= endTagName.length &&
potentialTag.substring(0, endTagName.length) !== endTagName);
if (isIncomplete) {
buffer = buffer.substring(0, lastTagIndex);
}
}
options?.onMessage?.(buffer);
}
return content.replace(startTagName, '').replace(endTagName, '');
}
关键特性:
- 支持流式数据的不完整标签处理
- 提供回调机制(onMessage、onFinish、onChunk)
- 自动清理标签,提取纯文本内容
5. 流式XML解析器(StreamXMLParser)
为了解决流式响应中XML标签解析的复杂性,我们实现了一个专门的流式XML解析器 StreamXMLParser。
设计思路
流式解析的挑战在于:
- XML标签可能被分割到多个数据块中
- 需要在标签完整前就能输出部分内容(渐进式输出)
- 需要准确管理解析状态,处理多种标签类型
核心实现
typescript
export class StreamXMLParser {
private buffer = "";
private state: "IDLE" | "QUESTION" | "ANSWER" | "FINISH" = "IDLE";
private questionPrintedLen = 0;
constructor(private handlers: {
onQuestionChunk?: (text: string) => void;
onQuestionFinish?: (text: string) => void;
onAnswerFinish?: (text: string) => void;
onFinish?: (text: string) => void;
}) { }
push(chunk: string) {
this.buffer += chunk;
this.parse();
}
}
状态机设计:
IDLE: 空闲状态,等待标签开始QUESTION: 正在解析问题标签ANSWER: 正在解析答案标签FINISH: 正在解析完成标签
IDLE状态处理
在IDLE状态,解析器会查找所有可能的标签开始位置,选择最早出现的标签:
typescript
if (this.state === "IDLE") {
const qStart = this.buffer.indexOf("<question>");
const aStart = this.buffer.indexOf("<answer>");
const fStart = this.buffer.indexOf("<finish>");
const next = Math.min(
...[qStart, aStart, fStart].filter(i => i !== -1)
);
if (next === Infinity) return;
if (next === qStart) {
this.buffer = this.buffer.slice(qStart + 10);
this.state = "QUESTION";
} else if (next === aStart) {
this.buffer = this.buffer.slice(aStart + 8);
this.state = "ANSWER";
} else if (next === fStart) {
this.buffer = this.buffer.slice(fStart + 8);
this.state = "FINISH";
}
}
QUESTION状态处理(渐进式输出)
这是最复杂的部分,需要支持在标签完整前就能输出内容:
typescript
if (this.state === "QUESTION") {
const end = this.buffer.indexOf("</question>");
if (end === -1) {
// 标签未完整,尝试输出新增部分
const delta = this.buffer.slice(this.questionPrintedLen);
const lt = delta.indexOf("<");
if (lt !== -1) {
// 发现新的标签开始,只输出安全部分
const safe = delta.slice(0, lt);
if (safe) {
this.handlers.onQuestionChunk?.(safe);
this.questionPrintedLen += safe.length;
}
return; // 等更多 chunk
}
if (delta) {
this.handlers.onQuestionChunk?.(delta);
this.questionPrintedLen = this.buffer.length;
}
return;
}
// 标签完整,调用完成回调
const content = this.buffer.slice(0, end);
this.handlers.onQuestionFinish?.(content);
this.buffer = this.buffer.slice(end + 11);
this.questionPrintedLen = 0;
this.state = "IDLE";
continue;
}
关键点:
- 使用
questionPrintedLen追踪已输出的内容长度 - 当检测到新的
<符号时,停止输出,避免输出不完整的标签 - 只有在标签完整时才调用
onQuestionFinish
使用示例
typescript
export async function orderFlyTickStreamXML() {
const openai = createOpenAI();
const welcomeMessage = clearTag(orderFlyTickPrompt.value[1].content as string, 'question');
const question = await input({ message: welcomeMessage });
orderFlyTickPrompt.value.push({ role: "user", content: `<answer>${question}</answer>` });
while (true) {
const response = await openai.chat.completions.create({
model: QWEN_MODEL,
messages: orderFlyTickPrompt.value,
stream: true,
});
const parser = new StreamXMLParser({
onQuestionChunk: (t) => process.stdout.write(t),
onQuestionFinish: async (question) => {
console.log("\n? 问题:", question);
orderFlyTickPrompt.value.push({
role: "assistant",
content: `<question>${question}</question>`
});
const answer = readlineSync.question("> ");
orderFlyTickPrompt.value.push({
role: "user",
content: `<answer>${answer}</answer>`
});
},
onFinish: (msg) => {
console.log("✅", msg);
}
});
for await (const chunk of response) {
parser.push(chunk.choices[0].delta.content ?? "");
}
}
}
工作流程:
- 创建解析器,配置各种回调函数
- 从OpenAI API接收数据块
- 将每个数据块推送给解析器
- 问题内容实时显示在终端(通过
onQuestionChunk) - 问题完整后触发
onQuestionFinish,等待用户输入 - 继续下一轮对话,直到完成
优势:
- 实时性:用户可以看到问题的实时生成过程
- 结构化:使用XML标签清晰分隔不同类型的内容
- 可靠性:状态机设计保证了解析的正确性
- 可扩展:可以轻松扩展支持更多标签类型
6. SSE格式流式处理
在尝试了JSON Lines格式后,我们发现SSE(Server-Sent Events)格式更适合流式处理。SSE格式有明确的事件边界,使用专门的解析库可以很好地处理流式数据。
SSE格式的优势
- 明确的事件边界:通过空行分隔事件,易于识别
- 标准格式:SSE是Web标准,有成熟的解析库
- 流式友好:即使数据被分割,也能正确识别事件边界
- 易于扩展:可以轻松添加新的事件类型
提示词设计
在提示词中明确要求模型按照SSE格式输出:
你必须严格按照 SSE(Server-Sent Events)格式输出。
规则:
1. 每个事件必须包含 event: 和 data:
2. 每一行必须以换行符 \n 结尾
3. 每个事件结束后,必须输出一个额外的空行(即两个连续的 \n)
4. 禁止输出任何 event / data 之外的文本
5. 禁止解释、注释、前后缀
正确示例(注意空行):
event: question
data: 你从哪个城市出发呢?
event: finish
data: 预定成功
使用 eventsource-parser 解析
使用 eventsource-parser 库可以很好地处理流式SSE数据:
typescript
import { createParser } from 'eventsource-parser';
export async function orderFlyTickStreamSSE() {
const openai = createOpenAI();
const response = await openai.chat.completions.create({
model: QWEN_MODEL,
messages: orderFlyTickPromptSSE.value,
stream: true,
});
let inData = false;
let dataBuffer = "";
let pendingQuestion: string | undefined = undefined;
const parser = createParser({
onEvent: (event) => {
if (event.event === 'question') {
const question = event.data;
orderFlyTickPromptSSE.value.push({
role: "assistant",
content: `event: question\ndata: ${question}`
});
pendingQuestion = question;
}
if (event.event === 'finish') {
const finish = event.data;
orderFlyTickPromptSSE.value.push({
role: "assistant",
content: `event: finish\ndata: ${finish}`
});
isFinish = true;
}
},
onError: event => {
console.error('error', event)
}
});
for await (const chunk of response) {
const text = chunk.choices[0].delta?.content;
if (text) {
parser.feed(text);
dataBuffer += text;
// 检测是否进入data部分,用于实时显示
if (dataBuffer.includes("data: ")) {
inData = true;
dataBuffer = "";
}
// 实时显示data部分的内容
if (inData) {
process.stdout.write(text);
}
}
if (pendingQuestion || isFinish) {
dataBuffer = "";
parser.reset();
break;
}
}
if (pendingQuestion) {
const answer = await input({ message: pendingQuestion });
orderFlyTickPromptSSE.value.push({
role: "user",
content: `event: answer\ndata: ${answer}`
});
}
}
工作流程
- 创建解析器:使用
createParser创建SSE解析器,配置事件回调 - 流式接收:从OpenAI API接收数据块
- 解析处理:将每个数据块通过
parser.feed()推送给解析器 - 事件触发:当解析器识别到完整事件时,触发
onEvent回调 - 实时显示:在检测到
data:后,实时显示内容 - 交互处理:问题完整后等待用户输入,继续下一轮对话
与JSON Lines的对比
| 特性 | JSON Lines | SSE格式 |
|---|---|---|
| 事件边界识别 | 需要按行分割,手动解析 | 通过空行自动识别 |
| 解析库支持 | 需要手动实现 | 有成熟的 eventsource-parser |
| 流式处理 | 需要手动缓冲和拼接 | 解析器自动处理 |
| 格式规范 | 依赖模型严格遵循 | 标准格式,易于识别 |
| 实现复杂度 | 较高 | 较低 |
经验总结
- SSE格式更适合流式处理:有明确的事件边界,解析库成熟
- 使用专业解析库:
eventsource-parser能够很好地处理不完整的数据块 - 提示词设计重要:明确要求格式,包括空行等细节
- 实时显示策略:可以通过检测
data:关键字来实时显示内容
7. 对话状态管理
使用Vue的响应式系统管理对话历史:
typescript
export const orderFlyTickPrompt = ref<ChatCompletionMessageParam[]>([
{
role: "system",
content: `你是一个飞机票预定的助手...`
},
{
role: "assistant",
content: `<question>你好,我是飞机票预定助手,请问有什么可以帮你的吗?</question>`
}
]);
watchEffect(() => {
fs.writeFileSync(filePath, JSON.stringify(orderFlyTickPrompt.value, null, 2));
});
特点:
- 自动持久化对话历史到JSON文件
- 响应式更新,便于调试和恢复
学习收获
-
结构化提示词设计: 通过XML标签实现结构化的对话流程,使AI能够按照预定流程工作
-
流式处理技巧: 学会了如何处理流式响应中的不完整数据,特别是XML标签的边界处理
-
状态机设计: 通过状态机模式管理复杂的解析流程,提高了代码的可维护性和可靠性
-
渐进式输出: 理解了如何在流式处理中实现渐进式输出,提升用户体验
-
SSE格式处理: 学会了使用SSE格式和专门的解析库来处理流式结构化数据
-
多轮对话管理: 理解了如何在多轮对话中维护上下文,确保信息收集的完整性
-
错误处理: 在流式处理中需要考虑各种边界情况,如不完整的标签、网络中断等
技术难点与解决方案
难点1: 流式数据中的不完整标签处理
问题: 在流式响应中,XML标签可能被分割到多个chunk中,需要正确识别和处理。
解决方案:
- 使用buffer累积数据
- 检测最后一个
<的位置,判断标签是否完整 - 对于不完整的标签,暂时不处理,等待更多数据
- 使用状态机管理解析状态,确保状态转换的正确性
难点2: 渐进式输出的实现
问题: 如何在XML标签完整前就能输出部分内容,同时避免输出不完整的标签。
解决方案:
- 使用
questionPrintedLen追踪已输出的内容长度 - 检测新数据中的
<符号,如果发现可能是新标签的开始,只输出安全部分 - 只有在标签完整时才触发完成回调
难点3: 对话流程控制
问题: 如何确保AI按照预定流程(收集信息→确认→完成)执行。
解决方案:
- 在system prompt中明确流程要求
- 使用结构化标签强制AI按照格式输出
- 在代码中解析标签,控制对话流程
难点4: 流式结构化数据解析
问题: 如何在流式响应中解析结构化数据(JSON、SSE等),处理数据被分割的情况。
解决方案:
- 方案一(XML): 实现自定义解析器,使用状态机管理解析状态
- 方案二(SSE): 使用成熟的解析库(如
eventsource-parser),自动处理不完整数据 - 方案三(JSON Lines): 使用buffer累积数据,按行分割后解析
踩坑记录
踩坑1: DeepSeek模型在Stream模式下的思考过程
问题描述:
使用OpenAI SDK调用DeepSeek模型时,如果开启了stream: true模式,模型一定会输出思考过程(reasoning),delta.reasoning_content字段会有值,且无法通过参数关闭。
具体表现:
typescript
const response = await openai.chat.completions.create({
model: 'deepseek-chat', // DeepSeek模型
messages: orderFlyTickPrompt.value,
stream: true,
});
for await (const chunk of response) {
const delta = chunk.choices[0].delta;
console.log('reasoning_content:', delta.reasoning_content); // 总是有值
console.log('content:', delta.content); // 实际回复内容
}
影响:
- 流式响应中会先输出思考过程,然后才输出实际内容
- 增加了响应时间和token消耗
- 如果只关注
delta.content,可能会忽略思考过程的数据流 - 无法通过API参数(如
reasoning: false)来关闭思考过程
解决方案:
- 方案一: 如果不需要思考过程,在流式处理时只关注
delta.content字段,忽略delta.reasoning_content - 方案二: 如果确实需要思考过程,可以分别处理两个字段,将思考过程和实际内容分开展示
- 方案三: 对于不需要思考过程的场景,使用非流式模式(
stream: false),或者切换到其他不支持思考过程的模型(如Qwen)
代码示例:
typescript
for await (const chunk of response) {
const delta = chunk.choices[0].delta;
// 只处理实际内容,忽略思考过程
if (delta.content) {
buffer += delta.content;
process.stdout.write(delta.content);
}
// 如果需要记录思考过程,可以单独处理
if (delta.reasoning_content) {
// 思考过程处理逻辑
console.debug('Reasoning:', delta.reasoning_content);
}
}
注意事项:
- 这是DeepSeek模型的特性,不是bug
- 思考过程有助于理解模型的推理逻辑,但会增加响应时间
- 在生产环境中,如果对响应速度有要求,建议使用非流式模式或切换模型
踩坑2: JSON Lines(NDJSON)格式在流式响应中的误解
问题描述:
在尝试使用JSON Lines(NDJSON)格式进行流式输出时,我误以为在提示词中要求模型按照NDJSON格式输出,开启stream后每个chunk能够返回完整的JSON对象。实际上,流式响应仍然是按token返回的,每个chunk只包含部分内容,无法直接解析为完整的JSON。
具体表现:
在提示词中明确要求:
你必须遵守以下协议进行输出:
- 使用 JSON Lines(NDJSON)协议
- 每一行必须是一个完整 JSON
- 每生成一个最小语义单位(一个字或一个词),就立刻输出一行 JSON
- 不允许重复之前的内容
- 不允许输出未完成的 JSON
- 每一行以 \n 结尾
期望的输出格式:
json
{"event":"question","delta":"你"}\n
{"event":"question","delta":"从"}\n
{"event":"question","delta":"哪"}\n
{"event":"question","delta":"出发"}\n
{"event":"question","delta":"?"}\n
{"event":"question","done":true}\n
实际代码尝试:
typescript
export async function orderFlyTickStreamJSONEventStream() {
const response = await openai.chat.completions.create({
model: process.env.OPEN_ROUTER_GPT as string,
messages: orderFlyTickPromptJSONEventStream.value,
stream: true,
});
for await (const chunk of response) {
const content = chunk.choices[0].delta.content;
if (content) {
try {
// 期望每个chunk是完整的JSON,但实际上只是部分token
const json = JSON.parse(content);
console.log(json, 'json');
} catch (error) {
console.error(error, 'error', chunk);
// 频繁报错:Unexpected token in JSON
}
}
}
}
实际情况:
- 流式响应仍然是按token粒度返回的,每个chunk可能只包含几个字符
- 即使模型"理解"了NDJSON格式要求,输出时仍然是一个token一个token地生成
- 单个chunk可能只包含
{"event"这样的片段,无法直接解析为JSON - JSON对象可能被分割到多个chunk中,需要手动缓冲和拼接
根本原因:
- 流式响应的本质是按token返回,这是API层面的行为,不受提示词控制
- 模型虽然可以"理解"格式要求,但生成过程仍然是逐token的
- 提示词只能影响模型输出的内容结构,不能改变API的返回方式
解决方案:
-
方案一: 使用buffer累积数据,按行分割后解析完整的JSON对象
typescriptlet buffer = ''; for await (const chunk of response) { const content = chunk.choices[0].delta.content; if (content) { buffer += content; // 按行分割 const lines = buffer.split('\n'); // 保留最后一行(可能不完整) buffer = lines.pop() || ''; // 解析完整的行 for (const line of lines) { if (line.trim()) { try { const json = JSON.parse(line); // 处理完整的JSON对象 } catch (e) { // 忽略解析错误 } } } } } -
方案二: 使用类似StreamXMLParser的方式,实现一个StreamJSONParser,通过状态机管理JSON解析状态
-
方案三: 放弃在流式模式下使用NDJSON,改用非流式模式,或者使用XML标签(更容易处理不完整的情况)
-
方案四(推荐): 使用SSE格式,配合
eventsource-parser库,可以很好地处理流式结构化数据
经验总结:
- 流式响应的chunk粒度是API层面的特性,无法通过提示词改变
- 如果需要在流式模式下处理结构化数据,必须实现相应的解析器来处理不完整的数据块
- XML标签比JSON更适合流式处理,因为标签的开始和结束更容易识别
- SSE格式配合专业解析库是最佳选择,有明确的事件边界,解析库成熟可靠
- 对于需要完整JSON对象的场景,非流式模式可能更合适
注意事项:
- 不要期望模型能够"控制"API的返回粒度
- 流式处理结构化数据时,必须考虑数据可能被分割的情况
- 提示词只能影响内容,不能改变API行为
- 优先考虑使用标准格式(如SSE)和成熟的解析库
后续计划
- 扩展更多业务场景(酒店预订、餐厅预订等)
- 优化流式处理的性能
- 添加错误重试机制
- 实现对话历史的持久化和恢复
- 探索LangChain等框架的高级功能
- 优化StreamXMLParser,支持更多标签类型和错误恢复
- 完善SSE格式处理,支持更多事件类型
总结
通过这个项目,深入理解了:
- OpenAI API的使用方式
- 流式和非流式响应的处理差异
- 流式XML解析器的设计和实现
- SSE格式在流式处理中的优势
- 状态机模式在解析器中的应用
- 结构化提示词的设计方法
- 多轮对话系统的实现思路
- DeepSeek模型在流式模式下的特殊行为
- 流式响应的本质:按token返回,不受提示词控制
- SSE格式配合专业解析库是处理流式结构化数据的最佳实践
这些经验为后续开发更复杂的AI应用打下了基础。
项目地址: ai-learn
相关代码文件: