Skip to content
This repository has been archived by the owner on Dec 10, 2024. It is now read-only.

Fix/trying to fix token holder zezo #431

Draft
wants to merge 23 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
f22ecdc
fix: wrong query to get token number holder
Sotatek-DucPhung Jun 14, 2024
e30dc33
perf: decrease token-info process batch size and remove unused logic
Sotatek-DucPhung Jun 14, 2024
9928351
fix: don't compare max block checkpoint when update token-info
Sotatek-DucPhung Jun 14, 2024
3f29415
chore: add missing @Param
Sotatek-DucPhung Jun 14, 2024
3a4937a
chore: fix unit test
Sotatek-DucPhung Jun 14, 2024
03a9040
chore: add logging to trace progress
Sotatek-DucPhung Jun 17, 2024
afaac13
perf: trying to make token-info init faster
Sotatek-DucPhung Jun 17, 2024
9450a8a
chore: add conf props to enable/disable agg-analytic schedule
Sotatek-DucPhung Jun 17, 2024
555d362
chore: add conf props token_info batch-size and more logging
Sotatek-DucPhung Jun 17, 2024
43b18a2
chore: more logging
Sotatek-DucPhung Jun 17, 2024
b40a9b2
fix: update logic to compare block checkpoint
Sotatek-DucPhung Jun 18, 2024
5460c78
fix: update logic to update token init checkpoint
Sotatek-DucPhung Jun 19, 2024
d389af1
feat: replace materialized view with table
Sotatek-DucPhung Jun 27, 2024
2bca087
feat: add TokenTxCount schedule
Sotatek-DucPhung Jun 27, 2024
57c2336
feat: add StakeAddressTxCount scheduler
Sotatek-DucPhung Jun 27, 2024
1411472
feat: add AddressTxCount scheduler
Sotatek-DucPhung Jun 27, 2024
7271a77
feat: add LatestTokenBalance scheduler
Sotatek-DucPhung Jun 27, 2024
1f08680
feat: add Redis locking mechanism in AggregateAnalyticSchedule to avo…
Sotatek-DucPhung Jun 27, 2024
4cdc786
chore: apply code formatter
Sotatek-DucPhung Jun 27, 2024
c768bec
chore: use block statements and uppercase keywords for sql query
Sotatek-DucPhung Jun 27, 2024
5e51097
chore: using "real" datatypes instead of "var"
Sotatek-DucPhung Jun 27, 2024
ec0b7b5
fix: missing env in application props file
Sotatek-DucPhung Jun 27, 2024
580bedc
fix: missing underscore in migration file V1_3_30
Sotatek-DucPhung Jun 27, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ services:
- AGGREGATE_POOL_INFO_FIXED_DELAY=${AGGREGATE_POOL_INFO_FIXED_DELAY}
- TOKEN_INFO_JOB_ENABLED=${TOKEN_INFO_JOB_ENABLED}
- TOKEN_INFO_FIXED_DELAY=${TOKEN_INFO_FIXED_DELAY}
- TOKEN_INFO_BATCH_SIZE=${TOKEN_INFO_BATCH_SIZE}
- SMART_CONTRACT_INFO_FIXED_DELAY=${SMART_CONTRACT_INFO_FIXED_DELAY}

- LEDGER_SYNC_HOST=${LEDGER_SYNC_HOST}
Expand All @@ -77,4 +78,9 @@ services:
- DREP_INFO_FIXED_DELAY=${DREP_INFO_FIXED_DELAY}
- GOVERNANCE_INFO_JOB_ENABLED=${GOVERNANCE_INFO_JOB_ENABLED}
- GOVERNANCE_INFO_FIXED_DELAY=${GOVERNANCE_INFO_FIXED_DELAY}
- AGG_ANALYTIC_FIXED_DELAY=${AGG_ANALYTIC_FIXED_DELAY}
- AGG_ANALYTIC_FIXED_DELAY=${AGG_ANALYTIC_FIXED_DELAY}
- AGG_ANALYTIC_JOB_ENABLED=${AGG_ANALYTIC_JOB_ENABLED}
- AGG_ANALYTIC_NUMBER_OF_CONCURRENT_TASKS=${AGG_ANALYTIC_NUMBER_OF_CONCURRENT_TASKS}

