fix(data): serialize portfolio future reschedules

This commit is contained in:
2026-06-01 20:58:53 +03:00
parent a28b75dd5b
commit d762ecc377
7 changed files with 299 additions and 22 deletions
@@ -344,6 +344,123 @@ public sealed class PortfolioMigrationPostgresTests(PortfolioMigrationPostgresFi
parameters: new NpgsqlParameter("portfolioGameId", seed.PortfolioGameId)));
}
[Fact]
public async Task PublishedCardPastFuturePastReschedule_ShouldRemainPublicAndPreserveFirstPublishedAt()
{
var database = await fixture.CreateMigratedDatabaseAsync();
await using var connection = await database.OpenConnectionAsync();
var seed = await SeedCardAsync(connection, isPublic: true);
await using var transaction = await connection.BeginTransactionAsync();
await ExecuteNonQueryAsync(
connection,
"UPDATE sessions SET scheduled_at = now() + interval '1 day' WHERE id = @sessionId",
transaction,
new NpgsqlParameter("sessionId", seed.SessionIds[0]));
await ExecuteNonQueryAsync(
connection,
"UPDATE sessions SET scheduled_at = now() - interval '2 days' WHERE id = @sessionId",
transaction,
new NpgsqlParameter("sessionId", seed.SessionIds[0]));
await transaction.CommitAsync().WaitAsync(CommandTimeout);
Assert.True(await ExecuteScalarAsync<bool>(
connection,
"SELECT is_public FROM portfolio_games WHERE id = @portfolioGameId",
parameters: new NpgsqlParameter("portfolioGameId", seed.PortfolioGameId)));
Assert.Equal(seed.PublishedAt, await ExecuteScalarAsync<DateTime>(
connection,
"SELECT published_at FROM portfolio_games WHERE id = @portfolioGameId",
parameters: new NpgsqlParameter("portfolioGameId", seed.PortfolioGameId)));
}
[Fact]
public async Task ConcurrentBatchFutureReschedules_ShouldLockPublicCardsInStableOrderWithoutDeadlock()
{
var database = await fixture.CreateMigratedDatabaseAsync();
await using var seedConnection = await database.OpenConnectionAsync();
var firstSeed = await SeedCardAsync(seedConnection, isPublic: true, sessionCount: 2);
var secondSeed = await SeedCardAsync(seedConnection, isPublic: true, sessionCount: 2);
await ExecuteNonQueryAsync(
seedConnection,
"""
CREATE FUNCTION wait_for_portfolio_card_unpublish_gate()
RETURNS TRIGGER
LANGUAGE plpgsql
AS $$
BEGIN
PERFORM pg_advisory_xact_lock(20260601, 108);
RETURN NULL;
END;
$$;
CREATE TRIGGER trg_wait_for_portfolio_card_unpublish_gate
AFTER UPDATE OF is_public ON portfolio_games
FOR EACH ROW
WHEN (OLD.is_public = true AND NEW.is_public = false)
EXECUTE FUNCTION wait_for_portfolio_card_unpublish_gate();
""");
await using var firstConnection = await database.OpenConnectionAsync();
await using var secondConnection = await database.OpenConnectionAsync();
await using var gateConnection = await database.OpenConnectionAsync();
await using var observerConnection = await database.OpenConnectionAsync();
await using var firstTransaction = await firstConnection.BeginTransactionAsync();
await using var secondTransaction = await secondConnection.BeginTransactionAsync();
await using var gateTransaction = await gateConnection.BeginTransactionAsync();
var firstPid = await GetBackendPidAsync(firstConnection, firstTransaction);
var secondPid = await GetBackendPidAsync(secondConnection, secondTransaction);
var gatePid = await GetBackendPidAsync(gateConnection, gateTransaction);
await AcquireBatchRescheduleGateAsync(gateConnection, gateTransaction);
await RescheduleSessionsAsync(
firstConnection,
firstTransaction,
firstSeed.SessionIds[0],
secondSeed.SessionIds[0]);
await RescheduleSessionsAsync(
secondConnection,
secondTransaction,
secondSeed.SessionIds[1],
firstSeed.SessionIds[1]);
var firstCommitTask = CommitAndCaptureSqlStateAsync(firstTransaction);
var secondCommitTask = CommitAndCaptureSqlStateAsync(secondTransaction);
var gateBlockedPid = await WaitUntilEitherBlockedByAsync(
observerConnection,
firstPid,
secondPid,
gatePid);
await WaitUntilBlockedByAnyAsync(
observerConnection,
gateBlockedPid == firstPid ? secondPid : firstPid,
gatePid,
gateBlockedPid);
await gateTransaction.CommitAsync().WaitAsync(CommandTimeout);
var commitStates = await Task.WhenAll(firstCommitTask, secondCommitTask).WaitAsync(CommandTimeout);
Assert.All(commitStates, Assert.Null);
await using var verificationConnection = await database.OpenConnectionAsync();
Assert.Equal(0, await ExecuteScalarAsync<long>(
verificationConnection,
"""
SELECT COUNT(*)
FROM portfolio_games
WHERE id IN (@firstPortfolioGameId, @secondPortfolioGameId)
AND is_public = true
""",
parameters:
[
new NpgsqlParameter("firstPortfolioGameId", firstSeed.PortfolioGameId),
new NpgsqlParameter("secondPortfolioGameId", secondSeed.PortfolioGameId)
]));
}
[Fact]
public async Task PublishingDraftCardWithAnyFutureLinkedSession_ShouldFailCommit()
{
@@ -682,6 +799,16 @@ public sealed class PortfolioMigrationPostgresTests(PortfolioMigrationPostgresFi
transaction);
}
private static Task<int> AcquireBatchRescheduleGateAsync(
NpgsqlConnection connection,
NpgsqlTransaction transaction)
{
return ExecuteNonQueryAsync(
connection,
"SELECT pg_advisory_xact_lock(20260601, 108)",
transaction);
}
private static Task<int> GetBackendPidAsync(
NpgsqlConnection connection,
NpgsqlTransaction transaction)
@@ -713,6 +840,28 @@ public sealed class PortfolioMigrationPostgresTests(PortfolioMigrationPostgresFi
new NpgsqlParameter("sessionId", sessionId));
}
private static Task<int> RescheduleSessionsAsync(
NpgsqlConnection connection,
NpgsqlTransaction transaction,
Guid firstSessionId,
Guid secondSessionId)
{
return ExecuteNonQueryAsync(
connection,
"""
UPDATE sessions
SET scheduled_at = now() + interval '1 day'
WHERE id = @firstSessionId;
UPDATE sessions
SET scheduled_at = now() + interval '1 day'
WHERE id = @secondSessionId;
""",
transaction,
new NpgsqlParameter("firstSessionId", firstSessionId),
new NpgsqlParameter("secondSessionId", secondSessionId));
}
private static async Task LockUnpublishDeleteAndCommitSessionAsync(
NpgsqlConnection connection,
NpgsqlTransaction transaction,
@@ -774,6 +923,74 @@ public sealed class PortfolioMigrationPostgresTests(PortfolioMigrationPostgresFi
$"PostgreSQL backend {blockedPid} was not blocked by backend {blockingPid} within {CommandTimeout}.");
}
private static async Task<int> WaitUntilEitherBlockedByAsync(
NpgsqlConnection observerConnection,
int firstBlockedPid,
int secondBlockedPid,
int blockingPid)
{
using var timeout = new CancellationTokenSource(CommandTimeout);
while (!timeout.IsCancellationRequested)
{
var blockedPid = await ExecuteScalarAsync<int>(
observerConnection,
"""
SELECT CASE
WHEN @blockingPid = ANY (pg_blocking_pids(@firstBlockedPid)) THEN @firstBlockedPid
WHEN @blockingPid = ANY (pg_blocking_pids(@secondBlockedPid)) THEN @secondBlockedPid
ELSE 0
END
""",
parameters:
[
new NpgsqlParameter("firstBlockedPid", firstBlockedPid),
new NpgsqlParameter("secondBlockedPid", secondBlockedPid),
new NpgsqlParameter("blockingPid", blockingPid)
]);
if (blockedPid != 0)
{
return blockedPid;
}
await Task.Yield();
}
throw new TimeoutException(
$"Neither PostgreSQL backend {firstBlockedPid} nor {secondBlockedPid} was blocked by backend {blockingPid} within {CommandTimeout}.");
}
private static async Task WaitUntilBlockedByAnyAsync(
NpgsqlConnection observerConnection,
int blockedPid,
int firstBlockingPid,
int secondBlockingPid)
{
using var timeout = new CancellationTokenSource(CommandTimeout);
while (!timeout.IsCancellationRequested)
{
if (await ExecuteScalarAsync<bool>(
observerConnection,
"""
SELECT @firstBlockingPid = ANY (pg_blocking_pids(@blockedPid))
OR @secondBlockingPid = ANY (pg_blocking_pids(@blockedPid))
""",
parameters:
[
new NpgsqlParameter("blockedPid", blockedPid),
new NpgsqlParameter("firstBlockingPid", firstBlockingPid),
new NpgsqlParameter("secondBlockingPid", secondBlockingPid)
]))
{
return;
}
await Task.Yield();
}
throw new TimeoutException(
$"PostgreSQL backend {blockedPid} was not blocked by backend {firstBlockingPid} or {secondBlockingPid} within {CommandTimeout}.");
}
private static async Task<int> ExecuteNonQueryAsync(
NpgsqlConnection connection,
string sql,
@@ -7,6 +7,12 @@ public sealed class PortfolioMigrationTests
{
var migration = await ReadRepositoryFileAsync("src/GmRelay.Bot/Migrations/V029__add_completed_game_portfolios_and_reviews.sql");
var normalizedMigration = NormalizeSql(migration);
var unpublishFunctionStart = normalizedMigration.IndexOf(
"CREATE FUNCTION unpublish_public_portfolio_games_for_future_session()",
StringComparison.Ordinal);
var unpublishFunctionEnd = normalizedMigration.IndexOf(
"CREATE CONSTRAINT TRIGGER trg_sessions_unpublish_public_portfolio_games_for_future_reschedule",
StringComparison.Ordinal);
Assert.Contains("CREATE TABLE portfolio_games", migration, StringComparison.Ordinal);
Assert.Contains("CREATE TABLE portfolio_game_sessions", migration, StringComparison.Ordinal);
@@ -40,11 +46,16 @@ public sealed class PortfolioMigrationTests
Assert.Contains("CREATE CONSTRAINT TRIGGER trg_portfolio_game_sessions_validate_required_links AFTER INSERT OR DELETE OR UPDATE OF portfolio_game_id, session_id ON portfolio_game_sessions DEFERRABLE INITIALLY DEFERRED FOR EACH ROW EXECUTE FUNCTION validate_public_portfolio_game_required_links();", normalizedMigration, StringComparison.Ordinal);
Assert.Contains("CREATE CONSTRAINT TRIGGER trg_portfolio_game_masters_validate_required_links AFTER DELETE OR UPDATE OF portfolio_game_id ON portfolio_game_masters DEFERRABLE INITIALLY DEFERRED FOR EACH ROW EXECUTE FUNCTION validate_public_portfolio_game_required_links();", normalizedMigration, StringComparison.Ordinal);
Assert.Contains("CREATE FUNCTION unpublish_public_portfolio_games_for_future_session() RETURNS TRIGGER LANGUAGE plpgsql", normalizedMigration, StringComparison.Ordinal);
Assert.Contains("OLD.scheduled_at IS DISTINCT FROM NEW.scheduled_at AND NEW.scheduled_at >= now()", normalizedMigration, StringComparison.Ordinal);
Assert.Contains("UPDATE portfolio_games pg SET is_public = false, updated_at = now() FROM portfolio_game_sessions pgs WHERE pgs.portfolio_game_id = pg.id AND pgs.session_id = NEW.id AND pg.is_public = true;", normalizedMigration, StringComparison.Ordinal);
Assert.Contains("SELECT s.scheduled_at INTO final_scheduled_at FROM sessions s WHERE s.id = NEW.id;", normalizedMigration, StringComparison.Ordinal);
Assert.Contains("IF final_scheduled_at >= now() THEN", normalizedMigration, StringComparison.Ordinal);
Assert.Contains("ORDER BY pg.id FOR UPDATE OF pg;", normalizedMigration, StringComparison.Ordinal);
Assert.Contains("UPDATE portfolio_games pg SET is_public = false, updated_at = now() WHERE pg.is_public = true AND EXISTS (SELECT 1 FROM portfolio_game_sessions pgs JOIN sessions s ON s.id = pgs.session_id WHERE pgs.portfolio_game_id = pg.id AND s.scheduled_at >= now());", normalizedMigration, StringComparison.Ordinal);
Assert.Contains("CREATE CONSTRAINT TRIGGER trg_sessions_unpublish_public_portfolio_games_for_future_reschedule AFTER UPDATE OF scheduled_at ON sessions DEFERRABLE INITIALLY DEFERRED FOR EACH ROW EXECUTE FUNCTION unpublish_public_portfolio_games_for_future_session();", normalizedMigration, StringComparison.Ordinal);
Assert.DoesNotContain("unpublish_portfolio_game_without_required_links", normalizedMigration, StringComparison.Ordinal);
Assert.DoesNotContain("FOR UPDATE", normalizedMigration, StringComparison.Ordinal);
Assert.DoesNotContain(
"pg_advisory_xact_lock",
normalizedMigration[unpublishFunctionStart..unpublishFunctionEnd],
StringComparison.Ordinal);
Assert.DoesNotContain("published_at = NULL", normalizedMigration, StringComparison.OrdinalIgnoreCase);
Assert.Contains("publication_consent_at TIMESTAMPTZ NOT NULL,", normalizedMigration, StringComparison.Ordinal);
}
@@ -17,7 +17,7 @@ public sealed class PortfolioSchemaGateSourceTests
var appHost = NormalizeSource(await ReadRepositoryFileAsync("src/GmRelay.AppHost/Program.cs"));
Assert.Contains(
"var bot = builder.AddProject<Projects.GmRelay_Bot>(\"bot\") .WithReference(postgres) .WaitFor(postgres) .WithHttpEndpoint(port: 8081, targetPort: 8081, name: \"health\") .WithHttpHealthCheck(\"/health\", endpointName: \"health\");",
"var bot = builder.AddProject<Projects.GmRelay_Bot>(\"bot\") .WithReference(postgres) .WaitFor(postgres) .WithHttpEndpoint(port: 8081, targetPort: 8081, name: \"health\", isProxied: false) .WithHttpHealthCheck(\"/health\", endpointName: \"health\");",
appHost,
StringComparison.Ordinal);
Assert.Contains(
@@ -30,6 +30,14 @@ public sealed class PortfolioSchemaGateSourceTests
StringComparison.Ordinal);
}
[Fact]
public async Task Aspire_ShouldUseApplicationDatabaseConnectionStringName()
{
var appHost = NormalizeSource(await ReadRepositoryFileAsync("src/GmRelay.AppHost/Program.cs"));
Assert.Contains(".AddDatabase(\"gmrelaydb\");", appHost, StringComparison.Ordinal);
}
private static void AssertServiceDependsOnHealthyBot(string compose, string serviceName)
{
var serviceBlock = GetServiceBlock(compose, serviceName);