Python pycurl 模块,LOW_SPEED_TIME 实例源码

我们从Python开源项目中,提取了以下20个代码示例,用于说明如何使用pycurl.LOW_SPEED_TIME

项目:landscape-client    作者:CanonicalLtd    | 项目源码 | 文件源码
def test_post(self):
        curl = CurlStub(b"result")
        result = fetch("http://example.com", post=True, curl=curl)
        self.assertEqual(result, b"result")
        self.assertEqual(curl.options,
                         {pycurl.URL: b"http://example.com",
                          pycurl.FOLLOWLOCATION: 1,
                          pycurl.MAXREDIRS: 5,
                          pycurl.CONNECTTIMEOUT: 30,
                          pycurl.LOW_SPEED_LIMIT: 1,
                          pycurl.LOW_SPEED_TIME: 600,
                          pycurl.NOSIGNAL: 1,
                          pycurl.WRITEFUNCTION: Any(),
                          pycurl.POST: True,
                          pycurl.DNS_CACHE_TIMEOUT: 0,
                          pycurl.ENCODING: b"gzip,deflate"})
项目:landscape-client    作者:CanonicalLtd    | 项目源码 | 文件源码
def test_post_data(self):
        curl = CurlStub(b"result")
        result = fetch("http://example.com", post=True, data="data", curl=curl)
        self.assertEqual(result, b"result")
        self.assertEqual(curl.options[pycurl.READFUNCTION](), b"data")
        self.assertEqual(curl.options,
                         {pycurl.URL: b"http://example.com",
                          pycurl.FOLLOWLOCATION: 1,
                          pycurl.MAXREDIRS: 5,
                          pycurl.CONNECTTIMEOUT: 30,
                          pycurl.LOW_SPEED_LIMIT: 1,
                          pycurl.LOW_SPEED_TIME: 600,
                          pycurl.NOSIGNAL: 1,
                          pycurl.WRITEFUNCTION: Any(),
                          pycurl.POST: True,
                          pycurl.POSTFIELDSIZE: 4,
                          pycurl.READFUNCTION: Any(),
                          pycurl.DNS_CACHE_TIMEOUT: 0,
                          pycurl.ENCODING: b"gzip,deflate"})
项目:landscape-client    作者:CanonicalLtd    | 项目源码 | 文件源码
def test_cainfo(self):
        curl = CurlStub(b"result")
        result = fetch("https://example.com", cainfo="cainfo", curl=curl)
        self.assertEqual(result, b"result")
        self.assertEqual(curl.options,
                         {pycurl.URL: b"https://example.com",
                          pycurl.FOLLOWLOCATION: 1,
                          pycurl.MAXREDIRS: 5,
                          pycurl.CONNECTTIMEOUT: 30,
                          pycurl.LOW_SPEED_LIMIT: 1,
                          pycurl.LOW_SPEED_TIME: 600,
                          pycurl.NOSIGNAL: 1,
                          pycurl.WRITEFUNCTION: Any(),
                          pycurl.CAINFO: b"cainfo",
                          pycurl.DNS_CACHE_TIMEOUT: 0,
                          pycurl.ENCODING: b"gzip,deflate"})
项目:landscape-client    作者:CanonicalLtd    | 项目源码 | 文件源码
def test_headers(self):
        curl = CurlStub(b"result")
        result = fetch("http://example.com",
                       headers={"a": "1", "b": "2"}, curl=curl)
        self.assertEqual(result, b"result")
        self.assertEqual(curl.options,
                         {pycurl.URL: b"http://example.com",
                          pycurl.FOLLOWLOCATION: 1,
                          pycurl.MAXREDIRS: 5,
                          pycurl.CONNECTTIMEOUT: 30,
                          pycurl.LOW_SPEED_LIMIT: 1,
                          pycurl.LOW_SPEED_TIME: 600,
                          pycurl.NOSIGNAL: 1,
                          pycurl.WRITEFUNCTION: Any(),
                          pycurl.HTTPHEADER: ["a: 1", "b: 2"],
                          pycurl.DNS_CACHE_TIMEOUT: 0,
                          pycurl.ENCODING: b"gzip,deflate"})
