• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    迪恩网络公众号

Python poolmanager.proxy_from_url函数代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Python中urllib3.poolmanager.proxy_from_url函数的典型用法代码示例。如果您正苦于以下问题：Python proxy_from_url函数的具体用法？Python proxy_from_url怎么用？Python proxy_from_url使用的例子？那么恭喜您，这里精选的函数代码示例或许可以为您提供帮助。



在下文中一共展示了proxy_from_url函数的20个代码示例，这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞，您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: test_proxy_verified

    def test_proxy_verified(self):
        """Check certificate verification for HTTPS pools created via the proxy.

        Three cases run in order: the ``DEFAULT_CA_BAD`` bundle must trigger a
        verification failure, the ``DEFAULT_CA`` bundle must allow the request,
        and a pool addressed as ``127.0.0.1`` must be rejected with a
        "doesn't match" SSL error.
        """
        # Case 1: wrong CA bundle.
        bad_ca_manager = proxy_from_url(self.proxy_url, ca_certs=DEFAULT_CA_BAD,
                                        cert_reqs='REQUIRED')
        bad_ca_pool = bad_ca_manager._new_pool('https', self.https_host,
                                               self.https_port)
        try:
            bad_ca_pool.request('GET', '/')
        except SSLError as err:
            verify_failed = 'certificate verify failed' in str(err)
            self.assertTrue(verify_failed,
                            "Expected 'certificate verify failed',"
                            "instead got: %r" % err)
        else:
            self.fail("Didn't raise SSL error with wrong CA")

        # Case 2: correct CA bundle.
        good_manager = proxy_from_url(self.proxy_url, ca_certs=DEFAULT_CA,
                                      cert_reqs='REQUIRED')
        good_pool = good_manager._new_pool('https', self.https_host,
                                           self.https_port)

        fresh_conn = good_pool._new_conn()
        self.assertEqual(fresh_conn.__class__, VerifiedHTTPSConnection)
        good_pool.request('GET', '/')  # Should succeed without exceptions.

        # Case 3: correct CA bundle, but the pool is addressed by IP.
        mismatch_manager = proxy_from_url(self.proxy_url, ca_certs=DEFAULT_CA,
                                          cert_reqs='REQUIRED')
        mismatch_pool = mismatch_manager._new_pool('https', '127.0.0.1',
                                                   self.https_port)

        try:
            mismatch_pool.request('GET', '/')
        except SSLError as err:
            self.assertTrue("doesn't match" in str(err))
        else:
            self.fail("Didn't raise SSL invalid common name")
开发者ID:fredthomsen,项目名称:urllib3,代码行数:31,代码来源:test_proxy_poolmanager.py


示例2: test_simple

    def test_simple(self):
        """Send one GET through the proxy and check the request the proxy saw.

        The handler below plays the proxy: it reads a request up to the blank
        line that ends the headers and echoes it back as the response body, so
        ``r.data`` holds the bytes that were sent to the proxy.
        """
        base_url = "http://%s:%d" % (self.host, self.port)
        proxy = proxy_from_url(base_url)

        def echo_socket_handler(listener):
            sock = listener.accept()[0]
            try:
                buf = b""
                while not buf.endswith(b"\r\n\r\n"):
                    buf += sock.recv(65536)

                # sendall() keeps writing until the whole response has been
                # handed over; send() may return after a partial write.
                sock.sendall(
                    (
                        "HTTP/1.1 200 OK\r\n"
                        "Content-Type: text/plain\r\n"
                        "Content-Length: %d\r\n"
                        "\r\n"
                        "%s" % (len(buf), buf.decode("utf-8"))
                    ).encode("utf-8")
                )
            finally:
                # Always release the accepted socket, even if the exchange
                # above raised.
                sock.close()

        self._start_server(echo_socket_handler)

        r = proxy.request("GET", "http://google.com/")

        self.assertEqual(r.status, 200)
        self.assertEqual(
            r.data,
            b"GET http://google.com/ HTTP/1.1\r\n"
            b"Host: google.com\r\n"
            b"Accept-Encoding: identity\r\n"
            b"Accept: */*\r\n"
            b"\r\n",
        )
开发者ID:eranrund,项目名称:urllib3,代码行数:34,代码来源:test_socketlevel.py


