How can I publish a message to RabbitMQ from Python so that it reaches the queue only after a certain delay, for example 4 hours?

3 answers

RabbitMQ does not support this natively.

The database-based proposal in the other answer is problematic because it requires a database and constant processing of messages. Such a solution is much riskier, and needing an extra processing tool plus a database just for this is not a good trade.

But the feature can be emulated. In that case the message is sent immediately and stays on the broker until its delivery time arrives. The next version of the NServiceBus.RabbitMQ transport will support native delays with topic exchanges.

The implementation does not depend on the language of the sending application.

The idea is to create a chain of topic exchanges in which each level represents one bit of the delay. If the bit is 0, the exchange passes the message on to the next level. If the bit is 1, the message goes to a queue whose TTL equals the delay for that level. Each queue dead-letters its expired messages into the next level's exchange.

Schematically, it will look like this:

[Image: schematic of the exchange levels and TTL queues]

Note: this can also be implemented with header exchanges.

With a 32-bit delay (32 levels of exchanges), the delay can be anything from 1 second up to 2^32 - 1 seconds; the top level alone contributes 2^31 seconds.
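As a rough illustration of the scheme above, the sketch below builds the bit-encoded routing key and declares the levels through a pika-style channel. The exchange and queue names (`delay.level.N`, `delay.queue.N`, `delay.delivery`), the 4-level limit, and the way the destination is appended to the routing key are assumptions made for the example; the answer itself does not specify them.

```python
# Sketch of the bit-per-level delay topology with topic exchanges.
# All names below are hypothetical; `channel` is assumed to be a pika
# BlockingChannel (only exchange_declare, queue_declare, queue_bind and
# exchange_bind are used).

LEVELS = 4                     # 4 bits: delays of 0..15 seconds
DELIVERY = "delay.delivery"    # final exchange, bound to the real queues


def level_exchange(level):
    return "delay.level.{}".format(level)


def delay_routing_key(delay_seconds, destination, levels=LEVELS):
    """Encode the delay as dot-separated bits (highest first) + destination."""
    if not 0 <= delay_seconds < 2 ** levels:
        raise ValueError("delay does not fit into {} levels".format(levels))
    bits = format(delay_seconds, "0{}b".format(levels))
    return ".".join(bits) + "." + destination


def declare_delay_topology(channel, levels=LEVELS):
    """Declare one exchange and one TTL queue per bit; return the entry exchange."""
    channel.exchange_declare(exchange=DELIVERY, exchange_type="topic", durable=True)
    for level in range(levels):
        channel.exchange_declare(exchange=level_exchange(level),
                                 exchange_type="topic", durable=True)
    for level in range(levels - 1, -1, -1):
        source = level_exchange(level)
        target = level_exchange(level - 1) if level > 0 else DELIVERY
        # Wildcards skip the bits that higher levels have already handled.
        skip = "*." * (levels - 1 - level)
        queue = "delay.queue.{}".format(level)
        channel.queue_declare(queue=queue, durable=True, arguments={
            "x-message-ttl": (2 ** level) * 1000,   # milliseconds
            "x-dead-letter-exchange": target,       # expired messages move on
        })
        # Bit 1: wait in this level's queue. Bit 0: go straight to the next level.
        channel.queue_bind(queue=queue, exchange=source, routing_key=skip + "1.#")
        channel.exchange_bind(destination=target, source=source,
                              routing_key=skip + "0.#")
    return level_exchange(levels - 1)
```

Under these assumptions a destination queue would be bound to `delay.delivery` with a key such as `#.orders`, and a 5-second delay would be published to the returned entry exchange with the routing key `0.1.0.1.orders`.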

  • Instead of such exercises, I would simply create a separate queue for each possible delay value. Usually there should not be too many of them. - Pavel Mayorov
  • It depends. The range of delays can sometimes be quite wide. Our clients do not work with queues directly, so this option suits them better. In addition, administrators like a standardized implementation. There is no accounting for taste 😃 - Sean Feldman

The message will be in the queue as soon as you send it there.

You are overcomplicating things. Instead, add the date to the message and check it when reading from the queue: if 4 hours have not passed yet, send a copy of the task to the end of the queue and complete (acknowledge) the current one.
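A minimal sketch of this re-queue approach, written as a pika-style consumer callback. The `created_at` header, the use of the default exchange, and the extra `now`/`process` parameters (there only to make the function easy to try out) are assumptions for the example.

```python
import time

DELAY_SECONDS = 4 * 60 * 60  # the 4 hours from the question


def on_message(channel, method, properties, body, now=None, process=print):
    """Process the message if it is due; otherwise put a copy back in the queue.

    The first four parameters follow pika's on_message_callback signature.
    Returns True if the message was processed, False if it was re-queued.
    """
    now = time.time() if now is None else now
    headers = properties.headers or {}
    created_at = headers.get("created_at")  # hypothetical header set by the sender
    due = created_at is None or now - created_at >= DELAY_SECONDS
    if due:
        process(body)
    else:
        # Publish a clone to the end of the same queue. This assumes the
        # message was sent through the default exchange, so the routing
        # key equals the queue name.
        channel.basic_publish(exchange="", routing_key=method.routing_key,
                              body=body, properties=properties)
    # Complete the current delivery in both cases.
    channel.basic_ack(delivery_tag=method.delivery_tag)
    return due
```

Note that when the queue holds only messages that are not yet due, the consumer keeps cycling through them, which is the "constant processing" cost mentioned in the other answer.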

Or write to an intermediate table in a database and poll it regularly (for example, from cron). When rows whose date has arrived are found, create a task in the queue and delete the row from the database :)
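The table-plus-cron variant could look roughly like this. SQLite, the `delayed_messages` table, and the `publish` callable (which would wrap something like a pika `basic_publish` call) are all assumptions chosen to keep the sketch self-contained.

```python
import sqlite3
import time


def init_db(conn):
    """Create the hypothetical intermediate table if it does not exist."""
    with conn:
        conn.execute(
            "CREATE TABLE IF NOT EXISTS delayed_messages ("
            "id INTEGER PRIMARY KEY, due_at REAL NOT NULL, body BLOB NOT NULL)"
        )


def schedule(conn, body, delay_seconds, now=None):
    """Store a message together with the time at which it becomes due."""
    now = time.time() if now is None else now
    with conn:
        conn.execute(
            "INSERT INTO delayed_messages (due_at, body) VALUES (?, ?)",
            (now + delay_seconds, body),
        )


def flush_due(conn, publish, now=None):
    """Publish every due message and delete its row; meant to run from cron."""
    now = time.time() if now is None else now
    rows = conn.execute(
        "SELECT id, body FROM delayed_messages WHERE due_at <= ? ORDER BY due_at",
        (now,),
    ).fetchall()
    for row_id, body in rows:
        publish(body)  # publish first, so a crash cannot lose the message
        with conn:
            conn.execute("DELETE FROM delayed_messages WHERE id = ?", (row_id,))
    return len(rows)
```

Publishing before deleting means a crash between the two steps can produce a duplicate rather than a lost message; whether that trade-off is acceptable depends on the consumer.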

    There is an excellent plugin that makes this possible: Scheduling Messages with RabbitMQ.

    Usage Example (Java)

    Declare the exchange:

     // ...
     Map<String, Object> args = new HashMap<String, Object>();
     args.put("x-delayed-type", "direct");
     channel.exchangeDeclare("my-exchange", "x-delayed-message", true, false, args);
     // ...

    Publish a couple of messages:

     byte[] messageBodyBytes = "delayed payload".getBytes("UTF-8");
     Map<String, Object> headers = new HashMap<String, Object>();
     headers.put("x-delay", 5000);
     AMQP.BasicProperties.Builder props = new AMQP.BasicProperties.Builder().headers(headers);
     channel.basicPublish("my-exchange", "", props.build(), messageBodyBytes);

     byte[] messageBodyBytes2 = "more delayed payload".getBytes("UTF-8");
     Map<String, Object> headers2 = new HashMap<String, Object>();
     headers2.put("x-delay", 1000);
     AMQP.BasicProperties.Builder props2 = new AMQP.BasicProperties.Builder().headers(headers2);
     channel.basicPublish("my-exchange", "", props2.build(), messageBodyBytes2);

    The delay is specified in the x-delay message header, in milliseconds.
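Since the question asks about Python, here is a rough equivalent of the Java example using pika-style calls. It assumes the delayed message exchange plugin is enabled on the broker and that `channel` is a pika `BlockingChannel`; the helper names are hypothetical.

```python
from datetime import timedelta


def delay_ms(delay):
    """Convert a timedelta to the integer milliseconds used in x-delay."""
    return int(delay.total_seconds() * 1000)


def declare_delayed_exchange(channel, name="my-exchange"):
    """Declare an x-delayed-message exchange that routes like a direct one."""
    channel.exchange_declare(
        exchange=name,
        exchange_type="x-delayed-message",
        durable=True,
        arguments={"x-delayed-type": "direct"},
    )


def publish_delayed(channel, exchange, routing_key, body, delay):
    """Publish `body` so that the exchange holds it for `delay` before routing."""
    # Imported here so the helpers above stay usable without pika installed.
    import pika

    properties = pika.BasicProperties(headers={"x-delay": delay_ms(delay)})
    channel.basic_publish(exchange=exchange, routing_key=routing_key,
                          body=body, properties=properties)
```

For the 4-hour case from the question, `publish_delayed(channel, "my-exchange", "", b"delayed payload", timedelta(hours=4))` would set `x-delay` to 14400000.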

    More examples and information can be found on the project's own page: Scheduling Messages with RabbitMQ.