fix(data): serialize new-link publication races
This commit is contained in:
@@ -542,6 +542,95 @@ public sealed class PortfolioMigrationPostgresTests(PortfolioMigrationPostgresFi
|
||||
parameters: new NpgsqlParameter("sessionId", seed.SessionIds[0])));
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task ConcurrentNewLinkPublishAndFutureReschedule_ShouldNotCommitInvalidPublicCard()
|
||||
{
|
||||
var database = await fixture.CreateMigratedDatabaseAsync();
|
||||
await using var seedConnection = await database.OpenConnectionAsync();
|
||||
var seed = await SeedCardAsync(seedConnection, isPublic: false);
|
||||
var rescheduledSessionId = Guid.NewGuid();
|
||||
await ExecuteNonQueryAsync(
|
||||
seedConnection,
|
||||
"""
|
||||
INSERT INTO sessions (id, group_id, title, join_link, scheduled_at)
|
||||
VALUES (@sessionId, @groupId, 'Completed Session', 'https://example.test/session', now() - interval '1 day');
|
||||
""",
|
||||
parameters:
|
||||
[
|
||||
new NpgsqlParameter("sessionId", rescheduledSessionId),
|
||||
new NpgsqlParameter("groupId", seed.GroupId)
|
||||
]);
|
||||
|
||||
await using var rescheduleConnection = await database.OpenConnectionAsync();
|
||||
await using var publishConnection = await database.OpenConnectionAsync();
|
||||
await using var gateConnection = await database.OpenConnectionAsync();
|
||||
await using var observerConnection = await database.OpenConnectionAsync();
|
||||
await using var rescheduleTransaction = await rescheduleConnection.BeginTransactionAsync();
|
||||
await using var publishTransaction = await publishConnection.BeginTransactionAsync();
|
||||
await using var gateTransaction = await gateConnection.BeginTransactionAsync();
|
||||
var reschedulePid = await GetBackendPidAsync(rescheduleConnection, rescheduleTransaction);
|
||||
var publishPid = await GetBackendPidAsync(publishConnection, publishTransaction);
|
||||
var gatePid = await GetBackendPidAsync(gateConnection, gateTransaction);
|
||||
await AcquirePortfolioValidationLockAsync(gateConnection, gateTransaction);
|
||||
|
||||
Assert.Equal(1, await RescheduleSessionAsync(
|
||||
rescheduleConnection,
|
||||
rescheduleTransaction,
|
||||
rescheduledSessionId));
|
||||
var forceRescheduleTriggerTask = ExecuteNonQueryAsync(
|
||||
rescheduleConnection,
|
||||
"SET CONSTRAINTS trg_sessions_unpublish_public_portfolio_games_for_future_reschedule IMMEDIATE",
|
||||
rescheduleTransaction);
|
||||
await WaitUntilBlockedByAsync(observerConnection, reschedulePid, gatePid);
|
||||
|
||||
await ExecuteNonQueryAsync(
|
||||
publishConnection,
|
||||
"""
|
||||
INSERT INTO portfolio_game_sessions (portfolio_game_id, session_id)
|
||||
VALUES (@portfolioGameId, @sessionId);
|
||||
|
||||
UPDATE portfolio_games
|
||||
SET is_public = true,
|
||||
published_at = COALESCE(published_at, now()),
|
||||
updated_at = now()
|
||||
WHERE id = @portfolioGameId;
|
||||
""",
|
||||
publishTransaction,
|
||||
new NpgsqlParameter("portfolioGameId", seed.PortfolioGameId),
|
||||
new NpgsqlParameter("sessionId", rescheduledSessionId));
|
||||
var publishCommitTask = CommitAndCaptureSqlStateAsync(publishTransaction);
|
||||
await WaitUntilBlockedByAsync(observerConnection, publishPid, gatePid);
|
||||
|
||||
await gateTransaction.CommitAsync().WaitAsync(CommandTimeout);
|
||||
await forceRescheduleTriggerTask.WaitAsync(CommandTimeout);
|
||||
await rescheduleTransaction.CommitAsync().WaitAsync(CommandTimeout);
|
||||
|
||||
Assert.Equal(PostgresErrorCodes.CheckViolation, await publishCommitTask.WaitAsync(CommandTimeout));
|
||||
|
||||
await using var verificationConnection = await database.OpenConnectionAsync();
|
||||
Assert.False(await ExecuteScalarAsync<bool>(
|
||||
verificationConnection,
|
||||
"SELECT is_public FROM portfolio_games WHERE id = @portfolioGameId",
|
||||
parameters: new NpgsqlParameter("portfolioGameId", seed.PortfolioGameId)));
|
||||
Assert.True(await ExecuteScalarAsync<bool>(
|
||||
verificationConnection,
|
||||
"SELECT scheduled_at >= now() FROM sessions WHERE id = @sessionId",
|
||||
parameters: new NpgsqlParameter("sessionId", rescheduledSessionId)));
|
||||
Assert.Equal(0, await ExecuteScalarAsync<long>(
|
||||
verificationConnection,
|
||||
"""
|
||||
SELECT COUNT(*)
|
||||
FROM portfolio_game_sessions
|
||||
WHERE portfolio_game_id = @portfolioGameId
|
||||
AND session_id = @sessionId
|
||||
""",
|
||||
parameters:
|
||||
[
|
||||
new NpgsqlParameter("portfolioGameId", seed.PortfolioGameId),
|
||||
new NpgsqlParameter("sessionId", rescheduledSessionId)
|
||||
]));
|
||||
}
|
||||
|
||||
[Theory]
|
||||
[InlineData(true)]
|
||||
[InlineData(false)]
|
||||
|
||||
@@ -50,13 +50,11 @@ public sealed class PortfolioMigrationTests
|
||||
Assert.Contains("IF final_scheduled_at >= now() THEN", normalizedMigration, StringComparison.Ordinal);
|
||||
Assert.Contains("PERFORM pg.id FROM portfolio_games pg WHERE EXISTS", normalizedMigration, StringComparison.Ordinal);
|
||||
Assert.Contains("ORDER BY pg.id FOR UPDATE OF pg;", normalizedMigration, StringComparison.Ordinal);
|
||||
Assert.Contains("ORDER BY pg.id FOR UPDATE OF pg; PERFORM pg_advisory_xact_lock(20260530, 108); UPDATE portfolio_games pg SET is_public = false", normalizedMigration, StringComparison.Ordinal);
|
||||
Assert.Contains("UPDATE portfolio_games pg SET is_public = false, updated_at = now() WHERE pg.is_public = true AND EXISTS (SELECT 1 FROM portfolio_game_sessions pgs JOIN sessions s ON s.id = pgs.session_id WHERE pgs.portfolio_game_id = pg.id AND s.scheduled_at >= now());", normalizedMigration, StringComparison.Ordinal);
|
||||
Assert.Contains("CREATE CONSTRAINT TRIGGER trg_sessions_unpublish_public_portfolio_games_for_future_reschedule AFTER UPDATE OF scheduled_at ON sessions DEFERRABLE INITIALLY DEFERRED FOR EACH ROW EXECUTE FUNCTION unpublish_public_portfolio_games_for_future_session();", normalizedMigration, StringComparison.Ordinal);
|
||||
Assert.DoesNotContain("unpublish_portfolio_game_without_required_links", normalizedMigration, StringComparison.Ordinal);
|
||||
Assert.DoesNotContain(
|
||||
"pg_advisory_xact_lock",
|
||||
normalizedMigration[unpublishFunctionStart..unpublishFunctionEnd],
|
||||
StringComparison.Ordinal);
|
||||
Assert.Contains("pg_advisory_xact_lock", normalizedMigration[unpublishFunctionStart..unpublishFunctionEnd], StringComparison.Ordinal);
|
||||
Assert.DoesNotContain("published_at = NULL", normalizedMigration, StringComparison.OrdinalIgnoreCase);
|
||||
Assert.Contains("publication_consent_at TIMESTAMPTZ NOT NULL,", normalizedMigration, StringComparison.Ordinal);
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user