In this guide, we’re going to walk you through all the steps involved with creating a basic events processor in Rust to track events on the Aptos blockchain. At the end of this guide, you’ll be able to run the events processor and customize the processor for your indexing needs.
To get started, clone the aptos-indexer-processor-example repo.
# HTTPS
https://github.com/aptos-labs/aptos-indexer-processor-example.git
# SSH
git@github.com:aptos-labs/aptos-indexer-processor-example.git
Processors consume transactions from the Transaction Stream Service. In order to use the Labs-Hosted Transaction Stream
Service you need an authorization token.
Follow this guide to get a token from the Developer Portal. Create an API Key for Testnet, as this tutorial is for Testnet.
Once you’re done, you should have a token that looks like this:
aptoslabs_yj4bocpaKy_Q6RBP4cdBmjA8T51hto1GcVX5ZS9S65dx
You also need the following tools:
- Rust 1.79: Installation Guide
- Cargo: Installation Guide
- Diesel CLI: Installation Guide
We use PostgreSQL as our database in this tutorial. You’re free to use whatever you want, but this tutorial is geared towards PostgreSQL for the sake of simplicity. We use the following database configuration and tools:
- We will use a database hosted on localhost on port 5432, which should be the default.
- When you create your username, keep track of it and the password you use for it.
- You can view a tutorial for installing PostgreSQL and the psql tool here to set up your database more quickly.
- To easily view your database data, consider using a GUI like DBeaver (recommended), pgAdmin, or Postico.
Explaining how to create a database is beyond the scope of this tutorial. If you are not sure how to do it, consider checking out tutorials on how to create a database with the psql tool.
Make sure to start the postgresql service:
The command for Linux/WSL might be something like:
sudo service postgresql start
For macOS, if you’re using brew, start it up with:
brew services start postgresql
Now let’s set up the configuration details for the actual indexer processor we’re going to use.
In the example repo, there is a sample config.yaml file that should look something like this:
health_check_port: 8085
server_config:
  processor_config:
    type: "events_processor"
  transaction_stream_config:
    indexer_grpc_data_service_address: "https://grpc.testnet.aptoslabs.com:443"
    starting_version: 0
    # request_ending_version: 10000
    auth_token: "AUTH_TOKEN"
    request_name_header: "events-processor"
  db_config:
    postgres_connection_string: postgresql://postgres:@localhost:5432/example
Open the config.yaml file and update these fields:
- auth_token - the auth token you got from the Developer Portal
- postgres_connection_string - connection string to your PostgreSQL database
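The connection string in the sample config follows the usual postgresql://USER:PASSWORD@HOST:PORT/DATABASE layout. As a rough illustration, the sketch below assembles one from its parts. The helper name postgres_connection_string is hypothetical and is not part of the example repo, and it does no percent-encoding, so it only suits simple credentials.

```rust
/// Builds a PostgreSQL connection string of the form used in config.yaml:
/// postgresql://USER:PASSWORD@HOST:PORT/DATABASE
///
/// Hypothetical helper for illustration only. It does not percent-encode
/// its inputs, so special characters in the user name or password would
/// need to be escaped by the caller.
pub fn postgres_connection_string(
    user: &str,
    password: &str,
    host: &str,
    port: u16,
    database: &str,
) -> String {
    format!("postgresql://{user}:{password}@{host}:{port}/{database}")
}
```

Calling postgres_connection_string("postgres", "", "localhost", 5432, "example") reproduces the value shown in the sample config above.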
You can customize additional configuration with the config.yaml file.
To start at a specific ledger version, you can specify the version in the config.yaml file with:
starting_version: <Starting Version>
To stop processing at a specific ledger version, you can specify the ending version with:
request_ending_version: <Ending Version>
If you want to use a different network, change the indexer_grpc_data_service_address field to the value for that network:
# Devnet
indexer_grpc_data_service_address: grpc.devnet.aptoslabs.com:443
# Testnet
indexer_grpc_data_service_address: grpc.testnet.aptoslabs.com:443
# Mainnet
indexer_grpc_data_service_address: grpc.mainnet.aptoslabs.com:443
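If you switch networks often, it can help to keep the three addresses in one place. The sketch below is only an illustration: the Network enum and its methods are hypothetical names, not something the example repo or the SDK provides. The address strings are copied from the list above.

```rust
/// The three networks listed above. Hypothetical enum for illustration.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Network {
    Devnet,
    Testnet,
    Mainnet,
}

impl Network {
    /// Parses a network name, ignoring ASCII case. Returns None for
    /// anything other than devnet, testnet, or mainnet.
    pub fn from_name(name: &str) -> Option<Network> {
        match name.to_ascii_lowercase().as_str() {
            "devnet" => Some(Network::Devnet),
            "testnet" => Some(Network::Testnet),
            "mainnet" => Some(Network::Mainnet),
            _ => None,
        }
    }

    /// Value to put in indexer_grpc_data_service_address for this network.
    pub fn grpc_address(self) -> &'static str {
        match self {
            Network::Devnet => "grpc.devnet.aptoslabs.com:443",
            Network::Testnet => "grpc.testnet.aptoslabs.com:443",
            Network::Mainnet => "grpc.mainnet.aptoslabs.com:443",
        }
    }
}
```

Remember that an API key is created for a specific network, so changing the address usually means changing auth_token as well.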
At a high level, each processor is responsible for receiving a stream of transactions, parsing and transforming the relevant data, and storing the data into a database.
In src/db/postgres/schema.rs, you will see the events table, which has the following schema:
diesel::table! {
    events (transaction_version, event_index) {
        sequence_number -> Int8,
        creation_number -> Int8,
        #[max_length = 66]
        account_address -> Varchar,
        transaction_version -> Int8,
        transaction_block_height -> Int8,
        #[sql_name = "type"]
        type_ -> Text,
        data -> Jsonb,
        inserted_at -> Timestamp,
        event_index -> Int8,
        #[max_length = 300]
        indexed_type -> Varchar,
    }
}
The events schema represents the data that this processor is indexing. This schema.rs file is autogenerated from the database migrations. In the next section, we’ll go over how these migrations are run.
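To make the schema more concrete, here is a dependency-free sketch of what one row of the events table could look like as a plain Rust struct. This is not the model type from the example repo. The struct name EventRow and its methods are hypothetical, Int8 columns are mapped to i64, the Jsonb data column is held as a raw JSON string to avoid extra crates, and inserted_at is left out on the assumption that the database fills it in.

```rust
/// Plain-Rust mirror of one row in the `events` table (illustrative only).
/// Assumptions: Int8 -> i64, Varchar/Text -> String, Jsonb -> raw JSON text.
/// `inserted_at` is omitted, assuming the database sets it on insert.
#[derive(Debug, Clone, PartialEq)]
pub struct EventRow {
    pub sequence_number: i64,
    pub creation_number: i64,
    pub account_address: String,
    pub transaction_version: i64,
    pub transaction_block_height: i64,
    pub type_: String,
    pub data: String,
    pub event_index: i64,
    pub indexed_type: String,
}

impl EventRow {
    /// The composite primary key declared in the table! macro:
    /// (transaction_version, event_index).
    pub fn primary_key(&self) -> (i64, i64) {
        (self.transaction_version, self.event_index)
    }

    /// Checks the two #[max_length] limits from the schema:
    /// 66 characters for account_address, 300 for indexed_type.
    pub fn fits_column_limits(&self) -> bool {
        self.account_address.chars().count() <= 66
            && self.indexed_type.chars().count() <= 300
    }
}
```

Because the primary key is (transaction_version, event_index), two events from the same transaction are distinguished only by their index within that transaction.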
There are two other important tables:
- ledger_infos, which tracks the chain id of the ledger being indexed
- processor_status, which tracks the last_success_version of the processor
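The processor reads both tables at startup, as the next section describes. The sketch below shows one way those two checks could work. Both function names are hypothetical, and the policy is an assumption rather than the example repo's actual logic: a checkpoint in processor_status takes priority over starting_version from config.yaml, and an empty ledger_infos table is treated as a first run.

```rust
/// Picks the version to start processing from.
///
/// Assumed policy (the example repo's real rule may differ): if the database
/// holds a checkpoint, resume from it; otherwise use `starting_version`
/// from config.yaml; if neither is set, start from version 0.
pub fn resolve_starting_version(
    config_starting_version: Option<u64>,
    last_success_version: Option<u64>,
) -> u64 {
    match last_success_version {
        Some(checkpoint) => checkpoint,
        None => config_starting_version.unwrap_or(0),
    }
}

/// Compares the chain id stored in `ledger_infos` with the one reported by
/// the transaction stream. `None` means the table is still empty, which this
/// sketch treats as a first run and accepts.
pub fn check_chain_id(stored_chain_id: Option<u64>, stream_chain_id: u64) -> Result<(), String> {
    match stored_chain_id {
        Some(stored) if stored != stream_chain_id => Err(format!(
            "chain id mismatch: database has {stored}, stream reports {stream_chain_id}"
        )),
        _ => Ok(()),
    }
}
```

Failing fast on a chain id mismatch guards against pointing a Testnet database at a Mainnet stream, which would otherwise mix rows from two ledgers in the same tables.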
The file src/processors/events/events_processor.rs contains the code which defines the events processor. Inside of run_processor there are a few key components:
- First, we set up the processor:
  - run_migrations automatically runs the database migrations defined in src/db/postgres/migrations
  - We merge the starting version in config.yaml and the processor_status.last_success_version in the database to get the final starting version for the processor. This allows us to restart the processor from a previously processed version.
  - We check the ledger_infos.chain_id to make sure the processor is indexing the correct chain
- Next, we instantiate the processor steps. Here we explain the purpose of each step:
  - TransactionStreamStep provides a stream of transactions to the processor
  - EventsExtractor extracts events data from each transaction
  - EventsStorer inserts the extracted events into the events table
  - LatestVersionTracker keeps track of the latest processed version and updates the processor_status table
- Lastly, we connect the processor steps together.
  - ProcessorBuilder::new_with_inputless_first_step takes in the first step of the processor. In most cases, the first step is a TransactionStreamStep.
  - The rest of the steps are connected with connect_to. connect_to creates a channel between the steps so the output of one step becomes the input of the next step.
  - And then we end the builder with end_and_return_output_receiver.
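To build intuition for how steps are chained through channels, here is a toy pipeline that uses only the Rust standard library. It is not the SDK's ProcessorBuilder API. The functions connect_step, version_batches, and run_toy_pipeline are hypothetical stand-ins, and the "extract" and "store" stages just pass numbers along instead of touching real transactions or a database.

```rust
use std::sync::mpsc::{channel, Receiver};
use std::thread;

/// Toy analogue of connecting a step: runs `step` on its own thread, feeding
/// it every item from `input` and sending each result on a new channel.
pub fn connect_step<I, O, F>(input: Receiver<I>, mut step: F) -> Receiver<O>
where
    I: Send + 'static,
    O: Send + 'static,
    F: FnMut(I) -> O + Send + 'static,
{
    let (tx, rx) = channel();
    thread::spawn(move || {
        for item in input {
            // Stop quietly if the downstream receiver has been dropped.
            if tx.send(step(item)).is_err() {
                break;
            }
        }
    });
    rx
}

/// Toy inputless first step: emits (first_version, last_version) batches.
/// Assumes batch_size >= 1.
pub fn version_batches(start: u64, batch_size: u64, batches: u64) -> Receiver<(u64, u64)> {
    let (tx, rx) = channel();
    thread::spawn(move || {
        for i in 0..batches {
            let first = start + i * batch_size;
            if tx.send((first, first + batch_size - 1)).is_err() {
                break;
            }
        }
    });
    rx
}

/// Wires three toy steps together and collects the final output.
pub fn run_toy_pipeline() -> Vec<String> {
    let stream = version_batches(0, 5000, 2);
    // "Extract": count how many versions each batch covers.
    let extracted = connect_step(stream, |(first, last)| (first, last, last - first + 1));
    // "Store": pretend to persist the batch and report what was stored.
    let stored = connect_step(extracted, |(first, last, count)| {
        format!("stored {count} versions [{first}, {last}]")
    });
    // Draining the last receiver plays the role of the output receiver.
    stored.into_iter().collect()
}
```

Each stage owns the receiving end of the previous stage's channel, so when the first step finishes and drops its sender, the shutdown propagates down the chain and the final collect returns.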
With the config.yaml you created earlier, you’re ready to run the events processor:
cd aptos-indexer-processor-example
cargo run --release -- -c config.yaml
You should see the processor start to index Aptos blockchain events!
{"timestamp":"2024-08-15T01:06:35.169217Z","level":"INFO","message":"[Transaction Stream] Received transactions from GRPC.","stream_address":"https://grpc.testnet.aptoslabs.com/","connection_id":"5575cb8c-61fb-498f-aaae-868d1e8773ac","start_version":0,"end_version":4999,"start_txn_timestamp_iso":"1970-01-01T00:00:00.000000000Z","end_txn_timestamp_iso":"2022-09-09T01:49:02.023089000Z","num_of_transactions":5000,"size_in_bytes":5708539,"duration_in_secs":0.310734,"tps":16078,"bytes_per_sec":18371143.80788713,"filename":"/Users/reneetso/.cargo/git/checkouts/aptos-indexer-processor-sdk-2f3940a333c8389d/e1e1bdd/rust/transaction-stream/src/transaction_stream.rs","line_number":400,"threadName":"tokio-runtime-worker","threadId":"ThreadId(6)"}
{"timestamp":"2024-08-15T01:06:35.257756Z","level":"INFO","message":"Events version [0, 4999] stored successfully","filename":"src/processors/events/events_storer.rs","line_number":75,"threadName":"tokio-runtime-worker","threadId":"ThreadId(10)"}
{"timestamp":"2024-08-15T01:06:35.257801Z","level":"INFO","message":"Finished processing events from versions [0, 4999]","filename":"src/processors/events/events_processor.rs","line_number":90,"threadName":"tokio-runtime-worker","threadId":"ThreadId(17)"}
In most cases, you want to index events from your own contracts. The example processor offers a good starting point for creating your own custom processor.
To customize the processor to index events from your custom contract, you can make changes in these places:
- EventsExtractor
  - In process(), you can filter by specific event types and extract specific event data from your custom contract
- EventsStorer
  - If you need to change the database model, you can generate a new database migration by going to src/db/postgres and running
    diesel migration generate {migration_name}
  - Add your migration changes to up.sql and down.sql, then run
    diesel migration run --database-url={YOUR_DATABASE_URL}
    to update schema.rs.
  - And then update the EventsStorer.process() to handle storing the events data to the updated database model
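As a rough idea of what filtering by event type could look like inside an extractor, here is a standalone sketch. It does not use the SDK's real transaction or event types. RawEvent and filter_events_by_type are hypothetical names, and the type string 0xcafe::my_module::MyEvent is a made-up placeholder for your own contract's event type.

```rust
/// Minimal stand-in for an on-chain event (hypothetical; the real event
/// type carries more fields).
#[derive(Debug, Clone, PartialEq)]
pub struct RawEvent {
    pub transaction_version: i64,
    pub event_index: i64,
    /// Fully qualified Move type, e.g. "0xcafe::my_module::MyEvent".
    pub type_str: String,
    /// Event payload as raw JSON text.
    pub data: String,
}

/// Keeps only the events whose type string equals `wanted_type` exactly,
/// preserving their original order.
pub fn filter_events_by_type(events: Vec<RawEvent>, wanted_type: &str) -> Vec<RawEvent> {
    events
        .into_iter()
        .filter(|event| event.type_str == wanted_type)
        .collect()
}
```

Filtering as early as possible keeps the storer simple, since it then only ever sees rows that belong in your table.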
To upgrade the Indexer SDK, you need to update the SDK dependency in Cargo.toml:
aptos-indexer-processor-sdk = { git = "https://github.com/aptos-labs/aptos-indexer-processor-sdk.git", rev = "e1e1bdd9349f0a68c9fc53b7e2cebda9e2ce92b7" }
aptos-indexer-processor-sdk-server-framework = { git = "https://github.com/aptos-labs/aptos-indexer-processor-sdk.git", rev = "e1e1bdd9349f0a68c9fc53b7e2cebda9e2ce92b7" }
In the future, we plan to implement the following release strategy for the Indexer SDK:
Warning
We are NOT using semantic versioning, but borrowing its numeric release pattern.
Given a version number MAJOR.MINOR.PATCH (X.Y.Z), increment the:
- MAJOR version when we make incompatible SDK changes (e.g. removing deprecated features, making changes to ProcessorBuilder)
- MINOR version to match the Aptos node version (e.g. proto or bcs upgrades)
  - Since the SDK will re-export the protos, we’ll need to bump the minor version every time there’s a proto upgrade
- PATCH version when we change functionality in a backward compatible manner (e.g. bug fixes, security fixes, grpc upgrades, new features / steps, improvements)
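The planned rules above can be summarized as a small lookup from the kind of change to the part of the version that moves. The sketch below is only a restatement of that list. ChangeKind, VersionPart, and part_to_bump are hypothetical names, and since the release strategy is still a plan, the mapping may change.

```rust
/// Kinds of SDK change named in the planned release strategy.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ChangeKind {
    /// Incompatible SDK changes, e.g. removing deprecated features.
    IncompatibleSdkChange,
    /// Aptos node version changes, e.g. proto or bcs upgrades.
    AptosNodeUpgrade,
    /// Backward compatible changes, e.g. bug fixes or new steps.
    BackwardCompatibleChange,
}

/// The part of MAJOR.MINOR.PATCH that a change affects.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum VersionPart {
    Major,
    Minor,
    Patch,
}

/// Maps a kind of change to the version part listed for it above.
pub fn part_to_bump(change: ChangeKind) -> VersionPart {
    match change {
        ChangeKind::IncompatibleSdkChange => VersionPart::Major,
        ChangeKind::AptosNodeUpgrade => VersionPart::Minor,
        ChangeKind::BackwardCompatibleChange => VersionPart::Patch,
    }
}
```

Note that, unlike semantic versioning, new features fall under PATCH here, so a PATCH bump can bring more than bug fixes.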