项目:landscape-client    作者:CanonicalLtd    | 项目源码 | 文件源码
def test_pycurl_insecure(self):
        curl = CurlStub(b"result")
        result = fetch("http://example.com/get-ca-cert", curl=curl,
                       insecure=True)
        self.assertEqual(result, b"result")
        self.assertEqual(curl.options,
                         {pycurl.URL: b"http://example.com/get-ca-cert",
                          pycurl.FOLLOWLOCATION: 1,
                          pycurl.MAXREDIRS: 5,
                          pycurl.CONNECTTIMEOUT: 30,
                          pycurl.LOW_SPEED_LIMIT: 1,
                          pycurl.LOW_SPEED_TIME: 600,
                          pycurl.NOSIGNAL: 1,
                          pycurl.WRITEFUNCTION: Any(),
                          pycurl.SSL_VERIFYPEER: False,
                          pycurl.DNS_CACHE_TIMEOUT: 0,
                          pycurl.ENCODING: b"gzip,deflate"})
项目:download-manager    作者:thispc    | 项目源码 | 文件源码
def initHandle(self):
        """ sets common options to curl handle """
        self.c.setopt(pycurl.FOLLOWLOCATION, 1)
        self.c.setopt(pycurl.MAXREDIRS, 5)
        self.c.setopt(pycurl.CONNECTTIMEOUT, 30)
        self.c.setopt(pycurl.NOSIGNAL, 1)
        self.c.setopt(pycurl.NOPROGRESS, 1)
        if hasattr(pycurl, "AUTOREFERER"):
            self.c.setopt(pycurl.AUTOREFERER, 1)
        self.c.setopt(pycurl.SSL_VERIFYPEER, 0)
        self.c.setopt(pycurl.LOW_SPEED_TIME, 30)
        self.c.setopt(pycurl.LOW_SPEED_LIMIT, 5)

        #self.c.setopt(pycurl.VERBOSE, 1)

        self.c.setopt(pycurl.USERAGENT,
            "Mozilla/5.0 (Windows NT 6.1; Win64; x64;en; rv:5.0) Gecko/20110619 Firefox/5.0")
        if pycurl.version_info()[7]:
            self.c.setopt(pycurl.ENCODING, "gzip, deflate")
        self.c.setopt(pycurl.HTTPHEADER, ["Accept: */*",
                                          "Accept-Language: en-US,en",
                                          "Accept-Charset: ISO-8859-1,utf-8;q=0.7,*;q=0.7",
                                          "Connection: keep-alive",
                                          "Keep-Alive: 300",
                                          "Expect:"])
项目:download-manager    作者:thispc    | 项目源码 | 文件源码
def submit(self, captcha, captchaType="file", match=None):
        req = get_request()

        #: Raise timeout threshold
        req.c.setopt(pycurl.LOW_SPEED_TIME, 80)

        try:
            res = self.load(self.SUBMIT_URL,
                            post={'vendor_key': self.PYLOAD_KEY,
                                  'key': self.config.get('passkey'),
                                  'gen_task_id': "1",
                                  'file': (pycurl.FORM_FILE, captcha)},
                            req=req)
        finally:
            req.close()

        data = dict(x.split(' ', 1) for x in res.splitlines())
        if not data or "Value" not in data:
            raise BypassCaptchaException(res)

        result = data['Value']
        ticket = data['TaskId']
        self.log_debug("Result %s : %s" % (ticket, result))

        return ticket, result
项目:download-manager    作者:thispc    | 项目源码 | 文件源码
def _process_captcha(self, task):
        task.data['ticket'] = ticket = uuid.uuid4()
        result = None

        with open(task.captchaFile, 'rb') as f:
            data = f.read()

        req = get_request()
        #: Raise timeout threshold
        req.c.setopt(pycurl.LOW_SPEED_TIME, 80)

        try:
            result = self.load(self.API_URL,
                               post={'action': "upload",
                                     'key': self.config.get('passkey'),
                                     'file': base64.b64encode(data),
                                     'gen_task_id': ticket},
                               req=req)
        finally:
            req.close()

        self.log_debug("Result %s : %s" % (ticket, result))
        task.setResult(result)
