Python requests 模块，auth() 实例源码
我们从 Python 开源项目中提取了以下 48 个代码示例，用于说明如何使用 requests.auth()。
def getESCXBalance(address):
    """Return the ESCX balance reported for ``address``, or 0 on failure.

    Posts a JSON-RPC 2.0 ``get_balances`` request filtered on the given
    address and on the ``ESCX`` asset, takes the last entry of the reply's
    ``result`` list and divides its ``quantity`` by 100000000.

    The endpoint ``url``, ``headers`` and ``auth`` are read from the
    enclosing module scope.

    The lookup is best-effort: request errors, non-JSON replies and replies
    with an empty or malformed ``result`` all yield 0.
    """
    payload = {
        "method": "get_balances",
        "params": {
            "filters": [
                {"field": "address", "op": "==", "value": address},
                {"field": "asset", "op": "==", "value": "ESCX"},
            ],
            "filterop": "and",
        },
        "jsonrpc": "2.0",
        "id": 0,
    }
    try:
        response = requests.post(url, data=json.dumps(payload), headers=headers, auth=auth)
        json_data = response.json()
        return json_data['result'].pop()['quantity'] / 100000000
    except Exception:
        # Still deliberately broad (best-effort lookup), but unlike a bare
        # ``except:`` this lets KeyboardInterrupt/SystemExit propagate.
        return 0
def auth():
    """Demonstrate the ``auth=`` styles accepted by ``requests.get``.

    One request is issued per style; every response is discarded and the
    function returns None.
    """
    github_user_url = 'https://api.github.com/user'

    # Basic authentication: explicit HTTPBasicAuth object, then the
    # equivalent (user, password) tuple shorthand.
    requests.get(github_user_url, auth=HTTPBasicAuth('user', 'pass'))
    requests.get(github_user_url, auth=('user', 'pass'))

    # Digest authentication.
    digest_url = 'http://httpbin.org/digest-auth/auth/user/pass'
    requests.get(digest_url, auth=HTTPDigestAuth('user', 'pass'))

    # OAuth2 authentication (presumably via requests-oauthlib -- confirm
    # where OAuth2 is imported from).
    twitter_url = 'https://api.twitter.com/1.1/account/verify_credentials.json'
    oauth = OAuth2('YOUR_APP_KEY', 'YOUR_APP_SECRET', 'USER_OAUTH_TOKEN')
    requests.get(twitter_url, auth=oauth)
def getESCXBalance(address):
    """Return the ESCX balance reported for ``address``, or 0 on failure.

    Posts a JSON-RPC 2.0 ``get_balances`` request filtered on the given
    address and on the ``ESCX`` asset, takes the last entry of the reply's
    ``result`` list and divides its ``quantity`` by 100000000.

    The endpoint ``url``, ``headers`` and ``auth`` are read from the
    enclosing module scope.

    The lookup is best-effort: request errors, non-JSON replies and replies
    with an empty or malformed ``result`` all yield 0.
    """
    payload = {
        "method": "get_balances",
        "params": {
            "filters": [
                {"field": "address", "op": "==", "value": address},
                {"field": "asset", "op": "==", "value": "ESCX"},
            ],
            "filterop": "and",
        },
        "jsonrpc": "2.0",
        "id": 0,
    }
    try:
        response = requests.post(url, data=json.dumps(payload), headers=headers, auth=auth)
        json_data = response.json()
        return json_data['result'].pop()['quantity'] / 100000000
    except Exception:
        # Still deliberately broad (best-effort lookup), but unlike a bare
        # ``except:`` this lets KeyboardInterrupt/SystemExit propagate.
        return 0
def retrieve_access_token(self):
    """Exchange the authorization code for an access token.

    Posts ``code``, ``redirect_uri`` and the ``authorization_code`` grant
    type as JSON to the configured ``accessToken`` route, authenticating
    with HTTP basic auth (``api_key`` / ``api_secret``).

    Returns:
        The ``access_token`` value from the decoded response body.

    Raises:
        TypeError: if ``api_key``, ``redirect_uri``, ``api_secret`` or
            ``code`` is None.
        SystemError: if the response body has no ``access_token`` key; the
            decoded body is the exception argument.
    """
    # NOTE: the original wrote ``raise (TypeError, 'msg')`` which raises a
    # tuple: the message is dropped on Python 2 and Python 3 rejects it.
    if self.api_key is None:
        raise TypeError('Value api_key cannot be None. Please go to the Developer Console to get this value')
    if self.redirect_uri is None:
        raise TypeError('Value redirect_uri cannot be None. Please go to the Developer Console to get this value')
    if self.api_secret is None:
        raise TypeError('Value api_secret cannot be None. Please go to the Developer Console to get this value')
    if self.code is None:
        raise TypeError('Value code cannot be None. Please visit the login URL to generate a code')

    params = {'code': self.code, 'redirect_uri': self.redirect_uri, 'grant_type': 'authorization_code'}
    url = self.config['host'] + self.config['routes']['accessToken']
    headers = {"Content-Type": "application/json", "x-api-key": self.api_key}
    r = requests.post(url, auth=(self.api_key, self.api_secret), data=json.dumps(params), headers=headers)
    body = json.loads(r.text)
    if 'access_token' not in body:
        raise SystemError(body)
    return body['access_token']
def get_state(self) -> str:
    """Query the device and classify its state.

    Returns:
        "unknown" when no state command is configured or when neither
        marker is found in the response text; "off" when the configured
        off-marker occurs in it; otherwise "on" when the on-marker does.
        The off-marker is checked first.
    """
    if self.state_cmd is None:
        return "unknown"

    response = requests.request(
        self.state_method,
        self.state_cmd,
        data=self.state_data,
        json=self.state_json,
        headers=self.headers,
        auth=self.auth,
    )
    body = response.text
    if self.state_response_off in body:
        return "off"
    if self.state_response_on in body:
        return "on"
    return "unknown"
