• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    迪恩网络公众号

Python urllib2.Request类代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Python中urllib2.Request的典型用法代码示例。如果您正苦于以下问题:Python Request类的具体用法?Python Request怎么用?Python Request使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。



在下文中一共展示了Request类的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: modified_run

    def modified_run(self):
        """Send a best-effort usage ping, then run the wrapped command.

        Builds an "event" hit from the platform name, the Python version
        tuple, ``__version__`` and ``command_subclass.action`` and POSTs it to
        the analytics collection endpoint. Reporting is strictly best-effort:
        any error while building or sending the hit is ignored so that it can
        never prevent ``orig_run`` from executing.
        """
        import sys

        try:
            try:
                # Python 2 module layout.
                from urllib2 import urlopen, Request
                from urllib import urlencode
            except ImportError:
                # Python 3 module layout.
                from urllib.request import urlopen, Request
                from urllib.parse import urlencode

            os_ver = platform.system()
            py_ver = "_".join(str(x) for x in sys.version_info)
            now_ver = __version__.replace(".", "_")

            code = "os:{0},py:{1},now:{2}".format(os_ver, py_ver, now_ver)
            action = command_subclass.action
            cid = getnode()
            payload = {"v": "1", "tid": "UA-61791314-1", "cid": str(cid), "t": "event", "ec": action, "ea": code}

            url = "http://www.google-analytics.com/collect"
            data = urlencode(payload).encode("utf-8")
            request = Request(url, data=data)
            request.get_method = lambda: "POST"
            # Bound the wait so an unreachable endpoint cannot stall the
            # command, and close the response instead of leaking the socket.
            urlopen(request, timeout=5).close()
        except Exception:
            # Deliberately swallowed: telemetry must never break the command.
            # ``Exception`` (rather than a bare except) still lets Ctrl-C and
            # SystemExit propagate.
            pass
        orig_run(self)
开发者ID:hugobowne,项目名称:noworkflow,代码行数:30,代码来源:setup.py


示例2: hit_endpoint

  def hit_endpoint(self, url, data_dict=None, http_method='GET'):
    """
    A reusable method that actually performs the request to the specified Atlas API endpoint.

    url         -- endpoint URL passed to Request.
    data_dict   -- optional mapping of form fields; it is copied before the
                   access token is added, so the caller's dict is left untouched.
    http_method -- value returned by the request's get_method() (default 'GET').

    Raises SystemExit (wrapping the URLError) when the request cannot be opened.

    NOTE(review): the response is assigned but not returned in the code
    visible here -- confirm whether the rest of the method was cut off.
    """
    verbose = self.verbose == 'true'

    if verbose:
      print("HIT_ENDPOINT")

    # A shared ``{}`` default would keep the token (and any earlier fields)
    # alive across calls, so always work on a private copy.
    data_dict = {} if data_dict is None else dict(data_dict)
    data_dict["access_token"] = self.access_token
    if verbose:
      print("  Added access_token to data_dict (inside hit_endpoint)")

    if verbose:
      print("  Constructing request URL")
    request = Request(url, urllib.urlencode(data_dict))

    if verbose:
      print("    Setting request http_method: %s" % http_method)
    # Request picks GET/POST from the presence of data; override it explicitly.
    request.get_method = lambda: http_method

    try:
      if verbose:
        print("  Opening Request URL: %s?%s" % (request.get_full_url(),request.get_data()))
      response = urlopen(request)
    except URLError as e:
      raise SystemExit(e)
开发者ID:bkyoung,项目名称:sysadmin-utils,代码行数:26,代码来源:atlas-uploader.py


