我们从Python开源项目中,提取了以下7个代码示例,用于说明如何使用pycurl.HTTPGET。
def scan(self, time): scan_time,mz,scan_name,st,scan_mode = min(self.scans(), key=lambda si: abs(time - si[0])) self.crl.setopt(pycurl.HTTPGET, True) self.crl.setopt(pycurl.URL, str(scan_name + '.txt')) response = cStringIO.StringIO() self.crl.setopt(pycurl.WRITEFUNCTION, response.write) for i in range(5): #print 'scan %d' % i self.crl.perform() if response.getvalue(): break scan = response.getvalue().splitlines() # how to get charge for an mzURL scan? return mzScan([tuple(float(v) for v in s.split()) for s in scan], scan_time, mode=scan_mode, mz=mz)
def xic(self, start_time, stop_time, start_mz, stop_mz, filter=None): xic_url = str(self.data_file + ('/ric/%s-%s/%s-%s.txt' % (start_time, stop_time, start_mz, stop_mz))) self.crl.setopt(pycurl.HTTPGET, True) self.crl.setopt(pycurl.URL, xic_url) response = cStringIO.StringIO() self.crl.setopt(pycurl.WRITEFUNCTION, response.write) for i in range(5): #print 'xic %d' % i self.crl.perform() if response.getvalue(): break scan = response.getvalue().splitlines() return [tuple(float(v) for v in s.split()) for s in scan]
def _init_method(self): if self.m_request.m_method == "GET": self.m_handle.setopt(pycurl.HTTPGET, 1) elif self.m_request.m_method == "PUT": self.m_handle.setopt(pycurl.PUT, 1) elif self.m_request.m_method == "POST": if self.m_request.m_data: l_data = self.m_request.m_data self.m_handle.setopt(pycurl.POSTFIELDS, l_data) else: self.m_handle.setopt(pycurl.CUSTOMREQUEST, "POST") elif self.m_request.m_method == "HEAD": self.m_handle.setopt(pycurl.NOBODY, 1) elif self.m_request.m_method == "DELETE": self.m_handle.setopt(pycurl.CUSTOMREQUEST, "DELETE")
def curl_get(self, url, refUrl=None): buf = cStringIO.StringIO() curl = pycurl.Curl() curl.setopt(curl.URL, url) curl.setopt(curl.WRITEFUNCTION, buf.write) curl.setopt(pycurl.SSL_VERIFYPEER, 0) #curl.setopt(pycurl.SSL_VERIFYHOST, 0) #curl.setopt(pycurl.HEADERFUNCTION, self.headerCookie) curl.setopt(pycurl.VERBOSE, 0) curl.setopt(pycurl.USERAGENT, 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10.11; rv:46.0) Gecko/20100101 Firefox/46.0') #curl.setopt(pycurl.HTTPGET,1) #curl.setopt(pycurl.COOKIE, Cookie) #curl.setopt(pycurl.POSTFIELDS, 'j_username={ngnms_user}&j_password={ngnms_password}'.format(**self.ngnms_login)) curl.setopt(pycurl.COOKIEJAR, '/htdocs/logs/py_cookie.txt') curl.setopt(pycurl.COOKIEFILE, '/htdocs/logs/py_cookie.txt') if refUrl: curl.setopt(pycurl.REFERER, refUrl) #curl.setopt(c.CONNECTTIMEOUT, 5) #curl.setopt(c.TIMEOUT, 8) curl.perform() backinfo = '' if curl.getinfo(pycurl.RESPONSE_CODE) == 200: backinfo = buf.getvalue() curl.close() return backinfo
def get(self, url="", params=None): "Ship a GET request for a specified URL, capture the response." if params: url += "?" + urllib_parse.urlencode(params) self.set_option(pycurl.HTTPGET, 1) return self.__request(url)
def request(self, method, url, headers, post_data=None): s = util.StringIO.StringIO() rheaders = util.StringIO.StringIO() curl = pycurl.Curl() proxy = self._get_proxy(url) if proxy: if proxy.hostname: curl.setopt(pycurl.PROXY, proxy.hostname) if proxy.port: curl.setopt(pycurl.PROXYPORT, proxy.port) if proxy.username or proxy.password: curl.setopt( pycurl.PROXYUSERPWD, "%s:%s" % (proxy.username, proxy.password)) if method == 'get': curl.setopt(pycurl.HTTPGET, 1) elif method == 'post': curl.setopt(pycurl.POST, 1) curl.setopt(pycurl.POSTFIELDS, post_data) else: curl.setopt(pycurl.CUSTOMREQUEST, method.upper()) # pycurl doesn't like unicode URLs curl.setopt(pycurl.URL, util.utf8(url)) curl.setopt(pycurl.WRITEFUNCTION, s.write) curl.setopt(pycurl.HEADERFUNCTION, rheaders.write) curl.setopt(pycurl.NOSIGNAL, 1) curl.setopt(pycurl.CONNECTTIMEOUT, 30) curl.setopt(pycurl.TIMEOUT, 80) curl.setopt(pycurl.HTTPHEADER, ['%s: %s' % (k, v) for k, v in headers.items()]) if self._verify_ssl_certs: curl.setopt(pycurl.CAINFO, os.path.join( os.path.dirname(__file__), 'data/ca-certificates.crt')) else: curl.setopt(pycurl.SSL_VERIFYHOST, False) try: curl.perform() except pycurl.error as e: self._handle_request_error(e) rbody = s.getvalue() rcode = curl.getinfo(pycurl.RESPONSE_CODE) return rbody, rcode, self.parse_headers(rheaders.getvalue())
def scans(self): '''Gets the scan_info and caches it, to save extra trips to the server''' if not self._scans: self.crl.setopt(pycurl.HTTPGET, True) self.crl.setopt(pycurl.URL, str(self.data_file + '/scan_info')) # the scan_name is the URL of that scan, which is just the time scan_url = self.data_file + '/scans/%s' response = cStringIO.StringIO() self.crl.setopt(pycurl.WRITEFUNCTION, response.write) #self.crl.perform() for i in range(5): #print 'scans %d' % i self.crl.perform() if response.getvalue(): break scans = response.getvalue().splitlines() self._scans = [] for scan_line in scans: time,mz,scan_type,scan_mode = scan_line.split() self._scans.append((float(time), float(mz), scan_url % time, scan_type.upper(), scan_mode.lower())) self._scans.sort() return self._scans