Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Address comments from AI code review #461

Merged
merged 4 commits into from
Jan 18, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions inbc-program/inbc/parser/source_app_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@


def application_add(args: argparse.Namespace) -> str:
if (args.gpgKeyUri and args.gpgKeyName is None) or (args.gpgKeyName and args.gpgKeyUri is None):
if bool(args.gpgKeyUri) != bool(args.gpgKeyName):
nmgaston marked this conversation as resolved.
Show resolved Hide resolved
raise InbcException(
"Source requires either both gpgKeyUri and gpgKeyName to be provided, or neither of them.")
nmgaston marked this conversation as resolved.
Show resolved Hide resolved

Expand All @@ -35,8 +35,8 @@ def application_add(args: argparse.Namespace) -> str:

manifest += '<repo><repos>'

for source in args.sources:
manifest += '<source_pkg>' + source + '</source_pkg>'
source_tags = [f'<source_pkg>{source}</source_pkg>' for source in args.sources]
manifest += ''.join(source_tags)
nmgaston marked this conversation as resolved.
Show resolved Hide resolved

manifest += ('</repos>'
f'{create_xml_tag(arguments, "filename")}</repo>'
Expand Down
116 changes: 58 additions & 58 deletions inbc-program/tests/unit/test_source_app_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,11 @@ def test_parse_add_arguments_deb822_format_separate_lines_successfully(self):
f = self.arg_parser.parse_args(
['source', 'application', 'add',
'--sources', 'X-Repolib-Name: Google Chrome',
'Enabled: yes',
'Types: deb',
'URIs: https://dl-ssl.google.com/linux/linux_signing_key.pub',
'Suites: stable',
'Components: main',
'Enabled: yes',
'Types: deb',
'URIs: https://dl-ssl.google.com/linux/linux_signing_key.pub',
'Suites: stable',
'Components: main',
'--filename', 'intel-gpu-jammy.list'])
self.assertEqual(f.gpgKeyUri, None)
self.assertEqual(f.gpgKeyName, None)
Expand All @@ -46,64 +46,64 @@ def test_parse_add_arguments_deb822_format_separate_lines_successfully(self):
'Components: main'])
self.assertEqual(f.filename, 'intel-gpu-jammy.list')

@patch('inbm_lib.mqttclient.mqtt.mqtt.Client.connect')
def test_raise_application_add_with_only_one_gpgKeyUri_param(self, m_connect):
p = self.arg_parser.parse_args(
['source', 'application', 'add',
'--gpgKeyUri', 'https://repositories.intel.com/gpu/intel-graphics.key',
'--sources', 'deb http://example.com/ focal main restricted universe',
'deb-src http://example.com/ focal-security main',
'--filename', 'intel-gpu-jammy.list'])
with pytest.raises(InbcException,
match="Source requires either both gpgKeyUri and gpgKeyName "
"to be provided, or neither of them."):
Inbc(p, 'source', False)

@patch('inbm_lib.mqttclient.mqtt.mqtt.Client.connect')
def test_raise_application_add_with_only_one_gpgKeyName_param(self, m_connect):
p = self.arg_parser.parse_args(
['source', 'application', 'add',
'--gpgKeyName', 'intel-graphics.gpg',
'--sources', 'deb http://example.com/ focal main restricted universe',
'deb-src http://example.com/ focal-security main',
'--filename', 'intel-gpu-jammy.list'])
with pytest.raises(InbcException,
match="Source requires either both gpgKeyUri and gpgKeyName "
"to be provided, or neither of them."):
Inbc(p, 'source', False)

@patch('inbm_lib.mqttclient.mqtt.mqtt.Client.connect')
def test_create_add_deb_822_format_manifest_successfully(self, m_connect):
def test_raise_application_add_with_only_one_gpgKeyUri_param(self):
with patch('inbm_lib.mqttclient.mqtt.mqtt.Client.connect') as m_connect:
p = self.arg_parser.parse_args(
['source', 'application', 'add',
'--gpgKeyUri', 'https://repositories.intel.com/gpu/intel-graphics.key',
'--sources', 'deb http://example.com/ focal main restricted universe',
'deb-src http://example.com/ focal-security main',
'--filename', 'intel-gpu-jammy.list'])
with pytest.raises(InbcException,
match="Source requires either both gpgKeyUri and gpgKeyName "
"to be provided, or neither of them."):
Inbc(p, 'source', False)
nmgaston marked this conversation as resolved.
Show resolved Hide resolved

