Skip to content

Commit

Permalink
Implement systemd-style readiness notification via NOTIFY_SOCKET
Browse files Browse the repository at this point in the history
The service may specify this with ready-notification=socket.
This will result in dinit creating an abstract datagram socket
and perform reads on it. As soon as the READY=1 datagram arrives,
the readiness notification is signaled.
  • Loading branch information
q66 committed Oct 17, 2024
1 parent e7ad5b1 commit abd7160
Show file tree
Hide file tree
Showing 6 changed files with 67 additions and 15 deletions.
27 changes: 23 additions & 4 deletions src/baseproc-service.cc
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,8 @@ bool base_process_service::start_ps_process(const std::vector<const char *> &cmd

int control_socket[2] = {-1, -1};
int notify_pipe[2] = {-1, -1};
bool have_notify = !notification_var.empty() || force_notification_fd != -1;
int notify_fd = -1;
bool have_notify = !notification_var.empty() || notification_sock || force_notification_fd != -1;
ready_notify_watcher * rwatcher = have_notify ? get_ready_watcher() : nullptr;
bool ready_watcher_registered = false;

Expand All @@ -175,7 +176,7 @@ bool base_process_service::start_ps_process(const std::vector<const char *> &cmd
}
}

if (have_notify) {
if (have_notify && !notification_sock) {
// Create a notification pipe:
if (bp_sys::pipe2(notify_pipe, 0) != 0) {
log(loglevel_t::ERROR, get_name(), ": can't create notification pipe: ", strerror(errno));
Expand All @@ -185,10 +186,28 @@ bool base_process_service::start_ps_process(const std::vector<const char *> &cmd
// Set the read side as close-on-exec:
int fdflags = bp_sys::fcntl(notify_pipe[0], F_GETFD);
bp_sys::fcntl(notify_pipe[0], F_SETFD, fdflags | FD_CLOEXEC);
notify_fd = notify_pipe[1];
rwatcher->sun.sun_family = AF_UNSPEC;
} else if (have_notify) {
notify_fd = socket(AF_UNIX, SOCK_DGRAM|SOCK_CLOEXEC, 0);
if (notify_fd < 0) {
log(loglevel_t::ERROR, get_name(), ": can't create notification socket: ", strerror(errno));
goto out_cs_h;
}
memset(&rwatcher->sun, 0, sizeof(rwatcher->sun));
rwatcher->sun.sun_family = AF_UNIX;
snprintf(&rwatcher->sun.sun_path[1], sizeof(rwatcher->sun.sun_path) - 1, "/tmp/dinit-ready-notify");
if (bind(notify_fd, reinterpret_cast<sockaddr *>(&rwatcher->sun), sizeof("/tmp/dinit-ready-notify") + sizeof(sa_family_t)) < 0) {
log(loglevel_t::ERROR, get_name(), ": can't bind to notification socket: ", strerror(errno));
goto out_cs_h;
}
notification_var = "NOTIFY_SOCKET=@/tmp/dinit-ready-notify";
}

if (have_notify) {
// add, but don't yet enable, readiness watcher:
try {
rwatcher->add_watch(event_loop, notify_pipe[0], dasynq::IN_EVENTS, false);
rwatcher->add_watch(event_loop, notification_sock ? notify_fd : notify_pipe[0], dasynq::IN_EVENTS, false);
ready_watcher_registered = true;
}
catch (std::exception &exc) {
Expand Down Expand Up @@ -251,7 +270,7 @@ bool base_process_service::start_ps_process(const std::vector<const char *> &cmd
run_params.unmask_sigint = onstart_flags.unmask_intr;
run_params.csfd = control_socket[1];
run_params.socket_fd = socket_fd;
run_params.notify_fd = notify_pipe[1];
run_params.notify_fd = notify_fd;
run_params.force_notify_fd = force_notification_fd;
run_params.notify_var = notification_var.c_str();
run_params.env_file = env_file.c_str();
Expand Down
6 changes: 5 additions & 1 deletion src/includes/load-service.h
Original file line number Diff line number Diff line change
Expand Up @@ -1297,6 +1297,7 @@ class service_settings_wrapper
std::vector<service_rlimits> rlimits;

int readiness_fd = -1; // readiness fd in service process
bool readiness_sock = false; // using a socket for readiness
string readiness_var; // environment var to hold readiness fd

uid_t run_as_uid = -1;
Expand Down Expand Up @@ -1395,7 +1396,7 @@ class service_settings_wrapper
report_error("process ID file ('pid-file') not specified for bgprocess service.");
}

if (readiness_fd != -1 || !readiness_var.empty()) {
if (readiness_fd != -1 || readiness_sock || !readiness_var.empty()) {
report_error("readiness notification ('ready-notification') is not supported "
"for bgprocess services.");
}
Expand Down Expand Up @@ -1872,6 +1873,9 @@ void process_service_line(settings_wrapper &settings, const char *name, const ch
"ready-notification", input_pos);
}
}
else if (notify_setting == "socket") {
settings.readiness_sock = true;
}
else {
throw service_description_exc(name, "unrecognised setting: " + notify_setting,
"ready-notification", input_pos);
Expand Down
8 changes: 8 additions & 0 deletions src/includes/proc-service.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

#include <sys/types.h>
#include <sys/resource.h>
#include <sys/un.h>

#include <baseproc-sys.h>
#include <service.h>
Expand Down Expand Up @@ -110,6 +111,7 @@ class ready_notify_watcher : public eventloop_t::fd_watcher_impl<ready_notify_wa
public:
base_process_service *service;
dasynq::rearm fd_event(eventloop_t &eloop, int fd, int flags) noexcept;
sockaddr_un sun;

ready_notify_watcher(base_process_service * sr) noexcept : service(sr) { }

Expand Down Expand Up @@ -217,6 +219,7 @@ class base_process_service : public service_record
uid_t run_as_uid = -1;
gid_t run_as_gid = -1;
int force_notification_fd = -1; // if set, notification fd for service process is set to this fd
bool notification_sock = false; // if set, socket will be used for notification
string notification_var; // if set, name of an environment variable for notification fd

pid_t pid = -1; // PID of the process. For a scripted service which is STARTING or STOPPING,
Expand Down Expand Up @@ -518,6 +521,11 @@ class base_process_service : public service_record
notification_var = std::move(varname);
}

void set_notification_sock(bool v)
{
notification_sock = v;
}

// The restart/stop timer expired.
void timer_expired() noexcept;

Expand Down
1 change: 1 addition & 0 deletions src/load-service.cc
Original file line number Diff line number Diff line change
Expand Up @@ -742,6 +742,7 @@ service_record * dirload_service_set::load_reload_service(const char *fullname,
rvalps->set_run_as_uid_gid(settings.run_as_uid, settings.run_as_gid);
rvalps->set_notification_fd(settings.readiness_fd);
rvalps->set_notification_var(std::move(settings.readiness_var));
rvalps->set_notification_sock(settings.readiness_sock);
rvalps->set_logfile_details(std::move(settings.logfile), settings.logfile_perms,
settings.logfile_uid, settings.logfile_gid);
rvalps->set_log_buf_max(settings.max_log_buffer_sz);
Expand Down
24 changes: 20 additions & 4 deletions src/proc-service.cc
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ void process_service::exec_succeeded() noexcept
// that case. Otherwise, we are STARTING or STOPPING:

if (get_state() == service_state_t::STARTING) {
if (force_notification_fd != -1 || !notification_var.empty()) {
if (force_notification_fd != -1 || notification_sock || !notification_var.empty()) {
// Wait for readiness notification:
readiness_watcher.set_enabled(event_loop, true);
}
Expand Down Expand Up @@ -175,8 +175,20 @@ rearm ready_notify_watcher::fd_event(eventloop_t &, int fd, int flags) noexcept
{
char buf[128];
if (service->get_state() == service_state_t::STARTING) {
// can we actually read anything from the notification pipe?
int r = bp_sys::read(fd, buf, sizeof(buf));
// can we actually read anything from the notification pipe/socket?
ssize_t r;
if (sun.sun_family == AF_UNSPEC) {
r = bp_sys::read(fd, buf, sizeof(buf));
}
else {
socklen_t alen = strlen(&sun.sun_path[1]) + sizeof(sa_family_t);
r = recvfrom(fd, buf, sizeof(buf), 0, reinterpret_cast<sockaddr *>(&sun), &alen);
if (r > 0 && (r != strlen("READY=1") || memcmp(buf, "READY=1", strlen("READY=1")))) {
/* ignore datagram */
errno = EAGAIN;
r = -1;
}
}
if (r > 0) {
if (service->waiting_stopstart_timer) {
service->process_timer.stop_timer(event_loop);
Expand All @@ -195,7 +207,7 @@ rearm ready_notify_watcher::fd_event(eventloop_t &, int fd, int flags) noexcept
}
service->services->process_queues();
}
else {
else if (sun.sun_family == AF_UNSPEC) {
// Just keep consuming data from the pipe:
int r = bp_sys::read(fd, buf, sizeof(buf));
if (r == 0) {
Expand All @@ -204,6 +216,10 @@ rearm ready_notify_watcher::fd_event(eventloop_t &, int fd, int flags) noexcept
service->notification_fd = -1;
return rearm::DISARM;
}
} else {
// Just consume the datagram
socklen_t alen = strlen(&sun.sun_path[1]) + sizeof(sa_family_t);
recvfrom(fd, buf, sizeof(buf), 0, reinterpret_cast<sockaddr *>(&sun), &alen);
}

return rearm::REARM;
Expand Down
16 changes: 10 additions & 6 deletions src/run-child-proc.cc
Original file line number Diff line number Diff line change
Expand Up @@ -156,12 +156,16 @@ void base_process_service::run_child_proc(run_proc_params params) noexcept
err.stage = exec_stage::SET_NOTIFYFD_VAR;
// We need to do an allocation: the variable name length, '=', and space for the value,
// and nul terminator:
int notify_var_len = strlen(notify_var);
int req_sz = notify_var_len + ((CHAR_BIT * sizeof(int) - 1 + 2) / 3) + 1;
char * var_str = (char *) malloc(req_sz);
if (var_str == nullptr) goto failure_out;
snprintf(var_str, req_sz, "%s=%d", notify_var, notify_fd);
service_env.set_var(var_str);
if (!strchr(notify_var, '=')) {
int notify_var_len = strlen(notify_var);
int req_sz = notify_var_len + ((CHAR_BIT * sizeof(int) - 1 + 2) / 3) + 1;
char * var_str = (char *) malloc(req_sz);
if (var_str == nullptr) goto failure_out;
snprintf(var_str, req_sz, "%s=%d", notify_var, notify_fd);
service_env.set_var(var_str);
} else {
service_env.set_var(notify_var);
}
}

// Set up Systemd-style socket activation:
Expand Down

0 comments on commit abd7160

Please sign in to comment.