feat(platform): route scheduler notifications through platform messenger
PR Checks / test-and-build (pull_request) Successful in 7m9s

This commit is contained in:
2026-05-21 12:30:35 +03:00
parent 5dbec1a0a4
commit 2a707e4825
49 changed files with 2158 additions and 846 deletions
@@ -0,0 +1,109 @@
using Dapper;
using GmRelay.Shared.Domain;
using Npgsql;
namespace GmRelay.Shared.Infrastructure.Scheduling;
public interface ISessionTriggerStore
{
Task<IReadOnlyList<Guid>> GetSessionsNeedingConfirmationAsync(DateTimeOffset now, CancellationToken ct);
Task<IReadOnlyList<Guid>> GetSessionsNeedingOneHourReminderAsync(DateTimeOffset now, CancellationToken ct);
Task<IReadOnlyList<Guid>> GetSessionsNeedingJoinLinkAsync(DateTimeOffset now, CancellationToken ct);
}
public sealed class DbSessionTriggerStore(
NpgsqlDataSource dataSource,
PlatformSchedulerOptions options) : ISessionTriggerStore
{
private static readonly TimeSpan ConfirmationLeadTime = TimeSpan.FromHours(24);
private static readonly TimeSpan OneHourReminderLeadTime = TimeSpan.FromHours(1);
private static readonly TimeSpan JoinLinkLeadTime = TimeSpan.FromMinutes(5);
public async Task<IReadOnlyList<Guid>> GetSessionsNeedingConfirmationAsync(DateTimeOffset now, CancellationToken ct)
{
await using var connection = await dataSource.OpenConnectionAsync(ct);
var results = await connection.QueryAsync<Guid>(
"""
SELECT s.id
FROM sessions s
JOIN game_groups g ON g.id = s.group_id
WHERE g.platform = @Platform
AND s.status = @Planned
AND s.scheduled_at - @LeadTime <= @Now
AND s.confirmation_sent_at IS NULL
""",
new
{
Platform = options.Platform.ToString(),
Planned = SessionStatus.Planned,
LeadTime = ConfirmationLeadTime,
Now = now.UtcDateTime
});
return results.ToList();
}
public async Task<IReadOnlyList<Guid>> GetSessionsNeedingOneHourReminderAsync(DateTimeOffset now, CancellationToken ct)
{
await using var connection = await dataSource.OpenConnectionAsync(ct);
var results = await connection.QueryAsync<Guid>(
"""
SELECT s.id
FROM sessions s
JOIN game_groups g ON g.id = s.group_id
WHERE g.platform = @Platform
AND s.status IN (@Confirmed, @ConfirmationSent)
AND s.scheduled_at - @LeadTime <= @Now
AND s.one_hour_reminder_processed_at IS NULL
""",
new
{
Platform = options.Platform.ToString(),
Confirmed = SessionStatus.Confirmed,
ConfirmationSent = SessionStatus.ConfirmationSent,
LeadTime = OneHourReminderLeadTime,
Now = now.UtcDateTime
});
return results.ToList();
}
public async Task<IReadOnlyList<Guid>> GetSessionsNeedingJoinLinkAsync(DateTimeOffset now, CancellationToken ct)
{
await using var connection = await dataSource.OpenConnectionAsync(ct);
var results = await connection.QueryAsync<Guid>(
"""
SELECT s.id
FROM sessions s
JOIN game_groups g ON g.id = s.group_id
WHERE g.platform = @Platform
AND s.status = @Confirmed
AND s.scheduled_at - @LeadTime <= @Now
AND (
(g.platform = 'Telegram' AND s.link_message_id IS NULL)
OR (
g.platform <> 'Telegram'
AND NOT EXISTS (
SELECT 1
FROM platform_messages pm
WHERE pm.session_id = s.id
AND pm.platform = g.platform
AND pm.purpose = 'join_link'
)
)
)
""",
new
{
Platform = options.Platform.ToString(),
Confirmed = SessionStatus.Confirmed,
LeadTime = JoinLinkLeadTime,
Now = now.UtcDateTime
});
return results.ToList();
}
}
@@ -0,0 +1,5 @@
using GmRelay.Shared.Platform;
namespace GmRelay.Shared.Infrastructure.Scheduling;
public sealed record PlatformSchedulerOptions(PlatformKind Platform);
@@ -0,0 +1,139 @@
using GmRelay.Shared.Features.Confirmation.SendConfirmation;
using GmRelay.Shared.Features.Reminders.SendJoinLink;
using GmRelay.Shared.Features.Reminders.SendOneHourReminder;
using GmRelay.Shared.Platform;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
namespace GmRelay.Shared.Infrastructure.Scheduling;
/// <summary>
/// Stateless scheduler: wakes every 60 seconds, queries PostgreSQL for actionable sessions.
/// All state is kept in the database so worker restarts do not lose scheduled work.
/// </summary>
public sealed class SessionSchedulerService(
ISessionTriggerStore triggerStore,
ISendConfirmationHandler confirmationHandler,
ISendOneHourReminderHandler oneHourReminderHandler,
ISendJoinLinkHandler joinLinkHandler,
ISystemClock clock,
ILogger<SessionSchedulerService> logger) : BackgroundService
{
private static readonly TimeSpan TickInterval = TimeSpan.FromMinutes(1);
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
logger.LogInformation("Session scheduler started (interval: {Interval})", TickInterval);
using var timer = new PeriodicTimer(TickInterval);
do
{
try
{
await TickAsync(stoppingToken);
}
catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
{
break;
}
catch (Exception ex)
{
logger.LogError(ex, "Scheduler tick failed, will retry next tick");
}
}
while (await timer.WaitForNextTickAsync(stoppingToken));
logger.LogInformation("Session scheduler stopped");
}
public async Task TickAsync(CancellationToken ct)
{
var now = clock.UtcNow;
await ProcessConfirmationTriggers(now, ct);
await ProcessOneHourReminderTriggers(now, ct);
await ProcessJoinLinkTriggers(now, ct);
}
private async Task ProcessConfirmationTriggers(DateTimeOffset now, CancellationToken ct)
{
IReadOnlyList<Guid> sessionIds;
try
{
sessionIds = await triggerStore.GetSessionsNeedingConfirmationAsync(now, ct);
}
catch (Exception ex)
{
logger.LogError(ex, "Failed to query confirmation triggers");
return;
}
foreach (var sessionId in sessionIds)
{
try
{
await confirmationHandler.HandleAsync(sessionId, ct);
logger.LogInformation("Confirmation sent for session {SessionId}", sessionId);
}
catch (Exception ex)
{
logger.LogError(ex, "Failed to send confirmation for session {SessionId}", sessionId);
}
}
}
private async Task ProcessOneHourReminderTriggers(DateTimeOffset now, CancellationToken ct)
{
IReadOnlyList<Guid> sessionIds;
try
{
sessionIds = await triggerStore.GetSessionsNeedingOneHourReminderAsync(now, ct);
}
catch (Exception ex)
{
logger.LogError(ex, "Failed to query one-hour reminder triggers");
return;
}
foreach (var sessionId in sessionIds)
{
try
{
await oneHourReminderHandler.HandleAsync(sessionId, ct);
logger.LogInformation("One-hour reminder processed for session {SessionId}", sessionId);
}
catch (Exception ex)
{
logger.LogError(ex, "Failed to process one-hour reminder for session {SessionId}", sessionId);
}
}
}
private async Task ProcessJoinLinkTriggers(DateTimeOffset now, CancellationToken ct)
{
IReadOnlyList<Guid> sessionIds;
try
{
sessionIds = await triggerStore.GetSessionsNeedingJoinLinkAsync(now, ct);
}
catch (Exception ex)
{
logger.LogError(ex, "Failed to query join-link triggers");
return;
}
foreach (var sessionId in sessionIds)
{
try
{
await joinLinkHandler.HandleAsync(sessionId, ct);
logger.LogInformation("Join link sent for session {SessionId}", sessionId);
}
catch (Exception ex)
{
logger.LogError(ex, "Failed to send join link for session {SessionId}", sessionId);
}
}
}
}