Python flask.request 模块,host_url 属性实例源码
我们从Python开源项目中,提取了以下39个代码示例,用于说明如何使用flask.request.host_url(注意:host_url 是属性而不是方法,访问时不加括号)。
def createTeam():
    """Register a team for a report environment and grant it access.

    Reads ``team``, ``team_email``, ``report_name`` and the optional ``role``
    (default ``'user'``) from the query string. The request is honoured only
    when its ``Origin`` header equals the host URL without its trailing slash.

    Returns:
        tuple: ``(json_body, status)`` -- ``"Success"`` with 200, or
        ``"Forbidden"`` with 403 when the origin check fails.
    """
    if request.environ.get('HTTP_ORIGIN') != request.host_url[:-1]:
        # Fixed: the status code used to be handed to json.dumps() as an
        # argument instead of being returned next to the body.
        return json.dumps('Forbidden'), 403

    def sql_text(value):
        # Double embedded single quotes so a value cannot close its SQL literal.
        return value.replace("'", "''")

    team_name = request.args['team']
    report_name = request.args['report_name']
    newTeam = Team.query.filter_by(team_name=team_name).first()
    dbEnv = AresSql.SqliteDB(report_name)
    if not newTeam:
        team = Team(team_name, request.args['team_email'])
        db.session.add(team)
        db.session.commit()
        # Re-query so the database-assigned team_id is available.
        newTeam = Team.query.filter_by(team_name=team_name).first()
    team_id = newTeam.team_id
    role = request.args.get('role', 'user')
    # NOTE(review): values come straight from the query string. They are
    # quote-escaped here; switch to bound parameters if dbEnv.modify supports
    # them -- TODO confirm.
    dbEnv.modify("""INSERT INTO team_def (team_id, team_name, role) VALUES (%s, '%s', '%s');
        INSERT INTO env_auth (env_id, team_id)
        SELECT env_def.env_id, %s
        FROM env_def
        WHERE env_def.env_name = '%s' ;""" % (
        team_id, sql_text(team_name), sql_text(role), team_id, sql_text(report_name)))
    return json.dumps('Success'), 200
def __init__(self, **kwargs):
    """Create the ``self`` link for the current request.

    Takes the same keyword arguments as :class:`.Link`.

    Additional Keyword Args:
        external (bool): when true, keep the fully-qualified request URL;
            defaults to False.

    See Also:
        :class:`.Link`
    """
    href = request.url
    wants_external = kwargs.get('external', False)
    if not wants_external:
        if current_app.config['SERVER_NAME'] is None:
            # Swap the scheme/host prefix for a bare "/" to get a relative link.
            href = request.url.replace(request.host_url, '/')
    return super(Self, self).__init__('self', href, **kwargs)
def auth():
    """Handle the Azure AD authorization response and fetch an access token.

    Reads ``id_token`` and ``code`` from the posted form, exchanges them for an
    access token and stores that token in the session. Any failure is reported
    to the user with a flash message. Always redirects to the ``home`` view.
    """
    try:
        # Token id and authorization code posted back by Azure AD.
        token_id = request.form['id_token']
        code = request.form['code']
        # Redirect URI sent as a query parameter to the token issuance endpoint.
        redirect_uri = '{0}auth'.format(request.host_url)
        # Azure AD token issuance endpoint URL.
        url = issuance_url(token_id, c['AUTHORITY'])
        # Request the access token and keep it in the session when one is returned.
        token = access_token(url, redirect_uri, c['CLIENT_ID'], code, c['CLIENT_SECRET'])
        if token != '':
            session['access_token'] = token
        else:
            flash('Could not get access token.')
    except Exception:
        # Narrowed from a bare ``except:`` so SystemExit and KeyboardInterrupt
        # are no longer swallowed; ordinary errors are still reported.
        flash('Something went wrong.')
    return redirect(url_for('home'))
# This script runs the application using a development server.
def handle_content_message(event):
    """Save an image, video or audio message and reply with its public URL.

    The content is streamed into a temporary file under ``static_tmp_path``,
    renamed to carry the matching extension, and its URL is sent back to the
    sender. Messages of any other type are ignored.

    Args:
        event: message event exposing ``message`` and ``reply_token``.
    """
    if isinstance(event.message, ImageMessage):
        ext = 'jpg'
    elif isinstance(event.message, VideoMessage):
        ext = 'mp4'
    elif isinstance(event.message, AudioMessage):
        ext = 'm4a'
    else:
        return

    message_content = line_bot_api.get_message_content(event.message.id)
    # delete=False: the file must outlive the ``with`` block so it can be renamed.
    with tempfile.NamedTemporaryFile(dir=static_tmp_path, prefix=ext + '-', delete=False) as tf:
        for chunk in message_content.iter_content():
            tf.write(chunk)
        tempfile_path = tf.name

    dist_path = tempfile_path + '.' + ext
    dist_name = os.path.basename(dist_path)
    os.rename(tempfile_path, dist_path)

    # URL paths always use forward slashes; os.path.join would emit
    # backslashes on Windows and produce a broken link.
    content_url = request.host_url + '/'.join(('static', 'tmp', dist_name))
    line_bot_api.reply_message(
        event.reply_token, [
            TextSendMessage(text='Save content.'),
            TextSendMessage(text=content_url)
        ])