示例3: report_now

 def report_now(self, registry=None, timestamp=None):
     """Send the current metrics to the configured server in one HTTP POST.

     Every entry of the registry dump becomes one line of the form
     ``<table> <key>=<value>,... <timestamp>``, where ``table`` is the metric
     key, prefixed with ``self.prefix`` when one is set. String values are
     double-quoted, with backslashes and double quotes escaped so that they
     cannot terminate the quoted value early.

     :param registry: registry to dump; defaults to ``self.registry``.
     :param timestamp: timestamp written on every line; defaults to the
         rounded value of ``self.clock.time()``.

     A ``URLError`` while posting is logged as a warning and not re-raised.
     """
     if self.autocreate_database and not self._did_create_database:
         self._create_database()
     timestamp = timestamp or int(round(self.clock.time()))
     metrics = (registry or self.registry).dump_metrics()

     def _field(value):
         # Quote (and escape) strings; everything else is written verbatim.
         if isinstance(value, str):
             escaped = value.replace("\\", "\\\\").replace('"', '\\"')
             return '"{}"'.format(escaped)
         return value

     lines = []
     for key, metric_values in metrics.items():
         table = "%s.%s" % (self.prefix, key) if self.prefix else key
         values = ",".join("%s=%s" % (k, _field(v))
                           for (k, v) in metric_values.items())
         lines.append("%s %s %s" % (table, values, timestamp))
     post_data = "\n".join(lines)
     path = "/write?db=%s&precision=s" % self.database
     url = "%s://%s:%s%s" % (self.protocol, self.server, self.port, path)
     request = Request(url, post_data.encode("utf-8"))
     if self.username:
         auth = _encode_username(self.username, self.password)
         request.add_header("Authorization", "Basic %s" % auth.decode('utf-8'))
     try:
         response = urlopen(request)
         try:
             response.read()
         finally:
             # Release the connection even if reading the body fails.
             response.close()
     except URLError as err:
         LOG.warning("Cannot write to %s: %s",
                     self.server, err.reason)
开发者ID:hajs,项目名称:pyformance,代码行数:29,代码来源:influx.py


示例4: test_http_doubleslash

    def test_http_doubleslash(self):
        # A redundant double slash anywhere in the path must not upset URL
        # parsing. Historically a "//" right after the host led to the host
        # being determined incorrectly.
        handler = urllib2.AbstractHTTPHandler()
        handler.parent = MockOpener()

        body = ""
        candidates = (
            "http://example.com/foo/bar/baz.html",
            "http://example.com//foo/bar/baz.html",
            "http://example.com/foo//bar/baz.html",
            "http://example.com/foo/bar//baz.html",
        )

        for candidate in candidates:
            request = Request(candidate, body)

            # Direct connection: the Host header must name the real host.
            direct = handler.do_request_(request)
            self.assertEqual(direct.unredirected_hdrs["Host"],"example.com")

            # Through a proxy: the Host header must still name the real host.
            request.set_proxy("someproxy:3128",None)
            proxied = handler.do_request_(request)
            self.assertEqual(proxied.unredirected_hdrs["Host"],"example.com")
开发者ID:005,项目名称:gevent,代码行数:25,代码来源:test_urllib2.py


示例5: _send_webhook_msg

    def _send_webhook_msg(self, ip, port, payload_str, url_path='',
                          content_len=-1, content_type='application/json',
                          get_method=None):
        """Send ``payload_str`` to ``http://ip:port/url_path`` and return the response.

        ``content_len`` of -1 means "use the real length of the encoded
        payload"; ``None`` (or an empty payload) omits the content-length
        header; any other value is sent as given. ``get_method``, when
        provided, replaces the request's ``get_method`` callable.
        """
        if payload_str:
            body = bytes(payload_str, encoding='utf-8')
            if content_len == -1:
                content_len = len(body)
        else:
            # Nothing to send: no body and no content-length header.
            body = None
            content_len = None

        request_headers = {'content-type': content_type}
        if content_len is not None:
            request_headers['content-length'] = str(content_len)

        target = 'http://{ip}:{port}/{path}'.format(ip=ip, port=port,
                                                    path=url_path)
        request = Request(target, data=body, headers=request_headers)
        if get_method is not None:
            request.get_method = get_method

        return urlopen(request)
开发者ID:MikeVister,项目名称:python-telegram-bot,代码行数:29,代码来源:test_updater.py


示例6: test_compare_triples

