Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(database/socket): Rework socket database #362

Merged
merged 6 commits into from
Jun 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
182 changes: 117 additions & 65 deletions src/powerapi/database/socket_db.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,99 +27,151 @@
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

import asyncio
from typing import Type, List
import json

from powerapi.utils import JsonStream
import logging
from json import JSONDecoder, JSONDecodeError
from queue import SimpleQueue, Empty
from socketserver import ThreadingMixIn, TCPServer, StreamRequestHandler
from threading import Thread
from typing import Type, Iterator

from powerapi.database.base_db import IterDB, BaseDB
from powerapi.report import Report
from .base_db import IterDB, BaseDB, DBError


class IterSocketDB(IterDB):
class ThreadedTCPServer(ThreadingMixIn, TCPServer):
"""
iterator connected to a socket that receive report from a sensor
TCP Server implementation.
Each client connected will be served by a separate thread.
"""
daemon_threads = True
allow_reuse_address = True

def __init__(self, report_type, stream_mode, queue):
def __init__(self, server_address, request_handler_class, received_data_queue: SimpleQueue):
"""
:param server_address: The address to listen on
:param request_handler_class: The request handler class to use when receiving requests
:param received_data_queue: The data queue to store the received data
"""
IterDB.__init__(self, None, report_type, stream_mode)
super().__init__(server_address, request_handler_class)
self.received_data_queue = received_data_queue

Check warning on line 56 in src/powerapi/database/socket_db.py

View check run for this annotation

Codecov / codecov/patch

src/powerapi/database/socket_db.py#L55-L56

Added lines #L55 - L56 were not covered by tests


self.queue = queue
class JsonRequestHandler(StreamRequestHandler):
"""
Request handler that handles JSON documents received from the client.
"""
server: ThreadedTCPServer

@staticmethod
def parse_json_documents(data: str) -> Iterator[dict]:
"""
Try to parse json document(s) from the given string.
This function tolerates truncated and malformed json documents.
:param data: The raw data to decode
:return: Iterator over parsed json documents
"""
decoder = JSONDecoder()
idx = 0
while idx < len(data):
try:
obj, end_idx = decoder.raw_decode(data, idx)
yield obj
idx += end_idx

# Search and try to parse the remaining document(s)
except JSONDecodeError as e:
idx = data.find('{', e.pos)
if idx == -1:
break

def handle(self):
"""
Handle incoming connections.
The received data is parsed and the result(s) stored in the data queue for further processing.
It is expected for the data to be in json format (utf-8 charset) and newline terminated.
"""
caddr = '{}:{}'.format(*self.client_address)
logging.info('New incoming connection from %s', caddr)

Check warning on line 94 in src/powerapi/database/socket_db.py

View check run for this annotation

Codecov / codecov/patch

src/powerapi/database/socket_db.py#L93-L94

Added lines #L93 - L94 were not covered by tests

while True:
try:
data = self.rfile.readline()
if not data:
break

Check warning on line 100 in src/powerapi/database/socket_db.py

View check run for this annotation

Codecov / codecov/patch

src/powerapi/database/socket_db.py#L96-L100

Added lines #L96 - L100 were not covered by tests

for obj in self.parse_json_documents(data.decode('utf-8')):
self.server.received_data_queue.put(obj)

Check warning on line 103 in src/powerapi/database/socket_db.py

View check run for this annotation

Codecov / codecov/patch

src/powerapi/database/socket_db.py#L102-L103

Added lines #L102 - L103 were not covered by tests

except ValueError as e:
logging.warning('[%s] Received malformed data: %s', caddr, e)
continue
except OSError as e:
logging.error('[%s] Caught OSError while handling request: %s', caddr, e)
break
except KeyboardInterrupt:
break

Check warning on line 112 in src/powerapi/database/socket_db.py

View check run for this annotation

Codecov / codecov/patch

src/powerapi/database/socket_db.py#L105-L112

Added lines #L105 - L112 were not covered by tests

logging.info('Connection from %s closed', caddr)

Check warning on line 114 in src/powerapi/database/socket_db.py

View check run for this annotation

Codecov / codecov/patch

src/powerapi/database/socket_db.py#L114

Added line #L114 was not covered by tests


class IterSocketDB(IterDB):
"""
SocketDB iterator that returns the received data.
"""

def __aiter__(self):
def __iter__(self):
return self

async def __anext__(self):
def __next__(self):
try:
json_str = await asyncio.wait_for(self.queue.get(), 2)
# json = self.queue.get_nowait()
# self.queue.get()
report = self.report_type.from_json(json.loads(json_str))
return report
# except Empty:
except asyncio.TimeoutError:
return None
document = self.db.received_data_queue.get(block=False)
return self.report_type.from_json(document)
except Empty as e:
raise StopIteration from e

Check warning on line 130 in src/powerapi/database/socket_db.py

View check run for this annotation

Codecov / codecov/patch

src/powerapi/database/socket_db.py#L127-L130

Added lines #L127 - L130 were not covered by tests


