How does event listening work? #114

Open
burakakca opened this issue Feb 20, 2023 · 4 comments

@burakakca

I'm using two ZIO scoped blocks in my codebase: one creates commands and the other listens for published events. When the command and the listener are in the same code block and run sequentially, there is no problem, but when they are run in different time intervals, the listening side does not receive information. How can I solve this?

Is there a pub/sub mechanism, or is it reading from the database?

The command code block is like this example: https://github.com/hnaderi/edomata-example/blob/05609b7f54388dae89b9a925eea21b794db9fd39/core/src/main/scala/dev/hnaderi/example/Main.scala#L47

The listening code block:

def mySink(app: Application[Task]) = ZSink.foreach((item: OutboxItem[Notification]) =>
  for {
    _ <- printLine("mySink: " + item)
    _ <- app.accounts.storage.outbox.markAsSent(item)
  } yield ()
)

def printOutbox(app: Application[Task]) = app.accounts.storage.outbox.read.toZStream().run(mySink(app))

ZIO.scoped {
  for {
    app <- Application[Task]().toScopedZIO
    _ <- printOutbox(app)
  } yield ()
}.fork
@hnaderi
Owner

hnaderi commented Feb 20, 2023

Basically, all storage methods provide data from the database itself. There are also notifications for updates (both in the journal and the outbox), so you can listen to them and read when necessary.
outbox.read reads all of the available outboxed messages.
You can also use edomata.backend.OutboxConsumer, which handles all of this for you, so you don't need to do it by hand; most of the storage methods are low-level.
You can see how OutboxConsumer works in the following code:

(emit(()) ++ signal)
  .flatMap(_ => backend.read)
  .chunks
  .foreach(ch =>
    NonEmptyChain
      .fromChain(ch.toChain)
      .fold(F.unit)(
        ch.traverse(run) >> backend.markAllAsSent(_)
      )
  )
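To make the pattern in that snippet concrete, here is a minimal toy model in plain Scala with no edomata, fs2, or ZIO involved. It is only a sketch under stated assumptions: `ToyOutbox`, `ToyConsumer`, `drain`, and `consume` are hypothetical names invented for illustration, and an in-memory buffer stands in for the database. What it shows is the shape of `(emit(()) ++ signal)`: read once immediately on startup, then read again on every notification, and mark whatever was handled as sent.

```scala
import scala.collection.mutable

// Hypothetical in-memory stand-in for the database-backed outbox.
final class ToyOutbox {
  private val pending = mutable.ArrayBuffer.empty[String]

  // A command side would append messages here.
  def publish(msg: String): Unit = synchronized { pending += msg }

  // Plays the role of `outbox.read`: everything that is still unsent.
  def read(): List[String] = synchronized(pending.toList)

  // Plays the role of `markAllAsSent`: drop the handled messages.
  def markAllAsSent(items: List[String]): Unit = synchronized { pending --= items }
}

object ToyConsumer {
  // One step: read what is pending, handle it, then mark it as sent.
  // Returns how many messages were handled.
  def drain(outbox: ToyOutbox, handle: String => Unit): Int = {
    val items = outbox.read()
    if (items.nonEmpty) {
      items.foreach(handle)
      outbox.markAllAsSent(items)
    }
    items.size
  }

  // Mirrors `(emit(()) ++ signal)`: one drain right away, then one per signal.
  def consume(outbox: ToyOutbox, signal: Iterator[Unit], handle: String => Unit): Unit =
    (Iterator.single(()) ++ signal).foreach(_ => drain(outbox, handle))
}
```

In this model, a consumer that starts after the messages were published still sees them, because the first drain does not wait for a notification; it reads whatever is stored and unsent. Messages that were already marked as sent by an earlier reader will not show up again.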

@hnaderi
Owner

hnaderi commented Feb 20, 2023

> but when they are run in different time intervals, the listening side does not receive information

What do you mean by different time intervals? Are they running in parallel?
Can you provide an example and its results?

@burakakca
Author

> What do you mean by different time intervals? Are they running in parallel?

Yes, they are running on different ZIO fibers.

@hnaderi
Owner

hnaderi commented Feb 21, 2023

Is your problem solved?
