From ec3c511a7146a3f1ca04eda46ce3e938ee76b119 Mon Sep 17 00:00:00 2001 From: TheoryOfNekomata Date: Sat, 29 Apr 2023 20:51:22 +0800 Subject: [PATCH] Major refactor Split endpoints to multiple ones with defined responsibilities. Could be deployed as their own services. Also consume core SDK based on mio-ai. --- src/index.ts | 6 +- src/modules/summary/SummaryController.ts | 40 +++------ src/modules/summary/SummaryService.ts | 82 ++++--------------- .../transcript/TranscriptController.ts | 53 ++++++++++++ src/modules/transcript/TranscriptService.ts | 37 +++++++++ .../event-emitter-to-readable-stream.ts | 34 ++++++++ src/packages/fastify-controller.ts | 5 ++ src/routes.ts | 20 ++++- 8 files changed, 179 insertions(+), 98 deletions(-) create mode 100644 src/modules/transcript/TranscriptController.ts create mode 100644 src/modules/transcript/TranscriptService.ts create mode 100644 src/packages/event-emitter-to-readable-stream.ts create mode 100644 src/packages/fastify-controller.ts diff --git a/src/index.ts b/src/index.ts index 1b05b28..f41058a 100644 --- a/src/index.ts +++ b/src/index.ts @@ -1,7 +1,6 @@ import * as config from './config'; import { createServer } from './server'; - -import { addHealthRoutes, addSummaryRoutes } from './routes'; +import { addHealthRoutes, addSummaryRoutes, addTranscriptRoutes } from './routes'; const server = createServer({ logger: process.env.NODE_ENV !== 'test', @@ -9,6 +8,7 @@ const server = createServer({ addHealthRoutes(server); addSummaryRoutes(server); +addTranscriptRoutes(server); server.listen( { @@ -20,5 +20,5 @@ server.listen( server.log.error(err.message); process.exit(1); } - } + }, ); diff --git a/src/modules/summary/SummaryController.ts b/src/modules/summary/SummaryController.ts index 3de38de..285c630 100644 --- a/src/modules/summary/SummaryController.ts +++ b/src/modules/summary/SummaryController.ts @@ -1,11 +1,8 @@ -import { RouteHandlerMethod } from 'fastify'; -import {CreateSummarizerParams, SummarizerProcessParams} from '@modal-sh/webvideo-transcript-summary-core'; -import * as config from '../../config'; +import { constants } from 'http2'; import { SummaryService, SummaryServiceImpl } from './SummaryService'; +import { Controller } from '../../packages/fastify-controller'; -export interface SummaryController { - summarizeVideoTranscript: RouteHandlerMethod; -} +export interface SummaryController extends Controller<'summarizeVideoTranscript'> {} export class SummaryControllerImpl implements SummaryController { constructor( @@ -14,33 +11,18 @@ export class SummaryControllerImpl implements SummaryController { // noop } - readonly summarizeVideoTranscript: RouteHandlerMethod = async (request, reply) => { - const params = request.body as CreateSummarizerParams & SummarizerProcessParams; + readonly summarizeVideoTranscript: SummaryController['summarizeVideoTranscript'] = async (request, reply) => { try { - this.summaryService.initializeSummarizer({ - type: params.type, - openAiParams: { - apiKey: config.openAi.apiKey, - organizationId: config.openAi.organizationId, - temperature: params.openAiParams?.temperature ?? 0.6, - model: params.openAiParams?.model ?? 'gpt-3.5-turbo', - }, - }); - - const summaryResult = await this.summaryService.summarizeVideoTranscript( - { - url: params.url, - language: params.language ?? 'en', - country: params.country ?? 'US', - }, - ); - reply.send(summaryResult); + const stream = this.summaryService.createSummaryStream(request.body as string); + reply.header('Content-Type', 'application/octet-stream'); + reply.raw.statusMessage = 'Summary Generated'; + return reply.send(stream); } catch (errRaw) { const err = errRaw as Error; request.server.log.error(err); - reply - .code(500) - .send(); + reply.code(constants.HTTP_STATUS_INTERNAL_SERVER_ERROR) + reply.raw.statusMessage = 'Summary Failed'; + reply.send(); } }; } diff --git a/src/modules/summary/SummaryService.ts b/src/modules/summary/SummaryService.ts index a7f0f7d..143353a 100644 --- a/src/modules/summary/SummaryService.ts +++ b/src/modules/summary/SummaryService.ts @@ -1,78 +1,32 @@ import { createSummarizer, - CreateSummarizerParams, SummarizerEventEmitter, - SummarizerProcessParams, + SummarizerEventEmitter, + OPENAI_API_VERSION, } from '@modal-sh/webvideo-transcript-summary-core'; - -export interface SummaryResult { - summary: string; - normalizedTranscript: string; - rawTranscript: string; -} +import { Readable } from '../../packages/event-emitter-to-readable-stream'; +import * as config from '../../config'; export interface SummaryService { - initializeSummarizer(params: CreateSummarizerParams): void; - summarizeVideoTranscript(processParams: SummarizerProcessParams): Promise>; + createSummaryStream(transcriptText: string): NodeJS.ReadableStream; } export class SummaryServiceImpl implements SummaryService { - private summarizer?: SummarizerEventEmitter; + private readonly summarizer: SummarizerEventEmitter; - constructor(private readonly logger = console) { + constructor() { // noop + this.summarizer = createSummarizer({ + apiKey: config.openAi.apiKey, + organizationId: config.openAi.organizationId, + apiVersion: OPENAI_API_VERSION, + }); } - initializeSummarizer(params: CreateSummarizerParams): void { - this.summarizer = createSummarizer(params); - } - - summarizeVideoTranscript(processParams: SummarizerProcessParams) { - return new Promise>((resolve, reject) => { - if (!this.summarizer) { - reject(new Error('Summarizer is not initialized')); - return; - } - - const successEvent = {} as Partial; - let error: Error; - - this.summarizer.on('process', (data) => { - this.logger.log('process', data); - if (data.phase === 'success') { - switch (data.processType) { - case 'fetch-transcript': - successEvent.rawTranscript = ( - JSON.parse(data.content as string) as { text: string }[] - ) - .map((item) => item.text).join(' '); - break; - case 'normalize-transcript': - successEvent.normalizedTranscript = data.content as string; - break; - case 'summarize-transcript': - successEvent.summary = data.content as string; - break; - default: - break; - } - } - }); - - this.summarizer.on('error', (err) => { - this.logger.log('error', err); - error = err; - }); - - this.summarizer.on('end', () => { - this.logger.log('end'); - if (error) { - reject(error); - return; - } - resolve(successEvent); - }); - - this.summarizer.process(processParams); - }); + createSummaryStream(transcriptText: string) { + const stream = Readable.fromEventEmitter( + this.summarizer, + ) as unknown as NodeJS.ReadableStream & { tokenCount: number }; + this.summarizer.summarize(transcriptText); + return stream; } } diff --git a/src/modules/transcript/TranscriptController.ts b/src/modules/transcript/TranscriptController.ts new file mode 100644 index 0000000..1c611b6 --- /dev/null +++ b/src/modules/transcript/TranscriptController.ts @@ -0,0 +1,53 @@ +import { Controller } from '../../packages/fastify-controller'; +import { TranscriptService, TranscriptServiceImpl } from './TranscriptService'; +import { BaseTranscriptItem, VideoType, YouTube } from '@modal-sh/webvideo-transcript-summary-core'; +import { constants } from 'http2'; + +export interface TranscriptController extends Controller< + 'getVideoTranscript' + | 'normalizeVideoTranscriptText' +> {} + +export class TranscriptControllerImpl implements TranscriptController { + constructor( + private readonly transcriptService: TranscriptService = new TranscriptServiceImpl(), + ) { + // noop + } + + readonly getVideoTranscript: TranscriptController['getVideoTranscript'] = async (request, reply) => { + try { + const { videoType, videoId } = request.params as { videoType: VideoType; videoId: string }; + const transcript = await this.transcriptService + .getVideoTranscript(videoType, { url: videoId }); + + reply.raw.statusMessage = 'Transcript Fetched Successfully'; + reply.send(transcript); + } catch (errRaw) { + const err = errRaw as Error; + request.server.log.error(err); + + reply.code(constants.HTTP_STATUS_BAD_GATEWAY); + reply.raw.statusMessage = 'Transcript Fetch Failed'; + reply.send(); + } + }; + + readonly normalizeVideoTranscriptText: TranscriptController['normalizeVideoTranscriptText'] = async (request, reply) => { + try { + const transcriptItems = request.body as BaseTranscriptItem[]; + const normalizedText = await this.transcriptService + .normalizeVideoTranscriptText(transcriptItems); + + reply.raw.statusMessage = 'Transcript Text Normalized Successfully'; + reply.send(normalizedText); + } catch (errRaw) { + const err = errRaw as Error; + request.server.log.error(err); + + reply.code(constants.HTTP_STATUS_INTERNAL_SERVER_ERROR); + reply.raw.statusMessage = 'Transcript Text Normalization Failed'; + reply.send(); + } + }; +} diff --git a/src/modules/transcript/TranscriptService.ts b/src/modules/transcript/TranscriptService.ts new file mode 100644 index 0000000..6abd102 --- /dev/null +++ b/src/modules/transcript/TranscriptService.ts @@ -0,0 +1,37 @@ +import { + createTranscriptFetcher, + VideoType, + createSummarizer, + BaseTranscriptItem, + OPENAI_API_VERSION, +} from '@modal-sh/webvideo-transcript-summary-core'; + +import * as config from '../../config'; + +export interface TranscriptService { + getVideoTranscript( + videoType: VideoType, + ...etcParams: Parameters> + ): Promise; + normalizeVideoTranscriptText(text: BaseTranscriptItem[]): Promise; +} + +export class TranscriptServiceImpl implements TranscriptService { + async getVideoTranscript( + videoType: VideoType, + ...etcParams: Parameters> + ) { + const transcriptFetcher = createTranscriptFetcher({ type: videoType }); + return transcriptFetcher(...etcParams); + } + + async normalizeVideoTranscriptText(text: BaseTranscriptItem[]) { + const summarizer = createSummarizer({ + apiKey: config.openAi.apiKey, + organizationId: config.openAi.organizationId, + apiVersion: OPENAI_API_VERSION, + }); + + return summarizer.normalize(text); + } +} diff --git a/src/packages/event-emitter-to-readable-stream.ts b/src/packages/event-emitter-to-readable-stream.ts new file mode 100644 index 0000000..9051fb8 --- /dev/null +++ b/src/packages/event-emitter-to-readable-stream.ts @@ -0,0 +1,34 @@ +import { Readable as NodeReadable } from 'stream'; + +export interface Options { + dataEvent?: string; + endEvent?: string; + onBeforeEnd?: (emitter: T) => void; +} + +export namespace Readable { + export const fromEventEmitter = ( + eventEmitter: T, + options = {} as Options, + ) => { + const stream = new NodeReadable({ + read() { + // noop + }, + }); + + const dataEventHandler = (d: unknown) => { + stream.emit('data', d); + }; + + const dataEvent = options?.dataEvent ?? 'data'; + eventEmitter.on(dataEvent, dataEventHandler); + eventEmitter.on(options?.endEvent ?? 'end', () => { + options?.onBeforeEnd?.(eventEmitter); + stream.emit('end'); + eventEmitter.off(dataEvent, dataEventHandler); + }); + + return stream; + }; +} diff --git a/src/packages/fastify-controller.ts b/src/packages/fastify-controller.ts new file mode 100644 index 0000000..3dccc84 --- /dev/null +++ b/src/packages/fastify-controller.ts @@ -0,0 +1,5 @@ +import { RouteHandlerMethod } from 'fastify'; + +export type Controller = { + [P in T]: RouteHandlerMethod; +} diff --git a/src/routes.ts b/src/routes.ts index 888501b..a07ac5b 100644 --- a/src/routes.ts +++ b/src/routes.ts @@ -1,5 +1,6 @@ import { FastifyInstance } from 'fastify'; import { SummaryController, SummaryControllerImpl } from './modules/summary'; +import { TranscriptController, TranscriptControllerImpl } from './modules/transcript/TranscriptController'; export const addHealthRoutes = (server: FastifyInstance) => { server @@ -17,14 +18,29 @@ export const addHealthRoutes = (server: FastifyInstance) => { reply.send({ status: 'ok' }); }, }); -} +}; export const addSummaryRoutes = (server: FastifyInstance) => { const summaryController: SummaryController = new SummaryControllerImpl(); server .route({ method: 'POST', - url: '/api/summary', + url: '/api/summarize', handler: summaryController.summarizeVideoTranscript, }); }; + +export const addTranscriptRoutes = (server: FastifyInstance) => { + const transcriptController: TranscriptController = new TranscriptControllerImpl(); + server + .route({ + method: 'GET', + url: '/api/transcripts/get/:videoType/:videoId', + handler: transcriptController.getVideoTranscript, + }) + .route({ + method: 'POST', + url: '/api/transcripts/normalize', + handler: transcriptController.normalizeVideoTranscriptText, + }); +};