class SocketDB(BaseDB):
"""
Database that act as a server that expose a socket where data source will push data
Database implementation that exposes a TCP socket the clients can connect to.
"""

def __init__(self, report_type: Type[Report], host: str, port: int):
BaseDB.__init__(self, report_type, is_async=True)
self.queue = None
self.host = host
self.port = port
self.server = None
"""
:param report_type: The type of report to create
:param host: The host address to listen on
:param port: The port number to listen on
"""
super().__init__(report_type)

async def connect(self):
self.server_address = (host, port)

self.received_data_queue = None
self.background_thread = None

def _tcpserver_background_thread_target(self):
"""
Connect to the socket database.
Target function of the thread that will run the TCP server in background.
"""
self.queue = asyncio.Queue()
self.server = await asyncio.start_server(self._gen_server_callback(), host=self.host, port=self.port)
with ThreadedTCPServer(self.server_address, JsonRequestHandler, self.received_data_queue) as server:
logging.info('TCP socket is listening on %s:%s', *self.server_address)
server.serve_forever()

Check warning on line 157 in src/powerapi/database/socket_db.py

View check run for this annotation

Codecov / codecov/patch

src/powerapi/database/socket_db.py#L155-L157

Added lines #L155 - L157 were not covered by tests

async def disconnect(self):
def connect(self):
"""
Disconnect from the socket database.
Connect to the socket database.
"""
self.received_data_queue = SimpleQueue()
self.background_thread = Thread(target=self._tcpserver_background_thread_target, daemon=True)
self.background_thread.start()

Check warning on line 165 in src/powerapi/database/socket_db.py

View check run for this annotation

Codecov / codecov/patch

src/powerapi/database/socket_db.py#L163-L165

Added lines #L163 - L165 were not covered by tests

async def stop(self):
def disconnect(self):
"""
stop server connection
Disconnect from the socket database.
"""
self.server.close()
await self.server.wait_closed()

def iter(self, stream_mode: bool = False) -> IterSocketDB:
return IterSocketDB(self.report_type, stream_mode, self.queue)

def _gen_server_callback(self):
async def callback(stream_reader, _):
stream = JsonStream(stream_reader)
count = 0 # If 10 times in a row we don't have a full message we stop
while True:
json_str = await stream.read_json_object()
if json_str is None:
if count > 10:
break
count += 1
continue
count = 0
await self.queue.put(json_str)

# self.queue.put(json_str)

return callback

def __iter__(self):
raise DBError('Socket db don\'t support __iter__ method')

def save(self, report: Report):
raise DBError('Socket db don\'t support save method')

def save_many(self, reports: List[Report]):
raise DBError('Socket db don\'t support save_many method')
"""
Create the data iterator for the socket database.
:param stream_mode: Whether the data should be pulled continuously or not.
"""
return IterSocketDB(self, self.report_type, stream_mode)

Check warning on line 177 in src/powerapi/database/socket_db.py

View check run for this annotation

Codecov / codecov/patch

src/powerapi/database/socket_db.py#L177

Added line #L177 was not covered by tests
90 changes: 25 additions & 65 deletions src/powerapi/puller/handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,19 +27,18 @@
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

import time
import asyncio
import logging
import time
from threading import Thread

from powerapi.actor import State
from powerapi.message import Message
from powerapi.exception import PowerAPIException, BadInputData
from powerapi.database import DBError
from powerapi.exception import PowerAPIException
from powerapi.filter import FilterUselessError
from powerapi.handler import StartHandler, PoisonPillMessageHandler
from powerapi.database import DBError
from powerapi.message import ErrorMessage, PoisonPillMessage
from powerapi.report.report import DeserializationFail
from powerapi.message import Message
from powerapi.report import BadInputData


class NoReportExtractedException(PowerAPIException):
Expand All @@ -55,37 +54,11 @@
"""

def __init__(self, state, timeout, handler):
Thread.__init__(self, daemon=True)
super().__init__(daemon=True)

Check warning on line 57 in src/powerapi/puller/handlers.py

View check run for this annotation

Codecov / codecov/patch

src/powerapi/puller/handlers.py#L57

Added line #L57 was not covered by tests
self.timeout = timeout
self.state = state
self.loop = None
self.handler = handler

def _connect(self):
try:
self.state.database.connect()
self.loop.run_until_complete(self.state.database.connect())
self.state.database_it = self.state.database.iter(self.state.stream_mode)
except DBError as error:
self.state.actor.send_control(ErrorMessage(sender_name='system', error_message=error.msg))
self.state.alive = False

def _pull_database(self):
try:
if self.state.database.is_async:
report = self.loop.run_until_complete(anext(self.state.database_it))
if report is None:
raise StopIteration()
return report

return next(self.state.database_it)

except (StopIteration, BadInputData, DeserializationFail) as database_problem:
raise NoReportExtractedException() from database_problem

def _get_dispatchers(self, report):
return self.state.report_filter.route(report)

def run(self):
"""
Read data from Database and send it to the dispatchers.
Expand All @@ -95,37 +68,26 @@