- ADDRESS_TX_COUNT_FIXED_DELAY=${ADDRESS_TX_COUNT_FIXED_DELAY}
- LATEST_TOKEN_BALANCE_FIXED_DELAY=${LATEST_TOKEN_BALANCE_FIXED_DELAY}
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
<version.lombok-mapstruct-binding>0.2.0</version.lombok-mapstruct-binding>
<org.projectlombok.version>1.18.24</org.projectlombok.version>
<lombok-mapstruct-binding.version>0.2.0</lombok-mapstruct-binding.version>
<version.explorer-common>0.9.0-PR217</version.explorer-common>
<version.explorer-common>0.9.0-PR213</version.explorer-common>
<version.spring-cloud-aws>2.4.2</version.spring-cloud-aws>
<version.lombok>1.18.26</version.lombok>
<version.eclipse.jgit>6.5.0.202303070854-r</version.eclipse.jgit>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,5 +11,10 @@ public enum RedisKey {
SC_TX_CHECKPOINT,
NATIVE_SCRIPT_CHECKPOINT,
AGGREGATED_CACHE,
TOTAL_TOKEN_COUNT
TOTAL_TOKEN_COUNT,
TOKEN_TX_COUNT_CHECKPOINT,
STAKE_ADDRESS_TX_COUNT_CHECKPOINT,
ADDRESS_TX_COUNT_CHECKPOINT,
LATEST_TOKEN_BALANCE_CHECKPOINT,
AGGREGATED_CONCURRENT_TASKS_COUNT
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,12 @@
@AllArgsConstructor
@NoArgsConstructor
public class TokenNumberHolders {
String unit;
Long ident;
Long numberOfHolders;

public TokenNumberHolders(String unit, Long numberOfHolders) {
this.unit = unit;
this.numberOfHolders = numberOfHolders;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,6 @@
@AllArgsConstructor
@NoArgsConstructor
public class TokenVolume {
Long ident;
String unit;
BigInteger volume;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package org.cardanofoundation.job.projection;

public interface TokenUnitProjection {

Long getIdent();

String getUnit();
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ public class JOOQTokenInfoRepository {

public JOOQTokenInfoRepository(
@Qualifier("explorerDSLContext") DSLContext dsl,
@Value("${spring.jpa.properties.hibernate.default_schema}") String schema) {
@Value("${multi-datasource.datasourceExplorer.flyway.schemas}") String schema) {
this.dsl = dsl;
this.entityUtil = new EntityUtil(schema, TokenInfo.class);
}
Expand All @@ -46,15 +46,25 @@ public void insertAll(List<TokenInfo> tokenInfos) {
field(entityUtil.getColumnField(TokenInfo_.NUMBER_OF_HOLDERS)),
field(entityUtil.getColumnField(TokenInfo_.VOLUME24H)),
field(entityUtil.getColumnField(TokenInfo_.TOTAL_VOLUME)),
field(entityUtil.getColumnField(TokenInfo_.TX_COUNT)),
field(entityUtil.getColumnField(TokenInfo_.UPDATE_TIME)))
.values(
tokenInfo.getBlockNo(),
tokenInfo.getMultiAssetId(),
tokenInfo.getNumberOfHolders(),
tokenInfo.getVolume24h(),
tokenInfo.getTotalVolume(),
tokenInfo.getTxCount(),
tokenInfo.getUpdateTime())
.onConflict(field(entityUtil.getColumnField(TokenInfo_.MULTI_ASSET_ID)))
.doUpdate()
.set(
field(entityUtil.getColumnField(TokenInfo_.NUMBER_OF_HOLDERS)),
tokenInfo.getNumberOfHolders())
.set(field(entityUtil.getColumnField(TokenInfo_.VOLUME24H)), tokenInfo.getVolume24h())
.set(
field(entityUtil.getColumnField(TokenInfo_.TOTAL_VOLUME)),
tokenInfo.getTotalVolume())
.set(
field(entityUtil.getColumnField(TokenInfo_.UPDATE_TIME)),
tokenInfo.getUpdateTime());

queries.add(query);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,23 @@

import org.cardanofoundation.explorer.common.entity.compositeKey.AddressTxAmountId;
import org.cardanofoundation.explorer.common.entity.ledgersync.AddressTxAmount;
import org.cardanofoundation.job.model.TokenTxCount;
import org.cardanofoundation.explorer.common.entity.ledgersync.TokenTxCount;
import org.cardanofoundation.job.model.TokenVolume;
import org.cardanofoundation.job.projection.StakeTxProjection;
import org.cardanofoundation.job.projection.UniqueAccountTxCountProjection;

public interface AddressTxAmountRepository
extends JpaRepository<AddressTxAmount, AddressTxAmountId> {

@Query(value = "select max(block_number) from cursor_", nativeQuery = true)
Long getMaxBlockNoFromCursor();

@Query(value = "select max(slot) from cursor_", nativeQuery = true)
Long getMaxSlotNoFromCursor();

@Query(value = "SELECT max(a.id) FROM Address a")
Long getMaxAddressId();

@Query(
value =
"""
Expand All @@ -31,105 +40,102 @@ public interface AddressTxAmountRepository
List<UniqueAccountTxCountProjection> findUniqueAccountsInEpoch(@Param("epochNo") Integer epochNo);

@Query(
value =
"SELECT new org.cardanofoundation.job.projection.StakeTxProjection(tx.id, sum(addTxAmount.quantity), addTxAmount.blockTime)"
+ " FROM AddressTxAmount addTxAmount"
+ " JOIN Tx tx on tx.hash = addTxAmount.txHash"
+ " WHERE addTxAmount.stakeAddress = :stakeAddress"
+ " AND addTxAmount.blockTime >= :fromDate AND addTxAmount.blockTime <= :toDate"
+ " GROUP BY addTxAmount.txHash, addTxAmount.blockTime, tx.id")
"""
SELECT new org.cardanofoundation.job.projection.StakeTxProjection(tx.id, sum(addTxAmount.quantity), addTxAmount.blockTime)
FROM AddressTxAmount addTxAmount
JOIN Tx tx on tx.hash = addTxAmount.txHash
WHERE addTxAmount.stakeAddress = :stakeAddress
AND addTxAmount.blockTime >= :fromDate AND addTxAmount.blockTime <= :toDate
GROUP BY addTxAmount.txHash, addTxAmount.blockTime, tx.id
""")
Page<StakeTxProjection> findTxAndAmountByStake(
@Param("stakeAddress") String stakeAddress,
@Param("fromDate") Long fromDate,
@Param("toDate") Long toDate,
Pageable pageable);

@Query(
"SELECT COUNT(DISTINCT addTxAmount.txHash) FROM AddressTxAmount addTxAmount"
+ " WHERE addTxAmount.stakeAddress = :stakeAddress"
+ " AND addTxAmount.blockTime >= :fromDate AND addTxAmount.blockTime <= :toDate")
"""
SELECT COUNT(DISTINCT addTxAmount.txHash)
FROM AddressTxAmount addTxAmount
WHERE addTxAmount.stakeAddress = :stakeAddress
AND addTxAmount.blockTime >= :fromDate AND addTxAmount.blockTime <= :toDate
""")
Long getCountTxByStakeInDateRange(
@Param("stakeAddress") String stakeAddress,
@Param("fromDate") Long fromDate,
@Param("toDate") Long toDate);

@Query(
"""
SELECT DISTINCT (addressTxAmount.unit)
FROM AddressTxAmount addressTxAmount
WHERE addressTxAmount.blockTime >= :fromTime AND addressTxAmount.blockTime <= :toTime
AND addressTxAmount.unit != 'lovelace'
""")
List<String> getTokensInTransactionInTimeRange(
@Param("fromTime") Long fromTime, @Param("toTime") Long toTime);

@Query(
value =
"""
SELECT new org.cardanofoundation.job.model.TokenVolume(ma.id, sum(ata.quantity))
SELECT new org.cardanofoundation.job.model.TokenVolume(ata.unit, sum(ata.quantity))
FROM AddressTxAmount ata
JOIN MultiAsset ma ON ata.unit = ma.unit
JOIN Tx tx ON tx.hash = ata.txHash
WHERE ma.id >= :startIdent AND ma.id <= :endIdent
AND tx.id >= :txId
WHERE ata.unit IN :units
AND ata.blockTime >= :blockTime
AND ata.quantity > 0
GROUP BY ma.id
GROUP BY ata.unit
""")
List<TokenVolume> sumBalanceAfterTx(
@Param("startIdent") Long startIdent,
@Param("endIdent") Long endIdent,
@Param("txId") Long txId);
List<TokenVolume> sumBalanceAfterBlockTime(
@Param("units") List<String> units, @Param("blockTime") Long blockTime);

@Query(
value =
"""
SELECT new org.cardanofoundation.job.model.TokenVolume(ma.id, sum(ata.quantity))
SELECT new org.cardanofoundation.job.model.TokenVolume(ata.unit, sum(ata.quantity))
FROM AddressTxAmount ata
JOIN MultiAsset ma ON ata.unit = ma.unit
JOIN Tx tx ON tx.hash = ata.txHash
WHERE ma.id IN :multiAssetIds
AND tx.id >= :txId
WHERE ata.unit IN :units
AND ata.quantity > 0
GROUP BY ma.id
GROUP BY ata.unit
""")
List<TokenVolume> sumBalanceAfterTx(
@Param("multiAssetIds") List<Long> multiAssetIds, @Param("txId") Long txId);
List<TokenVolume> getTotalVolumeByUnits(@Param("units") List<String> units);

@Query(
value =
"""
SELECT new org.cardanofoundation.job.model.TokenVolume(ma.id, sum(ata.quantity))
SELECT new org.cardanofoundation.explorer.common.entity.ledgersync.TokenTxCount(ata.unit, count(distinct (ata.txHash)))
FROM AddressTxAmount ata
JOIN MultiAsset ma ON ata.unit = ma.unit
WHERE ma.id >= :startIdent AND ma.id <= :endIdent
AND ata.quantity > 0
GROUP BY ma.id
WHERE ata.unit in :units
GROUP BY ata.unit
""")
List<TokenVolume> getTotalVolumeByIdentInRange(
@Param("startIdent") Long startIdent, @Param("endIdent") Long endIdent);
List<TokenTxCount> getTotalTxCountByUnitIn(@Param("units") List<String> units);

@Query(
value =
"""
SELECT new org.cardanofoundation.job.model.TokenTxCount(ma.id, count(distinct (ata.txHash)))
FROM AddressTxAmount ata
JOIN MultiAsset ma ON ata.unit = ma.unit
WHERE ma.id >= :startIdent AND ma.id <= :endIdent
GROUP BY ma.id
SELECT DISTINCT(ata.unit) FROM AddressTxAmount ata
WHERE ata.unit != 'lovelace'
AND ata.blockTime >= :fromTime AND ata.blockTime <= :toTime
""")
List<TokenTxCount> getTotalTxCountByIdentInRange(
@Param("startIdent") Long startIdent, @Param("endIdent") Long endIdent);
List<String> findUnitByBlockTimeInRange(
@Param("fromTime") Long fromTime, @Param("toTime") Long toTime);

@Query(
value =
"""
SELECT new org.cardanofoundation.job.model.TokenVolume(ma.id, sum(ata.quantity))
FROM AddressTxAmount ata
JOIN MultiAsset ma ON ata.unit = ma.unit
WHERE ma.id IN :multiAssetIds
AND ata.quantity > 0
GROUP BY ma.id
SELECT DISTINCT(ata.stakeAddress) FROM AddressTxAmount ata
WHERE ata.slot >= :fromTime AND ata.slot <= :toTime
AND ata.stakeAddress IS NOT NULL
""")
List<TokenVolume> getTotalVolumeByIdentIn(@Param("multiAssetIds") List<Long> multiAssetIds);
List<String> findStakeAddressBySlotNoBetween(
@Param("fromTime") Long fromTime, @Param("toTime") Long toTime);

@Query(
value =
"""
SELECT new org.cardanofoundation.job.model.TokenTxCount(ma.id, count(distinct (ata.txHash)))
FROM AddressTxAmount ata
JOIN MultiAsset ma ON ata.unit = ma.unit
WHERE ma.id IN :multiAssetIds
GROUP BY ma.id
SELECT DISTINCT(ata.address) FROM AddressTxAmount ata
WHERE ata.blockTime >= :fromTime AND ata.blockTime <= :toTime
""")
List<TokenTxCount> getTotalTxCountByIdentIn(@Param("multiAssetIds") List<Long> multiAssetIds);
List<String> findAddressByBlockTimeBetween(
@Param("fromTime") Long fromTime, @Param("toTime") Long toTime);
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,18 +6,24 @@

import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.data.jpa.repository.Query;
import org.springframework.data.repository.query.Param;

import org.cardanofoundation.explorer.common.entity.ledgersync.Block;
import org.cardanofoundation.job.projection.PoolCountProjection;

public interface BlockRepository extends JpaRepository<Block, Long> {

@Query("select max(b.time) from Block b")
Optional<Timestamp> getMaxTime();

@Query("select b from Block b where b.blockNo = " + "(select max(blockNo) from Block)")
Optional<Block> findLatestBlock();

@Query("select b.time from Block b where b.blockNo = :blockNo")
Timestamp getBlockTimeByBlockNo(@Param("blockNo") Long blockNo);

@Query(
value = "select b.block_no from block b where extract(epoch from b.time) = :time",
nativeQuery = true)
Long getBlockNoByExtractEpochTime(@Param("time") Long time);

@Query(
value =
"SELECT ph.id AS poolId, count(bk.id) AS countValue "
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,15 @@
public interface LatestTokenBalanceRepository
extends JpaRepository<LatestTokenBalance, AddressBalanceId> {

@Query(
value =
"""
select max(ltb.block_time) from latest_token_balance ltb
where ltb.block_time != (select max(ltb2.block_time) from latest_token_balance ltb2)
""",
nativeQuery = true)
Long getTheSecondLastBlockTime();

@Query(
"""
SELECT multiAsset.policy as scriptHash, COALESCE(COUNT(latestTokenBalance), 0) as numberOfHolders
Expand All @@ -32,27 +41,15 @@ List<ScriptNumberHolderProjection> countHolderByPolicyIn(

@Query(
"""
SELECT new org.cardanofoundation.job.model.TokenNumberHolders(multiAsset.id, COUNT(latestTokenBalance))
FROM MultiAsset multiAsset
LEFT JOIN LatestTokenBalance latestTokenBalance ON multiAsset.unit = latestTokenBalance.unit
WHERE multiAsset.id IN :multiAssetIds
AND latestTokenBalance.quantity > 0
GROUP BY multiAsset.id
""")
List<TokenNumberHolders> countHoldersByMultiAssetIdIn(
@Param("multiAssetIds") List<Long> multiAssetIds);

@Query(
"""
SELECT new org.cardanofoundation.job.model.TokenNumberHolders(multiAsset.id, COUNT(latestTokenBalance))
FROM MultiAsset multiAsset
LEFT JOIN LatestTokenBalance latestTokenBalance ON multiAsset.unit = latestTokenBalance.unit
WHERE multiAsset.id >= :startIdent AND multiAsset.id <= :endIdent
SELECT new org.cardanofoundation.job.model.TokenNumberHolders
(latestTokenBalance.unit, COUNT(DISTINCT(CASE WHEN latestTokenBalance.stakeAddress IS NULL
THEN latestTokenBalance.address ELSE latestTokenBalance.stakeAddress END)))
FROM LatestTokenBalance latestTokenBalance
WHERE latestTokenBalance.unit in :units
AND latestTokenBalance.quantity > 0
GROUP BY multiAsset.id
GROUP BY latestTokenBalance.unit
""")
List<TokenNumberHolders> countHoldersByMultiAssetIdInRange(
@Param("startIdent") Long startIdent, @Param("endIdent") Long endIdent);
List<TokenNumberHolders> countHoldersByMultiAssetIdInRange(@Param("units") List<String> units);

@Modifying(clearAutomatically = true)
@Transactional
Expand Down
Loading
Loading