Hello. Faced with the problem of ordering tasks in Celery.

  1. I start the RabbitMQ server
  2. I launch Celery (app = Celery ('celery_tasks', broker = 'amqp: // guest: guest @ localhost: 5672 //', backend = 'amqp'))
  3. I run the script where the functions test.delay (4, 1) or test.apply_async ((4, 4)) are called

In the Celery console: Celery received and deleted unknown message. Wrong destination and further message body. Content type: JSON and others Console screenshot I do not know what else to try. How to get an answer? Because the script is loaded until it is cut down.
Celery ad file and test function. If you perform this function from another file, an error appears that is higher in the screenshot.

# -*- coding: utf-8 -*- from celery import Celery app = Celery('celery_tasks', broker='amqp://guest:guest@localhost:5672//', backend='rpc://') @app.task def test(): return 'Success' 

And this is the file to run the script:

 # -*- coding: utf-8 -*- from celery_tasks import * class Tasks: def __init__(self): result = self.test.apply_async((self,), serializer='JSON') print result, type(result), result.ready() @app.task def test(self): return 'Successful' if __name__ == '__main__': Tasks() 

After executing this script an error appears. kombu.exceptions.SerializerNotInstalled: No encoder installed for JSON

If we take a function outside the class, then again the same error with the removal of an unknown message. As I understand the message is the one that I send when I run the function. Because the task queues are empty. Yes, and the message can be seen (in the picture).

 # -*- coding: utf-8 -*- from celery_tasks import * @app.task def test(): return 'Successful' class Tasks: def __init__(self): result = test.delay() print result, type(result), result.ready() if __name__ == '__main__': Tasks() 

Tried to change the serialization to yaml, msgpack, pickle but without result

  • Try a minimal but self-sufficient example of code to create that demonstrates the problem: the minimum reproducible example is jfs
  • This is an example. The function that I run simply returns a string. Or not just a string, but I tried all the values. The script hangs when I want to view the result. result.get (). If just print result, then it displays the job id. - Rustam Yanbaev
  • I do not see the complete code sample in question. You can simply follow the instructions on the link. - jfs
  • app = Celery ('celery_tasks', broker = 'amqp: // guest: guest @ localhost: 5672 //', backend = 'amqp') is the Celery ad file. @ app.task def test (): print 'Successful' is a function. Then you call the function: test.delay () or test.apply_async () - Rustam Yanbaev
  • Transferred the job function to a separate file and an error occurs: kombu.exceptions.SerializerNotInstalled: No encoder installed for JSON. Before, I simply imported the function from the file where I announced Celery - Rustam Yanbaev

1 answer 1

Solved a problem.
Changed the interpreter version from 2.7.6 to 3.4.3
The truth after that was to change the code a bit, namely: transferred the test function to the announcement file

celery_tasks:

 # -*- coding: utf-8 -*- from celery import Celery app = Celery('celery_tasks', broker='amqp://guest:guest@localhost:5672//', backend='rpc://') @app.task def test(): print('Start') return 'Success' 

And this is how the script startup file looks like:

 # -*- coding: utf-8 -*- from celery_tasks import * class Tasks: def __init__(self): result = test.apply_async(serializer='json') print(result, type(result), result.ready()) print(result.get(timeout=5)) if __name__ == '__main__': Tasks() 

Why did I transfer? Transferred because there was an error of unregistered tasks. The test () function was not registered with Celery (). And do not forget to put a timeout and then the error will be. Even to return the value takes time.