fix: skip stale pending updates on startup
This commit is contained in:
@@ -0,0 +1,8 @@
|
||||
using Telegram.Bot.Types;
|
||||
|
||||
namespace GmRelay.Bot.Infrastructure.Telegram;
|
||||
|
||||
public interface ITelegramUpdateHandler
|
||||
{
|
||||
Task RouteAsync(Update update, CancellationToken ct);
|
||||
}
|
||||
@@ -0,0 +1,14 @@
|
||||
using Telegram.Bot.Types;
|
||||
using Telegram.Bot.Types.Enums;
|
||||
|
||||
namespace GmRelay.Bot.Infrastructure.Telegram;
|
||||
|
||||
public interface ITelegramUpdateSource
|
||||
{
|
||||
Task<Update[]> GetUpdatesAsync(
|
||||
int offset,
|
||||
int? limit = null,
|
||||
int? timeout = null,
|
||||
IEnumerable<UpdateType>? allowedUpdates = null,
|
||||
CancellationToken cancellationToken = default);
|
||||
}
|
||||
@@ -1,4 +1,3 @@
|
||||
using Telegram.Bot;
|
||||
using Telegram.Bot.Types;
|
||||
using Telegram.Bot.Types.Enums;
|
||||
|
||||
@@ -9,35 +8,21 @@ namespace GmRelay.Bot.Infrastructure.Telegram;
|
||||
/// Stateless — all state is in PostgreSQL. Safe to restart at any time.
|
||||
/// </summary>
|
||||
public sealed class TelegramBotService(
|
||||
ITelegramBotClient bot,
|
||||
UpdateRouter router,
|
||||
ITelegramUpdateSource updateSource,
|
||||
ITelegramUpdateHandler updateHandler,
|
||||
ILogger<TelegramBotService> logger) : BackgroundService
|
||||
{
|
||||
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
|
||||
{
|
||||
logger.LogInformation("Telegram bot polling started");
|
||||
|
||||
// Skip any pending updates from before this startup
|
||||
try
|
||||
{
|
||||
var pending = await bot.GetUpdates(offset: -1, limit: 1, cancellationToken: stoppingToken);
|
||||
if (pending.Length > 0)
|
||||
{
|
||||
logger.LogInformation("Skipped {Count} pending update(s)", pending[^1].Id);
|
||||
}
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
logger.LogWarning(ex, "Failed to clear pending updates, continuing anyway");
|
||||
}
|
||||
|
||||
var offset = 0;
|
||||
var offset = await GetStartupOffsetAsync(stoppingToken);
|
||||
|
||||
while (!stoppingToken.IsCancellationRequested)
|
||||
{
|
||||
try
|
||||
{
|
||||
var updates = await bot.GetUpdates(
|
||||
var updates = await updateSource.GetUpdatesAsync(
|
||||
offset: offset,
|
||||
timeout: 30,
|
||||
allowedUpdates: [UpdateType.Message, UpdateType.CallbackQuery],
|
||||
@@ -47,7 +32,7 @@ public sealed class TelegramBotService(
|
||||
{
|
||||
try
|
||||
{
|
||||
await router.RouteAsync(update, stoppingToken);
|
||||
await updateHandler.RouteAsync(update, stoppingToken);
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
@@ -70,4 +55,33 @@ public sealed class TelegramBotService(
|
||||
|
||||
logger.LogInformation("Telegram bot polling stopped");
|
||||
}
|
||||
|
||||
private async Task<int> GetStartupOffsetAsync(CancellationToken stoppingToken)
|
||||
{
|
||||
try
|
||||
{
|
||||
var pending = await updateSource.GetUpdatesAsync(
|
||||
offset: -1,
|
||||
limit: 1,
|
||||
cancellationToken: stoppingToken);
|
||||
|
||||
if (pending.Length == 0)
|
||||
{
|
||||
return 0;
|
||||
}
|
||||
|
||||
var startupOffset = pending[^1].Id + 1;
|
||||
logger.LogInformation(
|
||||
"Skipping pending updates through {LastPendingUpdateId}; starting polling from offset {StartupOffset}",
|
||||
pending[^1].Id,
|
||||
startupOffset);
|
||||
|
||||
return startupOffset;
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
logger.LogWarning(ex, "Failed to determine startup offset, continuing from offset 0");
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -0,0 +1,21 @@
|
||||
using Telegram.Bot;
|
||||
using Telegram.Bot.Types;
|
||||
using Telegram.Bot.Types.Enums;
|
||||
|
||||
namespace GmRelay.Bot.Infrastructure.Telegram;
|
||||
|
||||
public sealed class TelegramUpdateSource(ITelegramBotClient bot) : ITelegramUpdateSource
|
||||
{
|
||||
public Task<Update[]> GetUpdatesAsync(
|
||||
int offset,
|
||||
int? limit = null,
|
||||
int? timeout = null,
|
||||
IEnumerable<UpdateType>? allowedUpdates = null,
|
||||
CancellationToken cancellationToken = default) =>
|
||||
bot.GetUpdates(
|
||||
offset: offset,
|
||||
limit: limit,
|
||||
timeout: timeout,
|
||||
allowedUpdates: allowedUpdates,
|
||||
cancellationToken: cancellationToken);
|
||||
}
|
||||
@@ -28,7 +28,7 @@ public sealed class UpdateRouter(
|
||||
HandleRescheduleTimeInputHandler rescheduleTimeInputHandler,
|
||||
HandleRescheduleVoteHandler rescheduleVoteHandler,
|
||||
ITelegramBotClient bot,
|
||||
ILogger<UpdateRouter> logger)
|
||||
ILogger<UpdateRouter> logger) : ITelegramUpdateHandler
|
||||
{
|
||||
public async Task RouteAsync(Update update, CancellationToken ct)
|
||||
{
|
||||
|
||||
@@ -46,6 +46,7 @@ builder.Services.AddSingleton<ITelegramBotClient>(sp =>
|
||||
"Telegram:BotToken is required. Set via environment variable Telegram__BotToken or appsettings.json.");
|
||||
return new TelegramBotClient(token);
|
||||
});
|
||||
builder.Services.AddSingleton<ITelegramUpdateSource, TelegramUpdateSource>();
|
||||
|
||||
// ── Feature handlers (explicit registration — AOT safe) ──────────────
|
||||
builder.Services.AddSingleton<SendConfirmationHandler>();
|
||||
@@ -63,6 +64,7 @@ builder.Services.AddSingleton<HandleRescheduleVoteHandler>();
|
||||
|
||||
// ── Telegram infrastructure ──────────────────────────────────────────
|
||||
builder.Services.AddSingleton<UpdateRouter>();
|
||||
builder.Services.AddSingleton<ITelegramUpdateHandler>(sp => sp.GetRequiredService<UpdateRouter>());
|
||||
builder.Services.AddHostedService<TelegramBotService>();
|
||||
|
||||
// ── Session scheduler ────────────────────────────────────────────────
|
||||
|
||||
@@ -0,0 +1,101 @@
|
||||
using System.Reflection;
|
||||
using GmRelay.Bot.Infrastructure.Telegram;
|
||||
using Microsoft.Extensions.Logging.Abstractions;
|
||||
using Telegram.Bot.Types;
|
||||
using Telegram.Bot.Types.Enums;
|
||||
|
||||
namespace GmRelay.Bot.Tests.Infrastructure.Telegram;
|
||||
|
||||
public sealed class TelegramBotServiceTests
|
||||
{
|
||||
[Fact]
|
||||
public async Task ExecuteAsync_ShouldStartPollingAfterLastPendingUpdate()
|
||||
{
|
||||
using var cts = new CancellationTokenSource();
|
||||
var updateSource = new FakeTelegramUpdateSource(cts);
|
||||
var updateHandler = new FakeTelegramUpdateHandler();
|
||||
var service = new TelegramBotService(
|
||||
updateSource,
|
||||
updateHandler,
|
||||
NullLogger<TelegramBotService>.Instance);
|
||||
|
||||
await InvokeExecuteAsync(service, cts.Token);
|
||||
|
||||
Assert.Empty(updateHandler.HandledUpdates);
|
||||
Assert.Collection(
|
||||
updateSource.Calls,
|
||||
call =>
|
||||
{
|
||||
Assert.Equal(-1, call.Offset);
|
||||
Assert.Equal(1, call.Limit);
|
||||
Assert.Null(call.Timeout);
|
||||
Assert.Null(call.AllowedUpdates);
|
||||
},
|
||||
call =>
|
||||
{
|
||||
Assert.Equal(43, call.Offset);
|
||||
Assert.Null(call.Limit);
|
||||
Assert.Equal(30, call.Timeout);
|
||||
Assert.Equal([UpdateType.Message, UpdateType.CallbackQuery], call.AllowedUpdates);
|
||||
});
|
||||
}
|
||||
|
||||
private static async Task InvokeExecuteAsync(TelegramBotService service, CancellationToken cancellationToken)
|
||||
{
|
||||
var executeAsync = typeof(TelegramBotService).GetMethod(
|
||||
"ExecuteAsync",
|
||||
BindingFlags.Instance | BindingFlags.NonPublic);
|
||||
|
||||
Assert.NotNull(executeAsync);
|
||||
|
||||
var task = executeAsync.Invoke(service, [cancellationToken]) as Task;
|
||||
Assert.NotNull(task);
|
||||
|
||||
await task;
|
||||
}
|
||||
|
||||
private sealed class FakeTelegramUpdateHandler : ITelegramUpdateHandler
|
||||
{
|
||||
public List<Update> HandledUpdates { get; } = [];
|
||||
|
||||
public Task RouteAsync(Update update, CancellationToken ct)
|
||||
{
|
||||
HandledUpdates.Add(update);
|
||||
return Task.CompletedTask;
|
||||
}
|
||||
}
|
||||
|
||||
private sealed class FakeTelegramUpdateSource(CancellationTokenSource cts) : ITelegramUpdateSource
|
||||
{
|
||||
public List<PollCall> Calls { get; } = [];
|
||||
|
||||
public Task<Update[]> GetUpdatesAsync(
|
||||
int offset,
|
||||
int? limit = null,
|
||||
int? timeout = null,
|
||||
IEnumerable<UpdateType>? allowedUpdates = null,
|
||||
CancellationToken cancellationToken = default)
|
||||
{
|
||||
Calls.Add(new PollCall(offset, limit, timeout, allowedUpdates?.ToArray()));
|
||||
|
||||
return Calls.Count switch
|
||||
{
|
||||
1 => Task.FromResult(new[] { new Update { Id = 42 } }),
|
||||
2 => ReturnAndCancelAsync(),
|
||||
_ => throw new InvalidOperationException("Unexpected polling call.")
|
||||
};
|
||||
}
|
||||
|
||||
private Task<Update[]> ReturnAndCancelAsync()
|
||||
{
|
||||
cts.Cancel();
|
||||
return Task.FromResult(Array.Empty<Update>());
|
||||
}
|
||||
}
|
||||
|
||||
private sealed record PollCall(
|
||||
int Offset,
|
||||
int? Limit,
|
||||
int? Timeout,
|
||||
UpdateType[]? AllowedUpdates);
|
||||
}
|
||||
Reference in New Issue
Block a user