def _ghost2loggergetGUID(self):
    """Fetch the GUID from the Ghost2logger ``/v1/guid`` endpoint.

    Returns the string form of the last entry of the reply's ``pattern``
    list when the reply's ``host`` field equals "GUID", and an empty string
    when it does not. Any exception is logged at info level and the
    function then returns None.

    TLS verification is disabled for this call (``verify=False``).
    """
    endpoint = self._ghost_url + "/v1/guid"
    json_headers = {"Content-Type": 'application/json'}
    try:
        reply = requests.get(endpoint, headers=json_headers, verify=False,
                             auth=(self._username, self._password))
        _data = reply.json()
        if _data['host'] != "GUID":
            return ""
        # NOTE(review): the source's indentation was lost; the return is
        # assumed to follow the loop, so the last pattern entry wins.
        for _item in _data['pattern']:
            _guid = str(_item)
        return str(_guid)
    except Exception as e:
        self._logger.info('[Ghost2logger]: Cannot get GUID: ' +
                          str(e))
def _ghost2loggergpurge(self):
    """Send a DELETE to the Ghost2logger ``/v1/all`` endpoint.

    The decoded JSON reply is logged at info level; any exception is caught
    and logged instead. Always returns None. TLS verification is disabled
    for this call (``verify=False``).
    """
    self._logger.info('[Ghost2logger]: Purge Ghost2logger')
    endpoint = self._ghost_url + "/v1/all"
    json_headers = {"Content-Type": 'application/json'}
    try:
        reply = requests.delete(endpoint, headers=json_headers, verify=False,
                                auth=(self._username, self._password))
        decoded = reply.json()
        self._logger.info('[Ghost2logger]: Purged ghost2logger' +
                          str(decoded))
    except Exception as e:
        self._logger.info('[Ghost2logger]: Cannot purge Ghost2logger: ' +
                          str(e))
def __getAppSpaces(self):
    """Load the available application-space names into ``self.apps``.

    On success ``self.appspaces`` holds the decoded JSON reply of
    ``<baseurl>appspacenames`` and ``self.apps`` holds its keys.

    If the server answered but processing failed (for example an HTTP error
    status), the exception text and the reply's ``UserMessage.Text`` are
    printed, the decoded reply is printed, and ``self.apps`` is set to that
    decoded reply.

    Raises:
        Whatever ``requests.get`` raised when no response was obtained at
        all. The original handler dereferenced the never-assigned response
        in that case and masked the real error with an unbound-variable
        error.
    """
    appspaces = None
    try:
        appspaces = requests.get(self.baseurl + 'appspacenames',
                                 auth=self.cred)
        appspaces.raise_for_status()
        self.appspaces = appspaces.json()
        self.apps = appspaces.json().keys()
    except Exception as e:
        if appspaces is None:
            # The request itself failed; there is no reply body to report.
            raise
        error_body = appspaces.json()
        print(str(e) + ':\n' + str(error_body['UserMessage']['Text']))
        print(error_body)
        self.apps = error_body
def __getQueues(self):
    """Collect workbasket URIs for every application space / role pair.

    For each successful role lookup, every workbasket URI in the reply is
    appended to ``self.queue_urls`` and stored in ``self.workbaskets`` under
    the last path segment of the URI.
    """
    # NOTE(review): every app name is combined with every role list in
    # self.roles (a cross product), not only with its own roles; unsuccessful
    # lookups are skipped via ``reply.ok``. Confirm this is intended.
    for app_name in self.roles.keys():
        for role_names in self.roles.values():
            if not role_names:
                continue
            for role_name in role_names:
                role_url = ''.join((self.baseurl, 'appspaces/', app_name,
                                    '/roles/', role_name))
                reply = requests.get(role_url, auth=self.cred)
                if not reply.ok:
                    continue
                for basket in reply.json()['workbaskets'].values():
                    basket_uri = basket['URI']
                    self.queue_urls.append(basket_uri)
                    self.workbaskets[basket_uri.split('/')[-1]] = basket_uri
def getQueue(self, work_basket):
    """Return the queue description for a workbasket, plus its task count.

    The queue is fetched from the client's base URL joined with the URI
    registered for ``work_basket``; a second request to
    ``<queue url>/queueelements/count`` supplies the ``count`` value that
    is stored in the returned dictionary.

    Usage:
    >>> my_queue = pe.getQueue('workbasket_name')
    >>> my_queue.get('count')  # total tasks in this queue
    """
    client = self.client
    queue_reply = requests.get(client.baseurl + client.workbaskets.get(work_basket),
                               auth=client.cred)
    count_reply = requests.get(queue_reply.url + '/queueelements/count',
                               auth=client.cred)
    count = count_reply.json()['count']
    queue = queue_reply.json()
    queue['count'] = count
    return queue
def getAllTasks(self):
    """Return the tasks of every known queue as one flat list.

    Each URL in ``self.client.queue_urls`` is fetched and its decoded JSON
    is handed to ``self.getTasks``; non-empty results are concatenated in
    queue order.

    Usage:
    >>> tasks = pe.getAllTasks()

    Per the original documentation a message naming the empty queues is
    printed; that presumably happens inside ``getTasks`` -- confirm there.
    """
    collected = []
    for queue_uri in self.client.queue_urls:
        reply = requests.get(self.client.baseurl + queue_uri,
                             auth=self.client.cred)
        queue_tasks = self.getTasks(reply.json())
        if queue_tasks:
            collected.extend(queue_tasks)
    return collected
def lockTask(self, task):
    """Lock a task's step element.

    Fetches the step element named by ``task['stepElement']`` to read its
    ``ETag`` response header, then issues a PUT on the same URL with
    ``action=lock`` and that ETag as ``If-Match``, both sent as query
    parameters. Returns None.

    Usage:
    >>> pe.lockTask(task)
    """
    step_url = self.client.baseurl + task['stepElement']
    credentials = self.client.cred
    current = requests.get(step_url, auth=credentials)
    etag = current.headers['ETag']
    lock_query = {'action': 'lock', 'If-Match': etag}
    requests.put(step_url, auth=credentials, params=lock_query)
