Skip to content

Commit

Permalink
Rename source.map => transformValuesWith and add warning
Browse files Browse the repository at this point in the history
  • Loading branch information
natsukagami committed Dec 10, 2023
1 parent 8f32c1b commit b244be6
Show file tree
Hide file tree
Showing 3 changed files with 11 additions and 8 deletions.
9 changes: 6 additions & 3 deletions shared/src/main/scala/async/Async.scala
Original file line number Diff line number Diff line change
Expand Up @@ -153,8 +153,11 @@ object Async:
end values

extension [T](src: Source[T])
/** Pass on data transformed by `f` */
def map[U](f: T => U) =
/** Create a new source that requires the original source to run the given transformation function on every value
* received. Note that [[f]] is **always** run on the computation that produces the values from the original
* source, so this is very likely to run **sequentially** and be a performance bottleneck.
*/
def transformValuesWith[U](f: T => U) =
new Source[U]:
selfSrc =>
def transform(k: Listener[U]) =
Expand Down Expand Up @@ -267,5 +270,5 @@ object Async:
* continuation.
*/
def either[T1, T2](src1: Source[T1], src2: Source[T2]): Source[Either[T1, T2]] =
race(src1.map(Left(_)), src2.map(Right(_)))
race(src1.transformValuesWith(Left(_)), src2.transformValuesWith(Right(_)))
end Async
2 changes: 1 addition & 1 deletion shared/src/test/scala/ChannelBehavior.scala
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,7 @@ class ChannelBehavior extends munit.FunSuite {
}
val race = Async.race(
(0 until 100).map(i =>
Async.race((10 * i until 10 * i + 10).map(idx => channels(idx).readSource.map(_.right.get))*)
Async.race((10 * i until 10 * i + 10).map(idx => channels(idx).readSource.transformValuesWith(_.right.get))*)
)*
)
var sum = 0
Expand Down
8 changes: 4 additions & 4 deletions shared/src/test/scala/SourceBehavior.scala
Original file line number Diff line number Diff line change
Expand Up @@ -127,12 +127,12 @@ class SourceBehavior extends munit.FunSuite {
assertEquals(bRan, true)
}

test("map") {
test("transform values with") {
Async.blocking:
val f: Future[Int] = Future { 10 }
assertEquals(f.map({ case Success(i) => i + 1 }).awaitResult, 11)
assertEquals(f.transformValuesWith({ case Success(i) => i + 1 }).awaitResult, 11)
val g: Future[Int] = Future.now(Failure(AssertionError(1123)))
assertEquals(g.map({ case Failure(_) => 17 }).awaitResult, 17)
assertEquals(g.transformValuesWith({ case Failure(_) => 17 }).awaitResult, 17)
}

test("all listeners in chain fire") {
Expand All @@ -143,7 +143,7 @@ class SourceBehavior extends munit.FunSuite {
sleep(50)
10
}
val g = f.map(identity)
val g = f.transformValuesWith(identity)
f.onComplete(Listener.acceptingListener { (_, _) => aRan.complete(Success(())) })
g.onComplete(Listener.acceptingListener { (_, _) => bRan.complete(Success(())) })
assertEquals(aRan.future.poll(), None)
Expand Down

0 comments on commit b244be6

Please sign in to comment.