def test_raise_application_add_with_only_one_gpgKeyName_param(self):
with patch('inbm_lib.mqttclient.mqtt.mqtt.Client.connect') as m_connect:
p = self.arg_parser.parse_args(
['source', 'application', 'add',
'--gpgKeyName', 'intel-graphics.gpg',
'--sources', 'deb http://example.com/ focal main restricted universe',
'deb-src http://example.com/ focal-security main',
'--filename', 'intel-gpu-jammy.list'])
with pytest.raises(InbcException,
match="Source requires either both gpgKeyUri and gpgKeyName "
"to be provided, or neither of them."):
Inbc(p, 'source', False)

def test_create_add_deb_822_format_manifest_successfully(self):
p = self.arg_parser.parse_args(
['source', 'application', 'add',
'--sources', 'X-Repolib-Name: Google Chrome',
'Enabled: yes',
'Types: deb',
'URIs: https://dl-ssl.google.com/linux/linux_signing_key.pub',
'Suites: stable',
'Components: main',
'Enabled: yes',
'Types: deb',
'URIs: https://dl-ssl.google.com/linux/linux_signing_key.pub',
'Suites: stable',
'Components: main',
'--filename', 'google-chrome.sources'])
Inbc(p, 'source', False)
with patch('inbm_lib.mqttclient.mqtt.mqtt.Client.connect') as m_connect:
Inbc(p, 'source', False)
expected = '<?xml version="1.0" encoding="utf-8"?><manifest><type>source</type><applicationSource>' \
'<add><repo><repos><source_pkg>X-Repolib-Name: Google Chrome</source_pkg>' \
'<source_pkg>Enabled: yes</source_pkg>' \
'<source_pkg>Types: deb</source_pkg>'\
'<source_pkg>Types: deb</source_pkg>' \
'<source_pkg>URIs: https://dl-ssl.google.com/linux/linux_signing_key.pub</source_pkg>' \
'<source_pkg>Suites: stable</source_pkg>' \
'<source_pkg>Components: main</source_pkg>' \
'</repos><filename>google-chrome.sources</filename></repo></add></applicationSource></manifest>'
nmgaston marked this conversation as resolved.
Show resolved Hide resolved
self.assertEqual(p.func(p), expected)

@patch('inbm_lib.mqttclient.mqtt.mqtt.Client.connect')
def test_create_add_all_param_manifest_successfully(self, m_connect):
def test_create_add_all_param_manifest_successfully(self):
p = self.arg_parser.parse_args(
['source', 'application', 'add',
'--gpgKeyUri', 'https://repositories.intel.com/gpu/intel-graphics.key',
'--gpgKeyName', 'intel-graphics.gpg',
'--sources', 'deb http://example.com/ focal main restricted universe',
'deb-src http://example.com/ focal-security main',
'--filename', 'intel-gpu-jammy.list'])
Inbc(p, 'source', False)
with patch('inbm_lib.mqttclient.mqtt.mqtt.Client.connect') as m_connect:
Inbc(p, 'source', False)
nmgaston marked this conversation as resolved.
Show resolved Hide resolved
expected = '<?xml version="1.0" encoding="utf-8"?><manifest><type>source</type><applicationSource>' \
'<add><gpg><uri>https://repositories.intel.com/gpu/intel-graphics.key</uri>' \
'<keyname>intel-graphics.gpg</keyname></gpg><repo><repos>' \
nmgaston marked this conversation as resolved.
Show resolved Hide resolved
Expand All @@ -112,14 +112,14 @@ def test_create_add_all_param_manifest_successfully(self, m_connect):
'</repos><filename>intel-gpu-jammy.list</filename></repo></add></applicationSource></manifest>'
self.assertEqual(p.func(p), expected)

