Skip to content

Commit

Permalink
fix(mysql): index gap lock deadlock
Browse files Browse the repository at this point in the history
  • Loading branch information
david-leifker committed Dec 13, 2024
1 parent bc4c7c6 commit cdf2dce
Show file tree
Hide file tree
Showing 15 changed files with 1,046 additions and 94 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -868,7 +868,12 @@ private List<UpdateAspectResult> ingestAspectsToLocalDB(

// Read before write is unfortunate, however batch it
final Map<String, Set<String>> urnAspects = batchWithDefaults.getUrnAspectsMap();

// read #1
// READ COMMITED is used in conjunction with SELECT FOR UPDATE (read lock) in order
// to ensure that the aspect's version is not modified outside the transaction.
// We rely on the retry mechanism if the row is modified and will re-read (require the
// lock)
Map<String, Map<String, EntityAspect>> databaseAspects =
aspectDao.getLatestAspects(urnAspects, true);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,10 @@

@Slf4j
public class EbeanAspectDao implements AspectDao, AspectMigrationsDao {

// READ COMMITED is used in conjunction with SELECT FOR UPDATE (read lock) in order
// to ensure that the aspect's version is not modified outside the transaction.
// We rely on the retry mechanism if the row is modified and will re-read (require the lock)
public static final TxIsolation TX_ISOLATION = TxIsolation.READ_COMMITED;
private final Database _server;
private boolean _connectionValidated = false;
private final Clock _clock = Clock.systemUTC();
Expand Down Expand Up @@ -736,8 +739,7 @@ public <T> T runInTransactionWithRetryUnlocked(
T result = null;
do {
try (Transaction transaction =
_server.beginTransaction(
TxScope.requiresNew().setIsolation(TxIsolation.REPEATABLE_READ))) {
_server.beginTransaction(TxScope.requiresNew().setIsolation(TX_ISOLATION))) {
transaction.setBatchMode(true);
result = block.apply(transactionContext.tx(transaction));
transaction.commit();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@
import io.ebean.Database;
import io.ebean.Transaction;
import io.ebean.TxScope;
import io.ebean.annotation.TxIsolation;
import java.net.URISyntaxException;
import java.sql.Timestamp;
import java.time.Instant;
Expand Down Expand Up @@ -281,12 +280,11 @@ public void testNestedTransactions() throws AssertionError {
Database server = _aspectDao.getServer();

try (Transaction transaction =
server.beginTransaction(TxScope.requiresNew().setIsolation(TxIsolation.REPEATABLE_READ))) {
server.beginTransaction(TxScope.requiresNew().setIsolation(TX_ISOLATION))) {
transaction.setBatchMode(true);
// Work 1
try (Transaction transaction2 =
server.beginTransaction(
TxScope.requiresNew().setIsolation(TxIsolation.REPEATABLE_READ))) {
server.beginTransaction(TxScope.requiresNew().setIsolation(TX_ISOLATION))) {
transaction2.setBatchMode(true);
// Work 2
transaction2.commit();
Expand Down Expand Up @@ -337,7 +335,7 @@ public void testSystemMetadataDuplicateKey() throws Exception {
try (Transaction transaction =
((EbeanAspectDao) _entityServiceImpl.aspectDao)
.getServer()
.beginTransaction(TxScope.requiresNew().setIsolation(TxIsolation.REPEATABLE_READ))) {
.beginTransaction(TxScope.requiresNew().setIsolation(TX_ISOLATION))) {
TransactionContext transactionContext = TransactionContext.empty(transaction, 3);
_entityServiceImpl.aspectDao.saveAspect(
transactionContext,
Expand Down
Empty file.
32 changes: 32 additions & 0 deletions smoke-test/tests/database/test_database.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
import logging

import pytest
from datahub.emitter.mce_builder import make_dataset_urn

from tests.utilities.concurrent_openapi import run_tests
from tests.utils import delete_urns, wait_for_writes_to_sync

logger = logging.getLogger(__name__)


generated_urns = [make_dataset_urn("test", f"database_test_{i}") for i in range(0, 100)]


@pytest.fixture(scope="module")
def ingest_cleanup_data(graph_client, request):
print("removing test data before")
delete_urns(graph_client, generated_urns)
wait_for_writes_to_sync()
yield
print("removing test data after")
delete_urns(graph_client, generated_urns)
wait_for_writes_to_sync()


def test_mysql_deadlock_gap_locking(auth_session, ingest_cleanup_data):
# This generates concurrent batches with interleaved urn ids
run_tests(
auth_session,
fixture_globs=["tests/database/v3/mysql_gap_deadlock/*.json"],
num_workers=8,
)
115 changes: 115 additions & 0 deletions smoke-test/tests/database/v3/mysql_gap_deadlock/batchA1.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
[{
"request": {
"url": "/openapi/v3/entity/dataset",
"params": {
"async": "false"
},
"description": "Create dataset batch, single transaction",
"json": [
{
"urn": "urn:li:dataset:(urn:li:dataPlatform:test,database_test_0,PROD)",
"status": {
"value": {
"removed": false
}
}
},
{
"urn": "urn:li:dataset:(urn:li:dataPlatform:test,database_test_4,PROD)",
"status": {
"value": {
"removed": false
}
}
},
{
"urn": "urn:li:dataset:(urn:li:dataPlatform:test,database_test_8,PROD)",
"status": {
"value": {
"removed": false
}
}
},
{
"urn": "urn:li:dataset:(urn:li:dataPlatform:test,database_test_12,PROD)",
"status": {
"value": {
"removed": false
}
}
},
{
"urn": "urn:li:dataset:(urn:li:dataPlatform:test,database_test_16,PROD)",
"status": {
"value": {
"removed": false
}
}
},
{
"urn": "urn:li:dataset:(urn:li:dataPlatform:test,database_test_20,PROD)",
"status": {
"value": {
"removed": false
}
}
},
{
"urn": "urn:li:dataset:(urn:li:dataPlatform:test,database_test_24,PROD)",
"status": {
"value": {
"removed": false
}
}
},
{
"urn": "urn:li:dataset:(urn:li:dataPlatform:test,database_test_28,PROD)",
"status": {
"value": {
"removed": false
}
}
},
{
"urn": "urn:li:dataset:(urn:li:dataPlatform:test,database_test_32,PROD)",
"status": {
"value": {
"removed": false
}
}
},
{
"urn": "urn:li:dataset:(urn:li:dataPlatform:test,database_test_36,PROD)",
"status": {
"value": {
"removed": false
}
}
},
{
"urn": "urn:li:dataset:(urn:li:dataPlatform:test,database_test_40,PROD)",
"status": {
"value": {
"removed": false
}
}
},
{
"urn": "urn:li:dataset:(urn:li:dataPlatform:test,database_test_44,PROD)",
"status": {
"value": {
"removed": false
}
}
},
{
"urn": "urn:li:dataset:(urn:li:dataPlatform:test,database_test_48,PROD)",
"status": {
"value": {
"removed": false
}
}
}
]
}
}]
107 changes: 107 additions & 0 deletions smoke-test/tests/database/v3/mysql_gap_deadlock/batchA2.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
[{
"request": {
"url": "/openapi/v3/entity/dataset",
"params": {
"async": "false"
},
"description": "Create dataset batch, single transaction",
"json": [
{
"urn": "urn:li:dataset:(urn:li:dataPlatform:test,database_test_52,PROD)",
"status": {
"value": {
"removed": false
}
}
},
{
"urn": "urn:li:dataset:(urn:li:dataPlatform:test,database_test_56,PROD)",
"status": {
"value": {
"removed": false
}
}
},
{
"urn": "urn:li:dataset:(urn:li:dataPlatform:test,database_test_60,PROD)",
"status": {
"value": {
"removed": false
}
}
},
{
"urn": "urn:li:dataset:(urn:li:dataPlatform:test,database_test_64,PROD)",
"status": {
"value": {
"removed": false
}
}
},
{
"urn": "urn:li:dataset:(urn:li:dataPlatform:test,database_test_68,PROD)",
"status": {
"value": {
"removed": false
}
}
},
{
"urn": "urn:li:dataset:(urn:li:dataPlatform:test,database_test_72,PROD)",
"status": {
"value": {
"removed": false
}
}
},
{
"urn": "urn:li:dataset:(urn:li:dataPlatform:test,database_test_76,PROD)",
"status": {
"value": {
"removed": false
}
}
},
{
"urn": "urn:li:dataset:(urn:li:dataPlatform:test,database_test_80,PROD)",
"status": {
"value": {
"removed": false
}
}
},
{
"urn": "urn:li:dataset:(urn:li:dataPlatform:test,database_test_84,PROD)",
"status": {
"value": {
"removed": false
}
}
},
{
"urn": "urn:li:dataset:(urn:li:dataPlatform:test,database_test_88,PROD)",
"status": {
"value": {
"removed": false
}
}
},
{
"urn": "urn:li:dataset:(urn:li:dataPlatform:test,database_test_92,PROD)",
"status": {
"value": {
"removed": false
}
}
},
{
"urn": "urn:li:dataset:(urn:li:dataPlatform:test,database_test_96,PROD)",
"status": {
"value": {
"removed": false
}
}
}
]
}
}]
Loading

0 comments on commit cdf2dce

Please sign in to comment.