Skip to content

Commit

Permalink
add kms encryption; support origin column; search/parallel_scan timeout
Browse files Browse the repository at this point in the history
  • Loading branch information
lutaoact committed May 26, 2022
1 parent dcfa011 commit 81b52bc
Show file tree
Hide file tree
Showing 20 changed files with 2,169 additions and 1,358 deletions.
1 change: 1 addition & 0 deletions sample/SearchIndexOperation.go
Original file line number Diff line number Diff line change
Expand Up @@ -423,6 +423,7 @@ func MatchAllQuery(client *tablestore.TableStoreClient, tableName string, indexN
searchQuery.SetLimit(0)
searchQuery.SetGetTotalCount(true) // 设置GetTotalCount为true后才会返回总条数
searchRequest.SetSearchQuery(searchQuery)
searchRequest.SetTimeoutMs(30000) //可以显示设置请求超时时间
searchResponse, err := client.Search(searchRequest)
if err != nil {
fmt.Printf("%#v", err)
Expand Down
50 changes: 47 additions & 3 deletions tablestore/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -1033,7 +1033,9 @@ func (tableStoreClient *TableStoreClient) DescribeTable(request *DescribeTableRe
EnableStream: *resp.StreamDetails.EnableStream,
StreamId: (*StreamId)(resp.StreamDetails.StreamId),
ExpirationTime: *resp.StreamDetails.ExpirationTime,
LastEnableTime: *resp.StreamDetails.LastEnableTime}
LastEnableTime: *resp.StreamDetails.LastEnableTime,
ColumnsToGet: resp.GetStreamDetails().GetColumnsToGet(),
}
} else {
response.StreamDetails = &StreamDetails{
EnableStream: false}
Expand Down Expand Up @@ -1077,7 +1079,9 @@ func (tableStoreClient *TableStoreClient) UpdateTable(request *UpdateTableReques
if request.StreamSpec.EnableStream == true {
req.StreamSpec = &otsprotocol.StreamSpecification{
EnableStream: &request.StreamSpec.EnableStream,
ExpirationTime: &request.StreamSpec.ExpirationTime}
ExpirationTime: &request.StreamSpec.ExpirationTime,
ColumnsToGet: request.StreamSpec.ColumnsToGet,
}
} else {
req.StreamSpec = &otsprotocol.StreamSpecification{EnableStream: &request.StreamSpec.EnableStream}
}
Expand All @@ -1103,7 +1107,9 @@ func (tableStoreClient *TableStoreClient) UpdateTable(request *UpdateTableReques
EnableStream: *resp.StreamDetails.EnableStream,
StreamId: (*StreamId)(resp.StreamDetails.StreamId),
ExpirationTime: *resp.StreamDetails.ExpirationTime,
LastEnableTime: *resp.StreamDetails.LastEnableTime}
LastEnableTime: *resp.StreamDetails.LastEnableTime,
ColumnsToGet: resp.GetStreamDetails().GetColumnsToGet(),
}
} else {
response.StreamDetails = &StreamDetails{
EnableStream: false}
Expand Down Expand Up @@ -1829,6 +1835,44 @@ func (client TableStoreClient) GetStreamRecord(req *GetStreamRecordRequest) (*Ge
break
}
}

