Skip to content

Commit

Permalink
Add json modules, and finish documentation
Browse files Browse the repository at this point in the history
  • Loading branch information
nomisRev committed Oct 5, 2023
1 parent dc78402 commit 9d2a38b
Show file tree
Hide file tree
Showing 36 changed files with 1,687 additions and 663 deletions.
68 changes: 67 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,69 @@
# Kotlin GCP PubSub

A KotlinX (Flow) binding for Google Cloud PubSub.
Google Cloud PubSub made easy! Kotlin GCP PubSub offers idiomatic KotlinX & Ktor integration for GCP.

```kotlin
@Serializable
data class Event(val key: String, val message: String)

fun Application.pubSubApp() {
pubSub(ProjectId("my-project")) {
subscribe<Event>(SubscriptionId("my-subscription")) { (event, record) ->
println("event.key: ${event.key}, event.message: ${event.message}")
record.ack()
}
}

routing {
post("/publish/{key}/{message}") {
val event = Event(call.parameters["key"]!!, call.parameters["message"]!!)

pubSub()
.publisher(ProjectId("my-project"))
.publish(TopicId("my-topic"), event)

call.respond(HttpStatusCode.Accepted)
}
}
}
```

## Modules

- [PubSub Ktor plugin](pubsub-ktor/README.MD) to conveniently consume messages from GCP PubSub, and publish messages to
GCP PubSub
- [PubSub Ktor KotlinX Serialization Json](pubsub-ktor-kotlinx-serialization-json/README.MD) to conveniently consume
messages from GCP PubSub, and publish messages to GCP PubSub using KotlinX Serialization Json
- [PubSub test](pubsub-test/README.MD) one-line testing support powered by testcontainers
- [GCP PubSub](pubsub/README.MD): KotlinX integration for `TopicAdminClient`, `SubscriptionAdminClient`, `Susbcriber`
and `Publisher`.
- [PubSub Ktor KotlinX Serialization Json](pubsub-kotlinx-serialization-json/README.MD) to conveniently consume messages
from GCP PubSub, and publish messages to GCP PubSub
- [Google Common API](api-core/README.MD): KotlinX integration for `ApiFuture`

## Using in your projects

### Gradle

Add dependencies (you can also add other modules that you need):

```kotlin
dependencies {
implementation("io.github.nomisrev:gcp-pubsub-ktor:1.0.0")
implementation("io.github.nomisrev:gcp-pubsub-ktor-kotlinx-serialization-json:1.0.0")
testImplementation("io.github.nomisrev:gcp-pubsub-test:1.0.0")
}
```

### Maven

Add dependencies (you can also add other modules that you need):

```xml

<dependency>
<groupId>io.github.nomisrev</groupId>
<artifactId>gcp-pubsub-ktor</artifactId>
<version>1.0.0</version>
</dependency>
```
4 changes: 4 additions & 0 deletions build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -8,3 +8,7 @@ plugins {
alias(libs.plugins.knit)
alias(libs.plugins.spotless)
}

repositories {
mavenCentral()
}
65 changes: 65 additions & 0 deletions common-api/README.MD
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
# Module common-api

KotlinX integration module for Google's api-core module. If you need to work with
Google's [`ApiFuture`](https://cloud.google.com/java/docs/reference/api-common/latest/com.google.api.core.ApiFutures).

Extension functions:

| **Name** | **Description**
|--------------------------|---------------------------------------------------
| [ApiFuture.await]() | Awaits for completion of the future (cancellable)
| [ApiFuture.asDeferred]() | Converts a deferred value to the future

## Example

Given the following functions defined in some Java API based on Guava:

```java
public ApiFuture<AckResponse> ack(); // starts async acknowledging of message
```

We can consume this API from Kotlin coroutine to load two images and combine then asynchronously.
The resulting function returns `ListenableFuture<Image>` for ease of use back from Guava-based Java code.

```kotlin
suspend fun processMessage(record: PubsubRecord): Unit {
println(record.message.data.toStringUtf8())
when (val response = record.ack().await()) {
SUCCESSFUL -> println("Message was successfully acknowledged. Will not be redelivered.")
else -> println("Acknowledgment failed, message might be redelivered.")
}
}
```

