Add CQRS event emitter for long-running operations.master
@@ -35,11 +35,11 @@ export abstract class MiddlewareResponseError extends MiddlewareError implements | |||
} | |||
} | |||
export type RequestDecorator = (req: RequestContext) => RequestContext | Promise<RequestContext>; | |||
export type ParamRequestDecorator<Params extends Array<unknown> = []> = (...args: Params) => RequestDecorator; | |||
// TODO put this in HTTP | |||
export type Method = 'GET' | 'POST' | 'PUT' | 'PATCH' | 'DELETE' | 'OPTIONS' | 'HEAD'; | |||
export interface AllowedMiddlewareSpecification<Schema extends BaseSchema = BaseSchema> { | |||
@@ -25,7 +25,8 @@ import {getBody, isTextMediaType} from './utils'; | |||
import {decorateRequestWithBackend} from './decorators/backend'; | |||
import {decorateRequestWithMethod} from './decorators/method'; | |||
import {decorateRequestWithUrl} from './decorators/url'; | |||
import {HttpMiddlewareError, PlainResponse} from './response'; | |||
import {ErrorPlainResponse, PlainResponse} from './response'; | |||
import EventEmitter from 'events'; | |||
type RequiredResource = Required<Pick<RequestContext, 'resource'>>['resource']; | |||
@@ -42,8 +43,8 @@ declare module '../../common' { | |||
body?: unknown; | |||
} | |||
interface Middleware<Req extends ResourceRequestContext = ResourceRequestContext> { | |||
(req: Req): undefined | Response | Promise<undefined | Response>; | |||
interface Middleware<Req extends ResourceRequestContext = ResourceRequestContext, Res extends NodeJS.EventEmitter = NodeJS.EventEmitter> { | |||
(req: Req, res: Res): undefined | Response | Promise<undefined | Response>; | |||
} | |||
} | |||
@@ -137,8 +138,13 @@ export interface CreateServerParams { | |||
streamResponses?: boolean; | |||
} | |||
class CqrsEventEmitter extends EventEmitter { | |||
} | |||
export const createServer = (backendState: BackendState, serverParams = {} as CreateServerParams) => { | |||
const isHttps = 'key' in serverParams && 'cert' in serverParams; | |||
const theRes = new CqrsEventEmitter(); | |||
const server = isHttps | |||
? https.createServer({ | |||
@@ -196,14 +202,16 @@ export const createServer = (backendState: BackendState, serverParams = {} as Cr | |||
const theBodyBuffer = await getBody(req); | |||
const encodingPair = req.backend.app.charsets.get(charset); | |||
if (typeof encodingPair === 'undefined') { | |||
throw new HttpMiddlewareError('unableToDecodeResource', { | |||
throw new ErrorPlainResponse('unableToDecodeResource', { | |||
statusCode: constants.HTTP_STATUS_UNSUPPORTED_MEDIA_TYPE, | |||
res: theRes, | |||
}); | |||
} | |||
const deserializerPair = req.backend.app.mediaTypes.get(mediaType); | |||
if (typeof deserializerPair === 'undefined') { | |||
throw new HttpMiddlewareError('unableToDeserializeResource', { | |||
throw new ErrorPlainResponse('unableToDeserializeResource', { | |||
statusCode: constants.HTTP_STATUS_UNSUPPORTED_MEDIA_TYPE, | |||
res: theRes, | |||
}); | |||
} | |||
const theBodyStr = encodingPair.decode(theBodyBuffer); | |||
@@ -216,23 +224,27 @@ export const createServer = (backendState: BackendState, serverParams = {} as Cr | |||
// TODO better error reporting, localizable messages | |||
// TODO handle error handlers' errors | |||
if (Array.isArray(err.issues)) { | |||
throw new HttpMiddlewareError('invalidResource', { | |||
throw new ErrorPlainResponse('invalidResource', { | |||
statusCode: constants.HTTP_STATUS_BAD_REQUEST, | |||
body: err.issues.map((i) => ( | |||
`${i.path?.map((p) => p.key)?.join('.') ?? i.reason}: ${i.message}` | |||
)), | |||
res: theRes, | |||
}); | |||
} | |||
} | |||
} | |||
const result = await middleware(req); | |||
const result = await middleware(req, theRes); | |||
// HEAD is just GET without the response body | |||
if (req.method === 'HEAD' && result instanceof PlainResponse) { | |||
const { body: _, ...etcResult } = result; | |||
return new PlainResponse(etcResult); | |||
return new PlainResponse({ | |||
...etcResult, | |||
res: theRes, | |||
}); | |||
} | |||
return result; | |||
@@ -240,34 +252,37 @@ export const createServer = (backendState: BackendState, serverParams = {} as Cr | |||
const processRequest = (middlewares: AllowedMiddlewareSpecification[]) => async (req: ResourceRequestContext) => { | |||
if (req.url === '/' || req.url === '') { | |||
return handleGetRoot(req); | |||
return handleGetRoot(req, theRes); | |||
} | |||
const { resource } = req; | |||
if (typeof resource === 'undefined') { | |||
throw new HttpMiddlewareError('resourceNotFound', { | |||
throw new ErrorPlainResponse('resourceNotFound', { | |||
statusCode: constants.HTTP_STATUS_NOT_FOUND, | |||
res: theRes, | |||
}); | |||
} | |||
if (req.method === 'OPTIONS') { | |||
return handleOptions(middlewares)(req); | |||
return handleOptions(middlewares)(req, theRes); | |||
} | |||
if (typeof resource.dataSource === 'undefined') { | |||
throw new HttpMiddlewareError('unableToInitializeResourceDataSource', { | |||
throw new ErrorPlainResponse('unableToInitializeResourceDataSource', { | |||
statusCode: constants.HTTP_STATUS_INTERNAL_SERVER_ERROR, | |||
res: theRes, | |||
}); | |||
} | |||
try { | |||
await resource.dataSource.initialize(); | |||
} catch (cause) { | |||
throw new HttpMiddlewareError( | |||
throw new ErrorPlainResponse( | |||
'unableToInitializeResourceDataSource', | |||
{ | |||
cause, | |||
statusCode: constants.HTTP_STATUS_INTERNAL_SERVER_ERROR, | |||
res: theRes, | |||
} | |||
); | |||
} | |||
@@ -281,8 +296,9 @@ export const createServer = (backendState: BackendState, serverParams = {} as Cr | |||
) as Awaited<ReturnType<Middleware>>; | |||
if (typeof middlewareResponse === 'undefined') { | |||
throw new HttpMiddlewareError('resourceNotFound', { | |||
statusCode: constants.HTTP_STATUS_NOT_FOUND | |||
throw new ErrorPlainResponse('resourceNotFound', { | |||
statusCode: constants.HTTP_STATUS_NOT_FOUND, | |||
res: theRes, | |||
}); | |||
} | |||
@@ -313,12 +329,13 @@ export const createServer = (backendState: BackendState, serverParams = {} as Cr | |||
: defaultCollectionMiddlewares | |||
); | |||
const middlewares = effectiveMiddlewares.filter((m) => m.allowed(resourceReq.resource)); | |||
// TODO listen to res.on('response') | |||
const processRequestFn = processRequest(middlewares); | |||
let middlewareState: Response; | |||
try { | |||
middlewareState = await processRequestFn(resourceReq) as any; // TODO fix this | |||
} catch (processRequestErrRaw) { | |||
const finalErr = processRequestErrRaw as HttpMiddlewareError; | |||
const finalErr = processRequestErrRaw as ErrorPlainResponse; | |||
const headers = finalErr.headers ?? {}; | |||
let encoded: Buffer | undefined; | |||
let serialized; | |||
@@ -1,5 +1,5 @@ | |||
import {ParamRequestDecorator} from '../../../../common'; | |||
import {CreateServerParams} from '../../index'; | |||
import {CreateServerParams} from '../../core'; | |||
import {decorateRequestWithScheme} from './scheme'; | |||
import {decorateRequestWithHost} from './host'; | |||
import {decorateRequestWithBasePath} from './base-path'; | |||
@@ -1,9 +1,9 @@ | |||
import {constants} from 'http2'; | |||
import {AllowedMiddlewareSpecification, Middleware} from '../../../common'; | |||
import {LinkMap} from '../utils'; | |||
import {PlainResponse, HttpMiddlewareError} from '../response'; | |||
import {PlainResponse, ErrorPlainResponse} from '../response'; | |||
export const handleGetRoot: Middleware = (req) => { | |||
export const handleGetRoot: Middleware = (req, res) => { | |||
const { backend, basePath } = req; | |||
const data = { | |||
@@ -35,23 +35,26 @@ export const handleGetRoot: Middleware = (req) => { | |||
headers, | |||
statusMessage: 'ok', | |||
statusCode: constants.HTTP_STATUS_OK, | |||
body: data | |||
body: data, | |||
res, | |||
}); | |||
}; | |||
export const handleOptions = (middlewares: AllowedMiddlewareSpecification[]): Middleware => () => { | |||
export const handleOptions = (middlewares: AllowedMiddlewareSpecification[]): Middleware => (_req, res) => { | |||
if (middlewares.length > 0) { | |||
return new PlainResponse({ | |||
headers: { | |||
'Allow': middlewares.flatMap((m) => m.method === 'GET' ? [m.method, 'HEAD'] : [m.method]).join(', '), | |||
}, | |||
statusMessage: 'ok', | |||
statusMessage: 'provideOptions', | |||
statusCode: constants.HTTP_STATUS_NO_CONTENT, | |||
res, | |||
}); | |||
} | |||
// TODO add option for custom error handler | |||
throw new HttpMiddlewareError('methodNotAllowed', { | |||
throw new ErrorPlainResponse('methodNotAllowed', { | |||
statusCode: constants.HTTP_STATUS_METHOD_NOT_ALLOWED, | |||
res, | |||
}); | |||
}; |
@@ -1,9 +1,9 @@ | |||
import { constants } from 'http2'; | |||
import * as v from 'valibot'; | |||
import {Middleware} from '../../../common'; | |||
import {HttpMiddlewareError, PlainResponse} from '../response'; | |||
import {ErrorPlainResponse, PlainResponse} from '../response'; | |||
export const handleGetCollection: Middleware = async (req) => { | |||
export const handleGetCollection: Middleware = async (req, res) => { | |||
const { query, resource, backend } = req; | |||
let data: v.Output<typeof resource.schema>[]; | |||
@@ -15,11 +15,12 @@ export const handleGetCollection: Middleware = async (req) => { | |||
totalItemCount = await resource.dataSource.getTotalCount(query); | |||
} | |||
} catch (cause) { | |||
throw new HttpMiddlewareError( | |||
throw new ErrorPlainResponse( | |||
'unableToFetchResourceCollection', | |||
{ | |||
cause, | |||
statusCode: constants.HTTP_STATUS_INTERNAL_SERVER_ERROR, | |||
res, | |||
} | |||
); | |||
} | |||
@@ -34,17 +35,19 @@ export const handleGetCollection: Middleware = async (req) => { | |||
statusCode: constants.HTTP_STATUS_OK, | |||
statusMessage: 'resourceCollectionFetched', | |||
body: data, | |||
res, | |||
}); | |||
}; | |||
export const handleGetItem: Middleware = async (req) => { | |||
export const handleGetItem: Middleware = async (req, res) => { | |||
const { resource, resourceId } = req; | |||
if (typeof resourceId === 'undefined' || resourceId.trim().length < 1) { | |||
throw new HttpMiddlewareError( | |||
throw new ErrorPlainResponse( | |||
'resourceIdNotGiven', | |||
{ | |||
statusCode: constants.HTTP_STATUS_BAD_REQUEST, | |||
res, | |||
} | |||
); | |||
} | |||
@@ -53,39 +56,43 @@ export const handleGetItem: Middleware = async (req) => { | |||
try { | |||
data = await resource.dataSource.getById(resourceId); | |||
} catch (cause) { | |||
throw new HttpMiddlewareError( | |||
throw new ErrorPlainResponse( | |||
'unableToFetchResource', | |||
{ | |||
cause, | |||
statusCode: constants.HTTP_STATUS_INTERNAL_SERVER_ERROR, | |||
res, | |||
} | |||
); | |||
} | |||
if (typeof data !== 'undefined' && data !== null) { | |||
return new PlainResponse({ | |||
statusCode: constants.HTTP_STATUS_OK, | |||
statusMessage: 'resourceFetched', | |||
body: data | |||
}); | |||
if (!(typeof data !== 'undefined' && data !== null)) { | |||
throw new ErrorPlainResponse( | |||
'resourceNotFound', | |||
{ | |||
statusCode: constants.HTTP_STATUS_NOT_FOUND, | |||
res, | |||
}, | |||
); | |||
} | |||
throw new HttpMiddlewareError( | |||
'resourceNotFound', | |||
{ | |||
statusCode: constants.HTTP_STATUS_NOT_FOUND, | |||
} | |||
); | |||
return new PlainResponse({ | |||
statusCode: constants.HTTP_STATUS_OK, | |||
statusMessage: 'resourceFetched', | |||
body: data, | |||
res, | |||
}); | |||
}; | |||
export const handleDeleteItem: Middleware = async (req) => { | |||
export const handleDeleteItem: Middleware = async (req, res) => { | |||
const { resource, resourceId, backend } = req; | |||
if (typeof resourceId === 'undefined' || resourceId.trim().length < 1) { | |||
throw new HttpMiddlewareError( | |||
throw new ErrorPlainResponse( | |||
'resourceIdNotGiven', | |||
{ | |||
statusCode: constants.HTTP_STATUS_BAD_REQUEST, | |||
res, | |||
} | |||
); | |||
} | |||
@@ -94,15 +101,17 @@ export const handleDeleteItem: Middleware = async (req) => { | |||
try { | |||
existing = await resource.dataSource.getById(resourceId); | |||
} catch (cause) { | |||
throw new HttpMiddlewareError('unableToFetchResource', { | |||
throw new ErrorPlainResponse('unableToFetchResource', { | |||
cause, | |||
statusCode: constants.HTTP_STATUS_INTERNAL_SERVER_ERROR | |||
statusCode: constants.HTTP_STATUS_INTERNAL_SERVER_ERROR, | |||
res, | |||
}); | |||
} | |||
if (!existing && backend!.throwsErrorOnDeletingNotFound) { | |||
throw new HttpMiddlewareError('deleteNonExistingResource', { | |||
statusCode: constants.HTTP_STATUS_NOT_FOUND | |||
throw new ErrorPlainResponse('deleteNonExistingResource', { | |||
statusCode: constants.HTTP_STATUS_NOT_FOUND, | |||
res | |||
}); | |||
} | |||
@@ -111,26 +120,29 @@ export const handleDeleteItem: Middleware = async (req) => { | |||
await resource.dataSource.delete(resourceId); | |||
} | |||
} catch (cause) { | |||
throw new HttpMiddlewareError('unableToDeleteResource', { | |||
throw new ErrorPlainResponse('unableToDeleteResource', { | |||
cause, | |||
statusCode: constants.HTTP_STATUS_INTERNAL_SERVER_ERROR | |||
statusCode: constants.HTTP_STATUS_INTERNAL_SERVER_ERROR, | |||
res | |||
}) | |||
} | |||
return new PlainResponse({ | |||
statusCode: constants.HTTP_STATUS_NO_CONTENT, | |||
statusMessage: 'resourceDeleted', | |||
res, | |||
}); | |||
}; | |||
export const handlePatchItem: Middleware = async (req) => { | |||
export const handlePatchItem: Middleware = async (req, res) => { | |||
const { resource, resourceId, body } = req; | |||
if (typeof resourceId === 'undefined' || resourceId.trim().length < 1) { | |||
throw new HttpMiddlewareError( | |||
throw new ErrorPlainResponse( | |||
'resourceIdNotGiven', | |||
{ | |||
statusCode: constants.HTTP_STATUS_BAD_REQUEST, | |||
res, | |||
} | |||
); | |||
} | |||
@@ -139,15 +151,17 @@ export const handlePatchItem: Middleware = async (req) => { | |||
try { | |||
existing = await resource.dataSource.getById(resourceId!); | |||
} catch (cause) { | |||
throw new HttpMiddlewareError('unableToFetchResource', { | |||
throw new ErrorPlainResponse('unableToFetchResource', { | |||
cause, | |||
statusCode: constants.HTTP_STATUS_INTERNAL_SERVER_ERROR, | |||
res, | |||
}); | |||
} | |||
if (!existing) { | |||
throw new HttpMiddlewareError('patchNonExistingResource', { | |||
throw new ErrorPlainResponse('patchNonExistingResource', { | |||
statusCode: constants.HTTP_STATUS_NOT_FOUND, | |||
res, | |||
}); | |||
} | |||
@@ -155,9 +169,10 @@ export const handlePatchItem: Middleware = async (req) => { | |||
try { | |||
newObject = await resource.dataSource.patch(resourceId!, body as object); | |||
} catch (cause) { | |||
throw new HttpMiddlewareError('unableToPatchResource', { | |||
throw new ErrorPlainResponse('unableToPatchResource', { | |||
cause, | |||
statusCode: constants.HTTP_STATUS_INTERNAL_SERVER_ERROR | |||
statusCode: constants.HTTP_STATUS_INTERNAL_SERVER_ERROR, | |||
res, | |||
}); | |||
} | |||
@@ -165,16 +180,18 @@ export const handlePatchItem: Middleware = async (req) => { | |||
statusCode: constants.HTTP_STATUS_OK, | |||
statusMessage: 'resourcePatched', | |||
body: newObject, | |||
res, | |||
}); | |||
}; | |||
export const handleCreateItem: Middleware = async (req) => { | |||
export const handleCreateItem: Middleware = async (req, res) => { | |||
const { resource, body, backend, basePath } = req; | |||
const idAttrRaw = resource.state.shared.get('idAttr'); | |||
if (typeof idAttrRaw === 'undefined') { | |||
throw new HttpMiddlewareError('unableToGenerateIdFromResourceDataSource', { | |||
throw new ErrorPlainResponse('unableToGenerateIdFromResourceDataSource', { | |||
statusCode: constants.HTTP_STATUS_INTERNAL_SERVER_ERROR, | |||
res, | |||
}); | |||
} | |||
const idAttr = idAttrRaw as string; | |||
@@ -186,28 +203,35 @@ export const handleCreateItem: Middleware = async (req) => { | |||
params = { ...body as Record<string, unknown> }; | |||
params[idAttr] = newId; | |||
} catch (cause) { | |||
throw new HttpMiddlewareError('unableToGenerateIdFromResourceDataSource', { | |||
throw new ErrorPlainResponse('unableToGenerateIdFromResourceDataSource', { | |||
cause, | |||
statusCode: constants.HTTP_STATUS_INTERNAL_SERVER_ERROR, | |||
res, | |||
}); | |||
} | |||
const location = `${basePath}/${resource.state.routeName}/${newId}`; | |||
res.emit('response', { | |||
Location: location, | |||
}); | |||
// already return 202 accepted here | |||
let newObject; | |||
let totalItemCount: number | undefined; | |||
try { | |||
newObject = await resource.dataSource.create(params); | |||
if (backend!.showTotalItemCountOnCreateItem && typeof resource.dataSource.getTotalCount === 'function') { | |||
totalItemCount = await resource.dataSource.getTotalCount(); | |||
} | |||
} catch (cause) { | |||
throw new HttpMiddlewareError('unableToCreateResource', { | |||
throw new ErrorPlainResponse('unableToCreateResource', { | |||
cause, | |||
statusCode: constants.HTTP_STATUS_INTERNAL_SERVER_ERROR, | |||
res, | |||
}); | |||
} | |||
const location = `${basePath}/${resource.state.routeName}/${newId}`; | |||
if (typeof totalItemCount !== 'undefined') { | |||
return new PlainResponse({ | |||
statusCode: constants.HTTP_STATUS_CREATED, | |||
@@ -216,7 +240,8 @@ export const handleCreateItem: Middleware = async (req) => { | |||
'X-Resource-Total-Item-Count': totalItemCount.toString() | |||
}, | |||
body: newObject, | |||
statusMessage: 'resourceCreated' | |||
statusMessage: 'resourceCreated', | |||
res, | |||
}); | |||
} | |||
@@ -226,17 +251,19 @@ export const handleCreateItem: Middleware = async (req) => { | |||
headers: { | |||
'Location': location, | |||
}, | |||
statusMessage: 'resourceCreated' | |||
statusMessage: 'resourceCreated', | |||
res, | |||
}); | |||
} | |||
export const handleEmplaceItem: Middleware = async (req) => { | |||
export const handleEmplaceItem: Middleware = async (req, res) => { | |||
const { resource, resourceId, basePath, body, backend } = req; | |||
const idAttrRaw = resource.state.shared.get('idAttr'); | |||
if (typeof idAttrRaw === 'undefined') { | |||
throw new HttpMiddlewareError('unableToGenerateIdFromResourceDataSource', { | |||
throw new ErrorPlainResponse('unableToGenerateIdFromResourceDataSource', { | |||
statusCode: constants.HTTP_STATUS_INTERNAL_SERVER_ERROR, | |||
res, | |||
}); | |||
} | |||
const idAttr = idAttrRaw as string; | |||
@@ -248,9 +275,10 @@ export const handleEmplaceItem: Middleware = async (req) => { | |||
params[idAttr] = resourceId; | |||
[newObject, isCreated] = await resource.dataSource.emplace(resourceId!, params); | |||
} catch (cause) { | |||
throw new HttpMiddlewareError('unableToEmplaceResource', { | |||
throw new ErrorPlainResponse('unableToEmplaceResource', { | |||
cause, | |||
statusCode: constants.HTTP_STATUS_INTERNAL_SERVER_ERROR, | |||
res, | |||
}); | |||
} | |||
@@ -274,6 +302,7 @@ export const handleEmplaceItem: Middleware = async (req) => { | |||
? 'resourceCreated' | |||
: 'resourceReplaced' | |||
), | |||
body: newObject | |||
body: newObject, | |||
res, | |||
}); | |||
} |
@@ -1,20 +1,25 @@ | |||
import {Language, LanguageStatusMessageMap} from '../../../common'; | |||
import {MiddlewareResponseError, Response} from '../../common'; | |||
interface PlainResponseParams<T = unknown> extends Response { | |||
interface PlainResponseParams<T = unknown, U extends NodeJS.EventEmitter = NodeJS.EventEmitter> extends Response { | |||
body?: T; | |||
res: U; | |||
} | |||
interface HttpMiddlewareErrorParams<T = unknown> extends Omit<PlainResponseParams<T>, 'statusMessage'> { | |||
interface HttpMiddlewareErrorParams<T = unknown, U extends NodeJS.EventEmitter = NodeJS.EventEmitter> extends Omit<PlainResponseParams<T, U>, 'statusMessage'> { | |||
cause?: unknown | |||
} | |||
export class HttpMiddlewareError<T = unknown> extends MiddlewareResponseError implements PlainResponseParams<T> { | |||
body?: T; | |||
export class ErrorPlainResponse<T = unknown, U extends NodeJS.EventEmitter = NodeJS.EventEmitter> extends MiddlewareResponseError implements PlainResponseParams<T, U> { | |||
readonly body?: T; | |||
readonly res: U; | |||
constructor(statusMessage: keyof Language['statusMessages'], params: HttpMiddlewareErrorParams<T>) { | |||
constructor(statusMessage: keyof Language['statusMessages'], params: HttpMiddlewareErrorParams<T, U>) { | |||
super(statusMessage, params); | |||
this.body = params.body; | |||
this.res = params.res; | |||
this.res.emit('response', this); | |||
this.res.emit('close'); | |||
} | |||
} | |||
@@ -32,5 +37,9 @@ export class PlainResponse<T = unknown> implements Response { | |||
this.statusMessage = args.statusMessage; | |||
this.headers = args.headers; | |||
this.body = args.body; | |||
args.res.emit('response', this); | |||
args.res.emit('close'); | |||
} | |||
} | |||
// TODO stream response |
@@ -34,6 +34,7 @@ export const LANGUAGE_DEFAULT_STATUS_MESSAGE_KEYS = [ | |||
'resourceCreated', | |||
'resourceReplaced', | |||
'notImplemented', | |||
'provideOptions', | |||
] as const; | |||
export type LanguageDefaultStatusMessageKey = typeof LANGUAGE_DEFAULT_STATUS_MESSAGE_KEYS[number]; | |||
@@ -74,6 +75,7 @@ export const FALLBACK_LANGUAGE = { | |||
urlNotFound: 'URL Not Found', | |||
badRequest: 'Bad Request', | |||
ok: 'OK', | |||
provideOptions: 'Provide Options', | |||
resourceCollectionFetched: '$RESOURCE Collection Fetched', | |||
resourceFetched: '$RESOURCE Fetched', | |||
resourceNotFound: '$RESOURCE Not Found', | |||