From ef5730a4620b409a3b46e46966e3bc6f3a306464 Mon Sep 17 00:00:00 2001 From: Ben Nemec Date: Thu, 9 May 2013 19:06:45 +0000 Subject: [PATCH] Fix problem with long messages in Qpid (from oslo) This is commit 478ac3a3e in oslo-incubator Qpid has a limitation where it cannot serialize a dict containing a string greater than 65535 characters. This change alters the Qpid implementation to JSON encode the dict before sending it, but only if Qpid would fail to serialize it. This maintains as much backward compatibility as possible, though long messages will still fail if they are sent to an older receiver. Even though this change will modify the message format, it will only do it when messages are longer than 65K which would be broken anyway and could cause serious bugs like the one linked below. Fixes bug 1215091 Change-Id: I2f0e88435748bab631d969573d3a598d9e1f7fef --- nova/openstack/common/rpc/impl_qpid.py | 47 ++++++++++++++++++++++++++++++++++ 1 file changed, 47 insertions(+) diff --git a/nova/openstack/common/rpc/impl_qpid.py b/nova/openstack/common/rpc/impl_qpid.py index e579316..67388fd 100644 --- a/nova/openstack/common/rpc/impl_qpid.py +++ b/nova/openstack/common/rpc/impl_qpid.py @@ -23,6 +23,7 @@ import uuid import eventlet import greenlet +import qpid.codec010 as qpid_codec import qpid.messaging import qpid.messaging.exceptions @@ -63,6 +64,8 @@ qpid_opts = [ cfg.CONF.register_opts(qpid_opts) +JSON_CONTENT_TYPE = 'application/json; charset=utf8' + class ConsumerBase(object): """Consumer base class.""" @@ -117,10 +120,27 @@ class ConsumerBase(object): self.receiver = session.receiver(self.address) self.receiver.capacity = 1 + def _unpack_json_msg(self, msg): + """Load the JSON data in msg if msg.content_type indicates that it + is necessary. Put the loaded data back into msg.content and + update msg.content_type appropriately. + + A Qpid Message containing a dict will have a content_type of + 'amqp/map', whereas one containing a string that needs to be converted + back from JSON will have a content_type of JSON_CONTENT_TYPE. + + :param msg: a Qpid Message object + :returns: None + """ + if msg.content_type == JSON_CONTENT_TYPE: + msg.content = jsonutils.loads(msg.content) + msg.content_type = 'amqp/map' + def consume(self): """Fetch the message and pass it to the callback object""" message = self.receiver.fetch() try: + self._unpack_json_msg(message) self.callback(message.content) except Exception: LOG.exception(_("Failed to process message... skipping it.")) @@ -220,8 +240,35 @@ class Publisher(object): """Re-establish the Sender after a reconnection""" self.sender = session.sender(self.address) + def _pack_json_msg(self, msg): + """Qpid cannot serialize dicts containing strings longer than 65535 + characters. This function dumps the message content to a JSON + string, which Qpid is able to handle. + + :param msg: May be either a Qpid Message object or a bare dict. + :returns: A Qpid Message with its content field JSON encoded. + """ + try: + msg.content = jsonutils.dumps(msg.content) + except AttributeError: + # Need to have a Qpid message so we can set the content_type. + msg = qpid.messaging.Message(jsonutils.dumps(msg)) + msg.content_type = JSON_CONTENT_TYPE + return msg + def send(self, msg): """Send a message""" + try: + # Check if Qpid can encode the message + check_msg = msg + if not hasattr(check_msg, 'content_type'): + check_msg = qpid.messaging.Message(msg) + content_type = check_msg.content_type + enc, dec = qpid.messaging.message.get_codec(content_type) + enc(check_msg.content) + except qpid_codec.CodecException: + # This means the message couldn't be serialized as a dict. + msg = self._pack_json_msg(msg) self.sender.send(msg) -- 1.8.1.2