Skip to content

Commit

Permalink
Merge pull request #362 from powerapi-ng/feat/rework-socketdb
Browse files Browse the repository at this point in the history
feat(database/socket): Rework socket database
  • Loading branch information
gfieni authored Jun 27, 2024
2 parents 95b6130 + ce2c777 commit 7f60c01
Show file tree
Hide file tree
Showing 11 changed files with 300 additions and 369 deletions.
182 changes: 117 additions & 65 deletions src/powerapi/database/socket_db.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,99 +27,151 @@
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

import asyncio
from typing import Type, List
import json

from powerapi.utils import JsonStream
import logging
from json import JSONDecoder, JSONDecodeError
from queue import SimpleQueue, Empty
from socketserver import ThreadingMixIn, TCPServer, StreamRequestHandler
from threading import Thread
from typing import Type, Iterator

from powerapi.database.base_db import IterDB, BaseDB
from powerapi.report import Report
from .base_db import IterDB, BaseDB, DBError


class IterSocketDB(IterDB):
class ThreadedTCPServer(ThreadingMixIn, TCPServer):
"""
iterator connected to a socket that receive report from a sensor
TCP Server implementation.
Each client connected will be served by a separate thread.
"""
daemon_threads = True
allow_reuse_address = True

def __init__(self, report_type, stream_mode, queue):
def __init__(self, server_address, request_handler_class, received_data_queue: SimpleQueue):
"""
:param server_address: The address to listen on
:param request_handler_class: The request handler class to use when receiving requests
:param received_data_queue: The data queue to store the received data
"""
IterDB.__init__(self, None, report_type, stream_mode)
super().__init__(server_address, request_handler_class)
self.received_data_queue = received_data_queue


self.queue = queue
class JsonRequestHandler(StreamRequestHandler):
"""
Request handler that handles JSON documents received from the client.
"""
server: ThreadedTCPServer

@staticmethod
def parse_json_documents(data: str) -> Iterator[dict]:
    """
    Try to parse json document(s) from the given string.
    This function tolerates truncated and malformed json documents: when a
    decoding error occurs, parsing resumes at the next '{' found at or after
    the error position, and stops when there is none left.
    :param data: The raw data to decode
    :return: Iterator over parsed json documents
    """
    decoder = JSONDecoder()
    idx = 0
    while idx < len(data):
        try:
            obj, end_idx = decoder.raw_decode(data, idx)
        except JSONDecodeError as e:
            # Search and try to parse the remaining document(s)
            idx = data.find('{', e.pos)
            if idx == -1:
                break
        else:
            yield obj
            # `raw_decode` returns the ABSOLUTE index in `data` where the
            # document ended, so the cursor is assigned, not incremented.
            # Incrementing skipped every document after the second one.
            idx = end_idx

def handle(self):
    """
    Serve a single client connection until it is closed.
    Each line read from the client is decoded as utf-8, parsed into json
    document(s) and every parsed document is pushed to the server data queue.
    Lines that cannot be decoded are logged and skipped; an OSError or a
    KeyboardInterrupt ends the connection handling.
    """
    peer = '{}:{}'.format(*self.client_address)
    logging.info('New incoming connection from %s', peer)

    connected = True
    while connected:
        try:
            line = self.rfile.readline()
            if line:
                text = line.decode('utf-8')
                for document in self.parse_json_documents(text):
                    self.server.received_data_queue.put(document)
            else:
                # An empty read means the client closed the connection.
                connected = False

        except ValueError as exc:
            # Bad input on one line must not end the connection: keep reading.
            logging.warning('[%s] Received malformed data: %s', peer, exc)
        except OSError as exc:
            logging.error('[%s] Caught OSError while handling request: %s', peer, exc)
            connected = False
        except KeyboardInterrupt:
            connected = False

    logging.info('Connection from %s closed', peer)


class IterSocketDB(IterDB):
"""
SocketDB iterator that returns the received data.
"""

def __aiter__(self):
def __iter__(self):
return self

