diff --git a/config.conf.example b/config.conf.example index 07968ea..cdca028 100644 --- a/config.conf.example +++ b/config.conf.example @@ -1,4 +1,4 @@ -# 1 = Debug; 2 = Info; 3 = Warn; +# 1 = Debug; 2 = Info; 3 = Warn; 4 = Error logLevel = 3 # Enable the built in identd server (listens on port 113) diff --git a/go.mod b/go.mod index 5a6204b..93778b2 100644 --- a/go.mod +++ b/go.mod @@ -12,8 +12,9 @@ require ( github.com/orcaman/concurrent-map v0.0.0-20190107190726-7ed82d9cb717 github.com/smartystreets/assertions v0.0.0-20190215210624-980c5ac6f3ac // indirect github.com/smartystreets/goconvey v0.0.0-20181108003508-044398e4856c // indirect - golang.org/x/crypto v0.0.0-20190211182817-74369b46fc67 - golang.org/x/net v0.0.0-20190213061140-3a22650c66bd - golang.org/x/text v0.3.0 // indirect + golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2 + golang.org/x/net v0.0.0-20190503192946-f4e77d36d62c + golang.org/x/text v0.3.1-0.20180807135948-17ff2d5776d2 // indirect + golang.org/x/time v0.0.0-20181108054448-85acf8d2951c gopkg.in/ini.v1 v1.42.0 ) diff --git a/go.sum b/go.sum index a0bf1bb..ba0423a 100644 --- a/go.sum +++ b/go.sum @@ -23,11 +23,16 @@ github.com/smartystreets/goconvey v0.0.0-20181108003508-044398e4856c h1:Ho+uVpke github.com/smartystreets/goconvey v0.0.0-20181108003508-044398e4856c/go.mod h1:XDJAKZRPZ1CvBcN2aX5YOUTYGHki24fSF0Iv48Ibg0s= github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72 h1:qLC7fQah7D6K1B0ujays3HV9gkFtllcxhzImRR7ArPQ= github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA= -golang.org/x/crypto v0.0.0-20190211182817-74369b46fc67 h1:ng3VDlRp5/DHpSWl02R4rM9I+8M2rhmsuLwAMmkLQWE= -golang.org/x/crypto v0.0.0-20190211182817-74369b46fc67/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= -golang.org/x/net v0.0.0-20190213061140-3a22650c66bd h1:HuTn7WObtcDo9uEEU7rEqL0jYthdXAmZ6PP+meazmaU= -golang.org/x/net v0.0.0-20190213061140-3a22650c66bd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= +golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2 h1:VklqNMn3ovrHsnt90PveolxSbWFaJdECFbxSq0Mqo2M= +golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/net v0.0.0-20190503192946-f4e77d36d62c h1:uOCk1iQW6Vc18bnC13MfzScl+wdKBmM9Y9kU7Z83/lw= +golang.org/x/net v0.0.0-20190503192946-f4e77d36d62c/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= +golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/text v0.3.0 h1:g61tztE5qeGQ89tm6NTjjM9VPIm088od1l6aSorWRWg= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/text v0.3.1-0.20180807135948-17ff2d5776d2 h1:z99zHgr7hKfrUcX/KsoJk5FJfjTceCKIp96+biqP4To= +golang.org/x/text v0.3.1-0.20180807135948-17ff2d5776d2/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/time v0.0.0-20181108054448-85acf8d2951c h1:fqgJT0MGcGpPgpWU7VRdRjuArfcOvC4AoJmILihzhDg= +golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= gopkg.in/ini.v1 v1.42.0 h1:7N3gPTt50s8GuLortA00n8AqRTk75qOP98+mTPpgzRk= gopkg.in/ini.v1 v1.42.0/go.mod h1:pNLf8WUiyNEtQjuu5G5vTm06TEv9tsIgeAvK8hOrP4k= diff --git a/main.go b/main.go index 4fe15ce..ca71a37 100644 --- a/main.go +++ b/main.go @@ -97,13 +97,13 @@ func loadPlugins(gateway *webircgateway.Gateway, pluginsQuit *sync.WaitGroup) { gateway.Log(2, "Loading plugin "+pluginFullPath) p, err := plugin.Open(pluginFullPath) if err != nil { - gateway.Log(3, "Error loading plugin: "+err.Error()) + gateway.Log(4, "Error loading plugin: "+err.Error()) continue } startSymbol, err := p.Lookup("Start") if err != nil { - gateway.Log(3, "Plugin does not export a Start function! (%s)", pluginFullPath) + gateway.Log(4, "Plugin does not export a Start function! (%s)", pluginFullPath) continue } diff --git a/pkg/webircgateway/client.go b/pkg/webircgateway/client.go index 8b2b6dc..e0591b3 100644 --- a/pkg/webircgateway/client.go +++ b/pkg/webircgateway/client.go @@ -7,12 +7,15 @@ import ( "fmt" "io" "net" + "net/http" "strconv" "strings" "sync/atomic" "syscall" "time" + "golang.org/x/time/rate" + "sync" "github.com/kiwiirc/webircgateway/pkg/irc" @@ -24,7 +27,7 @@ const ( ClientStateIdle = "idle" // ClientStateConnecting - Connecting upstream ClientStateConnecting = "connecting" - // ClientStateRegistered - Registering to the IRC network + // ClientStateRegistering - Registering to the IRC network ClientStateRegistering = "registering" // ClientStateConnected - Connected upstream ClientStateConnected = "connected" @@ -32,15 +35,6 @@ const ( ClientStateEnding = "ending" ) -// The upstream connection object may be either a TCP client or a KiwiProxy -// instance. Create a common interface we can use that satisfies either -// case. -type ConnInterface interface { - io.Reader - io.Writer - Close() error -} - type ClientSignal [3]string // Client - Connecting client struct @@ -48,24 +42,23 @@ type Client struct { Gateway *Gateway Id uint64 State string - EndWG sync.WaitGroup shuttingDownLock sync.Mutex shuttingDown bool + ShutdownReason string SeenQuit bool Recv chan string - UpstreamSend chan string + ThrottledRecv *ThrottledStringChannel + UpstreamSendIn chan string + UpstreamSendOut chan string + upstream io.ReadWriteCloser + UpstreamRecv chan string UpstreamStarted bool UpstreamConfig *ConfigUpstream - RemoteAddr string - RemoteHostname string - RemotePort int DestHost string DestPort int DestTLS bool IrcState *irc.State Encoding string - // Tags get passed upstream via the WEBIRC command - Tags map[string]string // Captchas may be needed to verify a client Verified bool SentPass bool @@ -78,25 +71,106 @@ type Client struct { } // The specific message-tags CAP that the client has requested if we are wrapping it RequestedMessageTagsCap string + *ClientConnectionInfo } var nextClientID uint64 = 1 +// ClientConnectionInfo contains information about a client connection. It is a +// separate struct so that it can be populated before calling NewClient, so that +// this info can be conveniently accessed even when NewClient does not return a +// *Client value (returns an error) +type ClientConnectionInfo struct { + Origin string + // Tags get passed upstream via the WEBIRC command + Tags map[string]string + RemoteAddr string + RemoteHostname string + Request *http.Request +} + +func NewClientConnectionInfo( + origin string, + remoteAddr string, + req *http.Request, + gateway *Gateway, +) *ClientConnectionInfo { + connInfo := &ClientConnectionInfo{ + Origin: origin, + + Tags: make(map[string]string), + RemoteAddr: remoteAddr, + Request: req, + } + + connInfo.ResolveHostname() + + if req != nil && gateway.isRequestSecure(req) { + connInfo.Tags["secure"] = "" + } + + // This doesn't make sense to have since the remote port may change between requests. Only + // here for testing purposes for now. + _, remoteAddrPort, _ := net.SplitHostPort(connInfo.RemoteAddr) + connInfo.Tags["remote-port"] = remoteAddrPort + + return connInfo +} + +func (connInfo *ClientConnectionInfo) ResolveHostname() { + clientHostnames, err := net.LookupAddr(connInfo.RemoteAddr) + if err != nil || len(clientHostnames) == 0 { + connInfo.RemoteHostname = connInfo.RemoteAddr + } else { + // FQDNs include a . at the end. Strip it out + potentialHostname := strings.Trim(clientHostnames[0], ".") + + // Must check that the resolved hostname also resolves back to the users IP + addr, err := net.LookupIP(potentialHostname) + if err == nil && len(addr) == 1 && addr[0].String() == connInfo.RemoteAddr { + connInfo.RemoteHostname = potentialHostname + } else { + connInfo.RemoteHostname = connInfo.RemoteAddr + } + } +} + +func LogNewClientError(gateway *Gateway, info *ClientConnectionInfo, err error) { + hook := HookNewClientError{ + Gateway: gateway, + ClientConnectionInfo: info, + Error: err, + } + + hook.Dispatch("new.client.error") +} + // NewClient - Makes a new client -func NewClient(gateway *Gateway) *Client { +func NewClient(gateway *Gateway, info *ClientConnectionInfo) (*Client, error) { + if !gateway.isClientOriginAllowed(info.Origin) { + + err := fmt.Errorf("Origin %s not allowed. Closing connection", info.Origin) + LogNewClientError(gateway, info, err) + return nil, err + } + thisID := atomic.AddUint64(&nextClientID, 1) + recv := make(chan string, 50) c := &Client{ - Gateway: gateway, - Id: thisID, - State: ClientStateIdle, - Recv: make(chan string, 50), - UpstreamSend: make(chan string, 50), - Encoding: "UTF-8", - Signals: make(chan ClientSignal, 50), - Tags: make(map[string]string), - IrcState: irc.NewState(), - UpstreamConfig: &ConfigUpstream{}, + Gateway: gateway, + Id: thisID, + State: ClientStateIdle, + Recv: recv, + ThrottledRecv: NewThrottledStringChannel(recv, rate.NewLimiter(rate.Inf, 1)), + UpstreamSendIn: make(chan string, 50), + UpstreamSendOut: make(chan string, 50), + UpstreamRecv: make(chan string, 50), + Encoding: "UTF-8", + Signals: make(chan ClientSignal, 50), + IrcState: irc.NewState(), + UpstreamConfig: &ConfigUpstream{}, + ClientConnectionInfo: info, } // Auto enable some features by default. They may be disabled later on @@ -109,37 +183,28 @@ func NewClient(gateway *Gateway) *Client { go c.clientLineWorker() - // This Add(1) will be ended once the client starts shutting down in StartShutdown() - c.EndWG.Add(1) - // Add to the clients maps and wait until everything has been marked // as completed (several routines add themselves to EndWG so that we can catch // when they are all completed) gateway.Clients.Set(string(c.Id), c) - go func() { - c.EndWG.Wait() - gateway.Clients.Remove(string(c.Id)) - hook := &HookClientState{ - Client: c, - Connected: false, - } - hook.Dispatch("client.state") - }() + // trigger initial client.state hook + c.setState(ClientStateIdle) - hook := &HookClientState{ - Client: c, - Connected: true, - } - hook.Dispatch("client.state") - - return c + return c, nil } // Log - Log a line of text with context of this client func (c *Client) Log(level int, format string, args ...interface{}) { prefix := fmt.Sprintf("client:%d ", c.Id) - c.Gateway.Log(level, prefix+format, args...) + c.Gateway.LogWithoutHook(level, prefix+format, args...) + + hook := &HookLog{ + Client: c, + Level: level, + Line: fmt.Sprintf(format, args...), + } + hook.Dispatch("client.log") } func (c *Client) IsShuttingDown() bool { @@ -156,7 +221,8 @@ func (c *Client) StartShutdown(reason string) { if !c.shuttingDown { lastState := c.State c.shuttingDown = true - c.State = ClientStateEnding + c.ShutdownReason = reason + c.setState(ClientStateEnding) switch reason { case "upstream_closed": @@ -166,7 +232,7 @@ func (c *Client) StartShutdown(reason string) { // Error has been logged already case "client_closed": if !c.SeenQuit && c.Gateway.Config.SendQuitOnClientClose != "" && lastState == ClientStateConnected { - c.UpstreamSend <- "QUIT :" + c.Gateway.Config.SendQuitOnClientClose + c.processLineToUpstream("QUIT :" + c.Gateway.Config.SendQuitOnClientClose) } c.Log(2, "Client disconnected") default: @@ -174,7 +240,7 @@ func (c *Client) StartShutdown(reason string) { } close(c.Signals) - c.EndWG.Done() + c.Gateway.Clients.Remove(string(c.Id)) } } @@ -241,7 +307,7 @@ func (c *Client) connectUpstream() { return } - client.State = ClientStateConnecting + client.setState(ClientStateConnecting) upstream, upstreamErr := client.makeUpstreamConnection() if upstreamErr != nil { @@ -249,19 +315,30 @@ func (c *Client) connectUpstream() { return } - client.State = ClientStateRegistering + client.setState(ClientStateRegistering) + + go func() { + for { + line, ok := <-client.UpstreamSendIn + if !ok { + return + } + client.UpstreamSendOut <- line + } + }() client.writeWebircLines(upstream) client.maybeSendPass(upstream) client.SendClientSignal("state", "connected") client.proxyData(upstream) + client.upstream = upstream } -func (c *Client) makeUpstreamConnection() (ConnInterface, error) { +func (c *Client) makeUpstreamConnection() (io.ReadWriteCloser, error) { client := c upstreamConfig := c.UpstreamConfig - var connection ConnInterface + var connection io.ReadWriteCloser if upstreamConfig.Proxy == nil { // Connect directly to the IRCd @@ -314,7 +391,7 @@ func (c *Client) makeUpstreamConnection() (ConnInterface, error) { conn = net.Conn(tlsConn) } - connection = ConnInterface(conn) + connection = conn } if upstreamConfig.Proxy != nil { @@ -351,13 +428,13 @@ func (c *Client) makeUpstreamConnection() (ConnInterface, error) { return nil, errors.New("error connecting upstream") } - connection = ConnInterface(conn) + connection = conn } return connection, nil } -func (c *Client) writeWebircLines(upstream ConnInterface) { +func (c *Client) writeWebircLines(upstream io.ReadWriteCloser) { // Send any WEBIRC lines if c.UpstreamConfig.WebircPassword == "" { c.Log(1, "No webirc to send") @@ -401,7 +478,7 @@ func (c *Client) writeWebircLines(upstream ConnInterface) { upstream.Write([]byte(webircLine)) } -func (c *Client) maybeSendPass(upstream ConnInterface) { +func (c *Client) maybeSendPass(upstream io.ReadWriteCloser) { if c.UpstreamConfig.ServerPassword == "" { return } @@ -414,139 +491,116 @@ func (c *Client) maybeSendPass(upstream ConnInterface) { upstream.Write([]byte(passLine)) } -func (c *Client) proxyData(upstream ConnInterface) { +func (c *Client) proxyData(upstream io.ReadWriteCloser) { client := c - upstreamConfig := c.UpstreamConfig - // Data from client to upstream + // Data from upstream to client go func() { - client.EndWG.Add(1) - defer func() { - client.EndWG.Done() - }() - - var writeThrottle time.Duration - if upstreamConfig.Throttle > 0 { - writeThrottle = time.Duration(int64(time.Second) / int64(upstreamConfig.Throttle)) - } else { - writeThrottle = 0 - } - + reader := bufio.NewReader(upstream) for { - data, ok := <-client.UpstreamSend - if !ok { - client.Log(1, "connectUpstream() client.UpstreamSend closed") + data, err := reader.ReadString('\n') + if err != nil { break } - if strings.HasPrefix(data, "PASS ") && c.SentPass { - // Hijack the PASS command if we already sent a pass command - continue - } else if strings.HasPrefix(data, "USER ") { - // Hijack the USER command as we may have some overrides - data = fmt.Sprintf( - "USER %s 0 * :%s", - client.IrcState.Username, - client.IrcState.RealName, - ) - } else if strings.HasPrefix(strings.ToUpper(data), "QUIT ") { - client.SeenQuit = true - } - message, _ := irc.ParseLine(data) + data = strings.Trim(data, "\n\r") + client.Log(1, "client.UpstreamRecv <- %s", data) + client.UpstreamRecv <- data + } - hook := &HookIrcLine{ - Client: client, - UpstreamConfig: upstreamConfig, - Line: data, - Message: message, - ToServer: true, - } - hook.Dispatch("irc.line") - if hook.Halt { - continue - } + client.SendClientSignal("state", "closed") + client.StartShutdown("upstream_closed") + upstream.Close() + if client.IrcState.RemotePort > 0 { + c.Gateway.identdServ.RemoveIdent(client.IrcState.LocalPort, client.IrcState.RemotePort, "") + } + }() +} - // Plugins may have modified the data - data = hook.Line +func (c *Client) processLineToUpstream(data string) { + client := c + upstreamConfig := c.UpstreamConfig - client.Log(1, "->upstream: %s", data) - data = utf8ToOther(data, client.Encoding) - if data == "" { - client.Log(1, "Failed to encode into '%s'. Dropping data", c.Encoding) - continue - } + if strings.HasPrefix(data, "PASS ") && c.SentPass { + // Hijack the PASS command if we already sent a pass command + return + } else if strings.HasPrefix(data, "USER ") { + // Hijack the USER command as we may have some overrides + data = fmt.Sprintf( + "USER %s 0 * :%s", + client.IrcState.Username, + client.IrcState.RealName, + ) + } else if strings.HasPrefix(strings.ToUpper(data), "QUIT ") { + client.SeenQuit = true + } - upstream.Write([]byte(data + "\r\n")) + message, _ := irc.ParseLine(data) - // Throttle writes if configured, but only after registration is complete. Typical IRCd - // behavior is to not throttle registration commands. - if writeThrottle > 0 && client.State != ClientStateRegistering { - time.Sleep(writeThrottle) - } - } + hook := &HookIrcLine{ + Client: client, + UpstreamConfig: upstreamConfig, + Line: data, + Message: message, + ToServer: true, + } + hook.Dispatch("irc.line") + if hook.Halt { + return + } - upstream.Close() - }() + // Plugins may have modified the data + data = hook.Line - // Data from upstream to client - go func() { - client.EndWG.Add(1) - defer func() { - client.EndWG.Done() - }() + client.Log(1, "->upstream: %s", data) + data = utf8ToOther(data, client.Encoding) + if data == "" { + client.Log(1, "Failed to encode into '%s'. Dropping data", c.Encoding) + return + } - reader := bufio.NewReader(upstream) - for { - data, err := reader.ReadString('\n') - if err != nil { - break - } + client.upstream.Write([]byte(data + "\r\n")) +} - data = strings.Trim(data, "\n\r") - message, _ := irc.ParseLine(data) - - hook := &HookIrcLine{ - Client: client, - UpstreamConfig: upstreamConfig, - Line: data, - Message: message, - ToServer: false, - } - hook.Dispatch("irc.line") - if hook.Halt { - continue - } +func (c *Client) handleLineFromUpstream(data string) { + client := c + upstreamConfig := c.UpstreamConfig - // Plugins may have modified the data - data = hook.Line + message, _ := irc.ParseLine(data) - if data == "" { - continue - } + hook := &HookIrcLine{ + Client: client, + UpstreamConfig: upstreamConfig, + Line: data, + Message: message, + ToServer: false, + } + hook.Dispatch("irc.line") + if hook.Halt { + return + } - client.Log(1, "upstream->: %s", data) + // Plugins may have modified the data + data = hook.Line - data = ensureUtf8(data, client.Encoding) - if data == "" { - client.Log(1, "Failed to encode into 'UTF-8'. Dropping data") - continue - } + if data == "" { + return + } - data = client.ProcessLineFromUpstream(data) - if data == "" { - return - } + client.Log(1, "upstream->: %s", data) - client.SendClientSignal("data", data) - } + data = ensureUtf8(data, client.Encoding) + if data == "" { + client.Log(1, "Failed to decode as 'UTF-8'. Dropping data") + return + } - client.SendClientSignal("state", "closed") - client.StartShutdown("upstream_closed") - upstream.Close() - if client.IrcState.RemotePort > 0 { - c.Gateway.identdServ.RemoveIdent(client.IrcState.LocalPort, client.IrcState.RemotePort, "") - } - }() + data = client.ProcessLineFromUpstream(data) + if data == "" { + return + } + + client.SendClientSignal("data", data) } func typeOfErr(err error) string { @@ -591,28 +645,43 @@ func typeOfErr(err error) string { // Handle lines sent from the client func (c *Client) clientLineWorker() { - c.EndWG.Add(1) - defer func() { - c.EndWG.Done() - }() - +ReadLoop: for { - data, ok := <-c.Recv - if !ok { - c.Log(1, "clientLineWorker() client.Recv closed") - break - } + select { + case clientData, ok := <-c.ThrottledRecv.Output: + if !ok { + c.Log(1, "client.Recv closed") + break ReadLoop + } + + c.Log(1, "client->: %s", clientData) + + clientLine, err := c.ProcessLineFromClient(clientData) + if err == nil && clientLine != "" { + c.UpstreamSendIn <- clientLine + } - c.Log(1, "ws->: %s", data) + case line, ok := <-c.UpstreamSendOut: + if !ok { + c.Log(1, "client.UpstreamSend closed") + break ReadLoop + } + c.processLineToUpstream(line) + + case upstreamData, ok := <-c.UpstreamRecv: + c.Log(1, "<-c.UpstreamRecv: %s", upstreamData) + if !ok { + c.Log(1, "client.UpstreamRecv closed") + break ReadLoop + } - // Some IRC lines such as USER commands may have some parameter replacements - line, err := c.ProcessLineFromClient(data) - if err == nil && line != "" { - c.UpstreamSend <- line + c.handleLineFromUpstream(upstreamData) } } - close(c.UpstreamSend) + c.Log(1, "leaving clientLineWorker") + + // close(c.UpstreamSend) } // configureUpstream - Generate an upstream configuration from the information set on the client instance @@ -644,3 +713,13 @@ func (c *Client) buildWebircTags() string { return str } + +func (c *Client) setState(state string) { + c.State = state + + hook := &HookClientState{ + Client: c, + Connected: c.State != ClientStateEnding, + } + hook.Dispatch("client.state") +} diff --git a/pkg/webircgateway/client_command_handlers.go b/pkg/webircgateway/client_command_handlers.go index fb4af52..7d899f8 100644 --- a/pkg/webircgateway/client_command_handlers.go +++ b/pkg/webircgateway/client_command_handlers.go @@ -10,6 +10,7 @@ import ( "github.com/kiwiirc/webircgateway/pkg/irc" "github.com/kiwiirc/webircgateway/pkg/recaptcha" "golang.org/x/net/html/charset" + "golang.org/x/time/rate" ) /* @@ -31,7 +32,11 @@ func (c *Client) ProcessLineFromUpstream(data string) string { } if pLen > 0 && m.Command == "001" { client.IrcState.Nick = m.Params[0] - client.State = ClientStateConnected + client.setState(ClientStateConnected) + + // Throttle writes if configured, but only after registration is complete. Typical IRCd + // behavior is to not throttle registration commands. + client.ThrottledRecv.Limiter = rate.NewLimiter(rate.Limit(client.UpstreamConfig.Throttle), 1) } if pLen > 0 && m.Command == "005" { // If EXTJWT is supported by the IRC server, disable it here @@ -169,7 +174,7 @@ func (c *Client) ProcessLineFromClient(line string) (string, error) { maybeConnectUpstream := func() { if !c.UpstreamStarted && c.IrcState.Username != "" && c.Verified { - go c.connectUpstream() + c.connectUpstream() } } @@ -401,7 +406,7 @@ func (c *Client) ProcessLineFromClient(line string) (string, error) { token := jwt.NewWithClaims(jwt.SigningMethodHS256, tokenData) tokenSigned, tokenSignedErr := token.SignedString([]byte(c.Gateway.Config.Secret)) if tokenSignedErr != nil { - c.Log(3, "Error creating JWT token. %s", tokenSignedErr.Error()) + c.Log(4, "Error creating JWT token. %s", tokenSignedErr.Error()) println(tokenSignedErr.Error()) } diff --git a/pkg/webircgateway/config.go b/pkg/webircgateway/config.go index 84d0e68..f154f5e 100644 --- a/pkg/webircgateway/config.go +++ b/pkg/webircgateway/config.go @@ -152,7 +152,7 @@ func (c *Config) Load() error { if strings.Index(section.Name(), "DEFAULT") == 0 { c.LogLevel = section.Key("logLevel").MustInt(3) if c.LogLevel < 1 || c.LogLevel > 3 { - c.gateway.Log(3, "Config option logLevel must be between 1-3. Setting default value of 3.") + c.gateway.Log(3, "Config option logLevel must be between 1-4. Setting default value of 3.") c.LogLevel = 3 } diff --git a/pkg/webircgateway/gateway.go b/pkg/webircgateway/gateway.go index e764240..fd68fc3 100644 --- a/pkg/webircgateway/gateway.go +++ b/pkg/webircgateway/gateway.go @@ -15,7 +15,7 @@ import ( "github.com/kiwiirc/webircgateway/pkg/identd" "github.com/kiwiirc/webircgateway/pkg/proxy" - "github.com/orcaman/concurrent-map" + cmap "github.com/orcaman/concurrent-map" ) var ( @@ -31,7 +31,8 @@ type Gateway struct { Clients cmap.ConcurrentMap Acme *LEManager Function string - httpSrv *http.Server + httpSrvs []*http.Server + httpSrvsMu sync.Mutex closeWg sync.WaitGroup } @@ -51,12 +52,23 @@ func NewGateway(function string) *Gateway { } func (s *Gateway) Log(level int, format string, args ...interface{}) { + s.LogWithoutHook(level, format, args...) + + hook := &HookLog{ + Level: level, + Line: fmt.Sprintf(format, args...), + } + hook.Dispatch("gateway.log") +} + +var LogLevels = [...]string{"L_DEBUG", "L_INFO", "L_WARN", "L_ERROR"} + +func (s *Gateway) LogWithoutHook(level int, format string, args ...interface{}) { if level < s.Config.LogLevel { return } - levels := [...]string{"L_DEBUG", "L_INFO", "L_WARN"} - line := fmt.Sprintf(levels[level-1]+" "+format, args...) + line := fmt.Sprintf(LogLevels[level-1]+" "+format, args...) select { case s.LogOutput <- line: @@ -83,7 +95,13 @@ func (s *Gateway) Start() { func (s *Gateway) Close() { defer s.closeWg.Done() - s.httpSrv.Close() + + s.httpSrvsMu.Lock() + defer s.httpSrvsMu.Unlock() + + for _, httpSrv := range s.httpSrvs { + httpSrv.Close() + } } func (s *Gateway) WaitClose() { @@ -116,12 +134,12 @@ func (s *Gateway) initHttpRoutes() error { t.Init(s) engineConfigured = true default: - s.Log(3, "Invalid server engine: '%s'", transport) + s.Log(4, "Invalid server engine: '%s'", transport) } } if !engineConfigured { - s.Log(3, "No server engines configured") + s.Log(4, "No server engines configured") return errors.New("No server engines configured") } @@ -176,7 +194,7 @@ func (s *Gateway) maybeStartIdentd() { if s.Config.Identd { err := s.identdServ.Run() if err != nil { - s.Log(3, "Error starting identd server: %s", err.Error()) + s.Log(4, "Error starting identd server: %s", err.Error()) } else { s.Log(2, "Identd server started") } @@ -192,7 +210,7 @@ func (s *Gateway) startServer(conf ConfigServer) { t.Start(conf.LocalAddr[4:] + ":" + strconv.Itoa(conf.Port)) } else if conf.TLS && conf.LetsEncryptCacheDir == "" { if conf.CertFile == "" || conf.KeyFile == "" { - s.Log(3, "'cert' and 'key' options must be set for TLS servers") + s.Log(4, "'cert' and 'key' options must be set for TLS servers") return } @@ -202,55 +220,70 @@ func (s *Gateway) startServer(conf ConfigServer) { s.Log(2, "Listening with TLS on %s", addr) keyPair, keyPairErr := tls.LoadX509KeyPair(tlsCert, tlsKey) if keyPairErr != nil { - s.Log(3, "Failed to listen with TLS, certificate error: %s", keyPairErr.Error()) + s.Log(4, "Failed to listen with TLS, certificate error: %s", keyPairErr.Error()) return } - s.httpSrv = &http.Server{ + srv := &http.Server{ Addr: addr, TLSConfig: &tls.Config{ Certificates: []tls.Certificate{keyPair}, }, Handler: s.HttpRouter, } + s.httpSrvsMu.Lock() + s.httpSrvs = append(s.httpSrvs, srv) + s.httpSrvsMu.Unlock() // Don't use HTTP2 since it doesn't support websockets - s.httpSrv.TLSNextProto = make(map[string]func(*http.Server, *tls.Conn, http.Handler)) + srv.TLSNextProto = make(map[string]func(*http.Server, *tls.Conn, http.Handler)) - err := s.httpSrv.ListenAndServeTLS("", "") - if err != nil { - s.Log(3, "Failed to listen with TLS: %s", err.Error()) + err := srv.ListenAndServeTLS("", "") + if err != nil && err != http.ErrServerClosed { + s.Log(4, "Failed to listen with TLS: %s", err.Error()) } } else if conf.TLS && conf.LetsEncryptCacheDir != "" { s.Log(2, "Listening with letsencrypt TLS on %s", addr) leManager := s.Acme.Get(conf.LetsEncryptCacheDir) - s.httpSrv = &http.Server{ + srv := &http.Server{ Addr: addr, TLSConfig: &tls.Config{ GetCertificate: leManager.GetCertificate, }, Handler: s.HttpRouter, } + s.httpSrvsMu.Lock() + s.httpSrvs = append(s.httpSrvs, srv) + s.httpSrvsMu.Unlock() // Don't use HTTP2 since it doesn't support websockets - s.httpSrv.TLSNextProto = make(map[string]func(*http.Server, *tls.Conn, http.Handler)) + srv.TLSNextProto = make(map[string]func(*http.Server, *tls.Conn, http.Handler)) - err := s.httpSrv.ListenAndServeTLS("", "") - s.Log(3, "Listening with letsencrypt failed: %s", err.Error()) + err := srv.ListenAndServeTLS("", "") + if err != nil && err != http.ErrServerClosed { + s.Log(4, "Listening with letsencrypt failed: %s", err.Error()) + } } else if strings.HasPrefix(strings.ToLower(conf.LocalAddr), "unix:") { socketFile := conf.LocalAddr[5:] s.Log(2, "Listening on %s", socketFile) os.Remove(socketFile) server, serverErr := net.Listen("unix", socketFile) if serverErr != nil { - s.Log(3, serverErr.Error()) + s.Log(4, serverErr.Error()) return } os.Chmod(socketFile, conf.BindMode) http.Serve(server, s.HttpRouter) } else { s.Log(2, "Listening on %s", addr) - s.httpSrv = &http.Server{Addr: addr, Handler: s.HttpRouter} - err := s.httpSrv.ListenAndServe() - s.Log(3, err.Error()) + srv := &http.Server{Addr: addr, Handler: s.HttpRouter} + + s.httpSrvsMu.Lock() + s.httpSrvs = append(s.httpSrvs, srv) + s.httpSrvsMu.Unlock() + + err := srv.ListenAndServe() + if err != nil && err != http.ErrServerClosed { + s.Log(4, err.Error()) + } } } diff --git a/pkg/webircgateway/gateway_utils.go b/pkg/webircgateway/gateway_utils.go index 93ab905..fb1d83d 100644 --- a/pkg/webircgateway/gateway_utils.go +++ b/pkg/webircgateway/gateway_utils.go @@ -8,9 +8,10 @@ import ( "strings" ) -func (s *Gateway) NewClient() *Client { - return NewClient(s) +func (s *Gateway) NewClient(info *ClientConnectionInfo) (*Client, error) { + return NewClient(s, info) } + func (s *Gateway) isClientOriginAllowed(originHeader string) bool { // Empty list of origins = all origins allowed if len(s.Config.RemoteOrigins) == 0 { diff --git a/pkg/webircgateway/hooks.go b/pkg/webircgateway/hooks.go index 3ce807e..246df9d 100644 --- a/pkg/webircgateway/hooks.go +++ b/pkg/webircgateway/hooks.go @@ -114,3 +114,43 @@ func (h *HookStatus) Dispatch(eventType string) { } } } + +/** + * HookLog + * Dispatched with log message + * Types: client.log, gateway.log + */ +type HookLog struct { + Hook + Client *Client + Level int + Line string +} + +func (h *HookLog) Dispatch(eventType string) { + for _, p := range h.getCallbacks(eventType) { + if f, ok := p.(func(*HookLog)); ok { + f(h) + } + } +} + +/** + * HookNewClientError + * Dispatched when NewClient is not successful + * Types: new.client.error + */ +type HookNewClientError struct { + Hook + Gateway *Gateway + ClientConnectionInfo *ClientConnectionInfo + Error error +} + +func (h *HookNewClientError) Dispatch(eventType string) { + for _, p := range h.getCallbacks(eventType) { + if f, ok := p.(func(*HookNewClientError)); ok { + f(h) + } + } +} diff --git a/pkg/webircgateway/transport_kiwiirc.go b/pkg/webircgateway/transport_kiwiirc.go index c08b9dd..2313dbc 100644 --- a/pkg/webircgateway/transport_kiwiirc.go +++ b/pkg/webircgateway/transport_kiwiirc.go @@ -3,13 +3,12 @@ package webircgateway import ( "fmt" "log" - "net" "runtime/debug" "strings" "sync" "github.com/igm/sockjs-go/sockjs" - "github.com/orcaman/concurrent-map" + cmap "github.com/orcaman/concurrent-map" ) type TransportKiwiirc struct { @@ -23,42 +22,20 @@ func (t *TransportKiwiirc) Init(g *Gateway) { } func (t *TransportKiwiirc) makeChannel(chanID string, ws sockjs.Session) *TransportKiwiircChannel { - client := t.gateway.NewClient() - - originHeader := strings.ToLower(ws.Request().Header.Get("Origin")) - if !t.gateway.isClientOriginAllowed(originHeader) { - client.Log(2, "Origin %s not allowed. Closing connection", originHeader) - ws.Close(0, "Origin not allowed") + req := ws.Request() + connInfo := NewClientConnectionInfo( + strings.ToLower(req.Header.Get("Origin")), + t.gateway.GetRemoteAddressFromRequest(req).String(), + req, + t.gateway, + ) + + client, err := t.gateway.NewClient(connInfo) + if err != nil { + ws.Close(0, err.Error()) return nil } - client.RemoteAddr = t.gateway.GetRemoteAddressFromRequest(ws.Request()).String() - - clientHostnames, err := net.LookupAddr(client.RemoteAddr) - if err != nil || len(clientHostnames) == 0 { - client.RemoteHostname = client.RemoteAddr - } else { - // FQDNs include a . at the end. Strip it out - potentialHostname := strings.Trim(clientHostnames[0], ".") - - // Must check that the resolved hostname also resolves back to the users IP - addr, err := net.LookupIP(potentialHostname) - if err == nil && len(addr) == 1 && addr[0].String() == client.RemoteAddr { - client.RemoteHostname = potentialHostname - } else { - client.RemoteHostname = client.RemoteAddr - } - } - - if t.gateway.isRequestSecure(ws.Request()) { - client.Tags["secure"] = "" - } - - // This doesn't make sense to have since the remote port may change between requests. Only - // here for testing purposes for now. - _, remoteAddrPort, _ := net.SplitHostPort(ws.Request().RemoteAddr) - client.Tags["remote-port"] = remoteAddrPort - client.Log(2, "New kiwiirc channel on %s from %s %s", ws.Request().Host, client.RemoteAddr, client.RemoteHostname) channel := &TransportKiwiircChannel{ diff --git a/pkg/webircgateway/transport_sockjs.go b/pkg/webircgateway/transport_sockjs.go index be04fc8..343a8aa 100644 --- a/pkg/webircgateway/transport_sockjs.go +++ b/pkg/webircgateway/transport_sockjs.go @@ -1,7 +1,6 @@ package webircgateway import ( - "net" "strings" "github.com/igm/sockjs-go/sockjs" @@ -18,42 +17,19 @@ func (t *TransportSockjs) Init(g *Gateway) { } func (t *TransportSockjs) sessionHandler(session sockjs.Session) { - client := t.gateway.NewClient() - - originHeader := strings.ToLower(session.Request().Header.Get("Origin")) - if !t.gateway.isClientOriginAllowed(originHeader) { - client.Log(2, "Origin %s not allowed. Closing connection", originHeader) - session.Close(0, "Origin not allowed") - return - } - - client.RemoteAddr = t.gateway.GetRemoteAddressFromRequest(session.Request()).String() - - clientHostnames, err := net.LookupAddr(client.RemoteAddr) + connInfo := NewClientConnectionInfo( + strings.ToLower(session.Request().Header.Get("Origin")), + t.gateway.GetRemoteAddressFromRequest(session.Request()).String(), + session.Request(), + t.gateway, + ) + + client, err := t.gateway.NewClient(connInfo) if err != nil { - client.RemoteHostname = client.RemoteAddr - } else { - // FQDNs include a . at the end. Strip it out - potentialHostname := strings.Trim(clientHostnames[0], ".") - - // Must check that the resolved hostname also resolves back to the users IP - addr, err := net.LookupIP(potentialHostname) - if err == nil && len(addr) == 1 && addr[0].String() == client.RemoteAddr { - client.RemoteHostname = potentialHostname - } else { - client.RemoteHostname = client.RemoteAddr - } - } - - if t.gateway.isRequestSecure(session.Request()) { - client.Tags["secure"] = "" + session.Close(0, err.Error()) + return } - // This doesn't make sense to have since the remote port may change between requests. Only - // here for testing purposes for now. - _, remoteAddrPort, _ := net.SplitHostPort(session.Request().RemoteAddr) - client.Tags["remote-port"] = remoteAddrPort - client.Log(2, "New sockjs client on %s from %s %s", session.Request().Host, client.RemoteAddr, client.RemoteHostname) // Read from sockjs diff --git a/pkg/webircgateway/transport_tcp.go b/pkg/webircgateway/transport_tcp.go index b3697f5..5e4d87d 100644 --- a/pkg/webircgateway/transport_tcp.go +++ b/pkg/webircgateway/transport_tcp.go @@ -3,6 +3,7 @@ package webircgateway import ( "bufio" "net" + "net/http" "strings" "sync" ) @@ -18,7 +19,7 @@ func (t *TransportTcp) Init(g *Gateway) { func (t *TransportTcp) Start(lAddr string) { l, err := net.Listen("tcp", lAddr) if err != nil { - t.gateway.Log(3, "TCP error listening: "+err.Error()) + t.gateway.Log(4, "TCP error listening: "+err.Error()) return } // Close the listener when the application closes. @@ -28,7 +29,7 @@ func (t *TransportTcp) Start(lAddr string) { // Listen for an incoming connection. conn, err := l.Accept() if err != nil { - t.gateway.Log(3, "TCP error accepting: "+err.Error()) + t.gateway.Log(4, "TCP error accepting: "+err.Error()) break } // Handle connections in a new goroutine. @@ -37,29 +38,19 @@ func (t *TransportTcp) Start(lAddr string) { } func (t *TransportTcp) handleConn(conn net.Conn) { - client := t.gateway.NewClient() + origin := "" + remoteAddr := conn.RemoteAddr().String() + var req *http.Request + gateway := t.gateway - client.RemoteAddr = conn.RemoteAddr().String() + connInfo := NewClientConnectionInfo(origin, remoteAddr, req, gateway) - clientHostnames, err := net.LookupAddr(client.RemoteAddr) + client, err := t.gateway.NewClient(connInfo) if err != nil { - client.RemoteHostname = client.RemoteAddr - } else { - // FQDNs include a . at the end. Strip it out - potentialHostname := strings.Trim(clientHostnames[0], ".") - - // Must check that the resolved hostname also resolves back to the users IP - addr, err := net.LookupIP(potentialHostname) - if err == nil && len(addr) == 1 && addr[0].String() == client.RemoteAddr { - client.RemoteHostname = potentialHostname - } else { - client.RemoteHostname = client.RemoteAddr - } + conn.Close() + return } - _, remoteAddrPort, _ := net.SplitHostPort(conn.RemoteAddr().String()) - client.Tags["remote-port"] = remoteAddrPort - client.Log(2, "New tcp client on %s from %s %s", conn.LocalAddr().String(), client.RemoteAddr, client.RemoteHostname) // We wait until the client send queue has been drained diff --git a/pkg/webircgateway/transport_websocket.go b/pkg/webircgateway/transport_websocket.go index 529cca2..d597ec9 100644 --- a/pkg/webircgateway/transport_websocket.go +++ b/pkg/webircgateway/transport_websocket.go @@ -2,7 +2,6 @@ package webircgateway import ( "fmt" - "net" "net/http" "strings" "sync" @@ -41,33 +40,25 @@ func (t *TransportWebsocket) checkOrigin(config *websocket.Config, req *http.Req } func (t *TransportWebsocket) websocketHandler(ws *websocket.Conn) { - client := t.gateway.NewClient() + req := ws.Request() + gateway := t.gateway + originURL, originParseErr := websocket.Origin(&t.wsServer.Config, req) + if originParseErr != nil { + err := fmt.Errorf("Invalid origin: %s", originParseErr) + t.gateway.Log(4, "%s", err) + return + } + origin := originURL.String() + remoteAddr := t.gateway.GetRemoteAddressFromRequest(ws.Request()).String() - client.RemoteAddr = t.gateway.GetRemoteAddressFromRequest(ws.Request()).String() + connInfo := NewClientConnectionInfo(origin, remoteAddr, req, gateway) - clientHostnames, err := net.LookupAddr(client.RemoteAddr) + client, err := t.gateway.NewClient(connInfo) if err != nil { - client.RemoteHostname = client.RemoteAddr - } else { - // FQDNs include a . at the end. Strip it out - potentialHostname := strings.Trim(clientHostnames[0], ".") - - // Must check that the resolved hostname also resolves back to the users IP - addr, err := net.LookupIP(potentialHostname) - if err == nil && len(addr) == 1 && addr[0].String() == client.RemoteAddr { - client.RemoteHostname = potentialHostname - } else { - client.RemoteHostname = client.RemoteAddr - } + ws.Close() + return } - if t.gateway.isRequestSecure(ws.Request()) { - client.Tags["secure"] = "" - } - - _, remoteAddrPort, _ := net.SplitHostPort(ws.Request().RemoteAddr) - client.Tags["remote-port"] = remoteAddrPort - client.Log(2, "New websocket client on %s from %s %s", ws.Request().Host, client.RemoteAddr, client.RemoteHostname) // We wait until the client send queue has been drained diff --git a/pkg/webircgateway/utils.go b/pkg/webircgateway/utils.go index 56e8041..85165b2 100644 --- a/pkg/webircgateway/utils.go +++ b/pkg/webircgateway/utils.go @@ -1,12 +1,14 @@ package webircgateway import ( + "context" "fmt" "net" "strings" "unicode/utf8" "golang.org/x/net/html/charset" + "golang.org/x/time/rate" ) var privateIPBlocks []*net.IPNet @@ -89,3 +91,48 @@ func containsOneOf(s string, substrs []string) bool { return false } + +type ThrottledStringChannel struct { + in chan string + Input chan<- string + out chan string + Output <-chan string + *rate.Limiter +} + +func NewThrottledStringChannel(wrappedChan chan string, limiter *rate.Limiter) *ThrottledStringChannel { + out := make(chan string, 50) + + c := &ThrottledStringChannel{ + in: wrappedChan, + Input: wrappedChan, + out: out, + Output: out, + Limiter: limiter, + } + + go c.run() + + return c +} + +func (c *ThrottledStringChannel) run() { + for { + select { + case msg, ok := <-c.in: + if !ok { + close(c.out) + return + } + + // start := time.Now() + + c.Wait(context.Background()) + + // elapsed := time.Since(start) + // fmt.Printf("waited %v to send %v\n", elapsed, msg) + + c.out <- msg + } + } +}