def handle_file_message(event):
    """Save a file message under the static tmp directory and reply with its URL.

    The content is streamed into a temporary file under ``static_tmp_path`` and
    renamed to ``<temp name>-<original file name>``.

    Args:
        event: message event exposing ``message`` (with ``id`` and
            ``file_name``) and ``reply_token``.
    """
    message_content = line_bot_api.get_message_content(event.message.id)
    # delete=False: the file must outlive the ``with`` block so it can be renamed.
    with tempfile.NamedTemporaryFile(dir=static_tmp_path, prefix='file-', delete=False) as tf:
        for chunk in message_content.iter_content():
            tf.write(chunk)
        tempfile_path = tf.name

    # The file name is supplied by the sender: keep only its last path
    # component so it cannot redirect the rename into another directory.
    safe_name = os.path.basename(event.message.file_name)
    dist_path = tempfile_path + '-' + safe_name
    dist_name = os.path.basename(dist_path)
    os.rename(tempfile_path, dist_path)

    # URL paths always use forward slashes; os.path.join would emit
    # backslashes on Windows and produce a broken link.
    file_url = request.host_url + '/'.join(('static', 'tmp', dist_name))
    line_bot_api.reply_message(
        event.reply_token, [
            TextSendMessage(text='Save file.'),
            TextSendMessage(text=file_url)
        ])
def forgot_password():
    """Show the password-recovery form or e-mail a recovery link.

    GET renders ``forgot_password.html``. Any other method reads ``username``
    and ``email`` from the form; when the e-mail matches the first address of
    the looked-up user, a 24-character token is stored in ``recoverymap`` and a
    recovery link is mailed. Every failure flashes the same mismatch message.
    Non-GET requests always end with a redirect to ``/login``.
    """
    if request.method == "GET":
        return render_template("forgot_password.html")
    username = request.form["username"]
    email = request.form["email"]
    try:
        user = ldaptools.getuser(username)
        # Explicit checks instead of ``assert``: assertions are stripped under
        # ``python -O``, which would mail a recovery link to any address.
        if not user:
            raise ValueError("unknown user")
        if email != user.email[0]:
            raise ValueError("email mismatch")
        # The token is a credential, so draw it from the OS entropy source
        # rather than the predictable default generator.
        rng = random.SystemRandom()
        alphabet = string.ascii_uppercase + string.ascii_lowercase + string.digits
        token = ''.join(rng.choice(alphabet) for _ in range(24))
        url = request.host_url + "recovery/" + token
        recoverymap[token] = username
        emailtools.render_email(email, "Password Recovery", "forgot_password.txt", url=url, config=app.config)
        flash("Email sent to " + email, "success")
    except Exception as e:
        # Best-effort: report every failure the same way to the user.
        print(e)
        flash("Username/Email mismatch", "danger")
    return redirect("/login")
def api_makepublic():
    """Attach a public key to a file and return its public download URL.

    Only GET requests from an admin or the file's uploader are served. When the
    file already has a key, that key's URL is returned instead. Every other
    case yields the generic error response.
    """
    def failure():
        return jsonify(response=responds['SOME_ERROR'])

    if request.method != 'GET':
        return failure()
    record = File.query.filter_by(unique_id=request.args['uniqueid']).first()
    if record is None:
        return failure()
    if not (g.user.admin or (g.user.id == record.uploader_id)):
        return failure()

    fresh_key = PublicKey()
    fresh_key.public = True
    if record.publickey is not None:
        # A key already exists: report it and leave it untouched.
        link = request.host_url + "pub/dl/" + record.publickey.hash
        return jsonify(response=responds['PUBLIC_KEY_ALREADY_GEN'], url=link)

    record.publickey = fresh_key
    db.session.commit()
    link = request.host_url + "pub/dl/" + fresh_key.hash
    share_button = get_sharebutton(record.publickey, 'ban', "Disable Public")
    return jsonify(response=responds['PUBLIC_KEY_GENERATED'], url=link, button=share_button)
