Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature add authz persist api #122

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
73 changes: 73 additions & 0 deletions v2/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -578,6 +578,15 @@ type Client interface {
// consumes a set of streams. Liftbridge handles assigning partitions to
// the group members and tracking the group's position in the streams.
CreateConsumer(groupID string, opts ...ConsumerOption) (*Consumer, error)

// AddPolicy adds an ACL policy to the cluster
AddPolicy(ctx context.Context, UserId, ResourceId, Action string) error

// RevokePolicy revokes an existing ACL policy from the cluster
RevokePolicy(ctx context.Context, UserId, ResourceId, Action string) error

// ListPolicy retrieves all existing ACL policies from the cluster
ListPolicy(ctx context.Context) (map[int32]*ACLPolicy, error)
}

// client implements the Client interface. It maintains a pool of connections
Expand Down Expand Up @@ -1578,6 +1587,70 @@ func (c *client) FetchCursor(ctx context.Context, id, stream string, partition i
return offset, err
}

// AddPolicy adds an ACL style authorization policy to the cluster.
// The client must either be `root` or have `AddPolicy` permission to execute this operation
func (c *client) AddPolicy(ctx context.Context, userId, resourceId, action string) error {
var (
req = &proto.AddPolicyRequest{Policy: &proto.ACLPolicy{
UserId: userId,
ResourceId: resourceId,
Action: action}}
)

err := c.doResilientRPC(ctx, func(client proto.APIClient) error {
_, err := client.AddPolicy(ctx, req)
if err != nil {
return err
}
return nil
})
return err
}

// RevokePolicy removes an existing authorization policy from the cluster.
// The client must either be `root` or have `RevokePolicy` permission to execute this operation
func (c *client) RevokePolicy(ctx context.Context, userId, resourceId, action string) error {
var (
req = &proto.RevokePolicyRequest{Policy: &proto.ACLPolicy{
UserId: userId,
ResourceId: resourceId,
Action: action}}
)

err := c.doResilientRPC(ctx, func(client proto.APIClient) error {
_, err := client.RevokePolicy(ctx, req)
if err != nil {
return err
}
return nil
})
return err
}

// ListPolicy lists all existing ACL authorization policies from the cluster.
// The client must either be `root` or have `ListPolicy` permission to execute this operation
func (c *client) ListPolicy(ctx context.Context) (map[int32]*ACLPolicy, error) {
var (
req = &proto.ListPolicyRequest{}
policies = make(map[int32]*ACLPolicy)
)

err := c.doResilientRPC(ctx, func(client proto.APIClient) error {
resp, err := client.ListPolicy(ctx, req)
if err != nil {
return err
}

for i, policy := range resp.Policies {
policies[int32(i)] = &ACLPolicy{
UserId: policy.UserId, ResourceId: policy.ResourceId, Action: policy.Action}
}

return nil
})
return policies, err
}

func (c *client) getCursorKey(cursorID, streamName string, partitionID int32) []byte {
return []byte(fmt.Sprintf("%s,%s,%d", cursorID, streamName, partitionID))
}
Expand Down
67 changes: 67 additions & 0 deletions v2/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1911,6 +1911,73 @@ func TestConnectToServerBasedOnWorkLoad(t *testing.T) {
require.Len(t, metadata.Brokers(), 2)
}

func TestAddPolicy(t *testing.T) {
server := newMockServer()
defer server.Stop(t)
port := server.Start(t)

server.SetupMockFetchMetadataResponse(new(proto.FetchMetadataResponse))

server.SetupMockAddPolicyResponse(new(proto.AddPolicyResponse))

client, err := Connect([]string{fmt.Sprintf("localhost:%d", port)},
AckWaitTime(time.Nanosecond))
require.NoError(t, err)
defer client.Close()

err = client.AddPolicy(context.Background(), "a", "b", "read")
require.NoError(t, err)
}

