More restructuring

This commit is contained in:
Kwoth
2023-03-26 14:44:25 +02:00
parent 01f70f0a24
commit 308ba36b2e
319 changed files with 681 additions and 218 deletions

View File

@@ -1,200 +0,0 @@
#nullable disable
using Microsoft.EntityFrameworkCore;
using NadekoBot.Db;
using NadekoBot.Db.Models;
using NadekoBot.Modules.Searches.Services;
namespace NadekoBot.Modules.Searches;
public partial class Searches
{
[Group]
public partial class StreamNotificationCommands : NadekoModule<StreamNotificationService>
{
private readonly DbService _db;
public StreamNotificationCommands(DbService db)
=> _db = db;
[Cmd]
[RequireContext(ContextType.Guild)]
[UserPerm(GuildPerm.ManageMessages)]
public async Task StreamAdd(string link)
{
var data = await _service.FollowStream(ctx.Guild.Id, ctx.Channel.Id, link);
if (data is null)
{
await ReplyErrorLocalizedAsync(strs.stream_not_added);
return;
}
var embed = _service.GetEmbed(ctx.Guild.Id, data);
await ctx.Channel.EmbedAsync(embed, GetText(strs.stream_tracked));
}
[Cmd]
[RequireContext(ContextType.Guild)]
[UserPerm(GuildPerm.ManageMessages)]
[Priority(1)]
public async Task StreamRemove(int index)
{
if (--index < 0)
return;
var fs = await _service.UnfollowStreamAsync(ctx.Guild.Id, index);
if (fs is null)
{
await ReplyErrorLocalizedAsync(strs.stream_no);
return;
}
await ReplyConfirmLocalizedAsync(strs.stream_removed(Format.Bold(fs.Username), fs.Type));
}
[Cmd]
[RequireContext(ContextType.Guild)]
[UserPerm(GuildPerm.Administrator)]
public async Task StreamsClear()
{
await _service.ClearAllStreams(ctx.Guild.Id);
await ReplyConfirmLocalizedAsync(strs.streams_cleared);
}
[Cmd]
[RequireContext(ContextType.Guild)]
public async Task StreamList(int page = 1)
{
if (page-- < 1)
return;
var streams = new List<FollowedStream>();
await using (var uow = _db.GetDbContext())
{
var all = uow.GuildConfigsForId(ctx.Guild.Id, set => set.Include(gc => gc.FollowedStreams))
.FollowedStreams.OrderBy(x => x.Id)
.ToList();
for (var index = all.Count - 1; index >= 0; index--)
{
var fs = all[index];
if (((SocketGuild)ctx.Guild).GetTextChannel(fs.ChannelId) is null)
await _service.UnfollowStreamAsync(fs.GuildId, index);
else
streams.Insert(0, fs);
}
}
await ctx.SendPaginatedConfirmAsync(page,
cur =>
{
var elements = streams
.Skip(cur * 12)
.Take(12)
.ToList();
if (elements.Count == 0)
return _eb.Create().WithDescription(GetText(strs.streams_none)).WithErrorColor();
var eb = _eb.Create().WithTitle(GetText(strs.streams_follow_title)).WithOkColor();
for (var index = 0; index < elements.Count; index++)
{
var elem = elements[index];
eb.AddField($"**#{index + 1 + (12 * cur)}** {elem.Username.ToLower()}",
$"【{elem.Type}】\n<#{elem.ChannelId}>\n{elem.Message?.TrimTo(50)}",
true);
}
return eb;
},
streams.Count,
12);
}
[Cmd]
[RequireContext(ContextType.Guild)]
[UserPerm(GuildPerm.ManageMessages)]
public async Task StreamOffline()
{
var newValue = _service.ToggleStreamOffline(ctx.Guild.Id);
if (newValue)
await ReplyConfirmLocalizedAsync(strs.stream_off_enabled);
else
await ReplyConfirmLocalizedAsync(strs.stream_off_disabled);
}
[Cmd]
[RequireContext(ContextType.Guild)]
[UserPerm(GuildPerm.ManageMessages)]
public async Task StreamOnlineDelete()
{
var newValue = _service.ToggleStreamOnlineDelete(ctx.Guild.Id);
if (newValue)
await ReplyConfirmLocalizedAsync(strs.stream_online_delete_enabled);
else
await ReplyConfirmLocalizedAsync(strs.stream_online_delete_disabled);
}
[Cmd]
[RequireContext(ContextType.Guild)]
[UserPerm(GuildPerm.ManageMessages)]
public async Task StreamMessage(int index, [Leftover] string message)
{
if (--index < 0)
return;
if (!_service.SetStreamMessage(ctx.Guild.Id, index, message, out var fs))
{
await ReplyConfirmLocalizedAsync(strs.stream_not_following);
return;
}
if (string.IsNullOrWhiteSpace(message))
await ReplyConfirmLocalizedAsync(strs.stream_message_reset(Format.Bold(fs.Username)));
else
await ReplyConfirmLocalizedAsync(strs.stream_message_set(Format.Bold(fs.Username)));
}
[Cmd]
[RequireContext(ContextType.Guild)]
[UserPerm(GuildPerm.ManageMessages)]
public async Task StreamMessageAll([Leftover] string message)
{
var count = _service.SetStreamMessageForAll(ctx.Guild.Id, message);
if (count == 0)
{
await ReplyConfirmLocalizedAsync(strs.stream_not_following_any);
return;
}
await ReplyConfirmLocalizedAsync(strs.stream_message_set_all(count));
}
[Cmd]
[RequireContext(ContextType.Guild)]
public async Task StreamCheck(string url)
{
try
{
var data = await _service.GetStreamDataAsync(url);
if (data is null)
{
await ReplyErrorLocalizedAsync(strs.no_channel_found);
return;
}
if (data.IsLive)
{
await ReplyConfirmLocalizedAsync(strs.streamer_online(Format.Bold(data.Name),
Format.Bold(data.Viewers.ToString())));
}
else
await ReplyConfirmLocalizedAsync(strs.streamer_offline(data.Name));
}
catch
{
await ReplyErrorLocalizedAsync(strs.no_channel_found);
}
}
}
}