def test_compare_triples():
    # For every MIME type and URL, fetch the resource with that Accept header,
    # parse it next to the stored dump file and require that the fetched graph
    # contains no triples that are missing from the dump.
    # Under Python 2 `urlparse` is the module; otherwise it is the function.
    split_url = urlparse.urlparse if six.PY2 else urlparse

    for mime, fext in MIME_TYPES.items():
        dump_path = path.join(DUMP_DIR, path.basename(mime))

        for url in URLs:
            leaf = path.basename(split_url(url).path)
            fname = path.join(dump_path, '%s.%s' % (leaf, fext))

            request = Request(url)
            request.add_header('Accept', mime)
            response = urlopen(request)

            g_fdp.parse(data=response.read(), format=mime)
            g_dump.parse(fname, format=mime)

            _, first, second = graph_diff(g_fdp, g_dump)
            n_first = len(first)

            assert_equals(
               n_first, 0, '{} triple(s) different from reference:\n\n{}===\n{}\n'.format(
                  n_first, first.serialize(format='turtle'), second.serialize(format='turtle')))
开发者ID:NLeSC,项目名称:ODEX-FAIRDataPoint,代码行数:27,代码来源:test_fdp.py


示例7: get

	def get(self, bugid):
		"""Get an ElementTree representation of a bug.

		Authenticates first unless already authenticated or ``skip_auth`` is
		set, then requests the 'show' URL with the bug id as a query parameter.

		@param bugid: bug id
		@type  bugid: int

		@rtype: ElementTree
		@return: the parsed document, or None when the first <bug> element
		         carries an 'error' attribute.
		"""
		if not self.authenticated and not self.skip_auth:
			self.auth()

		qparams = config.params['show'].copy()
		qparams['id'] = bugid

		req_params = urlencode(qparams, True)
		req_url = urljoin(self.base,  config.urls['show'])
		req_url += '?' + req_params
		req = Request(req_url, None, config.headers)
		if self.httpuser and self.httppassword:
			# b64encode never inserts line breaks; encodestring wraps its
			# output, which corrupts the header for long credentials.
			base64string = base64.b64encode('%s:%s' % (self.httpuser, self.httppassword))
			req.add_header("Authorization", "Basic %s" % base64string)
		resp = self.opener.open(req)

		fd = StringIO(resp.read())
		# workaround for ill-defined XML templates in bugzilla 2.20.2
		parser = ForcedEncodingXMLTreeBuilder(encoding = 'utf-8')
		etree = ElementTree.parse(fd, parser)
		bug = etree.find('.//bug')
		# An Element without children is falsy, so compare against None
		# explicitly instead of relying on truthiness.
		if bug is not None and 'error' in bug.attrib:
			return None
		return etree
开发者ID:PabloCastellano,项目名称:pybugz,代码行数:32,代码来源:bugzilla.py


示例8: fetch

def fetch(url, auth=None):
    """Fetch URL, optional with HTTP Basic Authentication.

    Anything that does not start with ``http://`` or ``https://`` is treated
    as a local file path and read as UTF-8 text (undecodable bytes replaced).

    :param url: HTTP(S) URL or local file path.
    :param auth: optional ``"user:password"`` credentials for Basic auth.
    :returns: the document as text.
    :raises AcrylamidException: when the file cannot be read, the server
        reports an HTTP error, or the status code is neither 200 nor 401.
    """

    if not url.startswith(('http://', 'https://')):
        try:
            with io.open(url, 'r', encoding='utf-8', errors='replace') as fp:
                return fp.read()
        except (IOError, OSError) as e:
            # IOError is distinct from OSError on Python 2.
            raise AcrylamidException(e.args[0])

    req = Request(url)
    if auth:
        # b64encode works on bytes and returns bytes; convert on both sides
        # so the header value is plain text.
        raw = auth if isinstance(auth, bytes) else auth.encode('utf-8')
        req.add_header('Authorization', 'Basic ' + b64encode(raw).decode('ascii'))

    try:
        r = urlopen(req)
    except HTTPError as e:
        raise AcrylamidException(e.msg)

    try:
        status = r.getcode()
        if status == 200:
            # Stop at ';' or whitespace so trailing parameters are not taken
            # as part of the charset name; drop optional quotes.
            found = re.search(r'charset=([^;\s]+)', r.headers.get('Content-Type', ''))
            enc = found.group(1).strip('"\'') if found else 'utf-8'
            return u'' + r.read().decode(enc)
    finally:
        r.close()

    if status == 401:
        # NOTE(review): the stock urlopen raises HTTPError for 401, so this
        # branch is only reached with an opener that returns such responses.
        user = input('Username: ')
        passwd = getpass.getpass()
        # Return the retried result instead of discarding it.
        return fetch(url, user + ':' + passwd)

    raise AcrylamidException('invalid status code %i, aborting.' % status)