func TestRevokePolicy(t *testing.T) {
server := newMockServer()
defer server.Stop(t)
port := server.Start(t)

server.SetupMockFetchMetadataResponse(new(proto.FetchMetadataResponse))

server.SetupMockRevokePolicyResponse(new(proto.RevokePolicyResponse))

client, err := Connect([]string{fmt.Sprintf("localhost:%d", port)},
AckWaitTime(time.Nanosecond))
require.NoError(t, err)
defer client.Close()

err = client.RevokePolicy(context.Background(), "a", "b", "read")
require.NoError(t, err)
}

func TestListPolicy(t *testing.T) {
server := newMockServer()
defer server.Stop(t)
port := server.Start(t)

mockPolicies := make(map[int32]*proto.ACLPolicy)
mockPolicies[0] = &proto.ACLPolicy{UserId: "a", ResourceId: "b", Action: "read"}
mockPolicies[1] = &proto.ACLPolicy{UserId: "c", ResourceId: "d", Action: "write"}

server.SetupMockFetchMetadataResponse(new(proto.FetchMetadataResponse))

server.SetupMockListPolicyResponse(&proto.ListPolicyResponse{Policies: mockPolicies})

client, err := Connect([]string{fmt.Sprintf("localhost:%d", port)},
AckWaitTime(time.Nanosecond))
require.NoError(t, err)
defer client.Close()

policies, err := client.ListPolicy(context.Background())
require.NoError(t, err)

expectedPolicies := make(map[int32]*ACLPolicy)
for i, policy := range mockPolicies {

expectedPolicies[int32(i)] = &ACLPolicy{
UserId: policy.UserId, ResourceId: policy.ResourceId, Action: policy.Action}
}

require.Equal(t, expectedPolicies, policies)
}

