Adopt TypeScript & Refocus Core APIs #158

Open
20 of 28 tasks
viglucci opened this issue Aug 3, 2021 · 23 comments

@viglucci
Member

viglucci commented Aug 3, 2021

This issue proposes rewriting the core set of rsocket-js packages from Flow to TypeScript, with a focus on adopting a "small core" mentality when designing the project's public API.

This issue is to serve as:

  1. Documentation of the proposal that was discussed and decided upon
  2. A means of tracking progress towards implementing the proposal

Motivation

TypeScript in Favor of Flow

TypeScript is a proven solution for authoring JavaScript libraries for use in Node.js and browser environments, and will provide a number of benefits to the project:

  • Continued development & support from Microsoft
  • Large community adoption
  • Easier and more familiar tooling
  • Types can be published from source without authoring additional @types

In addition to the above, in a recent post on the Flow Type blog, the Flow team described a refocusing of the project's priorities towards Facebook's internal needs. This is a signal that Flow may not be a suitable solution for projects outside of Facebook's internal infrastructure and ecosystem.

Small Core

New APIs and paradigms should be introduced to adopt a "small core" mentality, with core rsocket-js packages providing only the basic building blocks and implementations for the RSocket protocol. APIs should be easily extendable/composable to support additional packages that can later provide functionality outside of implementing the RSocket protocol, for example, an RxJS interface.

This "small core" approach is intended to assist with maintainability of the project, as well as ease of support for satisfying feature requests through extensions and "addon" packages.

Other RSocket implementations, such as RSocket-Swift, have benefited to a degree from this approach.

Serialization and Encoding

In conformance with the "small core" goal of this initiative, support for payload data encoding should be reduced down to transmitting instances of Buffer, rather than supporting a wide variety of data and metadata payload serialization and encodings. This stance is in contrast to the current support provided by @rsocket packages, where support for serializing objects to JSON and other formats is provided by interfaces such as JsonSerializer, and encoding is provided by interfaces such as BufferEncoders.

Support for user friendly translation of objects and other structures to/from JSON and other serialization formats can be accomplished via extension packages, rather than as a core feature of rsocket-js.
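As a rough illustration of the split described above, the sketch below keeps JSON handling in a small codec that lives outside the core, so that only a Buffer would ever be handed to the transport layer. The `Codec` shape, `jsonCodec`, and `Greeting` are hypothetical names chosen for this example and are not taken from any rsocket-js package.

```typescript
import { Buffer } from "node:buffer";

// Hypothetical codec shape for this sketch (an assumption, not a library export).
interface Codec<T> {
  mimeType: string;
  encode(value: T): Buffer;
  decode(buffer: Buffer): T;
}

// JSON support lives entirely in the extension: the core would only see Buffers.
function jsonCodec<T>(): Codec<T> {
  return {
    mimeType: "application/json",
    encode: (value) => Buffer.from(JSON.stringify(value), "utf8"),
    decode: (buffer) => JSON.parse(buffer.toString("utf8")) as T,
  };
}

interface Greeting {
  message: string;
}

const greetingCodec = jsonCodec<Greeting>();

// What a caller would pass as payload `data`.
const encodedGreeting: Buffer = greetingCodec.encode({ message: "Hello World" });

// What a receiver would do with the payload `data` it gets back.
const decodedGreeting: Greeting = greetingCodec.decode(encodedGreeting);
```

Other formats could follow the same pattern by providing a different `encode`/`decode` pair, without the core needing to know about them.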

Reactive APIs

rsocket-js has historically leveraged a Reactive Streams implementation (rsocket-flowable) both internally, and as its public API. This implementation has historically been published under @rsocket/rsocket-flowable. Moving forward, rsocket-js will move away from @rsocket/rsocket-flowable as a key feature of its public API, and instead focus on exposing a core set of Stream APIs that satisfy the core requirements of the RSocket protocol, while also promoting composition and extension with other Reactive Streams implementations, such as RxJS and reactive-streams-js.

Interfaces example:

export interface Cancellable {
  cancel(): void;
}

export interface Requestable {
  request(requestN: number): void;
}

export interface OnExtensionSubscriber {
  onExtension(
    extendedType: number,
    content: Buffer | null | undefined,
    canBeIgnored: boolean
  ): void;
}

export interface OnNextSubscriber {
  onNext(payload: Payload, isComplete: boolean): void;
}

export interface OnTerminalSubscriber {
  onError(error: Error): void;
  onComplete(): void;
}

The intention of this change is twofold:

  • Simplify the internal implementation details of the core rsocket-js libraries
  • Allow for consumers and integrators to easily extend rsocket-js with their Reactive Streams implementation of choice
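To make the composition idea concrete, here is a minimal sketch of how the small interfaces could be combined into a demand-driven stream. It redeclares simplified stand-ins for the interfaces (with `Payload` reduced to a single string field), and `streamFromArray` is a hypothetical helper, not part of rsocket-js. It also assumes that the `isComplete` flag is set on the last item rather than followed by a separate `onComplete` call, which is one possible reading of the interfaces and not confirmed by the proposal.

```typescript
// Simplified local stand-ins for the proposed interfaces (assumptions for this sketch).
interface Payload { data: string; }
interface Cancellable { cancel(): void; }
interface Requestable { request(requestN: number): void; }
interface OnNextSubscriber { onNext(payload: Payload, isComplete: boolean): void; }
interface OnTerminalSubscriber { onError(error: Error): void; onComplete(): void; }

// Hypothetical helper: serves `items` to `subscriber`, emitting only what was requested.
function streamFromArray(
  items: Payload[],
  subscriber: OnNextSubscriber & OnTerminalSubscriber
): Requestable & Cancellable {
  let index = 0;
  let finished = false;
  return {
    request(requestN) {
      if (!finished && items.length === 0) {
        // Nothing to emit: complete without a payload.
        finished = true;
        subscriber.onComplete();
        return;
      }
      let remaining = requestN;
      while (!finished && remaining > 0 && index < items.length) {
        const isLast = index === items.length - 1;
        if (isLast) finished = true;
        remaining--;
        subscriber.onNext(items[index++], isLast);
      }
    },
    cancel() {
      finished = true; // no further items are emitted after cancellation
    },
  };
}

const seen = { items: [] as string[], complete: false };
const demoStream = streamFromArray(
  [{ data: "a" }, { data: "b" }, { data: "c" }],
  {
    onNext: (payload, isComplete) => {
      seen.items.push(payload.data);
      if (isComplete) seen.complete = true;
    },
    onError: () => {},
    onComplete: () => { seen.complete = true; },
  }
);

demoStream.request(2); // delivers "a" and "b"; "c" waits for further demand
```

An adapter for a library such as RxJS would sit on top of the same two halves: it implements the subscriber side and drives the `request`/`cancel` side according to its own back-pressure policy.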

Desired solution

Core RSocket protocol feature support and API implementations needed:

TODO: complete list of necessary core protocol features

Considered alternatives

N/A

Additional context

Major Version Change

Because of the breaking API changes that this effort will introduce, the version of all @rsocket scoped packages whose APIs are modified will be incremented to an initial major version release of 1.0.

Issues & pull requests related to this work should be/are tagged with the 1.0 tag.

Work in progress

Work for this effort is ongoing in the dev branch.

API Example

const connector = new RSocketConnector({
  transport: new TcpClientTransport({
    connectionOptions: {
      host: "127.0.0.1",
      port: 9090,
    },
  }),
});

const rsocket = await connector.connect();

await new Promise((resolve, reject) =>
  rsocket.requestResponse(
    {
      data: Buffer.from("Hello World"),
    },
    {
      onError: (e) => reject(e),
      onNext: (payload, isComplete) => {
        console.log(
          `payload[data: ${payload.data}; metadata: ${payload.metadata}]|${isComplete}`
        );
        resolve(payload);
      },
      onComplete: () => { },
      onExtension: () => { },
      cancel: () => { },
      request: () => { },
    }
  )
);

viglucci added the 1.0 (Pull requests & issues related to the TypeScript rewrite and 1.0 release) label on Aug 3, 2021

@sdeleuze

Thank you so much for working on this.

@Polve

Polve commented Aug 17, 2021

wonderful, thanks

@LifeIsStrange

Maybe support for interceptors (#192) could be added to the desired solution checklist?

@lachenmayer

lachenmayer commented Nov 22, 2021

Hi @viglucci, thank you so much for your efforts on this - this would make our lives so much easier. If you'd like us to "beta test" any of your changes, let me know when you think it's ready to look at.

I'm particularly excited about the removal of Flowable - we currently have a RxJS<->Flowable interop layer which pretty much completely disables the back-pressure features of Flowable. It would be awesome if we could just use regular RxJS operators on Flowables. This could be achieved by making a Flowable implement the Observable interface, or by defining generic helper functions which could model some common back-pressure mechanisms.

For example, we currently define toObservable<T>: (flowable: Flowable<T>, options: { batchSize: number }) => Observable<T>, which requests batchSize entries at a time once the subscription is active.
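The batching behaviour described here can be sketched without RxJS or Flowable. In the example below, `createBatchingConsumer` and `createRangeSource` are hypothetical helpers written for illustration only: the consumer asks for `batchSize` items up front and renews the demand each time a full batch has arrived, while the toy source never emits more than the outstanding demand.

```typescript
interface Requestable { request(requestN: number): void; }

// Hypothetical helper (not part of rsocket-js): forwards items to `onItem` and
// requests the next batch whenever a full batch has been received.
function createBatchingConsumer<T>(batchSize: number, onItem: (item: T) => void) {
  let subscription: Requestable | undefined;
  let receivedInBatch = 0;
  return {
    start(source: Requestable) {
      subscription = source;
      source.request(batchSize);
    },
    onNext(item: T) {
      onItem(item);
      receivedInBatch++;
      if (receivedInBatch === batchSize) {
        receivedInBatch = 0;
        subscription?.request(batchSize);
      }
    },
  };
}

// Toy producer for the demo: emits 0..count-1 and respects the outstanding demand.
function createRangeSource(
  count: number,
  consumer: { onNext(item: number): void },
  requestLog: number[]
): Requestable {
  let next = 0;
  let demand = 0;
  let draining = false;
  return {
    request(requestN) {
      requestLog.push(requestN);
      demand += requestN;
      if (draining) return; // re-entrant call: the running loop picks up the new demand
      draining = true;
      while (demand > 0 && next < count) {
        demand--;
        consumer.onNext(next++);
      }
      draining = false;
    },
  };
}

const batchItems: number[] = [];
const batchRequests: number[] = [];
const batchConsumer = createBatchingConsumer<number>(2, (item) => {
  batchItems.push(item);
});
batchConsumer.start(createRangeSource(5, batchConsumer, batchRequests));
```

With a batch size of 2 and five items, the source sees three requests of 2 each; the last one is only partially used because the source runs out of items.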

To implement toFlowable<T>: (observable: Observable<T>) => Flowable<T>, we have created BufferingSubscription and DroppingSubscription classes which buffer or drop emitted values on the given observable if the subscriber hasn't requested any more values.
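The difference between the buffering and dropping strategies can be sketched as a small "demand gate" placed between a producer that cannot be slowed down and a subscriber that requests items. This is not the code of the BufferingSubscription or DroppingSubscription classes mentioned above (which are not shown in this thread); `createDemandGate` and `OverflowStrategy` are hypothetical names used only to illustrate the idea.

```typescript
type OverflowStrategy = "buffer" | "drop";

// Hypothetical helper: values pushed without outstanding demand are either
// queued ("buffer") or discarded ("drop").
function createDemandGate<T>(strategy: OverflowStrategy, deliver: (value: T) => void) {
  let demand = 0;
  const queue: T[] = [];
  return {
    // Called by the producer whenever it has a value, regardless of demand.
    push(value: T) {
      if (demand > 0) {
        demand--;
        deliver(value);
      } else if (strategy === "buffer") {
        queue.push(value);
      }
      // strategy === "drop": the value is silently discarded
    },
    // Called by the subscriber to signal that it can take `requestN` more values.
    request(requestN: number) {
      demand += requestN;
      while (demand > 0 && queue.length > 0) {
        demand--;
        deliver(queue.shift() as T);
      }
    },
  };
}

const bufferedOut: number[] = [];
const bufferingGate = createDemandGate<number>("buffer", (value) => {
  bufferedOut.push(value);
});
bufferingGate.push(1); // no demand yet: queued
bufferingGate.push(2); // queued
bufferingGate.request(1); // releases 1, keeps 2 queued

const droppedOut: number[] = [];
const droppingGate = createDemandGate<number>("drop", (value) => {
  droppedOut.push(value);
});
droppingGate.push(1); // no demand yet: dropped
droppingGate.request(1);
droppingGate.push(2); // delivered
droppingGate.push(3); // demand used up: dropped
```

A buffering gate trades memory for completeness, and an unbounded queue can grow without limit if the subscriber never catches up; a dropping gate keeps memory flat but loses values.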

As API inspiration, I found Apple Combine's Subscription to have quite a nice request API. One nice feature is that you can just request unlimited, effectively disabling back-pressure. (It's also possible to request none, but I'm not sure how useful that is; maybe for the fire-and-forget request type?)

@viglucci
Member Author

viglucci commented Nov 26, 2021

Hi @lachenmayer, and others.

Many thanks for your kind words and encouragement.

In regards to beta testing the changes that have been made, we are hoping to have a 0.1.0 release, or another semantic versioning appropriate preview version available ahead of the initial 1.0.0 release, hopefully sometime between now and the end of Q1 2022 (ideally sooner rather than later, but of course these timelines are fluid and subject to change). We'll be sure to broadcast its availability once that has happened.

In regards to RxJS interoperability, one of our goals is definitely improved interoperability with existing reactive libraries, and to support future implementations (such as a Reactive Streams compliant library in JavaScript). We believe this will likely take the form of an "extension" library which will wrap/compose with the core RSocket APIs, rather than as a core feature of RSocket-JS. I discussed this approach a bit more here. You'll see in the linked example of an RxJS extension that the API is pretty simple, though the batching approach you mention should likely be something we consider.

The finer details of how the extensions will take shape, and which we'll implement immediately are all still very much in the works though, so I do appreciate the details and examples you've provided above, as they will likely serve well as inspiration.

@viglucci
Member Author

viglucci commented Feb 15, 2022

Providing a status update on this effort:

We believe we are at a point with the 1.0.x branch where we would like to publish some preview versions to npm. We are working through some outstanding requirements to be able to actually publish on npm and will do so once those blockers are cleared. Once those preview versions are available we'll be sure to add a release on GitHub and provide an update referencing them.

Additionally, there have been some conversations around how to best support browser environments that require a buffer polyfill, and the current strategy we are leaning towards is to require consumers to polyfill buffer via their build tooling, and provide ample documentation to make that as clear and painless as possible. If you are interested in that conversation or would like to provide insights or alternatives, please refer to #213 .

viglucci added the enhancement (Suggests, requests, or implements a feature or enhancement) label on Mar 6, 2022

@viglucci
Member Author

viglucci commented May 1, 2022

We are excited to share that we've released the first alpha versioned artifacts from this effort. 🎉 😄

Please see the 1.0.0-alpha.1 release for an overview.

@viglucci
Member Author

Hi all,

1.0.0-alpha has been out for a few months now, and we are hoping to receive feedback from the community/current consumers prior to going to 1.0.0. If you have any feedback or would be willing to try migrating to the new versions, that would help us accelerate moving to a stable release version. Thanks!

@lachenmayer

Hi @viglucci, I'm finally getting around to integrating RSocket 1.0 in a greenfield project, so I'll be adding to this thread as and when I find anything worth sharing.

So far loving the simple integration with RxJS, particularly on the client side this has reduced our RSocket setup code by at least 10x. Great work on that.

I had one small hiccup that I've had to hack around on the server side. We're running an Express server with some "normal" HTTP endpoints, and we want RSocket to only listen on a /subscriptions URL. The way we're doing this is:

We have an existing httpServer: http.Server, and my initial instinct was to instantiate the transport like this:

  const websocketServer = new WebSocket.Server({
    server: httpServer,
    path: '/subscriptions',
  })
  const transport = new WebsocketServerTransport({
    wsCreator: () => websocketServer,
  })
  const rsocketServer = new RSocketServer({
    transport,
    // ...
  })

We were calling this as follows in our server setup code:

  await listen() // () => new Promise<void>((resolve) => httpServer.listen(env.port, resolve))
  await rsocketServer.bind()

This didn't work though, as rsocketServer.bind() never resolves.

I noticed that WebsocketServerTransport#connectServer uses the listening event to resolve the server:

websocketServer.addListener("listening", () => resolve(websocketServer));

Manually adding websocketServer.addListener('listening', () => console.log('I am listening')), and also adding 'connection' listeners showed me that the WebSocket server was listening correctly.

Instantiating the WebSocket.Server inside the wsCreator callback did not help either. WebSocket.Server just binds to the HTTP server's listening event (code), so this never emits if the server is already listening.

I can fix this for now by instantiating the RSocket server before listening on the HTTP server, eg.:

  const rsocketStarted = rsocketServer.bind()
  await listen()
  await rsocketStarted

...but it's a bit confusing that the order makes a difference.

I can see why we'd want to await the listening event inside connectServer, but is there potentially some way of checking whether the WebSocket.Server is already listening and immediately resolving if so?
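One possible shape for such a check is sketched below. It relies on the fact that a Node.js http.Server exposes a boolean `listening` property in addition to emitting the `listening` event. `whenListening` and `ListeningSource` are hypothetical names, not part of rsocket-js or ws, and the fake server only stands in for a real http.Server so that the sketch runs without opening a port.

```typescript
import { EventEmitter } from "node:events";

// Minimal structural view of a server for this sketch (an assumption);
// a Node.js http.Server provides these members.
interface ListeningSource {
  readonly listening: boolean;
  once(event: string, listener: (...args: any[]) => void): unknown;
  off(event: string, listener: (...args: any[]) => void): unknown;
}

// Hypothetical helper: resolves immediately when the server is already
// listening, otherwise waits for the "listening" (or "error") event.
function whenListening(server: ListeningSource): Promise<void> {
  return new Promise((resolve, reject) => {
    if (server.listening) {
      resolve();
      return;
    }
    const onListening = () => {
      server.off("error", onError);
      resolve();
    };
    const onError = (error: Error) => {
      server.off("listening", onListening);
      reject(error);
    };
    server.once("listening", onListening);
    server.once("error", onError);
  });
}

// Stand-in for an http.Server, used only to demonstrate the ordering issue.
class FakeServer extends EventEmitter {
  listening = false;
  listen() {
    this.listening = true;
    this.emit("listening");
  }
}

const lateServer = new FakeServer();
lateServer.listen(); // the "listening" event fires before anyone waits for it
const lateReady = whenListening(lateServer); // still resolves, thanks to the flag
```

With a check of this kind, the order of `listen()` and `bind()` in the server setup code would no longer matter.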

Anyway not a blocker at all, just a papercut :)

Will report on more as I come across it & once again thanks a lot for working on this!

@viglucci
Member Author

viglucci commented Aug 7, 2022

Thanks for the feedback @lachenmayer . I've documented this behavior in #237 .

@lachenmayer

Tiny tiny API inconsistency in the RxJS adapter: on the RxRequestersFactory, factories with input & output codecs are specified as separate arguments, while they're specified as an object on the RxRespondersFactory.

inputCodec: Codec<TData>,
outputCodec: Codec<RData>

codecs: {
  inputCodec: IN extends void | null | undefined ? undefined : Codec<IN>;
  outputCodec: OUT extends void | null | undefined ? undefined : Codec<OUT>;
}

I would personally prefer the object approach, as I'm not a huge fan of having multiple unnamed parameters with the same type, but easy either way :)

Also, I fully agree with your rationale of not providing any Codec implementations in rsocket-core, but it seems to me that the Codec type should live in rsocket-core, not in rsocket-messaging. This type is currently the only reason rsocket-adapter-rxjs depends on rsocket-messaging.

In fact we were able to easily implement our RSocket endpoints without using rsocket-messaging (we already have a well-trodden RPC mechanism in our codebase).

It took me a while to piece apart the RxJS example since it relies heavily on rsocket-messaging - it would be nice to create another more simplified RxJS "hello world" example without rsocket-messaging, which highlights that this is the only thing you need to do to get RxJS streams out of the server:

const server = new RSocketServer({
  transport,
  acceptor: {
    async accept(setupPayload) {
      return {
        requestStream: RxRespondersFactory.requestStream((data) => {
          return of(data) // arbitrary RxJS stream!
        }, codecs),
      }
    },
  },
})

Once again, amazing work on this API - this was previously a huge amount of work.

(Sorry, I am currently under heavy deadline pressure for at least the rest of this month, otherwise I would definitely just contribute this myself!)

@lachenmayer

Also another super tiny inconvenience in the RxJS requesters API: we're currently not using metadata in our client-side code (yet?), so we have to add new Map() for each requestStream - could be nice to add = new Map() as default value for this argument?

metadata: Map<string | number | WellKnownMimeType, Buffer>

@viglucci
Member Author

viglucci commented Aug 9, 2022

@lachenmayer please see #241 which addresses #158 (comment).

Additionally you can experiment with this change in rsocket-adapter-rxjs@1.0.0-alpha-rxjs-adapter-optional-metadata.0

@lachenmayer

Can confirm the fixed requesters API works great for us with the rsocket-adapter-rxjs preview release mentioned above!

@viglucci
Member Author

viglucci commented Aug 10, 2022

I would personally prefer the object approach, as I'm not a huge fan of having multiple unnamed parameters with the same type, but easy either way :)

I'm looking into this, but it might not be for a bit. In doing so, I found that we weren't leveraging FNF in the existing rxjs examples. Those examples are now updated. Additionally, I've added a second rxjs example which provides a simpler example mostly devoid of rsocket-messaging concepts (apart from import Codec), which emphasizes your other point about moving Codec to rsocket-core.

I'll need to consider moving Codec to rsocket-core a bit more.

Thanks again for the valuable feedback!

@AndyOGo

AndyOGo commented Aug 18, 2022

This is awesome, thank you very much for the effort.

I'm about to migrate and I'm curious about:

  • Why was the terminology changed from RSocketClient to RSocketConnector? I can't find it in the protocol's terminology
  • What is the usage of onExtension method?
    Is it for the EXT frame?
    Or is it for other Reactive Streams implementations?

    while also promoting composition and extension with other Reactive Streams implementations, such as RxJS and reactive-streams-js.

@AndyOGo

AndyOGo commented Aug 29, 2022

@viglucci
We observed the connectionStatus with v0.

Is there an alternative in v1?

@AndyOGo

AndyOGo commented Aug 31, 2022

@viglucci
Are there any plans to document v1?
tsdoc is a nice proposal for standard doclets, what do you think?

@viglucci
Member Author

viglucci commented Sep 2, 2022

Hi there, a few answers to the above questions (not exhaustive).

  • Why was the terminology changed from RSocketClient to RSocketConnector? I can't find it in the protocol's terminology

We aimed to align some of the terminology with other implementations, such as RSocket-Java.

  • What is the usage of onExtension method?
    Is it for the EXT frame?
    Or is it for other Reactive Streams implementations?

    while also promoting composition and extension with other Reactive Streams implementations, such as RxJS and reactive-streams-js.

I don't have a great answer for this, I haven't used onExtension myself.


@viglucci We observed the connectionStatus with v0.

Is there an alternative in v1?

While not an exact replacement, onClose can be used to determine when the underlying transport has closed.

  const connector = new RSocketConnector({
    transport: new WebsocketClientTransport({
      url: "ws://localhost:8080",
      wsCreator: (url) => new WebSocket(url) as any,
    }),
  });

  const rsocket = await connector.connect();

  rsocket.onClose((err) => {
    // server has gone away
  });
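If something closer to a status stream is wanted, a thin wrapper around onClose could approximate it. The sketch below is only an illustration: the `Closeable` shape is inferred from the snippet above, `observeConnection` is a hypothetical helper, the state names are invented for this example, and the fake connection stands in for a real RSocket instance.

```typescript
// Shape inferred from the onClose usage above (an assumption for this sketch).
interface Closeable {
  onClose(callback: (error?: Error) => void): void;
}

type ConnectionState =
  | { kind: "CONNECTED" }
  | { kind: "CLOSED" }
  | { kind: "ERROR"; error: Error };

// Hypothetical helper: reports CONNECTED right away (the caller already holds a
// connected instance), then CLOSED or ERROR once the transport goes away.
function observeConnection(
  rsocket: Closeable,
  listener: (state: ConnectionState) => void
): void {
  listener({ kind: "CONNECTED" });
  rsocket.onClose((error) => {
    listener(error ? { kind: "ERROR", error } : { kind: "CLOSED" });
  });
}

// Stand-in for a connected RSocket, so the sketch runs without a server.
function createFakeConnection() {
  const callbacks: Array<(error?: Error) => void> = [];
  return {
    onClose(callback: (error?: Error) => void) {
      callbacks.push(callback);
    },
    close(error?: Error) {
      callbacks.forEach((callback) => callback(error));
    },
  };
}

const fakeConnection = createFakeConnection();
const stateLog: string[] = [];
observeConnection(fakeConnection, (state) => {
  stateLog.push(state.kind);
});
fakeConnection.close(new Error("transport lost"));
```

This does not cover the intermediate states that a full status API would report, such as an in-progress connection attempt.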

@viglucci Are there any plans to document v1? tsdoc is a nice proposal for standard doclets, what do you think?

We have the documentation on rsocket.io that I expect to be updated once v1 lands. However, we haven't planned any extensive documentation efforts past that. I could see something like tsdoc being valuable to produce reference documentation down the road.

@AndyOGo

AndyOGo commented Sep 2, 2022

Thank you very much @viglucci for the clarification :)

@mmm8955405

It seems that Node.js streams are not supported.

Pipe method and back pressure example of node

@AndyOGo

AndyOGo commented Jul 1, 2024

@viglucci Is this project dead?
I see some changes on the main trunk, but there has been no activity here for about 2 years.

@agcII

agcII commented Aug 31, 2024

The RxJS example 2 does not work. It only works if you exit the app; if you send more data, it crashes.