开发者ID:DebVortex,项目名称:acrylamid,代码行数:31,代码来源:imprt.py


示例9: __init__

    def __init__(self, url, method="GET", headers=None, data=None,
                 downloadTo=None, closeConnection=False, proxy=None,
                 redirectedFrom=None, unredirectedHeaders=None, **kw):
        """
        Initialise the request through both of its bases and set per-request state.

        url, data, headers -- forwarded to urllib2_Request.__init__ (url as str).
        method             -- forwarded to Message.__init__.
        downloadTo         -- stored unchanged on the instance.
        closeConnection    -- stored as a bool; only the literal True enables it.
        proxy              -- passed to self.setProxy().
        redirectedFrom     -- used as the default ``origin_req_host``.
        unredirectedHeaders -- copied into a case-insensitive dict.
        kw                 -- may override ``origin_req_host`` and ``unverifiable``.

        NOTE(review): self.redirectedFrom is always set to an empty tuple, so
        the ``redirectedFrom`` argument only influences origin_req_host --
        confirm this is intended.
        """
        headers = headers or dict()
        # The urllib2 base must be initialised first: self.headers, which the
        # next call passes on, is not assigned anywhere else in this method.
        urllib2_Request.__init__(
                self, str(url), data=data, headers=headers,
                origin_req_host=kw.get("origin_req_host", redirectedFrom),
                unverifiable=kw.get("unverifiable", False),
            )
        Message.__init__(self, url, method, self.headers)
        # self._url is not assigned in this method; presumably Message.__init__
        # sets it -- TODO confirm.
        self.host = self._url.host
        self.port = self._url.port
        self.setProxy(proxy)

        # Sanity check that the headers ended up case-insensitive.
        assert isinstance(self.headers, util.InsensitiveDict)
        unredirectedHeaders = unredirectedHeaders or dict()
        self.unredirectedHeaders = util.InsensitiveDict(unredirectedHeaders)

        # Identity comparison: truthy values other than True do not count.
        self.closeConnection = closeConnection is True

        self.downloadTo = downloadTo
        self.redirectedTo = None
        self.redirectedFrom = tuple()
        # Fresh Deferred per request; presumably fired with the response --
        # verify against the code that completes the request.
        self.response = defer.Deferred()
开发者ID:alexstaytuned,项目名称:tx-pendrell,代码行数:26,代码来源:messages.py


示例10: makeRequest

    def makeRequest(self, method, path, params=None):
        """Call ``self.base_url + path`` and return the decoded JSON reply.

        The API key and OAuth token are added to the parameters. For ``GET``
        the parameters go into the query string; for ``DELETE``, ``POST`` and
        ``PUT`` they are sent url-encoded in the request body.

        :param method: HTTP method name.
        :param path: path appended to ``self.base_url``.
        :param params: optional dict of extra parameters (left unmodified).
        :returns: the parsed JSON document, or ``None`` after an ``HTTPError``
            (the error and its body are printed).
        """
        # Copy first so the caller's dict never picks up the credentials.
        params = dict(params or {})
        params['key'] = self.api_key
        params['token'] = self.oauth_token

        url = self.base_url + path
        data = None

        if method == 'GET':
            url += '?' + urlencode(params)
        elif method in ('DELETE', 'POST', 'PUT'):
            data = urlencode(params).encode('utf-8')

        request = Request(url)
        if method in ('DELETE', 'PUT'):
            # A request with a body defaults to POST; force the real verb.
            request.get_method = lambda: method

        try:
            # data=None is the same as omitting the argument.
            response = urlopen(request, data=data)
        except HTTPError as e:
            print(e)
            print(e.read())
            return None

        try:
            return json.loads(response.read().decode('utf-8'))
        finally:
            response.close()