func ExampleConnect() {
addr := "localhost:9292"
client, err := Connect([]string{addr})
Expand Down
105 changes: 105 additions & 0 deletions v2/common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,9 @@ type mockAPI struct {
joinConsumerGroupRequests []*proto.JoinConsumerGroupRequest
leaveConsumerGroupRequests []*proto.LeaveConsumerGroupRequest
fetchConsumerGroupAssignmentsRequests []*proto.FetchConsumerGroupAssignmentsRequest
addPolicyRequests []*proto.AddPolicyRequest
revokePolicyRequests []*proto.RevokePolicyRequest
listPolicyRequests []*proto.ListPolicyRequest
responses map[string]interface{}
messages []*proto.Message
createStreamErr error
Expand All @@ -141,6 +144,9 @@ type mockAPI struct {
leaveConsumerGroupErr error
fetchConsumerGroupAssignmentsErr error
fetchPartitionMetadataErr error
addPolicyErr error
revokePolicyError error
listPolicyError error
// autclearError indicates where the mock API shall clear mock error automatically
autoClearError bool
// delayMetadataResponse indicates the FetchMetadata call shall be delayed for few seconds
Expand All @@ -164,6 +170,9 @@ func newMockAPI() *mockAPI {
joinConsumerGroupRequests: []*proto.JoinConsumerGroupRequest{},
leaveConsumerGroupRequests: []*proto.LeaveConsumerGroupRequest{},
fetchConsumerGroupAssignmentsRequests: []*proto.FetchConsumerGroupAssignmentsRequest{},
addPolicyRequests: []*proto.AddPolicyRequest{},
revokePolicyRequests: []*proto.RevokePolicyRequest{},
listPolicyRequests: []*proto.ListPolicyRequest{},
responses: make(map[string]interface{}),
autoClearError: false,
}
Expand Down Expand Up @@ -242,6 +251,18 @@ func (m *mockAPI) SetupMockReportConsumerGroupCoordinatorResponse(responses inte
m.responses["ReportConsumerGroupCoordinator"] = responses
}

func (m *mockAPI) SetupMockAddPolicyResponse(responses interface{}) {
m.responses["AddPolicy"] = responses
}

func (m *mockAPI) SetupMockRevokePolicyResponse(responses interface{}) {
m.responses["RevokePolicy"] = responses
}

func (m *mockAPI) SetupMockListPolicyResponse(responses interface{}) {
m.responses["ListPolicy"] = responses
}

func (m *mockAPI) SetupMockCreateStreamError(err error) {
m.mu.Lock()
defer m.mu.Unlock()
Expand Down Expand Up @@ -325,6 +346,24 @@ func (m *mockAPI) SetupMockFetchCursorError(err error) {
m.fetchCursorErr = err
}

func (m *mockAPI) SetupMockAddPolicyError(err error) {
m.mu.Lock()
defer m.mu.Unlock()
m.addPolicyErr = err
}

func (m *mockAPI) SetupMockRevokePolicyError(err error) {
m.mu.Lock()
defer m.mu.Unlock()
m.revokePolicyError = err
}

func (m *mockAPI) SetupMockListPolicyError(err error) {
m.mu.Lock()
defer m.mu.Unlock()
m.listPolicyError = err
}

func (m *mockAPI) GetCreateStreamRequests() []*proto.CreateStreamRequest {
m.mu.Lock()
defer m.mu.Unlock()
Expand Down Expand Up @@ -397,6 +436,24 @@ func (m *mockAPI) GetLeaveConsumerGroupRequests() []*proto.LeaveConsumerGroupReq
return m.leaveConsumerGroupRequests
}

func (m *mockAPI) GetAddPolicyRequests() []*proto.AddPolicyRequest {
m.mu.Lock()
defer m.mu.Unlock()
return m.addPolicyRequests
}

func (m *mockAPI) GetRevokePolicyRequests() []*proto.RevokePolicyRequest {
m.mu.Lock()
defer m.mu.Unlock()
return m.revokePolicyRequests
}

func (m *mockAPI) GetLisPolicyRequests() []*proto.ListPolicyRequest {
m.mu.Lock()
defer m.mu.Unlock()
return m.listPolicyRequests
}

func (m *mockAPI) CreateStream(ctx context.Context, in *proto.CreateStreamRequest) (*proto.CreateStreamResponse, error) {
m.mu.Lock()
defer m.mu.Unlock()
Expand Down Expand Up @@ -666,3 +723,51 @@ func (m *mockAPI) ReportConsumerGroupCoordinator(ctx context.Context,

return nil, errors.New("todo")
}

func (m *mockAPI) AddPolicy(ctx context.Context, in *proto.AddPolicyRequest) (*proto.AddPolicyResponse, error) {
m.mu.Lock()
defer m.mu.Unlock()

m.addPolicyRequests = append(m.addPolicyRequests, in)
if m.addPolicyErr != nil {
err := m.addPolicyErr
if m.autoClearError {
m.addPolicyErr = nil
}
return nil, err
}
resp := m.responses["AddPolicy"]
return resp.(*proto.AddPolicyResponse), nil
}

func (m *mockAPI) RevokePolicy(ctx context.Context, in *proto.RevokePolicyRequest) (*proto.RevokePolicyResponse, error) {
m.mu.Lock()
defer m.mu.Unlock()

m.revokePolicyRequests = append(m.revokePolicyRequests, in)
if m.revokePolicyError != nil {
err := m.revokePolicyError
if m.autoClearError {
m.revokePolicyError = nil
}
return nil, err
}
resp := m.responses["RevokePolicy"]
return resp.(*proto.RevokePolicyResponse), nil
}

func (m *mockAPI) ListPolicy(ctx context.Context, in *proto.ListPolicyRequest) (*proto.ListPolicyResponse, error) {
m.mu.Lock()
defer m.mu.Unlock()

m.listPolicyRequests = append(m.listPolicyRequests, in)
if m.listPolicyError != nil {
err := m.listPolicyError
if m.autoClearError {
m.listPolicyError = nil
}
return nil, err
}
resp := m.responses["ListPolicy"]
return resp.(*proto.ListPolicyResponse), nil
}
6 changes: 6 additions & 0 deletions v2/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -442,3 +442,9 @@ func (m *metadataCache) get() *Metadata {
defer m.mu.RUnlock()
return m.metadata
}

type ACLPolicy struct {
UserId string
ResourceId string
Action string
}