From a826f4245ff9533d5d35e67b5314158be4e0e7a3 Mon Sep 17 00:00:00 2001 From: Kwoth Date: Thu, 16 Jun 2022 03:37:19 +0200 Subject: [PATCH] Fixed .streamrole not updating in real time, closes #345 --- src/NadekoBot/Common/QueueRunner.cs | 62 ++++++++ .../Utility/StreamRole/StreamRoleService.cs | 144 +++++++++++------- src/NadekoBot/_Extensions/Extensions.cs | 2 +- 3 files changed, 148 insertions(+), 60 deletions(-) create mode 100644 src/NadekoBot/Common/QueueRunner.cs diff --git a/src/NadekoBot/Common/QueueRunner.cs b/src/NadekoBot/Common/QueueRunner.cs new file mode 100644 index 000000000..222fdc932 --- /dev/null +++ b/src/NadekoBot/Common/QueueRunner.cs @@ -0,0 +1,62 @@ +using System.Threading.Channels; + +namespace NadekoBot.Common; + +public sealed class QueueRunner +{ + private readonly Channel> _channel; + private readonly int _delayMs; + + public QueueRunner(int delayMs = 0, int maxCapacity = -1) + { + if (delayMs < 0) + throw new ArgumentOutOfRangeException(nameof(delayMs)); + + _delayMs = delayMs; + _channel = maxCapacity switch + { + 0 or < -1 => throw new ArgumentOutOfRangeException(nameof(maxCapacity)), + -1 => Channel.CreateUnbounded>(new UnboundedChannelOptions() + { + SingleReader = true, + SingleWriter = false, + AllowSynchronousContinuations = true, + }), + _ => Channel.CreateBounded>(new BoundedChannelOptions(maxCapacity) + { + Capacity = maxCapacity, + FullMode = BoundedChannelFullMode.DropOldest, + SingleReader = true, + SingleWriter = false, + AllowSynchronousContinuations = true + }) + }; + } + + public async Task RunAsync(CancellationToken cancel = default) + { + while (true) + { + var func = await _channel.Reader.ReadAsync(cancel); + + try + { + await func(); + } + catch (Exception ex) + { + Log.Warning(ex, "Exception executing a staggered func: {ErrorMessage}", ex.Message); + } + finally + { + if (_delayMs != 0) + { + await Task.Delay(_delayMs, cancel); + } + } + } + } + + public ValueTask Enqueue(Func action) + => _channel.Writer.WriteAsync(action); +} \ No newline at end of file diff --git a/src/NadekoBot/Modules/Utility/StreamRole/StreamRoleService.cs b/src/NadekoBot/Modules/Utility/StreamRole/StreamRoleService.cs index 32a1c84c8..f8e3a9182 100644 --- a/src/NadekoBot/Modules/Utility/StreamRole/StreamRoleService.cs +++ b/src/NadekoBot/Modules/Utility/StreamRole/StreamRoleService.cs @@ -1,17 +1,20 @@ #nullable disable +using NadekoBot.Common.ModuleBehaviors; using NadekoBot.Db; using NadekoBot.Modules.Utility.Common; using NadekoBot.Modules.Utility.Common.Exceptions; using NadekoBot.Services.Database.Models; +using System.Diagnostics; using System.Net; namespace NadekoBot.Modules.Utility.Services; -public class StreamRoleService : INService +public class StreamRoleService : IReadyExecutor, INService { private readonly DbService _db; private readonly DiscordSocketClient _client; private readonly ConcurrentDictionary _guildSettings; + private readonly QueueRunner _queueRunner; public StreamRoleService(DiscordSocketClient client, DbService db, Bot bot) { @@ -22,33 +25,35 @@ public class StreamRoleService : INService .Where(x => x.Value is { Enabled: true }) .ToConcurrent(); - _client.GuildMemberUpdated += Client_GuildMemberUpdated; + _client.PresenceUpdated += OnPresenceUpdate; - _ = Task.Run(async () => - { - try - { - await client.Guilds.Select(g => RescanUsers(g)).WhenAll(); - } - catch - { - // ignored - } - }); + _queueRunner = new QueueRunner(); } - private Task Client_GuildMemberUpdated(Cacheable cacheable, SocketGuildUser after) + private Task OnPresenceUpdate(SocketUser user, SocketPresence oldPresence, SocketPresence newPresence) { + _ = Task.Run(async () => { - //if user wasn't streaming or didn't have a game status at all - if (_guildSettings.TryGetValue(after.Guild.Id, out var setting)) - await RescanUser(after, setting); + if (oldPresence.Activities.Count != newPresence.Activities.Count) + { + var guildUsers = _client.Guilds + .Select(x => x.GetUser(user.Id)); + + foreach (var guildUser in guildUsers) + { + if (_guildSettings.TryGetValue(guildUser.Guild.Id, out var s)) + await RescanUser(guildUser, s); + } + } }); return Task.CompletedTask; } + public Task OnReadyAsync() + => Task.WhenAll(_client.Guilds.Select(RescanUsers).WhenAll(), _queueRunner.RunAsync()); + /// /// Adds or removes a user from a blacklist or a whitelist in the specified guild. /// @@ -135,7 +140,7 @@ public class StreamRoleService : INService streamRoleSettings.Keyword = keyword; UpdateCache(guild.Id, streamRoleSettings); - uow.SaveChanges(); + await uow.SaveChangesAsync(); } await RescanUsers(guild); @@ -191,8 +196,7 @@ public class StreamRoleService : INService foreach (var usr in await fromRole.GetMembersAsync()) { - if (usr is { } x) - await RescanUser(x, setting, addRole); + await RescanUser(usr, setting, addRole); } } @@ -216,7 +220,10 @@ public class StreamRoleService : INService await RescanUsers(guild); } - private async Task RescanUser(IGuildUser user, StreamRoleSettings setting, IRole addRole = null) + private async ValueTask RescanUser(IGuildUser user, StreamRoleSettings setting, IRole addRole = null) + => await _queueRunner.Enqueue(() => RescanUserInternal(user, setting, addRole)); + + private async Task RescanUserInternal(IGuildUser user, StreamRoleSettings setting, IRole addRole = null) { if (user.IsBot) return; @@ -232,58 +239,77 @@ public class StreamRoleService : INService && setting.Blacklist.All(x => x.UserId != user.Id) && user.RoleIds.Contains(setting.FromRoleId)) { - try + await _queueRunner.Enqueue(async () => { - addRole ??= user.Guild.GetRole(setting.AddRoleId); - if (addRole is null) + try + { + addRole ??= user.Guild.GetRole(setting.AddRoleId); + if (addRole is null) + { + await StopStreamRole(user.Guild); + Log.Warning("Stream role in server {RoleId} no longer exists. Stopping", setting.AddRoleId); + return; + } + + //check if he doesn't have addrole already, to avoid errors + if (!user.RoleIds.Contains(addRole.Id)) + { + await user.AddRoleAsync(addRole); + Log.Information("Added stream role to user {User} in {Server} server", + user.ToString(), + user.Guild.ToString()); + } + } + catch (HttpException ex) when (ex.HttpCode == HttpStatusCode.Forbidden) { await StopStreamRole(user.Guild); - Log.Warning("Stream role in server {RoleId} no longer exists. Stopping", setting.AddRoleId); - return; + Log.Warning(ex, "Error adding stream role(s). Forcibly disabling stream role feature"); + throw new StreamRolePermissionException(); } - - //check if he doesn't have addrole already, to avoid errors - if (!user.RoleIds.Contains(addRole.Id)) + catch (Exception ex) { - await user.AddRoleAsync(addRole); - Log.Information("Added stream role to user {User} in {Server} server", - user.ToString(), - user.Guild.ToString()); + Log.Warning(ex, "Failed adding stream role"); } - } - catch (HttpException ex) when (ex.HttpCode == HttpStatusCode.Forbidden) - { - await StopStreamRole(user.Guild); - Log.Warning(ex, "Error adding stream role(s). Forcibly disabling stream role feature"); - throw new StreamRolePermissionException(); - } - catch (Exception ex) - { - Log.Warning(ex, "Failed adding stream role"); - } + }); } else { //check if user is in the addrole if (user.RoleIds.Contains(setting.AddRoleId)) { - try + await _queueRunner.Enqueue(async () => { - addRole ??= user.Guild.GetRole(setting.AddRoleId); - if (addRole is null) - throw new StreamRoleNotFoundException(); + try + { + addRole ??= user.Guild.GetRole(setting.AddRoleId); + if (addRole is null) + { + await StopStreamRole(user.Guild); + Log.Warning( + "Addrole doesn't exist in {GuildId} server. Forcibly disabling stream role feature", + user.Guild.Id); + return; + } - await user.RemoveRoleAsync(addRole); - Log.Information("Removed stream role from the user {User} in {Server} server", - user.ToString(), - user.Guild.ToString()); - } - catch (HttpException ex) when (ex.HttpCode == HttpStatusCode.Forbidden) - { - await StopStreamRole(user.Guild); - Log.Warning(ex, "Error removing stream role(s). Forcibly disabling stream role feature"); - throw new StreamRolePermissionException(); - } + // need to check again in case queuer is taking too long to execute + if (user.RoleIds.Contains(setting.AddRoleId)) + { + await user.RemoveRoleAsync(addRole); + } + + Log.Information("Removed stream role from the user {User} in {Server} server", + user.ToString(), + user.Guild.ToString()); + } + catch (HttpException ex) + { + if (ex.HttpCode == HttpStatusCode.Forbidden) + { + await StopStreamRole(user.Guild); + Log.Warning(ex, "Error removing stream role(s). Forcibly disabling stream role feature"); + } + } + }); } } } diff --git a/src/NadekoBot/_Extensions/Extensions.cs b/src/NadekoBot/_Extensions/Extensions.cs index 1d533fa3e..3ee2299ba 100644 --- a/src/NadekoBot/_Extensions/Extensions.cs +++ b/src/NadekoBot/_Extensions/Extensions.cs @@ -221,7 +221,7 @@ public static class Extensions public static void Lap(this Stopwatch sw, string checkpoint) { - Log.Information("Checkpoint {CheckPoint}: {Time}", checkpoint, sw.Elapsed.TotalMilliseconds); + Log.Information("Checkpoint {CheckPoint}: {Time}ms", checkpoint, sw.Elapsed.TotalMilliseconds); sw.Restart(); } } \ No newline at end of file