diff --git a/brew_view/__init__.py b/brew_view/__init__.py index e9849bd3..808af587 100644 --- a/brew_view/__init__.py +++ b/brew_view/__init__.py @@ -16,7 +16,8 @@ from bg_utils.event_publisher import EventPublishers from bg_utils.pika import TransientPikaClient from bg_utils.plugin_logging_loader import PluginLoggingLoader -from brew_view.publishers import MongoPublisher, RequestPublisher, TornadoPikaPublisher +from brew_view.publishers import (MongoPublisher, RequestPublisher, + TornadoPikaPublisher, WebsocketPublisher) from brew_view.specification import get_default_logging_config from brewtils.schemas import ParameterSchema, CommandSchema, InstanceSchema, SystemSchema, \ RequestSchema, PatchSchema, LoggingConfigSchema, EventSchema, QueueSchema @@ -51,6 +52,12 @@ def setup_brew_view(spec, cli_args): _setup_application() +def shutdown(): + """Close any open websocket connections""" + from brew_view.controllers import EventSocket + EventSocket.shutdown() + + def load_plugin_logging_config(input_config): global plugin_logging_config @@ -86,7 +93,7 @@ def _setup_tornado_app(): from brew_view.controllers import AdminAPI, CommandAPI, CommandListAPI, ConfigHandler, \ InstanceAPI, QueueAPI, QueueListAPI, RequestAPI, RequestListAPI, SystemAPI, SystemListAPI, \ VersionHandler, SpecHandler, SwaggerConfigHandler, OldAdminAPI, OldQueueAPI, \ - OldQueueListAPI, LoggingConfigAPI, EventPublisherAPI + OldQueueListAPI, LoggingConfigAPI, EventPublisherAPI, EventSocket prefix = config.web.url_prefix static_base = os.path.join(os.path.dirname(__file__), 'static', 'dist') @@ -116,13 +123,23 @@ def _setup_tornado_app(): # And these do not unpublished_url_specs = [ + # These are a little special - unpublished but still versioned + # The swagger spec + (r'{0}api/v1/spec/?'.format(prefix), SpecHandler), + # Events websocket + (r'{0}api/v1/socket/events/?'.format(prefix), EventSocket), + + # Version / configs + (r'{0}version/?'.format(prefix), VersionHandler), (r'{0}config/?'.format(prefix), ConfigHandler), (r'{0}config/swagger/?'.format(prefix), SwaggerConfigHandler), - (r'{0}version/?'.format(prefix), VersionHandler), - (r'{0}api/v1/spec/?'.format(prefix), SpecHandler), + + # Not sure if these are really necessary (r'{0}'.format(prefix[:-1]), RedirectHandler, {"url": prefix}), (r'{0}swagger/(.*)'.format(prefix), StaticFileHandler, {'path': os.path.join(static_base, 'swagger')}), + + # Static content (r'{0}(.*)'.format(prefix), StaticFileHandler, {'path': static_base, 'default_filename': 'index.html'}) ] @@ -184,9 +201,13 @@ def bg_thrift_context(async=True, **kwargs): def _setup_event_publishers(ssl_context): + from brew_view.controllers.event_api import EventSocket # Create the collection of event publishers and add concrete publishers to it - pubs = EventPublishers({'request': RequestPublisher(ssl_context=ssl_context)}) + pubs = EventPublishers({ + 'request': RequestPublisher(ssl_context=ssl_context), + 'websocket': WebsocketPublisher(EventSocket) + }) if config.event.mongo.enable: pubs['mongo'] = MongoPublisher() diff --git a/brew_view/__main__.py b/brew_view/__main__.py index fde65ffd..1d43b2a6 100755 --- a/brew_view/__main__.py +++ b/brew_view/__main__.py @@ -23,6 +23,10 @@ def shutdown(): brew_view.logger.info("Stopping server.") brew_view.server.stop() + # Shutdown everything short of the event loop + # (we need the event loop to publish the shutdown event) + brew_view.shutdown() + # Publish shutdown notification brew_view.event_publishers.publish_event(Event(name=Events.BREWVIEW_STOPPED.name)) diff --git a/brew_view/controllers/__init__.py b/brew_view/controllers/__init__.py index fba89adf..2fd7860a 100644 --- a/brew_view/controllers/__init__.py +++ b/brew_view/controllers/__init__.py @@ -13,4 +13,4 @@ from brew_view.controllers.misc_controllers import ConfigHandler, VersionHandler, SpecHandler, \ SwaggerConfigHandler from brew_view.controllers.logging_api import LoggingConfigAPI -from brew_view.controllers.event_api import EventPublisherAPI +from brew_view.controllers.event_api import EventPublisherAPI, EventSocket diff --git a/brew_view/controllers/event_api.py b/brew_view/controllers/event_api.py index b3c8fde4..e1ee1ca1 100644 --- a/brew_view/controllers/event_api.py +++ b/brew_view/controllers/event_api.py @@ -1,6 +1,9 @@ import logging +from tornado.websocket import WebSocketHandler + import brew_view +from bg_utils.parser import BeerGardenSchemaParser from brew_view.base_handler import BaseHandler from brewtils.schema_parser import SchemaParser @@ -50,3 +53,43 @@ def post(self): brew_view.event_publishers[publisher].publish_event(event) self.set_status(204) + + +class EventSocket(WebSocketHandler): + + logger = logging.getLogger(__name__) + parser = BeerGardenSchemaParser() + + closing = False + listeners = set() + + def check_origin(self, origin): + return True + + def open(self): + if EventSocket.closing: + self.close(reason='Shutting down') + else: + EventSocket.listeners.add(self) + + def on_close(self): + EventSocket.listeners.discard(self) + + def on_message(self, message): + pass + + @classmethod + def publish(cls, message): + # Don't bother if nobody is listening + if not len(cls.listeners): + return + + for listener in cls.listeners: + listener.write_message(message) + + @classmethod + def shutdown(cls): + EventSocket.closing = True + + for listener in cls.listeners: + listener.close(reason='Shutting down') diff --git a/brew_view/publishers.py b/brew_view/publishers.py index c9917393..fcf2c447 100644 --- a/brew_view/publishers.py +++ b/brew_view/publishers.py @@ -95,6 +95,18 @@ def _event_publish_args(self, event, **kwargs): return {'urls': urls} if urls else {} +class WebsocketPublisher(BeergardenPublisher): + """Publisher implementation that publishes to a websocket""" + + def __init__(self, socket_class): + BeergardenPublisher.__init__(self) + + self._socket = socket_class + + def publish(self, message, **kwargs): + self._socket.publish(message) + + class MongoPublisher(BeergardenPublisher): """Publisher implementation that 'publishes' to Mongo"""