Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ensure reloading Caddy configuration results in using latest bouncer (WIP) #31

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 32 additions & 5 deletions crowdsec/crowdsec.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"net"
"net/url"
"strings"
"sync"

"github.com/caddyserver/caddy/v2"
"github.com/caddyserver/caddy/v2/caddyconfig/httpcaddyfile"
Expand All @@ -30,7 +31,9 @@ import (
)

var (
cfg *config
cfg *config
latestBouncerMu sync.RWMutex
latestBouncer *bouncer.Bouncer
)

const (
Expand Down Expand Up @@ -186,6 +189,12 @@ func (c *CrowdSec) Validate() error {
// Start starts the CrowdSec Caddy app
func (c *CrowdSec) Start() error {
c.bouncer.Run()

// TODO(hs): before setting the global latest bouncer, ensure the new bouncer is healthy (enough)?
latestBouncerMu.Lock()
latestBouncer = c.bouncer
latestBouncerMu.Unlock()

return nil
}

Expand All @@ -194,11 +203,29 @@ func (c *CrowdSec) Stop() error {
return c.bouncer.Shutdown()
}

// IsAllowed is used by the CrowdSec HTTP handler to check if
// an IP is allowed to perform a request
// IsAllowed is used by the CrowdSec HTTP Handler and L4 Matcher to check
// if an IP is allowed to perform a request. It will try the latest (global)
// bouncer instance first and will fallback to this CrowdSec instance's
// bouncer in case it's missing. This is because Caddy may be in the process
// of (re)starting with a new configuration and the latest bouncer may not
// be available (yet) or may have been set to nil in the process. Using
// the global instance does make the overal logic depend on some global
// logic, but some construction like this seems to be the only way to make
// Caddy always use the correct bouncer instance when checking IPs.
func (c *CrowdSec) IsAllowed(ip net.IP) (bool, *models.Decision, error) {
// TODO: check if running? fully loaded, etc?
return c.bouncer.IsAllowed(ip)
switch {
case latestBouncer != nil:
// try the latest bouncer instance first
latestBouncerMu.RLock()
defer latestBouncerMu.RUnlock()
return latestBouncer.IsAllowed(ip)
case c.bouncer != nil:
// fallback to this crowdsec instance bouncer
return c.bouncer.IsAllowed(ip)
default:
// fail closed
return false, nil, errors.New("bouncer not available")
}
}

func (c *CrowdSec) isStreamingEnabled() bool {
Expand Down
4 changes: 0 additions & 4 deletions http/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@ func (Handler) CaddyModule() caddy.ModuleInfo {

// Provision sets up the CrowdSec handler.
func (h *Handler) Provision(ctx caddy.Context) error {

crowdsecAppIface, err := ctx.App("crowdsec")
if err != nil {
return fmt.Errorf("getting crowdsec app: %v", err)
Expand All @@ -65,7 +64,6 @@ func (h *Handler) Provision(ctx caddy.Context) error {

// Validate ensures the app's configuration is valid.
func (h *Handler) Validate() error {

if h.crowdsec == nil {
return errors.New("crowdsec app not available")
}
Expand All @@ -75,7 +73,6 @@ func (h *Handler) Validate() error {

// ServeHTTP is the Caddy handler for serving HTTP requests
func (h Handler) ServeHTTP(w http.ResponseWriter, r *http.Request, next caddyhttp.Handler) error {

ipToCheck, err := determineIPFromRequest(r)
if err != nil {
return err // TODO: return error here? Or just log it and continue serving
Expand Down Expand Up @@ -132,7 +129,6 @@ func writeCaptchaResponse(w http.ResponseWriter) error {

// writeThrottleResponse writes 429 status as response
func writeThrottleResponse(w http.ResponseWriter, duration string) error {

d, err := time.ParseDuration(duration)
if err != nil {
return err
Expand Down
30 changes: 17 additions & 13 deletions internal/bouncer/bouncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"fmt"
"net"
"sync"
"time"

"github.com/crowdsecurity/crowdsec/pkg/models"
csbouncer "github.com/crowdsecurity/go-cs-bouncer"
Expand All @@ -27,7 +28,7 @@ import (
"go.uber.org/zap/zapcore"
)

const version = "v0.5.4"
const version = "v0.5.5"
const maxNumberOfDecisionsToLog = 10

// Bouncer is a custom CrowdSec bouncer backed by an immutable radix tree
Expand All @@ -39,14 +40,17 @@ type Bouncer struct {
useStreamingBouncer bool
shouldFailHard bool

ctx context.Context
cancel context.CancelFunc
wg *sync.WaitGroup
// bouncer runtime management
ctx context.Context
cancel context.CancelFunc
wg *sync.WaitGroup
instantiatedAt time.Time
}

// New creates a new (streaming) Bouncer with a storage based on immutable radix tree
// TODO: take a configuration struct instead, because more options will be added.
func New(apiKey, apiURL, tickerInterval string, logger *zap.Logger) (*Bouncer, error) {
instantiatedAt := time.Now()
userAgent := fmt.Sprintf("caddy-cs-bouncer/%s", version)
insecureSkipVerify := false
return &Bouncer{
Expand All @@ -64,8 +68,9 @@ func New(apiKey, apiURL, tickerInterval string, logger *zap.Logger) (*Bouncer, e
InsecureSkipVerify: &insecureSkipVerify,
UserAgent: userAgent,
},
store: newStore(),
logger: logger,
store: newStore(),
logger: logger,
instantiatedAt: instantiatedAt,
}, nil
}

Expand All @@ -86,13 +91,13 @@ func (b *Bouncer) Init() error {
// override CrowdSec's default logrus logging
b.overrideLogrusLogger()

// initialize the CrowdSec streaming bouncer
if b.useStreamingBouncer {
return b.streamingBouncer.Init()
// initialize the CrowdSec live bouncer
if !b.useStreamingBouncer {
return b.liveBouncer.Init()
}

// initialize the CrowdSec live bouncer
return b.liveBouncer.Init()
// initialize the CrowdSec streaming bouncer
return b.streamingBouncer.Init()
}

// Run starts the Bouncer processes
Expand Down Expand Up @@ -178,7 +183,7 @@ func (b *Bouncer) Shutdown() error {
return nil
}

b.cancel()
b.cancel() // TODO(hs): move these to Cleanup() instead?
b.wg.Wait()

// TODO: clean shutdown of the streaming bouncer channel reading
Expand Down Expand Up @@ -207,7 +212,6 @@ func (b *Bouncer) delete(decision *models.Decision) error {

// IsAllowed checks if an IP is allowed or not
func (b *Bouncer) IsAllowed(ip net.IP) (bool, *models.Decision, error) {

// TODO: perform lookup in explicit allowlist as a kind of quick lookup in front of the CrowdSec lookup list?
isAllowed := false
decision, err := b.retrieveDecision(ip)
Expand Down
1 change: 0 additions & 1 deletion internal/bouncer/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,6 @@ func (s *crowdSecStore) delete(decision *models.Decision) error {
}

func (s *crowdSecStore) get(key net.IP) (*models.Decision, error) {

r, err := s.store.Get(key)
if err != nil {
return nil, err
Expand Down
Loading