View File

@@ -1,619 +0,0 @@
#nullable disable
using Microsoft.EntityFrameworkCore;
using Nadeko.Common;
using NadekoBot.Common.ModuleBehaviors;
using NadekoBot.Db;
using NadekoBot.Db.Models;
using NadekoBot.Modules.Searches._Common;
using NadekoBot.Modules.Searches.Common;
using NadekoBot.Modules.Searches.Common.StreamNotifications;
using NadekoBot.Services.Database.Models;
namespace NadekoBot.Modules.Searches.Services;
public sealed class StreamNotificationService : INService, IReadyExecutor
{
private readonly DbService _db;
private readonly IBotStrings _strings;
private readonly Random _rng = new NadekoRandom();
private readonly DiscordSocketClient _client;
private readonly NotifChecker _streamTracker;
private readonly object _shardLock = new();
private readonly Dictionary<StreamDataKey, HashSet<ulong>> _trackCounter = new();
private readonly Dictionary<StreamDataKey, Dictionary<ulong, HashSet<FollowedStream>>> _shardTrackedStreams;
private readonly ConcurrentHashSet<ulong> _offlineNotificationServers;
private readonly ConcurrentHashSet<ulong> _deleteOnOfflineServers;
private readonly IPubSub _pubSub;
private readonly IEmbedBuilderService _eb;
public TypedKey<List<StreamData>> StreamsOnlineKey { get; }
public TypedKey<List<StreamData>> StreamsOfflineKey { get; }
private readonly TypedKey<FollowStreamPubData> _streamFollowKey;
private readonly TypedKey<FollowStreamPubData> _streamUnfollowKey;
public event Func<
FollowedStream.FType,
string,
IReadOnlyCollection<(ulong, ulong)>,
Task> OnlineMessagesSent = static delegate { return Task.CompletedTask; };
public StreamNotificationService(
DbService db,
DiscordSocketClient client,
IBotStrings strings,
IBotCredsProvider creds,
IHttpClientFactory httpFactory,
Bot bot,
IPubSub pubSub,
IEmbedBuilderService eb)
{
_db = db;
_client = client;
_strings = strings;
_pubSub = pubSub;
_eb = eb;
_streamTracker = new(httpFactory, creds);
StreamsOnlineKey = new("streams.online");
StreamsOfflineKey = new("streams.offline");
_streamFollowKey = new("stream.follow");
_streamUnfollowKey = new("stream.unfollow");
using (var uow = db.GetDbContext())
{
var ids = client.GetGuildIds();
var guildConfigs = uow.Set<GuildConfig>()
.AsQueryable()
.Include(x => x.FollowedStreams)
.Where(x => ids.Contains(x.GuildId))
.ToList();
_offlineNotificationServers = new(guildConfigs
.Where(gc => gc.NotifyStreamOffline)
.Select(x => x.GuildId)
.ToList());
_deleteOnOfflineServers = new(guildConfigs
.Where(gc => gc.DeleteStreamOnlineMessage)
.Select(x => x.GuildId)
.ToList());
var followedStreams = guildConfigs.SelectMany(x => x.FollowedStreams).ToList();
_shardTrackedStreams = followedStreams.GroupBy(x => new
{
x.Type,
Name = x.Username.ToLower()
})
.ToList()
.ToDictionary(
x => new StreamDataKey(x.Key.Type, x.Key.Name.ToLower()),
x => x.GroupBy(y => y.GuildId)
.ToDictionary(y => y.Key,
y => y.AsEnumerable().ToHashSet()));
// shard 0 will keep track of when there are no more guilds which track a stream
if (client.ShardId == 0)
{
var allFollowedStreams = uow.Set<FollowedStream>().AsQueryable().ToList();
foreach (var fs in allFollowedStreams)
_streamTracker.AddLastData(fs.CreateKey(), null, false);
_trackCounter = allFollowedStreams.GroupBy(x => new
{
x.Type,
Name = x.Username.ToLower()
})
.ToDictionary(x => new StreamDataKey(x.Key.Type, x.Key.Name),
x => x.Select(fs => fs.GuildId).ToHashSet());
}
}
_pubSub.Sub(StreamsOfflineKey, HandleStreamsOffline);
_pubSub.Sub(StreamsOnlineKey, HandleStreamsOnline);
if (client.ShardId == 0)
{
// only shard 0 will run the tracker,
// and then publish updates with redis to other shards
_streamTracker.OnStreamsOffline += OnStreamsOffline;
_streamTracker.OnStreamsOnline += OnStreamsOnline;
_ = _streamTracker.RunAsync();
_pubSub.Sub(_streamFollowKey, HandleFollowStream);
_pubSub.Sub(_streamUnfollowKey, HandleUnfollowStream);
}
bot.JoinedGuild += ClientOnJoinedGuild;
client.LeftGuild += ClientOnLeftGuild;
}
public async Task OnReadyAsync()
{
if (_client.ShardId != 0)
return;
using var timer = new PeriodicTimer(TimeSpan.FromMinutes(30));
while (await timer.WaitForNextTickAsync())
{
try
{
var errorLimit = TimeSpan.FromHours(12);
var failingStreams = _streamTracker.GetFailingStreams(errorLimit, true).ToList();
if (!failingStreams.Any())
continue;
var deleteGroups = failingStreams.GroupBy(x => x.Type)
.ToDictionary(x => x.Key, x => x.Select(y => y.Name).ToList());
await using var uow = _db.GetDbContext();
foreach (var kvp in deleteGroups)
{
Log.Information(
"Deleting {StreamCount} {Platform} streams because they've been erroring for more than {ErrorLimit}: {RemovedList}",
kvp.Value.Count,
kvp.Key,
errorLimit,
string.Join(", ", kvp.Value));
var toDelete = uow.Set<FollowedStream>()
.AsQueryable()
.Where(x => x.Type == kvp.Key && kvp.Value.Contains(x.Username))
.ToList();
uow.RemoveRange(toDelete);
await uow.SaveChangesAsync();
foreach (var loginToDelete in kvp.Value)
_streamTracker.UntrackStreamByKey(new(kvp.Key, loginToDelete));
}
}
catch (Exception ex)
{
Log.Error(ex, "Error cleaning up FollowedStreams");
}
}
}
/// <summary>
/// Handles follow stream pubs to keep the counter up to date.
/// When counter reaches 0, stream is removed from tracking because
/// that means no guilds are subscribed to that stream anymore
/// </summary>
private ValueTask HandleFollowStream(FollowStreamPubData info)
{
_streamTracker.AddLastData(info.Key, null, false);
lock (_shardLock)
{
var key = info.Key;
if (_trackCounter.ContainsKey(key))
_trackCounter[key].Add(info.GuildId);
else
{
_trackCounter[key] = new()
{
info.GuildId
};
}
}
return default;
}
/// <summary>
/// Handles unfollow pubs to keep the counter up to date.
/// When counter reaches 0, stream is removed from tracking because
/// that means no guilds are subscribed to that stream anymore
/// </summary>
private ValueTask HandleUnfollowStream(FollowStreamPubData info)
{
lock (_shardLock)
{
var key = info.Key;
if (!_trackCounter.TryGetValue(key, out var set))
{
// it should've been removed already?
_streamTracker.UntrackStreamByKey(in key);
return default;
}
set.Remove(info.GuildId);
if (set.Count != 0)
return default;
_trackCounter.Remove(key);
// if no other guilds are following this stream
// untrack the stream
_streamTracker.UntrackStreamByKey(in key);
}
return default;
}
private async ValueTask HandleStreamsOffline(List<StreamData> offlineStreams)
{
foreach (var stream in offlineStreams)
{
var key = stream.CreateKey();
if (_shardTrackedStreams.TryGetValue(key, out var fss))
{
await fss
// send offline stream notifications only to guilds which enable it with .stoff
.SelectMany(x => x.Value)
.Where(x => _offlineNotificationServers.Contains(x.GuildId))
.Select(fs => _client.GetGuild(fs.GuildId)
?.GetTextChannel(fs.ChannelId)
?.EmbedAsync(GetEmbed(fs.GuildId, stream)))
.WhenAll();
}
}
}
private async ValueTask HandleStreamsOnline(List<StreamData> onlineStreams)
{
foreach (var stream in onlineStreams)
{
var key = stream.CreateKey();
if (_shardTrackedStreams.TryGetValue(key, out var fss))
{
var messages = await fss.SelectMany(x => x.Value)
.Select(async fs =>
{
var textChannel = _client.GetGuild(fs.GuildId)?.GetTextChannel(fs.ChannelId);
if (textChannel is null)
return default;
var rep = new ReplacementBuilder().WithOverride("%user%", () => fs.Username)
.WithOverride("%platform%", () => fs.Type.ToString())
.Build();
var message = string.IsNullOrWhiteSpace(fs.Message) ? "" : rep.Replace(fs.Message);
var msg = await textChannel.EmbedAsync(GetEmbed(fs.GuildId, stream, false), message);
// only cache the ids of channel/message pairs
if(_deleteOnOfflineServers.Contains(fs.GuildId))
return (textChannel.Id, msg.Id);
else
return default;
})
.WhenAll();
// push online stream messages to redis
// when streams go offline, any server which
// has the online stream message deletion feature
// enabled will have the online messages deleted
try
{
var pairs = messages
.Where(x => x != default)
.Select(x => (x.Item1, x.Item2))
.ToList();
if (pairs.Count > 0)
await OnlineMessagesSent(key.Type, key.Name, pairs);
}
catch
{
}
}
}
}
private Task OnStreamsOnline(List<StreamData> data)
=> _pubSub.Pub(StreamsOnlineKey, data);
private Task OnStreamsOffline(List<StreamData> data)
=> _pubSub.Pub(StreamsOfflineKey, data);
private Task ClientOnJoinedGuild(GuildConfig guildConfig)
{
using (var uow = _db.GetDbContext())
{
var gc = uow.GuildConfigs.AsQueryable()
.Include(x => x.FollowedStreams)
.FirstOrDefault(x => x.GuildId == guildConfig.GuildId);
if (gc is null)
return Task.CompletedTask;
if (gc.NotifyStreamOffline)
_offlineNotificationServers.Add(gc.GuildId);
foreach (var followedStream in gc.FollowedStreams)
{
var key = followedStream.CreateKey();
var streams = GetLocalGuildStreams(key, gc.GuildId);
streams.Add(followedStream);
PublishFollowStream(followedStream);
}
}
return Task.CompletedTask;
}
private Task ClientOnLeftGuild(SocketGuild guild)
{
using (var uow = _db.GetDbContext())
{
var gc = uow.GuildConfigsForId(guild.Id, set => set.Include(x => x.FollowedStreams));
_offlineNotificationServers.TryRemove(gc.GuildId);
foreach (var followedStream in gc.FollowedStreams)
{
var streams = GetLocalGuildStreams(followedStream.CreateKey(), guild.Id);
streams.Remove(followedStream);
PublishUnfollowStream(followedStream);
}
}
return Task.CompletedTask;
}
public async Task<int> ClearAllStreams(ulong guildId)
{
await using var uow = _db.GetDbContext();
var gc = uow.GuildConfigsForId(guildId, set => set.Include(x => x.FollowedStreams));
uow.RemoveRange(gc.FollowedStreams);
foreach (var s in gc.FollowedStreams)
await PublishUnfollowStream(s);
uow.SaveChanges();
return gc.FollowedStreams.Count;
}
public async Task<FollowedStream> UnfollowStreamAsync(ulong guildId, int index)
{
FollowedStream fs;
await using (var uow = _db.GetDbContext())
{
var fss = uow.Set<FollowedStream>()
.AsQueryable()
.Where(x => x.GuildId == guildId)
.OrderBy(x => x.Id)
.ToList();
// out of range
if (fss.Count <= index)
return null;
fs = fss[index];
uow.Remove(fs);
await uow.SaveChangesAsync();
// remove from local cache
lock (_shardLock)
{
var key = fs.CreateKey();
var streams = GetLocalGuildStreams(key, guildId);
streams.Remove(fs);
}
}
await PublishUnfollowStream(fs);
return fs;
}
private void PublishFollowStream(FollowedStream fs)
=> _pubSub.Pub(_streamFollowKey,
new()
{
Key = fs.CreateKey(),
GuildId = fs.GuildId
});
private Task PublishUnfollowStream(FollowedStream fs)
=> _pubSub.Pub(_streamUnfollowKey,
new()
{
Key = fs.CreateKey(),
GuildId = fs.GuildId
});
public async Task<StreamData> FollowStream(ulong guildId, ulong channelId, string url)
{
// this will
var data = await _streamTracker.GetStreamDataByUrlAsync(url);
if (data is null)
return null;
FollowedStream fs;
await using (var uow = _db.GetDbContext())
{
var gc = uow.GuildConfigsForId(guildId, set => set.Include(x => x.FollowedStreams));
// add it to the database
fs = new()
{
Type = data.StreamType,
Username = data.UniqueName,
ChannelId = channelId,
GuildId = guildId
};
if (gc.FollowedStreams.Count >= 10)
return null;
gc.FollowedStreams.Add(fs);
await uow.SaveChangesAsync();
// add it to the local cache of tracked streams
// this way this shard will know it needs to post a message to discord
// when shard 0 publishes stream status changes for this stream
lock (_shardLock)
{
var key = data.CreateKey();
var streams = GetLocalGuildStreams(key, guildId);
streams.Add(fs);
}
}
PublishFollowStream(fs);
return data;
}
public IEmbedBuilder GetEmbed(ulong guildId, StreamData status, bool showViewers = true)
{
var embed = _eb.Create()
.WithTitle(status.Name)
.WithUrl(status.StreamUrl)
.WithDescription(status.StreamUrl)
.AddField(GetText(guildId, strs.status), status.IsLive ? "🟢 Online" : "🔴 Offline", true);
if (showViewers)
{
embed.AddField(GetText(guildId, strs.viewers),
status.Viewers == 0 && !status.IsLive
? "-"
: status.Viewers,
true);
}
if (status.IsLive)
embed = embed.WithOkColor();
else
embed = embed.WithErrorColor();
if (!string.IsNullOrWhiteSpace(status.Title))
embed.WithAuthor(status.Title);
if (!string.IsNullOrWhiteSpace(status.Game))
embed.AddField(GetText(guildId, strs.streaming), status.Game, true);
if (!string.IsNullOrWhiteSpace(status.AvatarUrl))
embed.WithThumbnailUrl(status.AvatarUrl);
if (!string.IsNullOrWhiteSpace(status.Preview))
embed.WithImageUrl(status.Preview + "?dv=" + _rng.Next());
return embed;
}
private string GetText(ulong guildId, LocStr str)
=> _strings.GetText(str, guildId);
public bool ToggleStreamOffline(ulong guildId)
{
bool newValue;
using var uow = _db.GetDbContext();
var gc = uow.GuildConfigsForId(guildId, set => set);
newValue = gc.NotifyStreamOffline = !gc.NotifyStreamOffline;
uow.SaveChanges();
if (newValue)
_offlineNotificationServers.Add(guildId);
else
_offlineNotificationServers.TryRemove(guildId);
return newValue;
}
public bool ToggleStreamOnlineDelete(ulong guildId)
{
using var uow = _db.GetDbContext();
var gc = uow.GuildConfigsForId(guildId, set => set);
var newValue = gc.DeleteStreamOnlineMessage = !gc.DeleteStreamOnlineMessage;
uow.SaveChanges();
if (newValue)
_deleteOnOfflineServers.Add(guildId);
else
_deleteOnOfflineServers.TryRemove(guildId);
return newValue;
}
public Task<StreamData> GetStreamDataAsync(string url)
=> _streamTracker.GetStreamDataByUrlAsync(url);
private HashSet<FollowedStream> GetLocalGuildStreams(in StreamDataKey key, ulong guildId)
{
if (_shardTrackedStreams.TryGetValue(key, out var map))
{
if (map.TryGetValue(guildId, out var set))
return set;
return map[guildId] = new();
}
_shardTrackedStreams[key] = new()
{
{ guildId, new() }
};
return _shardTrackedStreams[key][guildId];
}
public bool SetStreamMessage(
ulong guildId,
int index,
string message,
out FollowedStream fs)
{
using var uow = _db.GetDbContext();
var fss = uow.Set<FollowedStream>().AsQueryable().Where(x => x.GuildId == guildId).OrderBy(x => x.Id).ToList();
if (fss.Count <= index)
{
fs = null;
return false;
}
fs = fss[index];
fs.Message = message;
lock (_shardLock)
{
var streams = GetLocalGuildStreams(fs.CreateKey(), guildId);
// message doesn't participate in equality checking
// removing and adding = update
streams.Remove(fs);
streams.Add(fs);
}
uow.SaveChanges();
return true;
}
public int SetStreamMessageForAll(ulong guildId, string message)
{
using var uow = _db.GetDbContext();
var all = uow.Set<FollowedStream>().ToList();
if (all.Count == 0)
return 0;
all.ForEach(x => x.Message = message);
uow.SaveChanges();
return all.Count;
}
public sealed class FollowStreamPubData
{
public StreamDataKey Key { get; init; }
public ulong GuildId { get; init; }
}
}

