This repository has been archived by the owner on Sep 22, 2022. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 13
/
Copy pathupi-recon.py
163 lines (146 loc) · 8.4 KB
/
upi-recon.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import argparse
import requests
import concurrent.futures
from time import sleep
from sys import exit
from random import uniform as rand
from datetime import datetime
from rich.progress import track
banner = """
_
(_)
_ _ _ __ _ _ __ ___ ___ ___ _ __
| | | | '_ \| | '__/ _ \/ __/ _ \| '_ \
| |_| | |_) | | | | __/ (_| (_) | | | |
\__,_| .__/|_|_| \___|\___\___/|_| |_|
| |
|_|
# Author: Karan Saini (@squeal)
# URL: https://github.com/qurbat/upi-recon
# Usage: upi-recon.py <e.g., 9999999999> (query all suffixes for phone number)
upi-recon.py -w <e.g., address_list_file> (query addresses from file)
upi-recon.py -g <e.g., [email protected]> (query known gpay suffixes for google account)
upi-recon.py -v <e.g., address@psp> (query a single UPI VPA)
upi-recon.py -i <e.g., identifier> (query all suffixes for an arbitrary alphanumeric identifier)
upi-recon.py -f <e.g., MH01AA1234> (query known FASTag suffixes for vehicle registration number)
"""
# opting to load lists from a file instead of hardcoding them
# as this would be more flexible, allow for easier updates,
# and allow others to make use of the lists provided
with open("data/general_suffixes.txt", "r") as suffix_file:
upi_suffix_dict = suffix_file.read().splitlines() # read all suffixes into a list
with open("data/mobile_suffixes.txt", "r") as mobile_suffix_file:
mobile_suffix_dict = mobile_suffix_file.read().splitlines()
with open("data/fastag_suffixes.txt", "r") as fastag_suffix_file:
fastag_suffix_dict = fastag_suffix_file.read().splitlines()
with open("data/gpay_suffixes.txt", "r") as gpay_suffix_file:
gpay_suffix_dict = gpay_suffix_file.read().splitlines()
def searchvpa(searchtext, vpa_dict, threadcount):
if(threadcount == 0):
for suffix in track(vpa_dict, description="querying . . . "):
try:
address_discovery(searchtext + '@' + suffix, API_URL)
except KeyboardInterrupt:
print('[!] execution interrupted. quitting...')
exit(0)
else:
threadcount = 10 if threadcount > 10 else threadcount
with concurrent.futures.ThreadPoolExecutor(max_workers=threadcount) as executor:
try:
for suffix in vpa_dict:
executor.submit(address_discovery, searchtext + '@' + suffix, API_URL)
sleep(rand(0.1, 0.2))
except KeyboardInterrupt:
# quit ungracefully on keyboard interrupt:
# considering the bandwidth consumed for requests,
# there is no reason to wait for the threads to finish
# sorry for the inconvenience
executor._threads.clear()
concurrent.futures.thread._threads_queues.clear()
print('\n[!] execution interrupted. quitting...')
print('[i] finished at ' + datetime.now().strftime("%d/%m/%Y %H:%M:%S"))
exit(1)
def address_discovery(vpa, api_url):
r = requests.post(api_url, data={'vpa':vpa,'merchant_id':'milaap'}, headers={'Connection':'close'})
if r.status_code == 200 and r.json()['status'] == 'VALID':
print('[+] ' + vpa + ' is a valid UPI payment address registered to ' + r.json()['customer_name']) if r.json()['customer_name'] else print('[!] The name associated with the UPI payment address could not be determined')
# if r.status_code == 200 and r.json()['success'] == False:
# print('[-] ' + vpa + ' not a valid UPI address')
# todo: store in dict by default and print if verbosity is set
elif r.status_code == 200 and r.json()['status'] == 'INVALID' and arguments.debug:
print('[-] query failed for ' + vpa)
if __name__ == '__main__':
# argument definition
parser = argparse.ArgumentParser(prog='upi-recon.py')
# primary arguments
parser.add_argument('-t', '--threads', type=int, default=0, help='number of threads to use for parallel address discovery')
parser.add_argument('-q', '--quiet', default=False, action='store_true', help='suppress banner')
parser.add_argument('-d', '--debug', default=False, action='store_true', help='show failed queries')
# group arguments
group_1 = parser.add_mutually_exclusive_group()
group_1.add_argument('-w', '--wordlist', type=str, nargs='?', help='use wordlist for suffixes')
group_2 = parser.add_mutually_exclusive_group()
group_2.add_argument('phone', type=str, nargs='?', help='phone number to query UPI addresses for')
group_3 = parser.add_mutually_exclusive_group()
group_3.add_argument('-g', '--gpay', type=str, nargs='?', help='enter gmail address to query Google Pay UPI addresses for')
group_4 = parser.add_mutually_exclusive_group()
group_4.add_argument('-v', '--vpa', type=str, nargs='?', help='enter a single VPA to query')
group_5 = parser.add_mutually_exclusive_group()
group_5.add_argument('-i', '--identifier', type=str, nargs='?', help='enter an address to query against all providers')
group_6 = parser.add_mutually_exclusive_group()
group_6.add_argument('-f', '--fastag', type=str, nargs='?', help='Enter a vehicle number to search for')
# parse arguments
arguments = parser.parse_args()
# deal with arguments
if arguments.quiet is False:
print(banner)
# API stuff
API_URL = 'https://api.juspay.in/upi/verify-vpa'
# print informational header
print('[i] starting at ' + datetime.now().strftime("%d/%m/%Y %H:%M:%S"))
# query address directly
# this may not be the best way to handle this, but it works for now
if arguments.vpa and '@' in arguments.vpa:
address_discovery(arguments.vpa, API_URL)
elif arguments.vpa and '@' not in arguments.vpa:
print('[!] please enter a full, valid UPI address e.g. <identifier>@<provider>')
exit(0)
# query based on phone number
elif arguments.phone:
searchtext = arguments.phone[2:] if arguments.phone[0:2] == '91' and len(arguments.phone) > 10 else arguments.phone
if not searchtext.isdigit():
exit('[!] phone number must be numeric')
if len(searchtext) != 10:
print('[!] please enter a valid 10 digit phone number')
exit(1)
print('[i] querying {} suffixes for phone number '.format(len(mobile_suffix_dict)) + searchtext)
searchvpa(searchtext, mobile_suffix_dict, arguments.threads)
# query based on gpay address
elif arguments.gpay:
searchtext = arguments.gpay[:-10] if arguments.gpay.endswith('@gmail.com') else arguments.gpay
print('[i] querying {} suffixes for '.format(len(gpay_suffix_dict)) + searchtext + '@gmail.com')
searchvpa(searchtext, gpay_suffix_dict, 4) # overriding threads to 4 as there are only 4 VPA addresses to check for gpay
# query based on fastag vehicle registration number
elif arguments.fastag:
searchtext = 'netc.' + arguments.fastag
print('[i] querying {} suffixes for vehicle '.format(len(fastag_suffix_dict)) + arguments.fastag)
searchvpa(searchtext, fastag_suffix_dict, arguments.threads)
# query alphanumeric identifier across all providers
elif arguments.identifier:
searchtext = arguments.identifier if '@' not in arguments.identifier else arguments.identifier.split('@')[0]
print('[i] querying {} suffixes for identifier '.format(len(upi_suffix_dict)) + searchtext)
searchvpa(searchtext, upi_suffix_dict, arguments.threads)
# print error if no arguments provided
# this is a probably a bad way to handle empty arguments, but it works
elif arguments.wordlist:
with open("{}".format(arguments.wordlist), "r") as wordlist_file:
wordlist = wordlist_file.read().splitlines() # read wordlist
print('[i] querying {} addresses from wordlist '.format(len(wordlist)))
for address in wordlist:
address_discovery(address, API_URL)
else:
print('[!] please enter a valid argument')
print('[!] usage: upi-recon.py -h for help')
exit(1)