我们从Python开源项目中,提取了以下20个代码示例,用于说明如何使用pycurl.LOW_SPEED_TIME。
def test_post(self): curl = CurlStub(b"result") result = fetch("http://example.com", post=True, curl=curl) self.assertEqual(result, b"result") self.assertEqual(curl.options, {pycurl.URL: b"http://example.com", pycurl.FOLLOWLOCATION: 1, pycurl.MAXREDIRS: 5, pycurl.CONNECTTIMEOUT: 30, pycurl.LOW_SPEED_LIMIT: 1, pycurl.LOW_SPEED_TIME: 600, pycurl.NOSIGNAL: 1, pycurl.WRITEFUNCTION: Any(), pycurl.POST: True, pycurl.DNS_CACHE_TIMEOUT: 0, pycurl.ENCODING: b"gzip,deflate"})
def test_post_data(self): curl = CurlStub(b"result") result = fetch("http://example.com", post=True, data="data", curl=curl) self.assertEqual(result, b"result") self.assertEqual(curl.options[pycurl.READFUNCTION](), b"data") self.assertEqual(curl.options, {pycurl.URL: b"http://example.com", pycurl.FOLLOWLOCATION: 1, pycurl.MAXREDIRS: 5, pycurl.CONNECTTIMEOUT: 30, pycurl.LOW_SPEED_LIMIT: 1, pycurl.LOW_SPEED_TIME: 600, pycurl.NOSIGNAL: 1, pycurl.WRITEFUNCTION: Any(), pycurl.POST: True, pycurl.POSTFIELDSIZE: 4, pycurl.READFUNCTION: Any(), pycurl.DNS_CACHE_TIMEOUT: 0, pycurl.ENCODING: b"gzip,deflate"})
def test_cainfo(self): curl = CurlStub(b"result") result = fetch("https://example.com", cainfo="cainfo", curl=curl) self.assertEqual(result, b"result") self.assertEqual(curl.options, {pycurl.URL: b"https://example.com", pycurl.FOLLOWLOCATION: 1, pycurl.MAXREDIRS: 5, pycurl.CONNECTTIMEOUT: 30, pycurl.LOW_SPEED_LIMIT: 1, pycurl.LOW_SPEED_TIME: 600, pycurl.NOSIGNAL: 1, pycurl.WRITEFUNCTION: Any(), pycurl.CAINFO: b"cainfo", pycurl.DNS_CACHE_TIMEOUT: 0, pycurl.ENCODING: b"gzip,deflate"})
def test_headers(self): curl = CurlStub(b"result") result = fetch("http://example.com", headers={"a": "1", "b": "2"}, curl=curl) self.assertEqual(result, b"result") self.assertEqual(curl.options, {pycurl.URL: b"http://example.com", pycurl.FOLLOWLOCATION: 1, pycurl.MAXREDIRS: 5, pycurl.CONNECTTIMEOUT: 30, pycurl.LOW_SPEED_LIMIT: 1, pycurl.LOW_SPEED_TIME: 600, pycurl.NOSIGNAL: 1, pycurl.WRITEFUNCTION: Any(), pycurl.HTTPHEADER: ["a: 1", "b: 2"], pycurl.DNS_CACHE_TIMEOUT: 0, pycurl.ENCODING: b"gzip,deflate"})
def test_pycurl_insecure(self): curl = CurlStub(b"result") result = fetch("http://example.com/get-ca-cert", curl=curl, insecure=True) self.assertEqual(result, b"result") self.assertEqual(curl.options, {pycurl.URL: b"http://example.com/get-ca-cert", pycurl.FOLLOWLOCATION: 1, pycurl.MAXREDIRS: 5, pycurl.CONNECTTIMEOUT: 30, pycurl.LOW_SPEED_LIMIT: 1, pycurl.LOW_SPEED_TIME: 600, pycurl.NOSIGNAL: 1, pycurl.WRITEFUNCTION: Any(), pycurl.SSL_VERIFYPEER: False, pycurl.DNS_CACHE_TIMEOUT: 0, pycurl.ENCODING: b"gzip,deflate"})
def initHandle(self): """ sets common options to curl handle """ self.c.setopt(pycurl.FOLLOWLOCATION, 1) self.c.setopt(pycurl.MAXREDIRS, 5) self.c.setopt(pycurl.CONNECTTIMEOUT, 30) self.c.setopt(pycurl.NOSIGNAL, 1) self.c.setopt(pycurl.NOPROGRESS, 1) if hasattr(pycurl, "AUTOREFERER"): self.c.setopt(pycurl.AUTOREFERER, 1) self.c.setopt(pycurl.SSL_VERIFYPEER, 0) self.c.setopt(pycurl.LOW_SPEED_TIME, 30) self.c.setopt(pycurl.LOW_SPEED_LIMIT, 5) #self.c.setopt(pycurl.VERBOSE, 1) self.c.setopt(pycurl.USERAGENT, "Mozilla/5.0 (Windows NT 6.1; Win64; x64;en; rv:5.0) Gecko/20110619 Firefox/5.0") if pycurl.version_info()[7]: self.c.setopt(pycurl.ENCODING, "gzip, deflate") self.c.setopt(pycurl.HTTPHEADER, ["Accept: */*", "Accept-Language: en-US,en", "Accept-Charset: ISO-8859-1,utf-8;q=0.7,*;q=0.7", "Connection: keep-alive", "Keep-Alive: 300", "Expect:"])
def submit(self, captcha, captchaType="file", match=None): req = get_request() #: Raise timeout threshold req.c.setopt(pycurl.LOW_SPEED_TIME, 80) try: res = self.load(self.SUBMIT_URL, post={'vendor_key': self.PYLOAD_KEY, 'key': self.config.get('passkey'), 'gen_task_id': "1", 'file': (pycurl.FORM_FILE, captcha)}, req=req) finally: req.close() data = dict(x.split(' ', 1) for x in res.splitlines()) if not data or "Value" not in data: raise BypassCaptchaException(res) result = data['Value'] ticket = data['TaskId'] self.log_debug("Result %s : %s" % (ticket, result)) return ticket, result
def _process_captcha(self, task): task.data['ticket'] = ticket = uuid.uuid4() result = None with open(task.captchaFile, 'rb') as f: data = f.read() req = get_request() #: Raise timeout threshold req.c.setopt(pycurl.LOW_SPEED_TIME, 80) try: result = self.load(self.API_URL, post={'action': "upload", 'key': self.config.get('passkey'), 'file': base64.b64encode(data), 'gen_task_id': ticket}, req=req) finally: req.close() self.log_debug("Result %s : %s" % (ticket, result)) task.setResult(result)
def test_basic(self): curl = CurlStub(b"result") result = fetch("http://example.com", curl=curl) self.assertEqual(result, b"result") self.assertEqual(curl.options, {pycurl.URL: b"http://example.com", pycurl.FOLLOWLOCATION: 1, pycurl.MAXREDIRS: 5, pycurl.CONNECTTIMEOUT: 30, pycurl.LOW_SPEED_LIMIT: 1, pycurl.LOW_SPEED_TIME: 600, pycurl.NOSIGNAL: 1, pycurl.WRITEFUNCTION: Any(), pycurl.DNS_CACHE_TIMEOUT: 0, pycurl.ENCODING: b"gzip,deflate"})
def test_create_curl(self): curls = [] def pycurl_Curl(): curl = CurlStub(b"result") curls.append(curl) return curl Curl = pycurl.Curl try: pycurl.Curl = pycurl_Curl result = fetch("http://example.com") curl = curls[0] self.assertEqual(result, b"result") self.assertEqual(curl.options, {pycurl.URL: b"http://example.com", pycurl.FOLLOWLOCATION: 1, pycurl.MAXREDIRS: 5, pycurl.CONNECTTIMEOUT: 30, pycurl.LOW_SPEED_LIMIT: 1, pycurl.LOW_SPEED_TIME: 600, pycurl.NOSIGNAL: 1, pycurl.WRITEFUNCTION: Any(), pycurl.DNS_CACHE_TIMEOUT: 0, pycurl.ENCODING: b"gzip,deflate"}) finally: pycurl.Curl = Curl
def setInterface(self, options): interface, proxy, ipv6 = options["interface"], options["proxies"], options["ipv6"] if interface and interface.lower() != "none": self.c.setopt(pycurl.INTERFACE, str(interface)) if proxy: if proxy["type"] == "socks4": self.c.setopt(pycurl.PROXYTYPE, pycurl.PROXYTYPE_SOCKS4) elif proxy["type"] == "socks5": self.c.setopt(pycurl.PROXYTYPE, pycurl.PROXYTYPE_SOCKS5) else: self.c.setopt(pycurl.PROXYTYPE, pycurl.PROXYTYPE_HTTP) self.c.setopt(pycurl.PROXY, str(proxy["address"])) self.c.setopt(pycurl.PROXYPORT, proxy["port"]) if proxy["username"]: self.c.setopt(pycurl.PROXYUSERPWD, str("%s:%s" % (proxy["username"], proxy["password"]))) if ipv6: self.c.setopt(pycurl.IPRESOLVE, pycurl.IPRESOLVE_WHATEVER) else: self.c.setopt(pycurl.IPRESOLVE, pycurl.IPRESOLVE_V4) if "auth" in options: self.c.setopt(pycurl.USERPWD, str(options["auth"])) if "timeout" in options: self.c.setopt(pycurl.LOW_SPEED_TIME, options["timeout"])
def submit(self, captcha, captchaType="file", match=None): req = get_request() #: Raise timeout threshold req.c.setopt(pycurl.LOW_SPEED_TIME, 80) try: #@NOTE: Workaround multipart-post bug in HTTPRequest.py if re.match("^\w*$", self.config.get('password')): multipart = True data = (pycurl.FORM_FILE, captcha) else: multipart = False with open(captcha, 'rb') as f: data = f.read() data = base64.b64encode(data) res = self.load(self.SUBMIT_URL, post={'action': "UPLOADCAPTCHA", 'username': self.config.get('username'), 'password': self.config.get('password'), 'file': data}, multipart=multipart, req=req) finally: req.close() if res.startswith("ERROR"): raise ImageTyperzException(res) else: data = res.split('|') if len(data) == 2: ticket, result = data else: raise ImageTyperzException("Unknown response: %s" % res) return ticket, result
def init_handle(self): """ Sets common options to curl handle. """ self.setopt(pycurl.FOLLOWLOCATION, 1) self.setopt(pycurl.MAXREDIRS, 5) self.setopt(pycurl.CONNECTTIMEOUT, 30) self.setopt(pycurl.NOSIGNAL, 1) self.setopt(pycurl.NOPROGRESS, 1) if hasattr(pycurl, "AUTOREFERER"): self.setopt(pycurl.AUTOREFERER, 1) self.setopt(pycurl.SSL_VERIFYPEER, 0) # Interval for low speed, detects connection loss, but can abort dl if # hoster stalls the download self.setopt(pycurl.LOW_SPEED_TIME, 45) self.setopt(pycurl.LOW_SPEED_LIMIT, 5) # do not save the cookies self.setopt(pycurl.COOKIEFILE, '') self.setopt(pycurl.COOKIEJAR, '') # self.setopt(pycurl.VERBOSE, 1) self.setopt( pycurl.USERAGENT, 'Mozilla/5.0 (Windows NT 10.0; Win64; rv:53.0) ' 'Gecko/20100101 Firefox/53.0') if pycurl.version_info()[7]: self.setopt(pycurl.ENCODING, 'gzip,deflate') self.headers.update( {'Accept': "*/*", 'Accept-Language': "en-US,en", 'Accept-Charset': "ISO-8859-1,utf-8;q=0.7,*;q=0.7", 'Connection': "keep-alive", 'Keep-Alive': "300", 'Expect': ""})
def set_interface(self, options): interface, proxy, ipv6 = options[ 'interface'], options['proxies'], options['ipv6'] if interface and interface.lower() != "none": self.setopt(pycurl.INTERFACE, interface) if proxy: if proxy['type'] == "socks4": self.setopt(pycurl.PROXYTYPE, pycurl.PROXYTYPE_SOCKS4) elif proxy['type'] == "socks5": self.setopt(pycurl.PROXYTYPE, pycurl.PROXYTYPE_SOCKS5) else: self.setopt(pycurl.PROXYTYPE, pycurl.PROXYTYPE_HTTP) self.setopt(pycurl.PROXY, proxy['host']) self.setopt(pycurl.PROXYPORT, proxy['port']) if proxy['username']: userpwd = "{0}:{1}".format( proxy['username'], proxy['password']) self.setopt(pycurl.PROXYUSERPWD, userpwd) if ipv6: self.setopt(pycurl.IPRESOLVE, pycurl.IPRESOLVE_WHATEVER) else: self.setopt(pycurl.IPRESOLVE, pycurl.IPRESOLVE_V4) if "timeout" in options: self.setopt(pycurl.LOW_SPEED_TIME, options['timeout']) if "auth" in options: self.setopt(pycurl.USERPWD, self.options['auth'])
def version_update(self): if not obplayer.Config.setting('sync_url'): return obplayer.Log.log('sending player version to server: ' + obplayer.Config.version, 'sync') postfields = {} postfields['id'] = obplayer.Config.setting('sync_device_id') postfields['pw'] = obplayer.Config.setting('sync_device_password') postfields['version'] = obplayer.Config.version postfields['longitude'] = obplayer.Config.setting('location_longitude') postfields['latitude'] = obplayer.Config.setting('location_latitude') curl = pycurl.Curl() enc_postfields = urllib.urlencode(postfields) curl.setopt(pycurl.NOSIGNAL, 1) curl.setopt(pycurl.USERAGENT, 'OpenBroadcaster Player') curl.setopt(pycurl.URL, obplayer.Config.setting('sync_url') + '?action=version') curl.setopt(pycurl.HEADER, False) curl.setopt(pycurl.POST, True) curl.setopt(pycurl.POSTFIELDS, enc_postfields) curl.setopt(pycurl.LOW_SPEED_LIMIT, 10) curl.setopt(pycurl.LOW_SPEED_TIME, 60) curl.setopt(pycurl.NOPROGRESS, 0) curl.setopt(pycurl.PROGRESSFUNCTION, self.curl_progress) class CurlResponse: def __init__(self): self.buffer = u'' def __call__(self, data): self.buffer += data.decode('utf-8') curl_response = CurlResponse() curl.setopt(pycurl.WRITEFUNCTION, curl_response) try: curl.perform() except: obplayer.Log.log("exception in VersionUpdate thread", 'error') obplayer.Log.log(traceback.format_exc(), 'error') curl.close() if curl_response.buffer: version = json.loads(curl_response.buffer) obplayer.Log.log("server version reported as " + str(version), 'sync') if not self.check_min_version(version): obplayer.Log.log("minimum server version " + str(MIN_SERVER_VERSION) + " is required. Please update server software before continuing", 'error') else: obplayer.Log.log("server did not report a version number", 'warning')
def now_playing_update_thread(self, playlist_id, playlist_end, media_id, media_end, show_name): if not obplayer.Config.setting('sync_url'): return postfields = {} postfields['id'] = obplayer.Config.setting('sync_device_id') postfields['pw'] = obplayer.Config.setting('sync_device_password') postfields['playlist_id'] = playlist_id postfields['media_id'] = media_id postfields['show_name'] = show_name if playlist_end != '': postfields['playlist_end'] = int(round(playlist_end)) else: postfields['playlist_end'] = '' if media_end != '': postfields['media_end'] = int(round(media_end)) else: postfields['media_end'] = '' curl = pycurl.Curl() enc_postfields = urllib.urlencode(postfields) curl.setopt(pycurl.NOSIGNAL, 1) curl.setopt(pycurl.USERAGENT, 'OpenBroadcaster Player') curl.setopt(pycurl.URL, obplayer.Config.setting('sync_url') + '?action=now_playing') curl.setopt(pycurl.HEADER, False) curl.setopt(pycurl.POST, True) curl.setopt(pycurl.POSTFIELDS, enc_postfields) #curl.setopt(pycurl.FOLLOWLOCATION, 1) curl.setopt(pycurl.LOW_SPEED_LIMIT, 10) curl.setopt(pycurl.LOW_SPEED_TIME, 60) curl.setopt(pycurl.NOPROGRESS, 0) curl.setopt(pycurl.PROGRESSFUNCTION, self.curl_progress) try: curl.perform() except: obplayer.Log.log("exception in NowPlayingUpdate thread", 'error') obplayer.Log.log(traceback.format_exc(), 'error') curl.close() # # Request sync data from web application. # This is used by sync (with request_type='schedule') and sync_priority_broadcasts (with request_type='emerg'). # Function outputs XML response from server. #
def sync_request(self, request_type='', data=False): sync_url = obplayer.Config.setting('sync_url') if not sync_url: obplayer.Log.log("sync url is blank, skipping sync request", 'sync') return '' curl = pycurl.Curl() postfields = {} postfields['id'] = obplayer.Config.setting('sync_device_id') postfields['pw'] = obplayer.Config.setting('sync_device_password') postfields['hbuffer'] = obplayer.Config.setting('sync_buffer') if data: postfields['data'] = data enc_postfields = urllib.urlencode(postfields) curl.setopt(pycurl.NOSIGNAL, 1) curl.setopt(pycurl.USERAGENT, 'OpenBroadcaster Player') curl.setopt(pycurl.URL, sync_url + '?action=' + request_type) curl.setopt(pycurl.HEADER, False) curl.setopt(pycurl.POST, True) curl.setopt(pycurl.POSTFIELDS, enc_postfields) # some options so that it'll abort the transfer if the speed is too low (i.e., network problem) # low speed abort set to 0.01Kbytes/s for 60 seconds). curl.setopt(pycurl.LOW_SPEED_LIMIT, 10) curl.setopt(pycurl.LOW_SPEED_TIME, 60) curl.setopt(pycurl.NOPROGRESS, 0) curl.setopt(pycurl.PROGRESSFUNCTION, self.curl_progress) class CurlResponse: def __init__(self): self.buffer = u'' def __call__(self, data): self.buffer += data.decode('utf-8') curl_response = CurlResponse() curl.setopt(pycurl.WRITEFUNCTION, curl_response) try: curl.perform() #except pycurl.error as error: # (errno, errstr) = error # obplayer.Log.log('network error: ' + errstr, 'error') except: obplayer.Log.log("exception in sync " + request_type + " thread", 'error') obplayer.Log.log(traceback.format_exc(), 'error') curl.close() return curl_response.buffer # # Fetch media from web application. Saves under media directory. # media_id : id of the media we want # filename : filename to save under. #