This is an implementation of the Web Application Messaging Protocol (WAMP) written in Kotlin.
The provides facilities for routed Remote Procdeure Calls (RPC) and Publish and Subscribe (PubSub).
It's based on the WAMP spec.
So far this project aims to fulfil the WAMP basic profile.
Under repositories add:
repositories {
mavenCentral()
jcenter() // Required
maven { url 'https://jitpack.io' }
}
Under dependencies:
dependencies {
implementation("com.github.LaurenceGA.kwamp:kwamp-router-core:1.0.5")
}
Under dependencies:
dependencies {
implementation("com.github.LaurenceGA.kwamp:kwamp-client-core:1.0.5")
}
KWAMP is mostly transport independent (it must be a valid WAMP transport). E.G raw socket or web socket.
To use the KWAMP client or router you can use the kwamp-client-core or kwamp-router-core packages respectively. They just need to be hooked into a transport (doing this can be seen in kwamp-client-example and kwamp-router-example respectively).
(At some point these integrations should become actual packages in this project which can be used by themselves.)
The router supports the WAMP basic profile. To create a router:
val router = Router()
A client won't be able to connect to it unless it has a Realm (WAMP message routing domain):
router.addRealm(Realm(Uri("myRealm")))
Connections then must be registered with the router in order for them to communicate with it:
val incoming = Channel<ByteArray>()
val outgoing = Channel<ByteArray>()
val connection = Connection(
incoming,
outgoing,
{ message ->
// callback when router closes a connection
// so you can flush and close the underlying transport
},
messageSerializer = JsonMessageSerializer() // default message serializer
)
router.registerConnection(connection)
You then just need to ensure that the incoming and outgoing channels are hooked into the input/output of the underlying transport.
launch { // In another thread so it doesn't block
outgoing.consumeEach { message ->
// forward on anything the router wants to send over the connection to the transport here
}
}
// Send anything the transport receives to the incoming channel
transportIncoming.consumeEach { message ->
incoming.send(/* transport message as byte array */)
}
A client is created with:
val incoming = Channel<ByteArray>()
val outgoing = Channel<ByteArray>()
val client = ClientImpl(wampIncoming, wampOutgoing, Uri("<REALM_URI_HERE>"))
val sessionId = client.getSessionId()
Then you just need to hook up incoming and outgoing channels with the input/output of the underlying channel (the same as is done above with the router).
To register a procedure with the router the client is connected to use:
val registrationHandle = client.register(Uri("<PROCEDURE_URI_HERE>")) { arguments, argumentsKw ->
// Do something here and return a CallResult object
CallResult(listOf(1, 2, 3), mapOf("one" to 1))
// Or just CallResult() if you want empty return
}
Unregistering is a blocking procedure. Using the registration handle from when you registered:
registrationHandle.unregister()
To call a procedure use:
val call = client.call(Uri("<PROCEDURE_URI_HERE>"))
This produces a DeferredCallResult
.
You can then use this to await the result.
val result = call.await()
If there is an error executing the procedure then this will throw an exception.
To subscribe to a topic use:
val subscriptionHandle = client.subscribe("<PROCEDURE_URI_HERE>") { arguments, argumentsKw ->
// Event callback
// Do something with event arguments here...
}
Unsubscribing is a blocking procedure. Using the subscription handle from when you subscribed:
subscriptionHandle.unsubscribe()
To publish to a topic, a client can use:
client.publish(Uri("<TOPIC_URI_HERE>"), publishArguments, publishArgumentsKw)
where publishArguments
is of type List<Any?>?
and publishArgumentsKw is of type Map<String, Any?>?
.
If you also want it acknowledged, publish()
has an extra optional argument:
client.publish(testTopic, publishArguments, publishArgumentsKw) { id ->
// Do something. Id is the ID of the publication
}
Disconnecting is a blocking procedure used like so:
client.disconnect()
The client will say goodbye to the router and wait for it to say goodbye back. The function also returns the goodbye reason sent from the router if you want it.
Transport agnostic WAMP router implementation logic using Kotlin coroutines that routes client messages.
An example usage of KWAMP router that uses KTOR websockets to host the router.
A transport agnostic WAMP client implementation that interfaces with a WAMP router.
An example usage of KWAMP client that uses the KTOR websocket client to communicate to a WAMP router.
A set of example conversations with a mocked out router or client.