示例3: test_proxy_conn_fail

 def test_proxy_conn_fail(self):
     """Requests routed through an unreachable proxy must raise ProxyError."""
     dead_host, dead_port = get_unreachable_address()
     manager = proxy_from_url('http://%s:%s/' % (dead_host, dead_port))

     # HTTPS target first, then plain HTTP.
     with self.assertRaises(ProxyError):
         manager.request('GET', '%s/' % self.https_url)
     with self.assertRaises(ProxyError):
         manager.request('GET', '%s/' % self.http_url)
开发者ID:1T,项目名称:urllib3,代码行数:7,代码来源:test_proxy_poolmanager.py


示例4: test_simple

    def test_simple(self):
        """Send one GET through the proxy and check the request the proxy saw.

        The handler below plays the proxy: it reads a request up to the blank
        line that ends the headers and echoes it back as the response body, so
        ``r.data`` holds the bytes that were sent to the proxy.
        """
        base_url = 'http://%s:%d' % (self.host, self.port)
        proxy = proxy_from_url(base_url)

        def echo_socket_handler(listener):
            sock = listener.accept()[0]
            try:
                buf = b''
                while not buf.endswith(b'\r\n\r\n'):
                    buf += sock.recv(65536)

                # sendall() keeps writing until the whole response has been
                # handed over; send() may return after a partial write.
                sock.sendall(('HTTP/1.1 200 OK\r\n'
                              'Content-Type: text/plain\r\n'
                              'Content-Length: %d\r\n'
                              '\r\n'
                              '%s' % (len(buf), buf.decode('utf-8'))).encode('utf-8'))
            finally:
                # Always release the accepted socket, even if the exchange
                # above raised.
                sock.close()

        self._start_server(echo_socket_handler)

        r = proxy.request('GET', 'http://google.com/')

        self.assertEqual(r.status, 200)
        self.assertEqual(r.data, b'GET http://google.com/ HTTP/1.1\r\n'
                                 b'Host: google.com\r\n'
                                 b'Accept-Encoding: identity\r\n'
                                 b'Accept: */*\r\n'
                                 b'\r\n')
开发者ID:thatguystone,项目名称:urllib3,代码行数:27,代码来源:test_socketlevel.py


示例5: get_connection

    def get_connection(self, url, proxies=None):
        """Return the urllib3 connection to use for ``url``.

        Not meant to be called from user code; it is exposed for subclasses of
        :class:`HTTPAdapter <requests.adapters.HTTPAdapter>`.

        :param url: The URL to connect to.
        :param proxies: (optional) A Requests-style dictionary of proxies used on this request.
        """
        if not proxies:
            proxies = {}

        # The proxy is looked up by the scheme of the lower-cased URL.
        scheme = urlparse(url.lower()).scheme
        proxy = proxies.get(scheme)

        if not proxy:
            # Only scheme should be lower case
            normalized_url = urlparse(url).geturl()
            return self.poolmanager.connection_from_url(normalized_url)

        except_on_missing_scheme(proxy)
        proxy_headers = self.proxy_headers(proxy)

        # One manager is kept per proxy URL and created on first use.
        if proxy not in self.proxy_manager:
            manager = proxy_from_url(
                proxy,
                proxy_headers=proxy_headers,
                num_pools=self._pool_connections,
                maxsize=self._pool_maxsize,
                block=self._pool_block)
            self.proxy_manager[proxy] = manager

        return self.proxy_manager[proxy].connection_from_url(url)
开发者ID:KauzClay,项目名称:waggle,代码行数:31,代码来源:adapters.py


