diff options
Diffstat (limited to 'tests/qemu-iotests/057')
| -rwxr-xr-x | tests/qemu-iotests/057 | 259 | 
1 files changed, 259 insertions, 0 deletions
| diff --git a/tests/qemu-iotests/057 b/tests/qemu-iotests/057 new file mode 100755 index 00000000..9cdd582e --- /dev/null +++ b/tests/qemu-iotests/057 @@ -0,0 +1,259 @@ +#!/usr/bin/env python +# +# Tests for internal snapshot. +# +# Copyright (C) 2013 IBM, Inc. +# +# Based on 055. +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program.  If not, see <http://www.gnu.org/licenses/>. +# + +import time +import os +import iotests +from iotests import qemu_img, qemu_io + +test_drv_base_name = 'drive' + +class ImageSnapshotTestCase(iotests.QMPTestCase): +    image_len = 120 * 1024 * 1024 # MB + +    def __init__(self, *args): +        self.expect = [] +        super(ImageSnapshotTestCase, self).__init__(*args) + +    def _setUp(self, test_img_base_name, image_num): +        self.vm = iotests.VM() +        for i in range(0, image_num): +            filename = '%s%d' % (test_img_base_name, i) +            img = os.path.join(iotests.test_dir, filename) +            device = '%s%d' % (test_drv_base_name, i) +            qemu_img('create', '-f', iotests.imgfmt, img, str(self.image_len)) +            self.vm.add_drive(img) +            self.expect.append({'image': img, 'device': device, +                                'snapshots': [], +                                'snapshots_name_counter': 0}) +        self.vm.launch() + +    def tearDown(self): +        self.vm.shutdown() +        for dev_expect in self.expect: +            os.remove(dev_expect['image']) + +    def createSnapshotInTransaction(self, snapshot_num, abort = False): +        actions = [] +        for dev_expect in self.expect: +            num = dev_expect['snapshots_name_counter'] +            for j in range(0, snapshot_num): +                name = '%s_sn%d' % (dev_expect['device'], num) +                num = num + 1 +                if abort == False: +                    dev_expect['snapshots'].append({'name': name}) +                    dev_expect['snapshots_name_counter'] = num +                actions.append({ +                    'type': 'blockdev-snapshot-internal-sync', +                    'data': { 'device': dev_expect['device'], +                              'name': name }, +                }) + +        if abort == True: +            actions.append({ +                'type': 'abort', +                'data': {}, +            }) + +        result = self.vm.qmp('transaction', actions = actions) + +        if abort == True: +            self.assert_qmp(result, 'error/class', 'GenericError') +        else: +            self.assert_qmp(result, 'return', {}) + +    def verifySnapshotInfo(self): +        result = self.vm.qmp('query-block') + +        # Verify each expected result +        for dev_expect in self.expect: +            # 1. Find the returned image value and snapshot info +            image_result = None +            for device in result['return']: +                if device['device'] == dev_expect['device']: +                    image_result = device['inserted']['image'] +                    break +            self.assertTrue(image_result != None) +            # Do not consider zero snapshot case now +            sn_list_result = image_result['snapshots'] +            sn_list_expect = dev_expect['snapshots'] + +            # 2. Verify it with expect +            self.assertTrue(len(sn_list_result) == len(sn_list_expect)) + +            for sn_expect in sn_list_expect: +                sn_result = None +                for sn in sn_list_result: +                    if sn_expect['name'] == sn['name']: +                        sn_result = sn +                        break +                self.assertTrue(sn_result != None) +                # Fill in the detail info +                sn_expect.update(sn_result) + +    def deleteSnapshot(self, device, id = None, name = None): +        sn_list_expect = None +        sn_expect = None + +        self.assertTrue(id != None or name != None) + +        # Fill in the detail info include ID +        self.verifySnapshotInfo() + +        #find the expected snapshot list +        for dev_expect in self.expect: +            if dev_expect['device'] == device: +                sn_list_expect = dev_expect['snapshots'] +                break +        self.assertTrue(sn_list_expect != None) + +        if id != None and name != None: +            for sn in sn_list_expect: +                if sn['id'] == id and sn['name'] == name: +                    sn_expect = sn +                    result = \ +                          self.vm.qmp('blockdev-snapshot-delete-internal-sync', +                                      device = device, +                                      id = id, +                                      name = name) +                    break +        elif id != None: +            for sn in sn_list_expect: +                if sn['id'] == id: +                    sn_expect = sn +                    result = \ +                          self.vm.qmp('blockdev-snapshot-delete-internal-sync', +                                      device = device, +                                      id = id) +                    break +        else: +            for sn in sn_list_expect: +                if sn['name'] == name: +                    sn_expect = sn +                    result = \ +                          self.vm.qmp('blockdev-snapshot-delete-internal-sync', +                                      device = device, +                                      name = name) +                    break + +        self.assertTrue(sn_expect != None) + +        self.assert_qmp(result, 'return', sn_expect) +        sn_list_expect.remove(sn_expect) + +class TestSingleTransaction(ImageSnapshotTestCase): +    def setUp(self): +        self._setUp('test_a.img', 1) + +    def test_create(self): +        self.createSnapshotInTransaction(1) +        self.verifySnapshotInfo() + +    def test_error_name_empty(self): +        actions = [{'type': 'blockdev-snapshot-internal-sync', +                    'data': { 'device': self.expect[0]['device'], +                              'name': '' }, +                  }] +        result = self.vm.qmp('transaction', actions = actions) +        self.assert_qmp(result, 'error/class', 'GenericError') + +    def test_error_device(self): +        actions = [{'type': 'blockdev-snapshot-internal-sync', +                    'data': { 'device': 'drive_error', +                              'name': 'a' }, +                  }] +        result = self.vm.qmp('transaction', actions = actions) +        self.assert_qmp(result, 'error/class', 'DeviceNotFound') + +    def test_error_exist(self): +        self.createSnapshotInTransaction(1) +        self.verifySnapshotInfo() +        actions = [{'type': 'blockdev-snapshot-internal-sync', +                    'data': { 'device': self.expect[0]['device'], +                              'name': self.expect[0]['snapshots'][0] }, +                  }] +        result = self.vm.qmp('transaction', actions = actions) +        self.assert_qmp(result, 'error/class', 'GenericError') + +class TestMultipleTransaction(ImageSnapshotTestCase): +    def setUp(self): +        self._setUp('test_b.img', 2) + +    def test_create(self): +        self.createSnapshotInTransaction(3) +        self.verifySnapshotInfo() + +    def test_abort(self): +        self.createSnapshotInTransaction(2) +        self.verifySnapshotInfo() +        self.createSnapshotInTransaction(3, abort = True) +        self.verifySnapshotInfo() + +class TestSnapshotDelete(ImageSnapshotTestCase): +    def setUp(self): +        self._setUp('test_c.img', 1) + +    def test_delete_with_id(self): +        self.createSnapshotInTransaction(2) +        self.verifySnapshotInfo() +        self.deleteSnapshot(self.expect[0]['device'], +                            id = self.expect[0]['snapshots'][0]['id']) +        self.verifySnapshotInfo() + +    def test_delete_with_name(self): +        self.createSnapshotInTransaction(3) +        self.verifySnapshotInfo() +        self.deleteSnapshot(self.expect[0]['device'], +                            name = self.expect[0]['snapshots'][1]['name']) +        self.verifySnapshotInfo() + +    def test_delete_with_id_and_name(self): +        self.createSnapshotInTransaction(4) +        self.verifySnapshotInfo() +        self.deleteSnapshot(self.expect[0]['device'], +                            id = self.expect[0]['snapshots'][2]['id'], +                            name = self.expect[0]['snapshots'][2]['name']) +        self.verifySnapshotInfo() + + +    def test_error_device(self): +        result = self.vm.qmp('blockdev-snapshot-delete-internal-sync', +                              device = 'drive_error', +                              id = '0') +        self.assert_qmp(result, 'error/class', 'DeviceNotFound') + +    def test_error_no_id_and_name(self): +        result = self.vm.qmp('blockdev-snapshot-delete-internal-sync', +                              device = self.expect[0]['device']) +        self.assert_qmp(result, 'error/class', 'GenericError') + +    def test_error_snapshot_not_exist(self): +        self.createSnapshotInTransaction(2) +        self.verifySnapshotInfo() +        result = self.vm.qmp('blockdev-snapshot-delete-internal-sync', +                              device = self.expect[0]['device'], +                              id = self.expect[0]['snapshots'][0]['id'], +                              name = self.expect[0]['snapshots'][1]['name']) +        self.assert_qmp(result, 'error/class', 'GenericError') + +if __name__ == '__main__': +    iotests.main(supported_fmts=['qcow2']) | 