def initiate():
    """
    Step 1: initiate app installation.

    Unauthenticated users are sent to sign-up with a ``next`` URL pointing back
    here; authenticated users are redirected to the shop's permission URL.
    """
    # The shop URL arrives as the ``shop`` query argument.
    shop_url = request.args.get('shop')
    # TODO: validate HMAC, so we know that request really is from shopify
    if not current_user.is_authenticated:
        return_to = url_join(request.host_url,
                             url_for('shopify.initiate', shop=shop_url))
        return redirect(url_for('main.signup', next=return_to))
    settings_map = current_app.config
    api_key = settings_map['SHOPIFY_API_KEY']
    secret = settings_map['SHOPIFY_API_SECRET']
    return redirect(get_permission_url(shop_url, api_key, secret))
def is_safe_url(target):
    """Return True when ``target`` resolves to an http(s) URL on this host."""
    host = urlparse(request.host_url)
    resolved = urlparse(urljoin(request.host_url, target))
    if resolved.scheme not in ('http', 'https'):
        return False
    return host.netloc == resolved.netloc
def is_safe_url(target):
    """Tell whether ``target``, resolved against the host URL, stays on this
    host and uses http or https."""
    own_netloc = urlparse(request.host_url).netloc
    candidate = urlparse(urljoin(request.host_url, target))
    web_scheme = candidate.scheme in ('http', 'https')
    return web_scheme and own_netloc == candidate.netloc
def is_safe_redirect_url(target):
    """Check that a redirect target stays on this host over http(s).

    See https://security.openstack.org/guidelines/dg_avoid-unvalidated-redirects.html
    """  # noqa
    here = urlparse(request.host_url)
    there = urlparse(urljoin(request.host_url, target))
    if there.scheme in ('http', 'https'):
        return here.netloc == there.netloc
    return False
def home():
    """Render the home page with the Azure AD login URL and the current user.

    The user dict stays empty unless an access token is already in the session.
    """
    # The authorization endpoint sends the user back to ``<host>/auth``.
    callback = '{0}auth'.format(request.host_url)
    login_link = login_url(callback, c['CLIENT_ID'], c['RESOURCE'], c['AUTHORITY'])
    # Look up user details only once an access token has been acquired.
    if 'access_token' in session:
        profile = user_details(c['RESOURCE'], session['access_token'])
    else:
        profile = {}
    return render_template('index.html', url=login_link, user=profile)
def send_mail_for_execute_success(sql_id):
    """Mail the creator and the auditor of a SQL work order.

    Does nothing unless ``settings.EMAIL_SEND_ENABLE`` is truthy.

    Args:
        sql_id: identifier passed to ``get_sql_info_by_id``.
    """
    if not settings.EMAIL_SEND_ENABLE:
        return
    info = get_sql_info_by_id(sql_id)
    info.status_str = settings.SQL_WORK_STATUS_DICT[info.status]
    info.host_url = request.host_url
    creator_mail = cache.MyCache().get_user_email(info.create_user_id)
    auditor_mail = cache.MyCache().get_user_email(info.audit_user_id)
    info.email = "{0},{1}".format(creator_mail, auditor_mail)
    if len(info.email) <= 0:
        return
    subject = "SQL??-[{0}]-????".format(info.title)
    info.work_url = "{0}execute/sql/execute/new/{1}".format(request.host_url, info.id)
    html_body = render_template("mail_template.html", sql_info=info)
    common_util.send_html(subject, info.email, html_body)
# ??????????
# ???????????
def get_request_log():
    """Build a dict describing the current request for logging.

    Always contains ``route`` and ``method``. For routes whose first path
    segment is ``virtual_card`` it also carries the JSON ``body`` (without any
    ``password`` key) and the flattened request ``values`` when present.
    """
    # Flatten args and form fields into "key: value, " pairs.
    values = ''.join(
        key + ': ' + request.values[key] + ', ' for key in request.values
    )
    request_log = {
        'route': '/' + request.base_url.replace(request.host_url, ''),
        'method': request.method,
    }
    # Only virtual_card routes get their payload logged.
    first_segment = request_log['route'].split('/')[1]
    if first_segment == "virtual_card":
        raw = request.get_data()
        if len(raw) != 0:
            body = json.loads(request.get_data())
            if "password" in body:
                # Never log the password.
                body.pop("password")
            request_log['body'] = body
        if len(values) > 0:
            request_log['values'] = values
    return request_log
def is_safe_url(target):
    """Return whether ``target`` is an http(s) URL served by the current host."""
    base = request.host_url
    reference = urlparse(base)
    probe = urlparse(urljoin(request.host_url, target))
    scheme_ok = probe.scheme in ('http', 'https')
    host_ok = scheme_ok and reference.netloc == probe.netloc
    return host_ok