@patch('inbm_lib.mqttclient.mqtt.mqtt.Client.connect')
def test_create_add_minus_gpg_manifest_successfully(self, m_connect):
def test_create_add_minus_gpg_manifest_successfully(self):
p = self.arg_parser.parse_args(
['source', 'application', 'add',
'--sources', 'deb http://example.com/ focal main restricted universe',
'deb-src http://example.com/ focal-security main',
'--filename', 'intel-gpu-jammy.list'])
Inbc(p, 'source', False)
with patch('inbm_lib.mqttclient.mqtt.mqtt.Client.connect') as m_connect:
Inbc(p, 'source', False)
expected = '<?xml version="1.0" encoding="utf-8"?><manifest><type>source</type><applicationSource>' \
'<add><repo><repos>' \
'<source_pkg>deb http://example.com/ focal main restricted universe</source_pkg>' \
Expand All @@ -142,24 +142,24 @@ def test_parse_minus_gpg_remove_arguments_successfully(self):
self.assertEqual(f.gpgKeyName, None)
self.assertEqual(f.filename, 'intel-gpu-jammy.list')

@patch('inbm_lib.mqttclient.mqtt.mqtt.Client.connect')
def test_create_remove_manifest_all_params_successfully(self, m_connect):
def test_create_remove_manifest_all_params_successfully(self):
p = self.arg_parser.parse_args(
['source', 'application', 'remove',
'--gpgKeyName', 'intel-gpu-jammy.gpg',
'--filename', 'intel-gpu-jammy.list'])
Inbc(p, 'source', False)
with patch('inbm_lib.mqttclient.mqtt.mqtt.Client.connect') as m_connect:
Inbc(p, 'source', False)
expected = '<?xml version="1.0" encoding="utf-8"?><manifest><type>source</type><applicationSource>' \
'<remove><gpg><keyname>intel-gpu-jammy.gpg</keyname></gpg>' \
'<repo><filename>intel-gpu-jammy.list</filename></repo></remove></applicationSource></manifest>'
self.assertEqual(p.func(p), expected)

@patch('inbm_lib.mqttclient.mqtt.mqtt.Client.connect')
def test_create_remove_manifest_minus_gpg_successfully(self, m_connect):
def test_create_remove_manifest_minus_gpg_successfully(self):
p = self.arg_parser.parse_args(
['source', 'application', 'remove',
'--filename', 'intel-gpu-jammy.list'])
Inbc(p, 'source', False)
with patch('inbm_lib.mqttclient.mqtt.mqtt.Client.connect') as m_connect:
Inbc(p, 'source', False)
expected = '<?xml version="1.0" encoding="utf-8"?><manifest><type>source</type><applicationSource><remove>' \
'<repo><filename>intel-gpu-jammy.list</filename></repo></remove></applicationSource></manifest>'
self.assertEqual(p.func(p), expected)
Expand All @@ -174,25 +174,25 @@ def test_parse_update_arguments_successfully(self):
'deb-src http://example.com/ focal-security main'])
self.assertEqual(f.filename, 'intel-gpu-jammy.list')

@patch('inbm_lib.mqttclient.mqtt.mqtt.Client.connect')
def test_create_update_manifest_successfully(self, m_connect):
def test_create_update_manifest_successfully(self):
p = self.arg_parser.parse_args(
['source', 'application', 'update',
'--sources', 'deb http://example.com/ focal main restricted universe',
'deb-src http://example.com/ focal-security main',
'--filename', 'intel-gpu-jammy.list'])
Inbc(p, 'source', False)
with patch('inbm_lib.mqttclient.mqtt.mqtt.Client.connect') as m_connect:
Inbc(p, 'source', False)
expected = '<?xml version="1.0" encoding="utf-8"?><manifest><type>source</type><applicationSource>' \
'<update><repo><repos><source_pkg>deb http://example.com/ focal main restricted universe' \
'</source_pkg><source_pkg>deb-src http://example.com/ focal-security main</source_pkg>' \
'</repos><filename>intel-gpu-jammy.list</filename></repo></update></applicationSource></manifest>'
self.assertEqual(p.func(p), expected)

@patch('inbm_lib.mqttclient.mqtt.mqtt.Client.connect')
def test_create_list_manifest_successfully(self, m_connect):
def test_create_list_manifest_successfully(self):
p = self.arg_parser.parse_args(
['source', 'application', 'list'])
Inbc(p, 'source', False)
with patch('inbm_lib.mqttclient.mqtt.mqtt.Client.connect') as m_connect:
Inbc(p, 'source', False)
expected = '<?xml version="1.0" encoding="utf-8"?><manifest><type>source</type><applicationSource>' \
'<list/></applicationSource></manifest>'
self.assertEqual(p.func(p), expected)
8 changes: 5 additions & 3 deletions inbm-lib/inbm_common_lib/utility.py
Original file line number Diff line number Diff line change
Expand Up @@ -180,11 +180,11 @@ def canonicalize_uri(url: str) -> CanonicalUri:

