Skip to content

Commit

Permalink
[core] Add emit support for all actor logic creators (#4935)
Browse files Browse the repository at this point in the history
* Add emit support for all actor logic creators

* Add test for restored root actor

* Update .changeset/smooth-crabs-dress.md

Co-authored-by: with-heart <[email protected]>

---------

Co-authored-by: with-heart <[email protected]>
  • Loading branch information
davidkpiano and with-heart authored Jun 22, 2024
1 parent 417f35a commit 2ac08b7
Show file tree
Hide file tree
Showing 6 changed files with 386 additions and 37 deletions.
60 changes: 60 additions & 0 deletions .changeset/smooth-crabs-dress.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
---
'xstate': minor
---

All actor logic creators now support [emitting events](https://stately.ai/docs/event-emitter):

**Promise actors**

```ts
const logic = fromPromise(async ({ emit }) => {
// ...
emit({
type: 'emitted',
msg: 'hello'
});
// ...
});
```

**Transition actors**

```ts
const logic = fromTransition((state, event, { emit }) => {
// ...
emit({
type: 'emitted',
msg: 'hello'
});
// ...
return state;
}, {});
```

**Observable actors**

```ts
const logic = fromObservable(({ emit }) => {
// ...

emit({
type: 'emitted',
msg: 'hello'
});

// ...
});
```

**Callback actors**

```ts
const logic = fromCallback(({ emit }) => {
// ...
emit({
type: 'emitted',
msg: 'hello'
});
// ...
});
```
26 changes: 16 additions & 10 deletions packages/core/src/actors/callback.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,14 @@ export type CallbackSnapshot<TInput> = Snapshot<undefined> & {

export type CallbackActorLogic<
TEvent extends EventObject,
TInput = NonReducibleUnknown
TInput = NonReducibleUnknown,
TEmitted extends EventObject = EventObject
> = ActorLogic<
CallbackSnapshot<TInput>,
TEvent,
TInput,
AnyActorSystem,
EventObject // TEmitted
TEmitted
>;

export type CallbackActorRef<
Expand All @@ -49,13 +50,15 @@ export type Receiver<TEvent extends EventObject> = (
export type InvokeCallback<
TEvent extends EventObject = AnyEventObject,
TSentEvent extends EventObject = AnyEventObject,
TInput = NonReducibleUnknown
TInput = NonReducibleUnknown,
TEmitted extends EventObject = EventObject
> = ({
input,
system,
self,
sendBack,
receive
receive,
emit
}: {
/**
* Data that was provided to the callback actor
Expand All @@ -79,6 +82,7 @@ export type InvokeCallback<
* the listener is then called whenever events are received by the callback actor
*/
receive: Receiver<TEvent>;
emit: (emitted: TEmitted) => void;
}) => (() => void) | void;

/**
Expand Down Expand Up @@ -140,14 +144,15 @@ export type InvokeCallback<
*/
export function fromCallback<
TEvent extends EventObject,
TInput = NonReducibleUnknown
TInput = NonReducibleUnknown,
TEmitted extends EventObject = EventObject
>(
invokeCallback: InvokeCallback<TEvent, AnyEventObject, TInput>
): CallbackActorLogic<TEvent, TInput> {
const logic: CallbackActorLogic<TEvent, TInput> = {
invokeCallback: InvokeCallback<TEvent, AnyEventObject, TInput, TEmitted>
): CallbackActorLogic<TEvent, TInput, TEmitted> {
const logic: CallbackActorLogic<TEvent, TInput, TEmitted> = {
config: invokeCallback,
start: (state, actorScope) => {
const { self, system } = actorScope;
const { self, system, emit } = actorScope;

const callbackState: CallbackInstanceState<TEvent> = {
receivers: undefined,
Expand All @@ -171,7 +176,8 @@ export function fromCallback<
receive: (listener) => {
callbackState.receivers ??= new Set();
callbackState.receivers.add(listener);
}
},
emit
});
},
transition: (state, event, actorScope) => {
Expand Down
49 changes: 31 additions & 18 deletions packages/core/src/actors/observable.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,14 @@ export type ObservableSnapshot<

export type ObservableActorLogic<
TContext,
TInput extends NonReducibleUnknown
TInput extends NonReducibleUnknown,
TEmitted extends EventObject = EventObject
> = ActorLogic<
ObservableSnapshot<TContext, TInput>,
{ type: string; [k: string]: unknown },
TInput,
AnyActorSystem,
EventObject // TEmitted
TEmitted
>;

export type ObservableActorRef<TContext> = ActorRefFrom<
Expand Down Expand Up @@ -78,20 +79,26 @@ export type ObservableActorRef<TContext> = ActorRefFrom<
* @see {@link https://rxjs.dev} for documentation on RxJS Observable and observable creators.
* @see {@link Subscribable} interface in XState, which is based on and compatible with RxJS Observable.
*/
export function fromObservable<TContext, TInput extends NonReducibleUnknown>(
export function fromObservable<
TContext,
TInput extends NonReducibleUnknown,
TEmitted extends EventObject = EventObject
>(
observableCreator: ({
input,
system
system,
self
}: {
input: TInput;
system: AnyActorSystem;
self: ObservableActorRef<TContext>;
emit: (emitted: TEmitted) => void;
}) => Subscribable<TContext>
): ObservableActorLogic<TContext, TInput> {
): ObservableActorLogic<TContext, TInput, TEmitted> {
// TODO: add event types
const logic: ObservableActorLogic<TContext, TInput> = {
const logic: ObservableActorLogic<TContext, TInput, TEmitted> = {
config: observableCreator,
transition: (snapshot, event, { self, id, defer, system }) => {
transition: (snapshot, event) => {
if (snapshot.status !== 'active') {
return snapshot;
}
Expand Down Expand Up @@ -141,15 +148,16 @@ export function fromObservable<TContext, TInput extends NonReducibleUnknown>(
_subscription: undefined
};
},
start: (state, { self, system }) => {
start: (state, { self, system, emit }) => {
if (state.status === 'done') {
// Do not restart a completed observable
return;
}
state._subscription = observableCreator({
input: state.input!,
system,
self
self,
emit
}).subscribe({
next: (value) => {
system._relay(self, self, {
Expand Down Expand Up @@ -223,20 +231,24 @@ export function fromObservable<TContext, TInput extends NonReducibleUnknown>(
* ```
*/
export function fromEventObservable<
T extends EventObject,
TInput extends NonReducibleUnknown
TEvent extends EventObject,
TInput extends NonReducibleUnknown,
TEmitted extends EventObject = EventObject
>(
lazyObservable: ({
input,
system
system,
self,
emit
}: {
input: TInput;
system: AnyActorSystem;
self: ObservableActorRef<T>;
}) => Subscribable<T>
): ObservableActorLogic<T, TInput> {
self: ObservableActorRef<TEvent>;
emit: (emitted: TEmitted) => void;
}) => Subscribable<TEvent>
): ObservableActorLogic<TEvent, TInput, TEmitted> {
// TODO: event types
const logic: ObservableActorLogic<T, TInput> = {
const logic: ObservableActorLogic<TEvent, TInput, TEmitted> = {
config: lazyObservable,
transition: (state, event) => {
if (state.status !== 'active') {
Expand Down Expand Up @@ -281,7 +293,7 @@ export function fromEventObservable<
_subscription: undefined
};
},
start: (state, { self, system }) => {
start: (state, { self, system, emit }) => {
if (state.status === 'done') {
// Do not restart a completed observable
return;
Expand All @@ -290,7 +302,8 @@ export function fromEventObservable<
state._subscription = lazyObservable({
input: state.input!,
system,
self
self,
emit
}).subscribe({
next: (value) => {
if (self._parent) {
Expand Down
29 changes: 21 additions & 8 deletions packages/core/src/actors/promise.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,16 @@ export type PromiseSnapshot<TOutput, TInput> = Snapshot<TOutput> & {
const XSTATE_PROMISE_RESOLVE = 'xstate.promise.resolve';
const XSTATE_PROMISE_REJECT = 'xstate.promise.reject';

export type PromiseActorLogic<TOutput, TInput = unknown> = ActorLogic<
export type PromiseActorLogic<
TOutput,
TInput = unknown,
TEmitted extends EventObject = EventObject
> = ActorLogic<
PromiseSnapshot<TOutput, TInput>,
{ type: string; [k: string]: unknown },
TInput, // input
AnyActorSystem,
EventObject // TEmitted
TEmitted // TEmitted
>;

export type PromiseActorRef<TOutput> = ActorRefFrom<
Expand Down Expand Up @@ -75,10 +79,17 @@ export type PromiseActorRef<TOutput> = ActorRefFrom<

const controllerMap = new WeakMap<AnyActorRef, AbortController>();

export function fromPromise<TOutput, TInput = NonReducibleUnknown>(
export function fromPromise<
TOutput,
TInput = NonReducibleUnknown,
TEmitted extends EventObject = EventObject
>(
promiseCreator: ({
input,
system
system,
self,
signal,
emit
}: {
/**
* Data that was provided to the promise actor
Expand All @@ -93,9 +104,10 @@ export function fromPromise<TOutput, TInput = NonReducibleUnknown>(
*/
self: PromiseActorRef<TOutput>;
signal: AbortSignal;
emit: (emitted: TEmitted) => void;
}) => PromiseLike<TOutput>
): PromiseActorLogic<TOutput, TInput> {
const logic: PromiseActorLogic<TOutput, TInput> = {
): PromiseActorLogic<TOutput, TInput, TEmitted> {
const logic: PromiseActorLogic<TOutput, TInput, TEmitted> = {
config: promiseCreator,
transition: (state, event, scope) => {
if (state.status !== 'active') {
Expand Down Expand Up @@ -131,7 +143,7 @@ export function fromPromise<TOutput, TInput = NonReducibleUnknown>(
return state;
}
},
start: (state, { self, system }) => {
start: (state, { self, system, emit }) => {
// TODO: determine how to allow customizing this so that promises
// can be restarted if necessary
if (state.status !== 'active') {
Expand All @@ -144,7 +156,8 @@ export function fromPromise<TOutput, TInput = NonReducibleUnknown>(
input: state.input!,
system,
self,
signal: controller.signal
signal: controller.signal,
emit
})
);

Expand Down
7 changes: 6 additions & 1 deletion packages/core/src/actors/transition.ts
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,12 @@ export function fromTransition<
transition: (
snapshot: TContext,
event: TEvent,
actorScope: ActorScope<TransitionSnapshot<TContext>, TEvent, TSystem>
actorScope: ActorScope<
TransitionSnapshot<TContext>,
TEvent,
TSystem,
TEmitted
>
) => TContext,
initialContext:
| TContext
Expand Down
Loading

0 comments on commit 2ac08b7

Please sign in to comment.