def is_safe_url(target):
    """Accept ``target`` only if, once joined to the host URL, it uses http or
    https and keeps the same network location."""
    expected = urlparse(request.host_url)
    actual = urlparse(urljoin(request.host_url, target))
    if actual.scheme in ('http', 'https'):
        return expected.netloc == actual.netloc
    return False
def show_vulnerability(vulnerability_id):
    """Render a vulnerability page, or the login page with a return link.

    Args:
        vulnerability_id: identifier interpolated into the return URL and
            passed to the template.
    """
    if authenticated() is not False:
        return render_template('vulnerability.html', vulnerability_id=vulnerability_id)
    # Send the visitor back to this vulnerability after logging in.
    come_back = '?return_to=' + request.host_url + 'vulnerability/{}'.format(vulnerability_id)
    return render_template('login.html', return_to=come_back)
def is_safe_url(target):
    """Check that ``target`` points at this host over http or https."""
    joined = urljoin(request.host_url, target)
    current, wanted = urlparse(request.host_url), urlparse(joined)
    return wanted.scheme in ('http', 'https') and current.netloc == wanted.netloc
def is_safe_url(target: str) -> bool:
    """Return True when ``target`` is a same-host http(s) URL whose path
    differs from the host URL's path."""
    host = urlparse(request.host_url)
    resolved = urlparse(urljoin(request.host_url, target))
    # Only web schemes are acceptable.
    if resolved.scheme not in ('http', 'https'):
        return False
    # The target must stay on the same host and port.
    if host.netloc != resolved.netloc:
        return False
    # ...and must not resolve to the same path as the host URL.
    return host.path != resolved.path
def render_graph():
    """Render ``graph.html`` for the graph path given in the ``url`` argument.

    Returns a 400 response when the argument is missing or empty.
    """
    graph_path = request.args.get('url')
    if graph_path:
        # Drop the trailing slash of host_url before appending the path.
        return render_template('graph.html', url=request.host_url[:-1] + graph_path)
    return 'Specify a graph URL in the request', 400
def register():
    """Register a fuzzer instance with a campaign and return its job description.

    Without a ``master`` argument the instance joins the campaign chosen by
    ``get_best_campaign``. With one, it becomes the master of that campaign,
    unless the campaign is missing or already has a master.
    """
    hostname = request.args.get('hostname')
    master = request.args.get('master')
    if master:
        campaign = models.Campaign.get(id=master)
        if not campaign:
            return 'Could not find specified campaign', 404
        if campaign.fuzzers.filter_by(master=True).first():
            return 'Campaign already has a master', 400
        instance = models.FuzzerInstance.create(hostname=hostname, master=True)
    else:
        campaign = get_best_campaign()
        if not campaign:
            return 'No active campaigns', 404
        instance = models.FuzzerInstance.create(hostname=hostname)
    instance.start_time = time.time()
    campaign.fuzzers.append(instance)
    campaign.commit()

    # Random offset so hosts registered together do not all report at once.
    deviation = random.randint(15, 30)
    root = request.host_url[:-1]
    # TODO: add support for spaces in program arguments
    program_args = campaign.executable_args.split(' ') if campaign.executable_args else []
    afl_args = campaign.afl_args.split(' ') if campaign.afl_args else []
    return jsonify(
        id=instance.id,
        name=secure_filename(instance.name),
        program=campaign.executable_name,
        program_args=program_args,
        args=afl_args,
        campaign_id=campaign.id,
        campaign_name=secure_filename(campaign.name),
        download=root + url_for('fuzzers.download', campaign_id=campaign.id),
        submit=root + url_for('fuzzers.submit', instance_id=instance.id),
        submit_crash=root + url_for('fuzzers.submit_crash', instance_id=instance.id),
        upload=root + url_for('fuzzers.upload', instance_id=instance.id),
        upload_in=current_app.config['UPLOAD_FREQUENCY'] + deviation
    )
