-
Notifications
You must be signed in to change notification settings - Fork 0
/
salesbot.py
185 lines (159 loc) · 7.3 KB
/
salesbot.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
import os
import sys
import math
import time
import discord
import logging
import sqlite3
import argparse
import datetime
from dotenv import load_dotenv
from prometheus_client import start_http_server
from collections import defaultdict
from opensea_utils import get_new_events, EVENT_TYPE_SALE
def query(conn, query):
    """Perform query against an existing SQLite3 connection and return results. Commit changes in case any changes were made.

    :param conn: SQLite3 connection object
    :param query: Query string to execute
    :return: List of tuples returned by SQLite3. List might be empty.
    """
    # NOTE: the parameter keeps its original name ("query", shadowing this
    # function) so existing keyword-argument callers are not broken.
    c = conn.cursor()
    try:
        c.execute(query)
        # Commit before fetching, matching the original behavior: harmless for
        # SELECTs, persists INSERT/UPDATE/DELETE statements.
        conn.commit()
        return c.fetchall()
    finally:
        # Always release the cursor, even when execute() raises.
        c.close()
def get_collection_slugs(sqlite3_db):
    """Return all collection slugs stored in the SQLite3 database.

    :param sqlite3_db: Path to the SQLite3 database file
    :return: List of collection_slug strings; may be empty.
    """
    conn = sqlite3.connect(sqlite3_db)
    try:
        res = query(conn, "SELECT collection_slug FROM collections")
    finally:
        # Close the connection even if the query fails (e.g. missing table),
        # so repeated polling rounds don't leak connections.
        conn.close()
    return [row[0] for row in res]
def main(opensea_api_key, webhook_url, sqlite3_db, check_interval):
    """Poll OpenSea for new sale events and post each sale to a Discord webhook.

    Runs forever: each round it re-reads the collection slugs from the SQLite3
    database, fetches new sale events per collection, drops events older than
    EVENT_MAX_AGE, formats one embed per sale and sends it via the webhook.

    :param opensea_api_key: API key forwarded to get_new_events()
    :param webhook_url: Discord webhook URL to post embeds to
    :param sqlite3_db: Path to SQLite3 database listing collections to watch
    :param check_interval: Seconds to sleep between rounds (converted via int())
    """
    # Synchronous webhook client using the requests-based adapter
    # (discord.py 1.x style API).
    webhook = discord.Webhook.from_url(
        webhook_url, adapter=discord.RequestsWebhookAdapter()
    )
    # Per-collection high-water mark of the newest event id already processed;
    # defaultdict(int) means "0 = never seen anything" for new collections.
    last_event_id_seen = defaultdict(int)
    # Throw away events older than this
    # This is mainly used at startup to avoid reposting old sales
    EVENT_MAX_AGE = datetime.timedelta(hours=6)
    while True:
        events = []
        # Slugs are re-read every round, so collections added to the DB are
        # picked up without a restart.
        for collection_slug in get_collection_slugs(sqlite3_db):
            now = int(time.time())
            try:
                logging.info(
                    f"Querying for new events for {collection_slug} up until {now} filtering for events after {last_event_id_seen[collection_slug]}"
                )
                new_events, new_last_id = get_new_events(
                    opensea_api_key=opensea_api_key,
                    event_type=EVENT_TYPE_SALE,
                    collection_name=collection_slug,
                    collection_slug=collection_slug,
                    start_time=now,
                    last_event_id_seen=last_event_id_seen[collection_slug],
                )
                last_event_id_seen[collection_slug] = new_last_id
            except Exception as e:
                # Best-effort: one collection failing (API error, rate limit)
                # must not kill the whole bot; back off briefly and move on.
                logging.warning(e)
                time.sleep(1)
                continue
            if new_events:
                events.extend(new_events)
        if events:
            # Post oldest sale first so the Discord channel reads chronologically.
            for event in sorted(
                events, key=lambda event: event["transaction"]["timestamp"]
            ):
                # NOTE(review): naive-datetime comparison — today() is local
                # time while the event timestamp's timezone isn't visible here;
                # assumes both are in the same zone (likely UTC) — confirm.
                if (
                    datetime.datetime.today()
                    - datetime.datetime.strptime(
                        event["transaction"]["timestamp"], "%Y-%m-%dT%H:%M:%S"
                    )
                    < EVENT_MAX_AGE
                ):
                    try:
                        # total_price is in the token's smallest unit; scale by
                        # the token's decimals to get a human-readable price.
                        price = int(event["total_price"]) / math.pow(
                            10, event["payment_token"]["decimals"]
                        )
                        symbol = event["payment_token"]["symbol"]
                        sold_at = event["transaction"]["timestamp"]
                        # Look up OpenSea username of buyer and seller
                        # Fall back to first six characters of address if no username is found
                        buyer = (
                            f"{event['winner_account']['user']['username']}"
                            if event["winner_account"]["user"]["username"]
                            else f"{event['winner_account']['address'][2:8].upper()}"
                        )
                        seller = (
                            f"{event['seller']['user']['username']}"
                            if event["seller"]["user"]["username"]
                            else f"{event['seller']['address'][2:8].upper()}"
                        )
                        # Detect bundles
                        if not event["asset"] and event["asset_bundle"]:
                            url = f"{event['asset_bundle']['permalink']}"
                            title = f"{len(event['asset_bundle']['assets'])} pieces in {event['asset_bundle']['name']} sold by {seller}"
                            # Use the bundle's first asset for image/collection.
                            image_url = event["asset_bundle"]["assets"][0]["image_url"]
                            collection_name = event["asset_bundle"]["assets"][0][
                                "collection"
                            ]["name"]
                        else:
                            # Common case: One asset only
                            title = f"{event['asset']['name']} sold by {seller}"
                            url = event["asset"]["permalink"]
                            image_url = event["asset"]["image_url"]
                            collection_name = event["asset"]["collection"]["name"]
                    except Exception as e:
                        # Malformed/unexpected payloads are skipped, not fatal.
                        logging.warning(f"Failed to parse event due to {e}")
                        continue
                    # Format message
                    message = (
                        discord.Embed(title=title, url=url)
                        .set_image(url=image_url)
                        .add_field(
                            name="Collection", value=collection_name, inline=True
                        )
                        .add_field(name="Buyer", value=buyer, inline=True)
                        .add_field(name="Price", value=f"{price} {symbol}", inline=True)
                        .set_footer(text=f"Sold at {sold_at}")
                    )
                    # Post to Discord channel
                    try:
                        webhook.send(embed=message)
                    except Exception as e:
                        logging.error(
                            f"Hit exception for Discord webhook, ignored message due to: {e}"
                        )
                    # Small pause between posts, presumably to stay under
                    # Discord webhook rate limits.
                    time.sleep(0.1)
        time.sleep(int(check_interval))