开发者ID:bdoms,项目名称:trello,代码行数:31,代码来源:__init__.py


示例11: auth

	def auth(self):
		"""Authenticate a session.

		Returns immediately when try_auth() reports an existing login.
		Otherwise prompts for any missing username/password, posts them to
		the 'auth' URL and treats a Set-Cookie header in the reply as success.

		@return: True after a successful login (None when already logged in).
		@raise RuntimeError: when the reply carries no Set-Cookie header.
		"""
		if self.try_auth():
			return

		# prompt for username if we were not supplied with it
		if not self.user:
			self.log('No username given.')
			self.user = self.get_input('Username: ')

		# prompt for password if we were not supplied with it
		if not self.password:
			self.log('No password given.')
			self.password = getpass.getpass()

		# perform login
		qparams = config.params['auth'].copy()
		qparams['Bugzilla_login'] = self.user
		qparams['Bugzilla_password'] = self.password

		req_url = urljoin(self.base, config.urls['auth'])
		req = Request(req_url, urlencode(qparams), config.headers)
		if self.httpuser and self.httppassword:
			# b64encode never inserts line breaks; encodestring wraps its
			# output, which corrupts the header for long credentials.
			base64string = base64.b64encode('%s:%s' % (self.httpuser, self.httppassword))
			req.add_header("Authorization", "Basic %s" % base64string)
		resp = self.opener.open(req)
		if 'Set-Cookie' not in resp.info():
			raise RuntimeError("Failed to login")

		self.authenticated = True
		if not self.forget:
			self.cookiejar.save()
			# Owner-only access: the jar holds the session cookie.
			os.chmod(self.cookiejar.filename, 0o700)
		return True
开发者ID:dhirajkhatiwada1,项目名称:uludag,代码行数:35,代码来源:bugzilla.py


示例12: try_auth

	def try_auth(self):
		"""Check whether the user is already authenticated.

		Loads the saved cookies (unless ``forget`` is set), requests the
		'auth' URL and looks for the login page title in the reply.

		@return: True when already authenticated, False when the reply is
		         the login page.
		"""
		if self.authenticated:
			return True

		# try seeing if we really need to request login
		if not self.forget:
			try:
				self.cookiejar.load()
			except IOError:
				# No readable cookie file yet: carry on without cookies.
				pass

		req_url = urljoin(self.base, config.urls['auth'])
		req_url += '?GoAheadAndLogIn=1'
		req = Request(req_url, None, config.headers)
		if self.httpuser and self.httppassword:
			# b64encode never inserts line breaks; encodestring wraps its
			# output, which corrupts the header for long credentials.
			base64string = base64.b64encode('%s:%s' % (self.httpuser, self.httppassword))
			req.add_header("Authorization", "Basic %s" % base64string)
		resp = self.opener.open(req)
		re_request_login = re.compile(r'<title>.*Log in to Bugzilla</title>')
		if re_request_login.search(resp.read()):
			# The server answered with the login page.
			return False

		self.log('Already logged in.')
		self.authenticated = True
		return True
开发者ID:dhirajkhatiwada1,项目名称:uludag,代码行数:25,代码来源:bugzilla.py