def download(campaign_id):
    """Return the download URLs a fuzzer needs for a campaign.

    Args:
        campaign_id: identifier passed to ``models.Campaign.get``.
    """
    campaign = models.Campaign.get(id=campaign_id)
    tar_pattern = os.path.join(
        current_app.config['DATA_DIRECTORY'],
        secure_filename(campaign.name),
        'sync_dir',
        '*.tar',
    )
    root = request.host_url[:-1]

    def link(endpoint, **extra):
        # Absolute URL of a campaign-scoped endpoint.
        return root + url_for(endpoint, campaign_id=campaign.id, **extra)

    if campaign.has_dictionary:
        dictionary_url = link('fuzzers.download_dictionary')
    else:
        dictionary_url = None
    sync_urls = [
        link('fuzzers.download_syncdir', filename=os.path.basename(tar_path))
        for tar_path in glob.glob(tar_pattern)
    ]
    return jsonify(
        executable=link('fuzzers.download_executable'),
        libraries=link('fuzzers.download_libraries'),
        testcases=link('fuzzers.download_testcases'),
        ld_preload=link('fuzzers.download_ld_preload'),
        dictionary=dictionary_url,
        sync_dirs=sync_urls,
        sync_in=current_app.config['DOWNLOAD_FREQUENCY'],
    )
def analysis_queue(campaign_id):
    """List a campaign's crashes that have not been analyzed yet.

    Args:
        campaign_id: identifier passed to ``models.Campaign.get``.
    """
    campaign = models.Campaign.get(id=campaign_id)
    # TODO: add support for spaces in program arguments
    if campaign.executable_args:
        program_args = campaign.executable_args.split(' ')
    else:
        program_args = []
    pending = []
    for crash in campaign.crashes.filter_by(analyzed=False):
        pending.append({
            'crash_id': crash.id,
            'download': request.host_url[:-1] + url_for('fuzzers.download_crash', crash_id=crash.id),
        })
    return jsonify(
        program=campaign.executable_name,
        program_args=program_args,
        crashes=pending
    )
def _build_bundle_url(hit: dict, replica: Replica) -> str:
    """Build the absolute URL of the bundle named by a search hit.

    ``hit['_id']`` is split at its first ``.`` into the bundle uuid and version.
    """
    prefix = request.host_url
    uuid, version = hit['_id'].split('.', 1)
    bundle = UrlBuilder().set(path='v1/bundles/' + uuid)
    bundle = bundle.add_query("version", version)
    bundle = bundle.add_query("replica", replica.name)
    return prefix + str(bundle)
def _build_scroll_url(_scroll_id: str, per_page: int, replica: Replica, output_format: str) -> str:
    """Build the absolute ``v1/search`` URL that continues a scrolled search."""
    prefix = request.host_url
    search = UrlBuilder().set(path="v1/search")
    search = search.add_query('per_page', str(per_page))
    search = search.add_query("replica", replica.name)
    search = search.add_query("_scroll_id", _scroll_id)
    search = search.add_query("output_format", output_format)
    return prefix + str(search)
def api_unpublish():
    """Mark a file's public key as not public and return its download URL.

    Only GET requests from an admin or the file's uploader are served, and only
    when the file has a key. Every other case yields the generic error response.
    """
    def failure():
        return jsonify(response=responds['SOME_ERROR'])

    if request.method != 'GET':
        return failure()
    record = File.query.filter_by(unique_id=request.args['uniqueid']).first()
    if record is None:
        return failure()
    if not (g.user.admin or (g.user.id == record.uploader_id)):
        return failure()
    key = record.publickey
    if key is None:
        return failure()
    record.publickey.public = False
    db.session.commit()
    link = request.host_url + "pub/dl/" + key.hash
    return jsonify(response=responds['PUBLIC_KEY_UNPUBLISH'], url=link)
def api_publish():
    """Mark a file's existing public key as public and return its download URL.

    Only GET requests from an admin or the file's uploader are served, and only
    when the file has a key whose ``public`` flag is False. Every other case
    yields the generic error response.
    """
    def failure():
        return jsonify(response=responds['SOME_ERROR'])

    if request.method != 'GET':
        return failure()
    record = File.query.filter_by(unique_id=request.args['uniqueid']).first()
    if record is None:
        return failure()
    if not (g.user.admin or (g.user.id == record.uploader_id)):
        return failure()
    key = record.publickey
    if (key is None) or (key.public is not False):
        return failure()
    record.publickey.public = True
    db.session.commit()
    link = request.host_url + "pub/dl/" + key.hash
    return jsonify(response=responds['PUBLIC_KEY_PUBLISH'], url=link)
def is_safe_url(target):
    """Return True for targets that resolve to this host over http or https."""
    site = urlparse(request.host_url)
    destination = urlparse(urljoin(request.host_url, target))
    allowed_schemes = ('http', 'https')
    if destination.scheme not in allowed_schemes:
        return False
    return site.netloc == destination.netloc
def is_safe_url(target):
    """Return True when a non-empty ``target`` resolves to an http(s) URL on
    the current host; empty or missing targets are rejected."""
    if target:
        host = urlparse(request.host_url)
        resolved = urlparse(urljoin(request.host_url, target))
        if resolved.scheme in ('http', 'https'):
            return host.netloc == resolved.netloc
    return False