if __name__ == "__main__":
    # Command-line arguments for extra options while testing, like debug output
    parser = argparse.ArgumentParser()
    parser.add_argument("--debug", action="store_true", help="Enable debug output")
    args = parser.parse_args()
    # Log to stdout so container/service log collectors capture everything.
    logging.basicConfig(
        level=logging.INFO if not args.debug else logging.DEBUG,
        format="%(asctime)s - %(levelname)s - %(message)s",
        datefmt="%d-%b-%y %H:%M:%S",
        stream=sys.stdout,
    )
    #
    # Load main service configuration from .env
    #
    load_dotenv()
    # API key is required to use the OpenSea APIs extensively
    OPENSEA_API_KEY = os.getenv("OPENSEA_API_KEY")
    # Webhook URL to write to a given Discord channel
    # Which channel to post to is decided by the admin creating the webhook
    DISCORD_WEBHOOK_URL = os.getenv("DISCORD_WEBHOOK_URL")
    # Port to bind to, for exposing metrics to Prometheus. Defaults to port 9110
    PROMETHEUS_METRIC_PORT = os.getenv("PROMETHEUS_METRIC_PORT", "9110")
    # How often to check each collection for new sales, in seconds. Default 60s.
    CHECK_INTERVAL = os.getenv("CHECK_INTERVAL", "60")
    # Filename of SQLite3 database with collection names.
    SQLITE3_DB = os.getenv("SQLITE3_DB")
    # Fail fast if any required setting is missing (the optional ones above
    # have defaults and are excluded from this check).
    if not all([OPENSEA_API_KEY, DISCORD_WEBHOOK_URL, SQLITE3_DB]):
        logging.error("Incomplete configuration, please check .env file and try again")
        sys.exit(1)
    # Expose Prometheus metrics in a background thread, then block forever
    # in the polling loop.
    start_http_server(int(PROMETHEUS_METRIC_PORT))
    main(OPENSEA_API_KEY, DISCORD_WEBHOOK_URL, SQLITE3_DB, CHECK_INTERVAL)