• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    迪恩网络公众号

Python task.Task类代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Python中pulp_auto.task.Task的典型用法代码示例。如果您正苦于以下问题:Python Task类的具体用法?Python Task怎么用?Python Task使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。



在下文中一共展示了Task类的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: test_06_assert_unit_install

 def test_06_assert_unit_install(self):
     # Request installation of the "pike" rpm on the consumer, wait for the
     # reported tasks and check the rpm is listed on the consumer afterwards.
     unit_spec = {"name": "pike"}
     expected_rpm = RpmUnit(unit_spec, relevant_data_keys=unit_spec.keys())
     # NOTE(review): asserting(True) presumably makes pulp verify the response
     # of the request issued inside the block --- confirm against pulp_auto
     with self.pulp.asserting(True):
         install_report = self.consumer.install_unit(self.pulp, unit_spec, "rpm")
     Task.wait_for_report(self.pulp, install_report)
     installed_rpms = RpmUnit.list(self.consumer.cli)
     assert expected_rpm in installed_rpms, \
         "rpm %s not installed on %s" % (expected_rpm, self.consumer)
开发者ID:pombredanne,项目名称:pulp-automation,代码行数:7,代码来源:test_01_rpm_repo_without_feed.py


示例2: setUpClass

 def setUpClass(cls):
     # Create two puppet repos (ids: class name and class name + '1') that
     # share the same queries, then sync the first one and wait for the sync.
     super(PuppetSearchRepoTest, cls).setUpClass()
     base_id = cls.__name__
     module_queries = ['tomcat']
     first_created = create_puppet_repo(cls.pulp, base_id, module_queries)
     second_created = create_puppet_repo(cls.pulp, base_id + '1', module_queries)
     # only the repo object (first item of each 3-tuple) is kept
     cls.repo, _, _ = first_created
     cls.repo1, _, _ = second_created
     sync_report = cls.repo.sync(cls.pulp)
     Task.wait_for_report(cls.pulp, sync_report)
开发者ID:CUXIDUMDUM,项目名称:pulp-automation,代码行数:7,代码来源:test_12_puppet_repo_and_modules_search.py


示例3: test_01_package_category_create

    def test_01_package_category_create(self):
        # Upload a package category built from the package groups found in
        # repo1 and verify the category is then searchable in repo1 with the
        # same package group ids as the uploaded metadata.
        response = self.repo1.within_repo_search(
            self.pulp,
            data={"criteria": {"type_ids": ["package_group"], "limit": 1}}
        )
        self.assertPulp(code=200)
        groups = Association.from_response(response)
        # names of the package groups returned by the search
        group_names = [group.data['metadata']['name'] for group in groups]
        # create metadata for the package category import
        category_data = package_category_metadata(
            self.repo1.id + "_category1", self.repo1.id, group_names
        )

        # actually upload the category; the upload request is deleted again
        # when the with-block is left
        with deleting(self.pulp, Upload.create(self.pulp, data=category_data)) as (upload,):
            Task.wait_for_report(self.pulp, upload.import_to(self.pulp, self.repo1))
        self.assertPulp(code=200)

        # check that the category is there and contains the specified groups
        response = self.repo1.within_repo_search(
            self.pulp,
            data={
                "criteria": {
                    "type_ids": ["package_category"],
                    "filters": {"unit": {"id": category_data["unit_key"]["id"]}}
                }
            }
        )
        self.assertPulp(code=200)
        categories = Association.from_response(response)
        self.assertEqual(categories[0].data["metadata"]["packagegroupids"],
                         category_data["unit_metadata"]["packagegroupids"])
开发者ID:RedHatQE,项目名称:pulp-automation,代码行数:29,代码来源:test_04_package_category.py


示例4: test_10_repos_no_feed_cannot_be_synced

 def test_10_repos_no_feed_cannot_be_synced(self):
     # A repo without a feed cannot be synced: the sync request itself is
     # answered with 202, but waiting for its tasks must raise TaskFailure.
     sync_report = self.dest_repo2.sync(self.pulp)
     self.assertPulp(code=202)
     with self.assertRaises(TaskFailure), self.pulp.asserting(True):
         Task.wait_for_report(self.pulp, sync_report)
开发者ID:lenfree,项目名称:pulp-automation,代码行数:7,代码来源:test_17_iso_repo_orphans_and_copy.py


