- grammarly
- Timing
- Subscription Handling
- The Late Subscriber Problem
- Sharing references (will not be solved by component-state)
- The Cold Composition Problem
- Imperative Interaction with Component StateManagement
- Recap
- Dynamic Component State and Reactive Context
As a lot of problems I ran into while my consulting are related to timing issues, this section is here to give a quick overview of all the different types of issues and their reason.
Shouldn't FRP be by design timing independent?
I mean not that there is no time in observables, but when we compose observables we should not care about when any of our state sources exactly emits a value...
In fact that's the case, in a perfect reactive setup we don't need to care about those problems. However, as Angular is an object orientated framework we have to often with different problems related to life-cycles of components and services, router-events and many more things.
In RxJS timing is give by the following:
- For hot observables the time of creation
- For cold observables the time of subscription
- For emitted values the scheduling process
In Angular timing is given by the following:
- For global services the creation as well as the application life-time
- For components the creation, several life-cycle hooks as well as the component life-time
- For local services the creation of the component as well as the components life-time
- For pipes oe directives in the template also the components life-time
All timing relates things in Angular are in object oriented style, very similar to hot observables. Subscription handling can be done declarative over completion operators. The scheduling process can be controlled both over imperative or over operators and can influence the execution context of next error or complete callback.
We see that there are two different concepts combined that have completely different ways of dealing with timing. Angular already solved parts of this friction points but some of them are still left and we have to find the right spots to put our glue code and fix the problem.
This chart shows a minimal Angular app with the different building units ant their timing:
In this example it marks:
- the global store lifetime
- the component store lifetime
- the async pipe lifetime
As we can see It makes a big difference where we place observables and where we subscribe to them. It also shows where we need hot observables and where we need to replay values.
Let's discuss where subscriptions should take place and for which reason they are made.
Subscriptions are here to receive values from any source.
In most cases we want to render the incoming values.
For this reason we use a Pipe
or a Directive
in the template to trigger
change-detection whenever a value arrives.
The other reason could be to run some background tasks in a Component
, Directive
or Service
,
which should not get rendered. I.e. a request to the server every 30 seconds.
As subscriptions in the Pipe
or Directive
are handled over their life-cycle
hooks automatically, we only have to discuss the scenarios for side-effects.
@Component({
...
})
export class SubscriptionHandlingComponent implements OnDestroy {
onDestroy$ = new Subject();
sideEffect$ = timer(0, 1000).pipe(tap(n => serverRequest(n)));
constructor() {
this.sideEffect$
.pipe(takeUntil(this.onDestroy$))
.subscribe();
}
ngOnDestroy(): void {
this.onDestroy$.next(true);
}
}
We already have a declarative subscription handling. But this code could get moved somewhere else. We could use the local service that we most probably will need if we implement the final implementation for component state handling.
Service
export class SubscriptionHandlingService implements OnDestroy {
onDestroy$ = new Subject();
subscribe(o): void {
o.pipe(takeUntil(this.onDestroy$))
.subscribe();
}
ngOnDestroy(): void {
this.onDestroy$.next(true);
}
}
Component
@Component({
...
providers: [SubscriptionHandlingService]
})
export class SubscriptionHandlingComponent {
sideEffect$ = timer(0, 1000)
.pipe(tap(console.log));
constructor(private subHandler: SubscriptionHandlingService) {
this.subHandler
.subscribe(this.sideEffect$)
}
}
In this way we get rid of thinking about subscriptions in the component at all.
Incoming values arrive before the subscription has happened.
For example state over @Input()
decorators arrives before the view gets rendered and a used pipe could receive the value.
@Component({
selector: 'app-late-subscriber',
template: `
{{state$ | async | json}}
`
})
export class LateSubscriberComponent {
state$ = new Subject();
@Input()
set state(v) {
this.state$.next(v);
}
}
We call this situation late subscriber problem. In this case, the view is a late subscribe to the values from '@Input()' properties. There are several situations from our previous explorations that have this problem:
- Input Decorators
- transporting values from
@Input
toAfterViewInit
hook - transporting values from
@Input
to the view - transporting values from
@Input
to the constructor
- transporting values from
- Component And Directive Life Cycle Hooks
- transporting
OnChanges
to the view - getting the state of any life cycle hook later in time (important when hooks are composed)
- transporting
- Local State
- transporting the current local state to the view
- getting the current local state for other compositions
Primitive Solution
@Component({
selector: 'app-late-subscriber',
template: `
{{state$ | async | json}}
`
})
export class LateSubscriberComponent {
state$ = new ReplaySubject(1);
@Input()
set state(v) {
this.state$.next(v);
}
}
The downside here is that we can only replay the latest value emitted.
Replaying more values would cause problems for later compositions of this stream,
as a new subscriber would get all past values of the @Input
Binding.
And that's not what we want.
Another downside is the bundle size of ShareReplay. But it will be used anyway somewhere in our architecture, so it's a general downside.
@TODO Hard rewrite! Let me front off explain that this section is specifically here to explain why this problem should not be part of the components state-management.
To start this section let's discuss the components implementation details first. We focus in the components outputs.
@Component({
...
})
export class AnyComponent {
@Output() compOutput = new EventEmitter()
}
Let's take a closer look at EventEmitter interface:
EventEmitter<T extends any> extends Subject<T>
And Subject
looks like this:
Subject<T> extends Observable<T> implements SubscriptionLike
The important part here is that we can pass everything that holds a subscribe
.
Which means the following would work:
@Component({
...
})
export class AnyComponent {
@Output() compOutput = interval(1000);
}
An observable for example holds a subscribe method in it
With this in mind let's focus on the original problem, sharing a reference.
In this example we receive a config object from the parent component and emit changes from the formGroup created out of the config object.
Every time we receive a new value from the input binding
- we create a config object out of it
- and use the
FormBuilder
service to create the new form. As output value we have to provide something that holds asubscribe
method. So we could use the formGroupsvalueChanges
to provide the forms changes directly as component output events.
@Component({
selector: 'sharing-a-reference',
template: `
<form *ngIf="(formGroup$ | async) as formGroup" [formGroup]="formGroup">
<div *ngFor="let c of formGroup.controls | keyvalue">
<label>{{c.key}}</label>
<input [formControlName]="c.key"/>
</div>
</form>
`
})
export class SharingAReferenceComponent {
state$ = new ReplaySubject(1);
formGroup$: Observable<FormGroup> = this.state$
.pipe(
startWith({}),
map(input => this.getFormGroupFromConfig(input))
);
@Input()
set formGroupModel(value) {
this.state$.next(value);
}
@Output() formValueChange = this.formGroup$
.pipe(switchMap((fg: FormGroup) => fg.valueChanges));
constructor(private fb: FormBuilder) {
}
getFormGroupFromConfig(modelFromInput) {
const config = Object.entries(modelFromInput)
.reduce((c, [name, initialValue]) => ({...c, [name]: [initialValue]}), {});
return this.fb.group(config);
}
}
But the values are not updating in the parent component.
We forgot that our formGroup$
observable ends with a map
operator,
which returns is a uni-cast observable.
What happened is we subscribed once in the template over the async
pipe to render the form.
And another time in the component internals to emit value changes from the form.
As formGroup$
is uni-cast we created a new FormGroup
instance for every subscription.
This can be solved by adding a multicast operator like share
or shareReplay
at the end of formGroup$
.
As we also have late subscribers (async
pipe in the template), we use shareReplay
with bufferSize
1.
formGroup$: Observable<FormGroup> = this.state$
.pipe(
startWith({}),
map(input => this.getFormGroupFromConfig(input)),
shareReplay(1)
);
shareReplay
emits the same value to subscribers.
So the subscription in the template and the subscription in the components internals receive the same instance of FormGroup
.
Important to notice here is that shareReplay
is cold but multicast.
This means it only subscribes to the source if at least one subscriber is present.
This does not solve the problem of cold composition.
As this kind of problem nearly only to component internal instances it should not be bart of the components state-management. Also we never store references of class instances in the store as it takes away the whole idea of consistent, controlled state changes.
Let's quickly clarify hot/cold and uni-case/multi-cast.
cold
Means the producer sits inside of the observable.
Whenever .subscribe is called the subscriber function fires and we create a new instance fo the producer.
i.e. the static interval
gets created whenever we subscribe to it.
hot
Means the producer sits outside of the observable.
The producer emits values independent of the subscriber. If .subscribe is called we will receive all values from the moment of subscription on.
The past values are not recognized.
i.e. the @Input
binding is a hot producer
that sits outside of our Subject
(extends Observable
) and emits values independent of the moment when async
pipe subscribes.
uni-cast
Means the producer is unique per subscription.
i.e. fromEvent
is a uni-cast observable as we pass a
new function per subscription for the internal .addEventListener
call.
And if we unsubscribe we remove only this function, and not the others.
multi-cast Means there is one producer instance fol all subscription. i.e. A subject that emits the values from a single producer to all subscriber.
With this in mind we can discuss the problem of cold composition in case of our component state.
As we will have to deal with:
- View Interaction ( button click )
- Global State Changes ( http update )
- Component State Changes ( triggered by button or interval )
Putting all this logic in the component class is a bad idea. Not only because of separations of concerns, but also because we would have to implemet it over and over again.
We need to make the logic that deal with problems around composition reusable!
So far our sources got subscribed to when the view was ready and we rendered the state. As the view represents a hot producer of values and injected services too we have to decouple the service that handles component state from other sources.
So what is the problem?!?!11
We have hot sources and we have to compose them. As a minimal requirement to create state we should have at least the last emission of each source to compute a state.
If we compose state we have to consider that every operator returns cold observables.
(Operators that return ConnectableObservable
are not really operators as they are not compos-able)
So no matter what we do before, after an operation we get a cold observable, and we have to subscribe to it to trigger the composition. I call this situation cold composition, as with selector functions in i.e. publish we are also able to create hot compositions.
Some of our sources are This can be solved by tow ways:
- a) Make all sources replay at least their last value (push workload to all relevant sources)
- b) Make the composition hot as early as possible (push workload to the component related part)
Let's discuss a) first.
If we would make every source replay at least the last value we would have to implement this logic in following places:
- View Input bindings (multiple times)
- View events (multiple times)
- Other Service Changes (multiple times)
- Component Internal interval (multiple times)
It would also force the parts to cache values and increase memory. Furthermore it would force third party to implement tis too.
IMHO not really scalable.
What would be the scenario with b)?
We could think of the earliest possible moment to make the composition hot. From the diagram above we know that a service, even if locally provided, is instantiated first, before the component.
If we would put it there we could take over the workload from:
- View Input bindings (multiple times)
- View events (multiple times)
- Component Internal interval (multiple times)
All involved services can not be covered.
But what services are we interested in?
Stateful services, and stateful services will, by definition, always provide there last emitted state.
So if we compose other sources form services like @ngRx/store
etc. we can assume they replay their last emitted state and we can run the composition.
In worse case we could fix it by concating with an initial value.
Let's see an simple example where we compose different sources in a service and and one of our sources is not replaying the last value:
Service:
export class SomeService implements OnDestroy {
// one channel for different requests for change
commands$ = new Subject();
composedState$ = this.commands$
.pipe(
scan((acc, i) => {
return {sum : acc['sum'] + i['sum']};
}, {sum: 0})
);
constructor(globalService: GlobalService) {
// initial value from constant
this.commands$.next({sum: 100});
// initial value from other service
this.commands$.next({sum: globalService.value});
}
ngOnDestroy(): void {
this.commands$.complete();
}
}
Component:
@Component({
selector: 'cold-composition-bad',
template: `
<h1>Cold Composition - Missing Values</h1>
composedState$: {{someBadService.composedState$ | async | json}}
`,
providers: [SomeService]
})
export class ColdCompositionBadComponent implements OnDestroy {
input$ = new Subject<number>();
// Input gets updated every second with an incremental number (1,2,3,4,5,...)
@Input()
set inputValue(value: number) {
this.input$.next(value);
}
constructor(public someBadService: SomeBadService) {
// earliest possible moment to forward values
this.input$.pipe(map(n => ({sum: n})))
.subscribe(
n => this.someBadService.commands$.next(n)
);
// earliest possible moment to subscribe to a service
this.someBadService.composedState$.subscribe(s => console.log('composedState bad: ', s) );
}
ngOnDestroy(): void {
this.input$.complete()
}
}
If we run the code we would start rendering with {sum: 1}.
The service is the first thing that gets initialised. The initial commands are fired, but the initial values are fired before the component gets instantiated and subscribes to the service.
Even if the source is hot (the subject in the service is defined on instantiation) the composition made the stream cold again. Which means the composed values can be received only if there is at least 1 subscriber. In our case the subscriber was the component constructor.
Let's see how we can implement the above discussed solution with hot composition:
Hot Composition Service:
import {Injectable, OnDestroy} from '@angular/core';
import {ConnectableObservable, Subject, Subscription} from 'rxjs';
import {publishReplay, scan} from "rxjs/operators";
@Injectable({
providedIn: 'root'
})
export class SomeService implements OnDestroy {
serviceSubscription = new Subscription();
commands$ = new Subject();
composedState$ = this.commands$
.pipe(
scan((acc, i) => {
return {sum : acc['sum'] + i['sum']};
}, {sum: 0}),
publishReplay(1)
) as ConnectableObservable;
constructor() {
// Composition is hot from here on
this.serviceSubscription = this.composedState$.connect();
// initial value from constant
this.commands$.next({sum: 100});
// initial value from other service
this.commands$.next({sum: 25});
}
ngOnDestroy(): void {
this.serviceSubscription.unsubscribe();
}
}
We kept the component untouched and only applied changes to the service.
We used the publishReplay
operator to make the
source replay the last emitted value.
In the service constructor we called .connect
to make it hot (subscribe to the source).
So far we only had focused on independent peaces and didn't payed much attention on their interaction. Let's analyze the way we interact with components and services:
Well known implementations of sate management like @ngrx/store
,
which is a global state management library, implemented the consumer facing API in an imperative way.
The provided method is dispatch
which accepts a single value that get sent to the store.
Let's look at a simple example:
Imperative Interaction Service
export class StateService implements OnDestroy {
private stateSubscription = new Subscription();
private stateSubject = new Subject<{ [key: string]: number }>();
state$ = this.stateSubject
.pipe(
map(obj => Object.entries(obj).pop()),
scan((state, [slice, value]: [string, number]): { [key: string]: number } => ({...state, [slice]: value}), {}),
publishReplay(1)
) as ConnectableObservable<any>;
constructor() {
this.stateSubscription = this.state$.connect();
}
ngOnDestroy(): void {
this.stateSubscription.unsubscribe();
}
// setter with value not compose-able
dispatch(v) {
this.stateSubject.next(v);
}
}
Imperative Interaction Component
@Component({
selector: 'component',
template: `
<p>Imperative Interaction</p>
<pre>{{state$ | async | json}}</pre>
<button (click)="updateCount()">
Update State
</button>
`,
providers: [StateService]
})
export class AnyComponent {
state$ = this.stateService.state$;
constructor(private stateService: StateService) {
}
updateCount() {
this.stateService
.dispatch(({count: ~~(Math.random() * 100)}));
}
}
Why is this imperative? Imperative programming means working with instances and mutating state.
Whenever you write setter
or getter
you program in an imperative way because this is not compose-able.
If we now think about the dispatch
method of @ngrx/store
or our implemented subscribe
and next
calls,
we realize that it is similar to working with setter
and getter
.
But how can we go more declarative or even reactive? By providing something compose-able. Like the observable itself.
Declarative Interaction Service
const stateAccumulator = (acc, [key, value]: [string, number]): { [key: string]: number } => ({...acc, [key]: value});
export class StateService implements OnDestroy {
private stateSubscription = new Subscription();
private stateAccumulator = stateAccumulator;
private stateSubject = new Subject<Observable<{ [key: string]: number }>>();
state$ = this.stateSubject
.pipe(
// process observables of state changes
mergeAll(),
// process single state change
map(obj => Object.entries(obj).pop()),
scan(this.stateAccumulator, {}),
publishReplay(1)
) as ConnectableObservable<any>;
constructor() {
this.stateSubscription = this.state$.connect();
}
ngOnDestroy(): void {
this.stateSubscription.unsubscribe();
}
// "connector" with observable compose-able
connectSlice(o) {
this.stateSubject.next(o);
}
}
Declarative Interaction Component
@Component({
selector: 'component',
template: `
<p>Declarative Interaction</p>
<pre>{{state$ | async | json}}</pre>
<button (click)="update$.next(true)">
Update State
</button>
`,
providers: [StateService]
})
export class AnyComponent {
state$ = this.stateService.state$;
update$ = new Subject();
constructor(private stateService: StateService) {
this.stateService.connectSlice(this.update$
.pipe(map(_ => ({count: ~~(Math.random() * 100)}))));
}
}
By providing the whole observable we can handle all related mechanisms of subscription handling, as well as value processing and emission in the service.
With this we can now also handle our side-effects completely in the service: Service
export class DeclarativeSideEffectsGoodService implements OnDestroy {
private effectSubscription = new Subscription();
private effectSubject = new Subject<Observable<{ [key: string]: number }>>();
constructor() {
this.effectSubscription = (this.effectSubject
.pipe(
// process observables of side-effects
// process side-effect
mergeAll(),
publishReplay(1)
) as ConnectableObservable<any>)
.connect();
}
ngOnDestroy(): void {
this.effectSubscription.unsubscribe();
}
connectEffect(o) {
this.effectSubject.next(o);
}
}
Declarative Interaction Component
@Component({
selector: 'declarative-side-effects-good',
template: `
<p>Declarative SideEffects</p>
`,
providers: [StateAndEffectService]
})
export class AnyComponent {
constructor(private stateService: StateAndEffectService) {
this.stateService.connectEffect(interval(1000)
.pipe(tap(_ => ({count: ~~(Math.random() * 100)}))));
}
}
Note the side-effect is now placed in a tap
operator and the whole observable is handed over.
So far we encountered following topics:
- sharing a reference (not related to component state)
- subscription handling
- late subscriber
- cold composition
- moving primitive tasks as subscription handling and state composition into another layer
- declarative interaction between component and service
If we would pipe every incoming value to a "thing" and implement the a combination of the above problems i a single place.
Let's see how the solutions look like when we put them into i.e. a service.
LOW Level Component State Service
const stateAccumulator = (acc, [key, value]: [string, number]): { [key: string]: number } => ({...acc, [key]: value});
export class LowLevelStateService implements OnDestroy {
private subscription = new Subscription();
private stateAccumulator = stateAccumulator;
private effectSubject = new Subject<Observable<{ [key: string]: any }>>();
private stateSubject = new Subject<Observable<{ [key: string]: any }>>();
state$ = this.stateSubject
.pipe(
mergeAll(),
map(obj => Object.entries(obj).pop()),
scan(this.stateAccumulator, {}),
publishReplay(1)
) as ConnectableObservable<any>;
constructor() {
this.subscription.add(this.state$.connect());
this.subscription.add((this.effectSubject
.pipe(mergeAll(), publishReplay(1)
) as ConnectableObservable<any>).connect()
);
}
connectSlice(o) {
this.stateSubject.next(o);
}
connectEffect(o) {
this.effectSubject.next(o);
}
ngOnDestroy(): void {
this.subscription.unsubscribe();
}
}
LOW Level Component State Service
@Component({
selector: 'low-level-state-component',
template: `
<p>Low-Level StateService</p>
state$: <pre>{{state$ | async | json}}</pre>
<button (click)="btn$.next($event)">
Update Click Time
</button>
`,
providers: [LowLevelStateService]
})
export class AnyComponent {
sideEffect$ = interval(1000)
.pipe(tap(_ => (console.log('Dispatch action to global store'))));
state$ = this.stateService.state$;
value$ = new Subject<number>();
@Input()
set value(value) {
this.value$.next(value);
}
btn$ = new Subject<any>();
constructor(private stateService: LowLevelStateService) {
this.stateService
.connectEffect(this.sideEffect$);
this.stateService
.connectSlice(this.value$.pipe(map(n => ({interval: n}))));
this.stateService
.connectSlice(this.btn$.pipe(map(e => ({time: e.timeStamp}))));
}
}
With a more declarative interaction between component and service we made a big change. We are now aware of the whole observable context.
The next
, error
and compolete
notifications.
This can help us to rethink a lot of problems that did not occur with global state management.
Initializing the state worked well in global state management by providing a default value in the reducer function. Also with input bindings we could set a default value. But there are situations where we need to initialize a not yet emitted state. In addition to that with a highly dynamic state like we have with the components state we also have to think about cleanup unused state.
With the declarative approach we have now the ability to react on the completion of an observable.
The current accumulation function for the components state looks like this:
const stateAccumulator = (state, [key, value]: [string, number]): { [key: string]: number } => ({...state, [key]: value});
Let's add minimal logic to our state accumulation part and discuss it.
const stateAccumulator = (state, [keyToDelete, value]: [string, number]): { [key: string]: number } => {
const isKeyToDeletePresent = keyToDelete in state;
// The key you want to delete is not stored :)
if (!isKeyToDeletePresent && value === undefined) {
return state;
}
// Delete slice
if (value === undefined) {
const {[keyToDelete]: v, ...newS} = state as any;
return newS;
}
// update state
return ({...state, [keyToDelete]:value});
};
Now lets thing about the following example:
@Component({
selector: 'state-init-and-cleanup-bad',
template: `
<button (click)="addDynamicState()">Add dynamic state</button>
<p><b>state$:</b></p>
<pre>{{state$ | async | json}}</pre>
`,
providers: [ComponentStateService]
})
export class StateInitAndCleanupComponent {
state$ = this.componentState.state$;
constructor(private componentState: ComponentStateService) {
}
addDynamicState() {
this.componentState.connectSlice(this.getDynamicStateSlice())
}
getDynamicStateSlice(): Observable<{ [key: string]: number | undefined }> {
const rnd = (p = 1): number => ~~(Math.random() * (10 ** p));
const takeCount = rnd(1);
const intervalDuration = rnd(3);
const id = `rnd ${rnd(2)}-${intervalDuration}-${takeCount}`;
const interval$ = interval(intervalDuration).pipe(map(_ => ({[id]: rnd()})), take(takeCount));
return interval$;
}
}
Whenever we click the button we add a state slice from an observable. The observable has a random number of emissions in a random interval. If the interval ends the key still stuck's in the state object.
As the keys where dynamic there is not really a good way of detection which key is not changing anymore and can be removed.
Let's see what we can do with the above example. As we know we adopted the accumulation function to remove keys with undefined from the state object.
By emitting undefined
after emission of the last value we can cleanup this state.
We can also initialize it with zero even if the first value is sent later in time.
Lets add one line to our state slice observable:
getDynamicStateSlice(): Observable<{ [key: string]: number | undefined }> {
...
const interval$ = interval(intervalDuration).pipe(map(_ => ({[id]: rnd()})), take(takeCount));
const observables = [of({[id]: 0}), interval$, of({[id]: undefined})];
return concat(...observables);
}
Now we have an immediate initial value if the state changes are started and cleanup-logic if the state observable is no longer needed.
Thinks about many dynamic parts of your view that you display and hide or some that you rarely show in the UI. All those dynamism is automatically solved if the source stream completes.
The solution for dynamic Observables from the view belongs not to this document.
As we are noe able to provide observables that contain our state changes a new question needs to get asked.
What happens if we provide provide multiple observables for the same state slice?
Two possible things can happen:
- Both observable get merged and all emissions change the same state slice concurrent
- The new observable for a same state slice overrides the changes from the previous observable for this state slice
Let's discuss scenarios for both.
Concurrent State Changes
Override State Changes