From ed561a01a5508b02e91681defe10481d478d91b8 Mon Sep 17 00:00:00 2001 From: Adam Gibson Date: Tue, 24 Aug 2021 15:40:53 +0100 Subject: [PATCH] Onion-based message channels with directory nodes Joinmarket bots run their own onion services allowing inbound connections. Both takers and makers connect to other makers at the mentioned onion services, over Tor. Directory nodes run persistent onion services allowing peers to find other (maker) peers to connect to, and also forwarding messages where necessary. This is implemented as an alternative to IRC, i.e. a new implementation of the abstract class MessageChannel, in onionmc.py. Note that using both this *and* IRC servers is supported; Joinmarket supports multiple, redundant different communication methods, simultaneously. Messaging is done with a derived class of twisted's LineReceiver, and there is an additional layer of syntax, similar to but not the same as the IRC syntax for ensuring that messages are passed with the same J5.. nick as is used on IRC. This allows us to keep the message signing logic the same as before. As well as Joinmarket line messages, we use additional control messages to communicate peer lists, and to manage connections. Peers which send messages not conforming to the syntax are dropped. See https://github.com/JoinMarket-Org/JoinMarket-Docs/pull/12 for documentation of the syntax. Connections to directory nodes are robust as for IRC servers, in that we use a ReconnectingClientFactory to keep trying to re-establish broken connections with exponential backoff. Connections to maker peers do not require this feature, as they will often disconnect in normal operation. Multiple directory nodes can and should be configured by bots. --- docs/onion-message-channels.md | 173 ++++ jmbase/jmbase/twisted_utils.py | 77 +- jmclient/jmclient/__init__.py | 2 +- jmclient/jmclient/client_protocol.py | 14 +- jmclient/jmclient/configure.py | 132 ++- jmclient/jmclient/wallet_rpc.py | 10 +- jmdaemon/jmdaemon/__init__.py | 1 + jmdaemon/jmdaemon/daemon_protocol.py | 18 +- jmdaemon/jmdaemon/message_channel.py | 6 +- jmdaemon/jmdaemon/onionmc.py | 1179 +++++++++++++++++++++++++ jmdaemon/test/test_daemon_protocol.py | 4 +- jmdaemon/test/test_irc_messaging.py | 6 +- jmdaemon/test/test_orderbookwatch.py | 4 +- scripts/obwatch/ob-watcher.py | 4 +- test/e2e-coinjoin-test.py | 364 ++++++++ test/regtest_joinmarket.cfg | 42 +- test/ygrunner.py | 3 +- 17 files changed, 1945 insertions(+), 94 deletions(-) create mode 100644 docs/onion-message-channels.md create mode 100644 jmdaemon/jmdaemon/onionmc.py create mode 100644 test/e2e-coinjoin-test.py diff --git a/docs/onion-message-channels.md b/docs/onion-message-channels.md new file mode 100644 index 000000000..deb5af6cb --- /dev/null +++ b/docs/onion-message-channels.md @@ -0,0 +1,173 @@ +# HOW TO SETUP ONION MESSAGE CHANNELS IN JOINMARKET + +### Contents + +1. [Overview](#overview) + +2. [Testing, configuring for signet](#testing) + +4. [Directory nodes](#directory) + + + +## Overview + +This is a new way for Joinmarket bots to communicate, namely by serving and connecting to Tor onion services. This does not +introduce any new requirements to your Joinmarket installation, technically, because the use of Payjoin already required the need +to service such onion services, and connecting to IRC used a SOCKS5 proxy (by default, and used by almost all users) over Tor to +a remote onion service. + +The purpose of this new type of message channel is as follows: + +* less reliance on any service external to Joinmarket +* most of the transaction negotiation will be happening directly peer to peer, not passed over a central server ( +albeit it was and remains E2E encrypted data, in either case) +* the above can lead to better scalability at large numbers +* a substantial increase in the speed of transaction negotiation; this is mostly related to the throttling of high bursts of traffic on IRC + +The configuration for a user is simple; in their `joinmarket.cfg` they will add a messaging section like this: + +``` +[MESSAGING:onion1] +type = onion +onion_serving_port = 8082 +# This is a comma separated list (comma can be omitted if only one item). +# Each item has format host:port +directory_nodes = rr6f6qtleiiwic45bby4zwmiwjrj3jsbmcvutwpqxjziaydjydkk5iad.onion:80 +``` + +Here, I have deliberately omitted the several other settings in this section which will almost always be fine as default; +see `jmclient/jmclient/configure.py` for what those defaults are, and the extensive comments explaining. + +The main point is the list of **directory nodes** (the one shown here is one being run on signet, right now), which will +be comma separated if multiple directory nodes are configured (we expect there will be 2 or 3 as a normal situation). +The `onion_serving_port` is on which port on the local machine the onion service is served. +The `type` field must always be `onion` in this case, and distinguishes it from IRC message channels and others. + +### Can/should I still run IRC message channels? + +In short, yes. + +### Do I need to configure Tor, and if so, how? + +These message channels use both outbound and inbound connections to onion services (or "hidden services"). + +As previously mentioned, both of these features were already in use in Joinmarket. If you never served an +onion service before, it should work fine as long as you have the Tor service running in the background, +and the default control port 9051 (if not, change that value in the `joinmarket.cfg`, see above. + +#### Why not use Lightning based onions? + +(*Feel free to skip this section if you don't know what "Lightning based onions" refers to!*). The reason this architecture is +proposed as an alternative to the previously suggested Lightning-node-based network (see +[this PR](https://github.com/JoinMarket-Org/joinmarket-clientserver/pull/1000)), is mostly that: + +* the latter has a bunch of extra installation and maintenance dependencies (just one example: pyln-client requires coincurve, which we just +removed) +* the latter requires establishing a new node "identity" which can be refreshed, but that creates more concern +* longer term ideas to integrate Lightning payments to the coinjoin workflow (and vice versa!) are not realizable yet +* using multi-hop onion messaging in the LN network itself is also a way off, and a bit problematic + +So the short version is: the Lightning based alternative is certainly feasible, but has a lot more baggage that can't really be justified +unless we're actually using it for something. + + + + +## Testing, and configuring for signet. + +This testing section focuses on signet since that will be the less troublesome way of getting involved in tests for +the non-hardcore JM developer :) + +(For the latter, please use the regtest setup by running `test/e2e-coinjoin-test.py` under `pytest`, +and pay attention to the settings in `regtest_joinmarket.cfg`.) + +There is no separate/special configuration for signet other than the configuration that is already needed for running +Joinmarket against a signet backend (so e.g. RPC port of 38332). + +Add the `[MESSAGING:onion1]` message channel section to your `joinmarket.cfg`, as listed above, including the +signet directory node listed above (rr6f6qtleiiwic45bby4zwmiwjrj3jsbmcvutwpqxjziaydjydkk5iad.onion:80), and, +for the simplest test, remove the other `[MESSAGING:*]` sections that you have. + +Then just make sure your bot has some signet coins and try running as maker or taker or both. + + + +## Directory nodes + +**This last section is for people with a lot of technical knowledge in this area, +who would like to help by running a directory node. You can ignore it if that does not apply.**. + +This requires a long running bot. It should be on a server you can keep running permanently, so perhaps a VPS, +but in any case, very high uptime. For reliability it also makes sense to configure to run as a systemd service. + +A note: in this early stage, the usage of Lightning is only really network-layer stuff, and the usage of bitcoin, is none; feel free to add elements that remove any need for a backend bitcoin blockchain, but beware: future upgrades *could* mean that the directory node really does need the bitcoin backend. + +#### Joinmarket-specific configuration + +Add `hidden_service_dir` to your `[MESSAGING:onion1]` with a directory accessible to your user. You may want to lock this down +a bit! +The point to understand is: Joinmarket's `jmbase.JMHiddenService` will, if configured with a non-empty `hidden_service_dir` +field, actually start an *independent* instance of Tor specifically for serving this, under the current user. +(our tor interface library `txtorcon` needs read access to the Tor HS dir, so it's troublesome to do this another way). + +##### Question: How to configure the `directory-nodes` list in our `joinmarket.cfg` for this directory node bot? + +Answer: **you must only enter your own node in this list!** (otherwise you may find your bot infinitely rebroadcasting messages). + + +#### Suggested setup of a service: + +You will need two components: bitcoind, and Joinmarket itself, which you can run as a yg. +Since this task is going to be attempted by someone with significant technical knowledge, +only an outline is provided here; several details will need to be filled in. +Here is a sketch of how the systemd service files can be set up for signet: + +If someone wants to put together a docker setup of this for a more "one-click install", that would be great. + +1. bitcoin-signet.service + +``` +[Unit] +Description=bitcoind signet +After=network-online.target +Wants=network-online.target + +[Service] +Type=simple +ExecStart=/usr/local/bin/bitcoind -signet +User=user + +[Install] +WantedBy=multi-user.target +``` + +This is deliberately a super-basic setup (see above). Don't forget to setup your `bitcoin.conf` as usual, +for the bitcoin user, and make it match (specifically in terms of RPC) what you set up for Lightning below. + + +2. + +``` +[Unit] +Description=joinmarket directory node on signet +Requires=bitcoin-signet.service +After=bitcoin-signet.service + +[Service] +Type=simple +ExecStart=/bin/bash -c 'cd /path/to/joinmarket-clientserver && source jmvenv/bin/activate && cd scripts && echo -n "password" | python yg-privacyenhanced.py --wallet-password-stdin --datadir=/custom/joinmarket-datadir some-signet-wallet.jmdat' +User=user + +[Install] +WantedBy=multi-user.target +``` + +To state the obvious, the idea here is that this second service will run the JM directory node and have a dependency on the previous one, +to ensure they start up in the correct order. + +Re: password echo, obviously this kind of password entry is bad; +for now we needn't worry as these nodes don't need to carry any real coins (and it's better they don't!). +Later we may need to change that (though of course you can use standard measures to protect the box). + +TODO: add some material on network hardening/firewalls here, I guess. diff --git a/jmbase/jmbase/twisted_utils.py b/jmbase/jmbase/twisted_utils.py index f7e2f287b..b7594d181 100644 --- a/jmbase/jmbase/twisted_utils.py +++ b/jmbase/jmbase/twisted_utils.py @@ -128,16 +128,23 @@ def config_to_hs_ports(virtual_port, host, port): class JMHiddenService(object): """ Wrapper class around the actions needed to create and serve on a hidden service; an object of - type Resource must be provided in the constructor, - which does the HTTP serving actions (GET, POST serving). + type either Resource or server.ProtocolFactory must + be provided in the constructor, which does the HTTP + (GET, POST) or other protocol serving actions. """ - def __init__(self, resource, info_callback, error_callback, - onion_hostname_callback, tor_control_host, + def __init__(self, proto_factory_or_resource, info_callback, + error_callback, onion_hostname_callback, tor_control_host, tor_control_port, serving_host, serving_port, - virtual_port = None, - shutdown_callback = None): - self.site = Site(resource) - self.site.displayTracebacks = False + virtual_port=None, + shutdown_callback=None, + hidden_service_dir=""): + if isinstance(proto_factory_or_resource, Resource): + # TODO bad naming, in this case it doesn't start + # out as a protocol factory; a Site is one, a Resource isn't. + self.proto_factory = Site(proto_factory_or_resource) + self.proto_factory.displayTracebacks = False + else: + self.proto_factory = proto_factory_or_resource self.info_callback = info_callback self.error_callback = error_callback # this has a separate callback for convenience, it should @@ -155,6 +162,13 @@ def __init__(self, resource, info_callback, error_callback, # config object, so no default here: self.serving_host = serving_host self.serving_port = serving_port + # this is used to serve an onion from the filesystem, + # NB: Because of how txtorcon is set up, this option + # uses a *separate tor instance* owned by the owner of + # this script (because txtorcon needs to read the + # HS dir), whereas if this option is "", we set up + # an ephemeral HS on the global or pre-existing tor. + self.hidden_service_dir = hidden_service_dir def start_tor(self): """ This function executes the workflow @@ -162,19 +176,31 @@ def start_tor(self): """ self.info_callback("Attempting to start onion service on port: {} " "...".format(self.virtual_port)) - if str(self.tor_control_host).startswith('unix:'): - control_endpoint = UNIXClientEndpoint(reactor, - self.tor_control_host[5:]) + if self.hidden_service_dir == "": + if str(self.tor_control_host).startswith('unix:'): + control_endpoint = UNIXClientEndpoint(reactor, + self.tor_control_host[5:]) + else: + control_endpoint = TCP4ClientEndpoint(reactor, + self.tor_control_host, self.tor_control_port) + d = txtorcon.connect(reactor, control_endpoint) + d.addCallback(self.create_onion_ep) + d.addErrback(self.setup_failed) + # TODO: add errbacks to the next two calls in + # the chain: + d.addCallback(self.onion_listen) + d.addCallback(self.print_host) else: - control_endpoint = TCP4ClientEndpoint(reactor, - self.tor_control_host, self.tor_control_port) - d = txtorcon.connect(reactor, control_endpoint) - d.addCallback(self.create_onion_ep) - d.addErrback(self.setup_failed) - # TODO: add errbacks to the next two calls in - # the chain: - d.addCallback(self.onion_listen) - d.addCallback(self.print_host) + ep = "onion:" + str(self.virtual_port) + ":localPort=" + ep += str(self.serving_port) + # endpoints.TCPHiddenServiceEndpoint creates version 2 by + # default for backwards compat (err, txtorcon needs to update that ...) + ep += ":version=3" + ep += ":hiddenServiceDir="+self.hidden_service_dir + onion_endpoint = serverFromString(reactor, ep) + d = onion_endpoint.listen(self.proto_factory) + d.addCallback(self.print_host_filesystem) + def setup_failed(self, arg): # Note that actions based on this failure are deferred to callers: @@ -195,7 +221,8 @@ def onion_listen(self, onion): serverstring = "tcp:{}:interface={}".format(self.serving_port, self.serving_host) onion_endpoint = serverFromString(reactor, serverstring) - return onion_endpoint.listen(self.site) + print("created the onion endpoint, now calling listen") + return onion_endpoint.listen(self.proto_factory) def print_host(self, ep): """ Callback fired once the HS is available @@ -204,6 +231,14 @@ def print_host(self, ep): """ self.onion_hostname_callback(self.onion.hostname) + def print_host_filesystem(self, port): + """ As above but needed to respect slightly different + callback chain for this case (where we start our own tor + instance for the filesystem-based onion). + """ + self.onion = port.onion_service + self.onion_hostname_callback(self.onion.hostname) + def shutdown(self): self.tor_connection.protocol.transport.loseConnection() self.info_callback("Hidden service shutdown complete") diff --git a/jmclient/jmclient/__init__.py b/jmclient/jmclient/__init__.py index df2ed38e6..e5b8a8961 100644 --- a/jmclient/jmclient/__init__.py +++ b/jmclient/jmclient/__init__.py @@ -24,7 +24,7 @@ TYPE_P2PKH, TYPE_P2SH_P2WPKH, TYPE_P2WPKH, detect_script_type) from .configure import (load_test_config, process_shutdown, load_program_config, jm_single, get_network, update_persist_config, - validate_address, is_burn_destination, get_irc_mchannels, + validate_address, is_burn_destination, get_mchannels, get_blockchain_interface_instance, set_config, is_segwit_mode, is_native_segwit_mode, JMPluginService, get_interest_rate, get_bondless_makers_allowance, check_and_start_tor) diff --git a/jmclient/jmclient/client_protocol.py b/jmclient/jmclient/client_protocol.py index 68ea865ac..01b00b8e7 100644 --- a/jmclient/jmclient/client_protocol.py +++ b/jmclient/jmclient/client_protocol.py @@ -15,7 +15,7 @@ import sys from jmbase import (get_log, EXIT_FAILURE, hextobin, bintohex, utxo_to_utxostr, bdict_sdict_convert) -from jmclient import (jm_single, get_irc_mchannels, +from jmclient import (jm_single, get_mchannels, RegtestBitcoinCoreInterface, SNICKERReceiver, process_shutdown) import jmbitcoin as btc @@ -434,7 +434,7 @@ def clientStart(self): "blockchain_source") #needed only for channel naming convention network = jm_single().config.get("BLOCKCHAIN", "network") - irc_configs = get_irc_mchannels() + irc_configs = self.factory.get_mchannels() #only here because Init message uses this field; not used by makers TODO minmakers = jm_single().config.getint("POLICY", "minimum_makers") maker_timeout_sec = jm_single().maker_timeout_sec @@ -601,7 +601,7 @@ def clientStart(self): "blockchain_source") #needed only for channel naming convention network = jm_single().config.get("BLOCKCHAIN", "network") - irc_configs = get_irc_mchannels() + irc_configs = self.factory.get_mchannels() minmakers = jm_single().config.getint("POLICY", "minimum_makers") maker_timeout_sec = jm_single().maker_timeout_sec @@ -795,6 +795,14 @@ def getClient(self): def buildProtocol(self, addr): return self.protocol(self, self.client) + def get_mchannels(self): + """ A transparent wrapper that allows override, + so that a script can return a customised set of + message channel configs; currently used for testing + multiple bots on regtest. + """ + return get_mchannels() + def start_reactor(host, port, factory=None, snickerfactory=None, bip78=False, jm_coinjoin=True, ish=True, daemon=False, rs=True, gui=False): #pragma: no cover diff --git a/jmclient/jmclient/configure.py b/jmclient/jmclient/configure.py index b4498616b..f1d1661ee 100644 --- a/jmclient/jmclient/configure.py +++ b/jmclient/jmclient/configure.py @@ -144,6 +144,9 @@ def jm_single(): ## SERVER 1/3) Darkscience IRC (Tor, IP) ################################################################################ [MESSAGING:server1] +# by default the legacy format without a `type` field is +# understood to be IRC, but you can, optionally, add it: +# type = irc channel = joinmarket-pit port = 6697 usessl = true @@ -158,24 +161,47 @@ def jm_single(): #socks5_host = localhost #socks5_port = 9050 -## SERVER 2/3) hackint IRC (Tor, IP) -################################################################################ -[MESSAGING:server2] -channel = joinmarket-pit +[MESSAGING:onion1] +# onion based message channels must have the exact type 'onion' +# (while the section name above can be MESSAGING:whatever), and there must +# be only ONE such message channel configured (note the directory servers +# can be multiple, below): +type = onion -# For traditional IP (default): -host = irc.hackint.org -port = 6697 -usessl = true -socks5 = false +socks5_host = localhost +socks5_port = 9050 -# For Tor (recommended as clearnet alternative): -#host = ncwkrwxpq2ikcngxq3dy2xctuheniggtqeibvgofixpzvrwpa77tozqd.onion -#port = 6667 -#usessl = false -#socks5 = true -#socks5_host = localhost -#socks5_port = 9050 +# the tor control configuration. +# for most people running the tor daemon +# on Linux, no changes are required here: +tor_control_host = localhost +# or, to use a UNIX socket +# tor_control_host = unix:/var/run/tor/control +tor_control_port = 9051 + +# the host/port actually serving the hidden service +# (note the *virtual port*, that the client uses, +# is hardcoded to 80): +onion_serving_host = 127.0.0.1 +onion_serving_port = 8080 + +# directory node configuration +# +# This is mandatory for directory nodes (who must also set their +# own *.onion:port as the only directory in directory_nodes, below), +# but NOT TO BE USED by non-directory nodes (which is you, unless +# you know otherwise!), as it will greatly degrade your privacy. +# (note the default is no value, don't replace it with ""). +hidden_service_dir = +# +# This is a comma separated list (comma can be omitted if only one item). +# Each item has format host:port ; both are required, though port will +# be 80 if created in this code. +directory_nodes = rr6f6qtleiiwic45bby4zwmiwjrj3jsbmcvutwpqxjziaydjydkk5iad.onion:80 + +# This setting is ONLY for developer regtest setups, +# running multiple bots at once. Don't alter it otherwise +regtest_count = 0,0 ## SERVER 3/3) ILITA IRC (Tor - disabled by default) ################################################################################ @@ -488,7 +514,7 @@ def set_config(cfg, bcint=None): global_singleton.bc_interface = bcint -def get_irc_mchannels(): +def get_mchannels(): SECTION_NAME = 'MESSAGING' # FIXME: remove in future release if jm_single().config.has_section(SECTION_NAME): @@ -499,16 +525,30 @@ def get_irc_mchannels(): return _get_irc_mchannels_old() SECTION_NAME += ':' - irc_sections = [] + sections = [] for s in jm_single().config.sections(): if s.startswith(SECTION_NAME): - irc_sections.append(s) - assert irc_sections + sections.append(s) + assert sections - req_fields = [("host", str), ("port", int), ("channel", str), ("usessl", str)] + irc_fields = [("host", str), ("port", int), ("channel", str), ("usessl", str), + ("socks5", str), ("socks5_host", str), ("socks5_port", str)] + onion_fields = [("type", str), ("directory_nodes", str), ("regtest_count", str), + ("socks5_host", str), ("socks5_port", int), + ("tor_control_host", str), ("tor_control_port", int), + ("onion_serving_host", str), ("onion_serving_port", int), + ("hidden_service_dir", str)] configs = [] - for section in irc_sections: + + # processing the IRC sections: + for section in sections: + if jm_single().config.has_option(section, "type"): + # legacy IRC configs do not have "type" but just + # in case, we'll allow the "irc" type: + if not jm_single().config.get(section, "type").lower( + ) == "irc": + break server_data = {} # check if socks5 is enabled for tor and load relevant config if so @@ -520,13 +560,30 @@ def get_irc_mchannels(): server_data["socks5_host"] = jm_single().config.get(section, "socks5_host") server_data["socks5_port"] = jm_single().config.get(section, "socks5_port") - for option, otype in req_fields: + for option, otype in irc_fields: val = jm_single().config.get(section, option) server_data[option] = otype(val) server_data['btcnet'] = get_network() configs.append(server_data) - return configs + # processing the onion sections: + for section in sections: + if not jm_single().config.has_option(section, "type") or \ + not jm_single().config.get(section, "type").lower() == "onion": + continue + onion_data = {} + for option, otype in onion_fields: + try: + val = jm_single().config.get(section, option) + except NoOptionError: + continue + onion_data[option] = otype(val) + onion_data['btcnet'] = get_network() + # Just to allow a dynamic set of var: + onion_data["section-name"] = section + configs.append(onion_data) + + return configs def _get_irc_mchannels_old(): fields = [("host", str), ("port", int), ("channel", str), ("usessl", str), @@ -655,28 +712,6 @@ def load_program_config(config_path="", bs=None, plugin_services=[]): "settings and restart joinmarket.", "info") sys.exit(EXIT_FAILURE) - #These are left as sanity checks but currently impossible - #since any edits are overlays to the default, these sections/options will - #always exist. - # FIXME: This check is a best-effort attempt. Certain incorrect section - # names can pass and so can non-first invalid sections. - for s in required_options: #pragma: no cover - # check for sections - avail = None - if not global_singleton.config.has_section(s): - for avail in global_singleton.config.sections(): - if avail.startswith(s): - break - else: - raise Exception( - "Config file does not contain the required section: " + s) - # then check for specific options - k = avail or s - for o in required_options[s]: - if not global_singleton.config.has_option(k, o): - raise Exception("Config file does not contain the required " - "option '{}' in section '{}'.".format(o, k)) - loglevel = global_singleton.config.get("LOGGING", "console_log_level") try: set_logging_level(loglevel) @@ -746,6 +781,11 @@ def load_program_config(config_path="", bs=None, plugin_services=[]): if not os.path.exists(plogsdir): os.makedirs(plogsdir) p.set_log_dir(plogsdir) + # Check if a onion message channel was configured, and if so, + # check there is only 1; multiple directory nodes will be inside the config. + chans = get_mchannels() + onion_chans = [x for x in chans if "type" in x and x["type"] == "onion"] + assert len(onion_chans) < 2 def gracefully_kill_subprocess(p): # See https://stackoverflow.com/questions/43274476/is-there-a-way-to-check-if-a-subprocess-is-still-running diff --git a/jmclient/jmclient/wallet_rpc.py b/jmclient/jmclient/wallet_rpc.py index 4597cf475..68c8f895d 100644 --- a/jmclient/jmclient/wallet_rpc.py +++ b/jmclient/jmclient/wallet_rpc.py @@ -159,6 +159,9 @@ def __init__(self, port, wss_port, tls=True): # can be shut down cleanly: self.coinjoin_connection = None + def get_client_factory(self): + return JMClientProtocolFactory(self.taker) + def activate_coinjoin_state(self, state): """ To be set when a maker or taker operation is initialized; they cannot @@ -420,7 +423,8 @@ def dummy_restart_callback(msg): walletname=self.wallet_name, token=self.cookie) - def taker_finished(self, res, fromtx=False, waittime=0.0, txdetails=None): + def taker_finished(self, res, fromtx=False, + waittime=0.0, txdetails=None): # This is a slimmed down version compared with what is seen in # the CLI code, since that code encompasses schedules with multiple # entries; for now, the RPC only supports single joins. @@ -1003,13 +1007,13 @@ def dummy_user_callback(rel, abs): self.taker = Taker(self.services["wallet"], schedule, max_cj_fee = max_cj_fee, callbacks=(self.filter_orders_callback, - None, self.taker_finished)) + None, self.taker_finished)) # TODO ; this makes use of a pre-existing hack to allow # selectively disabling the stallMonitor function that checks # if transactions went through or not; here we want to cleanly # destroy the Taker after an attempt is made, successful or not. self.taker.testflag = True - self.clientfactory = JMClientProtocolFactory(self.taker) + self.clientfactory = self.get_client_factory() dhost, dport = self.check_daemon_ready() diff --git a/jmdaemon/jmdaemon/__init__.py b/jmdaemon/jmdaemon/__init__.py index 384b5f720..fc1c4070b 100644 --- a/jmdaemon/jmdaemon/__init__.py +++ b/jmdaemon/jmdaemon/__init__.py @@ -4,6 +4,7 @@ from .enc_wrapper import as_init_encryption, decode_decrypt, \ encrypt_encode, init_keypair, init_pubkey, get_pubkey, NaclError from .irc import IRCMessageChannel +from .onionmc import OnionMessageChannel from jmbase.support import get_log from .message_channel import MessageChannel, MessageChannelCollection from .orderbookwatch import OrderbookWatch diff --git a/jmdaemon/jmdaemon/daemon_protocol.py b/jmdaemon/jmdaemon/daemon_protocol.py index b20a55107..d84bbb514 100644 --- a/jmdaemon/jmdaemon/daemon_protocol.py +++ b/jmdaemon/jmdaemon/daemon_protocol.py @@ -7,8 +7,9 @@ from .protocol import (COMMAND_PREFIX, ORDER_KEYS, NICK_HASH_LENGTH, NICK_MAX_ENCODED, JM_VERSION, JOINMARKET_NICK_HEADER, COMMITMENT_PREFIXES) -from .irc import IRCMessageChannel +from .irc import IRCMessageChannel +from .onionmc import OnionMessageChannel from jmbase import (is_hs_uri, get_tor_agent, JMHiddenService, get_nontor_agent, BytesProducer, wrapped_urlparse, bdict_sdict_convert, JMHTTPResource) @@ -527,10 +528,15 @@ def on_JM_INIT(self, bcsource, network, irc_configs, minmakers, self.mc_shutdown() self.irc_configs = irc_configs self.restart_mc_required = True - mcs = [IRCMessageChannel(c, - daemon=self, - realname='btcint=' + bcsource) - for c in self.irc_configs] + mcs = [] + for c in self.irc_configs: + if "type" in c and c["type"] == "onion": + mcs.append(OnionMessageChannel(c, daemon=self)) + else: + # default is IRC; TODO allow others + mcs.append(IRCMessageChannel(c, + daemon=self, + realname='btcint=' + bcsource)) self.mcc = MessageChannelCollection(mcs) OrderbookWatch.set_msgchan(self, self.mcc) #register taker-specific msgchan callbacks here @@ -947,6 +953,7 @@ def init_connections(self, nick): incomplete transaction is wiped. """ self.jm_state = 0 #uninited + self.mcc.set_nick(nick) if self.restart_mc_required: self.mcc.run() self.restart_mc_required = False @@ -954,7 +961,6 @@ def init_connections(self, nick): #if we are not restarting the MC, #we must simulate the on_welcome message: self.on_welcome() - self.mcc.set_nick(nick) def transfer_commitment(self, commit): """Send this commitment via privmsg to one (random) diff --git a/jmdaemon/jmdaemon/message_channel.py b/jmdaemon/jmdaemon/message_channel.py index 96be37ec6..9549f193d 100644 --- a/jmdaemon/jmdaemon/message_channel.py +++ b/jmdaemon/jmdaemon/message_channel.py @@ -263,9 +263,9 @@ def privmsg(self, nick, cmd, message, mc=None): #is supposed to be sent. There used to be an exception raise. #to prevent a crash (especially in makers), we just inform #the user about it for now - log.error("Tried to communicate on this IRC server but " + log.error("Tried to communicate on this message channel but " "failed: " + str(mc)) - log.error("You might have to comment out this IRC server " + log.error("You might have to comment out this message channel" "in joinmarket.cfg and restart.") log.error("No action needed for makers / yield generators!") # todo: add logic to continue on other available mc @@ -444,7 +444,7 @@ def on_welcome_trigger(self, mc): if (not self.on_welcome_announce_id) and self.on_welcome: self.on_welcome_announce_id = reactor.callLater(60, self.on_welcome_setup_finished,) else: - log.info("All IRC servers connected, starting execution.") + log.info("All message channels connected, starting execution.") if self.on_welcome_announce_id: self.on_welcome_announce_id.cancel() self.on_welcome_setup_finished() diff --git a/jmdaemon/jmdaemon/onionmc.py b/jmdaemon/jmdaemon/onionmc.py new file mode 100644 index 000000000..a426674bb --- /dev/null +++ b/jmdaemon/jmdaemon/onionmc.py @@ -0,0 +1,1179 @@ +from jmdaemon.message_channel import MessageChannel +from jmdaemon.protocol import COMMAND_PREFIX, JM_VERSION +from jmbase import get_log, JM_APP_NAME, JMHiddenService +import json +import copy +from typing import Callable, Union +from twisted.internet import reactor, task, protocol +from twisted.protocols import basic +from twisted.internet.endpoints import TCP4ClientEndpoint +from twisted.internet.address import IPv4Address, IPv6Address +from txtorcon.socks import TorSocksEndpoint + +log = get_log() + +def network_addr_to_string(location: Union[IPv4Address, IPv4Address]) -> str: + if isinstance(location, (IPv4Address, IPv6Address)): + host = location.host + port = location.port + else: + # TODO handle other addr types + assert False + return host + ":" + str(port) + +# module-level var to control whether we use Tor or not +# (specifically for tests): +testing_mode = False +def set_testing_mode(configdata: dict) -> None: + """ Toggles testing mode which enables non-Tor + network setup: + """ + global testing_mode + if not "regtest_count" in configdata: + log.debug("Onion message channel is not using regtest mode.") + testing_mode = False + return + try: + s, e = [int(x) for x in configdata["regtest_count"].split(",")] + except Exception as e: + log.info("Failed to get regtest count settings, error: {}".format(repr(e))) + testing_mode = False + return + if s == 0 and e == 0: + testing_mode = False + return + testing_mode = True + +""" +Messaging protocol (which wraps the underlying Joinmarket +messaging protocol) used here is documented in: +Joinmarket-Docs/onion-messaging.md +""" + +LOCAL_CONTROL_MESSAGE_TYPES = {"connect": 785, "disconnect": 787, "connect-in": 797} +CONTROL_MESSAGE_TYPES = {"peerlist": 789, "getpeerlist": 791, + "handshake": 793, "dn-handshake": 795, + "ping": 797, "pong": 799, "disconnect": 801} +JM_MESSAGE_TYPES = {"privmsg": 685, "pubmsg": 687} + +# Used for some control message construction, as detailed below. +NICK_PEERLOCATOR_SEPARATOR = ";" + +# location_string and nick must be set before sending, +# otherwise invalid: +client_handshake_json = {"app-name": JM_APP_NAME, + "directory": False, + "location-string": "", + "proto-ver": JM_VERSION, + "features": {}, + "nick": "" +} + +# default acceptance false; code must switch it on: +server_handshake_json = {"app-name": JM_APP_NAME, + "directory": True, + "proto-ver-min": JM_VERSION, + "proto-ver-max": JM_VERSION, + "features": {}, + "accepted": False, + "nick": "", + "motd": "Default MOTD, replace with information for the directory." + } + +# states that keep track of relationship to a peer +PEER_STATUS_UNCONNECTED, PEER_STATUS_CONNECTED, PEER_STATUS_HANDSHAKED, \ + PEER_STATUS_DISCONNECTED = range(4) + + +class OnionPeerError(Exception): + pass + +class OnionPeerDirectoryWithoutHostError(OnionPeerError): + pass + +class OnionPeerConnectionError(OnionPeerError): + pass + +class OnionCustomMessageDecodingError(Exception): + pass + +class OnionCustomMessage(object): + """ Encapsulates the messages passed over the wire + to and from other onion peers + """ + def __init__(self, text: str, msgtype: int): + self.text = text + self.msgtype = msgtype + + def encode(self) -> str: + self.encoded = json.dumps({"type": self.msgtype, + "line": self.text}).encode("utf-8") + return self.encoded + + @classmethod + def from_string_decode(cls, msg: str) -> 'OnionCustomMessage': + """ Build a custom message from a json-ified string. + """ + try: + msg_obj = json.loads(msg) + text = msg_obj["line"] + msgtype = msg_obj["type"] + except: + raise OnionCustomMessageDecodingError + return cls(text, msgtype) + +class OnionLineProtocol(basic.LineReceiver): + def connectionMade(self): + self.factory.register_connection(self) + + def connectionLost(self, reason): + self.factory.register_disconnection(self) + + def lineReceived(self, line: str) -> None: + #print("received", repr(line)) + try: + msg = OnionCustomMessage.from_string_decode(line) + except OnionCustomMessageDecodingError: + log.debug("Received invalid message, dropping connection.") + self.transport.loseConnection() + return + self.factory.receive_message(msg, self) + + def message(self, message: OnionCustomMessage) -> None: + #log.info("in OnionLineProtocol, about to send message: {} to peer {}".format(message.encode(), self.transport.getPeer())) + self.transport.write(message.encode() + self.delimiter) + +class OnionLineProtocolFactory(protocol.ServerFactory): + """ This factory allows us to start up instances + of the LineReceiver protocol that are instantiated + towards us. + As such, it is responsible for keeping track + """ + protocol = OnionLineProtocol + + def __init__(self, client: 'OnionMessageChannel'): + self.client = client + self.peers = {} + + def register_connection(self, p: OnionLineProtocol) -> None: + # make a local control message registering + # the new connection + peer_location = network_addr_to_string(p.transport.getPeer()) + self.client.register_connection(peer_location, direction=0) + self.peers[peer_location] = p + + def register_disconnection(self, p: OnionLineProtocol) -> None: + # make a local control message registering + # the new connection + peer_location = network_addr_to_string(p.transport.getPeer()) + self.client.register_disconnection(peer_location) + if not peer_location in self.peers: + log.warn("Disconnection event registered for non-existent peer.") + return + del self.peers[peer_location] + + def receive_message(self, message: OnionCustomMessage, + p: OnionLineProtocol) -> None: + self.client.receive_msg(message, network_addr_to_string( + p.transport.getPeer())) + + def send(self, message: OnionCustomMessage, destination: str) -> bool: + #print("trying to send in OnionLineProtocolFactory.") + #print("message: {}, destination: {}".format(message.encode(), destination)) + if not (destination in self.peers): + print("sending message {}, destination {} was not in peers {}".format(message.encode(), destination, self.peers)) + return False + proto = self.peers[destination] + proto.message(message) + return True + +class OnionClientFactory(protocol.ServerFactory): + """ We define a distinct protocol factory for outbound connections. + Notably, this factory supports only *one* protocol instance at a time. + """ + protocol = OnionLineProtocol + + def __init__(self, message_receive_callback: Callable, + connection_callback: Callable, + disconnection_callback: Callable): + self.proto_client = None + # callback takes OnionCustomMessage as arg and returns None + self.message_receive_callback = message_receive_callback + # connection callback, no args, returns None + self.connection_callback = connection_callback + # disconnection the same + self.disconnection_callback = disconnection_callback + + def register_connection(self, p: OnionLineProtocol) -> None: + #print("in OnionClientFactory, registered a connection, proto instance: ", p) + self.proto_client = p + self.connection_callback() + + def register_disconnection(self, p: OnionLineProtocol) -> None: + self.proto_client = None + self.disconnection_callback() + + def send(self, msg: OnionCustomMessage) -> bool: + self.proto_client.message(msg) + + def receive_message(self, message: OnionCustomMessage, + p: OnionLineProtocol) -> None: + self.message_receive_callback(message) + + """ + def clientConnectionLost(self, connector, reason): + log.debug('Connection to peer lost: {}, reason: {}'.format(connector, reason)) + if reactor.running: + log.info('Attempting to reconnect...') + protocol.ReconnectingClientFactory.clientConnectionLost( + self, connector, reason) + + def clientConnectionFailed(self, connector, reason): + log.debug('Connection to peer failed: {}, reason: {}'.format( + connector, reason)) + if reactor.running: + log.info('Attempting to reconnect...') + protocol.ReconnectingClientFactory.clientConnectionFailed( + self, connector, reason) + """ + +class OnionPeer(object): + + def __init__(self, messagechannel: 'OnionMessageChannel', + socks5_host: str, socks5_port: int, + hostname: str=None, port: int=-1, + directory: bool=False, nick: str="", + handshake_callback: Callable=None): + # reference to the managing OnionMessageChannel instance is + # needed so that we know where to send the messages received + # from this peer: + self.messagechannel = messagechannel + self.nick = nick + # client side net config: + self.socks5_host = socks5_host + self.socks5_port = socks5_port + # remote net config: + self.hostname = hostname + self.port = port + if directory and not (self.hostname): + raise OnionPeerDirectoryWithoutHostError() + self.directory = directory + self._status = PEER_STATUS_UNCONNECTED + #A function to be called to initiate a handshake; + # it should take a single argument, an OnionPeer object, + #and return None. + self.handshake_callback = handshake_callback + # Keep track of the protocol factory used to connect + # to the remote peer. Note that this won't always be used, + # if we have an inbound connection from this peer: + self.factory = None + # alternate location strings are used for inbound + # connections for this peer (these will be used first + # and foremost by directories, sending messages backwards + # on a connection created towards them). + self.alternate_location = "" + + def set_alternate_location(self, location_string: str): + self.alternate_location = location_string + + def update_status(self, destn_status: int) -> None: + """ Wrapping state updates to enforce: + (a) that the handshake is triggered by connection + outwards, and (b) to ensure no illegal state transitions. + """ + assert destn_status in range(4) + ignored_updates = [] + if self._status == PEER_STATUS_UNCONNECTED: + allowed_updates = [PEER_STATUS_CONNECTED, + PEER_STATUS_DISCONNECTED] + elif self._status == PEER_STATUS_CONNECTED: + # updates from connected->connected are harmless + allowed_updates = [PEER_STATUS_CONNECTED, + PEER_STATUS_DISCONNECTED, + PEER_STATUS_HANDSHAKED] + elif self._status == PEER_STATUS_HANDSHAKED: + allowed_updates = [PEER_STATUS_DISCONNECTED] + ignored_updates = [PEER_STATUS_CONNECTED] + elif self._status == PEER_STATUS_DISCONNECTED: + allowed_updates = [PEER_STATUS_CONNECTED] + ignored_updates = [PEER_STATUS_DISCONNECTED] + if destn_status in ignored_updates: + # TODO: this happens sometimes from 2->1; why? + log.debug("Attempt to update status of peer from {} " + "to {} ignored.".format(self._status, destn_status)) + return + assert destn_status in allowed_updates, ("couldn't update state " + "from {} to {}".format(self._status, destn_status)) + self._status = destn_status + # the handshakes are always initiated by a client: + if destn_status == PEER_STATUS_CONNECTED: + log.info("We, {}, are calling the handshake callback as client.".format(self.messagechannel.self_as_peer.peer_location())) + self.handshake_callback(self) + + def status(self) -> int: + """ Simple getter function for the wrapped _status: + """ + return self._status + + def set_nick(self, nick: str) -> None: + self.nick = nick + + def get_nick_peerlocation_ser(self) -> str: + if not self.nick: + raise OnionPeerError("Cannot serialize " + "identifier string without nick.") + return self.nick + NICK_PEERLOCATOR_SEPARATOR + \ + self.peer_location() + + @classmethod + def from_location_string(cls, mc: 'OnionMessageChannel', + location: str, + socks5_host: str, + socks5_port: int, + directory: bool=False, + handshake_callback: Callable=None) -> 'OnionPeer': + """ Allows construction of an OnionPeer from the + connection information given by the network interface. + TODO: special handling for inbound is needed. + """ + host, port = location.split(":") + return cls(mc, socks5_host, socks5_port, hostname=host, + port=int(port), directory=directory, + handshake_callback=handshake_callback) + + def set_host_port(self, hostname: str, port: int) -> None: + """ If the connection info is discovered + after this peer was already added to our list, + we can set it with this method. + """ + self.hostname = hostname + self.port = port + + def set_location(self, location_string: str) -> bool: + """ Allows setting location from an unchecked + input string argument; if the string does not have + the required format, + will return False, otherwise self.hostname, self.port are + updated for future `peer_location` calls, and True is returned. + """ + try: + host, port = location_string.split(":") + portint = int(port) + assert portint > 0 + except Exception as e: + log.debug("Failed to update host and port of this peer, " + "error: {}".format(repr(e))) + return False + self.hostname = host + self.port = portint + return True + + def peer_location(self) -> str: + assert (self.hostname and self.port > 0) + return self.hostname + ":" + str(self.port) + + def send(self, message: OnionCustomMessage) -> bool: + """ If the message can be sent on either an inbound or + outbound connection, True is returned, else False. + """ + if not self.factory: + #print("We are: {}. peer, wich was directory {}, did not have factory, so we send via mc".format( + # self.messagechannel.self_as_peer.peer_location(), self.directory)) + # we try to send via the overall message channel serving + # protocol, i.e. we assume the connection was made inbound: + #print("and to this location: ", self.peer_location()) + return self.messagechannel.proto_factory.send(message, self.alternate_location) + #print("peer which was directory {} did have factory {}, we send via that".format(self.directory, self.factory)) + return self.factory.send(message) + + def receive_message(self, message: OnionCustomMessage) -> None: + self.messagechannel.receive_msg(message, self.peer_location()) + + def connect(self) -> None: + """ This method is called to connect, over Tor, to the remote + peer at the given onion host/port. + The connection is 'persistent' in the sense that we use a + ReconnectingClientFactory. + """ + if self._status in [PEER_STATUS_HANDSHAKED, PEER_STATUS_CONNECTED]: + return + if not (self.hostname and self.port > 0): + raise OnionPeerConnectionError( + "Cannot connect without host, port info") + + self.factory = OnionClientFactory(self.receive_message, + self.register_connection, self.register_disconnection) + if testing_mode: + print("{} is making a tcp connection to {}, {}, {},".format( + self.messagechannel.self_as_peer.peer_location(), self.hostname, self.port, self.factory)) + self.tcp_connector = reactor.connectTCP(self.hostname, self.port, self.factory) + else: + torEndpoint = TCP4ClientEndpoint(reactor, self.socks5_host, self.socks5_port) + onionEndpoint = TorSocksEndpoint(torEndpoint, self.hostname, self.port) + onionEndpoint.connect(self.factory) + + def register_connection(self) -> None: + self.messagechannel.register_connection(self.peer_location(), direction=1) + + def register_disconnection(self) -> None: + self.messagechannel.register_disconnection(self.peer_location()) + + def try_to_connect(self) -> None: + """ This method wraps OnionPeer.connect and accepts + any error if that fails. + """ + try: + self.connect() + except OnionPeerConnectionError as e: + log.debug("Tried to connect but failed: {}".format(repr(e))) + except Exception as e: + log.warn("Got unexpected exception in connect attempt: {}".format( + repr(e))) + + def disconnect(self) -> None: + if self._status in [PEER_STATUS_UNCONNECTED, PEER_STATUS_DISCONNECTED]: + return + if not (self.hostname and self.port > 0): + raise OnionPeerConnectionError( + "Cannot disconnect without host, port info") + d = self.reconnecting_service.stopService() + d.addCallback(self.complete_disconnection) + d.addErrback(log.warn, "Failed to disconnect from peer {}.".format( + self.peer_location())) + + def complete_disconnection(self): + log.debug("Disconnected from peer: {}".format(self.peer_location())) + self.update_status(PEER_STATUS_DISCONNECTED) + self.factory = None + +class OnionDirectoryPeer(OnionPeer): + delay = 4.0 + def try_to_connect(self) -> None: + # Delay deliberately expands out to very + # long times as yg-s tend to be very long + # running bots: + self.delay *= 1.5 + if self.delay > 10000: + log.warn("Cannot connect to directory node peer: {} " + "after 20 attempts, giving up.".format(self.peer_location())) + return + try: + self.connect() + except OnionPeerConnectionError: + reactor.callLater(self.delay, self.try_to_connect) + +class OnionMessageChannel(MessageChannel): + """ Receives messages via a Torv3 hidden/onion service. + Sends messages to other nodes of the same type over Tor + via SOCKS5. + Uses one or more configured "directory nodes" + to access a list of current active nodes, and updates + dynamically from messages seen. + """ + + def __init__(self, + configdata, + daemon=None): + MessageChannel.__init__(self, daemon=daemon) + # hostid is a feature to avoid replay attacks across message channels; + # TODO investigate, but for now, treat onion-based as one "server". + self.hostid = "onion-network" + self.tor_control_host = configdata["tor_control_host"] + self.tor_control_port = int(configdata["tor_control_port"]) + self.onion_serving_host=configdata["onion_serving_host"] + self.onion_serving_port=int(configdata["onion_serving_port"]) + self.hidden_service_dir = configdata["hidden_service_dir"] + # client side config: + self.socks5_host = "127.0.0.1" + self.socks5_port = 9050 + # we use the setting in the config sent over from + # the client, to decide whether to set up our connections + # over localhost (if testing), without Tor: + set_testing_mode(configdata) + log.info("after call to testing_mode, it is: {}".format(testing_mode)) + # keep track of peers. the list will be instances + # of OnionPeer: + self.peers = set() + for dn in configdata["directory_nodes"].split(","): + # note we don't use a nick for directories: + self.peers.add(OnionDirectoryPeer.from_location_string( + self, dn, self.socks5_host, self.socks5_port, + directory=True, handshake_callback=self.handshake_as_client)) + # we can direct messages via the protocol factory, which + # will index protocol connections by peer location: + self.proto_factory = OnionLineProtocolFactory(self) + if testing_mode: + # we serve over TCP: + self.testing_serverconn = reactor.listenTCP(self.onion_serving_port, + self.proto_factory, interface="localhost") + self.onion_hostname = "127.0.0.1" + else: + self.hs = JMHiddenService(self.proto_factory, + self.info_callback, + self.setup_error_callback, + self.onion_hostname_callback, + self.tor_control_host, + self.tor_control_port, + self.onion_serving_host, + self.onion_serving_port, + shutdown_callback=self.shutdown_callback, + hidden_service_dir=self.hidden_service_dir) + # this call will start bringing up the HS; when it's finished, + # it will fire the `onion_hostname_callback`, or if it fails, + # it'll fire the `setup_error_callback`. + self.hs.start_tor() + + # This will serve as our unique identifier, indicating + # that we are ready to communicate (in both directions) over Tor. + self.onion_hostname = None + + # intended to represent the special case of 'we are the + # only directory node known', however for now dns don't interact + # so this has no role. TODO probably remove it. + self.genesis_node = False + + # waiting loop for all directories to have + # connected (note we could use a deferred but + # the rpc connection calls are not using twisted) + self.wait_for_directories_loop = None + + def info_callback(self, msg): + log.info(msg) + + def setup_error_callback(self, msg): + log.error(msg) + + def shutdown_callback(self, msg): + log.info("in shutdown callback: {}".format(msg)) + + def onion_hostname_callback(self, hostname): + """ This entrypoint marks the start of the OnionMessageChannel + running, since we need this unique identifier as our name + before we can start working (we need to compare it with the + configured directory nodes). + """ + print("hostname: ", hostname) + print("type: ", type(hostname)) + log.info("setting onion hostname to : {}".format(hostname)) + self.onion_hostname = hostname + +# ABC implementation section + def run(self) -> None: + self.hs_up_loop = task.LoopingCall(self.check_onion_hostname) + self.hs_up_loop.start(0.5) + + def get_pubmsg(self, msg:str, source_nick:str ="") -> str: + """ Converts a message into the known format for + pubmsgs; if we are not sending this (because we + are a directory, forwarding it), `source_nick` must be set. + Note that pubmsg does NOT prefix the *message* with COMMAND_PREFIX. + """ + nick = source_nick if source_nick else self.nick + return nick + COMMAND_PREFIX + "PUBLIC" + msg + + def get_privmsg(self, nick: str, cmd: str, message: str, + source_nick=None) -> None: + """ See `get_pubmsg` for comment on `source_nick`. + """ + from_nick = source_nick if source_nick else self.nick + return from_nick + COMMAND_PREFIX + nick + COMMAND_PREFIX + \ + cmd + " " + message + + def _pubmsg(self, msg:str) -> None: + """ Best effort broadcast of message `msg`: + send the message to every known directory node, + with the PUBLIC message type and nick. + """ + peerids = self.get_directory_peers() + msg = OnionCustomMessage(self.get_pubmsg(msg), + JM_MESSAGE_TYPES["pubmsg"]) + for peerid in peerids: + # currently a directory node can send its own + # pubmsgs (act as maker or taker); this will + # probably be removed but is useful in testing: + if peerid == self.self_as_peer.peer_location(): + self.receive_msg(msg, "00") + else: + self._send(self.get_peer_by_id(peerid), msg) + + def _privmsg(self, nick: str, cmd: str, msg:str) -> None: + log.debug("Privmsging to: {}, {}, {}".format(nick, cmd, msg)) + encoded_privmsg = OnionCustomMessage(self.get_privmsg(nick, cmd, msg), + JM_MESSAGE_TYPES["privmsg"]) + peerid = self.get_peerid_by_nick(nick) + if peerid: + peer = self.get_peer_by_id(peerid) + # notice the order matters here!: + if not peerid or not peer or not peer.status() == PEER_STATUS_HANDSHAKED: + # If we are trying to message a peer via their nick, we + # may not yet have a connection; then we just + # forward via directory nodes. + log.debug("Privmsg peer: {} but don't have peerid; " + "sending via directory.".format(nick)) + try: + # TODO change this to redundant or switching? + peer = self.get_connected_directory_peers()[0] + except Exception as e: + log.warn("Failed to send privmsg because no " + "directory peer is connected. Error: {}".format(repr(e))) + return + self._send(peer, encoded_privmsg) + + def _announce_orders(self, offerlist: list) -> None: + for offer in offerlist: + self._pubmsg(offer) + +# End ABC implementation section + + def check_onion_hostname(self): + if not self.onion_hostname: + return + self.hs_up_loop.stop() + # now our hidden service is up, we must check our peer status + # then set up directories. + self.get_our_peer_info() + # at this point the only peers added are directory + # nodes from config; we try to connect to all. + # We will get other peers to add to our list once they + # start sending us messages. + reactor.callLater(0.0, self.connect_to_directories) + + def get_our_peer_info(self) -> None: + """ Create a special OnionPeer object, + outside of our peerlist, to refer to ourselves. + """ + dp = self.get_directory_peers() + self_dir = False + # only for publically exposed onion does the 'virtual port' exist; + # for local tests we always connect to an actual machine port: + port_to_check = 80 if not testing_mode else self.onion_serving_port + my_location_str = self.onion_hostname + ":" + str(port_to_check) + log.info("To check if we are genesis, we compare {} with {}".format(my_location_str, dp)) + if [my_location_str] == dp: + log.info("This is the genesis node: {}".format(self.onion_hostname)) + self.genesis_node = True + self_dir = True + elif my_location_str in dp: + # Here we are just one of many directory nodes, + # which should be fine, we should just be careful + # to not query ourselves. + self_dir = True + self.self_as_peer = OnionPeer(self, self.socks5_host, self.socks5_port, + self.onion_hostname, self.onion_serving_port, + self_dir, nick=self.nick, + handshake_callback=None) + + def connect_to_directories(self) -> None: + if self.genesis_node: + # we are a directory and we have no directory peers; + # just start. + self.on_welcome(self) + return + # the remaining code is only executed by non-directories: + for p in self.peers: + log.info("Trying to connect to node: {}".format(p.peer_location())) + try: + p.connect() + except OnionPeerConnectionError: + pass + # do not trigger on_welcome event until all directories + # configured are ready: + self.on_welcome_sent = False + self.wait_for_directories_loop = task.LoopingCall( + self.wait_for_directories) + self.wait_for_directories_loop.start(10.0) + + def handshake_as_client(self, peer: OnionPeer) -> None: + assert peer.status() == PEER_STATUS_CONNECTED + if self.self_as_peer.directory: + log.debug("Not sending client handshake to {} because we are directory.".format(peer.peer_location())) + return + our_hs = copy.deepcopy(client_handshake_json) + our_hs["location-string"] = self.self_as_peer.peer_location() + our_hs["nick"] = self.nick + # We fire and forget the handshake; successful setting + # of the `is_handshaked` var in the Peer object will depend + # on a valid/success return via the custommsg hook in the plugin. + log.info("Sending this handshake: {} to peer {}".format(json.dumps(our_hs), peer.peer_location())) + self._send(peer, OnionCustomMessage(json.dumps(our_hs), + CONTROL_MESSAGE_TYPES["handshake"])) + + def handshake_as_directory(self, peer: OnionPeer, our_hs: dict) -> None: + assert peer.status() == PEER_STATUS_CONNECTED + log.info("Sending this handshake as directory: {}".format(json.dumps(our_hs))) + self._send(peer, OnionCustomMessage(json.dumps(our_hs), + CONTROL_MESSAGE_TYPES["dn-handshake"])) + + def get_directory_peers(self) -> list: + return [ p.peer_location() for p in self.peers if p.directory is True] + + def get_peerid_by_nick(self, nick:str) -> Union[OnionPeer, None]: + for p in self.get_all_connected_peers(): + if p.nick == nick: + return p.peer_location() + return None + + def _send(self, peer: OnionPeer, message: OnionCustomMessage) -> bool: + try: + return peer.send(message) + except Exception as e: + # This can happen when a peer disconnects, depending + # on the timing: + log.warn("Failed to send message to: {}, error: {}".format( + peer.peer_location(), repr(e))) + return False + + def shutdown(self): + """ TODO + """ + + def receive_msg(self, message: OnionCustomMessage, peer_location: str) -> None: + """ Messages from peers and also connection related control + messages. These messages either come via OnionPeer or via + the main OnionLineProtocolFactory instance that handles all + inbound connections. + """ + if self.self_as_peer.directory: + print("received message as directory: ", message.encode()) + peer = self.get_peer_by_id(peer_location) + if not peer: + log.warn("Received message but could not find peer: {}".format(peer_location)) + return + msgtype = message.msgtype + msgval = message.text + if msgtype in LOCAL_CONTROL_MESSAGE_TYPES.values(): + self.process_control_message(peer_location, msgtype, msgval) + # local control messages are processed first. + # TODO this is a historical artifact, we can simplify. + return + + if self.process_control_message(peer_location, msgtype, msgval): + # will return True if it is, elsewise, a control message. + return + + # ignore non-JM messages: + if not msgtype in JM_MESSAGE_TYPES.values(): + log.debug("Invalid message type, ignoring: {}".format(msgtype)) + return + + # real JM message; should be: from_nick, to_nick, cmd, message + try: + nicks_msgs = msgval.split(COMMAND_PREFIX) + from_nick, to_nick = nicks_msgs[:2] + msg = COMMAND_PREFIX + COMMAND_PREFIX.join(nicks_msgs[2:]) + if to_nick == "PUBLIC": + #log.debug("A pubmsg is being processed by {} from {}; it " + # "is {}".format(self.self_as_peer.nick, from_nick, msg)) + self.on_pubmsg(from_nick, msg) + if self.self_as_peer.directory: + self.forward_pubmsg_to_peers(msg, from_nick) + elif to_nick != self.nick: + if not self.self_as_peer.directory: + log.debug("Ignoring message, not for us: {}".format(msg)) + else: + self.forward_privmsg_to_peer(to_nick, msg, from_nick) + else: + self.on_privmsg(from_nick, msg) + except Exception as e: + log.debug("Invalid joinmarket message: {}, error was: {}".format( + msgval, repr(e))) + return + + def forward_pubmsg_to_peers(self, msg: str, from_nick: str) -> None: + """ Used by directory nodes currently. Takes a received + message that was PUBLIC and broadcasts it to the non-directory + peers. + """ + assert self.self_as_peer.directory + pubmsg = self.get_pubmsg(msg, source_nick=from_nick) + msgtype = JM_MESSAGE_TYPES["pubmsg"] + # NOTE!: Specifically with forwarding/broadcasting, + # we introduce the danger of infinite re-broadcast, + # if there is more than one party forwarding. + # For now we are having directory nodes not talk to + # each other (i.e. they are *required* to only configure + # themselves, not other dns). But this could happen by + # accident. + encoded_msg = OnionCustomMessage(pubmsg, msgtype) + for peer in self.get_connected_nondirectory_peers(): + # don't loop back to the sender: + if peer.nick == from_nick: + continue + log.debug("Sending {}:{} to nondir peer {}".format( + msgtype, pubmsg, peer.peer_location())) + self._send(peer, encoded_msg) + + def forward_privmsg_to_peer(self, nick: str, message: str, + from_nick: str) -> None: + assert self.self_as_peer.directory + peerid = self.get_peerid_by_nick(nick) + if not peerid: + log.debug("We were asked to send a message from {} to {}, " + "but {} is not connected.".format(from_nick, nick, nick)) + return + # The `message` passed in has format COMMAND_PREFIX||command||" "||msg + # we need to parse out cmd, message for sending. + _, cmdmsg = message.split(COMMAND_PREFIX) + cmdmsglist = cmdmsg.split(" ") + cmd = cmdmsglist[0] + msg = " ".join(cmdmsglist[1:]) + privmsg = self.get_privmsg(nick, cmd, msg, source_nick=from_nick) + #log.debug("Sending out privmsg: {} to peer: {}".format(privmsg, peerid)) + encoded_msg = OnionCustomMessage(privmsg, + JM_MESSAGE_TYPES["privmsg"]) + self._send(self.get_peer_by_id(peerid), encoded_msg) + # If possible, we forward the from-nick's network location + # to the to-nick peer, so they can just talk directly next time. + peerid_from = self.get_peerid_by_nick(from_nick) + if not peerid_from: + return + peer_to = self.get_peer_by_id(peerid) + self.send_peers(peer_to, peerid_filter=[peerid_from]) + + def process_control_message(self, peerid: str, msgtype: int, + msgval: str) -> bool: + """ Triggered by a directory node feeding us + peers, or by a connect/disconnect hook; this is our housekeeping + to try to create, and keep track of, useful connections. + """ + all_ctrl = list(LOCAL_CONTROL_MESSAGE_TYPES.values( + )) + list(CONTROL_MESSAGE_TYPES.values()) + if msgtype not in all_ctrl: + return False + # this is too noisy, but TODO, investigate allowing + # some kind of control message monitoring e.g. default-off + # log-to-file (we don't currently have a 'TRACE' level debug). + #log.debug("received control message: {},{}".format(msgtype, msgval)) + if msgtype == CONTROL_MESSAGE_TYPES["peerlist"]: + # This is the base method of seeding connections; + # a directory node can send this any time. We may well + # need to control this; for now it just gets processed, + # whereever it came from: + try: + peerlist = msgval.split(",") + for peer in peerlist: + # defaults mean we just add the peer, not + # add or alter its connection status: + self.add_peer(peer, with_nick=True) + except Exception as e: + log.debug("Incorrectly formatted peer list: {}, " + "ignoring, {}".format(msgval, e)) + # returning True either way, because although it was an + # invalid message, it *was* a control message, and should + # not be processed as something else. + return True + elif msgtype == CONTROL_MESSAGE_TYPES["getpeerlist"]: + # getpeerlist must be accompanied by a full node + # locator, and nick; + # add that peer before returning our peer list. + p = self.add_peer(msgval, connection=True, + overwrite_connection=True, with_nick=True) + try: + self.send_peers(p) + except OnionPeerConnectionError: + pass + # comment much as above; if we can't connect, it's none + # of our business. + return True + elif msgtype == CONTROL_MESSAGE_TYPES["handshake"]: + # sent by non-directory peers on startup + self.process_handshake(peerid, msgval) + return True + elif msgtype == CONTROL_MESSAGE_TYPES["dn-handshake"]: + self.process_handshake(peerid, msgval, dn=True) + return True + elif msgtype == LOCAL_CONTROL_MESSAGE_TYPES["connect"]: + self.add_peer(msgval, connection=True, + overwrite_connection=True) + elif msgtype == LOCAL_CONTROL_MESSAGE_TYPES["connect-in"]: + self.add_peer(msgval, connection=True, + overwrite_connection=True) + elif msgtype == LOCAL_CONTROL_MESSAGE_TYPES["disconnect"]: + log.debug("We got a disconnect event: {}".format(msgval)) + if msgval in [x.peer_location() for x in self.get_connected_directory_peers()]: + # we need to use the full peer locator string, so that + # add_peer knows it can try to reconnect: + msgval = self.get_peer_by_id(msgval).peer_location() + self.add_peer(msgval, connection=False, + overwrite_connection=True) + else: + assert False + # If we got here it is *not* a non-local control message; + # so we must process it as a Joinmarket message. + return False + + + def process_handshake(self, peerid: str, message: str, + dn: bool=False) -> None: + peer = self.get_peer_by_id(peerid) + if not peer: + # rando sent us a handshake? + log.warn("Unexpected handshake from unknown peer: {}, " + "ignoring.".format(peerid)) + return + assert isinstance(peer, OnionPeer) + if not peer.status() == PEER_STATUS_CONNECTED: + # we were not waiting for it: + log.warn("Unexpected handshake from peer: {}, " + "ignoring. Peer's current status is: {}".format( + peerid, peer.status())) + return + if dn: + print("We, {}, are processing a handshake with dn {} from peer {}".format(self.self_as_peer.peer_location(), dn, peerid)) + # it means, we are a non-dn and we are expecting + # a returned `dn-handshake` message: + # (currently dns don't talk to other dns): + assert not self.self_as_peer.directory + if not peer.directory: + # got dn-handshake from non-dn: + log.warn("Unexpected dn-handshake from non-dn " + "node: {}, ignoring.".format(peerid)) + return + # we got the right message from the right peer; + # check it is formatted correctly and represents + # acceptance of the connection + try: + handshake_json = json.loads(message) + app_name = handshake_json["app-name"] + is_directory = handshake_json["directory"] + proto_min = handshake_json["proto-ver-min"] + proto_max = handshake_json["proto-ver-max"] + features = handshake_json["features"] + accepted = handshake_json["accepted"] + nick = handshake_json["nick"] + assert isinstance(proto_max, int) + assert isinstance(proto_min, int) + assert isinstance(features, dict) + assert isinstance(nick, str) + except Exception as e: + log.warn("Invalid handshake message from: {}, exception: {}, message: {}," + "ignoring".format(peerid, repr(e), message)) + return + # currently we are not using any features, but the intention + # is forwards compatibility, so we don't check its contents + # at all. + if not accepted: + log.warn("Directory: {} rejected our handshake.".format(peerid)) + return + if not (app_name == JM_APP_NAME and is_directory and JM_VERSION \ + <= proto_max and JM_VERSION >= proto_min and accepted): + log.warn("Handshake from directory is incompatible or " + "rejected: {}".format(handshake_json)) + return + # We received a valid, accepting dn-handshake. Update the peer. + peer.update_status(PEER_STATUS_HANDSHAKED) + peer.set_nick(nick) + else: + print("We, {}, are processing a handshake with dn {} from peer {}".format(self.self_as_peer.peer_location(), dn, peerid)) + # it means, we are receiving an initial handshake + # message from a 'client' (non-dn) peer. + # dns don't talk to each other: + assert not peer.directory + accepted = True + try: + handshake_json = json.loads(message) + app_name = handshake_json["app-name"] + is_directory = handshake_json["directory"] + proto_ver = handshake_json["proto-ver"] + features = handshake_json["features"] + full_location_string = handshake_json["location-string"] + nick = handshake_json["nick"] + assert isinstance(proto_ver, int) + assert isinstance(features, dict) + assert isinstance(nick, str) + except Exception as e: + log.warn("(not dn) Invalid handshake message from: {}, exception: {}, message: {}," + "ignoring".format(peerid, repr(e), message)) + accepted = False + if not (app_name == JM_APP_NAME and proto_ver == JM_VERSION \ + and not is_directory): + log.warn("Invalid handshake name/version data: {}, from peer: " + "{}, rejecting.".format(message, peerid)) + accepted = False + # If accepted, we should update the peer to have the full + # location which in general will not yet be present, so as to + # allow publishing their location via `getpeerlist`: + if not peer.set_location(full_location_string): + accepted = False + if not peerid == full_location_string: + print("we are reading a handshake from location {} but they sent" + "us full location string {}, setting an alternate".format( + peerid, full_location_string)) + peer.set_alternate_location(peerid) + peer.set_nick(nick) + # client peer's handshake message was valid; send ours, and + # then mark this peer as successfully handshaked: + our_hs = copy.deepcopy(server_handshake_json) + our_hs["nick"] = self.nick + our_hs["accepted"] = accepted + if self.self_as_peer.directory: + self.handshake_as_directory(peer, our_hs) + if accepted: + peer.update_status(PEER_STATUS_HANDSHAKED) + + def get_peer_by_id(self, p: str) -> Union[OnionPeer, bool]: + """ Returns the OnionPeer with peer location p, + if it is in self.peers, otherwise returns False. + """ + if p == "00": + return self.self_as_peer + for x in self.peers: + if x.peer_location() == p: + return x + if x.alternate_location == p: + return x + return False + + def register_connection(self, peer_location: str, direction: int) -> None: + """ We send ourselves a local control message indicating + the new connection. + If the connection is inbound, direction == 0, else 1. + """ + assert direction in range(2) + if direction == 1: + msgtype = LOCAL_CONTROL_MESSAGE_TYPES["connect"] + else: + msgtype = LOCAL_CONTROL_MESSAGE_TYPES["connect-in"] + msg = OnionCustomMessage(peer_location, msgtype) + self.receive_msg(msg, "00") + + def register_disconnection(self, peer_location: str) -> None: + """ We send ourselves a local control message indicating + the disconnection. + """ + msg = OnionCustomMessage(peer_location, + LOCAL_CONTROL_MESSAGE_TYPES["disconnect"]) + self.receive_msg(msg, "00") + + def add_peer(self, peerdata: str, connection: bool=False, + overwrite_connection: bool=False, with_nick=False) -> None: + """ add non-directory peer from (nick, peer) serialization `peerdata`, + where "peer" is host:port; + return the created OnionPeer object. Or, with_nick=False means + that `peerdata` has only the peer location. + If the peer is already in our peerlist it can be updated in + one of these ways: + * the nick can be added + * it can be marked as 'connected' if it was previously unconnected, + with this conditional on whether the flag `overwrite_connection` is + set. Note that this peer removal, unlike the peer addition above, + can also occur for directory nodes, if we lose connection (and then + we persistently try to reconnect; see OnionDirectoryPeer). + """ + if with_nick: + try: + nick, peer = peerdata.split(NICK_PEERLOCATOR_SEPARATOR) + except Exception as e: + # TODO: as of now, this is not an error, but expected. + # Don't log? Do something else? + log.debug("Received invalid peer identifier string: {}, {}".format( + peerdata, e)) + return + else: + peer = peerdata + + # assumed that it's passing a full string + try: + temp_p = OnionPeer.from_location_string(self, peer, + self.socks5_host, self.socks5_port, + handshake_callback=self.handshake_as_client) + except Exception as e: + # There are currently a few ways the location + # parsing and Peer object construction can fail; + # TODO specify exception types. + log.warn("Failed to add peer: {}, exception: {}".format(peer, repr(e))) + return + if not self.get_peer_by_id(temp_p.peer_location()): + if connection: + log.info("Updating status of peer: {} to connected.".format(temp_p.peer_location())) + temp_p.update_status(PEER_STATUS_CONNECTED) + else: + temp_p.update_status(PEER_STATUS_DISCONNECTED) + if with_nick: + temp_p.set_nick(nick) + self.peers.add(temp_p) + if not connection: + # Here, we are not currently connected. We + # try to connect asynchronously. We don't pay attention + # to any return. This attempt is one-shot and opportunistic, + # for non-dns, but will retry with exp-backoff for dns. + # Notice this is only possible for non-dns to other non-dns, + # since dns will never reach this point without an active + # connection. + reactor.callLater(0.0, temp_p.try_to_connect) + return temp_p + else: + p = self.get_peer_by_id(temp_p.peer_location()) + if overwrite_connection: + if connection: + log.info("Updating status to connected for peer {}.".format(temp_p.peer_location())) + p.update_status(PEER_STATUS_CONNECTED) + else: + p.update_status(PEER_STATUS_DISCONNECTED) + if with_nick: + p.set_nick(nick) + return p + + def get_all_connected_peers(self) -> list: + return self.get_connected_directory_peers() + \ + self.get_connected_nondirectory_peers() + + def get_connected_directory_peers(self) -> list: + return [p for p in self.peers if p.directory and p.status() == \ + PEER_STATUS_HANDSHAKED] + + def get_connected_nondirectory_peers(self) -> list: + return [p for p in self.peers if (not p.directory) and p.status() == \ + PEER_STATUS_HANDSHAKED] + + def wait_for_directories(self) -> None: + # Notice this is checking for *handshaked* dps; + # the handshake will have been initiated once a + # connection was seen: + log.warn("in the wait for directories loop, this is the connected dps: {}".format(self.get_connected_directory_peers())) + if len(self.get_connected_directory_peers()) == 0: + return + # This is what triggers the start of taker/maker workflows. + if not self.on_welcome_sent: + self.on_welcome(self) + self.on_welcome_sent = True + self.wait_for_directories_loop.stop() + + """ CONTROL MESSAGES SENT BY US + """ + def send_peers(self, requesting_peer: OnionPeer, + peerid_filter: list=[]) -> None: + """ This message is sent by directory peers on request + by non-directory peers. + If peerid_filter is specified, only peers whose peerid is in + this list will be sent. (TODO this is inefficient). + The peerlist message should have this format: + (1) entries comma separated + (2) each entry is serialized nick then the NICK_PEERLOCATOR_SEPARATOR + then *either* 66 char hex peerid, *or* peerid@host:port + (3) However this message might be long enough to exceed a 1300 byte limit, + if we don't use a filter, so we may need to split it into multiple + messages (TODO). + """ + if not requesting_peer.status() == PEER_STATUS_HANDSHAKED: + raise OnionPeerConnectionError( + "Cannot send peer list to unhandshaked peer") + peerlist = set() + for p in self.get_connected_nondirectory_peers(): + # don't send a peer to itself + if p.peer_location() == requesting_peer.peer_location(): + continue + if len(peerid_filter) > 0 and p.peer_location() not in peerid_filter: + continue + if not p.status() == PEER_STATUS_HANDSHAKED: + # don't advertise what is not online. + continue + # peers that haven't sent their nick yet are not + # privmsg-reachable; don't send them + if p.nick == "": + continue + peerlist.add(p.get_nick_peerlocation_ser()) + # For testing: dns won't usually participate: + peerlist.add(self.self_as_peer.get_nick_peerlocation_ser()) + self._send(requesting_peer, OnionCustomMessage(",".join( + peerlist), CONTROL_MESSAGE_TYPES["peerlist"])) diff --git a/jmdaemon/test/test_daemon_protocol.py b/jmdaemon/test/test_daemon_protocol.py index 71beba734..f9dbf390e 100644 --- a/jmdaemon/test/test_daemon_protocol.py +++ b/jmdaemon/test/test_daemon_protocol.py @@ -7,7 +7,7 @@ from jmdaemon.protocol import NICK_HASH_LENGTH, NICK_MAX_ENCODED, JM_VERSION,\ JOINMARKET_NICK_HEADER from jmbase import get_log -from jmclient import (load_test_config, jm_single, get_irc_mchannels) +from jmclient import (load_test_config, jm_single, get_mchannels) from twisted.python.log import msg as tmsg from twisted.python.log import startLogging from twisted.internet import protocol, reactor, task @@ -59,7 +59,7 @@ def connectionMade(self): def clientStart(self): self.sigs_received = 0 - irc = get_irc_mchannels() + irc = [get_mchannels()[0]] d = self.callRemote(JMInit, bcsource="dummyblockchain", network="dummynetwork", diff --git a/jmdaemon/test/test_irc_messaging.py b/jmdaemon/test/test_irc_messaging.py index 755a20c69..0e9812fd7 100644 --- a/jmdaemon/test/test_irc_messaging.py +++ b/jmdaemon/test/test_irc_messaging.py @@ -6,7 +6,7 @@ from twisted.internet import reactor, task from jmdaemon import IRCMessageChannel, MessageChannelCollection #needed for test framework -from jmclient import (load_test_config, get_irc_mchannels, jm_single) +from jmclient import (load_test_config, get_mchannels, jm_single) si = 1 class DummyDaemon(object): @@ -95,7 +95,7 @@ def junk_fill(mc): def getmc(nick): dm = DummyDaemon() - mc = DummyMC(get_irc_mchannels()[0], nick, dm) + mc = DummyMC(get_mchannels()[0], nick, dm) mc.register_orderbookwatch_callbacks(on_order_seen=on_order_seen) mc.register_taker_callbacks(on_pubkey=on_pubkey) mc.on_connect = on_connect @@ -108,7 +108,7 @@ class TrialIRC(unittest.TestCase): def setUp(self): load_test_config() - print(get_irc_mchannels()[0]) + print(get_mchannels()[0]) jm_single().maker_timeout_sec = 1 dm, mc, mcc = getmc("irc_publisher") dm2, mc2, mcc2 = getmc("irc_receiver") diff --git a/jmdaemon/test/test_orderbookwatch.py b/jmdaemon/test/test_orderbookwatch.py index 39d4de791..17797a635 100644 --- a/jmdaemon/test/test_orderbookwatch.py +++ b/jmdaemon/test/test_orderbookwatch.py @@ -2,7 +2,7 @@ from jmdaemon.orderbookwatch import OrderbookWatch from jmdaemon import IRCMessageChannel, fidelity_bond_cmd_list -from jmclient import get_irc_mchannels, load_test_config +from jmclient import get_mchannels, load_test_config from jmdaemon.protocol import JM_VERSION, ORDER_KEYS from jmbase.support import hextobin from jmclient.fidelity_bond import FidelityBondProof @@ -24,7 +24,7 @@ def on_welcome(x): def get_ob(): load_test_config() dm = DummyDaemon() - mc = DummyMC(get_irc_mchannels()[0], "test", dm) + mc = DummyMC(get_mchannels()[0], "test", dm) ob = OrderbookWatch() ob.on_welcome = on_welcome ob.set_msgchan(mc) diff --git a/scripts/obwatch/ob-watcher.py b/scripts/obwatch/ob-watcher.py index 661eda94d..aa18f321b 100755 --- a/scripts/obwatch/ob-watcher.py +++ b/scripts/obwatch/ob-watcher.py @@ -44,7 +44,7 @@ import matplotlib.pyplot as plt from jmclient import jm_single, load_program_config, calc_cj_fee, \ - get_irc_mchannels, add_base_options + get_mchannels, add_base_options from jmdaemon import OrderbookWatch, MessageChannelCollection, IRCMessageChannel #TODO this is only for base58, find a solution for a client without jmbitcoin import jmbitcoin as btc @@ -805,7 +805,7 @@ def main(): load_program_config(config_path=options.datadir) check_and_start_tor() hostport = (options.host, options.port) - mcs = [ObIRCMessageChannel(c) for c in get_irc_mchannels()] + mcs = [ObIRCMessageChannel(c) for c in get_mchannels()] mcc = MessageChannelCollection(mcs) mcc.set_nick(get_dummy_nick()) taker = ObBasic(mcc, hostport) diff --git a/test/e2e-coinjoin-test.py b/test/e2e-coinjoin-test.py new file mode 100644 index 000000000..600d6ecd5 --- /dev/null +++ b/test/e2e-coinjoin-test.py @@ -0,0 +1,364 @@ +#! /usr/bin/env python +'''Creates wallets and yield generators in regtest, + then runs both them and a JMWalletDaemon instance + for the taker, injecting the newly created taker + wallet into it and running sendpayment once. + Number of ygs is configured in the joinmarket.cfg + with `regtest-count` in the `ln-onion` type MESSAGING + section. + See notes below for more detail on config. + Run it like: + pytest \ + --btcroot=/path/to/bitcoin/bin/ \ + --btcpwd=123456abcdef --btcconf=/blah/bitcoin.conf \ + -s test/ln-ygrunner.py + ''' +from twisted.internet import reactor, defer +from twisted.web.client import readBody, Headers +from common import make_wallets +import pytest +import random +import json +from datetime import datetime +from jmbase import (get_nontor_agent, BytesProducer, jmprint, + get_log, stop_reactor, hextobin, bintohex) +from jmclient import (YieldGeneratorBasic, load_test_config, jm_single, + JMClientProtocolFactory, start_reactor, SegwitWallet, get_mchannels, + SegwitLegacyWallet, JMWalletDaemon) +from jmclient.wallet_utils import wallet_gettimelockaddress +from jmclient.wallet_rpc import api_version_string + +log = get_log() + +# For quicker testing, restrict the range of timelock +# addresses to avoid slow load of multiple bots. +# Note: no need to revert this change as ygrunner runs +# in isolation. +from jmclient import FidelityBondMixin +FidelityBondMixin.TIMELOCK_ERA_YEARS = 2 +FidelityBondMixin.TIMELOCK_EPOCH_YEAR = datetime.now().year +FidelityBondMixin.TIMENUMBERS_PER_PUBKEY = 12 + +wallet_name = "test-onion-yg-runner.jmdat" + +mean_amt = 2.0 + +directory_node_indices = [1] + +# +def get_onion_messaging_config_regtest(run_num: int, dns=[1], hsd=""): + """ Sets a onion messaging channel section for a regtest instance + indexed by `run_num`. The indices to be used as directory nodes + should be passed as `dns`, as a list of ints. + """ + def location_string(directory_node_run_num): + return "127.0.0.1:" + str( + 8080 + directory_node_run_num) + if run_num in dns: + # means *we* are a dn, and dns currently + # do not use other dns: + dns_to_use = [location_string(run_num)] + else: + dns_to_use = [location_string(a) for a in dns] + dn_nodes_list = ",".join(dns_to_use) + log.info("For node: {}, set dn list to: {}".format(run_num, dn_nodes_list)) + cf = {"type": "onion", + "socks5_host": "127.0.0.1", + "socks5_port": 9050, + "tor_control_host": "127.0.0.1", + "tor_control_port": 9051, + "onion_serving_host": "127.0.0.1", + "onion_serving_port": 8080 + run_num, + "hidden_service_dir": "", + "directory_nodes": dn_nodes_list, + "regtest_count": "1, 1"} + if run_num in dns: + # only directories need to use fixed hidden service directories: + cf["hidden_service_dir"] = hsd + return cf + + +class RegtestJMClientProtocolFactory(JMClientProtocolFactory): + i = 1 + def set_directory_nodes(self, dns): + # a list of integers representing the directory nodes + # for this test: + self.dns = dns + + def get_mchannels(self): + # swaps out any existing lightning configs + # in the config settings on startup, for one + # that's indexed to the regtest counter var: + default_chans = get_mchannels() + new_chans = [] + onion_found = False + hsd = "" + for c in default_chans: + if "type" in c and c["type"] == "onion": + onion_found = True + if c["hidden_service_dir"] != "": + hsd = c["hidden_service_dir"] + continue + else: + new_chans.append(c) + if onion_found: + new_chans.append(get_onion_messaging_config_regtest( + self.i, self.dns, hsd)) + return new_chans + +class JMWalletDaemonT(JMWalletDaemon): + def check_cookie(self, request): + if self.auth_disabled: + return True + return super().check_cookie(request) + +class TWalletRPCManager(object): + """ Base class for set up of tests of the + Wallet RPC calls using the wallet_rpc.JMWalletDaemon service. + """ + # the port for the jmwallet daemon + dport = 28183 + # the port for the ws + wss_port = 28283 + + def __init__(self): + # a client connnection object which is often but not always + # instantiated: + self.client_connector = None + self.daemon = JMWalletDaemonT(self.dport, self.wss_port, tls=False) + self.daemon.auth_disabled = True + # because we sync and start the wallet service manually here + # (and don't use wallet files yet), we won't have set a wallet name, + # so we set it here: + self.daemon.wallet_name = wallet_name + + def start(self): + r, s = self.daemon.startService() + self.listener_rpc = r + self.listener_ws = s + + def get_route_root(self): + addr = "http://127.0.0.1:" + str(self.dport) + addr += api_version_string + return addr + + def stop(self): + for dc in reactor.getDelayedCalls(): + dc.cancel() + d1 = defer.maybeDeferred(self.listener_ws.stopListening) + d2 = defer.maybeDeferred(self.listener_rpc.stopListening) + if self.client_connector: + self.client_connector.disconnect() + # only fire if everything is finished: + return defer.gatherResults([d1, d2]) + + @defer.inlineCallbacks + def do_request(self, agent, method, addr, body, handler, token=None): + if token: + headers = Headers({"Authorization": ["Bearer " + self.jwt_token]}) + else: + headers = None + response = yield agent.request(method, addr, headers, bodyProducer=body) + yield self.response_handler(response, handler) + + @defer.inlineCallbacks + def response_handler(self, response, handler): + body = yield readBody(response) + # these responses should always be 200 OK. + #assert response.code == 200 + # handlers check the body is as expected; no return. + yield handler(body) + return True + +def test_start_yg_and_taker_setup(setup_onion_ygrunner): + """Set up some wallets, for the ygs and 1 taker. + Then start LN and the ygs in the background, then fire + a startup of a wallet daemon for the taker who then + makes a coinjoin payment. + """ + if jm_single().config.get("POLICY", "native") == "true": + walletclass = SegwitWallet + else: + # TODO add Legacy + walletclass = SegwitLegacyWallet + + start_bot_num, end_bot_num = [int(x) for x in jm_single().config.get( + "MESSAGING:onion1", "regtest_count").split(",")] + num_ygs = end_bot_num - start_bot_num + # specify the number of wallets and bots of each type: + wallet_services = make_wallets(num_ygs + 1, + wallet_structures=[[1, 3, 0, 0, 0]] * (num_ygs + 1), + mean_amt=2.0, + walletclass=walletclass) + #the sendpayment bot uses the last wallet in the list + wallet_service = wallet_services[end_bot_num - 1]['wallet'] + jmprint("\n\nTaker wallet seed : " + wallet_services[end_bot_num - 1]['seed']) + # for manual audit if necessary, show the maker's wallet seeds + # also (note this audit should be automated in future, see + # test_full_coinjoin.py in this directory) + jmprint("\n\nMaker wallet seeds: ") + for i in range(start_bot_num, end_bot_num): + jmprint("Maker seed: " + wallet_services[i - 1]['seed']) + jmprint("\n") + wallet_service.sync_wallet(fast=True) + ygclass = YieldGeneratorBasic + + # As per previous note, override non-default command line settings: + options = {} + for x in ["ordertype", "txfee_contribution", "txfee_contribution_factor", + "cjfee_a", "cjfee_r", "cjfee_factor", "minsize", "size_factor"]: + options[x] = jm_single().config.get("YIELDGENERATOR", x) + ordertype = options["ordertype"] + txfee_contribution = int(options["txfee_contribution"]) + txfee_contribution_factor = float(options["txfee_contribution_factor"]) + cjfee_factor = float(options["cjfee_factor"]) + size_factor = float(options["size_factor"]) + if ordertype == 'reloffer': + cjfee_r = options["cjfee_r"] + # minimum size is such that you always net profit at least 20% + #of the miner fee + minsize = max(int(1.2 * txfee_contribution / float(cjfee_r)), + int(options["minsize"])) + cjfee_a = None + elif ordertype == 'absoffer': + cjfee_a = int(options["cjfee_a"]) + minsize = int(options["minsize"]) + cjfee_r = None + else: + assert False, "incorrect offertype config for yieldgenerator." + + txtype = wallet_service.get_txtype() + if txtype == "p2wpkh": + prefix = "sw0" + elif txtype == "p2sh-p2wpkh": + prefix = "sw" + elif txtype == "p2pkh": + prefix = "" + else: + assert False, "Unsupported wallet type for yieldgenerator: " + txtype + + ordertype = prefix + ordertype + + for i in range(start_bot_num, end_bot_num): + cfg = [txfee_contribution, cjfee_a, cjfee_r, ordertype, minsize, + txfee_contribution_factor, cjfee_factor, size_factor] + wallet_service_yg = wallet_services[i - 1]["wallet"] + + wallet_service_yg.startService() + + yg = ygclass(wallet_service_yg, cfg) + clientfactory = RegtestJMClientProtocolFactory(yg, proto_type="MAKER") + # This ensures that the right rpc/port config is passed into the daemon, + # for this specific bot: + clientfactory.i = i + # This ensures that this bot knows which other bots are directory nodes: + clientfactory.set_directory_nodes(directory_node_indices) + nodaemon = jm_single().config.getint("DAEMON", "no_daemon") + daemon = True if nodaemon == 1 else False + #rs = True if i == num_ygs - 1 else False + start_reactor(jm_single().config.get("DAEMON", "daemon_host"), + jm_single().config.getint("DAEMON", "daemon_port"), + clientfactory, daemon=daemon, rs=False) + reactor.callLater(1.0, start_test_taker, wallet_services[end_bot_num - 1]['wallet'], end_bot_num) + reactor.run() + +@defer.inlineCallbacks +def start_test_taker(wallet_service, i): + # this rpc manager has auth disabled, + # and the wallet_service is set manually, + # so no unlock etc. + mgr = TWalletRPCManager() + mgr.daemon.wallet_service = wallet_service + # because we are manually setting the wallet_service + # of the JMWalletDaemon instance, we do not follow the + # usual flow of `initialize_wallet_service`, we do not set + # the auth token or start the websocket; so we must manually + # sync the wallet, including bypassing any restart callback: + def dummy_restart_callback(msg): + log.warn("Ignoring rescan request from backend wallet service: " + msg) + mgr.daemon.wallet_service.add_restart_callback(dummy_restart_callback) + mgr.daemon.wallet_name = wallet_name + while not mgr.daemon.wallet_service.synced: + mgr.daemon.wallet_service.sync_wallet(fast=True) + mgr.daemon.wallet_service.startService() + def get_client_factory(): + clientfactory = RegtestJMClientProtocolFactory(mgr.daemon.taker, + proto_type="TAKER") + clientfactory.i = i + clientfactory.set_directory_nodes(directory_node_indices) + return clientfactory + + mgr.daemon.get_client_factory = get_client_factory + # before preparing the RPC call to the wallet daemon, + # we decide a coinjoin destination and amount. Choosing + # a destination in the wallet is a bit easier because + # we can query the mixdepth balance at the end. + coinjoin_destination = mgr.daemon.wallet_service.get_internal_addr(4) + cj_amount = 22000000 + # once the taker is finished we sanity check before + # shutting down: + def dummy_taker_finished(res, fromtx=False, + waittime=0.0, txdetails=None): + jmprint("Taker is finished") + # check that the funds have arrived. + mbal = mgr.daemon.wallet_service.get_balance_by_mixdepth()[4] + assert mbal == cj_amount + jmprint("Funds: {} sats successfully arrived into mixdepth 4.".format(cj_amount)) + stop_reactor() + mgr.daemon.taker_finished = dummy_taker_finished + mgr.start() + agent = get_nontor_agent() + addr = mgr.get_route_root() + addr += "/wallet/" + addr += mgr.daemon.wallet_name + addr += "/taker/coinjoin" + addr = addr.encode() + body = BytesProducer(json.dumps({"mixdepth": "1", + "amount_sats": cj_amount, + "counterparties": "2", + "destination": coinjoin_destination}).encode()) + yield mgr.do_request(agent, b"POST", addr, body, + process_coinjoin_response) + +def process_coinjoin_response(response): + json_body = json.loads(response.decode("utf-8")) + print("coinjoin response: {}".format(json_body)) + +def get_addr_and_fund(yg): + """ This function allows us to create + and publish a fidelity bond for a particular + yield generator object after the wallet has reached + a synced state and is therefore ready to serve up + timelock addresses. We create the TL address, fund it, + refresh the wallet and then republish our offers, which + will also publish the new FB. + """ + if not yg.wallet_service.synced: + return + if yg.wallet_service.timelock_funded: + return + addr = wallet_gettimelockaddress(yg.wallet_service.wallet, "2021-11") + print("Got timelockaddress: {}".format(addr)) + + # pay into it; amount is randomized for now. + # Note that grab_coins already mines 1 block. + fb_amt = random.randint(1, 5) + jm_single().bc_interface.grab_coins(addr, fb_amt) + + # we no longer have to run this loop (TODO kill with nonlocal) + yg.wallet_service.timelock_funded = True + + # force wallet to check for the new coins so the new + # yg offers will include them: + yg.wallet_service.transaction_monitor() + + # publish a new offer: + yg.offerlist = yg.create_my_orders() + yg.fidelity_bond = yg.get_fidelity_bond_template() + jmprint('updated offerlist={}'.format(yg.offerlist)) + +@pytest.fixture(scope="module") +def setup_onion_ygrunner(): + load_test_config() + jm_single().bc_interface.tick_forward_chain_interval = 10 + jm_single().bc_interface.simulate_blocks() diff --git a/test/regtest_joinmarket.cfg b/test/regtest_joinmarket.cfg index 4d3c211cf..3345e29ff 100644 --- a/test/regtest_joinmarket.cfg +++ b/test/regtest_joinmarket.cfg @@ -16,6 +16,7 @@ network = testnet rpc_wallet_file = jm-test-wallet [MESSAGING:server1] +type = irc host = localhost hostid = localhost1 channel = joinmarket-pit @@ -26,6 +27,7 @@ socks5_host = localhost socks5_port = 9150 [MESSAGING:server2] +type = irc host = localhost hostid = localhost2 channel = joinmarket-pit @@ -35,8 +37,46 @@ socks5 = false socks5_host = localhost socks5_port = 9150 +[MESSAGING:onion1] +# onion based message channels must have the exact type 'onion' +# (while the section name above can be MESSAGING:whatever), and there must +# be only ONE such message channel configured (note the directory servers +# can be multiple, below): +type = onion +socks5_host = localhost +socks5_port = 9050 +# the tor control configuration: +tor_control_host = localhost +# or, to use a UNIX socket +# control_host = unix:/var/run/tor/control +tor_control_port = 9051 +# the host/port actually serving the hidden service +# (note the *virtual port*, that the client uses, +# is hardcoded to 80): +onion_serving_host = 127.0.0.1 +onion_serving_port = 8080 +# This is mandatory for directory nodes (who must also set their +# own .onion:port as the only directory in directory_nodes, below), +# but NOT TO BE USED by non-directory nodes (which is you, unless +# you know otherwise!), as it will greatly degrade your privacy. +# +# Special handling on regtest, so just ignore and let the code handle it: +hidden_service_dir = "" +# This is a comma separated list (comma can be omitted if only one item). +# Each item has format host:port +# On regtest we are going to increment the port numbers served from, with +# the value used here as the starting value: +directory_nodes = localhost:8081 +# this is not present in default real config +# and is specifically used to flag tests: +# means we use indices 1,2,3,4,5: +regtest_count=1,5 + [TIMEOUT] -maker_timeout_sec = 15 +maker_timeout_sec = 10 + +[LOGGING] +console_log_level = DEBUG [POLICY] # for dust sweeping, try merge_algorithm = gradual diff --git a/test/ygrunner.py b/test/ygrunner.py index 88ef65b97..d657179d5 100644 --- a/test/ygrunner.py +++ b/test/ygrunner.py @@ -96,7 +96,7 @@ def on_tx_received(self, nick, tx, offerinfo): "num_ygs, wallet_structures, fb_indices, mean_amt, malicious, deterministic", [ # 1sp 3yg, honest makers, one maker has FB: - (3, [[1, 3, 0, 0, 0]] * 4, [1, 2], 2, 0, False), + (3, [[1, 3, 0, 0, 0]] * 4, [], 2, 0, False), # 1sp 3yg, malicious makers reject on auth and on tx 30% of time #(3, [[1, 3, 0, 0, 0]] * 4, 2, 30, False), # 1 sp 9 ygs, deterministically malicious 50% of time @@ -173,6 +173,7 @@ def test_start_ygs(setup_ygrunner, num_ygs, wallet_structures, fb_indices, ygclass = DeterministicMaliciousYieldGenerator else: ygclass = MaliciousYieldGenerator + for i in range(num_ygs): cfg = [txfee_contribution, cjfee_a, cjfee_r, ordertype, minsize, txfee_contribution_factor, cjfee_factor, size_factor]