From f64ffb51aa79fa155de66662d6f4039da5fb4aaf Mon Sep 17 00:00:00 2001
From: Kathia Barahona <kathia.barahona@aiven.io>
Date: Wed, 4 Oct 2023 13:48:01 +0000
Subject: [PATCH] upload event tracker

Introduces an UploadEventProgressTracker thread that monitors the progress of
each individual UploadEvent and logs a warning if an event makes no progress
for a specified period of time. This helps ensure that file uploads keep
making progress and do not get stuck.
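
For reference, a rough sketch of the wiring (simplified from the patch below;
names such as "event", "fileobj" and "metrics" are placeholders, and error
handling is omitted):

    tracker = UploadEventProgressTracker(metrics=metrics)
    tracker.start()

    # per upload, inside TransferAgent.handle_upload:
    with track_upload_event(progress_tracker=tracker, file_key=key, upload_event=event):
        storage.store_file_object(
            key, fileobj, metadata=metadata,
            upload_progress_fn=partial(tracker.increment, key),
        )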
---
 Vagrantfile                  |   2 +-
 pghoard/basebackup/base.py   |   6 --
 pghoard/basebackup/chunks.py |   9 --
 pghoard/basebackup/delta.py  |   5 --
 pghoard/pghoard.py           |  13 ++-
 pghoard/transfer.py          | 164 +++++++++++++++++++++++++++++++++--
 test/test_transferagent.py   | 110 +++++++++++++++++------
 7 files changed, 251 insertions(+), 58 deletions(-)

diff --git a/Vagrantfile b/Vagrantfile
index 928a6e70..010fbfe2 100644
--- a/Vagrantfile
+++ b/Vagrantfile
@@ -39,7 +39,7 @@ Vagrant.configure("2") do |config|
         sed -i "s/^#create_main_cluster.*/create_main_cluster=false/g" /etc/postgresql-common/createcluster.conf
 
         apt-get install -y python{3.8,3.9,3.10} python{3.8,3.9,3.10}-dev python{3.8,3.9,3.10}-venv
-        apt-get install -y postgresql-{10,11,12,13,14} postgresql-server-dev-{10,11,12,13,14}
+        apt-get install -y postgresql-{11,12,13,14,15,16} postgresql-server-dev-{11,12,13,14,15,16}
 
         username="$(< /dev/urandom tr -dc a-z | head -c${1:-32};echo;)"
         password=$(< /dev/urandom tr -dc _A-Z-a-z-0-9 | head -c${1:-32};echo;)
diff --git a/pghoard/basebackup/base.py b/pghoard/basebackup/base.py
index ecc70ec2..50c2be4c 100644
--- a/pghoard/basebackup/base.py
+++ b/pghoard/basebackup/base.py
@@ -316,9 +316,6 @@ def run_piped_basebackup(self):
         })
         metadata.update(self.metadata)
 
-        def callback(n_bytes: int) -> None:
-            self.metrics.increase("pghoard.basebackup_bytes_uploaded", inc_value=n_bytes, tags={"delta": False})
-
         self.transfer_queue.put(
             UploadEvent(
                 file_type=FileType.Basebackup,
@@ -326,7 +323,6 @@ def callback(n_bytes: int) -> None:
                 file_path=basebackup_path,
                 callback_queue=self.callback_queue,
                 file_size=compressed_file_size,
-                incremental_progress_callback=callback,
                 source_data=stream_target,
                 remove_after_upload=True,
                 metadata=metadata
@@ -615,7 +611,6 @@ def run_local_tar_basebackup(self, delta: bool = False, with_delta_stats: bool =
                         data_file_format,
                         compressed_base,
                         delta_stats=delta_stats,
-                        delta=delta,
                         file_type=FileType.Basebackup_chunk
                     )
 
@@ -725,7 +720,6 @@ def run_local_tar_basebackup(self, delta: bool = False, with_delta_stats: bool =
             callback_queue=self.callback_queue,
             chunk_path=Path(data_file_format(0)), # pylint: disable=too-many-format-args
             temp_dir=compressed_base,
-            delta=delta,
             files_to_backup=control_files,
             file_type=FileType.Basebackup,
             extra_metadata={
diff --git a/pghoard/basebackup/chunks.py b/pghoard/basebackup/chunks.py
index 1618aba7..caa10d64 100644
--- a/pghoard/basebackup/chunks.py
+++ b/pghoard/basebackup/chunks.py
@@ -136,7 +136,6 @@ def tar_one_file(
         chunk_path,
         files_to_backup,
         callback_queue: CallbackQueue,
-        delta: bool,
         file_type: FileType = FileType.Basebackup_chunk,
         extra_metadata: Optional[Dict[str, Any]] = None,
         delta_stats: Optional[DeltaStats] = None
@@ -194,9 +193,6 @@ def tar_one_file(
 
         middle_path, chunk_name = ChunkUploader.chunk_path_to_middle_path_name(Path(chunk_path), file_type)
 
-        def callback(n_bytes: int) -> None:
-            self.metrics.increase("pghoard.basebackup_bytes_uploaded", inc_value=n_bytes, tags={"delta": delta})
-
         self.transfer_queue.put(
             UploadEvent(
                 callback_queue=callback_queue,
@@ -205,7 +201,6 @@ def callback(n_bytes: int) -> None:
                 file_path=middle_path / chunk_name,
                 source_data=chunk_path,
                 metadata=metadata,
-                incremental_progress_callback=callback,
                 backup_site_name=self.site,
             )
         )
@@ -221,7 +216,6 @@ def handle_single_chunk(
         chunks,
         index: int,
         temp_dir: Path,
-        delta: bool,
         delta_stats: Optional[DeltaStats] = None,
         file_type: FileType = FileType.Basebackup_chunk
     ) -> Dict[str, Any]:
@@ -229,7 +223,6 @@ def handle_single_chunk(
         chunk_name, input_size, result_size = self.tar_one_file(
             callback_queue=chunk_callback_queue,
             chunk_path=chunk_path,
-            delta=delta,
             temp_dir=temp_dir,
             files_to_backup=one_chunk_files,
             delta_stats=delta_stats,
@@ -270,7 +263,6 @@ def create_and_upload_chunks(
         chunks,
         data_file_format: Callable[[int], str],
         temp_base_dir: Path,
-        delta: bool,
         delta_stats: Optional[DeltaStats] = None,
         file_type: FileType = FileType.Basebackup_chunk,
         chunks_max_progress: float = 100.0
@@ -307,7 +299,6 @@ def create_and_upload_chunks(
                         chunks=chunks,
                         index=i,
                         temp_dir=temp_base_dir,
-                        delta=delta,
                         delta_stats=delta_stats,
                         file_type=file_type
                     )
diff --git a/pghoard/basebackup/delta.py b/pghoard/basebackup/delta.py
index a5311dcd..d2dc36ae 100644
--- a/pghoard/basebackup/delta.py
+++ b/pghoard/basebackup/delta.py
@@ -194,9 +194,6 @@ def progress_callback(n_bytes: int = 1) -> None:
 
         dest_path = Path("basebackup_delta") / result_digest
 
-        def callback(n_bytes: int) -> None:
-            self.metrics.increase("pghoard.basebackup_bytes_uploaded", inc_value=n_bytes, tags={"delta": True})
-
         self.transfer_queue.put(
             UploadEvent(
                 callback_queue=callback_queue,
@@ -205,7 +202,6 @@ def callback(n_bytes: int) -> None:
                 backup_site_name=self.site,
                 metadata=metadata,
                 file_path=dest_path,
-                incremental_progress_callback=callback,
                 source_data=chunk_path
             )
         )
@@ -370,7 +366,6 @@ def _upload_chunks(self, delta_chunks, chunks_max_progress: float) -> Tuple[Uplo
         chunk_files = self.chunk_uploader.create_and_upload_chunks(
             chunks=delta_chunks,
             data_file_format=self.data_file_format,
-            delta=True,
             temp_base_dir=self.compressed_base,
             file_type=FileType.Basebackup_delta_chunk,
             chunks_max_progress=chunks_max_progress,
diff --git a/pghoard/pghoard.py b/pghoard/pghoard.py
index a2a04a3b..396ecda1 100644
--- a/pghoard/pghoard.py
+++ b/pghoard/pghoard.py
@@ -45,7 +45,7 @@
     is_basebackup_preserved, parse_preservation_requests, patch_basebackup_metadata_with_preservation
 )
 from pghoard.receivexlog import PGReceiveXLog
-from pghoard.transfer import TransferAgent, TransferQueue, UploadEvent
+from pghoard.transfer import (TransferAgent, TransferQueue, UploadEvent, UploadEventProgressTracker)
 from pghoard.walreceiver import WALReceiver
 from pghoard.webserver import WebServer
 
@@ -143,6 +143,9 @@ def __init__(self, config_path):
         self.requested_basebackup_sites = set()
         self.inotify_adapter = InotifyAdapter(self.compression_queue)
         self.inotify = InotifyWatcher(self.inotify_adapter)
+
+        self.upload_tracker = UploadEventProgressTracker(metrics=self.metrics)
+
         self.webserver = WebServer(
             self.config, self.requested_basebackup_sites, self.compression_queue, self.transfer_queue, self.metrics
         )
@@ -167,6 +170,7 @@ def __init__(self, config_path):
                 config=self.config,
                 mp_manager=self.mp_manager,
                 transfer_queue=self.transfer_queue,
+                upload_tracker=self.upload_tracker,
                 metrics=self.metrics,
                 shared_state_dict=self.transfer_agent_state
             )
@@ -695,6 +699,7 @@ def startup_walk_for_missed_files(self):
     def start_threads_on_startup(self):
         # Startup threads
         self.inotify.start()
+        self.upload_tracker.start()
         self.webserver.start()
         self.wal_file_deleter.start()
         for compressor in self.compressors:
@@ -971,7 +976,11 @@ def load_config(self, _signal=None, _frame=None):  # pylint: disable=unused-argu
 
     def _get_all_threads(self):
         all_threads = []
-        # on first config load webserver isn't initialized yet
+
+        # on first config load upload_tracker and webserver aren't initialized yet
+        if hasattr(self, "upload_tracker"):
+            all_threads.append(self.upload_tracker)
+
         if hasattr(self, "webserver"):
             all_threads.append(self.webserver)
         all_threads.extend(self.basebackups.values())
diff --git a/pghoard/transfer.py b/pghoard/transfer.py
index 7ed73f8f..dd23278e 100644
--- a/pghoard/transfer.py
+++ b/pghoard/transfer.py
@@ -8,18 +8,20 @@
 import enum
 import logging
 import os
+import threading
 import time
-from contextlib import suppress
+from contextlib import contextmanager, suppress
 from dataclasses import dataclass
+from functools import partial
 from io import BytesIO
 from pathlib import Path
 from queue import Empty
 from threading import Lock
-from typing import Any, BinaryIO, Dict, Optional, Union
+from typing import Any, BinaryIO, Dict, Iterator, List, Optional, Union
 
 from rohmu import get_transfer
 from rohmu.errors import FileNotFoundFromStorageError
-from rohmu.object_storage.base import (BaseTransfer, IncrementalProgressCallbackType)
+from rohmu.object_storage.base import BaseTransfer
 from rohmu.typing import Metadata
 
 from pghoard.common import (
@@ -27,6 +29,7 @@
     get_object_storage_config
 )
 from pghoard.fetcher import FileFetchManager
+from pghoard.metrics import Metrics
 
 _STATS_LOCK = Lock()
 _last_stats_transmit_time = 0
@@ -55,7 +58,6 @@ class UploadEvent(BaseTransferEvent):
     file_size: Optional[int]
     remove_after_upload: bool = True
     retry_number: int = 0
-    incremental_progress_callback: Optional[IncrementalProgressCallbackType] = None
 
     @property
     def operation(self):
@@ -100,8 +102,150 @@ def operation(self):
 TransferQueue = Queue
 
 
+@dataclass
+class TransferIncrement:
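+    """A single upload progress report; tracked_at comes from a monotonic clock for safe interval math."""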
+    total_bytes_uploaded: float
+    tracked_at: float = dataclasses.field(default_factory=time.monotonic)
+
+
+@dataclass
+class UploadEventProgress:
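+    """Progress state for one tracked upload: the increments reported so far and when tracking began."""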
+    key: str
+    file_size: Optional[int]
+    file_type: FileType
+    increments: List[TransferIncrement] = dataclasses.field(default_factory=list)
+    started_at: float = dataclasses.field(default_factory=time.monotonic)
+
+
+class UploadEventProgressTracker(PGHoardThread):
+    CHECK_FREQUENCY = 5.0  # check every 5 seconds for progress
+    WARNING_TIMEOUT = 5.0 * 60  # log a warning in case there is no progress during last 5 minutes
+
+    def __init__(self, metrics: Metrics) -> None:
+        self.metrics = metrics
+        self.log = logging.getLogger("UploadEventProgressTracker")
+
+        self.running: bool = False
+
+        self._tracked_events: Dict[str, UploadEventProgress] = {}
+        self._tracked_events_lock = threading.Lock()
+        self.log.debug("UploadEventProgressTracker initialized")
+
+        super().__init__()
+
+    def track_upload_event(self, file_key: str, file_type: FileType, file_size: Optional[int]) -> None:
+        with self._tracked_events_lock:
+            self.log.debug("Tracking upload event for file %s", file_key)
+            self._tracked_events[file_key] = UploadEventProgress(key=file_key, file_type=file_type, file_size=file_size)
+
+    def untrack_upload_event(self, file_key: str) -> None:
+        with self._tracked_events_lock:
+            # pop with a default: a membership check outside the lock would race with other threads
+            self._tracked_events.pop(file_key, None)
+
+    def increment(self, file_key: str, total_bytes_uploaded: float) -> None:
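+        """Record a progress increment for the given upload and emit the matching bytes-uploaded metric."""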
+        metric_data = {}
+        with self._tracked_events_lock:
+            if file_key not in self._tracked_events:
+                raise RuntimeError(f"UploadEvent for {file_key} is not being tracked.")
+
+            file_type = self._tracked_events[file_key].file_type
+            if file_type in (
+                FileType.Basebackup, FileType.Basebackup_chunk, FileType.Basebackup_delta, FileType.Basebackup_delta_chunk
+            ):
+                metric_data = {
+                    "metric": "pghoard.basebackup_bytes_uploaded",
+                    "inc_value": total_bytes_uploaded,
+                    "tags": {
+                        "delta": file_type in (FileType.Basebackup_delta, FileType.Basebackup_delta_chunk)
+                    },
+                }
+            elif file_type in (FileType.Wal, FileType.Timeline):
+                metric_data = {"metric": "pghoard.compressed_file_upload", "inc_value": total_bytes_uploaded}
+
+            self._tracked_events[file_key].increments.append(TransferIncrement(total_bytes_uploaded=total_bytes_uploaded))
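+        # emit the metric outside the lock to keep the critical section short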
+        if metric_data:
+            self.metrics.increase(**metric_data)
+
+    def reset(self) -> None:
+        with self._tracked_events_lock:
+            self._tracked_events = {}
+            self.running = False
+
+    def run_safe(self):
+        try:
+            self.running = True
+
+            while self.running:
+                with self._tracked_events_lock:
+                    self._check_increment_rate()
+
+                time.sleep(self.CHECK_FREQUENCY)
+        except Exception:  # pylint: disable=broad-except
+            self.log.exception("UploadEventProgressTracker failed while checking upload progress")
+            self.metrics.increase("pghoard.transfer_operation.errors")
+            self.reset()
+            self.stop()
+
+        self.log.debug("Quitting UploadEventProgressTracker")
+
+    def stop(self) -> None:
+        self.running = False
+
+    def _check_increment_rate(self) -> None:
+        """
+            Check if the transfer operation is progressing by comparing the time elapsed since
+            last increment with the average time it took for previous increments. If the operation has been inactive,
+            a warning will be logged.
+        """
+        now = time.monotonic()
+        for ue_progress in self._tracked_events.values():
+            last_increment_at = ue_progress.started_at
+            avg_increment_interval = 0.
+
+            if ue_progress.increments:
+                # total "waiting" time between all increments
+                total_increment_diff = sum(
+                    next_inc.tracked_at - prev_inc.tracked_at
+                    for prev_inc, next_inc in zip(ue_progress.increments, ue_progress.increments[1:])
+                )
+                if len(ue_progress.increments) > 1:
+                    avg_increment_interval = total_increment_diff / (len(ue_progress.increments) - 1)
+                last_increment_at = ue_progress.increments[-1].tracked_at
+
+            # log a warning in case we have not tracked any progress for the operation
+            # within the average increment interval plus the warning timeout
+            if last_increment_at and (now - last_increment_at) >= avg_increment_interval + self.WARNING_TIMEOUT:
+                self.log.warning(
+                    "Upload for file %s has been inactive for %.2f seconds.", ue_progress.key, now - last_increment_at
+                )
+
+
+@contextmanager
+def track_upload_event(progress_tracker: UploadEventProgressTracker, file_key: str, upload_event: UploadEvent) -> Iterator:
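+    """Register the upload with the tracker for the duration of the block and always untrack it on exit."""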
+    progress_tracker.track_upload_event(
+        file_key=file_key,
+        file_type=upload_event.file_type,
+        file_size=upload_event.file_size,
+    )
+    try:
+        yield
+    finally:
+        progress_tracker.untrack_upload_event(file_key)
+
+
 class TransferAgent(PGHoardThread):
-    def __init__(self, config, mp_manager, transfer_queue: TransferQueue, metrics, shared_state_dict):
+    def __init__(
+        self,
+        config,
+        mp_manager,
+        transfer_queue: TransferQueue,
+        upload_tracker: UploadEventProgressTracker,
+        metrics: Metrics,
+        shared_state_dict,
+    ):
         super().__init__()
         self.log = logging.getLogger("TransferAgent")
         self.config = config
@@ -109,6 +253,7 @@ def __init__(self, config, mp_manager, transfer_queue: TransferQueue, metrics, s
         self.mp_manager = mp_manager
         self.fetch_manager = FileFetchManager(self.config, self.mp_manager, self.get_object_storage)
         self.transfer_queue = transfer_queue
+        self.upload_tracker = upload_tracker
         self.running = True
         self.sleep = time.sleep
         self.state = shared_state_dict
@@ -321,12 +466,16 @@ def handle_upload(self, site, key, file_to_transfer: UploadEvent):
                 f = file_to_transfer.source_data
             else:
                 f = open(file_to_transfer.source_data, "rb")
-            with f:
+            with f, track_upload_event(progress_tracker=self.upload_tracker, file_key=key, upload_event=file_to_transfer):
                 metadata = file_to_transfer.metadata.copy()
                 if file_to_transfer.file_size:
                     metadata["Content-Length"] = str(file_to_transfer.file_size)
                 storage.store_file_object(
-                    key, f, metadata=metadata, upload_progress_fn=file_to_transfer.incremental_progress_callback
+                    key,
+                    f,
+                    metadata=metadata,
+                    upload_progress_fn=partial(self.upload_tracker.increment, key),
                 )
             if unlink_local:
                 if isinstance(file_to_transfer.source_data, Path):
@@ -357,6 +506,7 @@ def handle_upload(self, site, key, file_to_transfer: UploadEvent):
                     "Problem in moving file: %r, need to retry (%s: %s)", file_to_transfer.source_data,
                     ex.__class__.__name__, ex
                 )
+
             file_to_transfer = dataclasses.replace(file_to_transfer, retry_number=file_to_transfer.retry_number + 1)
             if file_to_transfer.retry_number > self.config["upload_retries_warning_limit"]:
                 create_alert_file(self.config, "upload_retries_warning")
diff --git a/test/test_transferagent.py b/test/test_transferagent.py
index 713bb550..703383fb 100644
--- a/test/test_transferagent.py
+++ b/test/test_transferagent.py
@@ -8,14 +8,15 @@
 import time
 from pathlib import Path
 from queue import Empty, Queue
-from unittest.mock import ANY, Mock, patch
+from unittest.mock import Mock, patch
 
 import pytest
+from _pytest.logging import LogCaptureFixture
 from rohmu.errors import FileNotFoundFromStorageError, StorageError
 
 from pghoard import metrics
-from pghoard.common import CallbackEvent, FileType, QuitEvent
-from pghoard.transfer import (BaseTransferEvent, DownloadEvent, TransferAgent, UploadEvent)
+from pghoard.common import CallbackEvent, CallbackQueue, FileType, QuitEvent
+from pghoard.transfer import (BaseTransferEvent, DownloadEvent, TransferAgent, UploadEvent, UploadEventProgressTracker)
 
 # pylint: disable=attribute-defined-outside-init
 from .base import PGHoardTestCase
@@ -40,6 +41,31 @@ def store_file_object(self, key, fd, *, cache_control=None, metadata=None, mimet
         raise StorageError("foo")
 
 
+class MockStorageNetworkThrottle(MockStorage):
+    """
+    Storage simulating network throttling when uploading file chunks to object storage.
+    """
+    NUM_CHUNKS = 4
+    INCREMENT_WAIT_PER_CHUNK = [0.1, 0.1, 1, 0.1]
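+    # the third chunk's 1s delay exceeds PatchedUploadEventProgressTracker.WARNING_TIMEOUT (0.5s),
+    # so the tracker is expected to log an inactivity warning during this upload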
+
+    # pylint: disable=unused-argument
+    def store_file_object(self, key, fd, *, cache_control=None, metadata=None, mimetype=None, upload_progress_fn=None):
+        file_size = int(metadata["Content-Length"]) if "Content-Length" in metadata else None
+        if not file_size:
+            return
+
+        chunk_size = round(file_size / self.NUM_CHUNKS)
+        for chunk_num in range(self.NUM_CHUNKS):
+            time.sleep(self.INCREMENT_WAIT_PER_CHUNK[chunk_num])
+            if upload_progress_fn:
+                upload_progress_fn(chunk_size)
+
+
+class PatchedUploadEventProgressTracker(UploadEventProgressTracker):
+    CHECK_FREQUENCY = .2
+    WARNING_TIMEOUT = .5
+
+
 class TestTransferAgent(PGHoardTestCase):
     def setup_method(self, method):
         super().setup_method(method)
@@ -66,16 +92,22 @@ def setup_method(self, method):
 
         self.compression_queue = Queue()
         self.transfer_queue = Queue()
+        self.upload_tracker = PatchedUploadEventProgressTracker(metrics=metrics.Metrics(statsd={}))
+        self.upload_tracker.start()
+
         self.transfer_agent = TransferAgent(
             config=self.config,
             mp_manager=None,
             transfer_queue=self.transfer_queue,
+            upload_tracker=self.upload_tracker,
             metrics=metrics.Metrics(statsd={}),
             shared_state_dict={}
         )
         self.transfer_agent.start()
 
     def teardown_method(self, method):
+        self.upload_tracker.stop()
+        self.upload_tracker.join()
         self.transfer_agent.running = False
         self.transfer_queue.put(QuitEvent)
         self.transfer_agent.join()
@@ -85,7 +117,7 @@ def _inject_prefix(self, prefix):
         self.config["backup_sites"][self.test_site]["prefix"] = prefix
 
     def test_handle_download(self):
-        callback_queue = Queue()
+        callback_queue = CallbackQueue()
         # Check the local storage fails and returns correctly
         self.transfer_queue.put(
             DownloadEvent(
@@ -122,7 +154,7 @@ def test_handle_download(self):
         storage.get_contents_to_string.assert_called_with(expected_key)
 
     def test_handle_upload_xlog(self):
-        callback_queue = Queue()
+        callback_queue = CallbackQueue()
         storage = Mock()
         self.transfer_agent.get_object_storage = lambda x: storage
         assert os.path.exists(self.foo_path) is True
@@ -141,44 +173,42 @@ def test_handle_upload_xlog(self):
         assert callback_queue.get(timeout=1.0) == CallbackEvent(success=True, payload={"file_size": 3})
         assert os.path.exists(self.foo_path) is True
         expected_key = os.path.join(self.test_site, "xlog/00000001000000000000000C")
-        storage.store_file_object.assert_called_with(
-            expected_key,
-            ANY,
-            metadata={
-                "Content-Length": "3",
-                "start-wal-segment": "00000001000000000000000C"
-            },
-            upload_progress_fn=None
-        )
+
+        assert storage.store_file_object.call_count == 1
+        assert storage.store_file_object.call_args[0][0] == expected_key
+        assert storage.store_file_object.call_args[1]["metadata"] == {
+            "Content-Length": "3",
+            "start-wal-segment": "00000001000000000000000C"
+        }
+
         # Now check that the prefix is used.
         self._inject_prefix("site_specific_prefix")
         self.transfer_queue.put(
             UploadEvent(
                 callback_queue=callback_queue,
                 file_type=FileType.Wal,
-                file_path=Path("xlog/00000001000000000000000C"),
+                file_path=Path("xlog/00000001000000000000000D"),
                 file_size=3,
                 remove_after_upload=True,
                 source_data=Path(self.foo_path),
-                metadata={"start-wal-segment": "00000001000000000000000C"},
+                metadata={"start-wal-segment": "00000001000000000000000D"},
                 backup_site_name=self.test_site
             )
         )
         assert callback_queue.get(timeout=1.0) == CallbackEvent(success=True, payload={"file_size": 3})
-        expected_key = "site_specific_prefix/xlog/00000001000000000000000C"
-        storage.store_file_object.assert_called_with(
-            expected_key,
-            ANY,
-            metadata={
-                "Content-Length": "3",
-                "start-wal-segment": "00000001000000000000000C"
-            },
-            upload_progress_fn=None
-        )
+        expected_key = "site_specific_prefix/xlog/00000001000000000000000D"
+
+        assert storage.store_file_object.call_count == 2
+        assert storage.store_file_object.call_args[0][0] == expected_key
+        assert storage.store_file_object.call_args[1]["metadata"] == {
+            "Content-Length": "3",
+            "start-wal-segment": "00000001000000000000000D"
+        }
+
         assert os.path.exists(self.foo_path) is False
 
     def test_handle_upload_basebackup(self):
-        callback_queue = Queue()
+        callback_queue = CallbackQueue()
         storage = Mock()
         self.transfer_agent.get_object_storage = storage
         assert os.path.exists(self.foo_path) is True
@@ -204,7 +234,7 @@ def sleep(sleep_amount):
             sleeps.append(sleep_amount)
             time.sleep(0.001)
 
-        callback_queue = Queue()
+        callback_queue = CallbackQueue()
         storage = MockStorageRaising()
         self.transfer_agent.sleep = sleep
         self.transfer_agent.get_object_storage = storage
@@ -229,6 +259,30 @@ def sleep(sleep_amount):
         expected_sleeps = [0.5, 1, 2, 4, 8, 16, 20, 20]
         assert sleeps[:8] == expected_sleeps
 
+    def test_tracking_warning_upload_event(self, caplog: LogCaptureFixture) -> None:
+        callback_queue = CallbackQueue()
+        storage = MockStorageNetworkThrottle()
+
+        self.transfer_agent.get_object_storage = lambda x: storage
+        assert os.path.exists(self.foo_path) is True
+        self.transfer_queue.put(
+            UploadEvent(
+                callback_queue=callback_queue,
+                file_type=FileType.Wal,
+                file_path=Path("xlog/00000001000000000000000C"),
+                file_size=100,
+                source_data=Path(self.foo_path),
+                remove_after_upload=True,
+                metadata={"start-wal-segment": "00000001000000000000000C"},
+                backup_site_name=self.test_site
+            )
+        )
+
+        assert callback_queue.get(timeout=2.0) == CallbackEvent(success=True, payload={"file_size": 100})
+        assert any(
+            record for record in caplog.records if record.levelname == "WARNING" and "has been inactive" in record.message
+        )
+
     @pytest.mark.timeout(30)
     def test_unknown_operation_raises_exception(self):
         class DummyEvent(BaseTransferEvent):