diff --git a/curl_cffi/requests/__init__.py b/curl_cffi/requests/__init__.py index 28c369d..da71b19 100644 --- a/curl_cffi/requests/__init__.py +++ b/curl_cffi/requests/__init__.py @@ -19,17 +19,20 @@ "WebSocket", "WebSocketError", "WsCloseCode", + "ExtraFingerprints", ] from functools import partial from io import BytesIO from typing import Callable, Dict, List, Optional, Tuple, Union + from ..const import CurlHttpVersion, CurlWsFlag from ..curl import CurlMime from .cookies import Cookies, CookieTypes from .errors import RequestsError from .headers import Headers, HeaderTypes +from .impersonate import ExtraFingerprints from .models import Request, Response from .session import AsyncSession, BrowserType, ProxySpec, Session, ThreadType from .websockets import WebSocket, WebSocketError, WsCloseCode @@ -58,6 +61,7 @@ def request( impersonate: Optional[Union[str, BrowserType]] = None, ja3: Optional[str] = None, akamai: Optional[str] = None, + extra_fp: Optional[ExtraFingerprints] = None, thread: Optional[ThreadType] = None, default_headers: Optional[bool] = None, default_encoding: Union[str, Callable[[bytes], str]] = "utf-8", @@ -99,6 +103,7 @@ def request( impersonate: which browser version to impersonate. ja3: ja3 string to impersonate. akamai: akamai string to impersonate. + extra_fp: extra fingerprints options, in complement to ja3 and akamai strings. thread: work with other thread implementations. choices: eventlet, gevent. default_headers: whether to set default browser headers. default_encoding: encoding for decoding response content if charset is not found in headers. @@ -136,6 +141,7 @@ def request( impersonate=impersonate, ja3=ja3, akamai=akamai, + extra_fp=extra_fp, default_headers=default_headers, default_encoding=default_encoding, http_version=http_version, diff --git a/curl_cffi/requests/impersonate.py b/curl_cffi/requests/impersonate.py index 02521a7..8641e41 100644 --- a/curl_cffi/requests/impersonate.py +++ b/curl_cffi/requests/impersonate.py @@ -1,3 +1,5 @@ +from dataclasses import dataclass +from typing import List, Literal, Optional import warnings from enum import Enum @@ -44,10 +46,15 @@ def normalize(cls, item): return item -class BrowserSpec: - """A more structured way of selecting browsers""" - - # TODO +@dataclass +class ExtraFingerprints: + tls_min_version: int = CurlSslVersion.TLSv1_2 + tls_grease: bool = False + tls_permute_extensions: bool = False + tls_cert_compression: Literal["zlib", "brotli"] = "brotli" + tls_signature_algorithms: Optional[List[str]] = None + http2_stream_weight: int = 256 + http2_stream_exclusive: int = 1 # TLS version are in the format of 0xAABB, where AA is major version and BB is minor diff --git a/curl_cffi/requests/session.py b/curl_cffi/requests/session.py index b201587..de0786e 100644 --- a/curl_cffi/requests/session.py +++ b/curl_cffi/requests/session.py @@ -30,7 +30,7 @@ from .cookies import Cookies, CookieTypes, CurlMorsel from .errors import RequestsError, SessionClosed from .headers import Headers, HeaderTypes -from .impersonate import toggle_extension, BrowserType, TLS_VERSION_MAP, TLS_CIPHER_NAME_MAP, TLS_EC_CURVES_MAP +from .impersonate import ExtraFingerprints, toggle_extension, BrowserType, TLS_VERSION_MAP, TLS_CIPHER_NAME_MAP, TLS_EC_CURVES_MAP from .models import Request, Response from .websockets import WebSocket @@ -172,6 +172,7 @@ def __init__( impersonate: Optional[Union[str, BrowserType]] = None, ja3: Optional[str] = None, akamai: Optional[str] = None, + extra_fp: Optional[ExtraFingerprints] = None, default_headers: bool = True, default_encoding: Union[str, Callable[[bytes], str]] = "utf-8", curl_options: Optional[dict] = None, @@ -194,6 +195,7 @@ def __init__( self.impersonate = impersonate self.ja3 = ja3 self.akamai = akamai + self.extra_fp = extra_fp self.default_headers = default_headers self.default_encoding = default_encoding self.curl_options = curl_options or {} @@ -230,14 +232,14 @@ def _toggle_extensions_by_ids(self, curl, extension_ids): # print("to_disable: ", to_disable_ids) - def _set_ja3_options(self, curl, ja3: str): + def _set_ja3_options(self, curl, ja3: str, permute: bool = False): """ Detailed explanation: https://engineering.salesforce.com/tls-fingerprinting-with-ja3-and-ja3s-247362855967/ """ tls_version, ciphers, extensions, curves, curve_formats = ja3.split(",") curl_tls_version = TLS_VERSION_MAP[int(tls_version)] - # curl.setopt(CurlOpt.SSLVERSION, curl_tls_version) + curl.setopt(CurlOpt.SSLVERSION, curl_tls_version | CurlSslVersion.MAX_DEFAULT) assert curl_tls_version == CurlSslVersion.TLSv1_2, "Only TLS v1.2 works for now." cipher_names = [] @@ -252,13 +254,14 @@ def _set_ja3_options(self, curl, ja3: str): extensions = extensions[:-3] warnings.warn( "Padding(21) extension found in ja3 string, whether to add it should " - "be decided by the SSL engine. The TLS hello packet may contain " + "be managed by the SSL engine. The TLS client hello packet may contain " "or not contain this extension, any of which should be correct." ) extension_ids = set(int(e) for e in extensions.split("-")) self._toggle_extensions_by_ids(curl, extension_ids) - curl.setopt(CurlOpt.TLS_EXTENSION_ORDER, extensions) + if not permute: + curl.setopt(CurlOpt.TLS_EXTENSION_ORDER, extensions) curve_names = [] for curve in curves.split("-"): @@ -288,6 +291,18 @@ def _set_akamai_options(self, curl, akamai: str): # curl-impersonate only accepts masp format, without commas. curl.setopt(CurlOpt.HTTP2_PSEUDO_HEADERS_ORDER, header_order.replace(",", "")) + def _set_extra_fp(self, curl, fp: ExtraFingerprints): + + if fp.tls_signature_algorithms: + curl.setopt(CurlOpt.SSL_SIG_HASH_ALGS, ",".join(fp.tls_signature_algorithms)) + + curl.setopt(CurlOpt.SSLVERSION, fp.tls_min_version | CurlSslVersion.MAX_DEFAULT) + curl.setopt(CurlOpt.TLS_GREASE, int(fp.tls_grease)) + curl.setopt(CurlOpt.SSL_PERMUTE_EXTENSIONS, int(fp.tls_permute_extensions)) + curl.setopt(CurlOpt.SSL_CERT_COMPRESSION, fp.tls_cert_compression) + curl.setopt(CurlOpt.STREAM_WEIGHT, fp.http2_stream_weight) + curl.setopt(CurlOpt.STREAM_EXCLUSIVE, fp.http2_stream_exclusive) + def _set_curl_options( self, curl, @@ -313,6 +328,7 @@ def _set_curl_options( impersonate: Optional[Union[str, BrowserType]] = None, ja3: Optional[str] = None, akamai: Optional[str] = None, + extra_fp: Optional[ExtraFingerprints] = None, default_headers: Optional[bool] = None, http_version: Optional[CurlHttpVersion] = None, interface: Optional[str] = None, @@ -559,16 +575,24 @@ def _set_curl_options( ja3 = ja3 or self.ja3 if ja3: if impersonate: + warnings.warn("JA3 was altered after browser version was set.") raise RequestsError("Cannot use both impersonate and ja3 string") - self._set_ja3_options(c, ja3) + self._set_ja3_options(c, ja3, permute=bool(extra_fp and extra_fp.tls_permute_extensions)) # akamai string akamai = akamai or self.akamai if akamai: if impersonate: - raise RequestsError("Cannot use both impersonate and akamai string") + warnings.warn("Akamai was altered after browser version was set.") self._set_akamai_options(c, akamai) + # extra_fp options + extra_fp = extra_fp or self.extra_fp + if extra_fp: + if impersonate: + warnings.warn("Extra fingerprints was altered after browser version was set.") + self._set_extra_fp(c, extra_fp) + # http_version, after impersonate, which will change this to http2 http_version = http_version or self.http_version if http_version: @@ -703,6 +727,7 @@ def __init__( impersonate: which browser version to impersonate in the session. ja3: ja3 string to impersonate in the session. akamai: akamai string to impersonate in the session. + extra_fp: extra fingerprints options, in complement to ja3 and akamai strings. interface: which interface use in request to server. default_encoding: encoding for decoding response content if charset is not found in headers. Defaults to "utf-8". Can be set to a callable for automatic detection. @@ -835,6 +860,7 @@ def request( impersonate: Optional[Union[str, BrowserType]] = None, ja3: Optional[str] = None, akamai: Optional[str] = None, + extra_fp: Optional[ExtraFingerprints] = None, default_headers: Optional[bool] = None, default_encoding: Union[str, Callable[[bytes], str]] = "utf-8", http_version: Optional[CurlHttpVersion] = None, @@ -879,6 +905,7 @@ def request( impersonate=impersonate, ja3=ja3, akamai=akamai, + extra_fp=extra_fp, default_headers=default_headers, http_version=http_version, interface=interface, @@ -995,6 +1022,7 @@ def __init__( impersonate: which browser version to impersonate in the session. ja3: ja3 string to impersonate in the session. akamai: akamai string to impersonate in the session. + extra_fp: extra fingerprints options, in complement to ja3 and akamai strings. default_encoding: encoding for decoding response content if charset is not found in headers. Defaults to "utf-8". Can be set to a callable for automatic detection. @@ -1123,6 +1151,7 @@ async def request( impersonate: Optional[Union[str, BrowserType]] = None, ja3: Optional[str] = None, akamai: Optional[str] = None, + extra_fp: Optional[ExtraFingerprints] = None, default_headers: Optional[bool] = None, default_encoding: Union[str, Callable[[bytes], str]] = "utf-8", http_version: Optional[CurlHttpVersion] = None, @@ -1160,6 +1189,7 @@ async def request( impersonate=impersonate, ja3=ja3, akamai=akamai, + extra_fp=extra_fp, default_headers=default_headers, http_version=http_version, interface=interface, diff --git a/examples/impersonate.py b/examples/impersonate.py index 1099baf..704e903 100644 --- a/examples/impersonate.py +++ b/examples/impersonate.py @@ -1,21 +1,39 @@ from curl_cffi import requests - +from curl_cffi.requests.impersonate import ExtraFingerprints # OKHTTP impersonatation examples # credits: https://github.com/bogdanfinn/tls-client/blob/master/profiles/contributed_custom_profiles.go url = "https://tls.browserleaks.com/json" -okhttp4_android10_ja3 = ",".join([ - "771", - "4865-4866-4867-49195-49196-52393-49199-49200-52392-49171-49172-156-157-47-53", - "0-23-65281-10-11-35-16-5-13-51-45-43-18-21", # FIXME: 18 should not be here. - "29-23-24", - "0" -]) +okhttp4_android10_ja3 = ",".join( + [ + "771", + "4865-4866-4867-49195-49196-52393-49199-49200-52392-49171-49172-156-157-47-53", + "0-23-65281-10-11-35-16-5-13-51-45-43-18-21", # FIXME: 18 should not be here. + "29-23-24", + "0", + ] +) okhttp4_android10_akamai = "4:16777216|16711681|0|m,p,a,s" +extra_fp = ExtraFingerprints( + tls_signature_algorithms=[ + "ecdsa_secp256r1_sha256", + "rsa_pss_rsae_sha256", + "rsa_pkcs1_sha256", + "ecdsa_secp384r1_sha384", + "rsa_pss_rsae_sha384", + "rsa_pkcs1_sha384", + "rsa_pss_rsae_sha512", + "rsa_pkcs1_sha512", + "rsa_pkcs1_sha1", + ] +) + -r = requests.get(url, ja3=okhttp4_android10_ja3, akamai=okhttp4_android10_akamai) +r = requests.get( + url, ja3=okhttp4_android10_ja3, akamai=okhttp4_android10_akamai, extra_fp=extra_fp +) print(r.json()) diff --git a/tests/unittest/test_impersonate.py b/tests/unittest/test_impersonate.py index 8a65c9a..b60a9f9 100644 --- a/tests/unittest/test_impersonate.py +++ b/tests/unittest/test_impersonate.py @@ -1,21 +1,29 @@ import pytest from curl_cffi import requests -from curl_cffi.const import CurlHttpVersion +from curl_cffi.const import CurlHttpVersion, CurlSslVersion def test_impersonate_with_version(server): # the test server does not understand http/2 - r = requests.get(str(server.url), impersonate="chrome120", http_version=CurlHttpVersion.V1_1) + r = requests.get( + str(server.url), impersonate="chrome120", http_version=CurlHttpVersion.V1_1 + ) assert r.status_code == 200 - r = requests.get(str(server.url), impersonate="safari17_0", http_version=CurlHttpVersion.V1_1) + r = requests.get( + str(server.url), impersonate="safari17_0", http_version=CurlHttpVersion.V1_1 + ) assert r.status_code == 200 def test_impersonate_without_version(server): - r = requests.get(str(server.url), impersonate="chrome", http_version=CurlHttpVersion.V1_1) + r = requests.get( + str(server.url), impersonate="chrome", http_version=CurlHttpVersion.V1_1 + ) assert r.status_code == 200 - r = requests.get(str(server.url), impersonate="safari_ios", http_version=CurlHttpVersion.V1_1) + r = requests.get( + str(server.url), impersonate="safari_ios", http_version=CurlHttpVersion.V1_1 + ) assert r.status_code == 200 @@ -100,3 +108,83 @@ def test_customized_akamai_safari(): akamai = "2:0;4:4194304;3:100|10485760|0|m,s,p,a" r = requests.get(url, akamai=akamai).json() assert r["akamai_text"] == akamai + + +def test_customized_extra_fp_sig_hash_algs(): + url = "https://tls.peet.ws/api/all" + safari_algs = [ + "ecdsa_secp256r1_sha256", + "rsa_pss_rsae_sha256", + "rsa_pkcs1_sha256", + "ecdsa_secp384r1_sha384", + "ecdsa_sha1", + "rsa_pss_rsae_sha384", + "rsa_pss_rsae_sha384", + "rsa_pkcs1_sha384", + "rsa_pss_rsae_sha512", + "rsa_pkcs1_sha512", + "rsa_pkcs1_sha1", + ] + fp = requests.ExtraFingerprints(tls_signature_algorithms=safari_algs) + r = requests.get(url, extra_fp=fp).json() + result_algs = [] + for ex in r["tls"]["extensions"]: + if ex["name"] == "signature_algorithms (13)": + result_algs = ex["signature_algorithms"] + assert safari_algs == result_algs + + +def test_customized_extra_fp_tls_min_version(): + url = "https://tls.peet.ws/api/all" + safari_min_version = CurlSslVersion.TLSv1_0 + fp = requests.ExtraFingerprints(tls_min_version=safari_min_version) + r = requests.get(url, extra_fp=fp).json() + for ex in r["tls"]["extensions"]: + if ex["name"] == "supported_versions (43)": + # TLS 1.0 1.1, 1.2, 1.3 + assert len(ex["versions"]) >= 4 + + +def test_customized_extra_fp_grease(): + url = "https://tls.peet.ws/api/all" + fp = requests.ExtraFingerprints(tls_grease=True) + r = requests.get(url, extra_fp=fp).json() + assert "TLS_GREASE" in r["tls"]["ciphers"][0] + + +def test_customized_extra_fp_permute(): + url = "https://tls.browserleaks.com/json" + ja3 = "771,4865-4866-4867-49195-49199-49196-49200-52393-52392-49171-49172-156-157-47-53,65037-65281-0-11-23-5-18-27-16-17513-10-35-43-45-13-51,25497-29-23-24,0" + + r = requests.get(url, ja3=ja3).json() + _, _, extensions, _, _ = r["ja3_text"].split(",") + assert extensions == "65037-65281-0-11-23-5-18-27-16-17513-10-35-43-45-13-51" + + r = requests.get(url, ja3=ja3, extra_fp=requests.ExtraFingerprints(tls_permute_extensions=True)).json() + _, _, extensions, _, _ = r["ja3_text"].split(",") + assert extensions != "65037-65281-0-11-23-5-18-27-16-17513-10-35-43-45-13-51" + + +def test_customized_extra_fp_cert_compression(): + url = "https://tls.peet.ws/api/all" + fp = requests.ExtraFingerprints(tls_cert_compression="zlib") + r = requests.get(url, extra_fp=fp).json() + result_algs = [] + for ex in r["tls"]["extensions"]: + if ex["name"] == "compress_certificate (27)": + result_algs = ex["algorithms"] + assert result_algs[0] == "zlib (1)" + + +def test_customized_extra_fp_stream_weight(): + url = "https://tls.peet.ws/api/all" + fp = requests.ExtraFingerprints(http2_stream_weight=64) + r = requests.get(url, extra_fp=fp).json() + assert r["http2"]["sent_frames"][2]["priority"]["weight"] == 64 + + +def test_customized_extra_fp_stream_exclusive(): + url = "https://tls.peet.ws/api/all" + fp = requests.ExtraFingerprints(http2_stream_exclusive=0) + r = requests.get(url, extra_fp=fp).json() + assert r["http2"]["sent_frames"][2]["priority"]["exclusive"] == 0