Python discord.ext.commands 模块,Context() 实例源码
我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用discord.ext.commands.Context()。
async def check_last(self, ctx: commands.Context, name):
    """ Shows you how long other users' last session lasted.

    :param name: The name (not the nick) of the person to check.
        Must use the name#discriminator format.
    """
    # Must be a coroutine: the reply below is awaited.
    time_str = printable_time(db_manager.get_user_last_session(name))
    if time_str is None:
        time_str = "None found."

    # The possessive form is chosen here, so the log template must not add
    # its own "'s" (that produced "their's" / "name's's").
    whose = "their" if name == str(ctx.message.author) else name + "'s"
    lib.log("{} queried for {} last session time. Result: {}"
            .format(lib.get_author_name(ctx, True), whose, time_str))

    await self.bot.say("```{}```".format(time_str),
                       delete_after=self.bot.ans_lifespan * 3)
async def total(self, ctx: commands.Context, name=None):
    """ Shows you the total time a user has used the timer for.

    :param name: The name (not the nick) of the person to check.
        Must use the name#discriminator format. If none is provided, it will
        check your own record.
    """
    if name is None:
        # NOTE(review): in this case the author object itself (not its
        # string form) is handed to the lookup -- confirm db_manager
        # accepts both.
        name = ctx.message.author

    time_str = printable_time(db_manager.get_user_total(name))
    if time_str is None:
        time_str = "None found."

    name = str(name)
    # The possessive form is chosen here, so the template carries no "'s";
    # the message also names the total time, which is what was queried.
    whose = "their" if name == str(ctx.message.author) else name + "'s"
    lib.log("{} queried for {} total time. Result: {}"
            .format(lib.get_author_name(ctx, True), whose, time_str))

    await self.bot.say("```{}```".format(time_str),
                       delete_after=self.bot.ans_lifespan * 3)
async def toggle_countdown(self, ctx: commands.Context, toggle=None):
    """ Turns the timer's countdown setting on or off. If no state is
    specified, it will toggle it

    :param toggle: True, yes or on to turn the setting on. False, no or off
        to turn the setting off. If not specified, or None is given,
        it will toggle the setting.
    """
    # Must be a coroutine: the message edit and the reply are awaited.
    author = ctx.message.author
    channel = self.bot.spoof(author, lib.get_channel(ctx))
    interface = self.bot.get_interface(channel)
    timer = interface.timer

    toggle = lib.to_boolean(toggle)
    if timer.countdown == toggle:
        return  # No need to edit it if it's the same.

    timer.toggle_countdown(toggle)

    # Refresh the displayed time, then confirm the new state to the user.
    await self.bot.edit_message(interface.time_message, timer.time())
    await self.bot.say("Successfully toggled the countdown setting {}!"
                       .format("on" if timer.countdown else "off"),
                       delete_after=self.bot.ans_lifespan)
async def timer_stop(self, ctx: commands.Context):
    """ Stops the timer, if it's running.
    Resets the current period and time, but keeps the setup.
    """
    # Must be a coroutine: every bot call below is awaited.
    channel = self.bot.spoof(ctx.message.author, lib.get_channel(ctx))
    interface = self.bot.get_interface(channel)

    if interface.timer.stop():
        # stop() returned a truthy value: announce a pending stop and let
        # the notice expire after one timer step.
        send = "Timer will stop soon."
        await self.bot.say(send, delete_after=interface.timer.step)
    else:
        await self.bot.remove_messages(channel)
        send = "Timer has stopped."
        await self.bot.say(send, tts=interface.tts)

    # Whichever message was sent is also written to the channel log.
    lib.log(send, channel_id=channel.id)
async def timer_reset(self, ctx: commands.Context):
    """ Resets the timer setup.
    """
    # Must be a coroutine: the reply at the end is awaited.
    channel = self.bot.spoof(ctx.message.author, lib.get_channel(ctx))
    interface = self.bot.get_interface(channel)
    author_name = lib.get_author_name(ctx)

    if interface.timer.get_state() == State.STOPPED:
        # Only a stopped timer may be discarded; drop it together with the
        # messages that were displaying it.
        interface.timer.set_state(None)
        interface.timer = None
        interface.time_message = None
        interface.list_message = None

        log_line = author_name + " reset the timer."
        send = "Successfully reset session configuration."
    else:
        log_line = (author_name + " tried resetting a timer that "
                    "was running or paused.")
        send = "Cannot do that while the timer is not stopped."

    lib.log(log_line, channel_id=channel.id)
    await self.bot.say(send, delete_after=self.bot.ans_lifespan)
async def timer_superreset(self, ctx: commands.Context):
    """ Ignores all conditions and resets the channel's timer.
    Requires elevated permissions.
    """
    # Must be a coroutine: the bot calls below are awaited.
    channel = self.bot.spoof(ctx.message.author, lib.get_channel(ctx))
    interface = self.bot.get_interface(channel)

    if interface.timer.get_state() == State.RUNNING:
        # A running timer is counted by the bot; uncount it and refresh
        # the bot's status before the timer is dropped.
        self.bot.timers_running -= 1
        await self.bot.update_status()

    await self.bot.remove_messages(channel)
    interface.timer = None

    lib.log("Successfully forced a reset on this channel's timer.",
            channel_id=channel.id)
    await self.bot.say("Timer has been force-reset",
                       delete_after=self.bot.ans_lifespan)
