From c91396b1c2f045587fdd5f995c7764ebae8ec0d6 Mon Sep 17 00:00:00 2001 From: Arman Bilge Date: Wed, 22 Feb 2023 06:14:13 +0000 Subject: [PATCH 1/2] Override `getAndDiscreteUpdates` for signal `ap` --- frp/src/main/scala/calico/frp/frp.scala | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/frp/src/main/scala/calico/frp/frp.scala b/frp/src/main/scala/calico/frp/frp.scala index 9a4aa8ef..a9cfb841 100644 --- a/frp/src/main/scala/calico/frp/frp.scala +++ b/frp/src/main/scala/calico/frp/frp.scala @@ -42,6 +42,7 @@ import cats.Monad import cats.StackSafeMonad import cats.data.OptionT import cats.effect.kernel.Concurrent +import cats.effect.kernel.Resource import cats.syntax.all.* import fs2.Pull import fs2.Stream @@ -65,6 +66,16 @@ given [F[_]: Concurrent]: Monad[Signal[F, _]] = new StackSafeMonad[Signal[F, _]] def continuous: Stream[F, B] = Stream.repeatEval(get) def get: F[B] = ff.get.ap(fa.get) + override def getAndDiscreteUpdates( + implicit ev: Concurrent[F]): Resource[F, (B, Stream[F, B])] = + getAndDiscreteUpdatesImpl + + private def getAndDiscreteUpdatesImpl = + (ff.getAndDiscreteUpdates, fa.getAndDiscreteUpdates).mapN { + case ((f, fs), (a, as)) => + (f(a), nondeterministicZip(fs, as).map { case (f, a) => f(a) }) + } + private def nondeterministicZip[A0, A1]( xs: Stream[F, A0], ys: Stream[F, A1] From 1761eb4f00c5de3ef9d9478f4ab1fcb1b169bbd3 Mon Sep 17 00:00:00 2001 From: Arman Bilge Date: Wed, 22 Feb 2023 06:29:04 +0000 Subject: [PATCH 2/2] Override `getAndDiscreteUpdates` for signal `flatMap` --- frp/src/main/scala/calico/frp/frp.scala | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/frp/src/main/scala/calico/frp/frp.scala b/frp/src/main/scala/calico/frp/frp.scala index a9cfb841..599faf76 100644 --- a/frp/src/main/scala/calico/frp/frp.scala +++ b/frp/src/main/scala/calico/frp/frp.scala @@ -58,6 +58,14 @@ given [F[_]: Concurrent]: Monad[Signal[F, _]] = new StackSafeMonad[Signal[F, _]] def get = siga.get.flatMap(f(_).get) def continuous = Stream.repeatEval(get) def discrete = siga.discrete.switchMap(f(_).discrete) + override def getAndDiscreteUpdates(using Concurrent[F]) = + getAndDiscreteUpdatesImpl + private def getAndDiscreteUpdatesImpl = + siga.getAndDiscreteUpdates.flatMap { (a, as) => + f(a).getAndDiscreteUpdates.map { (b, bs) => + (b, (Stream.emit(bs) ++ as.map(f(_).discrete)).switchMap(identity(_))) + } + } override def ap[A, B](ff: Signal[F, A => B])(fa: Signal[F, A]) = new: @@ -66,8 +74,7 @@ given [F[_]: Concurrent]: Monad[Signal[F, _]] = new StackSafeMonad[Signal[F, _]] def continuous: Stream[F, B] = Stream.repeatEval(get) def get: F[B] = ff.get.ap(fa.get) - override def getAndDiscreteUpdates( - implicit ev: Concurrent[F]): Resource[F, (B, Stream[F, B])] = + override def getAndDiscreteUpdates(using Concurrent[F]): Resource[F, (B, Stream[F, B])] = getAndDiscreteUpdatesImpl private def getAndDiscreteUpdatesImpl =