From cd4029c92cce568e773490f438799fb70a1c4092 Mon Sep 17 00:00:00 2001 From: foronedream Date: Wed, 17 Jan 2024 10:23:38 +0800 Subject: [PATCH] save log into file in go client --- .gitignore | 1 + client/client.go | 33 +++- client/obbatch.go | 4 +- client/obclient.go | 36 ++++ client/query_result_iterator.go | 6 +- config/client_config.go | 14 ++ config/toml_config.go | 24 +++ configurations/obkv-table-default.toml | 11 +- go.mod | 7 + go.sum | 18 ++ log/logger.go | 181 ++++++++++++++++-- log/logger_test.go | 47 +++++ obkvrpc/compressor.go | 2 +- obkvrpc/connection.go | 19 +- obkvrpc/connection_lifecycle_mgr.go | 16 +- obkvrpc/connection_mgr.go | 4 +- obkvrpc/connection_pool.go | 6 +- obkvrpc/slb_loader.go | 4 +- route/config_server_info.go | 2 +- route/route_info.go | 30 +-- .../connection_balance_test.go | 2 +- test/logger/all_test.go | 44 +++++ test/logger/logger_file_test.go | 52 +++++ 23 files changed, 504 insertions(+), 59 deletions(-) create mode 100644 test/logger/all_test.go create mode 100644 test/logger/logger_file_test.go diff --git a/.gitignore b/.gitignore index 0a71bd0..eb0492a 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,4 @@ .DS_Store .idea .vscode +./logs diff --git a/client/client.go b/client/client.go index 366738e..bf6bf69 100644 --- a/client/client.go +++ b/client/client.go @@ -19,11 +19,11 @@ package client import ( "context" - "github.com/pkg/errors" "github.com/oceanbase/obkv-table-client-go/client/option" "github.com/oceanbase/obkv-table-client-go/config" + "github.com/oceanbase/obkv-table-client-go/log" "github.com/oceanbase/obkv-table-client-go/table" ) @@ -40,6 +40,11 @@ func NewClient( sysUserName string, sysPassWord string, cliConfig *config.ClientConfig) (Client, error) { + // init log + err := initLogProcess(cliConfig) + if err != nil { + return nil, err + } // 1. Check args if configUrl == "" { return nil, errors.New("config url is empty") @@ -81,6 +86,12 @@ func NewOdpClient( odpRpcPort int, database string, cliConfig *config.ClientConfig) (Client, error) { + // init log + err := initLogProcess(cliConfig) + if err != nil { + return nil, err + } + // 1. Check args if fullUserName == "" { return nil, errors.New("full user name is null") @@ -111,6 +122,11 @@ func NewClientWithTomlConfig(configFilePath string) (Client, error) { if err != nil { return nil, errors.WithMessagef(err, "get client config from toml, configFilePath:%s", configFilePath) } + // init log + err = initLogProcess(clientConfig.GetClientConfig()) + if err != nil { + return nil, err + } switch clientConfig.Mode { case "direct": return NewClient( @@ -135,6 +151,21 @@ func NewClientWithTomlConfig(configFilePath string) (Client, error) { } } +func initLogProcess(c *config.ClientConfig) error { + var logConfig log.LogConfig + if c != nil { + logConfig.LogFileName = c.LogFileName + logConfig.MaxAgeFileRem = c.MaxAgeFileRem + logConfig.MaxBackupFileSize = c.MaxBackupFileSize + logConfig.SingleFileMaxSize = c.SingleFileMaxSize + logConfig.Compress = c.Compress + logConfig.SlowQueryThreshold = c.SlowQueryThreshold + } else { + return errors.New("client config is null") + } + return log.InitLoggerWithConfig(logConfig) +} + type Client interface { // Insert a record by rowKey. Insert(ctx context.Context, tableName string, rowKey []*table.Column, mutateColumns []*table.Column, opts ...option.ObOperationOption) (int64, error) diff --git a/client/obbatch.go b/client/obbatch.go index 33213b6..3995d0a 100644 --- a/client/obbatch.go +++ b/client/obbatch.go @@ -309,7 +309,7 @@ func (b *obBatchExecutor) executeInternal(ctx context.Context) (BatchOperationRe defer wg.Done() err, partNeedRetry := b.partitionExecute(ctx, partOp, res) if err != nil { - log.Warn("failed to execute partition operations", log.String("partOp", partOp.String()), log.String("err", err.Error())) + log.Warn("Runtime", ctx.Value(log.ObkvTraceIdName), "failed to execute partition operations", log.String("partOp", partOp.String()), log.String("err", err.Error())) errArrLock.Lock() errArr = append(errArr, err) if needRetry == false && partNeedRetry { @@ -321,7 +321,7 @@ func (b *obBatchExecutor) executeInternal(ctx context.Context) (BatchOperationRe } wg.Wait() if len(errArr) != 0 { - log.Warn("error occur when execute partition operations") + log.Warn("Runtime", ctx.Value(log.ObkvTraceIdName), "error occur when execute partition operations") return nil, errArr[0], needRetry } } else { diff --git a/client/obclient.go b/client/obclient.go index eb16bdd..ef9da19 100644 --- a/client/obclient.go +++ b/client/obclient.go @@ -19,6 +19,10 @@ package client import ( "context" + "fmt" + "github.com/oceanbase/obkv-table-client-go/log" + "golang.org/x/sys/unix" + "runtime" "strings" "time" @@ -271,6 +275,7 @@ func (c *obClient) Insert( rowKey []*table.Column, mutateColumns []*table.Column, opts ...option.ObOperationOption) (int64, error) { + log.InitTraceId(&ctx) operationOptions := c.getOperationOptions(opts...) if operationOptions.TableFilter == nil { res, err := c.executeWithRetry( @@ -305,6 +310,7 @@ func (c *obClient) Update( rowKey []*table.Column, mutateColumns []*table.Column, opts ...option.ObOperationOption) (int64, error) { + log.InitTraceId(&ctx) operationOptions := c.getOperationOptions(opts...) if operationOptions.TableFilter == nil { res, err := c.executeWithRetry( @@ -339,6 +345,7 @@ func (c *obClient) InsertOrUpdate( rowKey []*table.Column, mutateColumns []*table.Column, opts ...option.ObOperationOption) (int64, error) { + log.InitTraceId(&ctx) operationOptions := c.getOperationOptions(opts...) res, err := c.executeWithRetry( ctx, @@ -359,6 +366,7 @@ func (c *obClient) Replace( rowKey []*table.Column, mutateColumns []*table.Column, opts ...option.ObOperationOption) (int64, error) { + log.InitTraceId(&ctx) operationOptions := c.getOperationOptions(opts...) res, err := c.executeWithRetry( ctx, @@ -379,6 +387,7 @@ func (c *obClient) Increment( rowKey []*table.Column, mutateColumns []*table.Column, opts ...option.ObOperationOption) (SingleResult, error) { + log.InitTraceId(&ctx) operationOptions := c.getOperationOptions(opts...) if operationOptions.TableFilter == nil { res, err := c.executeWithRetry( @@ -413,6 +422,7 @@ func (c *obClient) Append( rowKey []*table.Column, mutateColumns []*table.Column, opts ...option.ObOperationOption) (SingleResult, error) { + log.InitTraceId(&ctx) operationOptions := c.getOperationOptions(opts...) if operationOptions.TableFilter == nil { res, err := c.executeWithRetry( @@ -446,6 +456,7 @@ func (c *obClient) Delete( tableName string, rowKey []*table.Column, opts ...option.ObOperationOption) (int64, error) { + log.InitTraceId(&ctx) operationOptions := c.getOperationOptions(opts...) if operationOptions.TableFilter == nil { res, err := c.executeWithRetry( @@ -480,6 +491,7 @@ func (c *obClient) Get( rowKey []*table.Column, getColumns []string, opts ...option.ObOperationOption) (SingleResult, error) { + log.InitTraceId(&ctx) var columns = make([]*table.Column, 0, len(getColumns)) for _, columnName := range getColumns { columns = append(columns, table.NewColumn(columnName, nil)) @@ -499,6 +511,7 @@ func (c *obClient) Get( } func (c *obClient) Query(ctx context.Context, tableName string, rangePairs []*table.RangePair, opts ...option.ObQueryOption) (QueryResultIterator, error) { + log.InitTraceId(&ctx) queryOpts := c.getObQueryOptions(opts...) queryExecutor := newObQueryExecutorWithParams(tableName, c) queryExecutor.addKeyRanges(rangePairs) @@ -533,6 +546,7 @@ func (c *obClient) Close() { if c.odpTable != nil { c.odpTable.Close() } + _ = log.Sync() } func (c *obClient) getOperationOptions(opts ...option.ObOperationOption) *option.ObOperationOptions { @@ -601,6 +615,8 @@ func (c *obClient) execute( // 1. Get table route tableParam, err := c.GetTableParam(ctx, tableName, rowKey) if err != nil { + log.Error("Runtime", ctx.Value(log.ObkvTraceIdName), "error occur in execute", + log.Int64("opType", int64(opType)), log.String("tableName", tableName), log.String("tableParam", tableParam.String())) return nil, errors.WithMessagef(err, "get table param, tableName:%s, opType:%d", tableName, opType), needRetry } @@ -618,6 +634,8 @@ func (c *obClient) execute( c.GetRpcFlag(), ) if err != nil { + log.Error("Runtime", ctx.Value(log.ObkvTraceIdName), "error occur in execute", + log.Int64("opType", int64(opType)), log.String("tableName", tableName), log.String("tableParam", tableParam.String())) return nil, errors.WithMessagef(err, "new operation request, tableName:%s, tableParam:%s, opType:%d", tableName, tableParam.String(), opType), needRetry } @@ -626,10 +644,14 @@ func (c *obClient) execute( result := protocol.NewObTableOperationResponse() err, needRetry = c.executeInternal(ctx, tableName, tableParam.Table(), request, result) if err != nil { + trace := fmt.Sprintf("Y%X-%016X", result.UniqueId(), result.Sequence()) + log.Error("Runtime", ctx.Value(log.ObkvTraceIdName), "error occur in execute", log.String("observerTraceId", trace)) return nil, err, needRetry } if oberror.ObErrorCode(result.Header().ErrorNo()) != oberror.ObSuccess { + trace := fmt.Sprintf("Y%X-%016X", result.UniqueId(), result.Sequence()) + log.Error("Runtime", ctx.Value(log.ObkvTraceIdName), "error occur in execute", log.String("observerTraceId", trace)) return nil, protocol.NewProtocolError( result.RemoteAddr().String(), oberror.ObErrorCode(result.Header().ErrorNo()), @@ -751,3 +773,17 @@ func (c *obClient) GetTableParam( return c.routeInfo.GetTableParam(ctx, tableName, rowKey) } + +func MonitorSlowQuery(executeTime int64, slowQueryThreshold int64, tableName string, clientTraceId any) { + if executeTime > slowQueryThreshold { + pId := unix.Getpid() + buf := make([]byte, 64) + n := runtime.Stack(buf, false) + id := buf[:n] + var goroutineID uint64 + fmt.Sscanf(string(id), "goroutine %d", &goroutineID) + log.Info("Monitor", clientTraceId, "SlowQuery", log.String("tableName", tableName), + log.Int64("executeTime", executeTime), log.Int64("slowQueryThreshold", slowQueryThreshold), + log.Int("pId", pId), log.Uint64("goroutineID", goroutineID)) + } +} diff --git a/client/query_result_iterator.go b/client/query_result_iterator.go index 68b942b..f34be72 100644 --- a/client/query_result_iterator.go +++ b/client/query_result_iterator.go @@ -19,6 +19,7 @@ package client import ( "context" + "github.com/oceanbase/obkv-table-client-go/log" "sync" "time" @@ -88,7 +89,7 @@ func (q *ObQueryResultIterator) Next() (QueryResult, error) { if err != nil { return nil, err } - + var startTime int64 = time.Now().UnixMilli() // lock q.lock.Lock() defer q.lock.Unlock() @@ -117,6 +118,9 @@ func (q *ObQueryResultIterator) Next() (QueryResult, error) { // get next row from next server err = q.fetchNextWithRetry(false) + endTime := time.Now().UnixMilli() + var duration int64 = endTime - startTime + MonitorSlowQuery(duration, log.SlowQueryThreshold, q.queryExecutor.tableName, q.ctx.Value(log.ObkvTraceIdName)) if err != nil { return nil, err } diff --git a/config/client_config.go b/config/client_config.go index 913a00b..b1d992e 100644 --- a/config/client_config.go +++ b/config/client_config.go @@ -49,6 +49,14 @@ type ClientConfig struct { // connection rebalance in ODP mode MaxConnectionAge time.Duration EnableSLBLoadBalance bool + + // log config + LogFileName string // log file dir + SingleFileMaxSize int // log file size(MB) + MaxBackupFileSize int // Maximum number of old files to keep + MaxAgeFileRem int // Maximum number of days to keep old files + Compress bool // Whether to compress/archive old files + SlowQueryThreshold int64 // Slow query threshold } func NewDefaultClientConfig() *ClientConfig { @@ -71,6 +79,12 @@ func NewDefaultClientConfig() *ClientConfig { EnableRerouting: true, MaxConnectionAge: time.Duration(0) * time.Second, // valid iff > 0 EnableSLBLoadBalance: false, + LogFileName: "./", + SingleFileMaxSize: 256, // MB + MaxBackupFileSize: 10, + MaxAgeFileRem: 30, + Compress: false, + SlowQueryThreshold: 40, // ms } } diff --git a/config/toml_config.go b/config/toml_config.go index 27c31d5..7604b17 100644 --- a/config/toml_config.go +++ b/config/toml_config.go @@ -35,6 +35,7 @@ type ClientConfiguration struct { RouteMetaDataConfig RouteMetaDataConfig RsListConfig RsListConfig ExtraConfig ExtraConfig + LogConfig LogConfig Mode string } @@ -88,6 +89,15 @@ type ExtraConfig struct { EnableSLBLoadBalance bool } +type LogConfig struct { + LogFileName string // log file dir + SingleFileMaxSize int // log file size(MB) + MaxBackupFileSize int // Maximum number of old files to keep + MaxAgeFileRem int // Maximum number of days to keep old files + Compress bool // Whether to compress/archive old files + SlowQueryThreshold int64 // Slow query threshold +} + func (c *ClientConfiguration) checkClientConfiguration() error { if c.Mode == "direct" { if c.DirectClientConfig.ConfigUrl == "" { @@ -109,6 +119,14 @@ func (c *ClientConfiguration) checkClientConfiguration() error { } else if c.OdpClientConfig.FullUserName == "" { return errors.New("full user name is empty") } + } else if c.Mode == "log" { + if c.LogConfig.LogFileName == "" { + return errors.New("should set log file name in toml config") + } else if c.LogConfig.SingleFileMaxSize == 0 { + return errors.New("single file maxSize is invalid") + } else if c.LogConfig.SlowQueryThreshold == 0 { + return errors.New("slow query threshold is invalid") + } } else { return errors.New("mode is invalid") } @@ -151,6 +169,12 @@ func (c *ClientConfiguration) GetClientConfig() *ClientConfig { EnableRerouting: c.ExtraConfig.EnableRerouting, MaxConnectionAge: time.Duration(c.ExtraConfig.MaxConnectionAge) * time.Millisecond, EnableSLBLoadBalance: c.ExtraConfig.EnableSLBLoadBalance, + LogFileName: c.LogConfig.LogFileName, + SingleFileMaxSize: c.LogConfig.SingleFileMaxSize, // MB + MaxBackupFileSize: c.LogConfig.MaxBackupFileSize, + MaxAgeFileRem: c.LogConfig.MaxAgeFileRem, + Compress: c.LogConfig.Compress, + SlowQueryThreshold: c.LogConfig.SlowQueryThreshold, // ms } } diff --git a/configurations/obkv-table-default.toml b/configurations/obkv-table-default.toml index b202eb0..df90734 100644 --- a/configurations/obkv-table-default.toml +++ b/configurations/obkv-table-default.toml @@ -1,5 +1,6 @@ # Client Mode - `direct` or `proxy` -mode = "proxy" +# log is not use toml config +mode = "log" ## Direct Mode [DirectClientConfig] @@ -50,3 +51,11 @@ LogLevel = "info" EnableRerouting = false MaxConnectionAge = 0 EnableSLBLoadBalance = false + +[LogConfig] +LogFileName = "./" +SingleFileMaxSize = 256 ## MB +MaxBackupFileSize = 10 ## 0 is not delete +MaxAgeFileRem = 30 ## 30 day +Compress = false ## default not +SlowQueryThreshold = 40 ## ms diff --git a/go.mod b/go.mod index 59dea65..d33156c 100644 --- a/go.mod +++ b/go.mod @@ -18,12 +18,19 @@ require ( require ( github.com/davecgh/go-spew v1.1.1 // indirect + github.com/fatih/color v1.16.0 // indirect github.com/frankban/quicktest v1.14.6 // indirect + github.com/google/uuid v1.5.0 // indirect + github.com/jessevdk/go-flags v1.5.0 // indirect github.com/kylelemons/godebug v1.1.0 // indirect + github.com/mattn/go-colorable v0.1.13 // indirect + github.com/mattn/go-isatty v0.0.20 // indirect github.com/naoina/go-stringutil v0.1.0 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect + github.com/uber/go-torch v0.0.0-20181107071353-86f327cc820e // indirect go.uber.org/atomic v1.10.0 // indirect go.uber.org/multierr v1.11.0 // indirect golang.org/x/sys v0.15.0 // indirect + gopkg.in/natefinch/lumberjack.v2 v2.2.1 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/go.sum b/go.sum index d0c7f4e..9439677 100644 --- a/go.sum +++ b/go.sum @@ -3,6 +3,8 @@ github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ3 github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/fatih/color v1.16.0 h1:zmkK9Ngbjj+K0yRhTVONQh1p/HknKYSlNT+vZCzyokM= +github.com/fatih/color v1.16.0/go.mod h1:fL2Sau1YI5c0pdGEVCbKQbLXB6edEj1ZgiY4NijnWvE= github.com/fatih/set v0.2.1 h1:nn2CaJyknWE/6txyUDGwysr3G5QC6xWB/PtVjPBbeaA= github.com/fatih/set v0.2.1/go.mod h1:+RKtMCH+favT2+3YecHGxcc0b4KyVWA1QWWJUs4E0CI= github.com/frankban/quicktest v1.14.6 h1:7Xjx+VpznH+oBnejlPUj8oUpdxnVs4f8XU8WnHkI4W8= @@ -13,6 +15,10 @@ github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM= github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38= github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= +github.com/google/uuid v1.5.0 h1:1p67kYwdtXjb0gL0BPiP1Av9wiZPo5A8z2cWkTZ+eyU= +github.com/google/uuid v1.5.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/jessevdk/go-flags v1.5.0 h1:1jKYvbxEjfUl0fmqTCOfonvskHHXMjBySTLW4y9LFvc= +github.com/jessevdk/go-flags v1.5.0/go.mod h1:Fw0T6WPc1dYxT4mKEZRfG5kJhaTDP9pj1c2EWnYs/m4= github.com/klauspost/compress v1.17.2 h1:RlWWUY/Dr4fL8qk9YG7DTZ7PDgME2V4csBXA8L/ixi4= github.com/klauspost/compress v1.17.2/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE= github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= @@ -21,6 +27,11 @@ github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= github.com/kylelemons/godebug v1.1.0 h1:RPNrshWIDI6G2gRW9EHilWtl7Z6Sb1BR0xunSBf0SNc= github.com/kylelemons/godebug v1.1.0/go.mod h1:9/0rRGxNHcop5bhtWyNeEfOS8JIWk580+fNqagV/RAw= +github.com/mattn/go-colorable v0.1.13 h1:fFA4WZxdEF4tXPZVKMLwD8oUnCTTo08duU7wxecdEvA= +github.com/mattn/go-colorable v0.1.13/go.mod h1:7S9/ev0klgBDR4GtXTXX8a3vIGJpMovkB8vQcUbaXHg= +github.com/mattn/go-isatty v0.0.16/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM= +github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY= +github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y= github.com/naoina/go-stringutil v0.1.0 h1:rCUeRUHjBjGTSHl0VC00jUPLz8/F9dDzYI70Hzifhks= github.com/naoina/go-stringutil v0.1.0/go.mod h1:XJ2SJL9jCtBh+P9q5btrd/Ylo8XwT/h1USek5+NqSA0= github.com/naoina/toml v0.1.1 h1:PT/lllxVVN0gzzSqSlHEmP8MJB4MY2U7STGxiouV4X8= @@ -43,6 +54,8 @@ github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSS github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.8.0 h1:pSgiaMZlXftHpm5L7V1+rVB+AZJydKsMxsQBIJw4PKk= github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= +github.com/uber/go-torch v0.0.0-20181107071353-86f327cc820e h1:jV0Y58RWaOMT3i5foW2YoEKlaN6biewBtngFwAfEwQ0= +github.com/uber/go-torch v0.0.0-20181107071353-86f327cc820e/go.mod h1:uuMPbyv6WJykZcarrIuJiTjfSGC997/jnfHyyeeG2Jo= go.uber.org/atomic v1.10.0 h1:9qC72Qh0+3MqyJbAn8YU5xVq1frD8bn3JtD2oXtafVQ= go.uber.org/atomic v1.10.0/go.mod h1:LUxbIzbOniOlMKjJjyPfpl4v+PKK2cNJn91OQbhoJI0= go.uber.org/goleak v1.1.11 h1:wy28qYRKZgnJTxGxvye5/wgWr1EKjmUDGYox5mGlRlI= @@ -52,11 +65,16 @@ go.uber.org/zap v1.24.0 h1:FiJd5l1UOLj0wCgbSE0rwwXHzEdAZS6hiiSnxJN/D60= go.uber.org/zap v1.24.0/go.mod h1:2kMP+WWQ8aoFoedH3T2sq6iJ2yDWpHbP0f6MQbS9Gkg= golang.org/x/crypto v0.17.0 h1:r8bRNjWL3GshPW3gkd+RpvzWrZAwPS49OmTGZ/uhM4k= golang.org/x/crypto v0.17.0/go.mod h1:gCAAfMLgwOJRpTjQ2zCCt2OcSfYMTeZVSRtQlPC7Nq4= +golang.org/x/sys v0.0.0-20210320140829-1e4c9ba3b0c4/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.15.0 h1:h48lPFYpsTvQJZF4EKyI4aLHaev3CxivZmv7yZig9pc= golang.org/x/sys v0.15.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/term v0.15.0 h1:y/Oo/a/q3IXu26lQgl04j/gjuBDOBlx7X6Om1j2CPW4= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/natefinch/lumberjack.v2 v2.2.1 h1:bBRl1b0OH9s/DuPhuXpNl+VtCaJXFZ5/uEFST95x9zc= +gopkg.in/natefinch/lumberjack.v2 v2.2.1/go.mod h1:YD8tP3GAjkrDg1eZH7EGmyESg/lsYskCTPBJVb9jqSc= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/log/logger.go b/log/logger.go index 0f0db69..85db0fd 100644 --- a/log/logger.go +++ b/log/logger.go @@ -18,20 +18,42 @@ package log import ( + "context" + "fmt" + "github.com/google/uuid" + "github.com/pkg/errors" + "golang.org/x/sys/unix" "io" "os" + "strings" "sync" "time" "go.uber.org/zap" "go.uber.org/zap/zapcore" + "gopkg.in/natefinch/lumberjack.v2" ) +const ObkvTraceIdName = "_ObkvTraceIdName" + var ( globalMutex sync.Mutex - defaultGlobalLogger = NewLogger(os.Stderr, InfoLevel, AddCaller()) + defaultGlobalLogger *Logger + TraceId string + LogType = "Default" + SlowQueryThreshold int64 + BufferSize = 4096 ) +type LogConfig struct { + LogFileName string // log file dir + SingleFileMaxSize int // log file size(MB) + MaxBackupFileSize int // Maximum number of old files to keep + MaxAgeFileRem int // Maximum number of days to keep old files + Compress bool // Whether to compress/archive old files + SlowQueryThreshold int64 // Slow query threshold +} + type Level zapcore.Level type Logger struct { @@ -99,16 +121,57 @@ var ( Duration = zap.Duration Durationp = zap.Durationp Any = zap.Any - - Info = defaultGlobalLogger.Info - Warn = defaultGlobalLogger.Warn - Error = defaultGlobalLogger.Error - DPanic = defaultGlobalLogger.DPanic - Panic = defaultGlobalLogger.Panic - Fatal = defaultGlobalLogger.Fatal - Debug = defaultGlobalLogger.Debug ) +func resetLogInfo(logType string, traceId any) { + if len(logType) == 0 { + logType = "Default" + } else { + LogType = logType + } + if traceId != nil { + TraceId = traceId.(string) + } else { + TraceId = "" + } +} + +// Default +func Info(logType string, traceId any, msg string, fields ...Field) { + resetLogInfo(logType, traceId) + defaultGlobalLogger.Info(msg, fields...) +} + +func Error(logType string, traceId any, msg string, fields ...Field) { + resetLogInfo(logType, traceId) + defaultGlobalLogger.Error(msg, fields...) +} + +func Warn(logType string, traceId any, msg string, fields ...Field) { + resetLogInfo(logType, traceId) + defaultGlobalLogger.Warn(msg, fields...) +} + +func DPanic(logType string, traceId any, msg string, fields ...Field) { + resetLogInfo(logType, traceId) + defaultGlobalLogger.DPanic(msg, fields...) +} + +func Panic(logType string, traceId any, msg string, fields ...Field) { + resetLogInfo(logType, traceId) + defaultGlobalLogger.Panic(msg, fields...) +} + +func Fatal(logType string, traceId any, msg string, fields ...Field) { + resetLogInfo(logType, traceId) + defaultGlobalLogger.Fatal(msg, fields...) +} + +func Debug(logType string, traceId any, msg string, fields ...Field) { + resetLogInfo(logType, traceId) + defaultGlobalLogger.Debug(msg, fields...) +} + var ( AddCaller = zap.AddCaller WithCaller = zap.WithCaller @@ -122,6 +185,75 @@ func init() { globalMutex.Unlock() } +func InitLoggerWithConfig(cfg LogConfig) error { + err := checkLoggerConfigValidity(cfg) + if err != nil { + return err + } + initDefaultLogger(cfg) + return nil +} + +func InitTraceId(ctx *context.Context) { + if (*ctx).Value(ObkvTraceIdName) == nil { + pid := uint64(unix.Getpid()) + // uniqueId uint64, pid uint64 + uniqueId := uuid.New().String() + uniqueId = strings.ReplaceAll(uniqueId, "-", "") + traceId := fmt.Sprintf("Y%s-%x", uniqueId, pid) + *ctx = context.WithValue((*ctx), ObkvTraceIdName, traceId) + } +} + +func checkLoggerConfigValidity(cfg LogConfig) error { + if cfg.LogFileName == "" { + return errors.New("should set Log File Name in toml or client config") + } else if cfg.SingleFileMaxSize <= 0 { + return errors.New("Single File MaxSize is invalid") + } else if cfg.SlowQueryThreshold <= 0 { + return errors.New("Slow Query Threshold is invalid") + } else if cfg.MaxAgeFileRem < 0 { + return errors.New("Max Age File Remain is invalid") + } else if cfg.MaxBackupFileSize < 0 { + return errors.New("Max Backup File Size is invalid") + } + return nil +} + +func initDefaultLogger(cfg LogConfig) { + defFilePath := cfg.LogFileName + "obclient-table-go.log" + SlowQueryThreshold = cfg.SlowQueryThreshold + defLogWriter := getLogRotationWriter(defFilePath, cfg) + globalMutex.Lock() + defaultGlobalLogger = NewLogger(defLogWriter, InfoLevel, AddCaller()) + globalMutex.Unlock() +} + +// rotation +func getLogRotationWriter(filePath string, cfg LogConfig) zapcore.WriteSyncer { + asyncWrite := &zapcore.BufferedWriteSyncer{ + WS: zapcore.AddSync(&lumberjack.Logger{ + Filename: filePath, + MaxSize: cfg.SingleFileMaxSize, + MaxBackups: cfg.MaxBackupFileSize, + MaxAge: cfg.MaxAgeFileRem, + Compress: cfg.Compress, + }), + //Size specifies the maximum amount of data the writer will buffered before flushing. Defaults to 256 kB if unspecified. + Size: BufferSize, // async print buffer size + } + return asyncWrite +} + +// no rotation +func getLogWriter(filePath string) zapcore.WriteSyncer { + file, err := os.OpenFile(filePath, os.O_WRONLY|os.O_APPEND, 0666) + if err != nil && os.IsNotExist(err) { + file, _ = os.Create(filePath) + } + return zapcore.AddSync(file) +} + type Field = zap.Field func (l *Logger) Debug(msg string, fields ...Field) { @@ -175,13 +307,10 @@ func NewLogger(writer io.Writer, level Level, opts ...Option) *Logger { if writer == nil { panic("the writer is nil") } - cfg := zap.NewProductionConfig() - cfg.EncoderConfig.EncodeTime = func(t time.Time, enc zapcore.PrimitiveArrayEncoder) { - enc.AppendString(t.Format("2006-01-02T15:04:05.000Z0700")) - } + var CustomEncoder = NewCustomEncoder() core := zapcore.NewCore( - zapcore.NewJSONEncoder(cfg.EncoderConfig), + zapcore.NewConsoleEncoder(CustomEncoder), zapcore.AddSync(writer), zapcore.Level(level), ) @@ -193,6 +322,30 @@ func NewLogger(writer io.Writer, level Level, opts ...Option) *Logger { return logger } +func NewCustomEncoder() zapcore.EncoderConfig { + // time + var customEncodeTime = func(t time.Time, enc zapcore.PrimitiveArrayEncoder) { + enc.AppendString(t.Format("[" + "2006-01-02T15:04:05.000Z0700" + "]")) + } + // level + customLevelEncoder := func(level zapcore.Level, enc zapcore.PrimitiveArrayEncoder) { + enc.AppendString("[" + level.CapitalString() + "]") + enc.AppendString("[" + LogType + "]") + enc.AppendString("[" + TraceId + "]") + } + encoderConf := zapcore.EncoderConfig{ + TimeKey: "ts", + LevelKey: "level_name", + MessageKey: "msg", + LineEnding: zapcore.DefaultLineEnding, + EncodeTime: customEncodeTime, + EncodeLevel: customLevelEncoder, + EncodeDuration: zapcore.SecondsDurationEncoder, + EncodeName: zapcore.FullNameEncoder, + } + return encoderConf +} + func MatchStr2LogLevel(level string) Level { switch level { case "debug": diff --git a/log/logger_test.go b/log/logger_test.go index c332dcb..5fdcced 100644 --- a/log/logger_test.go +++ b/log/logger_test.go @@ -18,6 +18,8 @@ package log import ( + "context" + "fmt" "os" "testing" ) @@ -30,3 +32,48 @@ func TestLogger(t *testing.T) { myLogger.Warn("Warn msg", String("Warn", "Warn")) myLogger.Error("Error msg", String("Error", "Error")) } + +func TestFileLogger(t *testing.T) { + var defWriteSync = getLogWriter("obclient-table-go.log") + var Logger1 = NewLogger(defWriteSync, InfoLevel, AddCaller()) + ResetDefaultLogger(Logger1) + Logger1.Debug("Debug msg", String("Debug", "Debug")) + Logger1.Info("Info msg", String("Info", "Info")) + Logger1.Warn("Warn msg", String("Warn", "Warn")) + Logger1.Error("Error msg", String("Error", "Error")) + _ = Logger1.Sync() + _ = os.Remove("obclient-table-go.log") +} + +func TestFileRotationLogger(t *testing.T) { + var logc LogConfig + logc.MaxBackupFileSize = 20 + logc.Compress = false + logc.SingleFileMaxSize = 1 + logc.LogFileName = "./" + logc.MaxAgeFileRem = 30 + ctx := context.TODO() + var defWriteSync = getLogRotationWriter("obclient-table-go.log", logc) + var Logger1 = NewLogger(defWriteSync, InfoLevel, AddCaller()) + ResetDefaultLogger(Logger1) + Logger1.Debug("Debug msg", String("Debug", "first_Debug")) + Logger1.Info("Info msg", String("Info", "second_Info")) + Logger1.Warn("Warn msg", String("Warn", "thrid_Warn")) + Logger1.Error("Error msg", String("Error", "fourth_Error")) + + // test traceId + InitTraceId(&ctx) + Debug("Default", ctx.Value(ObkvTraceIdName).(string), "Debug msg", String("Debug", "first_Debug")) + Info("", ctx.Value(ObkvTraceIdName).(string), "Info msg", String("Info", "second_Info")) + Warn("Boot", ctx.Value(ObkvTraceIdName).(string), "Warn msg", String("Warn", "thrid_Warn")) + Error("", ctx.Value(ObkvTraceIdName).(string), "Error msg", String("Error", "fourth_Error")) + DPanic("routine", ctx.Value(ObkvTraceIdName).(string), "DPanic msg", String("DPanic", "fifth_DPanic")) + _ = Logger1.Sync() + _ = os.Remove("obclient-table-go.log") +} + +func TestFmtPrint(t *testing.T) { + intArr := []int{1, 2, 3, 4} + str := fmt.Sprintf("Y%X-%016X-%x-%x \n", intArr[0], intArr[1], intArr[2], intArr[3]) + fmt.Printf(str) +} diff --git a/obkvrpc/compressor.go b/obkvrpc/compressor.go index 2a81efe..872bab9 100644 --- a/obkvrpc/compressor.go +++ b/obkvrpc/compressor.go @@ -79,7 +79,7 @@ func (d *ZlibDecompressor) Decompress(src *bytes.Buffer, originLen int32) (*byte defer func(zlibReader io.ReadCloser) { err := zlibReader.Close() if err != nil { - log.Warn(fmt.Sprintf("fail to close the zlibReader, errMsg: %s", err.Error())) + log.Warn("BOOT", nil, fmt.Sprintf("fail to close the zlibReader, errMsg: %s", err.Error())) } }(zlibReader) diff --git a/obkvrpc/connection.go b/obkvrpc/connection.go index b57a11a..0be4548 100644 --- a/obkvrpc/connection.go +++ b/obkvrpc/connection.go @@ -242,6 +242,7 @@ func (c *Connection) Execute( c.mutex.Lock() delete(c.pending, seq) c.mutex.Unlock() + log.Error("BOOT", ctx.Value(log.ObkvTraceIdName), "wait send packet to channel", log.String("clientTrace", trace)) return nil, errors.WithMessage(ctx.Err(), "wait send packet to channel, trace: "+trace) } @@ -249,6 +250,7 @@ func (c *Connection) Execute( select { case call = <-call.signal: if call.err != nil { // transport failed + log.Error("BOOT", ctx.Value(log.ObkvTraceIdName), "receive packet", log.String("clientTrace", trace)) return nil, errors.WithMessage(call.err, "receive packet, trace: "+trace) } case <-ctx.Done(): @@ -256,6 +258,7 @@ func (c *Connection) Execute( c.mutex.Lock() delete(c.pending, seq) c.mutex.Unlock() + log.Error("BOOT", ctx.Value(log.ObkvTraceIdName), "wait send packet to channel", log.String("clientTrace", trace)) return nil, errors.WithMessage(ctx.Err(), "wait transport packet, trace: "+trace) } @@ -319,16 +322,16 @@ func (c *Connection) receivePacket() { } if strings.Contains(err.Error(), "use of closed network connection") { - log.Info("connection closed.", zap.Uint64("connection uniqueId", c.uniqueId)) + log.Info("Monitor", nil, "connection closed.", log.Uint64("connection uniqueId", c.uniqueId)) return } if errors.Is(err, io.EOF) || errors.Is(err, io.ErrUnexpectedEOF) { - log.Info("connection closed.", zap.Error(err), zap.Uint64("connection uniqueId", c.uniqueId)) + log.Info("Monitor", nil, "connection closed.", zap.Error(err), log.Uint64("connection uniqueId", c.uniqueId)) return } - log.Warn("connection closed.", zap.Error(err), zap.Uint64("connection uniqueId", c.uniqueId)) + log.Warn("Monitor", nil, "connection closed.", zap.Error(err), log.Uint64("connection uniqueId", c.uniqueId)) } func (c *Connection) sendPacket() { @@ -405,7 +408,7 @@ func (c *Connection) writerWrite(packet packet) { } func (c *Connection) Close() { - log.Info(fmt.Sprintf("close connection start, remote addr:%s", c.conn.RemoteAddr().String())) + log.Info("Monitor", nil, fmt.Sprintf("close connection start, remote addr:%s", c.conn.RemoteAddr().String())) c.active.Store(false) c.closeOnce.Do(func() { close(c.packetChannelClose) // close packet channel @@ -419,7 +422,7 @@ func (c *Connection) Close() { } c.mutex.Unlock() }) - log.Info(fmt.Sprintf("close connection success, remote addr:%s", c.conn.RemoteAddr().String())) + log.Info("Monitor", nil, fmt.Sprintf("close connection success, remote addr:%s", c.conn.RemoteAddr().String())) } func (c *Connection) encodePacket(seq uint32, request protocol.ObPayload) []byte { @@ -486,7 +489,7 @@ func (c *Connection) decodePacket(contentBuf []byte, response protocol.ObPayload return nil, err } contentBuffer = decompressBuffer - log.Debug(fmt.Sprintf("compressType: %s, compressLen: %d, originLen: %d\n", + log.Debug("Monitor", nil, fmt.Sprintf("compressType: %s, compressLen: %d, originLen: %d\n", convertCompressTypeToString(rpcHeader.CompressType()), contentLen, rpcHeader.OriginalLen())) } // decode rpc response code @@ -510,6 +513,8 @@ func (c *Connection) decodePacket(contentBuf []byte, response protocol.ObPayload if rpcResponseCode.Code().IsRefreshTableErrorCode() { response.SetFlag(response.Flag() | protocol.RpcBadRoutingFlag) } + trace := fmt.Sprintf("Y%X-%016X", rpcHeader.TraceId0(), rpcHeader.TraceId1()) + log.Error("Runtime", nil, "error occur in execute", log.String("observerTraceId", trace)) return moveResponse, protocol.NewProtocolError( c.RemoteAddr().String(), @@ -536,6 +541,6 @@ func (call *call) done() { case call.signal <- call: // ok default: - log.Warn("rpc: discarding call reply due to insufficient signal chan capacity") + log.Warn("Monitor", nil, "rpc: discarding call reply due to insufficient signal chan capacity") } } diff --git a/obkvrpc/connection_lifecycle_mgr.go b/obkvrpc/connection_lifecycle_mgr.go index cf25c27..d57fede 100644 --- a/obkvrpc/connection_lifecycle_mgr.go +++ b/obkvrpc/connection_lifecycle_mgr.go @@ -49,7 +49,7 @@ func NewConnectionLifeCycleMgr(connPool *ConnectionPool, maxConnectionAge time.D // check and reconnect timeout connections func (c *ConnectionLifeCycleMgr) run() { if c.connPool == nil { - log.Error("connection pool is null") + log.Error("Monitor", nil, "connection pool is null") return } @@ -63,9 +63,9 @@ func (c *ConnectionLifeCycleMgr) run() { } if len(expiredConnIds) > 0 { - log.Info(fmt.Sprintf("Find %d expired connections", len(expiredConnIds))) + log.Info("Monitor", nil, fmt.Sprintf("Find %d expired connections", len(expiredConnIds))) for idx, connIdx := range expiredConnIds { - log.Info(fmt.Sprintf("%d: ip=%s, port=%d", idx, c.connPool.connections[connIdx].option.ip, c.connPool.connections[connIdx].option.port)) + log.Info("Monitor", nil, fmt.Sprintf("%d: ip=%s, port=%d", idx, c.connPool.connections[connIdx].option.ip, c.connPool.connections[connIdx].option.port)) } } @@ -73,7 +73,7 @@ func (c *ConnectionLifeCycleMgr) run() { maxReconnIdx := int(math.Ceil(float64(len(expiredConnIds)) / 3)) if maxReconnIdx > 0 { c.lastExpireIdx = expiredConnIds[maxReconnIdx-1] - log.Info(fmt.Sprintf("Begin to refresh expired connections which idx less than %d", maxReconnIdx)) + log.Info("Monitor", nil, fmt.Sprintf("Begin to refresh expired connections which idx less than %d", maxReconnIdx)) } for i := 0; i < maxReconnIdx; i++ { // no one can get expired connection @@ -93,13 +93,13 @@ func (c *ConnectionLifeCycleMgr) run() { for j := 0; len(pool[idx].pending) > 0; j++ { time.Sleep(time.Duration(10) * time.Millisecond) if j > 0 && j%100 == 0 { - log.Info(fmt.Sprintf("Wait too long time for the connection to end,"+ + log.Info("Monitor", nil, fmt.Sprintf("Wait too long time for the connection to end,"+ "connection idx: %d, ip:%s, port:%d, current connection pending size: %d", idx, pool[idx].option.ip, pool[idx].option.port, len(pool[idx].pending))) } if j > 3000 { - log.Warn("Wait too much time for the connection to end, stop ConnectionLifeCycleMgr") + log.Warn("Monitor", nil, "Wait too much time for the connection to end, stop ConnectionLifeCycleMgr") return } } @@ -112,11 +112,11 @@ func (c *ConnectionLifeCycleMgr) run() { c.connPool.connections[expiredConnIds[i]].Close() _, err := c.connPool.RecreateConnection(ctx, expiredConnIds[i]) if err != nil { - log.Warn("reconnect failed", zap.Error(err)) + log.Warn("Monitor", nil, "reconnect failed", zap.Error(err)) return } } if maxReconnIdx > 0 { - log.Info(fmt.Sprintf("Finish to refresh expired connections which idx less than %d", maxReconnIdx)) + log.Info("Monitor", nil, fmt.Sprintf("Finish to refresh expired connections which idx less than %d", maxReconnIdx)) } } diff --git a/obkvrpc/connection_mgr.go b/obkvrpc/connection_mgr.go index e8650b8..c4ddbc9 100644 --- a/obkvrpc/connection_mgr.go +++ b/obkvrpc/connection_mgr.go @@ -58,14 +58,14 @@ func (c *ConnectionMgr) start() { select { case <-c.needStop: ticker.Stop() - log.Info("Stop ConnectionMgr") + log.Info("Monitor", nil, "Stop ConnectionMgr") return case <-ticker.C: c.run() } } }() - log.Info("start ConnectionMgr, " + c.String()) + log.Info("Monitor", nil, "start ConnectionMgr, "+c.String()) } func (c *ConnectionMgr) run() { diff --git a/obkvrpc/connection_pool.go b/obkvrpc/connection_pool.go index db8a8ce..7b35d02 100644 --- a/obkvrpc/connection_pool.go +++ b/obkvrpc/connection_pool.go @@ -117,7 +117,7 @@ func (p *ConnectionPool) GetConnection() (*Connection, int) { index = (index + i) % p.option.connPoolMaxConnSize break } else if i == p.option.connPoolMaxConnSize-1 { - log.Warn("All connections is expired, will pick a expired connection") + log.Warn("Monitor", nil, "All connections is expired, will pick a expired connection") } } @@ -181,7 +181,7 @@ func (p *ConnectionPool) CreateConnection(ctx context.Context) (*Connection, err if p.option.maxConnectionAge > 0 { connection.expireTime = time.Now().Add(p.option.maxConnectionAge) } - log.Info(fmt.Sprintf("connect success, remote addr:%s, expire time: %s", + log.Info("Monitor", nil, fmt.Sprintf("connect success, remote addr:%s, expire time: %s", connection.conn.RemoteAddr().String(), connection.expireTime.String())) return connection, nil } @@ -201,7 +201,7 @@ func (p *ConnectionPool) getNextConnAddress() (string, int) { port := p.option.port if p.connMgr != nil && p.connMgr.slbLoader != nil { ip = p.connMgr.slbLoader.getNextSLBAddress() - log.Info(fmt.Sprintf("Get a SLB address %s:%d", ip, port)) + log.Info("Monitor", nil, fmt.Sprintf("Get a SLB address %s:%d", ip, port)) } return ip, port diff --git a/obkvrpc/slb_loader.go b/obkvrpc/slb_loader.go index 7d74ea4..f5e4142 100644 --- a/obkvrpc/slb_loader.go +++ b/obkvrpc/slb_loader.go @@ -51,7 +51,7 @@ func (s *SLBLoader) refreshSLBList() (bool, error) { s.mutex.Lock() defer s.mutex.Unlock() if changed { - log.Info(fmt.Sprint("SLB address changed, before: ", s.slbAddress, ", after: ", slbAddress)) + log.Info("Monitor", nil, fmt.Sprint("SLB address changed, before: ", s.slbAddress, ", after: ", slbAddress)) s.slbAddress = slbAddress.List() } return changed, nil @@ -73,7 +73,7 @@ func (s *SLBLoader) getNextSLBAddress() string { func (s *SLBLoader) run() { changed, err := s.refreshSLBList() if err != nil { - log.Warn("reconnect failed", zap.Error(err)) + log.Warn("Monitor", nil, "reconnect failed", zap.Error(err)) return } if changed { diff --git a/route/config_server_info.go b/route/config_server_info.go index bdb4893..3c01d01 100644 --- a/route/config_server_info.go +++ b/route/config_server_info.go @@ -122,7 +122,7 @@ func getConfigServerResponseOrNull( for times = 0; times < retryTimes; times++ { httpResp, err = cli.Get(url) if err != nil { - log.Warn("failed to http get", log.String("url", url), log.Int("times", times)) + log.Warn("Monitor", nil, "failed to http get", log.String("url", url), log.Int("times", times)) time.Sleep(retryInternal) } else { break diff --git a/route/route_info.go b/route/route_info.go index 1fe166a..5b8eeb7 100644 --- a/route/route_info.go +++ b/route/route_info.go @@ -182,7 +182,7 @@ func (i *ObRouteInfo) FetchServerRoster(clusterName, tenantName string) error { replicaLocations := entry.TableLocation().ReplicaLocations() for _, replicaLoc := range replicaLocations { if !replicaLoc.SvrStatus().IsActive() { - log.Warn("server is not active", + log.Warn("Routine", nil, "server is not active", log.String("server info", replicaLoc.SvrStatus().String()), log.String("server addr", addr.String())) continue @@ -538,7 +538,7 @@ func (i *ObRouteInfo) Execute( } if needReroute { - log.Info(fmt.Sprintf("route, to:%s", moveRsp.ReplicaInfo().Server().String())) + log.Info("Routine", ctx.Value(log.ObkvTraceIdName), fmt.Sprintf("route, to:%s", moveRsp.ReplicaInfo().Server().String())) err = i.reroute(ctx, moveRsp, request, result) } @@ -592,10 +592,10 @@ func (i *ObRouteInfo) runCreateConnPoolTask() { addr := key.(tcpAddr) err := i.addTable(addr) if err != nil { - log.Warn("add table", log.String("server", addr.String())) + log.Warn("Routine", nil, "add table", log.String("server", addr.String())) } i.taskInfo.createConnPoolServers.Remove(key) - log.Info("[runCreateConnPoolTask] connection pool for server has been created", log.String("server", addr.String())) + log.Info("Routine", nil, "[runCreateConnPoolTask] connection pool for server has been created", log.String("server", addr.String())) }() } }) @@ -613,14 +613,14 @@ func (i *ObRouteInfo) runDropConnPoolTask() { addr := key.(tcpAddr) i.dropTable(addr) i.taskInfo.dropConnPoolServers.Remove(key) - log.Info("[runDropConnPoolTask] connection pool has been dropped", log.String("addr", addr.String())) + log.Info("Routine", nil, "[runDropConnPoolTask] connection pool has been dropped", log.String("addr", addr.String())) // 2. refresh table locations which contain the dropping server err := i.refreshTableLocations(&addr) if err != nil { - log.Warn("refresh table locations", log.String("server", addr.String())) + log.Warn("Routine", nil, "refresh table locations", log.String("server", addr.String())) } - log.Info("[runDropConnPoolTask] table contains server pool has been refreshed", log.String("server", addr.String())) + log.Info("Routine", nil, "[runDropConnPoolTask] table contains server pool has been refreshed", log.String("server", addr.String())) }() } }) @@ -633,7 +633,7 @@ func (i *ObRouteInfo) refreshTableEntry(tableName string) error { if ok { entry, ok := value.(*ObTableEntry) if ok && !entry.NeedRefresh() { // no need to refresh - log.Info("no need to refresh table", log.String("table", tableName)) + log.Info("Routine", nil, "no need to refresh table", log.String("table", tableName)) return nil } } @@ -645,7 +645,7 @@ func (i *ObRouteInfo) refreshTableEntry(tableName string) error { ctx, _ := context.WithTimeout(context.Background(), refreshTableTimeout) _, err := i.GetTableEntry(ctx, tableName) if err != nil { - log.Warn("get table entry", log.String("tableName", tableName)) + log.Warn("Routine", nil, "get table entry", log.String("tableName", tableName)) return err } @@ -660,10 +660,10 @@ func (i *ObRouteInfo) runRefreshTableTask() { if swapped { go func() { tableName := key.(string) - log.Info("[runRefreshTableTask] refresh table entry", log.String("table", tableName)) + log.Info("Routine", nil, "[runRefreshTableTask] refresh table entry", log.String("table", tableName)) err := i.refreshTableEntry(tableName) if err != nil { - log.Warn("refresh table entry", log.String("tableName", tableName)) + log.Warn("Routine", nil, "refresh table entry", log.String("tableName", tableName)) } i.taskInfo.tables.Remove(key) }() @@ -676,17 +676,17 @@ func (i *ObRouteInfo) runCheckRslistTask() { go func() { newRslist, err := i.configServerInfo.FetchRslist() if err != nil { - log.Warn("fetch rslist") + log.Warn("Routine", nil, "fetch rslist") } if !i.configServerInfo.rslist.Equal(newRslist) { missServers := i.configServerInfo.rslist.FindMissingElements(newRslist) - log.Info(fmt.Sprintf("[runCheckRslistTask] missServers size:%d", len(missServers))) + log.Info("Routine", nil, fmt.Sprintf("[runCheckRslistTask] missServers size:%d", len(missServers))) for _, server := range missServers { - log.Info(fmt.Sprintf("[runCheckRslistTask] missServer:%s", server.tcpAddr.String())) + log.Info("Routine", nil, fmt.Sprintf("[runCheckRslistTask] missServer:%s", server.tcpAddr.String())) t, ok := i.tableRoster.Get(server.tcpAddr) if ok { if t.IsDisconnected() { - log.Info(fmt.Sprintf("[runCheckRslistTask] connection pool need close, pool:%s", t.String())) + log.Info("Routine", nil, fmt.Sprintf("[runCheckRslistTask] connection pool need close, pool:%s", t.String())) isDropping := &atomic.Bool{} isDropping.Store(false) server := tcpAddr{t.ip, t.port} diff --git a/test/connection_balance/connection_balance_test.go b/test/connection_balance/connection_balance_test.go index 66f1887..7bde154 100644 --- a/test/connection_balance/connection_balance_test.go +++ b/test/connection_balance/connection_balance_test.go @@ -34,7 +34,7 @@ func run(i int, done chan bool, wg *sync.WaitGroup, t *testing.T) { for { select { case <-done: - log.Info(fmt.Sprintf("Finish %d worker, executeNum: %d", i, executeNum)) + log.Info("Runtime", nil, fmt.Sprintf("Finish %d worker, executeNum: %d", i, executeNum)) return default: rowKey := []*table.Column{table.NewColumn("c1", fmt.Sprintf("key%d", i))} diff --git a/test/logger/all_test.go b/test/logger/all_test.go new file mode 100644 index 0000000..651bbb5 --- /dev/null +++ b/test/logger/all_test.go @@ -0,0 +1,44 @@ +/*- + * #%L + * OBKV Table Client Framework + * %% + * Copyright (C) 2021 OceanBase + * %% + * OBKV Table Client Framework is licensed under Mulan PSL v2. + * You can use this software according to the terms and conditions of the Mulan PSL v2. + * You may obtain a copy of Mulan PSL v2 at: + * http://license.coscl.org.cn/MulanPSL2 + * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, + * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, + * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. + * See the Mulan PSL v2 for more details. + * #L% + */ + +package logger + +import ( + "github.com/oceanbase/obkv-table-client-go/client" + "github.com/oceanbase/obkv-table-client-go/test" + "os" + "testing" +) + +var cli client.Client + +func setup() { + cli = test.CreateClient() + test.CreateDB() +} + +func teardown() { + cli.Close() + test.CloseDB() +} + +func TestMain(m *testing.M) { + setup() + code := m.Run() + teardown() + os.Exit(code) +} diff --git a/test/logger/logger_file_test.go b/test/logger/logger_file_test.go new file mode 100644 index 0000000..60651a9 --- /dev/null +++ b/test/logger/logger_file_test.go @@ -0,0 +1,52 @@ +/*- + * #%L + * OBKV Table Client Framework + * %% + * Copyright (C) 2021 OceanBase +* %% +* OBKV Table Client Framework is licensed under Mulan PSL v2. +* You can use this software according to the terms and conditions of the Mulan PSL v2. +* You may obtain a copy of Mulan PSL v2 at: +* http://license.coscl.org.cn/MulanPSL2 +* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, +* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, +* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. +* See the Mulan PSL v2 for more details. +* #L% +*/ + +package logger + +import ( + "context" + "github.com/oceanbase/obkv-table-client-go/config" + "github.com/oceanbase/obkv-table-client-go/log" + "os" + "testing" +) + +func TestInitTraceId(t *testing.T) { + file := "obclient-table-go.log" + filePath := "../../configurations/obkv-table-default.toml" + clientConfig, _ := config.GetClientConfigurationFromTOML(filePath) + var logcon log.LogConfig + logcon.LogFileName = clientConfig.LogConfig.LogFileName + logcon.MaxAgeFileRem = clientConfig.LogConfig.MaxAgeFileRem + logcon.MaxBackupFileSize = clientConfig.LogConfig.MaxBackupFileSize + logcon.SingleFileMaxSize = clientConfig.LogConfig.SingleFileMaxSize + logcon.Compress = clientConfig.LogConfig.Compress + logcon.SlowQueryThreshold = clientConfig.LogConfig.SlowQueryThreshold + ctx := context.TODO() + + log.InitLoggerWithConfig(logcon) + ctx = context.TODO() + log.InitTraceId(&ctx) + log.Info("Default", ctx.Value(log.ObkvTraceIdName), "Info msg", log.String("Info", "second_Info")) + for i := 0; i < 10; i++ { + log.Error("BOOT", ctx.Value(log.ObkvTraceIdName), "Error msg", log.String("Error", "fourth_Error")) + log.Debug("", ctx.Value(log.ObkvTraceIdName), "Debug msg", log.String("Debug", "first_Debug")) + log.Warn("Default", nil, "Warn msg", log.String("Warn", "third_Warn")) + } + _ = log.Sync() + _ = os.Remove(file) +}