async def admin_shutdown(self, ctx: commands.Context):
    """ Exits the program. Administrator only!
    """
    # Must be a coroutine: messages and the logout are awaited.
    lib.log("Shutting down...")
    await self.bot.say("Hope I did well, bye!")

    origin = lib.get_channel(ctx)
    for channel, timer in self.bot.valid_timers().items():
        if timer.get_state() == State.STOPPED:
            continue

        timer.stop()
        # The invoking channel already received the goodbye above.
        if origin != channel:
            await self.bot.safe_send(
                channel,
                "I'm sorry, I have to go. See you later!"
            )
        # NOTE(review): the source this was recovered from had lost its
        # indentation; cleaning up once per active timer is the assumed
        # nesting -- confirm against the upstream project.
        await self.bot.remove_messages(channel)

    self.bot.unsub_all()
    await self.bot.logout()
def has_permission(ctx: commands.Context) -> bool:
    """ Command check: passes when the author is an administrator or holds
    the role that grants elevated permissions.

    :param ctx: The context to check the command in.
    :type ctx: commands.Context
    :return: True if the check succeeds, else raises an exception.
    :raises: commands.CheckFailure: If the check fails.
        message : "no permissions"
    """
    bot = ctx.bot
    # Any bot other than a PomodoroBot fails the check outright.
    allowed = isinstance(bot, PomodoroBot) and \
        bot.has_permission(ctx.message.author)
    if not allowed:
        raise commands.CheckFailure(message="no permissions")
    return True
def is_admin(ctx: commands.Context) -> bool:
    """ Command check: passes when the author of the command is the
    administrator / owner of the bot.

    :param ctx: The context to check the command in.
    :type ctx: commands.Context
    :return: True if the check succeeds, else raises an exception.
    :raises: commands.CheckFailure: If the check fails.
        message : "not admin"
    """
    bot = ctx.bot
    # Any bot other than a PomodoroBot fails the check outright.
    admin = isinstance(bot, PomodoroBot) and \
        bot.is_admin(ctx.message.author)
    if not admin:
        raise commands.CheckFailure(message="not admin")
    return True
def channel_has_timer(ctx: commands.Context) -> bool:
    """ Command check: passes when the (possibly spoofed) channel has a
    timer set.

    :param ctx: The context to check the command in
    :type ctx: commands.Context
    :return: True if the check succeeds, else raises an exception.
    :raises: commands.CheckFailure: If the check fails.
        message : "timer not found"
    """
    bot = ctx.bot
    if not isinstance(bot, PomodoroBot):
        raise commands.CheckFailure(message="timer not found")

    # The author may be acting on another channel through the bot's spoof.
    channel = bot.spoof(ctx.message.author, lib.get_channel(ctx))
    if bot.get_interface(channel).timer is None:
        raise commands.CheckFailure(message="timer not found")
    return True
def unlocked_or_allowed(ctx: commands.Context) -> bool:
    """ Command check: passes when the channel's timer is unlocked, or when
    the author has permission to act on a locked timer.

    :param ctx: The context to check the command in
    :type ctx: commands.Context
    :return: True if the check succeeds, else raises an exception.
    :raises: commands.CheckFailure: If the check fails.
        message : "timer locked"
    """
    bot = ctx.bot
    # Unlike the other checks, a non-PomodoroBot passes here.
    if not isinstance(bot, PomodoroBot):
        return True
    if not bot.is_locked(lib.get_channel(ctx)):
        return True
    if bot.has_permission(ctx.message.author):
        return True
    raise commands.CheckFailure(message="timer locked")
async def users(self, ctx: commands.Context, count=10):
    """
    Show users with the most messages

    :param count: How many users to list; clamped to the range 1..20.
    """
    # Show at least 1 user and 20 at most
    count = min(20, max(1, count))

    try:
        server = self.orm.Server.get(discord_id=ctx.message.server.id)
    except DoesNotExist as e:
        # If there's no server in the db an exception will be raised
        self.config.logger.error(e)
        return

    top_users = await self.orm.query.user_top_list(count, server)

    # A naive datetime in an embed is sent without an offset, so it has to
    # be expressed in UTC rather than local time.
    embed = discord.Embed(color=discord.Color(self.config.constants.embed_color),
                          timestamp=datetime.datetime.utcnow())
    for user in top_users:
        # the user might not have a name if s/he hasn't sent a message already
        # so in that case use the id instead
        name = user.name if user.name != '' else user.discord_id
        embed.add_field(name=name,
                        value='Total messages: {}'.format(user.count),
                        inline=False)

    await self.bot.say(content='Top active users:', embed=embed)
