diff --git a/poetry.lock b/poetry.lock index b5dd05a..95d48cf 100644 --- a/poetry.lock +++ b/poetry.lock @@ -153,56 +153,72 @@ test = ["coverage", "mypy", "pexpect", "ruff", "wheel"] [[package]] name = "asyncpg" -version = "0.28.0" +version = "0.29.0" description = "An asyncio PostgreSQL driver" optional = false -python-versions = ">=3.7.0" +python-versions = ">=3.8.0" files = [ - {file = "asyncpg-0.28.0-cp310-cp310-macosx_10_9_x86_64.whl", hash = "sha256:0a6d1b954d2b296292ddff4e0060f494bb4270d87fb3655dd23c5c6096d16d83"}, - {file = "asyncpg-0.28.0-cp310-cp310-macosx_11_0_arm64.whl", hash = "sha256:0740f836985fd2bd73dca42c50c6074d1d61376e134d7ad3ad7566c4f79f8184"}, - {file = "asyncpg-0.28.0-cp310-cp310-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:e907cf620a819fab1737f2dd90c0f185e2a796f139ac7de6aa3212a8af96c050"}, - {file = "asyncpg-0.28.0-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:86b339984d55e8202e0c4b252e9573e26e5afa05617ed02252544f7b3e6de3e9"}, - {file = "asyncpg-0.28.0-cp310-cp310-musllinux_1_1_aarch64.whl", hash = "sha256:0c402745185414e4c204a02daca3d22d732b37359db4d2e705172324e2d94e85"}, - {file = "asyncpg-0.28.0-cp310-cp310-musllinux_1_1_x86_64.whl", hash = "sha256:c88eef5e096296626e9688f00ab627231f709d0e7e3fb84bb4413dff81d996d7"}, - {file = "asyncpg-0.28.0-cp310-cp310-win32.whl", hash = "sha256:90a7bae882a9e65a9e448fdad3e090c2609bb4637d2a9c90bfdcebbfc334bf89"}, - {file = "asyncpg-0.28.0-cp310-cp310-win_amd64.whl", hash = "sha256:76aacdcd5e2e9999e83c8fbcb748208b60925cc714a578925adcb446d709016c"}, - {file = "asyncpg-0.28.0-cp311-cp311-macosx_10_9_x86_64.whl", hash = "sha256:a0e08fe2c9b3618459caaef35979d45f4e4f8d4f79490c9fa3367251366af207"}, - {file = "asyncpg-0.28.0-cp311-cp311-macosx_11_0_arm64.whl", hash = "sha256:b24e521f6060ff5d35f761a623b0042c84b9c9b9fb82786aadca95a9cb4a893b"}, - {file = "asyncpg-0.28.0-cp311-cp311-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:99417210461a41891c4ff301490a8713d1ca99b694fef05dabd7139f9d64bd6c"}, - {file = "asyncpg-0.28.0-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:f029c5adf08c47b10bcdc857001bbef551ae51c57b3110964844a9d79ca0f267"}, - {file = "asyncpg-0.28.0-cp311-cp311-musllinux_1_1_aarch64.whl", hash = "sha256:ad1d6abf6c2f5152f46fff06b0e74f25800ce8ec6c80967f0bc789974de3c652"}, - {file = "asyncpg-0.28.0-cp311-cp311-musllinux_1_1_x86_64.whl", hash = "sha256:d7fa81ada2807bc50fea1dc741b26a4e99258825ba55913b0ddbf199a10d69d8"}, - {file = "asyncpg-0.28.0-cp311-cp311-win32.whl", hash = "sha256:f33c5685e97821533df3ada9384e7784bd1e7865d2b22f153f2e4bd4a083e102"}, - {file = "asyncpg-0.28.0-cp311-cp311-win_amd64.whl", hash = "sha256:5e7337c98fb493079d686a4a6965e8bcb059b8e1b8ec42106322fc6c1c889bb0"}, - {file = "asyncpg-0.28.0-cp37-cp37m-macosx_10_9_x86_64.whl", hash = "sha256:1c56092465e718a9fdcc726cc3d9dcf3a692e4834031c9a9f871d92a75d20d48"}, - {file = "asyncpg-0.28.0-cp37-cp37m-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:4acd6830a7da0eb4426249d71353e8895b350daae2380cb26d11e0d4a01c5472"}, - {file = "asyncpg-0.28.0-cp37-cp37m-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:63861bb4a540fa033a56db3bb58b0c128c56fad5d24e6d0a8c37cb29b17c1c7d"}, - {file = "asyncpg-0.28.0-cp37-cp37m-musllinux_1_1_aarch64.whl", hash = "sha256:a93a94ae777c70772073d0512f21c74ac82a8a49be3a1d982e3f259ab5f27307"}, - {file = "asyncpg-0.28.0-cp37-cp37m-musllinux_1_1_x86_64.whl", hash = "sha256:d14681110e51a9bc9c065c4e7944e8139076a778e56d6f6a306a26e740ed86d2"}, - {file = "asyncpg-0.28.0-cp37-cp37m-win32.whl", hash = "sha256:8aec08e7310f9ab322925ae5c768532e1d78cfb6440f63c078b8392a38aa636a"}, - {file = "asyncpg-0.28.0-cp37-cp37m-win_amd64.whl", hash = "sha256:319f5fa1ab0432bc91fb39b3960b0d591e6b5c7844dafc92c79e3f1bff96abef"}, - {file = "asyncpg-0.28.0-cp38-cp38-macosx_10_9_x86_64.whl", hash = "sha256:b337ededaabc91c26bf577bfcd19b5508d879c0ad009722be5bb0a9dd30b85a0"}, - {file = "asyncpg-0.28.0-cp38-cp38-macosx_11_0_arm64.whl", hash = "sha256:4d32b680a9b16d2957a0a3cc6b7fa39068baba8e6b728f2e0a148a67644578f4"}, - {file = "asyncpg-0.28.0-cp38-cp38-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:f4f62f04cdf38441a70f279505ef3b4eadf64479b17e707c950515846a2df197"}, - {file = "asyncpg-0.28.0-cp38-cp38-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:4f20cac332c2576c79c2e8e6464791c1f1628416d1115935a34ddd7121bfc6a4"}, - {file = "asyncpg-0.28.0-cp38-cp38-musllinux_1_1_aarch64.whl", hash = "sha256:59f9712ce01e146ff71d95d561fb68bd2d588a35a187116ef05028675462d5ed"}, - {file = "asyncpg-0.28.0-cp38-cp38-musllinux_1_1_x86_64.whl", hash = "sha256:fc9e9f9ff1aa0eddcc3247a180ac9e9b51a62311e988809ac6152e8fb8097756"}, - {file = "asyncpg-0.28.0-cp38-cp38-win32.whl", hash = "sha256:9e721dccd3838fcff66da98709ed884df1e30a95f6ba19f595a3706b4bc757e3"}, - {file = "asyncpg-0.28.0-cp38-cp38-win_amd64.whl", hash = "sha256:8ba7d06a0bea539e0487234511d4adf81dc8762249858ed2a580534e1720db00"}, - {file = "asyncpg-0.28.0-cp39-cp39-macosx_10_9_x86_64.whl", hash = "sha256:d009b08602b8b18edef3a731f2ce6d3f57d8dac2a0a4140367e194eabd3de457"}, - {file = "asyncpg-0.28.0-cp39-cp39-macosx_11_0_arm64.whl", hash = "sha256:ec46a58d81446d580fb21b376ec6baecab7288ce5a578943e2fc7ab73bf7eb39"}, - {file = "asyncpg-0.28.0-cp39-cp39-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:7b48ceed606cce9e64fd5480a9b0b9a95cea2b798bb95129687abd8599c8b019"}, - {file = "asyncpg-0.28.0-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:8858f713810f4fe67876728680f42e93b7e7d5c7b61cf2118ef9153ec16b9423"}, - {file = "asyncpg-0.28.0-cp39-cp39-musllinux_1_1_aarch64.whl", hash = "sha256:5e18438a0730d1c0c1715016eacda6e9a505fc5aa931b37c97d928d44941b4bf"}, - {file = "asyncpg-0.28.0-cp39-cp39-musllinux_1_1_x86_64.whl", hash = "sha256:e9c433f6fcdd61c21a715ee9128a3ca48be8ac16fa07be69262f016bb0f4dbd2"}, - {file = "asyncpg-0.28.0-cp39-cp39-win32.whl", hash = "sha256:41e97248d9076bc8e4849da9e33e051be7ba37cd507cbd51dfe4b2d99c70e3dc"}, - {file = "asyncpg-0.28.0-cp39-cp39-win_amd64.whl", hash = "sha256:3ed77f00c6aacfe9d79e9eff9e21729ce92a4b38e80ea99a58ed382f42ebd55b"}, - {file = "asyncpg-0.28.0.tar.gz", hash = "sha256:7252cdc3acb2f52feaa3664280d3bcd78a46bd6c10bfd681acfffefa1120e278"}, + {file = "asyncpg-0.29.0-cp310-cp310-macosx_10_9_x86_64.whl", hash = "sha256:72fd0ef9f00aeed37179c62282a3d14262dbbafb74ec0ba16e1b1864d8a12169"}, + {file = "asyncpg-0.29.0-cp310-cp310-macosx_11_0_arm64.whl", hash = "sha256:52e8f8f9ff6e21f9b39ca9f8e3e33a5fcdceaf5667a8c5c32bee158e313be385"}, + {file = "asyncpg-0.29.0-cp310-cp310-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:a9e6823a7012be8b68301342ba33b4740e5a166f6bbda0aee32bc01638491a22"}, + {file = "asyncpg-0.29.0-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:746e80d83ad5d5464cfbf94315eb6744222ab00aa4e522b704322fb182b83610"}, + {file = "asyncpg-0.29.0-cp310-cp310-musllinux_1_1_aarch64.whl", hash = "sha256:ff8e8109cd6a46ff852a5e6bab8b0a047d7ea42fcb7ca5ae6eaae97d8eacf397"}, + {file = "asyncpg-0.29.0-cp310-cp310-musllinux_1_1_x86_64.whl", hash = "sha256:97eb024685b1d7e72b1972863de527c11ff87960837919dac6e34754768098eb"}, + {file = "asyncpg-0.29.0-cp310-cp310-win32.whl", hash = "sha256:5bbb7f2cafd8d1fa3e65431833de2642f4b2124be61a449fa064e1a08d27e449"}, + {file = "asyncpg-0.29.0-cp310-cp310-win_amd64.whl", hash = "sha256:76c3ac6530904838a4b650b2880f8e7af938ee049e769ec2fba7cd66469d7772"}, + {file = "asyncpg-0.29.0-cp311-cp311-macosx_10_9_x86_64.whl", hash = "sha256:d4900ee08e85af01adb207519bb4e14b1cae8fd21e0ccf80fac6aa60b6da37b4"}, + {file = "asyncpg-0.29.0-cp311-cp311-macosx_11_0_arm64.whl", hash = "sha256:a65c1dcd820d5aea7c7d82a3fdcb70e096f8f70d1a8bf93eb458e49bfad036ac"}, + {file = "asyncpg-0.29.0-cp311-cp311-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:5b52e46f165585fd6af4863f268566668407c76b2c72d366bb8b522fa66f1870"}, + {file = "asyncpg-0.29.0-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:dc600ee8ef3dd38b8d67421359779f8ccec30b463e7aec7ed481c8346decf99f"}, + {file = "asyncpg-0.29.0-cp311-cp311-musllinux_1_1_aarch64.whl", hash = "sha256:039a261af4f38f949095e1e780bae84a25ffe3e370175193174eb08d3cecab23"}, + {file = "asyncpg-0.29.0-cp311-cp311-musllinux_1_1_x86_64.whl", hash = "sha256:6feaf2d8f9138d190e5ec4390c1715c3e87b37715cd69b2c3dfca616134efd2b"}, + {file = "asyncpg-0.29.0-cp311-cp311-win32.whl", hash = "sha256:1e186427c88225ef730555f5fdda6c1812daa884064bfe6bc462fd3a71c4b675"}, + {file = "asyncpg-0.29.0-cp311-cp311-win_amd64.whl", hash = "sha256:cfe73ffae35f518cfd6e4e5f5abb2618ceb5ef02a2365ce64f132601000587d3"}, + {file = "asyncpg-0.29.0-cp312-cp312-macosx_10_9_x86_64.whl", hash = "sha256:6011b0dc29886ab424dc042bf9eeb507670a3b40aece3439944006aafe023178"}, + {file = "asyncpg-0.29.0-cp312-cp312-macosx_11_0_arm64.whl", hash = "sha256:b544ffc66b039d5ec5a7454667f855f7fec08e0dfaf5a5490dfafbb7abbd2cfb"}, + {file = "asyncpg-0.29.0-cp312-cp312-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:d84156d5fb530b06c493f9e7635aa18f518fa1d1395ef240d211cb563c4e2364"}, + {file = "asyncpg-0.29.0-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:54858bc25b49d1114178d65a88e48ad50cb2b6f3e475caa0f0c092d5f527c106"}, + {file = "asyncpg-0.29.0-cp312-cp312-musllinux_1_1_aarch64.whl", hash = "sha256:bde17a1861cf10d5afce80a36fca736a86769ab3579532c03e45f83ba8a09c59"}, + {file = "asyncpg-0.29.0-cp312-cp312-musllinux_1_1_x86_64.whl", hash = "sha256:37a2ec1b9ff88d8773d3eb6d3784dc7e3fee7756a5317b67f923172a4748a175"}, + {file = "asyncpg-0.29.0-cp312-cp312-win32.whl", hash = "sha256:bb1292d9fad43112a85e98ecdc2e051602bce97c199920586be83254d9dafc02"}, + {file = "asyncpg-0.29.0-cp312-cp312-win_amd64.whl", hash = "sha256:2245be8ec5047a605e0b454c894e54bf2ec787ac04b1cb7e0d3c67aa1e32f0fe"}, + {file = "asyncpg-0.29.0-cp38-cp38-macosx_10_9_x86_64.whl", hash = "sha256:0009a300cae37b8c525e5b449233d59cd9868fd35431abc470a3e364d2b85cb9"}, + {file = "asyncpg-0.29.0-cp38-cp38-macosx_11_0_arm64.whl", hash = "sha256:5cad1324dbb33f3ca0cd2074d5114354ed3be2b94d48ddfd88af75ebda7c43cc"}, + {file = "asyncpg-0.29.0-cp38-cp38-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:012d01df61e009015944ac7543d6ee30c2dc1eb2f6b10b62a3f598beb6531548"}, + {file = "asyncpg-0.29.0-cp38-cp38-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:000c996c53c04770798053e1730d34e30cb645ad95a63265aec82da9093d88e7"}, + {file = "asyncpg-0.29.0-cp38-cp38-musllinux_1_1_aarch64.whl", hash = "sha256:e0bfe9c4d3429706cf70d3249089de14d6a01192d617e9093a8e941fea8ee775"}, + {file = "asyncpg-0.29.0-cp38-cp38-musllinux_1_1_x86_64.whl", hash = "sha256:642a36eb41b6313ffa328e8a5c5c2b5bea6ee138546c9c3cf1bffaad8ee36dd9"}, + {file = "asyncpg-0.29.0-cp38-cp38-win32.whl", hash = "sha256:a921372bbd0aa3a5822dd0409da61b4cd50df89ae85150149f8c119f23e8c408"}, + {file = "asyncpg-0.29.0-cp38-cp38-win_amd64.whl", hash = "sha256:103aad2b92d1506700cbf51cd8bb5441e7e72e87a7b3a2ca4e32c840f051a6a3"}, + {file = "asyncpg-0.29.0-cp39-cp39-macosx_10_9_x86_64.whl", hash = "sha256:5340dd515d7e52f4c11ada32171d87c05570479dc01dc66d03ee3e150fb695da"}, + {file = "asyncpg-0.29.0-cp39-cp39-macosx_11_0_arm64.whl", hash = "sha256:e17b52c6cf83e170d3d865571ba574577ab8e533e7361a2b8ce6157d02c665d3"}, + {file = "asyncpg-0.29.0-cp39-cp39-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:f100d23f273555f4b19b74a96840aa27b85e99ba4b1f18d4ebff0734e78dc090"}, + {file = "asyncpg-0.29.0-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:48e7c58b516057126b363cec8ca02b804644fd012ef8e6c7e23386b7d5e6ce83"}, + {file = "asyncpg-0.29.0-cp39-cp39-musllinux_1_1_aarch64.whl", hash = "sha256:f9ea3f24eb4c49a615573724d88a48bd1b7821c890c2effe04f05382ed9e8810"}, + {file = "asyncpg-0.29.0-cp39-cp39-musllinux_1_1_x86_64.whl", hash = "sha256:8d36c7f14a22ec9e928f15f92a48207546ffe68bc412f3be718eedccdf10dc5c"}, + {file = "asyncpg-0.29.0-cp39-cp39-win32.whl", hash = "sha256:797ab8123ebaed304a1fad4d7576d5376c3a006a4100380fb9d517f0b59c1ab2"}, + {file = "asyncpg-0.29.0-cp39-cp39-win_amd64.whl", hash = "sha256:cce08a178858b426ae1aa8409b5cc171def45d4293626e7aa6510696d46decd8"}, + {file = "asyncpg-0.29.0.tar.gz", hash = "sha256:d1c49e1f44fffafd9a55e1a9b101590859d881d639ea2922516f5d9c512d354e"}, ] [package.extras] docs = ["Sphinx (>=5.3.0,<5.4.0)", "sphinx-rtd-theme (>=1.2.2)", "sphinxcontrib-asyncio (>=0.3.0,<0.4.0)"] -test = ["flake8 (>=5.0,<6.0)", "uvloop (>=0.15.3)"] +test = ["flake8 (>=6.1,<7.0)", "uvloop (>=0.15.3)"] + +[[package]] +name = "asyncpg-stubs" +version = "0.29.1" +description = "asyncpg stubs" +optional = false +python-versions = ">=3.8,<4.0" +files = [ + {file = "asyncpg_stubs-0.29.1-py3-none-any.whl", hash = "sha256:cce994d5a19394249e74ae8d252bde3c77cee0ddfc776cc708b724fdb4adebb6"}, + {file = "asyncpg_stubs-0.29.1.tar.gz", hash = "sha256:686afcc0af3a2f3c8e393cd850e0de430e5a139ce82b2f28ef8f693ecdf918bf"}, +] + +[package.dependencies] +asyncpg = ">=0.29,<0.30" +typing-extensions = ">=4.7.0,<5.0.0" [[package]] name = "attrs" @@ -1430,6 +1446,17 @@ files = [ {file = "six-1.16.0.tar.gz", hash = "sha256:1e61c37477a1626458e36f7b1d82aa5c9b094fa4802892072e49de9c60c4c926"}, ] +[[package]] +name = "types-pillow" +version = "10.2.0.20240520" +description = "Typing stubs for Pillow" +optional = false +python-versions = ">=3.8" +files = [ + {file = "types-Pillow-10.2.0.20240520.tar.gz", hash = "sha256:130b979195465fa1e1676d8e81c9c7c30319e8e95b12fae945e8f0d525213107"}, + {file = "types_Pillow-10.2.0.20240520-py3-none-any.whl", hash = "sha256:33c36494b380e2a269bb742181bea5d9b00820367822dbd3760f07210a1da23d"}, +] + [[package]] name = "typing-extensions" version = "4.12.1" @@ -1644,4 +1671,4 @@ multidict = ">=4.0" [metadata] lock-version = "2.0" python-versions = ">=3.12,<3.13" -content-hash = "757a44c2ace8883529e4693785f1718cd6de77f803a60e35f968b371ae3be2f8" +content-hash = "c8ad387be06712d24cda4c4436e962fb0b0ed655c045314d9046584a01c469ff" diff --git a/pyproject.toml b/pyproject.toml index 2577f4b..5b59bc1 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -30,7 +30,7 @@ ignore_errors = true # I use pyright only because mypy dumb [tool.pyright] pythonVersion = "3.12" -typeCheckingMode = "basic" +typeCheckingMode = "strict" reportPrivateUsage = "none" [tool.poetry] @@ -45,7 +45,9 @@ python = ">=3.12,<3.13" dateparser = "^1.1.8" psutil = "^5.9.6" Pillow = "^10.2.0" -asyncpg = "^0.28.0" +types-Pillow = "^10.2.0" +asyncpg = "^0.29.0" +asyncpg-stubs = "^0.29.0" Levenshtein = "^0.23.0" uvloop = { version = "==0.18.0", platform = "linux" } aiodns = "~=3.1.1" diff --git a/src/__main__.py b/src/__main__.py index 57adc1d..532efd1 100755 --- a/src/__main__.py +++ b/src/__main__.py @@ -1,12 +1,13 @@ #!/usr/bin/python3 +import asyncio import logging import os import pathlib import platform import re -from src.models import SnedBot +from src.models.client import SnedClient DOTENV_REGEX = re.compile(r"^(?P[A-Za-z_]+[A-Za-z0-9_]*)=(?P[^#]+)(#.*)?$") BASE_DIR = str(pathlib.Path(os.path.abspath(__file__)).parents[1]) @@ -38,16 +39,16 @@ try: import uvloop - uvloop.install() + asyncio.set_event_loop_policy(uvloop.EventLoopPolicy()) except ImportError: logging.warn( "Failed to import uvloop! Make sure to install it via 'pip install uvloop' for enhanced performance!" ) -bot = SnedBot(Config()) +client = SnedClient(Config()) if __name__ == "__main__": - bot.run() + client.app.run() # Copyright (C) 2022-present hypergonial diff --git a/src/etc/settings_static.py b/src/etc/settings_static.py index c49c760..43c455f 100644 --- a/src/etc/settings_static.py +++ b/src/etc/settings_static.py @@ -1,3 +1,5 @@ +import typing as t + import hikari import miru @@ -10,7 +12,7 @@ ModerationFlags.IS_EPHEMERAL: "Send mod commands ephemerally", } -default_automod_policies = { +default_automod_policies: dict[str, t.Any] = { "invites": { "state": "disabled", "temp_dur": 15, @@ -99,7 +101,7 @@ } # Policy state configuration -policy_states = { +policy_states: dict[str, t.Any] = { "disabled": {"name": "Disabled", "excludes": [], "description": "Disable this policy.", "emoji": "đŸšĢ"}, "flag": {"name": "Flag", "excludes": ["spam"], "description": "Log message to 'Auto-Mod Flagging'.", "emoji": "🚩"}, "notice": { @@ -223,8 +225,8 @@ }, "escalate": { "name": "Escalation", - "description": """This event is triggered when any other policy's punishment is set to escalation, and escalates measures, culminating in the punishment specified below. - + "description": """This event is triggered when any other policy's punishment is set to escalation, and escalates measures, culminating in the punishment specified below. + **The flow is the following:** **1.** The user is given a notice **2.** If ignored, the user is warned @@ -235,7 +237,7 @@ "perspective": { "name": "Perspective", "description": """Uses advanced machine learning algorithms to detect and filter out potentially toxic messages. Learn more about Perspective [here](https://www.perspectiveapi.com/). - + Below you can set the percentages after which action will be taken based on the Perspective action-types. It is recommended to set at least a `0.85` (85%) confidence rate or higher for all values. ​ Staff members are encouraged to play around with the percentages with only the `Flag` state selected, to test the sensitiveness of the system. Perspective is not a replacement for human moderators, and should not be treated as such. diff --git a/src/extensions/command_handler.py b/src/extensions/command_handler.py index 7d485fd..9a1e147 100644 --- a/src/extensions/command_handler.py +++ b/src/extensions/command_handler.py @@ -282,7 +282,7 @@ async def on_command_invoke(ctx: SnedContext) -> None: @plugin.listen() -async def event_error_handler(event: hikari.ExceptionEvent) -> None: +async def event_error_handler(event: hikari.ExceptionEvent[t.Any]) -> None: logging.error("Ignoring exception in listener {}:".format(event.failed_event.__class__.__name__)) exception_msg = "\n".join(traceback.format_exception(*event.exc_info)) logging.error(exception_msg) diff --git a/src/extensions/fun.py b/src/extensions/fun.py index 64ca742..6762063 100644 --- a/src/extensions/fun.py +++ b/src/extensions/fun.py @@ -3,11 +3,11 @@ import logging import os import random +import typing as t from enum import IntEnum from io import BytesIO from pathlib import Path from textwrap import fill -from typing import TYPE_CHECKING import arc import hikari @@ -26,7 +26,7 @@ from ..config import Config -if TYPE_CHECKING: +if t.TYPE_CHECKING: from fractions import Fraction ANIMAL_EMOJI_MAPPING: dict[str, str] = { @@ -74,7 +74,7 @@ async def handle_errors(ctx: SnedContext, exception: Exception) -> None: class AddBufButton(miru.Button): - def __init__(self, value: str, *args, **kwargs): + def __init__(self, value: str, *args: t.Any, **kwargs: t.Any): if "label" not in kwargs: kwargs["label"] = value super().__init__(*args, **kwargs) @@ -189,10 +189,11 @@ async def view_check(self, ctx: miru.ViewContext) -> bool: return True - async def on_timeout(self, ctx: miru.ViewContext) -> None: + async def on_timeout(self) -> None: for item in self.children: item.disabled = True - await ctx.edit_response(components=self) + if self.message: + await self.message.edit(components=self) class WinState(IntEnum): @@ -273,7 +274,9 @@ async def callback(self, ctx: miru.ViewContext) -> None: class TicTacToeView(miru.View): - def __init__(self, size: int, player_x: hikari.Member, player_o: hikari.Member, *args, **kwargs) -> None: + def __init__( + self, size: int, player_x: hikari.Member, player_o: hikari.Member, *args: t.Any, **kwargs: t.Any + ) -> None: super().__init__(*args, **kwargs) self.current_player: hikari.Member = player_x self.size: int = size @@ -309,7 +312,7 @@ def check_blocked(self) -> bool: # TODO: Replace this old garbage # Check rows - blocked = [] + blocked: list[bool] = [] for row in self.board: if not (-1 in row and 1 in row): blocked.append(False) @@ -320,14 +323,14 @@ def check_blocked(self) -> bool: blocked_list[0] = True # Check columns - values = [] + col_values: list[list[int]] = [] for col in range(self.size): - values.append([]) + col_values.append([]) for row in self.board: - values[col].append(row[col]) + col_values[col].append(row[col]) blocked = [] - for col in values: + for col in col_values: if not (-1 in col and 1 in col): blocked.append(False) else: @@ -336,7 +339,7 @@ def check_blocked(self) -> bool: blocked_list[1] = True # Check diagonals - values = [] + values: list[int] = [] diag_offset = self.size - 1 for i in range(0, self.size): values.append(self.board[i][diag_offset]) @@ -587,7 +590,7 @@ def draw_text() -> BytesIO: img = img.resize((int(textwidth) + margin, len(lines) * (42 + margin))) draw = ImageDraw.Draw(img) - draw.text((margin / 2, margin / 2), display_text, font=text_font, fill="white") + draw.text((margin / 2, margin / 2), display_text, font=text_font, fill="white") # type: ignore buffer = BytesIO() img.save(buffer, format="PNG") return buffer @@ -611,7 +614,7 @@ def draw_text() -> BytesIO: end_trigger = asyncio.Event() start = helpers.utcnow() - winners = {} + winners: dict[hikari.User, float] = {} def predicate(event: hikari.GuildMessageCreateEvent) -> bool: message = event.message @@ -627,7 +630,7 @@ def predicate(event: hikari.GuildMessageCreateEvent) -> bool: asyncio.create_task(message.add_reaction("✅")) # noqa: RUF006 end_trigger.set() - elif lev.distance(text.lower(), message.content.lower()) < 5: + elif lev.distance(text.lower(), message.content.lower()) < 5: # type: ignore asyncio.create_task(message.add_reaction("❌")) # noqa: RUF006 return False diff --git a/src/extensions/help.py b/src/extensions/help.py index b1e2490..95b2c6a 100644 --- a/src/extensions/help.py +++ b/src/extensions/help.py @@ -1,17 +1,11 @@ from __future__ import annotations -import typing as t - +import arc import hikari -import lightbulb +import toolbox import src.etc.const as const -from src.models.plugin import SnedPlugin - -if t.TYPE_CHECKING: - from src.models import SnedBot - from src.models.context import SnedSlashContext - +from src.models.client import SnedClient, SnedContext, SnedPlugin help = SnedPlugin("Help") @@ -21,7 +15,7 @@ None: hikari.Embed( title="ℹī¸ __Help__", description="""**Welcome to Sned Help!** - + To get started with using the bot, simply press `/` to reveal all commands! If you would like to get help about a specific topic, use `/help topic_name`. If you need assistance, found a bug, or just want to hang out, please join our [support server](https://discord.gg/KNKr8FPmJa)! @@ -33,7 +27,7 @@ "admin_home": hikari.Embed( title="ℹī¸ __Help__", description="""**Welcome to Sned Help!** - + To get started with using the bot, simply press `/` to reveal all commands! If you would like to get help about a specific topic, use `/help topic_name`. You may begin configuring the bot via the `/settings` command, which shows all relevant settings & lets you modify them. @@ -81,32 +75,34 @@ } -@help.command -@lightbulb.app_command_permissions(None, dm_enabled=False) -@lightbulb.option( - "topic", - "A specific topic to get help about.", - required=False, - choices=["time-formatting", "configuration", "permissions"], -) -@lightbulb.command("help", "Get help regarding various subjects of the bot's functionality.", pass_options=True) -@lightbulb.implements(lightbulb.SlashCommand) -async def help_cmd(ctx: SnedSlashContext, topic: str | None = None) -> None: +@help.include +@arc.slash_command("help", "Get help regarding various subjects of the bot's functionality.") +async def help_cmd( + ctx: SnedContext, + topic: arc.Option[ + str | None, + arc.StrParams( + "A specific topic to get help about.", choices=["time-formatting", "configuration", "permissions"] + ), + ] = None, +) -> None: if ctx.member: topic = ( topic or "admin_home" - if (lightbulb.utils.permissions_for(ctx.member) & hikari.Permissions.MANAGE_GUILD) + if (toolbox.calculate_permissions(ctx.member) & hikari.Permissions.MANAGE_GUILD) else topic ) await ctx.respond(embed=help_embeds[topic]) -def load(bot: SnedBot) -> None: - bot.add_plugin(help) +@arc.loader +def load(client: SnedClient) -> None: + client.add_plugin(help) -def unload(bot: SnedBot) -> None: - bot.remove_plugin(help) +@arc.unloader +def unload(client: SnedClient) -> None: + client.remove_plugin(help) # Copyright (C) 2022-present hypergonial diff --git a/src/extensions/moderation.py b/src/extensions/moderation.py index 4041c42..3f5ccbb 100644 --- a/src/extensions/moderation.py +++ b/src/extensions/moderation.py @@ -189,7 +189,7 @@ async def purge( channel = ctx.get_channel() or await ctx.client.rest.fetch_channel(ctx.channel_id) assert isinstance(channel, hikari.TextableGuildChannel) and ctx.guild_id - predicates = [ + predicates: list[t.Callable[[hikari.Message], bool]] = [ # Ignore deferred typing indicator so it doesn't get deleted lmfao lambda message: not (hikari.MessageFlag.LOADING & message.flags) ] @@ -208,38 +208,34 @@ async def purge( ) ctx.command.reset_all_limiters(ctx) else: - predicates.append(lambda message: bool(pattern.match(message.content)) if message.content else False) + predicates.append(lambda m: bool(pattern.match(m.content)) if m.content else False) if startswith: - predicates.append(lambda message: message.content.startswith(startswith) if message.content else False) + predicates.append(lambda m: m.content.startswith(startswith) if m.content else False) if endswith: - predicates.append(lambda message: message.content.endswith(endswith) if message.content else False) + predicates.append(lambda m: m.content.endswith(endswith) if m.content else False) if notext: - predicates.append(lambda message: not message.content) + predicates.append(lambda m: not m.content) if onlytext: - predicates.append(lambda message: message.content and not message.attachments and not message.embeds) + predicates.append(lambda m: m.content is not None and not m.attachments and not m.embeds) if attachments: - predicates.append(lambda message: bool(message.attachments)) + predicates.append(lambda m: bool(m.attachments)) if invites: - predicates.append( - lambda message: helpers.is_invite(message.content, fullmatch=False) if message.content else False - ) + predicates.append(lambda m: helpers.is_invite(m.content, fullmatch=False) if m.content else False) if links: - predicates.append( - lambda message: helpers.is_url(message.content, fullmatch=False) if message.content else False - ) + predicates.append(lambda m: helpers.is_url(m.content, fullmatch=False) if m.content else False) if embeds: - predicates.append(lambda message: bool(message.embeds)) + predicates.append(lambda m: bool(m.embeds)) if user: - predicates.append(lambda message: message.author.id == user.id) + predicates.append(lambda m: m.author.id == user.id) await ctx.defer() @@ -844,14 +840,18 @@ async def massban( if created: - def created_pred(member: hikari.User, offset=now - datetime.timedelta(minutes=created)) -> bool: + def created_pred( + member: hikari.User, offset: datetime.datetime = now - datetime.timedelta(minutes=created) + ) -> bool: return member.created_at > offset predicates.append(created_pred) if joined: - def joined_pred(member: hikari.User, offset=now - datetime.timedelta(minutes=joined)) -> bool: + def joined_pred( + member: hikari.User, offset: datetime.datetime = now - datetime.timedelta(minutes=joined) + ) -> bool: if not isinstance(member, hikari.Member): return True else: @@ -861,7 +861,7 @@ def joined_pred(member: hikari.User, offset=now - datetime.timedelta(minutes=joi if joined_after: - def joined_after_pred(member: hikari.Member, joined_after=joined_after) -> bool: + def joined_after_pred(member: hikari.Member, joined_after: hikari.Member = joined_after) -> bool: return ( member.joined_at is not None and joined_after.joined_at is not None @@ -872,7 +872,7 @@ def joined_after_pred(member: hikari.Member, joined_after=joined_after) -> bool: if joined_before: - def joined_before_pred(member: hikari.Member, joined_before=joined_before) -> bool: + def joined_before_pred(member: hikari.Member, joined_before: hikari.Member = joined_before) -> bool: return ( member.joined_at is not None and joined_before.joined_at is not None diff --git a/src/extensions/reminders.py b/src/extensions/reminders.py index 09e0697..d1d2d13 100644 --- a/src/extensions/reminders.py +++ b/src/extensions/reminders.py @@ -47,7 +47,7 @@ async def callback(self, ctx: miru.ViewContext, client: SnedClient = miru.inject assert self.view.reminder_message.embeds[0].description and ctx.guild_id and isinstance(self.view, SnoozeView) message = self.view.reminder_message.embeds[0].description.split("\n\n[Jump to original message!](")[0] - reminder_data = { + reminder_data: dict[str, t.Any] = { "message": message, "jump_url": ctx.message.make_link(ctx.guild_id), "additional_recipients": [], @@ -358,7 +358,7 @@ async def reminder_list(ctx: SnedContext) -> None: ) return - reminders = [] + reminders: list[str] = [] for record in records: time = datetime.datetime.fromtimestamp(record["expires"]) diff --git a/src/extensions/reports.py b/src/extensions/reports.py index cf73540..bfd3c09 100644 --- a/src/extensions/reports.py +++ b/src/extensions/reports.py @@ -130,7 +130,7 @@ async def report(ctx: SnedContext, member: hikari.Member, message: hikari.Messag if not modal.last_context: # Modal was closed/timed out return - role_ids = records[0]["pinged_role_ids"] or [] + role_ids: list[int] = records[0]["pinged_role_ids"] or [] roles = filter(lambda r: r is not None, [ctx.client.cache.get_role(role_id) for role_id in role_ids]) role_mentions = [role.mention for role in roles if role is not None] diff --git a/src/extensions/role_buttons.py b/src/extensions/role_buttons.py index 41287a7..f9d5373 100644 --- a/src/extensions/role_buttons.py +++ b/src/extensions/role_buttons.py @@ -4,8 +4,8 @@ import arc import hikari -import lightbulb import miru +from miru.ext import nav import src.models as models from src.etc import const @@ -13,9 +13,6 @@ from src.models.rolebutton import RoleButton, RoleButtonMode from src.utils import helpers -if t.TYPE_CHECKING: - from miru.ext import nav - logger = logging.getLogger(__name__) plugin = SnedPlugin("Rolebuttons", default_permissions=hikari.Permissions.MANAGE_ROLES) @@ -230,7 +227,7 @@ async def rolebutton_list(ctx: SnedContext) -> None: ) return - paginator = lightbulb.utils.StringPaginator(max_chars=500) + paginator = nav.Paginator(max_len=500) for button in buttons: role = ctx.client.cache.get_role(button.role_id) channel = ctx.client.cache.get_guild_channel(button.channel_id) @@ -247,7 +244,7 @@ async def rolebutton_list(ctx: SnedContext) -> None: description=page, color=const.EMBED_BLUE, ) - for page in paginator.build_pages() + for page in paginator.pages ] navigator = models.AuthorOnlyNavigator(ctx.author, pages=embeds) diff --git a/src/extensions/settings.py b/src/extensions/settings.py index 54d44e0..c4f83b0 100644 --- a/src/extensions/settings.py +++ b/src/extensions/settings.py @@ -99,7 +99,7 @@ async def wait_until_done(self) -> None: # Transitions def add_buttons( - self, buttons: t.Sequence[miru.Button | miru.LinkButton], parent: str | None = None, **kwargs + self, buttons: t.Sequence[miru.Button | miru.LinkButton], parent: str | None = None, **kwargs: t.Any ) -> None: """Add a new set of buttons, clearing previous components.""" self.clear_items() @@ -113,7 +113,7 @@ def add_buttons( self.add_item(button) def select_screen( - self, select: miru.abc.SelectBase, parent: str | None = None, with_done: bool = False, **kwargs + self, select: miru.abc.SelectBase, parent: str | None = None, with_done: bool = False, **kwargs: t.Any ) -> None: """Set view to a new select screen, clearing previous components.""" self.clear_items() @@ -128,7 +128,7 @@ def select_screen( if with_done and parent: self.add_item(DoneButton(parent, **kwargs)) - async def error_screen(self, embed: hikari.Embed, parent: str, **kwargs) -> None: + async def error_screen(self, embed: hikari.Embed, parent: str, **kwargs: t.Any) -> None: """Show an error screen with only a back button, and wait for input on it.""" assert self.last_context self.clear_items() @@ -218,7 +218,7 @@ async def settings_report(self) -> None: ] pinged_roles = ( - [self.arc_client.cache.get_role(role_id) for role_id in records[0]["pinged_role_ids"]] + [self.arc_client.cache.get_role(role_id) for role_id in records[0]["pinged_role_ids"]] # type: ignore if records[0]["pinged_role_ids"] else [] ) @@ -245,7 +245,7 @@ async def settings_report(self) -> None: ) buttons = [ - BooleanButton(state=records[0]["is_enabled"] if channel else False, label="Enabled", disabled=not channel), + BooleanButton(state=records[0]["is_enabled"] if channel else False, label="Enabled", disabled=not channel), # type: ignore OptionButton(label="Set Channel", emoji=const.EMOJI_CHANNEL, style=hikari.ButtonStyle.SECONDARY), OptionButton(label="Change Roles", emoji=const.EMOJI_MENTION, style=hikari.ButtonStyle.SECONDARY), ] @@ -338,7 +338,7 @@ async def settings_mod(self) -> None: Enabling **ephemeral responses** will show all moderation command responses in a manner where they will be invisible to every user except for the one who used the command.""", color=const.EMBED_BLUE, ) - buttons = [] + buttons: list[miru.Button] = [] for flag in ModerationFlags: if flag is ModerationFlags.NONE: continue @@ -526,7 +526,7 @@ async def settings_logging(self) -> None: inline=False, ) - options = [] + options: list[miru.SelectOption] = [] for log_category, channel_id in log_channels.items(): channel = self.arc_client.cache.get_guild_channel(channel_id) if channel_id else None @@ -607,14 +607,13 @@ async def settings_automod(self) -> None: color=const.EMBED_BLUE, ) - options = [] + options: list[miru.SelectOption] = [] for key in policies: embed.add_field( name=policy_strings[key]["name"], value=policies[key]["state"].capitalize(), inline=True, ) - # TODO: Add emojies maybe? options.append(miru.SelectOption(label=policy_strings[key]["name"], value=key)) self.select_screen(OptionsTextSelect(options=options, placeholder="Select a policy..."), parent="Main") @@ -641,7 +640,7 @@ async def settings_automod_policy(self, policy: str | None = None) -> None: ) state = policy_data["state"] - buttons = [] + buttons: list[miru.Button] = [] if state == "disabled": embed.add_field( @@ -670,7 +669,7 @@ async def settings_automod_policy(self, policy: str | None = None) -> None: buttons.append(OptionButton(label="State", custom_id="state", style=hikari.ButtonStyle.SECONDARY)) # Conditions for certain attributes to appear - predicates = { + predicates: dict[str, t.Callable[[str], bool]] = { "temp_dur": lambda s: s in ["timeout", "tempban"] or s == "escalate" and policies["escalate"]["state"] in ["timeout", "tempban"], @@ -789,7 +788,7 @@ async def settings_automod_policy(self, policy: str | None = None) -> None: list_inputs = ["words_list", "words_list_wildcard"] # Expected return type for a question - expected_types = { + expected_types: dict[str, type] = { "temp_dur": int, "words_list": list, "words_list_wildcard": list, diff --git a/src/extensions/starboard.py b/src/extensions/starboard.py index 8a6cc33..192a6d9 100644 --- a/src/extensions/starboard.py +++ b/src/extensions/starboard.py @@ -4,7 +4,7 @@ import arc import hikari -import lightbulb +import toolbox from src.etc import const from src.models.client import SnedClient, SnedContext, SnedPlugin @@ -216,15 +216,15 @@ async def force_star(ctx: SnedContext, message: hikari.Message) -> None: return if settings.channel_id and (channel := ctx.client.cache.get_guild_channel(settings.channel_id)): - perms = lightbulb.utils.permissions_in(channel, me) + perms = toolbox.calculate_permissions(me, channel) if not helpers.includes_permissions( perms, hikari.Permissions.SEND_MESSAGES | hikari.Permissions.VIEW_CHANNEL | hikari.Permissions.READ_MESSAGE_HISTORY, ): - raise lightbulb.BotMissingRequiredPermission( - perms=hikari.Permissions.SEND_MESSAGES + raise arc.BotMissingPermissionsError( + hikari.Permissions.SEND_MESSAGES | hikari.Permissions.VIEW_CHANNEL | hikari.Permissions.READ_MESSAGE_HISTORY ) @@ -287,7 +287,7 @@ async def on_reaction(event: hikari.GuildReactionAddEvent | hikari.GuildReaction return if settings.channel_id and (channel := plugin.client.cache.get_guild_channel(settings.channel_id)): - perms = lightbulb.utils.permissions_in(channel, me) + perms = toolbox.calculate_permissions(me, channel) if not helpers.includes_permissions( perms, hikari.Permissions.SEND_MESSAGES @@ -307,7 +307,7 @@ async def on_reaction(event: hikari.GuildReactionAddEvent | hikari.GuildReaction # Check perms if channel is cached if channel := plugin.client.cache.get_guild_channel(event.channel_id): - perms = lightbulb.utils.permissions_in(channel, me) + perms = toolbox.calculate_permissions(me, channel) if not helpers.includes_permissions( perms, hikari.Permissions.VIEW_CHANNEL | hikari.Permissions.READ_MESSAGE_HISTORY, diff --git a/src/extensions/tags.py b/src/extensions/tags.py index 6f728d0..73c894f 100644 --- a/src/extensions/tags.py +++ b/src/extensions/tags.py @@ -5,8 +5,8 @@ import arc import hikari -import lightbulb import miru +import toolbox from src.etc import const from src.models import AuthorOnlyNavigator, Tag @@ -354,9 +354,7 @@ async def tag_claim( if tag: members = ctx.client.cache.get_members_view_for_guild(ctx.guild_id) if tag.owner_id not in members or ( - helpers.includes_permissions( - lightbulb.utils.permissions_for(ctx.member), hikari.Permissions.MANAGE_MESSAGES - ) + helpers.includes_permissions(toolbox.calculate_permissions(ctx.member), hikari.Permissions.MANAGE_MESSAGES) and tag.owner_id != ctx.member.id ): tag.owner_id = ctx.author.id @@ -449,7 +447,7 @@ async def tag_delete( if tag and ( (tag.owner_id == ctx.author.id) - or helpers.includes_permissions(lightbulb.utils.permissions_for(ctx.member), hikari.Permissions.MANAGE_MESSAGES) + or helpers.includes_permissions(toolbox.calculate_permissions(ctx.member), hikari.Permissions.MANAGE_MESSAGES) ): await tag.delete() diff --git a/src/extensions/troubleshooter.py b/src/extensions/troubleshooter.py index 9260b7a..c727a14 100644 --- a/src/extensions/troubleshooter.py +++ b/src/extensions/troubleshooter.py @@ -77,7 +77,7 @@ async def troubleshoot(ctx: SnedContext) -> None: assert ctx.guild_id is not None missing_perms = ~ctx.app_permissions & REQUIRED_PERMISSIONS - content_list = [] + content_list: list[str] = [] if missing_perms is not hikari.Permissions.NONE: content_list.append("**Missing Permissions:**") diff --git a/src/extensions/userlog.py b/src/extensions/userlog.py index 6d4642b..94a1cbc 100644 --- a/src/extensions/userlog.py +++ b/src/extensions/userlog.py @@ -186,10 +186,10 @@ async def get_diff(guild_id: int, old_object: T, object: T, attrs: dict[str, str elif ( isinstance(old, list) and isinstance(new, list) - and (old and hasattr(old[0], "name") or new and hasattr(new[0], "name")) + and (old and hasattr(old[0], "name") or new and hasattr(new[0], "name")) # type: ignore ): # Handling flag lists - old_names = [str(x) for x in old] - new_names = [str(x) for x in new] + old_names = [str(x) for x in old] # type: ignore + new_names = [str(x) for x in new] # type: ignore if not set(old_names) - set(new_names) or not set(new_names) - set(old_names): continue @@ -832,22 +832,15 @@ async def warns_clear(event: WarnsClearEvent) -> None: @plugin.listen() async def flag_message(event: AutoModMessageFlagEvent) -> None: - user_id = hikari.Snowflake(event.user) - reason = helpers.format_reason(event.reason, max_length=1500) - user = ( - event.user - if isinstance(event.user, hikari.PartialUser) - else (plugin.client.cache.get_member(event.guild_id, user_id) or (await plugin.client.rest.fetch_user(user_id))) - ) content = ( helpers.format_reason(event.message.content, max_length=2000) if event.message.content else "No content found." ) embed = hikari.Embed( title="❗🚩 Message flagged", - description=f"`{display_user(user)}` was flagged by auto-moderator for suspicious behaviour.\n**Reason:**```{reason}```\n**Content:** ```{content}```\n\n[Jump to message!]({event.message.make_link(event.guild_id)})", + description=f"`{display_user(event.user)}` was flagged by auto-moderator for suspicious behaviour.\n**Reason:**```{reason}```\n**Content:** ```{content}```\n\n[Jump to message!]({event.message.make_link(event.guild_id)})", color=const.ERROR_COLOR, ) await plugin.client.userlogger.log(LogEvent.FLAGS, embed, event.guild_id) diff --git a/src/models/bot.py b/src/models/bot.py deleted file mode 100644 index e5328ad..0000000 --- a/src/models/bot.py +++ /dev/null @@ -1,409 +0,0 @@ -import asyncio -import datetime -import logging -import os -import pathlib -import typing as t -from contextlib import suppress - -import aiohttp -import hikari -import kosu -import lightbulb -import miru - -import src.utils.db_backup as db_backup -from src.config import Config -from src.models.audit_log import AuditLogCache -from src.models.context import * -from src.models.db import Database -from src.models.errors import UserBlacklistedError -from src.models.mod_actions import ModActions -from src.utils import cache, helpers, scheduler -from src.utils.tasks import IntervalLoop - - -async def is_not_blacklisted(ctx: SnedContext) -> bool: - """Evaluate if the user is blacklisted or not. - - Parameters - ---------- - ctx : SnedContext - The context to evaluate under. - - Returns - ------- - bool - A boolean determining if the user is blacklisted or not. - - Raises - ------ - UserBlacklistedError - The user is blacklisted. - """ - records = await ctx.app.db_cache.get(table="blacklist", user_id=ctx.user.id) - - if not records: - return True - - raise UserBlacklistedError("User is blacklisted from using the application.") - - -class SnedBot(lightbulb.BotApp): - """A customized subclass of lightbulb.BotApp. - - Parameters - ---------- - config : Config - The bot configuration to initialize the bot with. - See the included config_example.py for formatting help. - """ - - __slots__: t.Sequence[str] = ( - "_started", - "_is_started", - "_config", - "_db", - "_session", - "_db_cache", - "_mod", - "_perspective", - "_scheduler", - "_audit_log_cache", - "_initial_guilds", - "_start_time", - "dev_mode", - "skip_first_db_backup", - "_user_id", - "_base_dir", - "_db_backup_loop", - ) - - def __init__(self, config: Config) -> None: - self._started = asyncio.Event() - self._is_started = False - - cache_settings = hikari.impl.CacheSettings( - components=hikari.api.CacheComponents.ALL, max_messages=100000, max_dm_channel_ids=50 - ) - intents = ( - hikari.Intents.GUILDS - | hikari.Intents.GUILD_MEMBERS - | hikari.Intents.GUILD_MODERATION - | hikari.Intents.GUILD_EMOJIS - | hikari.Intents.GUILD_INVITES - | hikari.Intents.ALL_MESSAGE_REACTIONS - | hikari.Intents.ALL_MESSAGES - | hikari.Intents.MESSAGE_CONTENT - ) - - self.dev_mode: bool = config.DEV_MODE - - default_enabled_guilds = (config.DEBUG_GUILDS or ()) if self.dev_mode else () - - token = os.getenv("TOKEN") - - if not token: - raise RuntimeError("TOKEN not found in environment.") - - super().__init__( - token=token, - cache_settings=cache_settings, - default_enabled_guilds=default_enabled_guilds, - intents=intents, - owner_ids=(163979124820541440,), - prefix="dev", - help_class=None, - banner=None, - ) - - # Initizaling configuration and database - self._config = config - self._db = Database(self) - self._session: aiohttp.ClientSession | None = None - self._db_cache = cache.DatabaseCache(self) - self._mod = ModActions(self) - miru.install(self) - - # Some global variables - self._base_dir = str(pathlib.Path(os.path.abspath(__file__)).parents[2]) - self._db_backup_loop = IntervalLoop(self.backup_db, seconds=3600 * 24) - self.skip_first_db_backup = True # Set to False to backup DB on bot startup too - self._user_id: hikari.Snowflake | None = None - self._perspective: kosu.Client | None = None - self._scheduler = scheduler.Scheduler(self) - self._audit_log_cache: AuditLogCache = AuditLogCache(self) - self._initial_guilds: list[hikari.Snowflake] = [] - self._start_time: datetime.datetime | None = None - - self.check(is_not_blacklisted) - - self.start_listeners() - - @property - def user_id(self) -> hikari.Snowflake: - """The application user's ID.""" - if self._user_id is None: - raise hikari.ComponentStateConflictError("The bot is not yet initialized, user_id is unavailable.") - - return self._user_id - - @property - def is_ready(self) -> bool: - """Indicates if the application is ready to accept instructions or not. - - Alias for BotApp.is_alive - """ - return self.is_alive - - @property - def base_dir(self) -> str: - """The absolute path to the bot's project.""" - return self._base_dir - - @property - def session(self) -> aiohttp.ClientSession: - """The aiohttp client session used by the bot.""" - if self._session is None: - self._session = aiohttp.ClientSession() - return self._session - - @property - def db(self) -> Database: - """The main database connection pool of the bot.""" - return self._db - - @property - def db_cache(self) -> cache.DatabaseCache: - """The database cache instance of the bot.""" - return self._db_cache - - @property - def scheduler(self) -> scheduler.Scheduler: - """The scheduler instance of the bot.""" - return self._scheduler - - @property - def perspective(self) -> kosu.Client: - """The perspective client of the bot.""" - if self._perspective is None: - raise hikari.ComponentStateConflictError( - "The bot is not initialized or no perspective API key was found in the environment." - ) - return self._perspective - - @property - def config(self) -> Config: - """The passed configuration object.""" - return self._config - - @property - def mod(self) -> ModActions: - """The moderation actions instance of the bot. Handles moderation of users and contains useful methods for such purposes.""" - return self._mod - - @property - def audit_log_cache(self) -> AuditLogCache: - """The audit log cache instance of the bot.""" - return self._audit_log_cache - - @property - def is_started(self) -> bool: - """Boolean indicating if the bot has started up or not.""" - return self._is_started - - @property - def start_time(self) -> datetime.datetime: - """The datetime when the bot started up.""" - if not self.is_started or self._start_time is None: - raise hikari.ComponentStateConflictError("The bot is not started yet, 'start_time' cannot be retrieved.") - return self._start_time - - def start_listeners(self) -> None: - """Start all listeners located in this class.""" - self.subscribe(hikari.StartingEvent, self.on_starting) - self.subscribe(hikari.StartedEvent, self.on_started) - self.subscribe(hikari.GuildAvailableEvent, self.on_guild_available) - self.subscribe(lightbulb.LightbulbStartedEvent, self.on_lightbulb_started) - self.subscribe(hikari.MessageCreateEvent, self.on_message) - self.subscribe(hikari.StoppingEvent, self.on_stopping) - self.subscribe(hikari.StoppedEvent, self.on_stop) - self.subscribe(hikari.GuildJoinEvent, self.on_guild_join) - self.subscribe(hikari.GuildLeaveEvent, self.on_guild_leave) - - async def wait_until_started(self) -> None: - """Wait until the bot has started up.""" - await asyncio.wait_for(self._started.wait(), timeout=None) - - async def get_slash_context( - self, - event: hikari.InteractionCreateEvent, - command: lightbulb.SlashCommand, - cls: t.Type[lightbulb.SlashContext] = SnedSlashContext, - ) -> SnedSlashContext: - return await super().get_slash_context(event, command, cls) # type: ignore - - async def get_user_context( - self, - event: hikari.InteractionCreateEvent, - command: lightbulb.UserCommand, - cls: t.Type[lightbulb.UserContext] = SnedUserContext, - ) -> SnedUserContext: - return await super().get_user_context(event, command, cls) # type: ignore - - async def get_message_context( - self, - event: hikari.InteractionCreateEvent, - command: lightbulb.MessageCommand, - cls: t.Type[lightbulb.MessageContext] = SnedMessageContext, - ) -> SnedMessageContext: - return await super().get_message_context(event, command, cls) # type: ignore - - async def get_prefix_context( - self, event: hikari.MessageCreateEvent, cls: t.Type[lightbulb.PrefixContext] = SnedPrefixContext - ) -> SnedPrefixContext | None: - return await super().get_prefix_context(event, cls) # type: ignore - - async def on_guild_available(self, event: hikari.GuildAvailableEvent) -> None: - if self.is_started: - return - self._initial_guilds.append(event.guild_id) - - async def on_starting(self, _: hikari.StartingEvent) -> None: - # Connect to the database, update schema, apply pending migrations - await self.db.connect() - await self.db.update_schema() - # Start scheduler, DB cache - await self.db_cache.start() - self.scheduler.start() - await self._audit_log_cache.start() - - if perspective_api_key := os.getenv("PERSPECTIVE_API_KEY"): - self._perspective = kosu.Client(perspective_api_key, do_not_store=True) - - # Load all extensions - self.load_extensions_from(os.path.join(self.base_dir, "src", "extensions"), must_exist=True) - - async def on_started(self, _: hikari.StartedEvent) -> None: - self._db_backup_loop.start() - - user = self.get_me() - self._user_id = user.id if user else None - - logging.info(f"Startup complete, initialized as {user}.") - activity = hikari.Activity(name="@Sned", type=hikari.ActivityType.LISTENING) - await self.update_presence(activity=activity) - - if self.dev_mode: - logging.warning("Developer mode is enabled!") - - async def on_lightbulb_started(self, _: lightbulb.LightbulbStartedEvent) -> None: - # Insert all guilds the bot is member of into the db global config on startup - async with self.db.acquire() as con: - for guild_id in self._initial_guilds: - await con.execute( - """ - INSERT INTO global_config (guild_id) VALUES ($1) - ON CONFLICT (guild_id) DO NOTHING""", - guild_id, - ) - logging.info(f"Connected to {len(self._initial_guilds)} guilds.") - self._initial_guilds = [] - - # Set this here so all guild_ids are in DB - self._started.set() - self._is_started = True - self._start_time = helpers.utcnow() - self.unsubscribe(hikari.GuildAvailableEvent, self.on_guild_available) - - async def on_stopping(self, _: hikari.StoppingEvent) -> None: - logging.info("Bot is shutting down...") - self.scheduler.stop() - - async def on_stop(self, _: hikari.StoppedEvent) -> None: - await self.db.close() - logging.info("Closed database connection.") - - async def on_message(self, event: hikari.MessageCreateEvent) -> None: - if not event.content: - return - - if self.is_ready and self.db_cache.is_ready and event.is_human: - mentions = [f"<@{self.user_id}>", f"<@!{self.user_id}>"] - - if event.content in mentions: - me = self.get_me() - await event.message.respond( - embed=hikari.Embed( - title="Beep Boop!", - description="Use `/` to access my commands and see what I can do!", - color=0xFEC01D, - ).set_thumbnail(me.avatar_url if me else None) - ) - return - - async def on_guild_join(self, event: hikari.GuildJoinEvent) -> None: - await self.db.register_guild(event.guild_id) - - if event.guild.system_channel_id is None: - return - - me = event.guild.get_my_member() - channel = event.guild.get_channel(event.guild.system_channel_id) - - assert me is not None - - if not channel or not (hikari.Permissions.SEND_MESSAGES & lightbulb.utils.permissions_in(channel, me)): - return - - assert isinstance(channel, hikari.TextableGuildChannel) - - with suppress(hikari.ForbiddenError): - await channel.send( - embed=hikari.Embed( - title="Beep Boop!", - description="""I have been summoned to this server. Type `/` to see what I can do!\n\nIf you have `Manage Server` permissions, you may configure the bot via `/settings`!""", - color=0xFEC01D, - ).set_thumbnail(me.avatar_url) - ) - logging.info(f"Bot has been added to new guild: {event.guild.name} ({event.guild_id}).") - - async def on_guild_leave(self, event: hikari.GuildLeaveEvent) -> None: - await self.db.wipe_guild(event.guild_id, keep_record=False) - logging.info(f"Bot has been removed from guild {event.guild_id}, correlating data erased.") - - async def backup_db(self) -> None: - """Backs up the database to a file and, if configured, sends it to the specified channel.""" - if self.skip_first_db_backup: - logging.info("Skipping database backup for this day...") - self.skip_first_db_backup = False - return - - file = await db_backup.backup_database() - await self.wait_until_started() - - if self.config.DB_BACKUP_CHANNEL: - await self.rest.create_message( - self.config.DB_BACKUP_CHANNEL, - f"Database Backup: {helpers.format_dt(helpers.utcnow())}", - attachment=file, - ) - return logging.info("Database backup complete, database backed up and sent to specified Discord channel.") - - logging.info("Database backup complete.") - - -# Copyright (C) 2022-present hypergonial - -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. - -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. - -# You should have received a copy of the GNU General Public License -# along with this program. If not, see: https://www.gnu.org/licenses diff --git a/src/models/client.py b/src/models/client.py index 05b6f42..45d14a0 100644 --- a/src/models/client.py +++ b/src/models/client.py @@ -2,6 +2,7 @@ import logging import os import pathlib +import typing as t from contextlib import suppress import aiohttp @@ -194,7 +195,7 @@ async def on_started(self, _: hikari.StartedEvent) -> None: if self.dev_mode: logging.warning("Developer mode is enabled!") - async def on_arc_started(self, _: arc.StartedEvent) -> None: + async def on_arc_started(self, _: arc.StartedEvent[t.Self]) -> None: # Insert all guilds the bot is member of into the db global config on startup async with self.db.acquire() as con: for guild_id in self._initial_guilds: diff --git a/src/models/db.py b/src/models/db.py index 967e808..049adbd 100644 --- a/src/models/db.py +++ b/src/models/db.py @@ -30,7 +30,7 @@ def __init__(self, client: SnedClient) -> None: self._port = int(os.getenv("POSTGRES_PORT") or 5432) self._password = os.environ["POSTGRES_PASSWORD"] self._version = os.getenv("POSTGRES_VERSION") - self._pool: asyncpg.Pool | None = None + self._pool: asyncpg.Pool[asyncpg.Record] | None = None self._schema_version: int | None = None self._is_closed: bool = False @@ -80,7 +80,7 @@ def schema_version(self) -> int: return self._schema_version @property - def pool(self) -> asyncpg.Pool: + def pool(self) -> asyncpg.Pool[asyncpg.Record]: """The connection pool used to connect to the database.""" if self._pool is None: raise DatabaseStateConflictError("The database is not connected.") @@ -117,7 +117,7 @@ def terminate(self) -> None: self._is_closed = True @asynccontextmanager - async def acquire(self) -> t.AsyncIterator[asyncpg.Connection]: + async def acquire(self) -> t.AsyncIterator[asyncpg.Connection[asyncpg.Record]]: """Acquire a database connection from the connection pool.""" con = await self.pool.acquire() try: @@ -125,7 +125,7 @@ async def acquire(self) -> t.AsyncIterator[asyncpg.Connection]: finally: await self.pool.release(con) - async def execute(self, query: str, *args, timeout: float | None = None) -> str: + async def execute(self, query: str, *args: t.Any, timeout: float | None = None) -> str: """Execute an SQL command. Parameters @@ -147,9 +147,9 @@ async def execute(self, query: str, *args, timeout: float | None = None) -> str: DatabaseStateConflictError The application is not connected to the database server. """ - return await self.pool.execute(query, *args, timeout=timeout) # type: ignore + return await self.pool.execute(query, *args, timeout=timeout) - async def fetch(self, query: str, *args, timeout: float | None = None) -> list[asyncpg.Record]: + async def fetch(self, query: str, *args: t.Any, timeout: float | None = None) -> list[asyncpg.Record]: """Run a query and return the results as a list of `Record`. Parameters @@ -173,7 +173,7 @@ async def fetch(self, query: str, *args, timeout: float | None = None) -> list[a """ return await self.pool.fetch(query, *args, timeout=timeout) - async def executemany(self, command: str, args: t.Tuple[t.Any], *, timeout: float | None = None) -> str: + async def executemany(self, command: str, args: t.Tuple[t.Any], *, timeout: float | None = None) -> str | None: """Execute an SQL command for each sequence of arguments in `args`. Parameters @@ -195,9 +195,9 @@ async def executemany(self, command: str, args: t.Tuple[t.Any], *, timeout: floa DatabaseStateConflictError The application is not connected to the database server. """ - return await self.pool.executemany(command, args, timeout=timeout) # type: ignore + return await self.pool.executemany(command, args, timeout=timeout) - async def fetchrow(self, query: str, *args, timeout: float | None = None) -> asyncpg.Record: + async def fetchrow(self, query: str, *args: t.Any, timeout: float | None = None) -> asyncpg.Record | None: """Run a query and return the first row that matched query parameters. Parameters @@ -219,9 +219,9 @@ async def fetchrow(self, query: str, *args, timeout: float | None = None) -> asy DatabaseStateConflictError The application is not connected to the database server. """ - return await self.pool.fetchrow(query, *args, timeout=timeout) # type: ignore + return await self.pool.fetchrow(query, *args, timeout=timeout) - async def fetchval(self, query: str, *args, column: int = 0, timeout: float | None = None) -> t.Any: + async def fetchval(self, query: str, *args: t.Any, column: int = 0, timeout: float | None = None) -> t.Any: """Run a query and return a value in the first row that matched query parameters. Parameters @@ -293,7 +293,8 @@ async def _increment_schema_version(self) -> None: record = await self.fetchrow( """UPDATE schema_info SET schema_version = schema_version + 1 RETURNING schema_version""" ) - self._schema_version = record["schema_version"] + if record is not None: + self._schema_version = record["schema_version"] async def _do_sql_migration(self, filename: str) -> None: """Apply an SQL file as a migration to the database.""" diff --git a/src/models/errors.py b/src/models/errors.py index e63bdc4..74c7460 100644 --- a/src/models/errors.py +++ b/src/models/errors.py @@ -1,6 +1,3 @@ -import lightbulb - - class TagAlreadyExistsError(Exception): """Raised when a tag is trying to get created but already exists.""" @@ -9,11 +6,11 @@ class TagNotFoundError(Exception): """Raised when a tag is not found, although most functions just return None.""" -class RoleHierarchyError(lightbulb.CheckFailure): +class RoleHierarchyError(Exception): """Raised when an action fails due to role hierarchy.""" -class BotRoleHierarchyError(lightbulb.CheckFailure): +class BotRoleHierarchyError(Exception): """Raised when an action fails due to the bot's role hierarchy.""" diff --git a/src/models/events.py b/src/models/events.py index ff9d716..3a711dc 100644 --- a/src/models/events.py +++ b/src/models/events.py @@ -19,7 +19,7 @@ class SnedEvent(hikari.Event): class SnedGuildEvent(SnedEvent): """Base event for any custom event that occurs within the context of a guild.""" - app: hikari.GatewayBot + app: hikari.RESTAware # type: ignore """The currently running application.""" _guild_id: hikari.Snowflakeish @@ -68,7 +68,7 @@ def get_guild(self) -> hikari.GatewayGuild | None: class TimerCompleteEvent(SnedGuildEvent): """Dispatched when a scheduled timer has expired.""" - app: hikari.GatewayBot + app: hikari.RESTAware timer: Timer """The timer that was dispatched.""" _guild_id: hikari.Snowflakeish @@ -78,7 +78,7 @@ class TimerCompleteEvent(SnedGuildEvent): class MassBanEvent(SnedGuildEvent): """Dispatched when a massban occurs.""" - app: hikari.GatewayBot + app: hikari.RESTAware _guild_id: hikari.Snowflakeish moderator: hikari.Member """The moderator responsible for the massban.""" @@ -96,7 +96,7 @@ class MassBanEvent(SnedGuildEvent): class WarnEvent(SnedGuildEvent): """Base class for all warning events.""" - app: hikari.GatewayBot + app: hikari.RESTAware _guild_id: hikari.Snowflakeish member: hikari.Member """The member that was warned.""" @@ -127,7 +127,7 @@ class WarnsClearEvent(WarnEvent): class AutoModMessageFlagEvent(SnedGuildEvent): """Dispatched when a message is flagged by auto-mod.""" - app: hikari.GatewayBot + app: hikari.RESTAware message: hikari.PartialMessage """The message that was flagged.""" user: hikari.PartialUser @@ -141,7 +141,7 @@ class AutoModMessageFlagEvent(SnedGuildEvent): class RoleButtonEvent(SnedGuildEvent): """Base class for all rolebutton-related events.""" - app: hikari.GatewayBot + app: hikari.RESTAware _guild_id: hikari.Snowflakeish rolebutton: RoleButton """The rolebutton that was altered.""" diff --git a/src/models/journal.py b/src/models/journal.py index db27a01..1225577 100644 --- a/src/models/journal.py +++ b/src/models/journal.py @@ -164,6 +164,7 @@ async def update(self) -> None: self.created_at.timestamp(), self.entry_type.value, ) + assert record is not None self.id = record.get("id") return diff --git a/src/models/mod_actions.py b/src/models/mod_actions.py index 64c457d..e770fa5 100644 --- a/src/models/mod_actions.py +++ b/src/models/mod_actions.py @@ -7,9 +7,9 @@ import typing as t from contextlib import suppress +import arc import attr import hikari -import lightbulb import miru import toolbox from miru.abc import ViewItem @@ -149,7 +149,7 @@ async def get_automod_policies(self, guild: hikari.SnowflakeishOr[hikari.Guild]) if nested_key not in policies[key]: policies[key][nested_key] = default_automod_policies[key][nested_key] - invalid = [] + invalid: list[t.Any] = [] for key in policies: if key not in default_automod_policies: invalid.append(key) @@ -299,7 +299,7 @@ async def timeout_extend(self, event: TimerCompleteEvent) -> None: if not event.get_guild(): return - member = event.app.cache.get_member(timer.guild_id, timer.user_id) + member: hikari.Member | None = event.app.cache.get_member(timer.guild_id, timer.user_id) assert timer.notes is not None expiry = int(timer.notes) @@ -664,10 +664,10 @@ async def ban( me = self._client.cache.get_member(moderator.guild_id, self._client.user_id) assert me is not None - perms = lightbulb.utils.permissions_for(me) + perms = toolbox.calculate_permissions(me) if not helpers.includes_permissions(perms, hikari.Permissions.BAN_MEMBERS): - raise lightbulb.BotMissingRequiredPermission(perms=hikari.Permissions.BAN_MEMBERS) + raise arc.BotMissingPermissionsError(hikari.Permissions.BAN_MEMBERS) if isinstance(user, hikari.Member) and not helpers.is_above(me, user): raise RoleHierarchyError @@ -749,13 +749,13 @@ async def unban(self, user: hikari.User, moderator: hikari.Member, reason: str | me = self._client.cache.get_member(moderator.guild_id, self._client.user_id) assert me is not None - perms = lightbulb.utils.permissions_for(me) + perms = toolbox.calculate_permissions(me) raw_reason = reason reason = helpers.format_reason(reason, moderator, max_length=512) if not helpers.includes_permissions(perms, hikari.Permissions.BAN_MEMBERS): - raise lightbulb.BotMissingRequiredPermission(perms=hikari.Permissions.BAN_MEMBERS) + raise arc.BotMissingPermissionsError(hikari.Permissions.BAN_MEMBERS) try: await self._client.rest.unban_user(moderator.guild_id, user.id, reason=reason) diff --git a/src/models/plugin.py b/src/models/plugin.py deleted file mode 100644 index fd26c65..0000000 --- a/src/models/plugin.py +++ /dev/null @@ -1,39 +0,0 @@ -from __future__ import annotations - -import typing as t - -import lightbulb - -if t.TYPE_CHECKING: - from src.models.bot import SnedBot - - -class SnedPlugin(lightbulb.Plugin): - @property - def app(self) -> SnedBot: - return super().app # type: ignore - - @app.setter - def app(self, val: SnedBot) -> None: - self._app = val - self.create_commands() - - @property - def bot(self) -> SnedBot: - return super().bot # type: ignore - - -# Copyright (C) 2022-present hypergonial - -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. - -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. - -# You should have received a copy of the GNU General Public License -# along with this program. If not, see: https://www.gnu.org/licenses diff --git a/src/models/rolebutton.py b/src/models/rolebutton.py index 086582d..3743ac5 100644 --- a/src/models/rolebutton.py +++ b/src/models/rolebutton.py @@ -198,7 +198,7 @@ async def create( custom_id=f"RB:{id}:{role_id}", emoji=emoji, label=label, - style=style, + style=style, # type: ignore ) view = miru.View.from_message(message) @@ -233,7 +233,7 @@ async def create( role_id=hikari.Snowflake(role), ) - cls._client.dispatch(RoleButtonCreateEvent(cls._client, rolebutton.guild_id, rolebutton, moderator)) + cls._client.app.dispatch(RoleButtonCreateEvent(cls._client.app, rolebutton.guild_id, rolebutton, moderator)) return rolebutton async def update(self, moderator: hikari.PartialUser | None = None) -> None: @@ -261,7 +261,7 @@ async def update(self, moderator: hikari.PartialUser | None = None) -> None: button.emoji = self.emoji button.label = self.label - button.style = self.style + button.style = self.style # type: ignore button.custom_id = f"RB:{self.id}:{self.role_id}" self._custom_id = button.custom_id @@ -283,7 +283,7 @@ async def update(self, moderator: hikari.PartialUser | None = None) -> None: self.id, self.guild_id, ) - self._client.dispatch(RoleButtonUpdateEvent(self._client, self.guild_id, self, moderator)) + self._client.app.dispatch(RoleButtonUpdateEvent(self._client.app, self.guild_id, self, moderator)) async def delete(self, moderator: hikari.PartialUser | None = None) -> None: """Delete this rolebutton, removing it from the message and the database. @@ -315,7 +315,7 @@ async def delete(self, moderator: hikari.PartialUser | None = None) -> None: self.guild_id, self.id, ) - self._client.dispatch(RoleButtonDeleteEvent(self._client, self.guild_id, self, moderator)) + self._client.app.dispatch(RoleButtonDeleteEvent(self._client.app, self.guild_id, self, moderator)) # Copyright (C) 2022-present hypergonial diff --git a/src/models/settings.py b/src/models/settings.py index a1aec30..03574a4 100644 --- a/src/models/settings.py +++ b/src/models/settings.py @@ -96,7 +96,7 @@ def __init__( async def callback(self, context: miru.ModalContext) -> None: self.view.value = SettingValue(modal_values=context.values, is_done=True) - self.view._last_context = context + self.view._last_context = context # type: ignore self.view._input_event.set() self.view._input_event.clear() self.view._done_event.set() @@ -174,8 +174,8 @@ def __init__( self.view = view async def callback(self, context: miru.ModalContext) -> None: - self.view._last_context = context - self.view.value = SettingValue( + self.view._last_context = context # type: ignore + self.view.value = SettingValue( # type: ignore raw_perspective_bounds={item.custom_id: value for item, value in context.values.items()} ) # type: ignore self.view._input_event.set() @@ -190,7 +190,7 @@ async def on_timeout(self) -> None: class OptionsTextSelect(miru.TextSelect, SettingsItem): """Select that sets view value to first selected option's value.""" - def __init__(self, with_done: bool = False, **kwargs): + def __init__(self, with_done: bool = False, **kwargs: t.Any): super().__init__(**kwargs) self.with_done = with_done @@ -203,7 +203,7 @@ async def callback(self, ctx: miru.ViewContext) -> None: class OptionsRoleSelect(miru.RoleSelect, SettingsItem): - def __init__(self, with_done: bool = False, **kwargs): + def __init__(self, with_done: bool = False, **kwargs: t.Any): super().__init__(**kwargs) self.with_done = with_done @@ -216,7 +216,7 @@ async def callback(self, ctx: miru.ViewContext) -> None: class OptionsChannelSelect(miru.ChannelSelect, SettingsItem): - def __init__(self, with_done: bool = False, **kwargs): + def __init__(self, with_done: bool = False, **kwargs: t.Any): super().__init__(**kwargs) self.with_done = with_done @@ -231,7 +231,7 @@ async def callback(self, ctx: miru.ViewContext) -> None: class BackButton(OptionButton): """Go back to page that ctx.parent is set to.""" - def __init__(self, parent: str, **kwargs) -> None: + def __init__(self, parent: str, **kwargs: t.Any) -> None: super().__init__(style=hikari.ButtonStyle.PRIMARY, custom_id=parent, label="Back", emoji="âŦ…ī¸") self.kwargs = kwargs @@ -246,7 +246,7 @@ async def callback(self, _: miru.ViewContext) -> None: class DoneButton(miru.Button, SettingsItem): """Button that signals to the view the action being waited for is done.""" - def __init__(self, parent: str, **kwargs) -> None: + def __init__(self, parent: str, **kwargs: t.Any) -> None: super().__init__(style=hikari.ButtonStyle.SUCCESS, custom_id=f"done:{parent}", label="Done", emoji="✔ī¸") self.kwargs = kwargs diff --git a/src/models/tag.py b/src/models/tag.py index e1a8108..c0a377c 100644 --- a/src/models/tag.py +++ b/src/models/tag.py @@ -68,9 +68,7 @@ async def fetch( ) @classmethod - async def fetch_closest_names( - cls, name: str, guild: hikari.SnowflakeishOr[hikari.PartialGuild] - ) -> list[str] | None: + async def fetch_closest_names(cls, name: str, guild: hikari.SnowflakeishOr[hikari.PartialGuild]) -> list[str]: """Fetch the closest tagnames for the provided name. Parameters @@ -89,10 +87,10 @@ async def fetch_closest_names( # TODO: Figure out how to fuzzymatch within arrays via SQL results = await cls._db.fetch("""SELECT tagname, aliases FROM tags WHERE guild_id = $1""", guild_id) - names = [result["tagname"] for result in results] if results else [] + names: list[str] = [result["tagname"] for result in results] if results else [] - if results is not None: - names += list(chain(*[result.get("aliases") or [] for result in results])) + if results: + names += list(chain(*[result.get("aliases") or [] for result in results])) # type: ignore return get_close_matches(name, names) @@ -102,7 +100,7 @@ async def fetch_closest_owned_names( name: str, guild: hikari.SnowflakeishOr[hikari.PartialGuild], owner: hikari.SnowflakeishOr[hikari.PartialUser], - ) -> list[str] | None: + ) -> list[str]: """Fetch the closest tagnames for the provided name and owner. Parameters @@ -126,10 +124,10 @@ async def fetch_closest_owned_names( """SELECT tagname, aliases FROM tags WHERE guild_id = $1 AND owner_id = $2""", guild_id, owner_id ) - names = [result["tagname"] for result in results] if results else [] + names: list[str] = [result["tagname"] for result in results] if results else [] - if results is not None: - names += list(chain(*[result.get("aliases") or [] for result in results])) + if results: + names += list(chain(*[result.get("aliases") or [] for result in results])) # type: ignore return get_close_matches(name, names) diff --git a/src/models/views.py b/src/models/views.py index daffe0b..f274a21 100644 --- a/src/models/views.py +++ b/src/models/views.py @@ -10,7 +10,7 @@ class StopSelect(miru.TextSelect): """A select that stops the view after interaction.""" - async def callback(self, context: miru.ViewContext) -> None: + async def callback(self, _: miru.ViewContext) -> None: self.view.stop() diff --git a/src/utils/cache.py b/src/utils/cache.py index 5fb3535..7c1ca38 100644 --- a/src/utils/cache.py +++ b/src/utils/cache.py @@ -98,7 +98,7 @@ async def get( return rows if rows else None - async def refresh(self, table: str, **kwargs) -> None: + async def refresh(self, table: str, **kwargs: t.Any) -> None: """Discards and reloads a specific part of the cache, should be called after modifying database values.""" if not self.is_ready: return diff --git a/src/utils/helpers.py b/src/utils/helpers.py index c7e5c1f..a6b8e0f 100644 --- a/src/utils/helpers.py +++ b/src/utils/helpers.py @@ -6,9 +6,11 @@ import unicodedata from contextlib import suppress +import arc import hikari -import lightbulb import pytz +import toolbox +from miru.ext import nav from src.etc import const from src.models import errors @@ -227,11 +229,11 @@ def can_harm( me: hikari.Member, member: hikari.Member, permission: hikari.Permissions, *, raise_error: bool = False ) -> bool: """Returns True if "member" can be harmed by "me", also checks if "me" has "permission".""" - perms = lightbulb.utils.permissions_for(me) + perms = toolbox.calculate_permissions(me) if not includes_permissions(perms, permission): if raise_error: - raise lightbulb.BotMissingRequiredPermission(perms=permission) + raise arc.BotMissingPermissionsError(permission) return False if not is_above(me, member): @@ -327,12 +329,22 @@ async def parse_message_link(ctx: SnedContext, message_link: str) -> hikari.Mess channel = ctx.client.cache.get_guild_channel(channel_id) me = ctx.client.cache.get_member(ctx.guild_id, ctx.client.user_id) - assert me is not None and isinstance(channel, hikari.TextableGuildChannel) + assert me is not None - if channel and isinstance(channel, hikari.PermissibleGuildChannel): # Make reasonable attempt at checking perms - perms = lightbulb.utils.permissions_in(channel, me) + if isinstance(channel, hikari.TextableGuildChannel): # Make reasonable attempt at checking perms + perms = toolbox.calculate_permissions(me, channel) if not (perms & hikari.Permissions.READ_MESSAGE_HISTORY): - raise lightbulb.BotMissingRequiredPermission(perms=hikari.Permissions.READ_MESSAGE_HISTORY) + raise arc.BotMissingPermissionsError(hikari.Permissions.READ_MESSAGE_HISTORY) + else: + await ctx.respond( + embed=hikari.Embed( + title="❌ Invalid channel", + description="There are no messages in this channel.", + color=const.ERROR_COLOR, + ), + flags=hikari.MessageFlag.EPHEMERAL, + ) + return try: message = await ctx.client.rest.fetch_message(channel_id, message_id) @@ -355,7 +367,7 @@ async def maybe_delete(message: hikari.PartialMessage) -> None: await message.delete() -async def maybe_edit(message: hikari.PartialMessage, *args, **kwargs) -> None: +async def maybe_edit(message: hikari.PartialMessage, *args: t.Any, **kwargs: t.Any) -> None: with suppress(hikari.NotFoundError, hikari.ForbiddenError, hikari.HTTPError): await message.edit(*args, **kwargs) @@ -394,7 +406,7 @@ def format_reason( def build_journal_pages(entries: list[JournalEntry]) -> list[hikari.Embed]: """Build a list of embeds to send to a user containing journal entries, with pagination.""" - paginator = lightbulb.utils.StringPaginator(max_chars=1500) + paginator = nav.Paginator(max_len=1500) for entry in entries: paginator.add_line(f"`#{entry.id}` {entry.display_content}") @@ -404,7 +416,7 @@ def build_journal_pages(entries: list[JournalEntry]) -> list[hikari.Embed]: description=page, color=const.EMBED_BLUE, ) - for page in paginator.build_pages() + for page in paginator.pages ] return embeds diff --git a/src/utils/ratelimiter.py b/src/utils/ratelimiter.py deleted file mode 100644 index 7babf64..0000000 --- a/src/utils/ratelimiter.py +++ /dev/null @@ -1,237 +0,0 @@ -from __future__ import annotations - -import abc -import asyncio -import sys -import time -import traceback -import typing as t -from collections import deque - -import attr - -if t.TYPE_CHECKING: - import hikari - - -# FIXME: Destroy module -class ContextLike(t.Protocol): - """An object that has common attributes of a context.""" - - @property - def author(self) -> hikari.UndefinedOr[hikari.User]: ... - - @property - def guild_id(self) -> hikari.Snowflake | None: ... - - @property - def channel_id(self) -> hikari.Snowflake: ... - - -@attr.define() -class BucketData: - """Handles the ratelimiting of a single bucket data. (E.g. a single user or a channel).""" - - reset_at: float - """The time at which the bucket resets.""" - remaining: int - """The amount of requests remaining in the bucket.""" - bucket: Bucket - """The bucket this data belongs to.""" - queue: t.Deque[asyncio.Event] = attr.field(factory=deque) - """A list of events to set as the iter task proceeds.""" - task: asyncio.Task[t.Any] | None = attr.field(default=None) - """The task that is currently iterating over the queue.""" - - @classmethod - def for_bucket(cls, bucket: Bucket) -> BucketData: - """Create a new BucketData for a Bucket.""" - return cls( - bucket=bucket, - reset_at=time.monotonic() + bucket.period, - remaining=bucket.limit, - ) - - def start_queue(self) -> None: - """Start the queue of a BucketData. - This will start setting events in the queue until the bucket is ratelimited. - """ - if self.task is None: - self.task = asyncio.create_task(self._iter_queue()) - - def reset(self) -> None: - """Reset the ratelimit.""" - self.remaining = self.bucket.limit - self.reset_at = time.monotonic() + self.bucket.period - - async def _iter_queue(self) -> None: - """Iterate over the queue of a BucketData and set events.""" - try: - if self.remaining <= 0 and self.reset_at > time.monotonic(): - # Sleep until ratelimit expires - sleep_time = self.reset_at - time.monotonic() - await asyncio.sleep(sleep_time) - self.reset() - elif self.reset_at <= time.monotonic(): - self.reset() - - # Set events while not ratelimited - while self.remaining > 0 and self.queue: - self.remaining -= 1 - self.queue.popleft().set() - - self.task = None - - except Exception as e: - print(f"Task Exception was never retrieved: {e}", file=sys.stderr) - print(traceback.format_exc(), file=sys.stderr) - - -class Bucket(abc.ABC): - """Abstract class for ratelimiter buckets.""" - - def __init__(self, period: float, limit: int, wait: bool = True) -> None: - """Abstract class for ratelimiter buckets. - - Parameters - ---------- - period : float - The period, in seconds, after which the quota resets. - limit : int - The amount of requests allowed in a quota. - wait : bool - Determines if the ratelimiter should wait in - case of hitting a ratelimit. - """ - self.period: float = period - self.limit: int = limit - self.wait: bool = wait - self._bucket_data: t.Dict[str, BucketData] = {} - - @abc.abstractmethod - def get_key(self, ctx: ContextLike) -> str: - """Get key for ratelimiter bucket.""" - - def is_rate_limited(self, ctx: ContextLike) -> bool: - """Returns a boolean determining if the ratelimiter is ratelimited or not.""" - now = time.monotonic() - - if data := self._bucket_data.get(self.get_key(ctx)): - if data.reset_at <= now: - return False - return data.remaining <= 0 - return False - - async def acquire(self, ctx: ContextLike) -> None: - """Acquire a ratelimit, block execution if ratelimited and wait is True.""" - event = asyncio.Event() - - # Get or insert bucket data - data = self._bucket_data.setdefault(self.get_key(ctx), BucketData.for_bucket(self)) - data.queue.append(event) - data.start_queue() - - if self.wait: - await event.wait() - - def reset(self, ctx: ContextLike) -> None: - """Reset the ratelimit for a given context.""" - if data := self._bucket_data.get(self.get_key(ctx)): - data.reset() - - -class GlobalBucket(Bucket): - """Ratelimiter bucket for global ratelimits.""" - - def get_key(self, _: ContextLike) -> str: - return "amongus" - - -class GuildBucket(Bucket): - """Ratelimiter bucket for guilds. - - Note that all ContextLike objects must have a guild_id set. - """ - - def get_key(self, ctx: ContextLike) -> str: - if not ctx.guild_id: - raise KeyError("guild_id is not set.") - return str(ctx.guild_id) - - -class ChannelBucket(Bucket): - """Ratelimiter bucket for channels.""" - - def get_key(self, ctx: ContextLike) -> str: - return str(ctx.channel_id) - - -class UserBucket(Bucket): - """Ratelimiter bucket for users. - - Note that all ContextLike objects must have an author set. - """ - - def get_key(self, ctx: ContextLike) -> str: - if not ctx.author: - raise KeyError("author is not set.") - return str(ctx.author.id) - - -class MemberBucket(Bucket): - """Ratelimiter bucket for members. - - Note that all ContextLike objects must have an author and guild_id set. - """ - - def get_key(self, ctx: ContextLike) -> str: - if not ctx.author or not ctx.guild_id: - raise KeyError("author or guild_id is not set.") - return str(ctx.author.id) + str(ctx.guild_id) - - -class RateLimiter: - def __init__(self, period: float, limit: int, bucket: t.Type[Bucket], wait: bool = True) -> None: - """Rate Limiter implementation for Sned. - - Parameters - ---------- - period : float - The period, in seconds, after which the quota resets. - limit : int - The amount of requests allowed in a quota. - bucket : Bucket - The bucket to handle this under. - wait : bool - Determines if the ratelimiter should wait in - case of hitting a ratelimit. - """ - self.bucket: Bucket = bucket(period, limit, wait) - - def is_rate_limited(self, ctx: ContextLike) -> bool: - """Returns a boolean determining if the ratelimiter is ratelimited or not.""" - return self.bucket.is_rate_limited(ctx) - - async def acquire(self, ctx: ContextLike) -> None: - """Acquire a ratelimit, block execution if ratelimited and wait is True.""" - return await self.bucket.acquire(ctx) - - def reset(self, ctx: ContextLike) -> None: - """Reset the ratelimit for a given context.""" - self.bucket.reset(ctx) - - -# Copyright (C) 2022-present hypergonial - -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. - -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. - -# You should have received a copy of the GNU General Public License -# along with this program. If not, see: https://www.gnu.org/licenses diff --git a/src/utils/rpn.py b/src/utils/rpn.py index f4fa996..f36b39b 100644 --- a/src/utils/rpn.py +++ b/src/utils/rpn.py @@ -52,7 +52,7 @@ def _pow(a: Fraction, b: Fraction) -> Fraction: return Fraction(a**b) -OPS: dict[str, Operator] = { +OPS: dict[str, Operator[t.Any]] = { "+": BinaryOperator("+", 0, lambda a, b: a + b), "-": BinaryOperator("-", 0, lambda a, b: a - b), "*": BinaryOperator("*", 1, lambda a, b: a * b), @@ -94,7 +94,7 @@ def _validate(self) -> bool: InvalidExpressionError If the expression is invalid. """ - stack = [] + stack: list[str] = [] for i, c in enumerate(self._expr): if c not in VALID_CHARS: raise InvalidExpressionError(f"Illegal character at position {i+1}: {c}") @@ -178,7 +178,7 @@ def _to_polish_notation(self) -> None: If the expression is invalid. """ result: list[str] = [""] - stack = [] + stack: list[str] = [] for c in self._expr: if c.isspace(): continue @@ -223,7 +223,7 @@ def solve(self) -> Fraction: self._validate() self._preprocess() self._to_polish_notation() - stack = [] + stack: list[Fraction] = [] for c in self._rpn: if c not in OPS: stack.append(Fraction(c)) diff --git a/src/utils/scheduler.py b/src/utils/scheduler.py index 92f68e7..35b8cb8 100644 --- a/src/utils/scheduler.py +++ b/src/utils/scheduler.py @@ -7,12 +7,12 @@ import re import typing as t +import arc import dateparser import hikari from src.models.events import TimerCompleteEvent from src.models.timer import Timer, TimerEvent -from src.utils.tasks import IntervalLoop logger = logging.getLogger(__name__) @@ -37,8 +37,8 @@ class Scheduler: def __init__(self, client: SnedClient) -> None: self.client = client self._current_timer: Timer | None = None # Currently active timer that is being awaited - self._current_task: asyncio.Task | None = None # Current task that is handling current_timer - self._timer_loop: IntervalLoop = IntervalLoop(self._wait_for_active_timers, hours=1.0) + self._current_task: asyncio.Task[None] | None = None # Current task that is handling current_timer + self._timer_loop = arc.utils.IntervalLoop(self._wait_for_active_timers, hours=1.0) self._is_started: bool = False def start(self) -> None: diff --git a/src/utils/tasks.py b/src/utils/tasks.py deleted file mode 100644 index a1d4cbe..0000000 --- a/src/utils/tasks.py +++ /dev/null @@ -1,104 +0,0 @@ -import asyncio -import inspect -import logging -import traceback -import typing as t - - -# FIXME: Destroy module -class IntervalLoop: - """A simple interval loop that runs a coroutine at a specified interval. - - Parameters - ---------- - callback : Callable[..., Awaitable[None]] - The coroutine to run at the specified interval. - seconds : float, optional - The number of seconds to wait before running the coroutine again. - minutes : float, optional - The number of minutes to wait before running the coroutine again. - hours : float, optional - The number of hours to wait before running the coroutine again. - days : float, optional - The number of days to wait before running the coroutine again. - """ - - def __init__( - self, - callback: t.Callable[..., t.Awaitable[None]], - seconds: float | None = None, - minutes: float | None = None, - hours: float | None = None, - days: float | None = None, - ) -> None: - if not seconds and not minutes and not hours and not days: - raise ValueError("Expected a loop time.") - else: - seconds = seconds or 0 - minutes = minutes or 0 - hours = hours or 0 - days = hours or 0 - - self._coro = callback - self._task: asyncio.Task | None = None - self._failed: int = 0 - self._sleep: float = seconds + minutes * 60 + hours * 3600 + days * 24 * 3600 - self._stop_next: bool = False - - if not inspect.iscoroutinefunction(self._coro): - raise TypeError("Expected a coroutine function.") - - async def _loopy_loop(self, *args, **kwargs) -> None: - """Main loop logic.""" - while not self._stop_next: - try: - await self._coro(*args, **kwargs) - except Exception as e: - logging.error(f"Task encountered exception: {e}") - traceback_msg = "\n".join(traceback.format_exception(type(e), e, e.__traceback__)) - logging.error(traceback_msg) - - if self._failed < 3: - self._failed += 1 - await asyncio.sleep(self._sleep) - else: - raise RuntimeError(f"Task failed repeatedly, stopping it. Exception: {e}") - else: - await asyncio.sleep(self._sleep) - self.cancel() - - def start(self, *args, **kwargs) -> None: - """Start looping the task at the specified interval.""" - if self._task and not self._task.done(): - raise RuntimeError("Task is already running!") - - self._task = asyncio.create_task(self._loopy_loop(*args, **kwargs)) - - def cancel(self) -> None: - """Cancel the looping of the task.""" - if not self._task: - return - - self._task.cancel() - self._task = None - - def stop(self) -> None: - """Gracefully stop the looping of the task.""" - if self._task and not self._task.done(): - self._stop_next = True - - -# Copyright (C) 2022-present hypergonial - -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. - -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. - -# You should have received a copy of the GNU General Public License -# along with this program. If not, see: https://www.gnu.org/licenses diff --git a/src/utils/userlogger.py b/src/utils/userlogger.py index 2bb9fef..17a0628 100644 --- a/src/utils/userlogger.py +++ b/src/utils/userlogger.py @@ -53,9 +53,9 @@ class UserLogger: """Handles the logging of audit log & other related events.""" def __init__(self, client: SnedClient) -> None: - self._queue = {} self._client = client - self._frozen_guilds = [] + self._queue: dict[hikari.Snowflake, list[hikari.Embed]] = {} + self._frozen_guilds: list[hikari.Snowflake] = [] self._task: asyncio.Task[None] = self._client.create_task(self._iter_queue()) async def _iter_queue(self) -> None: