Skip to content

Commit

Permalink
Merge pull request #43 from owainhunt/pubsub-topic
Browse files Browse the repository at this point in the history
Optionally pass in project for PubSub calls
  • Loading branch information
Andrewangeta authored Feb 11, 2022
2 parents da674ff + b119c63 commit e36240e
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 21 deletions.
24 changes: 16 additions & 8 deletions PubSub/Sources/API/SubscriptionsAPI.swift
Original file line number Diff line number Diff line change
Expand Up @@ -14,18 +14,22 @@ public protocol SubscriptionsAPI {
/// Gets the configuration details of a subscription.
///
/// - parameter `subscriptionId`: The name of the subscription to get
/// `subscriptionProject`: Name of the project that owns the subscription. If not provided, the default project will be used.
/// - returns: An instance of the `Subscription`
func get(subscriptionId: String) -> EventLoopFuture<GoogleCloudPubSubSubscription>
func get(subscriptionId: String, subscriptionProject: String?) -> EventLoopFuture<GoogleCloudPubSubSubscription>

/// Acknowledges the messages associated with the ackIds in the AcknowledgeRequest.
///
/// - parameters `subscriptionId`: ID of the subscription whose message is being acknowledged
/// `subscriptionProject`: Name of the project that owns the subscription. If not provided, the default project will be used.
/// `ackIds`: The acknowledgment ID for the messages being acknowledged that was returned by the Pub/Sub system in the subscriptions.pull response. Must not be empty.
func acknowledge(subscriptionId: String, ackIds: [String]) -> EventLoopFuture<EmptyResponse>
func acknowledge(subscriptionId: String, subscriptionProject: String?, ackIds: [String]) -> EventLoopFuture<EmptyResponse>

/// Creates a subscription to a given topic.
/// - parameter `subscriptionId`: The name of the subscription to be created.
/// `subscriptionProject`: Name of the project that owns the subscription. If not provided, the default project will be used.
/// `topicId`: The name of the topic from which this subscription is receiving messages.
/// `topicProject`: Name of the project that owns the topic. If not provided, the default project will be used.
/// `pushEndpoint`: A URL locating the endpoint to which messages should be pushed.
/// `pushConfigAttributes`: Endpoint configuration attributes that can be used to control different aspects of the message delivery.
/// `pushConfigOidcTokenServiceAccountEmail`:Service account email to be used for generating the OIDC token.
Expand All @@ -46,7 +50,9 @@ public protocol SubscriptionsAPI {
/// - returns: If successful, the response body contains a newly created instance of Subscription.
/// If the subscription already exists, returns ALREADY_EXISTS. If the corresponding topic doesn't exist, returns NOT_FOUND.
func create(subscriptionId: String,
subscriptionProject: String?,
topicId: String,
topicProject: String?,
pushEndpoint: String?,
pushConfigAttributes: [String: String]?,
pushConfigOidcTokenServiceAccountEmail: String?,
Expand Down Expand Up @@ -75,24 +81,26 @@ public final class GoogleCloudPubSubSubscriptionsAPI: SubscriptionsAPI {
self.endpoint = endpoint
}

public func get(subscriptionId: String) -> EventLoopFuture<GoogleCloudPubSubSubscription> {
return request.send(method: .GET, path: "\(endpoint)/v1/projects/\(request.project)/subscriptions/\(subscriptionId)")
public func get(subscriptionId: String, subscriptionProject: String? = nil) -> EventLoopFuture<GoogleCloudPubSubSubscription> {
return request.send(method: .GET, path: "\(endpoint)/v1/projects/\(subscriptionProject ?? request.project)/subscriptions/\(subscriptionId)")
}

public func acknowledge(subscriptionId: String, ackIds: [String]) -> EventLoopFuture<EmptyResponse> {
public func acknowledge(subscriptionId: String, subscriptionProject: String? = nil, ackIds: [String]) -> EventLoopFuture<EmptyResponse> {
do {
let acks = AcknowledgeRequest(ackIds: ackIds)
let body = try HTTPClient.Body.data(encoder.encode(acks))
return request.send(method: .POST,
path: "\(endpoint)/v1/projects/\(request.project)/subscriptions/\(subscriptionId):acknowledge",
path: "\(endpoint)/v1/projects/\(subscriptionProject ?? request.project)/subscriptions/\(subscriptionId):acknowledge",
body: body)
} catch {
return request.eventLoop.makeFailedFuture(error)
}
}

public func create(subscriptionId: String,
subscriptionProject: String? = nil,
topicId: String,
topicProject: String? = nil,
pushEndpoint: String?,
pushConfigAttributes: [String: String]?,
pushConfigOidcTokenServiceAccountEmail: String?,
Expand Down Expand Up @@ -140,7 +148,7 @@ public final class GoogleCloudPubSubSubscriptionsAPI: SubscriptionsAPI {
}

let subscription = GoogleCloudPubSubSubscription(name: subscriptionId,
topic: "projects/\(request.project)/topics/\(topicId)",
topic: "projects/\(topicProject ?? request.project)/topics/\(topicId)",
pushConfig: pushConfig,
ackDeadlineSeconds: ackDeadlineSeconds,
retainAckedMessages: retainAckedMessages,
Expand All @@ -154,7 +162,7 @@ public final class GoogleCloudPubSubSubscriptionsAPI: SubscriptionsAPI {
detached: detached)
let body = try HTTPClient.Body.data(encoder.encode(subscription))
return request.send(method: .PUT,
path: "\(endpoint)/v1/projects/\(request.project)/subscriptions/\(subscriptionId)",
path: "\(endpoint)/v1/projects/\(subscriptionProject ?? request.project)/subscriptions/\(subscriptionId)",
body: body)
} catch {
return request.eventLoop.makeFailedFuture(error)
Expand Down
29 changes: 16 additions & 13 deletions PubSub/Sources/API/TopicsAPI.swift
Original file line number Diff line number Diff line change
Expand Up @@ -7,28 +7,31 @@ public protocol TopicsAPI {
/// Gets the configuration of a topic.
///
/// - parameter `topicId`: Name of the topic
/// `topicProject`: Name of the project that owns the topic. If not provided, the default project will be used.
/// - returns: If successful, the response body contains an instance of `Topic`.
func get(topicId: String) -> EventLoopFuture<GoogleCloudPubSubTopic>
func get(topicId: String, topicProject: String?) -> EventLoopFuture<GoogleCloudPubSubTopic>

/// Lists matching topics.
///
/// - parameter `pageSize`: Maximum number of topics to return.
/// `pageToken`: The value returned by the last ListTopicsResponse; indicates that this is a
/// continuation of a prior topics.list call, and that the system should return the next page of data
/// `topicProject`: Name of the project that owns the topic. If not provided, the default project will be used.
/// - returns: Returns a list of topics and the `nextPageToken`
func list(pageSize: Int?, pageToken: String?) -> EventLoopFuture<GooglePubSubListTopicResponse>
func list(pageSize: Int?, pageToken: String?, topicProject: String?) -> EventLoopFuture<GooglePubSubListTopicResponse>

/// Adds one or more messages to the topic.
///
/// - parameter `topidId`: Name of the topic
/// - parameter `topicId`: Name of the topic
/// `topicProject`: Name of the project that owns the topic. If not provided, the default project will be used.
/// `data`: Data to be passed in the message
/// `attributes`: Attributes for this message
/// `orderingKey`: Identifies related messages for which publish order should be respected
/// - returns: Returns an array of `messageId`. `MessageId` is the server-assigned ID of each published message, in the same order as the messages in the request. IDs are guaranteed to be unique within the topic.
func publish(topicId: String, data: String, attributes: [String: String]?, orderingKey: String?) -> EventLoopFuture<GoogleCloudPublishResponse>
func publish(topicId: String, topicProject: String?, data: String, attributes: [String: String]?, orderingKey: String?) -> EventLoopFuture<GoogleCloudPublishResponse>

/// Lists the names of the attached subscriptions on this topic.
func getSubscriptionsList(topicId: String, pageSize: Int?, pageToken: String?) -> EventLoopFuture<GooglePubSubTopicSubscriptionListResponse>
func getSubscriptionsList(topicId: String, topicProject: String?, pageSize: Int?, pageToken: String?) -> EventLoopFuture<GooglePubSubTopicSubscriptionListResponse>
}

public final class GoogleCloudPubSubTopicsAPI: TopicsAPI {
Expand All @@ -42,27 +45,27 @@ public final class GoogleCloudPubSubTopicsAPI: TopicsAPI {
self.endpoint = endpoint
}

public func get(topicId: String) -> EventLoopFuture<GoogleCloudPubSubTopic> {
return request.send(method: .GET, path: "\(endpoint)/v1/projects/\(request.project)/topics/\(topicId)")
public func get(topicId: String, topicProject: String? = nil) -> EventLoopFuture<GoogleCloudPubSubTopic> {
return request.send(method: .GET, path: "\(endpoint)/v1/projects/\(topicProject ?? request.project)/topics/\(topicId)")
}

public func list(pageSize: Int?, pageToken: String?) -> EventLoopFuture<GooglePubSubListTopicResponse> {
public func list(pageSize: Int?, pageToken: String?, topicProject: String? = nil) -> EventLoopFuture<GooglePubSubListTopicResponse> {
var query = "pageSize=\(pageSize ?? 10)"
if let pageToken = pageToken {
query.append(contentsOf: "&pageToken=\(pageToken)")
}

return request.send(method: .GET,
path: "\(endpoint)/v1/projects/\(request.project)/topics",
path: "\(endpoint)/v1/projects/\(topicProject ?? request.project)/topics",
query: query)
}

public func publish(topicId: String, data: String, attributes: [String: String]?, orderingKey: String?) -> EventLoopFuture<GoogleCloudPublishResponse> {
public func publish(topicId: String, topicProject: String? = nil, data: String, attributes: [String: String]?, orderingKey: String?) -> EventLoopFuture<GoogleCloudPublishResponse> {
do {
let message = GoogleCloudPubSubMessage(data: data, attributes: attributes, orderingKey: orderingKey)
let publishRequest = GoogleCloudPublishRequest(messages: [message])
let body = try HTTPClient.Body.data(encoder.encode(publishRequest))
let path = "\(endpoint)/v1/projects/\(request.project)/topics/\(topicId):publish"
let path = "\(endpoint)/v1/projects/\(topicProject ?? request.project)/topics/\(topicId):publish"

print("<<<--- Publish on: \(path) --->")

Expand All @@ -74,14 +77,14 @@ public final class GoogleCloudPubSubTopicsAPI: TopicsAPI {
}
}

public func getSubscriptionsList(topicId: String, pageSize: Int?, pageToken: String?) -> EventLoopFuture<GooglePubSubTopicSubscriptionListResponse> {
public func getSubscriptionsList(topicId: String, topicProject: String? = nil, pageSize: Int?, pageToken: String?) -> EventLoopFuture<GooglePubSubTopicSubscriptionListResponse> {
var query = "pageSize=\(pageSize ?? 10)"
if let pageToken = pageToken {
query.append(contentsOf: "&pageToken=\(pageToken)")
}

return request.send(method: .GET,
path: "\(endpoint)/v1/projects/\(request.project)/topics/subscriptions",
path: "\(endpoint)/v1/projects/\(topicProject ?? request.project)/topics/subscriptions",
query: query)
}
}

0 comments on commit e36240e

Please sign in to comment.