Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding websocket event publisher #65

Merged
merged 3 commits into from
Jun 19, 2018
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 14 additions & 3 deletions brew_view/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@
from bg_utils.event_publisher import EventPublishers
from bg_utils.pika import TransientPikaClient
from bg_utils.plugin_logging_loader import PluginLoggingLoader
from brew_view.publishers import MongoPublisher, RequestPublisher, TornadoPikaPublisher
from brew_view.publishers import (MongoPublisher, RequestPublisher,
TornadoPikaPublisher, WebsocketPublisher)
from brew_view.specification import get_default_logging_config
from brewtils.schemas import ParameterSchema, CommandSchema, InstanceSchema, SystemSchema, \
RequestSchema, PatchSchema, LoggingConfigSchema, EventSchema, QueueSchema
Expand Down Expand Up @@ -51,6 +52,11 @@ def setup_brew_view(spec, cli_args):
_setup_application()


def shutdown():
from brew_view.controllers import EventSocket
EventSocket.shutdown()


def load_plugin_logging_config(input_config):
global plugin_logging_config

Expand Down Expand Up @@ -86,7 +92,7 @@ def _setup_tornado_app():
from brew_view.controllers import AdminAPI, CommandAPI, CommandListAPI, ConfigHandler, \
InstanceAPI, QueueAPI, QueueListAPI, RequestAPI, RequestListAPI, SystemAPI, SystemListAPI, \
VersionHandler, SpecHandler, SwaggerConfigHandler, OldAdminAPI, OldQueueAPI, \
OldQueueListAPI, LoggingConfigAPI, EventPublisherAPI
OldQueueListAPI, LoggingConfigAPI, EventPublisherAPI, EventSocket

prefix = config.web.url_prefix
static_base = os.path.join(os.path.dirname(__file__), 'static', 'dist')
Expand Down Expand Up @@ -120,6 +126,7 @@ def _setup_tornado_app():
(r'{0}config/swagger/?'.format(prefix), SwaggerConfigHandler),
(r'{0}version/?'.format(prefix), VersionHandler),
(r'{0}api/v1/spec/?'.format(prefix), SpecHandler),
(r'{0}events/?'.format(prefix), EventSocket),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would think we would want to version this endpoint also? If we aren't sure, I'd still expose it on api/vbeta/websocket or something to that affect.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good call - my initial thought is to put it with the versioned ones. Let me know if you'd rather have it in beta.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ahh, I remember why I punted on this decision 😄

We already have a POST api/vbeta/events/ route that allows for publishing your own events. I think we eventually want to support retrieving events by an ID (assuming you've been saving them to mongo)?

Now I'm thinking it would be better to have this be something besides events so we don't muddy the RESTfulness.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And now I remember more ... the apispec package we're using to construct the swagger spec doesn't like that we're putting a websocket handler in our 'published' path list. Which makes sense - it's not really designed for non-REST things. See:

swagger-api/swagger-socket#47
OAI/OpenAPI-Specification#55

So far all of the API we've wanted to publish has fit nicely into REST, so it's fit nicely into swagger (and apispec). We'll have to decide how to move forward with documenting the websocket support.

(r'{0}'.format(prefix[:-1]), RedirectHandler, {"url": prefix}),
(r'{0}swagger/(.*)'.format(prefix), StaticFileHandler,
{'path': os.path.join(static_base, 'swagger')}),
Expand Down Expand Up @@ -184,9 +191,13 @@ def bg_thrift_context(async=True, **kwargs):


def _setup_event_publishers(ssl_context):
from brew_view.controllers.event_api import EventSocket

# Create the collection of event publishers and add concrete publishers to it
pubs = EventPublishers({'request': RequestPublisher(ssl_context=ssl_context)})
pubs = EventPublishers({
'request': RequestPublisher(ssl_context=ssl_context),
'websocket': WebsocketPublisher(EventSocket)
})

if config.event.mongo.enable:
pubs['mongo'] = MongoPublisher()
Expand Down
3 changes: 3 additions & 0 deletions brew_view/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@ def shutdown():
brew_view.logger.info("Stopping server.")
brew_view.server.stop()

# Close any open websocket connections
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd rather see this as a docstring on the shutdown method.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

brew_view.shutdown()

# Publish shutdown notification
brew_view.event_publishers.publish_event(Event(name=Events.BREWVIEW_STOPPED.name))

Expand Down
2 changes: 1 addition & 1 deletion brew_view/controllers/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,4 @@
from brew_view.controllers.misc_controllers import ConfigHandler, VersionHandler, SpecHandler, \
SwaggerConfigHandler
from brew_view.controllers.logging_api import LoggingConfigAPI
from brew_view.controllers.event_api import EventPublisherAPI
from brew_view.controllers.event_api import EventPublisherAPI, EventSocket
43 changes: 43 additions & 0 deletions brew_view/controllers/event_api.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
import logging

from tornado.websocket import WebSocketHandler

import brew_view
from bg_utils.parser import BeerGardenSchemaParser
from brew_view.base_handler import BaseHandler
from brewtils.schema_parser import SchemaParser

Expand Down Expand Up @@ -50,3 +53,43 @@ def post(self):
brew_view.event_publishers[publisher].publish_event(event)

self.set_status(204)


class EventSocket(WebSocketHandler):

logger = logging.getLogger(__name__)
parser = BeerGardenSchemaParser()

closing = False
listeners = set()

def check_origin(self, origin):
return True

def open(self):
if EventSocket.closing:
self.close(reason='Shutting down')
else:
EventSocket.listeners.add(self)

def on_close(self):
EventSocket.listeners.discard(self)

def on_message(self, message):
pass

@classmethod
def publish(cls, message):
# Don't bother if nobody is listening
if not len(cls.listeners):
return

for listener in cls.listeners:
listener.write_message(message)

@classmethod
def shutdown(cls):
EventSocket.closing = True

for listener in cls.listeners:
listener.close(reason='Shutting down')
12 changes: 12 additions & 0 deletions brew_view/publishers.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,18 @@ def _event_publish_args(self, event, **kwargs):
return {'urls': urls} if urls else {}


class WebsocketPublisher(BeergardenPublisher):
"""Publisher implementation that publishes to a websocket"""

def __init__(self, socket_class):
BeergardenPublisher.__init__(self)

self._socket = socket_class

def publish(self, message, **kwargs):
self._socket.publish(message)


class MongoPublisher(BeergardenPublisher):
"""Publisher implementation that 'publishes' to Mongo"""

Expand Down