:param None msg: None.
"""
if self.state.database.is_async:
self.loop = asyncio.new_event_loop()
asyncio.set_event_loop(self.loop)
self.state.loop = self.loop
self.loop.set_debug(enabled=True)
logging.basicConfig(level=logging.DEBUG)

self._connect()

while self.state.alive:
try:
raw_report = self._pull_database()

dispatchers = self._get_dispatchers(raw_report)
raw_report = next(self.state.database_it)
dispatchers = self.state.report_filter.route(raw_report)

Check warning on line 74 in src/powerapi/puller/handlers.py

View check run for this annotation

Codecov / codecov/patch

src/powerapi/puller/handlers.py#L73-L74

Added lines #L73 - L74 were not covered by tests
for dispatcher in dispatchers:
dispatcher.send_data(raw_report)

except NoReportExtractedException:
time.sleep(self.state.timeout_puller / 1000)
self.state.actor.logger.debug('NoReportExtractedException with stream mode ' +
str(self.state.stream_mode))
if not self.state.stream_mode:
self.handler.handle_internal_msg(PoisonPillMessage(soft=False, sender_name='system'))
return

except FilterUselessError:
self.handler.handle_internal_msg(PoisonPillMessage(soft=False, sender_name='system'))
self.handler.handle_internal_msg(PoisonPillMessage(False, self.name))

Check warning on line 79 in src/powerapi/puller/handlers.py

View check run for this annotation

Codecov / codecov/patch

src/powerapi/puller/handlers.py#L79

Added line #L79 was not covered by tests
return

except BadInputData as exn:
logging.error('Received malformed report from database: %s', exn.msg)
logging.debug('Raw report value: %s', exn.input_data)

Check warning on line 84 in src/powerapi/puller/handlers.py

View check run for this annotation

Codecov / codecov/patch

src/powerapi/puller/handlers.py#L82-L84

Added lines #L82 - L84 were not covered by tests

except StopIteration:
continue
time.sleep(self.state.timeout_puller / 1000)
if not self.state.stream_mode:
self.handler.handle_internal_msg(PoisonPillMessage(False, self.name))
return

Check warning on line 90 in src/powerapi/puller/handlers.py

View check run for this annotation

Codecov / codecov/patch

src/powerapi/puller/handlers.py#L87-L90

Added lines #L87 - L90 were not covered by tests


class PullerPoisonPillMessageHandler(PoisonPillMessageHandler):
Expand Down Expand Up @@ -166,15 +128,15 @@
StartHandler.delegate_message_handling(self, msg)

def initialization(self):

self._database_connection()

if not self.state.report_filter.filters:
raise PullerInitializationException('No filters')

# Connect to all dispatcher
for _, dispatcher in self.state.report_filter.filters:
dispatcher.connect_data()

self._connect_database()

Check warning on line 138 in src/powerapi/puller/handlers.py

View check run for this annotation

Codecov / codecov/patch

src/powerapi/puller/handlers.py#L138

Added line #L138 was not covered by tests

def handle(self, msg: Message):
try:
StartHandler.handle(self, msg)
Expand All @@ -183,7 +145,7 @@

self.pull_db()

self.handle_internal_msg(PoisonPillMessage(soft=False, sender_name='system'))
self.handle_internal_msg(PoisonPillMessage(False, self.state.actor.name))

Check warning on line 148 in src/powerapi/puller/handlers.py

View check run for this annotation

Codecov / codecov/patch

src/powerapi/puller/handlers.py#L148

Added line #L148 was not covered by tests

def pull_db(self):
"""
Expand All @@ -198,12 +160,10 @@
if msg is not None:
self.handle_internal_msg(msg)

def _database_connection(self):
def _connect_database(self):
try:
if not self.state.database.is_async:
self.state.database.connect()
self.state.database_it = self.state.database.iter(stream_mode=self.state.stream_mode)

self.state.database.connect()
self.state.database_it = self.state.database.iter(self.state.stream_mode)

Check warning on line 166 in src/powerapi/puller/handlers.py

View check run for this annotation

Codecov / codecov/patch

src/powerapi/puller/handlers.py#L165-L166

Added lines #L165 - L166 were not covered by tests
except DBError as error:
self.state.actor.send_control(ErrorMessage(self.state.actor.name, error.msg))
self.state.alive = False
self.state.actor.send_control(ErrorMessage(self.state.actor.name, error.msg))

Check warning on line 169 in src/powerapi/puller/handlers.py

View check run for this annotation

Codecov / codecov/patch

src/powerapi/puller/handlers.py#L169

Added line #L169 was not covered by tests
9 changes: 1 addition & 8 deletions src/powerapi/puller/puller_actor.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,19 +28,12 @@
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

import logging

from powerapi.actor import Actor, State
from powerapi.exception import PowerAPIException
from powerapi.message import PoisonPillMessage, StartMessage
from powerapi.puller.handlers import PullerPoisonPillMessageHandler, PullerStartHandler


class NoReportExtractedException(PowerAPIException):
"""
Exception raised when we can't extract a report from the given
database
"""


class PullerState(State):
"""
Puller Actor State
Expand Down
Loading