WARNING: a string with no forward slashes will have a scheme added. e.g., 'a' -> 'https://a'

@param url: the url
@param url: URL
@return canonicalized version"""

if url and URL_NULL_CHAR in url:
raise UrlSecurityException("Unsafe characters detected in the url. Cannot process the request.")
raise UrlSecurityException("Unsafe characters detected in the URL. Cannot process the request.")

return CanonicalUri(value=url_normalize.url_normalize(url))
nmgaston marked this conversation as resolved.
Show resolved Hide resolved

Expand All @@ -205,7 +205,9 @@ def is_within_directory(directory: str, target: str) -> bool:
return prefix == abs_directory


def safe_extract(tarball: tarfile.TarFile, path: str = ".", members: Optional[Iterable[tarfile.TarInfo]] = None, *, numeric_owner: bool = False) -> None:
def safe_extract(tarball: tarfile.TarFile,
path: str = ".",
members: Optional[Iterable[tarfile.TarInfo]] = None, *, numeric_owner: bool = False) -> None:
"""Avoid path traversal when extracting tarball

@param tarball: tarball to extract
nmgaston marked this conversation as resolved.
Show resolved Hide resolved
Expand Down
12 changes: 6 additions & 6 deletions inbm/dispatcher-agent/dispatcher/source/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

from enum import Enum, unique
from dataclasses import dataclass, field
from typing import Optional
from typing import Optional, List

UBUNTU_APT_SOURCES_LIST = "/etc/apt/sources.list"
UBUNTU_APT_SOURCES_LIST_D = "/etc/apt/sources.list.d"
Expand All @@ -26,22 +26,22 @@ class SourceParameters:

@dataclass(kw_only=True, frozen=True)
class ApplicationAddSourceParameters:
file_name: str
sources: list[str] = field(default_factory=lambda: [])
source_list_file_name: str
nmgaston marked this conversation as resolved.
Show resolved Hide resolved
nmgaston marked this conversation as resolved.
Show resolved Hide resolved
sources: List[str] = field(default_factory=list)
nmgaston marked this conversation as resolved.
Show resolved Hide resolved
gpg_key_uri: Optional[str] = field(default=None)
gpg_key_name: Optional[str] = field(default=None)


@dataclass(kw_only=True, frozen=True)
class ApplicationRemoveSourceParameters:
file_name: str
source_list_file_name: str
gpg_key_name: Optional[str] = field(default=None)


@dataclass(kw_only=True, frozen=True)
class ApplicationUpdateSourceParameters:
file_name: str
sources: list[str] = field(default_factory=lambda: [])
source_list_file_name: str
sources: List[str] = field(default_factory=list)
nmgaston marked this conversation as resolved.
Show resolved Hide resolved


@unique
Expand Down
5 changes: 3 additions & 2 deletions inbm/dispatcher-agent/dispatcher/source/linux_gpg_key.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

from .source_exception import SourceError
from .constants import LINUX_GPG_KEY_PATH
from inbm_common_lib.utility import remove_file

logger = logging.getLogger(__name__)

Expand All @@ -21,7 +22,7 @@ def remove_gpg_key_if_exists(gpg_key_name: str) -> None:
try:
key_path = os.path.join(LINUX_GPG_KEY_PATH, gpg_key_name)
if os.path.exists(key_path):
os.remove(key_path)
remove_file(key_path)
# it's OK if the key is not there
nmgaston marked this conversation as resolved.
Show resolved Hide resolved

except OSError as e:
Expand Down Expand Up @@ -61,4 +62,4 @@ def add_gpg_key(remote_key_path: str, key_store_name: str) -> None:
raise SourceError(f"Error getting GPG key from remote source: {e}")
nmgaston marked this conversation as resolved.
Show resolved Hide resolved

