• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    迪恩网络公众号

Python base.VmSnapshot类代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Python中marvin.lib.base.VmSnapshot的典型用法代码示例。如果您正苦于以下问题:Python VmSnapshot类的具体用法?Python VmSnapshot怎么用?Python VmSnapshot使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。



在下文中一共展示了VmSnapshot类的11个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: test_03_delete_vm_snapshots

    def test_03_delete_vm_snapshots(self):
        """Delete the first VM snapshot of the test VM and verify it is gone.

        Lists the VM snapshots of ``self.virtual_machine``, deletes the first
        entry, waits ``3 * self.services["sleep"]`` seconds and then expects
        the list call to return ``None``.
        """
        list_snapshot_response = VmSnapshot.list(
            self.apiclient,
            vmid=self.virtual_machine.id,
            listall=True
        )

        self.assertIsInstance(
            list_snapshot_response,
            list,
            "Check list response returns a valid list"
        )
        # A None comparison here could never fail after the isinstance check
        # above; verify the list is non-empty instead so that the [0] lookup
        # below reports a test failure rather than raising IndexError.
        self.assertNotEqual(
            len(list_snapshot_response),
            0,
            "Check if snapshot exists in ListSnapshot"
        )
        VmSnapshot.deleteVMSnapshot(
            self.apiclient,
            list_snapshot_response[0].id
        )

        # Fixed wait before listing again (presumably to let the deletion
        # finish on the backend -- confirm against the API behaviour).
        time.sleep(self.services["sleep"] * 3)

        list_snapshot_response = VmSnapshot.list(
            self.apiclient,
            vmid=self.virtual_machine.id,
            listall=True
        )

        self.assertIsNone(
            list_snapshot_response,
            "Check list vm snapshot has be deleted"
        )
开发者ID:KarlHarrisSungardAS,项目名称:cloudstack,代码行数:27,代码来源:test_vm_snapshots.py


示例2: test_01_test_vm_volume_snapshot

    def test_01_test_vm_volume_snapshot(self):
        """
        @Desc: A volume snapshot of the root volume must be allowed while
        a VM snapshot exists for the VM
        @Steps:
        1: Deploy a VM and take a VM snapshot of it
        2: Create a snapshot of the VM's root volume; this should not fail
        """
        # Step 1: deploy the VM and take a VM snapshot of it
        vm = VirtualMachine.create(
            self.apiclient,
            self.services["virtual_machine"],
            accountid=self.account.name,
            domainid=self.account.domainid,
            serviceofferingid=self.service_offering.id,
        )
        VmSnapshot.create(self.apiclient, vm.id)

        # Step 2: locate the root volume and snapshot it
        root_volumes = Volume.list(
            self.apiclient,
            virtualmachineid=vm.id,
            type="ROOT",
            listall=True,
        )
        root_status = validateList(root_volumes)[0]
        self.assertEqual(
            root_status,
            PASS,
            "Failed to get root volume of the VM",
        )

        volume_snapshot = Snapshot.create(
            self.apiclient,
            root_volumes[0].id,
            account=self.account.name,
            domainid=self.account.domainid,
        )
        self.debug("Snapshot created: ID - %s" % volume_snapshot.id)

        # The new snapshot must be retrievable by its id
        listed = list_snapshots(self.apiclient, id=volume_snapshot.id)
        listed_status = validateList(listed)[0]
        self.assertEqual(listed_status, PASS, "Invalid snapshot list")
        self.assertEqual(
            listed[0].id,
            volume_snapshot.id,
            "Check resource id in list resources call",
        )
开发者ID:CIETstudents,项目名称:cloudstack,代码行数:54,代码来源:test_vm_snapshots.py