示例5: setUpClass

    def setUpClass(cls):
        # Prepare a repo group with a group export distributor plus a synced
        # rpm repo for the publish tests.
        super(PublishGroupTest, cls).setUpClass()
        # create the repo group on the server and keep the object built
        # from the server's response
        creation = RepoGroup(data={'id': cls.__name__ + "_repo_group"}).create(cls.pulp)
        cls.repo_group = RepoGroup.from_response(creation)
        # the second group is only instantiated here; no create call is made
        cls.repo_group1 = RepoGroup(data={'id': cls.__name__ + "_repo_group1"})

        # associate a distributor with the created group
        distributor_data = {
            'distributor_type_id': 'group_export_distributor',
            'distributor_config': {'http': False, 'https': False},
            'distributor_id': 'dist_1',
        }
        with cls.pulp.asserting(True):
            association = cls.repo_group.associate_distributor(cls.pulp, data=distributor_data)
        cls.distributor = GroupDistributor.from_response(association)

        # create a repo from the first rpm-type role and sync it
        rpm_roles = [role for role in ROLES.repos if role.type == 'rpm']
        # the nested [..] target requires exactly one distributor to be returned
        cls.repo, _importer, [_distributor] = YumRepo.from_role(rpm_roles[0]).create(cls.pulp)
        sync_report = cls.repo.sync(cls.pulp)
        Task.wait_for_report(cls.pulp, sync_report)
开发者ID:RedHatQE,项目名称:pulp-automation,代码行数:26,代码来源:test_04_repo_group_publish.py


示例6: test_01_repo_content_applicability

 def test_01_repo_content_applicability(self):
     # Request applicability for the two named repos; the request must be
     # answered with 202 and the reported tasks are waited for.
     repo_ids = ["test-repo", "test-errata"]
     applicability_data = {
         "repo_criteria": {"filters": {"id": {"$in": repo_ids}}}
     }
     report = RepoAppl.applicability(self.pulp, data=applicability_data)
     self.assertPulp(code=202)
     Task.wait_for_report(self.pulp, report)
开发者ID:CUXIDUMDUM,项目名称:pulp-automation,代码行数:7,代码来源:test_02_repo_applicability.py


示例7: test_07_no_unassociation_within_repo_with_feed

 def test_07_no_unassociation_within_repo_with_feed(self):
     # repos with a feed can now have part of their content removed
     # NOTE(review): the method name suggests unassociation is refused, but
     # the request is expected to be accepted (202) --- confirm the intent
     unit_criteria = {
         "type_ids": ["iso"],
         "filters": {"unit": {"name": "test.iso"}},
     }
     report = self.source_repo.unassociate_units(self.pulp, data={"criteria": unit_criteria})
     self.assertPulp(code=202)
     Task.wait_for_report(self.pulp, report)
开发者ID:preethit,项目名称:pulp-automation,代码行数:7,代码来源:test_17_iso_repo_orphans_and_copy.py


示例8: test_05_unassociate_iso_from_copied_repo

 def test_05_unassociate_iso_from_copied_repo(self):
     # remove the "test.iso" unit from a repo it was copied into; the
     # request must be answered with 202 and its tasks are waited for
     unit_criteria = {
         "type_ids": ["iso"],
         "filters": {"unit": {"name": "test.iso"}},
     }
     report = self.dest_repo1.unassociate_units(self.pulp, data={"criteria": unit_criteria})
     self.assertPulp(code=202)
     Task.wait_for_report(self.pulp, report)
开发者ID:preethit,项目名称:pulp-automation,代码行数:7,代码来源:test_17_iso_repo_orphans_and_copy.py


示例9: setUpClass

 def setUpClass(cls):
     # Prepare a synced puppet source repo, two feed-less puppet destination
     # repos, a repo object with an "_invalidrepo" id and a yum repo.
     super(PuppetCopyRepoTest, cls).setUpClass()
     # The source repo role is hardwired because the search strings used by
     # the tests refer to exact module names such as tomcat7_rhel.
     # The proxy role is taken into account.
     source_role = {
         'id': cls.__name__,
         'feed': 'https://forge.puppetlabs.com',
         'queries': ['tomcat'],
         'proxy': ROLES.get('proxy'),
     }
     # create the source repo and sync it so that the modules get fetched
     cls.source_repo, _, _ = PuppetRepo.from_role(source_role).create(cls.pulp)
     source_sync = cls.source_repo.sync(cls.pulp)
     Task.wait_for_report(cls.pulp, source_sync)
     # create two destination repos to copy into; both are given the same
     # importer and distributors objects
     feedless_importer = PuppetImporter(feed=None, queries=[])
     dest_distributors = [PuppetDistributor()]
     first_dest = PuppetRepo(
         id=cls.__name__ + '1', importer=feedless_importer, distributors=dest_distributors
     )
     cls.dest_repo1, _, _ = first_dest.create(cls.pulp)
     second_dest = PuppetRepo(
         id=cls.__name__ + '2', importer=feedless_importer, distributors=dest_distributors
     )
     cls.dest_repo2, _, _ = second_dest.create(cls.pulp)
     # repo data only: no create call is made for this one
     cls.invalid_repo = Repo(data={'id': cls.__name__ + "_invalidrepo"})
     # create a yum repo
     yum_repo = YumRepo(
         id=cls.__name__ + 'yum',
         importer=YumImporter(feed=None),
         distributors=[YumDistributor(relative_url='xyz')]
     )
     cls.yumrepo, _, _ = yum_repo.create(cls.pulp)