示例13: download_checked

    def download_checked(cls, url, target_file, expected_digest):
        """Download ``url`` to ``target_file`` unless a valid copy already exists.

        An existing file whose MD5 digest equals ``expected_digest`` is kept
        as is; any other existing file is deleted and fetched again. After
        the download the digest is checked and a mismatching file is removed.

        :param url: location to download from.
        :param target_file: path the content is written to.
        :param expected_digest: MD5 digest the file must have.
        :raises PBundlerException: when the download fails or the digest of
            the downloaded file does not match.
        """
        import shutil  # local import: only this method needs it

        if os.path.exists(target_file):
            # file already exists, see if we can use it.
            if PBFile.md5_digest(target_file) == expected_digest:
                return
            os.unlink(target_file)

        user_agent = ("pbunder/%s " % (cls.my_version) +
                      "(http://github.com/zeha/pbundler/issues)")

        try:
            req = Request(url)
            req.add_header("User-Agent", user_agent)
            req.add_header("Accept", "*/*")
            # Connect before creating the file so a failed request does not
            # leave an empty target behind.
            sock = urlopen(req)
            try:
                with open(target_file, 'wb') as f:
                    # Stream in chunks instead of buffering the whole body.
                    shutil.copyfileobj(sock, f)
            finally:
                sock.close()
        except Exception as ex:
            # Do not leave a partially written file on disk.
            if os.path.exists(target_file):
                try:
                    os.unlink(target_file)
                except OSError:
                    # Cleanup is best-effort; report the download error.
                    pass
            raise PBundlerException("Downloading %s failed (%s)" % (url, ex))

        local_digest = PBFile.md5_digest(target_file)
        if local_digest != expected_digest:
            os.unlink(target_file)
            msg = ("Downloading %s failed (MD5 Digest %s did not match expected %s)" %
                   (url, local_digest, expected_digest))
            raise PBundlerException(msg)
开发者ID:cirnatdan,项目名称:pbundler,代码行数:35,代码来源:util.py


示例14: _download_as_child

def _download_as_child(url, if_modified_since):
	"""Download ``url``, copy the body to file descriptor 1 and exit.

	Only http:, https: and ftp: URLs are accepted; anything else raises a
	plain Exception. ``if_modified_since``, when set, is sent as an
	If-Modified-Since header for http: URLs only. The process always ends
	through sys.exit with RESULT_OK, RESULT_NOT_MODIFIED (HTTP 304) or
	RESULT_FAILED (after printing the error to stderr).
	"""
	from httplib import HTTPException
	from urllib2 import urlopen, Request, HTTPError, URLError
	try:
		if not url.startswith(('http:', 'https:', 'ftp:')):
			raise Exception(_('Unsupported URL protocol in: %s') % url)

		request = Request(url)
		if url.startswith('http:') and if_modified_since:
			request.add_header('If-Modified-Since', if_modified_since)
		stream = urlopen(request)

		# Read from the underlying socket object rather than the response.
		try:
			raw = stream.fp._sock
		except AttributeError:
			raw = stream.fp.fp._sock	# Python 2.5 on FreeBSD

		chunk = raw.recv(256)
		while chunk:
			os.write(1, chunk)
			chunk = raw.recv(256)

		sys.exit(RESULT_OK)
	except (HTTPError, URLError, HTTPException) as ex:
		not_modified = isinstance(ex, HTTPError) and ex.code == 304
		if not_modified:
			sys.exit(RESULT_NOT_MODIFIED)
		print >>sys.stderr, "Error downloading '" + url + "': " + (str(ex) or str(ex.__class__.__name__))
		sys.exit(RESULT_FAILED)
开发者ID:gvsurenderreddy,项目名称:zeroinstall,代码行数:28,代码来源:_download_child.py


示例15: parseFeed

    def parseFeed(cls, url, lastModified=None, etag=None):
        '''
        Fetch and parse the feed at url.

        lastModified and etag, when given, are sent as conditional request
        headers. Returns a tuple of (feed object, last-modified header, etag
        header), or None when the response carries a status code other than
        200. Raises ValueError when no registered feed type accepts the
        parsed document.
        '''
        request = Request(normalize_url(url))
        conditional = (('if-modified-since', lastModified),
                       ('if-none-match', etag))
        for header, value in conditional:
            if value:
                request.add_header(header, value)

        try:
            response = urlopen(request, None, 10)
        except HTTPError as error:
            # e.g. HTTP 304 (not modified) arrives as an exception; it is
            # handled like a response below.
            response = error

        # A URL pointing at a local file yields an empty code.
        if response.code and response.code != 200:
            return None

        document = etree.parse(response)
        matching = next(
            (candidate for candidate in cls._feedTypes
             if candidate.accept(document)),
            None)
        if not matching:
            raise ValueError('Cannot handle ' + document.getroot().tag)

        last_modified = response.headers.get('last-modified')
        entity_tag = response.headers.get('etag')
        return (matching(url, document), last_modified, entity_tag)
