Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: CLN reconnect plugin #396

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions tools/reconnector/consts.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
PLUGIN_NAME = "reconnector"
VERSION = "1.0.0"
40 changes: 40 additions & 0 deletions tools/reconnector/plugin.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
#!/usr/bin/env python3

import sys
from typing import Any

from consts import PLUGIN_NAME, VERSION
from pyln.client import Plugin
from reconnector_config import Config, register_options

from reconnector import Reconnector

pl = Plugin()
register_options(pl)

rec = Reconnector(pl)


@pl.init()
def init(
options: dict[str, Any],
configuration: dict[str, Any],
plugin: Plugin,
**kwargs: dict[str, Any],
) -> None:
cfg = Config(pl, options)
rec.init(cfg)

pl.log(f"Plugin {PLUGIN_NAME} v{VERSION} initialized")


@pl.subscribe("shutdown")
def shutdown(**kwargs: dict[str, Any]) -> None:
pl.log(f"Plugin {PLUGIN_NAME} stopping")
rec.stop()

pl.log(f"Plugin {PLUGIN_NAME} stopped")
sys.exit(0)


pl.run()
97 changes: 97 additions & 0 deletions tools/reconnector/reconnector.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
from threading import Timer

from pyln.client import Plugin, RpcError
from reconnector_config import Config


class Reconnector:
_pl: Plugin
_pubkey: str

_timer: Timer | None
_custom_uris: dict[str, str]

def __init__(self, pl: Plugin) -> None:
self._pl = pl
self._pubkey = ""
self._timer = None
self._custom_uris = {}

def init(self, cfg: Config) -> None:
self._custom_uris = cfg.custom_uris
self._pubkey = self._pl.rpc.getinfo()["id"]

# Cancel in case there is a running timer
self.stop()

self._pl.log(f"Checking for falsy inactive channels every {cfg.check_interval} seconds")
self._timer = Timer(cfg.check_interval, self._check_inactive_channels)
self._timer.start()

def stop(self) -> None:
if self._timer is not None:
self._timer.cancel()
self._timer = None

def _check_inactive_channels(self) -> None:
channels = self._pl.rpc.listpeerchannels()["channels"]

for channel in channels:
if (
not channel["peer_connected"]
or channel["status"][0] != "CHANNELD_NORMAL:Reconnected, and reestablished."
):
continue

channel_info = self._pl.rpc.listchannels(channel["short_channel_id"])["channels"]
if len(channel_info) != 2:
continue

our_policy = (
channel_info[0] if channel_info[0]["source"] == self._pubkey else channel_info[1]
)

if not our_policy["public"]:
continue

if not our_policy["active"]:
self._pl.log(
f"Found falsely disabled channel with peer {channel['peer_id']}; reconnecting"
)
self._reconnect_channel(channel["peer_id"])

def _reconnect_channel(self, peer_id: str) -> None:
uris = self._get_node_uris(peer_id)

# todo: still disconnect and hope they connect to us?
if len(uris) == 0:
self._pl.log(f"Could not find public URI of peer {peer_id}; not reconnecting")
return

self._pl.rpc.disconnect(peer_id, True)
for uri in uris:
try:
self._pl.rpc.connect(uri)
self._pl.log(f"Reconnected to {peer_id}")
break

except RpcError as e:
self._pl.log(f"Could not connect to {uri}: {e!s}")

def _get_node_uris(self, peer_id: str) -> list[str]:
if peer_id in self._custom_uris:
return [self._custom_uris[peer_id]]

nodes = self._pl.rpc.listnodes(peer_id)["nodes"]
if len(nodes) == 0:
return []

addresses = nodes[0]["addresses"]

# Filter torv2
addresses = list(filter(lambda address: address["type"] != "torv2", addresses))

# Prefer clearnet to Tor connections
addresses.sort(key=lambda address: 1 if "tor" in address["type"] else 0)

return [f"{peer_id}@{address['address']}:{address['port']}" for address in addresses]
62 changes: 62 additions & 0 deletions tools/reconnector/reconnector_config.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
import json
from enum import Enum
from typing import Any

from consts import PLUGIN_NAME
from pyln.client import Plugin


class OptionKeys(str, Enum):
CheckInterval = f"{PLUGIN_NAME}-check-interval"
CustomUris = f"{PLUGIN_NAME}-custom-uris"


class OptionDefaults(str, Enum):
CheckInterval = "120"
CustomUris = "[]"


def register_options(pl: Plugin) -> None:
pl.add_option(
OptionKeys.CheckInterval,
OptionDefaults.CheckInterval,
"interval in seconds to check for disabled channels",
)
pl.add_option(
OptionKeys.CustomUris,
OptionDefaults.CustomUris,
"list of URIs that should be used instead of the publicly announced ones",
)


