Skip to content

Commit

Permalink
added check grants for shared cache queries
Browse files Browse the repository at this point in the history
  • Loading branch information
hulk8 committed Sep 11, 2024
1 parent ebb7e32 commit 60b7596
Show file tree
Hide file tree
Showing 5 changed files with 119 additions and 7 deletions.
16 changes: 9 additions & 7 deletions cache/async_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,9 @@ type AsyncCache struct {

graceTime time.Duration

MaxPayloadSize config.ByteSize
SharedWithAllUsers bool
MaxPayloadSize config.ByteSize
SharedWithAllUsers bool
CheckGrantsForSharedCache bool
}

func (c *AsyncCache) Close() error {
Expand Down Expand Up @@ -109,10 +110,11 @@ func NewAsyncCache(cfg config.Cache, maxExecutionTime time.Duration) (*AsyncCach
maxPayloadSize := cfg.MaxPayloadSize

return &AsyncCache{
Cache: cache,
TransactionRegistry: transaction,
graceTime: graceTime,
MaxPayloadSize: maxPayloadSize,
SharedWithAllUsers: cfg.SharedWithAllUsers,
Cache: cache,
TransactionRegistry: transaction,
graceTime: graceTime,
MaxPayloadSize: maxPayloadSize,
SharedWithAllUsers: cfg.SharedWithAllUsers,
CheckGrantsForSharedCache: cfg.CheckGrantsForSharedCache,
}, nil
}
4 changes: 4 additions & 0 deletions config/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,10 @@ max_payload_size: <byte_size>

# Whether a query cached by a user can be used by another user
shared_with_all_users: <bool> | default = false [optional]

# Whether `shared_with_all_users` option is enabled
# check permissions for cached query used by another user
check_grants_for_shared_cache: <bool> | default = false [optional]
```
### <distributed_cache_config>
Expand Down
4 changes: 4 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -938,6 +938,10 @@ type Cache struct {

// Whether a query cached by a user could be used by another user
SharedWithAllUsers bool `yaml:"shared_with_all_users,omitempty"`

// Whether `shared_with_all_users` option is enabled
// check permissions for cached query used by another user
CheckGrantsForSharedCache bool `yaml:"check_grants_for_shared_cache,omitempty"`
}

func (c *Cache) setDefaults() {
Expand Down
46 changes: 46 additions & 0 deletions io.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,3 +177,49 @@ func (crc *cachedReadCloser) String() string {
crc.bLock.Unlock()
return s
}

var _ ResponseWriterWithCode = &checkGrantsResponseWriter{}

type checkGrantsResponseWriter struct {
http.ResponseWriter

statusCode int
}

func (rw *checkGrantsResponseWriter) SetStatusCode(code int) {
rw.statusCode = code
rw.ResponseWriter.WriteHeader(rw.statusCode)
}

func (rw *checkGrantsResponseWriter) StatusCode() int {
if rw.statusCode == 0 {
return http.StatusOK
}

return rw.statusCode
}

func (rw *checkGrantsResponseWriter) WriteHeader(statusCode int) {
// cache statusCode to keep the opportunity to change it in further
rw.statusCode = statusCode
rw.SetStatusCode(statusCode)
}

func (rw *checkGrantsResponseWriter) Write(b []byte) (int, error) {
if rw.statusCode == http.StatusOK {
return 0, nil
}

n, err := rw.ResponseWriter.Write(b)
return n, err
}

// CloseNotify implements http.CloseNotifier
func (rw *checkGrantsResponseWriter) CloseNotify() <-chan bool {
// The rw.ResponseWriter must implement http.CloseNotifier
rwc, ok := rw.ResponseWriter.(http.CloseNotifier)
if !ok {
panic("BUG: the wrapped ResponseWriter must implement http.CloseNotifier")
}
return rwc.CloseNotify()
}
56 changes: 56 additions & 0 deletions proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,25 @@ func (rp *reverseProxy) ServeHTTP(rw http.ResponseWriter, req *http.Request) {
}

if shouldReturnFromCache {
// if cache shared between all users
// try to check if cached query is allowed for current user
if s.user.cache != nil && s.user.cache.SharedWithAllUsers && s.user.cache.CheckGrantsForSharedCache {
checkReq, checkQuery, _ := s.createCheckGrantsRequest(req)

srwCheck := &checkGrantsResponseWriter{
ResponseWriter: srw.ResponseWriter,
}

rp.proxyRequest(s, srwCheck, srw, checkReq)

if srwCheck.statusCode == http.StatusOK {
log.Debugf("%s: check grants for shared cached query request success; query: %q; Method: %s; URL: %q", s, checkQuery, checkReq.Method, checkReq.URL.String())
} else {
log.Debugf("%s: check grants for shared cached query request failure: non-200 status code %d; query: %q; Method: %s; URL: %q", s, srwCheck.statusCode, checkQuery, checkReq.Method, checkReq.URL.String())
return
}
}

rp.serveFromCache(s, srw, req, origParams, q)
} else {
rp.proxyRequest(s, srw, srw, req)
Expand Down Expand Up @@ -925,3 +944,40 @@ func (rp *reverseProxy) getScope(req *http.Request) (*scope, int, error) {
s.requestPacketSize = len(q)
return s, 0, nil
}

// create a new request based on proxied one
// with query wrapped to fetch result types like:
// 'DESC ({original_query})'
// along with query parsed and analyzed for return types (which is fast)
// ClickHouse check permissions to execute this query for the user
func (s *scope) createCheckGrantsRequest(originalReq *http.Request) (*http.Request, string, error) {
originalQuery := originalReq.URL.Query().Get("query")
checkQuery := fmt.Sprintf("DESC (%s);", strings.TrimRight(originalQuery, ";"))

newURL := *originalReq.URL

queryParams, err := url.ParseQuery(newURL.RawQuery)
if err != nil {
return nil, checkQuery, err
}

queryParams.Set("query", checkQuery)

newURL.RawQuery = queryParams.Encode()

req := &http.Request{
Method: originalReq.Method,
URL: &newURL,
Proto: originalReq.Proto,
ProtoMajor: originalReq.ProtoMajor,
ProtoMinor: originalReq.ProtoMinor,
Header: originalReq.Header.Clone(),
Body: originalReq.Body,
Host: originalReq.Host,
ContentLength: originalReq.ContentLength,
Close: originalReq.Close,
TLS: originalReq.TLS,
}

return req, checkQuery, nil
}

0 comments on commit 60b7596

Please sign in to comment.