示例6: test_retries

    def test_retries(self):
        """A dropped first connection is retried; with retries off it raises ProxyError.

        The fake proxy closes the first accepted connection immediately and
        answers only the second one.
        """
        def echo_socket_handler(listener):
            # First request, which should fail
            listener.accept()[0].close()

            # Second request
            client = listener.accept()[0]

            received = b''
            while not received.endswith(b'\r\n\r\n'):
                received += client.recv(65536)

            reply = ('HTTP/1.1 200 OK\r\n'
                     'Content-Type: text/plain\r\n'
                     'Content-Length: %d\r\n'
                     '\r\n'
                     '%s' % (len(received), received.decode('utf-8')))
            client.send(reply.encode('utf-8'))
            client.close()

        self._start_server(echo_socket_handler)

        proxy_url = 'http://%s:%d' % (self.host, self.port)
        manager = proxy_from_url(proxy_url)
        pool = manager.connection_from_url('http://www.google.com')

        response = pool.urlopen('GET', 'http://www.google.com',
                                assert_same_host=False, retries=1)
        self.assertEqual(response.status, 200)

        # The handler has finished by now, so a request without retries fails.
        with self.assertRaises(ProxyError):
            pool.urlopen('GET', 'http://www.google.com',
                         assert_same_host=False, retries=False)
开发者ID:smihulet,项目名称:urllib3,代码行数:33,代码来源:test_socketlevel.py


示例7: test_simple

    def test_simple(self):
        """Send one GET through the proxy and compare the echoed request lines.

        The fake proxy returns the raw request as the response body; the lines
        are compared after sorting so header order does not matter.
        """
        def echo_socket_handler(listener):
            client = listener.accept()[0]

            received = b''
            while not received.endswith(b'\r\n\r\n'):
                received += client.recv(65536)

            reply = ('HTTP/1.1 200 OK\r\n'
                     'Content-Type: text/plain\r\n'
                     'Content-Length: %d\r\n'
                     '\r\n'
                     '%s' % (len(received), received.decode('utf-8')))
            client.send(reply.encode('utf-8'))
            client.close()

        self._start_server(echo_socket_handler)
        proxy_url = 'http://%s:%d' % (self.host, self.port)
        manager = proxy_from_url(proxy_url)

        response = manager.request('GET', 'http://google.com/')

        self.assertEqual(response.status, 200)
        # FIXME: The order of the headers is not predictable right now. We
        # should fix that someday (maybe when we migrate to
        # OrderedDict/MultiDict).
        expected_lines = [
            b'GET http://google.com/ HTTP/1.1',
            b'Host: google.com',
            b'Accept-Encoding: identity',
            b'Accept: */*',
            b'',
            b'',
        ]
        actual_lines = response.data.split(b'\r\n')
        self.assertEqual(sorted(actual_lines), sorted(expected_lines))
开发者ID:Altynai,项目名称:urllib3,代码行数:34,代码来源:test_socketlevel.py


示例8: test_https_proxy_timeout

 def test_https_proxy_timeout(self):
     """A per-request timeout against the tarpit proxy ends in a connect timeout."""
     tarpit_proxy = 'https://{host}'.format(host=TARPIT_HOST)
     https = proxy_from_url(tarpit_proxy)
     try:
         https.request('GET', self.http_url, timeout=0.001)
     except MaxRetryError as err:
         # The wrapped reason must be exactly ConnectTimeoutError.
         self.assertEqual(type(err.reason), ConnectTimeoutError)
     else:
         self.fail("Failed to raise retry error.")
开发者ID:fredthomsen,项目名称:urllib3,代码行数:7,代码来源:test_proxy_poolmanager.py


示例9: test_headers

    def test_headers(self):
        """Headers passed as ``proxy_headers`` must appear in the request sent to the proxy.

        The fake proxy echoes the raw request back as the response body, which
        is then searched for the proxy header line.
        """
        def echo_socket_handler(listener):
            client = listener.accept()[0]

            received = b''
            while not received.endswith(b'\r\n\r\n'):
                received += client.recv(65536)

            reply = ('HTTP/1.1 200 OK\r\n'
                     'Content-Type: text/plain\r\n'
                     'Content-Length: %d\r\n'
                     '\r\n'
                     '%s' % (len(received), received.decode('utf-8')))
            client.send(reply.encode('utf-8'))
            client.close()

        self._start_server(echo_socket_handler)
        proxy_url = 'http://%s:%d' % (self.host, self.port)

        # Define some proxy headers.
        headers_for_proxy = HTTPHeaderDict({'For The Proxy': 'YEAH!'})
        manager = proxy_from_url(proxy_url, proxy_headers=headers_for_proxy)

        pool = manager.connection_from_url('http://www.google.com/')

        response = pool.urlopen('GET', 'http://www.google.com/',
                                assert_same_host=False)

        self.assertEqual(response.status, 200)
        # FIXME: The order of the headers is not predictable right now. We
        # should fix that someday (maybe when we migrate to
        # OrderedDict/MultiDict).
        header_present = b'For The Proxy: YEAH!\r\n' in response.data
        self.assertTrue(header_present)
