Skip to content

Commit

Permalink
Merge pull request #115 from ARGOeu/devel
Browse files Browse the repository at this point in the history
Prepare to release v0.5.2-1
  • Loading branch information
kkoumantaros authored Jul 8, 2020
2 parents 6824b1f + 7d55d6f commit 70d2fdc
Show file tree
Hide file tree
Showing 8 changed files with 336 additions and 13 deletions.
151 changes: 151 additions & 0 deletions Jenkinsfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
pipeline {
agent any
options {
checkoutToSubdirectory('argo-ams-library')
}
environment {
PROJECT_DIR="argo-ams-library"
GIT_COMMIT=sh(script: "cd ${WORKSPACE}/$PROJECT_DIR && git log -1 --format=\"%H\"",returnStdout: true).trim()
GIT_COMMIT_HASH=sh(script: "cd ${WORKSPACE}/$PROJECT_DIR && git log -1 --format=\"%H\" | cut -c1-7",returnStdout: true).trim()
GIT_COMMIT_DATE=sh(script: "date -d \"\$(cd ${WORKSPACE}/$PROJECT_DIR && git show -s --format=%ci ${GIT_COMMIT_HASH})\" \"+%Y%m%d%H%M%S\"",returnStdout: true).trim()

}
stages {
stage ('Test'){
parallel {
stage ('Test Centos 6') {
agent {
docker {
image 'argo.registry:5000/epel-6-ams'
args '-u jenkins:jenkins'
}
}
steps {
echo 'Building Rpm...'
sh '''
cd ${WORKSPACE}/$PROJECT_DIR
coverage run -m unittest2 discover -v
scl enable python27 rh-python36 'tox'
scl enable rh-python36 'coverage xml --omit=*usr* --omit=*.tox*'
'''
cobertura coberturaReportFile: '**/coverage.xml'
}
}
stage ('Test Centos 7') {
agent {
docker {
image 'argo.registry:5000/epel-7-ams'
args '-u jenkins:jenkins'
}
}
steps {
echo 'Building Rpm...'
sh '''
cd ${WORKSPACE}/$PROJECT_DIR
coverage run -m unittest2 discover -v
scl enable python27 rh-python36 'tox'
scl enable rh-python36 'coverage xml --omit=*usr* --omit=*.tox*'
'''
cobertura coberturaReportFile: '**/coverage.xml'
}
}
}
}
stage ('Build'){
parallel {
stage ('Build Centos 6') {
agent {
docker {
image 'argo.registry:5000/epel-6-ams'
args '-u jenkins:jenkins'
}
}
steps {
echo 'Building Rpm...'
withCredentials(bindings: [sshUserPrivateKey(credentialsId: 'jenkins-rpm-repo', usernameVariable: 'REPOUSER', \
keyFileVariable: 'REPOKEY')]) {
sh "/home/jenkins/build-rpm.sh -w ${WORKSPACE} -b ${BRANCH_NAME} -d centos6 -p ${PROJECT_DIR} -s ${REPOKEY}"
}
archiveArtifacts artifacts: '**/*.rpm', fingerprint: true
}
post {
always {
cleanWs()
}
}
}
stage ('Build Centos 7') {
agent {
docker {
image 'argo.registry:5000/epel-7-ams'
args '-u jenkins:jenkins'
}
}
steps {
echo 'Building Rpm...'
withCredentials(bindings: [sshUserPrivateKey(credentialsId: 'jenkins-rpm-repo', usernameVariable: 'REPOUSER', \
keyFileVariable: 'REPOKEY')]) {
sh "/home/jenkins/build-rpm.sh -w ${WORKSPACE} -b ${BRANCH_NAME} -d centos7 -p ${PROJECT_DIR} -s ${REPOKEY}"
}
archiveArtifacts artifacts: '**/*.rpm', fingerprint: true
}
post {
always {
cleanWs()
}
}
}
}
}
stage ('Upload to PyPI'){
when {
branch 'master'
}
agent {
docker {
image 'argo.registry:5000/python3'
}
}
steps {
echo 'Build python package'
withCredentials(bindings: [usernamePassword(credentialsId: 'pypi-token', usernameVariable: 'USERNAME', passwordVariable: 'PASSWORD')]) {
sh '''
cd ${WORKSPACE}/$PROJECT_DIR
pipenv install --dev
pipenv run python setup.py sdist bdist_wheel
pipenv run python -m twine upload -u $USERNAME -p $PASSWORD dist/*
'''
}
}
post {
always {
cleanWs()
}
}
}
}
post {
always {
cleanWs()
}
success {
script{
if ( env.BRANCH_NAME == 'devel' ) {
build job: '/ARGO/argodoc/devel', propagate: false
} else if ( env.BRANCH_NAME == 'master' ) {
build job: '/ARGO/argodoc/master', propagate: false
}
if ( env.BRANCH_NAME == 'master' || env.BRANCH_NAME == 'devel' ) {
slackSend( message: ":rocket: New version for <$BUILD_URL|$PROJECT_DIR>:$BRANCH_NAME Job: $JOB_NAME !")
}
}
}
failure {
script{
if ( env.BRANCH_NAME == 'master' || env.BRANCH_NAME == 'devel' ) {
slackSend( message: ":rain_cloud: Build Failed for <$BUILD_URL|$PROJECT_DIR>:$BRANCH_NAME Job: $JOB_NAME")
}
}
}
}
}
14 changes: 14 additions & 0 deletions Pipfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
[[source]]
name = "pypi"
url = "https://pypi.org/simple"
verify_ssl = true