async def telljoke(self, ctx: commands.Context) -> None:
    """
    Responds with a random joke from theoatmeal.com
    """
    # TODO: get more joke formats (or a better source)
    # Send typing as the request can take some time.
    await self.bot.send_typing(ctx.message.channel)

    # Build a new request object
    req: grequests.AsyncRequest = grequests.get('http://theoatmeal.com/djtaf/', timeout=1)
    # Send new request
    res: List[requests.Response] = grequests.map([req], exception_handler=self.request_exception_handler)

    # Since we only have one request we can just fetch the first one.
    response = res[0]
    if response is None:
        # A failed request leaves no response object here; presumably the
        # exception handler has already dealt with it -- TODO confirm.
        return

    # response.content is the html in bytes; fall back to UTF-8 when the
    # server did not declare an encoding.
    html = response.content.decode(response.encoding or 'utf-8')
    soup = BeautifulSoup(html, 'html.parser')

    joke_ul = soup.find(id='joke_0')
    setup = joke_ul.find('h2', {'class': 'part1'}).text.lstrip()
    punchline = joke_ul.find(id='part2_0').text
    await self.bot.say(setup + '\n' + punchline)
async def randomfact(self, ctx: commands.Context) -> None:
    """
    Responds with a random fact scraped from unkno.com
    """
    # Send typing as the request can take some time.
    await self.bot.send_typing(ctx.message.channel)

    # Build a new request object
    req: grequests.AsyncRequest = grequests.get('http://www.unkno.com/', timeout=1)
    # Send new request
    res: List[requests.Response] = grequests.map([req], exception_handler=self.request_exception_handler)

    # Since we only have one request we can just fetch the first one.
    response = res[0]
    if response is None:
        # A failed request leaves no response object here; presumably the
        # exception handler has already dealt with it -- TODO confirm.
        return

    # response.content is the html in bytes; fall back to UTF-8 when the
    # server did not declare an encoding.
    html = response.content.decode(response.encoding or 'utf-8')
    soup = BeautifulSoup(html, 'html.parser')
    await self.bot.say(soup.find(id='content').text)
async def clear(self, ctx: commands.Context, number: int, member: discord.Member = None) -> None:
    """
    Purges messages from the channel

    :param ctx: The message context
    :param number: The number of messages to purge (1 to 100)
    :param member: The member whose messages will be cleared
    """
    # Validate the requested amount first so the purge path stays flat.
    if number < 1:
        await command_error(ctx, "You must attempt to purge at least 1 message!")
        return
    if number > 100:
        await command_error(ctx, 'Cannot delete more than 100 messages at a time.')
        return

    def predicate(msg: discord.Message) -> bool:
        # The command message itself is always purged; other messages only
        # when no member filter is given or the author matches it.
        return msg == ctx.message or member is None or msg.author == member

    # Add 1 to limit to include command message, subtract 1 from the return to not count it.
    msgs = await self.bot.purge_from(ctx.message.channel, limit=number + 1, check=predicate)
    cleared = len(msgs) - 1
    # NOTE(review): `send` is called without await, exactly as before --
    # confirm that the helper is not a coroutine function.
    send(self.bot, '{} message{} cleared.'.format(cleared, "s" if cleared != 1 else ""),
         ctx.message.channel, True)
async def yes_no(ctx: commands.Context,
                 message: str="Are you sure? Type **yes** within 10 seconds to confirm. o.o"):
    """Yes no helper. Ask a confirmation message with a timeout of 10 seconds.

    ctx - The context in which the question is being asked.
    message - Optional messsage that the question should ask.

    Returns True when the asker answers "yes" or "y" (case-insensitive) in
    time, False on any other answer or on timeout.
    """
    await ctx.send(message)

    def from_asker(reply):
        # Only the person who was asked may answer, and only in the channel
        # where the question was posted.
        return (reply.author == ctx.message.author
                and reply.channel == ctx.message.channel)

    try:
        reply = await ctx.bot.wait_for("message", timeout=10, check=from_asker)
    except asyncio.TimeoutError:
        await ctx.send("Timed out waiting. :<")
        return False

    if reply.clean_content.lower() not in ["yes", "y"]:
        await ctx.send("Command cancelled. :<")
        return False

    return True
def format_command_error(ex: Exception, context: Context) -> tuple:
    """
    Build the text used to display and log a command error.

    :param ex: the exception raised.
    :param context: the context.
    :return: a message to be displayed and logged, and triggered message
    """
    triggered = context.message.content
    pad = ' ' * 4
    # One indented line per fact: the message, the exception type, its text.
    lines = [
        f'{pad}Triggered message: {triggered}',
        f'{pad}Type: {type(ex).__name__}',
        f'{pad}Exception: {ex}',
    ]
    return '\n'.join(lines), triggered
async def convert(self, ctx: commands.Context, argument):
    """Resolve ``argument`` to a user that is banned in the current guild.

    The argument is compared with each ban entry's ``username#discriminator``,
    plain username and numeric ID.

    :return: the ``user`` of the first matching ban entry.
    :raises commands.BadArgument: if nothing matches, or if the ban list
        cannot be read (``discord.Forbidden``).
    """
    # Parse the numeric form once instead of once per ban entry.
    try:
        user_id = int(argument)
    except ValueError:
        user_id = None

    def finder(entry):
        user = entry.user
        return (str(user) == argument or  # username#discriminator
                user.name == argument or  # username
                user.id == user_id)  # id

    # Only the fetch can raise Forbidden, so only the fetch is guarded.
    try:
        bans = await ctx.guild.bans()
    except discord.Forbidden as exc:
        raise commands.BadArgument("I can't view the bans for this server.") from exc

    entry = discord.utils.find(finder, bans)
    if entry is None:
        raise commands.BadArgument(
            'Banned user not found. You can specify by ID, username, or username#discriminator.'
        )
    return entry.user
