fix(data): serialize portfolio mutations before rows

This commit is contained in:
2026-06-02 10:32:13 +03:00
parent edf40c9a09
commit a20da4b1a0
8 changed files with 379 additions and 238 deletions
@@ -96,51 +96,64 @@ public sealed class PortfolioMigrationPostgresTests(PortfolioMigrationPostgresFi
[Theory]
[InlineData(true)]
[InlineData(false)]
public async Task ConcurrentPublishAndLinkDelete_ShouldNotDeadlockOrCommitInvalidPublicCard(bool publishCommitsFirst)
public async Task ConcurrentPublishAndLinkDelete_ShouldSerializeBeforeRowsAndRejectInvalidPublicCard(
bool publishMutatesFirst)
{
var database = await fixture.CreateMigratedDatabaseAsync();
await using var publishConnection = await database.OpenConnectionAsync();
await using var deleteConnection = await database.OpenConnectionAsync();
await using var observerConnection = await database.OpenConnectionAsync();
var seed = await SeedCardAsync(publishConnection, isPublic: false);
await using var publishTransaction = await publishConnection.BeginTransactionAsync();
await using var deleteTransaction = await deleteConnection.BeginTransactionAsync();
var publishPid = await GetBackendPidAsync(publishConnection, publishTransaction);
var deletePid = await GetBackendPidAsync(deleteConnection, deleteTransaction);
await ExecuteNonQueryAsync(
publishConnection,
"""
UPDATE portfolio_games
SET is_public = true,
published_at = COALESCE(published_at, now()),
updated_at = now()
WHERE id = @portfolioGameId
""",
publishTransaction,
new NpgsqlParameter("portfolioGameId", seed.PortfolioGameId));
await ExecuteNonQueryAsync(deleteConnection, "SET LOCAL lock_timeout = '2s'", deleteTransaction);
if (publishMutatesFirst)
{
Assert.Equal(1, await PublishPortfolioGameAsync(
publishConnection,
publishTransaction,
seed.PortfolioGameId));
var deleteTask = DeletePortfolioGameLinksAsync(
deleteConnection,
deleteTransaction,
"portfolio_game_sessions",
seed.PortfolioGameId);
Assert.Equal(1, await ExecuteNonQueryAsync(
deleteConnection,
"DELETE FROM portfolio_game_sessions WHERE portfolio_game_id = @portfolioGameId",
deleteTransaction,
new NpgsqlParameter("portfolioGameId", seed.PortfolioGameId)));
await WaitUntilBlockedByAsync(observerConnection, deletePid, publishPid);
Assert.Null(await CommitAndCaptureSqlStateAsync(publishTransaction).WaitAsync(CommandTimeout));
Assert.Equal(1, await deleteTask.WaitAsync(CommandTimeout));
Assert.Equal(
PostgresErrorCodes.CheckViolation,
await CommitAndCaptureSqlStateAsync(deleteTransaction).WaitAsync(CommandTimeout));
}
else
{
Assert.Equal(1, await DeletePortfolioGameLinksAsync(
deleteConnection,
deleteTransaction,
"portfolio_game_sessions",
seed.PortfolioGameId));
var publishTask = PublishPortfolioGameAsync(
publishConnection,
publishTransaction,
seed.PortfolioGameId);
await AcquirePortfolioValidationLockAsync(
publishCommitsFirst ? publishConnection : deleteConnection,
publishCommitsFirst ? publishTransaction : deleteTransaction);
var commitStates = await Task.WhenAll(
CommitAndCaptureSqlStateAsync(publishTransaction),
CommitAndCaptureSqlStateAsync(deleteTransaction)).WaitAsync(CommandTimeout);
Assert.Equal(publishCommitsFirst ? null : PostgresErrorCodes.CheckViolation, commitStates[0]);
Assert.Equal(publishCommitsFirst ? PostgresErrorCodes.CheckViolation : null, commitStates[1]);
await WaitUntilBlockedByAsync(observerConnection, publishPid, deletePid);
Assert.Null(await CommitAndCaptureSqlStateAsync(deleteTransaction).WaitAsync(CommandTimeout));
Assert.Equal(1, await publishTask.WaitAsync(CommandTimeout));
Assert.Equal(
PostgresErrorCodes.CheckViolation,
await CommitAndCaptureSqlStateAsync(publishTransaction).WaitAsync(CommandTimeout));
}
await using var verificationConnection = await database.OpenConnectionAsync();
Assert.Equal(publishCommitsFirst, await ExecuteScalarAsync<bool>(
Assert.Equal(publishMutatesFirst, await ExecuteScalarAsync<bool>(
verificationConnection,
"SELECT is_public FROM portfolio_games WHERE id = @portfolioGameId",
parameters: new NpgsqlParameter("portfolioGameId", seed.PortfolioGameId)));
Assert.Equal(publishCommitsFirst ? 1L : 0L, await ExecuteScalarAsync<long>(
Assert.Equal(publishMutatesFirst ? 1L : 0L, await ExecuteScalarAsync<long>(
verificationConnection,
"SELECT COUNT(*) FROM portfolio_game_sessions WHERE portfolio_game_id = @portfolioGameId",
parameters: new NpgsqlParameter("portfolioGameId", seed.PortfolioGameId)));
@@ -162,8 +175,11 @@ public sealed class PortfolioMigrationPostgresTests(PortfolioMigrationPostgresFi
masterCount: linkTable == "portfolio_game_masters" ? 2 : 1);
await using var firstConnection = await database.OpenConnectionAsync();
await using var secondConnection = await database.OpenConnectionAsync();
await using var observerConnection = await database.OpenConnectionAsync();
await using var firstTransaction = await firstConnection.BeginTransactionAsync();
await using var secondTransaction = await secondConnection.BeginTransactionAsync();
var firstPid = await GetBackendPidAsync(firstConnection, firstTransaction);
var secondPid = await GetBackendPidAsync(secondConnection, secondTransaction);
var linkIds = linkTable == "portfolio_game_sessions" ? seed.SessionIds : seed.MasterIds;
await ExecuteNonQueryAsync(
@@ -172,19 +188,19 @@ public sealed class PortfolioMigrationPostgresTests(PortfolioMigrationPostgresFi
firstTransaction,
new NpgsqlParameter("portfolioGameId", seed.PortfolioGameId),
new NpgsqlParameter("linkId", linkIds[0]));
await ExecuteNonQueryAsync(
var secondDeleteTask = ExecuteNonQueryAsync(
secondConnection,
$"DELETE FROM {linkTable} WHERE portfolio_game_id = @portfolioGameId AND {linkColumn} = @linkId",
secondTransaction,
new NpgsqlParameter("portfolioGameId", seed.PortfolioGameId),
new NpgsqlParameter("linkId", linkIds[1]));
var commitStates = await Task.WhenAll(
CommitAndCaptureSqlStateAsync(firstTransaction),
CommitAndCaptureSqlStateAsync(secondTransaction));
Assert.Single(commitStates, state => state is null);
Assert.Single(commitStates, state => state == PostgresErrorCodes.CheckViolation);
await WaitUntilBlockedByAsync(observerConnection, secondPid, firstPid);
Assert.Null(await CommitAndCaptureSqlStateAsync(firstTransaction).WaitAsync(CommandTimeout));
Assert.Equal(1, await secondDeleteTask.WaitAsync(CommandTimeout));
Assert.Equal(
PostgresErrorCodes.CheckViolation,
await CommitAndCaptureSqlStateAsync(secondTransaction).WaitAsync(CommandTimeout));
await using var verificationConnection = await database.OpenConnectionAsync();
Assert.True(await ExecuteScalarAsync<bool>(
@@ -213,8 +229,11 @@ public sealed class PortfolioMigrationPostgresTests(PortfolioMigrationPostgresFi
masterCount: linkTable == "portfolio_game_masters" ? 2 : 1);
await using var firstConnection = await database.OpenConnectionAsync();
await using var secondConnection = await database.OpenConnectionAsync();
await using var observerConnection = await database.OpenConnectionAsync();
await using var firstTransaction = await firstConnection.BeginTransactionAsync(IsolationLevel.RepeatableRead);
await using var secondTransaction = await secondConnection.BeginTransactionAsync(IsolationLevel.RepeatableRead);
var firstPid = await GetBackendPidAsync(firstConnection, firstTransaction);
var secondPid = await GetBackendPidAsync(secondConnection, secondTransaction);
var linkIds = linkTable == "portfolio_game_sessions" ? seed.SessionIds : seed.MasterIds;
await ExecuteNonQueryAsync(
@@ -223,18 +242,21 @@ public sealed class PortfolioMigrationPostgresTests(PortfolioMigrationPostgresFi
firstTransaction,
new NpgsqlParameter("portfolioGameId", seed.PortfolioGameId),
new NpgsqlParameter("linkId", linkIds[0]));
await ExecuteNonQueryAsync(
var secondDeleteTask = ExecuteNonQueryAsync(
secondConnection,
$"DELETE FROM {linkTable} WHERE portfolio_game_id = @portfolioGameId AND {linkColumn} = @linkId",
secondTransaction,
new NpgsqlParameter("portfolioGameId", seed.PortfolioGameId),
new NpgsqlParameter("linkId", linkIds[1]));
var commitStates = await Task.WhenAll(
CommitAndCaptureSqlStateAsync(firstTransaction),
CommitAndCaptureSqlStateAsync(secondTransaction));
Assert.All(commitStates, state => Assert.Equal(PostgresErrorCodes.FeatureNotSupported, state));
await WaitUntilBlockedByAsync(observerConnection, secondPid, firstPid);
Assert.Equal(
PostgresErrorCodes.FeatureNotSupported,
await CommitAndCaptureSqlStateAsync(firstTransaction).WaitAsync(CommandTimeout));
Assert.Equal(1, await secondDeleteTask.WaitAsync(CommandTimeout));
Assert.Equal(
PostgresErrorCodes.FeatureNotSupported,
await CommitAndCaptureSqlStateAsync(secondTransaction).WaitAsync(CommandTimeout));
await using var verificationConnection = await database.OpenConnectionAsync();
Assert.True(await ExecuteScalarAsync<bool>(
@@ -273,43 +295,57 @@ public sealed class PortfolioMigrationPostgresTests(PortfolioMigrationPostgresFi
[InlineData(true)]
[InlineData(false)]
public async Task RepeatableReadDraftLinkDeleteRacingPublish_ShouldBeRejectedWithoutInvalidPublicCard(
bool publishCommitsFirst)
bool deleteMutatesFirst)
{
var database = await fixture.CreateMigratedDatabaseAsync();
await using var seedConnection = await database.OpenConnectionAsync();
var seed = await SeedCardAsync(seedConnection, isPublic: false);
await using var deleteConnection = await database.OpenConnectionAsync();
await using var publishConnection = await database.OpenConnectionAsync();
await using var observerConnection = await database.OpenConnectionAsync();
await using var deleteTransaction = await deleteConnection.BeginTransactionAsync(IsolationLevel.RepeatableRead);
await using var publishTransaction = await publishConnection.BeginTransactionAsync();
var deletePid = await GetBackendPidAsync(deleteConnection, deleteTransaction);
var publishPid = await GetBackendPidAsync(publishConnection, publishTransaction);
await ExecuteNonQueryAsync(
deleteConnection,
"DELETE FROM portfolio_game_sessions WHERE portfolio_game_id = @portfolioGameId",
deleteTransaction,
new NpgsqlParameter("portfolioGameId", seed.PortfolioGameId));
await ExecuteNonQueryAsync(
publishConnection,
"""
UPDATE portfolio_games
SET is_public = true,
published_at = COALESCE(published_at, now()),
updated_at = now()
WHERE id = @portfolioGameId
""",
publishTransaction,
new NpgsqlParameter("portfolioGameId", seed.PortfolioGameId));
if (deleteMutatesFirst)
{
Assert.Equal(1, await DeletePortfolioGameLinksAsync(
deleteConnection,
deleteTransaction,
"portfolio_game_sessions",
seed.PortfolioGameId));
var publishTask = PublishPortfolioGameAsync(
publishConnection,
publishTransaction,
seed.PortfolioGameId);
await AcquirePortfolioValidationLockAsync(
publishCommitsFirst ? publishConnection : deleteConnection,
publishCommitsFirst ? publishTransaction : deleteTransaction);
await WaitUntilBlockedByAsync(observerConnection, publishPid, deletePid);
Assert.Equal(
PostgresErrorCodes.FeatureNotSupported,
await CommitAndCaptureSqlStateAsync(deleteTransaction).WaitAsync(CommandTimeout));
Assert.Equal(1, await publishTask.WaitAsync(CommandTimeout));
Assert.Null(await CommitAndCaptureSqlStateAsync(publishTransaction).WaitAsync(CommandTimeout));
}
else
{
Assert.Equal(1, await PublishPortfolioGameAsync(
publishConnection,
publishTransaction,
seed.PortfolioGameId));
var deleteTask = DeletePortfolioGameLinksAsync(
deleteConnection,
deleteTransaction,
"portfolio_game_sessions",
seed.PortfolioGameId);
var commitStates = await Task.WhenAll(
CommitAndCaptureSqlStateAsync(deleteTransaction),
CommitAndCaptureSqlStateAsync(publishTransaction)).WaitAsync(CommandTimeout);
Assert.Equal(PostgresErrorCodes.FeatureNotSupported, commitStates[0]);
Assert.Null(commitStates[1]);
await WaitUntilBlockedByAsync(observerConnection, deletePid, publishPid);
Assert.Null(await CommitAndCaptureSqlStateAsync(publishTransaction).WaitAsync(CommandTimeout));
Assert.Equal(1, await deleteTask.WaitAsync(CommandTimeout));
Assert.Equal(
PostgresErrorCodes.FeatureNotSupported,
await CommitAndCaptureSqlStateAsync(deleteTransaction).WaitAsync(CommandTimeout));
}
await using var verificationConnection = await database.OpenConnectionAsync();
Assert.True(await ExecuteScalarAsync<bool>(
@@ -376,74 +412,36 @@ public sealed class PortfolioMigrationPostgresTests(PortfolioMigrationPostgresFi
}
[Fact]
public async Task ConcurrentBatchFutureReschedules_ShouldLockPublicCardsInStableOrderWithoutDeadlock()
public async Task ConcurrentBatchFutureReschedules_ShouldSerializeBeforeSessionRowsWithoutDeadlock()
{
var database = await fixture.CreateMigratedDatabaseAsync();
await using var seedConnection = await database.OpenConnectionAsync();
var firstSeed = await SeedCardAsync(seedConnection, isPublic: true, sessionCount: 2);
var secondSeed = await SeedCardAsync(seedConnection, isPublic: true, sessionCount: 2);
await ExecuteNonQueryAsync(
seedConnection,
"""
CREATE FUNCTION wait_for_portfolio_card_unpublish_gate()
RETURNS TRIGGER
LANGUAGE plpgsql
AS $$
BEGIN
PERFORM pg_advisory_xact_lock(20260601, 108);
RETURN NULL;
END;
$$;
CREATE TRIGGER trg_wait_for_portfolio_card_unpublish_gate
AFTER UPDATE OF is_public ON portfolio_games
FOR EACH ROW
WHEN (OLD.is_public = true AND NEW.is_public = false)
EXECUTE FUNCTION wait_for_portfolio_card_unpublish_gate();
""");
await using var firstConnection = await database.OpenConnectionAsync();
await using var secondConnection = await database.OpenConnectionAsync();
await using var gateConnection = await database.OpenConnectionAsync();
await using var observerConnection = await database.OpenConnectionAsync();
await using var firstTransaction = await firstConnection.BeginTransactionAsync();
await using var secondTransaction = await secondConnection.BeginTransactionAsync();
await using var gateTransaction = await gateConnection.BeginTransactionAsync();
var firstPid = await GetBackendPidAsync(firstConnection, firstTransaction);
var secondPid = await GetBackendPidAsync(secondConnection, secondTransaction);
var gatePid = await GetBackendPidAsync(gateConnection, gateTransaction);
await AcquireBatchRescheduleGateAsync(gateConnection, gateTransaction);
await RescheduleSessionsAsync(
firstConnection,
firstTransaction,
firstSeed.SessionIds[0],
secondSeed.SessionIds[0]);
await RescheduleSessionsAsync(
var secondRescheduleTask = RescheduleSessionsAsync(
secondConnection,
secondTransaction,
secondSeed.SessionIds[1],
firstSeed.SessionIds[1]);
var firstCommitTask = CommitAndCaptureSqlStateAsync(firstTransaction);
var secondCommitTask = CommitAndCaptureSqlStateAsync(secondTransaction);
var gateBlockedPid = await WaitUntilEitherBlockedByAsync(
observerConnection,
firstPid,
secondPid,
gatePid);
await WaitUntilBlockedByAnyAsync(
observerConnection,
gateBlockedPid == firstPid ? secondPid : firstPid,
gatePid,
gateBlockedPid);
await gateTransaction.CommitAsync().WaitAsync(CommandTimeout);
var commitStates = await Task.WhenAll(firstCommitTask, secondCommitTask).WaitAsync(CommandTimeout);
Assert.All(commitStates, Assert.Null);
await WaitUntilBlockedByAsync(observerConnection, secondPid, firstPid);
Assert.Null(await CommitAndCaptureSqlStateAsync(firstTransaction).WaitAsync(CommandTimeout));
Assert.Equal(2, await secondRescheduleTask.WaitAsync(CommandTimeout));
Assert.Null(await CommitAndCaptureSqlStateAsync(secondTransaction).WaitAsync(CommandTimeout));
await using var verificationConnection = await database.OpenConnectionAsync();
Assert.Equal(0, await ExecuteScalarAsync<long>(
@@ -504,31 +502,19 @@ public sealed class PortfolioMigrationPostgresTests(PortfolioMigrationPostgresFi
var publishPid = await GetBackendPidAsync(publishConnection, publishTransaction);
var reschedulePid = await GetBackendPidAsync(rescheduleConnection, rescheduleTransaction);
await ExecuteNonQueryAsync(
Assert.Equal(1, await PublishPortfolioGameAsync(
publishConnection,
"""
UPDATE portfolio_games
SET is_public = true,
published_at = COALESCE(published_at, now()),
updated_at = now()
WHERE id = @portfolioGameId
""",
publishTransaction,
new NpgsqlParameter("portfolioGameId", seed.PortfolioGameId));
await ExecuteNonQueryAsync(
seed.PortfolioGameId));
var rescheduleTask = ExecuteNonQueryAsync(
rescheduleConnection,
"UPDATE sessions SET scheduled_at = now() + interval '1 day' WHERE id = @sessionId",
rescheduleTransaction,
new NpgsqlParameter("sessionId", seed.SessionIds[0]));
var forceRescheduleTriggerTask = ExecuteNonQueryAsync(
rescheduleConnection,
"SET CONSTRAINTS trg_sessions_unpublish_public_portfolio_games_for_future_reschedule IMMEDIATE",
rescheduleTransaction);
await WaitUntilBlockedByAsync(observerConnection, reschedulePid, publishPid);
Assert.Null(await CommitAndCaptureSqlStateAsync(publishTransaction).WaitAsync(CommandTimeout));
await forceRescheduleTriggerTask.WaitAsync(CommandTimeout);
Assert.Equal(1, await rescheduleTask.WaitAsync(CommandTimeout));
await rescheduleTransaction.CommitAsync().WaitAsync(CommandTimeout);
await using var verificationConnection = await database.OpenConnectionAsync();
@@ -563,15 +549,11 @@ public sealed class PortfolioMigrationPostgresTests(PortfolioMigrationPostgresFi
await using var rescheduleConnection = await database.OpenConnectionAsync();
await using var publishConnection = await database.OpenConnectionAsync();
await using var gateConnection = await database.OpenConnectionAsync();
await using var observerConnection = await database.OpenConnectionAsync();
await using var rescheduleTransaction = await rescheduleConnection.BeginTransactionAsync();
await using var publishTransaction = await publishConnection.BeginTransactionAsync();
await using var gateTransaction = await gateConnection.BeginTransactionAsync();
var reschedulePid = await GetBackendPidAsync(rescheduleConnection, rescheduleTransaction);
var publishPid = await GetBackendPidAsync(publishConnection, publishTransaction);
var gatePid = await GetBackendPidAsync(gateConnection, gateTransaction);
await AcquirePortfolioValidationLockAsync(gateConnection, gateTransaction);
Assert.Equal(1, await RescheduleSessionAsync(
rescheduleConnection,
@@ -581,9 +563,9 @@ public sealed class PortfolioMigrationPostgresTests(PortfolioMigrationPostgresFi
rescheduleConnection,
"SET CONSTRAINTS trg_sessions_unpublish_public_portfolio_games_for_future_reschedule IMMEDIATE",
rescheduleTransaction);
await WaitUntilBlockedByAsync(observerConnection, reschedulePid, gatePid);
await forceRescheduleTriggerTask.WaitAsync(CommandTimeout);
await ExecuteNonQueryAsync(
var publishMutationTask = ExecuteNonQueryAsync(
publishConnection,
"""
INSERT INTO portfolio_game_sessions (portfolio_game_id, session_id)
@@ -598,14 +580,13 @@ public sealed class PortfolioMigrationPostgresTests(PortfolioMigrationPostgresFi
publishTransaction,
new NpgsqlParameter("portfolioGameId", seed.PortfolioGameId),
new NpgsqlParameter("sessionId", rescheduledSessionId));
var publishCommitTask = CommitAndCaptureSqlStateAsync(publishTransaction);
await WaitUntilBlockedByAsync(observerConnection, publishPid, gatePid);
await gateTransaction.CommitAsync().WaitAsync(CommandTimeout);
await forceRescheduleTriggerTask.WaitAsync(CommandTimeout);
await WaitUntilBlockedByAsync(observerConnection, publishPid, reschedulePid);
await rescheduleTransaction.CommitAsync().WaitAsync(CommandTimeout);
await publishMutationTask.WaitAsync(CommandTimeout);
Assert.Equal(PostgresErrorCodes.CheckViolation, await publishCommitTask.WaitAsync(CommandTimeout));
Assert.Equal(
PostgresErrorCodes.CheckViolation,
await CommitAndCaptureSqlStateAsync(publishTransaction).WaitAsync(CommandTimeout));
await using var verificationConnection = await database.OpenConnectionAsync();
Assert.False(await ExecuteScalarAsync<bool>(
@@ -631,6 +612,78 @@ public sealed class PortfolioMigrationPostgresTests(PortfolioMigrationPostgresFi
]));
}
[Fact]
public async Task PortfolioSessionLinkInsert_ShouldAcquirePublicationLockBeforeRows()
{
var database = await fixture.CreateMigratedDatabaseAsync();
await using var seedConnection = await database.OpenConnectionAsync();
var seed = await SeedCardAsync(seedConnection, isPublic: false);
var sessionId = Guid.NewGuid();
await ExecuteNonQueryAsync(
seedConnection,
"""
INSERT INTO sessions (id, group_id, title, join_link, scheduled_at)
VALUES (@sessionId, @groupId, 'Completed Session', 'https://example.test/session', now() - interval '1 day');
""",
parameters:
[
new NpgsqlParameter("sessionId", sessionId),
new NpgsqlParameter("groupId", seed.GroupId)
]);
await using var insertConnection = await database.OpenConnectionAsync();
await using var gateConnection = await database.OpenConnectionAsync();
await using var observerConnection = await database.OpenConnectionAsync();
await using var insertTransaction = await insertConnection.BeginTransactionAsync();
await using var gateTransaction = await gateConnection.BeginTransactionAsync();
var insertPid = await GetBackendPidAsync(insertConnection, insertTransaction);
var gatePid = await GetBackendPidAsync(gateConnection, gateTransaction);
await AcquirePortfolioValidationLockAsync(gateConnection, gateTransaction);
var insertTask = ExecuteNonQueryAsync(
insertConnection,
"""
INSERT INTO portfolio_game_sessions (portfolio_game_id, session_id)
VALUES (@portfolioGameId, @sessionId);
""",
insertTransaction,
new NpgsqlParameter("portfolioGameId", seed.PortfolioGameId),
new NpgsqlParameter("sessionId", sessionId));
await WaitUntilBlockedByAsync(observerConnection, insertPid, gatePid);
await gateTransaction.CommitAsync().WaitAsync(CommandTimeout);
Assert.Equal(1, await insertTask.WaitAsync(CommandTimeout));
await insertTransaction.RollbackAsync().WaitAsync(CommandTimeout);
}
[Fact]
public async Task FutureReschedule_ShouldAcquirePublicationLockBeforeSessionRows()
{
var database = await fixture.CreateMigratedDatabaseAsync();
await using var seedConnection = await database.OpenConnectionAsync();
var seed = await SeedCardAsync(seedConnection, isPublic: true);
await using var rescheduleConnection = await database.OpenConnectionAsync();
await using var gateConnection = await database.OpenConnectionAsync();
await using var observerConnection = await database.OpenConnectionAsync();
await using var rescheduleTransaction = await rescheduleConnection.BeginTransactionAsync();
await using var gateTransaction = await gateConnection.BeginTransactionAsync();
var reschedulePid = await GetBackendPidAsync(rescheduleConnection, rescheduleTransaction);
var gatePid = await GetBackendPidAsync(gateConnection, gateTransaction);
await AcquirePortfolioValidationLockAsync(gateConnection, gateTransaction);
var rescheduleTask = RescheduleSessionAsync(
rescheduleConnection,
rescheduleTransaction,
seed.SessionIds[0]);
await WaitUntilBlockedByAsync(observerConnection, reschedulePid, gatePid);
await gateTransaction.CommitAsync().WaitAsync(CommandTimeout);
Assert.Equal(1, await rescheduleTask.WaitAsync(CommandTimeout));
await rescheduleTransaction.RollbackAsync().WaitAsync(CommandTimeout);
}
[Fact]
public async Task RepeatableReadStaleSnapshotFutureReschedule_ShouldBeRejectedWithoutInvalidPublicCard()
{
@@ -709,8 +762,8 @@ public sealed class PortfolioMigrationPostgresTests(PortfolioMigrationPostgresFi
[Theory]
[InlineData(true)]
[InlineData(false)]
public async Task ConcurrentSessionDeleteAndFutureReschedule_ShouldSerializeSessionBeforeCardWithoutDeadlock(
bool deleteLocksSessionFirst)
public async Task ConcurrentSessionDeleteAndFutureReschedule_ShouldSerializeMutationGateBeforeRowsWithoutDeadlock(
bool deleteMutatesFirst)
{
var database = await fixture.CreateMigratedDatabaseAsync();
await using var seedConnection = await database.OpenConnectionAsync();
@@ -723,8 +776,9 @@ public sealed class PortfolioMigrationPostgresTests(PortfolioMigrationPostgresFi
var deletePid = await GetBackendPidAsync(deleteConnection, deleteTransaction);
var reschedulePid = await GetBackendPidAsync(rescheduleConnection, rescheduleTransaction);
if (deleteLocksSessionFirst)
if (deleteMutatesFirst)
{
await AcquirePortfolioValidationLockAsync(deleteConnection, deleteTransaction);
await LockSessionAsync(deleteConnection, deleteTransaction, seed.SessionIds[0]);
var rescheduleTask = RescheduleSessionAsync(
rescheduleConnection,
@@ -967,16 +1021,6 @@ public sealed class PortfolioMigrationPostgresTests(PortfolioMigrationPostgresFi
transaction);
}
private static Task<int> AcquireBatchRescheduleGateAsync(
NpgsqlConnection connection,
NpgsqlTransaction transaction)
{
return ExecuteNonQueryAsync(
connection,
"SELECT pg_advisory_xact_lock(20260601, 108)",
transaction);
}
private static Task<int> GetBackendPidAsync(
NpgsqlConnection connection,
NpgsqlTransaction transaction)
@@ -996,6 +1040,37 @@ public sealed class PortfolioMigrationPostgresTests(PortfolioMigrationPostgresFi
new NpgsqlParameter("sessionId", sessionId));
}
private static Task<int> PublishPortfolioGameAsync(
NpgsqlConnection connection,
NpgsqlTransaction transaction,
Guid portfolioGameId)
{
return ExecuteNonQueryAsync(
connection,
"""
UPDATE portfolio_games
SET is_public = true,
published_at = COALESCE(published_at, now()),
updated_at = now()
WHERE id = @portfolioGameId
""",
transaction,
new NpgsqlParameter("portfolioGameId", portfolioGameId));
}
private static Task<int> DeletePortfolioGameLinksAsync(
NpgsqlConnection connection,
NpgsqlTransaction transaction,
string linkTable,
Guid portfolioGameId)
{
return ExecuteNonQueryAsync(
connection,
$"DELETE FROM {linkTable} WHERE portfolio_game_id = @portfolioGameId",
transaction,
new NpgsqlParameter("portfolioGameId", portfolioGameId));
}
private static Task<int> RescheduleSessionAsync(
NpgsqlConnection connection,
NpgsqlTransaction transaction,
@@ -1036,6 +1111,7 @@ public sealed class PortfolioMigrationPostgresTests(PortfolioMigrationPostgresFi
Guid portfolioGameId,
Guid sessionId)
{
await AcquirePortfolioValidationLockAsync(connection, transaction);
await LockSessionAsync(connection, transaction, sessionId);
await UnpublishAndDeleteSessionAsync(connection, transaction, portfolioGameId, sessionId);
await transaction.CommitAsync().WaitAsync(CommandTimeout);
@@ -1091,74 +1167,6 @@ public sealed class PortfolioMigrationPostgresTests(PortfolioMigrationPostgresFi
$"PostgreSQL backend {blockedPid} was not blocked by backend {blockingPid} within {CommandTimeout}.");
}
private static async Task<int> WaitUntilEitherBlockedByAsync(
NpgsqlConnection observerConnection,
int firstBlockedPid,
int secondBlockedPid,
int blockingPid)
{
using var timeout = new CancellationTokenSource(CommandTimeout);
while (!timeout.IsCancellationRequested)
{
var blockedPid = await ExecuteScalarAsync<int>(
observerConnection,
"""
SELECT CASE
WHEN @blockingPid = ANY (pg_blocking_pids(@firstBlockedPid)) THEN @firstBlockedPid
WHEN @blockingPid = ANY (pg_blocking_pids(@secondBlockedPid)) THEN @secondBlockedPid
ELSE 0
END
""",
parameters:
[
new NpgsqlParameter("firstBlockedPid", firstBlockedPid),
new NpgsqlParameter("secondBlockedPid", secondBlockedPid),
new NpgsqlParameter("blockingPid", blockingPid)
]);
if (blockedPid != 0)
{
return blockedPid;
}
await Task.Yield();
}
throw new TimeoutException(
$"Neither PostgreSQL backend {firstBlockedPid} nor {secondBlockedPid} was blocked by backend {blockingPid} within {CommandTimeout}.");
}
private static async Task WaitUntilBlockedByAnyAsync(
NpgsqlConnection observerConnection,
int blockedPid,
int firstBlockingPid,
int secondBlockingPid)
{
using var timeout = new CancellationTokenSource(CommandTimeout);
while (!timeout.IsCancellationRequested)
{
if (await ExecuteScalarAsync<bool>(
observerConnection,
"""
SELECT @firstBlockingPid = ANY (pg_blocking_pids(@blockedPid))
OR @secondBlockingPid = ANY (pg_blocking_pids(@blockedPid))
""",
parameters:
[
new NpgsqlParameter("blockedPid", blockedPid),
new NpgsqlParameter("firstBlockingPid", firstBlockingPid),
new NpgsqlParameter("secondBlockingPid", secondBlockingPid)
]))
{
return;
}
await Task.Yield();
}
throw new TimeoutException(
$"PostgreSQL backend {blockedPid} was not blocked by backend {firstBlockingPid} or {secondBlockingPid} within {CommandTimeout}.");
}
private static async Task<int> ExecuteNonQueryAsync(
NpgsqlConnection connection,
string sql,