def endTask(self, task, comment=None):
    """Dispatch a task, optionally saving a comment first.

    If the task's step offers responses and none is selected yet, nothing
    is sent and an explanatory string is returned instead. Otherwise the
    task is (optionally) saved with ``comment``, locked, and a PUT with
    ``action=dispatch`` is issued; the function then returns None.

    Usage:
    >>> pe.endTask(task)  # or
    >>> pe.endTask(task, u'Completed the task!')
    """
    params = {'action': 'dispatch'}
    system_props = self.getStep(task).get('systemProperties')
    needs_response = (system_props.get('responses')
                      and not system_props.get('selectedResponse'))
    if needs_response:
        return "This task needs to be updated. Check the updateTask method."

    params['selectedResponse'] = 1
    if comment:
        task = self.saveAndUnlockTask(task, comment)
    self.lockTask(task)
    params['If-Match'] = task['ETag']
    requests.put(self.client.baseurl + task['stepElement'],
                 auth=self.client.cred,
                 params=params)
def getUser(self, search_string):
    """Search the directory service for users matching ``search_string``.

    Returns the ``displayName`` of every entry under ``users`` in the reply
    (the request asks for at most 50), or the string "User not Found" when
    the reply has no such entries.

    Usage:
    >>> users = pe.getUser('user_name')
    """
    query = {'searchPattern': search_string, 'searchType': 4, 'limit': 50}
    reply = requests.get(self.client.baseurl + 'users',
                         auth=self.client.cred,
                         params=query)
    matches = reply.json().get('users')
    if not matches:
        return "User not Found"
    return [entry['displayName'] for entry in matches]
def getGroup(self, search_string):
    """Search the directory service for groups matching ``search_string``.

    Returns the ``displayName`` of every entry under ``groups`` in the
    reply (the request asks for at most 3000), or the string
    "Group not Found" when the reply has no such entries.

    Usage:
    >>> groups = pe.getGroup('group_name')
    """
    query = {'searchPattern': search_string, 'searchType': 4, 'limit': 3000}
    reply = requests.get(self.client.baseurl + 'groups',
                         auth=self.client.cred,
                         params=query)
    matches = reply.json().get('groups')
    if not matches:
        return "Group not Found"
    return [entry['displayName'] for entry in matches]
def sendMessage() -> str:
    """Post a comment (internal or external) on a HackerOne report.

    Reads ``message``, ``internal`` and ``id`` from the JSON body of the
    current request, posts an ``activity-comment`` to the report's
    ``activities`` endpoint, and returns HackerOne's JSON reply re-encoded
    as a string.
    """
    payload = request.get_json(force=True)
    message = payload['message']
    internal = payload['internal']
    report_id = payload['id']

    if config.DEBUG:
        print("/v1/sendMessage: id=%s, internal=%s" % (report_id, internal))
    if config.DEBUGVERBOSE:
        print("message=%s" % message)

    comment = {'data': {'type': 'activity-comment',
                        'attributes': {'message': message,
                                       'internal': internal}}}
    reply = requests.post(
        'https://api.hackerone.com/v1/reports/%s/activities' % report_id,
        headers={'Content-Type': 'application/json'},
        data=json.dumps(comment).encode('utf-8'),
        auth=(config.apiName, secrets.apiToken),
    )
    return json.dumps(reply.json())
def changeStatus() -> str:
    """Change the state of a HackerOne report.

    Reads ``status``, ``message`` and ``id`` from the JSON body of the
    current request, posts a ``state-change`` to the report's
    ``state_changes`` endpoint, and returns HackerOne's JSON reply
    re-encoded as a string.
    """
    payload = request.get_json(force=True)
    status = payload['status']
    message = payload['message']
    report_id = payload['id']

    if config.DEBUG:
        print("/v1/changeStatus: id=%s, status=%s" % (report_id, status))
    if config.DEBUGVERBOSE:
        print("message=%s" % message)

    state_change = {'data': {'type': 'state-change',
                             'attributes': {'message': message,
                                            'state': status}}}
    reply = requests.post(
        'https://api.hackerone.com/v1/reports/%s/state_changes' % report_id,
        headers={'Content-Type': 'application/json'},
        data=json.dumps(state_change).encode('utf-8'),
        auth=(config.apiName, secrets.apiToken),
    )
    return json.dumps(reply.json())
def postComment(id: str, vti: VulnTestInfo, internal=False, addStopMessage=False) -> Mapping:
    """Post ``vti.message`` as a comment on the report with the given ID.

    - ``internal=True`` posts an internal comment.
    - ``addStopMessage=True`` appends ``constants.disableMessage`` after a
      blank line.

    When ``config.triageOnReproduce`` is set and ``vti.reproduced`` is
    truthy, ``changeStatus(id, 'triaged')`` is called afterwards. Returns
    the decoded JSON reply of the sendMessage call.
    """
    if config.DEBUG:
        print("Posting comment: internal=%s, reproduced=%s, id=%s" % (str(internal), str(vti.reproduced), id))

    message = (vti.message + '\n\n' + constants.disableMessage) if addStopMessage else vti.message
    postMessage("Posting Message: \n\n%s" % message)  # TODO: Delete this

    comment = {'message': message, 'internal': internal, 'id': id}
    resp = requests.post('http://api:8080/v1/sendMessage',
                         json=comment,
                         auth=HTTPBasicAuth('AutoTriageBot', secrets.apiBoxToken))
    if config.triageOnReproduce and vti.reproduced:
        changeStatus(id, 'triaged')
    return resp.json()