项目:pyload-plugins    作者:pyload    | 项目源码 | 文件源码
def submit(self, captcha, captchaType="file", match=None):
        req = get_request()

        #: Raise timeout threshold
        req.c.setopt(pycurl.LOW_SPEED_TIME, 80)

        try:
            res = self.load(self.SUBMIT_URL,
                            post={'vendor_key': self.PYLOAD_KEY,
                                  'key': self.config.get('passkey'),
                                  'gen_task_id': "1",
                                  'file': (pycurl.FORM_FILE, captcha)},
                            req=req)
        finally:
            req.close()

        data = dict(x.split(' ', 1) for x in res.splitlines())
        if not data or "Value" not in data:
            raise BypassCaptchaException(res)

        result = data['Value']
        ticket = data['TaskId']
        self.log_debug("Result %s : %s" % (ticket, result))

        return ticket, result
项目:pyload-plugins    作者:pyload    | 项目源码 | 文件源码
def _process_captcha(self, task):
        task.data['ticket'] = ticket = uuid.uuid4()
        result = None

        with open(task.captchaFile, 'rb') as f:
            data = f.read()

        req = get_request()
        #: Raise timeout threshold
        req.c.setopt(pycurl.LOW_SPEED_TIME, 80)

        try:
            result = self.load(self.API_URL,
                               post={'action': "upload",
                                     'key': self.config.get('passkey'),
                                     'file': base64.b64encode(data),
                                     'gen_task_id': ticket},
                               req=req)
        finally:
            req.close()

        self.log_debug("Result %s : %s" % (ticket, result))
        task.setResult(result)
项目:landscape-client    作者:CanonicalLtd    | 项目源码 | 文件源码
def test_basic(self):
        curl = CurlStub(b"result")
        result = fetch("http://example.com", curl=curl)
        self.assertEqual(result, b"result")
        self.assertEqual(curl.options,
                         {pycurl.URL: b"http://example.com",
                          pycurl.FOLLOWLOCATION: 1,
                          pycurl.MAXREDIRS: 5,
                          pycurl.CONNECTTIMEOUT: 30,
                          pycurl.LOW_SPEED_LIMIT: 1,
                          pycurl.LOW_SPEED_TIME: 600,
                          pycurl.NOSIGNAL: 1,
                          pycurl.WRITEFUNCTION: Any(),
                          pycurl.DNS_CACHE_TIMEOUT: 0,
                          pycurl.ENCODING: b"gzip,deflate"})
项目:landscape-client    作者:CanonicalLtd    | 项目源码 | 文件源码
def test_create_curl(self):
        curls = []

        def pycurl_Curl():
            curl = CurlStub(b"result")
            curls.append(curl)
            return curl
        Curl = pycurl.Curl
        try:
            pycurl.Curl = pycurl_Curl
            result = fetch("http://example.com")
            curl = curls[0]
            self.assertEqual(result, b"result")
            self.assertEqual(curl.options,
                             {pycurl.URL: b"http://example.com",
                              pycurl.FOLLOWLOCATION: 1,
                              pycurl.MAXREDIRS: 5,
                              pycurl.CONNECTTIMEOUT: 30,
                              pycurl.LOW_SPEED_LIMIT: 1,
                              pycurl.LOW_SPEED_TIME: 600,
                              pycurl.NOSIGNAL: 1,
                              pycurl.WRITEFUNCTION: Any(),
                              pycurl.DNS_CACHE_TIMEOUT: 0,
                              pycurl.ENCODING: b"gzip,deflate"})
        finally:
            pycurl.Curl = Curl
项目:download-manager    作者:thispc    | 项目源码 | 文件源码
def setInterface(self, options):

        interface, proxy, ipv6 = options["interface"], options["proxies"], options["ipv6"]

        if interface and interface.lower() != "none":
            self.c.setopt(pycurl.INTERFACE, str(interface))

        if proxy:
            if proxy["type"] == "socks4":
                self.c.setopt(pycurl.PROXYTYPE, pycurl.PROXYTYPE_SOCKS4)
            elif proxy["type"] == "socks5":
                self.c.setopt(pycurl.PROXYTYPE, pycurl.PROXYTYPE_SOCKS5)
            else:
                self.c.setopt(pycurl.PROXYTYPE, pycurl.PROXYTYPE_HTTP)

            self.c.setopt(pycurl.PROXY, str(proxy["address"]))
            self.c.setopt(pycurl.PROXYPORT, proxy["port"])

            if proxy["username"]:
                self.c.setopt(pycurl.PROXYUSERPWD, str("%s:%s" % (proxy["username"], proxy["password"])))

        if ipv6:
            self.c.setopt(pycurl.IPRESOLVE, pycurl.IPRESOLVE_WHATEVER)
        else:
            self.c.setopt(pycurl.IPRESOLVE, pycurl.IPRESOLVE_V4)

        if "auth" in options:
            self.c.setopt(pycurl.USERPWD, str(options["auth"]))

        if "timeout" in options:
            self.c.setopt(pycurl.LOW_SPEED_TIME, options["timeout"])