示例3: test_01_check_revert_snapshot

    def test_01_check_revert_snapshot(self):
        """Check that a volume snapshot is refused once a VM snapshot exists.

        # 1. Deploy a VM.
        # 2. Take a VM snapshot and check that it is in the "Ready" state.
        # 3. Verify that creating a volume snapshot of the root volume
        #    raises an exception.
        """
        # Step 1
        vm = VirtualMachine.create(
            self.userapiclient,
            self.testdata["small"],
            templateid=self.template.id,
            accountid=self.account.name,
            domainid=self.account.domainid,
            serviceofferingid=self.service_offering.id,
            zoneid=self.zone.id,
        )
        volumes_cluster_list = list_volumes(
            self.apiclient,
            virtualmachineid=vm.id,
            type='ROOT',
            listall=True
        )

        volume_list_validation = validateList(volumes_cluster_list)

        # Index 0 of the validation result is compared with PASS and index 2
        # is used as the failure reason in the message.
        self.assertEqual(
            volume_list_validation[0],
            PASS,
            "Volume list validation failed due to %s" %
            volume_list_validation[2]
        )

        root_volume = volumes_cluster_list[0]

        # Step 2
        vm_snap = VmSnapshot.create(
            self.apiclient,
            vm.id
        )

        self.assertEqual(
            vm_snap.state,
            "Ready",
            "Check the snapshot of vm is ready!"
        )

        # Step 3
        with self.assertRaises(Exception):
            Snapshot.create(
                self.apiclient,
                root_volume.id
            )

        return
开发者ID:Accelerite,项目名称:cloudstack,代码行数:55,代码来源:testpath_revert_snap.py


示例4: test_01_test_vm_volume_snapshot

    def test_01_test_vm_volume_snapshot(self):
        """
        @Desc: A volume snapshot of the root volume must be rejected while
        a VM snapshot exists for the VM
        @Steps:
        1: Deploy a VM and take a VM snapshot of it
        2: Try to snapshot the VM's root volume; an Exception is expected
        """
        # Step 1: deploy the VM and take a VM snapshot of it
        vm = VirtualMachine.create(
            self.apiclient,
            self.services["virtual_machine"],
            accountid=self.account.name,
            domainid=self.account.domainid,
            serviceofferingid=self.service_offering.id,
        )
        VmSnapshot.create(self.apiclient, vm.id)

        # Step 2: locate the root volume
        root_volumes = Volume.list(
            self.apiclient,
            virtualmachineid=vm.id,
            type="ROOT",
            listall=True,
        )
        root_status = validateList(root_volumes)[0]
        self.assertEqual(
            root_status,
            PASS,
            "Failed to get root volume of the VM",
        )

        root_volume = root_volumes[0]

        # The volume snapshot request itself is expected to raise
        with self.assertRaises(Exception):
            Snapshot.create(self.apiclient, volume_id=root_volume.id)
开发者ID:milamberspace,项目名称:cloudstack,代码行数:39,代码来源:test_vm_snapshots.py


示例5: test_01_create_vm_snapshots

    def test_01_create_vm_snapshots(self):
        """Write data inside the VM, then create a VM snapshot.

        Steps:
        1. Over SSH, write ``self.random_data_0`` into the file
           ``<self.test_dir>/<self.random_data>`` and read it back.
        2. Assert that the value read back equals ``self.random_data_0``.
        3. Sleep ``self.services["sleep"]`` seconds.
        4. Create a VM snapshot (with memory only when the hypervisor is KVM)
           and assert that its state is "Ready".
        """
        try:
            # Login to VM and write data to file system
            ssh_client = self.virtual_machine.get_ssh_client()

            cmds = [
                "echo %s > %s/%s" %
                (self.random_data_0, self.test_dir, self.random_data),
                "cat %s/%s" %
                (self.test_dir, self.random_data)]

            for c in cmds:
                self.debug(c)
                result = ssh_client.execute(c)
                self.debug(result)

        except Exception as e:
            # Keep the underlying error in the failure message instead of
            # discarding it.
            self.fail("SSH failed for Virtual machine: %s - %s" %
                      (self.virtual_machine.ipaddress, e))

        # `result` holds the output of the last command (the `cat`)
        self.assertEqual(
            self.random_data_0,
            result[0],
            "Check the random data has be write into temp file!"
        )

        time.sleep(self.services["sleep"])

        # KVM VM Snapshot needs to set snapshot with memory.
        # Compare for equality: `x in (KVM.lower())` is a substring test on a
        # plain string (the parentheses do not make a tuple), so values such
        # as "" or "vm" would wrongly match.
        snapshot_memory = self.hypervisor.lower() == KVM.lower()

        vm_snapshot = VmSnapshot.create(
            self.apiclient,
            self.virtual_machine.id,
            snapshot_memory,
            "TestSnapshot",
            "Display Text"
        )
        self.assertEqual(
            vm_snapshot.state,
            "Ready",
            "Check the snapshot of vm is ready!"
        )

        return
