Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Scale back the sending rate automatically when timeouts are encountered. #26

Open
wants to merge 8 commits into
base: main
Choose a base branch
from
94 changes: 74 additions & 20 deletions src/smpp_gateway/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
import select
import socket

from collections import defaultdict

import smpplib
import smpplib.client
import smpplib.consts
Expand Down Expand Up @@ -182,33 +184,85 @@ def send_mt_messages(self):
return
logger.info(f"Found {len(smses)} messages to send in {self.timeout} seconds")
submit_sm_resps = []
errors = defaultdict(list)
sent = []
adjusted_sending_rate = False
for sms in smses:
params = {**self.submit_sm_params, **sms["params"]}
if self.set_priority_flag and sms["priority_flag"] is not None:
params["priority_flag"] = sms["priority_flag"]
pdus = self.split_and_send_message(sms["short_message"], **params)
# Create placeholder MTMessageStatus objects in the DB, which
# the message_sent handler will later update with the actual command_status
# and message_id (and eventually maybe a delivery report).
now = timezone.now()
submit_sm_resps.extend(
[
MTMessageStatus(
create_time=now,
modify_time=now,
mt_message_id=sms["id"],
backend=self.backend,
sequence_number=pdu.sequence,
try:
pdus = self.split_and_send_message(sms["short_message"], **params)
except Exception as e:
errors[str(e)].append(sms["id"])
logger.exception(
f"An error occurred when sending message ID {sms['id']}."
)
# The smpplib base Client catches socket.error (which is OSError,
# the base class of TimeoutError) and raises a ConnectionError.
# Check if the original error was a TimeoutError and reduce the
# sending rate for the next batch if possible
if (
isinstance(e, smpplib.exceptions.ConnectionError)
and isinstance(e.__context__, TimeoutError)
and not adjusted_sending_rate
):
adjusted_sending_rate = self.adjust_message_sending_rate_on_timeout(
e, sms
)
for pdu in pdus
]
else:
sent.append(sms["id"])
# Create placeholder MTMessageStatus objects in the DB, which
# the message_sent handler will later update with the actual command_status
# and message_id (and eventually maybe a delivery report).
now = timezone.now()
submit_sm_resps.extend(
[
MTMessageStatus(
create_time=now,
modify_time=now,
mt_message_id=sms["id"],
backend=self.backend,
sequence_number=pdu.sequence,
)
for pdu in pdus
]
)
for error, pks in errors.items():
logger.info(f"Updating {len(pks)} messages to ERROR status: {error}")
MTMessage.objects.filter(pk__in=pks).update(
status=MTMessage.Status.ERROR,
modify_time=timezone.now(),
error=error,
)
pks = [sms["id"] for sms in smses]
MTMessage.objects.filter(pk__in=pks).update(
status=MTMessage.Status.SENT,
modify_time=timezone.now(),
if sent:
MTMessage.objects.filter(pk__in=sent).update(
status=MTMessage.Status.SENT,
modify_time=timezone.now(),
)
MTMessageStatus.objects.bulk_create(submit_sm_resps)

def get_new_mt_messages_per_second(self):
"""Get a new value for self.mt_messages_per_second after a timeout."""
return int(self.mt_messages_per_second * 0.75)

def adjust_message_sending_rate_on_timeout(self, exception, sms):
if (new_msgs_per_sec := self.get_new_mt_messages_per_second()) > 0:
logger.warning(
f"A timeout occurred while sending a message, and the sending "
f"rate has been adjusted from {self.mt_messages_per_second} "
f"messages/sec to {new_msgs_per_sec} messages/sec for the "
"next batch.",
exc_info=exception,
)
self.mt_messages_per_second = new_msgs_per_sec
return True
logger.exception(
"A timeout occurred when sending a message, but the sending rate "
"could not be automatically adjusted.",
exc_info=exception,
)
MTMessageStatus.objects.bulk_create(submit_sm_resps)
return False

def split_and_send_message(self, message, **kwargs):
"""
Expand Down
18 changes: 18 additions & 0 deletions src/smpp_gateway/migrations/0009_mtmessage_error.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
# Generated by Django 4.2.16 on 2024-11-18 14:35

from django.db import migrations, models


class Migration(migrations.Migration):

dependencies = [
("smpp_gateway", "0008_remove_mtmessage_mt_message_status_idx_and_more"),
]

operations = [
migrations.AddField(
model_name="mtmessage",
name="error",
field=models.TextField(blank=True, verbose_name="error"),
),
]
1 change: 1 addition & 0 deletions src/smpp_gateway/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ class PriorityFlag(models.IntegerChoices):
priority_flag = models.IntegerField(
_("priority flag"), choices=PriorityFlag.choices, null=True
)
error = models.TextField(_("error"), blank=True)

def save(self, *args, **kwargs):
super().save(*args, **kwargs)
Expand Down
121 changes: 120 additions & 1 deletion tests/test_client.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
import socket

from unittest import mock

import pytest

from smpplib import consts as smpplib_consts
from smpplib.command import DeliverSM, SubmitSMResp

from smpp_gateway.models import MOMessage, MTMessage
from smpp_gateway.models import MOMessage, MTMessage, MTMessageStatus
from smpp_gateway.queries import pg_listen
from smpp_gateway.smpp import PgSmppClient, get_smpplib_client
from tests.factories import BackendFactory, MTMessageFactory, MTMessageStatusFactory
Expand Down Expand Up @@ -258,3 +260,120 @@ def test_set_priority_flag_is_false_but_priority_in_submit_sm_params(

mock_send_message.assert_called_once()
assert mock_send_message.call_args.kwargs["priority_flag"] == priority


@pytest.mark.django_db(transaction=True)
def test_mt_messages_per_second_adjustment_on_timeout():
"""Tests that the sending rate is automatically reduced if a timeout is
encountered when the client is sending a message.
"""
backend = BackendFactory()
client = get_smpplib_client(
"127.0.0.1",
8000,
"notify_mo_channel",
backend,
{}, # submit_sm_params
False, # set_priority_flag
10, # mt_messages_per_second
"", # hc_check_uuid
"", # hc_ping_key
"", # hc_check_slug
)
MTMessageFactory.create_batch(20, status=MTMessage.Status.NEW, backend=backend)
timeout_patcher = mock.patch.object(
socket.socket,
"send",
side_effect=TimeoutError,
)
# Pretend the client is in a valid state for sending messages. The client
# checks this within send_pdu()
client.state = smpplib_consts.SMPP_CLIENT_STATE_BOUND_TX

# In case of a TimeoutError, the exception should be caught and
# client.mt_messages_per_second should be adjusted from 10 to 7 (reduced by 0.25)
with timeout_patcher as mock_socket_send:
client.send_mt_messages()
mock_socket_send.assert_called()
assert client.mt_messages_per_second == 7
assert MTMessage.objects.filter(status=MTMessage.Status.SENT).count() == 0

MTMessageFactory.create_batch(20, status=MTMessage.Status.NEW, backend=backend)

# No timeout. client.mt_messages_per_second should not be adjusted further
with mock.patch.object(client, "send_pdu", return_value=True) as mock_send_pdu:
client.send_mt_messages()
mock_send_pdu.assert_called()
assert client.mt_messages_per_second == 7
# Should have sent all 20 messages
assert MTMessage.objects.filter(status=MTMessage.Status.SENT).count() == 20

MTMessageFactory.create_batch(20, status=MTMessage.Status.NEW, backend=backend)
client.mt_messages_per_second = 1

# Timeouts occur again, but client.mt_messages_per_second cannot be reduced
# further because it is 1
with timeout_patcher as mock_socket_send:
client.send_mt_messages()
mock_socket_send.assert_called()
assert client.mt_messages_per_second == 1


@pytest.mark.django_db(transaction=True)
def test_messages_not_left_in_sending_status_on_exceptions():
"""Tests that no messages remain in "sending" status in case of exceptions when
sending out a batch of messages.
"""
backend = BackendFactory()
client = get_smpplib_client(
"127.0.0.1",
8000,
"notify_mo_channel",
backend,
{}, # submit_sm_params
False, # set_priority_flag
4, # mt_messages_per_second
"", # hc_check_uuid
"", # hc_ping_key
"", # hc_check_slug
)
messages = MTMessageFactory.create_batch(
20, status=MTMessage.Status.NEW, backend=backend
)
side_effect = []
expected_sent = set()
expected_error = {f"error {i}": set() for i in range(2)}
for index, message in enumerate(messages):
if index in (2, 4, 6, 9, 13, 17):
error = f"error {index % 2}"
side_effect.append(Exception(error))
expected_error[error].add(message.id)
else:
side_effect.append([mock.Mock(sequence=index)])
expected_sent.add(message.id)

with mock.patch.object(
client, "split_and_send_message", side_effect=side_effect
) as mock_split_and_send_message:
client.send_mt_messages()

assert mock_split_and_send_message.call_count == 20

# Ensure there are no messages remaining with "sending" status
assert not MTMessage.objects.filter(status=MTMessage.Status.SENDING).exists()

qs = MTMessage.objects.values_list("id", flat=True)

# Ensure that the messages where an error was encountered are updated to "error"
# status and a string representation of the error is saved in the error field
for error, expected_pks in expected_error.items():
assert expected_pks == set(
qs.filter(status=MTMessage.Status.ERROR, error=error)
)

# Ensure the messages that are successfully sent have their status updated
# to "sent" and MTMessageStatus objects are created for them
assert expected_sent == set(qs.filter(status=MTMessage.Status.SENT))
assert expected_sent == set(
MTMessageStatus.objects.values_list("mt_message", flat=True)
)
Loading