async def update_statistics(pg: asyncpg.connection.Connection, ctx: commands.Context):
    """
    Updates command statistics for a specific `discord.ext.commands.Context`.

    If no record was found for a command, it is created. Otherwise, the ``times_used`` and
    ``last_used`` fields are updated.
    """
    # Both statements take the same two parameters, so compute them once.
    command_name = str(ctx.command)
    now = datetime.datetime.utcnow()

    row = await get_statistics(pg, command_name)

    if row is None:
        # first time command is being used, insert it into the database
        insert = 'INSERT INTO command_statistics VALUES ($1, 1, $2)'
        await pg.execute(insert, command_name, now)
        logger.info('First command usage for %s', ctx.command)
    else:
        # command was used before, increment time_used and update last_used
        update = ('UPDATE command_statistics SET times_used = times_used + 1, last_used = $2 '
                  'WHERE command_name = $1')
        await pg.execute(update, command_name, now)
async def _leaderboard(self, ctx: commands.Context, league_id: str, matchday: str=None):
    """Gets league leaderboard"""
    headers = [' ', 'ID', 'Team', 'Points', 'P', 'G', 'GA', 'GD']
    # Group tables carry no team id column, so they need their own header
    # row; reusing `headers` shifted every caption by one column.
    group_headers = [' ', 'Team', 'Points', 'P', 'G', 'GA', 'GD']

    data = await self._get_league_leaderboard(ctx.message.server.id, league_id, matchday)
    await self.bot.say('```diff\n+ {}\n- Matchday: {}\n```'.format(data['leagueCaption'], data['matchday']))

    if 'standing' in data:
        # Single table for the whole league.
        pretty_data = [
            [team['rank'], team['teamId'], team['team'], team['points'],
             team['playedGames'], team['goals'], team['goalsAgainst'],
             team['goalDifference']]
            for team in data['standing']
        ]
        await self.bot.say(box(tabulate(pretty_data, headers=headers)))
    elif 'standings' in data:
        # One table per group.
        for group, teams in data['standings'].items():
            # The pause only happens when awaited; the bare call merely
            # created a coroutine object and discarded it.
            await asyncio.sleep(1)
            await self.bot.say('```diff\n+ Group ' + group + '```')
            pretty_data = [
                [team['rank'], team['team'], team['points'],
                 team['playedGames'], team['goals'], team['goalsAgainst'],
                 team['goalDifference']]
                for team in teams
            ]
            await self.bot.say(box(tabulate(pretty_data, headers=group_headers)))
async def _nextfixtures(self, ctx: commands.Context, league_id: str):
    """Gets the league's next fixtures"""
    headers = ['ID', 'Home', ' ', 'Away', 'Date']
    # 'n7' is the timeframe passed to the fixtures helper for this command.
    data = await self._get_league_fixtures_timeframe(ctx.message.server.id, league_id, 'n7')

    await self.bot.say('```diff\n+ Next fixtures```')

    # One row per fixture, each team shown as "[id] name".
    pretty_data = [
        [
            fixture['id'],
            '[{}] {}'.format(fixture['homeTeamId'], fixture['homeTeamName']),
            ' - ',
            '[{}] {}'.format(fixture['awayTeamId'], fixture['awayTeamName']),
            fixture['date'],
        ]
        for fixture in data['fixtures']
    ]
    await self.bot.say(box(tabulate(pretty_data, headers=headers)))
async def _matchdayfixtures(self, ctx: commands.Context, league_id: str, matchday: str='1'):
    """Gets specific matchday fixtures

    Defaults to matchday 1"""
    headers = ['ID', 'Home', ' ', ' ', 'Away']
    data = await self._get_league_fixtures_matchday(ctx.message.server.id, league_id, matchday)

    await self.bot.say('```diff\n+ Matchday ' + matchday + ' fixtures```')

    # One row per fixture: home side, both scores, away side.
    pretty_data = [
        [
            fixture['id'],
            '[{}] {}'.format(fixture['homeTeamId'], fixture['homeTeamName']),
            fixture['result']['goalsHomeTeam'],
            fixture['result']['goalsAwayTeam'],
            '[{}] {}'.format(fixture['awayTeamId'], fixture['awayTeamName']),
        ]
        for fixture in data['fixtures']
    ]
    await self.bot.say(box(tabulate(pretty_data, headers=headers)))
