80 lines
3.1 KiB
Diff
80 lines
3.1 KiB
Diff
|
From 2d949c415b97ed9649e78c880ab149d0d39c1152 Mon Sep 17 00:00:00 2001
|
||
|
From: Xavier Queralt <xqueralt@redhat.com>
|
||
|
Date: Thu, 5 Sep 2013 10:08:29 +0200
|
||
|
Subject: [PATCH] Fix Qpid when sending long messages (from oslo)
|
||
|
|
||
|
This is commit 4f97479ad in oslo-incubator
|
||
|
|
||
|
Qpid has a limitation where it cannot serialize a dict containing a
|
||
|
string greater than 65535 characters. This change alters the Qpid
|
||
|
implementation to JSON encode the dict before sending it, but only if
|
||
|
Qpid would fail to serialize it. This maintains as much backward
|
||
|
compatibility as possible, though long messages will still fail if they
|
||
|
are sent to an older receiver.
|
||
|
|
||
|
The first part of this fix was ported to Grizzly in Ib52e9458a to allow
|
||
|
receiving messages from Havana using the new format. Even though this
|
||
|
change will modify the message format, it will only do it when messages
|
||
|
are longer than 65K which would be broken anyway and could cause serious
|
||
|
bugs like the one linked below.
|
||
|
|
||
|
Fixes bug 1215091
|
||
|
|
||
|
Change-Id: I505b648c3d0e1176ec7a3fc7d1646fa5a5232261
|
||
|
---
|
||
|
nova/openstack/common/rpc/impl_qpid.py | 28 ++++++++++++++++++++++++++++
|
||
|
1 file changed, 28 insertions(+)
|
||
|
|
||
|
diff --git a/nova/openstack/common/rpc/impl_qpid.py b/nova/openstack/common/rpc/impl_qpid.py
|
||
|
index 0044088..a7aebc1 100644
|
||
|
--- a/nova/openstack/common/rpc/impl_qpid.py
|
||
|
+++ b/nova/openstack/common/rpc/impl_qpid.py
|
||
|
@@ -31,6 +31,7 @@ from nova.openstack.common import log as logging
|
||
|
from nova.openstack.common.rpc import amqp as rpc_amqp
|
||
|
from nova.openstack.common.rpc import common as rpc_common
|
||
|
|
||
|
+qpid_codec = importutils.try_import("qpid.codec010")
|
||
|
qpid_messaging = importutils.try_import("qpid.messaging")
|
||
|
qpid_exceptions = importutils.try_import("qpid.messaging.exceptions")
|
||
|
|
||
|
@@ -247,8 +248,35 @@ class Publisher(object):
|
||
|
"""Re-establish the Sender after a reconnection"""
|
||
|
self.sender = session.sender(self.address)
|
||
|
|
||
|
+ def _pack_json_msg(self, msg):
|
||
|
+ """Qpid cannot serialize dicts containing strings longer than 65535
|
||
|
+ characters. This function dumps the message content to a JSON
|
||
|
+ string, which Qpid is able to handle.
|
||
|
+
|
||
|
+ :param msg: May be either a Qpid Message object or a bare dict.
|
||
|
+ :returns: A Qpid Message with its content field JSON encoded.
|
||
|
+ """
|
||
|
+ try:
|
||
|
+ msg.content = jsonutils.dumps(msg.content)
|
||
|
+ except AttributeError:
|
||
|
+ # Need to have a Qpid message so we can set the content_type.
|
||
|
+ msg = qpid_messaging.Message(jsonutils.dumps(msg))
|
||
|
+ msg.content_type = JSON_CONTENT_TYPE
|
||
|
+ return msg
|
||
|
+
|
||
|
def send(self, msg):
|
||
|
"""Send a message"""
|
||
|
+ try:
|
||
|
+ # Check if Qpid can encode the message
|
||
|
+ check_msg = msg
|
||
|
+ if not hasattr(check_msg, 'content_type'):
|
||
|
+ check_msg = qpid_messaging.Message(msg)
|
||
|
+ content_type = check_msg.content_type
|
||
|
+ enc, dec = qpid_messaging.message.get_codec(content_type)
|
||
|
+ enc(check_msg.content)
|
||
|
+ except qpid_codec.CodecException:
|
||
|
+ # This means the message couldn't be serialized as a dict.
|
||
|
+ msg = self._pack_json_msg(msg)
|
||
|
self.sender.send(msg)
|
||
|
|
||
|
|
||
|
--
|
||
|
1.8.1.2
|
||
|
|
||
|
|