Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use coreutils/wallet.Event #149

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
388 changes: 212 additions & 176 deletions explorer/events.go

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ go 1.23.1
toolchain go1.23.2

require (
github.com/google/go-cmp v0.6.0
github.com/ip2location/ip2location-go v8.3.0+incompatible
github.com/mattn/go-sqlite3 v1.14.24
go.sia.tech/core v0.8.0
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/ip2location/ip2location-go v8.3.0+incompatible h1:QwUE+FlSbo6bjOWZpv2Grb57vJhWYFNPyBj2KCvfWaM=
github.com/ip2location/ip2location-go v8.3.0+incompatible/go.mod h1:3JUY1TBjTx1GdA7oRT7Zeqfc0bg3lMMuU5lXmzdpuME=
github.com/julienschmidt/httprouter v1.3.0 h1:U0609e9tgbseu3rBINet9P48AI/D3oJs4dN7jwJOQ1U=
Expand Down
7 changes: 5 additions & 2 deletions internal/testutil/check.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@ import (
"reflect"
"testing"

"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
"go.sia.tech/core/consensus"
"go.sia.tech/core/types"
"go.sia.tech/coreutils/chain"
"go.sia.tech/explored/explorer"
Expand All @@ -13,8 +16,8 @@ import (
func Equal[T any](t *testing.T, desc string, expect, got T) {
t.Helper()

if !reflect.DeepEqual(expect, got) {
t.Fatalf("expected %v %s, got %v", expect, desc, got)
if !cmp.Equal(expect, got, cmpopts.EquateEmpty(), cmpopts.IgnoreUnexported(consensus.Work{}), cmpopts.IgnoreFields(types.StateElement{}, "MerkleProof")) {
t.Fatalf("%s expected != got, diff: %s", desc, cmp.Diff(expect, got))
}
}

Expand Down
134 changes: 40 additions & 94 deletions persist/sqlite/addresses.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,74 +10,50 @@ import (
"go.sia.tech/explored/explorer"
)

func scanEvent(tx *txn, s scanner) (ev explorer.Event, eventID int64, err error) {
var eventType string

err = s.Scan(&eventID, decode(&ev.ID), &ev.MaturityHeight, decode(&ev.Timestamp), &ev.Index.Height, decode(&ev.Index.ID), &eventType)
if err != nil {
return
}
// AddressEvents returns the events of a single address.
func (s *Store) AddressEvents(address types.Address, offset, limit uint64) (events []explorer.Event, err error) {
err = s.transaction(func(tx *txn) error {
const query = `
WITH last_chain_index (height) AS (
SELECT MAX(height) FROM blocks
)
SELECT
ev.id,
ev.event_id,
ev.maturity_height,
ev.date_created,
b.height,
b.id,
CASE
WHEN last_chain_index.height < b.height THEN 0
ELSE last_chain_index.height - b.height
END AS confirmations,
ev.event_type
FROM events ev INDEXED BY events_maturity_height_id_idx -- force the index to prevent temp-btree sorts
INNER JOIN event_addresses ea ON (ev.id = ea.event_id)
INNER JOIN address_balance sa ON (ea.address_id = sa.id)
INNER JOIN blocks b ON (ev.block_id = b.id)
CROSS JOIN last_chain_index
WHERE sa.address = $1
ORDER BY ev.maturity_height DESC, ev.id DESC
LIMIT $2 OFFSET $3`

switch eventType {
case explorer.EventTypeTransaction:
var txnID int64
var eventTx explorer.EventTransaction
err = tx.QueryRow(`SELECT transaction_id, fee FROM transaction_events WHERE event_id = ?`, eventID).Scan(&txnID, decode(&eventTx.Fee))
if err != nil {
return explorer.Event{}, 0, fmt.Errorf("failed to fetch transaction ID: %w", err)
}
txns, err := getTransactions(tx, map[int64]transactionID{0: {dbID: txnID, id: types.TransactionID(ev.ID)}})
if err != nil || len(txns) == 0 {
return explorer.Event{}, 0, fmt.Errorf("failed to fetch transaction: %w", err)
}
eventTx.Transaction = txns[0]
eventTx.HostAnnouncements = eventTx.Transaction.HostAnnouncements
ev.Data = &eventTx
case explorer.EventTypeV2Transaction:
var txnID int64
err = tx.QueryRow(`SELECT transaction_id FROM v2_transaction_events WHERE event_id = ?`, eventID).Scan(&txnID)
if err != nil {
return explorer.Event{}, 0, fmt.Errorf("failed to fetch v2 transaction ID: %w", err)
}
txns, err := getV2Transactions(tx, []types.TransactionID{types.TransactionID(ev.ID)})
if err != nil || len(txns) == 0 {
return explorer.Event{}, 0, fmt.Errorf("failed to fetch v2 transaction: %w", err)
}
eventTx := explorer.EventV2Transaction(txns[0])
ev.Data = &eventTx
case explorer.EventTypeContractPayout:
var m explorer.EventContractPayout
err = tx.QueryRow(`SELECT sce.output_id, sce.leaf_index, sce.maturity_height, sce.address, sce.value, fce.contract_id, fce.leaf_index, fce.filesize, fce.file_merkle_root, fce.window_start, fce.window_end, fce.payout, fce.unlock_hash, fce.revision_number, ev.missed
FROM contract_payout_events ev
JOIN siacoin_elements sce ON ev.output_id = sce.id
JOIN file_contract_elements fce ON ev.contract_id = fce.id
WHERE ev.event_id = ?`, eventID).Scan(decode(&m.SiacoinOutput.ID), decode(&m.SiacoinOutput.StateElement.LeafIndex), &m.SiacoinOutput.MaturityHeight, decode(&m.SiacoinOutput.SiacoinOutput.Address), decode(&m.SiacoinOutput.SiacoinOutput.Value), decode(&m.FileContract.ID), decode(&m.FileContract.StateElement.LeafIndex), decode(&m.FileContract.FileContract.Filesize), decode(&m.FileContract.FileContract.FileMerkleRoot), decode(&m.FileContract.FileContract.WindowStart), decode(&m.FileContract.FileContract.WindowEnd), decode(&m.FileContract.FileContract.Payout), decode(&m.FileContract.FileContract.UnlockHash), decode(&m.FileContract.FileContract.RevisionNumber), &m.Missed)
ev.Data = &m
case explorer.EventTypeMinerPayout:
var m explorer.EventMinerPayout
err = tx.QueryRow(`SELECT sc.output_id, sc.leaf_index, sc.maturity_height, sc.address, sc.value
FROM siacoin_elements sc
INNER JOIN miner_payout_events ev ON ev.output_id = sc.id
WHERE ev.event_id = ?`, eventID).Scan(decode(&m.SiacoinOutput.ID), decode(&m.SiacoinOutput.StateElement.LeafIndex), decode(&m.SiacoinOutput.MaturityHeight), decode(&m.SiacoinOutput.SiacoinOutput.Address), decode(&m.SiacoinOutput.SiacoinOutput.Value))
rows, err := tx.Query(query, encode(address), limit, offset)
if err != nil {
return explorer.Event{}, 0, fmt.Errorf("failed to fetch miner payout event data: %w", err)
return err
}
ev.Data = &m
case explorer.EventTypeFoundationSubsidy:
var m explorer.EventFoundationSubsidy
err = tx.QueryRow(`SELECT sc.output_id, sc.leaf_index, sc.maturity_height, sc.address, sc.value
FROM siacoin_elements sc
INNER JOIN foundation_subsidy_events ev ON ev.output_id = sc.id
WHERE ev.event_id = ?`, eventID).Scan(decode(&m.SiacoinOutput.ID), decode(&m.SiacoinOutput.StateElement.LeafIndex), decode(&m.SiacoinOutput.MaturityHeight), decode(&m.SiacoinOutput.SiacoinOutput.Address), decode(&m.SiacoinOutput.SiacoinOutput.Value))
ev.Data = &m
default:
return explorer.Event{}, 0, fmt.Errorf("unknown event type: %s", eventType)
}

if err != nil {
return explorer.Event{}, 0, fmt.Errorf("failed to fetch transaction event data: %w", err)
}
defer rows.Close()

for rows.Next() {
event, _, err := scanEvent(tx, rows)
if err != nil {
return fmt.Errorf("failed to scan event: %w", err)
}
event.Relevant = []types.Address{address}
events = append(events, event)
}
return rows.Err()
})
return
}

Expand Down Expand Up @@ -186,36 +162,6 @@ func (s *Store) HostsForScanning(maxLastScan, minLastAnnouncement time.Time, off
return
}

// AddressEvents returns the events of a single address.
func (s *Store) AddressEvents(address types.Address, offset, limit uint64) (events []explorer.Event, err error) {
err = s.transaction(func(tx *txn) error {
const query = `SELECT ev.id, ev.event_id, ev.maturity_height, ev.date_created, ev.height, ev.block_id, ev.event_type
FROM events ev
INNER JOIN event_addresses ea ON ev.id = ea.event_id
INNER JOIN address_balance sa ON ea.address_id = sa.id
WHERE sa.address = $1
ORDER BY ev.maturity_height DESC, ev.id DESC
LIMIT $2 OFFSET $3`

rows, err := tx.Query(query, encode(address), limit, offset)
if err != nil {
return err
}
defer rows.Close()

for rows.Next() {
event, _, err := scanEvent(tx, rows)
if err != nil {
return fmt.Errorf("failed to scan event: %w", err)
}

events = append(events, event)
}
return rows.Err()
})
return
}

func scanSiacoinOutput(s scanner) (sco explorer.SiacoinOutput, err error) {
var spentIndex types.ChainIndex
err = s.Scan(decode(&sco.ID), decode(&sco.StateElement.LeafIndex), &sco.Source, decodeNull(&spentIndex), &sco.MaturityHeight, decode(&sco.SiacoinOutput.Address), decode(&sco.SiacoinOutput.Value))
Expand Down
102 changes: 49 additions & 53 deletions persist/sqlite/consensus.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
package sqlite

import (
"bytes"
"database/sql"
"encoding/json"
"errors"
"fmt"
"log"
"reflect"
"time"

"go.sia.tech/core/types"
Expand Down Expand Up @@ -620,18 +620,18 @@ func addSiafundElements(tx *txn, index types.ChainIndex, spentElements, newEleme
return sfDBIds, nil
}

func addEvents(tx *txn, scDBIds map[types.SiacoinOutputID]int64, fcDBIds map[explorer.DBFileContract]int64, txnDBIds map[types.TransactionID]txnDBId, v2TxnDBIds map[types.TransactionID]txnDBId, events []explorer.Event) error {
func addEvents(tx *txn, bid types.BlockID, scDBIds map[types.SiacoinOutputID]int64, sfDBIds map[types.SiafundOutputID]int64, fcDBIds map[explorer.DBFileContract]int64, v2FcDBIds map[explorer.DBFileContract]int64, txnDBIds map[types.TransactionID]txnDBId, v2TxnDBIds map[types.TransactionID]txnDBId, events []explorer.Event) error {
if len(events) == 0 {
return nil
}

insertEventStmt, err := tx.Prepare(`INSERT INTO events (event_id, maturity_height, date_created, event_type, block_id, height) VALUES ($1, $2, $3, $4, $5, $6) ON CONFLICT (event_id) DO NOTHING RETURNING id`)
insertEventStmt, err := tx.Prepare(`INSERT INTO events (event_id, maturity_height, date_created, event_type, block_id) VALUES ($1, $2, $3, $4, $5) ON CONFLICT (event_id) DO NOTHING RETURNING id`)
if err != nil {
return fmt.Errorf("failed to prepare event statement: %w", err)
}
defer insertEventStmt.Close()

addrStmt, err := tx.Prepare(`INSERT INTO address_balance (address, siacoin_balance, immature_siacoin_balance, siafund_balance) VALUES ($1, $2, $3, 0) ON CONFLICT (address) DO UPDATE SET address=EXCLUDED.address RETURNING id`)
addrStmt, err := tx.Prepare(`INSERT INTO address_balance (address, siacoin_balance, immature_siacoin_balance, siafund_balance) VALUES ($1, $2, $2, 0) ON CONFLICT (address) DO UPDATE SET address=EXCLUDED.address RETURNING id`)
if err != nil {
return fmt.Errorf("failed to prepare address statement: %w", err)
}
Expand All @@ -643,84 +643,53 @@ func addEvents(tx *txn, scDBIds map[types.SiacoinOutputID]int64, fcDBIds map[exp
}
defer relevantAddrStmt.Close()

transactionEventStmt, err := tx.Prepare(`INSERT INTO transaction_events (event_id, transaction_id, fee) VALUES (?, ?, ?)`)
v1TransactionEventStmt, err := tx.Prepare(`INSERT INTO v1_transaction_events (event_id, transaction_id) VALUES (?, ?)`)
if err != nil {
return fmt.Errorf("failed to prepare transaction event statement: %w", err)
return fmt.Errorf("failed to prepare v1 transaction event statement: %w", err)
}
defer transactionEventStmt.Close()
defer v1TransactionEventStmt.Close()

v2TransactionEventStmt, err := tx.Prepare(`INSERT INTO v2_transaction_events (event_id, transaction_id) VALUES (?, ?)`)
if err != nil {
return fmt.Errorf("failed to prepare v2 transaction event statement: %w", err)
}
defer v2TransactionEventStmt.Close()

minerPayoutEventStmt, err := tx.Prepare(`INSERT INTO miner_payout_events (event_id, output_id) VALUES (?, ?)`)
payoutEventStmt, err := tx.Prepare(`INSERT INTO payout_events (event_id, output_id) VALUES (?, ?)`)
if err != nil {
return fmt.Errorf("failed to prepare miner payout event statement: %w", err)
return fmt.Errorf("failed to prepare minerpayout event statement: %w", err)
}
defer minerPayoutEventStmt.Close()
defer payoutEventStmt.Close()

contractPayoutEventStmt, err := tx.Prepare(`INSERT INTO contract_payout_events (event_id, output_id, contract_id, missed) VALUES (?, ?, ?, ?)`)
v1ContractResolutionEventStmt, err := tx.Prepare(`INSERT INTO v1_contract_resolution_events (event_id, output_id, parent_id, missed) VALUES (?, ?, ?, ?)`)
if err != nil {
return fmt.Errorf("failed to prepare contract payout event statement: %w", err)
return fmt.Errorf("failed to prepare v1 contract resolution event statement: %w", err)
}
defer contractPayoutEventStmt.Close()
defer v1ContractResolutionEventStmt.Close()

foundationSubsidyEventStmt, err := tx.Prepare(`INSERT INTO foundation_subsidy_events (event_id, output_id) VALUES (?, ?)`)
chris124567 marked this conversation as resolved.
Show resolved Hide resolved
v2ContractResolutionEventStmt, err := tx.Prepare(`INSERT INTO v2_contract_resolution_events (event_id, output_id, parent_id, missed) VALUES (?, ?, ?, ?)`)
if err != nil {
return fmt.Errorf("failed to prepare foundation subsidy event statement: %w", err)
return fmt.Errorf("failed to prepare v2 contract resolution event statement: %w", err)
}
defer foundationSubsidyEventStmt.Close()
defer v2ContractResolutionEventStmt.Close()

var buf bytes.Buffer
enc := json.NewEncoder(&buf)
for _, event := range events {
buf.Reset()
if err := enc.Encode(event.Data); err != nil {
return fmt.Errorf("failed to encode event: %w", err)
}

var eventID int64
err = insertEventStmt.QueryRow(encode(event.ID), event.MaturityHeight, encode(event.Timestamp), event.Data.EventType(), encode(event.Index.ID), event.Index.Height).Scan(&eventID)
err = insertEventStmt.QueryRow(encode(event.ID), event.MaturityHeight, encode(event.Timestamp), event.Type, encode(bid)).Scan(&eventID)
if errors.Is(err, sql.ErrNoRows) {
continue // skip if the event already exists
} else if err != nil {
return fmt.Errorf("failed to add event: %w", err)
}

switch v := event.Data.(type) {
case *explorer.EventTransaction:
dbID := txnDBIds[types.TransactionID(event.ID)].id
if _, err = transactionEventStmt.Exec(eventID, dbID, encode(v.Fee)); err != nil {
return fmt.Errorf("failed to insert transaction event: %w", err)
}
case *explorer.EventV2Transaction:
dbID := v2TxnDBIds[types.TransactionID(event.ID)].id
if _, err = v2TransactionEventStmt.Exec(eventID, dbID); err != nil {
return fmt.Errorf("failed to insert transaction event: %w", err)
}
case *explorer.EventMinerPayout:
_, err = minerPayoutEventStmt.Exec(eventID, scDBIds[types.SiacoinOutputID(event.ID)])
case *explorer.EventContractPayout:
_, err = contractPayoutEventStmt.Exec(eventID, scDBIds[v.SiacoinOutput.ID], fcDBIds[explorer.DBFileContract{ID: v.FileContract.ID, RevisionNumber: v.FileContract.FileContract.RevisionNumber}], v.Missed)
case *explorer.EventFoundationSubsidy:
_, err = foundationSubsidyEventStmt.Exec(eventID, scDBIds[types.SiacoinOutputID(event.ID)])
default:
return errors.New("unknown event type")
}
if err != nil {
return fmt.Errorf("failed to insert %s event: %w", event.Data.EventType(), err)
}

used := make(map[types.Address]bool)
for _, addr := range event.Addresses {
for _, addr := range event.Relevant {
if used[addr] {
continue
}

var addressID int64
err = addrStmt.QueryRow(encode(addr), encode(types.ZeroCurrency), encode(types.ZeroCurrency)).Scan(&addressID)
err = addrStmt.QueryRow(encode(addr), encode(types.ZeroCurrency)).Scan(&addressID)
if err != nil {
return fmt.Errorf("failed to get address: %w", err)
}
Expand All @@ -732,6 +701,33 @@ func addEvents(tx *txn, scDBIds map[types.SiacoinOutputID]int64, fcDBIds map[exp

used[addr] = true
}

switch v := event.Data.(type) {
case explorer.EventV1Transaction:
dbID := txnDBIds[types.TransactionID(event.ID)].id
if _, err = v1TransactionEventStmt.Exec(eventID, dbID); err != nil {
return fmt.Errorf("failed to insert transaction event: %w", err)
}
case explorer.EventV2Transaction:
dbID := v2TxnDBIds[types.TransactionID(event.ID)].id
if _, err = v2TransactionEventStmt.Exec(eventID, dbID); err != nil {
return fmt.Errorf("failed to insert transaction event: %w", err)
}
case explorer.EventPayout:
_, err = payoutEventStmt.Exec(eventID, scDBIds[types.SiacoinOutputID(event.ID)])
case explorer.EventV1ContractResolution:
ddd := explorer.DBFileContract{ID: v.Parent.ID, RevisionNumber: v.Parent.RevisionNumber}
log.Printf("scDBIDs[%+v] = %v, fcDBIds[%+v] = %d", v.SiacoinElement.ID, scDBIds[v.SiacoinElement.ID], ddd, fcDBIds[ddd])

_, err = v1ContractResolutionEventStmt.Exec(eventID, scDBIds[v.SiacoinElement.ID], fcDBIds[explorer.DBFileContract{ID: v.Parent.ID, RevisionNumber: v.Parent.RevisionNumber}], v.Missed)
case explorer.EventV2ContractResolution:
_, err = v2ContractResolutionEventStmt.Exec(eventID, scDBIds[v.SiacoinElement.ID], v2FcDBIds[explorer.DBFileContract{ID: v.Resolution.Parent.ID, RevisionNumber: v.Resolution.Parent.V2FileContract.RevisionNumber}], v.Missed)
default:
return fmt.Errorf("unknown event type: %T", reflect.TypeOf(event.Data))
}
if err != nil {
return fmt.Errorf("failed to insert %v event: %w", reflect.TypeOf(event.Data), err)
}
}
return nil
}
Expand Down Expand Up @@ -1062,12 +1058,12 @@ func (ut *updateTx) ApplyIndex(state explorer.UpdateState) error {
return fmt.Errorf("ApplyIndex: failed to update metrics: %w", err)
} else if err := addHostAnnouncements(ut.tx, state.Block.Timestamp, state.HostAnnouncements, state.V2HostAnnouncements); err != nil {
return fmt.Errorf("ApplyIndex: failed to add host announcements: %w", err)
} else if err := addEvents(ut.tx, scDBIds, fcDBIds, txnDBIds, v2TxnDBIds, state.Events); err != nil {
return fmt.Errorf("ApplyIndex: failed to add events: %w", err)
} else if err := updateFileContractIndices(ut.tx, false, state.Metrics.Index, state.FileContractElements); err != nil {
return fmt.Errorf("ApplyIndex: failed to update file contract element indices: %w", err)
} else if err := updateV2FileContractIndices(ut.tx, false, state.Metrics.Index, state.V2FileContractElements); err != nil {
return fmt.Errorf("ApplyIndex: failed to update v2 file contract element indices: %w", err)
} else if err := addEvents(ut.tx, state.Block.ID(), scDBIds, sfDBIds, fcDBIds, v2FcDBIds, txnDBIds, v2TxnDBIds, state.Events); err != nil {
return fmt.Errorf("ApplyIndex: failed to add events: %w", err)
}

return nil
Expand Down
Loading
Loading