async def _parse_roles(self, ctx: Context, roles: str, is_primary: int = 0) -> List[Tuple]:
    """Parse a comma-separated ``role`` / ``role=alias`` list.

    :param ctx: context used to resolve role names.
    :param roles: e.g. ``'Admin=adm, "Mod"'``; trailing commas and whitespace
        are ignored, and quotes around names are stripped.
    :param is_primary: stored unchanged as the third element of each row.
    :return: one ``(role, alias, is_primary)`` tuple per role that could be
        resolved; ``alias`` is None when the entry had no ``=alias`` part.
    """
    strip_chars = " \t\n\r\"'"
    rows = []

    for entry in roles.rstrip(", \t\n\r").split(","):
        # partition() splits on the first "=" only, so an alias that itself
        # contains "=" no longer raises, and `alias` is recomputed for every
        # entry instead of leaking over from the previous one.
        role, has_alias, alias = entry.partition("=")
        role = role.strip(strip_chars)
        alias = alias.strip(strip_chars) if has_alias else None

        try:
            role_conv = RoleConverter(ctx, role).convert()
        except BadArgument as e:
            # Unable to convert this role
            print(e.args[0])
            await self.bot.say("Couldn't find role `{}` on this server".format(role))
            continue

        rows.append((role_conv, alias, is_primary))

    return rows
async def listwords(self, ctx: commands.Context, channel: discord.Channel=None):
    """List all words for the specified channel

    Optional parameters:
    - channel: the channel to show words for (defaults to the current channel)"""
    author = ctx.message.author
    if not channel:
        channel = ctx.message.channel

    # Walk settings[author][“words”][channel] step by step; a missing level
    # and an empty word list are both treated as "nothing tracked".
    words = (self.settings.get(author.id, {})
             .get("words", {})
             .get(channel.id))
    if not words:
        await self.bot.say("You haven't set any words to be tracked!")
        return

    head = "Tracked words for {}#{} in #{}".format(author.name, author.discriminator, channel.name)
    # One word per line.
    msg = "".join("{}\n".format(word) for word in words)
    await self.bot.say(box(msg, lang=head))
async def get(self, ctx: commands.Context, subreddit, num_posts=5, category='hot'):
    """Base command for returning data from a subreddit.

    Keyword arguments:
    num_posts -- Number of posts to return (default 5, at most 25)
    category -- Category to look at [hot, new, rising, controversial, top] (default hot)
    """
    if num_posts > 25:
        await self.bot.say('Number of posts must be no greater than 25.')
        return

    if not subreddit.strip():
        # A blank subreddit name: send the command help instead.
        await self.bot.pm_help(ctx)
        return

    if category not in self.categories:
        # The colon belongs before the list, not after it.
        await self.bot.say('Valid categories are: {}'.format(', '.join(self.categories)))
        return

    result = await self.get_subreddit_top(
        session=self.session, subreddit=subreddit, num_posts=num_posts, category=category)
    await self.bot.say('\n\n'.join(result))
async def removesheet(self, ctx: commands.Context, name: str):
    """Remove a sheet which has been added.

    Arguments:
     - <name> The name of the sheet to remove"""
    # Scopes are tried in this order: channel, server, global.
    scopes = (ctx.message.channel.id,
              ctx.message.server.id,
              GLOBAL)

    for scope in scopes:
        try:
            self.sheets[scope].pop(name)
        except KeyError:
            # Either the scope has no sheets or none with this name; a bare
            # except here would also have hidden unrelated errors.
            continue

        dataIO.save_json(SHEETS_PATH, self.sheets)
        await self.bot.say("The sheet has been removed.")
        return

    await self.bot.say("Couldn't find a sheet with that name in your scope.")
async def logerrors(self, ctx: commands.Context):
    """Toggle error logging in this channel."""
    channel = ctx.message.channel
    task = DISABLE if channel.id in self.log_channels else ENABLE

    await self.bot.say("This will {} error logging in this channel. Are you sure about this? Type `yes` to agree".format(task))

    # Wait for the author's answer in this channel only, and give up after
    # 15 seconds; without a timeout the `is None` branch below could never
    # be reached and the command would wait forever.
    message = await self.bot.wait_for_message(timeout=15,
                                              author=ctx.message.author,
                                              channel=channel)

    if message is None or message.content != 'yes':
        await self.bot.say("The operation was cancelled.")
        return

    if task == ENABLE:
        self.log_channels.append(channel.id)
    elif task == DISABLE:
        self.log_channels.remove(channel.id)
    dataIO.save_json(SETTINGS_PATH, self.log_channels)
    await self.bot.say("Error logging {}d.".format(task))
async def regedit(self, ctx: commands.Context):
    """Manages valid register roles."""
    # Only act when the group is invoked without a subcommand.
    if ctx.invoked_subcommand is not None:
        return

    # Send server's current settings
    server = ctx.message.server
    await self.bot.send_cmd_help(ctx)

    if server.id not in self.settings:
        msg = box("Register is not enabled in this server. "
                  "Use [p]regedit addrole to enable it.")
    else:
        settings = self.settings[server.id]
        valid_roles = [r.name for r in server.roles if r.id in settings["roles"]]
        delete_after = settings["delete_after"]

        if delete_after is None:
            quiet_status = "Quiet mode is disabled."
        else:
            quiet_status = "Register commands are cleaned up after {} seconds".format(delete_after)

        # With no valid roles the list position shows the text "None".
        role_list = ", ".join(sorted(valid_roles)) if valid_roles else None
        msg = box("{}\n"
                  "Valid register roles:\n"
                  "{}"
                  "".format(quiet_status, role_list))

    await self.bot.say(msg)