开发者ID:PCextreme,项目名称:cloudstack,代码行数:49,代码来源:test_vm_snapshots.py


示例6: test_02_take_VM_snapshot_with_data_disk

    def test_02_take_VM_snapshot_with_data_disk(self):
        self.virtual_machine.start(self.apiClient)

        data_volume = Volume.create(
            self.apiClient,
            self.testdata[TestData.volume_1],
            account=self.account.name,
            domainid=self.domain.id,
            zoneid=self.zone.id,
            diskofferingid=self.disk_offering.id
        )

        self.cleanup = [data_volume]

        self.virtual_machine.attach_volume(self.apiClient, data_volume)

        root_volumes = list_volumes(self.apiClient, type="ROOT", listAll="true")

        self._check_list(root_volumes, 1, TestVMSnapshots._should_only_be_one_root_volume_err_msg)

        root_volume = root_volumes[0]

        root_volume_id = {'volumeid': root_volume.id}

        sf_iscsi_name_result = self.cs_api.getVolumeiScsiName(root_volume_id)
        sf_iscsi_root_volume_name = sf_iscsi_name_result['apivolumeiscsiname']['volumeiScsiName']

        self._check_iscsi_name(sf_iscsi_root_volume_name)

        root_volume_path_1 = self._get_path(root_volume_id)

        data_volumes = list_volumes(self.apiClient, type="DATADISK", listAll="true")

        self._check_list(data_volumes, 1, "There should only be one data volume.")

        data_volume = data_volumes[0]

        data_volume_id = {'volumeid': data_volume.id}

        sf_iscsi_name_result = self.cs_api.getVolumeiScsiName(data_volume_id)
        sf_iscsi_data_volume_name = sf_iscsi_name_result['apivolumeiscsiname']['volumeiScsiName']

        self._check_iscsi_name(sf_iscsi_data_volume_name)

        data_volume_path_1 = self._get_path(data_volume_id)

        #######################################
        #######################################
        # STEP 1: Take snapshot of running VM #
        #######################################
        #######################################
        vm_snapshot = VmSnapshot.create(
            self.apiClient,
            vmid=self.virtual_machine.id,
            snapshotmemory="false",
            name="Test Snapshot",
            description="Test Snapshot Desc"
        )

        list_vm_snapshots = VmSnapshot.list(self.apiClient, listAll="true")

        self._verify_vm_snapshot(list_vm_snapshots, vm_snapshot)

        root_volume_path_2 = self._get_path(root_volume_id)

        self.assertEqual(
            root_volume_path_1,
            root_volume_path_2,
            TestVMSnapshots._path_should_not_have_changed_err_msg
        )

        data_volume_path_2 = self._get_path(data_volume_id)

        self.assertEqual(
            data_volume_path_1,
            data_volume_path_2,
            TestVMSnapshots._path_should_not_have_changed_err_msg
        )

        root_volume_xen_sr = self.xen_session.xenapi.SR.get_by_name_label(sf_iscsi_root_volume_name)[0]

        root_volume_xen_vdis = self.xen_session.xenapi.SR.get_VDIs(root_volume_xen_sr)

        self._check_list(root_volume_xen_vdis, 3, TestVMSnapshots._should_be_three_vdis_err_msg)

        root_volume_vdis_after_create = self._get_vdis(root_volume_xen_vdis)

        vdiSnapshotOf = self.xen_session.xenapi.VDI.get_record(root_volume_vdis_after_create.snapshot_vdi["snapshot_of"])

        self.assertEqual(
            vdiSnapshotOf["uuid"],
            root_volume_vdis_after_create.active_vdi["uuid"],
            TestVMSnapshots._snapshot_parent_not_correct_err_msg
        )

        data_volume_xen_sr = self.xen_session.xenapi.SR.get_by_name_label(sf_iscsi_data_volume_name)[0]

        data_volume_xen_vdis = self.xen_session.xenapi.SR.get_VDIs(data_volume_xen_sr)

        self._check_list(data_volume_xen_vdis, 3, TestVMSnapshots._should_be_three_vdis_err_msg)
