diff --git a/vmess2json.py b/vmess2json.py index 8063f85..373a2f8 100755 --- a/vmess2json.py +++ b/vmess2json.py @@ -13,6 +13,7 @@ import urllib.parse vmscheme = "vmess://" +vlscheme = "vless://" ssscheme = "ss://" TPL = {} @@ -59,33 +60,7 @@ "domainStrategy": "UseIP" } } - ], - "dns": { - "servers": [ - "1.0.0.1", - "localhost" - ] - }, - "routing": { - "domainStrategy": "IPIfNonMatch", - "rules": [ - { - "type": "field", - "ip": [ - "geoip:private", - "geoip:cn" - ], - "outboundTag": "direct" - }, - { - "type": "field", - "domain": [ - "geosite:cn" - ], - "outboundTag": "direct" - } - ] - } + ] } """ @@ -306,11 +281,24 @@ """ +def _decode_if_b64_encoded(text): + for char in text: + if char.isalpha() or char.isdigit() or char in "=/": + continue + return text # Detected non-base64 character + # The string is base64 encoded. Decode it! + blen = len(text) + if blen % 4 > 0: + text += "=" * (4 - blen % 4) + return base64.b64decode(text).decode() + def parseLink(link): if link.startswith(ssscheme): return parseSs(link) elif link.startswith(vmscheme): return parseVmess(link) + elif link.startswith(vlscheme): + return parseVless(link) else: print("ERROR: This script supports only vmess://(N/NG) and ss:// links") return None @@ -323,48 +311,39 @@ def parseSs(sslink): "port": "", "id": "", "aid": "", - "net": "shadowsocks", + "net": "", "type": "", "host": "", "path": "", - "tls": "" + "tls": "", + "protocol": "shadowsocks" } - if sslink.startswith(ssscheme): - info = sslink[len(ssscheme):] - - if info.rfind("#") > 0: - info, _ps = info.split("#", 2) - RETOBJ["ps"] = urllib.parse.unquote(_ps) - - if info.find("@") < 0: - # old style link - #paddings - blen = len(info) - if blen % 4 > 0: - info += "=" * (4 - blen % 4) - - info = base64.b64decode(info).decode() - - atidx = info.rfind("@") - method, password = info[:atidx].split(":", 2) - addr, port = info[atidx+1:].split(":", 2) - else: - atidx = info.rfind("@") - addr, port = info[atidx+1:].split(":", 2) - - info = info[:atidx] - blen = len(info) - if blen % 4 > 0: - info += "=" * (4 - blen % 4) + assert(sslink.startswith(ssscheme)) - info = base64.b64decode(info).decode() - method, password = info.split(":", 2) + info = sslink[len(ssscheme):] + + # Strip the tailing description in URL + if info.rfind("#") > 0: + info, _ps = info.split("#", 2) + RETOBJ["ps"] = urllib.parse.unquote(_ps) + + if info.find("@") < 0: + # old style link: The whole link is encoded + info = _decode_if_b64_encoded(info) + atidx = info.rfind("@") + method, password = info[:atidx].split(":", 2) + addr, port = info[atidx+1:].split(":", 2) + else: + atidx = info.rfind("@") + addr, port = info[atidx+1:].split(":", 2) + info = _decode_if_b64_encoded(info[:atidx]) + method, password = info.split(":", 2) - RETOBJ["add"] = addr - RETOBJ["port"] = port - RETOBJ["aid"] = method - RETOBJ["id"] = password - return RETOBJ + RETOBJ["add"] = addr + RETOBJ["port"] = port + RETOBJ["aid"] = method + RETOBJ["id"] = password + return RETOBJ def parseVmess(vmesslink): @@ -381,45 +360,128 @@ def parseVmess(vmesslink): "type": "none", "host": "", "path": "", - "tls": "" + "tls": "", + "protocol": "vmess" } """ - if vmesslink.startswith(vmscheme): - bs = vmesslink[len(vmscheme):] - #paddings - blen = len(bs) - if blen % 4 > 0: - bs += "=" * (4 - blen % 4) - - vms = base64.b64decode(bs).decode() - return json.loads(vms) - else: - raise Exception("vmess link invalid") + assert(vmesslink.startswith(vmscheme)) + + # vmess link is a base64 encoded json + vms = _decode_if_b64_encoded(vmesslink[len(vmscheme):]) + RETOBJ = json.loads(vms) + RETOBJ["protocol"] = "vmess" + return RETOBJ + +def parseVless(vlesslink): + # This function also applies to vmess AEAD. Just remove the assertion to allow vmessAEAD link to come in. https://github.com/XTLS/Xray-core/issues/91 + assert(vlesslink.startswith(vlscheme)) + RETOBJ = { + "v": "2", + "ps": "REMARK", + "add": "SERVER-ADDR", + "port": "SERVER-PORT", + "id": "LONG-ID", + "aid": "", # N/A + "net": "", + "type": "", # mKCP or QUIC header type + "host": "", # h2 or ws host / quic security + "path": "", # h2 or ws path / quic key + "tls": "", # transport layer security, none/tls/xtls + "protocol": "", # vless or vmess + # "optional_users_security": "", # encryption of vmess or vless + # "optional_tls_sni": "", + } + + # vless link is either fully encoded, or not encoded at all + vlesslink = vlscheme + _decode_if_b64_encoded(vlesslink[len(vlscheme):]) + parsed = urllib.parse.urlparse(vlesslink) + + def validate(arg, defval = None): + if arg is not None: + return str(arg) + elif defval is not None: + return "" + else: + raise RuntimeError("failed while validating URL component. The link is incomplete or broken.") + + RETOBJ["protocol"] = validate(parsed.scheme) + RETOBJ["id"] = validate(parsed.username) + RETOBJ["add"] = validate(parsed.hostname) + RETOBJ["port"] = validate(parsed.port) + RETOBJ["ps"] = validate(parsed.fragment, "") # TODO: decodeURIComponent + + parsed_qs = urllib.parse.parse_qs(parsed.query) + parsed_qs = {k:v[0] for k,v in parsed_qs.items()} + RETOBJ["net"] = validate(parsed_qs.get('type')) + RETOBJ["optional_users_security"] = validate(parsed_qs.get('encryption', 'auto' if parsed.scheme == 'vmess' else 'none')) + RETOBJ["tls"] = validate(parsed_qs.get('security', 'none')) + RETOBJ["optional_tls_sni"] = validate(parsed_qs.get('sni', parsed.hostname)) + + if RETOBJ["net"] == 'http' or RETOBJ["net"] == 'ws': + RETOBJ["path"] = validate(parsed_qs.get('path', '/')) + RETOBJ["host"] = validate(parsed_qs.get('host', parsed.hostname)) + elif RETOBJ["net"] == 'kcp': + RETOBJ["type"] = validate(parsed_qs.get('headerType', 'none')) + if parsed_qs.get('seed', None) is not None: + raise RuntimeError("mKCP seed is not supported by this tool") + elif RETOBJ["net"] == 'quic': + RETOBJ["type"] = validate(parsed_qs.get('headerType', 'none')) + RETOBJ["host"] = validate(parsed_qs.get('quicSecurity', 'none')) + RETOBJ["path"] = validate(parsed_qs.get('key', '')) # TODO: decodeURIComponent + elif RETOBJ["net"] == 'grpc': + raise RuntimeError("gRPC transport layer is not supported by this tool") + elif RETOBJ["net"] == 'tcp': + raise RuntimeError("Unknown 'type' argument. Accepting tcp, http, ws, kcp, quic, grpc, but getting " + RETOBJ["net"]) + + if parsed_qs.get('alpn', None) is not None: + raise RuntimeError("'alpn' parameter in the link is not supported. ") + if parsed_qs.get('flow', None) is not None: + raise RuntimeError("'flow' parameter in the link is not supported. (because xtls is not supported)") + + return RETOBJ + + def load_TPL(stype): s = TPL[stype] return json.loads(s) -def fill_basic(_c, _v): +def fill_vmess_or_vless(_c, _v): _outbound = _c["outbounds"][0] + _outbound["protocol"] = _v["protocol"] + _vnext = _outbound["settings"]["vnext"][0] + _vnext["address"] = _v["add"] + _vnext["port"] = int(_v["port"]) - _vnext["address"] = _v["add"] - _vnext["port"] = int(_v["port"]) - _vnext["users"][0]["id"] = _v["id"] - _vnext["users"][0]["alterId"] = int(_v["aid"]) + _vnext_user_0 = _vnext["users"][0] + _vnext_user_0["id"] = _v["id"] + if _v["aid"] == "": + del _vnext_user_0["alterId"] + else: + _vnext_user_0["alterId"] = int(_v["aid"]) + if "optional_users_security" in _v: + if _v["protocol"] == "vless": + _vnext_user_0["encryption"] = _v["optional_users_security"] + del _vnext_user_0["security"] + else: + _vnext_user_0["security"] = _v["optional_users_security"] _outbound["streamSettings"]["network"] = _v["net"] - if _v["tls"] == "tls": _outbound["streamSettings"]["security"] = "tls" _outbound["streamSettings"]["tlsSettings"] = {"allowInsecure": True} - if _v["host"] != "": + if "optional_tls_sni" in _v: + _outbound["streamSettings"]["tlsSettings"]["serverName"] = _v["optional_tls_sni"] + elif _v["host"] != "": _outbound["streamSettings"]["tlsSettings"]["serverName"] = _v["host"] - + elif _v["tls"] == "xtls": + raise RuntimeError("transport layer 'xtls' is not supported by this script") return _c + + def fill_shadowsocks(_c, _v): _ss = load_TPL("out_ss") _ss["email"] = _v["ps"] + "@ss" @@ -481,12 +543,15 @@ def fill_quic(_c, _v): def vmess2client(_t, _v): _net = _v["net"] + _protocol = _v["protocol"] _type = _v["type"] - if _net == "shadowsocks": + if _protocol == "shadowsocks": return fill_shadowsocks(_t, _v) - - _c = fill_basic(_t, _v) + elif _protocol == "vmess" or _protocol == "vless": + _c = fill_vmess_or_vless(_t, _v) + else: + raise RuntimeError("Invalid link causing vmess2client getting an unknown protocol " + _protocol) if _net == "kcp": return fill_kcp(_c, _v)