Skip to content

Commit

Permalink
Add option to select list of accepted ssl ciphers in httpx client (ho…
Browse files Browse the repository at this point in the history
  • Loading branch information
mib1185 authored Apr 15, 2023
1 parent f37b1fc commit 67c4de9
Show file tree
Hide file tree
Showing 3 changed files with 137 additions and 45 deletions.
11 changes: 9 additions & 2 deletions homeassistant/helpers/httpx_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,11 @@
from homeassistant.const import APPLICATION_NAME, EVENT_HOMEASSISTANT_CLOSE, __version__
from homeassistant.core import Event, HomeAssistant, callback
from homeassistant.loader import bind_hass
from homeassistant.util.ssl import get_default_context, get_default_no_verify_context
from homeassistant.util.ssl import (
SSLCipherList,
client_context,
create_no_verify_ssl_context,
)

from .frame import warn_use

Expand Down Expand Up @@ -56,6 +60,7 @@ def create_async_httpx_client(
hass: HomeAssistant,
verify_ssl: bool = True,
auto_cleanup: bool = True,
ssl_cipher_list: SSLCipherList = SSLCipherList.PYTHON_DEFAULT,
**kwargs: Any,
) -> httpx.AsyncClient:
"""Create a new httpx.AsyncClient with kwargs, i.e. for cookies.
Expand All @@ -66,7 +71,9 @@ def create_async_httpx_client(
This method must be run in the event loop.
"""
ssl_context = (
get_default_context() if verify_ssl else get_default_no_verify_context()
client_context(ssl_cipher_list)
if verify_ssl
else create_no_verify_ssl_context(ssl_cipher_list)
)
client = HassHttpXAsyncClient(
verify=ssl_context,
Expand Down
118 changes: 75 additions & 43 deletions homeassistant/util/ssl.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,70 @@
"""Helper to create SSL contexts."""
import contextlib
from functools import cache
from os import environ
import ssl

import certifi

from homeassistant.backports.enum import StrEnum

def create_no_verify_ssl_context() -> ssl.SSLContext:

class SSLCipherList(StrEnum):
"""SSL cipher lists."""

PYTHON_DEFAULT = "python_default"
INTERMEDIATE = "intermediate"
MODERN = "modern"


SSL_CIPHER_LISTS = {
SSLCipherList.INTERMEDIATE: (
"ECDHE-ECDSA-CHACHA20-POLY1305:"
"ECDHE-RSA-CHACHA20-POLY1305:"
"ECDHE-ECDSA-AES128-GCM-SHA256:"
"ECDHE-RSA-AES128-GCM-SHA256:"
"ECDHE-ECDSA-AES256-GCM-SHA384:"
"ECDHE-RSA-AES256-GCM-SHA384:"
"DHE-RSA-AES128-GCM-SHA256:"
"DHE-RSA-AES256-GCM-SHA384:"
"ECDHE-ECDSA-AES128-SHA256:"
"ECDHE-RSA-AES128-SHA256:"
"ECDHE-ECDSA-AES128-SHA:"
"ECDHE-RSA-AES256-SHA384:"
"ECDHE-RSA-AES128-SHA:"
"ECDHE-ECDSA-AES256-SHA384:"
"ECDHE-ECDSA-AES256-SHA:"
"ECDHE-RSA-AES256-SHA:"
"DHE-RSA-AES128-SHA256:"
"DHE-RSA-AES128-SHA:"
"DHE-RSA-AES256-SHA256:"
"DHE-RSA-AES256-SHA:"
"ECDHE-ECDSA-DES-CBC3-SHA:"
"ECDHE-RSA-DES-CBC3-SHA:"
"EDH-RSA-DES-CBC3-SHA:"
"AES128-GCM-SHA256:"
"AES256-GCM-SHA384:"
"AES128-SHA256:"
"AES256-SHA256:"
"AES128-SHA:"
"AES256-SHA:"
"DES-CBC3-SHA:"
"!DSS"
),
SSLCipherList.MODERN: (
"ECDHE-ECDSA-AES256-GCM-SHA384:ECDHE-RSA-AES256-GCM-SHA384:"
"ECDHE-ECDSA-CHACHA20-POLY1305:ECDHE-RSA-CHACHA20-POLY1305:"
"ECDHE-ECDSA-AES128-GCM-SHA256:ECDHE-RSA-AES128-GCM-SHA256:"
"ECDHE-ECDSA-AES256-SHA384:ECDHE-RSA-AES256-SHA384:"
"ECDHE-ECDSA-AES128-SHA256:ECDHE-RSA-AES128-SHA256"
),
}


@cache
def create_no_verify_ssl_context(
ssl_cipher_list: SSLCipherList = SSLCipherList.PYTHON_DEFAULT,
) -> ssl.SSLContext:
"""Return an SSL context that does not verify the server certificate.
This is a copy of aiohttp's create_default_context() function, with the
Expand All @@ -23,18 +81,30 @@ def create_no_verify_ssl_context() -> ssl.SSLContext:
# This only works for OpenSSL >= 1.0.0
sslcontext.options |= ssl.OP_NO_COMPRESSION
sslcontext.set_default_verify_paths()
if ssl_cipher_list != SSLCipherList.PYTHON_DEFAULT:
sslcontext.set_ciphers(SSL_CIPHER_LISTS[ssl_cipher_list])

return sslcontext


def client_context() -> ssl.SSLContext:
@cache
def client_context(
ssl_cipher_list: SSLCipherList = SSLCipherList.PYTHON_DEFAULT,
) -> ssl.SSLContext:
"""Return an SSL context for making requests."""

# Reuse environment variable definition from requests, since it's already a
# requirement. If the environment variable has no value, fall back to using
# certs from certifi package.
cafile = environ.get("REQUESTS_CA_BUNDLE", certifi.where())

return ssl.create_default_context(purpose=ssl.Purpose.SERVER_AUTH, cafile=cafile)
sslcontext = ssl.create_default_context(
purpose=ssl.Purpose.SERVER_AUTH, cafile=cafile
)
if ssl_cipher_list != SSLCipherList.PYTHON_DEFAULT:
sslcontext.set_ciphers(SSL_CIPHER_LISTS[ssl_cipher_list])

return sslcontext


# Create this only once and reuse it
Expand Down Expand Up @@ -71,13 +141,7 @@ def server_context_modern() -> ssl.SSLContext:
if hasattr(ssl, "OP_NO_COMPRESSION"):
context.options |= ssl.OP_NO_COMPRESSION

context.set_ciphers(
"ECDHE-ECDSA-AES256-GCM-SHA384:ECDHE-RSA-AES256-GCM-SHA384:"
"ECDHE-ECDSA-CHACHA20-POLY1305:ECDHE-RSA-CHACHA20-POLY1305:"
"ECDHE-ECDSA-AES128-GCM-SHA256:ECDHE-RSA-AES128-GCM-SHA256:"
"ECDHE-ECDSA-AES256-SHA384:ECDHE-RSA-AES256-SHA384:"
"ECDHE-ECDSA-AES128-SHA256:ECDHE-RSA-AES128-SHA256"
)
context.set_ciphers(SSL_CIPHER_LISTS[SSLCipherList.MODERN])

return context

Expand All @@ -97,38 +161,6 @@ def server_context_intermediate() -> ssl.SSLContext:
if hasattr(ssl, "OP_NO_COMPRESSION"):
context.options |= ssl.OP_NO_COMPRESSION

context.set_ciphers(
"ECDHE-ECDSA-CHACHA20-POLY1305:"
"ECDHE-RSA-CHACHA20-POLY1305:"
"ECDHE-ECDSA-AES128-GCM-SHA256:"
"ECDHE-RSA-AES128-GCM-SHA256:"
"ECDHE-ECDSA-AES256-GCM-SHA384:"
"ECDHE-RSA-AES256-GCM-SHA384:"
"DHE-RSA-AES128-GCM-SHA256:"
"DHE-RSA-AES256-GCM-SHA384:"
"ECDHE-ECDSA-AES128-SHA256:"
"ECDHE-RSA-AES128-SHA256:"
"ECDHE-ECDSA-AES128-SHA:"
"ECDHE-RSA-AES256-SHA384:"
"ECDHE-RSA-AES128-SHA:"
"ECDHE-ECDSA-AES256-SHA384:"
"ECDHE-ECDSA-AES256-SHA:"
"ECDHE-RSA-AES256-SHA:"
"DHE-RSA-AES128-SHA256:"
"DHE-RSA-AES128-SHA:"
"DHE-RSA-AES256-SHA256:"
"DHE-RSA-AES256-SHA:"
"ECDHE-ECDSA-DES-CBC3-SHA:"
"ECDHE-RSA-DES-CBC3-SHA:"
"EDH-RSA-DES-CBC3-SHA:"
"AES128-GCM-SHA256:"
"AES256-GCM-SHA384:"
"AES128-SHA256:"
"AES256-SHA256:"
"AES128-SHA:"
"AES256-SHA:"
"DES-CBC3-SHA:"
"!DSS"
)
context.set_ciphers(SSL_CIPHER_LISTS[SSLCipherList.INTERMEDIATE])

return context
53 changes: 53 additions & 0 deletions tests/util/test_ssl.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
"""Test Home Assistant ssl utility functions."""

from unittest.mock import MagicMock, Mock, patch

import pytest

from homeassistant.util.ssl import (
SSL_CIPHER_LISTS,
SSLCipherList,
client_context,
create_no_verify_ssl_context,
)


@pytest.fixture
def mock_sslcontext():
"""Mock the ssl lib."""
ssl_mock = MagicMock(set_ciphers=Mock(return_value=True))
return ssl_mock


def test_client_context(mock_sslcontext) -> None:
"""Test client context."""
with patch("homeassistant.util.ssl.ssl.SSLContext", return_value=mock_sslcontext):
client_context()
mock_sslcontext.set_ciphers.assert_not_called()

client_context(SSLCipherList.MODERN)
mock_sslcontext.set_ciphers.assert_called_with(
SSL_CIPHER_LISTS[SSLCipherList.MODERN]
)

client_context(SSLCipherList.INTERMEDIATE)
mock_sslcontext.set_ciphers.assert_called_with(
SSL_CIPHER_LISTS[SSLCipherList.INTERMEDIATE]
)


def test_no_verify_ssl_context(mock_sslcontext) -> None:
"""Test no verify ssl context."""
with patch("homeassistant.util.ssl.ssl.SSLContext", return_value=mock_sslcontext):
create_no_verify_ssl_context()
mock_sslcontext.set_ciphers.assert_not_called()

create_no_verify_ssl_context(SSLCipherList.MODERN)
mock_sslcontext.set_ciphers.assert_called_with(
SSL_CIPHER_LISTS[SSLCipherList.MODERN]
)

create_no_verify_ssl_context(SSLCipherList.INTERMEDIATE)
mock_sslcontext.set_ciphers.assert_called_with(
SSL_CIPHER_LISTS[SSLCipherList.INTERMEDIATE]
)

0 comments on commit 67c4de9

Please sign in to comment.