开发者ID:elyezer,项目名称:pulp-automation,代码行数:26,代码来源:test_12_puppet_repo_orphans_and_copy.py


示例10: test_02_assert_unit_update

 def test_02_assert_unit_update(self):
     # Request an update of the "pike" rpm for the consumer group; the
     # request must be answered with 202 and its tasks are waited for.
     unit_spec = {"name": "pike"}
     update_options = {"apply": True, "reboot": False, "importkeys": False}
     report = self.consumer_group.update_unit(
         self.pulp, unit_spec, "rpm", options=update_options
     )
     self.assertPulp(code=202)
     Task.wait_for_report(self.pulp, report)
开发者ID:pombredanne,项目名称:pulp-automation,代码行数:7,代码来源:test_14_consumer_group_content.py


示例11: test_02_repo_sync_start

    def test_02_repo_sync_start(self):
        # Subscribe the event listener to 'repo.sync.start', sync the repo and
        # check that exactly one request was POSTed to the bin, that it came
        # before any sync task finished and that it carries a sync task.
        self.el.update(self.pulp, {'event_types': ['repo.sync.start']})
        self.assertPulpOK()
        self.el.reload(self.pulp)
        sync_report = self.repo.sync(self.pulp)
        # block until the sync is done
        Task.wait_for_report(self.pulp, sync_report)
        # collect all the tasks the sync call spawned
        tasks = Task.from_report(self.pulp, sync_report)
        assert tasks, 'no tasks induced'
        # refresh the bin to see the request POSTed by the pulp event
        # listener (http://requestb.in/<bin_id>)
        self.bin.reload()
        assert self.bin.request_count == 1, (
            'invalid event listener POST count (%s)' % self.bin.request_count
        )
        posted = self.bin.requests[0]
        assert posted.method == 'POST', 'invalid request method: %s' % posted.method
        # the bin must have been POSTed no later than any task finished
        early_finishers = [task.id for task in tasks if posted.time > task.finish_time]
        assert early_finishers == [], (
            'tasks %s finished before request at: %s' % (early_finishers, posted.time)
        )
        # FIXME: not yet specified in docs, so not asserted here: the bin
        # should not be POSTed before any task has started, i.e. no task
        # should satisfy posted.time < task.start_time

        # the request body holds the originally POSTed task-report data;
        # build a Task object from it
        el_task = Task.from_call_report_data(json.loads(posted.body))
        assert el_task.state == TASK_RUNNING_STATE, 'invalid task state: %s' % el_task.state
        el_task.reload(self.pulp)
        # the posted task has to be one of the tasks spawned for the sync
        spawned_ids = [task.id for task in tasks]
        assert el_task.id in spawned_ids, 'invalid task id posted: %s' % el_task.id
        expected_tags = sorted([u'pulp:repository:EventListenerRepo', u'pulp:action:sync'])
        assert expected_tags == sorted(el_task.data['tags']), (
            'invalid task tags: %s' % el_task.data['tags']
        )
开发者ID:CUXIDUMDUM,项目名称:pulp-automation,代码行数:34,代码来源:test_23_event_listener.py


示例12: test_02_associate_importer

    def test_02_associate_importer(self):
        '''query/queries can be added to the importer_config to specify which
        modules have to be synced'''
        importer_data = {
            'importer_type_id': 'puppet_importer',
            'importer_config': {
                'feed': self.feed,
                'queries': ["stdlib", "yum"]
            }
        }
        report = self.repo.associate_importer(self.pulp, data=importer_data)
        self.assertPulp(code=202)
        Task.wait_for_report(self.pulp, report)
        # the importer fetched back from the repo must match what was sent
        expected_importer = {
            'id': 'puppet_importer',
            'importer_type_id': 'puppet_importer',
            'repo_id': self.repo.id,
            'config': {
                'feed': self.feed,
                'queries': ["stdlib", "yum"]
            },
            'last_sync': None
        }
        importer = self.repo.get_importer(self.pulp, "puppet_importer")
        self.assertEqual(importer, expected_importer)