def __init__(self, api_url, auth_custom=None, auth_user=None, auth_pass=None, login=None):
    """
    :param api_url: str Moira API URL (a trailing slash is added if missing)
    :param auth_custom: dict extra auth headers merged over the defaults
    :param auth_user: str basic-auth user
    :param auth_pass: str basic-auth password
    :param login: str value sent in the X-Webauth-User header
    """
    self.api_url = api_url if api_url.endswith('/') else api_url + '/'

    # Basic auth is only configured when both credentials are supplied.
    self.auth = None
    if auth_user and auth_pass:
        self.auth = HTTPBasicAuth(auth_user, auth_pass)

    self.auth_custom = {'X-Webauth-User': login}
    if auth_custom:
        self.auth_custom.update(auth_custom)
def get(self, path='', **kwargs):
    """
    Issue a GET against the API and decode the JSON reply.

    :param path: str api path
    :param kwargs: additional parameters passed to requests.get
    :return: decoded JSON response
    :raises: HTTPError on an error status
    :raises: InvalidJSONError when the body is not valid JSON
    """
    response = requests.get(self._path_join(path), headers=self.auth_custom,
                            auth=self.auth, **kwargs)
    response.raise_for_status()
    try:
        decoded = response.json()
    except ValueError:
        raise InvalidJSONError(response.content)
    return decoded
def delete(self, path='', **kwargs):
    """
    Issue a DELETE against the API and decode the JSON reply.

    :param path: str api path
    :param kwargs: additional parameters passed to requests.delete
    :return: decoded JSON response
    :raises: HTTPError on an error status
    :raises: InvalidJSONError when the body is not valid JSON
    """
    response = requests.delete(self._path_join(path), headers=self.auth_custom,
                               auth=self.auth, **kwargs)
    response.raise_for_status()
    try:
        decoded = response.json()
    except ValueError:
        raise InvalidJSONError(response.content)
    return decoded
def put(self, path='', **kwargs):
    """
    Issue a PUT against the API and decode the JSON reply.

    :param path: str api path
    :param kwargs: additional parameters passed to requests.put
    :return: decoded JSON response
    :raises: HTTPError on an error status
    :raises: InvalidJSONError when the body is not valid JSON
    """
    response = requests.put(self._path_join(path), headers=self.auth_custom,
                            auth=self.auth, **kwargs)
    response.raise_for_status()
    try:
        decoded = response.json()
    except ValueError:
        raise InvalidJSONError(response.content)
    return decoded
def add_torrent(self, torrent_data: Union[str, bytes], download_dir: str=None) -> bool:
    """Upload torrent data to uTorrent through its ``add-file`` action.

    The data is staged in a temporary file, which is sent as the
    ``torrent_file`` multipart field. ``self.total_size`` and
    ``self.expected_torrent_name`` are reset first. ``download_dir`` is
    accepted but not used here.

    Returns:
        True when the decoded JSON reply has no ``error`` key; False when
        it has one or when the request raised a ``RequestException``.
    """
    self.total_size = 0
    self.expected_torrent_name = ''
    params = {'action': 'add-file', 'token': self.token}

    # The original reopened the temp file by name without flushing, so the
    # upload could see an empty or truncated file, and it never closed the
    # second handle. Reusing the one handle (flushed and rewound) fixes
    # both; the ``with`` block removes the file on every exit path.
    with NamedTemporaryFile() as lf:
        lf.write(torrent_data)
        lf.flush()
        lf.seek(0)
        try:
            response = requests.post(
                self.UTORRENT_URL,
                auth=self.auth,
                params=params,
                files={'torrent_file': lf},
                timeout=25).json()
        except RequestException:
            return False
    return 'error' not in response
def add_url(self, url: str, download_dir: str=None) -> bool:
    """Ask uTorrent to add a torrent by URL through its ``add-url`` action.

    ``self.total_size`` and ``self.expected_torrent_name`` are reset first.
    ``download_dir`` is accepted but not used here.

    Returns:
        True when the decoded JSON reply has no ``error`` key; False when
        it has one or when the request raised a ``RequestException``.
    """
    self.total_size = 0
    self.expected_torrent_name = ''
    query = {'action': 'add-url', 'token': self.token, 's': url}
    try:
        reply = requests.get(
            self.UTORRENT_URL,
            auth=self.auth,
            # cookies=self.cookies,
            params=query,
            timeout=25).json()
        return 'error' not in reply
    except RequestException:
        return False
def push_pact(self, *, pact_file, provider, consumer, consumer_version):
    """Upload a pact file to the broker.

    The JSON document at ``pact_file`` is loaded and PUT to the URL built
    from ``urls.PUSH_PACT_URL``.

    Returns:
        A ``(response, message)`` tuple.

    Raises:
        Whatever ``response.raise_for_status()`` raises on an error status.
    """
    target = urls.PUSH_PACT_URL.format(
        broker_url=self.broker_url,
        provider=provider,
        consumer=consumer,
        consumer_version=consumer_version,
    )
    with open(pact_file) as handle:
        pact_document = json.load(handle)

    response = requests.put(target, auth=self._auth, json=pact_document)
    response.raise_for_status()
    message = f'Pact between {consumer} and {provider} pushed.'
    return response, message
def tag_consumer(self, *, provider, consumer, consumer_version, tag):
    """Tag a consumer version on the broker.

    Issues a body-less PUT to the URL built from ``urls.TAG_CONSUMER_URL``.
    ``provider`` is accepted but not used in the request.

    Returns:
        A ``(response, message)`` tuple.

    Raises:
        Whatever ``response.raise_for_status()`` raises on an error status.
    """
    target = urls.TAG_CONSUMER_URL.format(
        broker_url=self.broker_url,
        consumer=consumer,
        consumer_version=consumer_version,
        tag=tag,
    )
    json_headers = {'Content-Type': 'application/json'}
    response = requests.put(target, headers=json_headers, auth=self._auth)
    response.raise_for_status()
    message = f'{consumer} version {consumer_version} tagged as {tag}'
    return response, message
