Hello. Faced with the problem of ordering tasks in Celery.
- I start the RabbitMQ server
- I launch Celery (app = Celery ('celery_tasks', broker = 'amqp: // guest: guest @ localhost: 5672 //', backend = 'amqp'))
- I run the script where the functions test.delay (4, 1) or test.apply_async ((4, 4)) are called
In the Celery console: Celery received and deleted unknown message. Wrong destination and further message body. Content type: JSON and others
I do not know what else to try. How to get an answer? Because the script is loaded until it is cut down.
Celery ad file and test function. If you perform this function from another file, an error appears that is higher in the screenshot.
# -*- coding: utf-8 -*- from celery import Celery app = Celery('celery_tasks', broker='amqp://guest:guest@localhost:5672//', backend='rpc://') @app.task def test(): return 'Success' And this is the file to run the script:
# -*- coding: utf-8 -*- from celery_tasks import * class Tasks: def __init__(self): result = self.test.apply_async((self,), serializer='JSON') print result, type(result), result.ready() @app.task def test(self): return 'Successful' if __name__ == '__main__': Tasks() After executing this script an error appears. kombu.exceptions.SerializerNotInstalled: No encoder installed for JSON
If we take a function outside the class, then again the same error with the removal of an unknown message. As I understand the message is the one that I send when I run the function. Because the task queues are empty. Yes, and the message can be seen (in the picture).
# -*- coding: utf-8 -*- from celery_tasks import * @app.task def test(): return 'Successful' class Tasks: def __init__(self): result = test.delay() print result, type(result), result.ready() if __name__ == '__main__': Tasks() Tried to change the serialization to yaml, msgpack, pickle but without result