Skip to content

Commit

Permalink
save log into file in go client
Browse files Browse the repository at this point in the history
  • Loading branch information
foronedream committed Jan 17, 2024
1 parent 250df4a commit fe9859e
Show file tree
Hide file tree
Showing 23 changed files with 484 additions and 59 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
.DS_Store
.idea
.vscode
./logs
43 changes: 42 additions & 1 deletion client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,11 @@ package client

import (
"context"

"github.com/pkg/errors"

"github.com/oceanbase/obkv-table-client-go/client/option"
"github.com/oceanbase/obkv-table-client-go/config"
"github.com/oceanbase/obkv-table-client-go/log"
"github.com/oceanbase/obkv-table-client-go/table"
)

Expand All @@ -40,6 +40,11 @@ func NewClient(
sysUserName string,
sysPassWord string,
cliConfig *config.ClientConfig) (Client, error) {
// init log
err := InitLogProcess(nil)
if err != nil {
return nil, err
}
// 1. Check args
if configUrl == "" {
return nil, errors.New("config url is empty")
Expand Down Expand Up @@ -81,6 +86,12 @@ func NewOdpClient(
odpRpcPort int,
database string,
cliConfig *config.ClientConfig) (Client, error) {
// init log
err := InitLogProcess(nil)
if err != nil {
return nil, err
}

// 1. Check args
if fullUserName == "" {
return nil, errors.New("full user name is null")
Expand Down Expand Up @@ -111,6 +122,11 @@ func NewClientWithTomlConfig(configFilePath string) (Client, error) {
if err != nil {
return nil, errors.WithMessagef(err, "get client config from toml, configFilePath:%s", configFilePath)
}
// init log
err = InitLogProcess(clientConfig)
if err != nil {
return nil, err
}
switch clientConfig.Mode {
case "direct":
return NewClient(
Expand All @@ -135,6 +151,31 @@ func NewClientWithTomlConfig(configFilePath string) (Client, error) {
}
}

func InitLogProcess(c *config.ClientConfiguration) error {
var lc log.LogConfig
if c != nil {
lc.LogFile = c.LogConfig.LogFile
lc.MaxAge = c.LogConfig.MaxAge
lc.MaxBackups = c.LogConfig.MaxBackups
lc.MaxSize = c.LogConfig.SingleFileMaxValue
lc.Compress = c.LogConfig.Compress
lc.SlowQueryThreshold = c.LogConfig.SlowQueryThreshold
} else {
clientConfig, err := config.GetClientConfigurationFromTOML(log.TomlLogConfigPath)
if err != nil {
return errors.WithMessagef(err, "get client config from toml, configFilePath:%s", log.TomlLogConfigPath)
}
lc.LogFile = clientConfig.LogConfig.LogFile
lc.MaxAge = clientConfig.LogConfig.MaxAge
lc.MaxBackups = clientConfig.LogConfig.MaxBackups
lc.MaxSize = clientConfig.LogConfig.SingleFileMaxValue
lc.Compress = clientConfig.LogConfig.Compress
lc.SlowQueryThreshold = clientConfig.LogConfig.SlowQueryThreshold
}
log.InitLoggerWithFile(lc)
return nil
}

type Client interface {
// Insert a record by rowKey.
Insert(ctx context.Context, tableName string, rowKey []*table.Column, mutateColumns []*table.Column, opts ...option.ObOperationOption) (int64, error)
Expand Down
4 changes: 2 additions & 2 deletions client/obbatch.go
Original file line number Diff line number Diff line change
Expand Up @@ -309,7 +309,7 @@ func (b *obBatchExecutor) executeInternal(ctx context.Context) (BatchOperationRe
defer wg.Done()
err, partNeedRetry := b.partitionExecute(ctx, partOp, res)
if err != nil {
log.Warn("failed to execute partition operations", log.String("partOp", partOp.String()), log.String("err", err.Error()))
log.Warn("Runtime", ctx.Value(log.ObkvTraceId), "failed to execute partition operations", log.String("partOp", partOp.String()), log.String("err", err.Error()))
errArrLock.Lock()
errArr = append(errArr, err)
if needRetry == false && partNeedRetry {
Expand All @@ -321,7 +321,7 @@ func (b *obBatchExecutor) executeInternal(ctx context.Context) (BatchOperationRe
}
wg.Wait()
if len(errArr) != 0 {
log.Warn("error occur when execute partition operations")
log.Warn("Runtime", ctx.Value(log.ObkvTraceId), "error occur when execute partition operations")
return nil, errArr[0], needRetry
}
} else {
Expand Down
37 changes: 37 additions & 0 deletions client/obclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,11 @@ package client

import (
"context"
"fmt"
"github.com/oceanbase/obkv-table-client-go/log"
"go.uber.org/zap"
"golang.org/x/sys/unix"
"runtime"
"strings"
"time"

Expand Down Expand Up @@ -271,6 +276,7 @@ func (c *obClient) Insert(
rowKey []*table.Column,
mutateColumns []*table.Column,
opts ...option.ObOperationOption) (int64, error) {
log.InitTraceId(&ctx)
operationOptions := c.getOperationOptions(opts...)
if operationOptions.TableFilter == nil {
res, err := c.executeWithRetry(
Expand Down Expand Up @@ -305,6 +311,7 @@ func (c *obClient) Update(
rowKey []*table.Column,
mutateColumns []*table.Column,
opts ...option.ObOperationOption) (int64, error) {
log.InitTraceId(&ctx)
operationOptions := c.getOperationOptions(opts...)
if operationOptions.TableFilter == nil {
res, err := c.executeWithRetry(
Expand Down Expand Up @@ -339,6 +346,7 @@ func (c *obClient) InsertOrUpdate(
rowKey []*table.Column,
mutateColumns []*table.Column,
opts ...option.ObOperationOption) (int64, error) {
log.InitTraceId(&ctx)
operationOptions := c.getOperationOptions(opts...)
res, err := c.executeWithRetry(
ctx,
Expand All @@ -359,6 +367,7 @@ func (c *obClient) Replace(
rowKey []*table.Column,
mutateColumns []*table.Column,
opts ...option.ObOperationOption) (int64, error) {
log.InitTraceId(&ctx)
operationOptions := c.getOperationOptions(opts...)
res, err := c.executeWithRetry(
ctx,
Expand All @@ -379,6 +388,7 @@ func (c *obClient) Increment(
rowKey []*table.Column,
mutateColumns []*table.Column,
opts ...option.ObOperationOption) (SingleResult, error) {
log.InitTraceId(&ctx)
operationOptions := c.getOperationOptions(opts...)
if operationOptions.TableFilter == nil {
res, err := c.executeWithRetry(
Expand Down Expand Up @@ -413,6 +423,7 @@ func (c *obClient) Append(
rowKey []*table.Column,
mutateColumns []*table.Column,
opts ...option.ObOperationOption) (SingleResult, error) {
log.InitTraceId(&ctx)
operationOptions := c.getOperationOptions(opts...)
if operationOptions.TableFilter == nil {
res, err := c.executeWithRetry(
Expand Down Expand Up @@ -446,6 +457,7 @@ func (c *obClient) Delete(
tableName string,
rowKey []*table.Column,
opts ...option.ObOperationOption) (int64, error) {
log.InitTraceId(&ctx)
operationOptions := c.getOperationOptions(opts...)
if operationOptions.TableFilter == nil {
res, err := c.executeWithRetry(
Expand Down Expand Up @@ -480,6 +492,7 @@ func (c *obClient) Get(
rowKey []*table.Column,
getColumns []string,
opts ...option.ObOperationOption) (SingleResult, error) {
log.InitTraceId(&ctx)
var columns = make([]*table.Column, 0, len(getColumns))
for _, columnName := range getColumns {
columns = append(columns, table.NewColumn(columnName, nil))
Expand All @@ -499,6 +512,7 @@ func (c *obClient) Get(
}

func (c *obClient) Query(ctx context.Context, tableName string, rangePairs []*table.RangePair, opts ...option.ObQueryOption) (QueryResultIterator, error) {
log.InitTraceId(&ctx)
queryOpts := c.getObQueryOptions(opts...)
queryExecutor := newObQueryExecutorWithParams(tableName, c)
queryExecutor.addKeyRanges(rangePairs)
Expand Down Expand Up @@ -533,6 +547,7 @@ func (c *obClient) Close() {
if c.odpTable != nil {
c.odpTable.Close()
}
_ = log.Sync()
}

func (c *obClient) getOperationOptions(opts ...option.ObOperationOption) *option.ObOperationOptions {
Expand Down Expand Up @@ -601,6 +616,8 @@ func (c *obClient) execute(
// 1. Get table route
tableParam, err := c.GetTableParam(ctx, tableName, rowKey)
if err != nil {
log.Error("Runtime", ctx.Value(log.ObkvTraceId), "error occur in execute",
zap.Int64("opType", int64(opType)), zap.String("tableName", tableName), zap.String("tableParam", tableParam.String()))
return nil, errors.WithMessagef(err, "get table param, tableName:%s, opType:%d", tableName, opType), needRetry
}

Expand All @@ -618,6 +635,8 @@ func (c *obClient) execute(
c.GetRpcFlag(),
)
if err != nil {
log.Error("Runtime", ctx.Value(log.ObkvTraceId), "error occur in execute",
zap.Int64("opType", int64(opType)), zap.String("tableName", tableName), zap.String("tableParam", tableParam.String()))
return nil, errors.WithMessagef(err, "new operation request, tableName:%s, tableParam:%s, opType:%d",
tableName, tableParam.String(), opType), needRetry
}
Expand All @@ -626,10 +645,14 @@ func (c *obClient) execute(
result := protocol.NewObTableOperationResponse()
err, needRetry = c.executeInternal(ctx, tableName, tableParam.Table(), request, result)
if err != nil {
trace := fmt.Sprintf("Y%X-%016X", result.UniqueId(), result.Sequence())
log.Error("Runtime", ctx.Value(log.ObkvTraceId), "error occur in execute", zap.String("observerTraceId", trace))
return nil, err, needRetry
}

if oberror.ObErrorCode(result.Header().ErrorNo()) != oberror.ObSuccess {
trace := fmt.Sprintf("Y%X-%016X", result.UniqueId(), result.Sequence())
log.Error("Runtime", ctx.Value(log.ObkvTraceId), "error occur in execute", zap.String("observerTraceId", trace))
return nil, protocol.NewProtocolError(
result.RemoteAddr().String(),
oberror.ObErrorCode(result.Header().ErrorNo()),
Expand Down Expand Up @@ -751,3 +774,17 @@ func (c *obClient) GetTableParam(

return c.routeInfo.GetTableParam(ctx, tableName, rowKey)
}

func MonitorSlowQuery(executeTime int64, slowQueryThreshold int64, tableName string, clientTraceId any) {
if executeTime > slowQueryThreshold {
pId := unix.Getpid()
buf := make([]byte, 64)
n := runtime.Stack(buf, false)
id := buf[:n]
var goroutineID uint64
fmt.Sscanf(string(id), "goroutine %d", &goroutineID)
log.Info("Monitor", clientTraceId, "SlowQuery", zap.String("tableName", tableName),
zap.Int64("executeTime", executeTime), zap.Int64("slowQueryThreshold", slowQueryThreshold),
zap.Int("pId", pId), zap.Uint64("goroutineID", goroutineID))
}
}
6 changes: 5 additions & 1 deletion client/query_result_iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package client

import (
"context"
"github.com/oceanbase/obkv-table-client-go/log"
"sync"
"time"

Expand Down Expand Up @@ -88,7 +89,7 @@ func (q *ObQueryResultIterator) Next() (QueryResult, error) {
if err != nil {
return nil, err
}

var startTime int64 = time.Now().UnixMilli()
// lock
q.lock.Lock()
defer q.lock.Unlock()
Expand Down Expand Up @@ -117,6 +118,9 @@ func (q *ObQueryResultIterator) Next() (QueryResult, error) {

// get next row from next server
err = q.fetchNextWithRetry(false)
endTime := time.Now().UnixMilli()
var duration int64 = endTime - startTime
MonitorSlowQuery(duration, log.SlowQueryThreshold, q.queryExecutor.tableName, q.ctx.Value(log.ObkvTraceId))
if err != nil {
return nil, err
}
Expand Down
12 changes: 12 additions & 0 deletions config/toml_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ type ClientConfiguration struct {
RouteMetaDataConfig RouteMetaDataConfig
RsListConfig RsListConfig
ExtraConfig ExtraConfig
LogConfig LogConfig
Mode string
}

Expand Down Expand Up @@ -88,6 +89,15 @@ type ExtraConfig struct {
EnableSLBLoadBalance bool
}

type LogConfig struct {
LogFile string
SingleFileMaxValue int
MaxBackups int
MaxAge int
Compress bool
SlowQueryThreshold int64
}

func (c *ClientConfiguration) checkClientConfiguration() error {
if c.Mode == "direct" {
if c.DirectClientConfig.ConfigUrl == "" {
Expand All @@ -109,6 +119,8 @@ func (c *ClientConfiguration) checkClientConfiguration() error {
} else if c.OdpClientConfig.FullUserName == "" {
return errors.New("full user name is empty")
}
} else if c.Mode == "log" {
// do nothing
} else {
return errors.New("mode is invalid")
}
Expand Down
11 changes: 10 additions & 1 deletion configurations/obkv-table-default.toml
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
# Client Mode - `direct` or `proxy`
mode = "proxy"
# log is not use toml config
mode = "log"

## Direct Mode
[DirectClientConfig]
Expand Down Expand Up @@ -50,3 +51,11 @@ LogLevel = "info"
EnableRerouting = false
MaxConnectionAge = 0
EnableSLBLoadBalance = false

[LogConfig]
LogFile = "${Absolute_path}/obkv-table-client-go/logs/"
SingleFileMaxValue = 1 ## MB
MaxBackups = 10 ## 0 is not delete
MaxAge = 30 ## 30 day
Compress = false ## default not
SlowQueryThreshold = 40 ## ms
7 changes: 7 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,19 @@ require (

require (
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/fatih/color v1.16.0 // indirect
github.com/frankban/quicktest v1.14.6 // indirect
github.com/google/uuid v1.5.0 // indirect
github.com/jessevdk/go-flags v1.5.0 // indirect
github.com/kylelemons/godebug v1.1.0 // indirect
github.com/mattn/go-colorable v0.1.13 // indirect
github.com/mattn/go-isatty v0.0.20 // indirect
github.com/naoina/go-stringutil v0.1.0 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/uber/go-torch v0.0.0-20181107071353-86f327cc820e // indirect
go.uber.org/atomic v1.10.0 // indirect
go.uber.org/multierr v1.11.0 // indirect
golang.org/x/sys v0.15.0 // indirect
gopkg.in/natefinch/lumberjack.v2 v2.2.1 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
Loading

0 comments on commit fe9859e

Please sign in to comment.