From fa6f87c589f27515206b8e528e203fc60c79b594 Mon Sep 17 00:00:00 2001 From: pohanhuangtw Date: Fri, 12 Jul 2024 02:24:39 +0000 Subject: [PATCH 1/7] [NVSHAS-9189] Add HealthCheck grpc call to check controller health periodically. --- scanner.go | 2 ++ server.go | 45 +++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 47 insertions(+) diff --git a/scanner.go b/scanner.go index 2231e648..38d5e92f 100644 --- a/scanner.go +++ b/scanner.go @@ -124,6 +124,8 @@ func connectController(path, advIP, joinIP, selfID string, advPort uint32, joinP scanner.CVEDB = nil dbData = make(map[string]*share.ScanVulnerability) // zero size + go periodCheckHealth(joinIP, joinPort, cb) + // start responding shutdown notice cb.ignoreShutdown = false <-cb.shutCh diff --git a/server.go b/server.go index 0f744aaf..41bcde92 100644 --- a/server.go +++ b/server.go @@ -5,6 +5,7 @@ import ( "errors" "fmt" "io" + "strings" "time" log "github.com/sirupsen/logrus" @@ -404,3 +405,47 @@ func scannerDeregister(joinIP string, joinPort uint16, id string) error { } return nil } + +func isConnectionError(err error) bool { + if st, ok := status.FromError(err); ok { + if st.Code() == codes.Unavailable { + if strings.Contains(st.Message(), "connection refused") || + strings.Contains(st.Message(), "transport: Error while dialing") || + strings.Contains(st.Message(), "connection error") { + return true + } + } + } + return false +} + +func getControllerHealthCheck(joinIP string, joinPort uint16, cb cluster.GRPCCallback) error { + client, err := getControllerServiceClient(joinIP, joinPort, cb) + if err != nil { + log.WithFields(log.Fields{"error": err}).Error("Failed to find ctrl client") + return errors.New("Failed to connect to controller") + } + + ctx, cancel := context.WithTimeout(context.Background(), time.Second*60) + defer cancel() + + _, errHealthCheck := client.HealthCheck(ctx, &share.RPCVoid{}) + + return errHealthCheck +} + +func periodCheckHealth(joinIP string, joinPort uint16, cb 
*clientCallback) { + period := 30 + ticker := time.NewTicker(time.Duration(period) * time.Minute) + defer ticker.Stop() + + for { + select { + case <-ticker.C: + if isConnectionError(getControllerHealthCheck(joinIP, joinPort, cb)) { + log.WithFields(log.Fields{"joinIP": joinIP, "joinPort": joinPort}).Error("Prepare to Reconnect the controller") + cb.shutCh <- true + } + } + } +} From 6292e7b2e3b1945589067eca3fb64991e416c12d Mon Sep 17 00:00:00 2001 From: pohanhuangtw Date: Mon, 15 Jul 2024 01:57:16 +0000 Subject: [PATCH 2/7] [NVSHAS-9189] Use GetCaps grpc call to check controller health periodically. --- server.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server.go b/server.go index 41bcde92..0388feea 100644 --- a/server.go +++ b/server.go @@ -429,7 +429,7 @@ func getControllerHealthCheck(joinIP string, joinPort uint16, cb cluster.GRPCCal ctx, cancel := context.WithTimeout(context.Background(), time.Second*60) defer cancel() - _, errHealthCheck := client.HealthCheck(ctx, &share.RPCVoid{}) + _, errHealthCheck := client.GetCaps(ctx, &share.RPCVoid{}) return errHealthCheck } From 30e056ea154ed841c53571cc1684c183df5ee614 Mon Sep 17 00:00:00 2001 From: pohanhuangtw Date: Mon, 15 Jul 2024 09:51:54 +0000 Subject: [PATCH 3/7] [NVSHAS-9189] Add comment message to the periodCheckHealth --- server.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/server.go b/server.go index 0388feea..4b8dd13b 100644 --- a/server.go +++ b/server.go @@ -434,6 +434,8 @@ func getControllerHealthCheck(joinIP string, joinPort uint16, cb cluster.GRPCCal return errHealthCheck } +// To ensure the controller's availability, periodCheckHealth use GetCaps to periodically check if the controller is alive. +// Additionally, if the controller is deleted or not responsive, the scanner will re-register. 
func periodCheckHealth(joinIP string, joinPort uint16, cb *clientCallback) { period := 30 ticker := time.NewTicker(time.Duration(period) * time.Minute) From eb676ac4eec3a6b3eaa610f694f76c3cabdc7722 Mon Sep 17 00:00:00 2001 From: pohanhuangtw Date: Tue, 16 Jul 2024 08:26:51 +0000 Subject: [PATCH 4/7] [NVSHAS-9189] Improve the periodCheckHealth make sure it only create once, shorten the check period. --- scanner.go | 9 ++++++++- server.go | 7 ++++--- 2 files changed, 12 insertions(+), 4 deletions(-) diff --git a/scanner.go b/scanner.go index 38d5e92f..dfde87a5 100644 --- a/scanner.go +++ b/scanner.go @@ -104,6 +104,8 @@ func connectController(path, advIP, joinIP, selfID string, advPort uint32, joinP ignoreShutdown: true, } + var healthCheckCh chan struct{} + for { // forever retry dbData := dbRead(path, 0, "") @@ -124,7 +126,12 @@ func connectController(path, advIP, joinIP, selfID string, advPort uint32, joinP scanner.CVEDB = nil dbData = make(map[string]*share.ScanVulnerability) // zero size - go periodCheckHealth(joinIP, joinPort, cb) + if healthCheckCh != nil { + close(healthCheckCh) + } + + healthCheckCh = make(chan struct{}) + go periodCheckHealth(joinIP, joinPort, cb, healthCheckCh) // start responding shutdown notice cb.ignoreShutdown = false diff --git a/server.go b/server.go index 4b8dd13b..c6ba5f07 100644 --- a/server.go +++ b/server.go @@ -436,8 +436,8 @@ func getControllerHealthCheck(joinIP string, joinPort uint16, cb cluster.GRPCCal // To ensure the controller's availability, periodCheckHealth use GetCaps to periodically check if the controller is alive. // Additionally, if the controller is deleted or not responsive, the scanner will re-register. 
-func periodCheckHealth(joinIP string, joinPort uint16, cb *clientCallback) { - period := 30 +func periodCheckHealth(joinIP string, joinPort uint16, cb *clientCallback, healthCheckCh chan struct{}) { + period := 20 ticker := time.NewTicker(time.Duration(period) * time.Minute) defer ticker.Stop() @@ -446,8 +446,9 @@ func periodCheckHealth(joinIP string, joinPort uint16, cb *clientCallback) { case <-ticker.C: if isConnectionError(getControllerHealthCheck(joinIP, joinPort, cb)) { log.WithFields(log.Fields{"joinIP": joinIP, "joinPort": joinPort}).Error("Prepare to Reconnect the controller") - cb.shutCh <- true } + case <-healthCheckCh: + return } } } From b8547b7681475cb742437b0296f0bab0a5a4357c Mon Sep 17 00:00:00 2001 From: pohanhuangtw Date: Wed, 17 Jul 2024 11:08:38 +0000 Subject: [PATCH 5/7] [NVSHAS-9189] Improve the periodCheckHealth make sure it can restart the scanner if the scanner is not in the controller list. --- monitor/monitor.c | 12 +++++++++++- scanner.go | 8 +++++--- server.go | 46 ++++++++++++++++++++++------------------------ 3 files changed, 38 insertions(+), 28 deletions(-) diff --git a/monitor/monitor.c b/monitor/monitor.c index 6f90cf82..9c34f6e9 100644 --- a/monitor/monitor.c +++ b/monitor/monitor.c @@ -42,6 +42,8 @@ #define ENV_SCANNER_CACHE_MAX "MAX_CACHE_RECORD_MB" #define ENV_CAP_CRITICAL "CAP_CRITICAL" +#define ENV_HEALTH_CHECK_PERIOD "HEALTH_CHECK_PERIOD" +#define ENV_RETRY "MAX_RETRY" enum { PROC_SCANNER = 0, @@ -120,7 +122,7 @@ static pid_t fork_exec(int i) char *args[PROC_ARGS_MAX], *join, *adv, *url; char *join_port, *adv_port; char *license, *registry, *repository, *tag, *user, *pass, *base, *api_user, *api_pass, *enable; - char *on_demand, *cache_record_max; + char *on_demand, *cache_record_max, *period, *retry_max; int a; switch (i) { @@ -205,6 +207,14 @@ static pid_t fork_exec(int i) args[a ++] = "--tag"; args[a ++] = tag; } + if ((period = getenv(ENV_HEALTH_CHECK_PERIOD)) != NULL) { + args[a ++] = "--period"; + args[a ++] = 
period; + } + if ((retry_max = getenv(ENV_RETRY)) != NULL) { + args[a ++] = "--retry_max"; + args[a ++] = retry_max; + } } // The following options apply to both standalone or non-standalone mode diff --git a/scanner.go b/scanner.go index dfde87a5..838ea2cc 100644 --- a/scanner.go +++ b/scanner.go @@ -98,7 +98,7 @@ func dbRead(path string, maxRetry int, output string) map[string]*share.ScanVuln } } -func connectController(path, advIP, joinIP, selfID string, advPort uint32, joinPort uint16) { +func connectController(path, advIP, joinIP, selfID string, advPort uint32, joinPort uint16, period, retryMax int, doneCh chan bool) { cb := &clientCallback{ shutCh: make(chan interface{}, 1), ignoreShutdown: true, @@ -131,7 +131,7 @@ func connectController(path, advIP, joinIP, selfID string, advPort uint32, joinP } healthCheckCh = make(chan struct{}) - go periodCheckHealth(joinIP, joinPort, cb, healthCheckCh) + go periodCheckHealth(joinIP, joinPort, &scanner, cb, healthCheckCh, doneCh, period, retryMax) // start responding shutdown notice cb.ignoreShutdown = false @@ -186,6 +186,8 @@ func main() { noWait := flag.Bool("no_wait", false, "No initial wait") noTask := flag.Bool("no_task", false, "Not using scanner task") verbose := flag.Bool("x", false, "more debug") + period := flag.Int("period", 20, "Minutes to check if the scanner is in the controller and controller is alive") + retryMax := flag.Int("retry_max", 3, "Number of retry") output := flag.String("o", "", "Output CVEDB in json format, specify the output file") show := flag.String("show", "", "Standalone Mode: Stdout print options, cmd,module") @@ -425,7 +427,7 @@ func main() { // Use the original address, which is the service name, so when controller changes, // new IP can be resolved - go connectController(*dbPath, *adv, *join, selfID, (uint32)(*advPort), (uint16)(*joinPort)) + go connectController(*dbPath, *adv, *join, selfID, (uint32)(*advPort), (uint16)(*joinPort), *period, *retryMax, done) <-done log.Info("Exiting 
...") diff --git a/server.go b/server.go index c6ba5f07..180201ad 100644 --- a/server.go +++ b/server.go @@ -5,7 +5,6 @@ import ( "errors" "fmt" "io" - "strings" "time" log "github.com/sirupsen/logrus" @@ -406,46 +405,45 @@ func scannerDeregister(joinIP string, joinPort uint16, id string) error { return nil } -func isConnectionError(err error) bool { - if st, ok := status.FromError(err); ok { - if st.Code() == codes.Unavailable { - if strings.Contains(st.Message(), "connection refused") || - strings.Contains(st.Message(), "transport: Error while dialing") || - strings.Contains(st.Message(), "connection error") { - return true - } - } - } - return false -} - -func getControllerHealthCheck(joinIP string, joinPort uint16, cb cluster.GRPCCallback) error { +func getScannerAvailable(joinIP string, joinPort uint16, data *share.ScannerRegisterData, cb cluster.GRPCCallback) (*share.ScannerAvailable, error) { client, err := getControllerServiceClient(joinIP, joinPort, cb) if err != nil { log.WithFields(log.Fields{"error": err}).Error("Failed to find ctrl client") - return errors.New("Failed to connect to controller") + return &share.ScannerAvailable{Visible: false}, errors.New("Failed to connect to controller") } ctx, cancel := context.WithTimeout(context.Background(), time.Second*60) defer cancel() - _, errHealthCheck := client.GetCaps(ctx, &share.RPCVoid{}) + scannerAvailable, errHealthCheck := client.HealthCheck(ctx, data) - return errHealthCheck + return scannerAvailable, errHealthCheck } -// To ensure the controller's availability, periodCheckHealth use GetCaps to periodically check if the controller is alive. +// To ensure the controller's availability, periodCheckHealth use HealthCheck to periodically check if the controller is alive. // Additionally, if the controller is deleted or not responsive, the scanner will re-register. 
-func periodCheckHealth(joinIP string, joinPort uint16, cb *clientCallback, healthCheckCh chan struct{}) { - period := 20 +func periodCheckHealth(joinIP string, joinPort uint16, data *share.ScannerRegisterData, cb *clientCallback, healthCheckCh chan struct{}, done chan bool, period, retryMax int) { ticker := time.NewTicker(time.Duration(period) * time.Minute) defer ticker.Stop() - for { select { case <-ticker.C: - if isConnectionError(getControllerHealthCheck(joinIP, joinPort, cb)) { - log.WithFields(log.Fields{"joinIP": joinIP, "joinPort": joinPort}).Error("Prepare to Reconnect the controller") + retryCnt := 0 + for retryCnt < retryMax { + scannerAvailable, errHealthCheck := getScannerAvailable(joinIP, joinPort, data, cb) + if errHealthCheck == nil { + if scannerAvailable.Visible { + break + } + } else { + log.WithFields(log.Fields{"joinIP": joinIP, "joinPort": joinPort, "errHealthCheck": errHealthCheck}).Error("periodCheckHealth has error") + } + retryCnt++ + time.Sleep(time.Duration(period) * time.Second) // Add a delay before retrying + } + if retryCnt >= retryMax { + log.WithFields(log.Fields{"joinIP": joinIP, "joinPort": joinPort, "retryMax": retryMax}).Error("The scanner is not in the controller, restart the scanner pod.") + done <- true } case <-healthCheckCh: return From 5a87be6b8b1717c381f24bc9eb683a80648a0b35 Mon Sep 17 00:00:00 2001 From: pohanhuangtw Date: Thu, 18 Jul 2024 07:32:55 +0000 Subject: [PATCH 6/7] [NVSHAS-9189] Add modified shared files. 
--- scanner.go | 6 +++--- server.go | 26 +++++++++++++------------- 2 files changed, 16 insertions(+), 16 deletions(-) diff --git a/scanner.go b/scanner.go index 838ea2cc..28b4016c 100644 --- a/scanner.go +++ b/scanner.go @@ -98,7 +98,7 @@ func dbRead(path string, maxRetry int, output string) map[string]*share.ScanVuln } } -func connectController(path, advIP, joinIP, selfID string, advPort uint32, joinPort uint16, period, retryMax int, doneCh chan bool) { +func connectController(path, advIP, joinHost, selfID string, advPort uint32, joinPort uint16, period, retryMax int, doneCh chan bool) { cb := &clientCallback{ shutCh: make(chan interface{}, 1), ignoreShutdown: true, @@ -118,7 +118,7 @@ func connectController(path, advIP, joinIP, selfID string, advPort uint32, joinP ID: selfID, } - for scannerRegister(joinIP, joinPort, &scanner, cb) != nil { + for scannerRegister(joinHost, joinPort, &scanner, cb) != nil { time.Sleep(registerWaitTime) } @@ -131,7 +131,7 @@ func connectController(path, advIP, joinIP, selfID string, advPort uint32, joinP } healthCheckCh = make(chan struct{}) - go periodCheckHealth(joinIP, joinPort, &scanner, cb, healthCheckCh, doneCh, period, retryMax) + go periodCheckHealth(joinHost, joinPort, &scanner, cb, healthCheckCh, doneCh, period, retryMax) // start responding shutdown notice cb.ignoreShutdown = false diff --git a/server.go b/server.go index 180201ad..2a0baf7f 100644 --- a/server.go +++ b/server.go @@ -220,9 +220,9 @@ func createControllerScanServiceWrapper(conn *grpc.ClientConn) cluster.Service { return share.NewControllerScanServiceClient(conn) } -func getControllerServiceClient(joinIP string, joinPort uint16, cb cluster.GRPCCallback) (share.ControllerScanServiceClient, error) { +func getControllerServiceClient(joinHost string, joinPort uint16, cb cluster.GRPCCallback) (share.ControllerScanServiceClient, error) { if cluster.GetGRPCClientEndpoint(controller) == "" { - ep := fmt.Sprintf("%s:%v", joinIP, joinPort) + ep := fmt.Sprintf("%s:%v", 
joinHost, joinPort) cluster.CreateGRPCClient(controller, ep, true, createControllerScanServiceWrapper) } c, err := cluster.GetGRPCClient(controller, nil, cb) @@ -349,12 +349,12 @@ func downgradeCriticalSeverityInCVEDB(data *share.ScannerRegisterData) { return } -func scannerRegister(joinIP string, joinPort uint16, data *share.ScannerRegisterData, cb cluster.GRPCCallback) error { +func scannerRegister(joinHost string, joinPort uint16, data *share.ScannerRegisterData, cb cluster.GRPCCallback) error { log.WithFields(log.Fields{ - "join": fmt.Sprintf("%s:%d", joinIP, joinPort), "version": data.CVEDBVersion, "entries": len(data.CVEDB), + "join": fmt.Sprintf("%s:%d", joinHost, joinPort), "version": data.CVEDBVersion, "entries": len(data.CVEDB), }).Debug() - client, err := getControllerServiceClient(joinIP, joinPort, cb) + client, err := getControllerServiceClient(joinHost, joinPort, cb) if err != nil { log.WithFields(log.Fields{"error": err}).Error("Failed to find ctrl client") return errors.New("Failed to connect to controller") @@ -385,10 +385,10 @@ func scannerRegister(joinIP string, joinPort uint16, data *share.ScannerRegister return nil } -func scannerDeregister(joinIP string, joinPort uint16, id string) error { +func scannerDeregister(joinHost string, joinPort uint16, id string) error { log.Debug() - client, err := getControllerServiceClient(joinIP, joinPort, nil) + client, err := getControllerServiceClient(joinHost, joinPort, nil) if err != nil { log.WithFields(log.Fields{"error": err}).Error("Failed to find ctrl client") return errors.New("Failed to connect to controller") @@ -405,8 +405,8 @@ func scannerDeregister(joinIP string, joinPort uint16, id string) error { return nil } -func getScannerAvailable(joinIP string, joinPort uint16, data *share.ScannerRegisterData, cb cluster.GRPCCallback) (*share.ScannerAvailable, error) { - client, err := getControllerServiceClient(joinIP, joinPort, cb) +func getScannerAvailable(joinHost string, joinPort uint16, data 
*share.ScannerRegisterData, cb cluster.GRPCCallback) (*share.ScannerAvailable, error) { + client, err := getControllerServiceClient(joinHost, joinPort, cb) if err != nil { log.WithFields(log.Fields{"error": err}).Error("Failed to find ctrl client") return &share.ScannerAvailable{Visible: false}, errors.New("Failed to connect to controller") @@ -422,7 +422,7 @@ func getScannerAvailable(joinIP string, joinPort uint16, data *share.ScannerRegi // To ensure the controller's availability, periodCheckHealth use HealthCheck to periodically check if the controller is alive. // Additionally, if the controller is deleted or not responsive, the scanner will re-register. -func periodCheckHealth(joinIP string, joinPort uint16, data *share.ScannerRegisterData, cb *clientCallback, healthCheckCh chan struct{}, done chan bool, period, retryMax int) { +func periodCheckHealth(joinHost string, joinPort uint16, data *share.ScannerRegisterData, cb *clientCallback, healthCheckCh chan struct{}, done chan bool, period, retryMax int) { ticker := time.NewTicker(time.Duration(period) * time.Minute) defer ticker.Stop() for { @@ -430,19 +430,19 @@ func periodCheckHealth(joinIP string, joinPort uint16, data *share.ScannerRegist case <-ticker.C: retryCnt := 0 for retryCnt < retryMax { - scannerAvailable, errHealthCheck := getScannerAvailable(joinIP, joinPort, data, cb) + scannerAvailable, errHealthCheck := getScannerAvailable(joinHost, joinPort, data, cb) if errHealthCheck == nil { if scannerAvailable.Visible { break } } else { - log.WithFields(log.Fields{"joinIP": joinIP, "joinPort": joinPort, "errHealthCheck": errHealthCheck}).Error("periodCheckHealth has error") + log.WithFields(log.Fields{"joinHost": joinHost, "joinPort": joinPort, "errHealthCheck": errHealthCheck}).Debug("periodCheckHealth has error") } retryCnt++ time.Sleep(time.Duration(period) * time.Second) // Add a delay before retrying } if retryCnt >= retryMax { - log.WithFields(log.Fields{"joinIP": joinIP, "joinPort": joinPort, 
"retryMax": retryMax}).Error("The scanner is not in the controller, restart the scanner pod.") + log.WithFields(log.Fields{"joinHost": joinHost, "joinPort": joinPort, "retryMax": retryMax}).Error("The scanner is not in the controller, restart the scanner pod.") done <- true } case <-healthCheckCh: From bf28afef500d823b3d521ac516c4b8c9823f8722 Mon Sep 17 00:00:00 2001 From: pohanhuangtw Date: Wed, 31 Jul 2024 10:40:56 +0000 Subject: [PATCH 7/7] [NVSHAS-9189] Remove setting from monitor and add isGetCapsActivate to avoid old controller to keep register. --- monitor/monitor.c | 12 +----------- scanner.go | 15 +++++++++------ server.go | 33 ++++++++++++++++++++------------- 3 files changed, 30 insertions(+), 30 deletions(-) diff --git a/monitor/monitor.c b/monitor/monitor.c index 9c34f6e9..6f90cf82 100644 --- a/monitor/monitor.c +++ b/monitor/monitor.c @@ -42,8 +42,6 @@ #define ENV_SCANNER_CACHE_MAX "MAX_CACHE_RECORD_MB" #define ENV_CAP_CRITICAL "CAP_CRITICAL" -#define ENV_HEALTH_CHECK_PERIOD "HEALTH_CHECK_PERIOD" -#define ENV_RETRY "MAX_RETRY" enum { PROC_SCANNER = 0, @@ -122,7 +120,7 @@ static pid_t fork_exec(int i) char *args[PROC_ARGS_MAX], *join, *adv, *url; char *join_port, *adv_port; char *license, *registry, *repository, *tag, *user, *pass, *base, *api_user, *api_pass, *enable; - char *on_demand, *cache_record_max, *period, *retry_max; + char *on_demand, *cache_record_max; int a; switch (i) { @@ -207,14 +205,6 @@ static pid_t fork_exec(int i) args[a ++] = "--tag"; args[a ++] = tag; } - if ((period = getenv(ENV_HEALTH_CHECK_PERIOD)) != NULL) { - args[a ++] = "--period"; - args[a ++] = period; - } - if ((retry_max = getenv(ENV_RETRY)) != NULL) { - args[a ++] = "--retry_max"; - args[a ++] = retry_max; - } } // The following options apply to both standalone or non-standalone mode diff --git a/scanner.go b/scanner.go index 28b4016c..d077ec5b 100644 --- a/scanner.go +++ b/scanner.go @@ -50,6 +50,7 @@ var cveDB *common.CveDB var ctrlCaps share.ControllerCaps var 
scanTasker *Tasker var selfID string +var isGetCapsActivate bool func dbRead(path string, maxRetry int, output string) map[string]*share.ScanVulnerability { dbFile := path + share.DefaultCVEDBName @@ -98,7 +99,7 @@ func dbRead(path string, maxRetry int, output string) map[string]*share.ScanVuln } } -func connectController(path, advIP, joinHost, selfID string, advPort uint32, joinPort uint16, period, retryMax int, doneCh chan bool) { +func connectController(path, advIP, joinIP, selfID string, advPort uint32, joinPort uint16, doneCh chan bool) { cb := &clientCallback{ shutCh: make(chan interface{}, 1), ignoreShutdown: true, @@ -118,7 +119,7 @@ func connectController(path, advIP, joinHost, selfID string, advPort uint32, joi ID: selfID, } - for scannerRegister(joinHost, joinPort, &scanner, cb) != nil { + for scannerRegister(joinIP, joinPort, &scanner, cb) != nil { time.Sleep(registerWaitTime) } @@ -131,7 +132,11 @@ func connectController(path, advIP, joinHost, selfID string, advPort uint32, joi } healthCheckCh = make(chan struct{}) - go periodCheckHealth(joinHost, joinPort, &scanner, cb, healthCheckCh, doneCh, period, retryMax) + // Check if the gRPC HealthCheck API is active (indicated by isGetCapsActivate being true). + // If active, initiate periodic health checks by launching a goroutine to monitor the health status of the specified service. 
+ if isGetCapsActivate { + go periodCheckHealth(joinIP, joinPort, &scanner, cb, healthCheckCh, doneCh) + } // start responding shutdown notice cb.ignoreShutdown = false @@ -186,8 +191,6 @@ func main() { noWait := flag.Bool("no_wait", false, "No initial wait") noTask := flag.Bool("no_task", false, "Not using scanner task") verbose := flag.Bool("x", false, "more debug") - period := flag.Int("period", 20, "Minutes to check if the scanner is in the controller and controller is alive") - retryMax := flag.Int("retry_max", 3, "Number of retry") output := flag.String("o", "", "Output CVEDB in json format, specify the output file") show := flag.String("show", "", "Standalone Mode: Stdout print options, cmd,module") @@ -427,7 +430,7 @@ func main() { // Use the original address, which is the service name, so when controller changes, // new IP can be resolved - go connectController(*dbPath, *adv, *join, selfID, (uint32)(*advPort), (uint16)(*joinPort), *period, *retryMax, done) + go connectController(*dbPath, *adv, *join, selfID, (uint32)(*advPort), (uint16)(*joinPort), done) <-done log.Info("Exiting ...") diff --git a/server.go b/server.go index 2a0baf7f..d6a1da01 100644 --- a/server.go +++ b/server.go @@ -19,6 +19,11 @@ import ( "github.com/neuvector/scanner/cvetools" ) +const ( + period = 20 // Minutes to check if the scanner is in the controller and controller is alive + retryMax = 3 // Number of retry +) + func createEnforcerScanServiceWrapper(conn *grpc.ClientConn) cluster.Service { return share.NewEnforcerScanServiceClient(conn) } @@ -220,9 +225,9 @@ func createControllerScanServiceWrapper(conn *grpc.ClientConn) cluster.Service { return share.NewControllerScanServiceClient(conn) } -func getControllerServiceClient(joinHost string, joinPort uint16, cb cluster.GRPCCallback) (share.ControllerScanServiceClient, error) { +func getControllerServiceClient(joinIP string, joinPort uint16, cb cluster.GRPCCallback) (share.ControllerScanServiceClient, error) { if 
cluster.GetGRPCClientEndpoint(controller) == "" { - ep := fmt.Sprintf("%s:%v", joinHost, joinPort) + ep := fmt.Sprintf("%s:%v", joinIP, joinPort) cluster.CreateGRPCClient(controller, ep, true, createControllerScanServiceWrapper) } c, err := cluster.GetGRPCClient(controller, nil, cb) @@ -349,12 +354,12 @@ func downgradeCriticalSeverityInCVEDB(data *share.ScannerRegisterData) { return } -func scannerRegister(joinHost string, joinPort uint16, data *share.ScannerRegisterData, cb cluster.GRPCCallback) error { +func scannerRegister(joinIP string, joinPort uint16, data *share.ScannerRegisterData, cb cluster.GRPCCallback) error { log.WithFields(log.Fields{ - "join": fmt.Sprintf("%s:%d", joinHost, joinPort), "version": data.CVEDBVersion, "entries": len(data.CVEDB), + "join": fmt.Sprintf("%s:%d", joinIP, joinPort), "version": data.CVEDBVersion, "entries": len(data.CVEDB), }).Debug() - client, err := getControllerServiceClient(joinHost, joinPort, cb) + client, err := getControllerServiceClient(joinIP, joinPort, cb) if err != nil { log.WithFields(log.Fields{"error": err}).Error("Failed to find ctrl client") return errors.New("Failed to connect to controller") @@ -365,8 +370,10 @@ func scannerRegister(joinHost string, joinPort uint16, data *share.ScannerRegist caps, err := client.GetCaps(ctx, &share.RPCVoid{}) if err != nil { + isGetCapsActivate = false downgradeCriticalSeverityInCVEDB(data) } else { + isGetCapsActivate = true ctrlCaps = *caps if !caps.CriticalVul { downgradeCriticalSeverityInCVEDB(data) @@ -385,10 +392,10 @@ func scannerRegister(joinHost string, joinPort uint16, data *share.ScannerRegist return nil } -func scannerDeregister(joinHost string, joinPort uint16, id string) error { +func scannerDeregister(joinIP string, joinPort uint16, id string) error { log.Debug() - client, err := getControllerServiceClient(joinHost, joinPort, nil) + client, err := getControllerServiceClient(joinIP, joinPort, nil) if err != nil { log.WithFields(log.Fields{"error": 
err}).Error("Failed to find ctrl client") return errors.New("Failed to connect to controller") @@ -405,8 +412,8 @@ func scannerDeregister(joinHost string, joinPort uint16, id string) error { return nil } -func getScannerAvailable(joinHost string, joinPort uint16, data *share.ScannerRegisterData, cb cluster.GRPCCallback) (*share.ScannerAvailable, error) { - client, err := getControllerServiceClient(joinHost, joinPort, cb) +func getScannerAvailable(joinIP string, joinPort uint16, data *share.ScannerRegisterData, cb cluster.GRPCCallback) (*share.ScannerAvailable, error) { + client, err := getControllerServiceClient(joinIP, joinPort, cb) if err != nil { log.WithFields(log.Fields{"error": err}).Error("Failed to find ctrl client") return &share.ScannerAvailable{Visible: false}, errors.New("Failed to connect to controller") @@ -422,7 +429,7 @@ func getScannerAvailable(joinHost string, joinPort uint16, data *share.ScannerRe // To ensure the controller's availability, periodCheckHealth use HealthCheck to periodically check if the controller is alive. // Additionally, if the controller is deleted or not responsive, the scanner will re-register. 
-func periodCheckHealth(joinHost string, joinPort uint16, data *share.ScannerRegisterData, cb *clientCallback, healthCheckCh chan struct{}, done chan bool, period, retryMax int) { +func periodCheckHealth(joinIP string, joinPort uint16, data *share.ScannerRegisterData, cb *clientCallback, healthCheckCh chan struct{}, done chan bool) { ticker := time.NewTicker(time.Duration(period) * time.Minute) defer ticker.Stop() for { @@ -430,19 +437,19 @@ func periodCheckHealth(joinHost string, joinPort uint16, data *share.ScannerRegi case <-ticker.C: retryCnt := 0 for retryCnt < retryMax { - scannerAvailable, errHealthCheck := getScannerAvailable(joinHost, joinPort, data, cb) + scannerAvailable, errHealthCheck := getScannerAvailable(joinIP, joinPort, data, cb) if errHealthCheck == nil { if scannerAvailable.Visible { break } } else { - log.WithFields(log.Fields{"joinHost": joinHost, "joinPort": joinPort, "errHealthCheck": errHealthCheck}).Debug("periodCheckHealth has error") + log.WithFields(log.Fields{"joinIP": joinIP, "joinPort": joinPort, "errHealthCheck": errHealthCheck}).Debug("periodCheckHealth has error") } retryCnt++ time.Sleep(time.Duration(period) * time.Second) // Add a delay before retrying } if retryCnt >= retryMax { - log.WithFields(log.Fields{"joinHost": joinHost, "joinPort": joinPort, "retryMax": retryMax}).Error("The scanner is not in the controller, restart the scanner pod.") + log.WithFields(log.Fields{"joinIP": joinIP, "joinPort": joinPort, "retryMax": retryMax}).Error("The scanner is not in the controller, restart the scanner pod.") done <- true } case <-healthCheckCh: