Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

272 SIM - BE - Fix - json load for validator config #273

Merged
merged 5 commits into from
Jul 17, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,12 @@ on:
types:
- opened
pull_request_review:
branches:
- staging
types: [approved]
types:
- submitted

jobs:
test:
if: ${{ github.event.review.state == 'APPROVED' || github.event_name == 'pull_request' }}
runs-on: ubuntu-latest

env:
Expand Down Expand Up @@ -69,7 +69,7 @@ jobs:
pip install -r requirements.test.txt

- name: Run tests
run: pytest tests/e2e/
run: pytest tests/integration/

- name: Shutdown Docker Compose
if: always()
Expand Down
20 changes: 20 additions & 0 deletions .github/workflows/frontend-unit-tests.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
name: "Frontend Unit Tests"

on:
workflow_call:

jobs:
frontend:
runs-on: ubuntu-latest

steps:
- uses: actions/checkout@v4
- name: Use Node.js
uses: actions/setup-node@v4
with:
node-version: "21.x"

- run: npm ci
working-directory: frontend
- run: npm run test:unit
working-directory: frontend
25 changes: 25 additions & 0 deletions .github/workflows/unit-tests-pr.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
name: "Unit Tests"

on:
pull_request:
types:
- opened
- ready_for_review
- reopened
pull_request_review:
types:
- submitted
- edited

concurrency:
group: unit-tests-${{ github.event.number }}
cancel-in-progress: true

permissions:
contents: read

jobs:
test:
name: Unit Tests
if: ${{ github.event.review.state == 'APPROVED' || github.event_name == 'pull_request' }}
uses: ./.github/workflows/frontend-unit-tests.yml
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ coverage.xml
*.py,cover
.hypothesis/
.pytest_cache/
.nyc_output/

# Translations
*.mo
Expand Down
File renamed without changes.
File renamed without changes.
189 changes: 189 additions & 0 deletions backend/consensus/base.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,189 @@
# backend/consensus/base.py

DEPLOY_CONTRACTS_QUEUE_KEY = "deploy_contracts"
DEFAULT_VALIDATORS_COUNT = 5

import traceback
import asyncio
from backend.node.base import Node
from backend.database_handler.db_client import DBClient
from backend.database_handler.types import ConsensusData
from backend.consensus.vrf import get_validators_for_transaction
from backend.database_handler.chain_snapshot import ChainSnapshot
from backend.database_handler.contract_snapshot import ContractSnapshot
from backend.database_handler.transactions_processor import (
TransactionsProcessor,
TransactionStatus,
)


class ConsensusAlgorithm:
def __init__(
self, dbclient: DBClient, transactions_processor: TransactionsProcessor
):
self.dbclient = dbclient
self.transactions_processor = transactions_processor
self.queues = {}

def run_crawl_snapshot_loop(self):
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
loop.run_until_complete(self._crawl_snapshot())
loop.close()

async def _crawl_snapshot(self):
while True:
chain_snapshot = ChainSnapshot(self.dbclient)
pending_transactions = chain_snapshot.get_pending_transactions()
if len(pending_transactions) > 0:
for transaction in pending_transactions:
contract_address = (
transaction["to_address"]
if transaction["to_address"] is not None
else DEPLOY_CONTRACTS_QUEUE_KEY
)
if contract_address not in self.queues:
self.queues[contract_address] = asyncio.Queue()
await self.queues[contract_address].put(transaction)
await asyncio.sleep(10)

def run_consensus_loop(self):
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
loop.run_until_complete(self._run_consensus())
loop.close()

async def _run_consensus(self):
asyncio.set_event_loop(asyncio.new_event_loop())
# watch out! as ollama uses GPU resources and webrequest aka selenium uses RAM
while True:
if self.queues:
chain_snapshot = ChainSnapshot(self.dbclient)
tasks = []

for key in self.queues.keys():
if not self.queues[key].empty():
transaction = await self.queues[key].get()
tasks.append(self.exec_transaction(transaction, chain_snapshot))

if len(tasks) > 0:
try:
await asyncio.gather(*tasks)
except Exception as e:
print("Error running consensus", e)
print(traceback.format_exc())
await asyncio.sleep(10)

async def exec_transaction(
self,
transaction: dict,
snapshot: ChainSnapshot,
) -> dict:
print(" ~ ~ ~ ~ ~ EXECUTING TRANSACTION: ", transaction)
# Update transaction status
self.transactions_processor.update_transaction_status(
transaction["id"], TransactionStatus.PROPOSING.value
)
# Select Leader and validators
all_validators = snapshot.get_all_validators()
leader, remaining_validators = get_validators_for_transaction(
all_validators, DEFAULT_VALIDATORS_COUNT
)
num_validators = len(remaining_validators) + 1

contract_address = transaction.get("to_address", None)
contract_snapshot = ContractSnapshot(contract_address, self.dbclient)

# Create Leader
leader_node = Node(
contract_snapshot=contract_snapshot,
address=leader["address"],
validator_mode="leader",
stake=leader["stake"],
provider=leader["provider"],
model=leader["model"],
config=leader["config"],
)

# Leader executes transaction
leader_receipt = await leader_node.exec_transaction(transaction)
votes = {leader["address"]: leader_receipt["vote"]}
# Update transaction status
self.transactions_processor.update_transaction_status(
transaction["id"], TransactionStatus.COMMITTING.value
)

# Create Validators
validator_nodes = [
Node(
contract_snapshot=contract_snapshot,
address=validator["address"],
validator_mode="validator",
stake=validator["stake"],
provider=validator["provider"],
model=validator["model"],
config=validator["config"],
leader_receipt=leader_receipt,
)
for i, validator in enumerate(remaining_validators)
]

# Validators execute transaction
validators_results = []
validation_tasks = [
(
validator.exec_transaction(transaction)
) # watch out! as ollama uses GPU resources and webrequest aka selenium uses RAM
for validator in validator_nodes
]
validation_results = await asyncio.gather(*validation_tasks)

for i in range(len(validation_results)):
votes[f"{validator_nodes[i].address}"] = validation_results[i]["vote"]
self.transactions_processor.update_transaction_status(
transaction["id"], TransactionStatus.REVEALING.value
)

if (
len([vote for vote in votes.values() if vote == "agree"])
< num_validators // 2
):
raise Exception("Consensus not reached")

self.transactions_processor.update_transaction_status(
transaction["id"], TransactionStatus.ACCEPTED.value
)

final = False
consensus_data = ConsensusData(
final=final,
votes=votes,
leader=leader_receipt,
validators=validators_results,
).model_dump_json()

execution_output = {}
execution_output["leader_data"] = leader_receipt
execution_output["consensus_data"] = consensus_data

# Register contract if it is a new contract
if transaction["type"] == 1:
new_contract = {
"id": transaction["data"]["contract_address"],
"data": {
"state": leader_receipt["result"]["contract_state"],
"code": transaction["data"]["contract_code"],
},
}
contract_snapshot.register_contract(new_contract)

# Update contract state if it is an existing contract
else:
contract_snapshot.update_contract_state(
leader_receipt["result"]["contract_state"]
)

# Finalize transaction
self.transactions_processor.set_transaction_result(
transaction["id"], execution_output
)
27 changes: 27 additions & 0 deletions backend/consensus/vrf.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
import random


def select_random_validators(all_validators: list, num_validators: int) -> list:
weights = []
for i in all_validators:
weights.append(float(i["stake"]))

weighted_indices = random.choices(
range(len(all_validators)), weights=weights, k=num_validators * 10
)
unique_indices = set()
random.shuffle(weighted_indices)

for idx in weighted_indices:
unique_indices.add(idx)
if len(unique_indices) == num_validators:
break

return [all_validators[i] for i in unique_indices]


def get_validators_for_transaction(all_validators: list, num_validators: int) -> tuple:
selected_validators = select_random_validators(all_validators, num_validators)
leader = selected_validators[0]
remaining_validators = selected_validators[1 : num_validators + 1]
return leader, remaining_validators
File renamed without changes.
74 changes: 74 additions & 0 deletions backend/database_handler/accounts_manager.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
# consensus/services/transactions_db_service.py

import json
from enum import Enum

from backend.database_handler.db_client import DBClient
from backend.database_handler.errors import AccountNotFoundError
from backend.database_handler.transactions_processor import TransactionsProcessor


class AccountsManager:
def __init__(
self, db_client: DBClient, transactions_processor: TransactionsProcessor
):
self.db_client = db_client
self.transactions_processor = transactions_processor
self.db_accounts_table = "current_state"

def _parse_account_data(self, account_data: dict) -> dict:
return {
"id": account_data["id"],
"data": account_data["data"],
"updated_at": account_data["updated_at"].isoformat(),
}

def get_account(self, account_address: str):
"""Private method to retrieve if an account from the data base"""
condition = f"id = '{account_address}'"
account_data = self.db_client.get(self.db_accounts_table, condition)
return self._parse_account_data(account_data[0]) if account_data else None

def get_account_or_fail(self, account_address: str):
"""Private method to check if an account exists, and raise an error if not."""
account_data = self.get_account(account_address)
if not account_data:
raise AccountNotFoundError(
account_address, f"Account {account_address} does not exist."
)
return account_data

def create_new_account(self, address: str, balance: int) -> None:
account_state = {
"id": address,
"data": json.dumps({"balance": balance}),
}
self.db_client.insert(self.db_accounts_table, account_state)

def fund_account(self, account_address: str, amount: int):
# account creation or balance update
account_data = self.get_account(account_address)
if account_data:
# Account exists, update it
update_condition = f"id = {account_address}"
new_balance = account_data["data"]["balance"] + amount
updated_account_state = {
"data": json.dumps({"balance": new_balance}),
}

self.db_client.update(
self.db_accounts_table, updated_account_state, update_condition
)
else:
# Account doesn't exist, create it
self.create_new_account(account_address, amount)

# Record transaction
transaction_data = {
"from_address": "NULL",
"to_address": account_address,
"data": json.dumps({"action": "fund_account", "amount": amount}),
"value": amount,
"type": 0,
}
self.transactions_processor.insert_transaction(**transaction_data)
Loading
Loading