diff --git a/cmd/nginx/flag_test.go b/cmd/nginx/flag_test.go new file mode 100644 index 0000000000..ca662e4555 --- /dev/null +++ b/cmd/nginx/flag_test.go @@ -0,0 +1,64 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package main + +import ( + "flag" + "os" + "testing" +) + +// resetForTesting clears all flag state and sets the usage function as directed. +// After calling resetForTesting, parse errors in flag handling will not +// exit the program. +// Extracted from https://github.com/golang/go/blob/master/src/flag/export_test.go +func resetForTesting(usage func()) { + flag.CommandLine = flag.NewFlagSet(os.Args[0], flag.ContinueOnError) + flag.Usage = usage +} + +func TestMandatoryFlag(t *testing.T) { + _, _, err := parseFlags() + if err == nil { + t.Fatalf("expected and error about default backend service") + } +} + +func TestDefaults(t *testing.T) { + resetForTesting(func() { t.Fatal("bad parse") }) + + oldArgs := os.Args + defer func() { os.Args = oldArgs }() + os.Args = []string{"cmd", "--default-backend-service", "namespace/test", "--http-port", "0", "--https-port", "0"} + + showVersion, conf, err := parseFlags() + if err != nil { + t.Fatalf("unexpected error parsing default flags: %v", err) + } + + if showVersion { + t.Fatal("expected false but true was returned for flag show-version") + } + + if conf == nil { + t.Fatal("expected a configuration but nil returned") + } +} + +func TestSetupSSLProxy(t *testing.T) { + // TODO +} diff --git a/cmd/nginx/flags.go b/cmd/nginx/flags.go new file mode 100644 index 0000000000..f037eb1b10 --- /dev/null +++ b/cmd/nginx/flags.go @@ -0,0 +1,191 @@ +package main + +import ( + "flag" + "fmt" + "os" + "time" + + "github.com/golang/glog" + "github.com/spf13/pflag" + + apiv1 "k8s.io/api/core/v1" + + "k8s.io/ingress-nginx/pkg/ingress/controller" + ngx_config "k8s.io/ingress-nginx/pkg/ingress/controller/config" + ing_net "k8s.io/ingress-nginx/pkg/net" +) + +const ( + defIngressClass = "nginx" +) + +func parseFlags() (bool, *controller.Configuration, error) { + var ( + flags = pflag.NewFlagSet("", pflag.ExitOnError) + + apiserverHost = flags.String("apiserver-host", "", "The address of the Kubernetes Apiserver "+ + "to connect to in the format of protocol://address:port, e.g., "+ + "http://localhost:8080. If not specified, the assumption is that the binary runs inside a "+ + "Kubernetes cluster and local discovery is attempted.") + kubeConfigFile = flags.String("kubeconfig", "", "Path to kubeconfig file with authorization and master location information.") + + defaultSvc = flags.String("default-backend-service", "", + `Service used to serve a 404 page for the default backend. Takes the form + namespace/name. 
The controller uses the first node port of this Service for + the default backend.`) + + ingressClass = flags.String("ingress-class", "", + `Name of the ingress class to route through this controller.`) + + configMap = flags.String("configmap", "", + `Name of the ConfigMap that contains the custom configuration to use`) + + publishSvc = flags.String("publish-service", "", + `Service fronting the ingress controllers. Takes the form + namespace/name. The controller will set the endpoint records on the + ingress objects to reflect those on the service.`) + + tcpConfigMapName = flags.String("tcp-services-configmap", "", + `Name of the ConfigMap that contains the definition of the TCP services to expose. + The key in the map indicates the external port to be used. The value is the name of the + service with the format namespace/serviceName and the port of the service could be a + number of the name of the port. + The ports 80 and 443 are not allowed as external ports. This ports are reserved for the backend`) + + udpConfigMapName = flags.String("udp-services-configmap", "", + `Name of the ConfigMap that contains the definition of the UDP services to expose. + The key in the map indicates the external port to be used. The value is the name of the + service with the format namespace/serviceName and the port of the service could be a + number of the name of the port.`) + + resyncPeriod = flags.Duration("sync-period", 600*time.Second, + `Relist and confirm cloud resources this often. Default is 10 minutes`) + + watchNamespace = flags.String("watch-namespace", apiv1.NamespaceAll, + `Namespace to watch for Ingress. Default is to watch all namespaces`) + + profiling = flags.Bool("profiling", true, `Enable profiling via web interface host:port/debug/pprof/`) + + defSSLCertificate = flags.String("default-ssl-certificate", "", `Name of the secret + that contains a SSL certificate to be used as default for a HTTPS catch-all server`) + + defHealthzURL = flags.String("health-check-path", "/healthz", `Defines + the URL to be used as health check inside in the default server in NGINX.`) + + updateStatus = flags.Bool("update-status", true, `Indicates if the + ingress controller should update the Ingress status IP/hostname. Default is true`) + + electionID = flags.String("election-id", "ingress-controller-leader", `Election id to use for status update.`) + + forceIsolation = flags.Bool("force-namespace-isolation", false, + `Force namespace isolation. This flag is required to avoid the reference of secrets or + configmaps located in a different namespace than the specified in the flag --watch-namespace.`) + + disableNodeList = flags.Bool("disable-node-list", false, + `Disable querying nodes. If --force-namespace-isolation is true, this should also be set.`) + + updateStatusOnShutdown = flags.Bool("update-status-on-shutdown", true, `Indicates if the + ingress controller should update the Ingress status IP/hostname when the controller + is being stopped. Default is true`) + + sortBackends = flags.Bool("sort-backends", false, + `Defines if backends and it's endpoints should be sorted`) + + useNodeInternalIP = flags.Bool("report-node-internal-ip-address", false, + `Defines if the nodes IP address to be returned in the ingress status should be the internal instead of the external IP address`) + + showVersion = flags.Bool("version", false, + `Shows release information about the NGINX Ingress controller`) + + enableSSLPassthrough = flags.Bool("enable-ssl-passthrough", false, `Enable SSL passthrough feature. 
Default is disabled`) + + httpPort = flags.Int("http-port", 80, `Indicates the port to use for HTTP traffic`) + httpsPort = flags.Int("https-port", 443, `Indicates the port to use for HTTPS traffic`) + statusPort = flags.Int("status-port", 18080, `Indicates the TCP port to use for exposing the nginx status page`) + sslProxyPort = flags.Int("ssl-passtrough-proxy-port", 442, `Default port to use internally for SSL when SSL Passthgough is enabled`) + defServerPort = flags.Int("default-server-port", 8181, `Default port to use for exposing the default server (catch all)`) + healthzPort = flags.Int("healthz-port", 10254, "port for healthz endpoint.") + ) + + flag.Set("logtostderr", "true") + + flags.AddGoFlagSet(flag.CommandLine) + flags.Parse(os.Args) + flag.Set("logtostderr", "true") + + // Workaround for this issue: + // https://github.com/kubernetes/kubernetes/issues/17162 + flag.CommandLine.Parse([]string{}) + + if *showVersion { + return true, nil, nil + } + + if *defaultSvc == "" { + return false, nil, fmt.Errorf("Please specify --default-backend-service") + } + + if *ingressClass != "" { + glog.Infof("Watching for ingress class: %s", *ingressClass) + + if *ingressClass != defIngressClass { + glog.Warningf("only Ingress with class \"%v\" will be processed by this ingress controller", *ingressClass) + } + } + + // check port collisions + if !ing_net.IsPortAvailable(*httpPort) { + return false, nil, fmt.Errorf("Port %v is already in use. Please check the flag --http-port", *httpPort) + } + + if !ing_net.IsPortAvailable(*httpsPort) { + return false, nil, fmt.Errorf("Port %v is already in use. Please check the flag --https-port", *httpsPort) + } + + if !ing_net.IsPortAvailable(*statusPort) { + return false, nil, fmt.Errorf("Port %v is already in use. Please check the flag --status-port", *statusPort) + } + + if !ing_net.IsPortAvailable(*defServerPort) { + return false, nil, fmt.Errorf("Port %v is already in use. Please check the flag --default-server-port", *defServerPort) + } + + if *enableSSLPassthrough && !ing_net.IsPortAvailable(*sslProxyPort) { + return false, nil, fmt.Errorf("Port %v is already in use. Please check the flag --ssl-passtrough-proxy-port", *sslProxyPort) + } + + config := &controller.Configuration{ + APIServerHost: *apiserverHost, + KubeConfigFile: *kubeConfigFile, + UpdateStatus: *updateStatus, + ElectionID: *electionID, + EnableProfiling: *profiling, + EnableSSLPassthrough: *enableSSLPassthrough, + ResyncPeriod: *resyncPeriod, + DefaultService: *defaultSvc, + IngressClass: *ingressClass, + Namespace: *watchNamespace, + ConfigMapName: *configMap, + TCPConfigMapName: *tcpConfigMapName, + UDPConfigMapName: *udpConfigMapName, + DefaultSSLCertificate: *defSSLCertificate, + DefaultHealthzURL: *defHealthzURL, + PublishService: *publishSvc, + ForceNamespaceIsolation: *forceIsolation, + DisableNodeList: *disableNodeList, + UpdateStatusOnShutdown: *updateStatusOnShutdown, + SortBackends: *sortBackends, + UseNodeInternalIP: *useNodeInternalIP, + ListenPorts: &ngx_config.ListenPorts{ + Default: *defServerPort, + Health: *healthzPort, + HTTP: *httpPort, + HTTPS: *httpsPort, + SSLProxy: *sslProxyPort, + Status: *statusPort, + }, + } + + return false, config, nil +} diff --git a/cmd/nginx/main.go b/cmd/nginx/main.go index 165957e416..a77418d425 100644 --- a/cmd/nginx/main.go +++ b/cmd/nginx/main.go @@ -17,29 +17,125 @@ limitations under the License. 
package main import ( + "encoding/json" + "fmt" + "net" + "net/http" + "net/http/pprof" "os" "os/signal" + "strings" "syscall" "time" - "k8s.io/ingress-nginx/pkg/nginx/controller" - + proxyproto "github.com/armon/go-proxyproto" "github.com/golang/glog" + "github.com/prometheus/client_golang/prometheus/promhttp" + + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apiserver/pkg/server/healthz" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/rest" + "k8s.io/client-go/tools/clientcmd" + clientcmdapi "k8s.io/client-go/tools/clientcmd/api" + + "k8s.io/ingress-nginx/pkg/ingress" + "k8s.io/ingress-nginx/pkg/ingress/controller" + "k8s.io/ingress-nginx/pkg/k8s" + "k8s.io/ingress-nginx/pkg/net/ssl" + "k8s.io/ingress-nginx/version" ) func main() { - // start a new nginx controller - ngx := controller.NewNGINXController() + fmt.Println(version.String()) + + showVersion, conf, err := parseFlags() + if showVersion { + os.Exit(0) + } + + if err != nil { + glog.Fatal(err) + } + + kubeClient, err := createApiserverClient(conf.APIServerHost, conf.KubeConfigFile) + if err != nil { + handleFatalInitError(err) + } + + ns, name, err := k8s.ParseNameNS(conf.DefaultService) + if err != nil { + glog.Fatalf("invalid format for service %v: %v", conf.DefaultService, err) + } + + _, err = kubeClient.Core().Services(ns).Get(name, metav1.GetOptions{}) + if err != nil { + if strings.Contains(err.Error(), "cannot get services in the namespace") { + glog.Fatalf("✖ It seems the cluster it is running with Authorization enabled (like RBAC) and there is no permissions for the ingress controller. Please check the configuration") + } + glog.Fatalf("no service with name %v found: %v", conf.DefaultService, err) + } + glog.Infof("validated %v as the default backend", conf.DefaultService) + + if conf.PublishService != "" { + ns, name, err := k8s.ParseNameNS(conf.PublishService) + if err != nil { + glog.Fatalf("invalid service format: %v", err) + } + + svc, err := kubeClient.CoreV1().Services(ns).Get(name, metav1.GetOptions{}) + if err != nil { + glog.Fatalf("unexpected error getting information about service %v: %v", conf.PublishService, err) + } + + if len(svc.Status.LoadBalancer.Ingress) == 0 { + if len(svc.Spec.ExternalIPs) > 0 { + glog.Infof("service %v validated as assigned with externalIP", conf.PublishService) + } else { + // We could poll here, but we instead just exit and rely on k8s to restart us + glog.Fatalf("service %s does not (yet) have ingress points", conf.PublishService) + } + } else { + glog.Infof("service %v validated as source of Ingress status", conf.PublishService) + } + } + + if conf.Namespace != "" { + _, err = kubeClient.CoreV1().Namespaces().Get(conf.Namespace, metav1.GetOptions{}) + if err != nil { + glog.Fatalf("no watchNamespace with name %v found: %v", conf.Namespace, err) + } + } + + if conf.ResyncPeriod.Seconds() < 10 { + glog.Fatalf("resync period (%vs) is too low", conf.ResyncPeriod.Seconds()) + } + + // create directory that will contains the SSL Certificates + err = os.MkdirAll(ingress.DefaultSSLDirectory, 0655) + if err != nil { + glog.Errorf("Failed to mkdir SSL directory: %v", err) + } + // create the default SSL certificate (dummy) + sha, pem := createDefaultSSLCertificate() + conf.FakeCertificatePath = pem + conf.FakeCertificateSHA = sha + + conf.Client = kubeClient + conf.DefaultIngressClass = defIngressClass + + ngx := controller.NewNGINXController(conf) + + if conf.EnableSSLPassthrough { + setupSSLProxy(conf.ListenPorts.HTTPS, conf.ListenPorts.SSLProxy, ngx) + } go 
handleSigterm(ngx) - // start the controller + + mux := http.NewServeMux() + go registerHandlers(conf.EnableProfiling, conf.ListenPorts.Health, ngx, mux) + ngx.Start() - // wait - glog.Infof("shutting down Ingress controller...") - for { - glog.Infof("Handled quit, awaiting pod deletion") - time.Sleep(30 * time.Second) - } } func handleSigterm(ngx *controller.NGINXController) { @@ -54,6 +150,176 @@ func handleSigterm(ngx *controller.NGINXController) { exitCode = 1 } + glog.Infof("Handled quit, awaiting pod deletion") + time.Sleep(10 * time.Second) + glog.Infof("Exiting with %v", exitCode) os.Exit(exitCode) } + +func setupSSLProxy(sslPort, proxyPort int, n *controller.NGINXController) { + glog.Info("starting TLS proxy for SSL passthrough") + n.Proxy = &controller.TCPProxy{ + Default: &controller.TCPServer{ + Hostname: "localhost", + IP: "127.0.0.1", + Port: proxyPort, + ProxyProtocol: true, + }, + } + + listener, err := net.Listen("tcp", fmt.Sprintf(":%v", sslPort)) + if err != nil { + glog.Fatalf("%v", err) + } + + proxyList := &proxyproto.Listener{Listener: listener} + + // start goroutine that accepts tcp connections in port 443 + go func() { + for { + var conn net.Conn + var err error + + if n.IsProxyProtocolEnabled { + // we need to wrap the listener in order to decode + // proxy protocol before handling the connection + conn, err = proxyList.Accept() + } else { + conn, err = listener.Accept() + } + + if err != nil { + glog.Warningf("unexpected error accepting tcp connection: %v", err) + continue + } + + glog.V(3).Infof("remote address %s to local %s", conn.RemoteAddr(), conn.LocalAddr()) + go n.Proxy.Handle(conn) + } + }() +} + +// createApiserverClient creates new Kubernetes Apiserver client. When kubeconfig or apiserverHost param is empty +// the function assumes that it is running inside a Kubernetes cluster and attempts to +// discover the Apiserver. Otherwise, it connects to the Apiserver specified. +// +// apiserverHost param is in the format of protocol://address:port/pathPrefix, e.g.http://localhost:8001. +// kubeConfig location of kubeconfig file +func createApiserverClient(apiserverHost string, kubeConfig string) (*kubernetes.Clientset, error) { + cfg, err := buildConfigFromFlags(apiserverHost, kubeConfig) + if err != nil { + return nil, err + } + + cfg.QPS = defaultQPS + cfg.Burst = defaultBurst + cfg.ContentType = "application/vnd.kubernetes.protobuf" + + glog.Infof("Creating API client for %s", cfg.Host) + + client, err := kubernetes.NewForConfig(cfg) + if err != nil { + return nil, err + } + + v, err := client.Discovery().ServerVersion() + if err != nil { + return nil, err + } + + glog.Infof("Running in Kubernetes Cluster version v%v.%v (%v) - git (%v) commit %v - platform %v", + v.Major, v.Minor, v.GitVersion, v.GitTreeState, v.GitCommit, v.Platform) + + return client, nil +} + +const ( + // High enough QPS to fit all expected use cases. QPS=0 is not set here, because + // client code is overriding it. + defaultQPS = 1e6 + // High enough Burst to fit all expected use cases. Burst=0 is not set here, because + // client code is overriding it. + defaultBurst = 1e6 + + fakeCertificate = "default-fake-certificate" +) + +// buildConfigFromFlags builds REST config based on master URL and kubeconfig path. +// If both of them are empty then in cluster config is used. 
+func buildConfigFromFlags(masterURL, kubeconfigPath string) (*rest.Config, error) { + if kubeconfigPath == "" && masterURL == "" { + kubeconfig, err := rest.InClusterConfig() + if err != nil { + return nil, err + } + + return kubeconfig, nil + } + + return clientcmd.NewNonInteractiveDeferredLoadingClientConfig( + &clientcmd.ClientConfigLoadingRules{ExplicitPath: kubeconfigPath}, + &clientcmd.ConfigOverrides{ + ClusterInfo: clientcmdapi.Cluster{ + Server: masterURL, + }, + }).ClientConfig() +} + +/** + * Handles fatal init error that prevents server from doing any work. Prints verbose error + * message and quits the server. + */ +func handleFatalInitError(err error) { + glog.Fatalf("Error while initializing connection to Kubernetes apiserver. "+ + "This most likely means that the cluster is misconfigured (e.g., it has "+ + "invalid apiserver certificates or service accounts configuration). Reason: %s\n"+ + "Refer to the troubleshooting guide for more information: "+ + "https://github.com/kubernetes/ingress-nginx/blob/master/docs/troubleshooting.md", err) +} + +func registerHandlers(enableProfiling bool, port int, ic *controller.NGINXController, mux *http.ServeMux) { + // expose health check endpoint (/healthz) + healthz.InstallHandler(mux, + healthz.PingHealthz, + ic, + ) + + mux.Handle("/metrics", promhttp.Handler()) + + mux.HandleFunc("/build", func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusOK) + b, _ := json.Marshal(version.String()) + w.Write(b) + }) + + mux.HandleFunc("/stop", func(w http.ResponseWriter, r *http.Request) { + err := syscall.Kill(syscall.Getpid(), syscall.SIGTERM) + if err != nil { + glog.Errorf("unexpected error: %v", err) + } + }) + + if enableProfiling { + mux.HandleFunc("/debug/pprof/", pprof.Index) + mux.HandleFunc("/debug/pprof/profile", pprof.Profile) + mux.HandleFunc("/debug/pprof/symbol", pprof.Symbol) + mux.HandleFunc("/debug/pprof/trace", pprof.Trace) + } + + server := &http.Server{ + Addr: fmt.Sprintf(":%v", port), + Handler: mux, + } + glog.Fatal(server.ListenAndServe()) +} + +func createDefaultSSLCertificate() (string, string) { + defCert, defKey := ssl.GetFakeSSLCert() + c, err := ssl.AddOrUpdateCertAndKey(fakeCertificate, defCert, defKey, []byte{}) + if err != nil { + glog.Fatalf("Error generating self signed certificate: %v", err) + } + + return c.PemSHA, c.PemFileName +} diff --git a/pkg/nginx/controller/utils_test.go b/pkg/file/file_test.go similarity index 54% rename from pkg/nginx/controller/utils_test.go rename to pkg/file/file_test.go index f11f813c3b..c091fa491b 100644 --- a/pkg/nginx/controller/utils_test.go +++ b/pkg/file/file_test.go @@ -14,28 +14,40 @@ See the License for the specific language governing permissions and limitations under the License. 
*/ -package controller +package file -import "testing" +import ( + "io/ioutil" + "testing" +) -func TestDiff(t *testing.T) { +func TestSHA1(t *testing.T) { tests := []struct { - a []byte - b []byte - empty bool + content []byte + sha string }{ - {[]byte(""), []byte(""), true}, - {[]byte("a"), []byte("a"), true}, - {[]byte("a"), []byte("b"), false}, + {[]byte(""), "da39a3ee5e6b4b0d3255bfef95601890afd80709"}, + {[]byte("hello world"), "2aae6c35c94fcfb415dbe95f408b9ce91ee846ed"}, } for _, test := range tests { - b, err := diff(test.a, test.b) + f, err := ioutil.TempFile("", "sha-test") if err != nil { - t.Fatalf("unexpected error returned: %v", err) + t.Fatal(err) } - if len(b) == 0 && !test.empty { - t.Fatalf("expected empty but returned %s", b) + f.Write(test.content) + f.Sync() + + sha := SHA1(f.Name()) + f.Close() + + if sha != test.sha { + t.Fatalf("expected %v but returned %s", test.sha, sha) } } + + sha := SHA1("") + if sha != ""{ + t.Fatalf("expected an empty sha but returned %s", sha) + } } diff --git a/pkg/ingress/annotations/cors/main_test.go b/pkg/ingress/annotations/cors/main_test.go index 2a1782b89f..11a0227311 100644 --- a/pkg/ingress/annotations/cors/main_test.go +++ b/pkg/ingress/annotations/cors/main_test.go @@ -77,7 +77,7 @@ func TestIngressCorsConfig(t *testing.T) { t.Errorf("expected a Config type") } - if nginxCors.CorsEnabled != true { + if !nginxCors.CorsEnabled { t.Errorf("expected cors enabled but returned %v", nginxCors.CorsEnabled) } diff --git a/pkg/ingress/controller/backend_ssl.go b/pkg/ingress/controller/backend_ssl.go index a82bfd8549..fd88caa23b 100644 --- a/pkg/ingress/controller/backend_ssl.go +++ b/pkg/ingress/controller/backend_ssl.go @@ -36,7 +36,7 @@ import ( // syncSecret keeps in sync Secrets used by Ingress rules with the files on // disk to allow copy of the content of the secret to disk to be used // by external processes. -func (ic *GenericController) syncSecret(key string) { +func (ic *NGINXController) syncSecret(key string) { glog.V(3).Infof("starting syncing of secret %v", key) cert, err := ic.getPemCertificate(key) @@ -70,7 +70,7 @@ func (ic *GenericController) syncSecret(key string) { // getPemCertificate receives a secret, and creates a ingress.SSLCert as return. // It parses the secret and verifies if it's a keypair, or a 'ca.crt' secret only. -func (ic *GenericController) getPemCertificate(secretName string) (*ingress.SSLCert, error) { +func (ic *NGINXController) getPemCertificate(secretName string) (*ingress.SSLCert, error) { secret, err := ic.listers.Secret.GetByName(secretName) if err != nil { return nil, fmt.Errorf("error retrieving secret %v: %v", secretName, err) @@ -127,7 +127,7 @@ func (ic *GenericController) getPemCertificate(secretName string) (*ingress.SSLC // checkMissingSecrets verify if one or more ingress rules contains a reference // to a secret that is not present in the local secret store. // In this case we call syncSecret. 
-func (ic *GenericController) checkMissingSecrets() { +func (ic *NGINXController) checkMissingSecrets() { for _, obj := range ic.listers.Ingress.List() { ing := obj.(*extensions.Ingress) diff --git a/pkg/ingress/controller/backend_ssl_test.go b/pkg/ingress/controller/backend_ssl_test.go index 69b49cf74e..f59cbe8f53 100644 --- a/pkg/ingress/controller/backend_ssl_test.go +++ b/pkg/ingress/controller/backend_ssl_test.go @@ -103,8 +103,8 @@ func buildControllerForBackendSSL() cache_client.Controller { return cache_client.New(cfg) } -func buildGenericControllerForBackendSSL() *GenericController { - gc := &GenericController{ +func buildGenericControllerForBackendSSL() *NGINXController { + gc := &NGINXController{ syncRateLimiter: flowcontrol.NewTokenBucketRateLimiter(0.3, 1), cfg: &Configuration{ Client: buildSimpleClientSetForBackendSSL(), diff --git a/pkg/ingress/controller/checker.go b/pkg/ingress/controller/checker.go new file mode 100644 index 0000000000..2850804b76 --- /dev/null +++ b/pkg/ingress/controller/checker.go @@ -0,0 +1,68 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package controller + +import ( + "fmt" + "io/ioutil" + "net/http" + "strconv" + "strings" + + "github.com/golang/glog" + "github.com/ncabatoff/process-exporter/proc" +) + +// Name returns the healthcheck name +func (n NGINXController) Name() string { + return "Ingress Controller" +} + +// Check returns if the nginx healthz endpoint is returning ok (status code 200) +func (n NGINXController) Check(_ *http.Request) error { + res, err := http.Get(fmt.Sprintf("http://0.0.0.0:%v%v", n.cfg.ListenPorts.Status, ngxHealthPath)) + if err != nil { + return err + } + defer res.Body.Close() + if res.StatusCode != 200 { + return fmt.Errorf("ingress controller is not healthy") + } + + // check the nginx master process is running + fs, err := proc.NewFS("/proc") + if err != nil { + glog.Errorf("%v", err) + return err + } + f, err := ioutil.ReadFile("/run/nginx.pid") + if err != nil { + glog.Errorf("%v", err) + return err + } + pid, err := strconv.Atoi(strings.TrimRight(string(f), "\r\n")) + if err != nil { + return err + } + _, err = fs.NewProc(pid) + if err != nil { + glog.Errorf("%v", err) + return err + } + + return nil +} diff --git a/pkg/nginx/config/config.go b/pkg/ingress/controller/config/config.go similarity index 100% rename from pkg/nginx/config/config.go rename to pkg/ingress/controller/config/config.go diff --git a/pkg/nginx/config/config_test.go b/pkg/ingress/controller/config/config_test.go similarity index 100% rename from pkg/nginx/config/config_test.go rename to pkg/ingress/controller/config/config_test.go diff --git a/pkg/ingress/controller/controller.go b/pkg/ingress/controller/controller.go index cdca29a1ca..fa33267e88 100644 --- a/pkg/ingress/controller/controller.go +++ b/pkg/ingress/controller/controller.go @@ -24,7 +24,6 @@ import ( "sort" "strconv" "strings" - "sync" "sync/atomic" "time" @@ -35,12 +34,7 @@ import ( "k8s.io/apimachinery/pkg/conversion" 
"k8s.io/apimachinery/pkg/util/intstr" "k8s.io/apimachinery/pkg/util/sets" - "k8s.io/apimachinery/pkg/util/wait" clientset "k8s.io/client-go/kubernetes" - "k8s.io/client-go/kubernetes/scheme" - v1core "k8s.io/client-go/kubernetes/typed/core/v1" - "k8s.io/client-go/tools/record" - "k8s.io/client-go/util/flowcontrol" "k8s.io/kubernetes/pkg/kubelet/util/sliceutils" "k8s.io/ingress-nginx/pkg/ingress" @@ -48,11 +42,10 @@ import ( "k8s.io/ingress-nginx/pkg/ingress/annotations/healthcheck" "k8s.io/ingress-nginx/pkg/ingress/annotations/parser" "k8s.io/ingress-nginx/pkg/ingress/annotations/proxy" + ngx_config "k8s.io/ingress-nginx/pkg/ingress/controller/config" "k8s.io/ingress-nginx/pkg/ingress/defaults" "k8s.io/ingress-nginx/pkg/ingress/resolver" - "k8s.io/ingress-nginx/pkg/ingress/status" "k8s.io/ingress-nginx/pkg/k8s" - "k8s.io/ingress-nginx/pkg/net/ssl" "k8s.io/ingress-nginx/pkg/task" ) @@ -60,63 +53,32 @@ const ( defUpstreamName = "upstream-default-backend" defServerName = "_" rootLocation = "/" - - fakeCertificate = "default-fake-certificate" ) var ( // list of ports that cannot be used by TCP or UDP services reservedPorts = []string{"80", "443", "8181", "18080"} - fakeCertificatePath = "" - fakeCertificateSHA = "" - - cloner = conversion.NewCloner() + cloner *conversion.Cloner ) -// GenericController holds the boilerplate code required to build an Ingress controlller. -type GenericController struct { - cfg *Configuration - - listers *ingress.StoreLister - cacheController *cacheController - - annotations annotationExtractor - - recorder record.EventRecorder - - syncQueue *task.Queue - - syncStatus status.Sync - - // local store of SSL certificates - // (only certificates used in ingress) - sslCertTracker *sslCertTracker - - syncRateLimiter flowcontrol.RateLimiter - - // stopLock is used to enforce only a single call to Stop is active. - // Needed because we allow stopping through an http endpoint and - // allowing concurrent stoppers leads to stack traces. - stopLock *sync.Mutex - - stopCh chan struct{} - - // runningConfig contains the running configuration in the Backend - runningConfig *ingress.Configuration - - forceReload int32 +func init() { + cloner := conversion.NewCloner() + cloner.RegisterDeepCopyFunc(ingress.GetGeneratedDeepCopyFuncs) } // Configuration contains all the settings required by an Ingress controller type Configuration struct { - Client clientset.Interface + APIServerHost string + KubeConfigFile string + Client clientset.Interface - ResyncPeriod time.Duration + ResyncPeriod time.Duration + + ConfigMapName string DefaultService string IngressClass string Namespace string - ConfigMapName string ForceNamespaceIsolation bool DisableNodeList bool @@ -124,15 +86,14 @@ type Configuration struct { // optional TCPConfigMapName string // optional - UDPConfigMapName string - DefaultSSLCertificate string + UDPConfigMapName string + DefaultHealthzURL string DefaultIngressClass string + DefaultSSLCertificate string + // optional PublishService string - // Backend is the particular implementation to be used. 
- // (for instance NGINX) - Backend ingress.Controller UpdateStatus bool UseNodeInternalIP bool @@ -140,74 +101,25 @@ type Configuration struct { UpdateStatusOnShutdown bool SortBackends bool -} - -// newIngressController creates an Ingress controller -func newIngressController(config *Configuration) *GenericController { - - eventBroadcaster := record.NewBroadcaster() - eventBroadcaster.StartLogging(glog.Infof) - eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{ - Interface: config.Client.CoreV1().Events(config.Namespace), - }) - - ic := GenericController{ - cfg: config, - stopLock: &sync.Mutex{}, - stopCh: make(chan struct{}), - syncRateLimiter: flowcontrol.NewTokenBucketRateLimiter(0.3, 1), - recorder: eventBroadcaster.NewRecorder(scheme.Scheme, apiv1.EventSource{ - Component: "ingress-controller", - }), - sslCertTracker: newSSLCertTracker(), - } - - ic.syncQueue = task.NewTaskQueue(ic.syncIngress) - - ic.listers, ic.cacheController = ic.createListers(config.DisableNodeList) - - if config.UpdateStatus { - ic.syncStatus = status.NewStatusSyncer(status.Config{ - Client: config.Client, - PublishService: ic.cfg.PublishService, - IngressLister: ic.listers.Ingress, - ElectionID: config.ElectionID, - IngressClass: config.IngressClass, - DefaultIngressClass: config.DefaultIngressClass, - UpdateStatusOnShutdown: config.UpdateStatusOnShutdown, - CustomIngressStatus: ic.cfg.Backend.UpdateIngressStatus, - UseNodeInternalIP: ic.cfg.UseNodeInternalIP, - }) - } else { - glog.Warning("Update of ingress status is disabled (flag --update-status=false was specified)") - } - ic.annotations = newAnnotationExtractor(ic) - - ic.cfg.Backend.SetListers(ic.listers) - cloner.RegisterDeepCopyFunc(ingress.GetGeneratedDeepCopyFuncs) + ListenPorts *ngx_config.ListenPorts - return &ic -} + EnableSSLPassthrough bool -// Info returns information about the backend -func (ic GenericController) Info() *ingress.BackendInfo { - return ic.cfg.Backend.Info() -} + EnableProfiling bool -// IngressClass returns information about the backend -func (ic GenericController) IngressClass() string { - return ic.cfg.IngressClass + FakeCertificatePath string + FakeCertificateSHA string } // GetDefaultBackend returns the default backend -func (ic GenericController) GetDefaultBackend() defaults.Backend { - return ic.cfg.Backend.BackendDefaults() +func (n NGINXController) GetDefaultBackend() defaults.Backend { + return n.backendDefaults } // GetPublishService returns the configured service used to set ingress status -func (ic GenericController) GetPublishService() *apiv1.Service { - s, err := ic.listers.Service.GetByName(ic.cfg.PublishService) +func (n NGINXController) GetPublishService() *apiv1.Service { + s, err := n.listers.Service.GetByName(n.cfg.PublishService) if err != nil { return nil } @@ -215,42 +127,37 @@ func (ic GenericController) GetPublishService() *apiv1.Service { return s } -// GetRecorder returns the event recorder -func (ic GenericController) GetRecorder() record.EventRecorder { - return ic.recorder -} - // GetSecret searches for a secret in the local secrets Store -func (ic GenericController) GetSecret(name string) (*apiv1.Secret, error) { - return ic.listers.Secret.GetByName(name) +func (n NGINXController) GetSecret(name string) (*apiv1.Secret, error) { + return n.listers.Secret.GetByName(name) } // GetService searches for a service in the local secrets Store -func (ic GenericController) GetService(name string) (*apiv1.Service, error) { - return ic.listers.Service.GetByName(name) +func (n NGINXController) 
GetService(name string) (*apiv1.Service, error) { + return n.listers.Service.GetByName(name) } // sync collects all the pieces required to assemble the configuration file and // then sends the content to the backend (OnUpdate) receiving the populated // template as response reloading the backend if is required. -func (ic *GenericController) syncIngress(item interface{}) error { - ic.syncRateLimiter.Accept() +func (n *NGINXController) syncIngress(item interface{}) error { + n.syncRateLimiter.Accept() - if ic.syncQueue.IsShuttingDown() { + if n.syncQueue.IsShuttingDown() { return nil } if element, ok := item.(task.Element); ok { if name, ok := element.Key.(string); ok { - if obj, exists, _ := ic.listers.Ingress.GetByKey(name); exists { + if obj, exists, _ := n.listers.Ingress.GetByKey(name); exists { ing := obj.(*extensions.Ingress) - ic.readSecrets(ing) + n.readSecrets(ing) } } } // Sort ingress rules using the ResourceVersion field - ings := ic.listers.Ingress.List() + ings := n.listers.Ingress.List() sort.SliceStable(ings, func(i, j int) bool { ir := ings[i].(*extensions.Ingress).ResourceVersion jr := ings[j].(*extensions.Ingress).ResourceVersion @@ -261,14 +168,14 @@ func (ic *GenericController) syncIngress(item interface{}) error { var ingresses []*extensions.Ingress for _, ingIf := range ings { ing := ingIf.(*extensions.Ingress) - if !class.IsValid(ing, ic.cfg.IngressClass, ic.cfg.DefaultIngressClass) { + if !class.IsValid(ing, n.cfg.IngressClass, n.cfg.DefaultIngressClass) { continue } ingresses = append(ingresses, ing) } - upstreams, servers := ic.getBackendServers(ingresses) + upstreams, servers := n.getBackendServers(ingresses) var passUpstreams []*ingress.SSLPassthroughBackend for _, server := range servers { @@ -294,19 +201,19 @@ func (ic *GenericController) syncIngress(item interface{}) error { pcfg := ingress.Configuration{ Backends: upstreams, Servers: servers, - TCPEndpoints: ic.getStreamServices(ic.cfg.TCPConfigMapName, apiv1.ProtocolTCP), - UDPEndpoints: ic.getStreamServices(ic.cfg.UDPConfigMapName, apiv1.ProtocolUDP), + TCPEndpoints: n.getStreamServices(n.cfg.TCPConfigMapName, apiv1.ProtocolTCP), + UDPEndpoints: n.getStreamServices(n.cfg.UDPConfigMapName, apiv1.ProtocolUDP), PassthroughBackends: passUpstreams, } - if !ic.isForceReload() && ic.runningConfig != nil && ic.runningConfig.Equal(&pcfg) { + if !n.isForceReload() && n.runningConfig != nil && n.runningConfig.Equal(&pcfg) { glog.V(3).Infof("skipping backend reload (no changes detected)") return nil } glog.Infof("backend reload required") - err := ic.cfg.Backend.OnUpdate(pcfg) + err := n.OnUpdate(pcfg) if err != nil { incReloadErrorCount() glog.Errorf("unexpected failure restarting the backend: \n%v", err) @@ -317,13 +224,13 @@ func (ic *GenericController) syncIngress(item interface{}) error { incReloadCount() setSSLExpireTime(servers) - ic.runningConfig = &pcfg - ic.SetForceReload(false) + n.runningConfig = &pcfg + n.SetForceReload(false) return nil } -func (ic *GenericController) getStreamServices(configmapName string, proto apiv1.Protocol) []ingress.L4Service { +func (n *NGINXController) getStreamServices(configmapName string, proto apiv1.Protocol) []ingress.L4Service { glog.V(3).Infof("obtaining information about stream services of type %v located in configmap %v", proto, configmapName) if configmapName == "" { // no configmap configured @@ -336,7 +243,7 @@ func (ic *GenericController) getStreamServices(configmapName string, proto apiv1 return []ingress.L4Service{} } - configmap, err := 
ic.listers.ConfigMap.GetByName(configmapName) + configmap, err := n.listers.ConfigMap.GetByName(configmapName) if err != nil { glog.Errorf("unexpected error reading configmap %v: %v", configmapName, err) return []ingress.L4Service{} @@ -386,7 +293,7 @@ func (ic *GenericController) getStreamServices(configmapName string, proto apiv1 continue } - svcObj, svcExists, err := ic.listers.Service.GetByKey(nsName) + svcObj, svcExists, err := n.listers.Service.GetByKey(nsName) if err != nil { glog.Warningf("error getting service %v: %v", nsName, err) continue @@ -406,7 +313,7 @@ func (ic *GenericController) getStreamServices(configmapName string, proto apiv1 for _, sp := range svc.Spec.Ports { if sp.Name == svcPort { if sp.Protocol == proto { - endps = ic.getEndpoints(svc, &sp, proto, &healthcheck.Upstream{}) + endps = n.getEndpoints(svc, &sp, proto, &healthcheck.Upstream{}) break } } @@ -417,7 +324,7 @@ func (ic *GenericController) getStreamServices(configmapName string, proto apiv1 for _, sp := range svc.Spec.Ports { if sp.Port == int32(targetPort) { if sp.Protocol == proto { - endps = ic.getEndpoints(svc, &sp, proto, &healthcheck.Upstream{}) + endps = n.getEndpoints(svc, &sp, proto, &healthcheck.Upstream{}) break } } @@ -450,29 +357,29 @@ func (ic *GenericController) getStreamServices(configmapName string, proto apiv1 // getDefaultUpstream returns an upstream associated with the // default backend service. In case of error retrieving information // configure the upstream to return http code 503. -func (ic *GenericController) getDefaultUpstream() *ingress.Backend { +func (n *NGINXController) getDefaultUpstream() *ingress.Backend { upstream := &ingress.Backend{ Name: defUpstreamName, } - svcKey := ic.cfg.DefaultService - svcObj, svcExists, err := ic.listers.Service.GetByKey(svcKey) + svcKey := n.cfg.DefaultService + svcObj, svcExists, err := n.listers.Service.GetByKey(svcKey) if err != nil { - glog.Warningf("unexpected error searching the default backend %v: %v", ic.cfg.DefaultService, err) - upstream.Endpoints = append(upstream.Endpoints, ic.cfg.Backend.DefaultEndpoint()) + glog.Warningf("unexpected error searching the default backend %v: %v", n.cfg.DefaultService, err) + upstream.Endpoints = append(upstream.Endpoints, n.DefaultEndpoint()) return upstream } if !svcExists { glog.Warningf("service %v does not exist", svcKey) - upstream.Endpoints = append(upstream.Endpoints, ic.cfg.Backend.DefaultEndpoint()) + upstream.Endpoints = append(upstream.Endpoints, n.DefaultEndpoint()) return upstream } svc := svcObj.(*apiv1.Service) - endps := ic.getEndpoints(svc, &svc.Spec.Ports[0], apiv1.ProtocolTCP, &healthcheck.Upstream{}) + endps := n.getEndpoints(svc, &svc.Spec.Ports[0], apiv1.ProtocolTCP, &healthcheck.Upstream{}) if len(endps) == 0 { glog.Warningf("service %v does not have any active endpoints", svcKey) - endps = []ingress.Endpoint{ic.cfg.Backend.DefaultEndpoint()} + endps = []ingress.Endpoint{n.DefaultEndpoint()} } upstream.Service = svc @@ -482,14 +389,14 @@ func (ic *GenericController) getDefaultUpstream() *ingress.Backend { // getBackendServers returns a list of Upstream and Server to be used by the backend // An upstream can be used in multiple servers if the namespace, service name and port are the same -func (ic *GenericController) getBackendServers(ingresses []*extensions.Ingress) ([]*ingress.Backend, []*ingress.Server) { - du := ic.getDefaultUpstream() - upstreams := ic.createUpstreams(ingresses, du) - servers := ic.createServers(ingresses, upstreams, du) +func (n *NGINXController) 
getBackendServers(ingresses []*extensions.Ingress) ([]*ingress.Backend, []*ingress.Server) { + du := n.getDefaultUpstream() + upstreams := n.createUpstreams(ingresses, du) + servers := n.createServers(ingresses, upstreams, du) for _, ing := range ingresses { - affinity := ic.annotations.SessionAffinity(ing) - anns := ic.annotations.Extract(ing) + affinity := n.annotations.SessionAffinity(ing) + anns := n.annotations.Extract(ing) for _, rule := range ing.Spec.Rules { host := rule.Host @@ -508,7 +415,7 @@ func (ic *GenericController) getBackendServers(ingresses []*extensions.Ingress) } if server.CertificateAuth.CAFileName == "" { - ca := ic.annotations.CertificateAuth(ing) + ca := n.annotations.CertificateAuth(ing) if ca != nil { server.CertificateAuth = *ca // It is possible that no CAFileName is found in the secret @@ -609,7 +516,7 @@ func (ic *GenericController) getBackendServers(ingresses []*extensions.Ingress) // check if the location contains endpoints and a custom default backend if location.DefaultBackend != nil { sp := location.DefaultBackend.Spec.Ports[0] - endps := ic.getEndpoints(location.DefaultBackend, &sp, apiv1.ProtocolTCP, &healthcheck.Upstream{}) + endps := n.getEndpoints(location.DefaultBackend, &sp, apiv1.ProtocolTCP, &healthcheck.Upstream{}) if len(endps) > 0 { glog.V(3).Infof("using custom default backend in server %v location %v (service %v/%v)", server.Hostname, location.Path, location.DefaultBackend.Namespace, location.DefaultBackend.Name) @@ -656,7 +563,7 @@ func (ic *GenericController) getBackendServers(ingresses []*extensions.Ingress) aUpstreams = append(aUpstreams, upstream) } - if ic.cfg.SortBackends { + if n.cfg.SortBackends { sort.SliceStable(aUpstreams, func(a, b int) bool { return aUpstreams[a].Name < aUpstreams[b].Name }) @@ -678,17 +585,17 @@ func (ic *GenericController) getBackendServers(ingresses []*extensions.Ingress) } // GetAuthCertificate is used by the auth-tls annotations to get a cert from a secret -func (ic GenericController) GetAuthCertificate(name string) (*resolver.AuthSSLCert, error) { - if _, exists := ic.sslCertTracker.Get(name); !exists { - ic.syncSecret(name) +func (n NGINXController) GetAuthCertificate(name string) (*resolver.AuthSSLCert, error) { + if _, exists := n.sslCertTracker.Get(name); !exists { + n.syncSecret(name) } - _, err := ic.listers.Secret.GetByName(name) + _, err := n.listers.Secret.GetByName(name) if err != nil { return &resolver.AuthSSLCert{}, fmt.Errorf("unexpected error: %v", err) } - bc, exists := ic.sslCertTracker.Get(name) + bc, exists := n.sslCertTracker.Get(name) if !exists { return &resolver.AuthSSLCert{}, fmt.Errorf("secret %v does not exist", name) } @@ -702,15 +609,15 @@ func (ic GenericController) GetAuthCertificate(name string) (*resolver.AuthSSLCe // createUpstreams creates the NGINX upstreams for each service referenced in // Ingress rules. The servers inside the upstream are endpoints. 
-func (ic *GenericController) createUpstreams(data []*extensions.Ingress, du *ingress.Backend) map[string]*ingress.Backend { +func (n *NGINXController) createUpstreams(data []*extensions.Ingress, du *ingress.Backend) map[string]*ingress.Backend { upstreams := make(map[string]*ingress.Backend) upstreams[defUpstreamName] = du for _, ing := range data { - secUpstream := ic.annotations.SecureUpstream(ing) - hz := ic.annotations.HealthCheck(ing) - serviceUpstream := ic.annotations.ServiceUpstream(ing) - upstreamHashBy := ic.annotations.UpstreamHashBy(ing) + secUpstream := n.annotations.SecureUpstream(ing) + hz := n.annotations.HealthCheck(ing) + serviceUpstream := n.annotations.ServiceUpstream(ing) + upstreamHashBy := n.annotations.UpstreamHashBy(ing) var defBackend string if ing.Spec.Backend != nil { @@ -726,7 +633,7 @@ func (ic *GenericController) createUpstreams(data []*extensions.Ingress, du *ing // Add the service cluster endpoint as the upstream instead of individual endpoints // if the serviceUpstream annotation is enabled if serviceUpstream { - endpoint, err := ic.getServiceClusterEndpoint(svcKey, ing.Spec.Backend) + endpoint, err := n.getServiceClusterEndpoint(svcKey, ing.Spec.Backend) if err != nil { glog.Errorf("Failed to get service cluster endpoint for service %s: %v", svcKey, err) } else { @@ -735,7 +642,7 @@ func (ic *GenericController) createUpstreams(data []*extensions.Ingress, du *ing } if len(upstreams[defBackend].Endpoints) == 0 { - endps, err := ic.serviceEndpoints(svcKey, ing.Spec.Backend.ServicePort.String(), hz) + endps, err := n.serviceEndpoints(svcKey, ing.Spec.Backend.ServicePort.String(), hz) upstreams[defBackend].Endpoints = append(upstreams[defBackend].Endpoints, endps...) if err != nil { glog.Warningf("error creating upstream %v: %v", defBackend, err) @@ -780,7 +687,7 @@ func (ic *GenericController) createUpstreams(data []*extensions.Ingress, du *ing // Add the service cluster endpoint as the upstream instead of individual endpoints // if the serviceUpstream annotation is enabled if serviceUpstream { - endpoint, err := ic.getServiceClusterEndpoint(svcKey, &path.Backend) + endpoint, err := n.getServiceClusterEndpoint(svcKey, &path.Backend) if err != nil { glog.Errorf("failed to get service cluster endpoint for service %s: %v", svcKey, err) } else { @@ -789,7 +696,7 @@ func (ic *GenericController) createUpstreams(data []*extensions.Ingress, du *ing } if len(upstreams[name].Endpoints) == 0 { - endp, err := ic.serviceEndpoints(svcKey, path.Backend.ServicePort.String(), hz) + endp, err := n.serviceEndpoints(svcKey, path.Backend.ServicePort.String(), hz) if err != nil { glog.Warningf("error obtaining service endpoints: %v", err) continue @@ -797,7 +704,7 @@ func (ic *GenericController) createUpstreams(data []*extensions.Ingress, du *ing upstreams[name].Endpoints = endp } - s, err := ic.listers.Service.GetByName(svcKey) + s, err := n.listers.Service.GetByName(svcKey) if err != nil { glog.Warningf("error obtaining service: %v", err) continue @@ -811,8 +718,8 @@ func (ic *GenericController) createUpstreams(data []*extensions.Ingress, du *ing return upstreams } -func (ic *GenericController) getServiceClusterEndpoint(svcKey string, backend *extensions.IngressBackend) (endpoint ingress.Endpoint, err error) { - svcObj, svcExists, err := ic.listers.Service.GetByKey(svcKey) +func (n *NGINXController) getServiceClusterEndpoint(svcKey string, backend *extensions.IngressBackend) (endpoint ingress.Endpoint, err error) { + svcObj, svcExists, err := n.listers.Service.GetByKey(svcKey) 
if !svcExists { return endpoint, fmt.Errorf("service %v does not exist", svcKey) @@ -848,9 +755,9 @@ func (ic *GenericController) getServiceClusterEndpoint(svcKey string, backend *e // serviceEndpoints returns the upstream servers (endpoints) associated // to a service. -func (ic *GenericController) serviceEndpoints(svcKey, backendPort string, +func (n *NGINXController) serviceEndpoints(svcKey, backendPort string, hz *healthcheck.Upstream) ([]ingress.Endpoint, error) { - svc, err := ic.listers.Service.GetByName(svcKey) + svc, err := n.listers.Service.GetByName(svcKey) var upstreams []ingress.Endpoint if err != nil { @@ -864,12 +771,12 @@ func (ic *GenericController) serviceEndpoints(svcKey, backendPort string, servicePort.TargetPort.String() == backendPort || servicePort.Name == backendPort { - endps := ic.getEndpoints(svc, &servicePort, apiv1.ProtocolTCP, hz) + endps := n.getEndpoints(svc, &servicePort, apiv1.ProtocolTCP, hz) if len(endps) == 0 { glog.Warningf("service %v does not have any active endpoints", svcKey) } - if ic.cfg.SortBackends { + if n.cfg.SortBackends { sort.SliceStable(endps, func(i, j int) bool { iName := endps[i].Address jName := endps[j].Address @@ -898,7 +805,7 @@ func (ic *GenericController) serviceEndpoints(svcKey, backendPort string, Port: int32(externalPort), TargetPort: intstr.FromString(backendPort), } - endps := ic.getEndpoints(svc, &servicePort, apiv1.ProtocolTCP, hz) + endps := n.getEndpoints(svc, &servicePort, apiv1.ProtocolTCP, hz) if len(endps) == 0 { glog.Warningf("service %v does not have any active endpoints", svcKey) return upstreams, nil @@ -908,7 +815,7 @@ func (ic *GenericController) serviceEndpoints(svcKey, backendPort string, return upstreams, nil } - if !ic.cfg.SortBackends { + if !n.cfg.SortBackends { rand.Seed(time.Now().UnixNano()) for i := range upstreams { j := rand.Intn(i + 1) @@ -923,7 +830,7 @@ func (ic *GenericController) serviceEndpoints(svcKey, backendPort string, // FDQN referenced by ingress rules and the common name field in the referenced // SSL certificates. Each server is configured with location / using a default // backend specified by the user or the one inside the ingress spec. -func (ic *GenericController) createServers(data []*extensions.Ingress, +func (n *NGINXController) createServers(data []*extensions.Ingress, upstreams map[string]*ingress.Backend, du *ingress.Backend) map[string]*ingress.Server { @@ -932,7 +839,7 @@ func (ic *GenericController) createServers(data []*extensions.Ingress, // remove the alias to avoid conflicts. aliases := make(map[string]string, len(data)) - bdef := ic.GetDefaultBackend() + bdef := n.GetDefaultBackend() ngxProxy := proxy.Configuration{ BodySize: bdef.ProxyBodySize, ConnectTimeout: bdef.ProxyConnectTimeout, @@ -946,12 +853,12 @@ func (ic *GenericController) createServers(data []*extensions.Ingress, } // generated on Start() with createDefaultSSLCertificate() - defaultPemFileName := fakeCertificatePath - defaultPemSHA := fakeCertificateSHA + defaultPemFileName := n.cfg.FakeCertificatePath + defaultPemSHA := n.cfg.FakeCertificateSHA // Tries to fetch the default Certificate from nginx configuration. 
// If it does not exists, use the ones generated on Start() - defaultCertificate, err := ic.getPemCertificate(ic.cfg.DefaultSSLCertificate) + defaultCertificate, err := n.getPemCertificate(n.cfg.DefaultSSLCertificate) if err == nil { defaultPemFileName = defaultCertificate.PemFileName defaultPemSHA = defaultCertificate.PemSHA @@ -976,7 +883,7 @@ func (ic *GenericController) createServers(data []*extensions.Ingress, for _, ing := range data { // check if ssl passthrough is configured - sslpt := ic.annotations.SSLPassthrough(ing) + sslpt := n.annotations.SSLPassthrough(ing) // default upstream server un := du.Name @@ -1028,8 +935,8 @@ func (ic *GenericController) createServers(data []*extensions.Ingress, // configure default location, alias, and SSL for _, ing := range data { // setup server-alias based on annotations - aliasAnnotation := ic.annotations.Alias(ing) - srvsnippet := ic.annotations.ServerSnippet(ing) + aliasAnnotation := n.annotations.Alias(ing) + srvsnippet := n.annotations.ServerSnippet(ing) for _, rule := range ing.Spec.Rules { host := rule.Host @@ -1095,7 +1002,7 @@ func (ic *GenericController) createServers(data []*extensions.Ingress, } key := fmt.Sprintf("%v/%v", ing.Namespace, tlsSecretName) - bc, exists := ic.sslCertTracker.Get(key) + bc, exists := n.sslCertTracker.Get(key) if !exists { glog.Warningf("ssl certificate \"%v\" does not exist in local store", key) continue @@ -1130,7 +1037,7 @@ func (ic *GenericController) createServers(data []*extensions.Ingress, } // getEndpoints returns a list of : for a given service/target port combination. -func (ic *GenericController) getEndpoints( +func (n *NGINXController) getEndpoints( s *apiv1.Service, servicePort *apiv1.ServicePort, proto apiv1.Protocol, @@ -1171,7 +1078,7 @@ func (ic *GenericController) getEndpoints( } glog.V(3).Infof("getting endpoints for service %v/%v and port %v", s.Namespace, s.Name, servicePort.String()) - ep, err := ic.listers.Endpoint.GetServiceEndpoints(s) + ep, err := n.listers.Endpoint.GetServiceEndpoints(s) if err != nil { glog.Warningf("unexpected error obtaining service endpoints: %v", err) return upsServers @@ -1221,99 +1128,32 @@ func (ic *GenericController) getEndpoints( } // readSecrets extracts information about secrets from an Ingress rule -func (ic *GenericController) readSecrets(ing *extensions.Ingress) { +func (n *NGINXController) readSecrets(ing *extensions.Ingress) { for _, tls := range ing.Spec.TLS { if tls.SecretName == "" { continue } key := fmt.Sprintf("%v/%v", ing.Namespace, tls.SecretName) - ic.syncSecret(key) + n.syncSecret(key) } key, _ := parser.GetStringAnnotation("ingress.kubernetes.io/auth-tls-secret", ing) if key == "" { return } - ic.syncSecret(key) -} - -// Stop stops the loadbalancer controller. -func (ic GenericController) Stop() error { - ic.stopLock.Lock() - defer ic.stopLock.Unlock() - - // Only try draining the workqueue if we haven't already. - if !ic.syncQueue.IsShuttingDown() { - glog.Infof("shutting down controller queues") - close(ic.stopCh) - go ic.syncQueue.Shutdown() - if ic.syncStatus != nil { - ic.syncStatus.Shutdown() - } - return nil - } - - return fmt.Errorf("shutdown already in progress") -} - -// Start starts the Ingress controller. 
-func (ic *GenericController) Start() { - glog.Infof("starting Ingress controller") - - ic.cacheController.Run(ic.stopCh) - - createDefaultSSLCertificate() - - time.Sleep(5 * time.Second) - // initial sync of secrets to avoid unnecessary reloads - glog.Info("running initial sync of secrets") - for _, obj := range ic.listers.Ingress.List() { - ing := obj.(*extensions.Ingress) - - if !class.IsValid(ing, ic.cfg.IngressClass, ic.cfg.DefaultIngressClass) { - a, _ := parser.GetStringAnnotation(class.IngressKey, ing) - glog.Infof("ignoring add for ingress %v based on annotation %v with value %v", ing.Name, class.IngressKey, a) - continue - } - - ic.readSecrets(ing) - } - - go ic.syncQueue.Run(time.Second, ic.stopCh) - - if ic.syncStatus != nil { - go ic.syncStatus.Run(ic.stopCh) - } - - go wait.Until(ic.checkMissingSecrets, 30*time.Second, ic.stopCh) - - // force initial sync - ic.syncQueue.Enqueue(&extensions.Ingress{}) - - <-ic.stopCh + n.syncSecret(key) } -func (ic *GenericController) isForceReload() bool { - return atomic.LoadInt32(&ic.forceReload) != 0 +func (n *NGINXController) isForceReload() bool { + return atomic.LoadInt32(&n.forceReload) != 0 } -func (ic *GenericController) SetForceReload(shouldReload bool) { +func (n *NGINXController) SetForceReload(shouldReload bool) { if shouldReload { - atomic.StoreInt32(&ic.forceReload, 1) - ic.syncQueue.Enqueue(&extensions.Ingress{}) + atomic.StoreInt32(&n.forceReload, 1) + n.syncQueue.Enqueue(&extensions.Ingress{}) } else { - atomic.StoreInt32(&ic.forceReload, 0) - } -} - -func createDefaultSSLCertificate() { - defCert, defKey := ssl.GetFakeSSLCert() - c, err := ssl.AddOrUpdateCertAndKey(fakeCertificate, defCert, defKey, []byte{}) - if err != nil { - glog.Fatalf("Error generating self signed certificate: %v", err) + atomic.StoreInt32(&n.forceReload, 0) } - - fakeCertificateSHA = c.PemSHA - fakeCertificatePath = c.PemFileName } diff --git a/pkg/ingress/controller/launch.go b/pkg/ingress/controller/launch.go deleted file mode 100644 index d7b6aef0fb..0000000000 --- a/pkg/ingress/controller/launch.go +++ /dev/null @@ -1,335 +0,0 @@ -package controller - -import ( - "encoding/json" - "flag" - "fmt" - "net/http" - "net/http/pprof" - "os" - "strings" - "syscall" - "time" - - "github.com/golang/glog" - "github.com/prometheus/client_golang/prometheus/promhttp" - "github.com/spf13/pflag" - - apiv1 "k8s.io/api/core/v1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apiserver/pkg/server/healthz" - "k8s.io/client-go/kubernetes" - "k8s.io/client-go/rest" - "k8s.io/client-go/tools/clientcmd" - clientcmdapi "k8s.io/client-go/tools/clientcmd/api" - - "k8s.io/ingress-nginx/pkg/ingress" - "k8s.io/ingress-nginx/pkg/k8s" -) - -// NewIngressController returns a configured Ingress controller -func NewIngressController(backend ingress.Controller) *GenericController { - var ( - flags = pflag.NewFlagSet("", pflag.ExitOnError) - - apiserverHost = flags.String("apiserver-host", "", "The address of the Kubernetes Apiserver "+ - "to connect to in the format of protocol://address:port, e.g., "+ - "http://localhost:8080. If not specified, the assumption is that the binary runs inside a "+ - "Kubernetes cluster and local discovery is attempted.") - kubeConfigFile = flags.String("kubeconfig", "", "Path to kubeconfig file with authorization and master location information.") - - defaultSvc = flags.String("default-backend-service", "", - `Service used to serve a 404 page for the default backend. Takes the form - namespace/name. 
The controller uses the first node port of this Service for - the default backend.`) - - ingressClass = flags.String("ingress-class", "", - `Name of the ingress class to route through this controller.`) - - configMap = flags.String("configmap", "", - `Name of the ConfigMap that contains the custom configuration to use`) - - publishSvc = flags.String("publish-service", "", - `Service fronting the ingress controllers. Takes the form - namespace/name. The controller will set the endpoint records on the - ingress objects to reflect those on the service.`) - - tcpConfigMapName = flags.String("tcp-services-configmap", "", - `Name of the ConfigMap that contains the definition of the TCP services to expose. - The key in the map indicates the external port to be used. The value is the name of the - service with the format namespace/serviceName and the port of the service could be a - number of the name of the port. - The ports 80 and 443 are not allowed as external ports. This ports are reserved for the backend`) - - udpConfigMapName = flags.String("udp-services-configmap", "", - `Name of the ConfigMap that contains the definition of the UDP services to expose. - The key in the map indicates the external port to be used. The value is the name of the - service with the format namespace/serviceName and the port of the service could be a - number of the name of the port.`) - - resyncPeriod = flags.Duration("sync-period", 600*time.Second, - `Relist and confirm cloud resources this often. Default is 10 minutes`) - - watchNamespace = flags.String("watch-namespace", apiv1.NamespaceAll, - `Namespace to watch for Ingress. Default is to watch all namespaces`) - - healthzPort = flags.Int("healthz-port", 10254, "port for healthz endpoint.") - - profiling = flags.Bool("profiling", true, `Enable profiling via web interface host:port/debug/pprof/`) - - defSSLCertificate = flags.String("default-ssl-certificate", "", `Name of the secret - that contains a SSL certificate to be used as default for a HTTPS catch-all server`) - - defHealthzURL = flags.String("health-check-path", "/healthz", `Defines - the URL to be used as health check inside in the default server in NGINX.`) - - updateStatus = flags.Bool("update-status", true, `Indicates if the - ingress controller should update the Ingress status IP/hostname. Default is true`) - - electionID = flags.String("election-id", "ingress-controller-leader", `Election id to use for status update.`) - - forceIsolation = flags.Bool("force-namespace-isolation", false, - `Force namespace isolation. This flag is required to avoid the reference of secrets or - configmaps located in a different namespace than the specified in the flag --watch-namespace.`) - - disableNodeList = flags.Bool("disable-node-list", false, - `Disable querying nodes. If --force-namespace-isolation is true, this should also be set.`) - - updateStatusOnShutdown = flags.Bool("update-status-on-shutdown", true, `Indicates if the - ingress controller should update the Ingress status IP/hostname when the controller - is being stopped. 
Default is true`) - - sortBackends = flags.Bool("sort-backends", false, - `Defines if backends and it's endpoints should be sorted`) - - useNodeInternalIP = flags.Bool("report-node-internal-ip-address", false, - `Defines if the nodes IP address to be returned in the ingress status should be the internal instead of the external IP address`) - - showVersion = flags.Bool("version", false, - `Shows release information about the NGINX Ingress controller`) - ) - - flags.AddGoFlagSet(flag.CommandLine) - backend.ConfigureFlags(flags) - flags.Parse(os.Args) - // Workaround for this issue: - // https://github.com/kubernetes/kubernetes/issues/17162 - flag.CommandLine.Parse([]string{}) - - if *showVersion { - fmt.Println(backend.Info().String()) - os.Exit(0) - } - - backend.OverrideFlags(flags) - - flag.Set("logtostderr", "true") - - glog.Info(backend.Info()) - - if *ingressClass != "" { - glog.Infof("Watching for ingress class: %s", *ingressClass) - } - - if *defaultSvc == "" { - glog.Fatalf("Please specify --default-backend-service") - } - - kubeClient, err := createApiserverClient(*apiserverHost, *kubeConfigFile) - if err != nil { - handleFatalInitError(err) - } - - ns, name, err := k8s.ParseNameNS(*defaultSvc) - if err != nil { - glog.Fatalf("invalid format for service %v: %v", *defaultSvc, err) - } - - _, err = kubeClient.Core().Services(ns).Get(name, metav1.GetOptions{}) - if err != nil { - if strings.Contains(err.Error(), "cannot get services in the namespace") { - glog.Fatalf("✖ It seems the cluster it is running with Authorization enabled (like RBAC) and there is no permissions for the ingress controller. Please check the configuration") - } - glog.Fatalf("no service with name %v found: %v", *defaultSvc, err) - } - glog.Infof("validated %v as the default backend", *defaultSvc) - - if *publishSvc != "" { - ns, name, err := k8s.ParseNameNS(*publishSvc) - if err != nil { - glog.Fatalf("invalid service format: %v", err) - } - - svc, err := kubeClient.CoreV1().Services(ns).Get(name, metav1.GetOptions{}) - if err != nil { - glog.Fatalf("unexpected error getting information about service %v: %v", *publishSvc, err) - } - - if len(svc.Status.LoadBalancer.Ingress) == 0 { - if len(svc.Spec.ExternalIPs) > 0 { - glog.Infof("service %v validated as assigned with externalIP", *publishSvc) - } else { - // We could poll here, but we instead just exit and rely on k8s to restart us - glog.Fatalf("service %s does not (yet) have ingress points", *publishSvc) - } - } else { - glog.Infof("service %v validated as source of Ingress status", *publishSvc) - } - } - - if *watchNamespace != "" { - _, err = kubeClient.CoreV1().Namespaces().Get(*watchNamespace, metav1.GetOptions{}) - if err != nil { - glog.Fatalf("no watchNamespace with name %v found: %v", *watchNamespace, err) - } - } - - if resyncPeriod.Seconds() < 10 { - glog.Fatalf("resync period (%vs) is too low", resyncPeriod.Seconds()) - } - - err = os.MkdirAll(ingress.DefaultSSLDirectory, 0655) - if err != nil { - glog.Errorf("Failed to mkdir SSL directory: %v", err) - } - - config := &Configuration{ - UpdateStatus: *updateStatus, - ElectionID: *electionID, - Client: kubeClient, - ResyncPeriod: *resyncPeriod, - DefaultService: *defaultSvc, - IngressClass: *ingressClass, - DefaultIngressClass: backend.DefaultIngressClass(), - Namespace: *watchNamespace, - ConfigMapName: *configMap, - TCPConfigMapName: *tcpConfigMapName, - UDPConfigMapName: *udpConfigMapName, - DefaultSSLCertificate: *defSSLCertificate, - DefaultHealthzURL: *defHealthzURL, - PublishService: 
*publishSvc, - Backend: backend, - ForceNamespaceIsolation: *forceIsolation, - DisableNodeList: *disableNodeList, - UpdateStatusOnShutdown: *updateStatusOnShutdown, - SortBackends: *sortBackends, - UseNodeInternalIP: *useNodeInternalIP, - } - - ic := newIngressController(config) - go registerHandlers(*profiling, *healthzPort, ic) - return ic -} - -func registerHandlers(enableProfiling bool, port int, ic *GenericController) { - mux := http.NewServeMux() - // expose health check endpoint (/healthz) - healthz.InstallHandler(mux, - healthz.PingHealthz, - ic.cfg.Backend, - ) - - mux.Handle("/metrics", promhttp.Handler()) - - mux.HandleFunc("/build", func(w http.ResponseWriter, r *http.Request) { - w.WriteHeader(http.StatusOK) - b, _ := json.Marshal(ic.Info()) - w.Write(b) - }) - - mux.HandleFunc("/stop", func(w http.ResponseWriter, r *http.Request) { - err := syscall.Kill(syscall.Getpid(), syscall.SIGTERM) - if err != nil { - glog.Errorf("unexpected error: %v", err) - } - }) - - if enableProfiling { - mux.HandleFunc("/debug/pprof/", pprof.Index) - mux.HandleFunc("/debug/pprof/profile", pprof.Profile) - mux.HandleFunc("/debug/pprof/symbol", pprof.Symbol) - mux.HandleFunc("/debug/pprof/trace", pprof.Trace) - } - - server := &http.Server{ - Addr: fmt.Sprintf(":%v", port), - Handler: mux, - } - glog.Fatal(server.ListenAndServe()) -} - -const ( - // High enough QPS to fit all expected use cases. QPS=0 is not set here, because - // client code is overriding it. - defaultQPS = 1e6 - // High enough Burst to fit all expected use cases. Burst=0 is not set here, because - // client code is overriding it. - defaultBurst = 1e6 -) - -// buildConfigFromFlags builds REST config based on master URL and kubeconfig path. -// If both of them are empty then in cluster config is used. -func buildConfigFromFlags(masterURL, kubeconfigPath string) (*rest.Config, error) { - if kubeconfigPath == "" && masterURL == "" { - kubeconfig, err := rest.InClusterConfig() - if err != nil { - return nil, err - } - - return kubeconfig, nil - } - - return clientcmd.NewNonInteractiveDeferredLoadingClientConfig( - &clientcmd.ClientConfigLoadingRules{ExplicitPath: kubeconfigPath}, - &clientcmd.ConfigOverrides{ - ClusterInfo: clientcmdapi.Cluster{ - Server: masterURL, - }, - }).ClientConfig() -} - -// createApiserverClient creates new Kubernetes Apiserver client. When kubeconfig or apiserverHost param is empty -// the function assumes that it is running inside a Kubernetes cluster and attempts to -// discover the Apiserver. Otherwise, it connects to the Apiserver specified. -// -// apiserverHost param is in the format of protocol://address:port/pathPrefix, e.g.http://localhost:8001. -// kubeConfig location of kubeconfig file -func createApiserverClient(apiserverHost string, kubeConfig string) (*kubernetes.Clientset, error) { - cfg, err := buildConfigFromFlags(apiserverHost, kubeConfig) - if err != nil { - return nil, err - } - - cfg.QPS = defaultQPS - cfg.Burst = defaultBurst - cfg.ContentType = "application/vnd.kubernetes.protobuf" - - glog.Infof("Creating API client for %s", cfg.Host) - - client, err := kubernetes.NewForConfig(cfg) - if err != nil { - return nil, err - } - - v, err := client.Discovery().ServerVersion() - if err != nil { - return nil, err - } - - glog.Infof("Running in Kubernetes Cluster version v%v.%v (%v) - git (%v) commit %v - platform %v", - v.Major, v.Minor, v.GitVersion, v.GitTreeState, v.GitCommit, v.Platform) - - return client, nil -} - -/** - * Handles fatal init error that prevents server from doing any work. 
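For reference, the handler wiring removed from launch.go above (healthz, Prometheus metrics, /build, /stop and the optional pprof endpoints) follows a standard net/http mux pattern. Below is a minimal, self-contained sketch of that pattern; the plain /healthz handler and the port value are illustrative stand-ins, not the controller's actual healthz.InstallHandler wiring.

package main

import (
	"fmt"
	"log"
	"net/http"
	"net/http/pprof"

	"github.com/prometheus/client_golang/prometheus/promhttp"
)

// registerHandlers exposes a liveness endpoint, Prometheus metrics and,
// optionally, the Go profiler on a single mux, mirroring the pattern used
// by the ingress controller.
func registerHandlers(enableProfiling bool, port int) {
	mux := http.NewServeMux()

	// plain liveness probe; the controller installs richer healthz checks
	mux.HandleFunc("/healthz", func(w http.ResponseWriter, r *http.Request) {
		w.WriteHeader(http.StatusOK)
		w.Write([]byte("ok"))
	})

	// Prometheus metrics endpoint
	mux.Handle("/metrics", promhttp.Handler())

	if enableProfiling {
		mux.HandleFunc("/debug/pprof/", pprof.Index)
		mux.HandleFunc("/debug/pprof/profile", pprof.Profile)
		mux.HandleFunc("/debug/pprof/symbol", pprof.Symbol)
		mux.HandleFunc("/debug/pprof/trace", pprof.Trace)
	}

	server := &http.Server{
		Addr:    fmt.Sprintf(":%v", port),
		Handler: mux,
	}
	log.Fatal(server.ListenAndServe())
}

func main() {
	// 10254 matches the controller's default healthz port; any free port works here
	registerHandlers(true, 10254)
}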
Prints verbose error - * message and quits the server. - */ -func handleFatalInitError(err error) { - glog.Fatalf("Error while initializing connection to Kubernetes apiserver. "+ - "This most likely means that the cluster is misconfigured (e.g., it has "+ - "invalid apiserver certificates or service accounts configuration). Reason: %s\n"+ - "Refer to the troubleshooting guide for more information: "+ - "https://github.com/kubernetes/ingress-nginx/blob/master/docs/troubleshooting.md", err) -} diff --git a/pkg/ingress/controller/listers.go b/pkg/ingress/controller/listers.go index a2da1ab1f9..49cb089873 100644 --- a/pkg/ingress/controller/listers.go +++ b/pkg/ingress/controller/listers.go @@ -64,20 +64,21 @@ func (c *cacheController) Run(stopCh chan struct{}) { } } -func (ic *GenericController) createListers(disableNodeLister bool) (*ingress.StoreLister, *cacheController) { +func (n *NGINXController) createListers(disableNodeLister bool, stopCh chan struct{}) *ingress.StoreLister { // from here to the end of the method all the code is just boilerplate // required to watch Ingress, Secrets, ConfigMaps and Endoints. // This is used to detect new content, updates or removals and act accordingly ingEventHandler := cache.ResourceEventHandlerFuncs{ AddFunc: func(obj interface{}) { addIng := obj.(*extensions.Ingress) - if !class.IsValid(addIng, ic.cfg.IngressClass, ic.cfg.DefaultIngressClass) { + if !class.IsValid(addIng, n.cfg.IngressClass, defIngressClass) { a, _ := parser.GetStringAnnotation(class.IngressKey, addIng) glog.Infof("ignoring add for ingress %v based on annotation %v with value %v", addIng.Name, class.IngressKey, a) return } - ic.recorder.Eventf(addIng, apiv1.EventTypeNormal, "CREATE", fmt.Sprintf("Ingress %s/%s", addIng.Namespace, addIng.Name)) - ic.syncQueue.Enqueue(obj) + + n.recorder.Eventf(addIng, apiv1.EventTypeNormal, "CREATE", fmt.Sprintf("Ingress %s/%s", addIng.Namespace, addIng.Name)) + n.syncQueue.Enqueue(obj) }, DeleteFunc: func(obj interface{}) { delIng, ok := obj.(*extensions.Ingress) @@ -94,29 +95,29 @@ func (ic *GenericController) createListers(disableNodeLister bool) (*ingress.Sto return } } - if !class.IsValid(delIng, ic.cfg.IngressClass, ic.cfg.DefaultIngressClass) { + if !class.IsValid(delIng, n.cfg.IngressClass, defIngressClass) { glog.Infof("ignoring delete for ingress %v based on annotation %v", delIng.Name, class.IngressKey) return } - ic.recorder.Eventf(delIng, apiv1.EventTypeNormal, "DELETE", fmt.Sprintf("Ingress %s/%s", delIng.Namespace, delIng.Name)) - ic.syncQueue.Enqueue(obj) + n.recorder.Eventf(delIng, apiv1.EventTypeNormal, "DELETE", fmt.Sprintf("Ingress %s/%s", delIng.Namespace, delIng.Name)) + n.syncQueue.Enqueue(obj) }, UpdateFunc: func(old, cur interface{}) { oldIng := old.(*extensions.Ingress) curIng := cur.(*extensions.Ingress) - validOld := class.IsValid(oldIng, ic.cfg.IngressClass, ic.cfg.DefaultIngressClass) - validCur := class.IsValid(curIng, ic.cfg.IngressClass, ic.cfg.DefaultIngressClass) + validOld := class.IsValid(oldIng, n.cfg.IngressClass, defIngressClass) + validCur := class.IsValid(curIng, n.cfg.IngressClass, defIngressClass) if !validOld && validCur { glog.Infof("creating ingress %v based on annotation %v", curIng.Name, class.IngressKey) - ic.recorder.Eventf(curIng, apiv1.EventTypeNormal, "CREATE", fmt.Sprintf("Ingress %s/%s", curIng.Namespace, curIng.Name)) + n.recorder.Eventf(curIng, apiv1.EventTypeNormal, "CREATE", fmt.Sprintf("Ingress %s/%s", curIng.Namespace, curIng.Name)) } else if validOld && !validCur { glog.Infof("removing 
ingress %v based on annotation %v", curIng.Name, class.IngressKey) - ic.recorder.Eventf(curIng, apiv1.EventTypeNormal, "DELETE", fmt.Sprintf("Ingress %s/%s", curIng.Namespace, curIng.Name)) + n.recorder.Eventf(curIng, apiv1.EventTypeNormal, "DELETE", fmt.Sprintf("Ingress %s/%s", curIng.Namespace, curIng.Name)) } else if validCur && !reflect.DeepEqual(old, cur) { - ic.recorder.Eventf(curIng, apiv1.EventTypeNormal, "UPDATE", fmt.Sprintf("Ingress %s/%s", curIng.Namespace, curIng.Name)) + n.recorder.Eventf(curIng, apiv1.EventTypeNormal, "UPDATE", fmt.Sprintf("Ingress %s/%s", curIng.Namespace, curIng.Name)) } - ic.syncQueue.Enqueue(cur) + n.syncQueue.Enqueue(cur) }, } @@ -125,7 +126,7 @@ func (ic *GenericController) createListers(disableNodeLister bool) (*ingress.Sto if !reflect.DeepEqual(old, cur) { sec := cur.(*apiv1.Secret) key := fmt.Sprintf("%v/%v", sec.Namespace, sec.Name) - ic.syncSecret(key) + n.syncSecret(key) } }, DeleteFunc: func(obj interface{}) { @@ -144,23 +145,23 @@ func (ic *GenericController) createListers(disableNodeLister bool) (*ingress.Sto } } key := fmt.Sprintf("%v/%v", sec.Namespace, sec.Name) - ic.sslCertTracker.DeleteAll(key) - ic.syncQueue.Enqueue(key) + n.sslCertTracker.DeleteAll(key) + n.syncQueue.Enqueue(key) }, } eventHandler := cache.ResourceEventHandlerFuncs{ AddFunc: func(obj interface{}) { - ic.syncQueue.Enqueue(obj) + n.syncQueue.Enqueue(obj) }, DeleteFunc: func(obj interface{}) { - ic.syncQueue.Enqueue(obj) + n.syncQueue.Enqueue(obj) }, UpdateFunc: func(old, cur interface{}) { oep := old.(*apiv1.Endpoints) ocur := cur.(*apiv1.Endpoints) if !reflect.DeepEqual(ocur.Subsets, oep.Subsets) { - ic.syncQueue.Enqueue(cur) + n.syncQueue.Enqueue(cur) } }, } @@ -169,33 +170,33 @@ func (ic *GenericController) createListers(disableNodeLister bool) (*ingress.Sto AddFunc: func(obj interface{}) { upCmap := obj.(*apiv1.ConfigMap) mapKey := fmt.Sprintf("%s/%s", upCmap.Namespace, upCmap.Name) - if mapKey == ic.cfg.ConfigMapName { + if mapKey == n.cfg.ConfigMapName { glog.V(2).Infof("adding configmap %v to backend", mapKey) - ic.cfg.Backend.SetConfig(upCmap) - ic.SetForceReload(true) + n.SetConfig(upCmap) + n.SetForceReload(true) } }, UpdateFunc: func(old, cur interface{}) { if !reflect.DeepEqual(old, cur) { upCmap := cur.(*apiv1.ConfigMap) mapKey := fmt.Sprintf("%s/%s", upCmap.Namespace, upCmap.Name) - if mapKey == ic.cfg.ConfigMapName { + if mapKey == n.cfg.ConfigMapName { glog.V(2).Infof("updating configmap backend (%v)", mapKey) - ic.cfg.Backend.SetConfig(upCmap) - ic.SetForceReload(true) + n.SetConfig(upCmap) + n.SetForceReload(true) } // updates to configuration configmaps can trigger an update - if mapKey == ic.cfg.ConfigMapName || mapKey == ic.cfg.TCPConfigMapName || mapKey == ic.cfg.UDPConfigMapName { - ic.recorder.Eventf(upCmap, apiv1.EventTypeNormal, "UPDATE", fmt.Sprintf("ConfigMap %v", mapKey)) - ic.syncQueue.Enqueue(cur) + if mapKey == n.cfg.ConfigMapName || mapKey == n.cfg.TCPConfigMapName || mapKey == n.cfg.UDPConfigMapName { + n.recorder.Eventf(upCmap, apiv1.EventTypeNormal, "UPDATE", fmt.Sprintf("ConfigMap %v", mapKey)) + n.syncQueue.Enqueue(cur) } } }, } watchNs := apiv1.NamespaceAll - if ic.cfg.ForceNamespaceIsolation && ic.cfg.Namespace != apiv1.NamespaceAll { - watchNs = ic.cfg.Namespace + if n.cfg.ForceNamespaceIsolation && n.cfg.Namespace != apiv1.NamespaceAll { + watchNs = n.cfg.Namespace } lister := &ingress.StoreLister{} @@ -203,34 +204,36 @@ func (ic *GenericController) createListers(disableNodeLister bool) (*ingress.Sto controller := 
&cacheController{} lister.Ingress.Store, controller.Ingress = cache.NewInformer( - cache.NewListWatchFromClient(ic.cfg.Client.ExtensionsV1beta1().RESTClient(), "ingresses", ic.cfg.Namespace, fields.Everything()), - &extensions.Ingress{}, ic.cfg.ResyncPeriod, ingEventHandler) + cache.NewListWatchFromClient(n.cfg.Client.ExtensionsV1beta1().RESTClient(), "ingresses", n.cfg.Namespace, fields.Everything()), + &extensions.Ingress{}, n.cfg.ResyncPeriod, ingEventHandler) lister.Endpoint.Store, controller.Endpoint = cache.NewInformer( - cache.NewListWatchFromClient(ic.cfg.Client.CoreV1().RESTClient(), "endpoints", ic.cfg.Namespace, fields.Everything()), - &apiv1.Endpoints{}, ic.cfg.ResyncPeriod, eventHandler) + cache.NewListWatchFromClient(n.cfg.Client.CoreV1().RESTClient(), "endpoints", n.cfg.Namespace, fields.Everything()), + &apiv1.Endpoints{}, n.cfg.ResyncPeriod, eventHandler) lister.Secret.Store, controller.Secret = cache.NewInformer( - cache.NewListWatchFromClient(ic.cfg.Client.CoreV1().RESTClient(), "secrets", watchNs, fields.Everything()), - &apiv1.Secret{}, ic.cfg.ResyncPeriod, secrEventHandler) + cache.NewListWatchFromClient(n.cfg.Client.CoreV1().RESTClient(), "secrets", watchNs, fields.Everything()), + &apiv1.Secret{}, n.cfg.ResyncPeriod, secrEventHandler) lister.ConfigMap.Store, controller.Configmap = cache.NewInformer( - cache.NewListWatchFromClient(ic.cfg.Client.CoreV1().RESTClient(), "configmaps", watchNs, fields.Everything()), - &apiv1.ConfigMap{}, ic.cfg.ResyncPeriod, mapEventHandler) + cache.NewListWatchFromClient(n.cfg.Client.CoreV1().RESTClient(), "configmaps", watchNs, fields.Everything()), + &apiv1.ConfigMap{}, n.cfg.ResyncPeriod, mapEventHandler) lister.Service.Store, controller.Service = cache.NewInformer( - cache.NewListWatchFromClient(ic.cfg.Client.CoreV1().RESTClient(), "services", ic.cfg.Namespace, fields.Everything()), - &apiv1.Service{}, ic.cfg.ResyncPeriod, cache.ResourceEventHandlerFuncs{}) + cache.NewListWatchFromClient(n.cfg.Client.CoreV1().RESTClient(), "services", n.cfg.Namespace, fields.Everything()), + &apiv1.Service{}, n.cfg.ResyncPeriod, cache.ResourceEventHandlerFuncs{}) var nodeListerWatcher cache.ListerWatcher if disableNodeLister { nodeListerWatcher = fcache.NewFakeControllerSource() } else { - nodeListerWatcher = cache.NewListWatchFromClient(ic.cfg.Client.CoreV1().RESTClient(), "nodes", apiv1.NamespaceAll, fields.Everything()) + nodeListerWatcher = cache.NewListWatchFromClient(n.cfg.Client.CoreV1().RESTClient(), "nodes", apiv1.NamespaceAll, fields.Everything()) } lister.Node.Store, controller.Node = cache.NewInformer( nodeListerWatcher, - &apiv1.Node{}, ic.cfg.ResyncPeriod, cache.ResourceEventHandlerFuncs{}) + &apiv1.Node{}, n.cfg.ResyncPeriod, cache.ResourceEventHandlerFuncs{}) + + controller.Run(n.stopCh) - return lister, controller + return lister } diff --git a/pkg/nginx/metric/collector/nginx.go b/pkg/ingress/controller/metric/collector/nginx.go similarity index 100% rename from pkg/nginx/metric/collector/nginx.go rename to pkg/ingress/controller/metric/collector/nginx.go diff --git a/pkg/nginx/metric/collector/process.go b/pkg/ingress/controller/metric/collector/process.go similarity index 100% rename from pkg/nginx/metric/collector/process.go rename to pkg/ingress/controller/metric/collector/process.go diff --git a/pkg/nginx/metric/collector/scrape.go b/pkg/ingress/controller/metric/collector/scrape.go similarity index 100% rename from pkg/nginx/metric/collector/scrape.go rename to pkg/ingress/controller/metric/collector/scrape.go diff --git 
a/pkg/nginx/metric/collector/status.go b/pkg/ingress/controller/metric/collector/status.go similarity index 100% rename from pkg/nginx/metric/collector/status.go rename to pkg/ingress/controller/metric/collector/status.go diff --git a/pkg/nginx/metric/collector/status_test.go b/pkg/ingress/controller/metric/collector/status_test.go similarity index 100% rename from pkg/nginx/metric/collector/status_test.go rename to pkg/ingress/controller/metric/collector/status_test.go diff --git a/pkg/nginx/metric/collector/vts.go b/pkg/ingress/controller/metric/collector/vts.go similarity index 100% rename from pkg/nginx/metric/collector/vts.go rename to pkg/ingress/controller/metric/collector/vts.go diff --git a/pkg/ingress/controller/metrics.go b/pkg/ingress/controller/metrics.go index d51845fec6..e46223444e 100644 --- a/pkg/ingress/controller/metrics.go +++ b/pkg/ingress/controller/metrics.go @@ -34,7 +34,6 @@ func init() { prometheus.MustRegister(reloadOperation) prometheus.MustRegister(reloadOperationErrors) prometheus.MustRegister(sslExpireTime) - } var ( @@ -74,11 +73,9 @@ func incReloadErrorCount() { } func setSSLExpireTime(servers []*ingress.Server) { - for _, s := range servers { if s.Hostname != defServerName { sslExpireTime.WithLabelValues(s.Hostname).Set(float64(s.SSLExpireTime.Unix())) } } - } diff --git a/pkg/nginx/controller/nginx.go b/pkg/ingress/controller/nginx.go similarity index 57% rename from pkg/nginx/controller/nginx.go rename to pkg/ingress/controller/nginx.go index 7d4e9e56cd..c41509456a 100644 --- a/pkg/nginx/controller/nginx.go +++ b/pkg/ingress/controller/nginx.go @@ -23,31 +23,36 @@ import ( "fmt" "io/ioutil" "net" - "net/http" "os" "os/exec" "strconv" "strings" + "sync" "syscall" "time" "github.com/golang/glog" - "github.com/mitchellh/go-ps" - "github.com/spf13/pflag" - proxyproto "github.com/armon/go-proxyproto" - "github.com/ncabatoff/process-exporter/proc" apiv1 "k8s.io/api/core/v1" extensions "k8s.io/api/extensions/v1beta1" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/client-go/kubernetes/scheme" + v1core "k8s.io/client-go/kubernetes/typed/core/v1" + "k8s.io/client-go/tools/record" + "k8s.io/client-go/util/flowcontrol" "k8s.io/ingress-nginx/pkg/ingress" - "k8s.io/ingress-nginx/pkg/ingress/controller" + "k8s.io/ingress-nginx/pkg/ingress/annotations/class" + "k8s.io/ingress-nginx/pkg/ingress/annotations/parser" + ngx_config "k8s.io/ingress-nginx/pkg/ingress/controller/config" + "k8s.io/ingress-nginx/pkg/ingress/controller/process" + ngx_template "k8s.io/ingress-nginx/pkg/ingress/controller/template" "k8s.io/ingress-nginx/pkg/ingress/defaults" + "k8s.io/ingress-nginx/pkg/ingress/status" + ing_net "k8s.io/ingress-nginx/pkg/net" "k8s.io/ingress-nginx/pkg/net/dns" "k8s.io/ingress-nginx/pkg/net/ssl" - "k8s.io/ingress-nginx/pkg/nginx/config" - ngx_template "k8s.io/ingress-nginx/pkg/nginx/template" - "k8s.io/ingress-nginx/version" + "k8s.io/ingress-nginx/pkg/task" ) type statusModule string @@ -57,8 +62,6 @@ const ( defaultStatusModule statusModule = "default" vtsStatusModule statusModule = "vts" - - defUpstreamName = "upstream-default-backend" ) var ( @@ -71,26 +74,66 @@ var ( // NewNGINXController creates a new NGINX Ingress controller. 
// If the environment variable NGINX_BINARY exists it will be used // as source for nginx commands -func NewNGINXController() *NGINXController { +func NewNGINXController(config *Configuration) *NGINXController { ngx := os.Getenv("NGINX_BINARY") if ngx == "" { ngx = nginxBinary } + eventBroadcaster := record.NewBroadcaster() + eventBroadcaster.StartLogging(glog.Infof) + eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{ + Interface: config.Client.CoreV1().Events(config.Namespace), + }) + h, err := dns.GetSystemNameServers() if err != nil { glog.Warningf("unexpected error reading system nameservers: %v", err) } n := &NGINXController{ + backendDefaults: ngx_config.NewDefault().Backend, binary: ngx, - configmap: &apiv1.ConfigMap{}, - isIPV6Enabled: isIPv6Enabled(), + + configmap: &apiv1.ConfigMap{}, + + isIPV6Enabled: ing_net.IsIPv6Enabled(), + resolver: h, - ports: &config.ListenPorts{}, - backendDefaults: config.NewDefault().Backend, + cfg: config, + sslCertTracker: newSSLCertTracker(), + syncRateLimiter: flowcontrol.NewTokenBucketRateLimiter(0.3, 1), + + recorder: eventBroadcaster.NewRecorder(scheme.Scheme, apiv1.EventSource{ + Component: "nginx-ingress-controller", + }), + + stopCh: make(chan struct{}), + stopLock: &sync.Mutex{}, } + n.stats = newStatsCollector(config.Namespace, config.IngressClass, n.binary, n.cfg.ListenPorts.Status) + + n.syncQueue = task.NewTaskQueue(n.syncIngress) + + n.listers = n.createListers(config.DisableNodeList, n.stopCh) + + if config.UpdateStatus { + n.syncStatus = status.NewStatusSyncer(status.Config{ + Client: config.Client, + PublishService: n.cfg.PublishService, + IngressLister: n.listers.Ingress, + ElectionID: config.ElectionID, + IngressClass: config.IngressClass, + DefaultIngressClass: config.DefaultIngressClass, + UpdateStatusOnShutdown: config.UpdateStatusOnShutdown, + UseNodeInternalIP: n.cfg.UseNodeInternalIP, + }) + } else { + glog.Warning("Update of ingress status is disabled (flag --update-status=false was specified)") + } + n.annotations = newAnnotationExtractor(n) + var onChange func() onChange = func() { template, err := ngx_template.NewTemplate(tmplPath, onChange) @@ -107,7 +150,7 @@ Error loading new template : %v n.t.Close() n.t = template glog.Info("new NGINX template loaded") - n.controller.SetForceReload(true) + n.SetForceReload(true) } ngxTpl, err := ngx_template.NewTemplate(tmplPath, onChange) @@ -122,8 +165,40 @@ Error loading new template : %v // NGINXController ... type NGINXController struct { - controller *controller.GenericController - t *ngx_template.Template + cfg *Configuration + + listers *ingress.StoreLister + + annotations annotationExtractor + + recorder record.EventRecorder + + syncQueue *task.Queue + + syncStatus status.Sync + + // local store of SSL certificates + // (only certificates used in ingress) + sslCertTracker *sslCertTracker + + syncRateLimiter flowcontrol.RateLimiter + + // stopLock is used to enforce only a single call to Stop is active. + // Needed because we allow stopping through an http endpoint and + // allowing concurrent stoppers leads to stack traces. 
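A note on the syncRateLimiter initialized above with flowcontrol.NewTokenBucketRateLimiter(0.3, 1): this is the standard client-go token bucket, and calling Accept() before each sync blocks until a token is available, which caps full synchronizations at roughly one every three seconds. A tiny usage sketch (the loop body is illustrative):

package main

import (
	"fmt"

	"k8s.io/client-go/util/flowcontrol"
)

func main() {
	// on average 0.3 operations per second with a burst of 1: at most one
	// full synchronization roughly every three seconds
	limiter := flowcontrol.NewTokenBucketRateLimiter(0.3, 1)

	for i := 0; i < 3; i++ {
		limiter.Accept() // blocks until a token is available
		fmt.Println("sync", i)
	}
}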
+ stopLock *sync.Mutex + + stopCh chan struct{} + + // ngxErrCh channel used to detect errors with the nginx processes + ngxErrCh chan error + + // runningConfig contains the running configuration in the Backend + runningConfig *ingress.Configuration + + forceReload int32 + + t *ngx_template.Template configmap *apiv1.ConfigMap @@ -132,8 +207,6 @@ type NGINXController struct { binary string resolver []net.IP - cmdArgs []string - stats *statsCollector statusModule statusModule @@ -141,25 +214,42 @@ type NGINXController struct { isIPV6Enabled bool // returns true if proxy protocol es enabled - isProxyProtocolEnabled bool + IsProxyProtocolEnabled bool isSSLPassthroughEnabled bool isShuttingDown bool - proxy *proxy - - ports *config.ListenPorts + Proxy *TCPProxy backendDefaults defaults.Backend } // Start start a new NGINX master process running in foreground. func (n *NGINXController) Start() { - n.isShuttingDown = false + glog.Infof("starting Ingress controller") + + // initial sync of secrets to avoid unnecessary reloads + glog.Info("running initial sync of secrets") + for _, obj := range n.listers.Ingress.List() { + ing := obj.(*extensions.Ingress) + + if !class.IsValid(ing, n.cfg.IngressClass, n.cfg.DefaultIngressClass) { + a, _ := parser.GetStringAnnotation(class.IngressKey, ing) + glog.Infof("ignoring add for ingress %v based on annotation %v with value %v", ing.Name, class.IngressKey, a) + continue + } + + n.readSecrets(ing) + } - n.controller = controller.NewIngressController(n) - go n.controller.Start() + go n.syncQueue.Run(time.Second, n.stopCh) + + if n.syncStatus != nil { + go n.syncStatus.Run(n.stopCh) + } + + go wait.Until(n.checkMissingSecrets, 30*time.Second, n.stopCh) done := make(chan error, 1) cmd := exec.Command(n.binary, "-c", cfgPath) @@ -172,68 +262,55 @@ func (n *NGINXController) Start() { } glog.Info("starting NGINX process...") - n.start(cmd, done) + n.start(cmd) - // if the nginx master process dies the workers continue to process requests, - // passing checks but in case of updates in ingress no updates will be - // reflected in the nginx configuration which can lead to confusion and report - // issues because of this behavior. - // To avoid this issue we restart nginx in case of errors. 
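The comment above explains why the controller respawns the NGINX master when it dies unexpectedly. As a self-contained sketch of that supervision pattern, the snippet below waits on an error channel fed by cmd.Wait() and restarts the process unless a stop channel has been closed; the command, arguments and fixed backoff are placeholders, not the controller's real invocation.

package main

import (
	"log"
	"os"
	"os/exec"
	"time"
)

// supervise starts the given command and restarts it whenever it exits,
// until stopCh is closed. It mirrors the respawn behaviour described above
// without the worker-draining and port-checking details.
func supervise(name string, args []string, stopCh <-chan struct{}) {
	errCh := make(chan error, 1)

	start := func() *exec.Cmd {
		cmd := exec.Command(name, args...)
		cmd.Stdout = os.Stdout
		cmd.Stderr = os.Stderr
		if err := cmd.Start(); err != nil {
			errCh <- err
			return cmd
		}
		go func() { errCh <- cmd.Wait() }()
		return cmd
	}

	cmd := start()
	for {
		select {
		case err := <-errCh:
			log.Printf("process exited (%v), restarting", err)
			if cmd.Process != nil {
				cmd.Process.Release()
			}
			time.Sleep(time.Second) // naive backoff, unlike the controller's port wait
			cmd = start()
		case <-stopCh:
			return
		}
	}
}

func main() {
	stopCh := make(chan struct{})
	go func() {
		time.Sleep(5 * time.Second)
		close(stopCh)
	}()
	supervise("sleep", []string{"1"}, stopCh)
}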
- for { - err := <-done + // force initial sync + n.syncQueue.Enqueue(&extensions.Ingress{}) - if n.isShuttingDown { - break - } - - if exitError, ok := err.(*exec.ExitError); ok { - waitStatus := exitError.Sys().(syscall.WaitStatus) - glog.Warningf(` -------------------------------------------------------------------------------- -NGINX master process died (%v): %v -------------------------------------------------------------------------------- -`, waitStatus.ExitStatus(), err) - } - cmd.Process.Release() - cmd = exec.Command(n.binary, "-c", cfgPath) - // we wait until the workers are killed - for { - conn, err := net.DialTimeout("tcp", "127.0.0.1:80", 1*time.Second) - if err != nil { + for { + select { + case err := <-done: + if n.isShuttingDown { break } - conn.Close() - // kill nginx worker processes - fs, err := proc.NewFS("/proc") - procs, _ := fs.FS.AllProcs() - for _, p := range procs { - pn, err := p.Comm() - if err != nil { - glog.Errorf("unexpected error obtaining process information: %v", err) - continue - } - if pn == "nginx" { - osp, err := os.FindProcess(p.PID) - if err != nil { - glog.Errorf("unexpected error obtaining process information: %v", err) - continue - } - osp.Signal(syscall.SIGQUIT) - } + // if the nginx master process dies the workers continue to process requests, + // passing checks but in case of updates in ingress no updates will be + // reflected in the nginx configuration which can lead to confusion and report + // issues because of this behavior. + // To avoid this issue we restart nginx in case of errors. + if process.IsRespawnIfRequired(err) { + process.WaitUntilPortIsAvailable(n.cfg.ListenPorts.HTTP) + // release command resources + cmd.Process.Release() + cmd = exec.Command(n.binary, "-c", cfgPath) + // start a new nginx master process if the controller is not being stopped + n.start(cmd) } - time.Sleep(100 * time.Millisecond) + case <-n.stopCh: + break } - // restart a new nginx master process if the controller - // is not being stopped - n.start(cmd, done) } } // Stop gracefully stops the NGINX master process. func (n *NGINXController) Stop() error { n.isShuttingDown = true - n.controller.Stop() + + n.stopLock.Lock() + defer n.stopLock.Unlock() + + // Only try draining the workqueue if we haven't already. 
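The shutdown path introduced around here takes a lock so only one Stop call proceeds, refuses to run twice, closes the stop channel, and then polls until the nginx process is gone. A reduced sketch of that shape, with a generic processRunning callback standing in for the process check the controller uses:

package main

import (
	"fmt"
	"sync"
	"time"
)

type shutdownExample struct {
	stopLock     sync.Mutex
	stopCh       chan struct{}
	shuttingDown bool
}

// Stop is safe to call from several goroutines: the lock and the flag make
// sure the stop channel is closed exactly once, and the ticker loop waits
// for the supervised process to disappear before returning.
func (c *shutdownExample) Stop(processRunning func() bool) error {
	c.stopLock.Lock()
	defer c.stopLock.Unlock()

	if c.shuttingDown {
		return fmt.Errorf("shutdown already in progress")
	}
	c.shuttingDown = true
	close(c.stopCh)

	ticker := time.NewTicker(time.Second)
	defer ticker.Stop()
	for range ticker.C {
		if !processRunning() {
			break
		}
	}
	return nil
}

func main() {
	c := &shutdownExample{stopCh: make(chan struct{})}
	deadline := time.Now().Add(2 * time.Second)
	if err := c.Stop(func() bool { return time.Now().Before(deadline) }); err != nil {
		fmt.Println(err)
	}
	fmt.Println("shutdown complete")
}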
+ if n.syncQueue.IsShuttingDown() { + return fmt.Errorf("shutdown already in progress") + } + + glog.Infof("shutting down controller queues") + close(n.stopCh) + go n.syncQueue.Shutdown() + if n.syncStatus != nil { + n.syncStatus.Shutdown() + } // Send stop signal to Nginx glog.Info("stopping NGINX process...") @@ -246,192 +323,42 @@ func (n *NGINXController) Stop() error { } // Wait for the Nginx process disappear - waitForNginxShutdown() - glog.Info("NGINX process has stopped") + timer := time.NewTicker(time.Second * 1) + for t := range timer.C { + glog.V(3).Infof("tick at", t) + if !process.IsNginxRunning() { + glog.Info("NGINX process has stopped") + timer.Stop() + break + } + } return nil } -func (n *NGINXController) start(cmd *exec.Cmd, done chan error) { +func (n *NGINXController) start(cmd *exec.Cmd) { cmd.Stdout = os.Stdout cmd.Stderr = os.Stderr if err := cmd.Start(); err != nil { glog.Fatalf("nginx error: %v", err) - done <- err + n.ngxErrCh <- err return } - n.cmdArgs = cmd.Args - go func() { - done <- cmd.Wait() + n.ngxErrCh <- cmd.Wait() }() } -// BackendDefaults returns the nginx defaults -func (n NGINXController) BackendDefaults() defaults.Backend { - return n.backendDefaults -} - -// printDiff returns the difference between the running configuration -// and the new one -func (n NGINXController) printDiff(data []byte) { - if !glog.V(2) { - return - } - - in, err := os.Open(cfgPath) - if err != nil { - return - } - src, err := ioutil.ReadAll(in) - in.Close() - if err != nil { - return - } - - if !bytes.Equal(src, data) { - tmpfile, err := ioutil.TempFile("", "nginx-cfg-diff") - if err != nil { - glog.Errorf("error creating temporal file: %s", err) - return - } - defer tmpfile.Close() - err = ioutil.WriteFile(tmpfile.Name(), data, 0644) - if err != nil { - return - } - - diffOutput, err := diff(src, data) - if err != nil { - glog.Errorf("error computing diff: %s", err) - return - } - - glog.Infof("NGINX configuration diff\n") - glog.Infof("%v", string(diffOutput)) - - os.Remove(tmpfile.Name()) - } -} - -// Info return build information -func (n NGINXController) Info() *ingress.BackendInfo { - return &ingress.BackendInfo{ - Name: "NGINX", - Release: version.RELEASE, - Build: version.COMMIT, - Repository: version.REPO, - } -} - // DefaultEndpoint returns the default endpoint to be use as default server that returns 404. func (n NGINXController) DefaultEndpoint() ingress.Endpoint { return ingress.Endpoint{ Address: "127.0.0.1", - Port: fmt.Sprintf("%v", n.ports.Default), + Port: fmt.Sprintf("%v", n.cfg.ListenPorts.Default), Target: &apiv1.ObjectReference{}, } } -// ConfigureFlags allow to configure more flags before the parsing of -// command line arguments -func (n *NGINXController) ConfigureFlags(flags *pflag.FlagSet) { - flags.BoolVar(&n.isSSLPassthroughEnabled, "enable-ssl-passthrough", false, `Enable SSL passthrough feature. 
Default is disabled`) - flags.IntVar(&n.ports.HTTP, "http-port", 80, `Indicates the port to use for HTTP traffic`) - flags.IntVar(&n.ports.HTTPS, "https-port", 443, `Indicates the port to use for HTTPS traffic`) - flags.IntVar(&n.ports.Status, "status-port", 18080, `Indicates the TCP port to use for exposing the nginx status page`) - flags.IntVar(&n.ports.SSLProxy, "ssl-passtrough-proxy-port", 442, `Default port to use internally for SSL when SSL Passthgough is enabled`) - flags.IntVar(&n.ports.Default, "default-server-port", 8181, `Default port to use for exposing the default server (catch all)`) -} - -// OverrideFlags customize NGINX controller flags -func (n *NGINXController) OverrideFlags(flags *pflag.FlagSet) { - // we check port collisions - if !isPortAvailable(n.ports.HTTP) { - glog.Fatalf("Port %v is already in use. Please check the flag --http-port", n.ports.HTTP) - } - if !isPortAvailable(n.ports.HTTPS) { - glog.Fatalf("Port %v is already in use. Please check the flag --https-port", n.ports.HTTPS) - } - if !isPortAvailable(n.ports.Status) { - glog.Fatalf("Port %v is already in use. Please check the flag --status-port", n.ports.Status) - } - if !isPortAvailable(n.ports.Default) { - glog.Fatalf("Port %v is already in use. Please check the flag --default-server-port", n.ports.Default) - } - - ic, _ := flags.GetString("ingress-class") - wc, _ := flags.GetString("watch-namespace") - - if ic == "" { - ic = defIngressClass - } - - if ic != defIngressClass { - glog.Warningf("only Ingress with class %v will be processed by this ingress controller", ic) - } - - flags.Set("ingress-class", ic) - - h, _ := flags.GetInt("healthz-port") - n.ports.Health = h - - n.stats = newStatsCollector(wc, ic, n.binary, n.ports.Status) - - if n.isSSLPassthroughEnabled { - if !isPortAvailable(n.ports.SSLProxy) { - glog.Fatalf("Port %v is already in use. Please check the flag --ssl-passtrough-proxy-port", n.ports.SSLProxy) - } - - glog.Info("starting TLS proxy for SSL passthrough") - n.proxy = &proxy{ - Default: &server{ - Hostname: "localhost", - IP: "127.0.0.1", - Port: n.ports.SSLProxy, - ProxyProtocol: true, - }, - } - - listener, err := net.Listen("tcp", fmt.Sprintf(":%v", n.ports.HTTPS)) - if err != nil { - glog.Fatalf("%v", err) - } - - proxyList := &proxyproto.Listener{Listener: listener} - - // start goroutine that accepts tcp connections in port 443 - go func() { - for { - var conn net.Conn - var err error - - if n.isProxyProtocolEnabled { - // we need to wrap the listener in order to decode - // proxy protocol before handling the connection - conn, err = proxyList.Accept() - } else { - conn, err = listener.Accept() - } - - if err != nil { - glog.Warningf("unexpected error accepting tcp connection: %v", err) - continue - } - - glog.V(3).Infof("remote address %s to local %s", conn.RemoteAddr(), conn.LocalAddr()) - go n.proxy.Handle(conn) - } - }() - } -} - -// DefaultIngressClass just return the default ingress class -func (n NGINXController) DefaultIngressClass() string { - return defIngressClass -} - // testTemplate checks if the NGINX configuration inside the byte array is valid // running the command "nginx -t" using a temporal file. 
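As an illustration of the validation described in the comment above, a candidate configuration can be written to a temporary file and checked with "nginx -t" before it ever replaces the live file. The sketch below assumes a path for the nginx binary and is not the controller's exact testTemplate implementation:

package main

import (
	"fmt"
	"io/ioutil"
	"os"
	"os/exec"
)

// testConfig writes cfg to a temporary file and asks nginx to validate it,
// returning the tool's combined output when validation fails.
func testConfig(nginxBinary string, cfg []byte) error {
	tmp, err := ioutil.TempFile("", "nginx-cfg")
	if err != nil {
		return err
	}
	defer os.Remove(tmp.Name())
	defer tmp.Close()

	if _, err := tmp.Write(cfg); err != nil {
		return err
	}

	out, err := exec.Command(nginxBinary, "-c", tmp.Name(), "-t").CombinedOutput()
	if err != nil {
		return fmt.Errorf("invalid configuration: %v\n%s", err, out)
	}
	return nil
}

func main() {
	// /usr/sbin/nginx is an assumption about where the binary lives
	err := testConfig("/usr/sbin/nginx", []byte("events {}\n"))
	fmt.Println(err)
}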
func (n NGINXController) testTemplate(cfg []byte) error { @@ -466,7 +393,7 @@ Error: %v // SetConfig sets the configured configmap func (n *NGINXController) SetConfig(cmap *apiv1.ConfigMap) { n.configmap = cmap - n.isProxyProtocolEnabled = false + n.IsProxyProtocolEnabled = false m := map[string]string{} if cmap != nil { @@ -477,7 +404,7 @@ func (n *NGINXController) SetConfig(cmap *apiv1.ConfigMap) { if ok { b, err := strconv.ParseBool(val) if err == nil { - n.isProxyProtocolEnabled = b + n.IsProxyProtocolEnabled = b } } @@ -494,16 +421,6 @@ func (n *NGINXController) SetConfig(cmap *apiv1.ConfigMap) { n.backendDefaults = c.Backend } -// SetListers sets the configured store listers in the generic ingress controller -func (n *NGINXController) SetListers(lister *ingress.StoreLister) { - n.storeLister = lister -} - -// UpdateIngressStatus custom Ingress status update -func (n *NGINXController) UpdateIngressStatus(*extensions.Ingress) []apiv1.LoadBalancerIngress { - return nil -} - // OnUpdate is called by syncQueue in https://github.com/kubernetes/ingress-nginx/blob/master/pkg/ingress/controller/controller.go#L426 // periodically to keep the configuration in sync. // @@ -516,7 +433,7 @@ func (n *NGINXController) OnUpdate(ingressCfg ingress.Configuration) error { cfg := ngx_template.ReadConfig(n.configmap.Data) cfg.Resolver = n.resolver - servers := []*server{} + servers := []*TCPServer{} for _, pb := range ingressCfg.PassthroughBackends { svc := pb.Service if svc == nil { @@ -541,7 +458,7 @@ func (n *NGINXController) OnUpdate(ingressCfg ingress.Configuration) error { } //TODO: Allow PassthroughBackends to specify they support proxy-protocol - servers = append(servers, &server{ + servers = append(servers, &TCPServer{ Hostname: pb.Hostname, IP: svc.Spec.ClusterIP, Port: port, @@ -550,7 +467,7 @@ func (n *NGINXController) OnUpdate(ingressCfg ingress.Configuration) error { } if n.isSSLPassthroughEnabled { - n.proxy.ServerList = servers + n.Proxy.ServerList = servers } // we need to check if the status module configuration changed @@ -671,7 +588,7 @@ func (n *NGINXController) OnUpdate(ingressCfg ingress.Configuration) error { cfg.SSLDHParam = sslDHParam - tc := config.TemplateConfig{ + tc := ngx_config.TemplateConfig{ ProxySetHeaders: setHeaders, AddHeaders: addHeaders, MaxOpenFiles: maxOpenFiles, @@ -687,8 +604,8 @@ func (n *NGINXController) OnUpdate(ingressCfg ingress.Configuration) error { IsIPV6Enabled: n.isIPV6Enabled && !cfg.DisableIpv6, RedirectServers: redirectServers, IsSSLPassthroughEnabled: n.isSSLPassthroughEnabled, - ListenPorts: n.ports, - PublishService: n.controller.GetPublishService(), + ListenPorts: n.cfg.ListenPorts, + PublishService: n.GetPublishService(), } content, err := n.t.Write(tc) @@ -702,7 +619,34 @@ func (n *NGINXController) OnUpdate(ingressCfg ingress.Configuration) error { return err } - n.printDiff(content) + if glog.V(2) { + src, _ := ioutil.ReadFile(cfgPath) + if !bytes.Equal(src, content) { + tmpfile, err := ioutil.TempFile("", "new-nginx-cfg") + if err != nil { + return err + } + defer tmpfile.Close() + err = ioutil.WriteFile(tmpfile.Name(), content, 0644) + if err != nil { + return err + } + + diffOutput, err := exec.Command("diff", "-u", cfgPath, tmpfile.Name()).CombinedOutput() + if err != nil { + return err + } + + glog.Infof("NGINX configuration diff\n") + glog.Infof("%v\n", string(diffOutput)) + + // Do not use defer to remove the temporal file. 
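The verbose-logging branch added above shells out to "diff -u" to show how the freshly rendered configuration differs from the file on disk. A standalone sketch of that idea follows; note that diff exits non-zero when the files differ, so the sketch deliberately ignores that error instead of treating it as a failure (file names and log calls are illustrative):

package main

import (
	"bytes"
	"io/ioutil"
	"log"
	"os"
	"os/exec"
)

// logConfigDiff prints a unified diff between the configuration currently on
// disk and the newly rendered content. The temporary file is removed only on
// success so a broken rendering can still be inspected by hand.
func logConfigDiff(cfgPath string, newContent []byte) {
	current, err := ioutil.ReadFile(cfgPath)
	if err != nil || bytes.Equal(current, newContent) {
		return
	}

	tmp, err := ioutil.TempFile("", "new-nginx-cfg")
	if err != nil {
		return
	}
	defer tmp.Close()

	if err := ioutil.WriteFile(tmp.Name(), newContent, 0644); err != nil {
		return
	}

	// diff exits with status 1 when the files differ, so the error is ignored
	out, _ := exec.Command("diff", "-u", cfgPath, tmp.Name()).CombinedOutput()
	log.Printf("NGINX configuration diff\n%s", out)

	os.Remove(tmp.Name())
}

func main() {
	cfg, err := ioutil.TempFile("", "running-cfg")
	if err != nil {
		log.Fatal(err)
	}
	defer os.Remove(cfg.Name())
	cfg.WriteString("worker_processes 1;\n")
	cfg.Close()

	logConfigDiff(cfg.Name(), []byte("worker_processes 4;\n"))
}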
+ // This is helpful when there is an error in the + // temporal configuration (we can manually inspect the file). + // Only remove the file when no error occured. + os.Remove(tmpfile.Name()) + } + } err = ioutil.WriteFile(cfgPath, content, 0644) if err != nil { @@ -727,46 +671,6 @@ func nginxHashBucketSize(longestString int) int { return nextPowerOf2(rawSize) } -// Name returns the healthcheck name -func (n NGINXController) Name() string { - return "Ingress Controller" -} - -// Check returns if the nginx healthz endpoint is returning ok (status code 200) -func (n NGINXController) Check(_ *http.Request) error { - res, err := http.Get(fmt.Sprintf("http://0.0.0.0:%v%v", n.ports.Status, ngxHealthPath)) - if err != nil { - return err - } - defer res.Body.Close() - if res.StatusCode != 200 { - return fmt.Errorf("ingress controller is not healthy") - } - - // check the nginx master process is running - fs, err := proc.NewFS("/proc") - if err != nil { - glog.Errorf("%v", err) - return err - } - f, err := ioutil.ReadFile("/run/nginx.pid") - if err != nil { - glog.Errorf("%v", err) - return err - } - pid, err := strconv.Atoi(strings.TrimRight(string(f), "\r\n")) - if err != nil { - return err - } - _, err = fs.NewProc(int(pid)) - if err != nil { - glog.Errorf("%v", err) - return err - } - - return nil -} - // http://graphics.stanford.edu/~seander/bithacks.html#RoundUpPowerOf2 // https://play.golang.org/p/TVSyCcdxUh func nextPowerOf2(v int) int { @@ -780,32 +684,3 @@ func nextPowerOf2(v int) int { return v } - -func isIPv6Enabled() bool { - cmd := exec.Command("test", "-f", "/proc/net/if_inet6") - return cmd.Run() == nil -} - -// isNginxRunning returns true if a process with the name 'nginx' is found -func isNginxProcessPresent() bool { - processes, _ := ps.Processes() - for _, p := range processes { - if p.Executable() == "nginx" { - return true - } - } - return false -} - -func waitForNginxShutdown() { - timer := time.NewTicker(time.Second * 1) - defer timer.Stop() - for { - select { - case <-timer.C: - if !isNginxProcessPresent() { - return - } - } - } -} diff --git a/pkg/nginx/controller/nginx_test.go b/pkg/ingress/controller/nginx_test.go similarity index 100% rename from pkg/nginx/controller/nginx_test.go rename to pkg/ingress/controller/nginx_test.go diff --git a/pkg/ingress/controller/process/nginx.go b/pkg/ingress/controller/process/nginx.go new file mode 100644 index 0000000000..4ffaaa7d12 --- /dev/null +++ b/pkg/ingress/controller/process/nginx.go @@ -0,0 +1,71 @@ +package process + +import ( + "fmt" + "net" + "os" + "os/exec" + "syscall" + "time" + + "github.com/golang/glog" + ps "github.com/mitchellh/go-ps" + "github.com/ncabatoff/process-exporter/proc" +) + +func IsRespawnIfRequired(err error) bool { + exitError, ok := err.(*exec.ExitError) + if !ok { + return false + } + + waitStatus := exitError.Sys().(syscall.WaitStatus) + glog.Warningf(` +------------------------------------------------------------------------------- +NGINX master process died (%v): %v +------------------------------------------------------------------------------- +`, waitStatus.ExitStatus(), err) + return true +} + +func WaitUntilPortIsAvailable(port int) { + // we wait until the workers are killed + for { + conn, err := net.DialTimeout("tcp", fmt.Sprintf("0.0.0.0:%v", port), 1*time.Second) + if err != nil { + break + } + conn.Close() + // kill nginx worker processes + fs, err := proc.NewFS("/proc") + procs, _ := fs.FS.AllProcs() + for _, p := range procs { + pn, err := p.Comm() + if err != nil { + 
glog.Errorf("unexpected error obtaining process information: %v", err) + continue + } + + if pn == "nginx" { + osp, err := os.FindProcess(p.PID) + if err != nil { + glog.Errorf("unexpected error obtaining process information: %v", err) + continue + } + osp.Signal(syscall.SIGQUIT) + } + } + time.Sleep(100 * time.Millisecond) + } +} + +// IsNginxRunning returns true if a process with the name 'nginx' is found +func IsNginxRunning() bool { + processes, _ := ps.Processes() + for _, p := range processes { + if p.Executable() == "nginx" { + return true + } + } + return false +} diff --git a/pkg/nginx/controller/metrics.go b/pkg/ingress/controller/stat_collector.go similarity index 97% rename from pkg/nginx/controller/metrics.go rename to pkg/ingress/controller/stat_collector.go index acf25c8089..2ce19c1fde 100644 --- a/pkg/nginx/controller/metrics.go +++ b/pkg/ingress/controller/stat_collector.go @@ -20,7 +20,7 @@ import ( "github.com/golang/glog" "github.com/prometheus/client_golang/prometheus" - "k8s.io/ingress-nginx/pkg/nginx/metric/collector" + "k8s.io/ingress-nginx/pkg/ingress/controller/metric/collector" ) const ( diff --git a/pkg/nginx/controller/tcp.go b/pkg/ingress/controller/tcp.go similarity index 91% rename from pkg/nginx/controller/tcp.go rename to pkg/ingress/controller/tcp.go index 596d693f44..ec4a5cbcc3 100644 --- a/pkg/nginx/controller/tcp.go +++ b/pkg/ingress/controller/tcp.go @@ -10,19 +10,19 @@ import ( "github.com/paultag/sniff/parser" ) -type server struct { +type TCPServer struct { Hostname string IP string Port int ProxyProtocol bool } -type proxy struct { - ServerList []*server - Default *server +type TCPProxy struct { + ServerList []*TCPServer + Default *TCPServer } -func (p *proxy) Get(host string) *server { +func (p *TCPProxy) Get(host string) *TCPServer { if p.ServerList == nil { return p.Default } @@ -36,7 +36,7 @@ func (p *proxy) Get(host string) *server { return p.Default } -func (p *proxy) Handle(conn net.Conn) { +func (p *TCPProxy) Handle(conn net.Conn) { defer conn.Close() data := make([]byte, 4096) diff --git a/pkg/nginx/template/configmap.go b/pkg/ingress/controller/template/configmap.go similarity index 95% rename from pkg/nginx/template/configmap.go rename to pkg/ingress/controller/template/configmap.go index b6ebb3e047..fa8ecbf6cc 100644 --- a/pkg/nginx/template/configmap.go +++ b/pkg/ingress/controller/template/configmap.go @@ -26,8 +26,8 @@ import ( "github.com/mitchellh/mapstructure" + "k8s.io/ingress-nginx/pkg/ingress/controller/config" ing_net "k8s.io/ingress-nginx/pkg/net" - "k8s.io/ingress-nginx/pkg/nginx/config" ) const ( @@ -41,11 +41,9 @@ const ( // ReadConfig obtains the configuration defined by the user merged with the defaults. 
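ReadConfig, shown next, copies the ConfigMap data before altering it. A small sketch of why that defensive copy matters when the source map comes from an informer cache (the key names here are only examples):

package main

import "fmt"

// readConfig copies the ConfigMap data before altering it so that parsing can
// delete or rewrite keys without mutating the object held in the informer cache.
func readConfig(src map[string]string) map[string]string {
	conf := map[string]string{}
	for k, v := range src {
		conf[k] = v
	}
	// parsing would remove handled keys from the copy, e.g.:
	delete(conf, "use-proxy-protocol")
	return conf
}

func main() {
	cached := map[string]string{
		"use-proxy-protocol":     "true",
		"max-worker-connections": "1024",
	}
	_ = readConfig(cached)
	fmt.Println(len(cached)) // still 2: the cached data was not touched
}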
func ReadConfig(src map[string]string) config.Configuration { conf := map[string]string{} - if src != nil { - // we need to copy the configmap data because the content is altered - for k, v := range src { - conf[k] = v - } + // we need to copy the configmap data because the content is altered + for k, v := range src { + conf[k] = v } errors := make([]int, 0) diff --git a/pkg/nginx/template/configmap_test.go b/pkg/ingress/controller/template/configmap_test.go similarity index 98% rename from pkg/nginx/template/configmap_test.go rename to pkg/ingress/controller/template/configmap_test.go index fc176a4337..a25d828a73 100644 --- a/pkg/nginx/template/configmap_test.go +++ b/pkg/ingress/controller/template/configmap_test.go @@ -21,7 +21,7 @@ import ( "github.com/kylelemons/godebug/pretty" - "k8s.io/ingress-nginx/pkg/nginx/config" + "k8s.io/ingress-nginx/pkg/ingress/controller/config" ) func TestFilterErrors(t *testing.T) { diff --git a/pkg/nginx/template/template.go b/pkg/ingress/controller/template/template.go similarity index 99% rename from pkg/nginx/template/template.go rename to pkg/ingress/controller/template/template.go index 794099a6a8..ef640242c5 100644 --- a/pkg/nginx/template/template.go +++ b/pkg/ingress/controller/template/template.go @@ -37,8 +37,8 @@ import ( "k8s.io/apimachinery/pkg/util/sets" "k8s.io/ingress-nginx/pkg/ingress" "k8s.io/ingress-nginx/pkg/ingress/annotations/ratelimit" + "k8s.io/ingress-nginx/pkg/ingress/controller/config" ing_net "k8s.io/ingress-nginx/pkg/net" - "k8s.io/ingress-nginx/pkg/nginx/config" "k8s.io/ingress-nginx/pkg/watch" ) diff --git a/pkg/nginx/template/template_test.go b/pkg/ingress/controller/template/template_test.go similarity index 97% rename from pkg/nginx/template/template_test.go rename to pkg/ingress/controller/template/template_test.go index 2fc1925738..3d0b999915 100644 --- a/pkg/nginx/template/template_test.go +++ b/pkg/ingress/controller/template/template_test.go @@ -29,7 +29,7 @@ import ( "k8s.io/ingress-nginx/pkg/ingress" "k8s.io/ingress-nginx/pkg/ingress/annotations/authreq" "k8s.io/ingress-nginx/pkg/ingress/annotations/rewrite" - "k8s.io/ingress-nginx/pkg/nginx/config" + "k8s.io/ingress-nginx/pkg/ingress/controller/config" ) var ( @@ -158,7 +158,7 @@ func TestBuildAuthResponseHeaders(t *testing.T) { func TestTemplateWithData(t *testing.T) { pwd, _ := os.Getwd() - f, err := os.Open(path.Join(pwd, "../../../test/data/config.json")) + f, err := os.Open(path.Join(pwd, "../../../../test/data/config.json")) if err != nil { t.Errorf("unexpected error reading json file: %v", err) } @@ -174,7 +174,7 @@ func TestTemplateWithData(t *testing.T) { if dat.ListenPorts == nil { dat.ListenPorts = &config.ListenPorts{} } - tf, err := os.Open(path.Join(pwd, "../../../rootfs/etc/nginx/template/nginx.tmpl")) + tf, err := os.Open(path.Join(pwd, "../../../../rootfs/etc/nginx/template/nginx.tmpl")) if err != nil { t.Errorf("unexpected error reading json file: %v", err) } @@ -193,7 +193,7 @@ func TestTemplateWithData(t *testing.T) { func BenchmarkTemplateWithData(b *testing.B) { pwd, _ := os.Getwd() - f, err := os.Open(path.Join(pwd, "../../../test/data/config.json")) + f, err := os.Open(path.Join(pwd, "../../../../test/data/config.json")) if err != nil { b.Errorf("unexpected error reading json file: %v", err) } @@ -207,7 +207,7 @@ func BenchmarkTemplateWithData(b *testing.B) { b.Errorf("unexpected error unmarshalling json: %v", err) } - tf, err := os.Open(path.Join(pwd, "../../rootfs/etc/nginx/template/nginx.tmpl")) + tf, err := os.Open(path.Join(pwd, 
"../../../rootfs/etc/nginx/template/nginx.tmpl")) if err != nil { b.Errorf("unexpected error reading json file: %v", err) } diff --git a/pkg/nginx/controller/utils.go b/pkg/ingress/controller/utils.go similarity index 71% rename from pkg/nginx/controller/utils.go rename to pkg/ingress/controller/utils.go index 07259883b4..798ea4a766 100644 --- a/pkg/nginx/controller/utils.go +++ b/pkg/ingress/controller/utils.go @@ -17,11 +17,6 @@ limitations under the License. package controller import ( - "fmt" - "io/ioutil" - "net" - "os" - "os/exec" "syscall" "github.com/golang/glog" @@ -54,34 +49,3 @@ func sysctlFSFileMax() int { } return int(rLimit.Max) } - -func diff(b1, b2 []byte) ([]byte, error) { - f1, err := ioutil.TempFile("", "a") - if err != nil { - return nil, err - } - defer f1.Close() - defer os.Remove(f1.Name()) - - f2, err := ioutil.TempFile("", "b") - if err != nil { - return nil, err - } - defer f2.Close() - defer os.Remove(f2.Name()) - - f1.Write(b1) - f2.Write(b2) - - out, _ := exec.Command("diff", "-u", f1.Name(), f2.Name()).CombinedOutput() - return out, nil -} - -func isPortAvailable(p int) bool { - ln, err := net.Listen("tcp", fmt.Sprintf(":%v", p)) - if err != nil { - return false - } - ln.Close() - return true -} diff --git a/pkg/ingress/doc.go b/pkg/ingress/doc.go deleted file mode 100644 index 068cdaecce..0000000000 --- a/pkg/ingress/doc.go +++ /dev/null @@ -1,71 +0,0 @@ -/* -Copyright 2016 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. 
-*/ - -package ingress - -// This package contains the interface is required to implement to build an Ingress controller -// A dummy implementation could be -// -// func main() { -// dc := newDummyController() -// controller.NewIngressController(dc) -// glog.Infof("shutting down Ingress controller...") -// } -// -//where newDummyController returns and implementation of the Controller interface: -// -// func newDummyController() ingress.Controller { -// return &DummyController{} -// } -// -// type DummyController struct { -// } -// -// func (dc DummyController) Reload(data []byte) ([]byte, error) { -// err := ioutil.WriteFile("/arbitrary-path", data, 0644) -// if err != nil { -// return nil, err -// } -// -// return exec.Command("some command", "--reload").CombinedOutput() -// } -// -// func (dc DummyController) Test(file string) *exec.Cmd { -// return exec.Command("some command", "--config-file", file) -// } -// -// func (dc DummyController) OnUpdate(*api.ConfigMap, Configuration) ([]byte, error) { -// return []byte(``) -// } -// -// func (dc DummyController) BackendDefaults() defaults.Backend { -// return ingress.NewStandardDefaults() -// } -// -// func (n DummyController) Name() string { -// return "dummy Controller" -// } -// -// func (n DummyController) Check(_ *http.Request) error { -// return nil -// } -// -// func (dc DummyController) Info() *BackendInfo { -// Name: "dummy", -// Release: "0.0.0", -// Build: "git-00000000", -// Repository: "git://foo.bar.com", -// } diff --git a/pkg/ingress/status/status.go b/pkg/ingress/status/status.go index 4a013b8507..9be568b090 100644 --- a/pkg/ingress/status/status.go +++ b/pkg/ingress/status/status.go @@ -72,9 +72,6 @@ type Config struct { DefaultIngressClass string IngressClass string - - // CustomIngressStatus allows to set custom values in Ingress status - CustomIngressStatus func(*extensions.Ingress) []apiv1.LoadBalancerIngress } // statusSync keeps the status IP in each Ingress rule updated executing a periodic check @@ -252,9 +249,8 @@ func (s *statusSync) runningAddresses() ([]string, error) { addrs = append(addrs, ip.IP) } } - for _, ip := range svc.Spec.ExternalIPs { - addrs = append(addrs, ip) - } + + addrs = append(addrs, svc.Spec.ExternalIPs...) 
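The one-line change above replaces an explicit loop with a variadic append. A trivial sketch showing the two forms produce the same slice:

package main

import "fmt"

func main() {
	addrs := []string{"10.0.0.1"}
	externalIPs := []string{"203.0.113.10", "203.0.113.11"}

	// the removed loop...
	withLoop := append([]string(nil), addrs...)
	for _, ip := range externalIPs {
		withLoop = append(withLoop, ip)
	}

	// ...and the variadic form used above are equivalent
	withSpread := append(append([]string(nil), addrs...), externalIPs...)

	fmt.Println(withLoop, withSpread)
}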
return addrs, nil } @@ -307,8 +303,6 @@ func sliceToStatus(endpoints []string) []apiv1.LoadBalancerIngress { } // updateStatus changes the status information of Ingress rules -// If the backend function CustomIngressStatus returns a value different -// of nil then it uses the returned value or the newIngressPoint values func (s *statusSync) updateStatus(newIngressPoint []apiv1.LoadBalancerIngress) { ings := s.IngressLister.List() @@ -324,7 +318,7 @@ func (s *statusSync) updateStatus(newIngressPoint []apiv1.LoadBalancerIngress) { continue } - batch.Queue(runUpdate(ing, newIngressPoint, s.Client, s.CustomIngressStatus)) + batch.Queue(runUpdate(ing, newIngressPoint, s.Client)) } batch.QueueComplete() @@ -332,18 +326,13 @@ func (s *statusSync) updateStatus(newIngressPoint []apiv1.LoadBalancerIngress) { } func runUpdate(ing *extensions.Ingress, status []apiv1.LoadBalancerIngress, - client clientset.Interface, - statusFunc func(*extensions.Ingress) []apiv1.LoadBalancerIngress) pool.WorkFunc { + client clientset.Interface) pool.WorkFunc { return func(wu pool.WorkUnit) (interface{}, error) { if wu.IsCancelled() { return nil, nil } addrs := status - ca := statusFunc(ing) - if ca != nil { - addrs = ca - } sort.SliceStable(addrs, lessLoadBalancerIngress(addrs)) curIPs := ing.Status.LoadBalancer.Ingress diff --git a/pkg/ingress/status/status_test.go b/pkg/ingress/status/status_test.go index a4b6b24b8e..f997052b18 100644 --- a/pkg/ingress/status/status_test.go +++ b/pkg/ingress/status/status_test.go @@ -249,9 +249,6 @@ func buildStatusSync() statusSync { Client: buildSimpleClientSet(), PublishService: apiv1.NamespaceDefault + "/" + "foo", IngressLister: buildIngressListener(), - CustomIngressStatus: func(*extensions.Ingress) []apiv1.LoadBalancerIngress { - return nil - }, }, } } @@ -267,9 +264,6 @@ func TestStatusActions(t *testing.T) { DefaultIngressClass: "nginx", IngressClass: "", UpdateStatusOnShutdown: true, - CustomIngressStatus: func(*extensions.Ingress) []apiv1.LoadBalancerIngress { - return nil - }, } // create object fkSync := NewStatusSyncer(c) diff --git a/pkg/ingress/types.go b/pkg/ingress/types.go index 37a552f97d..355a6c01eb 100644 --- a/pkg/ingress/types.go +++ b/pkg/ingress/types.go @@ -17,15 +17,11 @@ limitations under the License. package ingress import ( - "fmt" "time" - "github.com/spf13/pflag" - apiv1 "k8s.io/api/core/v1" extensions "k8s.io/api/extensions/v1beta1" "k8s.io/apimachinery/pkg/util/intstr" - "k8s.io/apiserver/pkg/server/healthz" "k8s.io/ingress-nginx/pkg/ingress/annotations/auth" "k8s.io/ingress-nginx/pkg/ingress/annotations/authreq" @@ -36,7 +32,6 @@ import ( "k8s.io/ingress-nginx/pkg/ingress/annotations/ratelimit" "k8s.io/ingress-nginx/pkg/ingress/annotations/redirect" "k8s.io/ingress-nginx/pkg/ingress/annotations/rewrite" - "k8s.io/ingress-nginx/pkg/ingress/defaults" "k8s.io/ingress-nginx/pkg/ingress/resolver" "k8s.io/ingress-nginx/pkg/ingress/store" ) @@ -49,63 +44,6 @@ var ( DefaultSSLDirectory = "/ingress-controller/ssl" ) -// Controller holds the methods to handle an Ingress backend -// TODO (#18): Make sure this is sufficiently supportive of other backends. -type Controller interface { - // HealthzChecker returns is a named healthz check that returns the ingress - // controller status - healthz.HealthzChecker - - // OnUpdate callback invoked from the sync queue https://k8s.io/ingress/core/blob/master/pkg/ingress/controller/controller.go#L387 - // when an update occurs. 
This is executed frequently because Ingress - // controllers watches changes in: - // - Ingresses: main work - // - Secrets: referenced from Ingress rules with TLS configured - // - ConfigMaps: where the controller reads custom configuration - // - Services: referenced from Ingress rules and required to obtain - // information about ports and annotations - // - Endpoints: referenced from Services and what the backend uses - // to route traffic - // Any update to services, endpoints, secrets (only those referenced from Ingress) - // and ingress trigger the execution. - // Notifications of type Add, Update and Delete: - // https://github.com/kubernetes/kubernetes/blob/master/pkg/client/cache/controller.go#L164 - // - // Configuration returns the translation from Ingress rules containing - // information about all the upstreams (service endpoints ) "virtual" - // servers (FQDN) and all the locations inside each server. Each - // location contains information about all the annotations were configured - // https://k8s.io/ingress/core/blob/master/pkg/ingress/types.go#L83 - // The backend returns an error if was not possible to update the configuration. - // - OnUpdate(Configuration) error - // ConfigMap content of --configmap - SetConfig(*apiv1.ConfigMap) - // SetListers allows the access of store listers present in the generic controller - // This avoid the use of the kubernetes client. - SetListers(*StoreLister) - // BackendDefaults returns the minimum settings required to configure the - // communication to endpoints - BackendDefaults() defaults.Backend - // Info returns information about the ingress controller - Info() *BackendInfo - // ConfigureFlags allow to configure more flags before the parsing of - // command line arguments - ConfigureFlags(*pflag.FlagSet) - // OverrideFlags allow the customization of the flags in the backend - OverrideFlags(*pflag.FlagSet) - // DefaultIngressClass just return the default ingress class - DefaultIngressClass() string - // UpdateIngressStatus custom callback used to update the status in an Ingress rule - // This allows custom implementations - // If the function returns nil the standard functions will be executed. - UpdateIngressStatus(*extensions.Ingress) []apiv1.LoadBalancerIngress - // DefaultEndpoint returns the Endpoint to use as default when the - // referenced service does not exists. This should return the content - // of to the default backend - DefaultEndpoint() Endpoint -} - // StoreLister returns the configured stores for ingresses, services, // endpoints, secrets and configmaps. type StoreLister struct { @@ -117,29 +55,6 @@ type StoreLister struct { ConfigMap store.ConfigMapLister } -// BackendInfo returns information about the backend. 
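The interface documentation deleted above describes the watch-and-resync model: every add, update or delete of a watched object ends up on a work queue that ultimately drives OnUpdate. A compact sketch of that wiring using client-go's ResourceEventHandlerFuncs; the channel-based queue is only a stand-in for the controller's task queue:

package main

import (
	"fmt"
	"reflect"

	extensions "k8s.io/api/extensions/v1beta1"
	"k8s.io/client-go/tools/cache"
)

// newEventHandler turns every add, delete and meaningful update of a watched
// object into an item on a queue, which is the shape of the wiring that feeds
// the controller's sync loop.
func newEventHandler(enqueue func(obj interface{})) cache.ResourceEventHandlerFuncs {
	return cache.ResourceEventHandlerFuncs{
		AddFunc:    func(obj interface{}) { enqueue(obj) },
		DeleteFunc: func(obj interface{}) { enqueue(obj) },
		UpdateFunc: func(old, cur interface{}) {
			if !reflect.DeepEqual(old, cur) {
				enqueue(cur)
			}
		},
	}
}

func main() {
	queue := make(chan interface{}, 8)
	handler := newEventHandler(func(obj interface{}) { queue <- obj })

	// in the controller these callbacks are invoked by the informers
	handler.AddFunc(&extensions.Ingress{})
	handler.UpdateFunc(&extensions.Ingress{}, &extensions.Ingress{})

	fmt.Println("queued items:", len(queue))
}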
-// This fields contains information that helps to track issues or to -// map the running ingress controller to source code -type BackendInfo struct { - // Name returns the name of the backend implementation - Name string `json:"name"` - // Release returns the running version (semver) - Release string `json:"release"` - // Build returns information about the git commit - Build string `json:"build"` - // Repository return information about the git repository - Repository string `json:"repository"` -} - -func (bi BackendInfo) String() string { - return fmt.Sprintf(` -Name: %v -Release: %v -Build: %v -Repository: %v -`, bi.Name, bi.Release, bi.Build, bi.Repository) -} - // Configuration holds the definition of all the parts required to describe all // ingresses reachable by the ingress controller (using a filter by namespace) type Configuration struct { @@ -261,9 +176,6 @@ type Server struct { // Location describes an URI inside a server. // Also contains additional information about annotations in the Ingress. // -// Important: -// The implementation of annotations is optional -// // In some cases when more than one annotations is defined a particular order in the execution // is required. // The chain in the execution order of annotations should be: diff --git a/pkg/ingress/types_equals.go b/pkg/ingress/types_equals.go index 84282b0613..c9f6c9d91c 100644 --- a/pkg/ingress/types_equals.go +++ b/pkg/ingress/types_equals.go @@ -16,30 +16,6 @@ limitations under the License. package ingress -// Equal tests for equality between two BackendInfo types -func (bi1 *BackendInfo) Equal(bi2 *BackendInfo) bool { - if bi1 == bi2 { - return true - } - if bi1 == nil || bi2 == nil { - return false - } - if bi1.Name != bi2.Name { - return false - } - if bi1.Release != bi2.Release { - return false - } - if bi1.Build != bi2.Build { - return false - } - if bi1.Repository != bi2.Repository { - return false - } - - return true -} - // Equal tests for equality between two Configuration types func (c1 *Configuration) Equal(c2 *Configuration) bool { if c1 == c2 { diff --git a/pkg/k8s/ensure.go b/pkg/k8s/ensure.go deleted file mode 100644 index dd2bf5bbbb..0000000000 --- a/pkg/k8s/ensure.go +++ /dev/null @@ -1,58 +0,0 @@ -/* -Copyright 2017 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License.
-*/ - -package k8s - -import ( - api "k8s.io/api/core/v1" - core "k8s.io/api/core/v1" - extensions "k8s.io/api/extensions/v1beta1" - k8sErrors "k8s.io/apimachinery/pkg/api/errors" - "k8s.io/client-go/kubernetes" -) - -func EnsureSecret(cl kubernetes.Interface, secret *api.Secret) (*api.Secret, error) { - s, err := cl.CoreV1().Secrets(secret.Namespace).Create(secret) - if err != nil { - if k8sErrors.IsAlreadyExists(err) { - return cl.CoreV1().Secrets(secret.Namespace).Update(secret) - } - return nil, err - } - return s, nil -} - -func EnsureIngress(cl kubernetes.Interface, ingress *extensions.Ingress) (*extensions.Ingress, error) { - s, err := cl.ExtensionsV1beta1().Ingresses(ingress.Namespace).Update(ingress) - if err != nil { - if k8sErrors.IsNotFound(err) { - return cl.ExtensionsV1beta1().Ingresses(ingress.Namespace).Create(ingress) - } - return nil, err - } - return s, nil -} - -func EnsureService(cl kubernetes.Interface, service *core.Service) (*core.Service, error) { - s, err := cl.CoreV1().Services(service.Namespace).Update(service) - if err != nil { - if k8sErrors.IsNotFound(err) { - return cl.CoreV1().Services(service.Namespace).Create(service) - } - return nil, err - } - return s, nil -} diff --git a/pkg/net/net.go b/pkg/net/net.go index d18ca49e8c..8547e3dbda 100644 --- a/pkg/net/net.go +++ b/pkg/net/net.go @@ -16,9 +16,29 @@ limitations under the License. package net -import _net "net" +import ( + "fmt" + _net "net" + "os/exec" +) // IsIPV6 checks if the input contains a valid IPV6 address func IsIPV6(ip _net.IP) bool { return ip.To4() == nil } + +// IsPortAvailable checks if a TCP port is available or not +func IsPortAvailable(p int) bool { + ln, err := _net.Listen("tcp", fmt.Sprintf(":%v", p)) + if err != nil { + return false + } + ln.Close() + return true +} + +// IsIPv6Enabled checks if IPV6 is enabled or not +func IsIPv6Enabled() bool { + cmd := exec.Command("test", "-f", "/proc/net/if_inet6") + return cmd.Run() == nil +} diff --git a/pkg/net/net_test.go b/pkg/net/net_test.go index 21b7bf5a9d..830b52af34 100644 --- a/pkg/net/net_test.go +++ b/pkg/net/net_test.go @@ -41,3 +41,20 @@ func TestIsIPV6(t *testing.T) { } } } + +func TestIsPortAvailable(t *testing.T) { + if !IsPortAvailable(0) { + t.Fatal("expected port 0 to be available (random port) but returned false") + } + + ln, err := net.Listen("tcp", ":0") + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + defer ln.Close() + + p := ln.Addr().(*net.TCPAddr).Port + if IsPortAvailable(p) { + t.Fatalf("expected port %v to not be available", p) + } +} diff --git a/version/version.go b/version/version.go index 3c036c0bca..aebfec3616 100644 --- a/version/version.go +++ b/version/version.go @@ -16,6 +16,8 @@ limitations under the License. package version +import "fmt" + var ( // RELEASE returns the release version RELEASE = "UNKNOWN" @@ -24,3 +26,14 @@ var ( // COMMIT returns the short sha from git COMMIT = "UNKNOWN" ) + +// String returns information about the release. +func String() string { + return fmt.Sprintf(`------------------------------------------------------------------------------- +NGINX Ingress controller + Release: %v + Build: %v + Repository: %v +------------------------------------------------------------------------------- +`, RELEASE, COMMIT, REPO) +}
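For reviewers, a short usage sketch of the two helpers added to pkg/net. Only IsPortAvailable and IsIPv6Enabled come from this patch; the checkPort wrapper, flag name and sample port below are illustrative assumptions.

package main

import (
	"fmt"

	ing_net "k8s.io/ingress-nginx/pkg/net"
)

// checkPort is an illustrative wrapper (not part of this change) that rejects
// a TCP port that is already bound on the local host.
func checkPort(name string, port int) error {
	if !ing_net.IsPortAvailable(port) {
		return fmt.Errorf("%s port %d is already in use", name, port)
	}
	return nil
}

func main() {
	// 8080 is an arbitrary example value for an --http-port style flag.
	if err := checkPort("http", 8080); err != nil {
		fmt.Println(err)
	}

	if ing_net.IsIPv6Enabled() {
		fmt.Println("IPv6 is enabled on this host")
	}
}

Note that IsPortAvailable only reports a best-effort snapshot: the port can be taken between the check and a later bind, so callers still need to handle bind errors.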
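The new version.String() banner takes over the role of the removed BackendInfo.String(). A minimal sketch of a caller, assuming the k8s.io/ingress-nginx import path used elsewhere in this change; the main wiring here is illustrative only.

package main

import (
	"fmt"

	"k8s.io/ingress-nginx/version"
)

func main() {
	// RELEASE, COMMIT and REPO default to "UNKNOWN" and are expected to be
	// overridden at build time, e.g. with
	// -ldflags "-X k8s.io/ingress-nginx/version.RELEASE=v0.9.0".
	fmt.Print(version.String())
}

Printing the banner once at startup keeps the release metadata visible in the pod logs without reintroducing a per-backend info type.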