The following 34 code examples, extracted from open-source Python projects, illustrate how to use requests.compat.urljoin().
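Before the project examples, here is a minimal sketch (with hypothetical example.com URLs, not taken from any project below) of how urljoin() resolves a path against a base; note how trailing and leading slashes change the result:

# Hypothetical URLs, for illustration only.
from requests.compat import urljoin

# Base with trailing slash: the relative path is appended.
urljoin('https://api.example.com/v1/', 'items/42')
# -> 'https://api.example.com/v1/items/42'

# Base without trailing slash: the last path segment is replaced.
urljoin('https://api.example.com/v1', 'items/42')
# -> 'https://api.example.com/items/42'

# Leading slash on the path: the base path is discarded entirely.
urljoin('https://api.example.com/v1/', '/items/42')
# -> 'https://api.example.com/items/42'

# An absolute URL wins outright, regardless of the base.
urljoin('https://api.example.com/v1/', 'https://other.example.com/x')
# -> 'https://other.example.com/x'

Several of the examples below work around exactly these edge cases.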
def login(self):
    cookie_dict = requests.utils.dict_from_cookiejar(self.session.cookies)
    if cookie_dict.get('session'):
        return True

    login_params = {
        'submit': 'Login',
        'username': self.username,
        'password': self.password,
        'keeplogged': 0,
    }

    if not self.get_url(self.urls['login'], post_data=login_params, returns='text'):
        logger.log(u"Unable to connect to provider", logger.WARNING)
        return False

    response = self.get_url(urljoin(self.urls['base_url'], 'index.php'), returns='text')
    if re.search('<title>Login :: BJ-Share</title>', response):
        logger.log(u"Invalid username or password. Check your settings", logger.WARNING)
        return False

    return True

def _request_get(self, path, params=None):
    url = urljoin(BASE_URL, path)
    headers = self._get_request_headers()
    response = requests.get(url, params=params, headers=headers)
    if response.status_code >= 500:
        backoff = self._initial_backoff
        for _ in range(self._max_retries):
            time.sleep(backoff)
            backoff_response = requests.get(url, params=params, headers=headers)
            if backoff_response.status_code < 500:
                response = backoff_response
                break
            backoff *= 2
    response.raise_for_status()
    return response.json()

def cloud_auth(session, login=LOGIN, password=PASSWORD):
    try:
        r = session.post('https://auth.mail.ru/cgi-bin/auth?lang=ru_RU&from=authpopup',
                         data={'Login': login,
                               'Password': password,
                               'page': urljoin(CLOUD_URL, '?from=promo'),
                               'new_auth_form': 1,
                               'Domain': get_email_domain(login)},
                         verify=VERIFY_SSL)
    except Exception as e:
        if LOGGER:
            LOGGER.error('Cloud auth HTTP request error: {}'.format(e))
        return None
    if r.status_code == requests.codes.ok:
        if LOGIN_CHECK_STRING in r.text:
            return True
        elif LOGGER:
            LOGGER.error('Cloud authorization request error. Check your credentials settings in {}. '
                         'Do not forget to accept cloud LA by entering it in browser. '
                         'HTTP code: {}, msg: {}'.format(CONFIG_FILE, r.status_code, r.text))
    elif LOGGER:
        LOGGER.error('Cloud authorization request error. Check your connection. '
                     'HTTP code: {}, msg: {}'.format(r.status_code, r.text))
    return None

def get_csrf(session):
    try:
        r = session.get(urljoin(CLOUD_URL, 'tokens/csrf'), verify=VERIFY_SSL)
    except Exception as e:
        if LOGGER:
            LOGGER.error('Get csrf HTTP request error: {}'.format(e))
        return None
    if r.status_code == requests.codes.ok:
        r_json = r.json()
        token = r_json['body']['token']
        assert len(token) == 32, 'invalid CSRF token <{}> length'.format(token)
        return token
    elif LOGGER:
        LOGGER.error('CSRF token request error. Check your connection and credentials settings in {}. '
                     'HTTP code: {}, msg: {}'.format(CONFIG_FILE, r.status_code, r.text))
    return None

def get_upload_domain(session, csrf=''):
    """ return current cloud's upload domain url
    it seems that csrf isn't necessary in session,
    but forcing assert anyway to avoid possible future damage
    """
    assert csrf is not None, 'no CSRF'
    url = urljoin(CLOUD_URL, 'dispatcher?token=' + csrf)
    try:
        r = session.get(url, verify=VERIFY_SSL)
    except Exception as e:
        if LOGGER:
            LOGGER.error('Get upload domain HTTP request error: {}'.format(e))
        return None
    if r.status_code == requests.codes.ok:
        r_json = r.json()
        return r_json['body']['upload'][0]['url']
    elif LOGGER:
        LOGGER.error('Upload domain request error. Check your connection. '
                     'HTTP code: {}, msg: {}'.format(r.status_code, r.text))
    return None

def _read_actions(self):
    action_url = urljoin(self._url_base, self._control_url)

    for action_node in self._findall('actionList/action'):
        name = action_node.findtext('name', namespaces=action_node.nsmap)
        argsdef_in = []
        argsdef_out = []
        for arg_node in action_node.findall(
                'argumentList/argument', namespaces=action_node.nsmap):
            findtext = partial(arg_node.findtext, namespaces=arg_node.nsmap)
            arg_name = findtext('name')
            arg_statevar = self.statevars[findtext('relatedStateVariable')]
            if findtext('direction').lower() == 'in':
                argsdef_in.append((arg_name, arg_statevar))
            else:
                argsdef_out.append((arg_name, arg_statevar))
        action = Action(action_url, self.service_type, name,
                        argsdef_in, argsdef_out)
        self.action_map[name] = action
        self.actions.append(action)

def get_log_types():
    url = "https://www.bro.org/sphinx/script-reference/"
    resp = requests.get(url=url + "log-files.html")
    soup = BeautifulSoup(resp.content, "html.parser")
    bro_logs = dict(logs=[])
    for table in soup.find_all("table", {"class": "docutils"}):
        for row in table.find('tbody').find_all('tr'):
            log = {}
            cols = row.find_all('td')
            cols = [ele.text.strip() for ele in cols]
            tds = [ele for ele in cols if ele]
            log['file'] = tds[0]
            log['log_type'] = os.path.splitext(log['file'])[0]
            log['description'] = tds[1]
            log['fields'] = []
            link = row.find('a', href=True)
            # do not add a URL for notice_alarm.log
            if link is not None and 'notice_alarm' not in log['log_type']:
                log['url'] = urljoin(url, link['href'])
            logger.info('adding log type: {}'.format(log['log_type']))
            bro_logs['logs'].append(log)
    return bro_logs

def _authenticate(email=None, password=None):
    """
    :param email:
    :param password:
    :return:
    """
    if email is None:
        try:
            input_fun = raw_input
        except NameError:
            input_fun = input
        email = input_fun('IBM QE user (e-mail) > ')
    if password is None:
        password = getpass.getpass(prompt='IBM QE password > ')

    r = requests.post(urljoin(_api_url, 'users/login'),
                      data={"email": email, "password": password})
    r.raise_for_status()

    json_data = r.json()
    user_id = json_data['userId']
    access_token = json_data['id']
    return user_id, access_token

def test_send_real_device_offline(monkeypatch):
    def mocked_requests_get(*args, **kwargs):
        class MockResponse:
            def __init__(self, json_data, status_code):
                self.json_data = json_data
                self.status_code = status_code

            def json(self):
                return self.json_data

        # Accessing status of device. Return offline.
        status_url = 'Backends/ibmqx2/queue/status'
        if args[0] == urljoin(_api_url_status, status_url):
            return MockResponse({"state": False}, 200)

    monkeypatch.setattr("requests.get", mocked_requests_get)
    shots = 1
    json_qasm = "my_json_qasm"
    name = 'projectq_test'
    with pytest.raises(_ibm_http_client.DeviceOfflineError):
        _ibm_http_client.send(json_qasm, device="ibmqx2",
                              user=None, password=None,
                              shots=shots, verbose=True)

def construct_collection(self, instance, spec, loc, context):
    """
    Constructor for `.collection` predicate.

    This constructor aims to aggregate the cerberus validation schemas
    for every single field defined by the collection.
    """
    instance = super(self.__class__, self).construct_collection(
        instance, spec, loc, context)
    self.init_adapter_conf(instance)
    schema = {field_name: schema.get(self.ADAPTER_CONF, {})
              for field_name, schema in doc.doc_get(
                  instance, ('*',)).iteritems()}
    collection = context.get('parent_name')
    endpoint = urljoin(
        self.root_url, TRAILING_SLASH.join([loc[0], collection]))
    endpoint += TRAILING_SLASH
    instance[self.ADAPTER_CONF] = schema
    client = ApimasClient(endpoint, schema)
    self.clients[loc[0] + '/' + collection] = client
    return instance

def _request_post(self, path, data=None, params=None):
    url = urljoin(BASE_URL, path)
    headers = self._get_request_headers()
    response = requests.post(url, json=data, params=params, headers=headers)
    response.raise_for_status()
    if response.status_code == 200:
        return response.json()

def _request_put(self, path, data=None, params=None):
    url = urljoin(BASE_URL, path)
    headers = self._get_request_headers()
    response = requests.put(url, json=data, params=params, headers=headers)
    response.raise_for_status()
    if response.status_code == 200:
        return response.json()

def _request_delete(self, path, params=None):
    url = urljoin(BASE_URL, path)
    headers = self._get_request_headers()
    response = requests.delete(url, params=params, headers=headers)
    response.raise_for_status()
    if response.status_code == 200:
        return response.json()

def req(url, hdr):
    try:
        res = requests.get(urljoin(BASE_URL, url), headers=hdr, timeout=10.0)
    except requests.Timeout:
        raise RequestTimeoutError(url)
    except requests.ConnectionError:
        raise RequestTimeoutError(url)
    if res.status_code != 200:
        raise StatusCodeError(url, res.status_code)
    return res

def get_cloud_space(session, csrf='', login=LOGIN):
    """ returns available free space in bytes """
    assert csrf is not None, 'no CSRF'
    timestamp = str(int(time.mktime(datetime.datetime.now().timetuple()) * 1000))
    quoted_login = quote_plus(login)
    command = ('user/space?api=' + str(API_VER) + '&email=' + quoted_login +
               '&x-email=' + quoted_login + '&token=' + csrf + '&_=' + timestamp)
    url = urljoin(CLOUD_URL, command)
    try:
        r = session.get(url, verify=VERIFY_SSL)
    except Exception as e:
        if LOGGER:
            LOGGER.error('Get cloud space HTTP request error: {}'.format(e))
        return 0
    if r.status_code == requests.codes.ok:
        r_json = r.json()
        total_bytes = r_json['body']['total'] * 1024 * 1024
        used_bytes = r_json['body']['used'] * 1024 * 1024
        return total_bytes - used_bytes
    elif LOGGER:
        LOGGER.error('Cloud free space request error. Check your connection. '
                     'HTTP code: {}, msg: {}'.format(r.status_code, r.text))
    return 0

def post_file(session, domain='', file='', login=LOGIN):
    """ posts file to the cloud's upload server
    param: file - string filename with path
    """
    assert domain is not None, 'no domain'
    assert file is not None, 'no file'
    filetype = guess_type(file)[0]
    if not filetype:
        filetype = DEFAULT_FILETYPE
        if LOGGER:
            LOGGER.warning('File {} type is unknown, using default: {}'.format(file, DEFAULT_FILETYPE))
    filename = os.path.basename(file)
    quoted_login = quote_plus(login)
    timestamp = str(int(time.mktime(datetime.datetime.now().timetuple()))) + TIME_AMEND
    url = urljoin(domain, '?cloud_domain=' + str(CLOUD_DOMAIN_ORD) +
                  '&x-email=' + quoted_login + '&fileapi' + timestamp)
    m = MultipartEncoder(fields={'file': (quote_plus(filename), open(file, 'rb'), filetype)})
    try:
        r = session.post(url, data=m, headers={'Content-Type': m.content_type}, verify=VERIFY_SSL)
    except Exception as e:
        if LOGGER:
            LOGGER.error('Post file HTTP request error: {}'.format(e))
        return (None, None)
    if r.status_code == requests.codes.ok:
        if len(r.content):
            hash = r.content[:40].decode()
            size = int(r.content[41:-2])
            return (hash, size)
        elif LOGGER:
            LOGGER.error('File {} post error, no hash and size received'.format(file))
    elif LOGGER:
        LOGGER.error('File {} post error, http code: {}, msg: {}'.format(file, r.status_code, r.text))
    return (None, None)

def prepare_url(value):
    # Issue #1483: Make sure the URL always has a trailing slash
    httpbin_url = value.url.rstrip('/') + '/'

    def inner(*suffix):
        return urljoin(httpbin_url, '/'.join(suffix))

    return inner

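The rstrip('/') + '/' normalization above exists because urljoin() drops the last path segment of a base URL that lacks a trailing slash. A minimal sketch of the two cases (the base URL is hypothetical):

from requests.compat import urljoin

# Without normalization, the final segment is treated as a resource and replaced.
urljoin('http://httpbin.org/base', 'get')   # -> 'http://httpbin.org/get'

# After forcing a single trailing slash, every segment is kept.
urljoin('http://httpbin.org/base/', 'get')  # -> 'http://httpbin.org/base/get'
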
def test_get_rfs_url(self):
    CONF.podm.url = "https://127.0.0.1:8443"
    expected = urljoin(CONF.podm.url, "redfish/v1/Systems/1")

    # test without service_ext
    result = redfish.get_rfs_url("/Systems/1/")
    self.assertEqual(expected, result)

    result = redfish.get_rfs_url("/Systems/1")
    self.assertEqual(expected, result)

    result = redfish.get_rfs_url("Systems/1/")
    self.assertEqual(expected, result)

    result = redfish.get_rfs_url("Systems/1")
    self.assertEqual(expected, result)

    # test with service_ext
    result = redfish.get_rfs_url("/redfish/v1/Systems/1/")
    self.assertEqual(expected, result)

    result = redfish.get_rfs_url("/redfish/v1/Systems/1")
    self.assertEqual(expected, result)

    result = redfish.get_rfs_url("redfish/v1/Systems/1/")
    self.assertEqual(expected, result)

    result = redfish.get_rfs_url("redfish/v1/Systems/1")
    self.assertEqual(expected, result)

def test_get_rfs_url_with_tailing_slash(self):
    CONF.podm.url = "https://127.0.0.1:8443/"
    expected = urljoin(CONF.podm.url, "redfish/v1/Systems/1")

    # test without service_ext
    result = redfish.get_rfs_url("/Systems/1/")
    self.assertEqual(expected, result)

    result = redfish.get_rfs_url("/Systems/1")
    self.assertEqual(expected, result)

    result = redfish.get_rfs_url("Systems/1/")
    self.assertEqual(expected, result)

    result = redfish.get_rfs_url("Systems/1")
    self.assertEqual(expected, result)

    # test with service_ext
    result = redfish.get_rfs_url("/redfish/v1/Systems/1/")
    self.assertEqual(expected, result)

    result = redfish.get_rfs_url("/redfish/v1/Systems/1")
    self.assertEqual(expected, result)

    result = redfish.get_rfs_url("redfish/v1/Systems/1/")
    self.assertEqual(expected, result)

    result = redfish.get_rfs_url("redfish/v1/Systems/1")
    self.assertEqual(expected, result)

def __init__(self, url_base, service_type, service_id, control_url,
             scpd_url, event_sub_url):
    self._url_base = url_base
    self.service_type = service_type
    self.service_id = service_id
    self._control_url = control_url
    self.scpd_url = scpd_url
    self._event_sub_url = event_sub_url
    self.actions = []
    self.action_map = {}
    self.statevars = {}
    self._log = _getLogger('Service')

    self._log.debug('%s url_base: %s', self.service_id, self._url_base)
    self._log.debug('%s SCPDURL: %s', self.service_id, self.scpd_url)
    self._log.debug('%s controlURL: %s', self.service_id, self._control_url)
    self._log.debug('%s eventSubURL: %s', self.service_id, self._event_sub_url)

    url = urljoin(self._url_base, self.scpd_url)
    self._log.info('Reading %s', url)
    resp = requests.get(url, timeout=HTTP_TIMEOUT)
    resp.raise_for_status()

    self.scpd_xml = etree.fromstring(resp.content)
    self._find = partial(self.scpd_xml.find, namespaces=self.scpd_xml.nsmap)
    self._findtext = partial(self.scpd_xml.findtext, namespaces=self.scpd_xml.nsmap)
    self._findall = partial(self.scpd_xml.findall, namespaces=self.scpd_xml.nsmap)

    self._read_state_vars()
    self._read_actions()

def stats(self):
    """ Get global cryptocurrencies statistics.

    Returns:
        dict: Global markets statistics
    """
    url = urljoin(self.urls["api"], 'global/')
    response = get(url).json(parse_int=self.parse_int,
                             parse_float=self.parse_float)
    return response


####### WEB PARSER METHODS #######

def _get_ranks(self, query, temp):
    """Internal function to get gainers and losers

    Args:
        query: Query to obtain ranks, gainers or losers
        temp: Temporal period obtaining gainers or losers, 1h, 24h or 7d
    """
    url = urljoin(self.urls["web"], 'gainers-losers/')
    html = self._html(url)

    call = str(query) + '-' + str(temp)
    response = []
    html_rank = html.find('div', {'id': call}).find_all('tr')
    for curr in html_rank[1:]:
        _childs, childs = (curr.contents, [])
        for c in _childs:
            if c != '\n':
                childs.append(c)
        for n, g in enumerate(childs):
            if n == 1:
                name = str(g.a.getText())
            elif n == 2:
                symbol = str(g.string)
            elif n == 3:
                _volume_24h = sub(r'\$|,', '', g.a.getText())
                volume_24h = self.parse_int(_volume_24h)
            elif n == 4:
                _price = sub(r'\$|,', '', g.a.getText())
                price = self.parse_float(_price)
            elif n == 5:
                percent = self.parse_float(sub(r'%', '', g.string))
        currency = {'symbol': symbol,
                    'name': name,
                    '24h_volume_usd': volume_24h,
                    'price_usd': price,
                    'percent_change': percent}
        response.append(currency)
    return response

def global_cap(self, bitcoin=True, start=None, end=None):
    """Get global market capitalization graphs, including
    or excluding Bitcoin

    Args:
        bitcoin (bool, optional): Indicates if Bitcoin will be
            included in the global market capitalization graph.
            As default True
        start (optional, datetime): Time to start retrieving
            graphs data. If not provided, all available data is
            returned. As default None
        end (optional, datetime): Time to end retrieving graphs data.

    Returns (dict):
        List of lists with timestamp and values
    """
    base_url = self.urls["graphs_api"]
    if bitcoin:
        endpoint = "global/marketcap-total/"
    else:
        endpoint = "global/marketcap-altcoin/"
    url = urljoin(base_url, endpoint)
    if start and end:
        url += self._add_start_end(url, start, end)
    return get(url).json()

def build_url(current_url, next_url):
    if is_url(next_url):
        return next_url
    else:
        return urljoin(current_url, next_url)

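Worth noting: urljoin() already returns next_url unchanged when it is an absolute URL, so the is_url() guard above chiefly documents intent (and covers whatever extra forms is_url() accepts). A quick sketch with made-up URLs:

from requests.compat import urljoin

urljoin('https://example.com/a/page', 'https://other.org/next')
# -> 'https://other.org/next' (an absolute next_url wins on its own)

urljoin('https://example.com/a/page', 'next')
# -> 'https://example.com/a/next' (a relative next_url is resolved)
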
def _run(qasm, device, user_id, access_token, shots):
    suffix = 'codes/execute'
    r = requests.post(urljoin(_api_url, suffix),
                      data=qasm,
                      params={"access_token": access_token,
                              "deviceRunType": device,
                              "fromCache": "false",
                              "shots": shots},
                      headers={"Content-Type": "application/json"})
    r.raise_for_status()

    r_json = r.json()
    execution_id = r_json["id"]
    return execution_id

def _get_result(execution_id, access_token, num_retries=300, interval=1):
    suffix = 'Executions/{execution_id}'.format(execution_id=execution_id)
    for _ in range(num_retries):
        r = requests.get(urljoin(_api_url, suffix),
                         params={"access_token": access_token})
        r.raise_for_status()

        r_json = r.json()
        status = r_json["status"]["id"]
        if status == "DONE":
            return r_json["result"]
        time.sleep(interval)

def __call__(self, value):
    if value is None:
        return value
    return urljoin(self.ref_endpoint, value).rstrip('/') + '/'

def format_endpoint(self, resource_id):
    """
    This method concatenates the resource's endpoint with a specified
    identifier.

    Example: endpoint/<pk>/
    """
    if isinstance(resource_id, unicode):
        resource_id = resource_id.encode("utf-8")
    return urljoin(self.endpoint, quote(str(resource_id))) + TRAILING_SLASH

def markets(self, currency):
    """Get available coinmarketcap markets data.
    It needs a currency as argument.

    Args:
        currency (str): Currency to get market data

    Returns:
        list: markets on which provided currency is currently tradeable
    """
    if self.is_symbol(currency):
        currency = self.correspondences[currency]

    url = urljoin(self.urls["web"], "currencies/%s/" % currency)
    html = self._html(url)

    response = []
    marks = html.find('tbody').find_all('tr')
    for m in marks:
        _volume_24h = m.find('span', {'class': 'volume'}).getText()
        volume_24h = self.parse_int(sub(r'\D', '', _volume_24h))
        _price = m.find('span', {'class': 'price'}).getText()
        _price = sub(r'\$| |\*|,', '', _price)
        price = self.parse_float(_price)
        _childs, childs = (m.contents, [])
        for c in _childs:
            if c != '\n':
                childs.append(c)
        for n, c in enumerate(childs):
            nav = c.string
            if n == 1:
                exchange = str(nav)
            elif n == 2:
                pair = str(c.getText()).replace('/', self.pair_separator)
                if pair[-1] == '*':
                    pair = pair.replace(' *', '')
            elif n == 5:
                percent_volume = self.parse_float(nav.replace('%', ''))
            elif n == 6:
                updated = nav == "Recently"
        market = {'exchange': exchange,
                  'pair': pair,
                  '24h_volume_usd': volume_24h,
                  'price_usd': price,
                  'percent_volume': percent_volume,
                  "updated": updated}
        response.append(market)
    return response

def exchange(self, name):
    """Obtain data from an exchange passed as argument

    Example: exchange('poloniex')

    Args:
        name (str): Exchange to retrieve data

    Returns:
        list: Data from all markets in an exchange
    """
    url = urljoin(self.urls["web"], 'exchanges/%s/' % name)
    html = self._html(url)

    marks = html.find('table').find_all('tr')
    response = []
    for m in marks[1:]:
        _childs, childs = (m.contents, [])
        for c in _childs:
            if c != '\n':
                childs.append(c)
        for n, c in enumerate(childs):
            if n == 0:
                rank = self.parse_int(c.getText())
            elif n == 1:
                name = str(c.getText())
            elif n == 2:
                market = str(c.getText().replace('/', self.pair_separator))
            elif n == 3:
                _volume_24h_usd = sub(r'\$|,', '', c.getText())
                volume_24h_usd = self.parse_int(_volume_24h_usd)
            elif n == 4:
                _price_usd = sub(r'\$| |\*', '', c.getText())
                price_usd = self.parse_float(_price_usd)
            elif n == 5:
                _perc_volume = c.getText().replace('%', '')
                perc_volume = self.parse_float(_perc_volume)
        indicators = {'rank': rank,
                      'name': name,
                      'market': market,
                      'volume_24h_usd': volume_24h_usd,
                      'price_usd': price_usd,
                      'perc_volume': perc_volume}
        response.append(indicators)
    return response

def get_test_data(fn):
    def _dir_content(url):
        r = requests.get(url)
        r.raise_for_status()
        entries = re.findall(r'href="([a-zA-Z0-9_.-]+/?)"', r.text)
        files = sorted(set(fn for fn in entries if not fn.endswith('/')))
        return files

    def _file_size(url):
        r = requests.head(url, headers={'Accept-Encoding': 'identity'})
        r.raise_for_status()
        return int(r.headers['content-length'])

    def _download_file(url, fn_local):
        if op.exists(fn_local):
            if os.stat(fn_local).st_size == _file_size(url):
                logger.info('Using cached file %s' % fn_local)
                return fn_local
        logger.info('Downloading %s...' % url)
        fsize = _file_size(url)
        r = requests.get(url, stream=True)
        r.raise_for_status()
        dl_bytes = 0
        with open(fn_local, 'wb') as f:
            for d in r.iter_content(chunk_size=1024):
                dl_bytes += len(d)
                f.write(d)
        if dl_bytes != fsize:
            raise DownloadError('Download incomplete!')
        logger.info('Download completed.')
        return fn_local

    url = urljoin(data_uri, fn)
    dl_dir = data_dir
    _makedir(dl_dir)
    if fn.endswith('/'):
        dl_dir = op.join(data_dir, fn)
        _makedir(dl_dir)
        dl_files = _dir_content(url)
        dl_files = zip([urljoin(url, u) for u in dl_files],
                       [op.join(dl_dir, f) for f in dl_files])
    else:
        dl_files = (url, op.join(data_dir, fn))
        return _download_file(*dl_files)

    return [_download_file(*f) for f in dl_files]

def send(qasm, device='sim_trivial_2', user=None, password=None,
         shots=1, verbose=False):
    """
    Sends QASM through the IBM API and runs the quantum circuit.

    Args:
        qasm: QASM representation of the circuit to run.
        device (str): 'sim_trivial_2' or 'real' to run on simulator or on
            the real chip, respectively.
        user (str): IBM quantum experience user.
        password (str): IBM quantum experience user password.
        shots (int): Number of runs of the same circuit to collect
            statistics.
        verbose (bool): If True, additional information is printed, such as
            measurement statistics. Otherwise, the backend simply registers
            one measurement result (same behavior as the projectq Simulator).
    """
    try:
        # check if the device is online
        if device in ['ibmqx2', 'ibmqx4']:
            url = 'Backends/{}/queue/status'.format(device)
            r = requests.get(urljoin(_api_url_status, url))
            online = r.json()['state']
            if not online:
                print("The device is offline (for maintenance?). Use the "
                      "simulator instead or try again later.")
                raise DeviceOfflineError("Device is offline.")
        if device == 'ibmqx2':
            device = 'real'
        if verbose:
            print("Authenticating...")
        user_id, access_token = _authenticate(user, password)
        if verbose:
            print("Running code...")
        execution_id = _run(qasm, device, user_id, access_token, shots)
        if verbose:
            print("Waiting for results...")
        res = _get_result(execution_id, access_token)
        if verbose:
            print("Done.")
        return res
    except requests.exceptions.HTTPError as err:
        print("There was an error running your code:")
        print(err)
    except requests.exceptions.RequestException as err:
        print("Looks like something is wrong with server:")
        print(err)
    except KeyError as err:
        print("Failed to parse response:")
        print(err)