Well, here's an example of implementation. Immediately I warn you, there will be a lot of code.
Take as a basis this unreliable function:
class EvilComputation { static Random random = new Random(); public static async Task<double> Compute( int numberOfSeconds, double x, CancellationToken ct) { bool wellBehaved = random.Next(2) == 0; var y = x * x; var delay = TimeSpan.FromSeconds(numberOfSeconds); await Task.Delay(delay, wellBehaved ? ct : CancellationToken.None); return y; } }
You see that the function is bad: depending on random conditions, it may not respond to cancellation.
What to do in this case? We take out the function in a separate process. This process can be killed without much harm to the original process.
In order to call a function in another process, you need to transfer data about the function call there. For communication, use, for example, anonymous pipes (you can use essentially anything). I base the code on this example: How to: Use Anonymous Pipes for Local Interprocess Communication .
For data transfer we will use standard binary formatting, since we did not go through WCF. We need DTO-objects that will be transferred between processes. They need to be used in two processes - the main and the auxiliary (we will call it a plug-in ), so a separate assembly is required for the DTO types.
We start the OutProcCommonData assembly, put the following classes into it:
namespace OutProcCommonData { [Serializable] public class Command // общий класс-предок для посылаемой команды { } [Serializable] public class Evaluate : Command // команда на вычисление { public int NumberOfSecondsToProcess; public double X; } [Serializable] public class Cancel : Command // команда на отмену { } }
Next, the return result:
namespace OutProcCommonData { [Serializable] public class Response // общий класс-предок для возвращаемого результата { } [Serializable] public class Result : Response // готовый результат вычислений { public double Y; } [Serializable] public class Error : Response // ошибка с текстом { public string Text; } [Serializable] public class Cancelled : Response // подтверждение отмены { } }
Next, our plugin. This is a separate console application (although, if we don’t want to see the console and debug output, you can make it non-console).
The protocol of communication is as follows. The main program sends Evaluate , and after it, possibly, Cancel . The plugin returns Result in case of successful calculation, Cancelled in case of received cancel signal and successfully canceled calculation, and Error in case of error (for example, violation of the communication protocol).
Here is the binding code:
class Plugin { static int Main(string[] args) { // нам должны быть переданы два аргумента: хендл входящего и исходящего пайпов if (args.Length != 2) { Console.Error.WriteLine("Shouldn't be started directly"); return 1; } return new Plugin().Run(args[0], args[1]).Result; } BinaryFormatter serializer = new BinaryFormatter(); // для сериализации async Task<int> Run(string hIn, string hOut) { Console.WriteLine("[Plugin] Running"); // открывем переданные пайпы using (var inStream = new AnonymousPipeClientStream(PipeDirection.In, hIn)) using (var outStream = new AnonymousPipeClientStream(PipeDirection.Out, hOut)) { try { var cts = new CancellationTokenSource(); // токен для отмены Console.WriteLine("[Plugin] Reading args"); // пытаемся десериализовать аргументы var args = SafeGet<OutProcCommonData.Evaluate>(inStream); if (args == null) { Console.WriteLine("[Plugin] Didn't get args"); // отправляем ошибку, если не удалось serializer.Serialize( outStream, new OutProcCommonData.Error() { Text = "Unrecognized input" }); // и выходим return 3; } Console.WriteLine("[Plugin] Got args, start compute and waiting cancel"); // запускаем вычисление var computeTask = EvilComputation.Compute( args.NumberOfSecondsToProcess, args.X, cts.Token); // параллельно запускаем чтение возможной отмены var waitForCancelTask = Task.Run(() => (OutProcCommonData.Cancel)serializer.Deserialize(inStream)); // дожидаемся одного из двух var winner = await Task.WhenAny(computeTask, waitForCancelTask); // если первой пришла отмена... if (winner == waitForCancelTask) { Console.WriteLine("[Plugin] Got cancel, cancelling computation"); // просим вычисление завершиться cts.Cancel(); } // окончания вычисления всё равно нужно дождаться Console.WriteLine("[Plugin] Awaiting computation"); // если вычисление отменится, здесь будет исключение var result = await computeTask; Console.WriteLine("[Plugin] Sending back result"); // отсылаем результат в пайп serializer.Serialize( outStream, new OutProcCommonData.Result() { Y = result }); // нормальный выход return 0; } catch (OperationCanceledException) { // мы успешно отменили задание, рапортуем Console.WriteLine("[Plugin] Sending cancellation"); serializer.Serialize( outStream, new OutProcCommonData.Cancelled()); return 2; } catch (Exception ex) { // возникла непредвиденная ошибка, рапортуем Console.WriteLine($"[Plugin] Sending error {ex.Message}"); serializer.Serialize( outStream, new OutProcCommonData.Error() { Text = ex.Message }); return 3; } } } // ну и вспомогательная функция, которая пытается читать данные из пайпа T SafeGet<T>(Stream s) where T : class { try { return (T)serializer.Deserialize(s); } catch { return null; } } }
I do not catch errors when writing to the pipe, add to your taste.
Now, the main program. We will have it separately from the plug-in (that is, we have three assemblies).
class Program { static void Main(string[] args) => new Program().Run().Wait(); async Task Run() { var cts = new CancellationTokenSource(); try { var y = await ComputeOutProc(2, cts.Token); Console.WriteLine($"[Main] Result: {y}"); } catch (TimeoutException) { Console.WriteLine("[Main] Timed out"); } catch (OperationCanceledException) { Console.WriteLine("[Main] Cancelled"); } } const int SecondsToSend = 3; const int TimeoutSeconds = 5; const int CancelSeconds = 2; BinaryFormatter serializer = new BinaryFormatter(); async Task<double> ComputeOutProc(double x, CancellationToken ct) { Process plugin = null; bool pluginStarted = false; try { // создаём исходящий и входящий пайпы using (var commandStream = new AnonymousPipeServerStream( PipeDirection.Out, HandleInheritability.Inheritable)) using (var responseStream = new AnonymousPipeServerStream( PipeDirection.In, HandleInheritability.Inheritable)) { Console.WriteLine("[Main] Starting plugin"); plugin = new Process() { StartInfo = { FileName = "OutProcPlugin.exe", Arguments = commandStream.GetClientHandleAsString() + " " + responseStream.GetClientHandleAsString(), UseShellExecute = false } }; // запускаем плагин с параметрами plugin.Start(); pluginStarted = true; Console.WriteLine("[Main] Started plugin"); commandStream.DisposeLocalCopyOfClientHandle(); responseStream.DisposeLocalCopyOfClientHandle(); void Send(Command c) { serializer.Serialize(commandStream, c); commandStream.Flush(); } try { // отсылаем плагину команду на вычисление Console.WriteLine("[Main] Sending evaluate request"); Send(new OutProcCommonData.Evaluate() { NumberOfSecondsToProcess = SecondsToSend, X = x }); Task<Response> responseTask; bool readyInTime; bool cancellationSent = false; // внутри этого блока при отмене будем отсылать команду плагину using (ct.Register(() => { Send(new OutProcCommonData.Cancel()); Console.WriteLine("[Main] Requested cancellation"); cancellationSent = true; })) { Console.WriteLine("[Main] Starting getting response"); // ожидаем получение ответа responseTask = Task.Run(() => (Response)serializer.Deserialize(responseStream)); // или таймаута var timeoutTask = Task.Delay(TimeSpan.FromSeconds(TimeoutSeconds)); var winner = await Task.WhenAny(responseTask, timeoutTask); readyInTime = winner == responseTask; } // если наступил таймаут, просим процесс вежливо завершить вычисления if (!readyInTime) { if (!cancellationSent) { Console.WriteLine("[Main] Not ready in time, sending cancel"); Send(new OutProcCommonData.Cancel()); } else { Console.WriteLine("[Main] Not ready in time, cancel sent"); } // и ждём ещё немного, ну или прихода ответа var timeoutTask = Task.Delay(TimeSpan.FromSeconds(CancelSeconds)); await Task.WhenAny(responseTask, timeoutTask); } // если до сих пор ничего не пришло, плагин завис, убиваем его if (!responseTask.IsCompleted) { Console.WriteLine("[Main] No response, killing plugin"); plugin.Kill(); // это завершит ожидание с исключением, по идее // в ранних версиях .NET нужно было бы поймать // это исключение // и уходим с исключением-таймаутом ct.ThrowIfCancellationRequested(); throw new TimeoutException(); } // здесь мы уверены, что ожидание завершилось Console.WriteLine("[Main] Obtaining response"); var response = await responseTask; // тут может быть брошено исключение // если была затребована отмена, выходим ct.ThrowIfCancellationRequested(); // проверяем тип результата: switch (response) { case Result r: // нормальный результат, возвращаем его Console.WriteLine("[Main] Got result, returning"); return rY; case Cancelled _: // отмена не по ct = таймаут Console.WriteLine("[Main] Got cancellation"); throw new TimeoutException(); case Error err: // пришла ошибка, бросаем исключение // лучше, конечно, определить собственный тип здесь Console.WriteLine("[Main] Got error"); throw new Exception(err.Text); default: // сюда мы вообще не должны попасть, если плагин работает нормально Console.WriteLine("[Main] Unexpected error"); throw new Exception("Unexpected response type"); } } catch (IOException e) { Console.WriteLine("[Main] IO error occured"); throw new Exception("IO Error", e); } } } finally { if (pluginStarted) { plugin.WaitForExit(); plugin.Close(); } } } }
The result of the run:
[Main] Starting plugin [Main] Started plugin [Main] Sending evaluate request [Main] Starting getting response [Plugin] Running [Plugin] Reading args [Plugin] Got args, start compute and waiting cancel [Plugin] Awaiting computation [Plugin] Sending back result [Main] Obtaining response [Main] Got result, returning [Main] Result: 4
If we change the SecondsToSend constant to 10 so that there is a timeout, we get the result of two runs:
For regular completion:
[Main] Starting plugin [Main] Started plugin [Main] Sending evaluate request [Main] Starting getting response [Plugin] Running [Plugin] Reading args [Plugin] Got args, start compute and waiting cancel [Main] Not ready in time, sending cancel [Plugin] Got cancel, cancelling computation [Plugin] Awaiting computation [Plugin] Sending cancellation [Main] Obtaining response [Main] Got cancellation [Main] Timed out
To force a completion:
[Main] Starting plugin [Main] Started plugin [Main] Sending evaluate request [Main] Starting getting response [Plugin] Running [Plugin] Reading args [Plugin] Got args, start compute and waiting cancel [Main] Not ready in time, sending cancel [Plugin] Got cancel, cancelling computation [Plugin] Awaiting computation [Main] No response, killing plugin [Main] Timed out
If add before
var y = await ComputeOutProc(2, cts.Token);
premature cancellation:
cts.CancelAfter(TimeSpan.FromSeconds(1));
we get the following result: for regular completion
[Main] Starting plugin [Main] Started plugin [Main] Sending evaluate request [Main] Starting getting response [Plugin] Running [Plugin] Reading args [Plugin] Got args, start compute and waiting cancel [Main] Requested cancellation [Plugin] Got cancel, cancelling computation [Plugin] Awaiting computation [Plugin] Sending cancellation [Main] Obtaining response [Main] Cancelled
and to force completion
[Main] Starting plugin [Main] Started plugin [Main] Sending evaluate request [Main] Starting getting response [Plugin] Running [Plugin] Reading args [Plugin] Got args, start compute and waiting cancel [Main] Requested cancellation [Plugin] Got cancel, cancelling computation [Plugin] Awaiting computation [Main] Not ready in time, cancel sent [Main] No response, killing plugin [Main] Cancelled
Surely in some places the errors are not sufficiently controlled, so check to see if you need to catch any other exceptions.
You can add your logic on top of this blank. For example, it is possible, like a pool of threads, to create a pool of plug-ins, and deliver tasks to the currently free plug-in.