https://review.openstack.org/gitweb?p=openstack%2Fneutron.git;a=commitdiff;h=a52ef6ecf19b8b015465ddda2a3ca087f0e12122 index 04dbfef..77f233a 100755 (executable) --- a/neutron/plugins/nicira/vshield/tasks/tasks.py +++ b/neutron/plugins/nicira/vshield/tasks/tasks.py @@ -15,8 +15,6 @@ # License for the specific language governing permissions and limitations # under the License. -from __future__ import print_function - import collections import uuid @@ -167,6 +165,9 @@ class TaskManager(): # A dict to store resource -> resource's tasks self._tasks = {} + # Current task being executed in main thread + self._main_thread_exec_task = None + # New request event self._req = event.Event() @@ -311,8 +312,10 @@ class TaskManager(): continue try: + self._main_thread_exec_task = task self._execute(task) finally: + self._main_thread_exec_task = None if task.status is None: # The thread is killed during _execute(). To guarantee # the task been aborted correctly, put it to the queue. @@ -348,20 +351,19 @@ class TaskManager(): self._thread = None def has_pending_task(self): - if self._tasks_queue: - return True - - if self._tasks: + if self._tasks_queue or self._tasks or self._main_thread_exec_task: return True - - return False + else: + return False def show_pending_tasks(self): for task in self._tasks_queue: - print(str(task)) + LOG.info(str(task)) for resource, tasks in self._tasks.iteritems(): for task in tasks: - print(str(task)) + LOG.info(str(task)) + if self._main_thread_exec_task: + LOG.info(str(self._main_thread_exec_task)) def count(self): count = 0 diff --git a/neutron/tests/unit/nicira/test_edge_router.py b/neutron/tests/unit/nicira/test_edge_router.py index 41efeed..a360b71 100644 (file) --- a/neutron/tests/unit/nicira/test_edge_router.py +++ b/neutron/tests/unit/nicira/test_edge_router.py @@ -135,7 +135,8 @@ class ServiceRouterTest(test_nicira_plugin.NiciraL3NatTest, def tearDown(self): plugin = NeutronManager.get_plugin() manager = plugin.vcns_driver.task_manager - for i in range(20): + # wait max ~10 seconds for all tasks to be finished + for i in range(100): if not manager.has_pending_task(): break greenthread.sleep(0.1) @@ -183,8 +184,8 @@ class ServiceRouterTestCase(ServiceRouterTest, NvpRouterTestCase): for k, v in expected_value_1: self.assertEqual(router['router'][k], v) - # wait ~1 seconds for router status update - for i in range(2): + # wait max ~10 seconds for router status update + for i in range(20): greenthread.sleep(0.5) res = self._show('routers', router['router']['id']) if res['router']['status'] == 'ACTIVE': diff --git a/neutron/tests/unit/nicira/test_vcns_driver.py b/neutron/tests/unit/nicira/test_vcns_driver.py index b0d69a4..ddc0c33 100644 (file) --- a/neutron/tests/unit/nicira/test_vcns_driver.py +++ b/neutron/tests/unit/nicira/test_vcns_driver.py @@ -253,6 +253,31 @@ class VcnsDriverTaskManagerTestCase(base.BaseTestCase): def test_task_manager_stop_4(self): self._test_task_manager_stop(False, False, 1) + def test_task_pending_task(self): + def _exec(task): + task.userdata['executing'] = True + while not task.userdata['tested']: + greenthread.sleep(0) + task.userdata['executing'] = False + return TaskStatus.COMPLETED + + userdata = { + 'executing': False, + 'tested': False + } + manager = ts.TaskManager().start(100) + task = ts.Task('name', 'res', _exec, userdata=userdata) + manager.add(task) + + while not userdata['executing']: + greenthread.sleep(0) + self.assertTrue(manager.has_pending_task()) + + userdata['tested'] = True + while userdata['executing']: + greenthread.sleep(0) + self.assertFalse(manager.has_pending_task()) + class VcnsDriverTestCase(base.BaseTestCase): @@ -298,6 +323,10 @@ class VcnsDriverTestCase(base.BaseTestCase): self.edge_id = None self.result = None + def tearDown(self): + self.vcns_driver.task_manager.stop() + super(VcnsDriverTestCase, self).tearDown() + def _deploy_edge(self): task = self.vcns_driver.deploy_edge( 'router-id', 'myedge', 'internal-network', {}, wait_for_exec=True) @@ -355,12 +384,13 @@ class VcnsDriverTestCase(base.BaseTestCase): self.assertTrue(jobdata.get('edge_deploy_result')) def test_deploy_edge_fail(self): - self.vcns_driver.deploy_edge( + task1 = self.vcns_driver.deploy_edge( 'router-1', 'myedge', 'internal-network', {}, wait_for_exec=True) - task = self.vcns_driver.deploy_edge( + task2 = self.vcns_driver.deploy_edge( 'router-2', 'myedge', 'internal-network', {}, wait_for_exec=True) - task.wait(TaskState.RESULT) - self.assertEqual(task.status, TaskStatus.ERROR) + task1.wait(TaskState.RESULT) + task2.wait(TaskState.RESULT) + self.assertEqual(task2.status, TaskStatus.ERROR) def test_get_edge_status(self): self._deploy_edge()