if pbRecord.GetOriginRecord() != nil {
originPlainRows, err := readRowsWithHeader(bytes.NewReader(pbRecord.GetOriginRecord()))
if err != nil {
return nil, err
}
Assert(len(originPlainRows) == 1,
"There must be exactly one row in a StreamRecord.")
originPlainRow := originPlainRows[0]

record.OriginColumns = make([]*RecordColumn, len(originPlainRow.cells))
for i, plainCell := range originPlainRow.cells {
cell := RecordColumn{}
record.OriginColumns[i] = &cell

name := string(plainCell.cellName)
cell.Name = &name
if plainCell.cellValue != nil {
cell.Type = RCT_Put
} else {
if plainCell.cellTimestamp > 0 {
cell.Type = RCT_DeleteOneVersion
} else {
cell.Type = RCT_DeleteAllVersions
}
}
switch cell.Type {
case RCT_Put:
cell.Value = plainCell.cellValue.Value
fallthrough
case RCT_DeleteOneVersion:
cell.Timestamp = &plainCell.cellTimestamp
case RCT_DeleteAllVersions:
break
}
}
}

}
resp.Records = records
return &resp, nil
Expand Down
12 changes: 12 additions & 0 deletions tablestore/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -327,12 +327,18 @@ func (s *TableStoreSuite) TestUpdateAndDescribeTable(c *C) {
updateTableReq.TableOption = new(TableOption)
updateTableReq.TableOption.TimeToAlive = -1
updateTableReq.TableOption.MaxVersion = 5
updateTableReq.StreamSpec = new(StreamSpecification)
updateTableReq.StreamSpec.EnableStream = true
updateTableReq.StreamSpec.ExpirationTime = 168
updateTableReq.StreamSpec.ColumnsToGet = []string{"col1", "col2"}

updateTableResp, error := client.UpdateTable(updateTableReq)
c.Assert(error, Equals, nil)
c.Assert(updateTableResp, NotNil)
c.Assert(updateTableResp.TableOption.TimeToAlive, Equals, updateTableReq.TableOption.TimeToAlive)
c.Assert(updateTableResp.TableOption.MaxVersion, Equals, updateTableReq.TableOption.MaxVersion)
c.Assert(updateTableResp.StreamDetails.EnableStream, Equals, updateTableReq.StreamSpec.EnableStream)
c.Assert(updateTableResp.StreamDetails.ExpirationTime, Equals, updateTableReq.StreamSpec.ExpirationTime)

describeTableReq := new(DescribeTableRequest)
describeTableReq.TableName = defaultTableName
Expand All @@ -342,6 +348,12 @@ func (s *TableStoreSuite) TestUpdateAndDescribeTable(c *C) {
c.Assert(describ, NotNil)
c.Assert(describ.TableOption.TimeToAlive, Equals, updateTableReq.TableOption.TimeToAlive)
c.Assert(describ.TableOption.MaxVersion, Equals, updateTableReq.TableOption.MaxVersion)
c.Assert(describ.StreamDetails.EnableStream, Equals, updateTableReq.StreamSpec.EnableStream)
c.Assert(describ.StreamDetails.ExpirationTime, Equals, updateTableReq.StreamSpec.ExpirationTime)
c.Assert(len(describ.StreamDetails.ColumnsToGet), Equals, len(updateTableReq.StreamSpec.ColumnsToGet))
for i, s := range describ.StreamDetails.ColumnsToGet {
c.Assert(s, Equals, updateTableReq.StreamSpec.ColumnsToGet[i])
}
fmt.Println("TestUpdateAndDescribeTable finished")
}

Expand Down
16 changes: 10 additions & 6 deletions tablestore/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -682,13 +682,15 @@ type ListStreamResponse struct {
type StreamSpecification struct {
EnableStream bool
ExpirationTime int32 // must be positive. in hours
ColumnsToGet []string
}

type StreamDetails struct {
EnableStream bool
StreamId *StreamId // nil when stream is disabled.
ExpirationTime int32 // in hours
LastEnableTime int64 // the last time stream is enabled, in usec
ColumnsToGet []string
}

type DescribeStreamRequest struct {
Expand Down Expand Up @@ -773,19 +775,21 @@ type StreamShard struct {
}

type StreamRecord struct {
Type ActionType
Info *RecordSequenceInfo // required
PrimaryKey *PrimaryKey // required
Columns []*RecordColumn
Type ActionType
Info *RecordSequenceInfo // required
PrimaryKey *PrimaryKey // required
Columns []*RecordColumn
OriginColumns []*RecordColumn
}

func (this *StreamRecord) String() string {
return fmt.Sprintf(
"{\"Type\":%s, \"PrimaryKey\":%s, \"Info\":%s, \"Columns\":%s}",
"{\"Type\":%s, \"PrimaryKey\":%s, \"Info\":%s, \"Columns\":%s, \"OriginColumns\":%s}",
this.Type,
*this.PrimaryKey,
this.Info,
this.Columns)
this.Columns,
this.OriginColumns)
}

type ActionType int
Expand Down
Loading

0 comments on commit 81b52bc

Please sign in to comment.