except subprocess.CalledProcessError as e:
raise SourceError(f"Error running gpg command to dearmor key: {e}")
raise SourceError(f"Error running GPG command to dearmor key: {e}")
nmgaston marked this conversation as resolved.
Show resolved Hide resolved
nmgaston marked this conversation as resolved.
Show resolved Hide resolved
8 changes: 4 additions & 4 deletions inbm/dispatcher-agent/dispatcher/source/source_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ def _handle_app_source_command(
Handle the application source commands.

@param parsed_head: XmlHandler with command information
@param os_type: os type
@param os_type: OS type
@param app_action: The action to be performed
nmgaston marked this conversation as resolved.
Show resolved Hide resolved
@return Result
"""
Expand All @@ -120,7 +120,7 @@ def _handle_app_source_command(
logger.info(f"Optional GPG key parameters not present in manifest")
nmgaston marked this conversation as resolved.
Show resolved Hide resolved
nmgaston marked this conversation as resolved.
Show resolved Hide resolved
filename = parsed_head.get_children("applicationSource/remove/repo")["filename"]
application_source_manager.remove(
ApplicationRemoveSourceParameters(file_name=filename, gpg_key_name=keyname)
ApplicationRemoveSourceParameters(source_list_file_name=filename, gpg_key_name=keyname)
)
return Result(status=200, message="SUCCESS")

Expand All @@ -144,7 +144,7 @@ def _handle_app_source_command(

application_source_manager.add(
ApplicationAddSourceParameters(
file_name=repo_filename,
source_list_file_name=repo_filename,
gpg_key_name=gpg_key_name,
gpg_key_uri=gpg_key_uri,
sources=add_source_pkgs,
Expand All @@ -161,7 +161,7 @@ def _handle_app_source_command(

application_source_manager.update(
ApplicationUpdateSourceParameters(
file_name=repo_filename,
source_list_file_name=repo_filename,
sources=update_source_pkgs,
)
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ def add(self, parameters: ApplicationAddSourceParameters) -> None:
# Step 2: Add the source
try:
create_file_with_contents(
nmgaston marked this conversation as resolved.
Show resolved Hide resolved
os.path.join(UBUNTU_APT_SOURCES_LIST_D, parameters.file_name), parameters.sources
os.path.join(UBUNTU_APT_SOURCES_LIST_D, parameters.source_list_file_name), parameters.sources
)
except (IOError, OSError) as e:
raise SourceError(f"Error adding application source list: {e}")
Expand Down Expand Up @@ -146,26 +146,26 @@ def remove(self, parameters: ApplicationRemoveSourceParameters) -> None:
# Remove the file under /etc/apt/sources.list.d
try:
if (
os.path.sep in parameters.file_name
or parameters.file_name == ".."
or parameters.file_name == "."
os.path.sep in parameters.source_list_file_name
or parameters.source_list_file_name == ".."
or parameters.source_list_file_name == "."
):
raise SourceError(f"Invalid file name: {parameters.file_name}")
raise SourceError(f"Invalid file name: {parameters.source_list_file_name}")
nmgaston marked this conversation as resolved.
Show resolved Hide resolved
nmgaston marked this conversation as resolved.
Show resolved Hide resolved
nmgaston marked this conversation as resolved.
Show resolved Hide resolved

if not remove_file(
get_canonical_representation_of_path(
os.path.join(UBUNTU_APT_SOURCES_LIST_D, parameters.file_name)
os.path.join(UBUNTU_APT_SOURCES_LIST_D, parameters.source_list_file_name)
)
):
raise SourceError(f"Error removing file: {parameters.file_name}")
raise SourceError(f"Error removing file: {parameters.source_list_file_name}")
nmgaston marked this conversation as resolved.
Show resolved Hide resolved
except OSError as e:
raise SourceError(f"Error removing file: {e}") from e

def update(self, parameters: ApplicationUpdateSourceParameters) -> None:
"""Updates a source file in Ubuntu OS source file list under /etc/apt/sources.list.d"""
try:
create_file_with_contents(
os.path.join(UBUNTU_APT_SOURCES_LIST_D, parameters.file_name), parameters.sources
os.path.join(UBUNTU_APT_SOURCES_LIST_D, parameters.source_list_file_name), parameters.sources
)
except IOError as e:
raise SourceError(f"Error occurred while trying to update sources: {e}") from e
Loading