开发者ID:Altynai,项目名称:urllib3,代码行数:31,代码来源:test_socketlevel.py


示例10: test_basic_ipv6_proxy

    def test_basic_ipv6_proxy(self):
        """Plain HTTP and HTTPS requests through the proxy both return 200."""
        manager = proxy_from_url(self.proxy_url)

        http_response = manager.request('GET', '%s/' % self.http_url)
        self.assertEqual(http_response.status, 200)

        https_response = manager.request('GET', '%s/' % self.https_url)
        self.assertEqual(https_response.status, 200)
开发者ID:fredthomsen,项目名称:urllib3,代码行数:8,代码来源:test_proxy_poolmanager.py


示例11: test_https_proxy_pool_timeout

 def test_https_proxy_pool_timeout(self):
     """A timeout set on the proxy manager ends in a connect timeout."""
     tarpit_proxy = 'https://{host}'.format(host=TARPIT_HOST)
     https = proxy_from_url(tarpit_proxy, timeout=0.001)
     try:
         https.request('GET', self.http_url)
     except MaxRetryError as err:
         assert isinstance(err.reason, ConnectTimeoutError)
     else:
         self.fail("Failed to raise retry error.")
开发者ID:Eyal87,项目名称:repository.eyal,代码行数:8,代码来源:test_proxy_poolmanager.py


示例12: test_connect_reconn

    def test_connect_reconn(self):
        """Two HTTPS requests through a CONNECT proxy that closes after each reply.

        The fake proxy serves exactly two connections. Each one must open with
        ``CONNECT <host>:443``, is then wrapped in TLS, and is answered with a
        ``Connection: close`` response before the socket is closed.
        """
        def proxy_ssl_one(listener):
            client = listener.accept()[0]

            request = b""
            while not request.endswith(b"\r\n\r\n"):
                request += client.recv(65536)
            text = request.decode("utf-8")

            # Refuse anything that is not a CONNECT to the expected host.
            rejection = None
            if not text.startswith("CONNECT "):
                rejection = "HTTP/1.1 405 Method not allowed\r\n" "Allow: CONNECT\r\n\r\n"
            elif not text.startswith("CONNECT %s:443" % (self.host,)):
                rejection = "HTTP/1.1 403 Forbidden\r\n\r\n"
            if rejection is not None:
                client.send(rejection.encode("utf-8"))
                client.close()
                return

            # Tunnel accepted: switch the same socket over to TLS.
            client.send(("HTTP/1.1 200 Connection Established\r\n\r\n").encode("utf-8"))
            tls_client = ssl.wrap_socket(
                client,
                server_side=True,
                keyfile=DEFAULT_CERTS["keyfile"],
                certfile=DEFAULT_CERTS["certfile"],
                ca_certs=DEFAULT_CA,
            )

            tunneled = b""
            while not tunneled.endswith(b"\r\n\r\n"):
                tunneled += tls_client.recv(65536)

            reply = (
                "HTTP/1.1 200 OK\r\n"
                "Content-Type: text/plain\r\n"
                "Content-Length: 2\r\n"
                "Connection: close\r\n"
                "\r\n"
                "Hi"
            )
            tls_client.send(reply.encode("utf-8"))
            tls_client.close()

        def echo_socket_handler(listener):
            # One tunnel per expected request.
            for _ in range(2):
                proxy_ssl_one(listener)

        self._start_server(echo_socket_handler)
        proxy_url = "http://%s:%d" % (self.host, self.port)

        manager = proxy_from_url(proxy_url)

        target = "https://{0}".format(self.host)
        pool = manager.connection_from_url(target)
        first = pool.urlopen("GET", target, retries=0)
        self.assertEqual(first.status, 200)
        second = pool.urlopen("GET", target, retries=0)
        self.assertEqual(second.status, 200)
开发者ID:boyxuper,项目名称:urllib3,代码行数:58,代码来源:test_socketlevel.py