项目:download-manager    作者:thispc    | 项目源码 | 文件源码
def submit(self, captcha, captchaType="file", match=None):
        req = get_request()
        #: Raise timeout threshold
        req.c.setopt(pycurl.LOW_SPEED_TIME, 80)

        try:
            #@NOTE: Workaround multipart-post bug in HTTPRequest.py
            if re.match("^\w*$", self.config.get('password')):
                multipart = True
                data = (pycurl.FORM_FILE, captcha)
            else:
                multipart = False
                with open(captcha, 'rb') as f:
                    data = f.read()
                data = base64.b64encode(data)

            res = self.load(self.SUBMIT_URL,
                            post={'action': "UPLOADCAPTCHA",
                                  'username': self.config.get('username'),
                                  'password': self.config.get('password'), 'file': data},
                            multipart=multipart,
                            req=req)
        finally:
            req.close()

        if res.startswith("ERROR"):
            raise ImageTyperzException(res)
        else:
            data = res.split('|')
            if len(data) == 2:
                ticket, result = data
            else:
                raise ImageTyperzException("Unknown response: %s" % res)

        return ticket, result
项目:pyload-requests    作者:pyload    | 项目源码 | 文件源码
def init_handle(self):
        """
        Sets common options to curl handle.
        """
        self.setopt(pycurl.FOLLOWLOCATION, 1)
        self.setopt(pycurl.MAXREDIRS, 5)
        self.setopt(pycurl.CONNECTTIMEOUT, 30)
        self.setopt(pycurl.NOSIGNAL, 1)
        self.setopt(pycurl.NOPROGRESS, 1)
        if hasattr(pycurl, "AUTOREFERER"):
            self.setopt(pycurl.AUTOREFERER, 1)
        self.setopt(pycurl.SSL_VERIFYPEER, 0)
        # Interval for low speed, detects connection loss, but can abort dl if
        # hoster stalls the download
        self.setopt(pycurl.LOW_SPEED_TIME, 45)
        self.setopt(pycurl.LOW_SPEED_LIMIT, 5)

        # do not save the cookies
        self.setopt(pycurl.COOKIEFILE, '')
        self.setopt(pycurl.COOKIEJAR, '')

        # self.setopt(pycurl.VERBOSE, 1)

        self.setopt(
            pycurl.USERAGENT,
            'Mozilla/5.0 (Windows NT 10.0; Win64; rv:53.0) '
            'Gecko/20100101 Firefox/53.0')
        if pycurl.version_info()[7]:
            self.setopt(pycurl.ENCODING, 'gzip,deflate')

        self.headers.update(
            {'Accept': "*/*",
             'Accept-Language': "en-US,en",
             'Accept-Charset': "ISO-8859-1,utf-8;q=0.7,*;q=0.7",
             'Connection': "keep-alive",
             'Keep-Alive': "300",
             'Expect': ""})
项目:pyload-requests    作者:pyload    | 项目源码 | 文件源码
def set_interface(self, options):
        interface, proxy, ipv6 = options[
            'interface'], options['proxies'], options['ipv6']

        if interface and interface.lower() != "none":
            self.setopt(pycurl.INTERFACE, interface)

        if proxy:
            if proxy['type'] == "socks4":
                self.setopt(pycurl.PROXYTYPE, pycurl.PROXYTYPE_SOCKS4)
            elif proxy['type'] == "socks5":
                self.setopt(pycurl.PROXYTYPE, pycurl.PROXYTYPE_SOCKS5)
            else:
                self.setopt(pycurl.PROXYTYPE, pycurl.PROXYTYPE_HTTP)

            self.setopt(pycurl.PROXY, proxy['host'])
            self.setopt(pycurl.PROXYPORT, proxy['port'])

            if proxy['username']:
                userpwd = "{0}:{1}".format(
                    proxy['username'], proxy['password'])
                self.setopt(pycurl.PROXYUSERPWD, userpwd)

        if ipv6:
            self.setopt(pycurl.IPRESOLVE, pycurl.IPRESOLVE_WHATEVER)
        else:
            self.setopt(pycurl.IPRESOLVE, pycurl.IPRESOLVE_V4)

        if "timeout" in options:
            self.setopt(pycurl.LOW_SPEED_TIME, options['timeout'])

        if "auth" in options:
            self.setopt(pycurl.USERPWD, self.options['auth'])