def __init__(self, base_url, project, user, password, verify=False, timeout=None, auth_type=None):
    """Set up the API URLs and an authenticated HTTP session.

    :param base_url: server root; a trailing slash is added if missing
    :param project: project name, split by ``get_collection_and_project``
        into a collection and an optional project part
    :param user: user name handed to ``auth_type``
    :param password: password handed to ``auth_type``
    :param verify: stored as ``self._verify``; when falsy, urllib3's
        InsecureRequestWarning is silenced
    :param timeout: stored as ``self.timeout``
    :param auth_type: callable ``(user, password) -> auth object``;
        defaults to HTTP basic auth when omitted
    """
    if not base_url.endswith('/'):
        base_url += '/'
    collection, project = self.get_collection_and_project(project)
    # Remove part after / in project-name, like DefaultCollection/MyProject => DefaultCollection
    # The API responds per project only, without sub-project
    self._url = base_url + '%s/_apis/' % collection
    if project:
        self._url_prj = base_url + '%s/%s/_apis/' % (collection, project)
    else:
        self._url_prj = self._url

    self.http_session = requests.Session()
    if auth_type is None:
        # The declared default used to be called as-is, failing with
        # "'NoneType' object is not callable"; fall back to basic auth.
        auth_type = requests.auth.HTTPBasicAuth
    self.http_session.auth = auth_type(user, password)

    self.timeout = timeout
    self._verify = verify
    if not self._verify:
        from requests.packages.urllib3.exceptions import InsecureRequestWarning
        requests.packages.urllib3.disable_warnings(InsecureRequestWarning)
def _compose_args(self, method, data=None):
    """Build the keyword arguments for one request.

    :param method: HTTP method name; ``'get'`` puts the payload under
        ``params``, anything else under ``data``
    :param data: optional mapping merged over the configured
        ``common-params``
    :return: dict with the payload key, ``headers``, and, depending on
        ``self._structure``, ``auth`` and/or ``verify``

    The configured ``common-params`` and ``common-headers`` are copied
    before use. The original updated them in place, so per-call data and
    the billing credentials accumulated in the shared configuration and
    leaked into every later request.
    """
    query = 'params' if method == 'get' else 'data'
    args = {
        query: dict(self._structure.get('common-params', {})),
        'headers': dict(self._structure.get('common-headers', {})),
    }
    if data:
        args[query].update(data)

    auth = self._structure.get('auth')
    if auth == 'params':
        # Credentials travel inside the payload itself.
        args[query].update({'username': self.billing_username,
                            'password': self.billing_password})
    elif auth == 'headers':
        args['auth'] = HTTPBasicAuth(
            self.billing_username, self.billing_password)

    # Equality (not identity) is kept so a configured 0 still disables
    # verification, as before.
    if self._structure.get('verify') == False:  # noqa: E712
        args['verify'] = False
    return args
def get_url(url):
    '''GET ``url`` with the controller credentials.

    TLS verification is disabled for this call (``verify=False``).

    Returns a ``(success, payload)`` tuple: ``(True, decoded JSON)`` on
    status 200, ``(False, response text)`` on any other status, and
    ``(False, error text)`` when the connection fails.
    '''
    headers = {'Content-type': 'application/json'}
    auth = (controller['any_user'], controller['any_pass'])
    logging.info(url)
    try:
        response = requests.get(url, headers=headers, auth=auth, verify=False)
    except requests.exceptions.ConnectionError as e:
        # ``except X, e`` and ``e.message`` only work on Python 2; this
        # form is valid on Python 2.6+ and Python 3.
        logging.error('Connection Error: %s', e)
        return False, str(e)
    logging.info("Url GET Status: %s", response.status_code)
    if response.status_code == 200:
        return True, response.json()
    return False, str(response.text)
def post_xml(url, data):
    '''POST an XML body to ``url`` with the controller credentials.

    TLS verification is disabled for this call (``verify=False``).

    Returns a ``(success, payload)`` tuple: on status 200 or 204 the
    payload is the decoded JSON reply, or ``{}`` when the body is empty;
    on any other status ``(False, response text)``; on a connection
    failure ``(False, error text)``.
    '''
    headers = {'Content-type': 'application/xml'}
    auth = (controller['any_user'], controller['any_pass'])
    try:
        response = requests.post(url, data=data, auth=auth, headers=headers, verify=False)
    except requests.exceptions.ConnectionError as e:
        # ``except X, e`` and ``e.message`` only work on Python 2; this
        # form is valid on Python 2.6+ and Python 3.
        logging.error('Connection Error: %s', e)
        return False, str(e)
    logging.info("Url POST Status: %s", response.status_code)
    if response.status_code not in (200, 204):
        return False, str(response.text)
    if len(response.text) > 0:
        return True, response.json()
    return True, {}
def put_xml(url, data):
    """PUT an XML body to ``url`` with the controller credentials.

    TLS verification is disabled for this call (``verify=False``).

    Returns a ``(success, payload)`` tuple: on status 200, 201 or 204 the
    payload is the decoded JSON reply, or ``{}`` when the body is empty;
    on any other status ``(False, response text)``; on a connection
    failure ``(False, error text)``.
    """
    headers = {'Content-type': 'application/xml'}
    auth = (controller['any_user'], controller['any_pass'])
    try:
        response = requests.put(url, data=data, auth=auth, headers=headers, verify=False)
    except requests.exceptions.ConnectionError as e:
        # ``except X, e`` and ``e.message`` only work on Python 2; this
        # form is valid on Python 2.6+ and Python 3.
        logging.error('Connection Error: %s', e)
        return False, str(e)
    logging.info("Url PUT Status: %s", response.status_code)
    if response.status_code not in (200, 201, 204):
        return False, str(response.text)
    if len(response.text) > 0:
        return True, response.json()
    return True, {}
