
Python urllib3.connection_from_url Function Code Examples


This article collects typical usage examples of the urllib3.connection_from_url function in Python. If you are wondering how connection_from_url is used in practice, or are looking for working examples of it, the curated code examples below may help.



A total of 20 code examples of the connection_from_url function are shown below, sorted by popularity by default.
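Before the collected examples, here is a minimal sketch of what the function itself returns. The URLs are placeholders and are not taken from any of the projects below. connection_from_url parses the URL and builds a connection pool for its scheme, host, and port; the path component is not used to select the pool. No connection is opened until the first request, so this snippet should run without network access.

```python
import urllib3

# Placeholder URLs for illustration only; nothing is sent over the network
# because the pool opens connections lazily, on the first request.
http_pool = urllib3.connection_from_url("http://example.com:8080/some/path")
https_pool = urllib3.connection_from_url("https://example.com/")

# An http:// URL yields an HTTPConnectionPool, an https:// URL an HTTPSConnectionPool.
print(type(http_pool).__name__, http_pool.host, http_pool.port)
print(type(https_pool).__name__, https_pool.host, https_pool.port)
```

Extra keyword arguments such as maxsize, timeout, or cert_reqs are forwarded to the pool constructor, which is the pattern most of the examples below rely on.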

Example 1: _init

    def _init(self, path_to_tx=None):
        instructions = "Run 'tx init' to initialize your project first!"
        try:
            self.root = self._get_tx_dir_path(path_to_tx)
            self.config_file = self._get_config_file_path(self.root)
            self.config = self._read_config_file(self.config_file)

            local_txrc_file = self._get_transifex_file(os.getcwd())
            if os.path.exists(local_txrc_file):
                self.txrc_file = local_txrc_file
            else:
                self.txrc_file = self._get_transifex_file()
            self.txrc = self._get_transifex_config([self.txrc_file, ])
        except ProjectNotInit as e:
            logger.error('\n'.join([six.u(str(e)), instructions]))
            raise
        host = self.config.get('main', 'host')
        if host.lower().startswith('https://'):
            self.conn = urllib3.connection_from_url(
                host,
                cert_reqs=ssl.CERT_REQUIRED,
                ca_certs=certs_file()
            )
        else:
            self.conn = urllib3.connection_from_url(host)
Developer: akx, Project: transifex-client, Lines of code: 25, Source: project.py


Example 2: parse_feed_parallel

def parse_feed_parallel(num, feed_options_item, all_links, queue, t_limit=None):
    """
    Parallel creation of a RSSItem for each post in the feed.

    :param num: The feed's number in the list. For DEBUG purposes
    :param feed_options_item: The RSS Feed options
    :param all_links: A set of all the links in the database
    :param queue: A Queue to store the resulting RSSPost objects
    :param t_limit: An integer used to limit the number of running threads
    """

    t1 = millis()

    # Read the feed XML and store it as a string
    try:
        a = urllib.urlopen(feed_options_item.feed_url).read()
    except IOError as e:
        logger.error("Getting XML for feed %s failed. No posts from this feed will be processed"
                     % feed_options_item.feed_url)
        return

    d = speedparser.parse(a, clean_html=False)  # SpeedParser is ~10 times faster than FeedParser

    t2 = millis()
    logger.debug("%d %s with %d posts, SpeedParser done in: %d ms" %
                 (num, feed_options_item.feed_url, len(d.entries), (t2-t1)))

    # Create a thread for each entry in the feed which is not present in the database
    threads = []
    http = None
    if 'feedburner' in feed_options_item.feed_url:
        # Get the host of the first original link
        http = urllib3.connection_from_url(d.entries[0].get("id", d.entries[0].link), maxsize=40, block=True)
    else:
        # Got maxsize=40 experimentally as best value
        http = urllib3.connection_from_url(feed_options_item.feed_url, maxsize=40, block=True)

    # Fill threads list
    for entry in d.entries:
        if 'feedproxy.google' in entry.link:    # FeedProxy workaround
            if entry.get("id", entry.link) not in all_links:
                threads.append(threading.Thread(target=get_html3, args=(http, entry, feed_options_item, queue)))
        else:
            if entry.link not in all_links:
                threads.append(threading.Thread(target=get_html3, args=(http, entry, feed_options_item, queue)))

    # Run threads depending on thread limit
    if t_limit:
        for i in range(0, len(threads), t_limit):
            for j in range(min(t_limit, len(threads) - i)):
                threads[i+j].start()
            for j in range(min(t_limit, len(threads) - i)):
                threads[i+j].join()

    # If t_limit is None, run all threads at once
    else:
        for t in threads:
            t.start()
        for t in threads:
            t.join()
Developer: T1T4N, Project: MiniTimeMK, Lines of code: 60, Source: updater.py


Example 3: make_request

def make_request(method, host, url, username, password, fields=None):
    if host.lower().startswith('https://'):
        connection = urllib3.connection_from_url(
            host,
            cert_reqs=ssl.CERT_REQUIRED,
            ca_certs=certs_file()
        )
    else:
        connection = urllib3.connection_from_url(host)
    headers = urllib3.util.make_headers(
        basic_auth='{0}:{1}'.format(username, password),
        accept_encoding=True,
        user_agent=user_agent_identifier(),
        keep_alive=True
    )
    r = None
    try:
        r = connection.request(method, url, headers=headers, fields=fields)
        data = r.data
        charset = determine_charset(r)
        if isinstance(data, bytes):
            data = data.decode(charset)
        if r.status < 200 or r.status >= 400:
            if r.status == 404:
                raise HttpNotFound(data)
            else:
                raise Exception(data)
        return data, charset
    except SSLError:
        logger.error("Invalid SSL certificate")
        raise
    finally:
        if r is not None:
            r.close()
Developer: shimizukawa, Project: transifex-client, Lines of code: 34, Source: utils.py
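The header-building step used in make_request can be looked at in isolation. The following is a small sketch, assuming placeholder credentials and a made-up user agent string, of what urllib3.util.make_headers produces for the basic_auth, keep_alive, and user_agent arguments. It only builds a dictionary and sends nothing.

```python
import base64

import urllib3

# "user", "secret" and "example-client/0.1" are placeholders for illustration.
headers = urllib3.util.make_headers(
    basic_auth="user:secret",
    keep_alive=True,
    user_agent="example-client/0.1",
)

# The basic_auth value is base64-encoded into an Authorization header.
expected = "Basic " + base64.b64encode(b"user:secret").decode("ascii")
print(headers["authorization"] == expected)
print(headers["connection"], headers["user-agent"])
```

Letting make_headers do the encoding avoids hand-rolling the Authorization header, which is easy to get wrong.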


Example 4: __init__

    def __init__(self, api_host, api_mobile_host, customer_id, secret_key, ssl=True):
        self._customer_id = customer_id
        self._secret_key = secret_key
        self._api_host = api_host
        self._api_mobile_host = api_mobile_host

        http_root = "https://" if ssl else "http://"
        self._pool = urllib3.connection_from_url(http_root + api_host)
        self._pool_mobile = urllib3.connection_from_url(http_root + api_mobile_host)
Developer: minhlq, Project: python_telesign, Lines of code: 9, Source: api.py


Example 5: create_http_pool

def create_http_pool(settings):
    url = settings['notification_url']
    maxsize = settings['threaded.threads']  # sort of a lie, potentially
    timeout = settings['timeout']

    if settings['use_ssl']:
        ca_certs = settings['ca_certs']
        return urllib3.connection_from_url(url, maxsize=maxsize,
                                           timeout=timeout,
                                           cert_reqs='CERT_REQUIRED',
                                           ca_certs=ca_certs)

    return urllib3.connection_from_url(url, maxsize=maxsize, timeout=timeout)
Developer: epii, Project: pyramid_airbrake, Lines of code: 13, Source: submit.py


Example 6: test_same_url

    def test_same_url(self):
        # Convince ourselves that normally we don't get the same object
        conn1 = connection_from_url('http://localhost:8081/foo')
        conn2 = connection_from_url('http://localhost:8081/bar')

        self.assertNotEqual(conn1, conn2)

        # Now try again using the PoolManager
        p = PoolManager(1)

        conn1 = p.connection_from_url('http://localhost:8081/foo')
        conn2 = p.connection_from_url('http://localhost:8081/bar')

        self.assertEqual(conn1, conn2)
Developer: poupas, Project: urllib3, Lines of code: 14, Source: test_poolmanager.py
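The behaviour this test checks can be reproduced outside a test class. The sketch below reuses the localhost URLs from the test purely as labels (no server needs to be running, since no request is made) and compares object identity rather than equality: the module-level function builds a new pool on every call, while a PoolManager hands back the pool it already holds for the same scheme, host, and port.

```python
from urllib3 import PoolManager, connection_from_url

# Module-level function: each call constructs a fresh, independent pool.
a = connection_from_url("http://localhost:8081/foo")
b = connection_from_url("http://localhost:8081/bar")
standalone_shared = a is b

# PoolManager: pools are cached per scheme/host/port, so the path is irrelevant.
manager = PoolManager(num_pools=1)
c = manager.connection_from_url("http://localhost:8081/foo")
d = manager.connection_from_url("http://localhost:8081/bar")
managed_shared = c is d

print(standalone_shared, managed_shared)
manager.clear()
```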


Example 7: __init__

    def __init__(self, uri, analysis, schema, timeout, prefetch, http_op, num_pools):
        self.__uri=URI.URI(uri)
        if self.__uri.scheme!="http" and self.__uri.scheme!="https":
            raise Exception("Scheme '%s' is not currently supported" % self.__uri.scheme)
        self.__host=str(self.__uri)
        self.__request=self.__uri.path
        self.__host=self.__host[:self.__host.find(self.__request)]
        self.__fields=dict(self.__uri.query)
        self.__analysisfunct=analysis
        self.__parser=etree.XMLParser(schema=schema)
        self.__timeout=timeout
        self.__prefetch=prefetch
        self.__http_op=http_op

        self.__httppool=urllib3.connection_from_url(self.__host, maxsize=num_pools)
        # AutoPageCount doesn't rely on earlier pages, so we can skip a thread pool
        if isinstance(self.__analysisfunct, AutoPageCount):
            self.__threadpool=None
        else:
            self.__threadpool=ThreadPool(1)
        self.__page=0
        self.__data=[]
        for slot in xrange(0, self.__prefetch):
            self.__dataptr=slot
            self.__data.append(None)
            self.__fillData()
        # Wait for the first page in case it was a bad URL        
        if self.__getData(0).status!=200:
            raise Exception("HTTP error %d (%s)" % (self.__data[0].status, self.__data[0].reason))
Developer: ned14, Project: BEXML, Lines of code: 29, Source: PaginatedDataSource.py


Example 8: getpicurl

def getpicurl(picname):
    # input: The name of a file uploaded on the iGEM 2016 Wiki-Server #
    # IMPORTANT: The picture has to be uploaded before running the script! #
    # picname=input('please paste the name of an uploaded iGEM-wiki file:\n')

    # correct picname for changes the iGEM-Server needs
    picname=picname.replace(':', '-')

    # define the fixed URL on the Wiki-Server #
    url = 'http://2016.igem.org/File:Freiburg_%s' % picname
    # print('the url I looked for was:\n%s' %url)

    # get raw_html from url as specified here:
    #  http://stackoverflow.com/questions/17257912/how-to-print-raw-html-string-using-urllib3 #

    http_pool = urllib3.connection_from_url(url)
    r = http_pool.urlopen('GET', url)
    raw_html=r.data.decode('utf-8')

    # initialise the BeautifulSoup object #
    soup = BeautifulSoup(raw_html, 'html.parser')

    # find the href-link in an a-object in a div with id=file #
    try:
        serverlink = 'http://2016.igem.org'+soup.find(id='file').find('a').get('href')
        # return the link #
        return serverlink
    except (AttributeError, TypeError):  # element or href not found in the page
        return None
Developer: smohsinali, Project: igemWiki, Lines of code: 29, Source: wikitranslate.py


Example 9: __init__

    def __init__(self, uri, poolmanager=None, username='admin',
                 password='admin', **kwargs):
        """Constructor for the AICLib object.

        Arguments:
        uri -- the address of the nvp controller including scheme (required)

        Keyword arguments:
        poolmanager -- a pool manager provided by urllib3 (default None)
        username -- the username to log into the nvp controller
        password -- the password to log into the nvp controller
        """
        retries = kwargs.get("retries", 3)
        socket_options = \
            urllib3.connection.HTTPConnection.default_socket_options + \
            [(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1), ]

        if poolmanager is None:
            self.conn = urllib3.connection_from_url(
                uri,
                retries=retries,
                socket_options=socket_options)

        else:
            self.conn = poolmanager.connection_from_url(
                uri,
                retries=retries,
                socket_options=socket_options)

        self.connection = Connection(connection=self.conn,
                                     username=username,
                                     password=password,
                                     **kwargs)
Developer: mycloud-rax-rado, Project: aiclib, Lines of code: 33, Source: core.py


Example 10: __init__

    def __init__(self, username, password, inputfile, connection_pool_size=10):
        self.username = username
        self.password = password
        self.inputfile = inputfile
        self.sleep = 60
        
        self.pool = urllib3.connection_from_url('https://atlas.ripe.net', maxsize=connection_pool_size)
        self.headers = [('User-agent', 'Mozilla/5.0'),
                                  ('Referer', 'https://atlas.ripe.net/atlas/udm.html'),
                                  ('Host', 'atlas.ripe.net'),
                                  ('Origin', 'https://atlas.ripe.net'),
                                  ('X-Requested-With', 'XMLHttpRequest')]
        self.login()
        self.target_dict = measure_baseclass.load_input(self.inputfile)
        
        """
        self.target_list = []
        f = open(self.inputfile)
        for line in f:
            line = line.strip()
            chunks = line.split()
            target = chunks[0]
            probes = chunks[1:]
        
            #if target in self.target_list:
            #    sys.stderr.write('Already saw target %s\n' % target)
            #    continue

            self.target_list.append((target, probes))
        f.close()       
        """

        """
Developer: USC-NSL, Project: ripe-atlas, Lines of code: 33, Source: atlas_http.py


Example 11: bench_urllib3_with_threads

def bench_urllib3_with_threads():
    begin = time.time()

    pool = urllib3.connection_from_url(urls[0], maxsize=4)

    urls_queue = Queue.Queue()
    for url in urls:
        urls_queue.put(url)

    def download():
        while True:
            try:
                url = urls_queue.get_nowait()
            except Queue.Empty:
                return
            pool.get_url(url)
            urls_queue.task_done()

    for i in range(4):
        threading.Thread(target=download).start()

    urls_queue.join()

    end = time.time()

    print "took %0.3f seconds" % (end - begin)
Developer: mackstann, Project: urllib3-testrun, Lines of code: 26, Source: bench.py


Example 12: read_exif

def read_exif(files, source_folder):
    with exiftool.ExifTool() as et:
        #j = et.get_metadata('/'.join([source_folder, files[0]]))

        f = ['/'.join([source_folder, j]) for j in files]

        metadata = et.get_metadata_batch(f)

    for d in metadata:

        print(d["SourceFile"])
        #[(k,d[k]) for k in d.keys() if str(d[k]).lower().find('g') >= 0]

        conn = urllib3.connection_from_url(progress_url, maxsize=1,
                                        headers={'Content-Type': 'application/json'})

        myjson = {'folder': source_folder,
                'file': d.get("SourceFile", None),
                'focal_length': d.get(u'EXIF:FocalLengthIn35mmFormat', None),
                'apeture': d.get(u'EXIF:FNumber', None),
                'ISO': d.get(u"EXIF:ISO", None),
                'shutter': d.get(u'EXIF:ExposureTime', None),
               # 'raw_json': json.dumps(d)
                }

        # update status
        conn.urlopen('POST', '/update', body=json.dumps(myjson), headers={'connection': 'keep-alive', 'Content-Type': 'application/json'})
Developer: arrowed, Project: exifweb, Lines of code: 27, Source: exif_processor.py


Example 13: get_con_pool

def get_con_pool(host,
                 key_file=None,
                 cert_file=None,
                 ca_file=None,
                 socket_timeout=15.0,
                 max_pool_size=3,
                 verify_https=True):
    """
    Return a ConnectionPool instance of given host
    :param socket_timeout:
        socket timeout for each connection in seconds
    """
    kwargs = {
        "timeout": socket_timeout,
        "maxsize": max_pool_size,
        "block": True,
        }

    if key_file is not None and cert_file is not None:
        kwargs["key_file"] = key_file
        kwargs["cert_file"] = cert_file

    if urlparse(host).scheme == "https":
        kwargs["ssl_version"] = ssl.PROTOCOL_TLSv1
        if verify_https:
            kwargs["cert_reqs"] = "CERT_REQUIRED"
            kwargs["ca_certs"] = ca_file

    return connection_from_url(host, **kwargs)
Developer: UWIT-IAM, Project: iam-resttools, Lines of code: 29, Source: live.py
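To see what happens to the kwargs dictionary built in get_con_pool, the sketch below passes the same three non-TLS options directly and inspects the resulting pool. The localhost URL is a placeholder and no request is made. A plain float timeout is wrapped in a urllib3 Timeout object; maxsize limits how many connections the pool keeps for reuse, and block=True makes callers wait for a free connection instead of opening extra ones.

```python
from urllib3 import connection_from_url
from urllib3.util import Timeout

# Placeholder host; the values mirror the defaults of get_con_pool above.
pool = connection_from_url(
    "http://localhost:8081/",
    timeout=15.0,
    maxsize=3,
    block=True,
)

# The float was converted into a Timeout applying to both connect and read.
print(isinstance(pool.timeout, Timeout), pool.timeout.connect_timeout)
print(pool.block)
pool.close()
```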


Example 14: getcontent

def getcontent():
    pages.drop()
    for link in links.find():
        url = link['url']
        id = link['_id']

        try:
            http_pool = urllib3.connection_from_url(url)
            r = http_pool.urlopen('GET', url)
            html = r.data.decode('utf-8')

            #print(html)
            soup = BeautifulSoup(html, "html5lib")

            head = soup.find('head')
            body = soup.find('body')

            json_html = {
                "url_link": id,
                "url": url,
                "html": html,
                "head": head.encode(),
                "body": body.encode()
            }
            pages.insert_one(json_html)
        except:
            print("Unexpected error:", sys.exc_info()[0])

    #return number of pages retrieved
    return pages.count()
Developer: zedrem, Project: NLP_MiniProject, Lines of code: 30, Source: linkcrawler.py


Example 15: upload

def upload():
    upload_url = "http://127.0.0.1:8080/upload"
    url = urllib3.util.parse_url(upload_url)
    cb_url = url.request_uri
     
    if url.port is not None:
        server = "%s:%d"%(url.host, url.port)            
    else:
        server = url.host
     
    conn = urllib3.connection_from_url(server)
    headers = urllib3.make_headers(keep_alive=True)
    content = "hello world"
    response = conn.urlopen("POST", cb_url, body=content, headers=headers)
    if response.status != 200:
        print "eeeeeeeeeeee"
        sys.exit(1)
    else:
        print response.getheaders()
        print response.read()
        print response.data
        fileid = json.loads(response.data)["fileid"]
     
    path = "/download?fileid=%d"%fileid
    print "download path:", path
    response = conn.urlopen("GET", path, headers=headers)
    if response.status != 200:
        print "download fail"
        sys.exit(1)
    else:
        print response.data
Developer: richmonkey, Project: haystack, Lines of code: 31, Source: upload.py


Example 16: __init__

    def __init__(self, distro=None, release=None, cachedir=None, secure=True):
        if cachedir:
            self.cachedir = cachedir
        os.makedirs(self.cachedir, exist_ok=True)

        self._parts = {'region':self.region, 'distro':distro, 'release':release}
        base = distro
        if not distro.startswith('http'):
            base = self._urls[distro]
        elif not distro.endswith('/'):
            base = base+'/'
        self.baseurl = base%self._parts

        self._pool =  connection_from_url(self.baseurl)
        self._etag = {}
        self._content = {}

        release = self.get('Release')
        if secure:
            release_gpg = self.get('Release.gpg')
            gpg_verify(release, release_gpg)
        else:
            _log.warning('Skipping signature check of RELEASE')

        release = self.release = Release(release)

        self.codename = release['Codename']
        self.archs = set(release['Architectures'].split())
        self.components = set(release['Components'].split())

        info = proc_release(release)

        self._top = self.Manifest(self, info, '')
Developer: mdavidsaver, Project: qemustuff, Lines of code: 33, Source: archive.py


Example 17: openUrl

	def openUrl(self,url):
		conn = urllib3.connection_from_url(url)
		if self.USER is not None:
			base64string = base64.encodestring('{0}:{1}'.format(self.USER, self.PASSWORD)).replace('\n', '')
			return conn.get_url(url, headers={"Authorization": "Basic {0}".format(base64string)}).data
		else:
			return conn.get_url(url).data
Developer: backuitist, Project: jenkins-notifier, Lines of code: 7, Source: jenkins-notifier-python3.py


Example 18: __init__

 def __init__(self, APIKey, Secret,tovalue):
     self.APIKey = str(APIKey)
     self.Secret = str(Secret)
     self.toValue = tovalue
     self.nonce = int(time.time())
     self.http_pool = connection_from_url('https://btc-e.com')
     self.table = ConversionTable(self.getMarketsGraph())
Developer: sDessens, Project: Bitcoin-Exchange-Crawler, Lines of code: 7, Source: btce.py


Example 19: get_con_pool

def get_con_pool(host,
                 key_file=None,
                 cert_file=None,
                 socket_timeout=15.0,
                 max_pool_size=3,
                 verify_https=True):
    """
    Return a ConnectionPool instance of given host
    :param socket_timeout:
        socket timeout for each connection in seconds
    """
    kwargs = {
        "timeout": socket_timeout,
        "maxsize": max_pool_size,
        "block": True,
        }

    if key_file is not None and cert_file is not None:
        kwargs["key_file"] = key_file
        kwargs["cert_file"] = cert_file

    if urisplit(host).scheme == "https":
        kwargs["ssl_version"] = ssl.PROTOCOL_TLSv1
        if verify_https:
            kwargs["cert_reqs"] = "CERT_REQUIRED"
            kwargs["ca_certs"] = getattr(settings, "RESTCLIENTS_CA_BUNDLE",
                                         "/etc/ssl/certs/ca-bundle.crt")

    return connection_from_url(host, **kwargs)
Developer: uwwebservices, Project: uw-registry, Lines of code: 29, Source: url_lib_wrapper.py


Example 20: openUrl

 def openUrl(self, url):
     conn = urllib3.connection_from_url(url)
     if self.USER is not None:
         authHeaders = urllib3.util.make_headers(basic_auth="%s:%s" % (self.USER, self.PASSWORD))
         return conn.request("GET", url, headers=authHeaders).data
     else:
         return conn.request("GET", url).data
Developer: backuitist, Project: jenkins-notifier, Lines of code: 7, Source: jenkins-notifier.py



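Note: The urllib3.connection_from_url function examples in this article were compiled by 纯净天空 from source code and documentation hosting platforms such as GitHub and MSDocs. The code snippets were selected from open-source projects, and copyright of the source code belongs to the original authors. For distribution and use, refer to the License of the corresponding project. Do not reproduce without permission.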