项目:pyload-plugins    作者:pyload    | 项目源码 | 文件源码
def submit(self, captcha, captchaType="file", match=None):
        req = get_request()
        #: Raise timeout threshold
        req.c.setopt(pycurl.LOW_SPEED_TIME, 80)

        try:
            #@NOTE: Workaround multipart-post bug in HTTPRequest.py
            if re.match("^\w*$", self.config.get('password')):
                multipart = True
                data = (pycurl.FORM_FILE, captcha)
            else:
                multipart = False
                with open(captcha, 'rb') as f:
                    data = f.read()
                data = base64.b64encode(data)

            res = self.load(self.SUBMIT_URL,
                            post={'action': "UPLOADCAPTCHA",
                                  'username': self.config.get('username'),
                                  'password': self.config.get('password'), 'file': data},
                            multipart=multipart,
                            req=req)
        finally:
            req.close()

        if res.startswith("ERROR"):
            raise ImageTyperzException(res)
        else:
            data = res.split('|')
            if len(data) == 2:
                ticket, result = data
            else:
                raise ImageTyperzException("Unknown response: %s" % res)

        return ticket, result
项目:obplayer    作者:openbroadcaster    | 项目源码 | 文件源码
def version_update(self):
        if not obplayer.Config.setting('sync_url'):
            return
        obplayer.Log.log('sending player version to server: ' + obplayer.Config.version, 'sync')

        postfields = {}
        postfields['id'] = obplayer.Config.setting('sync_device_id')
        postfields['pw'] = obplayer.Config.setting('sync_device_password')
        postfields['version'] = obplayer.Config.version
        postfields['longitude'] = obplayer.Config.setting('location_longitude')
        postfields['latitude'] = obplayer.Config.setting('location_latitude')

        curl = pycurl.Curl()

        enc_postfields = urllib.urlencode(postfields)

        curl.setopt(pycurl.NOSIGNAL, 1)
        curl.setopt(pycurl.USERAGENT, 'OpenBroadcaster Player')
        curl.setopt(pycurl.URL, obplayer.Config.setting('sync_url') + '?action=version')
        curl.setopt(pycurl.HEADER, False)
        curl.setopt(pycurl.POST, True)
        curl.setopt(pycurl.POSTFIELDS, enc_postfields)

        curl.setopt(pycurl.LOW_SPEED_LIMIT, 10)
        curl.setopt(pycurl.LOW_SPEED_TIME, 60)

        curl.setopt(pycurl.NOPROGRESS, 0)
        curl.setopt(pycurl.PROGRESSFUNCTION, self.curl_progress)

        class CurlResponse:
            def __init__(self):
                self.buffer = u''
            def __call__(self, data):
                self.buffer += data.decode('utf-8')

        curl_response = CurlResponse()
        curl.setopt(pycurl.WRITEFUNCTION, curl_response)

        try:
            curl.perform()
        except:
            obplayer.Log.log("exception in VersionUpdate thread", 'error')
            obplayer.Log.log(traceback.format_exc(), 'error')

        curl.close()

        if curl_response.buffer:
            version = json.loads(curl_response.buffer)
            obplayer.Log.log("server version reported as " + str(version), 'sync')
            if not self.check_min_version(version):
                obplayer.Log.log("minimum server version " + str(MIN_SERVER_VERSION) + " is required. Please update server software before continuing", 'error')
        else:
            obplayer.Log.log("server did not report a version number", 'warning')