def put_json(url, data):
    """PUT ``data`` serialized as JSON to ``url`` with the controller credentials.

    TLS verification is disabled for this call (``verify=False``).

    Returns a ``(success, payload)`` tuple: on status 200 or 204 the
    payload is the decoded JSON reply, or ``{}`` when the body is empty;
    on any other status ``(False, response text)``; on a connection
    failure ``(False, error text)``.
    """
    headers = {'Content-type': 'application/json'}
    auth = (controller['any_user'], controller['any_pass'])
    try:
        response = requests.put(url, data=json.dumps(data), auth=auth, headers=headers, verify=False)
    except requests.exceptions.ConnectionError as e:
        # ``except X, e`` and ``e.message`` only work on Python 2; this
        # form is valid on Python 2.6+ and Python 3.
        logging.error('Connection Error: %s', e)
        return False, str(e)
    logging.info("Url PUT Status: %s", response.status_code)
    if response.status_code not in (200, 204):
        return False, str(response.text)
    if len(response.text) > 0:
        return True, response.json()
    return True, {}
def grant_access_token(self, code):
    """Exchange an authorization code for Bitbucket OAuth2 tokens.

    Posts the code to Bitbucket's access-token endpoint using HTTP basic
    auth with the OAuth key and secret. On success the access token,
    refresh token and token type are stored on the instance and the decoded
    reply is returned.

    Raises:
        BitbucketAPIError: when the endpoint answers with an error status;
            it carries the status code and the ``error`` /
            ``error_description`` fields of the reply.
    """
    form = {
        'grant_type': 'authorization_code',
        'code': code,
    }
    credentials = HTTPBasicAuth(self._oauth_key, self._oauth_secret)
    res = self._session.post(
        'https://bitbucket.org/site/oauth2/access_token',
        data=form,
        auth=credentials,
    )
    try:
        res.raise_for_status()
    except requests.RequestException as reqe:
        details = res.json()
        raise BitbucketAPIError(
            res.status_code,
            details.get('error', ''),
            details.get('error_description', ''),
            request=reqe.request,
            response=reqe.response
        )

    granted = res.json()
    self._access_token = granted['access_token']
    self._refresh_token = granted['refresh_token']
    self._token_type = granted['token_type']
    return granted
def refresh_token(self):
    """
    Refresh the token regardless of when it expires.

    The new token gets an ``expires_at`` timestamp computed from the
    current time and its ``expires_in``; if the reply carries no refresh
    token the previous one is kept. The result is assigned to
    ``self.token``.

    Raises HTTPError on any non 2xx response code
    """
    previous_refresh = self._token['refresh_token']
    form = {
        'refresh_token': previous_refresh,
        'grant_type': 'refresh_token'
    }
    credentials = HTTPBasicAuth(self.client_id, self.client_secret)
    response = self.session.post(self.TOKEN_URL, data=form, auth=credentials)
    response.raise_for_status()

    renewed = response.json()
    renewed['expires_at'] = int(time.time()) + renewed['expires_in']
    renewed.setdefault('refresh_token', previous_refresh)
    self.token = renewed
def jsonrpc_call(url, method, params=None, verify_ssl_cert=True,
                 username=None, password=None):
    """Perform one JSON-RPC 2.0 call and return its ``result``.

    :param url: endpoint to POST to
    :param method: JSON-RPC method name
    :param params: JSON-RPC params; an empty object is sent when omitted
    :param verify_ssl_cert: passed to requests as ``verify``
    :param username: with ``password``, enables HTTP basic auth
    :param password: see ``username``
    :raises JsonRpcCallFailed: when the reply has no ``result`` key
    :return: the ``result`` value of the reply

    The time spent in the request is added to the module-level
    ``method_cumulative_calltime[method]``.
    """
    # A ``{}`` default would be a single dict shared by every call.
    if params is None:
        params = {}
    payload = {"method": method, "params": params, "jsonrpc": "2.0", "id": 0}
    kwargs = {
        "url": url,
        "headers": {'content-type': 'application/json'},
        "data": json.dumps(payload),
        "verify": verify_ssl_cert,
    }
    if username and password:
        kwargs["auth"] = HTTPBasicAuth(username, password)

    begin = time.time()
    response = requests.post(**kwargs).json()
    # Item assignment on the module-level mapping needs no ``global``.
    method_cumulative_calltime[method] += (time.time() - begin)

    if "result" not in response:
        raise JsonRpcCallFailed(payload, response)
    return response["result"]
def get_strategy_for(self, url):
    """Retrieve the authentication strategy for a specified URL.

    :param str url: The full URL you will be making a request against. For
        example, ``'https://api.github.com/user'``
    :returns: The strategy registered under the key derived from ``url``,
        or a fresh ``NullAuthStrategy`` when none is registered.

    .. code-block:: python

        import requests
        a = AuthHandler({'example.com': ('foo', 'bar')})
        strategy = a.get_strategy_for('http://example.com/example')
        assert isinstance(strategy, requests.auth.HTTPBasicAuth)
    """
    lookup_key = self._key_from_url(url)
    fallback = NullAuthStrategy()
    return self.strategies.get(lookup_key, fallback)
def notify(tee, servers, meta_data):
    """POST a notification to every configured server (best-effort).

    :param tee: callable receiving one error-message string per failure
    :param servers: iterable of dicts, each with a ``connector_access``
        mapping (``url`` plus optional ``json_data``, ``add_meta_data``,
        ``auth`` and ``ssl_verify``)
    :param meta_data: mapping merged into the JSON body of servers that set
        ``add_meta_data``

    A failure for one server is reported through ``tee`` together with the
    traceback and does not stop the remaining notifications.
    """
    for server in servers:
        connector_access = server['connector_access']
        try:
            json_data = connector_access.get('json_data')
            if connector_access.get('add_meta_data'):
                # Work on a copy: the original wrote the meta data into the
                # caller's configuration dict.
                json_data = dict(json_data or {})
                json_data.update(meta_data)
            r = requests.post(
                connector_access['url'],
                json=json_data,
                auth=_auth(connector_access.get('auth')),
                verify=connector_access.get('ssl_verify', True)
            )
            r.raise_for_status()
        except Exception:
            # Broad on purpose (best-effort), but no longer a bare
            # ``except:`` that would swallow KeyboardInterrupt/SystemExit.
            tee('Could not notify server {}: {}'.format(connector_access['url'], format_exc()))
