mirror of
https://github.com/langgenius/dify.git
synced 2026-01-14 06:07:33 +08:00
Some checks are pending
autofix.ci / autofix (push) Waiting to run
Build and Push API & Web / build (api, DIFY_API_IMAGE_NAME, linux/amd64, build-api-amd64) (push) Waiting to run
Build and Push API & Web / build (api, DIFY_API_IMAGE_NAME, linux/arm64, build-api-arm64) (push) Waiting to run
Build and Push API & Web / build (web, DIFY_WEB_IMAGE_NAME, linux/amd64, build-web-amd64) (push) Waiting to run
Build and Push API & Web / build (web, DIFY_WEB_IMAGE_NAME, linux/arm64, build-web-arm64) (push) Waiting to run
Build and Push API & Web / create-manifest (api, DIFY_API_IMAGE_NAME, merge-api-images) (push) Blocked by required conditions
Build and Push API & Web / create-manifest (web, DIFY_WEB_IMAGE_NAME, merge-web-images) (push) Blocked by required conditions
Main CI Pipeline / Check Changed Files (push) Waiting to run
Main CI Pipeline / API Tests (push) Blocked by required conditions
Main CI Pipeline / Web Tests (push) Blocked by required conditions
Main CI Pipeline / Style Check (push) Waiting to run
Main CI Pipeline / VDB Tests (push) Blocked by required conditions
Main CI Pipeline / DB Migration Test (push) Blocked by required conditions
Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com>
134 lines
3.2 KiB
TypeScript
134 lines
3.2 KiB
TypeScript
import type { Readable } from "node:stream";
|
|
import { StringDecoder } from "node:string_decoder";
|
|
import type { BinaryStream, DifyStream, Headers, StreamEvent } from "../types/common";
|
|
|
|
/**
 * Splits a readable byte stream into text lines.
 *
 * Chunks are decoded as UTF-8 through a `StringDecoder`, so a multi-byte
 * character that straddles two chunks is decoded correctly. Lines are
 * delimited by "\n"; a "\r" immediately before the delimiter is dropped.
 * Any non-empty text left after the last "\n" is yielded as a final line
 * exactly as buffered.
 */
const readLines = async function* (stream: Readable): AsyncIterable<string> {
  const utf8 = new StringDecoder("utf8");
  let pending = "";

  for await (const chunk of stream) {
    pending += utf8.write(chunk as Buffer);

    // Everything before the last "\n" is a complete line; the tail stays
    // buffered until more input (or end of stream) arrives.
    const pieces = pending.split("\n");
    pending = pieces.pop() ?? "";

    for (const piece of pieces) {
      yield piece.endsWith("\r") ? piece.slice(0, -1) : piece;
    }
  }

  // Flush whatever the decoder is still holding, then the unterminated tail.
  pending += utf8.end();
  if (pending) {
    yield pending;
  }
};
|
|
|
|
/**
 * Best-effort JSON decoding.
 *
 * @param value - Text that may or may not be JSON.
 * @returns `null` for an empty value, the decoded JSON value when `value`
 *   parses, and otherwise `value` itself unchanged.
 */
const parseMaybeJson = (value: string): unknown => {
  let result: unknown = null;
  if (value) {
    try {
      result = JSON.parse(value);
    } catch {
      // Not JSON: hand the text back as-is.
      result = value;
    }
  }
  return result;
};
|
|
|
|
/**
 * Parses a Server-Sent Events stream into discrete events.
 *
 * Lines are read with `readLines`. A blank line ends the current event;
 * lines starting with ":" are comments and are skipped; `event:` sets the
 * event name and `data:` appends a data line. Any other line is ignored.
 * Multiple data lines of one event are joined with "\n" and passed through
 * `parseMaybeJson`, so `data` is the decoded JSON value when the payload is
 * JSON, the raw text otherwise, or `null` when the event carried no data.
 *
 * An event is emitted when either a name or at least one data line is
 * pending, so name-only events are emitted with `data: null`. Whatever is
 * still pending when the stream ends is emitted as a final event.
 *
 * @param stream - Readable producing the SSE response body.
 * @returns An async iterable of the parsed events.
 */
