fix(data): align portfolio mutation lock order
This commit is contained in:
@@ -421,6 +421,70 @@ public sealed class PortfolioMigrationPostgresTests(PortfolioMigrationPostgresFi
|
||||
parameters: new NpgsqlParameter("sessionId", seed.SessionIds[0])));
|
||||
}
|
||||
|
||||
[Theory]
|
||||
[InlineData(true)]
|
||||
[InlineData(false)]
|
||||
public async Task ConcurrentSessionDeleteAndFutureReschedule_ShouldSerializeSessionBeforeCardWithoutDeadlock(
|
||||
bool deleteLocksSessionFirst)
|
||||
{
|
||||
var database = await fixture.CreateMigratedDatabaseAsync();
|
||||
await using var seedConnection = await database.OpenConnectionAsync();
|
||||
var seed = await SeedCardAsync(seedConnection, isPublic: true);
|
||||
await using var deleteConnection = await database.OpenConnectionAsync();
|
||||
await using var rescheduleConnection = await database.OpenConnectionAsync();
|
||||
await using var observerConnection = await database.OpenConnectionAsync();
|
||||
await using var deleteTransaction = await deleteConnection.BeginTransactionAsync();
|
||||
await using var rescheduleTransaction = await rescheduleConnection.BeginTransactionAsync();
|
||||
var deletePid = await GetBackendPidAsync(deleteConnection, deleteTransaction);
|
||||
var reschedulePid = await GetBackendPidAsync(rescheduleConnection, rescheduleTransaction);
|
||||
|
||||
if (deleteLocksSessionFirst)
|
||||
{
|
||||
await LockSessionAsync(deleteConnection, deleteTransaction, seed.SessionIds[0]);
|
||||
var rescheduleTask = RescheduleSessionAsync(
|
||||
rescheduleConnection,
|
||||
rescheduleTransaction,
|
||||
seed.SessionIds[0]);
|
||||
|
||||
await WaitUntilBlockedByAsync(observerConnection, reschedulePid, deletePid);
|
||||
await UnpublishAndDeleteSessionAsync(
|
||||
deleteConnection,
|
||||
deleteTransaction,
|
||||
seed.PortfolioGameId,
|
||||
seed.SessionIds[0]);
|
||||
await deleteTransaction.CommitAsync().WaitAsync(CommandTimeout);
|
||||
|
||||
Assert.Equal(0, await rescheduleTask.WaitAsync(CommandTimeout));
|
||||
await rescheduleTransaction.CommitAsync().WaitAsync(CommandTimeout);
|
||||
}
|
||||
else
|
||||
{
|
||||
Assert.Equal(1, await RescheduleSessionAsync(
|
||||
rescheduleConnection,
|
||||
rescheduleTransaction,
|
||||
seed.SessionIds[0]));
|
||||
var deleteTask = LockUnpublishDeleteAndCommitSessionAsync(
|
||||
deleteConnection,
|
||||
deleteTransaction,
|
||||
seed.PortfolioGameId,
|
||||
seed.SessionIds[0]);
|
||||
|
||||
await WaitUntilBlockedByAsync(observerConnection, deletePid, reschedulePid);
|
||||
await rescheduleTransaction.CommitAsync().WaitAsync(CommandTimeout);
|
||||
await deleteTask.WaitAsync(CommandTimeout);
|
||||
}
|
||||
|
||||
await using var verificationConnection = await database.OpenConnectionAsync();
|
||||
Assert.False(await ExecuteScalarAsync<bool>(
|
||||
verificationConnection,
|
||||
"SELECT is_public FROM portfolio_games WHERE id = @portfolioGameId",
|
||||
parameters: new NpgsqlParameter("portfolioGameId", seed.PortfolioGameId)));
|
||||
Assert.Equal(0, await ExecuteScalarAsync<long>(
|
||||
verificationConnection,
|
||||
"SELECT COUNT(*) FROM sessions WHERE id = @sessionId",
|
||||
parameters: new NpgsqlParameter("sessionId", seed.SessionIds[0])));
|
||||
}
|
||||
|
||||
[Theory]
|
||||
[InlineData("portfolio_game_sessions")]
|
||||
[InlineData("portfolio_game_masters")]
|
||||
@@ -618,6 +682,98 @@ public sealed class PortfolioMigrationPostgresTests(PortfolioMigrationPostgresFi
|
||||
transaction);
|
||||
}
|
||||
|
||||
private static Task<int> GetBackendPidAsync(
|
||||
NpgsqlConnection connection,
|
||||
NpgsqlTransaction transaction)
|
||||
{
|
||||
return ExecuteScalarAsync<int>(connection, "SELECT pg_backend_pid()", transaction);
|
||||
}
|
||||
|
||||
private static Task<int> LockSessionAsync(
|
||||
NpgsqlConnection connection,
|
||||
NpgsqlTransaction transaction,
|
||||
Guid sessionId)
|
||||
{
|
||||
return ExecuteNonQueryAsync(
|
||||
connection,
|
||||
"SELECT 1 FROM sessions s WHERE s.id = @sessionId FOR UPDATE OF s",
|
||||
transaction,
|
||||
new NpgsqlParameter("sessionId", sessionId));
|
||||
}
|
||||
|
||||
private static Task<int> RescheduleSessionAsync(
|
||||
NpgsqlConnection connection,
|
||||
NpgsqlTransaction transaction,
|
||||
Guid sessionId)
|
||||
{
|
||||
return ExecuteNonQueryAsync(
|
||||
connection,
|
||||
"UPDATE sessions SET scheduled_at = now() + interval '1 day' WHERE id = @sessionId",
|
||||
transaction,
|
||||
new NpgsqlParameter("sessionId", sessionId));
|
||||
}
|
||||
|
||||
private static async Task LockUnpublishDeleteAndCommitSessionAsync(
|
||||
NpgsqlConnection connection,
|
||||
NpgsqlTransaction transaction,
|
||||
Guid portfolioGameId,
|
||||
Guid sessionId)
|
||||
{
|
||||
await LockSessionAsync(connection, transaction, sessionId);
|
||||
await UnpublishAndDeleteSessionAsync(connection, transaction, portfolioGameId, sessionId);
|
||||
await transaction.CommitAsync().WaitAsync(CommandTimeout);
|
||||
}
|
||||
|
||||
private static async Task UnpublishAndDeleteSessionAsync(
|
||||
NpgsqlConnection connection,
|
||||
NpgsqlTransaction transaction,
|
||||
Guid portfolioGameId,
|
||||
Guid sessionId)
|
||||
{
|
||||
await ExecuteNonQueryAsync(
|
||||
connection,
|
||||
"""
|
||||
UPDATE portfolio_games
|
||||
SET is_public = false,
|
||||
updated_at = now()
|
||||
WHERE id = @portfolioGameId
|
||||
""",
|
||||
transaction,
|
||||
new NpgsqlParameter("portfolioGameId", portfolioGameId));
|
||||
await ExecuteNonQueryAsync(
|
||||
connection,
|
||||
"DELETE FROM sessions WHERE id = @sessionId",
|
||||
transaction,
|
||||
new NpgsqlParameter("sessionId", sessionId));
|
||||
}
|
||||
|
||||
private static async Task WaitUntilBlockedByAsync(
|
||||
NpgsqlConnection observerConnection,
|
||||
int blockedPid,
|
||||
int blockingPid)
|
||||
{
|
||||
using var timeout = new CancellationTokenSource(CommandTimeout);
|
||||
while (!timeout.IsCancellationRequested)
|
||||
{
|
||||
if (await ExecuteScalarAsync<bool>(
|
||||
observerConnection,
|
||||
"SELECT @blockingPid = ANY (pg_blocking_pids(@blockedPid))",
|
||||
parameters:
|
||||
[
|
||||
new NpgsqlParameter("blockedPid", blockedPid),
|
||||
new NpgsqlParameter("blockingPid", blockingPid)
|
||||
]))
|
||||
{
|
||||
return;
|
||||
}
|
||||
|
||||
await Task.Yield();
|
||||
}
|
||||
|
||||
throw new TimeoutException(
|
||||
$"PostgreSQL backend {blockedPid} was not blocked by backend {blockingPid} within {CommandTimeout}.");
|
||||
}
|
||||
|
||||
private static async Task<int> ExecuteNonQueryAsync(
|
||||
NpgsqlConnection connection,
|
||||
string sql,
|
||||
|
||||
Reference in New Issue
Block a user