import { BaseController, CoolController } from '@cool-midway/core';
import { Body, Inject, Post } from '@midwayjs/core';
import { Context } from 'koa';
import { PassThrough } from 'stream';
import { FlowRunService } from '../../service/run';

/**
 * Prepares `ctx` for a Server-Sent-Events style response and returns the
 * two operations the handlers need.
 *
 * The response headers are set, a `PassThrough` is assigned as the response
 * body, and the status is set to 200. Events are written to the
 * `PassThrough`; the response is finished by ending that stream (Koa pipes a
 * stream body into the underlying response) instead of calling
 * `ctx.res.end()` directly, so events that are still buffered are not lost.
 *
 * @param ctx Koa context of the current request.
 * @returns `send` writes one `data:<json>\n\n` frame; `end` closes the
 * stream. Both are no-ops once the stream has been ended, so late callbacks
 * cannot trigger a write-after-end error.
 */
function createEventStream(ctx: Context) {
  ctx.set('Content-Type', 'text/event-stream');
  ctx.set('Cache-Control', 'no-cache');
  ctx.set('Connection', 'keep-alive');

  const body = new PassThrough();
  ctx.status = 200;
  ctx.body = body;

  return {
    send(data: unknown): void {
      if (!body.writableEnded) {
        body.write(`data:${JSON.stringify(data)}\n\n`);
      }
    },
    end(): void {
      if (!body.writableEnded) {
        body.end();
      }
    },
  };
}

/**
 * Flow run controller: debug and invoke endpoints for flows.
 */
@CoolController()
export class AdminFlowRunController extends BaseController {
  @Inject()
  flowRunService: FlowRunService;

  @Inject()
  ctx: Context;

  /**
   * Debugs a flow and reports progress as an event stream.
   *
   * Every value the service passes to the callback is forwarded as one
   * event. When `stream` is false the response is closed on the first
   * callback value whose `isEnd` is truthy. When `stream` is true the
   * chunks of `result.stream` are forwarded afterwards as
   * `{ isEnd: false, llmStream: true, content }` events, followed by a
   * final `{ isEnd: true, llmStream: true }` event.
   *
   * If the service promise rejects, the response is closed without an
   * additional event.
   */
  @Post('/debug', { summary: '调试' })
  async debug(
    // Parameters passed to the flow
    @Body('params') params: any,
    // Flow label
    @Body('label') label: string,
    // When a nodeId is given, only that node is debugged
    @Body('nodeId') nodeId: string,
    // Whether to use streaming output
    @Body('stream') stream = false
  ) {
    const events = createEventStream(this.ctx);

    // Not awaited on purpose: the handler returns right away so the body
    // stream can be sent while the flow is still running.
    this.flowRunService
      .debug(params, label, nodeId, stream, res => {
        events.send(res);
        if (res.isEnd && !stream) {
          events.end();
        }
      })
      .then(async result => {
        if (stream) {
          // Streaming output
          for await (const chunk of result.stream) {
            events.send({
              isEnd: false,
              llmStream: true,
              content: chunk.toString(),
            });
          }
          events.send({ isEnd: true, llmStream: true });
          events.end();
        }
      })
      .catch(() => {
        // The response is already an event stream at this point; just
        // close it so the client is not left waiting.
        events.end();
      });
  }

  /**
   * Invokes a flow.
   *
   * Without `stream`, the result of `flowRunService.invoke(params, label)`
   * is returned directly. With `stream`, the chunks of `result.stream` are
   * forwarded as `{ isEnd: false, content }` events followed by a final
   * `{ isEnd: true }` event, and the method itself returns nothing.
   */
  @Post('/invoke', { summary: '调用流程' })
  async invoke(
    // Parameters passed to the flow
    @Body('params') params: any,
    // Flow label
    @Body('label') label: string,
    // Whether to use streaming output
    @Body('stream') stream = false
  ) {
    if (!stream) {
      return await this.flowRunService.invoke(params, label);
    }

    // Start the flow first: if this call throws, the response has not yet
    // been turned into an event stream.
    const result = await this.flowRunService.invoke(params, label, true);
    const events = createEventStream(this.ctx);

    const forward = async () => {
      // Streaming output
      for await (const chunk of result.stream) {
        events.send({ isEnd: false, content: chunk.toString() });
      }
      events.send({ isEnd: true });
    };

    // Not awaited so the handler can return and the body can be sent while
    // chunks arrive. The stream is closed on success and on failure, which
    // also keeps a failing iterator from becoming an unhandled rejection.
    forward()
      .then(() => events.end())
      .catch(() => events.end());
  }
}