From 4d6651827b774d5a9486b323da093e77d63cdb38 Mon Sep 17 00:00:00 2001 From: Toutsu Date: Thu, 23 Apr 2026 20:42:16 +0300 Subject: [PATCH] fix: skip stale pending updates on startup --- .../Telegram/ITelegramUpdateHandler.cs | 8 ++ .../Telegram/ITelegramUpdateSource.cs | 14 +++ .../Telegram/TelegramBotService.cs | 54 ++++++---- .../Telegram/TelegramUpdateSource.cs | 21 ++++ .../Infrastructure/Telegram/UpdateRouter.cs | 2 +- src/GmRelay.Bot/Program.cs | 2 + .../Telegram/TelegramBotServiceTests.cs | 101 ++++++++++++++++++ 7 files changed, 181 insertions(+), 21 deletions(-) create mode 100644 src/GmRelay.Bot/Infrastructure/Telegram/ITelegramUpdateHandler.cs create mode 100644 src/GmRelay.Bot/Infrastructure/Telegram/ITelegramUpdateSource.cs create mode 100644 src/GmRelay.Bot/Infrastructure/Telegram/TelegramUpdateSource.cs create mode 100644 tests/GmRelay.Bot.Tests/Infrastructure/Telegram/TelegramBotServiceTests.cs diff --git a/src/GmRelay.Bot/Infrastructure/Telegram/ITelegramUpdateHandler.cs b/src/GmRelay.Bot/Infrastructure/Telegram/ITelegramUpdateHandler.cs new file mode 100644 index 0000000..ac41400 --- /dev/null +++ b/src/GmRelay.Bot/Infrastructure/Telegram/ITelegramUpdateHandler.cs @@ -0,0 +1,8 @@ +using Telegram.Bot.Types; + +namespace GmRelay.Bot.Infrastructure.Telegram; + +public interface ITelegramUpdateHandler +{ + Task RouteAsync(Update update, CancellationToken ct); +} diff --git a/src/GmRelay.Bot/Infrastructure/Telegram/ITelegramUpdateSource.cs b/src/GmRelay.Bot/Infrastructure/Telegram/ITelegramUpdateSource.cs new file mode 100644 index 0000000..2d4b95a --- /dev/null +++ b/src/GmRelay.Bot/Infrastructure/Telegram/ITelegramUpdateSource.cs @@ -0,0 +1,14 @@ +using Telegram.Bot.Types; +using Telegram.Bot.Types.Enums; + +namespace GmRelay.Bot.Infrastructure.Telegram; + +public interface ITelegramUpdateSource +{ + Task GetUpdatesAsync( + int offset, + int? limit = null, + int? timeout = null, + IEnumerable? allowedUpdates = null, + CancellationToken cancellationToken = default); +} diff --git a/src/GmRelay.Bot/Infrastructure/Telegram/TelegramBotService.cs b/src/GmRelay.Bot/Infrastructure/Telegram/TelegramBotService.cs index 41fe2c5..d5fb6a9 100644 --- a/src/GmRelay.Bot/Infrastructure/Telegram/TelegramBotService.cs +++ b/src/GmRelay.Bot/Infrastructure/Telegram/TelegramBotService.cs @@ -1,4 +1,3 @@ -using Telegram.Bot; using Telegram.Bot.Types; using Telegram.Bot.Types.Enums; @@ -9,35 +8,21 @@ namespace GmRelay.Bot.Infrastructure.Telegram; /// Stateless — all state is in PostgreSQL. Safe to restart at any time. /// public sealed class TelegramBotService( - ITelegramBotClient bot, - UpdateRouter router, + ITelegramUpdateSource updateSource, + ITelegramUpdateHandler updateHandler, ILogger logger) : BackgroundService { protected override async Task ExecuteAsync(CancellationToken stoppingToken) { logger.LogInformation("Telegram bot polling started"); - // Skip any pending updates from before this startup - try - { - var pending = await bot.GetUpdates(offset: -1, limit: 1, cancellationToken: stoppingToken); - if (pending.Length > 0) - { - logger.LogInformation("Skipped {Count} pending update(s)", pending[^1].Id); - } - } - catch (Exception ex) - { - logger.LogWarning(ex, "Failed to clear pending updates, continuing anyway"); - } - - var offset = 0; + var offset = await GetStartupOffsetAsync(stoppingToken); while (!stoppingToken.IsCancellationRequested) { try { - var updates = await bot.GetUpdates( + var updates = await updateSource.GetUpdatesAsync( offset: offset, timeout: 30, allowedUpdates: [UpdateType.Message, UpdateType.CallbackQuery], @@ -47,7 +32,7 @@ public sealed class TelegramBotService( { try { - await router.RouteAsync(update, stoppingToken); + await updateHandler.RouteAsync(update, stoppingToken); } catch (Exception ex) { @@ -70,4 +55,33 @@ public sealed class TelegramBotService( logger.LogInformation("Telegram bot polling stopped"); } + + private async Task GetStartupOffsetAsync(CancellationToken stoppingToken) + { + try + { + var pending = await updateSource.GetUpdatesAsync( + offset: -1, + limit: 1, + cancellationToken: stoppingToken); + + if (pending.Length == 0) + { + return 0; + } + + var startupOffset = pending[^1].Id + 1; + logger.LogInformation( + "Skipping pending updates through {LastPendingUpdateId}; starting polling from offset {StartupOffset}", + pending[^1].Id, + startupOffset); + + return startupOffset; + } + catch (Exception ex) + { + logger.LogWarning(ex, "Failed to determine startup offset, continuing from offset 0"); + return 0; + } + } } diff --git a/src/GmRelay.Bot/Infrastructure/Telegram/TelegramUpdateSource.cs b/src/GmRelay.Bot/Infrastructure/Telegram/TelegramUpdateSource.cs new file mode 100644 index 0000000..0e684df --- /dev/null +++ b/src/GmRelay.Bot/Infrastructure/Telegram/TelegramUpdateSource.cs @@ -0,0 +1,21 @@ +using Telegram.Bot; +using Telegram.Bot.Types; +using Telegram.Bot.Types.Enums; + +namespace GmRelay.Bot.Infrastructure.Telegram; + +public sealed class TelegramUpdateSource(ITelegramBotClient bot) : ITelegramUpdateSource +{ + public Task GetUpdatesAsync( + int offset, + int? limit = null, + int? timeout = null, + IEnumerable? allowedUpdates = null, + CancellationToken cancellationToken = default) => + bot.GetUpdates( + offset: offset, + limit: limit, + timeout: timeout, + allowedUpdates: allowedUpdates, + cancellationToken: cancellationToken); +} diff --git a/src/GmRelay.Bot/Infrastructure/Telegram/UpdateRouter.cs b/src/GmRelay.Bot/Infrastructure/Telegram/UpdateRouter.cs index 6951fec..3a8fc12 100644 --- a/src/GmRelay.Bot/Infrastructure/Telegram/UpdateRouter.cs +++ b/src/GmRelay.Bot/Infrastructure/Telegram/UpdateRouter.cs @@ -28,7 +28,7 @@ public sealed class UpdateRouter( HandleRescheduleTimeInputHandler rescheduleTimeInputHandler, HandleRescheduleVoteHandler rescheduleVoteHandler, ITelegramBotClient bot, - ILogger logger) + ILogger logger) : ITelegramUpdateHandler { public async Task RouteAsync(Update update, CancellationToken ct) { diff --git a/src/GmRelay.Bot/Program.cs b/src/GmRelay.Bot/Program.cs index 9cba23f..d71cd0d 100644 --- a/src/GmRelay.Bot/Program.cs +++ b/src/GmRelay.Bot/Program.cs @@ -46,6 +46,7 @@ builder.Services.AddSingleton(sp => "Telegram:BotToken is required. Set via environment variable Telegram__BotToken or appsettings.json."); return new TelegramBotClient(token); }); +builder.Services.AddSingleton(); // ── Feature handlers (explicit registration — AOT safe) ────────────── builder.Services.AddSingleton(); @@ -63,6 +64,7 @@ builder.Services.AddSingleton(); // ── Telegram infrastructure ────────────────────────────────────────── builder.Services.AddSingleton(); +builder.Services.AddSingleton(sp => sp.GetRequiredService()); builder.Services.AddHostedService(); // ── Session scheduler ──────────────────────────────────────────────── diff --git a/tests/GmRelay.Bot.Tests/Infrastructure/Telegram/TelegramBotServiceTests.cs b/tests/GmRelay.Bot.Tests/Infrastructure/Telegram/TelegramBotServiceTests.cs new file mode 100644 index 0000000..5d28fea --- /dev/null +++ b/tests/GmRelay.Bot.Tests/Infrastructure/Telegram/TelegramBotServiceTests.cs @@ -0,0 +1,101 @@ +using System.Reflection; +using GmRelay.Bot.Infrastructure.Telegram; +using Microsoft.Extensions.Logging.Abstractions; +using Telegram.Bot.Types; +using Telegram.Bot.Types.Enums; + +namespace GmRelay.Bot.Tests.Infrastructure.Telegram; + +public sealed class TelegramBotServiceTests +{ + [Fact] + public async Task ExecuteAsync_ShouldStartPollingAfterLastPendingUpdate() + { + using var cts = new CancellationTokenSource(); + var updateSource = new FakeTelegramUpdateSource(cts); + var updateHandler = new FakeTelegramUpdateHandler(); + var service = new TelegramBotService( + updateSource, + updateHandler, + NullLogger.Instance); + + await InvokeExecuteAsync(service, cts.Token); + + Assert.Empty(updateHandler.HandledUpdates); + Assert.Collection( + updateSource.Calls, + call => + { + Assert.Equal(-1, call.Offset); + Assert.Equal(1, call.Limit); + Assert.Null(call.Timeout); + Assert.Null(call.AllowedUpdates); + }, + call => + { + Assert.Equal(43, call.Offset); + Assert.Null(call.Limit); + Assert.Equal(30, call.Timeout); + Assert.Equal([UpdateType.Message, UpdateType.CallbackQuery], call.AllowedUpdates); + }); + } + + private static async Task InvokeExecuteAsync(TelegramBotService service, CancellationToken cancellationToken) + { + var executeAsync = typeof(TelegramBotService).GetMethod( + "ExecuteAsync", + BindingFlags.Instance | BindingFlags.NonPublic); + + Assert.NotNull(executeAsync); + + var task = executeAsync.Invoke(service, [cancellationToken]) as Task; + Assert.NotNull(task); + + await task; + } + + private sealed class FakeTelegramUpdateHandler : ITelegramUpdateHandler + { + public List HandledUpdates { get; } = []; + + public Task RouteAsync(Update update, CancellationToken ct) + { + HandledUpdates.Add(update); + return Task.CompletedTask; + } + } + + private sealed class FakeTelegramUpdateSource(CancellationTokenSource cts) : ITelegramUpdateSource + { + public List Calls { get; } = []; + + public Task GetUpdatesAsync( + int offset, + int? limit = null, + int? timeout = null, + IEnumerable? allowedUpdates = null, + CancellationToken cancellationToken = default) + { + Calls.Add(new PollCall(offset, limit, timeout, allowedUpdates?.ToArray())); + + return Calls.Count switch + { + 1 => Task.FromResult(new[] { new Update { Id = 42 } }), + 2 => ReturnAndCancelAsync(), + _ => throw new InvalidOperationException("Unexpected polling call.") + }; + } + + private Task ReturnAndCancelAsync() + { + cts.Cancel(); + return Task.FromResult(Array.Empty()); + } + } + + private sealed record PollCall( + int Offset, + int? Limit, + int? Timeout, + UpdateType[]? AllowedUpdates); +}