def is_safe_url(target):
    """Reject empty targets; otherwise require http(s) and the current host."""
    if not target:
        return False
    own = urlparse(request.host_url)
    other = urlparse(urljoin(request.host_url, target))
    uses_web_scheme = other.scheme in ('http', 'https')
    return uses_web_scheme and own.netloc == other.netloc
def isSafeUrl(target):
    """ Check that a URL does not redirect away from the current host.

    Args:
        target (str): URL to test; it is resolved against the host URL first.
    Returns:
        bool: True if the resolved URL uses http or https and keeps the host's
            network location.
    """
    host = urlparse(request.host_url)
    resolved = urlparse(urljoin(request.host_url, target))
    if resolved.scheme not in ('http', 'https'):
        return False
    return host.netloc == resolved.netloc
###################################################
def is_safe_url(target):
    """Return True when ``target`` resolves to an http(s) URL on this host."""
    absolute = urlparse.urljoin(request.host_url, target)
    ref_parts = urlparse.urlparse(request.host_url)
    test_parts = urlparse.urlparse(absolute)
    if test_parts.scheme in ('http', 'https'):
        return ref_parts.netloc == test_parts.netloc
    return False
def _DL(self, src, execute_in_gid, group_config_type, executor_permission, text):
    """Handle the ``DL`` command: download a sticker pack and reply with a link.

    ``text`` is matched against the ``packer_factory._DL`` patterns. Returns
    None when nothing matches, an error message when the pack metadata is not
    found, or two wrapped text messages (a summary and the archive URL).

    The remaining parameters are not used in this body.

    Raises:
        RegexNotImplemented: when a pattern other than the first one matched.
    """
    regex_list = packer_factory._DL
    regex_result = tool.regex_finder.find_match(regex_list, text)
    if regex_result is None:
        return
    if regex_result.match_at == 0:
        package_id = regex_result.group(1)
        # The second group is optional; its presence requests sound as well.
        including_sound = regex_result.group(2) is not None
        try:
            sticker_meta = self._sticker_dl.get_pack_meta(package_id)
        except tool.MetaNotFoundException:
            return error.main.miscellaneous(u'???????(??ID: {})'.format(package_id))
        dl_result = self._sticker_dl.download_stickers(sticker_meta, including_sound)
        # host_url is read inside a test request context created for this purpose.
        with self._flask_app.test_request_context():
            url = request.host_url
        ret = [u'???????????????', u'?????????????', u'LINE??????????????????????????', u'?????????gif???? https://ezgif.com/apng-to-gif', u'']
        ret.append(u'??ID: {}'.format(sticker_meta.pack_id))
        ret.append(u'{} (? {} ??)'.format(sticker_meta.title, sticker_meta.author))
        ret.append(u'')
        ret.append(u'??????: (??)')
        ret.append(u'???? {:.3f} ?'.format(dl_result.downloading_consumed_time))
        ret.append(u'???? {:.3f} ?'.format(dl_result.compression_consumed_time))
        ret.append(u'???? {} ?'.format(dl_result.sticker_count))
        # First message: the joined summary; second: host URL + archive path
        # with every backslash doubled.
        return [bot.line_api_wrapper.wrap_text_message(txt, self._webpage_generator) for txt in (u'\n'.join(ret), url + dl_result.compressed_file_path.replace("\\", "\\\\"))]
    else:
        raise RegexNotImplemented(error.sys_command.regex_not_implemented(u'DL', regex_result.match_at, regex_result.regex))
def validate_redirect_url(url):
    """Return True when ``url`` is a usable redirect target for this host.

    None and blank strings are rejected. A URL that carries a scheme or a
    network location is accepted only if that location equals the current
    host's; URLs with neither are accepted.
    """
    if url is None or not url.strip():
        return False
    wanted = urlsplit(url)
    current = urlsplit(request.host_url)
    has_origin = wanted.netloc or wanted.scheme
    if has_origin and wanted.netloc != current.netloc:
        return False
    return True
def is_safe_url(target):
    """Return True when ``target`` resolves to an http(s) URL on this host."""
    reference = urlparse(request.host_url)
    candidate = urlparse(urljoin(request.host_url, target))
    if candidate.scheme not in ('http', 'https'):
        return False
    return reference.netloc == candidate.netloc