[dev-packages]
setuptools = "*"
twine = "*"
wheel = "*"

[packages]

[requires]
python_version = "*"
5 changes: 4 additions & 1 deletion argo-ams-library.spec
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

Name: argo-ams-library
Summary: %{sum}
Version: 0.5.1
Version: 0.5.2
Release: 1%{?dist}

Group: Development/Libraries
Expand Down Expand Up @@ -95,6 +95,9 @@ rm -rf %{buildroot}


%changelog
* Wed Jul 8 2020 Daniel Vrcic <[email protected]> - 0.5.2-1%{?dist}
- ARGO-2479 Modify subscription offset method fails
- ARGO-2360 Fix ack_sub retry loop
* Mon Feb 10 2020 Daniel Vrcic <[email protected]> - 0.5.1-1%{?dist}
- ARGO-2182 ams-lib does not retry on topic publish
* Wed Dec 4 2019 Daniel Vrcic <[email protected]> - 0.5.0-1%{?dist}
Expand Down
32 changes: 30 additions & 2 deletions examples/retry.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
#!/usr/bin/env python3
#!/usr/bin/env python

from argparse import ArgumentParser
from argo_ams_library import ArgoMessagingService, AmsMessage, AmsException
Expand Down Expand Up @@ -44,8 +44,12 @@ def main():
print('msgid={0}, data={1}, attr={2}'.format(msgid, data, attr))
ackids.append(id)

if ackids:
ams.ack_sub(args.subscription, ackids, retry=3, retrysleep=5,
timeout=5)

# backoff with each next retry attempt exponentially longer
msg = AmsMessage(data='foo1', attributes={'bar1': 'baz1'}).dict()
msg = AmsMessage(data='foo2', attributes={'bar2': 'baz2'}).dict()
try:
ret = ams.publish(args.topic, msg, retry=3, retrybackoff=5, timeout=5)
print(ret)
Expand All @@ -63,4 +67,28 @@ def main():
print('msgid={0}, data={1}, attr={2}'.format(msgid, data, attr))
ackids.append(id)

if ackids:
ams.ack_sub(args.subscription, ackids)

# static sleep between retry attempts. this example uses consume context
# method that pull and acks msgs in one call.
msg = AmsMessage(data='foo3', attributes={'bar3': 'baz3'}).dict()
try:
ret = ams.publish(args.topic, msg, retry=3, retrysleep=5, timeout=5)
print(ret)
except AmsException as e:
print(e)

try:
msgs = ams.pullack_sub(args.subscription, args.nummsgs, retry=3,
retrysleep=5, timeout=5)
for msg in msgs:
data = msg.get_data()
msgid = msg.get_msgid()
attr = msg.get_attr()
print('msgid={0}, data={1}, attr={2}'.format(msgid, data, attr))

except AmsException as e:
print(e)