def get_strategy_for(self, url):
    """Retrieve the authentication strategy for a specified URL.

    :param str url: The full URL you will be making a request against. For
        example, ``'https://api.github.com/user'``
    :returns: The strategy registered under the key derived from ``url``,
        or a fresh ``NullAuthStrategy`` when none is registered.

    .. code-block:: python

        import requests
        a = AuthHandler({'example.com': ('foo', 'bar')})
        strategy = a.get_strategy_for('http://example.com/example')
        assert isinstance(strategy, requests.auth.HTTPBasicAuth)
    """
    lookup_key = self._key_from_url(url)
    fallback = NullAuthStrategy()
    return self.strategies.get(lookup_key, fallback)
def test_prepared_from_session(self, httpbin):
    """A session-level auth callable is applied when the session prepares a request."""

    class DummyAuth(requests.auth.AuthBase):
        """Auth hook that stamps a marker header on the request."""

        def __call__(self, r):
            r.headers['Dummy-Auth-Test'] = 'dummy-auth-test-ok'
            return r

    request = requests.Request('GET', httpbin('headers'))
    assert not request.auth

    session = requests.Session()
    session.auth = DummyAuth()

    prepared = session.prepare_request(request)
    response = session.send(prepared)

    echoed_headers = response.json()['headers']
    assert echoed_headers['Dummy-Auth-Test'] == 'dummy-auth-test-ok'
def _user_agent_digest_hash(self):
    """
    Hash the user agent and user agent password
    Section 3.10 of https://www.nar.realtor/retsorg.nsf/retsproto1.7d6.pdf

    The digest is ``md5(a1 + "::" + session_id + ":" + version)`` where
    ``a1`` is ``md5(user_agent + ":" + user_agent_password)``, the session
    id is empty when unset, and the version has its ``RETS/`` prefix
    removed.

    :raises MissingVersion: when no version is set on the session
    :return: md5 hex digest
    """
    if not self.version:
        # The original literals were concatenated without separating spaces.
        raise MissingVersion("A version is required for user agent auth. The RETS server should set this "
                             "automatically but it has not. Please instantiate the session with a version argument "
                             "to provide the version.")

    # str.strip('RETS/') strips a *set of characters* from both ends, not
    # the prefix; remove the prefix explicitly instead.
    version_number = self.version
    prefix = 'RETS/'
    if version_number.startswith(prefix):
        version_number = version_number[len(prefix):]

    user_str = '{0!s}:{1!s}'.format(self.user_agent, self.user_agent_password).encode('utf-8')
    a1 = hashlib.md5(user_str).hexdigest()
    session_id = self.session_id if self.session_id is not None else ''
    digest_str = '{0!s}::{1!s}:{2!s}'.format(a1, session_id, version_number).encode('utf-8')
    return hashlib.md5(digest_str).hexdigest()
def division_standings():
    """Fetch the division standings and store each team's rank.

    Downloads ``division_team_standings.json`` from ``base_url`` using HTTP
    basic auth and inserts one ``(team_id, rank)`` row per team into the
    ``division_standings`` table of ``nflpool.sqlite``.

    All rows are written through a single connection and committed
    together; the connection is closed even if the insert fails. (The
    original opened, committed and closed a connection per row and leaked
    it on error.)
    """
    url = base_url + 'division_team_standings.json'
    response = requests.get((url),
                            auth=HTTPBasicAuth(secret.msf_username, secret.msf_pw))
    data = response.json()

    rows = [
        (team['team']['ID'], team['rank'])
        for division in data['divisionteamstandings']['division']
        for team in division['teamentry']
    ]

    conn = sqlite3.connect('nflpool.sqlite')
    try:
        cur = conn.cursor()
        cur.executemany('''INSERT INTO division_standings(team_id, rank)
            VALUES(?,?)''', rows)
        conn.commit()
    finally:
        conn.close()
# Get Playoff Standings for each team (need number 5 & 6 in each conference for Wild Card picks)
def playoff_standings():
    """Fetch the playoff standings and store each team's rank.

    Downloads ``playoff_team_standings.json`` from ``base_url`` using HTTP
    basic auth and inserts one ``(team_id, rank)`` row per team into the
    ``playoff_rankings`` table of ``nflpool.sqlite``.

    All rows are written through a single connection and committed
    together; the connection is closed even if the insert fails. (The
    original opened, committed and closed a connection per row and leaked
    it on error.)
    """
    url = base_url + 'playoff_team_standings.json'
    response = requests.get((url),
                            auth=HTTPBasicAuth(secret.msf_username, secret.msf_pw))
    data = response.json()

    rows = [
        (team['team']['ID'], team['rank'])
        for conference in data['playoffteamstandings']['conference']
        for team in conference['teamentry']
    ]

    conn = sqlite3.connect('nflpool.sqlite')
    try:
        cur = conn.cursor()
        cur.executemany('''INSERT INTO playoff_rankings(team_id, rank)
            VALUES(?,?)''', rows)
        conn.commit()
    finally:
        conn.close()
# Get individual statistics for each category
def parse_auth_argument(args):
    """Turn the parsed ``auth`` option into an authentication object.

    ``'basic'`` yields ``HTTPBasicAuth(username, password)`` and ``'oauth'``
    yields the result of ``get_oauth1session(...)``; any other value of
    ``args.auth`` is returned unchanged. When a required option for the
    chosen mode is missing, a message is printed and ``exit(3)`` is called.
    """
    auth = args.auth
    if auth == 'basic':
        if not args.username or not args.password:
            print("For basic authentication, 'username' and 'password' "
                  "parameter are needed")
            exit(3)
        return HTTPBasicAuth(args.username, args.password)
    if auth == 'oauth':
        if not args.consumer_key or not args.private_key:
            print("For oauth authentication, 'consumer-key' "
                  "and 'private-key' parameter are needed")
            exit(3)
        return get_oauth1session(args.consumer_key, args.consumer_secret,
                                 args.private_key, args.passphrase)
    return auth
