In my program I use the library Nito.AsyncEx ( https://github.com/StephenCleary/AsyncEx ). In my code, I use the AsyncLock class to synchronize actions. There was a need to find out how many active locks there are at the moment, but this class does not have such a property.

In my program, I create several instances of the Worker class and add them to a collection. Later, the program is told to perform a certain action using the Worker with the fewest locks in its queue. Sample Worker code:

    public class Worker
    {
        public int Id { get; private set; }
        public int someProperty1 { get; private set; }
        public int someProperty2 { get; private set; }

        AsyncLock _lock = new AsyncLock();

        public async Task ActionFirst()
        {
            using (await _lock.LockAsync())
            {
                // code
            }
        }

        public async Task ActionSecond()
        {
            using (await _lock.LockAsync())
            {
                // code
            }
        }

        public void ParallelActionFirst()
        {
            // code
        }

        public void ParallelActionSecond()
        {
            // code
        }
    }

Workers can perform quite a few functions (10+) in parallel, but some of them (ActionFirst, ActionSecond) must run sequentially. In particular, each of these pairs of actions must execute one after the other:

ActionFirst -> ActionSecond
ActionFirst -> ActionFirst
ActionSecond -> ActionFirst
ActionSecond -> ActionSecond

Workers have an ID and several other properties. Some tasks require selecting a specific worker, but when a specific worker is NOT required (or most of the workers are suitable for the task) and ActionFirst or ActionSecond has to be performed, the least loaded worker should be chosen.

Is there a solution to this problem using this library or standard out-of-the-box C# facilities?

    3 Answers

    I have a feeling that your real task is the classic Producer-Consumer problem.

    As I understand it, the pending locks inside the workers form something like an incoming queue, and you need to count them for load balancing.

    If so, then the correct solution would be to use a common queue of free workers:

        BufferBlock<Worker> workers = new BufferBlock<Worker>();

        var worker = await workers.ReceiveAsync();
        try
        {
            await worker.ActionFirst();
        }
        finally
        {
            workers.Post(worker);
        }

    Here I used BufferBlock from the TPL Dataflow library, but you can use other implementations of asynchronous queues, for example AsyncCollection from the AsyncCollections package.
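    If you prefer to stay on in-box types, the same "queue of free workers" pattern can be sketched with System.Threading.Channels, which ships with .NET Core 3.0+ (on .NET Framework it is a NuGet package). The Worker stub and the Task.Delay body below are placeholders, not your real class:

```csharp
using System;
using System.Threading.Channels;
using System.Threading.Tasks;

class Worker
{
    public int Id { get; }
    public Worker(int id) => Id = id;
    public Task ActionFirst() => Task.Delay(10); // placeholder for real work
}

class Program
{
    static async Task Main()
    {
        // An unbounded channel plays the role of the queue of free workers.
        var free = Channel.CreateUnbounded<Worker>();
        for (int i = 0; i < 3; i++)
            await free.Writer.WriteAsync(new Worker(i));

        // Take the next free worker, run the action, then return it to the pool.
        var worker = await free.Reader.ReadAsync();
        try
        {
            await worker.ActionFirst();
        }
        finally
        {
            await free.Writer.WriteAsync(worker);
        }
        Console.WriteLine(worker.Id); // prints 0: the channel is FIFO
    }
}
```

    The try/finally guarantees the worker goes back into the pool even if the action throws, which is the same discipline as the BufferBlock version above.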

    • In my program, some commands select workers according to a certain condition (not only by the minimum number of locks), but as far as I know, BufferBlock only lets you take the first free one. - Dima Gvozdev
    • @DimaGvozdev in this case you need a more complex queue implementation, but the general idea stays the same: the queue should live outside the worker, not inside it. - Pavel Mayorov
    • @DimaGvozdev If you write down the logic of choosing a worker in more detail, I can help with the selection of the algorithm. - Pavel Mayorov
    • Added description. - Dima Gvozdev

    I would try the obvious for a start:

        public class Worker
        {
            AsyncLock _lock = new AsyncLock();
            object _countLock = new object();
            int _lockWaitCount = 0;

            public int GetLockWaitCount()
            {
                lock (_countLock)
                    return _lockWaitCount;
            }

            public async Task ActionFirst()
            {
                lock (_countLock)
                    _lockWaitCount++;
                using (await _lock.LockAsync())
                {
                    lock (_countLock)
                        _lockWaitCount--;
                    // code
                }
            }

            public async Task ActionSecond()
            {
                lock (_countLock)
                    _lockWaitCount++;
                using (await _lock.LockAsync())
                {
                    lock (_countLock)
                        _lockWaitCount--;
                    // code
                }
            }
        }

    and choose the Worker with the smallest GetLockWaitCount(). All of this can be wrapped into an AsyncLockWithCount class, so as not to repeat the code:

        class AsyncLockWithCount
        {
            AsyncLock _lock = new AsyncLock();
            object _countLock = new object();
            int _lockWaitCount = 0;

            public int GetLockWaitCount()
            {
                lock (_countLock)
                    return _lockWaitCount;
            }

            async Task<IDisposable> LockAsync()
            {
                lock (_countLock)
                    _lockWaitCount++;
                var result = await _lock.LockAsync();
                lock (_countLock)
                    _lockWaitCount--;
                return result; // or try/finally, if you prefer
            }
        }
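    Picking the least loaded worker then becomes a one-line LINQ query. A minimal sketch with a stub Worker (the WaitCount field is a stand-in for the real counter, not part of the original class):

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Minimal stub standing in for the real Worker with a wait counter.
class Worker
{
    public int Id { get; }
    public int WaitCount { get; }
    public Worker(int id, int waitCount) { Id = id; WaitCount = waitCount; }
    public int GetLockWaitCount() => WaitCount;
}

class Program
{
    static void Main()
    {
        var workers = new List<Worker>
        {
            new Worker(1, 3),
            new Worker(2, 0),
            new Worker(3, 1),
        };

        // Pick the worker with the fewest pending lock waiters.
        var leastLoaded = workers.OrderBy(w => w.GetLockWaitCount()).First();
        Console.WriteLine(leastLoaded.Id); // prints 2
    }
}
```

    Note that the count can change between reading it and actually taking the lock, so this selection is a heuristic for balancing, not a strict guarantee.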

    For the case when you need to “release” the counter only at the end of the work, you can use this modification:

        class AsyncLockWithCount
        {
            AsyncLock _lock = new AsyncLock();
            object _countLock = new object();
            int _lockWaitCount = 0;

            public int GetLockWaitCount()
            {
                lock (_countLock)
                    return _lockWaitCount;
            }

            async Task<IDisposable> LockAsync()
            {
                lock (_countLock)
                    _lockWaitCount++;
                var innerDisposable = await _lock.LockAsync();
                return new ActionDisposable(() =>
                {
                    innerDisposable.Dispose();
                    lock (_countLock)
                        _lockWaitCount--;
                });
            }
        }

        class ActionDisposable : IDisposable
        {
            Action _a;
            public ActionDisposable(Action a) { _a = a; }
            public void Dispose() => _a();
        }

    Instead of ActionDisposable you can use Disposable.Create from Reactive Extensions.
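    If you want no dependency on Nito.AsyncEx at all, the same idea (decrement the counter only when the caller disposes, i.e. after the protected code has run) can be sketched on top of the standard SemaphoreSlim(1, 1), which behaves as an async mutex. CountingAsyncLock and Releaser are hypothetical names for this sketch:

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Stdlib-only sketch: SemaphoreSlim(1, 1) plays the role of AsyncLock,
// and the counter drops only on Dispose, i.e. after the work is done.
class CountingAsyncLock
{
    private readonly SemaphoreSlim _semaphore = new SemaphoreSlim(1, 1);
    private int _count;

    // Number of callers currently holding or waiting for the lock.
    public int Count => Volatile.Read(ref _count);

    public async Task<IDisposable> LockAsync()
    {
        Interlocked.Increment(ref _count);
        await _semaphore.WaitAsync();
        return new Releaser(this);
    }

    private sealed class Releaser : IDisposable
    {
        private CountingAsyncLock _owner;
        public Releaser(CountingAsyncLock owner) => _owner = owner;

        public void Dispose()
        {
            // Swap the owner out so a double Dispose is a no-op.
            var owner = Interlocked.Exchange(ref _owner, null);
            if (owner == null) return;
            owner._semaphore.Release();
            Interlocked.Decrement(ref owner._count);
        }
    }
}

class Program
{
    static async Task Main()
    {
        var l = new CountingAsyncLock();
        using (await l.LockAsync())
        {
            Console.WriteLine(l.Count); // prints 1 while the lock is held
        }
        Console.WriteLine(l.Count); // prints 0 after release
    }
}
```

    Interlocked/Volatile are used here instead of a separate _countLock object; both approaches keep the counter consistent, this one just avoids the extra monitor.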

    • In this case, the counter decreases right after the lock is acquired, not after the action completes. - Dima Gvozdev
    • @DimaGvozdev: Well, yes, but isn't it a counter of waiters that you need? That's what this is. - VladD
    • If you need to decrement the counter after the work is finished, decrement it after // code rather than before it. - VladD
    • Yes, the counter needs to be decremented after // code , but then the AsyncLockWithCount class can't be used. - Dima Gvozdev
    • @DimaGvozdev: Well, that is also easy to rewrite. One moment. - VladD

    Based on this article ( http://sanjeev.dwivedi.net/?p=292 ) and the source code of the Nito.AsyncEx library, I wrote the following class:

        public class AsyncLockWithCount
        {
            private readonly Task<Releaser> _cachedTask;
            private readonly Queue<TaskCompletionSource<bool>> m_waiters = new Queue<TaskCompletionSource<bool>>();
            private int m_currentCount = 1;
            private readonly object _mutex;

            public int Count
            {
                get
                {
                    lock (_mutex)
                    {
                        return m_currentCount == 1 ? 0 : (m_waiters.Count + 1);
                    }
                }
            }

            public AsyncLockWithCount()
            {
                _mutex = new object();
                _cachedTask = Task.FromResult(new Releaser(this));
            }

            public Task<Releaser> LockAsync()
            {
                lock (_mutex)
                {
                    if (m_currentCount > 0)
                    {
                        --m_currentCount;
                        return _cachedTask;
                    }
                    else
                    {
                        TaskCompletionSource<bool> waiter = new TaskCompletionSource<bool>();
                        m_waiters.Enqueue(waiter);
                        return waiter.Task.ContinueWith(
                            (_, state) => new Releaser((AsyncLockWithCount)state),
                            this,
                            CancellationToken.None,
                            TaskContinuationOptions.ExecuteSynchronously,
                            TaskScheduler.Default);
                    }
                }
            }

            internal void Release()
            {
                TaskCompletionSource<bool> toRelease = null;
                lock (_mutex)
                {
                    if (m_waiters.Count > 0)
                        toRelease = m_waiters.Dequeue();
                    else
                        ++m_currentCount;
                }
                if (toRelease != null)
                    toRelease.SetResult(true);
            }

            public struct Releaser : IDisposable
            {
                private readonly AsyncLockWithCount m_toRelease;

                internal Releaser(AsyncLockWithCount toRelease)
                {
                    m_toRelease = toRelease;
                }

                public void Dispose()
                {
                    if (m_toRelease != null)
                        m_toRelease.Release();
                }
            }
        }

    The Count property returns the number of current locks. Initially one slot is free: m_currentCount == 1 means nobody is using the lock, so Count returns 0. When the lock is in use (m_currentCount != 1), Count returns the queue size plus 1; the +1 accounts for the very first acquirer, which never enters the waiters queue but must still be counted.