Skip to content

Commit

Permalink
feat(backends): implement hysteria2 support (#3)
Browse files Browse the repository at this point in the history
* feat(backends): implement hysteria2 support

* refactor(backends): make it sane

* fix(storage): inbound removal in memory storage

* refactor(service): getting stats from backends

* fix(hysteria): stop hysteria on exit

* feat(config): allow enabling and disabling both xray and hysteria2

* feat(config): parsing obfuscation settings and port from hysteria config

* feat(hysteria): restarts and hashing the key

* fix(xray): reuse the same runner and also capture stderr

* feat(hysteria): log capturing

* feat(hysteria): set inbound tls correctly
  • Loading branch information
khodedawsh authored Jul 31, 2024
1 parent d2f3ac4 commit ef9d2f8
Show file tree
Hide file tree
Showing 15 changed files with 385 additions and 63 deletions.
7 changes: 7 additions & 0 deletions .env.example
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,18 @@
#SERVICE_PORT=53042
#INSECURE=False

#XRAY_ENABLED=True
#XRAY_EXECUTABLE_PATH=/usr/bin/xray
#XRAY_ASSETS_PATH=/usr/share/xray
#XRAY_CONFIG_PATH=/etc/xray/xray_config.json
#XRAY_VLESS_REALITY_FLOW=xtls-rprx-vision


#HYSTERIA_ENABLED=False
#HYSTERIA_EXECUTABLE_PATH=/usr/bin/hysteria
#HYSTERIA_CONFIG_PATH=/etc/hysteria/config.yaml


#SSL_KEY_FILE=./server.key
#SSL_CERT_FILE=./server.cert
#SSL_CLIENT_CERT_FILE=./client.cert
Expand Down
16 changes: 16 additions & 0 deletions hysteria.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
listen: :4443

tls:
cert: ./ssl_cert.pem
key: ./ssl_key.pem

auth:
type: command
command: echo

masquerade:
type: proxy
proxy:
url: https://news.ycombinator.com/
rewriteHost: true

2 changes: 1 addition & 1 deletion marznode/backends/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ def contains_tag(self, tag: str) -> bool:
raise NotImplementedError

@abstractmethod
async def start(self) -> None:
async def start(self, backend_config: Any) -> None:
raise NotImplementedError

@abstractmethod
Expand Down
57 changes: 57 additions & 0 deletions marznode/backends/hysteria2/_config.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
import yaml

from marznode.models import Inbound
from marznode.storage import BaseStorage


class HysteriaConfig:
def __init__(
self,
config: str,
api_port: int = 9090,
stats_port: int = 9999,
stats_secret: str = "pretty_secret",
):
loaded_config = yaml.safe_load(config)
loaded_config["auth"] = {
"type": "http",
"http": {"url": "http://127.0.0.1:" + str(api_port)},
}
loaded_config["trafficStats"] = {
"listen": "127.0.0.1:" + str(stats_port),
"secret": stats_secret,
}
self._config = loaded_config

port = 443
if "listen" in loaded_config:
try:
port = int(loaded_config.get("listen").split(":")[-1])
except ValueError:
pass
obfs_type, obfs_password = None, None

if "obfs" in loaded_config:
try:
obfs_type = loaded_config["obfs"]["type"]
obfs_password = loaded_config["obfs"][obfs_type]["password"]
except:
pass

self._inbound = {
"tag": "hysteria2",
"protocol": "hysteria2",
"port": port,
"tls": "tls",
}
if obfs_type and obfs_password:
self._inbound.update({"path": obfs_password, "header_type": obfs_type})

def register_inbounds(self, storage: BaseStorage):
inbound = self._inbound
storage.register_inbound(
Inbound(tag=inbound["tag"], protocol=inbound["protocol"], config=inbound)
)

def render(self):
return self._config
76 changes: 76 additions & 0 deletions marznode/backends/hysteria2/_runner.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
import asyncio
import atexit
import logging
import tempfile
from collections import deque

import yaml
from anyio import BrokenResourceError, ClosedResourceError, create_memory_object_stream

logger = logging.getLogger(__name__)


class Hysteria:
def __init__(self, executable_path: str):
self._executable_path = executable_path
self._process = None
self._snd_streams = []
self._logs_buffer = deque(maxlen=100)
self._capture_task = None
atexit.register(lambda: self.stop() if self.started else None)

async def start(self, config: dict):
with tempfile.NamedTemporaryFile(
mode="w", suffix=".yaml", delete=False
) as temp_file:
yaml.dump(config, temp_file)
cmd = [self._executable_path, "server", "-c", temp_file.name]

self._process = await asyncio.create_subprocess_shell(
" ".join(cmd),
stdin=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
stdout=asyncio.subprocess.PIPE,
)
logger.info("Hysteria has started")
asyncio.create_task(self.__capture_process_logs())

def stop(self):
if self.started:
self._process.terminate()

@property
def started(self):
return self._process and self._process.returncode is None

async def __capture_process_logs(self):
"""capture the logs, push it into the stream, and store it in the deck
note that the stream blocks sending if it's full, so a deck is necessary"""

async def capture_stream(stream):
while True:
output = await stream.readline()
if output == b"":
"""break in case of eof"""
return
for stm in self._snd_streams:
try:
await stm.send(output)
except (ClosedResourceError, BrokenResourceError):
self._snd_streams.remove(stm)
continue
self._logs_buffer.append(output)

await asyncio.gather(
capture_stream(self._process.stderr), capture_stream(self._process.stdout)
)

def get_logs_stm(self):
new_snd_stm, new_rcv_stm = create_memory_object_stream()
self._snd_streams.append(new_snd_stm)
return new_rcv_stm

def get_buffer(self):
"""makes a copy of the buffer, so it could be read multiple times
the buffer is never cleared in case logs from xray's exit are useful"""
return self._logs_buffer.copy()
106 changes: 106 additions & 0 deletions marznode/backends/hysteria2/interface.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
import json
import logging
from secrets import token_hex
from typing import AsyncIterator, Any

import aiohttp
from aiohttp import web

from marznode.backends.base import VPNBackend
from marznode.backends.hysteria2._config import HysteriaConfig
from marznode.backends.hysteria2._runner import Hysteria
from marznode.models import User, Inbound
from marznode.storage import BaseStorage
from marznode.utils.key_gen import generate_password
from marznode.utils.network import find_free_port

logger = logging.getLogger(__name__)


class HysteriaBackend(VPNBackend):
def __init__(self, executable_path: str, storage: BaseStorage):
self._executable_path = executable_path
self._storage = storage
self._inbounds = ["hysteria2"]
self._users = {}
self._auth_site = None
self._runner = Hysteria(self._executable_path)
self._stats_secret = None
self._stats_port = None

def contains_tag(self, tag: str) -> bool:
return bool(tag == "hysteria2")

async def start(self, config_path: str) -> None:
api_port = find_free_port()
self._stats_port = find_free_port()
self._stats_secret = token_hex(16)
if self._auth_site:
await self._auth_site.stop()
app = web.Application()
app.router.add_post("/", self._auth_callback)
app_runner = web.AppRunner(app)
await app_runner.setup()

self._auth_site = web.TCPSite(app_runner, "127.0.0.1", api_port)

await self._auth_site.start()
with open(config_path) as f:
config = f.read()
cfg = HysteriaConfig(config, api_port, self._stats_port, self._stats_secret)
cfg.register_inbounds(self._storage)
await self._runner.start(cfg.render())

async def stop(self):
await self._auth_site.stop()
self._storage.remove_inbound("hysteria2")
self._runner.stop()

async def restart(self, backend_config: Any) -> None:
await self.stop()
await self.start(backend_config)

async def add_user(self, user: User, inbound: Inbound) -> None:
password = generate_password(user.key)
self._users.update({password: user})

async def remove_user(self, user: User, inbound: Inbound) -> None:
self._users.pop(user.key)
url = "http://127.0.0.1:" + str(self._stats_port) + "/kick"
headers = {"Authorization": self._stats_secret}

payload = json.dumps([str(user.id) + "." + user.username])
async with aiohttp.ClientSession() as session:
async with session.post(url, data=payload, headers=headers):
pass

async def get_logs(self, include_buffer: bool) -> AsyncIterator:
if include_buffer:
buffer = self._runner.get_buffer()
for line in buffer:
yield line
log_stm = self._runner.get_logs_stm()
async with log_stm:
async for line in log_stm:
yield line

async def get_usages(self):
url = "http://127.0.0.1:" + str(self._stats_port) + "/traffic?clear=1"
headers = {"Authorization": self._stats_secret}

async with aiohttp.ClientSession() as session:
async with session.get(url, headers=headers) as response:
data = await response.json()
usages = {}
for user_identifier, usage in data.items():
uid = int(user_identifier.split(".")[0])
usages[uid] = usage["tx"] + usage["rx"]
return usages

async def _auth_callback(self, request: web.Request):
user_key = (await request.json())["auth"]
if user := self._users.get(user_key):
return web.Response(
body=json.dumps({"ok": True, "id": str(user.id) + "." + user.username}),
)
return web.Response(status=404)
10 changes: 10 additions & 0 deletions marznode/backends/xray/_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@

from marznode.config import XRAY_EXECUTABLE_PATH, XRAY_VLESS_REALITY_FLOW
from ._utils import get_x25519
from ...models import Inbound
from ...storage import BaseStorage


class XrayConfig(dict):
Expand Down Expand Up @@ -191,5 +193,13 @@ def get_outbound(self, tag) -> dict:
if outbound["tag"] == tag:
return outbound

def register_inbounds(self, storage: BaseStorage):
inbounds = [
Inbound(tag=i["tag"], protocol=i["protocol"], config=i)
for i in self.inbounds_by_tag.values()
]
for inbound in inbounds:
storage.register_inbound(inbound)

def to_json(self, **json_kwargs):
return json.dumps(self, **json_kwargs)
Loading

0 comments on commit ef9d2f8

Please sign in to comment.