示例13: test_scheme_host_case_insensitive

    def test_scheme_host_case_insensitive(self):
        """Assert that upper-case schemes and hosts are normalized."""
        manager = proxy_from_url(self.proxy_url.upper())

        http_response = manager.request('GET', '%s/' % self.http_url.upper())
        self.assertEqual(http_response.status, 200)

        https_response = manager.request('GET', '%s/' % self.https_url.upper())
        self.assertEqual(https_response.status, 200)
开发者ID:fredthomsen,项目名称:urllib3,代码行数:9,代码来源:test_proxy_poolmanager.py


示例14: test_basic_ipv6_proxy

    def test_basic_ipv6_proxy(self):
        """Plain HTTP and HTTPS requests through the proxy both return 200."""
        manager = proxy_from_url(self.proxy_url, ca_certs=DEFAULT_CA)
        # Clear the manager when the test finishes.
        self.addCleanup(manager.clear)

        http_response = manager.request('GET', '%s/' % self.http_url)
        self.assertEqual(http_response.status, 200)

        https_response = manager.request('GET', '%s/' % self.https_url)
        self.assertEqual(https_response.status, 200)
开发者ID:jonparrott,项目名称:urllib3,代码行数:9,代码来源:test_proxy_poolmanager.py


示例15: test_nagle_proxy

 def test_nagle_proxy(self):
     """ Test that proxy connections do not have TCP_NODELAY turned on """
     manager = proxy_from_url(self.proxy_url)
     pool = manager.connection_from_host(self.http_host, self.http_port)
     conn = pool._get_conn()
     # Make a request so that conn.sock is available for inspection.
     pool._make_request(conn, 'GET', '/')
     tcp_nodelay_setting = conn.sock.getsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY)
     failure_message = ("Expected TCP_NODELAY for proxies to be set "
                        "to zero, instead was %s" % tcp_nodelay_setting)
     self.assertEqual(tcp_nodelay_setting, 0, failure_message)
开发者ID:fredthomsen,项目名称:urllib3,代码行数:10,代码来源:test_proxy_poolmanager.py


示例16: test_connect_reconn

    def test_connect_reconn(self):
        """Two HTTPS requests through a CONNECT proxy that closes after each reply.

        The fake proxy serves exactly two connections. Each one must open with
        ``CONNECT <host>:443``, is then wrapped in TLS, and is answered with a
        ``Connection: close`` response before the socket is closed.
        """
        def proxy_ssl_one(listener):
            client = listener.accept()[0]

            request = b''
            while not request.endswith(b'\r\n\r\n'):
                request += client.recv(65536)
            text = request.decode('utf-8')

            # Refuse anything that is not a CONNECT to the expected host.
            rejection = None
            if not text.startswith('CONNECT '):
                rejection = ('HTTP/1.1 405 Method not allowed\r\n'
                             'Allow: CONNECT\r\n\r\n')
            elif not text.startswith('CONNECT %s:443' % (self.host,)):
                rejection = 'HTTP/1.1 403 Forbidden\r\n\r\n'
            if rejection is not None:
                client.send(rejection.encode('utf-8'))
                client.close()
                return

            # Tunnel accepted: switch the same socket over to TLS.
            client.send(('HTTP/1.1 200 Connection Established\r\n\r\n').encode('utf-8'))
            tls_options = dict(server_side=True,
                               keyfile=DEFAULT_CERTS['keyfile'],
                               certfile=DEFAULT_CERTS['certfile'],
                               ca_certs=DEFAULT_CA)
            tls_client = ssl.wrap_socket(client, **tls_options)

            tunneled = b''
            while not tunneled.endswith(b'\r\n\r\n'):
                tunneled += tls_client.recv(65536)

            reply = ('HTTP/1.1 200 OK\r\n'
                     'Content-Type: text/plain\r\n'
                     'Content-Length: 2\r\n'
                     'Connection: close\r\n'
                     '\r\n'
                     'Hi')
            tls_client.send(reply.encode('utf-8'))
            tls_client.close()

        def echo_socket_handler(listener):
            # One tunnel per expected request.
            for _ in range(2):
                proxy_ssl_one(listener)

        self._start_server(echo_socket_handler)
        proxy_url = 'http://%s:%d' % (self.host, self.port)

        manager = proxy_from_url(proxy_url)
        self.addCleanup(manager.clear)

        target = 'https://{0}'.format(self.host)
        pool = manager.connection_from_url(target)
        first = pool.urlopen('GET', target, retries=0)
        self.assertEqual(first.status, 200)
        second = pool.urlopen('GET', target, retries=0)
        self.assertEqual(second.status, 200)
