From bcd184aba5952753f1edab8841d7387d78092f2f Mon Sep 17 00:00:00 2001 From: lwnmengjing Date: Sat, 11 May 2024 01:57:38 +0800 Subject: [PATCH] :sparkles: feat: support query cache (#106) --- core/logger/writer/loki.go | 16 +++++++++++++--- core/server/server.go | 15 ++++++++++++--- pkg/response/actions/gorm/base.go | 3 +++ pkg/response/actions/gorm/control.go | 6 +++++- pkg/response/actions/gorm/delete.go | 7 ++++++- pkg/response/actions/gorm/get.go | 3 ++- pkg/response/actions/gorm/search.go | 6 ++++-- 7 files changed, 45 insertions(+), 11 deletions(-) diff --git a/core/logger/writer/loki.go b/core/logger/writer/loki.go index 6e3c512..4507296 100644 --- a/core/logger/writer/loki.go +++ b/core/logger/writer/loki.go @@ -6,6 +6,9 @@ import ( "io" "log/slog" "net/http" + "os" + "os/signal" + "syscall" "time" "github.com/gogo/protobuf/proto" @@ -55,11 +58,18 @@ func (p *LokiWriter) Write(data []byte) (n int, err error) { func (p *LokiWriter) write() { entries := make([]logproto.Entry, 0) - defer func() { - _ = p.send(entries) - }() + + done := make(chan os.Signal, 1) + signal.Notify(done, os.Interrupt, syscall.SIGINT, syscall.SIGTERM) for { select { + case <-done: + err := p.send(entries) + if err != nil { + slog.Error("application exit, send to loki failed", slog.String("error", err.Error())) + err = nil + } + return case <-time.After(p.opts.lokiInterval): // send to loki if len(entries) > 0 { diff --git a/core/server/server.go b/core/server/server.go index 4cafcf0..fdc7e38 100644 --- a/core/server/server.go +++ b/core/server/server.go @@ -12,7 +12,10 @@ import ( "errors" "fmt" "log/slog" + "os" + "os/signal" "sync" + "syscall" ) // Server server @@ -44,7 +47,7 @@ func New(opts ...Option) Manager { return s } -// Add add runnable +// Add runnable func (e *Server) Add(r ...Runnable) { if e.services == nil { e.services = make(map[string]Runnable) @@ -57,7 +60,7 @@ func (e *Server) Add(r ...Runnable) { } } -// Start start runnable +// Start runnable func (e *Server) Start(ctx context.Context) (err error) { //e.mux.Lock() //defer e.mux.Unlock() @@ -87,12 +90,18 @@ func (e *Server) Start(ctx context.Context) (err error) { e.startRunnable(e.services[k]) e.setStarted(k) } + done := make(chan os.Signal, 1) + signal.Notify(done, os.Interrupt, syscall.SIGINT, syscall.SIGTERM) e.waitForRunnable.Wait() select { case <-ctx.Done(): return nil - case err := <-e.errChan: + case err = <-e.errChan: return err + case <-done: + // 优雅退出 + fmt.Println("received SIGINT, shutting down server") + return nil } } diff --git a/pkg/response/actions/gorm/base.go b/pkg/response/actions/gorm/base.go index 2cdac37..7795653 100644 --- a/pkg/response/actions/gorm/base.go +++ b/pkg/response/actions/gorm/base.go @@ -8,11 +8,14 @@ package gorm */ import ( + "context" "net/http" "github.com/gin-gonic/gin" ) +var CleanCacheFromTag func(ctx context.Context, tag string) error + // Base action type Base struct { opts *Options diff --git a/pkg/response/actions/gorm/control.go b/pkg/response/actions/gorm/control.go index 918b648..c0e32fe 100644 --- a/pkg/response/actions/gorm/control.go +++ b/pkg/response/actions/gorm/control.go @@ -8,6 +8,7 @@ package gorm */ import ( + "context" "errors" "fmt" "net/http" @@ -129,7 +130,7 @@ func (e *Control) update(c *gin.Context) { api.Err(http.StatusUnprocessableEntity) return } - query := gormdb.DB.Where(e.opts.Key, id) + query := gormdb.DB.WithContext(context.WithValue(c, "gorm:cache:tag", m.TableName())).Where(e.opts.Key, id) if e.opts.Scope != nil { query = query.Scopes(e.opts.Scope(c, m)) } @@ -168,6 +169,9 @@ func (e *Control) update(c *gin.Context) { api.Err(http.StatusInternalServerError) return } + if CleanCacheFromTag != nil { + _ = CleanCacheFromTag(c, m.TableName()) + } if e.opts.AfterUpdate != nil { err = e.opts.AfterUpdate(c, query, m) if err != nil { diff --git a/pkg/response/actions/gorm/delete.go b/pkg/response/actions/gorm/delete.go index 5225db9..2449b13 100644 --- a/pkg/response/actions/gorm/delete.go +++ b/pkg/response/actions/gorm/delete.go @@ -8,6 +8,7 @@ package gorm */ import ( + "context" "fmt" "net/http" @@ -80,7 +81,8 @@ func (e *Delete) delete(c *gin.Context, ids ...string) { return } } - query := gormdb.DB.WithContext(c).Where(fmt.Sprintf("%s IN ?", e.opts.Key), ids) + query := gormdb.DB.WithContext(context.WithValue(c, "gorm:cache:tag", e.opts.Model.TableName())). + Where(fmt.Sprintf("%s IN ?", e.opts.Key), ids) if e.opts.Scope != nil { query = query.Scopes(e.opts.Scope(c, e.opts.Model)) } @@ -90,6 +92,9 @@ func (e *Delete) delete(c *gin.Context, ids ...string) { api.Err(http.StatusInternalServerError) return } + if CleanCacheFromTag != nil { + _ = CleanCacheFromTag(c, e.opts.Model.TableName()) + } if e.opts.AfterDelete != nil { if err = e.opts.AfterDelete(c, gormdb.DB, e.opts.Model); err != nil { api.AddError(err).Log.ErrorContext(c, "AfterDelete error", "error", err) diff --git a/pkg/response/actions/gorm/get.go b/pkg/response/actions/gorm/get.go index 5448d48..a895b8b 100644 --- a/pkg/response/actions/gorm/get.go +++ b/pkg/response/actions/gorm/get.go @@ -8,6 +8,7 @@ package gorm */ import ( + "context" "errors" "net/http" @@ -63,7 +64,7 @@ func (e *Get) get(c *gin.Context, key string) { api := response.Make(c) m := pkg.TablerDeepCopy(e.opts.Model) preloads := c.QueryArray("preloads[]") - query := gormdb.DB.Model(m).Where("id = ?", c.Param(key)) + query := gormdb.DB.WithContext(context.WithValue(c, "gorm:cache:tag", m.TableName())).Model(m).Where("id = ?", c.Param(key)) if e.opts.BeforeGet != nil { if err := e.opts.BeforeGet(c, query, m); err != nil { diff --git a/pkg/response/actions/gorm/search.go b/pkg/response/actions/gorm/search.go index bc0b1ba..318cd16 100644 --- a/pkg/response/actions/gorm/search.go +++ b/pkg/response/actions/gorm/search.go @@ -8,6 +8,7 @@ package gorm */ import ( + "context" "errors" "net/http" "strings" @@ -68,9 +69,10 @@ func (e *Search) search(c *gin.Context) { api.Err(http.StatusUnprocessableEntity) return } - db := gormdb.DB m := pkg.TablerDeepCopy(e.opts.Model) + db := gormdb.DB.WithContext(context.WithValue(c, "gorm:cache:tag", m.TableName())) + if e.opts.BeforeSearch != nil { if err := e.opts.BeforeSearch(c, db, m); err != nil { api.AddError(err).Log.Error("BeforeSearch error", "error", err) @@ -87,7 +89,7 @@ func (e *Search) search(c *gin.Context) { if e.opts.Scope != nil { scopes = append(scopes, e.opts.Scope(c, m)) } - query := db.WithContext(c).Model(m).Scopes(scopes...) + query := db.Model(m).Scopes(scopes...) //if err := query.Limit(-1).Offset(-1).Count(&count).Error; err != nil && !errors.Is(err, gorm.ErrRecordNotFound) { // api.AddError(err).Log.ErrorContext(c, "Search error", "error", err) // api.Err(http.StatusInternalServerError)