项目:obplayer    作者:openbroadcaster    | 项目源码 | 文件源码
def now_playing_update_thread(self, playlist_id, playlist_end, media_id, media_end, show_name):

        if not obplayer.Config.setting('sync_url'):
            return

        postfields = {}
        postfields['id'] = obplayer.Config.setting('sync_device_id')
        postfields['pw'] = obplayer.Config.setting('sync_device_password')

        postfields['playlist_id'] = playlist_id
        postfields['media_id'] = media_id
        postfields['show_name'] = show_name

        if playlist_end != '':
            postfields['playlist_end'] = int(round(playlist_end))
        else:
            postfields['playlist_end'] = ''

        if media_end != '':
            postfields['media_end'] = int(round(media_end))
        else:
            postfields['media_end'] = ''

        curl = pycurl.Curl()

        enc_postfields = urllib.urlencode(postfields)

        curl.setopt(pycurl.NOSIGNAL, 1)
        curl.setopt(pycurl.USERAGENT, 'OpenBroadcaster Player')
        curl.setopt(pycurl.URL, obplayer.Config.setting('sync_url') + '?action=now_playing')
        curl.setopt(pycurl.HEADER, False)
        curl.setopt(pycurl.POST, True)
        curl.setopt(pycurl.POSTFIELDS, enc_postfields)
        #curl.setopt(pycurl.FOLLOWLOCATION, 1)

        curl.setopt(pycurl.LOW_SPEED_LIMIT, 10)
        curl.setopt(pycurl.LOW_SPEED_TIME, 60)

        curl.setopt(pycurl.NOPROGRESS, 0)
        curl.setopt(pycurl.PROGRESSFUNCTION, self.curl_progress)

        try:
            curl.perform()
        except:
            obplayer.Log.log("exception in NowPlayingUpdate thread", 'error')
            obplayer.Log.log(traceback.format_exc(), 'error')

        curl.close()

    #
    # Request sync data from web application.
    # This is used by sync (with request_type='schedule') and sync_priority_broadcasts (with request_type='emerg').
    # Function outputs XML response from server.
    #
项目:obplayer    作者:openbroadcaster    | 项目源码 | 文件源码
def sync_request(self, request_type='', data=False):
        sync_url = obplayer.Config.setting('sync_url')
        if not sync_url:
            obplayer.Log.log("sync url is blank, skipping sync request", 'sync')
            return ''

        curl = pycurl.Curl()

        postfields = {}
        postfields['id'] = obplayer.Config.setting('sync_device_id')
        postfields['pw'] = obplayer.Config.setting('sync_device_password')
        postfields['hbuffer'] = obplayer.Config.setting('sync_buffer')
        if data:
            postfields['data'] = data

        enc_postfields = urllib.urlencode(postfields)

        curl.setopt(pycurl.NOSIGNAL, 1)
        curl.setopt(pycurl.USERAGENT, 'OpenBroadcaster Player')
        curl.setopt(pycurl.URL, sync_url + '?action=' + request_type)
        curl.setopt(pycurl.HEADER, False)
        curl.setopt(pycurl.POST, True)
        curl.setopt(pycurl.POSTFIELDS, enc_postfields)

        # some options so that it'll abort the transfer if the speed is too low (i.e., network problem)
        # low speed abort set to 0.01Kbytes/s for 60 seconds).
        curl.setopt(pycurl.LOW_SPEED_LIMIT, 10)
        curl.setopt(pycurl.LOW_SPEED_TIME, 60)

        curl.setopt(pycurl.NOPROGRESS, 0)
        curl.setopt(pycurl.PROGRESSFUNCTION, self.curl_progress)

        class CurlResponse:
            def __init__(self):
                self.buffer = u''
            def __call__(self, data):
                self.buffer += data.decode('utf-8')

        curl_response = CurlResponse()
        curl.setopt(pycurl.WRITEFUNCTION, curl_response)

        try:
            curl.perform()
        #except pycurl.error as error:
        #    (errno, errstr) = error
        #    obplayer.Log.log('network error: ' + errstr, 'error')
        except:
            obplayer.Log.log("exception in sync " + request_type + " thread", 'error')
            obplayer.Log.log(traceback.format_exc(), 'error')

        curl.close()

        return curl_response.buffer

    #
    # Fetch media from web application.  Saves under media directory.
    # media_id : id of the media we want
    # filename : filename to save under.
    #