v0.6.0 #20

Merged
merged 4 commits on May 27, 2024
11 changes: 8 additions & 3 deletions README.md
@@ -136,11 +136,12 @@ Metrics for monitoring are available at `http://127.0.0.1:8080/metrics?format=js
| Service::PluginName | service_plugin_name | Name of the plugin |
| Service::OwnerName | service_owner_name | Name of the owner |
| Service::RepoName | service_repo_name | Name of the repo |
| Service::Status | service_status | Status of the service |
| Service::Status | service_status | Status of the service (idle, running, limit_paused, stopped) |
| Service::LastSuccessfulRunJobUrl | service_last_successful_run_job_url | Last successful run job url of the service |
| Service::LastFailedRunJobUrl | service_last_failed_run_job_url | Last failed run job url of the service |
| Service::LastSuccessfulRun | service_last_successful_run | Timestamp of last successful run of the service (RFC3339) |
| Service::LastFailedRun | service_last_failed_run | Timestamp of last failed run of the service (RFC3339) |
| Service::StatusRunningSince | service_status_running_since | Timestamp of when the service was last started (RFC3339) |
| HostCPUCount | host_cpu_count | Total CPU count of the host |
| HostCPUUsedCount | host_cpu_used_count | Total in use CPU count of the host |
| HostCPUUsagePercentage | host_cpu_usage_percentage | CPU usage percentage of the host |
@@ -181,7 +182,8 @@ Metrics for monitoring are available at `http://127.0.0.1:8080/metrics?format=js
"LastSuccessfulRunJobUrl": "https://github.com/veertuinc/anklet/actions/runs/9180172013/job/25243983121",
"LastFailedRunJobUrl": "https://github.com/veertuinc/anklet/actions/runs/9180170811/job/25243979917",
"LastSuccessfulRun": "2024-05-21T14:16:06.300971-05:00",
"LastFailedRun": "2024-05-21T14:15:10.994464-05:00"
"LastFailedRun": "2024-05-21T14:15:10.994464-05:00",
"StatusRunningSince": "2024-05-21T14:16:06.300971-05:00"
},
{
"Name": "RUNNER1",
Expand All @@ -192,7 +194,8 @@ Metrics for monitoring are available at `http://127.0.0.1:8080/metrics?format=js
"LastSuccessfulRunJobUrl": "https://github.com/veertuinc/anklet/actions/runs/9180172546/job/25243984537",
"LastFailedRunJobUrl": "https://github.com/veertuinc/anklet/actions/runs/9180171228/job/25243980930",
"LastSuccessfulRun": "2024-05-21T14:16:35.532016-05:00",
"LastFailedRun": "2024-05-21T14:15:45.930051-05:00"
"LastFailedRun": "2024-05-21T14:15:45.930051-05:00",
"StatusRunningSince": "2024-05-21T14:16:35.532016-05:00"
}
]
}
@@ -207,9 +210,11 @@ total_failed_runs_since_start 2
service_status{service_name=RUNNER2,plugin=github,owner=veertuinc,repo=anklet} idle
service_last_successful_run{service_name=RUNNER2,plugin=github,owner=veertuinc,repo=anklet,job_url=https://github.com/veertuinc/anklet/actions/runs/9180172013/job/25243983121} 2024-05-21T14:16:06-05:00
service_last_failed_run{service_name=RUNNER2,plugin=github,owner=veertuinc,repo=anklet,job_url=https://github.com/veertuinc/anklet/actions/runs/9180170811/job/25243979917} 2024-05-21T14:15:10-05:00
service_status_running_since{service_name=RUNNER2,plugin=github,owner=veertuinc,repo=anklet} 2024-05-21T14:16:06-05:00
service_status{service_name=RUNNER1,plugin=github,owner=veertuinc,repo=anklet} idle
service_last_successful_run{service_name=RUNNER1,plugin=github,owner=veertuinc,repo=anklet,job_url=https://github.com/veertuinc/anklet/actions/runs/9180172546/job/25243984537} 2024-05-21T14:16:35-05:00
service_last_failed_run{service_name=RUNNER1,plugin=github,owner=veertuinc,repo=anklet,job_url=https://github.com/veertuinc/anklet/actions/runs/9180171228/job/25243980930} 2024-05-21T14:15:45-05:00
service_status_running_since{service_name=RUNNER1,plugin=github,owner=veertuinc,repo=anklet} 2024-05-21T14:16:35-05:00
host_cpu_count 12
host_cpu_used_count 1
host_cpu_usage_percentage 10.674157
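
The new StatusRunningSince field, and its service_status_running_since counterpart in the Prometheus-style output, record when a service last changed status, so a consumer can tell how long a runner has been idle, running, or paused on a rate limit. Below is a minimal standalone sketch of such a consumer, assuming the default port and the JSON shape documented above; the trimmed struct types are hypothetical and only mirror the documented fields.

package main

import (
	"encoding/json"
	"fmt"
	"net/http"
	"time"
)

// service mirrors only the documented metrics fields that this sketch reads.
type service struct {
	Name               string
	Status             string
	StatusRunningSince time.Time
}

// metricsPayload assumes the JSON response wraps services in a "Services" array.
type metricsPayload struct {
	Services []service
}

func main() {
	// Assumes anklet's metrics server is listening on the default port.
	resp, err := http.Get("http://127.0.0.1:8080/metrics?format=json")
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()

	var payload metricsPayload
	if err := json.NewDecoder(resp.Body).Decode(&payload); err != nil {
		panic(err)
	}
	for _, svc := range payload.Services {
		fmt.Printf("%s has been %s for %s\n",
			svc.Name, svc.Status, time.Since(svc.StatusRunningSince).Round(time.Second))
	}
}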
2 changes: 1 addition & 1 deletion VERSION
@@ -1 +1 @@
0.5.0
0.6.0
2 changes: 1 addition & 1 deletion internal/database/database.go
@@ -80,7 +80,7 @@ func CheckIfKeyExists(ctx context.Context, key string) (bool, error) {
// introduce a random millisecond sleep to prevent concurrent executions from colliding
src := rand.NewSource(time.Now().UnixNano())
r := rand.New(src)
randomSleep := time.Duration(r.Intn(200)) * time.Millisecond
randomSleep := time.Duration(r.Intn(100)) * time.Millisecond
time.Sleep(randomSleep)
exists, err := database.Client.Exists(ctx, key).Result()
if err != nil {
16 changes: 12 additions & 4 deletions internal/metrics/metrics.go
@@ -30,6 +30,7 @@ type Service struct {
LastFailedRunJobUrl string
LastSuccessfulRun time.Time
LastFailedRun time.Time
StatusRunningSince time.Time
}

type MetricsData struct {
@@ -98,6 +99,9 @@ func CompareAndUpdateMetrics(currentService Service, updatedService Service) Ser
currentService.PluginName = updatedService.PluginName
}
if updatedService.Status != "" {
if currentService.Status != updatedService.Status { // make sure we don't update the time if nothing has changed
currentService.StatusRunningSince = time.Now()
}
currentService.Status = updatedService.Status
}
if !updatedService.LastSuccessfulRun.IsZero() {
@@ -166,12 +170,12 @@ func UpdateService(workerCtx context.Context, serviceCtx context.Context, logger
LastFailedRun: metricsData.Services[i].LastFailedRun,
LastSuccessfulRunJobUrl: metricsData.Services[i].LastSuccessfulRunJobUrl,
LastFailedRunJobUrl: metricsData.Services[i].LastFailedRunJobUrl,
StatusRunningSince: metricsData.Services[i].StatusRunningSince,
}
newService = CompareAndUpdateMetrics(newService, updatedService)
metricsData.Services[i] = newService
}
}
UpdateSystemMetrics(serviceCtx, logger, metricsData)
}

func (m *MetricsData) UpdateService(serviceCtx context.Context, logger *slog.Logger, updatedService Service) {
@@ -183,7 +187,6 @@ func (m *MetricsData) UpdateService(serviceCtx context.Context, logger *slog.Log
for i, svc := range m.Services {
if svc.Name == updatedService.Name {
m.Services[i] = CompareAndUpdateMetrics(svc, updatedService)
UpdateSystemMetrics(serviceCtx, logger, m)
}
}
}
@@ -196,14 +199,18 @@ func NewServer(port string) *Server {
}

// Start runs the HTTP server
func (s *Server) Start(parentCtx context.Context) {
func (s *Server) Start(parentCtx context.Context, logger *slog.Logger) {
http.HandleFunc("/metrics", func(w http.ResponseWriter, r *http.Request) {
// update system metrics each call
metricsData := GetMetricsDataFromContext(parentCtx)
UpdateSystemMetrics(parentCtx, logger, metricsData)
//
if r.URL.Query().Get("format") == "json" {
s.handleJsonMetrics(parentCtx)(w, r)
} else if r.URL.Query().Get("format") == "prometheus" {
s.handlePrometheusMetrics(parentCtx)(w, r)
} else {
http.Error(w, "Unsupported format", http.StatusBadRequest)
http.Error(w, "unsupported format, please use '?format=json' or '?format=prometheus'", http.StatusBadRequest)
}
})
http.ListenAndServe(":"+s.Port, nil)
@@ -230,6 +237,7 @@ func (s *Server) handlePrometheusMetrics(ctx context.Context) http.HandlerFunc {
w.Write([]byte(fmt.Sprintf("service_status{service_name=%s,plugin=%s,owner=%s,repo=%s} %s\n", service.Name, service.PluginName, service.OwnerName, service.RepoName, service.Status)))
w.Write([]byte(fmt.Sprintf("service_last_successful_run{service_name=%s,plugin=%s,owner=%s,repo=%s,job_url=%s} %s\n", service.Name, service.PluginName, service.OwnerName, service.RepoName, service.LastSuccessfulRunJobUrl, service.LastSuccessfulRun.Format(time.RFC3339))))
w.Write([]byte(fmt.Sprintf("service_last_failed_run{service_name=%s,plugin=%s,owner=%s,repo=%s,job_url=%s} %s\n", service.Name, service.PluginName, service.OwnerName, service.RepoName, service.LastFailedRunJobUrl, service.LastFailedRun.Format(time.RFC3339))))
w.Write([]byte(fmt.Sprintf("service_status_running_since{service_name=%s,plugin=%s,owner=%s,repo=%s} %s\n", service.Name, service.PluginName, service.OwnerName, service.RepoName, service.StatusRunningSince.Format(time.RFC3339))))
}
w.Write([]byte(fmt.Sprintf("host_cpu_count %d\n", metricsData.HostCPUCount)))
w.Write([]byte(fmt.Sprintf("host_cpu_used_count %d\n", metricsData.HostCPUUsedCount)))
18 changes: 9 additions & 9 deletions main.go
@@ -179,8 +179,10 @@ func worker(parentCtx context.Context, logger *slog.Logger, loadedConfig config.
metricsPort = loadedConfig.Metrics.Port
}
metricsServer := metrics.NewServer(metricsPort)
go metricsServer.Start(workerCtx)
go metricsServer.Start(workerCtx, logger)
logger.InfoContext(workerCtx, "metrics server started on port "+metricsPort)
metrics.UpdateSystemMetrics(workerCtx, logger, metricsData)

/////////////
// MAIN LOGIC
for _, service := range loadedConfig.Services {
@@ -230,11 +232,12 @@ func worker(parentCtx context.Context, logger *slog.Logger, loadedConfig config.

logger.InfoContext(serviceCtx, "started service")
metricsData.AddService(metrics.Service{
Name: service.Name,
PluginName: service.Plugin,
RepoName: service.Repo,
OwnerName: service.Owner,
Status: "idle",
Name: service.Name,
PluginName: service.Plugin,
RepoName: service.Repo,
OwnerName: service.Owner,
Status: "idle",
StatusRunningSince: time.Now(),
})

for {
@@ -247,9 +250,6 @@
})
return
default:
metrics.UpdateService(workerCtx, serviceCtx, logger, metrics.Service{
Status: "checking",
})
run.Plugin(workerCtx, serviceCtx, logger)
if workerCtx.Err() != nil || toRunOnce == "true" {
serviceCancel()
35 changes: 28 additions & 7 deletions plugins/github/github.go
@@ -64,8 +64,18 @@ func ExecuteGitHubClientFunction[T any](serviceCtx context.Context, logger *slog
if response.Rate.Remaining <= 10 { // handle primary rate limiting
sleepDuration := time.Until(response.Rate.Reset.Time) + time.Second // Adding a second to ensure we're past the reset time
logger.WarnContext(serviceCtx, "GitHub API rate limit exceeded, sleeping until reset")
metricsData := metrics.GetMetricsDataFromContext(serviceCtx)
service := config.GetServiceFromContext(serviceCtx)
metricsData.UpdateService(serviceCtx, logger, metrics.Service{
Name: service.Name,
Status: "limit_paused",
})
select {
case <-time.After(sleepDuration):
metricsData.UpdateService(serviceCtx, logger, metrics.Service{
Name: service.Name,
Status: "running",
})
return ExecuteGitHubClientFunction(serviceCtx, logger, executeFunc) // Retry the function after waiting
case <-serviceCtx.Done():
return serviceCtx, nil, nil, serviceCtx.Err()
@@ -106,21 +116,23 @@ func getWorkflowRunJobs(serviceCtx context.Context, logger *slog.Logger) ([]Work
workflows, resp, err := githubClient.Actions.ListWorkflows(context.Background(), service.Owner, service.Repo, &github.ListOptions{})
return &workflows, resp, err
})

if serviceCtx.Err() != nil {
logger.WarnContext(serviceCtx, "context canceled during workflows listing")
return []WorkflowRunJobDetail{}, errors.New("context canceled during workflows listing")
return []WorkflowRunJobDetail{}, nil
}
if err != nil {
logger.ErrorContext(serviceCtx, "error executing githubClient.Actions.ListWorkflows", "err", err)
return []WorkflowRunJobDetail{}, errors.New("error executing githubClient.Actions.ListWorkflows")
}

for _, workflow := range (*workflows).Workflows {
if *workflow.State == "active" {
// WORKFLOW RUNS
serviceCtx, workflow_runs, _, err := ExecuteGitHubClientFunction[*github.WorkflowRuns](serviceCtx, logger, func() (**github.WorkflowRuns, *github.Response, error) {
workflow_runs, resp, err := githubClient.Actions.ListWorkflowRunsByID(context.Background(), service.Owner, service.Repo, *workflow.ID, &github.ListWorkflowRunsOptions{
// ListOptions: github.ListOptions{PerPage: 30},
Status: "queued",
ListOptions: github.ListOptions{PerPage: 30},
Status: "queued",
})
return &workflow_runs, resp, err // Adjusted to return the direct result
})
@@ -140,10 +152,11 @@ if err != nil {
if err != nil {
if strings.Contains(err.Error(), "context canceled") {
logger.WarnContext(serviceCtx, "context canceled during githubClient.Actions.ListWorkflowJobs", "err", err)
return []WorkflowRunJobDetail{}, nil
} else {
logger.ErrorContext(serviceCtx, "error executing githubClient.Actions.ListWorkflowJobs", "err", err)
return []WorkflowRunJobDetail{}, errors.New("error executing githubClient.Actions.ListWorkflowJobs")
}
return []WorkflowRunJobDetail{}, errors.New("error executing githubClient.Actions.ListWorkflowJobs")
}
for _, job := range workflowRunJobs.Jobs {
if *job.Status == "queued" { // I don't know why, but we'll get completed jobs back in the list
@@ -187,10 +200,11 @@ func getWorkflowRunJobs(serviceCtx context.Context, logger *slog.Logger) ([]Work
if err != nil {
if strings.Contains(err.Error(), "context canceled") {
logger.WarnContext(serviceCtx, "context was canceled while checking if key exists in database", "err", err)
return []WorkflowRunJobDetail{}, nil
} else {
logger.ErrorContext(serviceCtx, "error checking if key exists in database", "err", err)
return []WorkflowRunJobDetail{}, errors.New("error checking if key exists in database")
}
return []WorkflowRunJobDetail{}, errors.New("error checking if key exists in database")
}

if !exists {
@@ -239,6 +253,7 @@ func Run(workerCtx context.Context, serviceCtx context.Context, logger *slog.Log

hostHasVmCapacity := anka.HostHasVmCapacity(serviceCtx)
if !hostHasVmCapacity {
logger.DebugContext(serviceCtx, "host does not have vm capacity")
return
}

@@ -268,6 +283,11 @@ func Run(workerCtx context.Context, serviceCtx context.Context, logger *slog.Log
// obtain all queued workflow runs and jobs
allWorkflowRunJobDetails, err := getWorkflowRunJobs(serviceCtx, logger)
if err != nil {
logger.ErrorContext(serviceCtx, "error getting workflow run jobs", "err", err)
return
}
if serviceCtx.Err() != nil {
logger.WarnContext(serviceCtx, "context canceled after getWorkflowRunJobs")
return
}

@@ -300,10 +320,11 @@ func Run(workerCtx context.Context, serviceCtx context.Context, logger *slog.Log
logger.ErrorContext(serviceCtx, "error checking if already in db", "err", err)
return
} else if already {
// logger.DebugContext(serviceCtx, "job already running, skipping")
logger.DebugContext(serviceCtx, "job already running, skipping")
// this would cause a double run problem if a job finished on hostA and hostB had an array of workflowRunJobs with queued still for the same job
// we get the latest workflow run jobs each run to prevent this
return
// also, we don't return and use continue below so that we can just use the next job in the list and not have to re-parse the entire thing or make more api calls
continue
} else if !already {
added, err := dbFunctions.AddUniqueRunKey(serviceCtx)
if added && err != nil {
Expand Down
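
The primary rate-limit handling added to ExecuteGitHubClientFunction marks the service as limit_paused, sleeps until the reported reset time while still honoring context cancellation, then flips the status back to running before retrying. Below is a distilled standalone sketch of that wait-or-cancel pattern, with a hypothetical setStatus callback standing in for the metrics update; it is an illustration of the technique, not the plugin's actual code.

package main

import (
	"context"
	"fmt"
	"time"
)

// waitForReset blocks until resetAt (plus a one-second buffer) or until ctx is
// canceled, flipping the service status around the pause.
func waitForReset(ctx context.Context, resetAt time.Time, setStatus func(string)) error {
	setStatus("limit_paused")
	sleep := time.Until(resetAt) + time.Second // buffer to be safely past the reset time
	select {
	case <-time.After(sleep):
		setStatus("running")
		return nil // safe to retry the API call
	case <-ctx.Done():
		return ctx.Err()
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
	defer cancel()

	err := waitForReset(ctx, time.Now().Add(10*time.Second), func(s string) {
		fmt.Println("status ->", s)
	})
	fmt.Println("result:", err) // context deadline exceeded: the pause was canceled
}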