The algorithm parallelizes the computation of the integral over the network and separately on clients by threads. I start the server and two clients for the test. One client fulfills, and the second just hangs. He connected to connectedClients, you can see it, but it’s not in workingClients. And on the server instead of the result at the end I get "The queue is empty.". This is despite the fact that I have everywhere where Dequeue is done there are checks on the void queue. But if I run a purely single client, then it works fine and the result on the server is displayed. Help to understand please! Here is the link to the repository with the code: https://github.com/DarkByte2015/lab3 .

Well, at the request of the workers, the code in the topic (but I still did not understand how to insert it here, if anyone has moderator authority, correct it)

TaskService.cs

using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.Threading.Tasks; using System.ServiceModel; using System.Timers; namespace lab3_Server { /// <summary> /// Описывает контракт сервиса. /// </summary> [ServiceBehavior(InstanceContextMode = InstanceContextMode.Single, ConcurrencyMode = ConcurrencyMode.Reentrant, IncludeExceptionDetailInFaults = true)] public class TaskService : ITaskService { /// <summary> /// Список контрактов обратного вызова всех подключенных клиентов. /// </summary> private List<IClientCallback> connectedClients = new List<IClientCallback>(); /// <summary> /// Список представлений работающих клиентов. /// </summary> private List<ClientView> workingClients = new List<ClientView>(); /// <summary> /// Очередь еще не выполненных задач. /// </summary> private Queue<ClientTask> tasks = new Queue<ClientTask>(); /// <summary> /// Список результатов. /// </summary> private List<double> results = new List<double>(); /// <summary> /// Вызывается при присоединении клиента к серверу. /// </summary> public void Connect() { /// При присоединении нового клиента - /// добавляем его в список подключенных клиентов. var callback = OperationContext.Current.GetCallbackChannel<IClientCallback>(); connectedClients.Add(callback); /// Если в очереди есть невыполненные задачи - /// берем задачу на выполнение. if (tasks.Count > 0) AddWorkingClient(callback); } /// <summary> /// Решает заданную пользователем задачу. /// </summary> /// <param name="bottom"></param> /// <param name="top"></param> /// <param name="step"></param> /// <returns></returns> public double Solve(double bottom, double top, double step) { /// Формируем очередь задач. const double taskStep = 1.0; for (var x = bottom; x < top; x += taskStep + step) { var task = new ClientTask(x, x + taskStep, step); tasks.Enqueue(task); } var tasksCount = tasks.Count; /// Рассылаем задачи подключенным клиентам. for (var i = 0; i < connectedClients.Count; i++) AddWorkingClient(connectedClients[i]); /// Ждем в цикле пока количество результатов в списке /// не станет равно изначальному количеству задач, /// после чего возвращаем их сумму. while (results.Count != tasksCount) ; return results.Sum(); } /// <summary> /// Добавляет работающего клиента с задачей из очереди. /// </summary> /// <param name="callback"></param> private void AddWorkingClient(IClientCallback callback) { var client = new ClientView(callback, tasks.Dequeue()); client.Calculated += Client_OnCalculated; client.Disconnected += Client_OnDisconnected; workingClients.Add(client); client.Calculate(); } /// <summary> /// Вызывается после подсчета клиентом результата. /// </summary> /// <param name="sender"></param> /// <param name="result"></param> private void Client_OnCalculated(ClientView sender, double result) { /// Добавляем результат в список. results.Add(result); /// Если в очереди еще есть задачи - /// берем задачу на выполнение, /// иначе удаляем клиента из работающих. if (tasks.Count > 0) { sender.Task = tasks.Dequeue(); sender.Calculate(); } else workingClients.Remove(sender); } /// <summary> /// Вызывается при отключении клиента. /// </summary> /// <param name="sender"></param> private void Client_OnDisconnected(ClientView sender, Exception error) { /// Возвращаем задачу в очередь и /// удаляем клиента из обоих списков. tasks.Enqueue(sender.Task); workingClients.Remove(sender); connectedClients.Remove(sender.Callback); Console.WriteLine(error.Message); } /// <summary> /// Вызывается при отключении клиента. /// </summary> /// <param name="result"></param> void ITaskService.SetResult(double result) { /// Переадресовываем результат клиенту. var callback = OperationContext.Current.GetCallbackChannel<IClientCallback>(); var client = workingClients.Find(c => c.Callback == callback); client.OnSetResult(result); } } } 

ClientView.cs

 using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.Threading.Tasks; using System.Timers; namespace lab3_Server { /// <summary> /// Описывает представление клиента. /// </summary> class ClientView { /// <summary> /// Представляет метод, который будет обрабатывать событие Calculated. /// </summary> /// <param name="sender"></param> /// <param name="result"></param> public delegate void CalculatedHandler(ClientView sender, double result); /// <summary> /// Представляет метод, который будет обрабатывать событие Disconnected. /// </summary> /// <param name="sender"></param> public delegate void DisconnectedHandler(ClientView sender, Exception error); /// <summary> /// Происходит при получении результата. /// </summary> public event CalculatedHandler Calculated; /// <summary> /// Происходит при отключении клиента. /// </summary> public event DisconnectedHandler Disconnected; /// <summary> /// Контракт обратного вызова клиента. /// </summary> public IClientCallback Callback { get; } /// <summary> /// Задание, которое клиент выполняет в данный момент. /// </summary> public ClientTask Task { get; set; } /// <summary> /// Таймер, определяющий задержку возврата результата. /// </summary> private Timer timer = new Timer(5000) { AutoReset = false }; /// <summary> /// Определяет был ли возвращен результат. /// </summary> private bool returned = false; public ClientView(IClientCallback callback, ClientTask task) { Callback = callback; Task = task; timer.Elapsed += Timer_Elapsed; } /// <summary> /// Выдает задание. /// </summary> public void Calculate() { returned = false; timer.Enabled = true; try { Callback.OnGiveTask(Task); //Console.WriteLine("Give task success!"); } /// Если произошла ошибка передачи задания - значит клиент отключился. catch (Exception e) { Disconnected?.Invoke(this, e); } } /// <summary> /// Вызывается при получении результата. /// </summary> /// <param name="result"></param> public void OnSetResult(double result) { returned = true; timer.Enabled = false; Calculated?.Invoke(this, result); } /// <summary> /// Обработчик таймера. /// </summary> /// <param name="sender"></param> /// <param name="e"></param> private void Timer_Elapsed(object sender, ElapsedEventArgs e) { /// Если результат не вернули за истекшее время - /// считаем что клиент отключился. if (!returned) Disconnected?.Invoke(this, new TimeoutException("The client has not returned a result within a set time.")); } } } 

Program.cs client without integration code

 class Program { /// <summary> /// Прокси-сервис. /// </summary> static TaskServiceClient proxy = null; /// <summary> /// Список результатов. /// </summary> static ConcurrentBag<double> results = null; static void Main(string[] args) { try { /// Создаем калбэк, подключаем обработчики, создаем и подключаем прокси. var callback = new ClientCallback(); callback.GiveTask += Callback_OnGiveTask; var ctx = new InstanceContext(callback); proxy = new TaskServiceClient(ctx); proxy.Connect(); Console.WriteLine("Connected!"); Console.ReadKey(); } catch (Exception e) { Console.Write(e.Message); Console.ReadKey(); } } /// <summary> /// Вызывается при получении задания. /// </summary> /// <param name="task"></param> private static void Callback_OnGiveTask(ClientTask task) { /// Распределяем задачу по потокам и /// дождавшись их завершения выводим результат. try { results = new ConcurrentBag<double>(); var threadCount = Environment.ProcessorCount; var threadStep = (task.Top - task.Bottom) / threadCount; var threads = new List<Thread>(); var x = task.Bottom; for (var i = 0; i < threadCount; i++, x += threadStep + task.Step) { var threadTask = new ClientTask() { Bottom = x, Top = x + threadStep, Step = task.Step }; var thread = new Thread(ThreadHandler); threads.Add(thread); thread.Start(threadTask); } threads.ForEach(t => t.Join()); var result = results.Sum(); Console.WriteLine("bottom: {0} top: {1} step: {2} result: {3}", task.Bottom, task.Top, task.Step, result); proxy.SetResult(result); } catch (Exception e) { Console.WriteLine(e.Message); } } } 

This is in my opinion the most important.

  • one
    Please remove from the code the part that is not related to the problem itself (the code for calculating the integral) and give the basis of the code directly in the body of the question. - PashaPash
  • But you yourself understand the main part of the service - this is just the service itself. The solution of the integral there includes only 1-2 functions in the Program.cs client. Everything else is contracts. If I post it all here, the topic will swell across several pages. : D On githaba everything is quite convenient to read. - PECHAPTER
  • one
    It's easy to read on a githaba, but the day after tomorrow you close your repository, and the question will turn into a dummy. Therefore, the rules require a minimal reproducible example right in the body of the question. Contracts are optional. - PashaPash

1 answer 1

Call OnGiveTask(); you have - synchronous. This blocks the second client from receiving the task for a while until the first one is completed. And, in general, it causes unpredictable dedlocks when trying to issue a task at the moment of client connection.

Bring the customer to the background thread:

 System.Threading.Tasks.Task.Run(() => Callback.OnGiveTask(Task) ); 
  • replace the used collections with thread-safe ones and it will work.
  • one
    It seems that WCF already supports async interfaces. - VladD
  • Comments are not intended for extended discussion; conversation moved to chat . - PashaPash