Python requests 模块,request() 实例源码
我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用requests.request()。
def fetch(self):
url = "https://www.reddit.com/top/.json"
headers = {
"User-Agent": f"ChangeMeClient/0.1 by YOUR_USERNAME_ON_REDDIT"
}
response = requests.request(method='GET', url=url, headers=headers)
json = response.json()
self.logger.debug(f'Fetched. Code: {response.status_code}')
data = []
if response.status_code == 200:
data = json['data']['children']
return data
# get necessary data
def meraki_requests(url,meraki_api_key,error_handle):
url = "https://dashboard.meraki.com/api/v0%s" % url
if meraki_debug == "1":
print("GET: %s" % url)
querystring = {}
headers = {
'x-cisco-meraki-api-key': meraki_api_key,
'content-type': "application/json",
'cache-control': "no-cache",
}
response = requests.request("GET", url, headers=headers, params=querystring)
if response.status_code == 200:
json_data = json.loads(response.text)
return json_data
else:
if meraki_debug == "1":
print(response.text)
if error_handle == "enable":
sys.exit("Failed: Code %s" % response.status_code)
# Meraki REST API Call - PUT
def get_user_api_token(logger):
"""
Generate iAuditor API Token
:param logger: the logger
:return: API Token if authenticated else None
"""
username = input("iAuditor username: ")
password = getpass()
generate_token_url = "https://api.safetyculture.io/auth"
payload = "username=" + username + "&password=" + password + "&grant_type=password"
headers = {
'content-type': "application/x-www-form-urlencoded",
'cache-control': "no-cache",
}
response = requests.request("POST", generate_token_url, data=payload, headers=headers)
if response.status_code == requests.codes.ok:
return response.json()['access_token']
else:
logger.error('An error occurred calling ' + generate_token_url + ': ' + str(response.json()))
return None
def _req(self, method: str='GET', endpoint: str=None, params: dict=None,
data: dict=None, uses_oauth: bool=False):
if params is None:
params = {}
fetch = self.session.request if uses_oauth else requests.request
res = fetch(
method=method,
url='{}/{}'.format(self.base_url, endpoint),
params={
'format': 'xml',
'key': self._developer_key,
**params
},
data=data
)
res.raise_for_status()
return res
def fetch(self):
section = 'hot' # hot | top | user
sort = 'viral' # viral | top | time | rising (only available with user section)
show_viral = 'true'
show_mature = 'true'
album_previews = 'false'
url = f'https://api.imgur.com/3/gallery/{section}/{sort}'
querystring = {"showViral": f"{show_viral}", "mature": f"{show_mature}", "album_previews": f"{album_previews}"}
headers = {'authorization': f'Client-ID {client_id}'}
response = requests.request("GET", url, headers=headers, params=querystring)
json = response.json()
self.logger.debug(f'Fetched. Code: {response.status_code}')
return json['data'][:FETCH_LIMIT]
def create_logset(logset_name=None, params=None):
"""
Add a new logset to the current account.
If a filename is given, load the contents of the file
as json parameters for the request.
If a name is given, create a new logset with the given name
"""
if params is not None:
request_params = params
else:
request_params = {
'logset': {
'name': logset_name
}
}
headers = api_utils.generate_headers('rw')
try:
response = requests.post(_url()[1], json=request_params, headers=headers)
handle_response(response, 'Creating logset failed.\n', 201)
except requests.exceptions.RequestException as error:
sys.stderr.write(error)
sys.exit(1)
def add_new_user(first_name, last_name, email):
"""
Add a new user to the current account.
"""
action, url = _url(('users',))
json_content = {
"user":
{
"email": str(email),
"first_name": str(first_name),
"last_name": str(last_name)
}
}
body = json.dumps(json_content)
headers = api_utils.generate_headers('owner', method='POST', action=action, body=body)
try:
response = requests.request('POST', url, json=json_content, headers=headers)
handle_create_user_response(response)
except requests.exceptions.RequestException as error:
sys.stderr.write(error)
sys.exit(1)
def delete_user(user_key):
"""
Delete a user from the current account.
"""
action, url = _url(('users', user_key))
headers = api_utils.generate_headers('owner', method='DELETE', action=action, body='')
try:
response = requests.request('DELETE', url, data='', headers=headers)
if response_utils.response_error(response) is True: # Check response has no errors
sys.stderr.write('Delete user failed, status code: %s' % response.status_code)
sys.exit(1)
elif response.status_code == 204:
print 'Deleted user'
except requests.exceptions.RequestException as error:
sys.stderr.write(error)
sys.exit(1)
def read(self, buf_len):
"""
Implementation note: due to a constraint of the requests library, the
buf_len that is used the first time this method is called will cause
all future requests to ``read`` to have the same ``buf_len`` even if
a different ``buf_len`` is passed in on subsequent requests.
"""
if self._iter is None: # lazy load response body iterator
method = self.input_spec.get('method', 'GET').upper()
headers = self.input_spec.get('headers', {})
params = self.input_spec.get('params', {})
req = requests.request(
method, self.input_spec['url'], headers=headers, params=params,
stream=True, allow_redirects=True)
req.raise_for_status() # we have the response headers already
self._iter = req.iter_content(buf_len, decode_unicode=False)
try:
return six.next(self._iter)
except StopIteration:
return b''
def push(data, spec, **kwargs):
task_output = kwargs.get('task_output', {})
target = task_output.get('target', 'memory')
url = spec['url']
method = spec.get('method', 'POST').upper()
if target == 'filepath':
with open(data, 'rb') as fd:
request = requests.request(
method, url, headers=spec.get('headers', {}), data=fd,
params=spec.get('params', {}), allow_redirects=True)
elif target == 'memory':
request = requests.request(
method, url, headers=spec.get('headers', {}), data=data,
params=spec.get('params', {}), allow_redirects=True)
else:
raise Exception('Invalid HTTP fetch target: ' + target)
try:
request.raise_for_status()
except Exception:
print 'HTTP push failed (%s). Response: %s' % (url, request.text)
raise
def _func(self, conv):
try:
request = registration_request(conv)
except NoSuchEvent:
self._status = ERROR
else:
# 'offline_access only allow if response_type == 'code'
try:
req_scope = request['scope']
except KeyError:
pass
else:
if 'offline_access' in req_scope:
if request['response_type'] != 'code':
self._status = ERROR
self._message = 'Offline access not allowed for ' \
'anything but code flow'
return {}
def _func(self, conv):
try:
request = registration_request(conv)
except NoSuchEvent:
self._status = ERROR
else:
try:
_ss = request['software_statement']
except KeyError:
pass
else:
missing = []
for claim in ['redirect_uris', 'grant_types', 'client_name',
'client_uri']:
if claim not in _ss:
missing.append(claim)
if 'jwks_uri' not in _ss and 'jwks' not in _ss:
missing.append('jwks_uri/jwks')
if missing:
self._status = WARNING
self._message = 'Missing "{}" claims from Software ' \
'Statement'.format(missing)
return {}
def _func(self, conv):
try:
request = registration_request(conv)
except NoSuchEvent:
self._status = ERROR
else:
try:
req_scopes = request['scope']
except KeyError:
pass
else:
if 'offline_access' in req_scopes:
if request['response_type'] != ['code']:
self._status = ERROR
self._message = 'Offline access only when using ' \
'"code" flow'
return {}
def _func(self, conv):
request = access_token_request(conv)
ca = request['parsed_client_assertion']
missing = []
for claim in ["iss", "sub", "aud", "iat", "exp", "jti"]:
if claim not in ca:
missing.append(claim)
if missing:
self._status = ERROR
self._message = 'Redirect_uri not registered'
# verify jti entropy
bits = calculate(ca['jti'])
if bits < 128:
self._status = WARNING
self._message = 'Not enough entropy in string: {} < 128'.format(
bits)
return {}
def search(api_key, term, location):
"""Query the Search API by a search term and location.
Args:
term (str): The search term passed to the API.
location (str): The search location passed to the API.
Returns:
dict: The JSON response from the request.
"""
url_params = {
'term': term.replace(' ', '+'),
'location': location.replace(' ', '+'),
'limit': SEARCH_LIMIT
}
return request(API_HOST, SEARCH_PATH, api_key, url_params=url_params)
def get_state(self) -> str:
"""Get device state.
Returns:
"on", "off", or "unknown"
"""
if self.state_cmd is None:
return "unknown"
resp = requests.request(self.state_method, self.state_cmd,
data=self.state_data, json=self.state_json,
headers=self.headers, auth=self.auth)
if self.state_response_off in resp.text:
return "off"
elif self.state_response_on in resp.text:
return "on"
return "unknown"
def stt(self, audio, language, limit):
""" Web API wrapper for performing Speech to Text (STT)
Args:
audio (bytes): The recorded audio, as in a FLAC file
language (str): A BCP-47 language code, e.g. 'en-US'
limit (int): Maximum minutes to transcribe(?)
Returns:
str: JSON structure with transcription results
"""
return self.request({
'method': 'POST',
'headers': {'Content-Type': 'audio/x-flac'},
'query': {'lang': language, 'limit': limit},
'data': audio
})
def request_jira(client, url, method='GET', **kwargs):
jwt_authorization = 'JWT %s' % encode_token(
method, url, app.config.get('ADDON_KEY'),
client.sharedSecret)
result = requests.request(
method,
client.baseUrl.rstrip('/') + url,
headers={
"Authorization": jwt_authorization,
"Content-Type": "application/json"
},
**kwargs)
try:
result.raise_for_status()
except requests.HTTPError as e:
raise requests.HTTPError(e.response.text, response=e.response)
return result
def copy(self, destination):
'''
Method to copy resource to another location
Args:
destination (rdflib.term.URIRef, str): URI location to move resource
Returns:
(Resource) new, moved instance of resource
'''
# set move headers
destination_uri = self.repo.parse_uri(destination)
# http request
response = self.repo.api.http_request('COPY', self.uri, data=None, headers={'Destination':destination_uri.toPython()})
# handle response
if response.status_code == 201:
return destination_uri
else:
raise Exception('HTTP %s, could not move resource %s to %s' % (response.status_code, self.uri, destination_uri))
def _build_rdf(self, data=None):
'''
Parse incoming rdf as self.rdf.orig_graph, create copy at self.rdf.graph
Args:
data (): payload from GET request, expected RDF content in various serialization formats
Returns:
None
'''
# recreate rdf data
self.rdf = SimpleNamespace()
self.rdf.data = data
self.rdf.prefixes = SimpleNamespace()
self.rdf.uris = SimpleNamespace()
# populate prefixes
for prefix,uri in self.repo.context.items():
setattr(self.rdf.prefixes, prefix, rdflib.Namespace(uri))
# graph
self._parse_graph()
def revert_to(self):
'''
method to revert resource to this version by issuing PATCH
Args:
None
Returns:
None: sends PATCH request, and refreshes parent resource
'''
# send patch
response = self.resource.repo.api.http_request('PATCH', self.uri)
# if response 204
if response.status_code == 204:
logger.debug('reverting to previous version of resource, %s' % self.uri)
# refresh current resource handle
self._current_resource.refresh()
else:
raise Exception('HTTP %s, could not revert to resource version, %s' % (response.status_code, self.uri))
def test_add_two_simple_handlers_with_same_route(server: FakeServer,
methods: List[str],
routes: List[str],
statuses: List[int]):
server. \
on_(methods[0], routes[0]). \
response(status=statuses[0])
server. \
on_(methods[1], routes[1]). \
response(status=statuses[1])
response_0 = requests.request(methods[0], server.base_uri + routes[0])
response_1 = requests.request(methods[1], server.base_uri + routes[1])
assert response_0.status_code == statuses[0]
assert response_1.status_code == statuses[1]
def _request(self, method, path, **kwargs):
url = self.url_for('api', 'v{self.version}'.format(self=self), *path.split('/'))
try:
headers = self.headers.update(kwargs['headers'])
except KeyError:
headers = self.headers
try:
response = requests.request(method, url, headers=headers, **kwargs)
response.raise_for_status()
return response.json()
except requests.exceptions.ConnectionError:
raise VMFarmsAPIError('Cannot connect to VM Farms API at {}.'.format(self.url))
except requests.exceptions.HTTPError as error:
self._raise_http_error_as_api_error(error)
except ValueError:
raise VMFarmsAPIError('Unexpected response from server.', response)
def _req(self, method, frag, data=None):
resp = requests.request(method, self.url + frag,
data=json.dumps(data, default=_json_serial),
verify=self.verify,
auth=self.credentials,
headers={'Content-Type': 'application/json'})
# try to extract reason from response when request returns error
if 400 <= resp.status_code < 600:
try:
resp.reason = json.loads(resp.text)['Message'];
except:
pass;
resp.raise_for_status()
return resp
def sendjson(self, method, urlpath, obj=None):
"""Send json to the OpenDaylight controller."""
headers = {'Content-Type': 'application/json'}
data = jsonutils.dumps(obj, indent=2) if obj else None
url = '/'.join([self.url, urlpath])
LOG.debug("Sending METHOD (%(method)s) URL (%(url)s) JSON (%(obj)s)" %
{'method': method, 'url': url, 'obj': obj})
r = requests.request(method, url=url,
headers=headers, data=data,
auth=self.auth, timeout=self.timeout)
try:
r.raise_for_status()
except Exception as ex:
LOG.error("Error Sending METHOD (%(method)s) URL (%(url)s)"
"JSON (%(obj)s) return: %(r)s ex: %(ex)s rtext: "
"%(rtext)s" %
{'method': method, 'url': url, 'obj': obj, 'r': r,
'ex': ex, 'rtext': r.text})
return r
try:
return json.loads(r.content)
except Exception:
LOG.debug("%s" % r)
return
def execute_as_string(self, request):
"""Execute a given HttpRequest to get a string response back
Args:
request (HttpRequest): The given HttpRequest to execute.
Returns:
HttpResponse: The response of the HttpRequest.
"""
response = requests.request(HttpMethodEnum.to_string(request.http_method),
request.query_url,
headers=request.headers,
params=request.query_parameters,
data=request.parameters,
files=request.files,
auth=(request.username, request.password))
return self.convert_response(response, False)
def execute_as_binary(self, request):
"""Execute a given HttpRequest to get a binary response back
Args:
request (HttpRequest): The given HttpRequest to execute.
Returns:
HttpResponse: The response of the HttpRequest.
"""
response = requests.request(HttpMethodEnum.to_string(request.http_method),
request.query_url,
headers=request.headers,
params=request.query_parameters,
data=request.parameters,
files=request.files,
auth=(request.username, request.password))
return self.convert_response(response, True)
def scrapeSource(url, magicFrag='2017',
scraperFunction=getNYTText, token='None'):
urlBodies = {}
requests = urllib3.PoolManager()
response = requests.request('GET', url)
soup = BeautifulSoup(response.data)
# the above lines of code sets up the beautifulSoup page
# now we find links
# links are always of the form <a href='url'> link-text </a>
for a in soup.findAll('a'):
try:
# the line above refers to indiv. scrapperFunction
# for NYT & washPost
if body and len(body) > 0:
urlBodies[url] = body
print(url)
except:
numErrors = 0
numErrors += 1
def get_version(self):
"""Fetches the current version number of the Graph API being used."""
args = {"access_token": self.access_token}
try:
response = requests.request("GET",
"https://graph.facebook.com/" +
self.version + "/me",
params=args,
timeout=self.timeout,
proxies=self.proxies)
except requests.HTTPError as e:
response = json.loads(e.read())
raise GraphAPIError(response)
try:
headers = response.headers
version = headers["facebook-api-version"].replace("v", "")
return float(version)
except Exception:
raise GraphAPIError("API version number not available")
def debug_access_token(self, token, app_id, app_secret):
"""
Gets information about a user access token issued by an app. See
<https://developers.facebook.com/docs/facebook-login/access-tokens
#debug>
We can generate the app access token by concatenating the app
id and secret: <https://developers.facebook.com/docs/
facebook-login/access-tokens#apptokens>
"""
args = {
"input_token": token,
"access_token": "%s|%s" % (app_id, app_secret)
}
return self.request("/debug_token", args=args)
def __init__(self):
self.http_method = None
# mandatory headers for each request
self.http_headers = {
'Authorization': None,
'Date': None,
}
self.dss_url = config.get_service_url('dss')
if(self.dss_url.endswith('/')):
self.dss_url = self.dss_url[:-1]
self.access_key = config.get_access_key()
self.secret_key = config.get_secret_key()
self.is_secure_request = config.check_secure()
self.dss_op_path = None
self.dss_query_str = None
self.dss_query_str_for_signature = None
def __request(self, method, url, params=None):
logger.info('{} {} Params: {}'.format(method, url, params))
cookies = {'JSESSIONID': self.auth_cookie}
h = {
'Accept': 'application/json',
'Content-Type': 'application/json',
'X-Rundeck-Auth-Token': self.token
}
r = requests.request(
method, url, cookies=cookies, headers=h, json=params,
verify=self.verify
)
logger.debug(r.content)
r.raise_for_status()
try:
return r.json()
except ValueError as e:
logger.error(e.message)
return r.content
def create_request(self, method, path, options):
"""Creating a request with the given arguments
If api_version is set, appends it immediately after host
"""
version = '/' + options['api_version'] if 'api_version' in options else ''
# Adds a suffix (ex: ".html", ".json") to url
suffix = options['response_type'] if 'response_type' in options else 'json'
path = path + '.' + suffix
path = urlparse.urljoin(self.base, version + path)
if 'api_version' in options:
del options['api_version']
if 'response_type' in options:
del options['response_type']
return requests.request(method, path, **options)
def fetch(self):
section = 'hot' # hot | top | user
sort = 'viral' # viral | top | time | rising (only available with user section)
show_viral = 'true'
show_mature = 'true'
album_previews = 'false'
url = f'https://api.imgur.com/3/gallery/{section}/{sort}'
querystring = {"showViral": f"{show_viral}", "mature": f"{show_mature}", "album_previews": f"{album_previews}"}
headers = {'authorization': f'Client-ID {IMGUR_CLIENT_ID}'}
response = requests.request("GET", url, headers=headers, params=querystring)
json = response.json()
self.logger.debug(f'Fetched. Code: {response.status_code}')
return json['data'][:FETCH_LIMIT]
def _request(self, endpoint, method='GET', files=None, headers={}, **kwargs):
params = {}
if method == 'GET':
params = kwargs
data = None
headers = {'Content-Length': '0'}
else:
data = kwargs
response = requests.request(method,
endpoint,
auth=self.auth,
params=params,
json=data,
headers=headers,
files=files
)
if not response.status_code // 100 == 2:
error = WpApiError.factory(response)
raise error
return response.json()
def request(self, method, url, **kwargs):
try:
options = dict(kwargs)
if "headers" not in options:
options["headers"] = {}
options["timeout"] = 5
options["headers"]["User-Agent"] = "healthchecks.io"
r = requests.request(method, url, **options)
if r.status_code not in (200, 201, 204):
return "Received status code %d" % r.status_code
except requests.exceptions.Timeout:
# Well, we tried
return "Connection timed out"
except requests.exceptions.ConnectionError:
return "Connection failed"
def find_finger():
ps = request.vars.ps
import serial
ser = serial.Serial('/dev/ttyACM0', 9600)
ser.write('5')
# ser.write(b'5') #Prefixo b necessario se estiver utilizando Python 3.X
while True:
line = ser.readline()
print line
if "FINGERFOUND" in line:
id = line.split(",")[1]
ser.close()
r = requests.get("http://174.138.34.125:8081/walletapi/customer/%s/" % id)
obj = json.loads(r.text)
if ps == obj['password']:
return r.text
else:
return 'error'
def _request(self, action='GET', url='/', data=None, query_params=None):
if data is None:
data = {}
if query_params is None:
query_params = {}
default_headers = {
'Accept': 'application/json',
'Content-Type': 'application/json',
'Authorization': 'Bearer {0}'.format(self.options.get('auth_token'))
}
if not url.startswith(self.api_endpoint):
url = self.api_endpoint + url
r = requests.request(action, url, params=query_params,
data=json.dumps(data),
headers=default_headers)
r.raise_for_status() # if the request fails for any reason, throw an error.
if action == 'DELETE':
return ''
else:
return r.json()
def _request(self, action='GET', url='/', data=None, query_params=None):
if data is None:
data = {}
if query_params is None:
query_params = {}
default_headers = {
'Accept': 'application/json',
# 'Content-Type': 'application/json',
'API-Key': self.options['auth_token']
}
r = requests.request(action, self.api_endpoint + url, params=query_params,
data=data,
headers=default_headers)
r.raise_for_status() # if the request fails for any reason, throw an error.
if action == 'DELETE' or action == 'PUT' or action == 'POST':
return r.text # vultr handles succss/failure via HTTP Codes, Only GET returns a response.
return r.json()
def _request(self, action='GET', url='/', data=None, query_params=None):
if data is None:
data = {}
if query_params is None:
query_params = {}
default_headers = {
'Accept': 'application/json',
'Content-Type': 'application/json',
'X-NSONE-Key': self.options['auth_token']
}
default_auth = None
r = requests.request(action, self.api_endpoint + url, params=query_params,
data=json.dumps(data),
headers=default_headers,
auth=default_auth)
r.raise_for_status() # if the request fails for any reason, throw an error.
return r.json()
def _request(self, action='GET', url='/', data=None, query_params=None):
if data is None:
data = {}
if query_params is None:
query_params = {}
query_params['format'] = 'json'
query_params['_user'] = self.options['auth_username']
query_params['_key'] = self.options['auth_token']
default_headers = {
'Accept': 'application/json',
'Content-Type': 'application/json'
}
r = requests.request(action, self.api_endpoint + url, params=query_params,
data=json.dumps(data),
headers=default_headers)
r.raise_for_status() # if the request fails for any reason, throw an error.
return r.json()
def _request(self, action='GET', url='/', data=None, query_params=None):
if not data:
data = {}
if not query_params:
query_params = {}
result = requests.request(action, self.api_endpoint + url,
params=query_params,
data=json.dumps(data),
headers={
'Content-Type': 'application/json',
'Accept': 'application/json',
# GoDaddy use a key/secret pair to authenticate
'Authorization': 'sso-key {0}:{1}'.format(
self.options.get('auth_key'),
self.options.get('auth_secret'))
})
result.raise_for_status()
return result.json()
def _request(self, action='GET', url='/', data=None, query_params=None):
if data is None:
data = {}
if query_params is None:
query_params = {}
query_params['format'] = 'json'
default_headers = {
'Accept': 'application/json',
'Content-Type': 'application/json'
}
credentials = (self.options['auth_username'], self.options['auth_token'])
response = requests.request(action,
self.api_endpoint + url,
params=query_params,
data=json.dumps(data),
headers=default_headers,
auth=credentials)
# if the request fails for any reason, throw an error.
response.raise_for_status()
return response.json()
# Adds TTL parameter if passed as argument to lexicon.
def _request(self, action='GET', url='/', data=None, query_params=None):
# Set default values for missing arguments
data = data if data else {}
query_params = query_params if query_params else {}
# Merge authentication data into request
if action == 'GET':
query_params.update(self._build_authentication_data())
else:
data.update(self._build_authentication_data())
# Fire request against ClouDNS API and parse result as JSON
r = requests.request(action, self.api_endpoint + url, params=query_params, data=data)
r.raise_for_status()
payload = r.json()
# Check ClouDNS specific status code and description
if 'status' in payload and 'statusDescription' in payload and payload['status'] != 'Success':
raise Exception('ClouDNS API request has failed: ' + payload['statusDescription'])
# Return payload
return payload
def _request(self, action='GET', url='/', data=None, query_params=None):
if data is None:
data = {}
if query_params is None:
query_params = {}
default_headers = {
'Accept': 'application/json',
'Content-Type': 'application/json',
'PddToken': self.options.get('auth_token')
}
if not url.startswith(self.api_endpoint):
url = self.api_endpoint + url
r = requests.request(action, url, params=query_params,
data=json.dumps(data),
headers=default_headers)
r.raise_for_status() # if the request fails for any reason, throw an error.
if action == 'DELETE':
return ''
else:
return r.json()
def _request(self, action='GET', url='/', data=None, query_params=None):
if data is None:
data = {}
if query_params is None:
query_params = {}
default_headers = {
'Accept': 'application/json',
'Content-Type': 'application/json'
}
default_auth = (self.options['auth_username'], self.options['auth_token'])
r = requests.request(action, self.api_endpoint + url, params=query_params,
data=json.dumps(data),
headers=default_headers,
auth=default_auth)
r.raise_for_status() # if the request fails for any reason, throw an error.
return r.json()
def _request(self, action='GET', url='', data=None, query_params=None):
if data is None:
data = {}
if query_params is None:
query_params = {}
default_headers = {
'Accept': 'application/json',
'Content-Type': 'application/json'
}
query_params['api_key'] = self.options.get('auth_token')
query_params['resultFormat'] = 'JSON'
query_params['api_action'] = url
r = requests.request(action, self.api_endpoint, params=query_params,
data=json.dumps(data),
headers=default_headers)
r.raise_for_status() # if the request fails for any reason, throw an error.
if action == 'DELETE':
return ''
else:
result = r.json()
if len(result['ERRORARRAY']) > 0:
raise Exception('Linode api error: {0}'.format(result['ERRORARRAY']))
return result
def _request(self, action='GET', url='/', data=None, query_params=None):
if data is None:
data = {}
if query_params is None:
query_params = {}
default_headers = {
'Accept': 'application/json',
'Content-Type': 'application/json',
}
default_auth = requests.auth.HTTPBasicAuth(self.options['auth_username'], self.options['auth_token'])
r = requests.request(action, self.api_endpoint + url, params=query_params,
data=json.dumps(data),
headers=default_headers,
auth=default_auth)
r.raise_for_status() # if the request fails for any reason, throw an error.
return r.json()
def create_record(self, type, name, content):
request = {
'action': 'SET',
'type': type,
'name': self.options['domain'],
'value': content
}
if name is not None:
request['name'] = self._full_name(name)
if self.options.get('ttl'):
request['ttl'] = self.options.get('ttl')
if self.options.get('priority'):
request['prio'] = self.options.get('priority')
payload = self._get('/dns/dyndns.jsp', request)
if payload.find('is_ok').text != 'OK:':
raise Exception('An error occurred: {0}'.format(payload.find('is_ok').text))
logger.debug('create_record: %s', True)
return True
def delete_record(self, identifier=None, type=None, name=None, content=None):
if identifier is not None:
type, name, content = self._parse_identifier(identifier)
request = {
'action' : 'DELETE',
'name': self.options['domain']
}
if type is not None:
request['type'] = type
if name is not None:
request['name'] = self._full_name(name)
if content is not None:
request['value'] = content
payload = self._get('/dns/dyndns.jsp', request)
if payload.find('is_ok').text != 'OK:':
raise Exception('An error occurred: {0}'.format(payload.find('is_ok').text))
logger.debug('delete_record: %s', True)
return True