Skip to content

Commit

Permalink
Merge pull request pantos-io#22 from levonyak/PAN-1979-1980-secondary…
Browse files Browse the repository at this point in the history
…-node-validator-nonce-updates-and-transfer-submission-task

[PAN-1979/-1980] Secondary node validator nonce updates and transfer submission task
  • Loading branch information
markuslevonyak authored Jul 1, 2024
2 parents 1bac8ce + 4a82d0f commit e125f88
Show file tree
Hide file tree
Showing 11 changed files with 270 additions and 52 deletions.
76 changes: 75 additions & 1 deletion pantos/validatornode/business/transfers.py
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,9 @@ def detect_new_transfers(self, source_blockchain: Blockchain) -> None:
if internal_transfer_id is None:
_logger.info('new token transfer',
extra=vars(found_transfer))
# Secondary nodes also assign a validator nonce
# since they are supposed to be able to assume the
# primary role anytime after reconfiguration
validator_nonce = self.__find_unused_validator_nonce(
found_transfer.destination_blockchain)
transfer_creation_request = TransferCreationRequest(
Expand Down Expand Up @@ -259,6 +262,33 @@ def get_validator_nonce(self, source_blockchain: Blockchain,
source_blockchain=source_blockchain,
source_transaction_id=source_transaction_id)

def submit_transfer_to_primary_node(self, internal_transfer_id: int,
transfer: CrossChainTransfer) -> bool:
"""Submit the signature for a cross-chain token transfer after
its successful validation to the primary validator node.
Parameters
----------
internal_transfer_id : int
The unique internal ID of the transfer.
transfer : CrossChainTransfer
The data of the cross-chain token transfer to submit.
Returns
-------
bool
True if the submission is completed for the transfer.
Raises
------
TransferInteractorError
If an error occurs during submitting the signature for a
cross-chain token transfer.
"""
assert not self._is_primary_node() # pragma: no cover
raise NotImplementedError # pragma: no cover

def submit_transfer_onchain(self, internal_transfer_id: int,
transfer: CrossChainTransfer) -> bool:
"""Submit a cross-chain token transfer after its successful
Expand Down Expand Up @@ -406,7 +436,8 @@ def validate_transfer(self, internal_transfer_id: int,
submit_transfer_onchain_task.delay(internal_transfer_id,
transfer.to_dict())
else:
raise NotImplementedError # pragma: no cover
submit_transfer_to_primary_node_task.delay(
internal_transfer_id, transfer.to_dict())
return True
except TransferInteractor.__TransferValidationError as error:
return error.is_permanent()
Expand Down Expand Up @@ -763,6 +794,49 @@ def confirm_transfer_task(self, internal_transfer_id: int,
return True


@celery_app.task(bind=True, max_retries=None)
def submit_transfer_to_primary_node_task(
self, internal_transfer_id: int,
transfer_dict: CrossChainTransferDict) -> bool:
"""Celery task for submitting the signature for a cross-chain token
transfer after its successful validation to the primary validator
node.
Parameters
----------
internal_transfer_id : int
The unique internal ID of the transfer.
transfer_dict : CrossChainTransferDict
The data of the cross-chain token transfer to submit.
Returns
-------
bool
True if the task is executed without error.
"""
transfer = CrossChainTransfer.from_dict(transfer_dict)
try:
submission_completed = \
TransferInteractor().submit_transfer_to_primary_node(
internal_transfer_id, transfer)
except Exception as error:
_logger.error(
'unable to submit the signature for a token transfer to the '
'primary validator node', extra=vars(transfer) | {
'internal_transfer_id': internal_transfer_id,
'task_id': self.request.id
}, exc_info=True)
retry_interval = config['tasks']['submit_transfer_to_primary_node'][
'retry_interval_after_error_in_seconds']
raise self.retry(countdown=retry_interval, exc=error)
if not submission_completed:
retry_interval = config['tasks']['submit_transfer_to_primary_node'][
'retry_interval_in_seconds']
raise self.retry(countdown=retry_interval)
return True


@celery_app.task(bind=True, max_retries=None)
def submit_transfer_onchain_task(
self, internal_transfer_id: int,
Expand Down
2 changes: 1 addition & 1 deletion pantos/validatornode/configuration.py
Original file line number Diff line number Diff line change
Expand Up @@ -275,8 +275,8 @@
'required': True,
'schema': {
'confirm_transfer': _VALIDATION_SCHEMA_TASK,
'submit_transfer_offchain': _VALIDATION_SCHEMA_TASK,
'submit_transfer_onchain': _VALIDATION_SCHEMA_TASK,
'submit_transfer_to_primary_node': _VALIDATION_SCHEMA_TASK,
'validate_transfer': _VALIDATION_SCHEMA_TASK
}
},
Expand Down
38 changes: 29 additions & 9 deletions pantos/validatornode/database/access.py
Original file line number Diff line number Diff line change
Expand Up @@ -536,7 +536,7 @@ def update_reversal_transfer(
destination_blockchain_id=destination_blockchain.value,
recipient_address=recipient_address,
destination_token_contract_id=destination_token_contract_id,
updated=datetime.datetime.utcnow())
updated=datetime.datetime.now(datetime.timezone.utc))
session.execute(statement)


Expand Down Expand Up @@ -564,7 +564,7 @@ def update_transfer_confirmed_destination_transaction(
destination_transfer_id=destination_transfer_id,
destination_transaction_id=destination_transaction_id,
destination_block_number=destination_block_number,
updated=datetime.datetime.utcnow())
updated=datetime.datetime.now(datetime.timezone.utc))
with get_session_maker().begin() as session:
session.execute(statement)

Expand Down Expand Up @@ -608,7 +608,7 @@ def update_transfer_submitted_destination_transaction(
destination_hub_contract_id=destination_hub_contract_id,
destination_forwarder_contract_id= # noqa: E251
destination_forwarder_contract_id,
updated=datetime.datetime.utcnow())
updated=datetime.datetime.now(datetime.timezone.utc))
session.execute(update_statement)


Expand Down Expand Up @@ -730,8 +730,8 @@ def update_transfer_source_transaction(internal_transfer_id: int,
source_transfer_id)
transfer.source_block_number = typing.cast(sqlalchemy.Column,
source_block_number)
transfer.updated = typing.cast(sqlalchemy.Column,
datetime.datetime.utcnow())
transfer.updated = typing.cast(
sqlalchemy.Column, datetime.datetime.now(datetime.timezone.utc))


def update_transfer_status(internal_transfer_id: int,
Expand All @@ -750,8 +750,8 @@ def update_transfer_status(internal_transfer_id: int,
transfer = session.get(Transfer, internal_transfer_id)
assert transfer is not None
transfer.status_id = typing.cast(sqlalchemy.Column, status.value)
transfer.updated = typing.cast(sqlalchemy.Column,
datetime.datetime.utcnow())
transfer.updated = typing.cast(
sqlalchemy.Column, datetime.datetime.now(datetime.timezone.utc))


def update_transfer_task_id(internal_transfer_id: int,
Expand All @@ -770,8 +770,28 @@ def update_transfer_task_id(internal_transfer_id: int,
transfer = session.get(Transfer, internal_transfer_id)
assert transfer is not None
transfer.task_id = typing.cast(sqlalchemy.Column, str(task_id))
transfer.updated = typing.cast(sqlalchemy.Column,
datetime.datetime.utcnow())
transfer.updated = typing.cast(
sqlalchemy.Column, datetime.datetime.now(datetime.timezone.utc))


def update_transfer_validator_nonce(internal_transfer_id: int,
validator_nonce: int) -> None:
"""Update a transfer's validator nonce.
Parameters
----------
internal_transfer_id : int
The unique internal ID of the transfer.
validator_nonce : int
The new validator nonce assigned to the transfer.
"""
statement = sqlalchemy.update(Transfer).where(
Transfer.id == internal_transfer_id).values(
validator_nonce=validator_nonce,
updated=datetime.datetime.now(datetime.timezone.utc))
with get_session_maker().begin() as session:
session.execute(statement)


def _read_id(session: sqlalchemy.orm.Session, model: typing.Type[B],
Expand Down
58 changes: 58 additions & 0 deletions tests/business/transfers/test_submit_transfer_to_primary_node.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
import unittest.mock

import celery.exceptions # type: ignore
import pytest

from pantos.validatornode.business.transfers import TransferInteractorError
from pantos.validatornode.business.transfers import \
submit_transfer_to_primary_node_task


@pytest.mark.parametrize('submission_completed', [True, False])
@unittest.mock.patch(
'pantos.validatornode.business.transfers.config', {
'tasks': {
'submit_transfer_to_primary_node': {
'retry_interval_in_seconds': 120
}
}
})
@unittest.mock.patch(
'pantos.validatornode.business.transfers.TransferInteractor')
def test_submit_transfer_to_primary_node_task_correct(
mock_transfer_interactor, submission_completed, internal_transfer_id,
cross_chain_transfer, cross_chain_transfer_dict):
mock_transfer_interactor().submit_transfer_to_primary_node.return_value = \
submission_completed
if submission_completed:
submit_transfer_to_primary_node_task(internal_transfer_id,
cross_chain_transfer_dict)
else:
with pytest.raises(celery.exceptions.Retry):
submit_transfer_to_primary_node_task(internal_transfer_id,
cross_chain_transfer_dict)
mock_transfer_interactor().submit_transfer_to_primary_node.\
assert_called_once_with(internal_transfer_id, cross_chain_transfer)


@unittest.mock.patch(
'pantos.validatornode.business.transfers.config', {
'tasks': {
'submit_transfer_to_primary_node': {
'retry_interval_after_error_in_seconds': 300
}
}
})
@unittest.mock.patch(
'pantos.validatornode.business.transfers.TransferInteractor')
def test_submit_transfer_to_primary_node_task_error(mock_transfer_interactor,
internal_transfer_id,
cross_chain_transfer,
cross_chain_transfer_dict):
mock_transfer_interactor().submit_transfer_to_primary_node.side_effect = \
TransferInteractorError('')
with pytest.raises(TransferInteractorError):
submit_transfer_to_primary_node_task(internal_transfer_id,
cross_chain_transfer_dict)
mock_transfer_interactor().submit_transfer_to_primary_node.\
assert_called_once_with(internal_transfer_id, cross_chain_transfer)
Loading

0 comments on commit e125f88

Please sign in to comment.