#.........这里部分代码省略.........
开发者ID:CIETstudents,项目名称:cloudstack,代码行数:101,代码来源:TestVMSnapshots.py


示例7: test_01_take_VM_snapshot

    def test_01_take_VM_snapshot(self):
        self.virtual_machine.start(self.apiClient)

        root_volumes = list_volumes(self.apiClient, type="ROOT", listAll="true")

        self._check_list(root_volumes, 1, TestVMSnapshots._should_only_be_one_root_volume_err_msg)

        root_volume = root_volumes[0]

        volume_id = {'volumeid': root_volume.id}

        sf_iscsi_name_result = self.cs_api.getVolumeiScsiName(volume_id)
        sf_iscsi_name = sf_iscsi_name_result['apivolumeiscsiname']['volumeiScsiName']

        self._check_iscsi_name(sf_iscsi_name)

        root_volume_path_1 = self._get_path(volume_id)

        #######################################
        #######################################
        # STEP 1: Take snapshot of running VM #
        #######################################
        #######################################
        vm_snapshot = VmSnapshot.create(
            self.apiClient,
            vmid=self.virtual_machine.id,
            snapshotmemory="false",
            name="Test Snapshot",
            description="Test Snapshot Desc"
        )

        list_vm_snapshots = VmSnapshot.list(self.apiClient, listAll="true")

        self._verify_vm_snapshot(list_vm_snapshots, vm_snapshot)

        root_volume_path_2 = self._get_path(volume_id)

        self.assertEqual(
            root_volume_path_1,
            root_volume_path_2,
            TestVMSnapshots._path_should_not_have_changed_err_msg
        )

        xen_sr = self.xen_session.xenapi.SR.get_by_name_label(sf_iscsi_name)[0]

        xen_vdis = self.xen_session.xenapi.SR.get_VDIs(xen_sr)

        self._check_list(xen_vdis, 3, TestVMSnapshots._should_be_three_vdis_err_msg)

        vdis_after_create = self._get_vdis(xen_vdis)

        vdiSnapshotOf = self.xen_session.xenapi.VDI.get_record(vdis_after_create.snapshot_vdi["snapshot_of"])

        self.assertEqual(
            vdiSnapshotOf["uuid"],
            vdis_after_create.active_vdi["uuid"],
            TestVMSnapshots._snapshot_parent_not_correct_err_msg
        )

        #######################################
        #######################################
        ###  STEP 2: Revert VM to Snapshot  ###
        #######################################
        #######################################
        self.virtual_machine.stop(self.apiClient)

        VmSnapshot.revertToSnapshot(self.apiClient, vmsnapshotid=vm_snapshot.id)

        list_vm_snapshots = VmSnapshot.list(self.apiClient, listAll="true")

        self._check_list(list_vm_snapshots, 1, TestVMSnapshots._should_only_be_one_vm_snapshot_err_msg)

        root_volume_path_3 = self._get_path(volume_id)

        self.assertNotEqual(
            root_volume_path_1,
            root_volume_path_3,
            TestVMSnapshots._path_should_have_changed_err_msg
        )

        xen_vdis = self.xen_session.xenapi.SR.get_VDIs(xen_sr)

        self._check_list(xen_vdis, 3, TestVMSnapshots._should_be_three_vdis_err_msg)

        vdis_after_revert = self._get_vdis(xen_vdis)

        self.assertNotEqual(
            vdis_after_create.active_vdi["uuid"],
            vdis_after_revert.active_vdi["uuid"],
            TestVMSnapshots._active_vdis_should_not_be_the_same_err_msg
        )

        self.assertEqual(
            vdis_after_create.snapshot_vdi["uuid"],
            vdis_after_revert.snapshot_vdi["uuid"],
            TestVMSnapshots._snapshot_vdis_should_be_the_same_err_msg
        )

        self.assertEqual(
            vdis_after_create.base_vdi["uuid"],
#.........这里部分代码省略.........
开发者ID:CIETstudents,项目名称:cloudstack,代码行数:101,代码来源:TestVMSnapshots.py


示例8: test_04_list_vm_snapshots_byid

    def test_04_list_vm_snapshots_byid(self):
        """
        @summary: Test to List VM Snapshots by Id

        Step1: Listing all the VM snapshots for a user
        Step2: Verifying that no VM snapshots are listed
        Step3: Creating a VM snapshot
        Step4: Listing all the VM snapshots again for a user
        Step5: Verifying that list size is 1
        Step6: Listing all the VM snapshots by specifying snapshot id
        Step7: Verifying that list size is 1
        Step8: Verifying details of the listed VM snapshot

        The test is skipped when the hypervisor is 'kvm' or 'hyperv'.
        """
        if self.hypervisor.lower() in ['kvm', 'hyperv']:
            raise unittest.SkipTest("This feature is not supported on existing hypervisor. Hence, skipping the test")
        # Listing all the VM snapshots for a User
        list_vm_snaps_before = VmSnapshot.list(
            self.userapiclient,
            listall=self.services["listall"]
        )
        # Verifying that nothing is listed (the list call yields None)
        self.assertIsNone(
            list_vm_snaps_before,
            "VM snapshots exists for newly created user"
        )
        # Creating a VM snapshot
        snapshot_created = VmSnapshot.create(
            self.userapiclient,
            self.virtual_machine.id,
        )
        self.assertIsNotNone(
            snapshot_created,
            "Snapshot creation failed"
        )
        # Listing all the VM snapshots for user again
        list_vm_snaps_after = VmSnapshot.list(
            self.userapiclient,
            listall=self.services["listall"]
        )
        status = validateList(list_vm_snaps_after)
        # assertEqual replaces the deprecated assertEquals alias, which was
        # removed from unittest in Python 3.12.
        self.assertEqual(
            PASS,
            status[0],
            "VM snapshot creation failed"
        )
        # Verifying that list size is 1
        self.assertEqual(
            1,
            len(list_vm_snaps_after),
            "Failed to create VM snapshot"
        )
        # Listing vm snapshot by id
        list_vm_snapshot = VmSnapshot.list(
            self.userapiclient,
            listall=self.services["listall"],
            vmsnapshotid=snapshot_created.id
        )
        status = validateList(list_vm_snapshot)
        self.assertEqual(
            PASS,
            status[0],
            "Failed to list VM snapshot by Id"
        )
        # Verifying that list size is 1
        self.assertEqual(
            1,
            len(list_vm_snapshot),
            "Size of the list vm snapshot by Id is not matching"
        )
        # Verifying details of the listed snapshot to be same as snapshot
        # created above.
        # Creating expected and actual values dictionaries
        expected_dict = {
            "id": snapshot_created.id,
            "name": snapshot_created.name,
            "state": snapshot_created.state,
            "vmid": snapshot_created.virtualmachineid,
        }
        actual_dict = {
            "id": list_vm_snapshot[0].id,
            "name": list_vm_snapshot[0].name,
            "state": list_vm_snapshot[0].state,
            "vmid": list_vm_snapshot[0].virtualmachineid,
        }
        # Private helper defined elsewhere on the class; its result is
        # compared with True below.
        vm_snapshot_status = self.__verify_values(
            expected_dict,
            actual_dict
        )
        self.assertEqual(
            True,
            vm_snapshot_status,
            "Listed VM Snapshot details are not as expected"
        )
        return
开发者ID:Skotha,项目名称:cloudstack,代码行数:93,代码来源:test_escalations_snapshots.py


示例9: test_03_list_vm_snapshots_pagination

    def test_03_list_vm_snapshots_pagination(self):
        """
        @Desc: Test to List VM Snapshots pagination
        @Steps:
        Step1: Listing all the VM snapshots for a user
        Step2: Verifying that list size is 0
        Step3: Creating (page size + 1) number of VM snapshots
        Step4: Listing all the VM snapshots again for a user
        Step5: Verifying that list size is (page size + 1)
        Step6: Listing all the VM snapshots in page1
        Step7: Verifying that list size is (page size)
        Step8: Listing all the VM snapshots in page2
        Step9: Verifying that list size is 1
        Step10: Deleting the VM snapshot present in page 2
        Step11: Listing all the volume snapshots in page2
        Step12: Verifying that list size is 0
        """
        if self.hypervisor.lower() in ['kvm', 'hyperv']:
            raise unittest.SkipTest("This feature is not supported on existing hypervisor. Hence, skipping the test")
        # Listing all the VM snapshots for a User
        list_vm_snaps_before = VmSnapshot.list(
                                               self.userapiclient,
                                               listall=self.services["listall"]
                                               )
        # Verifying list size is 0
        self.assertIsNone(
                          list_vm_snaps_before,
                          "VM snapshots exists for newly created user"
                          )
        # Creating pagesize + 1 number of VM snapshots
        for i in range(0, (self.services["pagesize"] + 1)):
            snapshot_created = VmSnapshot.create(
                                                 self.userapiclient,
                                                 self.virtual_machine.id,
                                                 )
            self.assertIsNotNone(
                                 snapshot_created,
                                 "Snapshot creation failed"
                                 )

        # Listing all the VM snapshots for user again
        list_vm_snaps_after = VmSnapshot.list(
                                              self.userapiclient,
                                              listall=self.services["listall"]
                                              )
        status = validateList(list_vm_snaps_after)
        self.assertEquals(
                          PASS,
                          status[0],
                          "VM snapshot creation failed"
                          )
        # Verifying that list size is pagesize + 1
        self.assertEquals(
                          self.services["pagesize"] + 1,
                          len(list_vm_snaps_after),
                          "Failed to create pagesize + 1 number of VM snapshots"
                          )
        # Listing all the VM snapshots in page 1
        list_vm_snaps_page1 = VmSnapshot.list(
                                              self.userapiclient,
                                              listall=self.services["listall"],
                                              page=1,
                                              pagesize=self.services["pagesize"]
                                              )
        status = validateList(list_vm_snaps_page1)
        self.assertEquals(
                          PASS,
                          status[0],
                          "Failed to list vm snapshots in page 1"
                          )
        # Verifying the list size to be equal to pagesize
        self.assertEquals(
                          self.services["pagesize"],
                          len(list_vm_snaps_page1),
                          "Size of vm snapshots in page 1 is not matching"
                          )
        # Listing all the vm snapshots in page 2
        list_vm_snaps_page2 = VmSnapshot.list(
                                              self.userapiclient,
                                              listall=self.services["listall"],
                                              page=2,
                                              pagesize=self.services["pagesize"]
                                              )
        status = validateList(list_vm_snaps_page2)
        self.assertEquals(
                          PASS,
                          status[0],
                          "Failed to list vm snapshots in page 2"
                          )
        # Verifying the list size to be equal to pagesize
        self.assertEquals(
                          1,
                          len(list_vm_snaps_page2),
                          "Size of vm snapshots in page 2 is not matching"
                          )
        # Deleting the vm snapshot present in page 2
        VmSnapshot.deleteVMSnapshot(
                                    self.userapiclient,
                                    snapshot_created.id
                                    )
#.........这里部分代码省略.........
开发者ID:Skotha,项目名称:cloudstack,代码行数:101,代码来源:test_escalations_snapshots.py


示例10: test_02_revert_vm_snapshots

    def test_02_revert_vm_snapshots(self):
        """Revert the VM to its snapshot and verify the deleted file is back.

        Steps:
        1. Over SSH, remove ``<self.test_dir>/<self.random_data>`` and check
           that ``ls`` reports "No such file or directory".
        2. List the VM snapshots and assert the first one is "Ready".
        3. Revert to that snapshot and assert the VM state is "Stopped".
        4. Start the VM, reconnect over SSH and assert that the file content
           equals ``self.random_data_0``.
        """
        try:
            ssh_client = self.virtual_machine.get_ssh_client()

            cmds = [
                "rm -rf %s/%s" % (self.test_dir, self.random_data),
                "ls %s/%s" % (self.test_dir, self.random_data)
            ]

            for c in cmds:
                self.debug(c)
                result = ssh_client.execute(c)
                self.debug(result)

        except Exception as e:
            # Keep the underlying error in the failure message instead of
            # discarding it.
            self.fail("SSH failed for Virtual machine: %s - %s" %
                      (self.virtual_machine.ipaddress, e))

        # str.index() raises ValueError when the text is missing and never
        # returns -1, so the original `== -1` comparison could not trigger
        # the intended failure; use a membership test instead.
        if "No such file or directory" not in str(result[0]):
            self.fail("Check the random data has be delete from temp file!")

        time.sleep(self.services["sleep"])

        list_snapshot_response = VmSnapshot.list(
            self.apiclient,
            vmid=self.virtual_machine.id,
            listall=True
        )

        self.assertIsInstance(
            list_snapshot_response,
            list,
            "Check list response returns a valid list"
        )
        # A None comparison here could never fail after the isinstance check
        # above; verify the list is non-empty so the [0] lookups below
        # report a test failure rather than raising IndexError.
        self.assertNotEqual(
            len(list_snapshot_response),
            0,
            "Check if snapshot exists in ListSnapshot"
        )

        self.assertEqual(
            list_snapshot_response[0].state,
            "Ready",
            "Check the snapshot of vm is ready!"
        )

        VmSnapshot.revertToSnapshot(
            self.apiclient,
            list_snapshot_response[0].id
        )

        list_vm_response = list_virtual_machines(
            self.apiclient,
            id=self.virtual_machine.id
        )

        self.assertEqual(
            list_vm_response[0].state,
            "Stopped",
            "Check the state of vm is Stopped!"
        )

        # Start the VM again so the restored file can be read over SSH
        cmd = startVirtualMachine.startVirtualMachineCmd()
        cmd.id = list_vm_response[0].id
        self.apiclient.startVirtualMachine(cmd)

        time.sleep(self.services["sleep"])

        try:
            ssh_client = self.virtual_machine.get_ssh_client(reconnect=True)

            cmds = [
                "cat %s/%s" % (self.test_dir, self.random_data)
            ]

            for c in cmds:
                self.debug(c)
                result = ssh_client.execute(c)
                self.debug(result)

        except Exception as e:
            self.fail("SSH failed for Virtual machine: %s - %s" %
                      (self.virtual_machine.ipaddress, e))

        self.assertEqual(
            self.random_data_0,
            result[0],
            "Check the random data is equal with the ramdom file!"
        )
开发者ID:KarlHarrisSungardAS,项目名称:cloudstack,代码行数:85,代码来源:test_vm_snapshots.py


示例11: test_change_service_offering_for_vm_with_snapshots

    def test_change_service_offering_for_vm_with_snapshots(self):
        """Test to change service offering for instances with vm snapshots
        """
        
        # 1) Create Virtual Machine using service offering 1
        self.debug("Creating VM using Service Offering 1")
        virtual_machine = VirtualMachine.create(
            self.apiclient,
            self.services["small"],
            accountid=self.account.name,
            domainid=self.account.domainid,
            templateid=self.template.id,
            zoneid=self.zone.id,
            hypervisor=self.hypervisor,
            mode=self.zone.networktype,
            serviceofferingid=self.service_offering_1.id
        )
        
        # Verify Service OFfering 1 CPU cores and memory
        try:
            ssh_client = virtual_machine.get_ssh_client(reconnect=True)
            self.checkCPUAndMemory(ssh_client, self.service_offering_1)
        except Exception as e:
            self.fail("SSH failed for virtual machine: %s - %s" % (virtual_machine.ipaddress, e))
        
        # 2) Take VM Snapshot
        self.debug("Taking VM Snapshot for VM - ID: %s" % virtual_machine.id)
        vm_snapshot = VmSnapshot.create(
            self.apiclient,
            virtual_machine.id,
        )
        
        # 3) Stop Virtual Machine
        self.debug("Stopping VM - ID: %s" % virtual_machine.id)
        try:
            virtual_machine.stop(self.apiclient)
        except Exception as e:
            self.fail("Failed to stop VM: %s" % e)
        
        # 4) Change service offering for VM with snapshots from Service Offering 1 to Service Offering 2
        self.debug("Changing service offering from Service Offering 1 to Service Offering 2 for VM - ID: %s" % virtual_machine.id)
        virtual_machine.change_service_offering(self.apiclient, self.service_offering_2.id)
        
        # 5) Start VM
        self.debug("Starting VM - ID: %s" % virtual_machine.id)
        try:
            virtual_machine.start(self.apiclient)
        except Exception as e:
            self.fail("Failed to start virtual machine: %s, %s" % (virtual_machine.name, e))
        
        # Wait for vm to start
        timeout = self.wait_vm_start(self.apiclient, virtual_machine.id, self.services["timeout"],
                            self.services["sleep"])
        if timeout == 0:
            self.fail("The virtual machine %s failed to start even after %s minutes"
                   % (virtual_machine.name, self.services["timeout"]))
        
        list_vm_response = list_virtual_machines(
            self.apiclient,
            id=virtual_machine.id
        )
        self.assertEqual(
            isinstance(list_vm_response, list),
            True,
            "Check list response returns a valid list"
        )
        self.assertNotEqual(
            len(list_vm_response),
            0,
            "Check VM avaliable in List Virtual Machines"
        )
        self.assertEqual(
            list_vm_response[0].state,
            "Running",
            "Check virtual machine is in running state"
        )
        self.assertEqual(
            list_vm_response[0].id,
            virtual_machine.id,
            "Check virtual machine id"
        )
        
        # 6) Verify service offering has changed
        try:
            ssh_client_2 = virtual_machine.get_ssh_client(reconnect=True)
            self.checkCPUAndMemory(ssh_client_2, self.service_offering_2)
        except Exception as e:
            self.fail("SSH failed for virtual machine: %s - %s" % (virtual_machine.ipaddress, e))
        
        # 7) Stop Virtual Machine
        self.debug("Stopping VM - ID: %s" % virtual_machine.id)
        try:
            virtual_machine.stop(self.apiclient)
        except Exception as e:
            self.fail("Failed to stop VM: %s" % e)

        time.sleep(30)
        # 8) Revert to VM Snapshot
        self.debug("Revert to vm snapshot: %s" % vm_snapshot.id)
        try:
#.........这里部分代码省略.........
开发者ID:PCextreme,项目名称:cloudstack,代码行数:101,代码来源:test_vm_snapshots.py



注:本文中的marvin.lib.base.VmSnapshot类示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python base.VpcOffering类代码示例发布时间:2022-05-27
下一篇:
Python base.VirtualMachine类代码示例发布时间:2022-05-27
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap