- NadekoBot class renamed to Bot

- Implemented grpc based coordinator. Supports restarting, killing single or all shards, as well as getting current shard statuses. (Adaptation of the one used by the public bot)
- Coord is setup via coord.yml file
- Methods from SelfService which deal with shard/bot restart etc have been moved to ICoordinator (with GrpcRemoteCoordinator being the default implementation atm)
- Vastly simplified NadekoBot/Program.cs
This commit is contained in:
Kwoth
2021-06-19 13:13:54 +02:00
parent d8c7cdc7f4
commit c86bf6f300
58 changed files with 1212 additions and 635 deletions

View File

@@ -37,7 +37,7 @@ namespace NadekoBot.Core.Services
private readonly DiscordSocketClient _client;
private readonly CommandService _commandService;
private readonly BotConfigService _bss;
private readonly NadekoBot _bot;
private readonly Bot _bot;
private IServiceProvider _services;
private IEnumerable<IEarlyBehavior> _earlyBehaviors;
private IEnumerable<IInputTransformer> _inputTransformers;
@@ -57,7 +57,7 @@ namespace NadekoBot.Core.Services
private readonly Timer _clearUsersOnShortCooldown;
public CommandHandler(DiscordSocketClient client, DbService db, CommandService commandService,
BotConfigService bss, NadekoBot bot, IServiceProvider services)
BotConfigService bss, Bot bot, IServiceProvider services)
{
_client = client;
_commandService = commandService;

View File

@@ -27,7 +27,7 @@ namespace NadekoBot.Core.Services
private readonly BotConfigService _bss;
public bool GroupGreets => _bss.Data.GroupGreets;
public GreetSettingsService(DiscordSocketClient client, NadekoBot bot, DbService db,
public GreetSettingsService(DiscordSocketClient client, Bot bot, DbService db,
BotConfigService bss)
{
_db = db;

View File

@@ -0,0 +1,22 @@
using System;
using System.Collections.Generic;
namespace NadekoBot.Services
{
public interface ICoordinator
{
bool RestartBot();
void Die();
bool RestartShard(int shardId);
IEnumerable<ShardStatus> GetAllShardStatuses();
int GetGuildCount();
}
public class ShardStatus
{
public Discord.ConnectionState ConnectionState { get; set; }
public DateTime Time { get; set; }
public int ShardId { get; set; }
public int Guilds { get; set; }
}
}

View File

@@ -22,7 +22,7 @@ namespace NadekoBot.Core.Services.Impl
private static readonly Dictionary<string, CommandData> _commandData = JsonConvert.DeserializeObject<Dictionary<string, CommandData>>(
File.ReadAllText("./data/strings/commands/commands.en-US.json"));
public Localization(BotConfigService bss, NadekoBot bot, DbService db)
public Localization(BotConfigService bss, Bot bot, DbService db)
{
_bss = bss;
_db = db;

View File

@@ -0,0 +1,138 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Discord;
using Discord.WebSocket;
using Grpc.Core;
using NadekoBot.Common.ModuleBehaviors;
using NadekoBot.Coordinator;
using NadekoBot.Core.Services;
using NadekoBot.Extensions;
using Serilog;
namespace NadekoBot.Services
{
public class RemoteGrpcCoordinator : ICoordinator, IReadyExecutor
{
private readonly Coordinator.Coordinator.CoordinatorClient _coordClient;
private readonly DiscordSocketClient _client;
public RemoteGrpcCoordinator(IBotCredentials creds, DiscordSocketClient client)
{
// todo should use credentials
var channel = Grpc.Net.Client.GrpcChannel.ForAddress("https://localhost:3443");
_coordClient = new(channel);
_client = client;
}
public bool RestartBot()
{
_coordClient.RestartAllShards(new RestartAllRequest
{
});
return true;
}
public void Die()
{
_coordClient.Die(new DieRequest()
{
Graceful = false
});
}
public bool RestartShard(int shardId)
{
_coordClient.RestartShard(new RestartShardRequest
{
ShardId = shardId,
});
return true;
}
public IEnumerable<ShardStatus> GetAllShardStatuses()
{
var res = _coordClient.GetAllStatuses(new GetAllStatusesRequest());
return res.Statuses
.ToArray()
.Map(s => new ShardStatus()
{
ConnectionState = FromCoordConnState(s.State),
Guilds = s.GuildCount,
ShardId = s.ShardId,
Time = s.LastUpdate.ToDateTime(),
});
}
public int GetGuildCount()
{
var res = _coordClient.GetAllStatuses(new GetAllStatusesRequest());
return res.Statuses.Sum(x => x.GuildCount);
}
public Task OnReadyAsync()
{
Task.Run(async () =>
{
var gracefulImminent = false;
while (true)
{
try
{
var reply = await _coordClient.HeartbeatAsync(new HeartbeatRequest
{
State = ToCoordConnState(_client.ConnectionState),
GuildCount = _client.ConnectionState == Discord.ConnectionState.Connected ? _client.Guilds.Count : 0,
ShardId = _client.ShardId,
}, deadline: DateTime.UtcNow + TimeSpan.FromSeconds(10));
gracefulImminent = reply.GracefulImminent;
}
catch (RpcException ex)
{
if (!gracefulImminent)
{
Log.Warning(ex, "Hearbeat failed and graceful shutdown was not expected: {Message}",
ex.Message);
break;
}
await Task.Delay(22500).ConfigureAwait(false);
}
catch (Exception ex)
{
Log.Error(ex, "Unexpected heartbeat exception: {Message}", ex.Message);
break;
}
await Task.Delay(7500).ConfigureAwait(false);
}
Environment.Exit(5);
});
return Task.CompletedTask;
}
private ConnState ToCoordConnState(Discord.ConnectionState state)
=> state switch
{
Discord.ConnectionState.Connecting => ConnState.Connecting,
Discord.ConnectionState.Connected => ConnState.Connected,
_ => ConnState.Disconnected
};
private Discord.ConnectionState FromCoordConnState(ConnState state)
=> state switch
{
ConnState.Connecting => Discord.ConnectionState.Connecting,
ConnState.Connected => Discord.ConnectionState.Connected,
_ => Discord.ConnectionState.Disconnected
};
}
}

View File

@@ -1,477 +0,0 @@
using Discord;
using Discord.Commands;
using Discord.WebSocket;
using Microsoft.Extensions.DependencyInjection;
using NadekoBot.Common;
using NadekoBot.Common.ShardCom;
using NadekoBot.Core.Services;
using NadekoBot.Core.Services.Database.Models;
using NadekoBot.Core.Services.Impl;
using NadekoBot.Extensions;
using Newtonsoft.Json;
using StackExchange.Redis;
using System;
using System.Collections.Generic;
using System.Collections.Immutable;
using System.Diagnostics;
using System.IO;
using System.Linq;
using System.Net.Http;
using System.Reflection;
using System.Threading;
using System.Threading.Tasks;
using Discord.Net;
using LinqToDB.EntityFrameworkCore;
using NadekoBot.Common.ModuleBehaviors;
using NadekoBot.Core.Common;
using NadekoBot.Core.Common.Configs;
using NadekoBot.Db;
using NadekoBot.Modules.Administration;
using NadekoBot.Modules.Gambling.Services;
using NadekoBot.Modules.Administration.Services;
using NadekoBot.Modules.CustomReactions.Services;
using NadekoBot.Modules.Utility.Services;
using Serilog;
namespace NadekoBot
{
public class NadekoBot
{
public BotCredentials Credentials { get; }
public DiscordSocketClient Client { get; }
public CommandService CommandService { get; }
private readonly DbService _db;
public ImmutableArray<GuildConfig> AllGuildConfigs { get; private set; }
/* Will have to be removed soon, it's been way too long */
public static Color OkColor { get; set; }
public static Color ErrorColor { get; set; }
public static Color PendingColor { get; set; }
public TaskCompletionSource<bool> Ready { get; private set; } = new TaskCompletionSource<bool>();
public IServiceProvider Services { get; private set; }
public IDataCache Cache { get; private set; }
public int GuildCount =>
Cache.Redis.GetDatabase()
.ListRange(Credentials.RedisKey() + "_shardstats")
.Select(x => JsonConvert.DeserializeObject<ShardComMessage>(x))
.Sum(x => x.Guilds);
public string Mention { get; set; }
public event Func<GuildConfig, Task> JoinedGuild = delegate { return Task.CompletedTask; };
public NadekoBot(int shardId, int parentProcessId)
{
if (shardId < 0)
throw new ArgumentOutOfRangeException(nameof(shardId));
LogSetup.SetupLogger(shardId);
TerribleElevatedPermissionCheck();
Credentials = new BotCredentials();
Cache = new RedisCache(Credentials, shardId);
LinqToDBForEFTools.Initialize();
_db = new DbService(Credentials);
if (shardId == 0)
{
_db.Setup();
}
Client = new DiscordSocketClient(new DiscordSocketConfig
{
MessageCacheSize = 50,
LogLevel = LogSeverity.Warning,
ConnectionTimeout = int.MaxValue,
TotalShards = Credentials.TotalShards,
ShardId = shardId,
AlwaysDownloadUsers = false,
ExclusiveBulkDelete = true,
});
CommandService = new CommandService(new CommandServiceConfig()
{
CaseSensitiveCommands = false,
DefaultRunMode = RunMode.Sync,
});
SetupShard(parentProcessId);
#if GLOBAL_NADEKO || DEBUG
Client.Log += Client_Log;
#endif
}
private void StartSendingData()
{
Task.Run(async () =>
{
while (true)
{
var data = new ShardComMessage()
{
ConnectionState = Client.ConnectionState,
Guilds = Client.ConnectionState == ConnectionState.Connected ? Client.Guilds.Count : 0,
ShardId = Client.ShardId,
Time = DateTime.UtcNow,
};
var sub = Cache.Redis.GetSubscriber();
var msg = JsonConvert.SerializeObject(data);
await sub.PublishAsync(Credentials.RedisKey() + "_shardcoord_send", msg).ConfigureAwait(false);
await Task.Delay(7500).ConfigureAwait(false);
}
});
}
public List<ulong> GetCurrentGuildIds()
{
return Client.Guilds.Select(x => x.Id).ToList();
}
public IEnumerable<GuildConfig> GetCurrentGuildConfigs()
{
using var uow = _db.GetDbContext();
return uow.GuildConfigs.GetAllGuildConfigs(GetCurrentGuildIds()).ToImmutableArray();
}
private void AddServices()
{
var startingGuildIdList = GetCurrentGuildIds();
var sw = Stopwatch.StartNew();
var _bot = Client.CurrentUser;
using (var uow = _db.GetDbContext())
{
uow.EnsureUserCreated(_bot.Id, _bot.Username, _bot.Discriminator, _bot.AvatarId);
AllGuildConfigs = uow.GuildConfigs.GetAllGuildConfigs(startingGuildIdList).ToImmutableArray();
}
var s = new ServiceCollection()
.AddSingleton<IBotCredentials>(Credentials)
.AddSingleton(_db)
.AddSingleton(Client)
.AddSingleton(CommandService)
.AddSingleton(this)
.AddSingleton(Cache)
.AddSingleton(Cache.Redis)
.AddSingleton<ISeria, JsonSeria>()
.AddSingleton<IPubSub, RedisPubSub>()
.AddSingleton<IConfigSeria, YamlSeria>()
.AddBotStringsServices()
.AddConfigServices()
.AddConfigMigrators()
.AddMemoryCache()
.AddSingleton<IShopService, ShopService>()
// music
.AddMusic()
;
s.AddHttpClient();
s.AddHttpClient("memelist").ConfigurePrimaryHttpMessageHandler(() => new HttpClientHandler
{
AllowAutoRedirect = false
});
s.LoadFrom(Assembly.GetAssembly(typeof(CommandHandler)));
s.AddSingleton<IReadyExecutor>(x => x.GetService<SelfService>());
s.AddSingleton<IReadyExecutor>(x => x.GetService<CustomReactionsService>());
s.AddSingleton<IReadyExecutor>(x => x.GetService<RepeaterService>());
//initialize Services
Services = s.BuildServiceProvider();
var commandHandler = Services.GetService<CommandHandler>();
if (Client.ShardId == 0)
{
ApplyConfigMigrations();
}
//what the fluff
commandHandler.AddServices(s);
_ = LoadTypeReaders(typeof(NadekoBot).Assembly);
sw.Stop();
Log.Information($"All services loaded in {sw.Elapsed.TotalSeconds:F2}s");
}
private void ApplyConfigMigrations()
{
// execute all migrators
var migrators = Services.GetServices<IConfigMigrator>();
foreach (var migrator in migrators)
{
migrator.EnsureMigrated();
}
// and then drop the bot config table
// var conn = _db.GetDbContext()._context.Database.GetDbConnection();
// using var deleteBotConfig = conn.CreateCommand();
// deleteBotConfig.CommandText = "DROP TABLE IF EXISTS BotConfig;";
// deleteBotConfig.ExecuteNonQuery();
}
private IEnumerable<object> LoadTypeReaders(Assembly assembly)
{
Type[] allTypes;
try
{
allTypes = assembly.GetTypes();
}
catch (ReflectionTypeLoadException ex)
{
Log.Warning(ex.LoaderExceptions[0], "Error getting types");
return Enumerable.Empty<object>();
}
var filteredTypes = allTypes
.Where(x => x.IsSubclassOf(typeof(TypeReader))
&& x.BaseType.GetGenericArguments().Length > 0
&& !x.IsAbstract);
var toReturn = new List<object>();
foreach (var ft in filteredTypes)
{
var x = (TypeReader)Activator.CreateInstance(ft, Client, CommandService);
var baseType = ft.BaseType;
var typeArgs = baseType.GetGenericArguments();
CommandService.AddTypeReader(typeArgs[0], x);
toReturn.Add(x);
}
return toReturn;
}
private async Task LoginAsync(string token)
{
var clientReady = new TaskCompletionSource<bool>();
Task SetClientReady()
{
var _ = Task.Run(async () =>
{
clientReady.TrySetResult(true);
try
{
foreach (var chan in (await Client.GetDMChannelsAsync().ConfigureAwait(false)))
{
await chan.CloseAsync().ConfigureAwait(false);
}
}
catch
{
// ignored
}
finally
{
}
});
return Task.CompletedTask;
}
//connect
Log.Information("Shard {ShardId} logging in ...", Client.ShardId);
try
{
await Client.LoginAsync(TokenType.Bot, token).ConfigureAwait(false);
await Client.StartAsync().ConfigureAwait(false);
}
catch (HttpException ex)
{
LoginErrorHandler.Handle(ex);
Helpers.ReadErrorAndExit(3);
}
catch (Exception ex)
{
LoginErrorHandler.Handle(ex);
Helpers.ReadErrorAndExit(4);
}
Client.Ready += SetClientReady;
await clientReady.Task.ConfigureAwait(false);
Client.Ready -= SetClientReady;
Client.JoinedGuild += Client_JoinedGuild;
Client.LeftGuild += Client_LeftGuild;
Log.Information("Shard {0} logged in.", Client.ShardId);
}
private Task Client_LeftGuild(SocketGuild arg)
{
Log.Information("Left server: {0} [{1}]", arg?.Name, arg?.Id);
return Task.CompletedTask;
}
private Task Client_JoinedGuild(SocketGuild arg)
{
Log.Information($"Joined server: {0} [{1}]", arg.Name, arg.Id);
var _ = Task.Run(async () =>
{
GuildConfig gc;
using (var uow = _db.GetDbContext())
{
gc = uow.GuildConfigsForId(arg.Id);
}
await JoinedGuild.Invoke(gc).ConfigureAwait(false);
});
return Task.CompletedTask;
}
public async Task RunAsync()
{
var sw = Stopwatch.StartNew();
await LoginAsync(Credentials.Token).ConfigureAwait(false);
Mention = Client.CurrentUser.Mention;
Log.Information("Shard {ShardId} loading services...", Client.ShardId);
try
{
AddServices();
}
catch (Exception ex)
{
Log.Error(ex, "Error adding services");
Helpers.ReadErrorAndExit(9);
}
sw.Stop();
Log.Information("Shard {ShardId} connected in {Elapsed:F2}s", Client.ShardId, sw.Elapsed.TotalSeconds);
var stats = Services.GetService<IStatsService>();
stats.Initialize();
var commandHandler = Services.GetService<CommandHandler>();
var CommandService = Services.GetService<CommandService>();
// start handling messages received in commandhandler
await commandHandler.StartHandling().ConfigureAwait(false);
_ = await CommandService.AddModulesAsync(this.GetType().GetTypeInfo().Assembly, Services)
.ConfigureAwait(false);
HandleStatusChanges();
StartSendingData();
Ready.TrySetResult(true);
_ = Task.Run(ExecuteReadySubscriptions);
Log.Information("Shard {ShardId} ready", Client.ShardId);
}
private Task ExecuteReadySubscriptions()
{
var readyExecutors = Services.GetServices<IReadyExecutor>();
var tasks = readyExecutors.Select(async toExec =>
{
try
{
await toExec.OnReadyAsync();
}
catch (Exception ex)
{
Log.Error(ex, "Failed running OnReadyAsync method on {Type} type: {Message}",
toExec.GetType().Name, ex.Message);
}
});
return Task.WhenAll(tasks);
}
private Task Client_Log(LogMessage arg)
{
if (arg.Exception != null)
Log.Warning(arg.Exception, arg.Source + " | " + arg.Message);
else
Log.Warning(arg.Source + " | " + arg.Message);
return Task.CompletedTask;
}
public async Task RunAndBlockAsync()
{
await RunAsync().ConfigureAwait(false);
await Task.Delay(-1).ConfigureAwait(false);
}
private void TerribleElevatedPermissionCheck()
{
try
{
var rng = new NadekoRandom().Next(100000, 1000000);
var str = rng.ToString();
File.WriteAllText(str, str);
File.Delete(str);
}
catch
{
Log.Error("You must run the application as an ADMINISTRATOR");
Helpers.ReadErrorAndExit(2);
}
}
private static void SetupShard(int parentProcessId)
{
new Thread(new ThreadStart(() =>
{
try
{
var p = Process.GetProcessById(parentProcessId);
p.WaitForExit();
}
finally
{
Environment.Exit(7);
}
})).Start();
}
private void HandleStatusChanges()
{
var sub = Services.GetService<IDataCache>().Redis.GetSubscriber();
sub.Subscribe(Client.CurrentUser.Id + "_status.game_set", async (ch, game) =>
{
try
{
var obj = new { Name = default(string), Activity = ActivityType.Playing };
obj = JsonConvert.DeserializeAnonymousType(game, obj);
await Client.SetGameAsync(obj.Name, type: obj.Activity).ConfigureAwait(false);
}
catch (Exception ex)
{
Log.Warning(ex, "Error setting game");
}
}, CommandFlags.FireAndForget);
sub.Subscribe(Client.CurrentUser.Id + "_status.stream_set", async (ch, streamData) =>
{
try
{
var obj = new { Name = "", Url = "" };
obj = JsonConvert.DeserializeAnonymousType(streamData, obj);
await Client.SetGameAsync(obj.Name, obj.Url, ActivityType.Streaming).ConfigureAwait(false);
}
catch (Exception ex)
{
Log.Warning(ex, "Error setting stream");
}
}, CommandFlags.FireAndForget);
}
public Task SetGameAsync(string game, ActivityType type)
{
var obj = new { Name = game, Activity = type };
var sub = Services.GetService<IDataCache>().Redis.GetSubscriber();
return sub.PublishAsync(Client.CurrentUser.Id + "_status.game_set", JsonConvert.SerializeObject(obj));
}
public Task SetStreamAsync(string name, string link)
{
var obj = new { Name = name, Url = link };
var sub = Services.GetService<IDataCache>().Redis.GetSubscriber();
return sub.PublishAsync(Client.CurrentUser.Id + "_status.stream_set", JsonConvert.SerializeObject(obj));
}
}
}

View File

@@ -37,9 +37,9 @@ namespace NadekoBot.Core.Services
var error = _data.Color.Error;
var pend = _data.Color.Pending;
// todo future remove these static props once cleanup is done
NadekoBot.OkColor = new Color(ok.R, ok.G, ok.B);
NadekoBot.ErrorColor = new Color(error.R, error.G, error.B);
NadekoBot.PendingColor = new Color(pend.R, pend.G, pend.B);
Bot.OkColor = new Color(ok.R, ok.G, ok.B);
Bot.ErrorColor = new Color(error.R, error.G, error.B);
Bot.PendingColor = new Color(pend.R, pend.G, pend.B);
}
protected override void OnStateUpdate()

View File

@@ -1,388 +0,0 @@
using NadekoBot.Common.Collections;
using NadekoBot.Common.ShardCom;
using NadekoBot.Core.Services.Impl;
using NadekoBot.Extensions;
using Newtonsoft.Json;
using StackExchange.Redis;
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Threading.Tasks;
using NadekoBot.Core.Common;
using Serilog;
namespace NadekoBot.Core.Services
{
public class ShardsCoordinator
{
private class ShardsCoordinatorQueue
{
private readonly object _locker = new object();
private readonly HashSet<int> _set = new HashSet<int>();
private readonly Queue<int> _queue = new Queue<int>();
public int Count => _queue.Count;
public void Enqueue(int i)
{
lock (_locker)
{
if (_set.Add(i))
_queue.Enqueue(i);
}
}
public bool TryPeek(out int id)
{
lock (_locker)
{
return _queue.TryPeek(out id);
}
}
public bool TryDequeue(out int id)
{
lock (_locker)
{
if (_queue.TryDequeue(out id))
{
_set.Remove(id);
return true;
}
}
return false;
}
}
private readonly BotCredentials _creds;
private readonly string _key;
private readonly Process[] _shardProcesses;
private readonly int _curProcessId;
private readonly ConnectionMultiplexer _redis;
private ShardComMessage _defaultShardState;
private ShardsCoordinatorQueue _shardStartQueue =
new ShardsCoordinatorQueue();
private ConcurrentHashSet<int> _shardRestartWaitingList =
new ConcurrentHashSet<int>();
public ShardsCoordinator()
{
//load main stuff
LogSetup.SetupLogger("coord");
_creds = new BotCredentials();
Log.Information("Starting NadekoBot v" + StatsService.BotVersion);
_key = _creds.RedisKey();
var conf = ConfigurationOptions.Parse(_creds.RedisOptions);
try
{
_redis = ConnectionMultiplexer.Connect(conf);
}
catch (RedisConnectionException ex)
{
Log.Error(ex, "Redis error. Make sure Redis is installed and running as a service");
Helpers.ReadErrorAndExit(11);
}
var imgCache = new RedisImagesCache(_redis, _creds); //reload images into redis
if (!imgCache.AllKeysExist().GetAwaiter().GetResult()) // but only if the keys don't exist. If images exist, you have to reload them manually
{
imgCache.Reload().GetAwaiter().GetResult();
}
else
{
Log.Information("Images are already present in redis. Use .imagesreload to force update if needed");
}
//setup initial shard statuses
_defaultShardState = new ShardComMessage()
{
ConnectionState = Discord.ConnectionState.Disconnected,
Guilds = 0,
Time = DateTime.UtcNow
};
var db = _redis.GetDatabase();
//clear previous statuses
db.KeyDelete(_key + "_shardstats");
_shardProcesses = new Process[_creds.TotalShards];
#if GLOBAL_NADEKO
var shardIdsEnum = Enumerable.Range(1, 31)
.Concat(Enumerable.Range(33, _creds.TotalShards - 33))
.Shuffle()
.Prepend(32)
.Prepend(0);
#else
var shardIdsEnum = Enumerable.Range(1, _creds.TotalShards - 1)
.Shuffle()
.Prepend(0);
#endif
var shardIds = shardIdsEnum
.ToArray();
for (var i = 0; i < shardIds.Length; i++)
{
var id = shardIds[i];
//add it to the list of shards which should be started
#if DEBUG
if (id > 0)
_shardStartQueue.Enqueue(id);
else
_shardProcesses[id] = Process.GetCurrentProcess();
#else
_shardStartQueue.Enqueue(id);
#endif
//set the shard's initial state in redis cache
var msg = _defaultShardState.Clone();
msg.ShardId = id;
//this is to avoid the shard coordinator thinking that
//the shard is unresponsive while starting up
var delay = 45;
#if GLOBAL_NADEKO
delay = 180;
#endif
msg.Time = DateTime.UtcNow + TimeSpan.FromSeconds(delay * (id + 1));
db.ListRightPush(_key + "_shardstats",
JsonConvert.SerializeObject(msg),
flags: CommandFlags.FireAndForget);
}
_curProcessId = Process.GetCurrentProcess().Id;
//subscribe to shardcoord events
var sub = _redis.GetSubscriber();
//send is called when shard status is updated. Every 7.5 seconds atm
sub.Subscribe(_key + "_shardcoord_send",
OnDataReceived,
CommandFlags.FireAndForget);
//called to stop the shard, although the shard will start again when it finds out it's dead
sub.Subscribe(_key + "_shardcoord_stop",
OnStop,
CommandFlags.FireAndForget);
//called kill the bot
sub.Subscribe(_key + "_die",
(ch, x) => Environment.Exit(0),
CommandFlags.FireAndForget);
}
private void OnStop(RedisChannel ch, RedisValue data)
{
var shardId = JsonConvert.DeserializeObject<int>(data);
OnStop(shardId);
}
private void OnStop(int shardId)
{
var db = _redis.GetDatabase();
var msg = _defaultShardState.Clone();
msg.ShardId = shardId;
db.ListSetByIndex(_key + "_shardstats",
shardId,
JsonConvert.SerializeObject(msg),
CommandFlags.FireAndForget);
var p = _shardProcesses[shardId];
if (p is null)
return; // ignore
_shardProcesses[shardId] = null;
try
{
p.KillTree();
p.Dispose();
}
catch { }
}
private void OnDataReceived(RedisChannel ch, RedisValue data)
{
var msg = JsonConvert.DeserializeObject<ShardComMessage>(data);
if (msg is null)
return;
var db = _redis.GetDatabase();
//sets the shard state
db.ListSetByIndex(_key + "_shardstats",
msg.ShardId,
data,
CommandFlags.FireAndForget);
if (msg.ConnectionState == Discord.ConnectionState.Disconnected
|| msg.ConnectionState == Discord.ConnectionState.Disconnecting)
{
Log.Error("!!! SHARD {0} IS IN {1} STATE !!!", msg.ShardId, msg.ConnectionState.ToString());
OnShardUnavailable(msg.ShardId);
}
else
{
// remove the shard from the waiting list if it's on it,
// because it's connected/connecting now
_shardRestartWaitingList.TryRemove(msg.ShardId);
}
return;
}
private void OnShardUnavailable(int shardId)
{
//if the shard is dc'd, add it to the restart waiting list
if (!_shardRestartWaitingList.Add(shardId))
{
//if it's already on the waiting list
//stop the shard
OnStop(shardId);
//add it to the start queue (start the shard)
_shardStartQueue.Enqueue(shardId);
//remove it from the waiting list
_shardRestartWaitingList.TryRemove(shardId);
}
}
public async Task RunAsync()
{
//this task will complete when the initial start of the shards
//is complete, but will keep running in order to restart shards
//which are disconnected for too long
TaskCompletionSource<bool> tsc = new TaskCompletionSource<bool>();
var _ = Task.Run(async () =>
{
do
{
//start a shard which is scheduled for start every 6 seconds
while (_shardStartQueue.TryPeek(out var id))
{
// if the shard is on the waiting list again
// remove it since it's starting up now
_shardRestartWaitingList.TryRemove(id);
//if the task is already completed,
//it means the initial shard starting is done,
//and this is an auto-restart
if (tsc.Task.IsCompleted)
{
Log.Warning("Auto-restarting shard {0}, {1} more in queue.", id, _shardStartQueue.Count);
}
else
{
Log.Warning("Starting shard {0}, {1} more in queue.", id, _shardStartQueue.Count - 1);
}
var rem = _shardProcesses[id];
if (rem != null)
{
try
{
rem.KillTree();
rem.Dispose();
}
catch { }
}
_shardProcesses[id] = StartShard(id);
_shardStartQueue.TryDequeue(out var __);
await Task.Delay(10000).ConfigureAwait(false);
}
tsc.TrySetResult(true);
await Task.Delay(6000).ConfigureAwait(false);
}
while (true);
// ^ keep checking for shards which need to be restarted
});
//restart unresponsive shards
_ = Task.Run(async () =>
{
//after all shards have started initially
await tsc.Task.ConfigureAwait(false);
while (true)
{
await Task.Delay(15000).ConfigureAwait(false);
try
{
var db = _redis.GetDatabase();
//get all shards which didn't communicate their status in the last 30 seconds
var all = db.ListRange(_creds.RedisKey() + "_shardstats")
.Select(x => JsonConvert.DeserializeObject<ShardComMessage>(x));
var statuses = all
.Where(x => x.Time < DateTime.UtcNow - TimeSpan.FromSeconds(30))
.ToArray();
if (!statuses.Any())
{
#if DEBUG
for (var i = 0; i < _shardProcesses.Length; i++)
{
var p = _shardProcesses[i];
if (p is null || p.HasExited)
{
Log.Warning("Scheduling shard {0} for restart because it's process is stopped.", i);
_shardStartQueue.Enqueue(i);
}
}
#endif
}
else
{
for (var i = 0; i < statuses.Length; i++)
{
var s = statuses[i];
OnStop(s.ShardId);
_shardStartQueue.Enqueue(s.ShardId);
//to prevent shards which are already scheduled for restart to be scheduled again
s.Time = DateTime.UtcNow + TimeSpan.FromSeconds(60 * _shardStartQueue.Count);
db.ListSetByIndex(_key + "_shardstats", s.ShardId,
JsonConvert.SerializeObject(s), CommandFlags.FireAndForget);
Log.Warning("Shard {0} is scheduled for a restart because it's unresponsive.", s.ShardId);
}
}
}
catch (Exception ex) { Log.Error(ex, "Error in RunAsync"); throw; }
}
});
await tsc.Task.ConfigureAwait(false);
return;
}
private Process StartShard(int shardId)
{
return Process.Start(new ProcessStartInfo()
{
FileName = _creds.ShardRunCommand,
Arguments = string.Format(_creds.ShardRunArguments, shardId, _curProcessId, "")
});
// last "" in format is for backwards compatibility
// because current startup commands have {2} in them probably
}
public async Task RunAndBlockAsync()
{
try
{
await RunAsync().ConfigureAwait(false);
}
catch (Exception ex)
{
Log.Error(ex, "Unhandled exception in RunAsync");
foreach (var p in _shardProcesses)
{
if (p is null)
continue;
try
{
p.KillTree();
p.Dispose();
}
catch { }
}
return;
}
await Task.Delay(-1).ConfigureAwait(false);
}
}
}