Skip to content

Commit

Permalink
Merge pull request #138 from aiven/ivanyu-replication-flows
Browse files Browse the repository at this point in the history
Add support for MirrorMaker replication flows

#138
  • Loading branch information
Ormod authored Apr 20, 2020
2 parents dd8ca31 + 455e711 commit 9e59335
Show file tree
Hide file tree
Showing 3 changed files with 102 additions and 0 deletions.
67 changes: 67 additions & 0 deletions aiven/client/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -1750,6 +1750,73 @@ def service__schema__subject_version__delete(self):
project_name = self.get_project()
self.client.delete_schema_subject_version(project_name, self.args.name, self.args.subject, self.args.version_id)

@arg.project
@arg.service_name
def mirrormaker__replication_flow__list(self):
"""List Kafka MirrorMaker replication flows"""
project_name = self.get_project()
self.print_response(self.client.list_mirrormaker_replication_flows(project_name, self.args.name))

@arg.project
@arg.service_name
@arg.source_cluster
@arg.target_cluster
@arg.json_path_or_string("replication_flow_config")
def mirrormaker__replication_flow__create(self):
"""Create a Kafka MirrorMaker replication flow"""
project_name = self.get_project()
self.client.create_mirrormaker_replication_flow(
project_name,
self.args.name,
self.args.source_cluster,
self.args.target_cluster,
self.args.replication_flow_config,
)

@arg.project
@arg.service_name
@arg.source_cluster
@arg.target_cluster
@arg.json_path_or_string("replication_flow_config")
def mirrormaker__replication_flow__update(self):
"""Update a Kafka MirrorMaker replication flow"""
project_name = self.get_project()
self.print_response(self.client.update_mirrormaker_replication_flow(
project_name,
self.args.name,
self.args.source_cluster,
self.args.target_cluster,
self.args.replication_flow_config,
))

@arg.project
@arg.service_name
@arg.source_cluster
@arg.target_cluster
def mirrormaker__replication_flow__get(self):
"""Get a Kafka MirrorMaker replication flow"""
project_name = self.get_project()
self.print_response(self.client.get_mirrormaker_replication_flow(
project_name,
self.args.name,
self.args.source_cluster,
self.args.target_cluster,
))

@arg.project
@arg.service_name
@arg.source_cluster
@arg.target_cluster
def mirrormaker__replication_flow__delete(self):
"""Delete a Kafka MirrorMaker replication flow"""
project_name = self.get_project()
self.client.delete_mirrormaker_replication_flow(
project_name,
self.args.name,
self.args.source_cluster,
self.args.target_cluster,
)

@arg.project
@arg("service", nargs="+", help="Service to wait for")
@arg.timeout
Expand Down
3 changes: 3 additions & 0 deletions aiven/client/cliarg.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,3 +91,6 @@ def wrapped(self):
help="Choose a compatibility level for the subject"
)
arg.schema = arg("--schema", required=True, help="Schema string quote escaped")

arg.source_cluster = arg("-s", "--source-cluster", required=True, help="Source cluster alias")
arg.target_cluster = arg("-t", "--target-cluster", required=True, help="Target cluster alias")
32 changes: 32 additions & 0 deletions aiven/client/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -536,6 +536,38 @@ def delete_schema_subject_version(self, project, service, subject, version):
)
return self.verify(self.delete, path)

def list_mirrormaker_replication_flows(self, project, service):
path = self.build_path("project", project, "service", service, "mirrormaker", "replication-flows")
return self.verify(self.get, path, result_key="replication_flows")

def create_mirrormaker_replication_flow(self, project, service, source_cluster, target_cluster, config):
path = self.build_path("project", project, "service", service, "mirrormaker", "replication-flows")
body = {}
body.update(config)
body["source_cluster"] = source_cluster
body["target_cluster"] = target_cluster
return self.verify(self.post, path, body=body)

def update_mirrormaker_replication_flow(self, project, service, source_cluster, target_cluster, config):
path = self.build_path(
"project", project, "service", service, "mirrormaker", "replication-flows", source_cluster, target_cluster
)
body = {}
body.update(config)
return self.verify(self.put, path, body=body, result_key="replication_flow")

def get_mirrormaker_replication_flow(self, project, service, source_cluster, target_cluster):
path = self.build_path(
"project", project, "service", service, "mirrormaker", "replication-flows", source_cluster, target_cluster
)
return self.verify(self.get, path, result_key="replication_flow")

def delete_mirrormaker_replication_flow(self, project, service, source_cluster, target_cluster):
path = self.build_path(
"project", project, "service", service, "mirrormaker", "replication-flows", source_cluster, target_cluster
)
return self.verify(self.delete, path)

def list_project_vpcs(self, project):
return self.verify(self.get, self.build_path("project", project, "vpcs"))

Expand Down

0 comments on commit 9e59335

Please sign in to comment.