From c91396b1c2f045587fdd5f995c7764ebae8ec0d6 Mon Sep 17 00:00:00 2001
From: Arman Bilge
Date: Wed, 22 Feb 2023 06:14:13 +0000
Subject: [PATCH 1/2] Override `getAndDiscreteUpdates` for signal `ap`
---
frp/src/main/scala/calico/frp/frp.scala | 11 +++++++++++
1 file changed, 11 insertions(+)
diff --git a/frp/src/main/scala/calico/frp/frp.scala b/frp/src/main/scala/calico/frp/frp.scala
index 9a4aa8ef..a9cfb841 100644
--- a/frp/src/main/scala/calico/frp/frp.scala
+++ b/frp/src/main/scala/calico/frp/frp.scala
@@ -42,6 +42,7 @@ import cats.Monad
import cats.StackSafeMonad
import cats.data.OptionT
import cats.effect.kernel.Concurrent
+import cats.effect.kernel.Resource
import cats.syntax.all.*
import fs2.Pull
import fs2.Stream
@@ -65,6 +66,28 @@ given [F[_]: Concurrent]: Monad[Signal[F, _]] = new StackSafeMonad[Signal[F, _]]
def continuous: Stream[F, B] = Stream.repeatEval(get)
def get: F[B] = ff.get.ap(fa.get)
+ override def getAndDiscreteUpdates(
+ implicit ev: Concurrent[F]): Resource[F, (B, Stream[F, B])] =
+ getAndDiscreteUpdatesImpl
+
+ private def getAndDiscreteUpdatesImpl =
+   // `fs` and `as` only carry changes made after the initial read, so the
+   // latest pair is tracked by a scan seeded with `(f, a)`. An update on
+   // either side is then emitted at once; zipping the two update streams
+   // directly would stay silent until both sides had changed.
+   (ff.getAndDiscreteUpdates, fa.getAndDiscreteUpdates).mapN {
+     case ((f, fs), (a, as)) =>
+       val updates = fs
+         .either(as)
+         .scan((f, a)) {
+           case ((_, latestA), Left(newF)) => (newF, latestA)
+           case ((latestF, _), Right(newA)) => (latestF, newA)
+         }
+         .drop(1) // the seed is already reported as the current value
+         .map { case (g, x) => g(x) }
+       (f(a), updates)
+   }
+
private def nondeterministicZip[A0, A1](
xs: Stream[F, A0],
ys: Stream[F, A1]
From 1761eb4f00c5de3ef9d9478f4ab1fcb1b169bbd3 Mon Sep 17 00:00:00 2001
From: Arman Bilge
Date: Wed, 22 Feb 2023 06:29:04 +0000
Subject: [PATCH 2/2] Override `getAndDiscreteUpdates` for signal `flatMap`
---
frp/src/main/scala/calico/frp/frp.scala | 11 +++++++++--
1 file changed, 9 insertions(+), 2 deletions(-)
diff --git a/frp/src/main/scala/calico/frp/frp.scala b/frp/src/main/scala/calico/frp/frp.scala
index a9cfb841..599faf76 100644
--- a/frp/src/main/scala/calico/frp/frp.scala
+++ b/frp/src/main/scala/calico/frp/frp.scala
@@ -58,6 +58,14 @@ given [F[_]: Concurrent]: Monad[Signal[F, _]] = new StackSafeMonad[Signal[F, _]]
def get = siga.get.flatMap(f(_).get)
def continuous = Stream.repeatEval(get)
def discrete = siga.discrete.switchMap(f(_).discrete)
+ override def getAndDiscreteUpdates(using Concurrent[F]) = // delegates to the private implementation below
+ getAndDiscreteUpdatesImpl
+ private def getAndDiscreteUpdatesImpl = // yields the current value of `f(a)` together with a stream of later values
+ siga.getAndDiscreteUpdates.flatMap { (a, as) => // `a`: value read from `siga`; `as`: its stream of updates
+ f(a).getAndDiscreteUpdates.map { (b, bs) => // `b` / `bs`: value and update stream of the inner signal `f(a)`
+ (b, (Stream.emit(bs) ++ as.map(f(_).discrete)).switchMap(identity(_))) // start on `bs`, then switch to `f(a2).discrete` for each `a2` emitted by `as`
+ }
+ }
override def ap[A, B](ff: Signal[F, A => B])(fa: Signal[F, A]) =
new:
@@ -66,8 +74,7 @@ given [F[_]: Concurrent]: Monad[Signal[F, _]] = new StackSafeMonad[Signal[F, _]]
def continuous: Stream[F, B] = Stream.repeatEval(get)
def get: F[B] = ff.get.ap(fa.get)
- override def getAndDiscreteUpdates(
- implicit ev: Concurrent[F]): Resource[F, (B, Stream[F, B])] =
+ override def getAndDiscreteUpdates(using Concurrent[F]): Resource[F, (B, Stream[F, B])] =
getAndDiscreteUpdatesImpl
private def getAndDiscreteUpdatesImpl =