Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Issue #11 #12

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 56 additions & 10 deletions marznode/backends/xray/_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,9 @@
import logging
import re
from collections import deque
import os

from anyio import create_memory_object_stream, ClosedResourceError, BrokenResourceError
from anyio import create_memory_object_stream, ClosedResourceError, BrokenResourceError, open_file
from anyio.streams.memory import MemoryObjectReceiveStream

from ._config import XrayConfig
Expand All @@ -31,29 +32,43 @@ def __init__(self, executable_path: str, assets_path: str):
self._env = {"XRAY_LOCATION_ASSET": assets_path}
self.stop_event = asyncio.Event()

atexit.register(lambda: self.stop() if self.running else None)
self.error_log = None
self.error_pipe = None

atexit.register(lambda: self.stop() if self.running else None)

async def start(self, config: XrayConfig):
if self.running is True:
raise RuntimeError("Xray is started already")

if config.get("log", {}).get("loglevel") in ("none", "error"):
config["log"]["loglevel"] = "warning"

cmd = [self.executable_path, "run", "-config", "stdin:"]

if "error" in config["log"]:
try:
self.error_pipe = "/var/lib/marznode/error_pipe"
if not os.path.exists(self.error_pipe):
os.mkfifo(self.error_pipe)
self.error_log = open(config["log"]["error"], mode="ab", buffering=0)
config["log"]["error"] = self.error_pipe
except OSError as e:
logger.error(f"Unable to open file {config['log']['error']}")
raise e

print(config["log"])
self._process = await asyncio.create_subprocess_shell(
" ".join(cmd),
env=self._env,
stdin=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
stdout=asyncio.subprocess.PIPE,
)

self._process.stdin.write(str.encode(config.to_json()))
await self._process.stdin.drain()
self._process.stdin.close()
await self._process.stdin.wait_closed()
logger.info("Xray core %s started", self.version)

logs_stm = self.get_logs_stm()
asyncio.create_task(self.__capture_process_logs())
async for line in logs_stm:
Expand All @@ -69,6 +84,10 @@ def stop(self):
return

self._process.terminate()
if self.error_log != None:
self.error_log.close()
if os.path.exists(self.error_pipe):
os.remove(self.error_pipe)
self._process = None

async def restart(self, config: XrayConfig):
Expand All @@ -87,8 +106,7 @@ async def restart(self, config: XrayConfig):
async def __capture_process_logs(self):
"""capture the logs, push it into the stream, and store it in the deck
note that the stream blocks sending if it's full, so a deck is necessary"""

async def capture_stream(stream):
async def capture_stream(stream, file=None):
while True:
output = await stream.readline()
for stm in self._snd_streams:
Expand All @@ -104,10 +122,38 @@ async def capture_stream(stream):
if output == b"":
"""break in case of eof"""
return

async def fifo_stream(fifo_path, file):
with open(fifo_path, "rb") as fifo:
while True:
output = await asyncio.to_thread(fifo.readline)
if output:
file.write(output)
for stm in self._snd_streams:
try:
await stm.send(output)
except (ClosedResourceError, BrokenResourceError):
try:
self._snd_streams.remove(stm)
except ValueError:
pass
continue
else:
await asyncio.sleep(0.1)

self._logs_buffer.append(output)
if output == b"":
"""break in case of eof"""
return
tasks = [
capture_stream(self._process.stderr),
capture_stream(self._process.stdout)
]
if self.error_log is not None:
tasks.append(fifo_stream(self.error_pipe, self.error_log))

await asyncio.gather(*tasks)

await asyncio.gather(
capture_stream(self._process.stderr), capture_stream(self._process.stdout)
)
logger.warning("Xray stopped/died")
self.stop_event.set()
self.stop_event.clear()
Expand Down