Python test_support.execute_until_empty Function Code Examples


This article collects typical usage examples of the Python function mapreduce.test_support.execute_until_empty. If you have been wondering what exactly execute_until_empty does, how to call it, or what real-world usage looks like, the hand-picked code examples below should help.



The following presents 20 code examples of the execute_until_empty function, sorted by popularity.
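All of these examples share one pattern: the test stands up the App Engine testbed with a task queue stub, starts an asynchronous mapreduce or pipeline job, and then calls test_support.execute_until_empty to run the queued tasks synchronously until the queue drains, at which point the job has run to completion. Here is a minimal sketch of that pattern, assuming a plain App Engine unit-test setup; the handler path my_module.my_handler and the entity kind my_module.MyEntity are hypothetical placeholders, not names taken from the examples on this page.

import unittest

from google.appengine.ext import testbed
from mapreduce import control, test_support


class ExecuteUntilEmptyExample(unittest.TestCase):

  def setUp(self):
    # Stand up the App Engine service stubs that mapreduce relies on.
    self.testbed = testbed.Testbed()
    self.testbed.activate()
    self.testbed.init_datastore_v3_stub()
    self.testbed.init_memcache_stub()
    self.testbed.init_taskqueue_stub()
    self.taskqueue = self.testbed.get_stub(testbed.TASKQUEUE_SERVICE_NAME)

  def tearDown(self):
    self.testbed.deactivate()

  def testJobRunsToCompletion(self):
    # Hypothetical handler and entity kind; substitute your own.
    control.start_map(
        "example_job",
        "my_module.my_handler",
        "mapreduce.input_readers.DatastoreInputReader",
        {"entity_kind": "my_module.MyEntity"},
        shard_count=2)
    # Run every queued task, plus any tasks those tasks enqueue in turn,
    # until the queue is empty, i.e. until the job has finished.
    test_support.execute_until_empty(self.taskqueue)

The self.taskqueue handle passed to execute_until_empty is the task queue stub itself; the examples below obtain the same handle in their test fixtures.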

Example 1: testSortFile

    def testSortFile(self):
        """Test sorting a file."""
        bucket_name = "testbucket"
        test_filename = "testfile"
        full_filename = "/%s/%s" % (bucket_name, test_filename)

        input_data = [(str(i), "_" + str(i)) for i in range(100)]

        with cloudstorage.open(full_filename, mode="w") as f:
            with records.RecordsWriter(f) as w:
                for (k, v) in input_data:
                    proto = kv_pb.KeyValue()
                    proto.set_key(k)
                    proto.set_value(v)
                    w.write(proto.Encode())

        p = shuffler._SortChunksPipeline("testjob", bucket_name, [[full_filename]])
        p.start()
        test_support.execute_until_empty(self.taskqueue)
        p = shuffler._SortChunksPipeline.from_id(p.pipeline_id)

        input_data.sort()
        output_files = p.outputs.default.value[0]
        output_data = []
        for output_file in output_files:
            with cloudstorage.open(output_file) as f:
                for binary_record in records.RecordsReader(f):
                    proto = kv_pb.KeyValue()
                    proto.ParseFromString(binary_record)
                    output_data.append((proto.key(), proto.value()))

        self.assertEquals(input_data, output_data)
        self.assertEquals(1, len(self.emails))
Developer: xinweiliusc, Project: appengine-mapreduce, Lines: 33, Source: shuffler_end_to_end_test.py


Example 2: _run_test

  def _run_test(self, num_shards, num_files):
    bucket_name = "testing"
    object_prefix = "file-"
    job_name = "test_map"
    input_class = (input_readers.__name__ + "." +
                   input_readers._GoogleCloudStorageInputReader.__name__)

    expected_content = self.create_test_content(bucket_name,
                                                object_prefix,
                                                num_files)

    control.start_map(
        job_name,
        __name__ + "." + "_input_reader_memory_mapper",
        input_class,
        {
            "input_reader": {
                "bucket_name": bucket_name,
                "objects": [object_prefix + "*"]
            },
        },
        shard_count=num_shards)

    test_support.execute_until_empty(self.taskqueue)
    # Compare sorted copies; list.sort() sorts in place and returns None.
    self.assertEqual(sorted(expected_content), sorted(_memory_mapper_data))
Developer: Karima-Kaddouri, Project: appengine-mapreduce, Lines: 25, Source: input_readers_end_to_end_test.py


Example 3: testMergeFiles

    def testMergeFiles(self):
        """Test merging multiple files."""
        input_data = [(str(i), "_" + str(i)) for i in range(100)]
        input_data.sort()

        bucket_name = "testbucket"
        test_filename = "testfile"
        full_filename = "/%s/%s" % (bucket_name, test_filename)

        with cloudstorage.open(full_filename, mode="w") as f:
            with records.RecordsWriter(f) as w:
                for (k, v) in input_data:
                    proto = kv_pb.KeyValue()
                    proto.set_key(k)
                    proto.set_value(v)
                    w.write(proto.Encode())

        p = TestMergePipeline(bucket_name, [full_filename, full_filename, full_filename])
        p.start()
        test_support.execute_until_empty(self.taskqueue)
        p = TestMergePipeline.from_id(p.pipeline_id)

        output_file = p.outputs.default.value[0]
        output_data = []
        with cloudstorage.open(output_file) as f:
            for record in records.RecordsReader(f):
                output_data.append(record)

        expected_data = [str((k, [v, v, v], False)) for (k, v) in input_data]
        self.assertEquals(expected_data, output_data)
        self.assertEquals(1, len(self.emails))
Developer: xinweiliusc, Project: appengine-mapreduce, Lines: 31, Source: shuffler_end_to_end_test.py


Example 4: testShardRetry

  def testShardRetry(self):
    entity_count = 200
    db.delete(TestOutputEntity.all())
    db.delete(RetryCount.all())

    for i in range(entity_count):
      TestEntity(data=str(i)).put()

    p = mapper_pipeline.MapperPipeline(
        "test",
        handler_spec=__name__ + ".test_shard_retry_map",
        input_reader_spec=input_readers.__name__ + ".DatastoreInputReader",
        params={
            "input_reader": {
                "entity_kind": __name__ + "." + TestEntity.__name__,
            },
        },
        shards=5)
    p.start()
    test_support.execute_until_empty(self.taskqueue)

    self.assertEquals(1, len(self.emails))
    self.assertTrue(self.emails[0][1].startswith(
        "Pipeline successful:"))

    p = mapper_pipeline.MapperPipeline.from_id(p.pipeline_id)
    outputs = []
    for output in TestOutputEntity.all():
      outputs.append(int(output.data))
    outputs.sort()

    expected_outputs = [i for i in range(entity_count)]
    expected_outputs.sort()
    self.assertEquals(expected_outputs, outputs)
Developer: Batterii, Project: appengine-mapreduce, Lines: 34, Source: mapper_pipeline_test.py


Example 5: testShardRetryTooMany

  def testShardRetryTooMany(self):
    entity_count = 200
    db.delete(TestOutputEntity.all())
    db.delete(RetryCount.all())

    for i in range(entity_count):
      TestEntity(data=str(i)).put()

    p = mapper_pipeline.MapperPipeline(
        "test",
        handler_spec=__name__ + ".test_shard_retry_too_many_map",
        input_reader_spec=input_readers.__name__ + ".DatastoreInputReader",
        params={
            "input_reader": {
                "entity_kind": __name__ + "." + TestEntity.__name__,
            },
        },
        shards=5)
    p.max_attempts = 1
    p.start()
    test_support.execute_until_empty(self.taskqueue)

    state = model.MapreduceState.all().get()
    self.assertEqual(model.MapreduceState.RESULT_FAILED, state.result_status)

    self.assertEquals(1, len(self.emails))
    self.assertTrue(self.emails[0][1].startswith(
        "Pipeline aborted:"))
Developer: Batterii, Project: appengine-mapreduce, Lines: 28, Source: mapper_pipeline_test.py


Example 6: testFailedMap

  def testFailedMap(self):
    for i in range(1):
      TestEntity(data=str(i)).put()

    pipeline.pipeline._DEFAULT_MAX_ATTEMPTS = 1

    p = mapper_pipeline.MapperPipeline(
        "test",
        handler_spec=__name__ + ".test_fail_map",
        input_reader_spec=input_readers.__name__ + ".DatastoreInputReader",
        params={
            "input_reader": {
                "entity_kind": __name__ + "." + TestEntity.__name__,
            },
        },
        shards=5)
    p.start()
    test_support.execute_until_empty(self.taskqueue)

    p = mapper_pipeline.MapperPipeline.from_id(p.pipeline_id)
    self.assertTrue(p.was_aborted)

    self.assertTrue(p.outputs.job_id.filled)
    state = model.MapreduceState.get_by_job_id(p.outputs.job_id.value)
    self.assertEqual(model.MapreduceState.RESULT_FAILED, state.result_status)
    self.assertFalse(p.outputs.result_status.filled)
    self.assertFalse(p.outputs.default.filled)

    self.assertEquals(1, len(self.emails))
    self.assertTrue(self.emails[0][1].startswith(
        "Pipeline aborted:"))
Developer: Batterii, Project: appengine-mapreduce, Lines: 31, Source: mapper_pipeline_test.py


Example 7: testNoCombiner

    def testNoCombiner(self):
        """Test running with low values count but without combiner."""
        # Even though this test doesn't have combiner specified, it's still
        # interesting to run. It forces MergePipeline to produce partial
        # key values and we verify that they are combined correctly in reader.

        # Prepare test data
        entity_count = 200

        for i in range(entity_count):
            TestEntity(data=str(i)).put()
            TestEntity(data=str(i)).put()

        p = mapreduce_pipeline.MapreducePipeline(
            "test",
            __name__ + ".test_combiner_map",
            __name__ + ".test_combiner_reduce",
            input_reader_spec=input_readers.__name__ + ".DatastoreInputReader",
            output_writer_spec=output_writers.__name__ + ".BlobstoreOutputWriter",
            mapper_params={"entity_kind": __name__ + ".TestEntity"},
            shards=4,
        )
        p.start()
        test_support.execute_until_empty(self.taskqueue)

        p = mapreduce_pipeline.MapreducePipeline.from_id(p.pipeline_id)
        self.assertEquals(1, len(p.outputs.default.value))
        output_file = p.outputs.default.value[0]

        file_content = []
        with files.open(output_file, "r") as f:
            file_content = sorted(f.read(10000000).strip().split("\n"))

        self.assertEquals(["('0', 9800)", "('1', 9900)", "('2', 10000)", "('3', 10100)"], file_content)
Developer: stefanojames, Project: KhanLatest, Lines: 34, Source: combiner_test.py


Example 8: testEmptyMapper

  def testEmptyMapper(self):
    """Test empty mapper over empty dataset."""
    p = mapper_pipeline.MapperPipeline(
        "empty_map",
        handler_spec=__name__ + ".test_empty_handler",
        input_reader_spec=input_readers.__name__ + ".DatastoreInputReader",
        params={
            "input_reader": {
                "entity_kind": __name__ + ".TestEntity",
                # Test datetime can be json serialized.
                "filters": [("dt", "=", datetime.datetime(2000, 1, 1))],
                },
            },
        )
    p.start()
    test_support.execute_until_empty(self.taskqueue)

    self.assertEquals(1, len(self.emails))
    self.assertTrue(self.emails[0][1].startswith(
        "Pipeline successful:"))

    p = mapper_pipeline.MapperPipeline.from_id(p.pipeline_id)
    self.assertTrue(p.outputs.job_id.value)

    counters = p.outputs.counters.value
    self.assertTrue(counters)
    self.assertTrue(context.COUNTER_MAPPER_WALLTIME_MS in counters)
Developer: johnwlockwood, Project: appengine-mapreduce, Lines: 27, Source: mapper_pipeline_test.py


Example 9: testFailedMapReduce

  def testFailedMapReduce(self):
    # Add some random data.
    entity_count = 200

    for i in range(entity_count):
      TestEntity(data=str(i)).put()
      TestEntity(data=str(i)).put()

    p = mapreduce_pipeline.MapreducePipeline(
        "test",
        __name__ + ".test_failed_map",
        __name__ + ".test_mapreduce_reduce",
        input_reader_spec=input_readers.__name__ + ".DatastoreInputReader",
        output_writer_spec=(
            output_writers.__name__ + ".BlobstoreRecordsOutputWriter"),
        mapper_params={
            "entity_kind": __name__ + "." + TestEntity.__name__,
            },
        shards=16)
    p.start()
    test_support.execute_until_empty(self.taskqueue)

    p = mapreduce_pipeline.MapreducePipeline.from_id(p.pipeline_id)
    self.assertEqual(model.MapreduceState.RESULT_FAILED,
                     p.outputs.result_status.value)
    self.assertEqual(0, len(p.outputs.default.value))
Developer: OsoTech, Project: appengine-mapreduce, Lines: 26, Source: mapreduce_pipeline_test.py


Example 10: testHugeTaskUseDatastore

  def testHugeTaskUseDatastore(self):
    """Test map job with huge parameter values."""
    input_file = files.blobstore.create()
    input_data = [str(i) for i in range(100)]

    with files.open(input_file, "a") as f:
      with records.RecordsWriter(f) as w:
        for record in input_data:
          w.write(record)
    files.finalize(input_file)
    input_file = files.blobstore.get_file_name(
        files.blobstore.get_blob_key(input_file))

    mapreduce_id = control.start_map(
        "test_map",
        __name__ + ".TestHandler",
        "mapreduce.input_readers.RecordsReader",
        {
            "file": input_file,
            # the parameter can't be compressed and wouldn't fit into
            # taskqueue payload
            "huge_parameter": random_string(900000)
        },
        shard_count=4,
        base_path="/mapreduce_base_path")

    test_support.execute_until_empty(self.taskqueue)
    self.assertEquals(100, len(TestHandler.processed_entites))
    self.assertEquals([], model._HugeTaskPayload.all().fetch(100))
Developer: Karima-Kaddouri, Project: appengine-mapreduce, Lines: 29, Source: end_to_end_test.py


Example 11: testFetchEndToEnd

  def testFetchEndToEnd(self):
    """Test the fetcher job end to end."""
    createMockCrawlDbDatum("http://foo.com/bar.txt")
    static_robots = ("User-agent: test\nDisallow: /content_0\n"
                     "Disallow: /content_1\nDisallow: /content_3")
    self.setReturnValue(url="http://foo.com/robots.txt",
                        content=static_robots,
                        headers={"Content-Length": len(static_robots),
                                 "content-type": "text/plain"})

    static_content = "test"
    static_content_length = len(static_content)
    self.setReturnValue(url="http://foo.com/bar.txt",
                        content=static_content,
                        headers={"Content-Length": static_content_length,
                                 "Content-Type": "text/plain"})
    p = pipelines.FetcherPipeline("FetcherPipeline",
                                  params={
                                      "entity_kind": "lakshmi.datum.CrawlDbDatum"
                                  },
                                  parser_params={
                                      "text/plain": __name__ + "._parserNotOutlinks"
                                  },
                                  shards=2)
    p.start()
    test_support.execute_until_empty(self.taskqueue)
Developer: Letractively, Project: lakshmi, Lines: 25, Source: pipeline_endtoend_test.py


Example 12: testMergeFiles

    def testMergeFiles(self):
        """Test merging multiple files."""
        input_data = [(str(i), "_" + str(i)) for i in range(100)]
        input_data.sort()

        input_file = files.blobstore.create()

        with files.open(input_file, "a") as f:
            with records.RecordsWriter(f) as w:
                for (k, v) in input_data:
                    proto = file_service_pb.KeyValue()
                    proto.set_key(k)
                    proto.set_value(v)
                    w.write(proto.Encode())
        files.finalize(input_file)
        input_file = files.blobstore.get_file_name(files.blobstore.get_blob_key(input_file))

        p = TestMergePipeline([input_file, input_file, input_file])
        p.start()
        test_support.execute_until_empty(self.taskqueue)
        p = TestMergePipeline.from_id(p.pipeline_id)

        output_file = p.outputs.default.value[0]
        output_data = []
        with files.open(output_file, "r") as f:
            for record in records.RecordsReader(f):
                output_data.append(record)

        expected_data = [str((k, [v, v, v], False)) for (k, v) in input_data]
        self.assertEquals(expected_data, output_data)
Developer: stefanojames, Project: KhanLatest, Lines: 30, Source: shuffler_end_to_end_test.py


Example 13: testShuffleFiles

    def testShuffleFiles(self):
        """Test shuffling multiple files."""
        input_data = [(str(i), str(i)) for i in range(100)]
        input_data.sort()

        input_file = files.blobstore.create()

        with files.open(input_file, "a") as f:
            with records.RecordsWriter(f) as w:
                for (k, v) in input_data:
                    proto = file_service_pb.KeyValue()
                    proto.set_key(k)
                    proto.set_value(v)
                    w.write(proto.Encode())
        files.finalize(input_file)
        input_file = files.blobstore.get_file_name(files.blobstore.get_blob_key(input_file))

        p = shuffler.ShufflePipeline("testjob", [input_file, input_file, input_file])
        p.start()
        test_support.execute_until_empty(self.taskqueue)
        p = shuffler.ShufflePipeline.from_id(p.pipeline_id)

        output_files = p.outputs.default.value
        output_data = []
        for output_file in output_files:
            with files.open(output_file, "r") as f:
                for record in records.RecordsReader(f):
                    proto = file_service_pb.KeyValues()
                    proto.ParseFromString(record)
                    output_data.append((proto.key(), proto.value_list()))
        output_data.sort()

        expected_data = sorted([(str(k), [str(v), str(v), str(v)]) for (k, v) in input_data])
        self.assertEquals(expected_data, output_data)
Developer: stefanojames, Project: KhanLatest, Lines: 34, Source: shuffler_end_to_end_test.py


Example 14: testSingleShard

  def testSingleShard(self):
    entity_count = 1000

    for _ in range(entity_count):
      TestEntity().put()

    mapreduce_id = control.start_map(
        "test_map",
        __name__ + ".test_handler_yield_key_str",
        "mapreduce.input_readers.DatastoreInputReader",
        {
            "entity_kind": __name__ + "." + TestEntity.__name__,
        },
        shard_count=4,
        base_path="/mapreduce_base_path",
        output_writer_spec=BLOBSTORE_WRITER_NAME)

    test_support.execute_until_empty(self.taskqueue)

    mapreduce_state = model.MapreduceState.get_by_job_id(mapreduce_id)
    filenames = output_writers.BlobstoreOutputWriter.get_filenames(
        mapreduce_state)
    self.assertEqual(1, len(filenames))
    blob_name = filenames[0]
    self.assertTrue(blob_name.startswith("/blobstore/"))
    self.assertFalse(blob_name.startswith("/blobstore/writable:"))

    with files.open(blob_name, "r") as f:
      data = f.read(10000000)
      self.assertEquals(1000, len(data.strip().split("\n")))
Developer: bslatkin, Project: 8-bits, Lines: 30, Source: output_writers_end_to_end_test.py


Example 15: testDedicatedParams

  def testDedicatedParams(self):
    entity_count = 1000

    for _ in range(entity_count):
      TestEntity().put()

    mapreduce_id = control.start_map(
        "test_map",
        __name__ + ".test_handler_yield_key_str",
        "mapreduce.input_readers.DatastoreInputReader",
        {
            "input_reader": {
                "entity_kind": __name__ + "." + TestEntity.__name__,
            },
            "output_writer": {
                "filesystem": "gs",
                "gs_bucket_name": "bucket",
            },
        },
        shard_count=4,
        base_path="/mapreduce_base_path",
        output_writer_spec=FILE_WRITER_NAME)

    test_support.execute_until_empty(self.taskqueue)

    mapreduce_state = model.MapreduceState.get_by_job_id(mapreduce_id)
    filenames = output_writers.FileOutputWriter.get_filenames(mapreduce_state)
    self.assertEqual(1, len(filenames))
    self.assertTrue(filenames[0].startswith("/gs/bucket/"))

    with files.open(filenames[0], "r") as f:
      data = f.read(10000000)
      self.assertEquals(1000, len(data.strip().split("\n")))
Developer: bslatkin, Project: 8-bits, Lines: 33, Source: output_writers_end_to_end_test.py


Example 16: testSuccessfulRun

  def testSuccessfulRun(self):
    p = shuffler._ShuffleServicePipeline("testjob", ["file1", "file2"])
    p.start()
    test_support.execute_until_empty(self.taskqueue)

    request = self.file_service.shuffle_request
    self.assertTrue(request)
    self.assertTrue(request.shuffle_name().startswith("testjob-"))
    self.assertEquals(2, len(request.input_list()))
    self.assertEquals(1, request.input(0).format())
    self.assertEquals("file1", request.input(0).path())
    self.assertEquals(1, request.input(1).format())
    self.assertEquals("file2", request.input(1).path())
    self.assertEquals(2, len(request.output().path_list()))

    callback = request.callback()
    self.assertTrue(callback.url().startswith(
        "/mapreduce/pipeline/callback?pipeline_id="))
    self.assertEquals(self.version_id, callback.app_version_id())
    self.assertEquals("GET", callback.method())
    self.assertEquals("default", callback.queue())

    callback_task = {
        "url": callback.url(),
        "method": callback.method(),
        }
    test_support.execute_task(callback_task)
    test_support.execute_until_empty(self.taskqueue)

    p = shuffler._ShuffleServicePipeline.from_id(p.pipeline_id)
    self.assertTrue(p.has_finalized)
    output_files = p.outputs.default.value
    self.assertEquals(2, len(output_files))
    self.assertTrue(output_files[0].startswith("/blobstore/"))
    self.assertTrue(output_files[1].startswith("/blobstore/"))
Developer: johnwlockwood, Project: appengine-mapreduce, Lines: 35, Source: shuffler_test.py


Example 17: testFailedMapReduce

    def testFailedMapReduce(self):
        bucket_name = "testbucket"
        max_attempts_before = pipeline.pipeline._DEFAULT_MAX_ATTEMPTS
        try:
            pipeline.pipeline._DEFAULT_MAX_ATTEMPTS = 1

            # Add some random data.
            entity_count = 200

            for i in range(entity_count):
                TestEntity(data=str(i)).put()
                TestEntity(data=str(i)).put()

            p = mapreduce_pipeline.MapreducePipeline(
                "test",
                __name__ + ".test_failed_map",
                __name__ + ".test_mapreduce_reduce",
                input_reader_spec=input_readers.__name__ + ".DatastoreInputReader",
                output_writer_spec=(output_writers.__name__ + "._GoogleCloudStorageRecordOutputWriter"),
                mapper_params={"entity_kind": __name__ + "." + TestEntity.__name__},
                reducer_params={"output_writer": {"bucket_name": bucket_name}},
                shards=3,
            )
            p.max_attempts = 1
            p.start()
            test_support.execute_until_empty(self.taskqueue)

            p = mapreduce_pipeline.MapreducePipeline.from_id(p.pipeline_id)
            self.assertTrue(p.was_aborted)
        finally:
            pipeline.pipeline._DEFAULT_MAX_ATTEMPTS = max_attempts_before
Developer: xinweiliusc, Project: appengine-mapreduce, Lines: 33, Source: mapreduce_pipeline_test.py


Example 18: testOrgsForAnotherProgram

  def testOrgsForAnotherProgram(self):
    """Tests that status of organizations for another program is untouched."""
    # seed another program
    program = seeder_logic.seed(program_model.Program)

    # seed a few pre-accepted and pre-rejected organizations
    pre_accepted_orgs = []
    for i in range(2):
      org = org_utils.seedOrganization(
          program.key(), org_id='pre_accepted_org_id_%s' % i,
          status=org_model.Status.PRE_ACCEPTED)
      pre_accepted_orgs.append(org.key)

    pre_rejected_orgs = []
    for i in range(3):
      org = org_utils.seedOrganization(
          program.key(), org_id='pre_rejected_org_id_%s' % i,
          status=org_model.Status.PRE_REJECTED)
      pre_rejected_orgs.append(org.key)

    mapreduce_control.start_map(
        'ApplyOrgAdmissionDecisions', params=self.params)
    test_support.execute_until_empty(self.get_task_queue_stub())

    # check that pre-accepted organizations are still pre-accepted
    for org_key in pre_accepted_orgs:
      org = org_key.get()
      self.assertEqual(org.status, org_model.Status.PRE_ACCEPTED)

    # check that pre-rejected organizations are still pre-rejected
    for org_key in pre_rejected_orgs:
      org = org_key.get()
      self.assertEqual(org.status, org_model.Status.PRE_REJECTED)
Developer: rhyolight, Project: nupic.son, Lines: 33, Source: test_apply_org_admission_decisions.py


Example 19: _run_test

  def _run_test(self, num_shards, num_files, multi_slices=False):
    bucket_name = "testing"
    object_prefix = "file-"
    job_name = "test_map"
    expected_content = self.create_test_content(bucket_name,
                                                object_prefix,
                                                num_files)
    job = map_job.Job.submit(map_job.JobConfig(
        job_name=job_name,
        mapper=_InputReaderMemoryMapper,
        input_reader_cls=input_reader.GCSInputReader,
        input_reader_params={"bucket_name": bucket_name,
                             "objects": [object_prefix + "*"],
                             "path_filter": _MyPathFilter()},
        shard_count=num_shards))

    test_support.execute_until_empty(self.taskqueue)
    # Compare sorted copies; list.sort() sorts in place and returns None.
    self.assertEqual(sorted(expected_content), sorted(_memory_mapper_data))
    self.assertEqual(job.SUCCESS, job.get_status())
    self.assertEqual(
        num_files - 1,
        job.get_counter(input_reader.GCSInputReader.COUNTER_FILE_READ))
    if multi_slices:
      ss = model.ShardState.find_all_by_mapreduce_state(job._state)
      for s in ss:
        self.assertTrue(s.slice_id > 0)
Developer: Batterii, Project: appengine-mapreduce, Lines: 26, Source: _gcs_end_to_end_test.py


Example 20: testMultipleShards

  def testMultipleShards(self):
    entity_count = 1000

    for _ in range(entity_count):
      TestEntity().put()

    mapreduce_id = control.start_map(
        "test_map",
        __name__ + ".test_handler_yield_key_str",
        DATASTORE_READER_NAME,
        {
            "entity_kind": __name__ + "." + TestEntity.__name__,
            "output_sharding": "input",
        },
        shard_count=4,
        base_path="/mapreduce_base_path",
        output_writer_spec=BLOBSTORE_WRITER_NAME)

    test_support.execute_until_empty(self.taskqueue)

    mapreduce_state = model.MapreduceState.get_by_job_id(mapreduce_id)
    filenames = output_writers.BlobstoreOutputWriter.get_filenames(
        mapreduce_state)
    self.assertEqual(4, len(filenames))

    file_lengths = []
    for filename in filenames:
      self.assertTrue(filename.startswith("/blobstore/"))
      self.assertFalse(filename.startswith("/blobstore/writable:"))

      with files.open(filename, "r") as f:
        data = f.read(10000000)
        file_lengths.append(len(data.strip().split("\n")))

    self.assertEqual(1000, sum(file_lengths))
Developer: johnwlockwood, Project: appengine-mapreduce, Lines: 35, Source: output_writers_end_to_end_test.py



Note: the mapreduce.test_support.execute_until_empty examples in this article were compiled by 纯净天空 from GitHub, MSDocs, and other source-code and documentation hosting platforms. The snippets were selected from open-source projects contributed by many developers; copyright remains with the original authors, and any further use must follow the corresponding project's License. Please do not repost without permission.

