Fixed .streamrole not updating in real time, closes #345

This commit is contained in:
Kwoth
2022-06-16 03:37:19 +02:00
parent 780eec62b3
commit a826f4245f
3 changed files with 148 additions and 60 deletions

View File

@@ -0,0 +1,62 @@
using System.Threading.Channels;
namespace NadekoBot.Common;
public sealed class QueueRunner
{
private readonly Channel<Func<Task>> _channel;
private readonly int _delayMs;
public QueueRunner(int delayMs = 0, int maxCapacity = -1)
{
if (delayMs < 0)
throw new ArgumentOutOfRangeException(nameof(delayMs));
_delayMs = delayMs;
_channel = maxCapacity switch
{
0 or < -1 => throw new ArgumentOutOfRangeException(nameof(maxCapacity)),
-1 => Channel.CreateUnbounded<Func<Task>>(new UnboundedChannelOptions()
{
SingleReader = true,
SingleWriter = false,
AllowSynchronousContinuations = true,
}),
_ => Channel.CreateBounded<Func<Task>>(new BoundedChannelOptions(maxCapacity)
{
Capacity = maxCapacity,
FullMode = BoundedChannelFullMode.DropOldest,
SingleReader = true,
SingleWriter = false,
AllowSynchronousContinuations = true
})
};
}
public async Task RunAsync(CancellationToken cancel = default)
{
while (true)
{
var func = await _channel.Reader.ReadAsync(cancel);
try
{
await func();
}
catch (Exception ex)
{
Log.Warning(ex, "Exception executing a staggered func: {ErrorMessage}", ex.Message);
}
finally
{
if (_delayMs != 0)
{
await Task.Delay(_delayMs, cancel);
}
}
}
}
public ValueTask Enqueue(Func<Task> action)
=> _channel.Writer.WriteAsync(action);
}

View File

