This commit is contained in:
Kwoth
2022-07-07 22:08:57 +02:00
5 changed files with 260 additions and 167 deletions

View File

@@ -57,6 +57,6 @@ public sealed class QueueRunner
} }
} }
public ValueTask Enqueue(Func<Task> action) public ValueTask EnqueueAsync(Func<Task> action)
=> _channel.Writer.WriteAsync(action); => _channel.Writer.WriteAsync(action);
} }

View File

@@ -221,7 +221,7 @@ public class StreamRoleService : IReadyExecutor, INService
} }
private async ValueTask RescanUser(IGuildUser user, StreamRoleSettings setting, IRole addRole = null) private async ValueTask RescanUser(IGuildUser user, StreamRoleSettings setting, IRole addRole = null)
=> await _queueRunner.Enqueue(() => RescanUserInternal(user, setting, addRole)); => await _queueRunner.EnqueueAsync(() => RescanUserInternal(user, setting, addRole));
private async Task RescanUserInternal(IGuildUser user, StreamRoleSettings setting, IRole addRole = null) private async Task RescanUserInternal(IGuildUser user, StreamRoleSettings setting, IRole addRole = null)
{ {
@@ -239,7 +239,7 @@ public class StreamRoleService : IReadyExecutor, INService
&& setting.Blacklist.All(x => x.UserId != user.Id) && setting.Blacklist.All(x => x.UserId != user.Id)
&& user.RoleIds.Contains(setting.FromRoleId)) && user.RoleIds.Contains(setting.FromRoleId))
{ {
await _queueRunner.Enqueue(async () => await _queueRunner.EnqueueAsync(async () =>
{ {
try try
{ {
@@ -277,7 +277,7 @@ public class StreamRoleService : IReadyExecutor, INService
//check if user is in the addrole //check if user is in the addrole
if (user.RoleIds.Contains(setting.AddRoleId)) if (user.RoleIds.Contains(setting.AddRoleId))
{ {
await _queueRunner.Enqueue(async () => await _queueRunner.EnqueueAsync(async () =>
{ {
try try
{ {

View File

@@ -1,4 +1,5 @@
#nullable disable #nullable disable
using LinqToDB;
using Microsoft.EntityFrameworkCore; using Microsoft.EntityFrameworkCore;
using NadekoBot.Common.ModuleBehaviors; using NadekoBot.Common.ModuleBehaviors;
using NadekoBot.Db; using NadekoBot.Db;
@@ -12,7 +13,7 @@ using SixLabors.ImageSharp.Drawing.Processing;
using SixLabors.ImageSharp.Formats; using SixLabors.ImageSharp.Formats;
using SixLabors.ImageSharp.PixelFormats; using SixLabors.ImageSharp.PixelFormats;
using SixLabors.ImageSharp.Processing; using SixLabors.ImageSharp.Processing;
using StackExchange.Redis; using System.Threading.Channels;
using Color = SixLabors.ImageSharp.Color; using Color = SixLabors.ImageSharp.Color;
using Image = SixLabors.ImageSharp.Image; using Image = SixLabors.ImageSharp.Image;
@@ -37,7 +38,6 @@ public class XpService : INService, IReadyExecutor, IExecNoCommand
private readonly ConcurrentDictionary<ulong, ConcurrentHashSet<ulong>> _excludedChannels; private readonly ConcurrentDictionary<ulong, ConcurrentHashSet<ulong>> _excludedChannels;
private readonly ConcurrentHashSet<ulong> _excludedServers; private readonly ConcurrentHashSet<ulong> _excludedServers;
private readonly System.Collections.Concurrent.ConcurrentQueue<UserCacheItem> _addMessageXp = new();
private XpTemplate template; private XpTemplate template;
private readonly DiscordSocketClient _client; private readonly DiscordSocketClient _client;
@@ -45,6 +45,10 @@ public class XpService : INService, IReadyExecutor, IExecNoCommand
private readonly IPatronageService _ps; private readonly IPatronageService _ps;
private readonly IBotCache _c; private readonly IBotCache _c;
private readonly QueueRunner _levelUpQueue = new QueueRunner(0, 50);
private readonly Channel<UserXpGainData> _xpGainQueue = Channel.CreateUnbounded<UserXpGainData>();
public XpService( public XpService(
DiscordSocketClient client, DiscordSocketClient client,
Bot bot, Bot bot,
@@ -122,147 +126,131 @@ public class XpService : INService, IReadyExecutor, IExecNoCommand
public async Task OnReadyAsync() public async Task OnReadyAsync()
{ {
_ = Task.Run(() => _levelUpQueue.RunAsync());
using var timer = new PeriodicTimer(5.Seconds()); using var timer = new PeriodicTimer(5.Seconds());
while (await timer.WaitForNextTickAsync()) while (await timer.WaitForNextTickAsync())
{ {
await UpdateLoop(); await UpdateXp();
} }
} }
private async Task UpdateLoop() public sealed class MiniGuildXpStats
{
public long Xp { get; set; }
public XpNotificationLocation NotifyOnLevelUp { get; set; }
public ulong GuildId { get; set; }
public ulong UserId { get; set; }
}
private async Task UpdateXp()
{ {
try try
{ {
var toNotify = var reader = _xpGainQueue.Reader;
new List<(IGuild Guild, IMessageChannel MessageChannel, IUser User, long Level,
XpNotificationLocation NotifyType, NotifOf NotifOf)>();
var roleRewards = new Dictionary<ulong, List<XpRoleReward>>();
var curRewards = new Dictionary<ulong, List<XpCurrencyReward>>();
var toAddTo = new List<UserCacheItem>(); // sum up all gains into a single UserCacheItem
while (_addMessageXp.TryDequeue(out var usr)) var globalToAdd = new Dictionary<ulong, UserXpGainData>();
toAddTo.Add(usr); var guildToAdd = new Dictionary<ulong, Dictionary<ulong, UserXpGainData>>();
while (reader.TryRead(out var item))
var group = toAddTo.GroupBy(x => (GuildId: x.Guild.Id, x.User));
if (toAddTo.Count == 0)
return;
await using (var uow = _db.GetDbContext())
{ {
foreach (var item in group) // add global xp to these users
{ if (!globalToAdd.TryGetValue(item.User.Id, out var ci))
var xp = item.Sum(x => x.XpAmount); globalToAdd[item.User.Id] = item.Clone();
var usr = uow.GetOrCreateUserXpStats(item.Key.GuildId, item.Key.User.Id);
var du = uow.GetOrCreateUser(item.Key.User);
var globalXp = du.TotalXp;
var oldGlobalLevelData = new LevelStats(globalXp);
var newGlobalLevelData = new LevelStats(globalXp + xp);
var oldGuildLevelData = new LevelStats(usr.Xp + usr.AwardedXp);
usr.Xp += xp;
du.TotalXp += xp;
if (du.Club is not null)
du.Club.Xp += xp;
var newGuildLevelData = new LevelStats(usr.Xp + usr.AwardedXp);
if (oldGlobalLevelData.Level < newGlobalLevelData.Level)
{
var first = item.First();
if (du.NotifyOnLevelUp != XpNotificationLocation.None)
{
toNotify.Add((first.Guild, first.Channel, first.User, newGlobalLevelData.Level,
du.NotifyOnLevelUp, NotifOf.Global));
}
}
if (oldGuildLevelData.Level < newGuildLevelData.Level)
{
//send level up notification
var first = item.First();
if (usr.NotifyOnLevelUp != XpNotificationLocation.None)
{
toNotify.Add((first.Guild, first.Channel, first.User, newGuildLevelData.Level,
usr.NotifyOnLevelUp, NotifOf.Server));
}
//give role
if (!roleRewards.TryGetValue(usr.GuildId, out var rrews))
{
rrews = uow.XpSettingsFor(usr.GuildId).RoleRewards.ToList();
roleRewards.Add(usr.GuildId, rrews);
}
if (!curRewards.TryGetValue(usr.GuildId, out var crews))
{
crews = uow.XpSettingsFor(usr.GuildId).CurrencyRewards.ToList();
curRewards.Add(usr.GuildId, crews);
}
//loop through levels since last level up, so if a high amount of xp is gained, reward are still applied.
for (var i = oldGuildLevelData.Level + 1; i <= newGuildLevelData.Level; i++)
{
var rrew = rrews.FirstOrDefault(x => x.Level == i);
if (rrew is not null)
{
var role = first.User.Guild.GetRole(rrew.RoleId);
if (role is not null)
{
if (rrew.Remove)
_ = first.User.RemoveRoleAsync(role);
else else
_ = first.User.AddRoleAsync(role); ci.XpAmount += item.XpAmount;
}
}
//get currency reward for this level
var crew = crews.FirstOrDefault(x => x.Level == i);
if (crew is not null)
//give the user the reward if it exists
await _cs.AddAsync(item.Key.User.Id, crew.Amount, new("xp", "level-up"));
}
}
}
uow.SaveChanges(); // ad guild xp in these guilds to these users
} if (!guildToAdd.TryGetValue(item.Guild.Id, out var users))
users = guildToAdd[item.Guild.Id] = new();
await toNotify.Select(async x => if (!users.TryGetValue(item.User.Id, out ci))
{ users[item.User.Id] = item.Clone();
if (x.NotifOf == NotifOf.Server)
{
if (x.NotifyType == XpNotificationLocation.Dm)
{
await x.User.SendConfirmAsync(_eb,
_strings.GetText(strs.level_up_dm(x.User.Mention,
Format.Bold(x.Level.ToString()),
Format.Bold(x.Guild.ToString() ?? "-")),
x.Guild.Id));
}
else if (x.MessageChannel is not null) // channel
{
await x.MessageChannel.SendConfirmAsync(_eb,
_strings.GetText(strs.level_up_channel(x.User.Mention,
Format.Bold(x.Level.ToString())),
x.Guild.Id));
}
}
else else
{ ci.XpAmount += item.XpAmount;
IMessageChannel chan; }
if (x.NotifyType == XpNotificationLocation.Dm)
chan = await x.User.CreateDMChannelAsync(); await using var ctx = _db.GetDbContext();
else // channel await using var tran = await ctx.Database.BeginTransactionAsync();
chan = x.MessageChannel;
// update global user xp in batches
await chan.SendConfirmAsync(_eb, // group by xp amount and update the same amounts at the same time
_strings.GetText(strs.level_up_global(x.User.Mention, var dus = new List<DiscordUser>(globalToAdd.Count);
Format.Bold(x.Level.ToString())), foreach (var group in globalToAdd.GroupBy(x => x.Value.XpAmount, x => x.Key))
x.Guild.Id)); {
var items = await ctx.DiscordUser
.Where(x => group.Contains(x.UserId))
.UpdateWithOutputAsync(old => new()
{
TotalXp = old.TotalXp + group.Key
},
(_, n) => n);
dus.AddRange(items);
}
// update guild user xp in batches
var gxps = new List<UserXpStats>(globalToAdd.Count);
foreach (var (guildId, toAdd) in guildToAdd)
{
foreach (var group in toAdd.GroupBy(x => x.Value.XpAmount, x => x.Key))
{
var items = await ctx
.UserXpStats
.Where(x => x.GuildId == guildId)
.Where(x => group.Contains(x.UserId))
.UpdateWithOutputAsync(old => new()
{
Xp = old.Xp + group.Key
},
(_, n) => n);
gxps.AddRange(items);
}
}
await tran.CommitAsync();
foreach (var du in dus)
{
var oldLevel = new LevelStats(du.TotalXp - globalToAdd[du.UserId].XpAmount);
var newLevel = new LevelStats(du.TotalXp);
if (oldLevel.Level != newLevel.Level)
{
var item = globalToAdd[du.UserId];
await _levelUpQueue.EnqueueAsync(
NotifyUser(item.Guild.Id,
item.Channel.Id,
du.UserId,
false,
oldLevel.Level,
newLevel.Level,
du.NotifyOnLevelUp));
}
}
foreach (var du in gxps)
{
if (guildToAdd.TryGetValue(du.GuildId, out var users)
&& users.TryGetValue(du.UserId, out var xpGainData))
{
var oldLevel = new LevelStats(du.Xp - xpGainData.XpAmount);
var newLevel = new LevelStats(du.Xp);
if (oldLevel.Level < newLevel.Level)
{
await _levelUpQueue.EnqueueAsync(
NotifyUser(xpGainData.Guild.Id,
xpGainData.Channel.Id,
du.UserId,
true,
oldLevel.Level,
newLevel.Level,
du.NotifyOnLevelUp));
}
}
} }
})
.WhenAll();
} }
catch (Exception ex) catch (Exception ex)
{ {
@@ -270,7 +258,112 @@ public class XpService : INService, IReadyExecutor, IExecNoCommand
} }
} }
private Func<Task> NotifyUser(
ulong guildId,
ulong channelId,
ulong userId,
bool isServer,
long oldLevel,
long newLevel,
XpNotificationLocation notifyLoc)
=> async () =>
{
if (isServer)
{
await HandleRewardsInternalAsync(guildId, userId, oldLevel, newLevel);
}
await HandleNotifyInternalAsync(guildId, channelId, userId, isServer, newLevel, notifyLoc);
};
private async Task HandleRewardsInternalAsync(ulong guildId, ulong userId, long oldLevel, long newLevel)
{
await using var ctx = _db.GetDbContext();
var rrews = ctx.XpSettingsFor(guildId).RoleRewards.ToList();
var crews = ctx.XpSettingsFor(guildId).CurrencyRewards.ToList();
//loop through levels since last level up, so if a high amount of xp is gained, reward are still applied.
for (var i = oldLevel + 1; i <= newLevel; i++)
{
var rrew = rrews.FirstOrDefault(x => x.Level == i);
if (rrew is not null)
{
var guild = _client.GetGuild(guildId);
var role = guild?.GetRole(rrew.RoleId);
var user = guild?.GetUser(userId);
if (role is not null && user is not null)
{
if (rrew.Remove)
_ = user.RemoveRoleAsync(role);
else
_ = user.AddRoleAsync(role);
}
}
//get currency reward for this level
var crew = crews.FirstOrDefault(x => x.Level == i);
if (crew is not null)
{
//give the user the reward if it exists
await _cs.AddAsync(userId, crew.Amount, new("xp", "level-up"));
}
}
}
private async Task HandleNotifyInternalAsync(ulong guildId,
ulong channelId,
ulong userId,
bool isServer,
long newLevel,
XpNotificationLocation notifyLoc)
{
var user = await _client.GetUserAsync(userId);
var guild = _client.GetGuild(guildId);
var ch = guild?.GetTextChannel(channelId);
if (user is null || guild is null)
return;
if (isServer)
{
if (notifyLoc == XpNotificationLocation.Dm)
{
await user.SendConfirmAsync(_eb,
_strings.GetText(strs.level_up_dm(user.Mention,
Format.Bold(newLevel.ToString()),
Format.Bold(guild.ToString() ?? "-")),
guild.Id));
}
else // channel
{
await ch.SendConfirmAsync(_eb,
_strings.GetText(strs.level_up_channel(user.Mention,
Format.Bold(newLevel.ToString())),
guild.Id));
}
}
else // global level
{
var chan = notifyLoc switch
{
XpNotificationLocation.Dm => (IMessageChannel)await user.CreateDMChannelAsync(),
XpNotificationLocation.Channel => ch,
_ => null
};
if (chan is null)
return;
await chan.SendConfirmAsync(_eb,
_strings.GetText(strs.level_up_global(user.Mention,
Format.Bold(newLevel.ToString())),
guild.Id));
}
}
private const string XP_TEMPLATE_PATH = "./data/xp_template.json"; private const string XP_TEMPLATE_PATH = "./data/xp_template.json";
private void InternalReloadXpTemplate() private void InternalReloadXpTemplate()
{ {
try try
@@ -295,7 +388,8 @@ public class XpService : INService, IReadyExecutor, IExecNoCommand
{ {
Log.Warning("Loaded default xp_template.json values as the old one was version 0. " Log.Warning("Loaded default xp_template.json values as the old one was version 0. "
+ "Old one was renamed to xp_template.json.old"); + "Old one was renamed to xp_template.json.old");
File.WriteAllText("./data/xp_template.json.old", JsonConvert.SerializeObject(template, Formatting.Indented)); File.WriteAllText("./data/xp_template.json.old",
JsonConvert.SerializeObject(template, Formatting.Indented));
template = new(); template = new();
template.Version = 1; template.Version = 1;
File.WriteAllText(XP_TEMPLATE_PATH, JsonConvert.SerializeObject(template, Formatting.Indented)); File.WriteAllText(XP_TEMPLATE_PATH, JsonConvert.SerializeObject(template, Formatting.Indented));
@@ -473,9 +567,11 @@ public class XpService : INService, IReadyExecutor, IExecNoCommand
if (after.VoiceChannel is not null && after.VoiceChannel != before.VoiceChannel) if (after.VoiceChannel is not null && after.VoiceChannel != before.VoiceChannel)
await ScanChannelForVoiceXp(after.VoiceChannel); await ScanChannelForVoiceXp(after.VoiceChannel);
else if (after.VoiceChannel is null) else if (after.VoiceChannel is null)
{
// In this case, the user left the channel and the previous for loops didn't catch // In this case, the user left the channel and the previous for loops didn't catch
// it because it wasn't in any new channel. So we need to get rid of it. // it because it wasn't in any new channel. So we need to get rid of it.
await UserLeftVoiceChannel(user, before.VoiceChannel); await UserLeftVoiceChannel(user, before.VoiceChannel);
}
}); });
return Task.CompletedTask; return Task.CompletedTask;
@@ -546,7 +642,7 @@ public class XpService : INService, IReadyExecutor, IExecNoCommand
if (actualXp > 0) if (actualXp > 0)
{ {
_addMessageXp.Enqueue(new() await _xpGainQueue.Writer.WriteAsync(new()
{ {
Guild = channel.Guild, Guild = channel.Guild,
User = user, User = user,
@@ -593,7 +689,7 @@ public class XpService : INService, IReadyExecutor, IExecNoCommand
if (!await SetUserRewardedAsync(user.Id)) if (!await SetUserRewardedAsync(user.Id))
return; return;
_addMessageXp.Enqueue(new() await _xpGainQueue.Writer.WriteAsync(new()
{ {
Guild = user.Guild, Guild = user.Guild,
Channel = arg.Channel, Channel = arg.Channel,
@@ -604,19 +700,19 @@ public class XpService : INService, IReadyExecutor, IExecNoCommand
return Task.CompletedTask; return Task.CompletedTask;
} }
public void AddXpDirectly(IGuildUser user, IMessageChannel channel, int amount) // public void AddXpDirectly(IGuildUser user, IMessageChannel channel, int amount)
{ // {
if (amount <= 0) // if (amount <= 0)
throw new ArgumentOutOfRangeException(nameof(amount)); // throw new ArgumentOutOfRangeException(nameof(amount));
//
_addMessageXp.Enqueue(new() // _xpGainQueue.Writer.WriteAsync(new()
{ // {
Guild = user.Guild, // Guild = user.Guild,
Channel = channel, // Channel = channel,
User = user, // User = user,
XpAmount = amount // XpAmount = amount
}); // });
} // }
public void AddXp(ulong userId, ulong guildId, int amount) public void AddXp(ulong userId, ulong guildId, int amount)
{ {

View File

@@ -3,7 +3,7 @@ using NadekoBot.Modules.Xp.Services;
namespace NadekoBot.Modules.Xp; namespace NadekoBot.Modules.Xp;
public class LevelStats public readonly struct LevelStats
{ {
public long Level { get; } public long Level { get; }
public long LevelXp { get; } public long LevelXp { get; }

View File

@@ -1,16 +1,13 @@
#nullable disable #nullable disable
using Cloneable;
namespace NadekoBot.Modules.Xp.Services; namespace NadekoBot.Modules.Xp.Services;
public class UserCacheItem [Cloneable]
public sealed partial class UserXpGainData : ICloneable<UserXpGainData>
{ {
public IGuildUser User { get; set; } public IGuildUser User { get; set; }
public IGuild Guild { get; set; } public IGuild Guild { get; set; }
public IMessageChannel Channel { get; set; } public IMessageChannel Channel { get; set; }
public int XpAmount { get; set; } public int XpAmount { get; set; }
public override int GetHashCode()
=> User.GetHashCode();
public override bool Equals(object obj)
=> obj is UserCacheItem uci && uci.User == User;
} }