diff --git a/gateway/handlers/notifiers.go b/gateway/handlers/notifiers.go index ce581cf50..10fa23497 100644 --- a/gateway/handlers/notifiers.go +++ b/gateway/handlers/notifiers.go @@ -43,15 +43,21 @@ func urlToLabel(path string) string { // PrometheusFunctionNotifier records metrics to Prometheus type PrometheusFunctionNotifier struct { Metrics *metrics.MetricOptions + //FunctionNamespace default namespace of the function + FunctionNamespace string } // Notify records metrics in Prometheus func (p PrometheusFunctionNotifier) Notify(method string, URL string, originalURL string, statusCode int, event string, duration time.Duration) { - if event == "completed" { + serviceName := getServiceName(originalURL) + if len(p.FunctionNamespace) > 0 { + if !strings.Contains(serviceName, ".") { + serviceName = fmt.Sprintf("%s.%s", serviceName, p.FunctionNamespace) + } + } + if event == "completed" { seconds := duration.Seconds() - serviceName := getServiceName(originalURL) - p.Metrics.GatewayFunctionsHistogram. WithLabelValues(serviceName). Observe(seconds) @@ -62,7 +68,6 @@ func (p PrometheusFunctionNotifier) Notify(method string, URL string, originalUR With(prometheus.Labels{"function_name": serviceName, "code": code}). Inc() } else if event == "started" { - serviceName := getServiceName(originalURL) p.Metrics.GatewayFunctionInvocationStarted.WithLabelValues(serviceName).Inc() } diff --git a/gateway/main.go b/gateway/main.go index 2be7a11d6..19477285d 100644 --- a/gateway/main.go +++ b/gateway/main.go @@ -65,7 +65,7 @@ func main() { fmt.Println(metadataQuery) metricsOptions := metrics.BuildMetricsOptions() - exporter := metrics.NewExporter(metricsOptions, credentials) + exporter := metrics.NewExporter(metricsOptions, credentials, config.Namespace) exporter.StartServiceWatcher(*config.FunctionsProviderURL, metricsOptions, "func", servicePollInterval) metrics.RegisterExporter(exporter) @@ -77,7 +77,8 @@ func main() { loggingNotifier := handlers.LoggingNotifier{} prometheusNotifier := handlers.PrometheusFunctionNotifier{ - Metrics: &metricsOptions, + Metrics: &metricsOptions, + FunctionNamespace: config.Namespace, } prometheusServiceNotifier := handlers.PrometheusServiceNotifier{ diff --git a/gateway/metrics/add_metrics.go b/gateway/metrics/add_metrics.go index 4490cd2ae..2171db239 100644 --- a/gateway/metrics/add_metrics.go +++ b/gateway/metrics/add_metrics.go @@ -88,7 +88,7 @@ func mixIn(functions *[]types.FunctionStatus, metrics *VectorQueryResponse) { for i, function := range *functions { for _, v := range metrics.Data.Result { - if v.Metric.FunctionName == function.Name { + if v.Metric.FunctionName == fmt.Sprintf("%s.%s", function.Name, function.Namespace) { metricValue := v.Value[1] switch metricValue.(type) { case string: diff --git a/gateway/metrics/add_metrics_test.go b/gateway/metrics/add_metrics_test.go index 3389565f9..a3ebff567 100644 --- a/gateway/metrics/add_metrics_test.go +++ b/gateway/metrics/add_metrics_test.go @@ -14,7 +14,7 @@ type FakePrometheusQueryFetcher struct { } func (q FakePrometheusQueryFetcher) Fetch(query string) (*VectorQueryResponse, error) { - val := []byte(`{"status":"success","data":{"resultType":"vector","result":[{"metric":{"code":"200","function_name":"func_echoit"},"value":[1509267827.752,"1"]}]}}`) + val := []byte(`{"status":"success","data":{"resultType":"vector","result":[{"metric":{"code":"200","function_name":"func_echoit.openfaas-fn"},"value":[1509267827.752,"1"]}]}}`) queryRes := VectorQueryResponse{} err := json.Unmarshal(val, &queryRes) return &queryRes, err @@ -84,8 +84,9 @@ func makeFunctionsHandler() http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { functions := []types.FunctionStatus{ types.FunctionStatus{ - Name: "func_echoit", - Replicas: 0, + Name: "func_echoit", + Replicas: 0, + Namespace: "openfaas-fn", }, } bytesOut, marshalErr := json.Marshal(&functions) diff --git a/gateway/metrics/exporter.go b/gateway/metrics/exporter.go index 5c662fdd6..c3e5c0a32 100644 --- a/gateway/metrics/exporter.go +++ b/gateway/metrics/exporter.go @@ -6,10 +6,12 @@ package metrics import ( "encoding/json" + "fmt" "io/ioutil" "net" "net/http" "net/url" + "path" "time" "log" @@ -21,17 +23,19 @@ import ( // Exporter is a prometheus exporter type Exporter struct { - metricOptions MetricOptions - services []types.FunctionStatus - credentials *auth.BasicAuthCredentials + metricOptions MetricOptions + services []types.FunctionStatus + credentials *auth.BasicAuthCredentials + FunctionNamespace string } // NewExporter creates a new exporter for the OpenFaaS gateway metrics -func NewExporter(options MetricOptions, credentials *auth.BasicAuthCredentials) *Exporter { +func NewExporter(options MetricOptions, credentials *auth.BasicAuthCredentials, namespace string) *Exporter { return &Exporter{ - metricOptions: options, - services: []types.FunctionStatus{}, - credentials: credentials, + metricOptions: options, + services: []types.FunctionStatus{}, + credentials: credentials, + FunctionNamespace: namespace, } } @@ -56,8 +60,14 @@ func (e *Exporter) Collect(ch chan<- prometheus.Metric) { e.metricOptions.ServiceReplicasGauge.Reset() for _, service := range e.services { + var serviceName string + if len(service.Namespace) > 0 { + serviceName = fmt.Sprintf("%s.%s", service.Name, service.Namespace) + } else { + serviceName = service.Name + } e.metricOptions.ServiceReplicasGauge. - WithLabelValues(service.Name). + WithLabelValues(serviceName). Set(float64(service.Replicas)) } @@ -72,9 +82,45 @@ func (e *Exporter) StartServiceWatcher(endpointURL url.URL, metricsOptions Metri ticker := time.NewTicker(interval) quit := make(chan struct{}) - timeout := 3 * time.Second + go func() { + for { + select { + case <-ticker.C: + + namespaces, err := e.getNamespaces(endpointURL) + if err != nil { + log.Println(err) + } + + if len(namespaces) == 0 { + services, err := e.getFunctions(endpointURL, e.FunctionNamespace) + if err != nil { + log.Println(err) + continue + } + e.services = services + } else { + for _, namespace := range namespaces { + services, err := e.getFunctions(endpointURL, namespace) + if err != nil { + log.Println(err) + continue + } + e.services = append(e.services, services...) + } + } + + break + case <-quit: + return + } + } + }() +} - proxyClient := http.Client{ +func (e *Exporter) getHTTPClient(timeout time.Duration) http.Client { + + return http.Client{ Transport: &http.Transport{ Proxy: http.ProxyFromEnvironment, DialContext: (&net.Dialer{ @@ -87,45 +133,71 @@ func (e *Exporter) StartServiceWatcher(endpointURL url.URL, metricsOptions Metri ExpectContinueTimeout: 1500 * time.Millisecond, }, } +} - go func() { - for { - select { - case <-ticker.C: +func (e *Exporter) getFunctions(endpointURL url.URL, namespace string) ([]types.FunctionStatus, error) { + timeout := 3 * time.Second + proxyClient := e.getHTTPClient(timeout) - get, err := http.NewRequest(http.MethodGet, endpointURL.String()+"system/functions", nil) - if err != nil { - log.Println(err) - return - } + endpointURL.Path = path.Join(endpointURL.Path, "/system/functions") + if len(namespace) > 0 { + q := endpointURL.Query() + q.Set("namespace", namespace) + endpointURL.RawQuery = q.Encode() + } - if e.credentials != nil { - get.SetBasicAuth(e.credentials.User, e.credentials.Password) - } + get, _ := http.NewRequest(http.MethodGet, endpointURL.String(), nil) + if e.credentials != nil { + get.SetBasicAuth(e.credentials.User, e.credentials.Password) + } - services := []types.FunctionStatus{} - res, err := proxyClient.Do(get) - if err != nil { - log.Println(err) - continue - } - bytesOut, readErr := ioutil.ReadAll(res.Body) - if readErr != nil { - log.Println(err) - continue - } - unmarshalErr := json.Unmarshal(bytesOut, &services) - if unmarshalErr != nil { - log.Println(err) - continue - } + services := []types.FunctionStatus{} + res, err := proxyClient.Do(get) + if err != nil { + return services, err + } - e.services = services + bytesOut, readErr := ioutil.ReadAll(res.Body) + if readErr != nil { + return services, readErr + } - break - case <-quit: - return - } - } - }() + unmarshalErr := json.Unmarshal(bytesOut, &services) + if unmarshalErr != nil { + return services, unmarshalErr + } + return services, nil +} + +func (e *Exporter) getNamespaces(endpointURL url.URL) ([]string, error) { + namespaces := []string{} + endpointURL.Path = path.Join(endpointURL.Path, "system/namespaces") + + get, _ := http.NewRequest(http.MethodGet, endpointURL.String(), nil) + if e.credentials != nil { + get.SetBasicAuth(e.credentials.User, e.credentials.Password) + } + + timeout := 3 * time.Second + proxyClient := e.getHTTPClient(timeout) + + res, err := proxyClient.Do(get) + if err != nil { + return namespaces, err + } + + if res.StatusCode == http.StatusNotFound { + return namespaces, nil + } + + bytesOut, readErr := ioutil.ReadAll(res.Body) + if readErr != nil { + return namespaces, readErr + } + + unmarshalErr := json.Unmarshal(bytesOut, &namespaces) + if unmarshalErr != nil { + return namespaces, unmarshalErr + } + return namespaces, nil } diff --git a/gateway/metrics/exporter_test.go b/gateway/metrics/exporter_test.go index 93cd03908..069b5b8a6 100644 --- a/gateway/metrics/exporter_test.go +++ b/gateway/metrics/exporter_test.go @@ -33,7 +33,7 @@ func readGauge(g prometheus.Metric) metricResult { func Test_Describe_DescribesThePrometheusMetrics(t *testing.T) { metricsOptions := BuildMetricsOptions() - exporter := NewExporter(metricsOptions, nil) + exporter := NewExporter(metricsOptions, nil, "openfaas-fn") ch := make(chan *prometheus.Desc) // defer close(ch) @@ -65,7 +65,7 @@ func Test_Describe_DescribesThePrometheusMetrics(t *testing.T) { func Test_Collect_CollectsTheNumberOfReplicasOfAService(t *testing.T) { metricsOptions := BuildMetricsOptions() - exporter := NewExporter(metricsOptions, nil) + exporter := NewExporter(metricsOptions, nil, "openfaas-fn") expectedService := types.FunctionStatus{ Name: "function_with_two_replica",