def parse_auth_argument(args):
    """Turn the parsed ``auth`` option into an authentication object.

    ``'basic'`` yields ``HTTPBasicAuth(username, password)`` and ``'oauth'``
    yields the result of ``get_oauth1session(...)``; any other value of
    ``args.auth`` is returned unchanged. When a required option for the
    chosen mode is missing, a message is printed and ``exit(3)`` is called.
    """
    auth = args.auth
    if auth == 'basic':
        if not args.username or not args.password:
            print("For basic authentication, 'username' and 'password' "
                  "parameter are needed")
            exit(3)
        return HTTPBasicAuth(args.username, args.password)
    if auth == 'oauth':
        if not args.consumer_key or not args.private_key:
            print("For oauth authentication, 'consumer-key' "
                  "and 'private-key' parameter are needed")
            exit(3)
        return get_oauth1session(args.consumer_key, args.consumer_secret,
                                 args.private_key, args.passphrase)
    return auth
def parse_auth_argument(args):
    """Turn the parsed ``auth`` option into an authentication object.

    ``'basic'`` yields ``HTTPBasicAuth(username, password)`` and ``'oauth'``
    yields the result of ``get_oauth1session(...)``; any other value of
    ``args.auth`` is returned unchanged. When a required option for the
    chosen mode is missing, a message is printed and ``exit(3)`` is called.
    """
    auth = args.auth
    if auth == 'basic':
        if not args.username or not args.password:
            print("For basic authentication, 'username' and 'password' "
                  "parameter are needed")
            exit(3)
        return HTTPBasicAuth(args.username, args.password)
    if auth == 'oauth':
        if not args.consumer_key or not args.private_key:
            print("For oauth authentication, 'consumer-key' "
                  "and 'private-key' parameter are needed")
            exit(3)
        return get_oauth1session(args.consumer_key, args.consumer_secret,
                                 args.private_key, args.passphrase)
    return auth
def run(self):
    """Query the service for the configured observable and report the result.

    The query is derived from ``self.data_type``:

    - ``hash``: ``scan/<data>``
    - ``file``: ``scan/<sha256>``, using the 64-character entry of
      ``attachment.hashes`` when present, else the SHA-256 of the file
    - ``filename``: ``search?query=filename:<data>``
    - anything else: ``self.notSupported()`` is called

    The request is repeated after a 60 second pause for as long as the
    service answers with its rate-limit message. The decoded JSON reply is
    passed to ``self.report``; a ``ValueError`` is routed to
    ``self.unexpectedError``.
    """
    rate_limit_message = "Exceeded maximum API requests per minute(5). Please try again later."
    try:
        if self.data_type == 'hash':
            query_url = 'scan/'
            query_data = self.getParam('data', None, 'Hash is missing')
        elif self.data_type == 'file':
            query_url = 'scan/'
            hashes = self.getParam('attachment.hashes', None)
            if hashes is None:
                filepath = self.getParam('file', None, 'File is missing')
                # Hash the raw bytes: text mode fails on Python 3 and the
                # original never closed the handle.
                with open(filepath, 'rb') as sample:
                    query_data = hashlib.sha256(sample.read()).hexdigest()
            else:
                # find SHA256 hash
                query_data = next(h for h in hashes if len(h) == 64)
        elif self.data_type == 'filename':
            query_url = 'search?query=filename:'
            query_data = self.getParam('data', None, 'Filename is missing')
        else:
            self.notSupported()

        url = str(self.basic_url) + str(query_url) + str(query_data)
        while True:
            r = requests.get(url, headers=self.headers, auth=HTTPBasicAuth(self.api_key, self.secret), verify=False)
            body = r.json()
            reply = body.get('response')
            # The original test was the chained comparison
            # ``"error" in reply == message``, which can never be true, so
            # the retry never fired.
            # NOTE(review): assumes the rate-limit text arrives either as
            # ``response`` itself or as ``response['error']`` -- confirm
            # against the API.
            rate_limited = reply == rate_limit_message or (
                isinstance(reply, dict) and reply.get('error') == rate_limit_message)
            if not rate_limited:
                break
            time.sleep(60)
        self.report({'results': body})
    except ValueError as e:
        self.unexpectedError(e)
def get_token(self, token_only=True, scopes=None):
    """Obtain a client-credentials token for the given scopes.

    With ``token_only`` (the default) the access-token string is returned,
    served from ``cache`` when available; a freshly requested token is
    cached for its ``expires_in`` minus 20. Otherwise the full decoded
    token reply is requested and returned without touching the cache.

    ``scopes`` defaults to ``['send_notification', 'view_room']``.

    Raises:
        OauthClientInvalidError: when the token endpoint answers 401.
        Exception: for any other non-200 status.
    """
    if scopes is None:
        scopes = ['send_notification', 'view_room']
    cache_key = 'hipchat-tokens:%s:%s' % (self.id, ','.join(scopes))

    def gen_token():
        """Request a new token from the token endpoint."""
        form = {
            'grant_type': 'client_credentials',
            'scope': ' '.join(scopes),
        }
        resp = requests.post(
            self.token_url, data=form, auth=HTTPBasicAuth(self.id, self.secret), timeout=10
        )
        if resp.status_code == 200:
            return resp.json()
        if resp.status_code == 401:
            raise OauthClientInvalidError(self)
        raise Exception('Invalid token: %s' % resp.text)

    if not token_only:
        return gen_token()

    token = cache.get(cache_key)
    if not token:
        fresh = gen_token()
        token = fresh['access_token']
        cache.set(cache_key, token, fresh['expires_in'] - 20)
    return token