From 85de3dc3725c4c722943cd7d7e9c729fbfe1d9fd Mon Sep 17 00:00:00 2001 From: Dillon Nys <24740863+dnys1@users.noreply.github.com> Date: Sun, 29 Sep 2024 13:01:36 -0700 Subject: [PATCH] feat(celest): Add request context (#176) Adds a context object which is unique per-request and holds request-specific information. The context will also serve as the global service locator for DI. --- packages/celest/lib/celest.dart | 2 + packages/celest/lib/src/core/annotations.dart | 33 ++ packages/celest/lib/src/core/context.dart | 217 +++++++++--- packages/celest/lib/src/core/environment.dart | 18 + packages/celest/lib/src/runtime/gcp/gcp.dart | 19 ++ .../celest/lib/src/runtime/http/logging.dart | 118 +++++++ .../lib/src/runtime/http/middleware.dart | 186 +++++++++++ packages/celest/lib/src/runtime/serve.dart | 316 +----------------- packages/celest/lib/src/runtime/targets.dart | 159 +++++++++ packages/celest/pubspec.yaml | 6 + 10 files changed, 734 insertions(+), 340 deletions(-) create mode 100644 packages/celest/lib/src/core/environment.dart create mode 100644 packages/celest/lib/src/runtime/gcp/gcp.dart create mode 100644 packages/celest/lib/src/runtime/http/logging.dart create mode 100644 packages/celest/lib/src/runtime/http/middleware.dart create mode 100644 packages/celest/lib/src/runtime/targets.dart diff --git a/packages/celest/lib/celest.dart b/packages/celest/lib/celest.dart index ffed66da..6320dad1 100644 --- a/packages/celest/lib/celest.dart +++ b/packages/celest/lib/celest.dart @@ -2,6 +2,7 @@ library celest; export 'package:celest_core/celest_core.dart'; +export 'package:shelf/shelf.dart' show Request, Response; /// Auth export 'src/auth/auth.dart'; @@ -14,6 +15,7 @@ export 'src/config/env.dart'; export 'src/core/annotations.dart'; export 'src/core/cloud_widget.dart'; export 'src/core/context.dart'; +export 'src/core/environment.dart'; export 'src/core/project.dart'; /// Functions diff --git a/packages/celest/lib/src/core/annotations.dart b/packages/celest/lib/src/core/annotations.dart index 22918737..ed48925a 100644 --- a/packages/celest/lib/src/core/annotations.dart +++ b/packages/celest/lib/src/core/annotations.dart @@ -1,3 +1,4 @@ +import 'package:celest/celest.dart'; import 'package:meta/meta_meta.dart'; /// Marks a function or library as a cloud API. @@ -41,3 +42,35 @@ const customOverride = _CustomOverride(); final class _CustomOverride { const _CustomOverride(); } + +/// {@template celest.core.principal} +/// A contextual reference to the principal ([User]) invoking a [CloudFunction]. +/// +/// For more information, see [Authorizing your functions](https://celest.dev/docs/functions/authorizing-functions). +/// +/// ## Example +/// +/// To inject a user into an `@authenticated` function: +/// +/// ```dart +/// @authenticated +/// Future sayHello({ +/// @principal required User user, +/// }) async { +/// print('Hello, ${user.displayName}!'); +/// } +/// ``` +/// +/// If a user is injected to a `@public` or private function, then the +/// user parameter must be nullable: +/// +/// ```dart +/// @public +/// Future sayHello({ +/// @principal User? user, +/// }) async { +/// print('Hello, ${user?.displayName ?? 'stranger'}!'); +/// } +/// ``` +/// {@endtemplate} +const principal = ContextKey.principal; diff --git a/packages/celest/lib/src/core/context.dart b/packages/celest/lib/src/core/context.dart index 3d925c8f..f27da05a 100644 --- a/packages/celest/lib/src/core/context.dart +++ b/packages/celest/lib/src/core/context.dart @@ -1,48 +1,187 @@ +import 'dart:async'; +import 'dart:io'; + import 'package:celest/celest.dart'; +import 'package:celest/src/runtime/gcp/gcp.dart'; +import 'package:celest_core/_internal.dart'; +import 'package:cloud_http/cloud_http.dart'; +import 'package:meta/meta.dart'; +import 'package:shelf/shelf.dart' as shelf; -/// {@template celest.core.principal} -/// A contextual reference to the principal ([User]) invoking a [CloudFunction]. -/// -/// For more information, see [Authorizing your functions](https://celest.dev/docs/functions/authorizing-functions). -/// -/// ## Example -/// -/// To inject a user into an `@authenticated` function: -/// -/// ```dart -/// @authenticated -/// Future sayHello({ -/// @principal required User user, -/// }) async { -/// print('Hello, ${user.displayName}!'); -/// } -/// ``` -/// -/// If a user is injected to a `@public` or private function, then the -/// user parameter must be nullable: -/// -/// ```dart -/// @public -/// Future sayHello({ -/// @principal User? user, -/// }) async { -/// print('Hello, ${user?.displayName ?? 'stranger'}!'); -/// } -/// ``` -/// {@endtemplate} -const principal = _UserContext(); +/// The [Context] for the current request. +Context get context => Context.current; -/// {@template celest.core.context} -/// The context of a [CloudFunction] invocation. +/// {@template celest.runtime.celest_context} +/// A per-request context object which propogates request information and common accessors to the Celest server environment. /// {@endtemplate} final class Context { - const Context._(); + /// {@macro celest.runtime.celest_context} + Context._(this._zone); + + /// The [Context] for the given [zone]. + factory Context.of(Zone zone) { + return _contexts[zone] ??= Context._(zone); + } + + /// All context objects by their [Zone]. + /// + /// Contexts are attached to a zone such that they are disposed + /// when the Zone in which they were created is disposed. + static final Expando _contexts = Expando('contexts'); + + /// The root [Context]. + static final Context root = Context.of(Zone.root); + + /// The [Context] for the current execution scope. + static Context get current => Context.of(Zone.current); + + /// Context-specific values. + final Map, Object> _values = {}; + + /// The zone in which this context was created. + final Zone _zone; + + /// Retrieves the value in this context for the given [key]. + Object? operator [](ContextKey key) { + return _values[key]; + } + + /// Sets the value of the given [key] in this context. + void operator []=(ContextKey key, Object? value) { + if (value == null) { + _values.remove(key); + } else { + _values[key] = value; + } + } + + /// The parent [Context] to `this`. + Context? get parent { + var parent = _zone.parent; + while (parent != null) { + if (_contexts[parent] case final parentContext?) { + return parentContext; + } + parent = parent.parent; + } + return null; + } + + /// Whether Celest is running in the cloud. + bool get isRunningInCloud => root.get(googleCloudProjectKey) != null; + + /// The shelf [shelf.Request] object which triggered the current function invocation. + shelf.Request get currentRequest => expect(ContextKey.currentRequest); + + /// The [Traceparent] for the current request. + Traceparent get currentTrace => expect(ContextKey.currentTrace); + + /// The Celest [Environment] of the running service. + Environment get environment { + return Environment(Platform.environment['ENV']!); + } + + (Context, V)? _get(ContextKey key) { + if (key.read(this) case final value?) { + return (this, value); + } + return parent?._get(key); + } + + /// The value for the given [key] in the current [Context]. + V? get(ContextKey key) { + return _get(key)?.$2; + } + + /// Expects a value present in the given [context]. + /// + /// Throws a [StateError] if the value is not present. + V expect(ContextKey key) { + final value = get(key); + if (value == null) { + throw StateError('Expected value for key "$key" in context'); + } + return value; + } + + /// Sets the value of [key] in the current [Context]. + void put(ContextKey key, V value) { + key.set(this, value); + } + + /// Updates the value of [key] in place. + void update( + ContextKey key, + V Function(V? value) update, + ) { + final (context, value) = _get(key) ?? (this, null); + final updated = update(value); + context.put(key, updated); + } +} + +/// {@template celest.runtime.context_key} +/// A key for a typed value stored in a [Context]. +/// {@endtemplate} +@immutable +abstract interface class ContextKey { + /// {@macro celest.runtime.context_key} + const factory ContextKey([String? label]) = _ContextKey; + + /// The context key for the current [shelf.Request]. + static const ContextKey currentRequest = + ContextKey('current request'); + + /// The context key for the current [Traceparent]. + static const ContextKey currentTrace = + ContextKey('current trace'); + + /// The context key for the current [User]. + static const ContextKey principal = _PrincipalContextKey(); + + /// Reads the value for `this` from the given [context]. + V? read(Context context); + + /// Sets the value for `this` in the given [context]. + void set(Context context, V? value); +} + +final class _ContextKey implements ContextKey { + const _ContextKey([this.label]); + + final String? label; + + @override + V? read(Context context) { + return context[this] as V?; + } + + @override + void set(Context context, V? value) { + context[this] = value; + } + + @override + bool operator ==(Object other) { + return identical(this, other) || + other is _ContextKey && other.label == label; + } + + @override + int get hashCode => Object.hash(_ContextKey, label); - /// {@macro celest.core.principal} - @Deprecated('Use @principal instead.') - static const Context user = principal; + @override + String toString() { + if (label case final label?) { + return label; + } + if (kDebugMode || !context.environment.isProduction) { + return '<$V>'; + } + return ''; + } } -final class _UserContext implements Context { - const _UserContext(); +final class _PrincipalContextKey extends _ContextKey { + const _PrincipalContextKey() : super('principal'); } diff --git a/packages/celest/lib/src/core/environment.dart b/packages/celest/lib/src/core/environment.dart new file mode 100644 index 00000000..5ab9eb5a --- /dev/null +++ b/packages/celest/lib/src/core/environment.dart @@ -0,0 +1,18 @@ +/// An environment of a deployed Celest service. +/// +/// Celest services can have multiple isolated branches, for example +/// a `development` and `production` environment. +extension type const Environment(String _env) implements String { + /// The local Celest environment, used to delineate when a + /// Celest service is running on a developer machine as opposed + /// to the cloud. + static const Environment local = Environment('local'); + + /// The production Celest environment which is common to all + /// Celest projects and labels the environment which is considered + /// live and served to end-users. + static const Environment production = Environment('production'); + + /// Whether `this` represents the production environment. + bool get isProduction => this == production; +} diff --git a/packages/celest/lib/src/runtime/gcp/gcp.dart b/packages/celest/lib/src/runtime/gcp/gcp.dart new file mode 100644 index 00000000..7152628c --- /dev/null +++ b/packages/celest/lib/src/runtime/gcp/gcp.dart @@ -0,0 +1,19 @@ +@internal +library; + +import 'package:celest/celest.dart'; +import 'package:google_cloud/google_cloud.dart'; +import 'package:meta/meta.dart'; + +/// The context key for the active GCP project ID. +const ContextKey googleCloudProjectKey = ContextKey(); + +/// Returns the GCP project ID for the active environment or +/// `null` if running locally. +Future googleCloudProject() async { + try { + return await computeProjectId(); + } on Exception { + return null; + } +} diff --git a/packages/celest/lib/src/runtime/http/logging.dart b/packages/celest/lib/src/runtime/http/logging.dart new file mode 100644 index 00000000..b558b760 --- /dev/null +++ b/packages/celest/lib/src/runtime/http/logging.dart @@ -0,0 +1,118 @@ +import 'dart:async'; +import 'dart:convert'; +import 'dart:developer' as developer; +import 'dart:io'; + +import 'package:celest/celest.dart'; +import 'package:celest/src/runtime/gcp/gcp.dart'; +import 'package:celest_core/_internal.dart'; +import 'package:cloud_http/cloud_http.dart'; +import 'package:logging/logging.dart'; +import 'package:stack_trace/stack_trace.dart'; + +/// Configures logging depending on the environment in which +/// Celest is running. +void configureLogging() { + Logger.root.level = Level.ALL; + Logger.root.onRecord.listen((record) { + if (kDebugMode) { + return developer.log( + record.message, + time: record.time, + sequenceNumber: record.sequenceNumber, + level: record.level.value, + name: record.loggerName, + zone: record.zone, + error: record.error, + stackTrace: record.stackTrace, + ); + } + final context = Context.of(record.zone ?? Zone.current); + if (context.isRunningInCloud) { + final trace = context.currentTrace; + final jsonRecord = record.logEntry(trace: trace); + stdout.writeln(jsonRecord); + } else { + final message = StringBuffer(record.message); + if (record.error case final error?) { + message + ..writeln() + ..write(error); + if (record.stackTrace case final stackTrace?) { + message + ..writeln() + ..write(stackTrace); + } + } + stdout.writeln('[${record.loggerName}] ${record.level.name}: $message'); + } + }); +} + +extension on LogRecord { + String get severity => switch (this) { + Level.FINE || Level.FINER || Level.FINEST => 'DEBUG', + Level.INFO => 'INFO', + Level.CONFIG => 'NOTICE', + Level.WARNING => 'WARNING', + Level.SEVERE => 'ERROR', + Level.SHOUT => 'CRITICAL', + _ => 'DEFAULT', + }; + + String logEntry({ + Traceparent? trace, + }) { + final message = StringBuffer(this.message); + + Chain? chain; + if (error != null) { + chain = switch (stackTrace) { + final stackTrace? when stackTrace != StackTrace.empty => + Chain.forTrace(stackTrace), + _ => Chain.current(), + }; + chain = chain.foldFrames( + (f) => f.isCore || (f.package?.startsWith('celest') ?? false), + terse: true, + ); + message + ..writeln() + ..writeln(error) + ..write(chain.toString()); + } + + // https://cloud.google.com/logging/docs/agent/logging/configuration#special-fields + final logContent = { + 'message': message.toString().trim(), + 'severity': severity, + 'timestamp': time.toIso8601String(), + if (trace != null) ...{ + 'logging.googleapis.com/trace': trace.cloudTraceId, + 'logging.googleapis.com/spanId': trace.parentId, + 'logging.googleapis.com/trace_sampled': trace.traceFlagSampled != 0, + }, + if (chain?.traces.firstOrNull?.frames.firstOrNull case final stackFrame?) + 'logging.googleapis.com/sourceLocation': stackFrame.sourceLocation, + }; + return jsonEncode(logContent); + } +} + +extension on Traceparent { + String get cloudTraceId { + final projectId = Context.root.expect(googleCloudProjectKey); + return 'projects/$projectId/traces/$parentId'; + } +} + +extension on Frame { +// https://cloud.google.com/logging/docs/reference/v2/rest/v2/LogEntry#LogEntrySourceLocation + Map get sourceLocation => { + // TODO: Will need to fix `package:` URIs to file paths when possible + // GoogleCloudPlatform/functions-framework-dart#40 + 'file': library, + if (line != null) 'line': line.toString(), + 'function': member, + }; +} diff --git a/packages/celest/lib/src/runtime/http/middleware.dart b/packages/celest/lib/src/runtime/http/middleware.dart new file mode 100644 index 00000000..716e53e3 --- /dev/null +++ b/packages/celest/lib/src/runtime/http/middleware.dart @@ -0,0 +1,186 @@ +@internal +library; + +import 'dart:async'; +import 'dart:convert'; +import 'dart:math'; + +import 'package:celest/celest.dart'; +import 'package:celest/src/runtime/json_utils.dart'; +import 'package:cloud_http/cloud_http.dart'; +import 'package:convert/convert.dart'; +import 'package:fixnum/fixnum.dart'; +import 'package:logging/logging.dart'; +import 'package:meta/meta.dart'; +import 'package:shelf/shelf.dart'; + +/// Serves CORS headers on the local server so that Web clients can access it. +Handler corsMiddleware(Handler inner) { + return (request) async { + final corsHeaders = { + // TODO(dnys1): Restrict these in production + 'Access-Control-Allow-Origin': request.headers['origin'] ?? '*', + // Needed when `Access-Control-Allow-Origin` is a specific domain, + // e.g. not a wildcard. + 'Vary': 'Origin', + + 'Access-Control-Allow-Methods': '*', + 'Access-Control-Allow-Headers': + request.headers['access-control-request-headers'] ?? '*', + 'Access-Control-Expose-Headers': '*', + 'Access-Control-Allow-Credentials': 'true', + }; + if (request.method == 'OPTIONS') { + return Response.ok( + null, + headers: corsHeaders, + ); + } + final response = await inner(request); + return response.change( + headers: corsHeaders, + ); + }; +} + +/// Handles common exception types and maps exceptions to HTTP responses. +Handler cloudExceptionMiddleware(Handler inner) { + return (request) async { + try { + return await inner(request); + } on Exception catch (e) { + if (e is HijackException) rethrow; + return _badRequest( + code: e.runtimeType.toString(), + ); + } on Error catch (e) { + return _internalServerError( + code: e.runtimeType.toString(), + ); + } + }; +} + +Response _badRequest({ + required String code, + Object? details, +}) { + return Response.badRequest( + headers: { + contentTypeHeader: jsonContentType, + }, + body: jsonEncode({ + 'error': { + 'code': code, + if (details != null) 'details': details, + }, + }), + ); +} + +Response _internalServerError({ + required String code, + Object? details, +}) { + return Response.internalServerError( + headers: { + contentTypeHeader: jsonContentType, + }, + body: jsonEncode({ + 'error': { + 'code': code, + if (details != null) 'details': details, + }, + }), + ); +} + +/// Standard header used by +/// [Cloud Trace](https://cloud.google.com/trace/docs/setup). +const cloudTraceContextHeader = 'x-cloud-trace-context'; + +/// Configures the [Context] for each request. +/// +/// Should be run before all other middleware. +Handler rootMiddleware(Handler inner) { + return (request) async { + final completer = Completer.sync(); + + final requestZone = Zone.current.fork( + specification: ZoneSpecification( + handleUncaughtError: (self, parent, zone, error, stackTrace) { + Logger.root.shout( + 'An unexpected error occurred', + error, + stackTrace, + ); + if (completer.isCompleted) { + return; + } + completer.completeError(error, stackTrace); + }, + print: (self, parent, zone, line) { + Logger.root.info(line); + }, + ), + ); + requestZone.runGuarded( + () async { + Context.current + ..put(ContextKey.currentRequest, request) + ..put(ContextKey.currentTrace, request.trace); + final response = await inner(request); + if (!completer.isCompleted) { + completer.complete(response); + } + }, + ); + + // TODO: Convert errors to response + return completer.future; + }; +} + +extension on Request { + static final _random = Random.secure(); + + /// The trace parent of the request. + Traceparent get trace { + final traceContext = TraceContext.fromHeaders(headersAll); + if (traceContext.traceparent case final traceparent?) { + return traceparent; + } + if (headers[cloudTraceContextHeader] case final traceHeader?) { + final spanDelim = traceHeader.indexOf('/'); + final optsDelim = traceHeader.indexOf(';'); + if (spanDelim != -1 && optsDelim != -1) { + final traceId = traceHeader.substring(0, spanDelim); + final spanId = traceHeader.substring(spanDelim + 1, optsDelim); + final flags = switch (traceHeader.substring(optsDelim + 1)) { + 'o=1' => 1, + 'o=0' => 0, + _ => 0, + }; + return Traceparent( + traceId: traceId, + parentId: Int64.parseInt(spanId).toHexString().toLowerCase(), + traceFlags: flags, + version: Traceparent.defaultVersion, + ); + } + } + // A vendor receiving a request without a traceparent header SHOULD + // generate traceparent headers for outbound requests, effectively + // starting a new trace. + return Traceparent.create( + traceId: hex.encode( + List.generate(16, (_) => _random.nextInt(256)), + ), + parentId: hex.encode( + List.generate(8, (_) => _random.nextInt(256)), + ), + sampled: true, + random: true, + ); + } +} diff --git a/packages/celest/lib/src/runtime/serve.dart b/packages/celest/lib/src/runtime/serve.dart index ce9a673b..e128cc65 100644 --- a/packages/celest/lib/src/runtime/serve.dart +++ b/packages/celest/lib/src/runtime/serve.dart @@ -2,15 +2,16 @@ import 'dart:async'; import 'dart:convert'; -import 'dart:developer'; import 'dart:io'; import 'package:async/async.dart'; import 'package:celest/celest.dart'; +import 'package:celest/src/runtime/gcp/gcp.dart'; +import 'package:celest/src/runtime/http/logging.dart'; +import 'package:celest/src/runtime/http/middleware.dart'; import 'package:celest/src/runtime/json_utils.dart'; import 'package:celest/src/runtime/sse/sse_handler.dart'; import 'package:celest_core/_internal.dart'; -import 'package:logging/logging.dart'; import 'package:shelf/shelf.dart'; import 'package:shelf/shelf_io.dart' as shelf_io; import 'package:shelf_router/shelf_router.dart'; @@ -18,6 +19,8 @@ import 'package:shelf_web_socket/shelf_web_socket.dart'; import 'package:stream_channel/stream_channel.dart'; import 'package:web_socket_channel/web_socket_channel.dart'; +part 'targets.dart'; + /// The default port on which Celest runs locally. const int defaultCelestPort = 7777; @@ -25,31 +28,23 @@ const int defaultCelestPort = 7777; Future serve({ required Map targets, }) async { - Logger.root.level = Level.ALL; - Logger.root.onRecord.listen((record) { - log( - record.message, - time: record.time, - sequenceNumber: record.sequenceNumber, - level: record.level.value, - name: record.loggerName, - zone: record.zone, - error: record.error?.toString(), - stackTrace: record.stackTrace, - ); - }); - final router = Router()..get('/_health', (_) => Response.ok('OK')); + configureLogging(); + final projectId = await googleCloudProject(); + if (projectId != null) { + Context.root.put(googleCloudProjectKey, projectId); + } + final router = Router()..get('/v1/healthz', (_) => Response.ok('OK')); for (final MapEntry(key: route, value: target) in targets.entries) { target._apply(router, route); } final pipeline = const Pipeline() - .addMiddleware(_heartbeatMiddleware) - .addMiddleware(_corsMiddleware) - .addMiddleware(_cloudExceptionMiddleware) + .addMiddleware(rootMiddleware) + .addMiddleware(corsMiddleware) + .addMiddleware(cloudExceptionMiddleware) .addHandler(router.call); final port = switch (Platform.environment['PORT']) { final port? => - int.tryParse(port) ?? (throw StateError('Invalid PORT set: $port')), + int.tryParse(port) ?? (throw StateError('Invalid PORT set: "$port"')), _ => defaultCelestPort, }; final server = await shelf_io.serve( @@ -66,284 +61,3 @@ Future serve({ ]).first; await server.close(); } - -/// The response of a [CloudFunctionTarget]. -typedef CelestResponse = ({int statusCode, Object? body}); - -/// {@template celest.runtime.cloud_function_target} -/// An instantiation of a [CloudFunction]. -/// {@endtemplate} -abstract base class CloudFunctionTarget { - /// {@macro celest.runtime.cloud_function_target} - CloudFunctionTarget() { - init(); - } - - static const _contextHeaderPrefix = 'X-Context-'; - static final _contextHeaderMatcher = RegExp( - _contextHeaderPrefix, - caseSensitive: false, - ); - Map _contextForRequest(Map> headers) { - final context = {}; - headers.forEach((key, value) { - key = key.toLowerCase(); - if (key.startsWith(_contextHeaderMatcher)) { - context[key.substring(_contextHeaderPrefix.length)] = value.join(', '); - } - }); - return context; - } - - /// The name of the [CloudFunction] this class targets. - String get name; - - /// Initializes this target. - /// - /// This is called once when the target is instantiated. - void init() {} - - void _apply(Router router, String route); -} - -/// {@template celest.runtime.cloud_function_http_target} -/// A [CloudFunctionTarget] that handles HTTP requests. -/// {@endtemplate} -abstract base class CloudFunctionHttpTarget extends CloudFunctionTarget { - Future _handler(Request request) async { - final bodyJson = await request.decodeJson(); - final context = _contextForRequest(request.headersAll); - final response = await runZoned( - () => handle( - bodyJson, - context: context, - headers: request.headersAll, - queryParameters: request.url.queryParametersAll, - ), - zoneSpecification: ZoneSpecification( - print: (self, parent, zone, message) { - parent.print(zone, '[$name] $message'); - }, - ), - ); - return Response( - response.statusCode, - body: jsonEncode(response.body), - headers: { - contentTypeHeader: jsonContentType, - }, - ); - } - - /// The HTTP method of the [CloudFunction] this class targets. - String get method => 'POST'; - - @override - void _apply(Router router, String route) { - router.add(method, route, _handler); - } - - /// Handles a JSON [request] to this target. - Future handle( - Map request, { - required Map context, - required Map> headers, - required Map> queryParameters, - }); -} - -/// {@template celest.runtime.cloud_event_source_target} -/// A [CloudFunctionTarget] that handles Server-Sent Events (SSE) and WebSocket -/// event producers. -/// {@endtemplate} -abstract base class CloudEventSourceTarget extends CloudFunctionTarget { - @override - void _apply(Router router, String route) { - router.add('GET', route, _handler); - router.add('POST', route, _sseHandler); - } - - late final Handler _sseHandler = sseHandler(_handleConnection); - late final Handler _wsHandler = webSocketHandler( - (WebSocketChannel webSocket) { - _handleConnection( - webSocket.transform( - StreamChannelTransformer( - StreamTransformer.fromHandlers( - handleData: (data, sink) { - sink.add(JsonUtf8.decodeAny(data)); - }, - ), - StreamSinkTransformer.fromHandlers( - handleData: (data, sink) { - sink.add(jsonEncode(data)); - }, - ), - ), - ), - ); - }, - ); - - Future _handleConnection( - StreamChannel> connection, - ) async { - await runZonedGuarded( - () async { - final requests = StreamQueue(connection.stream); - var request = const {}; - if (hasBody) { - request = await requests.next; - } - final (headers, queryParameters) = switch (connection) { - SseConnection(:final headers, :final queryParameters) => ( - headers, - queryParameters - ), - _ => ( - Zone.current[#_headers] as Map>, - Zone.current[#_queryParameters] as Map>, - ), - }; - final context = _contextForRequest(headers); - final stream = handle( - request, - headers: headers, - queryParameters: queryParameters, - context: context, - ); - stream.listen( - connection.sink.add, - onDone: connection.sink.close, - // Should never emit an error. - ); - }, - zoneSpecification: ZoneSpecification( - print: (self, parent, zone, message) { - parent.print(zone, '[$name] $message'); - }, - ), - (Object e, StackTrace st) { - print('An unexpected error occurred: $e'); - print(st); - connection.sink.addError(e, st); - connection.sink.close(); - }, - ); - } - - Future _handler(Request request) async { - if (request.method == 'GET' && request.headers['Upgrade'] == 'websocket') { - return runZoned( - () => _wsHandler(request), - zoneValues: { - #_headers: request.headersAll, - #_queryParameters: request.url.queryParametersAll, - }, - ); - } - return _sseHandler(request); - } - - /// Whether the target has a body. - /// - /// If this is `true`, the target will wait for an initial message sent by - /// the client with the body of the request. - /// - /// If this is `false`, for example the cloud function takes no parameters - /// or all parameters are mapped to headers or query parameters, then the - /// target will immediately start sending events. - bool get hasBody; - - /// Handles a JSON [request] to this target. - Stream> handle( - Map request, { - required Map> headers, - required Map> queryParameters, - required Map context, - }); -} - -Handler _heartbeatMiddleware(Handler inner) { - return (request) async { - print(request.requestedUri.path); - return inner(request); - }; -} - -/// Serves CORS headers on the local server so that Web clients can access it. -Handler _corsMiddleware(Handler inner) { - const corsHeaders = { - 'Access-Control-Allow-Origin': '*', - 'Access-Control-Allow-Methods': 'POST', - 'Access-Control-Allow-Headers': '*', - }; - return (request) async { - if (request.method == 'OPTIONS') { - return Response.ok( - null, - headers: corsHeaders, - ); - } - final response = await inner(request); - return response.change( - headers: corsHeaders, - ); - }; -} - -/// Handles common exception types and maps exceptions to HTTP responses. -Handler _cloudExceptionMiddleware(Handler inner) { - return (request) async { - try { - return await inner(request); - } on Exception catch (e, st) { - if (e is HijackException) rethrow; - print('An unexpected error occurred: $e'); - print(st); - return _badRequest( - code: e.runtimeType.toString(), - ); - } on Error catch (e, st) { - print('An unexpected error occurred: $e'); - print(st); - return _internalServerError( - code: e.runtimeType.toString(), - ); - } - }; -} - -Response _badRequest({ - required String code, - Object? details, -}) { - return Response.badRequest( - headers: { - contentTypeHeader: jsonContentType, - }, - body: jsonEncode({ - 'error': { - 'code': code, - if (details != null) 'details': details, - }, - }), - ); -} - -Response _internalServerError({ - required String code, - Object? details, -}) { - return Response.internalServerError( - headers: { - contentTypeHeader: jsonContentType, - }, - body: jsonEncode({ - 'error': { - 'code': code, - if (details != null) 'details': details, - }, - }), - ); -} diff --git a/packages/celest/lib/src/runtime/targets.dart b/packages/celest/lib/src/runtime/targets.dart new file mode 100644 index 00000000..635d6be2 --- /dev/null +++ b/packages/celest/lib/src/runtime/targets.dart @@ -0,0 +1,159 @@ +part of 'serve.dart'; + +/// The response of a [CloudFunctionTarget]. +typedef CelestResponse = ({int statusCode, Object? body}); + +/// {@template celest.runtime.cloud_function_target} +/// An instantiation of a [CloudFunction]. +/// {@endtemplate} +abstract base class CloudFunctionTarget { + /// {@macro celest.runtime.cloud_function_target} + CloudFunctionTarget() { + init(); + } + + /// The name of the [CloudFunction] this class targets. + String get name; + + /// Initializes this target. + /// + /// This is called once when the target is instantiated. + void init() {} + + void _apply(Router router, String route); +} + +/// {@template celest.runtime.cloud_function_http_target} +/// A [CloudFunctionTarget] that handles HTTP requests. +/// {@endtemplate} +abstract base class CloudFunctionHttpTarget extends CloudFunctionTarget { + Future _handler(Request request) async { + final bodyJson = await request.decodeJson(); + final response = await handle( + bodyJson, + headers: request.headersAll, + queryParameters: request.url.queryParametersAll, + ); + return Response( + response.statusCode, + body: jsonEncode(response.body), + headers: { + contentTypeHeader: jsonContentType, + }, + ); + } + + /// The HTTP method of the [CloudFunction] this class targets. + String get method => 'POST'; + + @override + void _apply(Router router, String route) { + router.add(method, route, _handler); + } + + /// Handles a JSON [request] to this target. + Future handle( + Map request, { + required Map> headers, + required Map> queryParameters, + }); +} + +/// {@template celest.runtime.cloud_event_source_target} +/// A [CloudFunctionTarget] that handles Server-Sent Events (SSE) and WebSocket +/// event producers. +/// {@endtemplate} +abstract base class CloudEventSourceTarget extends CloudFunctionTarget { + @override + void _apply(Router router, String route) { + router.add('GET', route, _handler); + router.add('POST', route, _sseHandler); + } + + late final Handler _sseHandler = sseHandler(_handleConnection); + late final Handler _wsHandler = webSocketHandler( + (WebSocketChannel webSocket) { + _handleConnection( + webSocket.transform( + StreamChannelTransformer( + StreamTransformer.fromHandlers( + handleData: (data, sink) { + sink.add(JsonUtf8.decodeAny(data)); + }, + ), + StreamSinkTransformer.fromHandlers( + handleData: (data, sink) { + sink.add(jsonEncode(data)); + }, + ), + ), + ), + ); + }, + ); + + Future _handleConnection( + StreamChannel> connection, + ) async { + await runZonedGuarded( + () async { + final requests = StreamQueue(connection.stream); + var request = const {}; + if (hasBody) { + request = await requests.next; + } + final (headers, queryParameters) = switch (connection) { + SseConnection(:final headers, :final queryParameters) => ( + headers, + queryParameters + ), + _ => ( + context.currentRequest.headersAll, + context.currentRequest.url.queryParametersAll, + ), + }; + final stream = handle( + request, + headers: headers, + queryParameters: queryParameters, + ); + stream.listen( + connection.sink.add, + onDone: connection.sink.close, + // Should never emit an error. + ); + }, + (Object e, StackTrace st) { + print('An unexpected error occurred: $e'); + print(st); + connection.sink.addError(e, st); + connection.sink.close(); + }, + ); + } + + Future _handler(Request request) async { + if (request.method == 'GET' && request.headers['Upgrade'] == 'websocket') { + return _wsHandler(request); + } else { + return _sseHandler(request); + } + } + + /// Whether the target has a body. + /// + /// If this is `true`, the target will wait for an initial message sent by + /// the client with the body of the request. + /// + /// If this is `false`, for example the cloud function takes no parameters + /// or all parameters are mapped to headers or query parameters, then the + /// target will immediately start sending events. + bool get hasBody; + + /// Handles a JSON [request] to this target. + Stream> handle( + Map request, { + required Map> headers, + required Map> queryParameters, + }); +} diff --git a/packages/celest/pubspec.yaml b/packages/celest/pubspec.yaml index 0d1c2534..8487e570 100644 --- a/packages/celest/pubspec.yaml +++ b/packages/celest/pubspec.yaml @@ -14,14 +14,20 @@ dependencies: celest_cloud: ^0.1.1 celest_core: ^0.5.0-dev.1 chunked_stream: ^1.4.2 + cloud_http: ^0.1.0 collection: ^1.18.0 + convert: ^3.1.1 + fixnum: ^1.1.0 + google_cloud: ^0.2.0 http: ^1.0.0 http_parser: ^4.0.0 + http_sfv: ^0.1.0 logging: ^1.2.0 meta: ^1.11.0 shelf: ^1.4.1 shelf_router: ^1.1.4 shelf_web_socket: ^2.0.0 + stack_trace: ^1.11.1 stream_channel: ^2.1.2 web_socket_channel: ^3.0.0