Python bottle.request 模块,environ() 实例源码
我们从Python开源项目中,提取了以下17个代码示例,用于说明如何使用bottle.request.environ()。
def logten():
ip = request['REMOTE_ADDR']
day= datetime.datetime.now().strftime("_%Y_%m_%d_%H")
url = request.environ['PATH_INFO']+day
if 'url' not in cache:
cache['url'] = {}
if ip+url not in cache['url']:
cache['url'][ip+url]=0
if len(cache['url'])>200:
cache['url']={}
logit('clear cache url')
if 'Mozilla/4.0' in request.environ.get('HTTP_USER_AGENT','no agent'):
pass
else:
logit('''url @ %s [ <a href="http://www.baidu.com/s?wd=%s&_=%.0f" target="_blank">%s</a> ] %.1f
<span style="color:gray">%s</span>'''%(url,ip,time.time()/10,ip,cache['pass']-time.time(),request.environ.get('HTTP_USER_AGENT','no agent')))
return True
def _handle_cert(self, domain):
rawcert = request.environ['ssl_certificate']
certconfig = self._get_cert_config_if_allowed(domain, rawcert)
logger.debug('Fetching certificate for domain %s', domain)
(key, crt, chain) = self.acmeproxy.get_cert(
domain=domain,
altname=certconfig.altname,
rekey=certconfig.rekey,
renew_margin=certconfig.renew_margin,
force_renew=('force_renew' in request.query and request.query['force_renew'] == 'true'), # pylint: disable=unsupported-membership-test,unsubscriptable-object
auto_renew=certconfig.renew_on_fetch
)
return {
'crt': crt.decode(),
'key': key.decode(),
'chain': chain.decode()
}
def local_check(function):
def _view(*args, **kwargs):
if request.environ.get('REMOTE_ADDR', "0") in ('127.0.0.1', 'localhost') \
or request.environ.get('HTTP_HOST','0') == '127.0.0.1:9666':
return function(*args, **kwargs)
else:
return HTTPError(403, "Forbidden")
return _view
def flashgot():
if request.environ['HTTP_REFERER'] != "http://localhost:9666/flashgot" and request.environ['HTTP_REFERER'] != "http://127.0.0.1:9666/flashgot":
return HTTPError()
autostart = int(request.forms.get('autostart', 0))
package = request.forms.get('package', None)
urls = filter(lambda x: x != "", request.forms['urls'].split("\n"))
folder = request.forms.get('dir', None)
if package:
PYLOAD.addPackage(package, urls, autostart)
else:
PYLOAD.generateAndAddPackages(urls, autostart)
return ""
def _handle_list_certs(self):
rawcert = request.environ['ssl_certificate']
self._assert_admin(rawcert)
certs = self.acmeproxy.list_certificates()
return {
'certificates': certs
}
def _handle_renew_all(self):
rawcert = request.environ['ssl_certificate']
self._assert_admin(rawcert)
result = {
'ok': [],
'error': []
}
for cert in self.acmeproxy.list_certificates():
domain = cert['cn']
certconfig = self.certificates_config.match(domain)
if certconfig:
logger.debug('Getting certificate for domain %s', domain)
try:
self.acmeproxy.get_cert(
domain=domain,
altname=certconfig.altname,
rekey=certconfig.rekey,
renew_margin=certconfig.renew_margin,
force_renew=('force_renew' in request.query and request.query['force_renew'] == 'true'), # pylint: disable=unsupported-membership-test,unsubscriptable-object
)
result['ok'].append(domain)
except Exception as e:
logger.error('Encountered exception while getting certificate for domain %s (%s)', domain, e)
result['error'].append(domain)
else:
logger.error('No configuration found for domain %s', domain)
result['error'].append(domain)
return result
def _handle_revoke_cert(self, domain):
rawcert = request.environ['ssl_certificate']
self._assert_admin(rawcert)
self.acmeproxy.revoke_certificate(domain)
return {
'status': 'revoked'
}
def _handle_delete_cert(self, domain):
rawcert = request.environ['ssl_certificate']
self._assert_admin(rawcert)
self.acmeproxy.delete_certificate(domain)
return {
'status': 'deleted'
}
def test_ims(self):
""" SendFile: If-Modified-Since"""
request.environ['HTTP_IF_MODIFIED_SINCE'] = time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime())
res = static_file(os.path.basename(__file__), root='./')
self.assertEqual(304, res.status_code)
self.assertEqual(int(os.stat(__file__).st_mtime), parse_date(res.headers['Last-Modified']))
self.assertAlmostEqual(int(time.time()), parse_date(res.headers['Date']))
request.environ['HTTP_IF_MODIFIED_SINCE'] = time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime(100))
self.assertEqual(open(__file__,'rb').read(), static_file(os.path.basename(__file__), root='./').body.read())
def test_download(self):
""" SendFile: Download as attachment """
basename = os.path.basename(__file__)
f = static_file(basename, root='./', download=True)
self.assertEqual('attachment; filename="%s"' % basename, f.headers['Content-Disposition'])
request.environ['HTTP_IF_MODIFIED_SINCE'] = time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime(100))
f = static_file(os.path.basename(__file__), root='./')
self.assertEqual(open(__file__,'rb').read(), f.body.read())
def test_range(self):
basename = os.path.basename(__file__)
request.environ['HTTP_RANGE'] = 'bytes=10-25,-80'
f = static_file(basename, root='./')
c = open(basename, 'rb'); c.seek(10)
self.assertEqual(c.read(16), tob('').join(f.body))
self.assertEqual('bytes 10-25/%d' % len(open(basename, 'rb').read()),
f.headers['Content-Range'])
self.assertEqual('bytes', f.headers['Accept-Ranges'])
def default():
result = {}
table = ['<table class="u-full-width"><tbody>']
for k, v in sorted(os.environ.iteritems()):
table.append('<tr><th>%s</th><td>%s</td></tr>' % (k, v))
table.append('</tbody></table>')
result['sys_data'] = '\n'.join(table)
table = ['<table class="u-full-width"><tbody>']
for k, v in sorted(dict(request.environ).iteritems()):
table.append('<tr><th>%s</th><td>%s</td></tr>' % (k, v))
table.append('</tbody></table>')
result['req_data'] = '\n'.join(table)
return result
def tearDown(self):
self.session.rollback()
database.drop_all()
database.stop_engine()
# Bottle memoizes the 'request.json' attribute.
# We don't want it memoized between two unit tests.
# So let's dememoize it (yes, I don't think that word exists either).
request.environ.pop('bottle.request.json', None)
request.environ.pop('bottle.request.body', None)
def _set_body_json(self, data):
body = json.dumps(data)
request.environ['CONTENT_LENGTH'] = str(len(tob(body)))
request.environ['CONTENT_TYPE'] = 'application/json'
request.environ['wsgi.input'] = BytesIO()
request.environ['wsgi.input'].write(tob(body))
request.environ['wsgi.input'].seek(0)
def test_start_deployment_with_impersonating(self, mocked):
request.account = self.session.query(m.User).filter(m.User.username == 'impersonator').one()
request.environ['HTTP_X_IMPERSONATE_USERNAME'] = 'username'
self._set_body_json({
'target': {
'cluster': None,
'server': None
},
'branch': 'master',
'commit': 'abcde'
})
api.environments_start_deployment(1, self.session)
def test_start_deployment_with_impersonating_unprivilegied_user(self, mocked):
request.account = self.session.query(m.User).filter(m.User.username == 'impersonator').one()
request.environ['HTTP_X_IMPERSONATE_USERNAME'] = 'username'
self._set_body_json({
'target': {
'cluster': None,
'server': None
},
'branch': 'master',
'commit': 'abcde'
})
with self.assertRaises(HTTPError) as cm:
api.environments_start_deployment(2, self.session)
self.assertEquals(403, cm.exception.code)
def generic_handler():
"""
The generic handler catches all requests not caught by any other route. It
checks the configuration to see if the URL requested is one registered as a
job's webhook URL handler. If so, it normalizes the request and queues the
job for building.
It returns immediately (aynsc) with a JSON structure containing the job id.
"""
jobdef_manager = request.deps['jobdef_manager']
build_queue = request.deps['build_queue']
config = request.deps['config']
providers = request.deps['providers']
jobdef = jobdef_manager.get_jobdef_from_url(request.path)
if not jobdef:
abort(404, "Not found")
logging.info("Received event for job '{}'".format(jobdef.name))
# Log debug info about the received request
logging.debug("request environ: {}".format(request.environ))
logging.debug("request path: {}".format(request.path))
logging.debug("request method: {}".format(request.method))
for k, v in request.headers.items():
logging.debug("request header: {}={}".format(k, v))
for k, v in request.query.items():
logging.debug("request query: {}={}".format(k, v))
logging.debug("request body: {}".format(request.body.read()))
logging.debug("request auth: {}".format(request.auth))
env = job.make_env(request, jobdef, providers)
job_inst = jobdef.make_job(request.body.read().decode('utf8'), env)
build_queue.put(job_inst)
return {'id': job_inst.id}