main()
67 changes: 59 additions & 8 deletions pymod/ams.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,8 @@ def __init__(self, endpoint):
"sub_get": ["get", set([404, 401, 403])],
"topic_publish": ["post", set([413, 401,
403])],
"sub_mod_offset": ["post", set([400, 401, 403,
404])],
"sub_pushconfig": ["post", set([400, 401, 403,
404])],
"auth_x509": ["post", set([400, 401, 403,
Expand Down Expand Up @@ -159,7 +161,10 @@ def _retry_make_request(self, url, body=None, route_name=None, retry=0,
finally:
i += 1
else:
raise saved_exp
if saved_exp:
raise saved_exp
else:
raise e

else:
while i <= retry + 1:
Expand Down Expand Up @@ -913,17 +918,14 @@ def pull_sub(self, sub, num=1, return_immediately=False, retry=0,

return list(map(lambda m: (m['ackId'], AmsMessage(b64enc=False, **m['message'])), msgs))

def ack_sub(self, sub, ids, retry=0, retrysleep=60, retrybackoff=None,
**reqkwargs):
def ack_sub(self, sub, ids, **reqkwargs):
"""Acknownledgment of received messages
Messages retrieved from a pull subscription can be acknowledged by
sending message with an array of ackIDs. The service will retrieve
the ackID corresponding to the highest message offset and will
consider that message and all previous messages as acknowledged by
the consumer. If enabled (retry > 0), multiple acknowledgement
will be tried in case of problems/glitches with the AMS service.
retry* options are eventually passed to _retry_make_request()
the consumer.
Args:
sub: str. The subscription name.
Expand All @@ -937,11 +939,60 @@ def ack_sub(self, sub, ids, retry=0, retrysleep=60, retrybackoff=None,
# Compose url
url = route[1].format(self.endpoint, self.token, self.project, "", sub)
method = getattr(self, 'do_{0}'.format(route[0]))
method(url, msg_body, "sub_ack", retry=retry, retrysleep=retrysleep,
retrybackoff=retrybackoff, **reqkwargs)
method(url, msg_body, "sub_ack", **reqkwargs)

return True

def pullack_sub(self, sub, num=1, return_immediately=False, retry=0,
retrysleep=60, retrybackoff=None, **reqkwargs):
"""Pull messages from subscription and acknownledge them in one call.
If enabled (retry > 0), multiple subscription pulls will be tried in
case of problems/glitches with the AMS service. retry* options are
eventually passed to _retry_make_request().
If succesfull subscription pull immediately follows with failed
acknownledgment (e.g. network hiccup just before acknowledgement of
received messages), consume cycle will reset and start from
beginning with new subscription pull. This ensures that ack deadline
time window is moved to new start period, that is the time when the
second pull was initiated.
Args:
sub: str. The subscription name.
num: int. The number of messages to pull.
reqkwargs: keyword argument that will be passed to underlying
python-requests library call.
"""
while True:
try:
ackIds = list()
messages = list()

for id, msg in self.pull_sub(sub, num,
return_immediately=return_immediately,
retry=retry,
retrysleep=retrysleep,
retrybackoff=retrybackoff,
**reqkwargs):
ackIds.append(id)
messages.append(msg)

except AmsException as e:
raise e

if messages and ackIds:
try:
self.ack_sub(sub, ackIds, **reqkwargs)
break
except AmsException as e:
log.warning('Continuing with sub_pull after sub_ack: {0}'.format(e))
pass
else:
break

return messages

def set_pullopt(self, key, value):
"""Function for setting pull options
Expand Down
35 changes: 34 additions & 1 deletion pymod/amssubscription.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,40 @@ def pull(self, num=1, retry=0, retrysleep=60, retrybackoff=None,
[(ackId, AmsMessage)]: List of tuples with ackId and AmsMessage instance
"""

return self.init.pull_sub(self.name, num=num, return_immediately=return_immediately, **reqkwargs)
return self.init.pull_sub(self.name, num=num,
return_immediately=return_immediately,
**reqkwargs)

def pullack(self, num=1, retry=0, retrysleep=60, retrybackoff=None,
return_immediately=False, **reqkwargs):
"""Pull messages from subscription and acknownledge them in one call.
If succesfull subscription pull immediately follows with failed
acknownledgment (e.g. network hiccup just before acknowledgement of
received messages), consume cycle will reset and start from
begginning with new subscription pull.
Kwargs:
num (int): Number of messages to pull
retry: int. Number of request retries before giving up. Default
is 0 meaning no further request retry will be made
after first unsuccesfull request.
retrysleep: int. Static number of seconds to sleep before next
request attempt
retrybackoff: int. Backoff factor to apply between each request
attempts
return_immediately (boolean): If True and if stream of messages is empty,
subscriber call will not block and wait for
messages
reqkwargs: keyword argument that will be passed to underlying
python-requests library call.
Return:
[AmsMessage1, AmsMessage2]: List of AmsMessage instances
"""

return self.init.pullack_sub(self.name, num=num,
return_immediately=return_immediately,
**reqkwargs)

def time_to_offset(self, timestamp, **reqkwargs):
"""
Expand Down
Loading

0 comments on commit 70d2fdc

Please sign in to comment.