def try_login(self, identity_url, ask_for=None, ask_for_optional=None,
              extensions=None, immediate=False):
    """This tries to login with the given identity URL. This function
    must be called from the login_handler. The `ask_for` and
    `ask_for_optional` parameters can be a set of values to be asked
    from the openid provider, where keys in `ask_for` are marked as
    required, and keys in `ask_for_optional` are marked as optional.

    The following strings can be used in the `ask_for` and
    `ask_for_optional` parameters:
    ``aim``, ``blog``, ``country``, ``dob`` (date of birth), ``email``,
    ``fullname``, ``gender``, ``icq``, ``image``, ``jabber``, ``language``,
    ``msn``, ``nickname``, ``phone``, ``postcode``, ``skype``,
    ``timezone``, ``website``, ``yahoo``

    `extensions` can be a list of instances of OpenID extension requests
    that should be passed on with the request. If you use this, please make
    sure to pass the Response classes of these extensions when initializing
    OpenID.

    `immediate` can be used to indicate this request should be a so-called
    checkid_immediate request, resulting in the provider not showing any
    UI.
    Note that this adds a new possible response: SetupNeeded, which is the
    server saying it doesn't have enough information yet to authorize or
    reject the authentication (probably, the user needs to sign in or
    approve the trust root).
    """
    # Key validation only runs when assertions are enabled (__debug__).
    # NOTE(review): indentation was lost in this listing; nesting the
    # optional-key check under this guard is a reconstruction -- confirm
    # against the upstream source.
    if ask_for and __debug__:
        for key in ask_for:
            if key not in ALL_KEYS:
                raise ValueError('invalid key %r' % key)
        if ask_for_optional:
            for key in ask_for_optional:
                if key not in ALL_KEYS:
                    raise ValueError('invalid optional key %r' % key)
    try:
        consumer = Consumer(SessionWrapper(self), self.store_factory())
        auth_request = consumer.begin(identity_url)
        if ask_for or ask_for_optional:
            self.attach_reg_info(auth_request, ask_for, ask_for_optional)
        if extensions:
            for extension in extensions:
                auth_request.addExtension(extension)
    except discover.DiscoveryFailure:
        # Discovery failed: report it and send the user back to the current URL.
        self.signal_error(u'The OpenID was invalid')
        return redirect(self.get_current_url())
    # The trust root is either the URL root or the bare host URL.
    if self.url_root_as_trust_root:
        trust_root = request.url_root
    else:
        trust_root = request.host_url
    return redirect(auth_request.redirectURL(trust_root,
                                             self.get_success_url(),
                                             immediate=immediate))
def _upload():
    """Store an uploaded file, record it for the current user and describe it.

    Only POST requests are handled. An upload with an empty file name
    redirects back to the request URL; a falsy upload yields a "file type not
    allowed" entry. Otherwise the file is saved in a fresh directory under the
    upload folder and its URLs are returned as JSON.
    """
    if request.method != 'POST':
        return None
    upload = request.files['files[]']
    # Sanitised file name plus a unique directory to hold it.
    file_name = secure_filename(upload.filename)
    directory = str(unique_id())
    upload_folder = myapp.config['UPLOAD_FOLDER']
    if upload.filename == '':
        return redirect(request.url)
    if not upload:
        return jsonify(files=[{"name": file_name, "error": responds['FILETYPE_NOT_ALLOWED']}])

    # NOTE: no allowed_file(...) check is applied here.
    save_dir = os.path.join(upload_folder, directory)
    if not os.path.exists(save_dir):
        os.makedirs(save_dir)
    cmpl_path = os.path.join(save_dir, file_name)
    upload.save(cmpl_path)
    size = os.stat(cmpl_path).st_size

    # Create the model row and attach it to the uploading user.
    dbfile = File(file_name, directory, size, upload.mimetype)
    g.user.uploads.append(dbfile)
    db.session().add(dbfile)
    db.session().commit()

    thumbnail_url = ""
    if "image" in dbfile.mimetype:
        get_thumbnail(cmpl_path, "100")
        thumbnail_url = request.host_url + 'thumbs/' + directory
    url = request.host_url + 'uploads/' + directory
    description = {
        "name": file_name,
        "url": url,
        "thumbnailUrl": thumbnail_url,
        "deleteUrl": url,
        "deleteType": "DELETE",
        "uid": directory,
    }
    return jsonify(files=[description])