开发者ID:CUXIDUMDUM,项目名称:pulp-automation,代码行数:30,代码来源:test_11_puppet_modules_repo.py


示例13: test_03_repo_sync_finish

 def test_03_repo_sync_finish(self):
     # Subscribe the event listener to 'repo.sync.finish', sync the repo and
     # check that exactly one request was POSTed to the bin, that it was not
     # sent before all sync tasks finished and that it carries a sync task.
     self.el.update(self.pulp, {'event_types': ['repo.sync.finish']})
     self.assertPulpOK()
     self.el.reload(self.pulp)
     report = self.repo.sync(self.pulp)
     # wait till the sync is done
     Task.wait_for_report(self.pulp, report)
     # fetch the tasks the sync call has spawned
     tasks = Task.from_report(self.pulp, report)
     assert tasks, 'no tasks induced'
     # check the request bin got notified
     self.bin.reload()
     assert self.bin.request_count == 1, 'invalid event listener requests count: %s' % \
                                             self.bin.request_count
     el_request = self.bin.requests[0]
     assert el_request.method == 'POST', 'invalid request method: %s' % el_request.method
     # assert the bin was posted no sooner than all tasks have finished:
     # no task may have a finish time later than the request time.
     # The list has to be compared with []; asserting the bare list passed
     # only when some task DID finish after the request, which is the very
     # condition the failure message reports.
     tasks_finished_after_request = [task.id for task in tasks if el_request.time < task.finish_time]
     assert tasks_finished_after_request == [], 'tasks %s finished after request at: %s' % \
             (tasks_finished_after_request, el_request.time)
     # the request body contains a task
     el_task = Task.from_call_report_data(json.loads(el_request.body))
     # checking el_task.state against TASK_FINISHED_STATE doesn't work and
     # won't get fixed, so the state is not asserted here
     el_task.reload(self.pulp)
     # assert the proper task was posted
     assert el_task.id in [task.id for task in tasks], 'invalid task id posted: %s' % el_task.id
     assert sorted([u'pulp:repository:EventListenerRepo', u'pulp:action:sync']) == sorted(el_task.data['tags']), \
             'invalid task tags: %s' % el_task.data['tags']
开发者ID:CUXIDUMDUM,项目名称:pulp-automation,代码行数:29,代码来源:test_23_event_listener.py


示例14: test_05_repo_publish_finish

 def test_05_repo_publish_finish(self):
     # Subscribe the event listener to 'repo.publish.finish', publish the
     # repo and check that exactly one request was POSTed to the bin and
     # that it carries one of the publish tasks.
     self.el.update(self.pulp, {'event_types': ['repo.publish.finish']})
     self.assertPulpOK()
     self.el.reload(self.pulp)
     publish_report = self.repo.publish(self.pulp, self.distributor.id)
     # block until the tasks induced by the publish have finished
     Task.wait_for_report(self.pulp, publish_report)
     # keep only the spawned tasks tagged as publish actions
     tasks = [
         task for task in Task.from_report(self.pulp, publish_report)
         if u'pulp:action:publish' in task.data['tags']
     ]
     assert tasks, 'no tasks induced'
     # check the bin status
     self.bin.reload()
     assert self.bin.request_count == 1, (
         'invalid event listener requests count: %s' % self.bin.request_count
     )
     el_request = self.bin.requests[0]
     # check the request method
     assert el_request.method == 'POST', 'invalid request method: %s' % el_request.method
     # tasks that finished only after the request was made; asserting that
     # this list is empty doesn't work, so that check is disabled
     tasks_finished_after_request = [
         task.id for task in tasks if el_request.time < task.finish_time
     ]
     # the request body contains a task; its state is not asserted either
     # (the TASK_FINISHED_STATE check is disabled as well)
     el_task = Task.from_call_report_data(json.loads(el_request.body))
     el_task.reload(self.pulp)
     # the posted task has to be one of the publish tasks
     publish_ids = [task.id for task in tasks]
     assert el_task.id in publish_ids, 'invalid task id posted: %s' % el_task.id
     expected_tags = sorted([u'pulp:repository:EventListenerRepo', u'pulp:action:publish'])
     assert expected_tags == sorted(el_task.data['tags']), (
         'invalid task tags: %s' % el_task.data['tags']
     )
开发者ID:CUXIDUMDUM,项目名称:pulp-automation,代码行数:31,代码来源:test_23_event_listener.py