Note that this module should be used only for integration with existing Java APIs based on `ApiFuture`.
Writing pure-Kotlin code that uses `ApiFuture` is highly not recommended, since the resulting APIs based
on the futures are quite error-prone. See the discussion on
[Asynchronous Programming Styles](https://github.com/Kotlin/kotlin-coroutines/blob/master/kotlin-coroutines-informal.md#asynchronous-programming-styles)
for details on general problems pertaining to any future-based API and keep in mind that `ApiFuture` exposes
a _blocking_ method [get](https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/Future.html#get--) that makes
it especially bad choice for coroutine-based Kotlin code.

## Using in your projects

### Gradle

Add dependencies (you can also add other modules that you need):

```kotlin
dependencies {
implementation("io.github.nomisrev:google-common-api:1.0.0")
}
```

### Maven

Add dependencies (you can also add other modules that you need):

```xml

<dependency>
<groupId>io.github.nomisrev</groupId>
<artifactId>google-common-api</artifactId>
<version>1.0.0</version>
</dependency>
```
5 changes: 0 additions & 5 deletions api-core/build.gradle.kts → common-api/build.gradle.kts
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
plugins {
id(libs.plugins.kotlin.jvm.get().pluginId)
id(libs.plugins.kotest.multiplatform.get().pluginId)
id(libs.plugins.dokka.get().pluginId)
id(libs.plugins.kover.get().pluginId)
alias(libs.plugins.knit)
Expand All @@ -19,8 +18,4 @@ configure<JavaPluginExtension> {
dependencies {
api(libs.coroutines)
api(libs.google.api)

testImplementation(libs.kotest.property)
testImplementation(libs.kotest.assertions)
testImplementation(libs.kotest.junit5)
}
Original file line number Diff line number Diff line change
@@ -1,14 +1,13 @@
/**
* This file is inspired by
* https://github.com/Kotlin/kotlinx.coroutines/blob/master/integration/kotlinx-coroutines-guava/src/ListenableFuture.kt
* https://github.com/Kotlin/kotlinx.coroutines/blob/master/integration/kotlinx-coroutines-guava/src/ApiFuture.kt
*/
package io.github.nomisrev.gcp.core

import com.google.api.core.ApiFuture
import com.google.api.core.ApiFutureCallback
import com.google.api.core.ApiFutures
import com.google.common.util.concurrent.Futures
import com.google.common.util.concurrent.ListenableFuture
import com.google.common.util.concurrent.MoreExecutors
import com.google.common.util.concurrent.SettableFuture
import com.google.common.util.concurrent.Uninterruptibles
Expand All @@ -32,52 +31,52 @@ import kotlinx.coroutines.handleCoroutineException
import kotlinx.coroutines.suspendCancellableCoroutine

/**
* Returns a [Deferred] that is completed or failed by `this` [ListenableFuture].
* Returns a [Deferred] that is completed or failed by `this` [ApiFuture].
*
* Completion is non-atomic between the two promises.
*
* Cancellation is propagated bidirectionally.
*
* When `this` `ListenableFuture` completes (either successfully or exceptionally) it will try to
* When `this` `ApiFuture` completes (either successfully or exceptionally) it will try to
* complete the returned `Deferred` with the same value or exception. This will succeed, barring a
* race with cancellation of the `Deferred`.
*
* When `this` `ListenableFuture` is [successfully cancelled][java.util.concurrent.Future.cancel],
* When `this` `ApiFuture` is [successfully cancelled][java.util.concurrent.Future.cancel],
* it will cancel the returned `Deferred`.
*
* When the returned `Deferred` is [cancelled][Deferred.cancel], it will try to propagate the
* cancellation to `this` `ListenableFuture`. Propagation will succeed, barring a race with the
* `ListenableFuture` completing normally. This is the only case in which the returned `Deferred`
* will complete with a different outcome than `this` `ListenableFuture`.
* cancellation to `this` `ApiFuture`. Propagation will succeed, barring a race with the
* `ApiFuture` completing normally. This is the only case in which the returned `Deferred`
* will complete with a different outcome than `this` `ApiFuture`.
*/
public fun <T> ApiFuture<T>.asDeferred(): Deferred<T> {
/* This method creates very specific behaviour as it entangles the `Deferred` and
* `ListenableFuture`. This behaviour is the best discovered compromise between the possible
* `ApiFuture`. This behaviour is the best discovered compromise between the possible
* states and interface contracts of a `Future` and the states of a `Deferred`. The specific
* behaviour is described here.
*
* When `this` `ListenableFuture` is successfully cancelled - meaning
* `ListenableFuture.cancel()` returned `true` - it will synchronously cancel the returned
* When `this` `ApiFuture` is successfully cancelled - meaning
* `ApiFuture.cancel()` returned `true` - it will synchronously cancel the returned
* `Deferred`. This can only race with cancellation of the returned `Deferred`, so the
* `Deferred` will always be put into its "cancelling" state and (barring uncooperative
* cancellation) _eventually_ reach its "cancelled" state when either promise is successfully
* cancelled.
*
* When the returned `Deferred` is cancelled, `ListenableFuture.cancel()` will be synchronously
* called on `this` `ListenableFuture`. This will attempt to cancel the `Future`, though
* cancellation may not succeed and the `ListenableFuture` may complete in a non-cancelled
* When the returned `Deferred` is cancelled, `ApiFuture.cancel()` will be synchronously
* called on `this` `ApiFuture`. This will attempt to cancel the `Future`, though
* cancellation may not succeed and the `ApiFuture` may complete in a non-cancelled
* terminal state.
*
* The returned `Deferred` may receive and suppress the `true` return value from
* `ListenableFuture.cancel()` when the task is cancelled via the `Deferred` reference to it.
* `ApiFuture.cancel()` when the task is cancelled via the `Deferred` reference to it.
* This is unavoidable, so make sure no idempotent cancellation work is performed by a
* reference-holder of the `ListenableFuture` task. The idempotent work won't get done if
* reference-holder of the `ApiFuture` task. The idempotent work won't get done if
* cancellation was from the `Deferred` representation of the task.
*
* This is inherently a race. See `Future.cancel()` for a description of `Future` cancellation
* semantics. See `Job` for a description of coroutine cancellation semantics.
*/
// First, try the fast-fast error path for Guava ListenableFutures. This will save allocating an
// First, try the fast-fast error path for Guava ApiFutures. This will save allocating an
// Exception by using the same instance the Future created.
if (this is InternalFutureFailureAccess) {
val t: Throwable? = InternalFutures.tryInternalFastPathGetFailure(this)
Expand Down Expand Up @@ -205,12 +204,12 @@ private fun ExecutionException.nonNullCause(): Throwable {
}

@InternalCoroutinesApi
private class ListenableFutureCoroutine<T>(context: CoroutineContext) :
private class ApiFutureCoroutine<T>(context: CoroutineContext) :
AbstractCoroutine<T>(context, initParentJob = true, active = true) {

// JobListenableFuture propagates external cancellation to `this` coroutine. See
// JobListenableFuture.
@JvmField val future = JobListenableFuture<T>(this)
// JobApiFuture propagates external cancellation to `this` coroutine. See
// JobApiFuture.
@JvmField val future = JobApiFuture<T>(this)

override fun onCompleted(value: T) {
future.complete(value)
Expand All @@ -229,29 +228,29 @@ private class ListenableFutureCoroutine<T>(context: CoroutineContext) :
}

/**
* A [ListenableFuture] that delegates to an internal [SettableFuture], collaborating with it.
* A [ApiFuture] that delegates to an internal [SettableFuture], collaborating with it.
*
* This setup allows the returned [ListenableFuture] to maintain the following properties:
* This setup allows the returned [ApiFuture] to maintain the following properties:
* - Correct implementation of [Future]'s happens-after semantics documented for [get], [isDone] and
* [isCancelled] methods
* - Cancellation propagation both to and from [Deferred]
* - Correct cancellation and completion semantics even when this [ListenableFuture] is combined
* with different concrete implementations of [ListenableFuture]
* - Correct cancellation and completion semantics even when this [ApiFuture] is combined
* with different concrete implementations of [ApiFuture]
* - Fully correct cancellation and listener happens-after obeying [Future] and
* [ListenableFuture]'s documented and implicit contracts is surprisingly difficult to
* [ApiFuture]'s documented and implicit contracts is surprisingly difficult to
* achieve. The best way to be correct, especially given the fun corner cases from
* [AbstractFuture.setFuture], is to just use an [AbstractFuture].
* - To maintain sanity, this class implements [ListenableFuture] and uses an auxiliary
* - To maintain sanity, this class implements [ApiFuture] and uses an auxiliary
* [SettableFuture] around coroutine's result as a state engine to establish
* happens-after-completion. This could probably be compressed into one subclass of
* [AbstractFuture] to save an allocation, at the cost of the implementation's readability.
*/
private class JobListenableFuture<T>(private val jobToCancel: Job) : ListenableFuture<T> {
private class JobApiFuture<T>(private val jobToCancel: Job) : ApiFuture<T> {
/**
* Serves as a state machine for [Future] cancellation.
*
* [AbstractFuture] has a highly-correct atomic implementation of `Future`'s completion and
* cancellation semantics. By using that type, the [JobListenableFuture] can delegate its
* cancellation semantics. By using that type, the [JobApiFuture] can delegate its
* semantics to `auxFuture.get()` the result in such a way that the `Deferred` is always complete
* when returned.
*
Expand All @@ -261,12 +260,12 @@ private class JobListenableFuture<T>(private val jobToCancel: Job) : ListenableF
private val auxFuture = SettableFuture.create<Any?>()

/**
* `true` if [auxFuture.get][ListenableFuture.get] throws [ExecutionException].
* `true` if [auxFuture.get][ApiFuture.get] throws [ExecutionException].
*
* Note: this is eventually consistent with the state of [auxFuture].
*
* Unfortunately, there's no API to figure out if [ListenableFuture] throws [ExecutionException]
* apart from calling [ListenableFuture.get] on it. To avoid unnecessary [ExecutionException]
* Unfortunately, there's no API to figure out if [ApiFuture] throws [ExecutionException]
* apart from calling [ApiFuture.get] on it. To avoid unnecessary [ExecutionException]
* allocation we use this field as an optimization.
*/
private var auxFutureIsFailed: Boolean = false
Expand Down Expand Up @@ -327,7 +326,7 @@ private class JobListenableFuture<T>(private val jobToCancel: Job) : ListenableF

/**
* Waits for [auxFuture] to complete by blocking, then uses its `result` to get the `T` value
* `this` [ListenableFuture] is pointing to or throw a [CancellationException]. This establishes
* `this` [ApiFuture] is pointing to or throw a [CancellationException]. This establishes
* happens-after ordering for completion of the entangled coroutine.
*
* [SettableFuture.get] can only throw [CancellationException] if it was cancelled externally.
Expand Down Expand Up @@ -371,7 +370,7 @@ private class JobListenableFuture<T>(private val jobToCancel: Job) : ListenableF
*
* This arrangement means that [jobToCancel] _might not successfully cancel_, if the race resolves
* in a particular way. [jobToCancel] may also be in its "cancelling" state while this
* ListenableFuture is complete and cancelled.
* ApiFuture is complete and cancelled.
*/
override fun cancel(mayInterruptIfRunning: Boolean): Boolean {
// TODO: call jobToCancel.cancel() _before_ running the listeners.
Expand Down Expand Up @@ -418,6 +417,6 @@ private class JobListenableFuture<T>(private val jobToCancel: Job) : ListenableF
* If the coroutine is _cancelled normally_, we want to show the reason of cancellation to the user.
* Unfortunately, [SettableFuture] can't store the reason of cancellation. To mitigate this, we wrap
* cancellation exception into this class and pass it into [SettableFuture.complete]. See
* implementation of [JobListenableFuture].
* implementation of [JobApiFuture].
*/
private class Cancelled(@JvmField val exception: CancellationException)

This file was deleted.

Loading

0 comments on commit 9d2a38b

Please sign in to comment.