def try_login(self, identity_url, ask_for=None, ask_for_optional=None,
              extensions=None, immediate=False):
    """This tries to login with the given identity URL. This function
    must be called from the login_handler. The `ask_for` and
    `ask_for_optional` parameters can be a set of values to be asked
    from the openid provider, where keys in `ask_for` are marked as
    required, and keys in `ask_for_optional` are marked as optional.

    The following strings can be used in the `ask_for` and
    `ask_for_optional` parameters:
    ``aim``, ``blog``, ``country``, ``dob`` (date of birth), ``email``,
    ``fullname``, ``gender``, ``icq``, ``image``, ``jabber``, ``language``,
    ``msn``, ``nickname``, ``phone``, ``postcode``, ``skype``,
    ``timezone``, ``website``, ``yahoo``

    `extensions` can be a list of instances of OpenID extension requests
    that should be passed on with the request. If you use this, please make
    sure to pass the Response classes of these extensions when initializing
    OpenID.

    `immediate` can be used to indicate this request should be a so-called
    checkid_immediate request, resulting in the provider not showing any
    UI.
    Note that this adds a new possible response: SetupNeeded, which is the
    server saying it doesn't have enough information yet to authorize or
    reject the authentication (probably, the user needs to sign in or
    approve the trust root).
    """
    # Key validation only runs when assertions are enabled (__debug__).
    # NOTE(review): indentation was lost in this listing; nesting the
    # optional-key check under this guard is a reconstruction -- confirm
    # against the upstream source.
    if ask_for and __debug__:
        for key in ask_for:
            if key not in ALL_KEYS:
                raise ValueError('invalid key %r' % key)
        if ask_for_optional:
            for key in ask_for_optional:
                if key not in ALL_KEYS:
                    raise ValueError('invalid optional key %r' % key)
    try:
        consumer = Consumer(SessionWrapper(self), self.store_factory())
        auth_request = consumer.begin(identity_url)
        if ask_for or ask_for_optional:
            self.attach_reg_info(auth_request, ask_for, ask_for_optional)
        if extensions:
            for extension in extensions:
                auth_request.addExtension(extension)
    except discover.DiscoveryFailure:
        # Discovery failed: report it and send the user back to the current URL.
        self.signal_error(u'The OpenID was invalid')
        return redirect(self.get_current_url())
    # The trust root is either the URL root or the bare host URL.
    if self.url_root_as_trust_root:
        trust_root = request.url_root
    else:
        trust_root = request.host_url
    return redirect(auth_request.redirectURL(trust_root,
                                             self.get_success_url(),
                                             immediate=immediate))
def try_login(self, identity_url, ask_for=None, ask_for_optional=None,
              extensions=None, immediate=False):
    """This tries to login with the given identity URL. This function
    must be called from the login_handler. The `ask_for` and
    `ask_for_optional` parameters can be a set of values to be asked
    from the openid provider, where keys in `ask_for` are marked as
    required, and keys in `ask_for_optional` are marked as optional.

    The following strings can be used in the `ask_for` and
    `ask_for_optional` parameters:
    ``aim``, ``blog``, ``country``, ``dob`` (date of birth), ``email``,
    ``fullname``, ``gender``, ``icq``, ``image``, ``jabber``, ``language``,
    ``msn``, ``nickname``, ``phone``, ``postcode``, ``skype``,
    ``timezone``, ``website``, ``yahoo``

    `extensions` can be a list of instances of OpenID extension requests
    that should be passed on with the request. If you use this, please make
    sure to pass the Response classes of these extensions when initializing
    OpenID.

    `immediate` can be used to indicate this request should be a so-called
    checkid_immediate request, resulting in the provider not showing any
    UI.
    Note that this adds a new possible response: SetupNeeded, which is the
    server saying it doesn't have enough information yet to authorize or
    reject the authentication (probably, the user needs to sign in or
    approve the trust root).
    """
    # Key validation only runs when assertions are enabled (__debug__).
    # NOTE(review): indentation was lost in this listing; nesting the
    # optional-key check under this guard is a reconstruction -- confirm
    # against the upstream source.
    if ask_for and __debug__:
        for key in ask_for:
            if key not in ALL_KEYS:
                raise ValueError('invalid key %r' % key)
        if ask_for_optional:
            for key in ask_for_optional:
                if key not in ALL_KEYS:
                    raise ValueError('invalid optional key %r' % key)
    try:
        consumer = Consumer(SessionWrapper(self), self.store_factory())
        auth_request = consumer.begin(identity_url)
        if ask_for or ask_for_optional:
            self.attach_reg_info(auth_request, ask_for, ask_for_optional)
        if extensions:
            for extension in extensions:
                auth_request.addExtension(extension)
    except discover.DiscoveryFailure:
        # Discovery failed: report it and send the user back to the current URL.
        self.signal_error(u'The OpenID was invalid')
        return redirect(self.get_current_url())
    # The trust root is either the URL root or the bare host URL.
    if self.url_root_as_trust_root:
        trust_root = request.url_root
    else:
        trust_root = request.host_url
    return redirect(auth_request.redirectURL(trust_root,
                                             self.get_success_url(),
                                             immediate=immediate))