async def __anext__(self):
def __next__(self):
try:
json_str = await asyncio.wait_for(self.queue.get(), 2)
# json = self.queue.get_nowait()
# self.queue.get()
report = self.report_type.from_json(json.loads(json_str))
return report
# except Empty:
except asyncio.TimeoutError:
return None
document = self.db.received_data_queue.get(block=False)
return self.report_type.from_json(document)
except Empty as e:
raise StopIteration from e


class SocketDB(BaseDB):
"""
Database that act as a server that expose a socket where data source will push data
Database implementation that exposes a TCP socket the clients can connect to.
"""

def __init__(self, report_type: Type[Report], host: str, port: int):
BaseDB.__init__(self, report_type, is_async=True)
self.queue = None
self.host = host
self.port = port
self.server = None
"""
:param report_type: The type of report to create
:param host: The host address to listen on
:param port: The port number to listen on
"""
super().__init__(report_type)

async def connect(self):
self.server_address = (host, port)

self.received_data_queue = None
self.background_thread = None

def _tcpserver_background_thread_target(self):
"""
Connect to the socket database.
Target function of the thread that will run the TCP server in background.
"""
self.queue = asyncio.Queue()
self.server = await asyncio.start_server(self._gen_server_callback(), host=self.host, port=self.port)
with ThreadedTCPServer(self.server_address, JsonRequestHandler, self.received_data_queue) as server:
logging.info('TCP socket is listening on %s:%s', *self.server_address)
server.serve_forever()

async def disconnect(self):
def connect(self):
"""
Disconnect from the socket database.
Connect to the socket database.
"""
self.received_data_queue = SimpleQueue()
self.background_thread = Thread(target=self._tcpserver_background_thread_target, daemon=True)
self.background_thread.start()

async def stop(self):
def disconnect(self):
"""
stop server connection
Disconnect from the socket database.
"""
self.server.close()
await self.server.wait_closed()

def iter(self, stream_mode: bool = False) -> IterSocketDB:
return IterSocketDB(self.report_type, stream_mode, self.queue)

def _gen_server_callback(self):
async def callback(stream_reader, _):
stream = JsonStream(stream_reader)
count = 0 # If 10 times in a row we don't have a full message we stop
while True:
json_str = await stream.read_json_object()
if json_str is None:
if count > 10:
break
count += 1
continue
count = 0
await self.queue.put(json_str)

# self.queue.put(json_str)

return callback

def __iter__(self):
raise DBError('Socket db don\'t support __iter__ method')

def save(self, report: Report):
raise DBError('Socket db don\'t support save method')

def save_many(self, reports: List[Report]):
raise DBError('Socket db don\'t support save_many method')
"""
Create the data iterator for the socket database.
:param stream_mode: Whether the data should be pulled continuously or not.
"""
return IterSocketDB(self, self.report_type, stream_mode)
90 changes: 25 additions & 65 deletions src/powerapi/puller/handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,19 +27,18 @@
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

import time
import asyncio
import logging
import time
from threading import Thread

from powerapi.actor import State
from powerapi.message import Message
from powerapi.exception import PowerAPIException, BadInputData
from powerapi.database import DBError
from powerapi.exception import PowerAPIException
from powerapi.filter import FilterUselessError
from powerapi.handler import StartHandler, PoisonPillMessageHandler
from powerapi.database import DBError
from powerapi.message import ErrorMessage, PoisonPillMessage
from powerapi.report.report import DeserializationFail
from powerapi.message import Message
from powerapi.report import BadInputData


