我们从Python开源项目中,提取了以下14个代码示例,用于说明如何使用pycurl.NOBODY。
def file(self, path, output, args={}, progress_callback=lambda *x: None): self.logger.debug('??????????') self.web_cache[path] = dict(args) url = urllib.parse.urljoin(self.file_url, urllib.parse.quote(path)) if len(args) > 0: url += '?' + urllib.parse.urlencode(args) self.logger.debug('HTTP ?????{}'.format(url)) self.curl.setopt(pycurl.URL, url) self.curl.setopt(pycurl.COOKIE, self.web_cookie) self.curl.setopt(pycurl.NOBODY, False) self.curl.setopt(pycurl.NOPROGRESS, False) self.curl.setopt(pycurl.WRITEDATA, output) self.curl.setopt(pycurl.HEADERFUNCTION, lambda *x: None) self.curl.setopt(pycurl.XFERINFOFUNCTION, progress_callback) self.curl.perform() status = self.curl.getinfo(pycurl.RESPONSE_CODE) if status != 200: raise ServerError(status)
def file_size(self, path, args={}): self.logger.debug('????????????') self.web_cache[path] = dict(args) url = urllib.parse.urljoin(self.file_url, urllib.parse.quote(path)) if len(args) > 0: url += '?' + urllib.parse.urlencode(args) self.logger.debug('HTTP ?????{}'.format(url)) self.curl.setopt(pycurl.URL, url) self.curl.setopt(pycurl.COOKIE, self.web_cookie) self.curl.setopt(pycurl.NOBODY, True) self.curl.setopt(pycurl.NOPROGRESS, True) self.curl.setopt(pycurl.WRITEDATA, io.BytesIO()) self.curl.setopt(pycurl.HEADERFUNCTION, lambda *x: None) self.curl.setopt(pycurl.XFERINFOFUNCTION, lambda *x: None) self.curl.perform() status = self.curl.getinfo(pycurl.RESPONSE_CODE) if status != 200: raise ServerError(status) return self.curl.getinfo(pycurl.CONTENT_LENGTH_DOWNLOAD)
def web_redirect(self, path, args={}): self.logger.debug('????????????') self.web_cache[path] = dict(args) url = urllib.parse.urljoin(self.web_url, urllib.parse.quote(path)) if len(args) > 0: url += '?' + urllib.parse.urlencode(args) self.logger.debug('HTTP ?????{}'.format(url)) headers = io.BytesIO() self.curl.setopt(pycurl.URL, url) self.curl.setopt(pycurl.COOKIE, self.web_cookie) self.curl.setopt(pycurl.NOBODY, False) self.curl.setopt(pycurl.NOPROGRESS, True) self.curl.setopt(pycurl.WRITEDATA, NoneIO()) self.curl.setopt(pycurl.HEADERFUNCTION, headers.write) self.curl.setopt(pycurl.XFERINFOFUNCTION, lambda *x: None) self.curl.perform() status = self.curl.getinfo(pycurl.RESPONSE_CODE) if status != 302: raise ServerError(status) for header_line in headers.getvalue().split(b'\r\n'): if header_line.startswith(b'Location:'): return header_line.split(b':', maxsplit=1)[1].strip().decode() return None
def api(self, args, encoding='utf-8', allow_return_none=False): self.logger.debug('???? API ??') if args.get('mode', '') == 'semester': semester = args.get('semester', '') if allow_return_none and self.api_cache == semester: self.logger.debug('????? {} ?? API ??'.format(semester)) return self.api_cache = semester query_args = dict() query_args.update(self.api_args) query_args.update(args) url = self.api_url + '?' + urllib.parse.urlencode(query_args) data = io.BytesIO() self.logger.debug('HTTP ?????{}'.format(url)) self.curl.setopt(pycurl.URL, url) self.curl.setopt(pycurl.COOKIE, self.api_cookie) self.curl.setopt(pycurl.NOBODY, False) self.curl.setopt(pycurl.NOPROGRESS, True) self.curl.setopt(pycurl.WRITEDATA, data) self.curl.setopt(pycurl.HEADERFUNCTION, lambda *x: None) self.curl.setopt(pycurl.XFERINFOFUNCTION, lambda *x: None) self.curl.perform() status = self.curl.getinfo(pycurl.RESPONSE_CODE) if status != 200: raise ServerError(status) try: value = data.getvalue() return json.loads(value.decode(encoding)) except json.decoder.JSONDecodeError: raise NotJSONError(value.decode(encoding))
def web(self, path, args={}, encoding=None, allow_return_none=False): self.logger.debug('????????') if allow_return_none: if path in self.web_cache and self.web_cache[path] == args: self.logger.debug('????? {} ????'.format(path)) self.logger.debug('???{}'.format(args)) return self.web_cache[path] = dict(args) url = urllib.parse.urljoin(self.web_url, urllib.parse.quote(path)) if len(args) > 0: url += '?' + urllib.parse.urlencode(args) self.logger.debug('HTTP ?????{}'.format(url)) data = io.BytesIO() self.curl.setopt(pycurl.URL, url) self.curl.setopt(pycurl.COOKIE, self.web_cookie) self.curl.setopt(pycurl.NOBODY, False) self.curl.setopt(pycurl.NOPROGRESS, True) self.curl.setopt(pycurl.WRITEDATA, data) self.curl.setopt(pycurl.HEADERFUNCTION, lambda *x: None) self.curl.setopt(pycurl.XFERINFOFUNCTION, lambda *x: None) self.curl.perform() status = self.curl.getinfo(pycurl.RESPONSE_CODE) if status != 200: raise ServerError(status) data.seek(io.SEEK_SET) return etree.parse(data, etree.HTMLParser( encoding=encoding, remove_comments=True))
def head(self): conn=pycurl.Curl() conn.setopt(pycurl.SSL_VERIFYPEER,False) conn.setopt(pycurl.SSL_VERIFYHOST,0) conn.setopt(pycurl.URL,self.completeUrl) conn.setopt(pycurl.NOBODY, True) # para hacer un pedido HEAD conn.setopt(pycurl.WRITEFUNCTION, self.header_callback) conn.perform() rp=Response() rp.parseResponse(self.__performHead) self.response=rp
def load(self, url, get={}, post={}, referer=True, cookies=True, just_header=False, multipart=False, decode=False): """ load and returns a given page """ self.setRequestContext(url, get, post, referer, cookies, multipart) self.header = "" self.c.setopt(pycurl.HTTPHEADER, self.headers) if just_header: self.c.setopt(pycurl.FOLLOWLOCATION, 0) self.c.setopt(pycurl.NOBODY, 1) self.c.perform() rep = self.header self.c.setopt(pycurl.FOLLOWLOCATION, 1) self.c.setopt(pycurl.NOBODY, 0) else: self.c.perform() rep = self.getResponse() self.c.setopt(pycurl.POSTFIELDS, "") self.lastEffectiveURL = self.c.getinfo(pycurl.EFFECTIVE_URL) self.code = self.verifyHeader() self.addCookies() if decode: rep = self.decodeResponse(rep) return rep
def _init_method(self): if self.m_request.m_method == "GET": self.m_handle.setopt(pycurl.HTTPGET, 1) elif self.m_request.m_method == "PUT": self.m_handle.setopt(pycurl.PUT, 1) elif self.m_request.m_method == "POST": if self.m_request.m_data: l_data = self.m_request.m_data self.m_handle.setopt(pycurl.POSTFIELDS, l_data) else: self.m_handle.setopt(pycurl.CUSTOMREQUEST, "POST") elif self.m_request.m_method == "HEAD": self.m_handle.setopt(pycurl.NOBODY, 1) elif self.m_request.m_method == "DELETE": self.m_handle.setopt(pycurl.CUSTOMREQUEST, "DELETE")
def to_pycurl_object(c, req): c.setopt(pycurl.MAXREDIRS, 5) c.setopt(pycurl.WRITEFUNCTION, req.body_callback) c.setopt(pycurl.HEADERFUNCTION, req.header_callback) c.setopt(pycurl.NOSIGNAL, 1) c.setopt(pycurl.SSL_VERIFYPEER, False) c.setopt(pycurl.SSL_VERIFYHOST, 0) c.setopt(pycurl.URL,req.completeUrl) if req.getConnTimeout(): c.setopt(pycurl.CONNECTTIMEOUT, req.getConnTimeout()) if req.getTotalTimeout(): c.setopt(pycurl.TIMEOUT, req.getTotalTimeout()) authMethod, userpass = req.getAuth() if authMethod or userpass: if authMethod == "basic": c.setopt(pycurl.HTTPAUTH, pycurl.HTTPAUTH_BASIC) elif authMethod == "ntlm": c.setopt(pycurl.HTTPAUTH, pycurl.HTTPAUTH_NTLM) elif authMethod == "digest": c.setopt(pycurl.HTTPAUTH, pycurl.HTTPAUTH_DIGEST) c.setopt(pycurl.USERPWD, userpass) c.setopt(pycurl.HTTPHEADER, req.getHeaders()) if req.method == "POST": c.setopt(pycurl.POSTFIELDS, req.postdata) if req.method != "GET" and req.method != "POST": c.setopt(pycurl.CUSTOMREQUEST, req.method) if req.method == "HEAD": c.setopt(pycurl.NOBODY, True) if req.followLocation: c.setopt(pycurl.FOLLOWLOCATION, 1) proxy = req.getProxy() if proxy != None: c.setopt(pycurl.PROXY, proxy) if req.proxytype=="SOCKS5": c.setopt(pycurl.PROXYTYPE,pycurl.PROXYTYPE_SOCKS5) elif req.proxytype=="SOCKS4": c.setopt(pycurl.PROXYTYPE,pycurl.PROXYTYPE_SOCKS4) req.delHeader("Proxy-Connection") return c
def process(self, pyfile): p_url = urlparse.urlparse(pyfile.url) netloc = p_url.netloc pyfile.name = parse_name(p_url.path.rpartition('/')[2]) if not "@" in netloc: #@TODO: Recheck in 0.4.10 if self.account: servers = [x['login'] for x in self.account.getAllAccounts()] else: servers = [] if netloc in servers: self.log_debug("Logging on to %s" % netloc) self.req.addAuth(self.account.get_login('password')) else: pwd = self.get_password() if ':' in pwd: self.log_debug("Logging on to %s" % netloc) self.req.addAuth(pwd) else: self.log_debug("Using anonymous logon") try: headers = self.load(pyfile.url, just_header=True) except pycurl.error, e: if "530" in e.args[1]: self.fail(_("Authorization required")) else: self.fail(_("Error %d: %s") % e.args) self.req.http.c.setopt(pycurl.NOBODY, 0) self.log_debug(self.req.http.header) if "content-length" in headers: pyfile.size = headers.get("content-length") self.download(pyfile.url) else: #: Naive ftp directory listing if re.search(r'^25\d.*?"', self.req.http.header, re.M): pyfile.url = pyfile.url.rstrip('/') pkgname = "/".join([pyfile.package().name, urlparse.urlparse(pyfile.url).path.rpartition('/')[2]]) pyfile.url += '/' self.req.http.c.setopt(48, 1) #: CURLOPT_DIRLISTONLY res = self.load(pyfile.url, decode=False) links = [pyfile.url + x for x in res.splitlines()] self.log_debug("LINKS", links) self.pyload.api.addPackage(pkgname, links) else: self.fail(_("Unexpected server response"))
def load(self, url, get={}, post={}, referer=True, cookies=True, just_header=False, multipart=False, decode=False): """ Load and returns a given page. """ self.set_request_context(url, get, post, referer, cookies, multipart) # TODO: use http/rfc message instead self.header = "" if "header" in self.options: # TODO # print("custom header not implemented") self.setopt(pycurl.HTTPHEADER, self.options['header']) if just_header: self.setopt(pycurl.FOLLOWLOCATION, 0) self.setopt(pycurl.NOBODY, 1) # TODO: nobody= no post? # overwrite HEAD request, we want a common request type if post: self.setopt(pycurl.CUSTOMREQUEST, 'POST') else: self.setopt(pycurl.CUSTOMREQUEST, 'GET') try: self.c.perform() rep = self.header finally: self.setopt(pycurl.FOLLOWLOCATION, 1) self.setopt(pycurl.NOBODY, 0) self.unsetopt(pycurl.CUSTOMREQUEST) else: self.c.perform() rep = self.get_response() self.setopt(pycurl.POSTFIELDS, '') self.last_url = safequote(url) self.last_effective_url = self.c.getinfo(pycurl.EFFECTIVE_URL) if self.last_effective_url: self.last_url = self.last_effective_url self.code = self.verify_header() if cookies: self.parse_cookies() if decode: rep = self.decode_response(rep) return rep