Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added websocket event sources #60

Open
wants to merge 43 commits into
base: master
Choose a base branch
from
Open

Added websocket event sources #60

wants to merge 43 commits into from

Conversation

ajaychandran
Copy link
Contributor

I have tested the implementation for binary and text data. Included support for dom.Blob data type, not sure how to test it though.

This example can be added to the Laminar project once the PR is merged and released.

raquo and others added 30 commits December 25, 2020 00:22
Previously, if you stopped the delayed stream and then immediately started it, delayed events scheduled before the stream was stopped would fire after it was re-started if their delays did not complete while the stream was stopped.
Same reasoning as delay operator in prev commit
Same reasoning as Delay, see previous commit
Same reasoning as Delay, see previous commit
- New: Signal.fromValue and Signal.fromTry
- New: Observer.toJsFn1
- Fix: Ajax error message is not actually available
- Fix: delay xhr.open() so that readyStateChange fires with readyState = 1
@ajaychandran
Copy link
Contributor Author

Looks like I started with an older version of the target branch. Let me know if you want me to resubmit the PR.

# Conflicts:
#	README.md
#	src/main/scala/com/raquo/airstream/web/AjaxEventStream.scala
#	src/main/scala/com/raquo/airstream/web/DomEventStream.scala
@ajaychandran
Copy link
Contributor Author

@raquo Please review.

@yurique
Copy link
Contributor

yurique commented Jan 2, 2021

I have a WebSocket API in laminext, and I took a different approach there (I just added support for blob/arrays inspired by this PR :) ).

The usage looks like this:

      val ws = websocket.forUrl("wss://echo.websocket.org").string
// or
      websocket.forUrl("wss://echo.websocket.org").text(decode, encode)
      websocket.forUrl("wss://echo.websocket.org").blob
      websocket.forUrl("wss://echo.websocket.org").arraybuffer
// or      
      websocket.forUrl("wss://echo.websocket.org").receiveString.sendArray
      websocket.forUrl("wss://echo.websocket.org").receiveBlob.sendText(decode, encode)
// and all other combinations 

and then there's a bunch of binders + a stream and an observer provided:

simplest:

val outgoingEvents: Observable[String] = ???

div(
  ws.connect, //only needed when no other binders are used, like in this example
  child.text <-- ws.received.stream,
  outgoingEvents --> ws.sendObserver
)

other binders:

div(
  ws.send <-- outgoingEvents
)
div(
  ws.connected --> connectedObserver,  // or () => Unit, as usual
  ws.closed --> closedObserver,  
  ws.error --> errorObserver,  
  ws.received --> receivedObserver
)

It buffers the outgoing messages until connected (configurable). A couple of things remain to be done, like the re-connection logic.

The working example can look like this:

val ws = websocket.forUrl("wss://echo.websocket.org").string
val inputElement = input()
div(
  div(inputElement),
  div(  
    button(
      "send",
      onClick.mapTo(inputElement.ref.value) --> ws.sendObserver
    )
  ),
  ws.connect,
  div(text <-- ws.received)
)

@yurique
Copy link
Contributor

yurique commented Jan 2, 2021

It's a quite old piece of code, though. Now I'm looking at it and wondering if I could ditch half of it (together with some extra complexity)

@yurique
Copy link
Contributor

yurique commented Jan 2, 2021

after some cleanup my implementation looks like this: https://gist.github.com/yurique/baae21c9eea620033e0fc2cf5f370701

  • a couple of files with builders

@raquo
Copy link
Owner

raquo commented Jan 5, 2021

I reviewed this branch, Iurii's gist, and my own private websocket implementation (my own stuff is not very useful because it's too tied to my personal code and the libraries that I use), and after all, I don't think adding websockets to Airstream is the right thing to do now.

The amount of complexity required to make a reasonably safe, resilient and pleasant API is very significant, and I don't have the capacity right now to get to the bottom of it and make the right call. Things that need to be designed, at the very least, are:

  • handling errors and disconnects, and maybe automatic reconnects
  • queing of pending client-to-server messages while the the connection was not yet open or while it was disconnected
  • library needs to be aware that users will most likely use a codec for both inbound and outbound messages, and so should probably support such config

It's not clear to me how the API should be designed in terms of stream lifecycle either.

Iurii's implementation depends on Laminar but provides a seemingly more convenient API. I honestly don't know if that's the way to go or not. I think we'll need to see Iurii's laminext released, presumably with his websockets implementation, and see how that fares, what kind of feedback we get about such an approach to websockets.

Websockets are quite far from Airstream's core functionality, so I'm ok having examples / gists / third party libraries cover that for now.

At any rate, this turned out to be too big of an issue to include in v0.12.0. I'm going to fix a bunch more small stuff and wrap up that release now.

@raquo raquo added hard problem need to find time https://falseknees.com/297.html labels Jan 5, 2021
@yurique
Copy link
Contributor

yurique commented Jan 5, 2021

This is a very exciting release!

I'll do my best to release laminext asap after 0.12 is out. Almost everything is prepared, just a couple of modules left to document.

@ngbinh
Copy link
Contributor

ngbinh commented Jan 6, 2021

should this be included as an extension module of Airstream instead? Of course, post 0.12?

@raquo
Copy link
Owner

raquo commented Jan 6, 2021 via email

@ajaychandran
Copy link
Contributor Author

ajaychandran commented Jan 6, 2021

I have redesigned the API to support connection management.

// a duplex connection
// (Observer[Boolean], Observer[String[, EventStream[String])
val (control, receive, transmit) = WebSocketEventStream.text.open("absolute/url")
  • a connection is established when receive is started
  • sending true to control will open a new connection (provided receive is started)
  • sending false to control will close the connection
  • the connection is closed when receive is stopped
  • control can be used in conjunction with a socketErrorObserver (optional constructor parameter) to implement an automatic retry policy.

At any rate, this turned out to be too big of an issue to include in v0.12.0. I'm going to fix a bunch more small stuff and wrap up that release now.

I agree.
It would be nice to get some feedback from developers using any of the implementations presented here.

@doofin
Copy link

doofin commented Aug 13, 2024

Hi @ajaychandran , as people mentioned before laminext https://github.com/tulz-app/laminext is the right place to put this, so Airstream won't merge this. let's discuss if we can contrib in laminext

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
hard problem need to find time https://falseknees.com/297.html
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants