Skip to content

Commit

Permalink
Fix wrong package stream resulting in 200 Ok
Browse files Browse the repository at this point in the history
Assuming we want to keep our stream-redirect approach on the
content-app, We cant recover from wrong data already sent if
the Remote happens to be corrupted (contains wrong binaries).

In order to not give a 200 reponse to client, we decided to
close the connection as soon as the request handler realizes
the checksum is wrong.

That only happens after we already sent the whole blob minus EOF,
so we close the connection before sending the EOF.

Additionally, we put some message on the logs for admins to see
and have a chance to manually fix the remote/remote_artifacts.

fixes #5012
  • Loading branch information
pedro-psb committed Nov 19, 2024
1 parent 378373a commit 239643d
Show file tree
Hide file tree
Showing 4 changed files with 103 additions and 6 deletions.
3 changes: 3 additions & 0 deletions CHANGES/5012.bugfix
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
Fixed content-app behavior for the case where the client would get a 200 response for a package
streamed from a Remote which didnt match the expected checksum.
Now, the connection is closed before finalizing the response.
14 changes: 12 additions & 2 deletions pulp_file/pytest_plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,8 +83,8 @@ def file_fixtures_root(tmp_path):

@pytest.fixture
def write_3_iso_file_fixture_data_factory(file_fixtures_root):
def _write_3_iso_file_fixture_data_factory(name):
file_fixtures_root.joinpath(name).mkdir()
def _write_3_iso_file_fixture_data_factory(name, exist_ok=False):
file_fixtures_root.joinpath(name).mkdir(exist_ok=exist_ok)
file1 = generate_iso(file_fixtures_root.joinpath(f"{name}/1.iso"))
file2 = generate_iso(file_fixtures_root.joinpath(f"{name}/2.iso"))
file3 = generate_iso(file_fixtures_root.joinpath(f"{name}/3.iso"))
Expand All @@ -101,6 +101,16 @@ def basic_manifest_path(write_3_iso_file_fixture_data_factory):
return write_3_iso_file_fixture_data_factory("basic")


@pytest.fixture
def basic_manifest_path_overwrite(write_3_iso_file_fixture_data_factory):
"""After the first call, subsequent calls to build will overwrite the fixture content."""

def build():
return write_3_iso_file_fixture_data_factory("basic", exist_ok=True)

return build


@pytest.fixture
def copy_manifest_only_factory(file_fixtures_root):
def _copy_manifest_only(name):
Expand Down
39 changes: 36 additions & 3 deletions pulpcore/content/handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
from multidict import CIMultiDict
import os
import re
import socket
import struct
from gettext import gettext as _

from aiohttp.client_exceptions import ClientResponseError, ClientConnectionError
Expand Down Expand Up @@ -54,7 +56,10 @@
cache_key,
)

from pulpcore.exceptions import UnsupportedDigestValidationError # noqa: E402
from pulpcore.exceptions import ( # noqa: E402
UnsupportedDigestValidationError,
DigestValidationError,
)
from pulpcore.metrics import artifacts_size_counter # noqa: E402

from jinja2 import Template # noqa: E402: module level not at top of file
Expand Down Expand Up @@ -1125,13 +1130,27 @@ async def finalize():
await original_finalize()

downloader = remote.get_downloader(
remote_artifact=remote_artifact, headers_ready_callback=handle_response_headers
remote_artifact=remote_artifact,
headers_ready_callback=handle_response_headers,
)
original_handle_data = downloader.handle_data
downloader.handle_data = handle_data
original_finalize = downloader.finalize
downloader.finalize = finalize
download_result = await downloader.run()
try:
download_result = await downloader.run()
except DigestValidationError:
# Cant recover from wrong data already sent.
# We should close the connection without sending an EOF in the response
await downloader.session.close()
close_tcp_connection(request.transport._sock)
raise RuntimeError(
f"We tried streaming {remote_artifact.url!r} to the client, but it"
"failed checkusm validation. "
"At this point, we cant recover from wrong data already sent, "
"so we are forcing the connection to close. "
"If this error persists, the remote server might be corrupted."
)

if content_length := response.headers.get("Content-Length"):
response.headers["X-PULP-ARTIFACT-SIZE"] = content_length
Expand All @@ -1149,3 +1168,17 @@ async def finalize():
if response.status == 404:
raise HTTPNotFound()
return response


def close_tcp_connection(sock):
"""Configure socket to close TCP connection immediately."""
try:
l_onoff = 1
l_linger = 0 # 0 seconds timeout - immediate close
sock.setsockopt(socket.SOL_SOCKET, socket.SO_LINGER, struct.pack("ii", l_onoff, l_linger))
# Another possibility is configure the socket to send a RST instead of FIN,
# but I'm not sure if that's required:
# https://serverfault.com/questions/242302/use-of-tcp-fin-and-tcp-rst
# sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
except (socket.error, OSError) as e:
log.warning(f"Error configuring socket for force close: {e}")
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
"""Tests related to content delivery."""

from aiohttp.client_exceptions import ClientResponseError
from aiohttp.client_exceptions import ClientResponseError, ClientPayloadError
import hashlib
import pytest
import subprocess
from urllib.parse import urljoin

from pulpcore.client.pulp_file import (
Expand Down Expand Up @@ -102,3 +103,53 @@ def test_remote_artifact_url_update(
actual_checksum = hashlib.sha256(downloaded_file.body).hexdigest()
expected_checksum = expected_file_list[0][1]
assert expected_checksum == actual_checksum


@pytest.mark.parallel
def test_remote_content_changed_with_on_demand(
basic_manifest_path_overwrite,
file_repo_with_auto_publish,
file_remote_ssl_factory,
file_bindings,
basic_manifest_path,
monitor_task,
file_distribution_factory,
):
"""When:
1. Sync a remote with fileA(digest=123)
2. On remote server, change fileA content: fileA(digest=456)
Then:
3. Get fileA from content app will cause a connection-close/incomplete-response.
"""
# Create a remote that points to a repository that only has the manifest, but no content
remote = file_remote_ssl_factory(manifest_path=basic_manifest_path, policy="on_demand")

# Sync from the remote
body = RepositorySyncURL(remote=remote.pulp_href)
monitor_task(
file_bindings.RepositoriesFileApi.sync(file_repo_with_auto_publish.pulp_href, body).task
)
repo = file_bindings.RepositoriesFileApi.read(file_repo_with_auto_publish.pulp_href)

# Create a distribution from the publication
distribution = file_distribution_factory(repository=repo.pulp_href)

# Download the manifest from the remote
expected_file_list = list(get_files_in_manifest(remote.url))

# Overwrite files in remote_server (change digest but keep name)
basic_manifest_path_overwrite()

# Assert response is incomplete
get_url = urljoin(distribution.base_url, expected_file_list[0][0])
with pytest.raises(ClientPayloadError, match="Response payload is not completed"):
download_file(get_url)

# Assert with curl just to be sure
result = subprocess.run(["curl", "-v", get_url], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
curl_error = (
b"* Closing connection 0\ncurl: (18) transfer closed with outstanding read data remaining\n"
)
assert result.returncode == 18
assert curl_error in result.stderr

0 comments on commit 239643d

Please sign in to comment.