using System.Collections.Concurrent;
using GmRelay.Shared.Features.Confirmation.SendConfirmation;
using GmRelay.Shared.Features.Reminders.SendJoinLink;
using GmRelay.Shared.Features.Reminders.SendOneHourReminder;
using GmRelay.Shared.Platform;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
namespace GmRelay.Shared.Infrastructure.Scheduling;
/// <summary>
/// Scheduler that wakes every 60 seconds, asks the trigger store (PostgreSQL) which sessions are
/// actionable, and runs the confirmation, one-hour reminder, and join-link handlers for them.
/// </summary>
/// <remarks>
/// Scheduled work itself is kept in the database so worker restarts do not lose it.
/// The only in-process state is a per-session failure backoff: after a handler throws, that session
/// is skipped for <see cref="BackoffDuration"/> for the same trigger kind. The backoff lives in
/// memory only and is therefore reset when the worker restarts.
/// </remarks>
public sealed class SessionSchedulerService(
    ISessionTriggerStore triggerStore,
    ISendConfirmationHandler confirmationHandler,
    ISendOneHourReminderHandler oneHourReminderHandler,
    ISendJoinLinkHandler joinLinkHandler,
    ISystemClock clock,
    // NOTE(review): default logging registration usually provides ILogger<T> only — confirm that a
    // non-generic ILogger is resolvable by the host, or switch to ILogger<SessionSchedulerService>.
    ILogger logger) : BackgroundService
{
    /// <summary>Delay between two scheduler passes.</summary>
    private static readonly TimeSpan TickInterval = TimeSpan.FromMinutes(1);

    /// <summary>How long a session is skipped for a trigger kind after its handler failed.</summary>
    private static readonly TimeSpan BackoffDuration = TimeSpan.FromMinutes(15);

    // One backoff table per trigger kind: session id -> earliest time of the next attempt.
    // NOTE(review): the session id type is assumed to be Guid — confirm against ISessionTriggerStore.
    private readonly ConcurrentDictionary<Guid, DateTimeOffset> _confirmationBackoff = new();
    private readonly ConcurrentDictionary<Guid, DateTimeOffset> _oneHourBackoff = new();
    private readonly ConcurrentDictionary<Guid, DateTimeOffset> _joinLinkBackoff = new();

    /// <summary>
    /// Runs one pass immediately, then one pass per <see cref="TickInterval"/> until
    /// <paramref name="stoppingToken"/> is cancelled.
    /// </summary>
    /// <param name="stoppingToken">Signalled by the host when the service must stop.</param>
    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        logger.LogInformation("Session scheduler started (interval: {Interval})", TickInterval);

        using var timer = new PeriodicTimer(TickInterval);

        try
        {
            do
            {
                try
                {
                    await TickAsync(stoppingToken);
                }
                catch (Exception ex) when (!IsRequestedCancellation(ex, stoppingToken))
                {
                    // A failed pass must not kill the loop; the next tick retries.
                    logger.LogError(ex, "Scheduler tick failed, will retry next tick");
                }
            }
            while (await timer.WaitForNextTickAsync(stoppingToken));
        }
        catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
        {
            // Normal shutdown: both TickAsync and WaitForNextTickAsync surface cancellation as
            // an exception. Swallow it here so the stop message below is actually written.
        }

        logger.LogInformation("Session scheduler stopped");
    }

    /// <summary>
    /// Runs a single scheduler pass: confirmations, then one-hour reminders, then join links,
    /// all evaluated against the same clock reading.
    /// </summary>
    /// <param name="ct">Cancels the pass.</param>
    /// <exception cref="OperationCanceledException">
    /// <paramref name="ct"/> was cancelled and a store or handler call observed it.
    /// </exception>
    public async Task TickAsync(CancellationToken ct)
    {
        var now = clock.UtcNow;

        await ProcessConfirmationTriggers(now, ct);
        await ProcessOneHourReminderTriggers(now, ct);
        await ProcessJoinLinkTriggers(now, ct);
    }

    /// <summary>
    /// Sends confirmations for every session the store reports as needing one at <paramref name="now"/>.
    /// A query failure is logged and ends this step; a handler failure is logged and puts only that
    /// session into backoff, so the remaining sessions are still processed.
    /// </summary>
    private async Task ProcessConfirmationTriggers(DateTimeOffset now, CancellationToken ct)
    {
        PruneExpired(_confirmationBackoff, now);

        IReadOnlyList<Guid> sessionIds;
        try
        {
            sessionIds = await triggerStore.GetSessionsNeedingConfirmationAsync(now, ct);
        }
        catch (Exception ex) when (!IsRequestedCancellation(ex, ct))
        {
            logger.LogError(ex, "Failed to query confirmation triggers");
            return;
        }

        foreach (var sessionId in sessionIds)
        {
            if (_confirmationBackoff.TryGetValue(sessionId, out var backoffUntil) && backoffUntil > now)
            {
                logger.LogDebug(
                    "Skipping confirmation for session {SessionId} until {Backoff}",
                    sessionId,
                    backoffUntil);
                continue;
            }

            try
            {
                await confirmationHandler.HandleAsync(sessionId, ct);
                _confirmationBackoff.TryRemove(sessionId, out _);
                logger.LogInformation("Confirmation sent for session {SessionId}", sessionId);
            }
            catch (Exception ex) when (!IsRequestedCancellation(ex, ct))
            {
                var nextAttempt = now.Add(BackoffDuration);
                _confirmationBackoff[sessionId] = nextAttempt;
                logger.LogError(
                    ex,
                    "Failed to send confirmation for session {SessionId}, backing off until {Backoff}",
                    sessionId,
                    nextAttempt);
            }
        }
    }

    /// <summary>
    /// Runs the one-hour reminder handler for every session the store reports at <paramref name="now"/>.
    /// Failure handling mirrors the confirmation step: query failure ends the step, handler failure
    /// backs off the single affected session.
    /// </summary>
    private async Task ProcessOneHourReminderTriggers(DateTimeOffset now, CancellationToken ct)
    {
        PruneExpired(_oneHourBackoff, now);

        IReadOnlyList<Guid> sessionIds;
        try
        {
            sessionIds = await triggerStore.GetSessionsNeedingOneHourReminderAsync(now, ct);
        }
        catch (Exception ex) when (!IsRequestedCancellation(ex, ct))
        {
            logger.LogError(ex, "Failed to query one-hour reminder triggers");
            return;
        }

        foreach (var sessionId in sessionIds)
        {
            if (_oneHourBackoff.TryGetValue(sessionId, out var backoffUntil) && backoffUntil > now)
            {
                logger.LogDebug(
                    "Skipping one-hour reminder for session {SessionId} until {Backoff}",
                    sessionId,
                    backoffUntil);
                continue;
            }

            try
            {
                await oneHourReminderHandler.HandleAsync(sessionId, ct);
                _oneHourBackoff.TryRemove(sessionId, out _);
                logger.LogInformation("One-hour reminder processed for session {SessionId}", sessionId);
            }
            catch (Exception ex) when (!IsRequestedCancellation(ex, ct))
            {
                var nextAttempt = now.Add(BackoffDuration);
                _oneHourBackoff[sessionId] = nextAttempt;
                logger.LogError(
                    ex,
                    "Failed to process one-hour reminder for session {SessionId}, backing off until {Backoff}",
                    sessionId,
                    nextAttempt);
            }
        }
    }

    /// <summary>
    /// Sends join links for every session the store reports as needing one at <paramref name="now"/>.
    /// Failure handling mirrors the confirmation step: query failure ends the step, handler failure
    /// backs off the single affected session.
    /// </summary>
    private async Task ProcessJoinLinkTriggers(DateTimeOffset now, CancellationToken ct)
    {
        PruneExpired(_joinLinkBackoff, now);

        IReadOnlyList<Guid> sessionIds;
        try
        {
            sessionIds = await triggerStore.GetSessionsNeedingJoinLinkAsync(now, ct);
        }
        catch (Exception ex) when (!IsRequestedCancellation(ex, ct))
        {
            logger.LogError(ex, "Failed to query join-link triggers");
            return;
        }

        foreach (var sessionId in sessionIds)
        {
            if (_joinLinkBackoff.TryGetValue(sessionId, out var backoffUntil) && backoffUntil > now)
            {
                logger.LogDebug(
                    "Skipping join link for session {SessionId} until {Backoff}",
                    sessionId,
                    backoffUntil);
                continue;
            }

            try
            {
                await joinLinkHandler.HandleAsync(sessionId, ct);
                _joinLinkBackoff.TryRemove(sessionId, out _);
                logger.LogInformation("Join link sent for session {SessionId}", sessionId);
            }
            catch (Exception ex) when (!IsRequestedCancellation(ex, ct))
            {
                var nextAttempt = now.Add(BackoffDuration);
                _joinLinkBackoff[sessionId] = nextAttempt;
                logger.LogError(
                    ex,
                    "Failed to send join link for session {SessionId}, backing off until {Backoff}",
                    sessionId,
                    nextAttempt);
            }
        }
    }

    /// <summary>
    /// True when <paramref name="ex"/> is the cancellation the caller asked for via
    /// <paramref name="ct"/>. Such exceptions must propagate instead of being logged as failures
    /// or turned into a backoff.
    /// </summary>
    private static bool IsRequestedCancellation(Exception ex, CancellationToken ct) =>
        ex is OperationCanceledException && ct.IsCancellationRequested;

    /// <summary>
    /// Drops backoff entries whose deadline has passed. Expired entries no longer affect scheduling,
    /// but without this they would stay in memory forever for sessions that failed once and then
    /// stopped being returned by the trigger query.
    /// </summary>
    private static void PruneExpired(ConcurrentDictionary<Guid, DateTimeOffset> backoff, DateTimeOffset now)
    {
        foreach (var entry in backoff)
        {
            if (entry.Value <= now)
            {
                // Key+value removal: leaves the entry alone if it was refreshed concurrently.
                backoff.TryRemove(entry);
            }
        }
    }
}