开发者ID:Lukasa,项目名称:urllib3,代码行数:54,代码来源:test_socketlevel.py


示例17: test_proxy_conn_fail

    def test_proxy_conn_fail(self):
        """Requests through a proxy port nobody listens on must raise ProxyError."""
        # Get a free port on localhost, so a connection will be refused
        probe = socket.socket()
        probe.bind(('127.0.0.1', 0))
        address = probe.getsockname()
        probe.close()
        free_port = address[1]

        manager = proxy_from_url('http://127.0.0.1:%s/' % free_port)
        with self.assertRaises(ProxyError):
            manager.request('GET', '%s/' % self.https_url)
        with self.assertRaises(ProxyError):
            manager.request('GET', '%s/' % self.http_url)
开发者ID:09Hero,项目名称:urllib3,代码行数:12,代码来源:test_proxy_poolmanager.py


示例18: get_connection

    def get_connection(self, url, proxies=None):
        """Returns a connection for the given URL."""
        scheme = urlparse(url).scheme
        proxy = (proxies or {}).get(scheme)

        if not proxy:
            # No proxy configured for this scheme: use the regular pool manager.
            return self.poolmanager.connection_from_url(url)

        # presumably adds the URL's scheme when the proxy address has none;
        # verify against prepend_scheme_if_needed
        proxy_url = prepend_scheme_if_needed(proxy, scheme)
        return proxy_from_url(proxy_url)
开发者ID:hidoba,项目名称:vk4xmpp,代码行数:12,代码来源:adapters.py


示例19: test_proxy_conn_fail

    def test_proxy_conn_fail(self):
        """An unreachable proxy yields MaxRetryError whose reason is a ProxyError."""
        dead_host, dead_port = get_unreachable_address()
        dead_proxy = 'http://%s:%s/' % (dead_host, dead_port)
        manager = proxy_from_url(dead_proxy, retries=1, timeout=0.05)

        with self.assertRaises(MaxRetryError):
            manager.request('GET', '%s/' % self.https_url)
        with self.assertRaises(MaxRetryError):
            manager.request('GET', '%s/' % self.http_url)

        # Repeat the plain-HTTP request to inspect the wrapped reason.
        try:
            manager.request('GET', '%s/' % self.http_url)
        except MaxRetryError as err:
            self.assertEqual(type(err.reason), ProxyError)
        else:
            self.fail("Failed to raise retry error.")
开发者ID:fredthomsen,项目名称:urllib3,代码行数:13,代码来源:test_proxy_poolmanager.py


示例20: test_redirect

    def test_redirect(self):
        """Check the redirect endpoint through the proxy with and without ``redirect=False``.

        With ``redirect=False`` the 303 response is returned as is; otherwise
        the final response is a 200 with the dummy server body.
        """
        manager = proxy_from_url(self.proxy_url)

        # First pass: redirects disabled.
        unfollowed = manager.request(
            'GET',
            '%s/redirect' % self.http_url,
            fields={'target': '%s/' % self.http_url},
            redirect=False,
        )
        self.assertEqual(unfollowed.status, 303)

        # Second pass: same request with the default redirect handling.
        followed = manager.request(
            'GET',
            '%s/redirect' % self.http_url,
            fields={'target': '%s/' % self.http_url},
        )
        self.assertEqual(followed.status, 200)
        self.assertEqual(followed.data, b'Dummy server!')
开发者ID:fredthomsen,项目名称:urllib3,代码行数:14,代码来源:test_proxy_poolmanager.py



注:本文中的urllib3.poolmanager.proxy_from_url函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python poolmanager.PoolManager类代码示例发布时间:2022-05-27
下一篇:
Python six.u函数代码示例发布时间:2022-05-27
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap