Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding multiprocessing #148

Merged
merged 4 commits into from
Mar 4, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -61,4 +61,5 @@ target/

# virtual env
venv/
.venv/

1 change: 1 addition & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,4 @@ requests>=2.10.0
six
xmltodict
lxml
pathos
189 changes: 119 additions & 70 deletions rtcclient/base.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
from typing import Tuple
from pathos.pools import ThreadPool
import abc
import logging
from rtcclient import requests
Expand All @@ -20,8 +22,7 @@ def __init__(self, url):
self.url = self.validate_url(url)

def __repr__(self):
return "<%s %s>" % (self.__class__.__name__,
str(self))
return "<%s %s>" % (self.__class__.__name__, str(self))

@abc.abstractmethod
def __str__(self):
Expand Down Expand Up @@ -52,9 +53,7 @@ def get_rtc_obj(self):
pass

@token_expire_handler
def get(self, url,
verify=False, headers=None, proxies=None,
timeout=60, **kwargs):
def get(self, url, verify=False, headers=None, proxies=None, timeout=60, **kwargs):
"""Sends a GET request. Refactor from requests module

:param url: URL for the new :class:`Request` object.
Expand All @@ -74,19 +73,33 @@ def get(self, url,
"""

self.log.debug("Get response from %s", url)
response = requests.get(url, verify=verify, headers=headers,
proxies=proxies, timeout=timeout, **kwargs)
response = requests.get(
url,
verify=verify,
headers=headers,
proxies=proxies,
timeout=timeout,
**kwargs
)
if response.status_code != 200:
self.log.error('Failed GET request at <%s> with response: %s',
url,
response.content)
self.log.error(
"Failed GET request at <%s> with response: %s", url, response.content
)
response.raise_for_status()
return response

@token_expire_handler
def post(self, url, data=None, json=None,
verify=False, headers=None, proxies=None,
timeout=60, **kwargs):
def post(
self,
url,
data=None,
json=None,
verify=False,
headers=None,
proxies=None,
timeout=60,
**kwargs
):
"""Sends a POST request. Refactor from requests module

:param url: URL for the new :class:`Request` object.
Expand All @@ -109,23 +122,39 @@ def post(self, url, data=None, json=None,
:rtype: requests.Response
"""

self.log.debug("Post a request to %s with data: %s and json: %s",
url, data, json)
response = requests.post(url, data=data, json=json,
verify=verify, headers=headers,
proxies=proxies, timeout=timeout, **kwargs)
self.log.debug(
"Post a request to %s with data: %s and json: %s", url, data, json
)
response = requests.post(
url,
data=data,
json=json,
verify=verify,
headers=headers,
proxies=proxies,
timeout=timeout,
**kwargs
)

if response.status_code not in [200, 201]:
self.log.error('Failed POST request at <%s> with response: %s',
url,
response.content)
self.log.error(
"Failed POST request at <%s> with response: %s", url, response.content
)
self.log.info(response.status_code)
response.raise_for_status()
return response

@token_expire_handler
def put(self, url, data=None, verify=False,
headers=None, proxies=None, timeout=60, **kwargs):
def put(
self,
url,
data=None,
verify=False,
headers=None,
proxies=None,
timeout=60,
**kwargs
):
"""Sends a PUT request. Refactor from requests module

:param url: URL for the new :class:`Request` object.
Expand All @@ -146,21 +175,27 @@ def put(self, url, data=None, verify=False,
:rtype: requests.Response
"""

self.log.debug("Put a request to %s with data: %s",
url, data)
response = requests.put(url, data=data,
verify=verify, headers=headers,
proxies=proxies, timeout=timeout, **kwargs)
self.log.debug("Put a request to %s with data: %s", url, data)
response = requests.put(
url,
data=data,
verify=verify,
headers=headers,
proxies=proxies,
timeout=timeout,
**kwargs
)
if response.status_code not in [200, 201]:
self.log.error('Failed PUT request at <%s> with response: %s',
url,
response.content)
self.log.error(
"Failed PUT request at <%s> with response: %s", url, response.content
)
response.raise_for_status()
return response

@token_expire_handler
def delete(self, url, headers=None, verify=False, proxies=None,
timeout=60, **kwargs):
def delete(
self, url, headers=None, verify=False, proxies=None, timeout=60, **kwargs
):
"""Sends a DELETE request. Refactor from requests module

:param url: URL for the new :class:`Request` object.
Expand All @@ -179,18 +214,19 @@ def delete(self, url, headers=None, verify=False, proxies=None,
:rtype: requests.Response
"""

