Skip to content

Commit

Permalink
fix same rowkey error in batch
Browse files Browse the repository at this point in the history
  • Loading branch information
cammj committed May 17, 2024
1 parent 1219e59 commit f040a3b
Showing 1 changed file with 30 additions and 1 deletion.
31 changes: 30 additions & 1 deletion Sync.cs
Original file line number Diff line number Diff line change
Expand Up @@ -167,14 +167,36 @@ await BatchQueueProcess(tableActionQueue_Users,
private static async Task BatchQueueProcess(ConcurrentQueue<TableTransactionAction> Queue, TableClient tableClient, CancellationToken ct)
{
List<TableTransactionAction> BatchTransactions = new List<TableTransactionAction>();

// Used to re-queue transactions that cannot be put in this batch
// Such as transactions with a row key that is already present in the batch (cannot perform within the same batch)

List<TableTransactionAction> RequeueTransactions = new List<TableTransactionAction>();

// Take items out of the queue until it's empty or the max batch size hit
while (!Queue.IsEmpty && BatchTransactions.Count < _maxTableBatchSize)
{
TableTransactionAction dequeued;

if (Queue.TryDequeue(out dequeued))
BatchTransactions.Add(dequeued);
{
// Validate row key is not already in batch transactions
// Batches cannot contain two transactions for the same partition key and row.

if (BatchTransactions.Any(x =>
x.Entity.PartitionKey == dequeued.Entity.PartitionKey &&
x.Entity.RowKey == dequeued.Entity.RowKey))
{
// Requeue the transaction for next batch as it is already existing in this batch
RequeueTransactions.Add(dequeued);
}
else
{
BatchTransactions.Add(dequeued);
}

}

}

if (BatchTransactions.Any())
Expand All @@ -200,6 +222,13 @@ private static async Task BatchQueueProcess(ConcurrentQueue<TableTransactionActi
}
}
}

// Requeue transactions
if (RequeueTransactions.Any())
{
foreach(var transaction in RequeueTransactions)
Queue.Enqueue(transaction);
}
}

/// <summary>
Expand Down

0 comments on commit f040a3b

Please sign in to comment.