From e4987a8aeb644e404e13f4e53f75043ddcecd5b9 Mon Sep 17 00:00:00 2001 From: Recolic K Date: Tue, 26 Jul 2022 18:08:25 +0800 Subject: [PATCH 1/5] init --- vmess2json.py | 235 ++++++++++++++++++++++++++++++++------------------ 1 file changed, 149 insertions(+), 86 deletions(-) diff --git a/vmess2json.py b/vmess2json.py index 8063f85..02f68db 100755 --- a/vmess2json.py +++ b/vmess2json.py @@ -1,4 +1,6 @@ #!/usr/bin/env python3 +# recolic modify: 1. remove DNS and routing (assuming DNS server and bypassing china ip, might cause confusion) +# 2. adding vless logic 3. refine code to use dedicated function for base64 decoding import os import sys import json @@ -13,6 +15,7 @@ import urllib.parse vmscheme = "vmess://" +vlscheme = "vless://" ssscheme = "ss://" TPL = {} @@ -59,33 +62,7 @@ "domainStrategy": "UseIP" } } - ], - "dns": { - "servers": [ - "1.0.0.1", - "localhost" - ] - }, - "routing": { - "domainStrategy": "IPIfNonMatch", - "rules": [ - { - "type": "field", - "ip": [ - "geoip:private", - "geoip:cn" - ], - "outboundTag": "direct" - }, - { - "type": "field", - "domain": [ - "geosite:cn" - ], - "outboundTag": "direct" - } - ] - } + ] } """ @@ -306,11 +283,24 @@ """ +def _decode_if_b64_encoded(text): + for char in text: + if char.isalpha() or char.isdigit() or char in "=/": + continue + return text # Detected non-base64 character + # The string is base64 encoded. Decode it! + blen = len(text) + if blen % 4 > 0: + text += "=" * (4 - blen % 4) + return base64.b64decode(text).decode() + def parseLink(link): if link.startswith(ssscheme): return parseSs(link) elif link.startswith(vmscheme): return parseVmess(link) + elif link.startswith(vlscheme): + return parseVless(link) else: print("ERROR: This script supports only vmess://(N/NG) and ss:// links") return None @@ -323,48 +313,39 @@ def parseSs(sslink): "port": "", "id": "", "aid": "", - "net": "shadowsocks", + "net": "", "type": "", "host": "", "path": "", - "tls": "" + "tls": "", + "protocol": "shadowsocks" } - if sslink.startswith(ssscheme): - info = sslink[len(ssscheme):] - - if info.rfind("#") > 0: - info, _ps = info.split("#", 2) - RETOBJ["ps"] = urllib.parse.unquote(_ps) - - if info.find("@") < 0: - # old style link - #paddings - blen = len(info) - if blen % 4 > 0: - info += "=" * (4 - blen % 4) - - info = base64.b64decode(info).decode() - - atidx = info.rfind("@") - method, password = info[:atidx].split(":", 2) - addr, port = info[atidx+1:].split(":", 2) - else: - atidx = info.rfind("@") - addr, port = info[atidx+1:].split(":", 2) + assert(sslink.startswith(ssscheme)) - info = info[:atidx] - blen = len(info) - if blen % 4 > 0: - info += "=" * (4 - blen % 4) - - info = base64.b64decode(info).decode() - method, password = info.split(":", 2) + info = sslink[len(ssscheme):] + + # Strip the tailing description in URL + if info.rfind("#") > 0: + info, _ps = info.split("#", 2) + RETOBJ["ps"] = urllib.parse.unquote(_ps) + + if info.find("@") < 0: + # old style link: The whole link is encoded + info = _decode_if_b64_encoded(info) + atidx = info.rfind("@") + method, password = info[:atidx].split(":", 2) + addr, port = info[atidx+1:].split(":", 2) + else: + atidx = info.rfind("@") + addr, port = info[atidx+1:].split(":", 2) + info = _decode_if_b64_encoded(info[:atidx]) + method, password = info.split(":", 2) - RETOBJ["add"] = addr - RETOBJ["port"] = port - RETOBJ["aid"] = method - RETOBJ["id"] = password - return RETOBJ + RETOBJ["add"] = addr + RETOBJ["port"] = port + RETOBJ["aid"] = method + RETOBJ["id"] = password + return RETOBJ def parseVmess(vmesslink): @@ -381,45 +362,124 @@ def parseVmess(vmesslink): "type": "none", "host": "", "path": "", - "tls": "" + "tls": "", + "protocol": "vmess" } """ - if vmesslink.startswith(vmscheme): - bs = vmesslink[len(vmscheme):] - #paddings - blen = len(bs) - if blen % 4 > 0: - bs += "=" * (4 - blen % 4) - - vms = base64.b64decode(bs).decode() - return json.loads(vms) - else: - raise Exception("vmess link invalid") + assert(vmesslink.startswith(vmscheme)) + + # vmess link is a base64 encoded json + vms = _decode_if_b64_encoded(vmesslink[len(vmscheme):]) + RETOBJ = json.loads(vms) + RETOBJ["protocol"] = "vmess" + return RETOBJ + +def parseVless(vlesslink): + # This function also applies to vmess AEAD. Just remove the assertion to allow vmessAEAD link to come in. https://github.com/XTLS/Xray-core/issues/91 + assert(vlesslink.startswith(vlscheme)) + RETOBJ = { + "v": "2", + "ps": "REMARK", + "add": "SERVER-ADDR", + "port": "SERVER-PORT", + "id": "LONG-ID", + "aid": "", # N/A + "net": "", + "type": "", # mKCP or QUIC header type + "host": "", # h2 or ws host / quic security + "path": "", # h2 or ws path / quic key + "tls": "", # transport layer security, none/tls/xtls + "protocol": "", # vless or vmess + # "optional_users_security": "", # encryption of vmess or vless + # "optional_tls_sni": "", + } + + # vless link is either fully encoded, or not encoded at all + vlesslink = vlscheme + _decode_if_b64_encoded(vlesslink[len(vlscheme):]) + parsed = urllib.parse.urlparse(vlesslink) + + def validate(arg, defval = None): + if arg is not None: + return str(arg) + elif defval is not None: + return "" + else: + raise RuntimeError("failed while validating URL component. The link is incomplete or broken.") + + RETOBJ["protocol"] = validate(parsed.scheme) + RETOBJ["id"] = validate(parsed.username) + RETOBJ["add"] = validate(parsed.hostname) + RETOBJ["port"] = validate(parsed.port) + RETOBJ["ps"] = validate(parsed.fragment, "") # TODO: decodeURIComponent + + parsed_qs = urllib.parse.parse_qs(parsed.query) + parsed_qs = {k:v[0] for k,v in parsed_qs.items()} + RETOBJ["net"] = validate(parsed_qs.get('type')) + RETOBJ["optional_users_security"] = validate(parsed_qs.get('encryption', 'auto' if parsed.scheme == 'vmess' else 'none')) + RETOBJ["tls"] = validate(parsed_qs.get('security', 'none')) + RETOBJ["optional_tls_sni"] = validate(parsed_qs.get('sni', parsed.hostname)) + + if RETOBJ["net"] == 'http' or RETOBJ["net"] == 'ws': + RETOBJ["path"] = validate(parsed_qs.get('path', '/')) + RETOBJ["host"] = validate(parsed_qs.get('host', parsed.hostname)) + elif RETOBJ["net"] == 'kcp': + RETOBJ["type"] = validate(parsed_qs.get('headerType', parsed.hostname)) + if parsed_qs.get('seed', None) is not None: + raise RuntimeError("mKCP seed is not supported by this tool") + elif RETOBJ["net"] == 'quic': + RETOBJ["type"] = validate(parsed_qs.get('headerType', parsed.hostname)) + RETOBJ["host"] = validate(parsed_qs.get('quicSecurity', 'none')) + RETOBJ["path"] = validate(parsed_qs.get('key', '')) # TODO: decodeURIComponent + elif RETOBJ["net"] == 'grpc': + raise RuntimeError("gRPC transport layer is not supported by this tool") + elif RETOBJ["net"] == 'tcp': + raise RuntimeError("Unknown 'type' argument. Accepting tcp, http, ws, kcp, quic, grpc, but getting " + RETOBJ["net"]) + + if parsed_qs.get('alpn', None) is not None: + raise RuntimeError("'alpn' parameter in the link is not supported. ") + if parsed_qs.get('flow', None) is not None: + raise RuntimeError("'flow' parameter in the link is not supported. (because xtls is not supported)") + + return RETOBJ + + def load_TPL(stype): s = TPL[stype] return json.loads(s) -def fill_basic(_c, _v): +def fill_vmess_or_vless(_c, _v): _outbound = _c["outbounds"][0] + _outbound["protocol"] = _v["protocol"] + _vnext = _outbound["settings"]["vnext"][0] + _vnext["address"] = _v["add"] + _vnext["port"] = int(_v["port"]) - _vnext["address"] = _v["add"] - _vnext["port"] = int(_v["port"]) - _vnext["users"][0]["id"] = _v["id"] - _vnext["users"][0]["alterId"] = int(_v["aid"]) + _vnext_user_0 = _vnext["users"][0] + _vnext_user_0["id"] = _v["id"] + if _v["aid"] == "": + del _vnext_user_0["alterId"] + else: + _vnext_user_0["alterId"] = int(_v["aid"]) + if "optional_users_security" in _v: + _vnext_user_0["security"] = _v["optional_users_security"] _outbound["streamSettings"]["network"] = _v["net"] - if _v["tls"] == "tls": _outbound["streamSettings"]["security"] = "tls" _outbound["streamSettings"]["tlsSettings"] = {"allowInsecure": True} - if _v["host"] != "": + if "optional_tls_sni" in _v: + _outbound["streamSettings"]["tlsSettings"]["serverName"] = _v["optional_tls_sni"] + elif _v["host"] != "": _outbound["streamSettings"]["tlsSettings"]["serverName"] = _v["host"] - + elif _v["tls"] == "xtls": + raise RuntimeError("transport layer 'xtls' is not supported by this script") return _c + + def fill_shadowsocks(_c, _v): _ss = load_TPL("out_ss") _ss["email"] = _v["ps"] + "@ss" @@ -481,12 +541,15 @@ def fill_quic(_c, _v): def vmess2client(_t, _v): _net = _v["net"] + _protocol = _v["protocol"] _type = _v["type"] - if _net == "shadowsocks": + if _protocol == "shadowsocks": return fill_shadowsocks(_t, _v) - - _c = fill_basic(_t, _v) + elif _protocol == "vmess" or _protocol == "vless": + _c = fill_vmess_or_vless(_t, _v) + else: + raise RuntimeError("Invalid link causing vmess2client getting an unknown protocol " + _protocol) if _net == "kcp": return fill_kcp(_c, _v) From f194785d7f82ec6a82240cddf7a30052c4c54c72 Mon Sep 17 00:00:00 2001 From: Recolic K Date: Tue, 26 Jul 2022 18:08:37 +0800 Subject: [PATCH 2/5] init --- vmess2json.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/vmess2json.py b/vmess2json.py index 02f68db..a82e40f 100755 --- a/vmess2json.py +++ b/vmess2json.py @@ -1,6 +1,4 @@ #!/usr/bin/env python3 -# recolic modify: 1. remove DNS and routing (assuming DNS server and bypassing china ip, might cause confusion) -# 2. adding vless logic 3. refine code to use dedicated function for base64 decoding import os import sys import json From 0c405ca19cececf57b648d3ba8c51d359d205eec Mon Sep 17 00:00:00 2001 From: Recolic K Date: Tue, 26 Jul 2022 18:11:57 +0800 Subject: [PATCH 3/5] fix --- vmess2json.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/vmess2json.py b/vmess2json.py index a82e40f..8c4355b 100755 --- a/vmess2json.py +++ b/vmess2json.py @@ -421,11 +421,11 @@ def validate(arg, defval = None): RETOBJ["path"] = validate(parsed_qs.get('path', '/')) RETOBJ["host"] = validate(parsed_qs.get('host', parsed.hostname)) elif RETOBJ["net"] == 'kcp': - RETOBJ["type"] = validate(parsed_qs.get('headerType', parsed.hostname)) + RETOBJ["type"] = validate(parsed_qs.get('headerType', 'none')) if parsed_qs.get('seed', None) is not None: raise RuntimeError("mKCP seed is not supported by this tool") elif RETOBJ["net"] == 'quic': - RETOBJ["type"] = validate(parsed_qs.get('headerType', parsed.hostname)) + RETOBJ["type"] = validate(parsed_qs.get('headerType', 'none')) RETOBJ["host"] = validate(parsed_qs.get('quicSecurity', 'none')) RETOBJ["path"] = validate(parsed_qs.get('key', '')) # TODO: decodeURIComponent elif RETOBJ["net"] == 'grpc': From 396d3838b8295eedf2317a409762820599827221 Mon Sep 17 00:00:00 2001 From: Recolic K Date: Tue, 26 Jul 2022 18:18:32 +0800 Subject: [PATCH 4/5] fix --- vmess2json.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/vmess2json.py b/vmess2json.py index 8c4355b..4d86278 100755 --- a/vmess2json.py +++ b/vmess2json.py @@ -461,7 +461,10 @@ def fill_vmess_or_vless(_c, _v): else: _vnext_user_0["alterId"] = int(_v["aid"]) if "optional_users_security" in _v: - _vnext_user_0["security"] = _v["optional_users_security"] + if _v["protocol"] == "vless": + _vnext_user_0["encryption"] = _v["optional_users_security"] + else: + _vnext_user_0["security"] = _v["optional_users_security"] _outbound["streamSettings"]["network"] = _v["net"] if _v["tls"] == "tls": From 3fa4781b5a6a1453f479ab0dd39e43c69972ce34 Mon Sep 17 00:00:00 2001 From: Recolic K Date: Tue, 26 Jul 2022 18:19:04 +0800 Subject: [PATCH 5/5] fix --- vmess2json.py | 1 + 1 file changed, 1 insertion(+) diff --git a/vmess2json.py b/vmess2json.py index 4d86278..373a2f8 100755 --- a/vmess2json.py +++ b/vmess2json.py @@ -463,6 +463,7 @@ def fill_vmess_or_vless(_c, _v): if "optional_users_security" in _v: if _v["protocol"] == "vless": _vnext_user_0["encryption"] = _v["optional_users_security"] + del _vnext_user_0["security"] else: _vnext_user_0["security"] = _v["optional_users_security"]