View File

@@ -1,99 +0,0 @@
#nullable disable
using LinqToDB;
using LinqToDB.EntityFrameworkCore;
using NadekoBot.Common.ModuleBehaviors;
using NadekoBot.Db.Models;
using NadekoBot.Modules.Searches.Common;
namespace NadekoBot.Modules.Searches.Services;
public sealed class StreamOnlineMessageDeleterService : INService, IReadyExecutor
{
private readonly StreamNotificationService _notifService;
private readonly DbService _db;
private readonly DiscordSocketClient _client;
private readonly IPubSub _pubSub;
public StreamOnlineMessageDeleterService(
StreamNotificationService notifService,
DbService db,
IPubSub pubSub,
DiscordSocketClient client)
{
_notifService = notifService;
_db = db;
_client = client;
_pubSub = pubSub;
}
public async Task OnReadyAsync()
{
_notifService.OnlineMessagesSent += OnOnlineMessagesSent;
if (_client.ShardId == 0)
await _pubSub.Sub(_notifService.StreamsOfflineKey, OnStreamsOffline);
}
private async Task OnOnlineMessagesSent(
FollowedStream.FType type,
string name,
IReadOnlyCollection<(ulong, ulong)> pairs)
{
await using var ctx = _db.GetDbContext();
foreach (var (channelId, messageId) in pairs)
{
await ctx.GetTable<StreamOnlineMessage>()
.InsertAsync(() => new()
{
Name = name,
Type = type,
MessageId = messageId,
ChannelId = channelId,
DateAdded = DateTime.UtcNow,
});
}
}
private async ValueTask OnStreamsOffline(List<StreamData> streamDatas)
{
if (_client.ShardId != 0)
return;
var pairs = await GetMessagesToDelete(streamDatas);
foreach (var (channelId, messageId) in pairs)
{
try
{
var textChannel = await _client.GetChannelAsync(channelId) as ITextChannel;
if (textChannel is null)
continue;
await textChannel.DeleteMessageAsync(messageId);
}
catch
{
continue;
}
}
}
private async Task<IEnumerable<(ulong, ulong)>> GetMessagesToDelete(List<StreamData> streamDatas)
{
await using var ctx = _db.GetDbContext();
var toReturn = new List<(ulong, ulong)>();
foreach (var sd in streamDatas)
{
var key = sd.CreateKey();
var toDelete = await ctx.GetTable<StreamOnlineMessage>()
.Where(x => (x.Type == key.Type && x.Name == key.Name)
|| Sql.DateDiff(Sql.DateParts.Day, x.DateAdded, DateTime.UtcNow) > 1)
.DeleteWithOutputAsync();
toReturn.AddRange(toDelete.Select(x => (x.ChannelId, x.MessageId)));
}
return toReturn;
}
}