增加 openai 的模型兼容

This commit is contained in:
jwangkun
2026-01-08 17:09:34 +08:00
parent 6558006a4d
commit 579071ac95
18 changed files with 5185 additions and 212 deletions

View File

@@ -1,6 +1,11 @@
import { ModelOption, ExpertResult } from '../../types';
import { getExpertSystemInstruction } from './prompts';
import { withRetry } from '../utils/retry';
import { generateContentStream as generateOpenAIStream } from './openaiClient';
const isGoogleProvider = (ai: any): boolean => {
return ai?.models?.generateContentStream !== undefined;
};
export const streamExpertResponse = async (
ai: any,
@@ -11,44 +16,65 @@ export const streamExpertResponse = async (
signal: AbortSignal,
onChunk: (text: string, thought: string) => void
): Promise<void> => {
// We wrap the stream initiation in retry.
// If the stream is successfully established but fails during iteration,
// we catch that separately.
const streamResult = await withRetry(() => ai.models.generateContentStream({
model: model,
contents: expert.prompt,
config: {
systemInstruction: getExpertSystemInstruction(expert.role, expert.description, context),
temperature: expert.temperature,
thinkingConfig: {
const isGoogle = isGoogleProvider(ai);
if (isGoogle) {
const streamResult = await withRetry(() => ai.models.generateContentStream({
model: model,
contents: expert.prompt,
config: {
systemInstruction: getExpertSystemInstruction(expert.role, expert.description, context),
temperature: expert.temperature,
thinkingConfig: {
thinkingBudget: budget,
includeThoughts: true
includeThoughts: true
}
}
}));
try {
for await (const chunk of (streamResult as any)) {
if (signal.aborted) break;
let chunkText = "";
let chunkThought = "";
if (chunk.candidates?.[0]?.content?.parts) {
for (const part of chunk.candidates[0].content.parts) {
if (part.thought) {
chunkThought += (part.text || "");
} else if (part.text) {
chunkText += part.text;
}
}
onChunk(chunkText, chunkThought);
}
}
} catch (streamError) {
console.error(`Stream interrupted for expert ${expert.role}:`, streamError);
throw streamError;
}
}));
} else {
const stream = generateOpenAIStream(ai, {
model,
systemInstruction: getExpertSystemInstruction(expert.role, expert.description, context),
content: expert.prompt,
temperature: expert.temperature,
thinkingConfig: {
thinkingBudget: budget,
includeThoughts: true
}
});
try {
for await (const chunk of streamResult) {
if (signal.aborted) break;
try {
for await (const chunk of (stream as any)) {
if (signal.aborted) break;
let chunkText = "";
let chunkThought = "";
if (chunk.candidates?.[0]?.content?.parts) {
for (const part of chunk.candidates[0].content.parts) {
if (part.thought) {
chunkThought += (part.text || "");
} else if (part.text) {
chunkText += part.text;
}
}
onChunk(chunkText, chunkThought);
}
onChunk(chunk.text, chunk.thought || '');
}
} catch (streamError) {
console.error(`Stream interrupted for expert ${expert.role}:`, streamError);
throw streamError;
}
} catch (streamError) {
console.error(`Stream interrupted for expert ${expert.role}:`, streamError);
// We don't retry mid-stream automatically here to avoid complex state management,
// but the initial connection is protected by withRetry.
throw streamError;
}
};

View File

@@ -1,8 +1,14 @@
import { Type } from "@google/genai";
import { ModelOption, AnalysisResult, ExpertResult, ReviewResult } from '../../types';
import { ModelOption, AnalysisResult, ExpertResult, ReviewResult, ApiProvider } from '../../types';
import { cleanJsonString } from '../../utils';
import { MANAGER_SYSTEM_PROMPT, MANAGER_REVIEW_SYSTEM_PROMPT } from './prompts';
import { withRetry } from '../utils/retry';
import { generateContent as generateOpenAIContent } from './openaiClient';
import { getAIProvider } from '../../api';
const isGoogleProvider = (ai: any): boolean => {
return ai?.models?.generateContent !== undefined;
};
export const executeManagerAnalysis = async (
ai: any,
@@ -11,57 +17,86 @@ export const executeManagerAnalysis = async (
context: string,
budget: number
): Promise<AnalysisResult> => {
const managerSchema = {
type: Type.OBJECT,
properties: {
thought_process: { type: Type.STRING, description: "Brief explanation of why these supplementary experts were chosen." },
experts: {
type: Type.ARRAY,
items: {
type: Type.OBJECT,
properties: {
role: { type: Type.STRING },
description: { type: Type.STRING },
temperature: { type: Type.NUMBER },
prompt: { type: Type.STRING }
},
required: ["role", "description", "temperature", "prompt"]
}
}
},
required: ["thought_process", "experts"]
};
const isGoogle = isGoogleProvider(ai);
try {
const analysisResp = await withRetry(() => ai.models.generateContent({
model: model,
contents: `Context:\n${context}\n\nCurrent Query: "${query}"`,
config: {
systemInstruction: MANAGER_SYSTEM_PROMPT,
responseMimeType: "application/json",
responseSchema: managerSchema,
thinkingConfig: {
includeThoughts: true,
thinkingBudget: budget
if (isGoogle) {
const managerSchema = {
type: Type.OBJECT,
properties: {
thought_process: { type: Type.STRING, description: "Brief explanation of why these supplementary experts were chosen." },
experts: {
type: Type.ARRAY,
items: {
type: Type.OBJECT,
properties: {
role: { type: Type.STRING },
description: { type: Type.STRING },
temperature: { type: Type.NUMBER },
prompt: { type: Type.STRING }
},
required: ["role", "description", "temperature", "prompt"]
}
}
}
}));
const rawText = analysisResp.text || '{}';
const cleanText = cleanJsonString(rawText);
const analysisJson = JSON.parse(cleanText) as AnalysisResult;
if (!analysisJson.experts || !Array.isArray(analysisJson.experts)) {
throw new Error("Invalid schema structure");
}
return analysisJson;
} catch (e) {
console.error("Manager Analysis Error:", e);
// Return a fallback so the process doesn't completely die if planning fails
return {
thought_process: "Direct processing fallback due to analysis error.",
experts: []
},
required: ["thought_process", "experts"]
};
try {
const analysisResp = await withRetry(() => ai.models.generateContent({
model: model,
contents: `Context:\n${context}\n\nCurrent Query: "${query}"`,
config: {
systemInstruction: MANAGER_SYSTEM_PROMPT,
responseMimeType: "application/json",
responseSchema: managerSchema,
thinkingConfig: {
includeThoughts: true,
thinkingBudget: budget
}
}
}));
const rawText = (analysisResp as any).text || '{}';
const cleanText = cleanJsonString(rawText);
const analysisJson = JSON.parse(cleanText) as AnalysisResult;
if (!analysisJson.experts || !Array.isArray(analysisJson.experts)) {
throw new Error("Invalid schema structure");
}
return analysisJson;
} catch (e) {
console.error("Manager Analysis Error:", e);
return {
thought_process: "Direct processing fallback due to analysis error.",
experts: []
};
}
} else {
try {
const response = await generateOpenAIContent(ai, {
model,
systemInstruction: MANAGER_SYSTEM_PROMPT,
content: `Context:\n${context}\n\nCurrent Query: "${query}"\n\nReturn a JSON response with this structure:\n{\n "thought_process": "...",\n "experts": [\n { "role": "...", "description": "...", "temperature": number, "prompt": "..." }\n ]\n}`,
temperature: 0.7,
responseFormat: 'json_object',
thinkingConfig: {
includeThoughts: true,
thinkingBudget: budget
}
});
const analysisJson = JSON.parse(response.text) as AnalysisResult;
if (!analysisJson.experts || !Array.isArray(analysisJson.experts)) {
throw new Error("Invalid schema structure");
}
return analysisJson;
} catch (e) {
console.error("Manager Analysis Error:", e);
return {
thought_process: "Direct processing fallback due to analysis error.",
experts: []
};
}
}
};
@@ -72,57 +107,78 @@ export const executeManagerReview = async (
currentExperts: ExpertResult[],
budget: number
): Promise<ReviewResult> => {
const reviewSchema = {
type: Type.OBJECT,
properties: {
satisfied: { type: Type.BOOLEAN, description: "True if the experts have fully answered the query with high quality." },
critique: { type: Type.STRING, description: "If not satisfied, explain why and what is missing." },
next_round_strategy: { type: Type.STRING, description: "Plan for the next iteration." },
refined_experts: {
type: Type.ARRAY,
description: "The list of experts for the next round. Can be the same roles or new ones.",
items: {
type: Type.OBJECT,
properties: {
role: { type: Type.STRING },
description: { type: Type.STRING },
temperature: { type: Type.NUMBER },
prompt: { type: Type.STRING }
},
required: ["role", "description", "temperature", "prompt"]
}
}
},
required: ["satisfied", "critique"]
};
const expertOutputs = currentExperts.map(e =>
const isGoogle = isGoogleProvider(ai);
const expertOutputs = currentExperts.map(e =>
`--- [Round ${e.round}] Expert: ${e.role} ---\nOutput: ${e.content?.slice(0, 2000)}...`
).join('\n\n');
const content = `User Query: "${query}"\n\nCurrent Expert Outputs:\n${expertOutputs}`;
try {
const resp = await withRetry(() => ai.models.generateContent({
model: model,
contents: content,
config: {
systemInstruction: MANAGER_REVIEW_SYSTEM_PROMPT,
responseMimeType: "application/json",
responseSchema: reviewSchema,
thinkingConfig: {
includeThoughts: true,
thinkingBudget: budget
if (isGoogle) {
const reviewSchema = {
type: Type.OBJECT,
properties: {
satisfied: { type: Type.BOOLEAN, description: "True if the experts have fully answered the query with high quality." },
critique: { type: Type.STRING, description: "If not satisfied, explain why and what is missing." },
next_round_strategy: { type: Type.STRING, description: "Plan for the next iteration." },
refined_experts: {
type: Type.ARRAY,
description: "The list of experts for the next round. Can be the same roles or new ones.",
items: {
type: Type.OBJECT,
properties: {
role: { type: Type.STRING },
description: { type: Type.STRING },
temperature: { type: Type.NUMBER },
prompt: { type: Type.STRING }
},
required: ["role", "description", "temperature", "prompt"]
}
}
}
}));
},
required: ["satisfied", "critique"]
};
const rawText = resp.text || '{}';
const cleanText = cleanJsonString(rawText);
return JSON.parse(cleanText) as ReviewResult;
} catch (e) {
console.error("Review Error:", e);
// Fallback: Assume satisfied if JSON or API fails to avoid infinite loops
return { satisfied: true, critique: "Processing Error, proceeding to synthesis." };
try {
const resp = await withRetry(() => ai.models.generateContent({
model: model,
contents: content,
config: {
systemInstruction: MANAGER_REVIEW_SYSTEM_PROMPT,
responseMimeType: "application/json",
responseSchema: reviewSchema,
thinkingConfig: {
includeThoughts: true,
thinkingBudget: budget
}
}
}));
const rawText = (resp as any).text || '{}';
const cleanText = cleanJsonString(rawText);
return JSON.parse(cleanText) as ReviewResult;
} catch (e) {
console.error("Review Error:", e);
return { satisfied: true, critique: "Processing Error, proceeding to synthesis." };
}
} else {
try {
const response = await generateOpenAIContent(ai, {
model,
systemInstruction: MANAGER_REVIEW_SYSTEM_PROMPT,
content: `${content}\n\nReturn a JSON response with this structure:\n{\n "satisfied": boolean,\n "critique": "...",\n "next_round_strategy": "...",\n "refined_experts": [...]\n}`,
temperature: 0.7,
responseFormat: 'json_object',
thinkingConfig: {
includeThoughts: true,
thinkingBudget: budget
}
});
return JSON.parse(response.text) as ReviewResult;
} catch (e) {
console.error("Review Error:", e);
return { satisfied: true, critique: "Processing Error, proceeding to synthesis." };
}
}
};

View File

@@ -0,0 +1,157 @@
import OpenAI from "openai";
import { ModelOption } from '../../types';
import { withRetry } from '../utils/retry';
/**
 * One incremental unit of streamed model output: visible answer text plus
 * optional reasoning extracted from <thinking> tags.
 */
export interface OpenAIStreamChunk {
  // Visible (non-reasoning) text for this chunk; may be empty.
  text: string;
  // Reasoning text for this chunk, present when thought parsing is enabled.
  thought?: string;
}
/**
 * Request options shared by generateContent and generateContentStream.
 */
export interface OpenAIConfig {
  // Model identifier passed through to the chat-completions endpoint.
  model: ModelOption;
  // Optional system prompt; sent as a 'system' role message when present.
  systemInstruction?: string;
  // The user prompt body, sent as a single 'user' role message.
  content: string;
  // Sampling temperature; omitted from the request when undefined.
  temperature?: number;
  // 'json_object' switches the API into JSON mode via response_format.
  responseFormat?: 'text' | 'json_object';
  // When includeThoughts is true, <thinking>…</thinking> spans are split out
  // of the response. thinkingBudget mirrors the Google API shape but is not
  // forwarded to OpenAI-compatible endpoints by this client.
  thinkingConfig?: {
    includeThoughts: boolean;
    thinkingBudget: number;
  };
}
/**
 * Separates <thinking>…</thinking> reasoning spans from the visible answer.
 *
 * All tagged spans are concatenated (in order) into `thought`; the remainder
 * of the input, with the tags removed, becomes `text`. Both results are
 * whitespace-trimmed. Input without tags comes back unchanged as `text`.
 */
const parseThinkingTokens = (text: string): { thought: string; text: string } => {
  const thinkPattern = /<thinking>([\s\S]*?)<\/thinking>/g;
  const thoughtParts: string[] = [];
  // Collect each captured reasoning span while stripping it from the output.
  const stripped = text.replace(thinkPattern, (_whole, inner: string) => {
    thoughtParts.push(inner);
    return '';
  });
  return { thought: thoughtParts.join('').trim(), text: stripped.trim() };
};
/**
 * Performs one non-streaming chat completion against an OpenAI-compatible
 * endpoint.
 *
 * Builds an optional system message plus a single user message, enables JSON
 * mode when `responseFormat` is 'json_object', and — when
 * `thinkingConfig.includeThoughts` is set — splits <thinking> spans out of
 * the reply via parseThinkingTokens.
 *
 * @param ai     OpenAI SDK client (or an API-compatible substitute).
 * @param config Request options; see OpenAIConfig.
 * @returns The visible reply text, plus extracted reasoning when enabled.
 * @throws Rethrows any API/retry failure after logging it.
 */
export const generateContent = async (
  ai: OpenAI,
  config: OpenAIConfig
): Promise<{ text: string; thought?: string }> => {
  const messages: Array<OpenAI.Chat.ChatCompletionMessageParam> =
    config.systemInstruction
      ? [
          { role: 'system', content: config.systemInstruction },
          { role: 'user', content: config.content },
        ]
      : [{ role: 'user', content: config.content }];
  const requestOptions: any = {
    model: config.model,
    messages,
    temperature: config.temperature,
  };
  if (config.responseFormat === 'json_object') {
    requestOptions.response_format = { type: 'json_object' };
  }
  try {
    const response = await withRetry(() => ai.chat.completions.create(requestOptions));
    const raw = response.choices[0]?.message?.content || '';
    if (!config.thinkingConfig?.includeThoughts) {
      return { text: raw };
    }
    const { thought, text } = parseThinkingTokens(raw);
    return { text, thought };
  } catch (error) {
    console.error('OpenAI generateContent error:', error);
    throw error;
  }
};
/**
 * Streams a chat completion from an OpenAI-compatible endpoint, separating
 * <thinking>…</thinking> reasoning from visible answer text when
 * `thinkingConfig.includeThoughts` is set.
 *
 * Fixes over the previous version:
 *  - Tags are matched against a rolling buffer, so a tag split across two
 *    stream deltas (e.g. "<thin" + "king>") is still recognized; the old
 *    `delta.includes('<thinking>')` check missed split tags entirely.
 *  - Text sharing a delta with an opening tag is no longer dropped (the old
 *    code `continue`d past the whole delta on seeing "<thinking>").
 *  - Removed the dead `accumulatedText` accumulator.
 *
 * @param ai     OpenAI SDK client (or an API-compatible substitute).
 * @param config Request options; see OpenAIConfig.
 * @yields OpenAIStreamChunk with either `text` or `thought` populated.
 */
export async function* generateContentStream(
  ai: OpenAI,
  config: OpenAIConfig
): AsyncGenerator<OpenAIStreamChunk, void, unknown> {
  const messages: Array<OpenAI.Chat.ChatCompletionMessageParam> = [];
  if (config.systemInstruction) {
    messages.push({ role: 'system', content: config.systemInstruction });
  }
  messages.push({ role: 'user', content: config.content });
  const requestOptions: any = {
    model: config.model,
    messages,
    temperature: config.temperature,
    stream: true,
  };
  const stream = await withRetry(() => ai.chat.completions.create(requestOptions) as any);

  const OPEN_TAG = '<thinking>';
  const CLOSE_TAG = '</thinking>';
  // Length of the longest suffix of `buf` that is a proper prefix of `tag`:
  // that much text must be held back in case the tag completes next delta.
  const heldTailLength = (buf: string, tag: string): number => {
    const max = Math.min(buf.length, tag.length - 1);
    for (let len = max; len > 0; len--) {
      if (tag.startsWith(buf.slice(buf.length - len))) return len;
    }
    return 0;
  };

  let buffer = '';
  let inThinking = false;
  // Wrap a segment as a text or thought chunk depending on the current mode.
  const asChunk = (segment: string): OpenAIStreamChunk =>
    inThinking ? { text: '', thought: segment } : { text: segment, thought: '' };

  for await (const chunk of (stream as any)) {
    const delta = chunk.choices[0]?.delta?.content || '';
    if (!delta) continue;
    if (!config.thinkingConfig?.includeThoughts) {
      // No tag parsing requested: pass deltas straight through.
      yield { text: delta, thought: '' };
      continue;
    }
    buffer += delta;
    // Consume complete segments from the buffer; keep any partial tag at the
    // tail so tags split across deltas are handled correctly.
    while (buffer.length > 0) {
      const tag = inThinking ? CLOSE_TAG : OPEN_TAG;
      const idx = buffer.indexOf(tag);
      if (idx !== -1) {
        if (idx > 0) yield asChunk(buffer.slice(0, idx));
        buffer = buffer.slice(idx + tag.length);
        inThinking = !inThinking;
        continue;
      }
      // No complete tag yet: flush everything except a possible tag prefix.
      const hold = heldTailLength(buffer, tag);
      const ready = buffer.slice(0, buffer.length - hold);
      if (ready) yield asChunk(ready);
      buffer = buffer.slice(buffer.length - hold);
      break;
    }
  }
  // Flush whatever remains (including an unterminated <thinking> section).
  if (buffer) yield asChunk(buffer);
}

View File

@@ -1,6 +1,11 @@
import { ModelOption, ExpertResult } from '../../types';
import { getSynthesisPrompt } from './prompts';
import { withRetry } from '../utils/retry';
import { generateContentStream as generateOpenAIStream } from './openaiClient';
const isGoogleProvider = (ai: any): boolean => {
return ai?.models?.generateContentStream !== undefined;
};
export const streamSynthesisResponse = async (
ai: any,
@@ -13,38 +18,63 @@ export const streamSynthesisResponse = async (
onChunk: (text: string, thought: string) => void
): Promise<void> => {
const prompt = getSynthesisPrompt(historyContext, query, expertResults);
const isGoogle = isGoogleProvider(ai);
const synthesisStream = await withRetry(() => ai.models.generateContentStream({
model: model,
contents: prompt,
config: {
thinkingConfig: {
if (isGoogle) {
const synthesisStream = await withRetry(() => ai.models.generateContentStream({
model: model,
contents: prompt,
config: {
thinkingConfig: {
thinkingBudget: budget,
includeThoughts: true
}
}
}));
}
}
}));
try {
for await (const chunk of synthesisStream) {
if (signal.aborted) break;
try {
for await (const chunk of (synthesisStream as any)) {
if (signal.aborted) break;
let chunkText = "";
let chunkThought = "";
let chunkText = "";
let chunkThought = "";
if (chunk.candidates?.[0]?.content?.parts) {
if (chunk.candidates?.[0]?.content?.parts) {
for (const part of chunk.candidates[0].content.parts) {
if (part.thought) {
chunkThought += (part.text || "");
} else if (part.text) {
chunkText += part.text;
}
if (part.thought) {
chunkThought += (part.text || "");
} else if (part.text) {
chunkText += part.text;
}
}
onChunk(chunkText, chunkThought);
}
}
} catch (streamError) {
console.error("Synthesis stream interrupted:", streamError);
throw streamError;
}
} else {
const stream = generateOpenAIStream(ai, {
model,
systemInstruction: undefined,
content: prompt,
temperature: 0.7,
thinkingConfig: {
thinkingBudget: budget,
includeThoughts: true
}
});
try {
for await (const chunk of (stream as any)) {
if (signal.aborted) break;
onChunk(chunk.text, chunk.thought || '');
}
} catch (streamError) {
console.error("Synthesis stream interrupted:", streamError);
throw streamError;
}
} catch (streamError) {
console.error("Synthesis stream interrupted:", streamError);
throw streamError;
}
};