开发者ID:goncha,项目名称:feed2mobi,代码行数:31,代码来源:feed.py


示例16: openURL

def openURL(url_base, data, method='Get', cookies=None, username=None, password=None):
    ''' function to open urls - wrapper around urllib2.urlopen but with additional checks for OGC service exceptions and url formatting, also handles cookies and simple user password authentication

    url_base -- service URL; surrounding whitespace is removed and a trailing
                '?' or '&' is appended when missing.
    data     -- request payload: sent as the body for method 'Post',
                otherwise appended to the URL.
    cookies  -- optional value for the Cookie header.
    username, password -- when both are given, HTTP Basic authentication is
                configured for url_base.

    Raises ServiceException (with the response body) for HTTP 400 and 401;
    any other HTTPError is re-raised unchanged.

    NOTE(review): the opened response is not returned in the code visible
    here -- confirm whether the rest of the function was cut off.
    '''
    # str.strip() returns a new string, so the result must be kept.
    url_base = url_base.strip()
    # endswith() also copes with an empty string, unlike indexing [-1].
    if not url_base.endswith(('?', '&')):
        url_base += '&' if '?' in url_base else '?'

    if username and password:
        # Provide login information in order to use the WMS server
        # Create an OpenerDirector with support for Basic HTTP
        # Authentication...
        passman = HTTPPasswordMgrWithDefaultRealm()
        passman.add_password(None, url_base, username, password)
        auth_handler = HTTPBasicAuthHandler(passman)
        opener = urllib2.build_opener(auth_handler)
        openit = opener.open
    else:
        openit = urlopen

    try:
        if method == 'Post':
            req = Request(url_base, data)
        else:
            req = Request(url_base + data)
        if cookies is not None:
            req.add_header('Cookie', cookies)
        u = openit(req)
    except HTTPError as e:
        # Some servers may set the http header to 400 if returning an OGC
        # service exception or 401 if unauthorised.
        if e.code in (400, 401):
            raise ServiceException(e.read())
        # Bare raise keeps the original traceback.
        raise
开发者ID:sabman,项目名称:OWSLib,代码行数:35,代码来源:util.py


示例17: get_page_urls

def get_page_urls(url, user_agent=None):
    """Return the set of URLs that REGEX_URLS finds in the page at ``url``.

    ``user_agent``, when given, is sent as the User-Agent header. Only the
    first group of every regex match is kept.
    """
    request = Request(url)
    if user_agent:
        request.add_header('User-Agent', user_agent)
    page = str(urlopen(request).read())
    matches = REGEX_URLS.findall(page)
    return set(match[0] for match in matches)
开发者ID:afraca,项目名称:cryptophp,代码行数:7,代码来源:check_url.py


示例18: howmuch

def howmuch(loc_param, date_param, apt, pyung):
    """Query the service at the module-level ``url`` and filter its items.

    :param loc_param: value sent as the ``LAWD_CD`` query parameter.
    :param date_param: value sent as the ``DEAL_YMD`` query parameter.
    :param apt: substring that field 4 of an item must contain.
    :param pyung: value that field 7 of an item must equal.
    :returns: a 4-tuple ``(html_rows, data, error, elapsed_ms)``.
        ``html_rows`` is the concatenated ``<tr>`` markup of the matching
        items, ``data`` a list of ``(field2, field5, field6, field1)``
        tuples, ``error`` is ``None`` on success or the last traceback line
        when the request failed, and ``elapsed_ms`` the request time in ms.
    """
    rows = []
    data = []
    start = time.time()

    request = Request(url+'&LAWD_CD='+loc_param+'&DEAL_YMD='+date_param)
    request.get_method = lambda: 'GET'
    try:
        res_body = urlopen(request).read()
    except Exception:
        # Same arity as the success path, so callers can always unpack four
        # values (the first three are unchanged from before).
        error = traceback.format_exc().splitlines()[-1]
        return '', [], error, int((time.time()-start)*1000)
    end = time.time()

    soup = BeautifulSoup(res_body, 'html.parser')
    for item in soup.findAll('item'):
        text = item.text.encode('utf-8')
        parsed = re.sub('<.*?>', '|', text).split('|')
        try:
            if parsed[7] == pyung and apt in parsed[4]:
                row = ('<tr><td>'+parsed[2]+'/'+parsed[5]+'/'+parsed[6]+'</td><td>'
                       +parsed[3]+' '+parsed[4]+'</td><td>'+parsed[7]+'</td><td>'
                       +parsed[9]+'</td><td>'+parsed[1]+'</td></tr>')
                rows.append(row)
                data.append( (parsed[2],parsed[5],parsed[6], parsed[1]) )
        except IndexError:
            # Item with fewer fields than expected: skip it.
            continue

    # Join once instead of growing a string inside the loop.
    return ''.join(rows), data, None, int((end-start)*1000)
