Commit

Merge pull request #57 from Kwenta/v3-stats
Add the v3 config
LeifuChen authored Apr 29, 2024
2 parents c676543 + 8cfeb39 commit e758e13
Showing 1 changed file with 129 additions and 138 deletions.
267 changes: 129 additions & 138 deletions src/scripts/stats.py
@@ -9,68 +9,124 @@

nest_asyncio.apply()

# constants
# Constants
INFURA_KEY = os.getenv('INFURA_KEY')

# mainnet
SUBGRAPH_ENDPOINT = 'https://subgraph.satsuma-prod.com/05943208e921/kwenta/optimism-perps/api'
RPC_ENDPOINT = f'https://optimism-mainnet.infura.io/v3/{INFURA_KEY}'

# get a web3 provider
w3 = Web3(Web3.HTTPProvider(RPC_ENDPOINT))

# functions


def convertDecimals(x): return Decimal(x) / Decimal(10**18)

# Stats configurations for different versions
CONFIGS = {
'v3': {
'subgraph_endpoint': 'https://subgraph.satsuma-prod.com/05943208e921/kwenta/base-perps-v3/api',
'rpc_endpoint': f'https://base-mainnet.infura.io/v3/{INFURA_KEY}',
'queries': {
'aggregate_stats': {
'query': gql("""
query aggregateStats($last_id: ID!) {
perpsV3AggregateStats(
where: {
id_gt: $last_id,
period: "86400",
marketId: "0",
},
first: 1000
) {
id
timestamp
volume
trades
}
}
"""),
'accessor': 'perpsV3AggregateStats'
},
'traders': {
'query': gql("""
query traders($last_id: ID!) {
orderSettleds(
where: {
id_gt: $last_id,
},
first: 1000
) {
id
accountId
timestamp
}
}
"""),
'accessor': 'orderSettleds'
}
}
},
'v2': {
'subgraph_endpoint': 'https://subgraph.satsuma-prod.com/05943208e921/kwenta/optimism-perps/api',
'rpc_endpoint': f'https://optimism-mainnet.infura.io/v3/{INFURA_KEY}',
'queries': {
'aggregate_stats': {
'query': gql("""
query aggregateStats($last_id: ID!) {
futuresAggregateStats(
where: {
id_gt: $last_id,
period: "86400",
asset: "0x",
},
first: 1000
) {
id
timestamp
volume
trades
feesSynthetix
feesKwenta
}
}
"""),
'accessor': 'futuresAggregateStats'
},
'traders': {
'query': gql("""
query traders($last_id: ID!) {
futuresTrades(
where: {
id_gt: $last_id,
},
first: 1000
) {
id
account
timestamp
}
}
"""),
'accessor': 'futuresTrades'
}
}
}
}
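
# Both versions share one query shape: filter on `id_gt: $last_id` and request
# `first: 1000` rows, the subgraph keyset-pagination pattern that
# run_recursive_query below drives by advancing `last_id` page by page.
# A minimal sketch of reading one query out of a config (names from the dict
# above; illustration only):
#
#   cfg = CONFIGS['v3']
#   agg = cfg['queries']['aggregate_stats']
#   # agg['query'] is the gql document, agg['accessor'] the response key,
#   # and cfg['subgraph_endpoint'] the URL to page through.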

def convertBytes(x): return bytearray.fromhex(
x[2:]).decode().replace('\x00', '')
# Functions
def convert_decimals(x): return Decimal(x) / Decimal(10**18)

def convert_bytes(x): return bytearray.fromhex(
x[2:]).decode().replace('\x00', '')
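
# A quick sketch of what these helpers return, assuming 18-decimal fixed-point
# integers and null-padded bytes32 hex strings (values are illustrative):
#
#   convert_decimals(1500000000000000000)  # -> Decimal('1.5')
#   convert_bytes('0x7345544800000000')    # -> 'sETH' (hex of b'sETH' + padding)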

def clean_df(df, decimal_cols=[], bytes_cols=[]):
for col in decimal_cols:
if col in df.columns:
df[col] = df[col].apply(convertDecimals)
df[col] = df[col].apply(convert_decimals)
else:
print(f"{col} not in DataFrame")
for col in bytes_cols:
if col in df.columns:
df[col] = df[col].apply(convertBytes)
df[col] = df[col].apply(convert_bytes)
else:
print(f"{col} not in DataFrame")
return df
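
# A usage sketch for clean_df on a raw subgraph page, where numeric fields
# arrive as wei-denominated strings (column values here are illustrative):
#
#   raw = pd.DataFrame({
#       'volume': ['2500000000000000000'],   # 2.5 in 18-decimal fixed point
#       'asset': ['0x7345544800000000'],     # bytes32 hex for 'sETH'
#   })
#   cleaned = clean_df(raw, decimal_cols=['volume'], bytes_cols=['asset'])
#   # cleaned['volume'][0] == Decimal('2.5'); cleaned['asset'][0] == 'sETH'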


async def run_query(query, params, endpoint=SUBGRAPH_ENDPOINT):
transport = AIOHTTPTransport(url=endpoint)

async with Client(
transport=transport,
fetch_schema_from_transport=True,
) as session:

# Execute single query
query = query

result = await session.execute(query, variable_values=params)
df = pd.DataFrame(result)
return df


async def run_recursive_query(query, params, accessor, endpoint=SUBGRAPH_ENDPOINT):
# Request fails if this header is not added to Satsuma API calls
headers = {
'origin': 'https://subgraph.satsuma-prod.com',
}

async def run_recursive_query(query, params, accessor, endpoint):
headers = {'origin': 'https://subgraph.satsuma-prod.com'}
transport = AIOHTTPTransport(url=endpoint, headers=headers)

async with Client(
transport=transport,
fetch_schema_from_transport=True,
) as session:
async with Client(transport=transport, fetch_schema_from_transport=True) as session:
done_fetching = False
all_results = []
while not done_fetching:
@@ -80,111 +136,46 @@ async def run_recursive_query(query, params, accessor, endpoint=SUBGRAPH_ENDPOINT):
params['last_id'] = all_results[-1]['id']
else:
done_fetching = True

df = pd.DataFrame(all_results)
return df

# queries
aggregateStats = gql("""
query aggregateStats(
$last_id: ID!
) {
futuresAggregateStats(
where: {
id_gt: $last_id,
period: "86400",
asset: "0x",
},
first: 1000
) {
id
timestamp
volume
trades
feesSynthetix
feesKwenta
}
}
""")

traders = gql("""
query traders(
$last_id: ID!
) {
futuresTrades(
where: {
id_gt: $last_id,
},
first: 1000
) {
id
account
timestamp
}
}
""")


async def main():
# get aggregate data
agg_params = {
'last_id': ''
}

agg_decimal_cols = [
'volume',
'feesSynthetix',
'feesKwenta'
]

agg_response = await run_recursive_query(aggregateStats, agg_params, 'futuresAggregateStats')
df_agg = pd.DataFrame(agg_response)
return pd.DataFrame(all_results)
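
# A minimal usage sketch, assuming the v3 config above (nest_asyncio, applied
# at the top of the file, lets asyncio.run work inside an existing loop):
#
#   cfg = CONFIGS['v3']
#   q = cfg['queries']['traders']
#   df = asyncio.run(run_recursive_query(
#       q['query'], {'last_id': ''}, q['accessor'], cfg['subgraph_endpoint']))
#   # pages through orderSettleds 1000 rows at a time until a short page returns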

async def main(config_key):
config = CONFIGS[config_key]

# Aggregate data query and cleaning
agg_query = config['queries']['aggregate_stats']
df_agg = pd.DataFrame(await run_recursive_query(agg_query['query'], {'last_id': ''}, agg_query['accessor'], config['subgraph_endpoint']))
print(f'agg result size: {df_agg.shape[0]}')
df_agg = clean_df(df_agg, decimal_cols=agg_decimal_cols).drop(
'id', axis=1).sort_values('timestamp')
df_agg = clean_df(df_agg, decimal_cols=['volume', 'feesSynthetix', 'feesKwenta'] if config_key == 'v2' else ['volume']).drop('id', axis=1).sort_values('timestamp')
df_agg['timestamp'] = df_agg['timestamp'].astype(int)
df_agg['trades'] = df_agg['trades'].astype(int)
df_agg['cumulativeTrades'] = df_agg['trades'].cumsum()
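# E.g. daily trades [3, 5, 2] become cumulativeTrades [3, 8, 10]; the
# sort_values('timestamp') above is what makes this running total meaningful.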

# get trader data
trader_params = {
'last_id': ''
}

trader_response = await run_recursive_query(traders, trader_params, 'futuresTrades')
df_trader = pd.DataFrame(trader_response).drop(
'id', axis=1).sort_values('timestamp')

# create the aggregates
df_trader['dateTs'] = df_trader['timestamp'].apply(
lambda x: int(int(x) / 86400) * 86400)
df_trader['cumulativeTraders'] = (
~df_trader['account'].duplicated()).cumsum()

df_trader_agg = df_trader.groupby(
'dateTs')['account'].nunique().reset_index()
# Trader data query and processing
trader_query = config['queries']['traders']
df_trader = pd.DataFrame(await run_recursive_query(trader_query['query'], {'last_id': ''}, trader_query['accessor'], config['subgraph_endpoint'])).drop('id', axis=1).sort_values('timestamp')
df_trader['dateTs'] = df_trader['timestamp'].apply(lambda x: int(int(x) / 86400) * 86400)
df_trader['cumulativeTraders'] = (~df_trader['accountId' if config_key == 'v3' else 'account'].duplicated()).cumsum()
df_trader_agg = df_trader.groupby('dateTs')['accountId' if config_key == 'v3' else 'account'].nunique().reset_index()
df_trader_agg.columns = ['timestamp', 'uniqueTraders']
df_trader_agg['cumulativeTraders'] = df_trader.groupby(
'dateTs')['cumulativeTraders'].max().reset_index()['cumulativeTraders']
df_trader_agg['cumulativeTraders'] = df_trader.groupby('dateTs')['cumulativeTraders'].max().reset_index()['cumulativeTraders']
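# Worked example of the dateTs bucketing above: a settle at 1714006800
# (01:00 UTC) floors to int(1714006800 / 86400) * 86400 == 1714003200,
# midnight of the same UTC day, so each day's trades share one bucket and
# nunique() counts that day's distinct traders.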

print(f'trader result size: {df_trader.shape[0]}')
print(f'trader agg result size: {df_trader_agg.shape[0]}')

# combine the two datasets
# Combine the two datasets
df_write = df_agg.merge(df_trader_agg, on='timestamp')
print(f'combined result size: {df_write.shape[0]}')

# make sure the directory exists
outdirs = ['data', 'data/stats']
for outdir in outdirs:
if not os.path.exists(outdir):
os.mkdir(outdir)
# Ensure directory exists
outdir = 'data/stats'
os.makedirs(outdir, exist_ok=True)

# Write out JSON data
filename = 'daily_stats.json' if config_key == 'v2' else f'daily_stats_{config_key}.json'
df_write.to_json(f'{outdir}/{filename}', orient='records')

# write out json data
df_write.to_json(
f'{outdir}/daily_stats.json',
orient='records'
)
# Print results
print(f'Combined result size for {config_key}: {df_write.shape[0]}')

if __name__ == '__main__':
asyncio.run(main())
asyncio.run(main('v3'))
asyncio.run(main('v2'))
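
Each run now writes one file per version (daily_stats.json for v2, daily_stats_v3.json for v3). Because the aggregate and trader frames are merged on timestamp, a v3 record carries roughly the following fields (values below are illustrative, not real data):

  {
    "timestamp": 1714003200,
    "volume": 1234567.89,
    "trades": 42,
    "cumulativeTrades": 1042,
    "uniqueTraders": 17,
    "cumulativeTraders": 310
  }

v2 records additionally include feesSynthetix and feesKwenta, which the v3 aggregate query does not request.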
