Stream follows now use new pubsub, cleanup

This commit is contained in:
Kwoth
2021-07-03 00:41:11 +02:00
parent 35d5260538
commit 941d393971
3 changed files with 94 additions and 94 deletions

View File

@@ -6,7 +6,6 @@ using Discord.WebSocket;
using NadekoBot.Common.ModuleBehaviors; using NadekoBot.Common.ModuleBehaviors;
using NadekoBot.Extensions; using NadekoBot.Extensions;
using NadekoBot.Services; using NadekoBot.Services;
using StackExchange.Redis;
using System.Collections.Generic; using System.Collections.Generic;
using Microsoft.EntityFrameworkCore; using Microsoft.EntityFrameworkCore;
using NadekoBot.Services.Database.Models; using NadekoBot.Services.Database.Models;
@@ -409,7 +408,7 @@ namespace NadekoBot.Modules.Administration.Services
public Task SetStreamAsync(string name, string link) public Task SetStreamAsync(string name, string link)
=> _pubSub.Pub(_activitySetKey, new() { Name = name, Link = link, Type = ActivityType.Streaming }); => _pubSub.Pub(_activitySetKey, new() { Name = name, Link = link, Type = ActivityType.Streaming });
private class ActivityPubData private sealed class ActivityPubData
{ {
public string Name { get; init; } public string Name { get; init; }
public string Link { get; init; } public string Link { get; init; }

View File

@@ -1,4 +1,5 @@
using NadekoBot.Services.Database.Models; using System.Text.Json.Serialization;
using NadekoBot.Services.Database.Models;
using NadekoBot.Db.Models; using NadekoBot.Db.Models;
namespace NadekoBot.Modules.Searches.Common namespace NadekoBot.Modules.Searches.Common
@@ -8,6 +9,7 @@ namespace NadekoBot.Modules.Searches.Common
public FollowedStream.FType Type { get; } public FollowedStream.FType Type { get; }
public string Name { get; } public string Name { get; }
[JsonConstructor]
public StreamDataKey(FollowedStream.FType type, string name) public StreamDataKey(FollowedStream.FType type, string name)
{ {
Type = type; Type = type;

View File

@@ -39,21 +39,39 @@ namespace NadekoBot.Modules.Searches.Services
private readonly Dictionary<StreamDataKey, Dictionary<ulong, HashSet<FollowedStream>>> _shardTrackedStreams; private readonly Dictionary<StreamDataKey, Dictionary<ulong, HashSet<FollowedStream>>> _shardTrackedStreams;
private readonly ConcurrentHashSet<ulong> _offlineNotificationServers; private readonly ConcurrentHashSet<ulong> _offlineNotificationServers;
private readonly ConnectionMultiplexer _multi;
private readonly IBotCredentials _creds; private readonly IBotCredentials _creds;
private readonly IPubSub _pubSub;
private readonly Timer _notifCleanupTimer; private readonly Timer _notifCleanupTimer;
public StreamNotificationService(DbService db, DiscordSocketClient client, private readonly TypedKey<List<StreamData>> _streamsOnlineKey;
IBotStrings strings, IDataCache cache, IBotCredentials creds, IHttpClientFactory httpFactory, private readonly TypedKey<List<StreamData>> _streamsOfflineKey;
Bot bot)
private readonly TypedKey<FollowStreamPubData> _streamFollowKey;
private readonly TypedKey<FollowStreamPubData> _streamUnfollowKey;
public StreamNotificationService(
DbService db,
DiscordSocketClient client,
IBotStrings strings,
ConnectionMultiplexer redis,
IBotCredentials creds,
IHttpClientFactory httpFactory,
Bot bot,
IPubSub pubSub)
{ {
_db = db; _db = db;
_client = client; _client = client;
_strings = strings; _strings = strings;
_multi = cache.Redis;
_creds = creds; _creds = creds;
_streamTracker = new NotifChecker(httpFactory, cache.Redis, creds.RedisKey(), client.ShardId == 0); _pubSub = pubSub;
_streamTracker = new NotifChecker(httpFactory, redis, creds.RedisKey(), client.ShardId == 0);
_streamsOnlineKey = new("streams.online");
_streamsOfflineKey = new("streams.offline");
_streamFollowKey = new("stream.follow");
_streamUnfollowKey = new("stream.unfollow");
using (var uow = db.GetDbContext()) using (var uow = db.GetDbContext())
{ {
@@ -101,9 +119,8 @@ namespace NadekoBot.Modules.Searches.Services
} }
} }
var sub = _multi.GetSubscriber(); _pubSub.Sub(_streamsOfflineKey, HandleStreamsOffline);
sub.Subscribe($"{_creds.RedisKey()}_streams_offline", HandleStreamsOffline); _pubSub.Sub(_streamsOnlineKey, HandleStreamsOnline);
sub.Subscribe($"{_creds.RedisKey()}_streams_online", HandleStreamsOnline);
if (client.ShardId == 0) if (client.ShardId == 0)
{ {
@@ -153,8 +170,8 @@ namespace NadekoBot.Modules.Searches.Services
} }
}, null, TimeSpan.FromMinutes(30), TimeSpan.FromMinutes(30)); }, null, TimeSpan.FromMinutes(30), TimeSpan.FromMinutes(30));
sub.Subscribe($"{_creds.RedisKey()}_follow_stream", HandleFollowStream); _pubSub.Sub(_streamFollowKey, HandleFollowStream);
sub.Subscribe($"{_creds.RedisKey()}_unfollow_stream", HandleUnfollowStream); _pubSub.Sub(_streamUnfollowKey, HandleUnfollowStream);
} }
bot.JoinedGuild += ClientOnJoinedGuild; bot.JoinedGuild += ClientOnJoinedGuild;
@@ -162,70 +179,62 @@ namespace NadekoBot.Modules.Searches.Services
} }
/// <summary> /// <summary>
/// Handles follow_stream pubs to keep the counter up to date. /// Handles follow stream pubs to keep the counter up to date.
/// When counter reaches 0, stream is removed from tracking because /// When counter reaches 0, stream is removed from tracking because
/// that means no guilds are subscribed to that stream anymore /// that means no guilds are subscribed to that stream anymore
/// </summary> /// </summary>
private void HandleFollowStream(RedisChannel ch, RedisValue val) private async ValueTask HandleFollowStream(FollowStreamPubData info)
=> Task.Run(() => {
_streamTracker.CacheAddData(info.Key, null, replace: false);
lock (_shardLock)
{ {
var info = JsonConvert.DeserializeAnonymousType( var key = info.Key;
val.ToString(), if (_trackCounter.ContainsKey(key))
new {Key = default(StreamDataKey), GuildId = 0ul});
_streamTracker.CacheAddData(info.Key, null, replace: false);
lock (_shardLock)
{ {
var key = info.Key; _trackCounter[key].Add(info.GuildId);
if (_trackCounter.ContainsKey(key))
{
_trackCounter[key].Add(info.GuildId);
}
else
{
_trackCounter[key] = new HashSet<ulong>()
{
info.GuildId
};
}
} }
}); else
{
_trackCounter[key] = new HashSet<ulong>()
{
info.GuildId
};
}
}
}
/// <summary> /// <summary>
/// Handles unfollow_stream pubs to keep the counter up to date. /// Handles unfollow pubs to keep the counter up to date.
/// When counter reaches 0, stream is removed from tracking because /// When counter reaches 0, stream is removed from tracking because
/// that means no guilds are subscribed to that stream anymore /// that means no guilds are subscribed to that stream anymore
/// </summary> /// </summary>
private void HandleUnfollowStream(RedisChannel ch, RedisValue val) private ValueTask HandleUnfollowStream(FollowStreamPubData info)
=> Task.Run(() => {
{ lock (_shardLock)
var info = JsonConvert.DeserializeAnonymousType(val.ToString(), {
new {Key = default(StreamDataKey), GuildId = 0ul}); var key = info.Key;
if (!_trackCounter.TryGetValue(key, out var set))
lock (_shardLock) {
{ // it should've been removed already?
var key = info.Key; _streamTracker.UntrackStreamByKey(in key);
if (!_trackCounter.TryGetValue(key, out var set)) return default;
{ }
// it should've been removed already?
_streamTracker.UntrackStreamByKey(in key); set.Remove(info.GuildId);
return; if (set.Count != 0)
} return default;
set.Remove(info.GuildId); _trackCounter.Remove(key);
if (set.Count != 0) // if no other guilds are following this stream
return; // untrack the stream
_streamTracker.UntrackStreamByKey(in key);
_trackCounter.Remove(key); }
// if no other guilds are following this stream
// untrack the stream return default;
_streamTracker.UntrackStreamByKey(in key); }
}
}); private async ValueTask HandleStreamsOffline(List<StreamData> offlineStreams)
private void HandleStreamsOffline(RedisChannel arg1, RedisValue val) => Task.Run(async () =>
{ {
var offlineStreams = JsonConvert.DeserializeObject<List<StreamData>>(val.ToString());
foreach (var stream in offlineStreams) foreach (var stream in offlineStreams)
{ {
var key = stream.CreateKey(); var key = stream.CreateKey();
@@ -242,11 +251,10 @@ namespace NadekoBot.Modules.Searches.Services
await Task.WhenAll(sendTasks); await Task.WhenAll(sendTasks);
} }
} }
}); }
private void HandleStreamsOnline(RedisChannel arg1, RedisValue val) => Task.Run(async () => private async ValueTask HandleStreamsOnline(List<StreamData> onlineStreams)
{ {
var onlineStreams = JsonConvert.DeserializeObject<List<StreamData>>(val.ToString());
foreach (var stream in onlineStreams) foreach (var stream in onlineStreams)
{ {
var key = stream.CreateKey(); var key = stream.CreateKey();
@@ -276,19 +284,13 @@ namespace NadekoBot.Modules.Searches.Services
await Task.WhenAll(sendTasks); await Task.WhenAll(sendTasks);
} }
} }
});
private Task OnStreamsOffline(List<StreamData> data)
{
var sub = _multi.GetSubscriber();
return sub.PublishAsync($"{_creds.RedisKey()}_streams_offline", JsonConvert.SerializeObject(data));
} }
private Task OnStreamsOnline(List<StreamData> data) private Task OnStreamsOnline(List<StreamData> data)
{ => _pubSub.Pub(_streamsOnlineKey, data);
var sub = _multi.GetSubscriber();
return sub.PublishAsync($"{_creds.RedisKey()}_streams_online", JsonConvert.SerializeObject(data)); private Task OnStreamsOffline(List<StreamData> data)
} => _pubSub.Pub(_streamsOfflineKey, data);
private Task ClientOnJoinedGuild(GuildConfig guildConfig) private Task ClientOnJoinedGuild(GuildConfig guildConfig)
{ {
@@ -381,25 +383,16 @@ namespace NadekoBot.Modules.Searches.Services
} }
} }
PublishUnfollowStream(fs); await PublishUnfollowStream(fs);
return fs; return fs;
} }
private void PublishUnfollowStream(FollowedStream fs)
{
var sub = _multi.GetSubscriber();
sub.Publish($"{_creds.RedisKey()}_unfollow_stream",
JsonConvert.SerializeObject(new {Key = fs.CreateKey(), GuildId = fs.GuildId}));
}
private void PublishFollowStream(FollowedStream fs) private void PublishFollowStream(FollowedStream fs)
{ => _pubSub.Pub(_streamFollowKey, new() { Key = fs.CreateKey(), GuildId = fs.GuildId });
var sub = _multi.GetSubscriber();
sub.Publish($"{_creds.RedisKey()}_follow_stream", private Task PublishUnfollowStream(FollowedStream fs)
JsonConvert.SerializeObject(new {Key = fs.CreateKey(), GuildId = fs.GuildId}), => _pubSub.Pub(_streamUnfollowKey, new() { Key = fs.CreateKey(), GuildId = fs.GuildId });
CommandFlags.FireAndForget);
}
public async Task<StreamData> FollowStream(ulong guildId, ulong channelId, string url) public async Task<StreamData> FollowStream(ulong guildId, ulong channelId, string url)
{ {
@@ -577,5 +570,11 @@ namespace NadekoBot.Modules.Searches.Services
return all.Count; return all.Count;
} }
public sealed class FollowStreamPubData
{
public StreamDataKey Key { get; init; }
public ulong GuildId { get; init; }
}
} }
} }