class Config:
check_interval: int
custom_uris: dict[str, str]

def __init__(self, pl: Plugin, configuration: dict[str, Any]) -> None:
self.check_interval = int(configuration[OptionKeys.CheckInterval])

try:
self.custom_uris = Config._parse_uris(pl, configuration[OptionKeys.CustomUris])
except Exception as e:
pl.log(f"Could not decode custom URIs: {e!s}", level="warn")
self.custom_uris = {}

@staticmethod
def _parse_uris(pl: Plugin, uris_str: str) -> dict[str, str]:
uris_list: list[str] = json.loads(uris_str)
if not isinstance(uris_list, list):
msg = "not a list"
raise TypeError(msg)

uris: dict[str, str] = {}

for uri in uris_list:
split = uri.split("@")
if len(split) != 2:
pl.log(f'Ignoring custom URI because of invalid format: "{uri}"')
continue

uris[split[0].lower()] = uri

return uris
83 changes: 83 additions & 0 deletions tools/reconnector/test_config.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
from unittest.mock import MagicMock

import pytest
from reconnector_config import Config, OptionKeys


class TestConfig:
@pytest.mark.parametrize(
("params", "expected_interval", "expected_uris"),
[
({OptionKeys.CheckInterval: 120, OptionKeys.CustomUris: "[]"}, 120, {}),
({OptionKeys.CheckInterval: 60, OptionKeys.CustomUris: "[]"}, 60, {}),
({OptionKeys.CheckInterval: 1, OptionKeys.CustomUris: "[]"}, 1, {}),
(
{
OptionKeys.CheckInterval: 120,
OptionKeys.CustomUris: '["02d96eadea3d780104449aca5c93461ce67c1564e2e1d73225fa67dd3b997a6018@'
'45.86.229.190:9736"]',
},
120,
{
"02d96eadea3d780104449aca5c93461ce67c1564e2e1d73225fa67dd3b997a6018": "02d96eadea3d780104449aca5c93461ce67c1564e2e1d73225fa67dd3b997a6018@45.86.229.190:9736"
},
),
(
{
OptionKeys.CheckInterval: 120,
OptionKeys.CustomUris: '["02D96eadea3d780104449aca5c93461ce67c1564e2e1d73225fa67dd3b997a6018@'
'45.86.229.190:9736"]',
},
120,
{
"02d96eadea3d780104449aca5c93461ce67c1564e2e1d73225fa67dd3b997a6018": "02D96eadea3d780104449aca5c93461ce67c1564e2e1d73225fa67dd3b997a6018@45.86.229.190:9736"
},
),
(
{
OptionKeys.CheckInterval: 120,
OptionKeys.CustomUris: "["
'"02d96eadea3d780104449aca5c93461ce67c1564e2e1d73225fa67dd3b997a6018@45.86.229.190:9736",'
'"026165850492521f4ac8abd9bd8088123446d126f648ca35e60f88177dc149ceb2@45.86.229.190:9735"'
"]",
},
120,
{
"02d96eadea3d780104449aca5c93461ce67c1564e2e1d73225fa67dd3b997a6018": "02d96eadea3d780104449aca5c93461ce67c1564e2e1d73225fa67dd3b997a6018@45.86.229.190:9736",
"026165850492521f4ac8abd9bd8088123446d126f648ca35e60f88177dc149ceb2": "026165850492521f4ac8abd9bd8088123446d126f648ca35e60f88177dc149ceb2@45.86.229.190:9735",
},
),
(
{
OptionKeys.CheckInterval: 120,
OptionKeys.CustomUris: "["
'"02d96eadea3d780104449aca5c93461ce67c1564e2e1d73225fa67dd3b997a6018@45.86.229.190:9736",'
'"026165850492521f4ac8abd9bd8088123446d126f648ca35e60f88177dc149ceb2"'
"]",
},
120,
{
"02d96eadea3d780104449aca5c93461ce67c1564e2e1d73225fa67dd3b997a6018": "02d96eadea3d780104449aca5c93461ce67c1564e2e1d73225fa67dd3b997a6018@45.86.229.190:9736",
},
),
(
{
OptionKeys.CheckInterval: 1,
OptionKeys.CustomUris: '{"some": "otherData"}',
},
1,
{},
),
({OptionKeys.CheckInterval: 1, OptionKeys.CustomUris: "notJson"}, 1, {}),
],
)
def test_parse(
self,
params: dict[str, str],
expected_interval: int,
expected_uris: dict[str, str],
) -> None:
cfg = Config(MagicMock(), params)

assert cfg.check_interval == expected_interval
assert cfg.custom_uris == expected_uris
Loading