Hello. I use ActiveMQ (AMQ) to send messages to topics and queues, and I need to delete (cancel) a specific message that has been sent to a queue or topic. In addition, I need to be able to remove a queue completely.

I work with AMQ over the STOMP protocol using the stompy library, but the library has no suitable functions for this.

Please tell me which libraries to use, or suggest a solution. Thanks.

  • The protocol is no longer important: I have learned to read the queue and extract all unacknowledged messages. What I need is a command to delete a message, even if that means using the REST API. - Arseny

1 answer 1

How to delete an individual message is clear in theory (based on a Wireshark analysis of the traffic the browser generates in the ActiveMQ administration console), but I have not yet managed to do it programmatically.

In theory, you need to call the AMQ function deleteMessage() (observed in the packet that the AMQ admin panel sends when a message is deleted) with the parameters [id, secret], where

  • id - the unique name of the message in the queue / topic
  • secret - a unique value (a kind of token) that changes each time the AMQ admin panel page is refreshed. enter image description here enter image description here

Example for the test queue:

  • id: ID: ####### NAME_SERVER ###### - 44458-1485427798954-6: 1: 1: 1: 1
  • secret: 1dbd2916-337a-48cc-bce7-63b00d38ba3

So far, the solution to the problem has come down to acknowledging (ack) the message instead of explicitly deleting it from the queue.

Here is the code of a simple client that can read the queue and acknowledge the desired message, thereby removing it from the queue:

 # Minimal STOMP client: reads a queue and acknowledges only the wanted message.
#
# The subscription uses 'ack': 'client', so the broker keeps a message in the
# queue until this client acknowledges it. Acknowledging the wanted message is
# therefore what removes it from the queue.
#
# amq_ip, amq_port, amq_user, amq_pass and amq_queue are placeholders that
# must be defined before running this script.
from stompy import stomp
import json

s = stomp.Stomp(amq_ip, amq_port)

try:
    s.connect(username=amq_user, password=amq_pass)
    s.subscribe({'destination': '%s' % amq_queue, 'ack': 'client'})
except Exception as e:
    print("ActiveMQ error\n %s" % e)
    # Without a connection/subscription the receive loop below could only
    # fail forever, so stop here instead of spinning on errors.
    raise SystemExit(1)

while True:
    try:
        frame = s.receive_frame()
        body = json.loads(frame.body)
        # Is this message for me? A message that lacks the attribute is
        # treated as "not for me" instead of raising KeyError.
        if body.get("interested_atr_in_msg") == "interested_value_of_attr_in_msg":
            print("Its for me. I receive it")
            # This message is for me: accept it. The ack removes it from the queue.
            s.ack(frame)
        else:
            # This message is intended for someone else; leave it unacknowledged.
            print("Its not for me")
    except Exception as e:
        # Best effort: report the problem (e.g. a non-JSON body) and keep reading.
        print(e)

In addition, here is the code that I tried to use for deletion:

 # -*- coding: utf-8 -*-
# ---------------------------------------------------------------------------
# Client script: connects to the ActiveMQ Jolokia endpoint and removes one
# message from a queue, using the activemq_api module listed further below.
# ---------------------------------------------------------------------------
import activemq_api
import urllib3
import json

# Connection to ActiveMQ
# Host serving the Jolokia endpoint. The original script used AMQ_IP without
# ever defining it; "localhost" is a placeholder to adjust.
AMQ_IP = "localhost"
BROKER_NAME = "localhost"
AMQ_API_PORT = 8161
AMQ_API_USER = "admin"
AMQ_API_PASS = "admin"
AMQ_API_POSTFIX = "/api/jolokia"
AMQ_TASK_QUEUE_NAME = "test"
BASIC_AUTH = '%s:%s' % (AMQ_API_USER, AMQ_API_PASS)
AMQ_STATUS_QUEUE = "/queue/test"

LOGIN_EXEMPT_URLS = [
    r'admin/'
]
LOGIN_URL = 'url_login'
LOGOUT_REDIRECT_URL = 'url_login'

if __name__ == '__main__':
    user_agent = "curl/7.49.1"
    headers = urllib3.util.make_headers(basic_auth=BASIC_AUTH, user_agent=user_agent)
    addition = {
        "Content-Type": "application/x-www-form-urlencoded",
        "Accept": "*/*"
    }
    try:
        headers.update(addition)
        connect = activemq_api.Connection(AMQ_IP, AMQ_API_PORT, BROKER_NAME, headers, AMQ_API_POSTFIX)
        manager = activemq_api.AMQManager(connect)
    except Exception as e:
        print(u'%s: Превышено число максимальных попыток соединения к ActiveMQ' % e.__class__.__name__)
    else:
        print(u'Соединение успешно установлено')
        # Only attempt the removal once a manager exists. The JMX operation
        # needs just the JMS message id; no web-console "secret" is involved.
        try:
            msg_id = "ID:№№№№№№№№№№№№№№№№№№-54825-1482598606528-3:586:-1:1:1"
            print(manager.removeMsgQueue(AMQ_TASK_QUEUE_NAME, msg_id))
        except Exception as inst:
            print(inst)