async def trigger_set_text(self, ctx: commands.Context, *,
                           text: str):
    """Trigger if a message contains a word or phrase.

    This text is not case sensitive and strips the message of leading
    or trailing whitespace.
    """
    text = text.strip().lower()
    emojis = await self._get_trigger_emojis(ctx)
    text_triggers = self.triggers["text_triggers"]

    if emojis:
        # Emojis were supplied: (re)register the trigger.
        text_triggers[text] = emojis
        _save(self.triggers)
        emojis_str = " ".join(str(self._lookup_emoji(emoji)) for emoji in emojis)
        await self.bot.say("Done - I will now react to messages containing `{text}` with"
                           " {emojis}.".format(text=text,
                                               emojis=emojis_str))
    elif text in text_triggers:
        # No emojis supplied: remove the existing trigger.
        del text_triggers[text]
        _save(self.triggers)
        await self.bot.say("Done - I will no longer react to messages containing `{text}`."
                           "".format(text=text))
    else:
        await self.bot.say("Done - no triggers were changed.")
async def trigger_set_user(self, ctx: commands.Context,
                           user: discord.User):
    """Trigger if a message is from some user."""
    emojis = await self._get_trigger_emojis(ctx)
    user_triggers = self.triggers["user_triggers"]

    if emojis:
        # Emojis were supplied: (re)register the trigger for this user.
        user_triggers[user.id] = emojis
        _save(self.triggers)
        emojis_str = " ".join(str(self._lookup_emoji(emoji)) for emoji in emojis)
        await self.bot.say("Done - I will now react to messages from `{user}` with"
                           " {emojis}.".format(user=str(user),
                                               emojis=emojis_str))
    elif user.id in user_triggers:
        # No emojis supplied: remove the existing trigger.
        del user_triggers[user.id]
        _save(self.triggers)
        await self.bot.say("Done - I will no longer react to messages from `{user}`."
                           "".format(user=str(user)))
    else:
        await self.bot.say("Done - no triggers were changed.")
async def streamlock_lockmsg(self,
                             ctx: commands.Context,
                             *,
                             message: str=None):
    """Set the message for when the channel is locked.

    Leave <message> blank to see the current message.

    To include the name of the stream in the message, simply
    use the placeholder {stream} in the message."""
    channel = ctx.message.channel
    settings = self._load(channel=channel)

    if message is None:
        await self.bot.send_cmd_help(ctx)
        await self.bot.say("Current message:\n{}"
                           "".format(box(settings["LOCK_MSG"])))
        return

    settings["LOCK_MSG"] = message
    # Persist before previewing, so the test message cannot be built from
    # the previously stored text.
    # NOTE(review): assumes send_lock_msg reads the stored settings -- confirm.
    self._save(settings, channel=channel)

    await self.bot.say("Done. Sending test message here...")
    await self.send_lock_msg("ExampleStream", channel)
async def streamlock_unlockmsg(self,
                               ctx: commands.Context,
                               *,
                               message: str=None):
    """Set the message for when the channel is unlocked.

    Leave <message> blank to see the current message.

    To include the name of the stream in the message, simply
    use the placeholder {stream} in the message."""
    channel = ctx.message.channel
    settings = self._load(channel=channel)

    if message is None:
        await self.bot.send_cmd_help(ctx)
        await self.bot.say("Current message:\n{}"
                           "".format(box(settings["UNLOCK_MSG"])))
        return

    settings["UNLOCK_MSG"] = message
    # Persist before previewing, so the test message cannot be built from
    # the previously stored text.
    # NOTE(review): assumes send_lock_msg reads the stored settings -- confirm.
    self._save(settings, channel=channel)

    await self.bot.say("Done. Sending test message here...")
    await self.send_lock_msg("ExampleStream", channel, unlock=True)
async def _channel(self, ctx: commands.Context,
                   channel: discord.Channel=None):
    """Changes the channel where trigger activation messages are sent.

    Defaults to the current one."""
    await self.bot.type()
    server = ctx.message.server

    if not channel:
        channel = server.default_channel

    if not self.speak_permissions(server, channel):
        await self.bot.say(
            "Je n'ai pas les permissions d'envoyer de message sur {0.mention}.".format(channel))
        return

    self.settings[server.id]["channel"] = channel.id
    dataIO.save_json(self.settings_path, self.settings)

    channel = self.get_welcome_channel(server)
    # .format() must apply to the whole sentence; chained with "+" it only
    # reached the last fragment, so "{0.mention}" was sent literally and
    # "d'annonce" ran straight into "sur".
    await self.bot.send_message(
        channel,
        "{0.mention}, Je vais maintenant envoyer les messages d'annonce "
        "sur {1.mention}.".format(ctx.message.author, channel))
async def _delay(self, ctx: commands.Context, seconds: int):
    """Sets the delay between game changes.

    Must be at least 15 seconds.
    """
    # Must be a coroutine: both replies are awaited.
    if seconds < 15:
        await self.bot.reply(
            cf.error("Delay must be at least 15 seconds."))
        return

    # Store the new delay and write the settings to disk before confirming.
    self.settings["delay"] = seconds
    dataIO.save_json(self.settings_path, self.settings)

    await self.bot.reply(
        cf.info("Delay set to {} seconds.".format(seconds)))