示例15: test_01_publish_repo_group_with_no_members_bz1148937

 def test_01_publish_repo_group_with_no_members_bz1148937(self):
     # Publish the repo group through distributor 'dist_1'; the request
     # must be answered with 202 and its tasks are waited for.
     distributor_id = 'dist_1'
     publish_report = self.repo_group.publish(self.pulp, distributor_id)
     self.assertPulp(code=202)
     Task.wait_for_report(self.pulp, publish_report)
开发者ID:RedHatQE,项目名称:pulp-automation,代码行数:7,代码来源:test_04_repo_group_publish.py


示例16: test_12_publish_repo

 def test_12_publish_repo(self):
     # Publish the repo through distributor 'dist_1'; the request must be
     # answered with 202 and its tasks are waited for.
     distributor_id = 'dist_1'
     publish_report = self.repo.publish(self.pulp, distributor_id)
     self.assertPulp(code=202)
     Task.wait_for_report(self.pulp, publish_report)
开发者ID:jeremycline,项目名称:pulp-automation,代码行数:7,代码来源:test_2_rpm_repo.py


示例17: test_05_associate_importer

 def test_05_associate_importer(self):
     # Associate a docker importer with the repo, wait for the reported
     # tasks and check that the importer read back matches what was sent.
     importer_data = {
         'importer_type_id': 'docker_importer',
         'importer_config': {
             'feed': self.feed,
             "upstream_name": "busybox"
         }
     }
     report = self.repo.associate_importer(self.pulp, data=importer_data)
     self.assertPulp(code=202)
     Task.wait_for_report(self.pulp, report)
     expected_importer = {
         'id': 'docker_importer',
         'importer_type_id': 'docker_importer',
         'repo_id': self.repo.id,
         'config': {
             'feed': self.feed,
             "upstream_name": "busybox"
         },
         'last_sync': None
     }
     importer = self.repo.get_importer(self.pulp, "docker_importer")
     self.assertEqual(importer, expected_importer)
开发者ID:CUXIDUMDUM,项目名称:pulp-automation,代码行数:27,代码来源:test_20_docker_repo_cud.py


示例18: test_99_node_unbind_repo

 def test_99_node_unbind_repo(self):
     # Unbind the repo from the node, then delete every repo listed on the
     # child pulp, waiting for each deletion to complete.
     self.node.unbind_repo(self.pulp, self.repo.id, self.node_distributor.id)
     self.assertPulpOK()
     # nodes keep the repos after updating
     for child_repo in Repo.list(self.pulp_child):
         deletion_report = child_repo.delete(self.pulp_child)
         Task.wait_for_report(self.pulp_child, deletion_report)
开发者ID:CUXIDUMDUM,项目名称:pulp-automation,代码行数:7,代码来源:test_01_cud.py


示例19: test_05_unassociate_module_from_copied_repo_1076628

 def test_05_unassociate_module_from_copied_repo_1076628(self):
     # remove the "tomcat7_rhel" puppet module from a repo it was copied into
     # see https://bugzilla.redhat.com/show_bug.cgi?id=1076628
     unit_criteria = {
         "type_ids": ["puppet_module"],
         "filters": {"unit": {"name": "tomcat7_rhel"}},
     }
     report = self.dest_repo1.unassociate_units(self.pulp, data={"criteria": unit_criteria})
     self.assertPulp(code=202)
     Task.wait_for_report(self.pulp, report)
开发者ID:preethit,项目名称:pulp-automation,代码行数:8,代码来源:test_12_puppet_repo_orphans_and_copy.py


示例20: wrapper_ctx

 def wrapper_ctx(thing):
     # Generator that yields whatever calling_method(thing, 'delete', pulp)
     # provides; once the with-block has been left it waits for an
     # asynchronous delete (if any) and asserts that pulp reports being ok.
     with calling_method(thing, 'delete', pulp) as thing:
         yield thing
     # async-delete hack: a 202 response carries tasks that must be waited for
     delete_response = pulp.last_response
     if delete_response.status_code == 202:
         Task.wait_for_report(pulp, delete_response)
     assert pulp.is_ok, (
         'deleting %s caused pulp not feeling ok: %s' % (thing, pulp.last_response)
     )
开发者ID:CUXIDUMDUM,项目名称:pulp-automation,代码行数:8,代码来源:pulp_test.py



注:本文中的pulp_auto.task.Task类示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python units.Orphans类代码示例发布时间:2022-05-25
下一篇:
Python repo.Repo类代码示例发布时间:2022-05-25
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap