diff --git a/LICENSE b/LICENSE new file mode 100644 index 0000000..d15ab95 --- /dev/null +++ b/LICENSE @@ -0,0 +1,201 @@ + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "[]" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + + Copyright 2019 SRCE, GRNET, CNRS + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. diff --git a/README.md b/README.md index 3f6fa25..5ec07ea 100644 --- a/README.md +++ b/README.md @@ -12,22 +12,24 @@ You may find more information about [the ARGO Messaging Service documentation](h ## Library installation -You may find and download the ARGO Messaging Library from ARGO Repository. +You may find and download the ARGO Messaging Library from ARGO Repository as well as from PyPI. If you want the devel instance so as to test all new features - - http://rpm-repo.argo.grnet.gr/ARGO/devel/centos6/argo-ams-library-*.*.* - - ``` - The current first realese is - http://rpm-repo.argo.grnet.gr/ARGO/devel/centos6/argo-ams-library-0.1.0-20170301161111.55fe753.el6.noarch.rpm - ``` +http://rpm-repo.argo.grnet.gr/ARGO/devel/centos6/ +http://rpm-repo.argo.grnet.gr/ARGO/devel/centos7/ + If you want the stable instance you may download it from here + http://rpm-repo.argo.grnet.gr/ARGO/prod/centos6/ +http://rpm-repo.argo.grnet.gr/ARGO/prod/centos7/ + +PyPI package is available here: + +https://pypi.org/project/argo-ams-library/ ## Authentication -The ams library uses a valid ams token to execute requests against the ams cluster. +The AMS library uses a valid AMS token to execute requests against the AMS cluster. This token can be provided with 2 ways: - Obtain a valid ams token and then use it when initializing the ams object. @@ -46,36 +48,36 @@ The library will use the provided certificate to access the corresponding ams to ## Examples In the folder examples, you may find examples of using the library: + +- for publishing messages (`examples/publish.py`) +- for consuming messages in pull mode (`examples/consume-pull.py`) + +### Publish messages + +This example explains how to publish messages in a topic with the use of the library. Topics are resources that can hold messages. Publishers (users/systems) can create topics on demand and name them (Usually with names that make sense and express the class of messages delivered in the topic). A topic name must be scoped to a project. - - for publishing messages (examples/publish.py) - - for consuming messages in pull mode (examples/consume-pull.py) - - ### Publish messages - - This example explains how to publish messages in a topic with the use of the library. Topics are resources that can hold messages. Publishers (users/systems) can create topics on demand and name them (Usually with names that make sense and express the class of messages delivered in the topic). A topic name must be scoped to a project. - - You may find more information about [Topics in the ARGO Messaging Service documentation](http://argoeu.github.io/messaging/v1/api_topics/) +You may find more information about [Topics in the ARGO Messaging Service documentation](http://argoeu.github.io/messaging/v1/api_topics/) - ``` - publish.py --host=[the FQDN of AMS Service] - --token=[the user token] - --project=[the name of your project registered in AMS Service] - --topic=[the topic to publish your messages] - ``` +``` +publish.py --host=[the FQDN of AMS Service] +--token=[the user token] +--project=[the name of your project registered in AMS Service] +--topic=[the topic to publish your messages] +``` - ### Consume messages in pull mode +### Consume messages in pull mode - This example explains how to consume messages from a predefined subscription with the use of the library. A subscription is a named resource representing the stream of messages from a single, specific topic, to be delivered to the subscribing application. A subscription name must be scoped to a project. In pull delivery, your subscriber application initiates requests to the Pub/Sub server to retrieve messages. When you create a subscription, the system establishes a sync point. That is, your subscriber is guaranteed to receive any message published after this point. Messages published before the sync point may not be delivered. +This example explains how to consume messages from a predefined subscription with the use of the library. A subscription is a named resource representing the stream of messages from a single, specific topic, to be delivered to the subscribing application. A subscription name must be scoped to a project. In pull delivery, your subscriber application initiates requests to the Pub/Sub server to retrieve messages. When you create a subscription, the system establishes a sync point. That is, your subscriber is guaranteed to receive any message published after this point. Messages published before the sync point may not be delivered. - You may find more information about [Subscriptions in the ARGO Messaging Service documentation](http://argoeu.github.io/messaging/v1/api_subs/) +You may find more information about [Subscriptions in the ARGO Messaging Service documentation](http://argoeu.github.io/messaging/v1/api_subs/) - ``` - consume-pull.py --host=[the FQDN of AMS Service] - --token=[the user token] - --project=[the name of your project registered in AMS Service] - --topic=[the topic from where the messages are delivered ] - --subscription=[the subscription name to pull the messages] - --nummsgs=[the num of messages to consume] - - ``` +``` +consume-pull.py --host=[the FQDN of AMS Service] +--token=[the user token] +--project=[the name of your project registered in AMS Service] +--topic=[the topic from where the messages are delivered ] +--subscription=[the subscription name to pull the messages] +--nummsgs=[the num of messages to consume] + +``` diff --git a/argo-ams-library.spec b/argo-ams-library.spec index cfb2ae3..d6e2b34 100644 --- a/argo-ams-library.spec +++ b/argo-ams-library.spec @@ -6,7 +6,7 @@ Name: argo-ams-library Summary: %{sum} -Version: 0.4.3 +Version: 0.5.0 Release: 1%{?dist} Group: Development/Libraries @@ -91,6 +91,8 @@ rm -rf %{buildroot} %changelog +* Wed Dec 4 2019 Daniel Vrcic - 0.5.0-1%{?dist} +- ARGO-1481 Connection retry logic in ams-library * Fri Nov 8 2019 Daniel Vrcic , agelostsal - 0.4.3-1%{?dist} - ARGO-1990 Fix runtime dependencies - ARGO-1862 Make argo-ams-library Python 3 ready diff --git a/pymod/__init__.py b/pymod/__init__.py index 801120f..37debdb 100644 --- a/pymod/__init__.py +++ b/pymod/__init__.py @@ -8,7 +8,9 @@ def emit(self, record): logging.getLogger(__name__).addHandler(NullHandler()) from .ams import ArgoMessagingService -from .amsexceptions import (AmsServiceException, AmsConnectionException, AmsMessageException, AmsException) +from .amsexceptions import (AmsServiceException, AmsBalancerException, + AmsConnectionException, AmsTimeoutException, + AmsMessageException, AmsException) from .amsmsg import AmsMessage from .amstopic import AmsTopic from .amssubscription import AmsSubscription diff --git a/pymod/ams.py b/pymod/ams.py index 1fb6a10..3952ef0 100644 --- a/pymod/ams.py +++ b/pymod/ams.py @@ -1,8 +1,15 @@ import json +import logging +import logging.handlers import requests +import socket import sys import datetime -from .amsexceptions import AmsServiceException, AmsConnectionException, AmsMessageException, AmsException +import time + +from .amsexceptions import (AmsServiceException, AmsConnectionException, + AmsMessageException, AmsException, + AmsTimeoutException, AmsBalancerException) from .amsmsg import AmsMessage from .amstopic import AmsTopic from .amssubscription import AmsSubscription @@ -12,14 +19,18 @@ except: from ordereddict import OrderedDict +log = logging.getLogger(__name__) + class AmsHttpRequests(object): + """Class encapsulates methods used by ArgoMessagingService. + + Each method represent HTTP request made to AMS with the help of requests + library. service error handling is implemented according to HTTP + status codes returned by service and the balancer. """ - Class encapsulates methods used by ArgoMessagingService. Each method represent - HTTP request made to AMS with the help of requests library. Proper service error - handling is implemented according to HTTP status codes returned by service. - """ - def __init__(self): + def __init__(self, endpoint): + self.endpoint = endpoint # Create route list self.routes = {"topic_list": ["get", "https://{0}/v1/projects/{2}/topics?key={1}"], "topic_get": ["get", "https://{0}/v1/projects/{2}/topics/{3}?key={1}"], @@ -45,25 +56,131 @@ def __init__(self): # HTTP error status codes returned by AMS according to: # http://argoeu.github.io/messaging/v1/api_errors/ - self.errors_route = {"topic_create": ["put", set([409, 401, 403])], - "topic_list": ["get", set([400, 401, 403, 404])], - "sub_create": ["put", set([400, 409, 408, 401, 403])], - "sub_ack": ["post", set([408, 400, 401, 403, 404])], - "topic_get": ["get", set([404, 401, 403])], - "topic_modifyacl": ["post", set([400, 401, 403, 404])], - "sub_get": ["get", set([404, 401, 403])], - "topic_publish": ["post", set([413, 401, 403])], - "sub_pushconfig": ["post", set([400, 401, 403, 404])], - "auth_x509": ["post", set([400, 401, 403, 404])], - "sub_pull": ["post", set([400, 401, 403, 404])], - "sub_timeToOffset": ["get", set([400, 401, 403, 404, 409])] - } + self.ams_errors_route = {"topic_create": ["put", set([409, 401, 403])], + "topic_list": ["get", set([400, 401, 403, + 404])], + "topic_delete": ["delete", set([401, 403, + 404])], + "sub_create": ["put", set([400, 409, 408, 401, + 403])], + "sub_ack": ["post", set([408, 400, 401, 403, + 404])], + "topic_get": ["get", set([404, 401, 403])], + "topic_modifyacl": ["post", set([400, 401, + 403, 404])], + "sub_get": ["get", set([404, 401, 403])], + "topic_publish": ["post", set([413, 401, + 403])], + "sub_pushconfig": ["post", set([400, 401, 403, + 404])], + "auth_x509": ["post", set([400, 401, 403, + 404])], + "sub_pull": ["post", set([400, 401, 403, + 404])], + "sub_timeToOffset": ["get", set([400, 401, + 403, 404, + 409])]} + # https://cbonte.github.io/haproxy-dconv/1.8/configuration.html#1.3 + self.balancer_errors_route = {"sub_ack": ["post", set([500, 502, 503, 504])], + "sub_pull": ["post", set([500, 502, 503, 504])], + "topic_publish": ["post", set([500, 502, 503, 504])]} + + def _error_dict(self, response_content, status): + error_dict = dict() + + try: + if (response_content and sys.version_info < (3, 6, ) and + isinstance(response_content, bytes)): + response_content = response_content.decode() + error_dict = json.loads(response_content) if response_content else {} + except ValueError: + error_dict = {'error': {'code': status, 'message': response_content}} + + return error_dict + + def _gen_backoff_time(self, try_number, backoff_factor): + for i in range(0, try_number): + value = backoff_factor * (2 ** (i - 1)) + yield value + + def _retry_make_request(self, url, body=None, route_name=None, retry=0, + retrysleep=60, retrybackoff=None, **reqkwargs): + """Wrapper around _make_request() that decides whether should request + be retried or not. + + Two request retry modes are available: + 1) static sleep - fixed amount of seconds to sleep between + request attempts + 2) backoff - each next sleep before request attempt is + exponentially longer + + If enabled, request will be retried in the following occassions: + * timeouts from AMS (HTTP 408) or load balancer (HTTP 408 and 504) + * load balancer HTTP 502, 503 + * connection related problems in the lower network layers + + Default behaviour is no retry attempts. If both, retry and + retrybackoff are enabled, retrybackoff will take precedence. + + Args: + url: str. The final messaging service endpoint + body: dict. Payload of the request + route_name: str. The name of the route to follow selected from the route list + retry: int. Number of request retries before giving up. Default + is 0 meaning no further request retry will be made + after first unsuccesfull request. + retrysleep: int. Static number of seconds to sleep before next + request attempt + retrybackoff: int. Backoff factor to apply between each request + attempts + reqkwargs: keyword argument that will be passed to underlying + python-requests library call. + """ + i = 1 + timeout = reqkwargs.get('timeout', 0) + + saved_exp = None + if retrybackoff: + try: + return self._make_request(url, body, route_name, **reqkwargs) + except (AmsBalancerException, AmsConnectionException, + AmsTimeoutException) as e: + for sleep_secs in self._gen_backoff_time(retry, retrybackoff): + try: + return self._make_request(url, body, route_name, **reqkwargs) + except (AmsBalancerException, AmsConnectionException, + AmsTimeoutException) as e: + saved_exp = e + time.sleep(sleep_secs) + if timeout: + log.warning('Backoff retry #{0} after {1} seconds, connection timeout set to {2} seconds - {3}: {4}'.format(i, sleep_secs, timeout, self.endpoint, e)) + else: + log.warning('Backoff retry #{0} after {1} seconds - {2}: {3}'.format(i, sleep_secs, self.endpoint, e)) + finally: + i += 1 + else: + raise saved_exp + + else: + while i <= retry + 1: + try: + return self._make_request(url, body, route_name, **reqkwargs) + except (AmsBalancerException, AmsConnectionException, AmsTimeoutException) as e: + if i == retry + 1: + raise e + else: + time.sleep(retrysleep) + if timeout: + log.warning('Retry #{0} after {1} seconds, connection timeout set to {2} seconds - {3}: {4}'.format(i, retrysleep, timeout, self.endpoint, e)) + else: + log.warning('Retry #{0} after {1} seconds - {2}: {3}'.format(i, retrysleep, self.endpoint, e)) + finally: + i += 1 def _make_request(self, url, body=None, route_name=None, **reqkwargs): """Common method for PUT, GET, POST HTTP requests with appropriate - service error handling. For known error HTTP statuses, returned JSON - will be used as exception error message, otherwise assume and build one - from response content string. + service error handling by differing between AMS and load balancer + erroneous behaviour. """ m = self.routes[route_name][0] decoded = None @@ -75,129 +192,156 @@ def _make_request(self, url, body=None, route_name=None, **reqkwargs): content = r.content status_code = r.status_code - if content and sys.version_info < (3, 6, ): - content = content.decode() + if (content and sys.version_info < (3, 6, ) and isinstance(content, + bytes)): + content = content.decode() if status_code == 200: - decoded = json.loads(content) if content else {} + decoded = self._error_dict(content, status_code) # handle authnz related errors for all calls elif status_code == 401 or status_code == 403: - decoded = json.loads(content) if content else {} - raise AmsServiceException(json=decoded, request=route_name) - - # JSON error returned by AMS - elif status_code != 200 and status_code in self.errors_route[route_name][1]: - decoded = json.loads(content) if content else {} - raise AmsServiceException(json=decoded, request=route_name) - - # handle other erroneous behaviour and construct error message from - # JSON or plaintext content in response - elif status_code != 200 and status_code not in self.errors_route[route_name][1]: - try: - errormsg = json.loads(content) - except ValueError: - errormsg = {'error': {'code': status_code, - 'message': content}} - raise AmsServiceException(json=errormsg, request=route_name) + raise AmsServiceException(json=self._error_dict(content, + status_code), + request=route_name) + + elif status_code == 408 or (status_code == 504 and route_name in + self.balancer_errors_route): + raise AmsTimeoutException(json=self._error_dict(content, + status_code), + request=route_name) + + # handle errors from AMS + elif (status_code != 200 and status_code in + self.ams_errors_route[route_name][1]): + raise AmsServiceException(json=self._error_dict(content, + status_code), + request=route_name) + + # handle errors coming from load balancer + elif (status_code != 200 and route_name in + self.balancer_errors_route and status_code in + self.balancer_errors_route[route_name][1]): + raise AmsBalancerException(json=self._error_dict(content, + status_code), + request=route_name) + + # handle any other erroneous behaviour by raising exception + else: + raise AmsServiceException(json=self._error_dict(content, + status_code), + request=route_name) - except (requests.exceptions.ConnectionError, requests.exceptions.Timeout) as e: + except (requests.exceptions.ConnectionError, socket.error) as e: raise AmsConnectionException(e, route_name) else: return decoded if decoded else {} def do_get(self, url, route_name, **reqkwargs): - """Method supports all the GET requests. Used for (topics, - subscriptions, messages). + """Method supports all the GET requests. - Args: - url: str. The final messaging service endpoint - route_name: str. The name of the route to follow selected from the route list - reqkwargs: keyword argument that will be passed to underlying python-requests library call. + Used for (topics, subscriptions, messages). + + Args: + url: str. The final messaging service endpoint + route_name: str. The name of the route to follow selected from the route list + reqkwargs: keyword argument that will be passed to underlying + python-requests library call. """ # try to send a GET request to the messaging service. # if a connection problem araises a Connection error exception is raised. try: - return self._make_request(url, route_name=route_name, **reqkwargs) + return self._retry_make_request(url, route_name=route_name, + **reqkwargs) except AmsException as e: raise e def do_put(self, url, body, route_name, **reqkwargs): - """Method supports all the PUT requests. Used for (topics, - subscriptions, messages). + """Method supports all the PUT requests. + + Used for (topics, subscriptions, messages). - Args: - url: str. The final messaging service endpoint - body: dict. Body the post data to send based on the PUT request. The post data is always in json format. - route_name: str. The name of the route to follow selected from the route list - reqkwargs: keyword argument that will be passed to underlying python-requests library call. + Args: + url: str. The final messaging service endpoint + body: dict. Body the post data to send based on the PUT request. + The post data is always in json format. + route_name: str. The name of the route to follow selected from + the route list + reqkwargs: keyword argument that will be passed to underlying + python-requests library call. """ # try to send a PUT request to the messaging service. # if a connection problem araises a Connection error exception is raised. try: - return self._make_request(url, body=body, route_name=route_name, **reqkwargs) + return self._retry_make_request(url, body=body, + route_name=route_name, **reqkwargs) except AmsException as e: raise e - def do_post(self, url, body, route_name, **reqkwargs): - """Method supports all the POST requests. Used for (topics, - subscriptions, messages). + def do_post(self, url, body, route_name, retry=0, retrysleep=60, + retrybackoff=None, **reqkwargs): + """Method supports all the POST requests. + + Used for (topics, subscriptions, messages). - Args: - url: str. The final messaging service endpoint - body: dict. Body the post data to send based on the PUT request. The post data is always in json format. - route_name: str. The name of the route to follow selected from the route list - reqkwargs: keyword argument that will be passed to underlying python-requests library call. + Args: + url: str. The final messaging service endpoint + body: dict. Body the post data to send based on the PUT request. + The post data is always in json format. + route_name: str. The name of the route to follow selected from + the route list + reqkwargs: keyword argument that will be passed to underlying + python-requests library call. """ # try to send a Post request to the messaging service. # if a connection problem araises a Connection error exception is raised. try: - return self._make_request(url, body=body, route_name=route_name, **reqkwargs) + return self._retry_make_request(url, body=body, + route_name=route_name, retry=retry, + retrysleep=retrysleep, + retrybackoff=retrybackoff, + **reqkwargs) except AmsException as e: raise e def do_delete(self, url, route_name, **reqkwargs): """Delete method that is used to make the appropriate request. - Used for (topics, subscriptions). - Args: - url: str. The final messaging service endpoint - route_name: str. The name of the route to follow selected from the route list - reqkwargs: keyword argument that will be passed to underlying python-requests library call. + Used for (topics, subscriptions). + + Args: + url: str. The final messaging service endpoint + route_name: str. The name of the route to follow selected from the route list + reqkwargs: keyword argument that will be passed to underlying python-requests library call. """ # try to send a delete request to the messaging service. # if a connection problem araises a Connection error exception is raised. - m = self.routes[route_name][0] try: # the delete request based on requests. r = requests.delete(url, **reqkwargs) # JSON error returned by AMS - if r.status_code != 200 and r.status_code in self.errors[m]: - decoded = json.loads(r.content) if r.content else {} - raise AmsServiceException(json=decoded, request=route_name) - - # handle other erroneous behaviour - elif r.status_code != 200 and r.status_code not in self.errors[m]: - errormsg = {'error': {'code': r.status_code, - 'message': r.content}} + if r.status_code != 200: + errormsg = self._error_dict(r.content, r.status_code) raise AmsServiceException(json=errormsg, request=route_name) + else: return True - except (requests.exceptions.ConnectionError, requests.exceptions.Timeout) as e: + except (requests.exceptions.ConnectionError, + requests.exceptions.Timeout) as e: raise AmsConnectionException(e, route_name) class ArgoMessagingService(AmsHttpRequests): - """ + """Class is entry point for client code. + Class abstract Argo Messaging Service by covering all available HTTP API - calls that are wrapped in series of methods. Class is entry point for - client code. + calls that are wrapped in series of methods. """ def __init__(self, endpoint, token="", project="", cert="", key="", authn_port=8443): - super(ArgoMessagingService, self).__init__() + super(ArgoMessagingService, self).__init__(endpoint) self.authn_port = authn_port self.token = "" self.endpoint = endpoint @@ -210,13 +354,12 @@ def __init__(self, endpoint, token="", project="", cert="", key="", authn_port=8 self.subs = OrderedDict() def assign_token(self, token, cert, key): - """ - Assign a token to the ams object + """Assign a token to the ams object - Args: - token(str): a valid ams token - cert(str): a path to a valid certificate file - key(str): a path to the associated key file for the provided certificate + Args: + token(str): a valid ams token + cert(str): a path to a valid certificate file + key(str): a path to the associated key file for the provided certificate """ # check if a token has been provided @@ -233,15 +376,13 @@ def assign_token(self, token, cert, key): # when no token was provided if e.msg == 'While trying the [auth_x509]: No certificate provided.': - refined_msg = "No certificate provided. No token provided" - errormsg = {'error': {'code': e.code, - 'message': refined_msg}} + refined_msg = "No certificate provided. No token provided." + errormsg = self._error_dict(refined_msg, e.code) raise AmsServiceException(json=errormsg, request="auth_x509") raise e def auth_via_cert(self, cert, key, **reqkwargs): - """ - Retrieve an ams token based on the provided certificate + """Retrieve an ams token based on the provided certificate Args: cert(str): a path to a valid certificate file @@ -252,7 +393,7 @@ def auth_via_cert(self, cert, key, **reqkwargs): python-requests library call. """ if cert == "" and key == "": - errord = {"error": {"code": 400, "message": "No certificate provided."}} + errord = self._error_dict("No certificate provided.", 400) raise AmsServiceException(json=errord, request="auth_x509") # create the certificate tuple needed by the requests library @@ -268,7 +409,7 @@ def auth_via_cert(self, cert, key, **reqkwargs): r = method(url, "auth_x509", **reqkwargs) # if the `token` field was not found in the response, raise an error if "token" not in r: - errord = {"error": {"code": 500, "message": "Token was not found in the response. Response: " + str(r)}} + errord = self._error_dict("Token was not found in the response. Response: " + str(r), 500) raise AmsServiceException(json=errord, request="auth_x509") return r["token"] except (AmsServiceException, AmsConnectionException) as e: @@ -290,8 +431,7 @@ def _delete_topic_obj(self, t): del self.topics[t['name']] def getacl_topic(self, topic, **reqkwargs): - """ - Get access control lists for topic + """Get access control lists for topic Args: topic (str): The topic name. @@ -318,8 +458,7 @@ def getacl_topic(self, topic, **reqkwargs): return [] def modifyacl_topic(self, topic, users, **reqkwargs): - """ - Modify access control lists for topic + """Modify access control lists for topic Args: topic (str): The topic name. @@ -352,8 +491,7 @@ def modifyacl_topic(self, topic, users, **reqkwargs): raise e def getacl_sub(self, sub, **reqkwargs): - """ - Get access control lists for subscription + """Get access control lists for subscription Args: sub (str): The subscription name. @@ -380,8 +518,7 @@ def getacl_sub(self, sub, **reqkwargs): return [] def getoffsets_sub(self, sub, offset='all', **reqkwargs): - """ - Retrieve the current positions of min,max and current offsets. + """Retrieve the current positions of min,max and current offsets. Args: sub (str): The subscription name. @@ -406,8 +543,7 @@ def getoffsets_sub(self, sub, offset='all', **reqkwargs): raise AmsServiceException(json=errormsg, request="sub_offsets") def time_to_offset_sub(self, sub, timestamp, **reqkwargs): - """ - Retrieve the closest(greater than) available offset to the given timestamp. + """Retrieve the closest(greater than) available offset to the given timestamp. Args: sub (str): The subscription name. @@ -438,8 +574,7 @@ def time_to_offset_sub(self, sub, timestamp, **reqkwargs): raise e def modifyoffset_sub(self, sub, move_to, **reqkwargs): - """ - Modify the position of the current offset. + """Modify the position of the current offset. Args: sub (str): The subscription name. @@ -467,8 +602,7 @@ def modifyoffset_sub(self, sub, move_to, **reqkwargs): raise e def modifyacl_sub(self, sub, users, **reqkwargs): - """ - Modify access control lists for subscription + """Modify access control lists for subscription Args: sub (str): The subscription name. @@ -534,8 +668,8 @@ def pushconfig_sub(self, sub, push_endpoint=None, retry_policy_type='linear', re def iter_subs(self, topic=None, **reqkwargs): """Iterate over AmsSubscription objects - Args: - topic: Iterate over subscriptions only associated to this topic name + Args: + topic: Iterate over subscriptions only associated to this topic name """ self.list_subs(**reqkwargs) @@ -566,9 +700,9 @@ def iter_topics(self, **reqkwargs): def list_topics(self, **reqkwargs): """List the topics of a selected project - Args: - reqkwargs: keyword argument that will be passed to underlying - python-requests library call + Args: + reqkwargs: keyword argument that will be passed to underlying + python-requests library call """ route = self.routes["topic_list"] # Compose url @@ -589,8 +723,8 @@ def list_topics(self, **reqkwargs): def has_topic(self, topic, **reqkwargs): """Inspect if topic already exists or not - Args: - topic: str. Topic name + Args: + topic: str. Topic name """ try: self.get_topic(topic, **reqkwargs) @@ -608,10 +742,11 @@ def has_topic(self, topic, **reqkwargs): def get_topic(self, topic, retobj=False, **reqkwargs): """Get the details of a selected topic. - Args: - topic: str. Topic name. - retobj: Controls whether method should return AmsTopic object - reqkwargs: keyword argument that will be passed to underlying python-requests library call. + Args: + topic: str. Topic name. + retobj: Controls whether method should return AmsTopic object + reqkwargs: keyword argument that will be passed to underlying + python-requests library call. """ route = self.routes["topic_get"] # Compose url @@ -628,19 +763,30 @@ def get_topic(self, topic, retobj=False, **reqkwargs): else: return r - def publish(self, topic, msg, **reqkwargs): + def publish(self, topic, msg, retry=0, retrysleep=60, retrybackoff=None, **reqkwargs): """Publish a message or list of messages to a selected topic. - Args: - topic (str): Topic name. - msg (list): A list with one or more messages to send. - Each message is represented as AmsMessage object or python - dictionary with at least data or one attribute key defined. - Kwargs: - reqkwargs: keyword argument that will be passed to underlying + If enabled (retry > 0), multiple topic publishes will be tried in + case of problems/glitches with the AMS service. retry* options are + eventually passed to _retry_make_request() + + Args: + topic (str): Topic name. + msg (list): A list with one or more messages to send. + Each message is represented as AmsMessage object or python + dictionary with at least data or one attribute key defined. + Kwargs: + retry: int. Number of request retries before giving up. Default + is 0 meaning no further request retry will be made + after first unsuccesfull request. + retrysleep: int. Static number of seconds to sleep before next + request attempt + retrybackoff: int. Backoff factor to apply between each request + attempts + reqkwargs: keyword argument that will be passed to underlying python-requests library call. - Return: - dict: Dictionary with messageIds of published messages + Return: + dict: Dictionary with messageIds of published messages """ if not isinstance(msg, list): msg = [msg] @@ -661,8 +807,9 @@ def publish(self, topic, msg, **reqkwargs): def list_subs(self, **reqkwargs): """Lists all subscriptions in a project with a GET request. - Args: - reqkwargs: keyword argument that will be passed to underlying python-requests library call. + Args: + reqkwargs: keyword argument that will be passed to underlying + python-requests library call. """ route = self.routes["sub_list"] # Compose url @@ -685,10 +832,11 @@ def list_subs(self, **reqkwargs): def get_sub(self, sub, retobj=False, **reqkwargs): """Get the details of a subscription. - Args: - sub: str. The subscription name. - retobj: Controls whether method should return AmsSubscription object - reqkwargs: keyword argument that will be passed to underlying python-requests library call. + Args: + sub: str. The subscription name. + retobj: Controls whether method should return AmsSubscription object + reqkwargs: keyword argument that will be passed to underlying + python-requests library call. """ route = self.routes["sub_get"] # Compose url @@ -710,8 +858,8 @@ def get_sub(self, sub, retobj=False, **reqkwargs): def has_sub(self, sub, **reqkwargs): """Inspect if subscription already exists or not - Args: - sub: str. The subscription name. + Args: + sub: str. The subscription name. """ try: self.get_sub(sub, **reqkwargs) @@ -726,14 +874,20 @@ def has_sub(self, sub, **reqkwargs): except AmsConnectionException as e: raise e - def pull_sub(self, sub, num=1, return_immediately=False, **reqkwargs): + def pull_sub(self, sub, num=1, return_immediately=False, retry=0, + retrysleep=60, retrybackoff=None, **reqkwargs): """This function consumes messages from a subscription in a project - with a POST request. + with a POST request. + + If enabled (retry > 0), multiple subscription pulls will be tried in + case of problems/glitches with the AMS service. retry* options are + eventually passed to _retry_make_request() - Args: - sub: str. The subscription name. - num: int. The number of messages to pull. - reqkwargs: keyword argument that will be passed to underlying python-requests library call. + Args: + sub: str. The subscription name. + num: int. The number of messages to pull. + reqkwargs: keyword argument that will be passed to underlying + python-requests library call. """ wasmax = self.get_pullopt('maxMessages') @@ -747,7 +901,9 @@ def pull_sub(self, sub, num=1, return_immediately=False, **reqkwargs): # Compose url url = route[1].format(self.endpoint, self.token, self.project, "", sub) method = getattr(self, 'do_{0}'.format(route[0])) - r = method(url, msg_body, "sub_pull", **reqkwargs) + r = method(url, msg_body, "sub_pull", retry=retry, + retrysleep=retrysleep, retrybackoff=retrybackoff, + **reqkwargs) msgs = r['receivedMessages'] self.set_pullopt('maxMessages', wasmax) @@ -755,15 +911,22 @@ def pull_sub(self, sub, num=1, return_immediately=False, **reqkwargs): return list(map(lambda m: (m['ackId'], AmsMessage(b64enc=False, **m['message'])), msgs)) - def ack_sub(self, sub, ids, **reqkwargs): - """Messages retrieved from a pull subscription can be acknowledged by sending message with an array of ackIDs. - The service will retrieve the ackID corresponding to the highest message offset and will consider that message - and all previous messages as acknowledged by the consumer. + def ack_sub(self, sub, ids, retry=0, retrysleep=60, retrybackoff=None, + **reqkwargs): + """Acknownledgment of received messages + + Messages retrieved from a pull subscription can be acknowledged by + sending message with an array of ackIDs. The service will retrieve + the ackID corresponding to the highest message offset and will + consider that message and all previous messages as acknowledged by + the consumer. If enabled (retry > 0), multiple acknowledgement + will be tried in case of problems/glitches with the AMS service. + retry* options are eventually passed to _retry_make_request() - Args: - sub: str. The subscription name. - ids: list(str). A list of ids of the messages to acknowledge. - reqkwargs: keyword argument that will be passed to underlying python-requests library call. + Args: + sub: str. The subscription name. + ids: list(str). A list of ids of the messages to acknowledge. + reqkwargs: keyword argument that will be passed to underlying python-requests library call. """ msg_body = json.dumps({"ackIds": ids}) @@ -779,11 +942,11 @@ def ack_sub(self, sub, ids, **reqkwargs): def set_pullopt(self, key, value): """Function for setting pull options - Args: - key: str. The name of the pull option (ex. maxMessages, returnImmediately).Messaging specific - names are allowed. - value: str or int. The name of the pull option (ex. maxMessages, returnImmediately).Messaging specific names - are allowed. + Args: + key: str. The name of the pull option (ex. maxMessages, returnImmediately). Messaging specific + names are allowed. + value: str or int. The name of the pull option (ex. maxMessages, + returnImmediately). Messaging specific names are allowed. """ self.pullopts.update({key: str(value)}) @@ -791,12 +954,12 @@ def set_pullopt(self, key, value): def get_pullopt(self, key): """Function for getting pull options - Args: - key: str. The name of the pull option (ex. maxMessages, returnImmediately).Messaging specific - names are allowed. + Args: + key: str. The name of the pull option (ex. maxMessages, + returnImmediately). Messaging specific names are allowed. - Returns: - str. The value of the pull option + Returns: + str. The value of the pull option """ return self.pullopts[key] @@ -804,17 +967,21 @@ def create_sub(self, sub, topic, ackdeadline=10, push_endpoint=None, retry_policy_type='linear', retry_policy_period=300, retobj=False, **reqkwargs): """This function creates a new subscription in a project with a PUT request - Args: - sub: str. The subscription name. - topic: str. The topic name. - ackdeadline: int. It is a custom "ack" deadline (in seconds) in the subscription. If your code doesn't - acknowledge the message in this time, the message is sent again. If you don't specify - the deadline, the default is 10 seconds. - push_endpoint: URL of remote endpoint that should receive messages in push subscription mode - retry_policy_type: - retry_policy_period: - retobj: Controls whether method should return AmsSubscription object - reqkwargs: keyword argument that will be passed to underlying python-requests library call. + Args: + sub: str. The subscription name. + topic: str. The topic name. + ackdeadline: int. It is a custom "ack" deadline (in seconds) in + the subscription. If your code doesn't + acknowledge the message in this time, the + message is sent again. If you don't specify + the deadline, the default is 10 seconds. + push_endpoint: URL of remote endpoint that should receive + messages in push subscription mode + retry_policy_type: + retry_policy_period: + retobj: Controls whether method should return AmsSubscription object + reqkwargs: keyword argument that will be passed to underlying + python-requests library call. """ topic = self.get_topic(topic, retobj=True, **reqkwargs) @@ -845,9 +1012,10 @@ def create_sub(self, sub, topic, ackdeadline=10, push_endpoint=None, def delete_sub(self, sub, **reqkwargs): """This function deletes a selected subscription in a project - Args: - sub: str. The subscription name. - reqkwargs: keyword argument that will be passed to underlying python-requests library call. + Args: + sub: str. The subscription name. + reqkwargs: keyword argument that will be passed to underlying + python-requests library call. """ route = self.routes["sub_delete"] # Compose url @@ -863,10 +1031,11 @@ def delete_sub(self, sub, **reqkwargs): return r def topic(self, topic, **reqkwargs): - """Function create a topic in a project. It's wrapper around few - methods defined in client class. Method will ensure that AmsTopic - object is returned either by fetching existing one or creating - a new one in case it doesn't exist. + """Function create a topic in a project. + + It's wrapper around few methods defined in client class. Method will + ensure that AmsTopic object is returned either by fetching existing + one or creating a new one in case it doesn't exist. Args: topic (str): The topic name @@ -888,10 +1057,11 @@ def topic(self, topic, **reqkwargs): def create_topic(self, topic, retobj=False, **reqkwargs): """This function creates a topic in a project - Args: - topic: str. The topic name. - retobj: Controls whether method should return AmsTopic object - reqkwargs: keyword argument that will be passed to underlying python-requests library call. + Args: + topic: str. The topic name. + retobj: Controls whether method should return AmsTopic object + reqkwargs: keyword argument that will be passed to underlying + python-requests library call. """ route = self.routes["topic_create"] # Compose url @@ -909,11 +1079,12 @@ def create_topic(self, topic, retobj=False, **reqkwargs): return r def delete_topic(self, topic, **reqkwargs): - """ This function deletes a topic in a project + """This function deletes a topic in a project - Args: - topic: str. The topic name. - reqkwargs: keyword argument that will be passed to underlying python-requests library call. + Args: + topic: str. The topic name. + reqkwargs: keyword argument that will be passed to underlying + python-requests library call. """ route = self.routes["topic_delete"] # Compose url diff --git a/pymod/amsexceptions.py b/pymod/amsexceptions.py index d0d92c3..c82642b 100644 --- a/pymod/amsexceptions.py +++ b/pymod/amsexceptions.py @@ -1,17 +1,15 @@ import json class AmsException(Exception): - """ - Base exception class for all Argo Messaging service related errors - """ + """Base exception class for all Argo Messaging service related errors""" + def __init__(self, *args, **kwargs): super(AmsException, self).__init__(*args, **kwargs) class AmsServiceException(AmsException): - """ - Exception for Argo Messaging Service API errors - """ + """Exception for Argo Messaging Service API errors""" + def __init__(self, json, request): errord = dict() @@ -29,19 +27,36 @@ def __init__(self, json, request): super(AmsServiceException, self).__init__(errord) -class AmsConnectionException(AmsException): - """ - Exception for connection related problems catched from requests library +class AmsBalancerException(AmsServiceException): + """Exception for load balancer Argo Messaging Service errors""" + + def __init__(self, json, request): + super(AmsBalancerException, self).__init__(json, request) + + +class AmsTimeoutException(AmsServiceException): + """Exception for timeouts errors + + Timeouts can be generated by the Argo Messaging Service if message was + not acknownledged in desired time frame (ackDeadlineSeconds). Also, 408 + timeouts can come from load balancer for partial requests that were not + completed in required time frame. """ + def __init__(self, json, request): + super(AmsTimeoutException, self).__init__(json, request) + + +class AmsConnectionException(AmsException): + """Exception for connection related problems catched from requests library""" + def __init__(self, exp, request): self.msg = "While trying the [{0}]: {1}".format(request, repr(exp)) super(AmsConnectionException, self).__init__(self.msg) class AmsMessageException(AmsException): - """ - Exception that indicate problems with constructing message - """ + """Exception that indicate problems with constructing message""" + def __init__(self, msg): self.msg = msg super(AmsMessageException, self).__init__(self.msg) diff --git a/pymod/amssubscription.py b/pymod/amssubscription.py index 9a716a1..9ccd60c 100644 --- a/pymod/amssubscription.py +++ b/pymod/amssubscription.py @@ -29,7 +29,8 @@ def delete(self): return self.init.delete_sub(self.name) - def pushconfig(self, push_endpoint=None, retry_policy_type='linear', retry_policy_period=300, **reqkwargs): + def pushconfig(self, push_endpoint=None, retry_policy_type='linear', + retry_policy_period=300, **reqkwargs): """Configure Push mode parameters of subscription. When push_endpoint is defined, subscription will automatically start to send messages to it. @@ -47,11 +48,19 @@ def pushconfig(self, push_endpoint=None, retry_policy_type='linear', retry_polic retry_policy_period=retry_policy_period, **reqkwargs) - def pull(self, num=1, return_immediately=False, **reqkwargs): + def pull(self, num=1, retry=0, retrysleep=60, retrybackoff=None, + return_immediately=False, **reqkwargs): """Pull messages from subscription Kwargs: num (int): Number of messages to pull + retry: int. Number of request retries before giving up. Default + is 0 meaning no further request retry will be made + after first unsuccesfull request. + retrysleep: int. Static number of seconds to sleep before next + request attempt + retrybackoff: int. Backoff factor to apply between each request + attempts return_immediately (boolean): If True and if stream of messages is empty, subscriber call will not block and wait for messages @@ -119,11 +128,18 @@ def acl(self, users=None, **reqkwargs): else: return self.init.modifyacl_sub(self.name, users, **reqkwargs) - def ack(self, ids, **reqkwargs): + def ack(self, ids, retry=0, retrysleep=60, retrybackoff=None, **reqkwargs): """Acknowledge receive of messages Kwargs: ids (list): A list of ackIds of the messages to acknowledge. + retry: int. Number of request retries before giving up. Default + is 0 meaning no further request retry will be made + after first unsuccesfull request. + retrysleep: int. Static number of seconds to sleep before next + request attempt + retrybackoff: int. Backoff factor to apply between each request + attempts """ return self.init.ack_sub(self.name, ids, **reqkwargs) diff --git a/pymod/amstopic.py b/pymod/amstopic.py index 86f734f..3231fce 100644 --- a/pymod/amstopic.py +++ b/pymod/amstopic.py @@ -27,10 +27,11 @@ def delete(self): return self.init.delete_topic(self.name) def subscription(self, sub, ackdeadline=10, **reqkwargs): - """Create a subscription for the topic. It's wrapper around few - methods defined in client class. Method will ensure that AmsSubscription - object is returned either by fetching existing one or creating - a new one in case it doesn't exist. + """Create a subscription for the topic. + + It's wrapper around few methods defined in client class. Method will + ensure that AmsSubscription object is returned either by fetching + existing one or creating a new one in case it doesn't exist. Args: sub (str): Name of the subscription @@ -75,13 +76,21 @@ def iter_subs(self): for s in self.init.iter_subs(topic=self.name): yield s - def publish(self, msg, **reqkwargs): + def publish(self, msg, retry=0, retrysleep=60, retrybackoff=None, **reqkwargs): """Publish message to topic Args: msg (list, dict): One or list of dictionaries representing AMS Message Kwargs: + retry: int. Number of request retries before giving up. Default + is 0 meaning no further request retry will be made + after first unsuccesfull request. + retrysleep: int. Static number of seconds to sleep before next + request attempt + retrybackoff: int. Backoff factor to apply between each request + attempts + reqkwargs: Keyword argument that will be passed to underlying python-requests library call. @@ -89,4 +98,6 @@ def publish(self, msg, **reqkwargs): dict: Dictionary with messageIds of published messages """ - return self.init.publish(self.name, msg, **reqkwargs) + return self.init.publish(self.name, msg, retry=retry, + retrysleep=retrysleep, + retrybackoff=retrybackoff, **reqkwargs) diff --git a/setup.py b/setup.py index 166733f..91c5ae1 100644 --- a/setup.py +++ b/setup.py @@ -1,8 +1,13 @@ from setuptools import setup +from os import path import glob NAME='argo-ams-library' +this_directory = path.abspath(path.dirname(__file__)) +with open(path.join(this_directory, 'README.md')) as f: + long_description = f.read() + def get_ver(): try: with open(NAME+'.spec') as f: @@ -14,19 +19,20 @@ def get_ver(): raise SystemExit(1) setup( - name = NAME, - version = get_ver(), - author = 'SRCE, GRNET', - author_email = 'dvrcic@srce.hr, agelos.tsal@gmail.com, kaggis@gmail.com, themiszamani@gmail.com', - license = 'ASL 2.0', - description = 'A simple python library for interacting with the ARGO Messaging Service', - long_description = 'A simple python library for interacting with the ARGO Messaging Service', - tests_require = [ + name=NAME, + version=get_ver(), + author='SRCE, GRNET', + author_email='dvrcic@srce.hr, agelos.tsal@gmail.com, kaggis@gmail.com, themiszamani@gmail.com', + license='ASL 2.0', + description='A simple python library for interacting with the ARGO Messaging Service', + long_description=long_description, + long_description_content_type='text/markdown', + tests_require=[ 'setuptools_scm', 'httmock', 'pytest' ], - classifiers = [ + classifiers=[ "Development Status :: 5 - Production/Stable", "License :: OSI Approved :: Apache Software License", "Operating System :: POSIX", @@ -37,8 +43,8 @@ def get_ver(): "Intended Audience :: Developers", "Topic :: Software Development :: Libraries :: Python Modules" ], - url = 'https://github.com/ARGOeu/argo-ams-library', - package_dir = {'argo_ams_library': 'pymod/'}, - packages = ['argo_ams_library'], + url='https://github.com/ARGOeu/argo-ams-library', + package_dir={'argo_ams_library': 'pymod/'}, + packages=['argo_ams_library'], install_requires=['requests'] - ) +) diff --git a/tests/amsmocks.py b/tests/amsmocks.py index dc55527..3435725 100644 --- a/tests/amsmocks.py +++ b/tests/amsmocks.py @@ -117,12 +117,20 @@ class ErrorMocks(object): create_subscription_urlmatch = dict(netloc="localhost", path="/v1/projects/TEST/subscriptions/subscription1", method="PUT") + delete_topic_urlmatch = dict(netloc="localhost", + path="/v1/projects/TEST/topics/topic1", + method='DELETE') # Mock ALREADY_EXIST error response for PUT topic request @urlmatch(**create_topic_urlmatch) def create_topic_alreadyexist_mock(self, url, request): return response(409, '{"error":{"code": 409,"message":"Topic already exists","status":"ALREADY_EXIST"}}', None, None, 5, request) + # Mock NOT_FOUND error response for DELETE topic request + @urlmatch(**delete_topic_urlmatch) + def delete_topic_notfound_mock(self, url, request): + return response(404, '{"error":{"code": 404,"message":"Topic does not exist","status":"NOT_FOUND"}}', None, None, 5, request) + # Mock ALREADY_EXIST error response for PUT subscription request @urlmatch(**create_subscription_urlmatch) def create_subscription_alreadyexist_mock(self, url, request): diff --git a/tests/test_authenticate.py b/tests/test_authenticate.py index 0458932..da33cb4 100644 --- a/tests/test_authenticate.py +++ b/tests/test_authenticate.py @@ -43,7 +43,7 @@ def test_auth_via_cert_empty_token_and_cert(self): ams = ArgoMessagingService(endpoint="localhost", project="TEST") except AmsServiceException as e: self.assertEqual(e.code, 400) - self.assertEqual(e.msg, 'While trying the [auth_x509]: No certificate provided. No token provided') + self.assertEqual(e.msg, 'While trying the [auth_x509]: No certificate provided. No token provided.') # tests the case of providing a token def test_assign_token(self): diff --git a/tests/test_errorclient.py b/tests/test_errorclient.py index ea5942a..70559f9 100644 --- a/tests/test_errorclient.py +++ b/tests/test_errorclient.py @@ -1,12 +1,16 @@ +import json +import mock import sys import unittest +import requests + from httmock import urlmatch, HTTMock, response from pymod import ArgoMessagingService from pymod import AmsMessage from pymod import AmsTopic from pymod import AmsSubscription -from pymod import AmsServiceException, AmsException -import json +from pymod import (AmsServiceException, AmsConnectionException, + AmsTimeoutException, AmsBalancerException, AmsException) from .amsmocks import ErrorMocks from .amsmocks import TopicMocks @@ -21,6 +25,15 @@ def setUp(self): self.topicmocks = TopicMocks() self.submocks = SubMocks() + # set defaults for testing of retries + retry = 0 + retrysleep = 0.1 + retrybackoff = None + if sys.version_info < (3, ): + self.ams._retry_make_request.im_func.func_defaults = (None, None, retry, retrysleep, retrybackoff) + else: + self.ams._retry_make_request.__func__.__defaults__ = (None, None, retry, retrysleep, retrybackoff) + # Test create topic client request def testCreateTopics(self): # Execute ams client with mocked response @@ -32,6 +45,17 @@ def testCreateTopics(self): self.assertEqual(e.status, 'ALREADY_EXIST') self.assertEqual(e.code, 409) + # Test delete topic client request + def testDeleteTopics(self): + # Execute ams client with mocked response + with HTTMock(self.errormocks.delete_topic_notfound_mock): + try: + resp = self.ams.delete_topic("topic1") + except Exception as e: + assert isinstance(e, AmsServiceException) + self.assertEqual(e.status, 'NOT_FOUND') + self.assertEqual(e.code, 404) + def testCreateSubscription(self): # Execute ams client with mocked response with HTTMock(self.topicmocks.get_topic_mock, @@ -62,7 +86,7 @@ def publish_mock(url, request): try: resp = self.ams.publish("topic1", msg) except Exception as e: - assert isinstance(e, AmsServiceException) + assert isinstance(e, AmsTimeoutException) self.assertEqual(e.code, 504) # Tests for plaintext or JSON encoded backend error messages @@ -116,6 +140,121 @@ def error_unauth(url, request): self.assertEqual(e.msg, 'While trying the [topic_get]: Unauthorized') self.assertEqual(e.status, 'UNAUTHORIZED') + @mock.patch('pymod.ams.requests.get') + def testRetryConnectionProblems(self, mock_requests_get): + mock_requests_get.side_effect = [requests.exceptions.ConnectionError, + requests.exceptions.ConnectionError, + requests.exceptions.ConnectionError, + requests.exceptions.ConnectionError] + retry = 3 + retrysleep = 0.1 + if sys.version_info < (3, ): + self.ams._retry_make_request.im_func.func_defaults = (None, None, retry, retrysleep, None) + else: + self.ams._retry_make_request.__func__.__defaults__ = (None, None, retry, retrysleep, None) + self.assertRaises(AmsConnectionException, self.ams.list_topics) + self.assertEqual(mock_requests_get.call_count, retry + 1) + + @mock.patch('pymod.ams.requests.get') + def testBackoffRetryConnectionProblems(self, mock_requests_get): + mock_requests_get.side_effect = [requests.exceptions.ConnectionError, + requests.exceptions.ConnectionError, + requests.exceptions.ConnectionError, + requests.exceptions.ConnectionError] + retry = 3 + retrysleep = 0.1 + retrybackoff = 0.1 + if sys.version_info < (3, ): + self.ams._retry_make_request.im_func.func_defaults = (None, None, retry, retrysleep, retrybackoff) + else: + self.ams._retry_make_request.__func__.__defaults__ = (None, None, retry, retrysleep, retrybackoff) + self.assertRaises(AmsConnectionException, self.ams.list_topics) + self.assertEqual(mock_requests_get.call_count, retry + 1) + + @mock.patch('pymod.ams.requests.post') + def testRetryAmsBalancerTimeout408(self, mock_requests_post): + retry = 4 + retrysleep = 0.2 + errmsg = "

408 Request Time-out

\nYour browser didn't send a complete request in time.\n\n" + mock_response = mock.create_autospec(requests.Response) + mock_response.status_code = 408 + mock_response.content = errmsg + mock_requests_post.return_value = mock_response + try: + self.ams.pull_sub('subscription1', retry=retry, retrysleep=retrysleep) + except Exception as e: + assert isinstance(e, AmsTimeoutException) + self.assertEqual(e.code, 408) + self.assertEqual(e.msg, 'While trying the [sub_pull]: ' + errmsg) + self.assertEqual(mock_requests_post.call_count, retry + 1) + + @mock.patch('pymod.ams.requests.post') + def testRetryAmsBalancer502(self, mock_requests_post): + retry = 4 + retrysleep = 0.2 + errmsg = "

502 Bad Gateway

\nThe server returned an invalid or incomplete response.\n\n" + mock_response = mock.create_autospec(requests.Response) + mock_response.status_code = 502 + mock_response.content = errmsg + mock_requests_post.return_value = mock_response + try: + self.ams.pull_sub('subscription1', retry=retry, retrysleep=retrysleep) + except Exception as e: + assert isinstance(e, AmsBalancerException) + self.assertEqual(e.code, 502) + self.assertEqual(e.msg, 'While trying the [sub_pull]: ' + errmsg) + self.assertEqual(mock_requests_post.call_count, retry + 1) + + @mock.patch('pymod.ams.requests.post') + def testRetryAmsBalancer503(self, mock_requests_post): + retry = 4 + retrysleep = 0.2 + errmsg = "

503 Service Unavailable

\nNo server is available to handle this request.\n\n" + mock_response = mock.create_autospec(requests.Response) + mock_response.status_code = 503 + mock_response.content = errmsg + mock_requests_post.return_value = mock_response + try: + self.ams.pull_sub('subscription1', retry=retry, retrysleep=retrysleep) + except Exception as e: + assert isinstance(e, AmsBalancerException) + self.assertEqual(e.code, 503) + self.assertEqual(e.msg, 'While trying the [sub_pull]: ' + errmsg) + self.assertEqual(mock_requests_post.call_count, retry + 1) + + @mock.patch('pymod.ams.requests.post') + def testRetryAmsBalancerTimeout504(self, mock_requests_post): + retry = 4 + retrysleep = 0.2 + errmsg = "

504 Gateway Time-out

\nThe server didn't respond in time.\n\n" + mock_response = mock.create_autospec(requests.Response) + mock_response.status_code = 504 + mock_response.content = errmsg + mock_requests_post.return_value = mock_response + try: + self.ams.pull_sub('subscription1', retry=retry, retrysleep=retrysleep) + except Exception as e: + assert isinstance(e, AmsTimeoutException) + self.assertEqual(e.code, 504) + self.assertEqual(e.msg, 'While trying the [sub_pull]: ' + errmsg) + self.assertEqual(mock_requests_post.call_count, retry + 1) + + @mock.patch('pymod.ams.requests.get') + def testRetryAckDeadlineAmsTimeout(self, mock_requests_get): + mock_response = mock.create_autospec(requests.Response) + mock_response.status_code = 408 + mock_response.content = '{"error": {"code": 408, \ + "message": "Ams Timeout", \ + "status": "TIMEOUT"}}' + mock_requests_get.return_value = mock_response + retry = 3 + retrysleep = 0.1 + if sys.version_info < (3, ): + self.ams._retry_make_request.im_func.__defaults__ = (None, None, retry, retrysleep, None) + else: + self.ams._retry_make_request.__func__.__defaults__ = (None, None, retry, retrysleep, None) + self.assertRaises(AmsTimeoutException, self.ams.list_topics) + self.assertEqual(mock_requests_get.call_count, retry + 1) diff --git a/tox.ini b/tox.ini index 712dc20..0f6de1a 100644 --- a/tox.ini +++ b/tox.ini @@ -2,12 +2,12 @@ envlist = py27-requests0, py27-requests260, py34-requests2123, py36-requests0, py36-requests2125 [testenv] -deps = coverage +deps = coverage==4.5.4 pytest httmock + mock requests0: requests requests2123: requests==2.12.3 requests260: requests==2.6.0 requests2125: requests==2.12.5 commands = coverage run -m pytest -