Skip to content

Commit

Permalink
Support delete timeseries analytical store; Add KeepDefaultRetryStrat…
Browse files Browse the repository at this point in the history
…egyWhileUsingCustomizedRetryFunc for retry.
  • Loading branch information
yunjinlxp committed Sep 20, 2023
1 parent 93dbea2 commit aefad28
Show file tree
Hide file tree
Showing 9 changed files with 714 additions and 441 deletions.
34 changes: 34 additions & 0 deletions tablestore/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ func NewClientWithConfig(endPoint, instanceName, accessKeyId, accessKeySecret st
tableStoreClient.accessKeyId = accessKeyId
tableStoreClient.accessKeySecret = accessKeySecret
tableStoreClient.securityToken = securityToken
tableStoreClient.KeepDefaultRetryStrategyWhileUsingCustomizedRetryFunc = true
provider := &common.DefaultCredentialsProvider{AccessKeyID: accessKeyId, AccessKeySecret: accessKeySecret, SecurityToken: securityToken}
tableStoreClient.credentialsProvider = provider
for _, option := range options {
Expand Down Expand Up @@ -177,6 +178,7 @@ func NewTimeseriesClientWithConfig(endPoint, instanceName, accessKeyId, accessKe
timeseriesClient.accessKeyId = accessKeyId
timeseriesClient.accessKeySecret = accessKeySecret
timeseriesClient.securityToken = securityToken
timeseriesClient.KeepDefaultRetryStrategyWhileUsingCustomizedRetryFunc = true
provider := &common.DefaultCredentialsProvider{AccessKeyID: accessKeyId, AccessKeySecret: accessKeySecret, SecurityToken: securityToken}
timeseriesClient.credentialsProvider = provider
for _, option := range options {
Expand Down Expand Up @@ -226,6 +228,8 @@ func NewClientWithExternalHeader(endPoint, instanceName, accessKeyId, accessKeyS
return tableStoreClient
}

type RetryNotify func(requestId string, err error, action string, backoffDuration time.Duration)

// 请求服务端
func (internalClient *internalClient) doRequestWithRetry(uri string, req, resp proto.Message, responseInfo *ResponseInfo) error {
end := time.Now().Add(internalClient.config.MaxRetryTime)
Expand Down Expand Up @@ -260,6 +264,10 @@ func (internalClient *internalClient) doRequestWithRetry(uri string, req, resp p
return err
}

if internalClient.RetryNotify != nil {
internalClient.RetryNotify(requestId, err, uri, time.Duration(value)*time.Millisecond)
}

time.Sleep(time.Duration(value) * time.Millisecond)
}
}
Expand Down Expand Up @@ -301,6 +309,10 @@ func (internalClient *internalClient) doBatchRequestWithRetry(uri string, req, r
return err
}

if internalClient.RetryNotify != nil {
internalClient.RetryNotify(requestId, err, uri, time.Duration(value)*time.Millisecond)
}

time.Sleep(time.Duration(value) * time.Millisecond)
} else {
if len(respBody) == 0 {
Expand All @@ -319,6 +331,11 @@ func (internalClient *internalClient) doBatchRequestWithRetry(uri string, req, r
if value <= 0 {
return nil
}

if internalClient.RetryNotify != nil {
internalClient.RetryNotify(requestId, err, uri, time.Duration(value)*time.Millisecond)
}

time.Sleep(time.Duration(value) * time.Millisecond)
}
}
Expand Down Expand Up @@ -578,6 +595,8 @@ func (internalClient *internalClient) shouldRetry(errorCode string, errorMsg str
if internalClient.CustomizedRetryFunc != nil {
if internalClient.CustomizedRetryFunc(errorCode, errorMsg, action, statusCode) == true {
return true
} else if !internalClient.KeepDefaultRetryStrategyWhileUsingCustomizedRetryFunc {
return false
}
}
return shouldRetryViaErrorAndAction(errorCode, errorMsg, action)
Expand Down Expand Up @@ -670,6 +689,20 @@ func (tableStoreClient *TableStoreClient) GetExternalHeader() map[string]string
return tableStoreClient.externalHeader
}

func (tableStoreClient *TableStoreClient) GetRetryNotify() RetryNotify {
if tableStoreClient.internalClient == nil {
return nil
}
return tableStoreClient.RetryNotify
}

func (tableStoreClient *TableStoreClient) SetRetryNotify(retryNotify RetryNotify) {
if tableStoreClient.internalClient == nil {
return
}
tableStoreClient.RetryNotify = retryNotify
}

// table API
// Create a table with the CreateTableRequest, in which the table name and
// primary keys are required.
Expand Down Expand Up @@ -1277,6 +1310,7 @@ func (timeseriesClient *TimeseriesClient) DeleteTimeseriesAnalyticalStore(reques
req := new(otsprotocol.DeleteTimeseriesAnalyticalStoreRequest)
req.TableName = proto.String(request.timeseriesTableName)
req.StoreName = proto.String(request.analyticalStoreName)
req.DropMappingTable = proto.Bool(request.dropMappingTable)

resp := new(otsprotocol.DeleteTimeseriesAnalyticalStoreResponse)
response := new(DeleteTimeseriesAnalyticalStoreResponse)
Expand Down
13 changes: 11 additions & 2 deletions tablestore/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,14 @@ type internalClient struct {
random *rand.Rand
mu *sync.Mutex

externalHeader map[string]string
CustomizedRetryFunc CustomizedRetryNotMatterActions
externalHeader map[string]string
CustomizedRetryFunc CustomizedRetryNotMatterActions
KeepDefaultRetryStrategyWhileUsingCustomizedRetryFunc bool

timeseriesConfiguration *TimeseriesConfiguration
credentialsProvider common.CredentialsProvider

RetryNotify RetryNotify
}

const initMapLen int = 8
Expand Down Expand Up @@ -929,6 +932,7 @@ type IndexMeta struct {
Primarykey []string
DefinedColumns []string
IndexType IndexType
IndexSyncPhase *SyncPhase
}

type DefinedColumnSchema struct {
Expand Down Expand Up @@ -2327,6 +2331,7 @@ type CreateTimeseriesAnalyticalStoreResponse struct {
type DeleteTimeseriesAnalyticalStoreRequest struct {
timeseriesTableName string
analyticalStoreName string
dropMappingTable bool
}

func NewDeleteTimeseriesAnalyticalStoreRequest(timeseriesTableName string, analyticalStoreName string) *DeleteTimeseriesAnalyticalStoreRequest {
Expand All @@ -2336,6 +2341,10 @@ func NewDeleteTimeseriesAnalyticalStoreRequest(timeseriesTableName string, analy
}
}

func (deleteTimeseriesAnalyticalStoreRequest *DeleteTimeseriesAnalyticalStoreRequest) SetDropMappingTable(dropMappingTable bool) {
deleteTimeseriesAnalyticalStoreRequest.dropMappingTable = dropMappingTable
}

type DeleteTimeseriesAnalyticalStoreResponse struct {
ResponseInfo
}
Expand Down
Loading

0 comments on commit aefad28

Please sign in to comment.