async def _del(self, ctx: commands.Context, *, game: str):
    """Removes a game from the list."""
    # Must be a coroutine: both replies are awaited.
    try:
        self.settings["games"].remove(game)
    except ValueError:
        # list.remove raises ValueError when the game is not present.
        await self.bot.reply(
            cf.warning("{} is not in the game list.".format(game)))
        return

    # Removed games are also recorded under the "del" key.
    self.settings["del"].append(game)
    dataIO.save_json(self.settings_path, self.settings)

    await self.bot.reply(
        cf.info("{} removed from the game list.".format(game)))
async def _set(self, ctx: commands.Context, *games: str):
    """Replaces the game list with the given list."""
    games_str = ", ".join(sorted(games))

    await self.bot.reply(cf.question(
        "You are about to replace the current game list with this:{}"
        "Are you sure you want to proceed? (yes/no)".format(
            cf.box(games_str))))

    # Wait up to 15 seconds for the author; a missing answer or anything
    # other than "yes" cancels the replacement.
    answer = await self.bot.wait_for_message(timeout=15,
                                             author=ctx.message.author)
    if answer is None or answer.content.lower().strip() != "yes":
        await self.bot.reply("Game list not replaced.")
        return

    # Every game of the old list is recorded under "del" before the swap.
    self.settings["del"] += self.settings["games"]
    self.settings["games"] = list(games)
    dataIO.save_json(self.settings_path, self.settings)

    await self.bot.reply(cf.info("Game list replaced."))
async def _mdm(self, ctx: commands.Context,
               role: discord.Role, *, message: str):
    """Sends a DM to all Members with the given Role.

    Allows for the following customizations:
    {0} is the member being messaged.
    {1} is the role they are being message through.
    {2} is the person sending the message.
    """
    server = ctx.message.server
    sender = ctx.message.author

    # Deleting the invoking message is best effort, but only Discord API
    # failures are ignored; a bare except would hide unrelated errors too.
    try:
        await self.bot.delete_message(ctx.message)
    except discord.HTTPException:
        pass

    dm_these = self._get_users_with_role(server, role)

    for user in dm_these:
        try:
            await self.bot.send_message(user,
                                        message.format(user, role, sender))
        except (discord.Forbidden, discord.HTTPException):
            # A member who cannot be messaged is skipped.
            continue
async def _changeanswer(self, ctx: commands.Context,
                        survey_id: str):
    """Changes the calling user's response for the given survey."""
    # Must be a coroutine: the error replies are awaited.
    user = ctx.message.author
    server_id = self._get_server_id_from_survey_id(survey_id)

    if survey_id in self.surveys["closed"]:
        await self.bot.send_message(user,
                                    cf.error("That survey is closed."))
        return

    if not server_id:
        await self.bot.send_message(user, cf.error(
            "Survey with ID {} not found.".format(survey_id)))
        return

    # Ask again in a background task and keep a handle to it alongside the
    # survey's other tasks.
    new_task = self.bot.loop.create_task(
        self._send_message_and_wait_for_message(server_id,
                                                survey_id, user,
                                                change=True))
    self.tasks[survey_id].append(new_task)
async def _setsteamid(self, ctx: commands.Context, steamID: str):
    """Associates your Discord account to the Steam profile
    with the given ID.

    You MUST provide your text ID, which has the form 'STEAM_X:X:XXXXXX'.
    You can use http://steamidfinder.com/ to get it.
    """
    # fullmatch: the whole argument must be a Steam ID. match() only anchors
    # the start, so "STEAM_0:1:23junk" passed and "23junk" became the key.
    if re.fullmatch(r"STEAM_\d:\d:\d+", steamID) is None:
        await self.bot.reply(cf.error(
            "Provided Steam ID does not seem to be in the correct format. "
            "You need to provide the ID of the form 'STEAM_X:X:XXXXXX'. "
            "You can use http://steamidfinder.com/ to get it."))
        return

    server = ctx.message.server
    # The part after the last colon is the key; the Discord user id is the value.
    account_key = steamID.split(":")[-1]
    self.settings[server.id]["steam_ids"][account_key] = ctx.message.author.id
    dataIO.save_json(self.settings_path, self.settings)

    await self.bot.reply(cf.info("Steam ID set."))
async def _addquote(self, ctx: commands.Context, *,
                    new_quote: str):
    """Adds a new quote."""
    await self.bot.type()
    server = ctx.message.server

    # First use on this server: start from a private copy of the defaults.
    if server.id not in self.settings:
        self.settings[server.id] = deepcopy(default_settings)
        dataIO.save_json(self.settings_path, self.settings)

    server_settings = self.settings[server.id]
    idx = server_settings["next_index"]

    # Quotes are keyed by the string form of their index.
    server_settings["quotes"][str(idx)] = new_quote
    server_settings["next_index"] += 1
    dataIO.save_json(self.settings_path, self.settings)

    await self.bot.reply(
        cf.info("Quote added as number {}.".format(idx)))
