From 941d393971f4f8f7708564d91c534d99c3cb054b Mon Sep 17 00:00:00 2001 From: Kwoth Date: Sat, 3 Jul 2021 00:41:11 +0200 Subject: [PATCH] Stream follows now use new pubsub, cleanup --- .../Administration/Services/SelfService.cs | 3 +- .../Models/StreamDataKey.cs | 4 +- .../Services/StreamNotificationService.cs | 181 +++++++++--------- 3 files changed, 94 insertions(+), 94 deletions(-) diff --git a/src/NadekoBot/Modules/Administration/Services/SelfService.cs b/src/NadekoBot/Modules/Administration/Services/SelfService.cs index 0f7f5da2c..98dc81e98 100644 --- a/src/NadekoBot/Modules/Administration/Services/SelfService.cs +++ b/src/NadekoBot/Modules/Administration/Services/SelfService.cs @@ -6,7 +6,6 @@ using Discord.WebSocket; using NadekoBot.Common.ModuleBehaviors; using NadekoBot.Extensions; using NadekoBot.Services; -using StackExchange.Redis; using System.Collections.Generic; using Microsoft.EntityFrameworkCore; using NadekoBot.Services.Database.Models; @@ -409,7 +408,7 @@ namespace NadekoBot.Modules.Administration.Services public Task SetStreamAsync(string name, string link) => _pubSub.Pub(_activitySetKey, new() { Name = name, Link = link, Type = ActivityType.Streaming }); - private class ActivityPubData + private sealed class ActivityPubData { public string Name { get; init; } public string Link { get; init; } diff --git a/src/NadekoBot/Modules/Searches/Common/StreamNotifications/Models/StreamDataKey.cs b/src/NadekoBot/Modules/Searches/Common/StreamNotifications/Models/StreamDataKey.cs index c613a1920..062eca507 100644 --- a/src/NadekoBot/Modules/Searches/Common/StreamNotifications/Models/StreamDataKey.cs +++ b/src/NadekoBot/Modules/Searches/Common/StreamNotifications/Models/StreamDataKey.cs @@ -1,4 +1,5 @@ -using NadekoBot.Services.Database.Models; +using System.Text.Json.Serialization; +using NadekoBot.Services.Database.Models; using NadekoBot.Db.Models; namespace NadekoBot.Modules.Searches.Common @@ -8,6 +9,7 @@ namespace NadekoBot.Modules.Searches.Common public FollowedStream.FType Type { get; } public string Name { get; } + [JsonConstructor] public StreamDataKey(FollowedStream.FType type, string name) { Type = type; diff --git a/src/NadekoBot/Modules/Searches/Services/StreamNotificationService.cs b/src/NadekoBot/Modules/Searches/Services/StreamNotificationService.cs index ade8aac8a..b5f61b6c8 100644 --- a/src/NadekoBot/Modules/Searches/Services/StreamNotificationService.cs +++ b/src/NadekoBot/Modules/Searches/Services/StreamNotificationService.cs @@ -39,21 +39,39 @@ namespace NadekoBot.Modules.Searches.Services private readonly Dictionary>> _shardTrackedStreams; private readonly ConcurrentHashSet _offlineNotificationServers; - - private readonly ConnectionMultiplexer _multi; + private readonly IBotCredentials _creds; + private readonly IPubSub _pubSub; private readonly Timer _notifCleanupTimer; - public StreamNotificationService(DbService db, DiscordSocketClient client, - IBotStrings strings, IDataCache cache, IBotCredentials creds, IHttpClientFactory httpFactory, - Bot bot) + private readonly TypedKey> _streamsOnlineKey; + private readonly TypedKey> _streamsOfflineKey; + + private readonly TypedKey _streamFollowKey; + private readonly TypedKey _streamUnfollowKey; + + public StreamNotificationService( + DbService db, + DiscordSocketClient client, + IBotStrings strings, + ConnectionMultiplexer redis, + IBotCredentials creds, + IHttpClientFactory httpFactory, + Bot bot, + IPubSub pubSub) { _db = db; _client = client; _strings = strings; - _multi = cache.Redis; _creds = creds; - _streamTracker = new NotifChecker(httpFactory, cache.Redis, creds.RedisKey(), client.ShardId == 0); + _pubSub = pubSub; + _streamTracker = new NotifChecker(httpFactory, redis, creds.RedisKey(), client.ShardId == 0); + + _streamsOnlineKey = new("streams.online"); + _streamsOfflineKey = new("streams.offline"); + + _streamFollowKey = new("stream.follow"); + _streamUnfollowKey = new("stream.unfollow"); using (var uow = db.GetDbContext()) { @@ -101,9 +119,8 @@ namespace NadekoBot.Modules.Searches.Services } } - var sub = _multi.GetSubscriber(); - sub.Subscribe($"{_creds.RedisKey()}_streams_offline", HandleStreamsOffline); - sub.Subscribe($"{_creds.RedisKey()}_streams_online", HandleStreamsOnline); + _pubSub.Sub(_streamsOfflineKey, HandleStreamsOffline); + _pubSub.Sub(_streamsOnlineKey, HandleStreamsOnline); if (client.ShardId == 0) { @@ -153,8 +170,8 @@ namespace NadekoBot.Modules.Searches.Services } }, null, TimeSpan.FromMinutes(30), TimeSpan.FromMinutes(30)); - sub.Subscribe($"{_creds.RedisKey()}_follow_stream", HandleFollowStream); - sub.Subscribe($"{_creds.RedisKey()}_unfollow_stream", HandleUnfollowStream); + _pubSub.Sub(_streamFollowKey, HandleFollowStream); + _pubSub.Sub(_streamUnfollowKey, HandleUnfollowStream); } bot.JoinedGuild += ClientOnJoinedGuild; @@ -162,70 +179,62 @@ namespace NadekoBot.Modules.Searches.Services } /// - /// Handles follow_stream pubs to keep the counter up to date. + /// Handles follow stream pubs to keep the counter up to date. /// When counter reaches 0, stream is removed from tracking because /// that means no guilds are subscribed to that stream anymore /// - private void HandleFollowStream(RedisChannel ch, RedisValue val) - => Task.Run(() => + private async ValueTask HandleFollowStream(FollowStreamPubData info) + { + _streamTracker.CacheAddData(info.Key, null, replace: false); + lock (_shardLock) { - var info = JsonConvert.DeserializeAnonymousType( - val.ToString(), - new {Key = default(StreamDataKey), GuildId = 0ul}); - - _streamTracker.CacheAddData(info.Key, null, replace: false); - lock (_shardLock) + var key = info.Key; + if (_trackCounter.ContainsKey(key)) { - var key = info.Key; - if (_trackCounter.ContainsKey(key)) - { - _trackCounter[key].Add(info.GuildId); - } - else - { - _trackCounter[key] = new HashSet() - { - info.GuildId - }; - } + _trackCounter[key].Add(info.GuildId); } - }); + else + { + _trackCounter[key] = new HashSet() + { + info.GuildId + }; + } + } + } /// - /// Handles unfollow_stream pubs to keep the counter up to date. + /// Handles unfollow pubs to keep the counter up to date. /// When counter reaches 0, stream is removed from tracking because /// that means no guilds are subscribed to that stream anymore /// - private void HandleUnfollowStream(RedisChannel ch, RedisValue val) - => Task.Run(() => - { - var info = JsonConvert.DeserializeAnonymousType(val.ToString(), - new {Key = default(StreamDataKey), GuildId = 0ul}); - - lock (_shardLock) - { - var key = info.Key; - if (!_trackCounter.TryGetValue(key, out var set)) - { - // it should've been removed already? - _streamTracker.UntrackStreamByKey(in key); - return; - } - - set.Remove(info.GuildId); - if (set.Count != 0) - return; - - _trackCounter.Remove(key); - // if no other guilds are following this stream - // untrack the stream - _streamTracker.UntrackStreamByKey(in key); - } - }); - - private void HandleStreamsOffline(RedisChannel arg1, RedisValue val) => Task.Run(async () => + private ValueTask HandleUnfollowStream(FollowStreamPubData info) + { + lock (_shardLock) + { + var key = info.Key; + if (!_trackCounter.TryGetValue(key, out var set)) + { + // it should've been removed already? + _streamTracker.UntrackStreamByKey(in key); + return default; + } + + set.Remove(info.GuildId); + if (set.Count != 0) + return default; + + _trackCounter.Remove(key); + // if no other guilds are following this stream + // untrack the stream + _streamTracker.UntrackStreamByKey(in key); + } + + return default; + } + + private async ValueTask HandleStreamsOffline(List offlineStreams) { - var offlineStreams = JsonConvert.DeserializeObject>(val.ToString()); foreach (var stream in offlineStreams) { var key = stream.CreateKey(); @@ -242,11 +251,10 @@ namespace NadekoBot.Modules.Searches.Services await Task.WhenAll(sendTasks); } } - }); + } - private void HandleStreamsOnline(RedisChannel arg1, RedisValue val) => Task.Run(async () => + private async ValueTask HandleStreamsOnline(List onlineStreams) { - var onlineStreams = JsonConvert.DeserializeObject>(val.ToString()); foreach (var stream in onlineStreams) { var key = stream.CreateKey(); @@ -276,19 +284,13 @@ namespace NadekoBot.Modules.Searches.Services await Task.WhenAll(sendTasks); } } - }); - - private Task OnStreamsOffline(List data) - { - var sub = _multi.GetSubscriber(); - return sub.PublishAsync($"{_creds.RedisKey()}_streams_offline", JsonConvert.SerializeObject(data)); } private Task OnStreamsOnline(List data) - { - var sub = _multi.GetSubscriber(); - return sub.PublishAsync($"{_creds.RedisKey()}_streams_online", JsonConvert.SerializeObject(data)); - } + => _pubSub.Pub(_streamsOnlineKey, data); + + private Task OnStreamsOffline(List data) + => _pubSub.Pub(_streamsOfflineKey, data); private Task ClientOnJoinedGuild(GuildConfig guildConfig) { @@ -381,25 +383,16 @@ namespace NadekoBot.Modules.Searches.Services } } - PublishUnfollowStream(fs); + await PublishUnfollowStream(fs); return fs; } - private void PublishUnfollowStream(FollowedStream fs) - { - var sub = _multi.GetSubscriber(); - sub.Publish($"{_creds.RedisKey()}_unfollow_stream", - JsonConvert.SerializeObject(new {Key = fs.CreateKey(), GuildId = fs.GuildId})); - } - private void PublishFollowStream(FollowedStream fs) - { - var sub = _multi.GetSubscriber(); - sub.Publish($"{_creds.RedisKey()}_follow_stream", - JsonConvert.SerializeObject(new {Key = fs.CreateKey(), GuildId = fs.GuildId}), - CommandFlags.FireAndForget); - } + => _pubSub.Pub(_streamFollowKey, new() { Key = fs.CreateKey(), GuildId = fs.GuildId }); + + private Task PublishUnfollowStream(FollowedStream fs) + => _pubSub.Pub(_streamUnfollowKey, new() { Key = fs.CreateKey(), GuildId = fs.GuildId }); public async Task FollowStream(ulong guildId, ulong channelId, string url) { @@ -577,5 +570,11 @@ namespace NadekoBot.Modules.Searches.Services return all.Count; } + + public sealed class FollowStreamPubData + { + public StreamDataKey Key { get; init; } + public ulong GuildId { get; init; } + } } } \ No newline at end of file