export const parseSseStream = async function* <T>(
  stream: Readable
): AsyncIterable<StreamEvent<T>> {
  let eventName: string | undefined;
  const dataLines: string[] = [];

  // Flushes the pending event (if any) and resets the accumulation state.
  const emitEvent = function* (): Iterable<StreamEvent<T>> {
    if (!eventName && dataLines.length === 0) {
      return;
    }
    const raw = dataLines.join("\n");
    const parsed = parseMaybeJson(raw) as T | string | null;
    yield {
      event: eventName,
      data: parsed,
      raw,
    };
    eventName = undefined;
    dataLines.length = 0;
  };

  for await (const line of readLines(stream)) {
    if (!line) {
      yield* emitEvent();
      continue;
    }
    if (line.startsWith(":")) {
      continue;
    }
    if (line.startsWith("event:")) {
      eventName = line.slice("event:".length).trim();
      continue;
    }
    if (line.startsWith("data:")) {
      // The SSE format removes only ONE leading space after the colon.
      // Stripping all leading whitespace would corrupt text payloads that
      // intentionally start with spaces or tabs.
      const value = line.slice("data:".length);
      dataLines.push(value.startsWith(" ") ? value.slice(1) : value);
      continue;
    }
  }

  yield* emitEvent();
};
|
|
|
|
/**
 * Pulls the textual payload out of a parsed event's data.
 *
 * @param data - Event data: a plain string, an object, or anything else.
 * @returns `data` itself when it is a string; otherwise the first of the
 *   `answer`, `text`, `delta` properties (checked in that order) that holds
 *   a string; otherwise the empty string.
 */
const extractTextFromEvent = (data: unknown): string => {
  if (typeof data === "string") {
    return data;
  }
  if (typeof data !== "object" || data === null) {
    return "";
  }

  const fields = data as Record<string, unknown>;
  // Order matters: `answer` wins over `text`, which wins over `delta`.
  for (const key of ["answer", "text", "delta"]) {
    const candidate = fields[key];
    if (typeof candidate === "string") {
      return candidate;
    }
  }
  return "";
};
|
|
|
|
/**
 * Wraps an SSE response body in a `DifyStream`.
 *
 * The returned object exposes the response metadata and the underlying
 * readable, and is async-iterable over the events parsed by
 * `parseSseStream`. The parser is created once, so every iteration of the
 * object (including the one performed by `toText`) draws from the same
 * iterator.
 *
 * @param stream - Readable carrying the SSE response body.
 * @param meta - HTTP status, headers, and optional request id of the response.
 * @returns The stream wrapper; `toText()` resolves to the concatenation of
 *   `extractTextFromEvent(event.data)` over all remaining events.
 */
export const createSseStream = <T>(
  stream: Readable,
  meta: { status: number; headers: Headers; requestId?: string }
): DifyStream<T> => {
  const events = parseSseStream<T>(stream)[Symbol.asyncIterator]();
  const { status, headers, requestId } = meta;

  const wrapper = {
    [Symbol.asyncIterator]: () => events,
    data: stream,
    status,
    headers,
    requestId,
    toReadable: () => stream,
    toText: async () => {
      const pieces: string[] = [];
      for await (const event of wrapper) {
        pieces.push(extractTextFromEvent(event.data));
      }
      return pieces.join("");
    },
  } satisfies DifyStream<T>;

  return wrapper;
};
|
|
|
|
/**
 * Wraps a binary response body together with its response metadata.
 *
 * @param stream - Readable carrying the binary response body.
 * @param meta - HTTP status, headers, and optional request id of the response.
 * @returns An object exposing the stream as `data` and via `toReadable()`,
 *   alongside `status`, `headers`, and `requestId` copied from `meta`.
 */
export const createBinaryStream = (
  stream: Readable,
  meta: { status: number; headers: Headers; requestId?: string }
): BinaryStream => {
  const { status, headers, requestId } = meta;
  return {
    data: stream,
    status,
    headers,
    requestId,
    toReadable: () => stream,
  };
};
|