class NoReportExtractedException(PowerAPIException):
Expand All @@ -55,37 +54,11 @@ class DBPullerThread(Thread):
"""

def __init__(self, state, timeout, handler):
Thread.__init__(self, daemon=True)
super().__init__(daemon=True)
self.timeout = timeout
self.state = state
self.loop = None
self.handler = handler

def _connect(self):
try:
self.state.database.connect()
self.loop.run_until_complete(self.state.database.connect())
self.state.database_it = self.state.database.iter(self.state.stream_mode)
except DBError as error:
self.state.actor.send_control(ErrorMessage(sender_name='system', error_message=error.msg))
self.state.alive = False

def _pull_database(self):
try:
if self.state.database.is_async:
report = self.loop.run_until_complete(anext(self.state.database_it))
if report is None:
raise StopIteration()
return report

return next(self.state.database_it)

except (StopIteration, BadInputData, DeserializationFail) as database_problem:
raise NoReportExtractedException() from database_problem

def _get_dispatchers(self, report):
return self.state.report_filter.route(report)

def run(self):
"""
Read data from Database and send it to the dispatchers.
Expand All @@ -95,37 +68,26 @@ def run(self):
:param None msg: None.
"""
if self.state.database.is_async:
self.loop = asyncio.new_event_loop()
asyncio.set_event_loop(self.loop)
self.state.loop = self.loop
self.loop.set_debug(enabled=True)
logging.basicConfig(level=logging.DEBUG)

self._connect()

while self.state.alive:
try:
raw_report = self._pull_database()

dispatchers = self._get_dispatchers(raw_report)
raw_report = next(self.state.database_it)
dispatchers = self.state.report_filter.route(raw_report)
for dispatcher in dispatchers:
dispatcher.send_data(raw_report)

except NoReportExtractedException:
time.sleep(self.state.timeout_puller / 1000)
self.state.actor.logger.debug('NoReportExtractedException with stream mode ' +
str(self.state.stream_mode))
if not self.state.stream_mode:
self.handler.handle_internal_msg(PoisonPillMessage(soft=False, sender_name='system'))
return

except FilterUselessError:
self.handler.handle_internal_msg(PoisonPillMessage(soft=False, sender_name='system'))
self.handler.handle_internal_msg(PoisonPillMessage(False, self.name))
return

except BadInputData as exn:
logging.error('Received malformed report from database: %s', exn.msg)
logging.debug('Raw report value: %s', exn.input_data)

except StopIteration:
continue
time.sleep(self.state.timeout_puller / 1000)
if not self.state.stream_mode:
self.handler.handle_internal_msg(PoisonPillMessage(False, self.name))
return


class PullerPoisonPillMessageHandler(PoisonPillMessageHandler):
Expand Down Expand Up @@ -166,15 +128,15 @@ def handle_internal_msg(self, msg):
StartHandler.delegate_message_handling(self, msg)

def initialization(self):

self._database_connection()

if not self.state.report_filter.filters:
raise PullerInitializationException('No filters')

# Connect to all dispatcher
for _, dispatcher in self.state.report_filter.filters:
dispatcher.connect_data()

self._connect_database()

def handle(self, msg: Message):
try:
StartHandler.handle(self, msg)
Expand All @@ -183,7 +145,7 @@ def handle(self, msg: Message):

self.pull_db()

self.handle_internal_msg(PoisonPillMessage(soft=False, sender_name='system'))
self.handle_internal_msg(PoisonPillMessage(False, self.state.actor.name))

def pull_db(self):
"""
Expand All @@ -198,12 +160,10 @@ def pull_db(self):
if msg is not None:
self.handle_internal_msg(msg)

def _database_connection(self):
def _connect_database(self):
try:
if not self.state.database.is_async:
self.state.database.connect()
self.state.database_it = self.state.database.iter(stream_mode=self.state.stream_mode)

self.state.database.connect()
self.state.database_it = self.state.database.iter(self.state.stream_mode)
except DBError as error:
self.state.actor.send_control(ErrorMessage(self.state.actor.name, error.msg))
self.state.alive = False
self.state.actor.send_control(ErrorMessage(self.state.actor.name, error.msg))
9 changes: 1 addition & 8 deletions src/powerapi/puller/puller_actor.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,19 +28,12 @@
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

import logging

from powerapi.actor import Actor, State
from powerapi.exception import PowerAPIException
from powerapi.message import PoisonPillMessage, StartMessage
from powerapi.puller.handlers import PullerPoisonPillMessageHandler, PullerStartHandler


class NoReportExtractedException(PowerAPIException):
"""
Exception raised when we can't extract a report from the given
database
"""


class PullerState(State):
"""
Puller Actor State
Expand Down
Loading

0 comments on commit 7f60c01

Please sign in to comment.