From 20fe87f28c2563fe880d8b41edbb9f5bb7b6c9a7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jakub=20Koz=C5=82owski?= Date: Sat, 21 May 2022 20:41:06 +0200 Subject: [PATCH] Add todos, remove self type, extract transformer --- .../scala/io/pg/messaging/messaging.scala | 9 +++-- .../src/main/scala/io/pg/gitlab/Gitlab.scala | 2 ++ .../scala/io/pg/gitlab/webhook/webhook.scala | 1 + src/main/scala/io/pg/webhook/webhook.scala | 35 ++++++++++--------- 4 files changed, 27 insertions(+), 20 deletions(-) diff --git a/core/src/main/scala/io/pg/messaging/messaging.scala b/core/src/main/scala/io/pg/messaging/messaging.scala index 0eafaef5..169fa60f 100644 --- a/core/src/main/scala/io/pg/messaging/messaging.scala +++ b/core/src/main/scala/io/pg/messaging/messaging.scala @@ -36,16 +36,19 @@ object Processor { } -trait Channel[F[_], A] extends Publisher[F, A] { self => +trait Channel[F[_], A] extends Publisher[F, A] { def consume: fs2.Stream[F, A] } object Channel { - given[F[_]]: Invariant[Channel[F, *]] with { + + given [F[_]]: Invariant[Channel[F, *]] with { + def imap[A, B](chan: Channel[F, A])(f: A => B)(g: B => A): Channel[F, B] = new { - def consume: fs2.Stream[F,B] = chan.consume.map(f) + def consume: fs2.Stream[F, B] = chan.consume.map(f) def publish(b: B): F[Unit] = chan.publish(g(b)) } + } def fromQueue[F[_]: Functor, A](q: Queue[F, A]): Channel[F, A] = diff --git a/gitlab/src/main/scala/io/pg/gitlab/Gitlab.scala b/gitlab/src/main/scala/io/pg/gitlab/Gitlab.scala index 1bafe5bc..beaaed68 100644 --- a/gitlab/src/main/scala/io/pg/gitlab/Gitlab.scala +++ b/gitlab/src/main/scala/io/pg/gitlab/Gitlab.scala @@ -281,12 +281,14 @@ object GitlabEndpoints { } object ApprovalRule { + // todo: use configured codec when https://github.com/circe/circe/pull/1800 is available given CirceCodec[ApprovalRule] = CirceCodec.forProduct3("id", "name", "rule_type")(apply)(r => (r.id, r.name, r.ruleType)) } final case class MergeRequestApprovals(approvalsRequired: Int) object MergeRequestApprovals { + // todo: use configured codec when https://github.com/circe/circe/pull/1800 is available given CirceCodec[MergeRequestApprovals] = CirceCodec.forProduct1("approvals_required")(apply)(_.approvalsRequired) } diff --git a/gitlab/src/main/scala/io/pg/gitlab/webhook/webhook.scala b/gitlab/src/main/scala/io/pg/gitlab/webhook/webhook.scala index 63cbe4ed..e8a638e6 100644 --- a/gitlab/src/main/scala/io/pg/gitlab/webhook/webhook.scala +++ b/gitlab/src/main/scala/io/pg/gitlab/webhook/webhook.scala @@ -5,6 +5,7 @@ import io.circe.Codec final case class WebhookEvent(project: Project, objectKind: String /* for logs */ ) object WebhookEvent { + // todo: use configured codec when https://github.com/circe/circe/pull/1800 is available given Codec[WebhookEvent] = Codec.forProduct2("project", "object_kind")(apply)(we => (we.project, we.objectKind)) } diff --git a/src/main/scala/io/pg/webhook/webhook.scala b/src/main/scala/io/pg/webhook/webhook.scala index 093849a8..70171f0a 100644 --- a/src/main/scala/io/pg/webhook/webhook.scala +++ b/src/main/scala/io/pg/webhook/webhook.scala @@ -17,6 +17,7 @@ import org.http4s.dsl.Http4sDsl import cats.MonadThrow import io.pg.gitlab.Gitlab.MergeRequestInfo import io.pg.MergeRequestState.Mergeability +import io.pg.MergeRequestState object WebhookRouter { @@ -36,29 +37,29 @@ object WebhookRouter { MergeRequests[F] .build(proj) .nested - .map { s => - transport.MergeRequestState( - projectId = s.projectId, - mergeRequestIid = s.mergeRequestIid, - description = s.description, - status = s.status match { - case MergeRequestInfo.Status.Success => transport.MergeRequestState.Status.Success - case MergeRequestInfo.Status.Other(s) => transport.MergeRequestState.Status.Other(s) - }, - mergeability = s.mergeability match { - case Mergeability.CanMerge => transport.MergeRequestState.Mergeability.CanMerge - case Mergeability.HasConflicts => transport.MergeRequestState.Mergeability.HasConflicts - case Mergeability.NeedsRebase => transport.MergeRequestState.Mergeability.NeedsRebase - }, - authorUsername = s.authorUsername - ) - } + .map(mergeRequestToTransport) .value .flatMap(Ok(_)) } } + private def mergeRequestToTransport(mr: MergeRequestState): io.pg.transport.MergeRequestState = transport.MergeRequestState( + projectId = mr.projectId, + mergeRequestIid = mr.mergeRequestIid, + description = mr.description, + status = mr.status match { + case MergeRequestInfo.Status.Success => transport.MergeRequestState.Status.Success + case MergeRequestInfo.Status.Other(s) => transport.MergeRequestState.Status.Other(s) + }, + mergeability = mr.mergeability match { + case Mergeability.CanMerge => transport.MergeRequestState.Mergeability.CanMerge + case Mergeability.HasConflicts => transport.MergeRequestState.Mergeability.HasConflicts + case Mergeability.NeedsRebase => transport.MergeRequestState.Mergeability.NeedsRebase + }, + authorUsername = mr.authorUsername + ) + } object WebhookProcessor {