开发者ID:alotofthings,项目名称:realestate-bot,代码行数:28,代码来源:r.py


示例19: namedcmd

	def namedcmd(self, cmd):
		"""Run command stored in Bugzilla by name.

		Authenticates first unless already authenticated or ``skip_auth`` is
		set, then requests the 'list' URL with the quoted command name.

		@param cmd: name of the stored command
		@return: Result from the stored command.
		@rtype: list of dicts
		"""

		if not self.authenticated and not self.skip_auth:
			self.auth()

		qparams = config.params['namedcmd'].copy()
		# Is there a better way of getting a command with a space in its name
		# to be encoded as foo%20bar instead of foo+bar or foo%2520bar?
		qparams['namedcmd'] = quote(cmd)
		req_params = urlencode(qparams, True)
		# urlencode escapes the '%' produced by quote(); undo that.
		req_params = req_params.replace('%25','%')

		req_url = urljoin(self.base, config.urls['list'])
		req_url += '?' + req_params
		req = Request(req_url, None, config.headers)
		# NOTE(review): this used to read self.user / self.hpassword, while
		# get() builds the same header from self.httpuser / self.httppassword;
		# aligned with get() -- confirm no separate hpassword attribute exists.
		if self.httpuser and self.httppassword:
			# b64encode never inserts line breaks; encodestring wraps its
			# output, which corrupts the header for long credentials.
			base64string = base64.b64encode('%s:%s' % (self.httpuser, self.httppassword))
			req.add_header("Authorization", "Basic %s" % base64string)
		resp = self.opener.open(req)

		return self.extractResults(resp)
开发者ID:PabloCastellano,项目名称:pybugz,代码行数:26,代码来源:bugzilla.py


示例20: _uploadBatch

    def _uploadBatch(self, batch):
        """
        Uploads a batch to the server

        Generates the metadata for ``batch`` through the agent and POSTs it,
        url-encoded under the ``xml`` key, to ``<self._url>/ImportXML`` using
        HTTP Basic authentication.

        Raises Exception('upload failed') after logging status and body when
        the server answers with an HTTP error.

        NOTE(review): ``tgen`` and ``result`` are not used in the code
        visible here -- confirm whether the rest of the method was cut off.
        """

        url = "%s/ImportXML" % self._url

        self._logger.debug('getting a batch')

        tstart = time.time()

        self._logger.info('Generating metadata')
        data = self._agent._getMetadata(batch, logger=self._logger)
        self._logger.info('Metadata ready ')

        postData = {
            'xml': data
            }

        # Time spent generating the metadata.
        tgen = time.time() - tstart

        req = Request(url)
        # b64encode never inserts line breaks; encodestring wraps long
        # output and strip() only removes the final newline.
        cred = base64.b64encode(
            '%s:%s' % (self._username, self._password))

        req.add_header("Authorization", "Basic %s" % cred)

        try:
            result = urlopen(req, data=urlencode(postData))
        except HTTPError as e:
            self._logger.exception("Status %s: \n %s" % (e.code, e.read()))
            raise Exception('upload failed')
开发者ID:aninhalacerda,项目名称:indico,代码行数:34,代码来源:agent.py



注:本文中的urllib2.Request类示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python urlparse.urljoin函数代码示例发布时间:2022-05-27
下一篇:
Python urllib2.OpenerDirector类代码示例发布时间:2022-05-27
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap