Skip to content

Commit

Permalink
pghoard: added a thread for processing the "download_results" queue
Browse files Browse the repository at this point in the history
Added a new DownloadResultsProcessor thread, which validates downloaded files and saves them to the target: '<pg_wal>/<wal_name>.tmp' -> '<pg_wal>/<wal_name>.prefetch'.
Files with the "prefetch" suffix can now be moved (renamed) to the destination without extra checks.
Also changed the item type of the "pending_download_ops" dict from "dict" to a dataclass (PendingDownloadOp).
Fixed typing issues for webserver.py
Rename the prefetched WAL in the Python CLI "pghoard_postgres_command".
Rename the prefetched WAL in "pghoard_postgres_command_go".
  • Loading branch information
egor-voynov-aiven committed Apr 4, 2024
1 parent 5505b86 commit 6a53f07
Show file tree
Hide file tree
Showing 7 changed files with 272 additions and 131 deletions.
14 changes: 12 additions & 2 deletions golang/pghoard_postgres_command_go.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ func run() (int, error) {
retry_seconds := *riPtr
for {
attempt += 1
rc, err := restore_command(url, *outputPtr)
rc, err := restore_command(url, *outputPtr, *xlogPtr)
if rc != EXIT_RESTORE_FAIL {
return rc, err
}
Expand All @@ -100,7 +100,7 @@ func archive_command(url string) (int, error) {
return EXIT_ABORT, errors.New("archive_command not yet implemented")
}

func restore_command(url string, output string) (int, error) {
func restore_command(url string, output string, xlog string) (int, error) {
var output_path string
var req *http.Request
var err error
Expand All @@ -120,6 +120,16 @@ func restore_command(url string, output string) (int, error) {
}
output_path = path.Join(cwd, output)
}
// if file "<xlog>.pghoard.prefetch" exists, just move it to destination
xlogPrefetchPath := path.Join(path.Dir(output_path), xlog+".pghoard.prefetch")
_, err = os.Stat(xlogPrefetchPath)
if err == nil {
err := os.Rename(xlogPrefetchPath, output_path)
if err != nil {
return EXIT_ABORT, err
}
return EXIT_OK, nil
}
req, err = http.NewRequest("GET", url, nil)
req.Header.Set("x-pghoard-target-path", output_path)
}
Expand Down
4 changes: 2 additions & 2 deletions pghoard/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -448,8 +448,8 @@ class UnhandledThreadException(Exception):


class PGHoardThread(Thread):
def __init__(self):
super().__init__()
def __init__(self, name: Optional[str] = None):
super().__init__(name=name)
self.exception: Optional[Exception] = None

def run_safe(self):
Expand Down
9 changes: 8 additions & 1 deletion pghoard/pghoard.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@
from pghoard.receivexlog import PGReceiveXLog
from pghoard.transfer import (TransferAgent, TransferQueue, UploadEvent, UploadEventProgressTracker)
from pghoard.walreceiver import WALReceiver
from pghoard.webserver import WebServer
from pghoard.webserver import DownloadResultsProcessor, WebServer


@dataclass
Expand Down Expand Up @@ -149,6 +149,10 @@ def __init__(self, config_path):
self.webserver = WebServer(
self.config, self.requested_basebackup_sites, self.compression_queue, self.transfer_queue, self.metrics
)
self.download_results_processor = DownloadResultsProcessor(
self.webserver.lock, self.webserver.download_results, self.webserver.pending_download_ops,
self.webserver.prefetch_404
)

self.wal_file_deleter = WALFileDeleterThread(
config=self.config, wal_file_deletion_queue=self.wal_file_deletion_queue, metrics=self.metrics
Expand Down Expand Up @@ -701,6 +705,7 @@ def start_threads_on_startup(self):
self.inotify.start()
self.upload_tracker.start()
self.webserver.start()
self.download_results_processor.start()
self.wal_file_deleter.start()
for compressor in self.compressors:
compressor.start()
Expand Down Expand Up @@ -983,6 +988,8 @@ def _get_all_threads(self):

if hasattr(self, "webserver"):
all_threads.append(self.webserver)
if hasattr(self, "download_results_processor"):
all_threads.append(self.download_results_processor)
all_threads.extend(self.basebackups.values())
all_threads.extend(self.receivexlogs.values())
all_threads.extend(self.walreceivers.values())
Expand Down
5 changes: 5 additions & 0 deletions pghoard/postgres_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,11 @@ def restore_command(site, xlog, output, host=PGHOARD_HOST, port=PGHOARD_PORT, re
# directory. Note that os.path.join strips preceding components if a new components starts with a
# slash so it's still possible to use this with absolute paths.
output_path = os.path.join(os.getcwd(), output)
# if file "<xlog>.pghoard.prefetch" exists, just move it to destination
prefetch_path = os.path.join(os.path.dirname(output_path), xlog + ".pghoard.prefetch")
if os.path.exists(prefetch_path):
os.rename(prefetch_path, output_path)
return
headers = {"x-pghoard-target-path": output_path}
method = "GET"
path = "/{}/archive/{}".format(site, xlog)
Expand Down
Loading

0 comments on commit 6a53f07

Please sign in to comment.