Skip to content

Commit

Permalink
hrpc: allow table creation with table attributes (#234)
Browse files Browse the repository at this point in the history
Closes #161
  • Loading branch information
dethi authored Sep 21, 2023
1 parent 6b4fbf6 commit f750f62
Show file tree
Hide file tree
Showing 2 changed files with 73 additions and 7 deletions.
28 changes: 23 additions & 5 deletions hrpc/create.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,12 @@ import (
type CreateTable struct {
base

families map[string]map[string]string
splitKeys [][]byte
attributes map[string]string
families map[string]map[string]string
splitKeys [][]byte
}

var defaultAttributes = map[string]string{
var defaultFamiliesAttributes = map[string]string{
"BLOOMFILTER": "ROW",
"VERSIONS": "3",
"IN_MEMORY": "false",
Expand Down Expand Up @@ -52,8 +53,8 @@ func NewCreateTable(ctx context.Context, table []byte,
option(ct)
}
for family, attrs := range families {
ct.families[family] = make(map[string]string, len(defaultAttributes))
for k, dv := range defaultAttributes {
ct.families[family] = make(map[string]string, len(defaultFamiliesAttributes))
for k, dv := range defaultFamiliesAttributes {
if v, ok := attrs[k]; ok {
ct.families[family][k] = v
} else {
Expand All @@ -71,6 +72,13 @@ func SplitKeys(sk [][]byte) func(*CreateTable) {
}
}

// TableAttributes will return an option that will set attributes on the created table
func TableAttributes(attrs map[string]string) func(*CreateTable) {
return func(ct *CreateTable) {
ct.attributes = attrs
}
}

// Name returns the name of this RPC call.
func (ct *CreateTable) Name() string {
return "CreateTable"
Expand All @@ -83,6 +91,14 @@ func (ct *CreateTable) Description() string {

// ToProto converts the RPC into a protobuf message
func (ct *CreateTable) ToProto() proto.Message {
pbAttributes := make([]*pb.BytesBytesPair, 0, len(ct.attributes))
for k, v := range ct.attributes {
pbAttributes = append(pbAttributes, &pb.BytesBytesPair{
First: []byte(k),
Second: []byte(v),
})
}

pbFamilies := make([]*pb.ColumnFamilySchema, 0, len(ct.families))
for family, attrs := range ct.families {
f := &pb.ColumnFamilySchema{
Expand All @@ -97,13 +113,15 @@ func (ct *CreateTable) ToProto() proto.Message {
}
pbFamilies = append(pbFamilies, f)
}

return &pb.CreateTableRequest{
TableSchema: &pb.TableSchema{
TableName: &pb.TableName{
// TODO: handle namespaces
Namespace: []byte("default"),
Qualifier: ct.table,
},
Attributes: pbAttributes,
ColumnFamilies: pbFamilies,
},
SplitKeys: ct.splitKeys,
Expand Down
52 changes: 50 additions & 2 deletions table_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,10 @@ var infoFamily = map[string][]string{
}

var cFamilies = map[string]map[string]string{
"cf": nil,
"cf2": nil,
"cf": nil,
"cf2": {
"MIN_VERSIONS": "1",
},
}

func TestCreateTable(t *testing.T) {
Expand Down Expand Up @@ -114,6 +116,52 @@ func TestCreatePresplitTable(t *testing.T) {
}
}

func TestCreateTableWithAttributes(t *testing.T) {
testTableName := t.Name() + "_" + getTimestampString()
t.Log("testTableName=" + testTableName)

attrs := map[string]string{
"NORMALIZATION_ENABLED": "TRUE",
}

ac := gohbase.NewAdminClient(*host)
crt := hrpc.NewCreateTable(
context.Background(),
[]byte(testTableName),
cFamilies,
hrpc.TableAttributes(attrs),
)

if err := ac.CreateTable(crt); err != nil {
t.Errorf("CreateTable returned an error: %v", err)
}

// check in hbase:meta if there's a region for the table
c := gohbase.NewClient(*host)
metaKey := testTableName + ","
keyFilter := filter.NewPrefixFilter([]byte(metaKey))
scan, err := hrpc.NewScanStr(context.Background(), metaTableName, hrpc.Filters(keyFilter))
if err != nil {
t.Fatalf("Failed to create Scan request: %s", err)
}

var rsp []*hrpc.Result
scanner := c.Scan(scan)
for {
res, err := scanner.Next()
if err == io.EOF {
break
}
if err != nil {
t.Fatal(err)
}
rsp = append(rsp, res)
}
if len(rsp) != 1 {
t.Errorf("Meta returned %d rows for prefix '%s' , want 1", len(rsp), metaKey)
}
}

func TestDisableDeleteTable(t *testing.T) {
testTableName := "test1_" + getTimestampString()
t.Log("testTableName=" + testTableName)
Expand Down

0 comments on commit f750f62

Please sign in to comment.