Skip to content

Commit

Permalink
Add support for MirrorMaker replication flows
Browse files Browse the repository at this point in the history
This commit adds the ability to list, create, update, get, delete MirrorMaker replication flows.

Examples:

Create:
```bash
$ avn mirrormaker replication-flow create --project test mirrormaker --source-cluster s --target-cluster t '{"enabled": true, "topics": ["customer\\..*", "warehouse\\.operations"]}'
```

List:
```bash
$ avn mirrormaker replication-flow list --project test mirrormaker
[
    {
        "enabled": true,
        "source_cluster": "s",
        "target_cluster": "t",
        "topics": [
            "customer\\..*",
            "warehouse\\.operations"
        ]
    }
]
```

Get one:
```
avn mirrormaker replication-flow get --project test mirrormaker --source-cluster s --target-cluster t
{
    "enabled": true,
    "source_cluster": "s",
    "target_cluster": "t",
    "topics": [
        "customer\\..*",
        "warehouse\\.operations"
    ]
}
```

Edit:
```
avn mirrormaker replication-flow update --project test mirrormaker --source-cluster s --target-cluster t '{"enabled": false, "topics": ["customer\\..*", "warehouse\\.operations"]}'
```
  • Loading branch information
ivanyu committed Apr 9, 2020
1 parent dd8ca31 commit 455e711
Show file tree
Hide file tree
Showing 3 changed files with 102 additions and 0 deletions.
67 changes: 67 additions & 0 deletions aiven/client/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -1750,6 +1750,73 @@ def service__schema__subject_version__delete(self):
project_name = self.get_project()
self.client.delete_schema_subject_version(project_name, self.args.name, self.args.subject, self.args.version_id)

@arg.project
@arg.service_name
def mirrormaker__replication_flow__list(self):
"""List Kafka MirrorMaker replication flows"""
project_name = self.get_project()
self.print_response(self.client.list_mirrormaker_replication_flows(project_name, self.args.name))

@arg.project
@arg.service_name
@arg.source_cluster
@arg.target_cluster
@arg.json_path_or_string("replication_flow_config")
def mirrormaker__replication_flow__create(self):
"""Create a Kafka MirrorMaker replication flow"""
project_name = self.get_project()
self.client.create_mirrormaker_replication_flow(
project_name,
self.args.name,
self.args.source_cluster,
self.args.target_cluster,
self.args.replication_flow_config,
)

@arg.project
@arg.service_name
@arg.source_cluster
@arg.target_cluster
@arg.json_path_or_string("replication_flow_config")
def mirrormaker__replication_flow__update(self):
"""Update a Kafka MirrorMaker replication flow"""
project_name = self.get_project()
self.print_response(self.client.update_mirrormaker_replication_flow(
project_name,
self.args.name,
self.args.source_cluster,
self.args.target_cluster,
self.args.replication_flow_config,
))

@arg.project
@arg.service_name
@arg.source_cluster
@arg.target_cluster
def mirrormaker__replication_flow__get(self):
"""Get a Kafka MirrorMaker replication flow"""
project_name = self.get_project()
self.print_response(self.client.get_mirrormaker_replication_flow(
project_name,
self.args.name,
self.args.source_cluster,
self.args.target_cluster,
))

@arg.project
@arg.service_name
@arg.source_cluster
@arg.target_cluster
def mirrormaker__replication_flow__delete(self):
"""Delete a Kafka MirrorMaker replication flow"""
project_name = self.get_project()
self.client.delete_mirrormaker_replication_flow(
project_name,
self.args.name,
self.args.source_cluster,
self.args.target_cluster,
)

@arg.project
@arg("service", nargs="+", help="Service to wait for")
@arg.timeout
Expand Down
3 changes: 3 additions & 0 deletions aiven/client/cliarg.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,3 +91,6 @@ def wrapped(self):
help="Choose a compatibility level for the subject"
)
arg.schema = arg("--schema", required=True, help="Schema string quote escaped")

arg.source_cluster = arg("-s", "--source-cluster", required=True, help="Source cluster alias")
arg.target_cluster = arg("-t", "--target-cluster", required=True, help="Target cluster alias")
32 changes: 32 additions & 0 deletions aiven/client/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -536,6 +536,38 @@ def delete_schema_subject_version(self, project, service, subject, version):
)
return self.verify(self.delete, path)

def list_mirrormaker_replication_flows(self, project, service):
path = self.build_path("project", project, "service", service, "mirrormaker", "replication-flows")
return self.verify(self.get, path, result_key="replication_flows")

def create_mirrormaker_replication_flow(self, project, service, source_cluster, target_cluster, config):
path = self.build_path("project", project, "service", service, "mirrormaker", "replication-flows")
body = {}
body.update(config)
body["source_cluster"] = source_cluster
body["target_cluster"] = target_cluster
return self.verify(self.post, path, body=body)

def update_mirrormaker_replication_flow(self, project, service, source_cluster, target_cluster, config):
path = self.build_path(
"project", project, "service", service, "mirrormaker", "replication-flows", source_cluster, target_cluster
)
body = {}
body.update(config)
return self.verify(self.put, path, body=body, result_key="replication_flow")

def get_mirrormaker_replication_flow(self, project, service, source_cluster, target_cluster):
path = self.build_path(
"project", project, "service", service, "mirrormaker", "replication-flows", source_cluster, target_cluster
)
return self.verify(self.get, path, result_key="replication_flow")

def delete_mirrormaker_replication_flow(self, project, service, source_cluster, target_cluster):
path = self.build_path(
"project", project, "service", service, "mirrormaker", "replication-flows", source_cluster, target_cluster
)
return self.verify(self.delete, path)

def list_project_vpcs(self, project):
return self.verify(self.get, self.build_path("project", project, "vpcs"))

Expand Down

0 comments on commit 455e711

Please sign in to comment.