@@ -1,17 +1,20 @@
#nullable disable
using NadekoBot.Common.ModuleBehaviors;
using NadekoBot.Db;
using NadekoBot.Modules.Utility.Common;
using NadekoBot.Modules.Utility.Common.Exceptions;
using NadekoBot.Services.Database.Models;
using System.Diagnostics;
using System.Net;
namespace NadekoBot.Modules.Utility.Services;
public class StreamRoleService : INService
public class StreamRoleService : IReadyExecutor, INService
{
private readonly DbService _db;
private readonly DiscordSocketClient _client;
private readonly ConcurrentDictionary<ulong, StreamRoleSettings> _guildSettings;
private readonly QueueRunner _queueRunner;
public StreamRoleService(DiscordSocketClient client, DbService db, Bot bot)
{
@@ -22,33 +25,35 @@ public class StreamRoleService : INService
.Where(x => x.Value is { Enabled: true })
.ToConcurrent();
_client.GuildMemberUpdated += Client_GuildMemberUpdated;
_client.PresenceUpdated += OnPresenceUpdate;
_ = Task.Run(async () =>
{
try
{
await client.Guilds.Select(g => RescanUsers(g)).WhenAll();
}
catch
{
// ignored
}
});
_queueRunner = new QueueRunner();
}
private Task Client_GuildMemberUpdated(Cacheable<SocketGuildUser, ulong> cacheable, SocketGuildUser after)
private Task OnPresenceUpdate(SocketUser user, SocketPresence oldPresence, SocketPresence newPresence)
{
_ = Task.Run(async () =>
{
//if user wasn't streaming or didn't have a game status at all
if (_guildSettings.TryGetValue(after.Guild.Id, out var setting))
await RescanUser(after, setting);
if (oldPresence.Activities.Count != newPresence.Activities.Count)
{
var guildUsers = _client.Guilds
.Select(x => x.GetUser(user.Id));
foreach (var guildUser in guildUsers)
{
if (_guildSettings.TryGetValue(guildUser.Guild.Id, out var s))
await RescanUser(guildUser, s);
}
}
});
return Task.CompletedTask;
}
public Task OnReadyAsync()
=> Task.WhenAll(_client.Guilds.Select(RescanUsers).WhenAll(), _queueRunner.RunAsync());
/// <summary>
/// Adds or removes a user from a blacklist or a whitelist in the specified guild.
/// </summary>
@@ -135,7 +140,7 @@ public class StreamRoleService : INService
streamRoleSettings.Keyword = keyword;
UpdateCache(guild.Id, streamRoleSettings);
uow.SaveChanges();
await uow.SaveChangesAsync();
}
await RescanUsers(guild);
@@ -191,8 +196,7 @@ public class StreamRoleService : INService
foreach (var usr in await fromRole.GetMembersAsync())
{
if (usr is { } x)
await RescanUser(x, setting, addRole);
await RescanUser(usr, setting, addRole);
}
}
@@ -216,7 +220,10 @@ public class StreamRoleService : INService
await RescanUsers(guild);
}
private async Task RescanUser(IGuildUser user, StreamRoleSettings setting, IRole addRole = null)
private async ValueTask RescanUser(IGuildUser user, StreamRoleSettings setting, IRole addRole = null)
=> await _queueRunner.Enqueue(() => RescanUserInternal(user, setting, addRole));
private async Task RescanUserInternal(IGuildUser user, StreamRoleSettings setting, IRole addRole = null)
{
if (user.IsBot)
return;
@@ -232,58 +239,77 @@ public class StreamRoleService : INService
&& setting.Blacklist.All(x => x.UserId != user.Id)
&& user.RoleIds.Contains(setting.FromRoleId))
{
try
await _queueRunner.Enqueue(async () =>
{
addRole ??= user.Guild.GetRole(setting.AddRoleId);
if (addRole is null)
try
{
addRole ??= user.Guild.GetRole(setting.AddRoleId);
if (addRole is null)
{
await StopStreamRole(user.Guild);
Log.Warning("Stream role in server {RoleId} no longer exists. Stopping", setting.AddRoleId);
return;
}
//check if he doesn't have addrole already, to avoid errors
if (!user.RoleIds.Contains(addRole.Id))
{
await user.AddRoleAsync(addRole);
Log.Information("Added stream role to user {User} in {Server} server",
user.ToString(),
user.Guild.ToString());
}
}
catch (HttpException ex) when (ex.HttpCode == HttpStatusCode.Forbidden)
{
await StopStreamRole(user.Guild);
Log.Warning("Stream role in server {RoleId} no longer exists. Stopping", setting.AddRoleId);
return;
Log.Warning(ex, "Error adding stream role(s). Forcibly disabling stream role feature");
throw new StreamRolePermissionException();
}
//check if he doesn't have addrole already, to avoid errors
if (!user.RoleIds.Contains(addRole.Id))
catch (Exception ex)
{
await user.AddRoleAsync(addRole);
Log.Information("Added stream role to user {User} in {Server} server",
user.ToString(),
user.Guild.ToString());
Log.Warning(ex, "Failed adding stream role");
}
}
catch (HttpException ex) when (ex.HttpCode == HttpStatusCode.Forbidden)
{
await StopStreamRole(user.Guild);
Log.Warning(ex, "Error adding stream role(s). Forcibly disabling stream role feature");
throw new StreamRolePermissionException();
}
catch (Exception ex)
{
Log.Warning(ex, "Failed adding stream role");
}
});
}
else
{
//check if user is in the addrole
if (user.RoleIds.Contains(setting.AddRoleId))
{
try
await _queueRunner.Enqueue(async () =>
{
addRole ??= user.Guild.GetRole(setting.AddRoleId);
if (addRole is null)
throw new StreamRoleNotFoundException();
try
{
addRole ??= user.Guild.GetRole(setting.AddRoleId);
if (addRole is null)
{
await StopStreamRole(user.Guild);
Log.Warning(
"Addrole doesn't exist in {GuildId} server. Forcibly disabling stream role feature",
user.Guild.Id);
return;
}
await user.RemoveRoleAsync(addRole);
Log.Information("Removed stream role from the user {User} in {Server} server",
user.ToString(),
user.Guild.ToString());
}
catch (HttpException ex) when (ex.HttpCode == HttpStatusCode.Forbidden)
{
await StopStreamRole(user.Guild);
Log.Warning(ex, "Error removing stream role(s). Forcibly disabling stream role feature");
throw new StreamRolePermissionException();
}
// need to check again in case queuer is taking too long to execute
if (user.RoleIds.Contains(setting.AddRoleId))
{
await user.RemoveRoleAsync(addRole);
}
Log.Information("Removed stream role from the user {User} in {Server} server",
user.ToString(),
user.Guild.ToString());
}
catch (HttpException ex)
{
if (ex.HttpCode == HttpStatusCode.Forbidden)
{
await StopStreamRole(user.Guild);
Log.Warning(ex, "Error removing stream role(s). Forcibly disabling stream role feature");
}
}
});
}
}
}

View File

@@ -221,7 +221,7 @@ public static class Extensions
public static void Lap(this Stopwatch sw, string checkpoint)
{
Log.Information("Checkpoint {CheckPoint}: {Time}", checkpoint, sw.Elapsed.TotalMilliseconds);
Log.Information("Checkpoint {CheckPoint}: {Time}ms", checkpoint, sw.Elapsed.TotalMilliseconds);
sw.Restart();
}
}