Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add namespace in function name for metrics #1488

Merged
merged 3 commits into from
Nov 2, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 9 additions & 4 deletions gateway/handlers/notifiers.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,15 +43,21 @@ func urlToLabel(path string) string {
// PrometheusFunctionNotifier records metrics to Prometheus
type PrometheusFunctionNotifier struct {
Metrics *metrics.MetricOptions
//FunctionNamespace default namespace of the function
FunctionNamespace string
}

// Notify records metrics in Prometheus
func (p PrometheusFunctionNotifier) Notify(method string, URL string, originalURL string, statusCode int, event string, duration time.Duration) {
if event == "completed" {
serviceName := getServiceName(originalURL)
if len(p.FunctionNamespace) > 0 {
if !strings.Contains(serviceName, ".") {
serviceName = fmt.Sprintf("%s.%s", serviceName, p.FunctionNamespace)
}
}

if event == "completed" {
seconds := duration.Seconds()
serviceName := getServiceName(originalURL)

p.Metrics.GatewayFunctionsHistogram.
WithLabelValues(serviceName).
Observe(seconds)
Expand All @@ -62,7 +68,6 @@ func (p PrometheusFunctionNotifier) Notify(method string, URL string, originalUR
With(prometheus.Labels{"function_name": serviceName, "code": code}).
Inc()
} else if event == "started" {
serviceName := getServiceName(originalURL)
p.Metrics.GatewayFunctionInvocationStarted.WithLabelValues(serviceName).Inc()
}

Expand Down
5 changes: 3 additions & 2 deletions gateway/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ func main() {
fmt.Println(metadataQuery)

metricsOptions := metrics.BuildMetricsOptions()
exporter := metrics.NewExporter(metricsOptions, credentials)
exporter := metrics.NewExporter(metricsOptions, credentials, config.Namespace)
exporter.StartServiceWatcher(*config.FunctionsProviderURL, metricsOptions, "func", servicePollInterval)
metrics.RegisterExporter(exporter)

Expand All @@ -77,7 +77,8 @@ func main() {
loggingNotifier := handlers.LoggingNotifier{}

prometheusNotifier := handlers.PrometheusFunctionNotifier{
Metrics: &metricsOptions,
Metrics: &metricsOptions,
FunctionNamespace: config.Namespace,
}

prometheusServiceNotifier := handlers.PrometheusServiceNotifier{
Expand Down
2 changes: 1 addition & 1 deletion gateway/metrics/add_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ func mixIn(functions *[]types.FunctionStatus, metrics *VectorQueryResponse) {
for i, function := range *functions {
for _, v := range metrics.Data.Result {

if v.Metric.FunctionName == function.Name {
if v.Metric.FunctionName == fmt.Sprintf("%s.%s", function.Name, function.Namespace) {
metricValue := v.Value[1]
switch metricValue.(type) {
case string:
Expand Down
7 changes: 4 additions & 3 deletions gateway/metrics/add_metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ type FakePrometheusQueryFetcher struct {
}

func (q FakePrometheusQueryFetcher) Fetch(query string) (*VectorQueryResponse, error) {
val := []byte(`{"status":"success","data":{"resultType":"vector","result":[{"metric":{"code":"200","function_name":"func_echoit"},"value":[1509267827.752,"1"]}]}}`)
val := []byte(`{"status":"success","data":{"resultType":"vector","result":[{"metric":{"code":"200","function_name":"func_echoit.openfaas-fn"},"value":[1509267827.752,"1"]}]}}`)
queryRes := VectorQueryResponse{}
err := json.Unmarshal(val, &queryRes)
return &queryRes, err
Expand Down Expand Up @@ -84,8 +84,9 @@ func makeFunctionsHandler() http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
functions := []types.FunctionStatus{
types.FunctionStatus{
Name: "func_echoit",
Replicas: 0,
Name: "func_echoit",
Replicas: 0,
Namespace: "openfaas-fn",
},
}
bytesOut, marshalErr := json.Marshal(&functions)
Expand Down
162 changes: 117 additions & 45 deletions gateway/metrics/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,12 @@ package metrics

import (
"encoding/json"
"fmt"
"io/ioutil"
"net"
"net/http"
"net/url"
"path"
"time"

"log"
Expand All @@ -21,17 +23,19 @@ import (

// Exporter is a prometheus exporter
type Exporter struct {
metricOptions MetricOptions
services []types.FunctionStatus
credentials *auth.BasicAuthCredentials
metricOptions MetricOptions
services []types.FunctionStatus
credentials *auth.BasicAuthCredentials
FunctionNamespace string
}

// NewExporter creates a new exporter for the OpenFaaS gateway metrics
func NewExporter(options MetricOptions, credentials *auth.BasicAuthCredentials) *Exporter {
func NewExporter(options MetricOptions, credentials *auth.BasicAuthCredentials, namespace string) *Exporter {
return &Exporter{
metricOptions: options,
services: []types.FunctionStatus{},
credentials: credentials,
metricOptions: options,
services: []types.FunctionStatus{},
credentials: credentials,
FunctionNamespace: namespace,
}
}

Expand All @@ -56,8 +60,14 @@ func (e *Exporter) Collect(ch chan<- prometheus.Metric) {

e.metricOptions.ServiceReplicasGauge.Reset()
for _, service := range e.services {
var serviceName string
if len(service.Namespace) > 0 {
serviceName = fmt.Sprintf("%s.%s", service.Name, service.Namespace)
} else {
serviceName = service.Name
}
e.metricOptions.ServiceReplicasGauge.
WithLabelValues(service.Name).
WithLabelValues(serviceName).
Set(float64(service.Replicas))
}

Expand All @@ -72,9 +82,45 @@ func (e *Exporter) StartServiceWatcher(endpointURL url.URL, metricsOptions Metri
ticker := time.NewTicker(interval)
quit := make(chan struct{})

timeout := 3 * time.Second
go func() {
for {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is this new segment of code for? I was mainly expecting this PR to set a default suffix in the invocation URL when there was not one given.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The new segment of code fetches the replica count of the services, which previously did not support multiple namespaces. It only fetched the replica count for functions in the default namespace.

select {
case <-ticker.C:

namespaces, err := e.getNamespaces(endpointURL)
if err != nil {
log.Println(err)
}

if len(namespaces) == 0 {
services, err := e.getFunctions(endpointURL, e.FunctionNamespace)
if err != nil {
log.Println(err)
continue
}
e.services = services
} else {
for _, namespace := range namespaces {
services, err := e.getFunctions(endpointURL, namespace)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this works well for now, but I also suspect a larger discussion is needed about how/if we move this into the provider. Sending a request per namespace every second seems like something the provider should encapsulate so that it can optimize that behavior as best makes sense for itself.

This current implementation would create a "thundering herd"-like problem with no way to really streamline or optimize it. For example, the k8s provider should be able to request the existing functions from several/all namespaces in a single request.

Perhaps we need a new endpoint in the provider that allows the gateway to send 0, 1, or more namespaces as part of the request. When it is 0, it would request all namespaces; otherwise the response would contain a list of functions for the provided namespaces. Alternatively, we could allow a special _all value. Then 0 namespaces means the default, 1 or more are applied as given, and _all returns functions from all possible namespaces.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I completely agree and we should move function aggregation logic to the provider.

I think we can modify the existing function list endpoint to accept a special query parameter value such as _all, which would combine the functions from all namespaces and return them. Otherwise it works as it does now. I think that would be a very minimal change compared to adding a new endpoint. For a new endpoint we would have to write different logic depending on the provider in several places.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Introducing a thundering herd issue concerns me. What are the alternatives to doing that?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The thundering herd issue arises because every 5 seconds (configurable) it fetches all namespaces and then, for each namespace, lists its functions to count the services/replicas of the functions.

The solution is to use a new or existing endpoint that lists all functions across all namespaces, so that we don't have to make these multiple calls. This can be implemented in faas-netes, where it will be efficient.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@viveksyngh can you write a bash loop to deploy 100-200 functions and then measure the effect of the patch?

Copy link
Member

@alexellis alexellis Sep 16, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

# Create 10 namespaces and deploy 10 figlet functions into each one.
for j in {1..10}; do
  kubectl create namespace "$j" # annotate it
  for i in {1..10}; do
    faas-cli deploy --image figlet --name "fn$i" --namespace "$j"
  done
done

Perhaps create 10-20 in 10 different namespaces

if err != nil {
log.Println(err)
continue
}
e.services = append(e.services, services...)
}
}

break
case <-quit:
return
}
}
}()
}

proxyClient := http.Client{
func (e *Exporter) getHTTPClient(timeout time.Duration) http.Client {

return http.Client{
Transport: &http.Transport{
Proxy: http.ProxyFromEnvironment,
DialContext: (&net.Dialer{
Expand All @@ -87,45 +133,71 @@ func (e *Exporter) StartServiceWatcher(endpointURL url.URL, metricsOptions Metri
ExpectContinueTimeout: 1500 * time.Millisecond,
},
}
}

go func() {
for {
select {
case <-ticker.C:
func (e *Exporter) getFunctions(endpointURL url.URL, namespace string) ([]types.FunctionStatus, error) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you use the CLI here instead?

timeout := 3 * time.Second
proxyClient := e.getHTTPClient(timeout)

get, err := http.NewRequest(http.MethodGet, endpointURL.String()+"system/functions", nil)
if err != nil {
log.Println(err)
return
}
endpointURL.Path = path.Join(endpointURL.Path, "/system/functions")
if len(namespace) > 0 {
q := endpointURL.Query()
q.Set("namespace", namespace)
endpointURL.RawQuery = q.Encode()
}

if e.credentials != nil {
get.SetBasicAuth(e.credentials.User, e.credentials.Password)
}
get, _ := http.NewRequest(http.MethodGet, endpointURL.String(), nil)
if e.credentials != nil {
get.SetBasicAuth(e.credentials.User, e.credentials.Password)
}

services := []types.FunctionStatus{}
res, err := proxyClient.Do(get)
if err != nil {
log.Println(err)
continue
}
bytesOut, readErr := ioutil.ReadAll(res.Body)
if readErr != nil {
log.Println(err)
continue
}
unmarshalErr := json.Unmarshal(bytesOut, &services)
if unmarshalErr != nil {
log.Println(err)
continue
}
services := []types.FunctionStatus{}
res, err := proxyClient.Do(get)
if err != nil {
return services, err
}

e.services = services
bytesOut, readErr := ioutil.ReadAll(res.Body)
if readErr != nil {
return services, readErr
}

break
case <-quit:
return
}
}
}()
unmarshalErr := json.Unmarshal(bytesOut, &services)
if unmarshalErr != nil {
return services, unmarshalErr
}
return services, nil
}

func (e *Exporter) getNamespaces(endpointURL url.URL) ([]string, error) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we use the CLI's code for this?

namespaces := []string{}
endpointURL.Path = path.Join(endpointURL.Path, "system/namespaces")

get, _ := http.NewRequest(http.MethodGet, endpointURL.String(), nil)
if e.credentials != nil {
get.SetBasicAuth(e.credentials.User, e.credentials.Password)
}

timeout := 3 * time.Second
proxyClient := e.getHTTPClient(timeout)

res, err := proxyClient.Do(get)
if err != nil {
return namespaces, err
}

if res.StatusCode == http.StatusNotFound {
return namespaces, nil
}

bytesOut, readErr := ioutil.ReadAll(res.Body)
if readErr != nil {
return namespaces, readErr
}

unmarshalErr := json.Unmarshal(bytesOut, &namespaces)
if unmarshalErr != nil {
return namespaces, unmarshalErr
}
return namespaces, nil
}
4 changes: 2 additions & 2 deletions gateway/metrics/exporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func readGauge(g prometheus.Metric) metricResult {

func Test_Describe_DescribesThePrometheusMetrics(t *testing.T) {
metricsOptions := BuildMetricsOptions()
exporter := NewExporter(metricsOptions, nil)
exporter := NewExporter(metricsOptions, nil, "openfaas-fn")

ch := make(chan *prometheus.Desc)
// defer close(ch)
Expand Down Expand Up @@ -65,7 +65,7 @@ func Test_Describe_DescribesThePrometheusMetrics(t *testing.T) {

func Test_Collect_CollectsTheNumberOfReplicasOfAService(t *testing.T) {
metricsOptions := BuildMetricsOptions()
exporter := NewExporter(metricsOptions, nil)
exporter := NewExporter(metricsOptions, nil, "openfaas-fn")

expectedService := types.FunctionStatus{
Name: "function_with_two_replica",
Expand Down