There is a database with a very large table. Each record in the table must be read exactly once, in blocks, by several computers at the same time, for subsequent long-running processing. My first thought was to add a StatusCode field showing that a record has already been processed. But if one computer takes a block of data for processing and has not yet saved its changes, another computer reads the same block and processes it a second time, which is very bad.

For clarity I wrote a small program in C#, although I think even those who don't know the language will get the idea:

    static void Main()
    {
        Console.WriteLine(Chek());
        List<Task> tl = new List<Task>();
        for (int i = 0; i < 20; i++)
        {
            tl.Add(Task.Factory.StartNew(Fun, TaskCreationOptions.LongRunning));
        }
        Console.WriteLine("wait");
        Task.WaitAll(tl.ToArray());
        Console.WriteLine(Chek());
        Console.WriteLine("removedCount: " + Reset());
    }

    static void Fun()
    {
        using (ApplicationContext db = new ApplicationContext())
        using (var transaction = db.Database.BeginTransaction())
        {
            try
            {
                var emp = db.Employees.Where(i => i.StatusCode == 0).Take(10).ToList();
                foreach (var item in emp)
                {
                    // do something with each item that must be done exactly ONCE
                    item.StatusCode++;
                }
                db.SaveChanges();
                transaction.Commit();
            }
            catch (Exception ex)
            {
                if (ex.InnerException != null)
                    Console.WriteLine(ex.InnerException.Message);
                else
                    Console.WriteLine(ex.Message);
                transaction.Rollback();
            }
        }
    }

    static int Reset()
    {
        using (ApplicationContext db = new ApplicationContext())
        {
            return db.Database.ExecuteSqlCommand("UPDATE Employees SET StatusCode=0 WHERE StatusCode>0");
        }
    }

    static int Chek()
    {
        using (ApplicationContext db = new ApplicationContext())
        {
            return db.Employees.Where(i => i.StatusCode > 0).Count();
        }
    }

And the result

60 out of 200

The expected result is 200 (20 threads each change a block of 10).

How to solve this problem?

  • Start by understanding that, in your scheme, a record/block that one computer has taken for processing is indistinguishable from a record/block that nobody has taken. Even genetic algorithms would not help here, let alone simple code, so you cannot do without a "taken for processing" mark. - Akina
  • Is the number of computers fixed? If there are, say, three, then count off all the records 1-2-3: one computer takes every first record, another every second, another every third. - Sergey
  • @Sergey, not fixed - Qutrix
  • Then use select for update queries. A second computer running such a query against the same block will be refused, because the records are already locked in another transaction. Then it simply moves on to the next block, and so on; sooner or later it will reach a free (unlocked) block. Something like that. Look up locking and transaction isolation for details. At a minimum there should be a "do not wait for the lock to be released" option. For example, in postgres: select ... for update nowait - Sergey
  • In general, this should be done the modern way. One process reads records (one at a time or in blocks) and sends a message to a message queue (messaging service). Handlers read these messages from the queue. Whoever pulls a message out first is the one that handles the records associated with it. The message can carry either the records themselves (so they do not have to be fetched from the database a second time) or just their ids. Handlers can connect and disconnect dynamically in any number. Windows has a built-in messaging service. It is easy to program. By the way, I have used it from C#. - Sergey

3 answers

What you need is not a "processed" status field but a "taken for processing" status field. The algorithm is then as follows: the client tries to take a record for processing, i.e. it simply writes its own ID into the record's status field. This is an ordinary UPDATE with the selection condition plus Status Is Null. The client then checks with an ordinary SELECT whether the attempt succeeded. If it did, process the record. If someone else got in first and reserved the same record (that is, another client's ID ended up in the Status field), then no luck: try to take another record. Roughly like this:

    -- table
    CREATE TABLE data ( fieldset, Status DEFAULT NULL );

    -- loop: reserve a record for processing
    do
        UPDATE data SET Status = 'My ID' WHERE Status Is Null Limit 1;
        SELECT ID from DATA WHERE Status = 'My ID';
    loop until recordset.recordscount=1

    -- process the reserved record
    process recordset.fields("id")

    -- mark the record as processed
    UPDATE data SET Status = 0 WHERE ID = recordset.fields("id")

Admittedly, the scheme is not great: it does not handle the case where a record has been taken but never processed (for example, the client crashed). Done properly, it also needs a timestamp field recording when the record was taken (and a timeout after which the record is considered free for reservation again).

It is a kind of "manual transaction emulation", for when you do not want to use the standard mechanisms.
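The reservation scheme above, including the timestamp idea, can be sketched roughly as follows. This is only an illustration in Python with the built-in sqlite3 module (chosen so that it runs without a database server), not the code from the answer. The `claimed_at` column, the `'done'` marker and the `LEASE_SECONDS` timeout are assumptions made for the example. Because SQLite does not support `UPDATE ... LIMIT` in default builds, the batch is picked with a subquery instead.

```python
import sqlite3
import time
import uuid

LEASE_SECONDS = 300  # hypothetical timeout after which a claim counts as abandoned


def create_schema(conn):
    # status: NULL = free, 'done' = processed, anything else = owner token
    conn.execute(
        "CREATE TABLE data ("
        " id INTEGER PRIMARY KEY,"
        " status TEXT DEFAULT NULL,"
        " claimed_at REAL DEFAULT NULL)"
    )


def claim_batch(conn, owner, batch_size, now=None):
    """Reserve up to batch_size free (or expired) rows for owner in one UPDATE."""
    now = time.time() if now is None else now
    with conn:  # commits on success, rolls back on error
        conn.execute(
            "UPDATE data SET status = ?, claimed_at = ? "
            "WHERE id IN ("
            "  SELECT id FROM data"
            "  WHERE status IS NULL"
            "     OR (status <> 'done' AND claimed_at < ?)"
            "  ORDER BY id LIMIT ?)",
            (owner, now, now - LEASE_SECONDS, batch_size),
        )
    # Check which rows actually ended up reserved for this owner.
    rows = conn.execute(
        "SELECT id FROM data WHERE status = ? ORDER BY id", (owner,)
    ).fetchall()
    return [r[0] for r in rows]


def mark_done(conn, owner, ids):
    """Mark rows as processed, but only if this owner still holds them."""
    with conn:
        conn.executemany(
            "UPDATE data SET status = 'done' WHERE id = ? AND status = ?",
            [(i, owner) for i in ids],
        )


if __name__ == "__main__":
    conn = sqlite3.connect(":memory:")
    create_schema(conn)
    conn.executemany("INSERT INTO data (id) VALUES (?)", [(i,) for i in range(1, 31)])
    me = uuid.uuid4().hex  # unique token playing the role of 'My ID'
    batch = claim_batch(conn, me, 10)
    print("claimed:", batch)
    mark_done(conn, me, batch)
```

The point of the design is that the reservation is a single UPDATE statement, so two clients cannot both win the same row; the `AND status = ?` guard in `mark_done` keeps a slow client from overwriting a row that was re-reserved after its lease expired.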

  • "if you do not want to use the standard mechanisms": why wouldn't I want to? Apparently I just don't know about them. - Qutrix
  • I mean the standard mechanism for locking the selected record, i.e. a way to guarantee that nobody can squeeze in with the same operation between selecting a record and marking it as taken for processing. - Akina
  • Could you say more about this standard mechanism? - Qutrix
  • The standard mechanism is a transaction. It may be implicit (i.e. the selection is packed into a single statement: UPDATE ... WHERE Status IS NULL) or explicit. Some dialects have additional mechanisms for this kind of locking (for example, SELECT .. FOR UPDATE in MySQL). - Akina
  • UPDATE ... WHERE Status IS NULL is what your example uses. Does that mean you are already using the standard mechanism? - Qutrix

The first thought was to introduce an additional field StatusCode, which would indicate that this entry was already processed

Introduce two values:

  1. Record is being processed
  2. Record processed

How to do this in a concurrent environment depends on the specific database. For MySQL it would look like this:

    SELECT id FROM my_table WHERE ... FOR UPDATE;
    UPDATE my_table SET status = process WHERE id = :id

For an arbitrary database, you can use a query like this:

    UPDATE my_table SET status = guid WHERE ...;
    SELECT * FROM my_table WHERE status = guid;

The application must also handle lock conflict errors correctly.

In addition, you can split the work into groups. If there is an incrementing id field (a number or a date), each of the n threads can work with its own range:

    var int len = (max(id) - min(id)) / n;
    min(id) + len * i .. min(id) + len * (i + 1)

where i is the thread number
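The range split can be sketched as a small function. This is an illustrative Python version with a hypothetical name `id_ranges`, not part of the answer. One difference from the formula above: with integer division, `(max(id) - min(id)) / n` can leave the tail of the id range uncovered, so this sketch spreads the remainder over the first ranges.

```python
def id_ranges(min_id, max_id, n):
    """Split the inclusive id range [min_id, max_id] into n contiguous ranges.

    Each range is returned as a half-open pair (start, end): start <= id < end.
    """
    total = max_id - min_id + 1
    base, extra = divmod(total, n)
    ranges = []
    start = min_id
    for i in range(n):
        # The first `extra` ranges get one additional id each.
        size = base + (1 if i < extra else 0)
        ranges.append((start, start + size))
        start += size
    return ranges


if __name__ == "__main__":
    for i, (lo, hi) in enumerate(id_ranges(1, 10, 3)):
        print(f"thread {i}: WHERE id >= {lo} AND id < {hi}")
```

Thread i would then query only `WHERE id >= start AND id < end` for its own pair, so no two threads ever read the same row. As the comments point out, this only helps when the number of workers is known in advance.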

  • And in the end everything will run slower than on a single computer. - avp
  • The problem has not gone anywhere: while I read and send an update request in one thread, another thread can take the same id, process it, and send the same update. Working in groups is fine with one computer, but I have several computers, and each can handle an arbitrary number of records. - Qutrix
  • @Qutrix It can't. When a thread is created, it is immediately given a specific interval and works only with that interval. - Anton Shchyrov
  • @Qutrix If handing out intervals is not possible, then work by status only. - Anton Shchyrov
  • @AntonShchyrov, but I wrote above why statuses won't work either :( - Qutrix

A very rough example. Everything needs to be checked and studied (I have not touched this in a long time).

The dispatcher runs on one computer only:

    ...
    var queue = MessageQueue.Create("emps");
    var emps = db.Employees.ToList();
    foreach (var emp in emps)
    {
        queue.Send(emp);
    }
    ...

There can be as many handlers as you like:

    ...
    var queue = new MessageQueue("emps");
    while (true)
    {
        Message message = queue.Receive();
        Employe emp = message.getObject();
        transaction.begin();
        db.attach(emp);
        // do something with emp
        db.SaveChanges();
        transaction.Commit();
    }
    ...

This processes one record at a time. Blocks of several (or even many) records can be sent in the same way.

The message queue service must be running on at least one computer. The programs work with the queue and the database, but not with each other.
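The dispatcher/handler split can be tried out without any messaging service. The sketch below is only a stand-in: Python's in-process `queue.Queue` plays the role of the message queue, threads play the role of the computers, and the names `dispatcher`, `handler`, `run` and the `STOP` sentinel are made up for the example. It sends blocks of record ids rather than single records.

```python
import queue
import threading

STOP = object()  # sentinel telling a handler to exit


def dispatcher(q, record_ids, block_size, n_handlers):
    """Single producer: cut the ids into blocks and put each block on the queue."""
    for i in range(0, len(record_ids), block_size):
        q.put(record_ids[i:i + block_size])
    for _ in range(n_handlers):
        q.put(STOP)  # one stop signal per handler


def handler(q, processed, lock):
    """Consumer: each block is delivered to exactly one handler."""
    while True:
        block = q.get()
        if block is STOP:
            break
        for record_id in block:
            # The one-time work on the record would go here.
            with lock:
                processed.append(record_id)


def run(record_ids, block_size=10, n_handlers=4):
    q = queue.Queue()
    processed = []
    lock = threading.Lock()
    workers = [
        threading.Thread(target=handler, args=(q, processed, lock))
        for _ in range(n_handlers)
    ]
    for w in workers:
        w.start()
    dispatcher(q, record_ids, block_size, n_handlers)
    for w in workers:
        w.join()
    return processed


if __name__ == "__main__":
    result = run(list(range(200)), block_size=10, n_handlers=20)
    print(len(result), "records processed,", len(set(result)), "distinct")
```

Because only the dispatcher reads the table and the queue hands each message to a single receiver, the handlers never compete for the same rows, which is the property the StatusCode approach in the question lacks.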

There is material on message queues on the Internet, but it has been so long that I don't remember anything of good quality. The first thing that came to hand: https://professorweb.ru/my/csharp/web/level9/9_1.php . I don't know how complete this guide is, but as I said, there is plenty of material online. If you are interested, look around yourself.

I can't speak for MSMQ specifically, but services of this kind have a useful feature.
Sending and receiving messages can be made part of a transaction. If the write to the database fails, you can roll back the queue transaction. The message returns to the queue, so processing can be retried later without any extra effort.