#!/usr/bin/python2
# -*- coding: utf-8 -*-
# ---------------------------------------------------------------------------
# activemq_api.py: thin client for the ActiveMQ JMX beans exposed via Jolokia.
# ---------------------------------------------------------------------------
import urllib3
import json


def _post_jolokia(conn, body):
    """POST a JSON request body to the Jolokia endpoint described by *conn*.

    Returns the raw response payload (``r.data``) without decoding it.
    """
    http = urllib3.PoolManager()
    url = "http://%s:%d%s" % (conn.AMQ_IP, conn.AMQ_PORT, conn.POSTFIX)
    r = http.request('POST', url, headers=conn.HEADERS, body=body)
    return r.data


class Connection:
    """Plain holder for the settings needed to reach the Jolokia endpoint."""

    def __init__(self, amq_ip, amq_port, broker, header, postfix):
        self.BROKER_NAME = broker
        self.AMQ_IP = amq_ip
        self.AMQ_PORT = amq_port
        self.HEADERS = header
        self.POSTFIX = postfix


class AMQManager():
    """Broker-level operations plus a cache of Queue objects keyed by name."""

    def __init__(self, conn):
        self.QUEUES = {}
        self.QUEUES_COUNT = None
        self.HEAP_MEMORY_USED = None
        self.MEMORY_PERSENT_USED = None
        self.CONNECTION = conn
        # Populates the attributes above; performs HTTP requests.
        self.update()

    def _broker_mbean(self):
        """Return the JMX object name of the broker MBean."""
        return "org.apache.activemq:type=Broker,brokerName=%s" % self.CONNECTION.BROKER_NAME

    def rmQueue(self, queue_names):
        """Build the JSON body of the broker's removeQueue(String) call."""
        request = {
            "type": "exec",
            "mbean": self._broker_mbean(),
            "operation": "removeQueue(java.lang.String)",
            "arguments": [queue_names]
        }
        return json.dumps(request)

    def queueList(self):
        """Build the JSON body that reads the broker's Queues attribute."""
        request = {
            "type": "read",
            "mbean": self._broker_mbean(),
            "attribute": "Queues"
        }
        return json.dumps(request)

    def browseQueueSubscribers(self):
        """Build the JSON body that reads the QueueSubscribers attribute."""
        request = {
            "type": "read",
            "mbean": self._broker_mbean(),
            "attribute": "QueueSubscribers"
        }
        return json.dumps(request)

    def memoryPersentUsed(self):
        """Build the JSON body that reads the MemoryPercentUsage attribute."""
        request = {
            "type": "read",
            "mbean": self._broker_mbean(),
            "attribute": "MemoryPercentUsage"
        }
        return json.dumps(request)

    def heapMemoryUsed(self):
        """Build the JSON body that reads the 'used' path of HeapMemoryUsage."""
        request = {
            "type": "read",
            "mbean": "java.lang:type=Memory",
            "attribute": "HeapMemoryUsage",
            "path": "used"
        }
        return json.dumps(request)

    def request(self, name, param):
        """Send the broker-level request called *name*; return the raw response.

        *param* is only read for "removeQueue" (key "QUEUE_NAME").
        Raises ValueError for an unknown *name* rather than POSTing an empty
        body.
        """
        if name == "removeQueue":
            body = self.rmQueue(param["QUEUE_NAME"])
        elif name == "queueList":
            body = self.queueList()
        elif name == "browseQueueSubscribers":
            body = self.browseQueueSubscribers()
        elif name == "memoryPersentUsed":
            body = self.memoryPersentUsed()
        elif name == "heapMemoryUsed":
            body = self.heapMemoryUsed()
        else:
            raise ValueError("unknown request name: %r" % (name,))
        return _post_jolokia(self.CONNECTION, body)

    def updateQueues(self):
        """Rebuild self.QUEUES / self.QUEUES_COUNT from the broker's queue list."""
        res = json.loads(self.request("queueList", {}))
        queues = {}
        for queue in res["value"]:
            # objectName looks like "<domain>:key=value,key=value,..."; keep
            # the property list and turn it into a dict.
            properties = queue["objectName"].split(":", 1)[1]
            parsed = {}
            for pair in properties.split(","):
                key, value = pair.split("=", 1)
                parsed[key] = value
            destination = parsed["destinationName"]
            queues[destination] = Queue(destination, self.CONNECTION)
        self.QUEUES = queues
        self.QUEUES_COUNT = len(queues)

    def updateHeapMem(self):
        """Refresh self.HEAP_MEMORY_USED from the JVM Memory MBean."""
        self.HEAP_MEMORY_USED = json.loads(self.request("heapMemoryUsed", {}))["value"]

    def updatePersMem(self):
        """Refresh self.MEMORY_PERSENT_USED from the broker MBean."""
        self.MEMORY_PERSENT_USED = json.loads(self.request("memoryPersentUsed", {}))["value"]

    ## EXPORTABLE
    def update(self):
        """Refresh the queue list and both memory figures."""
        self.updateQueues()
        self.updateHeapMem()
        self.updatePersMem()

    ## EXPORTABLE
    def getQueues(self):
        """Refresh the queue list and return its size and per-queue info."""
        self.updateQueues()
        data = [queue.getInfo() for queue in self.QUEUES.values()]
        return {
            "queues_count": self.QUEUES_COUNT,
            "queues": data
        }

    ## EXPORTABLE
    def getQueueInfo(self, name):
        """Return the counters of the cached queue *name* (KeyError if unknown)."""
        return self.QUEUES[name].getInfo()

    ## EXPORTABLE
    def browseQueue(self, name):
        """Return id/timestamp/priority for every message in queue *name*."""
        return self.QUEUES[name].browse()

    ## EXPORTABLE
    def getMessage(self, name, msg_id):
        """Return the body of message *msg_id* in queue *name*, or None."""
        return self.QUEUES[name].message(msg_id)

    def getAllQueueMessages(self, name):
        """Return the full message records of queue *name*."""
        return self.QUEUES[name].messages()

    ## EXPORTABLE
    def removeQueue(self, name):
        """Ask the broker to remove queue *name*; return the decoded response."""
        param = {
            "QUEUE_NAME": name
        }
        return json.loads(self.request("removeQueue", param))

    ## EXPORTABLE
    def clearQueue(self, name):
        """Purge every message from queue *name*; return the decoded response."""
        return self.QUEUES[name].clear()

    # ARS
    def removeMsgQueue(self, nameQueue, id):
        """Remove the message with JMS id *id* from queue *nameQueue*."""
        return self.QUEUES[nameQueue].delete_msg(id)