async def _profile(self, ctx: commands.Context,
                   player: str, platform: str="uplay"):
    """Get the profile for the given player.

    'platform' must be one of 'uplay', 'xbox', or 'playstation', and
    defaults to 'uplay'.
    """
    p = await self.get_player(player, platform)
    if p is None:
        # No player object was returned, so there is nothing to show.
        return

    # The fields below are read after both checks have been awaited.
    await p.check_general()
    await p.check_level()

    e = discord.Embed(description="Player Summary")
    e.set_author(name=p.name, url=p.url)
    e.set_thumbnail(url=p.icon_url)
    e.add_field(name="Level", value=p.level)
    e.add_field(name="XP", value=p.xp)
    e.add_field(name="Platform", value=p.platform)

    await self.bot.say(embed=e)
async def _toggle(self, ctx: commands.Context):
    """Toggles StreamRole on/off."""
    await self.bot.type()
    server = ctx.message.server
    # Same dict object as self.settings[server.id]; changes are in place.
    server_settings = self.settings[server.id]

    # Turning the feature on requires a role to have been configured.
    if not server_settings["enabled"] and server_settings["role"] is None:
        await self.bot.reply(cf.warning(
            "You need to set the role before turning on StreamRole."
            " Use `{}streamroleset role`".format(ctx.prefix)))
        return

    server_settings["enabled"] = not server_settings["enabled"]

    if server_settings["enabled"]:
        await self.bot.reply(
            cf.info("StreamRole is now enabled."))
    else:
        await self.bot.reply(
            cf.info("StreamRole is now disabled."))

    dataIO.save_json(self.settings_path, self.settings)
async def _membershipset(self, ctx: commands.Context):
    """Sets membership settings."""
    server = ctx.message.server

    # First use on this server: copy the defaults and point the channel at
    # the server's default channel.
    if server.id not in self.settings:
        self.settings[server.id] = deepcopy(default_settings)
        self.settings[server.id]["channel"] = server.default_channel.id
        dataIO.save_json(self.settings_path, self.settings)

    if ctx.invoked_subcommand is None:
        await self.bot.send_cmd_help(ctx)

        # Invoked without a subcommand: show the current settings in a
        # code block.
        server_settings = self.settings[server.id]
        parts = [
            "```",
            "ON: {}\n".format(server_settings["on"]),
            "CHANNEL: #{}\n".format(self.get_welcome_channel(server)),
            "JOIN: {}\n".format(server_settings["join_message"]),
            "LEAVE: {}\n".format(server_settings["leave_message"]),
            "BAN: {}\n".format(server_settings["ban_message"]),
            "UNBAN: {}\n".format(server_settings["unban_message"]),
            "```",
        ]
        await self.bot.say("".join(parts))
async def _jumptop(self, ctx: commands.Context):
    """Gets the top stats for the given jump type.

    Optionally provide a limit (default is 10).
    """
    await self.bot.type()
    server = ctx.message.server

    # First use on this server: start from a private copy of the defaults.
    if server.id not in self.settings:
        self.settings[server.id] = deepcopy(default_settings)
        dataIO.save_json(self.settings_path, self.settings)

    if not self._check_settings(server.id):
        await self.bot.reply(
            cf.error("You need to set up this cog before you can use it."
                     " Use `{}kzset`.".format(ctx.prefix)))
        return

    await self._update_database(server.id)

    # Invoked without a subcommand: run the `_all` command.
    if ctx.invoked_subcommand is None:
        await ctx.invoke(self._all)
async def say_and_pm(ctx, content):
    """Send message to current channel as well as the command message's author.

    `ctx` can be either `discord.Message` or `commands.Context`

    `content` is a format template with a ``{channel}`` placeholder: it is
    left empty in the channel message and becomes "in <channel mention>" in
    the private message.

    Returns a tuple of the two ``send`` results (channel first, then author).
    """
    channel = ctx.channel
    author = ctx.author

    to_say = content.format(channel='')
    to_pm = content.format(channel=f'in {channel.mention}')

    # Sent one after the other: the channel message goes out first.
    said = await ctx.send(to_say)
    pmed = await author.send(to_pm)
    return said, pmed
def get_server(context: Context) -> discord.Server:
    """ Returns the server a command was sent to, read from the message
        stored in the command's context.

    :param context: The context in which the command was sent.
    :type context: discord.ext.commands.Context

    :return: The server
    """
    message = context.message
    return message.server
def get_server_id(context: Context) -> str:
    """ Returns the ID of the server a command was sent to, based on the
        command's context.

    :param context: The context in which the command was sent.
    :type context: discord.ext.commands.Context

    :return: The server's ID, or None when the context has no server.
    """
    server = get_server(context)
    if server is None:
        return None
    return server.id
def get_channel(context: Context) -> discord.Channel:
    """ Returns the channel a command was sent to, read from the message
        stored in the command's context.

    :param context: The context in which the command was sent.
    :type context: discord.ext.commands.Context

    :return: The channel
    """
    message = context.message
    return message.channel
def get_channel_id(context: Context) -> str:
    """ Returns the ID of the channel a command was sent to, based on the
        command's context.

    :param context: The context in which the command was sent.
    :type context: discord.ext.commands.Context

    :return: The channel's ID
    """
    channel = get_channel(context)
    return channel.id
def get_channel_name(context: Context) -> str:
    """ Returns the name of the channel a command was sent to, based on the
        command's context.

    :param context: The context in which the command was sent.
    :type context: discord.ext.commands.Context

    :return: The channel's name
    """
    channel = get_channel(context)
    return channel.name