self.log.debug("Delete a request to %s",
url)
response = requests.delete(url,
headers=headers,
verify=verify,
proxies=proxies,
timeout=timeout,
**kwargs)
self.log.debug("Delete a request to %s", url)
response = requests.delete(
url,
headers=headers,
verify=verify,
proxies=proxies,
timeout=timeout,
**kwargs
)
if response.status_code not in [200, 201]:
self.log.error('Failed DELETE request at <%s> with response: %s',
url,
response.content)
self.log.error(
"Failed DELETE request at <%s> with response: %s", url, response.content
)
response.raise_for_status()
return response

Expand All @@ -207,7 +243,7 @@ def validate_url(cls, url):
return None

url = url.strip()
while url.endswith('/'):
while url.endswith("/"):
url = url[:-1]
return url

Expand Down Expand Up @@ -236,15 +272,17 @@ def get_rtc_obj(self):
def _initialize(self):
"""Initialize the object from the request"""

self.log.debug("Start initializing data from %s",
self.url)
resp = self.get(self.url,
verify=False,
proxies=self.rtc_obj.proxies,
headers=self.rtc_obj.headers)
self.log.debug("Start initializing data from %s", self.url)
resp = self.get(
self.url,
verify=False,
proxies=self.rtc_obj.proxies,
headers=self.rtc_obj.headers,
)
self.__initialize(resp)
self.log.info("Finish the initialization for <%s %s>",
self.__class__.__name__, self)
self.log.info(
"Finish the initialization for <%s %s>", self.__class__.__name__, self
)

def __initialize(self, resp):
"""Initialize from the response"""
Expand All @@ -257,11 +295,12 @@ def __initialize(self, resp):
def __initializeFromRaw(self):
"""Initialze from raw data (OrderedDict)"""

for (key, value) in self.raw_data.items():
def process_items(item) -> Tuple:
key, value = item
if key.startswith("@"):
# be compatible with IncludedInBuild
if "@oslc_cm:label" != key:
continue
return None

attr = key.split(":")[-1].replace("-", "_")
attr_list = attr.split(".")
Expand All @@ -270,9 +309,7 @@ def __initializeFromRaw(self):
if len(attr_list) > 1:
# attr = "_".join([attr_list[-2],
# attr_list[-1]])
continue

self.field_alias[attr] = key
return None

if isinstance(value, OrderedDict):
value_text = value.get("#text")
Expand All @@ -286,7 +323,15 @@ def __initializeFromRaw(self):
value = self.__get_rdf_resource_title(value)
except (exception.RTCException, Exception):
self.log.error("Unable to handle %s", value)
self.setattr(attr, value)
return key, attr, value

with ThreadPool() as pool:
for processed in pool.map(process_items, self.raw_data.items()):
if processed is None:
continue
key, attr, value = processed
self.field_alias[attr] = key
self.setattr(attr, value)

def __get_rdf_resource_title(self, rdf_url):
# handle for /jts/users
Expand All @@ -301,10 +346,12 @@ def __get_rdf_resource_title(self, rdf_url):
if "/resource/content/" in rdf_url:
return rdf_url

resp = self.get(rdf_url,
verify=False,
proxies=self.rtc_obj.proxies,
headers=self.rtc_obj.headers)
resp = self.get(
rdf_url,
verify=False,
proxies=self.rtc_obj.proxies,
headers=self.rtc_obj.headers,
)
raw_data = xmltodict.parse(resp.content)

root_key = list(raw_data.keys())[0]
Expand All @@ -313,13 +360,17 @@ def __get_rdf_resource_title(self, rdf_url):
# no total count
# only single resource
# compatible with IncludedInBuild
return (raw_data[root_key].get("dc:title") or
raw_data[root_key].get("foaf:nick"))
return raw_data[root_key].get("dc:title") or raw_data[root_key].get(
"foaf:nick"
)
else:
# multiple resource
result_list = list()
entry_keys = [entry_key for entry_key in raw_data[root_key].keys()
if not entry_key.startswith("@")]
entry_keys = [
entry_key
for entry_key in raw_data[root_key].keys()
if not entry_key.startswith("@")
]
for entry_key in entry_keys:
entries = raw_data[root_key][entry_key]
if isinstance(entries, OrderedDict):
Expand All @@ -336,9 +387,7 @@ def __get_rdf_resource_title(self, rdf_url):

def __handle_rdf_entry(self, entry):
# only return useful info instead of the whole object
return_fields = ["rtc_cm:userId",
"dc:title",
"dc:description"]
return_fields = ["rtc_cm:userId", "dc:title", "dc:description"]
subkeys = entry.keys()
for return_field in return_fields:
if return_field in subkeys:
Expand Down
Loading