class Queue():
    """Counters, messages and operations of a single queue MBean."""

    def __init__(self, q_name, conn):
        # TODO (original author's note): learn how to access the superclass attributes here
        self.MESSAGES = []
        self.QUEUE_NAME = q_name
        self.ENQUEUE_COUNT = None
        self.DEQUEUE_COUNT = None
        self.CONSUMER_COUNT = None
        self.QUEUE_SIZE = None
        self.CONNECTION = conn
        # Each update* call below performs one HTTP request.
        self.updateEnCount()
        self.updateDeCount()
        self.updateCoCount()
        self.updateQuSize()

    def _mbean(self):
        """Return the JMX object name of this queue's MBean."""
        return "org.apache.activemq:type=Broker,brokerName=%s,destinationName=%s,destinationType=Queue" \
            % (self.CONNECTION.BROKER_NAME, self.QUEUE_NAME)

    def _read_request(self, attribute):
        """Build the JSON body that reads *attribute* of this queue's MBean."""
        request = {
            "type": "read",
            "mbean": self._mbean(),
            "attribute": attribute
        }
        return json.dumps(request)

    def _fetch_value(self, request_name):
        """Return the "value" of a read request, or -1 if anything fails."""
        try:
            return json.loads(self.request(request_name, {}))["value"]
        except Exception:
            # Deliberate best effort: -1 marks "could not be read".
            return -1

    def queueEnqueueCount(self):
        """Build the JSON body that reads EnqueueCount."""
        return self._read_request("EnqueueCount")

    def queueDequeueCount(self):
        """Build the JSON body that reads DequeueCount."""
        return self._read_request("DequeueCount")

    def queueConsumerCount(self):
        """Build the JSON body that reads ConsumerCount."""
        return self._read_request("ConsumerCount")

    def queueSize(self):
        """Build the JSON body that reads QueueSize."""
        return self._read_request("QueueSize")

    def browseMessages(self):
        """Build the JSON body of the no-argument browse() operation."""
        request = {
            "type": "exec",
            "mbean": self._mbean(),
            "operation": "browse()",
        }
        return json.dumps(request)

    def purge(self):
        """Build the JSON body of the purge() operation."""
        request = {
            "type": "exec",
            "mbean": self._mbean(),
            "operation": "purge()"
        }
        return json.dumps(request)

    # ARS
    def deleteMsg(self, ID):
        """Build the JSON body that removes the message with JMS id *ID*.

        Uses the queue MBean operation removeMessage(String). The
        deleteMessage(id, secret) call seen in browser traffic is an action of
        the web console (the secret is its per-page token), not a JMX
        operation, so it cannot be invoked through Jolokia.
        """
        request = {
            "type": "exec",
            "mbean": self._mbean(),
            "operation": "removeMessage(java.lang.String)",
            "arguments": [ID]
        }
        return json.dumps(request)

    def request(self, name, param):
        """Send the queue-level request called *name*; return the raw response.

        *param* is only read for "delete_msg", where it is the JMS message id.
        Raises ValueError for an unknown *name*.
        """
        if name == "queueEnqueueCount":
            body = self.queueEnqueueCount()
        elif name == "queueDequeueCount":
            body = self.queueDequeueCount()
        elif name == "queueConsumerCount":
            body = self.queueConsumerCount()
        elif name == "queueSize":
            body = self.queueSize()
        elif name == "browseMessages":
            body = self.browseMessages()
        elif name == "purge":
            body = self.purge()
        elif name == "delete_msg":
            body = self.deleteMsg(param)
        else:
            raise ValueError("unknown request name: %r" % (name,))
        return _post_jolokia(self.CONNECTION, body)

    def updateEnCount(self):
        """Refresh self.ENQUEUE_COUNT (-1 on failure)."""
        self.ENQUEUE_COUNT = self._fetch_value("queueEnqueueCount")

    def updateDeCount(self):
        """Refresh self.DEQUEUE_COUNT (-1 on failure)."""
        # The original reset ENQUEUE_COUNT here on failure (copy-paste slip).
        self.DEQUEUE_COUNT = self._fetch_value("queueDequeueCount")

    def updateCoCount(self):
        """Refresh self.CONSUMER_COUNT (-1 on failure)."""
        self.CONSUMER_COUNT = self._fetch_value("queueConsumerCount")

    def updateQuSize(self):
        """Refresh self.QUEUE_SIZE (-1 on failure)."""
        self.QUEUE_SIZE = self._fetch_value("queueSize")

    def updateMessages(self):
        """Reload self.MESSAGES from the queue's browse() result."""
        res = json.loads(self.request("browseMessages", {}))["value"]
        self.MESSAGES = [
            {
                "id": msg["JMSMessageID"],
                # NOTE(review): presumably only text messages carry "Text";
                # .get keeps other message types browsable (data is None).
                "data": msg.get("Text"),
                "timestamp": msg["JMSTimestamp"],
                "priority": msg["JMSPriority"]
            }
            for msg in res
        ]

    def update(self):
        """Refresh all four counters and the message list."""
        self.updateEnCount()
        self.updateDeCount()
        self.updateCoCount()
        self.updateQuSize()
        self.updateMessages()

    def getInfo(self):
        """Refresh the four counters and return them together with the name."""
        self.updateEnCount()
        self.updateDeCount()
        self.updateCoCount()
        self.updateQuSize()
        return {
            "queue_name": self.QUEUE_NAME,
            "enqueue_count": self.ENQUEUE_COUNT,
            "dequeue_count": self.DEQUEUE_COUNT,
            "consumer_count": self.CONSUMER_COUNT,
            "queue_size": self.QUEUE_SIZE
        }

    def browse(self):
        """Reload the messages; return id/timestamp/priority of each one."""
        self.updateMessages()
        return [
            {
                "id": msg["id"],
                "timestamp": msg["timestamp"],
                "priority": msg["priority"]
            }
            for msg in self.MESSAGES
        ]

    def message(self, msg_id):
        """Reload the messages; return the body of *msg_id*, or None."""
        self.updateMessages()
        for msg in self.MESSAGES:
            if msg["id"] == msg_id:
                return msg["data"]
        return None

    # ARS
    def messages(self):
        """Reload the messages and return the full records."""
        self.updateMessages()
        return self.MESSAGES

    # ARS
    def delete_msg(self, id):
        """Remove the message with JMS id *id*; return the decoded response."""
        return json.loads(self.request("delete_msg", id))

    def clear(self):
        """Purge the queue; return the decoded response."""
        return json.loads(self.request("purge", {}))