Python discord.ext.commands 模块,group() 实例源码
我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用discord.ext.commands.group()。
async def hots_free(self, ctx):
    """Retrieves the Heroes of the Storm free rotation."""
    # NOTE(review): urlopen() blocks the event loop while the page downloads;
    # kept as in the original because no async HTTP client is visible here.
    url = "http://us.battle.net/heroes/en/"
    soup = BeautifulSoup(urlopen(url), 'html5lib')
    # This is what Blizzard uses to indicate free heroes in their webpage.
    free_indicator = "|| Free"
    placement = re.compile('data-analytics-placement="(.*) Free"')
    free_heroes = []
    for elem in soup.find_all('li'):
        markup = str(elem)
        if "free-rotation" not in markup:
            continue
        index = markup.find(free_indicator)
        if index == -1:
            # Marker missing: there is nothing to slice around.
            continue
        # Only the area around the marker is searched. The lower bound is
        # clamped so a marker near the start cannot wrap the slice around.
        window = markup[max(index - 100, 0):index + 100]
        result = placement.search(window)
        if result is not None:
            # The capture ends just before " Free", so it still carries the
            # 3-character tail of the marker (presumably " ||"); drop it.
            free_heroes.append(result.group(1)[:-3])
    # Formats via Discord's markdown. join() avoids the trailing-separator
    # trim, which used to eat part of the code fence when the list was empty.
    bot_text = "```md\n<Free-Rotation>```" + "```" + ", ".join(free_heroes) + "```"
    await self.bot.say(bot_text)
async def _display_embed(self, ctx, name=None, *entities, whitelist, type_):
    """Send an embed summarising a permission change.

    The colour, action word and icon are looked up from
    ``_value_embed_mappings[whitelist]``. When ``name`` is a specific node
    (neither ``ALL_MODULES_KEY`` nor ``None``) it is shown in a field titled
    ``type_``. The ``entities`` are grouped by ``_get_class_name`` and listed
    one field per group.
    """
    colour, action, icon = _value_embed_mappings[whitelist]
    embed = discord.Embed(colour=colour)
    embed.set_author(name=f'{type_} {action}!', icon_url=icon)

    if name not in {ALL_MODULES_KEY, None}:
        cog, _, node_name = _extract_from_node(name)
        embed.add_field(name=type_, value=node_name or cog)

    # groupby() only merges adjacent items, so sort by the same key first.
    sorted_entities = sorted(entities, key=_get_class_name)
    for kind, members in itertools.groupby(sorted_entities, _get_class_name):
        members = list(members)
        # Append "s" unless the group holds exactly one member.
        field_name = f'{kind}{"s" * (len(members) != 1)}'
        # 1024 is passed to truncate() as the length cap for the field value.
        value = truncate(', '.join(map(str, members)), 1024, '...')
        embed.add_field(name=field_name, value=value, inline=False)

    await ctx.send(embed=embed)
async def _leaderboard(self, ctx: commands.Context, league_id: str, matchday: str=None):
    """Gets league leaderboard"""
    headers = [' ', 'ID', 'Team', 'Points', 'P', 'G', 'GA', 'GD']
    data = await self._get_league_leaderboard(ctx.message.server.id, league_id, matchday)
    await self.bot.say('```diff\n+ {}\n- Matchday: {}\n```'.format(data['leagueCaption'], data['matchday']))

    if 'standing' in data:
        # Single-table league: one row per team, including the team id.
        pretty_data = [
            [team['rank'], team['teamId'], team['team'], team['points'],
             team['playedGames'], team['goals'], team['goalsAgainst'],
             team['goalDifference']]
            for team in data['standing']
        ]
        await self.bot.say(box(tabulate(pretty_data, headers=headers)))
    elif 'standings' in data:
        # Group-stage rows carry no team id, so the 'ID' header is dropped to
        # keep the headers aligned with the seven columns actually emitted.
        group_headers = [h for h in headers if h != 'ID']
        for group, teams in data['standings'].items():
            # The sleep was previously not awaited, so it never paused.
            await asyncio.sleep(1)
            await self.bot.say('```diff\n+ Group ' + group + '```')
            pretty_data = [
                [team['rank'], team['team'], team['points'], team['playedGames'],
                 team['goals'], team['goalsAgainst'], team['goalDifference']]
                for team in teams
            ]
            await self.bot.say(box(tabulate(pretty_data, headers=group_headers)))
def find_command(self, command):
    """Resolve a space-separated command path to a command object.

    ``self.bot.commands`` only holds top-level commands, so the path is
    walked one word at a time: the first word is looked up on the bot and
    every later word in the ``commands`` mapping of the previous result.

    Returns the resolved command, or ``None`` when any word is unknown, when
    a word is looked up on something that is not a group (no ``commands``
    attribute), or when ``command`` is empty.
    """
    cmd = None
    for depth, part in enumerate(command.split()):
        try:
            container = self.bot.commands if depth == 0 else cmd.commands
            cmd = container.get(part)
        except AttributeError:
            # The previous command is not a group, so it has no subcommands.
            return None
        if cmd is None:
            # Stop at the first unknown word. Continuing would restart the
            # lookup at top level and could accept paths like "bogus ping".
            return None
    return cmd
async def log(self, ctx, *, user: str):
    """Shows mod log entries for a user.
    Only searches the past 300 cases.
    """
    # The mod-log channel id is hard-coded.
    mod_log = ctx.message.server.get_channel('222010090226581504')
    entries = []
    async for m in self.bot.logs_from(mod_log, limit=300):
        entry = self.pollr.match(m.content)
        # Messages that do not match the pattern are skipped; a case matches
        # when the requested name is a substring of its 'user' group.
        if entry is not None and user in entry.group('user'):
            entries.append(m.content)
    fmt = 'Found {} entries:\n{}'
    await self.bot.say(fmt.format(len(entries), '\n\n'.join(entries)))
async def raw(self, ctx, *, name: TagName(lower=True)):
    """Gets the raw content of the tag.
    This is with markdown escaped. Useful for editing.
    """
    try:
        tag = await self.get_tag(ctx.guild.id, name, connection=ctx.db)
    except RuntimeError as e:
        # get_tag reports lookup failures as RuntimeError; relay the text.
        return await ctx.send(e)

    # Maps the regex-escaped form of each markdown character to the
    # backslash-escaped text that replaces it.
    transformations = {
        re.escape(c): '\\' + c
        for c in ('*', '`', '_', '~', '\\', '<')
    }

    def replace(obj):
        return transformations.get(re.escape(obj.group(0)), '')

    pattern = re.compile('|'.join(transformations.keys()))
    await ctx.send(pattern.sub(replace, tag['content']))
async def uniquegroup(self, ctx, role: discord.Role, groupid: int):
    """Set a role to a unique group ID,
    This means that a user cannot have more than one role from the same group.
    Any role sharing the same ID will be considered a group.
    GroupID 0 will not be considered unique and can share other roles."""
    server = ctx.message.server
    roles = self.settings_dict[server.id]['roles']
    if role.id not in roles:
        await self.bot.say('This role isn\'t in the buyrole list')
        return
    if groupid < 0:
        await self.bot.say('The group ID cannot be negative.')
        return
    # Only the uniquegroup ID is stored here; the uniqueness logic itself
    # lives in a subfunction of buyrole.
    roles[role.id]['uniquegroup'] = groupid
    self.save_json()
    if groupid == 0:
        await self.bot.say('Unique Group ID set. {} isn\'t considered unique.'.format(role.name))
    else:
        await self.bot.say('Unique Group ID set. {} will now be unique in group ID {}'.format(role.name, groupid))
def __init__(self, bot):
    """Set up the cog: keep the bot, load settings and record release info."""
    # Release metadata.
    self.version = "1.0.0"
    self.update_type = "release"
    # Bot handle and settings loaded from the JSON file on disk.
    self.bot = bot
    self.dbpath = "data/gaming/settings.json"
    self.db = fileIO(self.dbpath, "load")
    self.patchnote = """
**Gaming cog, first release!**
Main purpose of this cog is to help large gaming communities. There are two groups of commands at the moment `profiles` and `lfg`
`profiles` is used for managing gaming networks profiles, such as steam, psn, xbl, etx. I see it being used to get users profile, while he/she is away.
Use `[p]help profiles` for more info
`lfg` is a bit barebone at the moment. It can be used to set your status as "looking for group", so other users can see you on the list
Use `[p]help lfg` and `[p]help lfg looking` for more info
More to come!
"""
# useful methods
async def uniquegroup(self, ctx, role: discord.Role, groupid: int):
    """Set a role to a unique group ID,
    This means that a user cannot have more than one role from the same group.
    Any role sharing the same ID will be considered a group.
    GroupID 0 will not be considered unique and can share other roles."""
    server = ctx.message.server
    roles = self.settings_dict[server.id]['roles']
    if role.id not in roles:
        await self.bot.say('This role isn\'t in the buyrole list')
        return
    if groupid < 0:
        await self.bot.say('The group ID cannot be negative.')
        return
    # Only the uniquegroup ID is stored here; the uniqueness logic itself
    # lives in a subfunction of buyrole.
    roles[role.id]['uniquegroup'] = groupid
    self.save_json()
    if groupid == 0:
        await self.bot.say('Unique Group ID set. {} isn\'t considered unique.'.format(role.name))
    else:
        await self.bot.say('Unique Group ID set. {} will now be unique in group ID {}'.format(role.name, groupid))
async def setcookie(self, ctx):
    """Cookie settings group command"""
    # Invoked without a subcommand: show the group's help instead.
    if ctx.invoked_subcommand is None:
        await send_cmd_help(ctx)
async def roulette_round(self, settings, server, players, turn):
    """Play one round of roulette, announcing each step in chat.

    Players are drawn at random from the circle. With ``chamber`` chambers
    left, each pull fires with probability 1/chamber, so a shot is certain by
    the sixth pull. The player who is shot is removed from
    ``settings["Players"]`` and the round ends.

    NOTE(review): ``random.choice`` raises IndexError when ``players`` is
    empty, or when the shot player was the last entry in
    ``settings["Players"]``; callers presumably guarantee at least two.
    """
    roulette_circle = players[:]
    chamber = 6
    await self.bot.say("*{} put one round into the six shot revolver and gave it a good spin. "
                       "With a flick of the wrist, it locks in place."
                       "*".format(self.bot.user.name))
    await asyncio.sleep(4)
    await self.bot.say("Let's begin round {}.".format(turn))
    while chamber >= 1:
        if not roulette_circle:
            roulette_circle = players[:]  # Restart the circle when list is exhausted
        chance = random.randint(1, chamber)
        player = random.choice(roulette_circle)
        await self.bot.say("{} presses the revolver to their temple and slowly squeezes the "
                           "trigger...".format(player.name))
        if chance == 1:
            await asyncio.sleep(4)
            msg = "**BOOM**\n```{} died and was removed from the group.```".format(player.name)
            await self.bot.say(msg)
            msg2 = random.choice(kill_message)
            settings["Players"].pop(player.id)
            # A random survivor is named in the kill message.
            remaining = [server.get_member(x) for x in list(settings["Players"].keys())]
            player2 = random.choice(remaining)
            death_time = strftime("%H:%M:%S", gmtime())
            await asyncio.sleep(5)
            await self.bot.say(msg2.format(player.name, player2.name, death_time))
            await asyncio.sleep(5)
            break
        else:
            await asyncio.sleep(4)
            await self.bot.say("**CLICK**\n```{} survived and passed the "
                               "revolver.```".format(player.name))
            await asyncio.sleep(3)
            # Survivors leave the circle; one fewer chamber remains.
            roulette_circle.remove(player)
            chamber -= 1
def __init__(self, bot):
    """Store the bot and start with an empty group list."""
    self.version = 2.01
    self.group = []
    self.bot = bot
async def dtable(self, ctx):
    """Shows a list under this group commands."""
    # Invoked without a subcommand: show the group's help instead.
    if ctx.invoked_subcommand is None:
        await send_cmd_help(ctx)
async def setrace(self, ctx):
    """Race cog's settings group command"""
    # Invoked without a subcommand: show the group's help instead.
    if ctx.invoked_subcommand is None:
        await send_cmd_help(ctx)
def group(name=None, **attrs):
    """Return ``commands.command`` configured to build a ``Group``.

    ``name`` and any extra keyword attributes are forwarded unchanged.
    """
    make_command = commands.command
    return make_command(name=name, cls=Group, **attrs)
def get_tsun(self, index: str):
    """Scrape one post from the tsuntsunlive Instagram page.

    ``index`` is free text whose first word may be a 1-based post number.
    When it is missing or not an integer, a random post is chosen.

    Returns ``[image_url, caption]``.

    NOTE(review): ``requests.get`` is a blocking call, and a post number of 0
    selects the last post (``post_list[-1]``) — both kept as in the original.
    """
    r = requests.get('https://www.instagram.com/tsuntsunlive/')
    soup = bs4.BeautifulSoup(r.content, 'html.parser')
    scripts = [str(tag) for tag in soup.find_all("script", type="text/javascript")]
    # The longest script tag is taken as the one holding the post data.
    data_tag = sorted(scripts, key=len)[-1]

    try:
        position = int(index.split()[0])
    except (ValueError, IndexError):
        # Not a number, or no words at all: fall back to a random post.
        position = None

    post_list = re.split('"caption": "', data_tag)[1:]
    if position is None:
        post = random.choice(post_list)
    else:
        post = post_list[position - 1]

    caption = post[:re.search('", "likes"', post).start()]
    # Decode \uXXXX escapes and literal "\n" sequences left in the JSON text.
    caption = re.sub(r"(\\u[0-9a-f]{4})",
                     lambda match: codecs.decode(match.group(1), "unicode_escape"),
                     caption)
    caption = re.sub(r"\\n", "\n", caption)

    img_part = post[re.search('"display_src": "', post).end():]
    # Cut the URL at its query string; raw string avoids an invalid escape.
    img = img_part[:re.search(r"\?", img_part).start()]
    img = re.sub(r"\\", "", img)
    return [img, caption]
async def tax(self, ctx):
    """Tax command group."""
    # Invoked without a subcommand: point the user at the help text.
    if ctx.invoked_subcommand is None:
        await self.bot.say('Please lookup !help tax for commands in this group')
async def channel_mangement(self, ctx):
    """Channel management command group."""
    # Invoked without a subcommand: point the user at the help text.
    if ctx.invoked_subcommand is None:
        await global_methods.say_other(ctx, 'Please lookup !help man for commands in this group', self.bot)
def mention_id(mention):
    """Return the integer captured by ``ID_RE`` from ``mention``.

    Returns ``None`` when the text does not match.
    """
    found = ID_RE.match(mention)
    if found is None:
        return None
    return int(found.group(1))
def group(*args, **kwargs):
    """Wrap ``commands.group`` with ``pass_context`` defaulting to True.

    An explicit ``pass_context`` supplied by the caller is left untouched.
    """
    kwargs.setdefault('pass_context', True)
    return commands.group(*args, **kwargs)
def chan(self, ctx):
    """Voice Channel Access `role` command group
    [p]vca chan add (voice channel id) (role id)
    [p]vca chan rem (voice channel id)"""
    # Intentionally empty: this function only anchors its subcommands.
    return None
async def THP(self, ctx, link):  ## ctx is an invisible parameter ##
    """Dead site."""
    # SECURITY: `link` comes straight from a chat user. urlopen() also accepts
    # schemes such as file://, so only http(s) URLs are opened as-is; anything
    # else is treated as a bare host/path and given an https:// prefix.
    # NOTE(review): urlopen() blocks the event loop while it downloads.
    if link.startswith(("http://", "https://")):
        url = link
    else:
        url = "https://" + link

    try:
        soup = BeautifulSoup(urlopen(url), 'html5lib')

        # First attempt: list items containing "@" form the front page.
        frontpage = [
            str(elem)[4:17] + " - " + str(elem.a.contents)[2:-2]
            for elem in soup.find_all('li')
            if "@" in str(elem)
        ]

        if not frontpage:
            # Fallback: collect thread names from "hidden." spans, then read
            # the text following each matching <input> element.
            threads = []
            for elem in soup.select('span[id]'):
                if "hidden." in str(elem):
                    found = re.search('>(.*?)</a>', str(elem))
                    if found:
                        threads.append(found.group(1))
            for thread in threads:
                for elem in soup.find_all("input", value=thread):
                    found = re.search('>[\r\n]+(.*?)[\r\n]+</span>', str(elem.find_next()))
                    if found:
                        frontpage.append(found.group(1))
    except Exception:
        # Any fetch or parse failure is reported as a bad link, including a
        # failed fetch of the https:// fallback, which used to go unhandled.
        await self.bot.say("Try a valid link.")
        return

    frontpage_str = ''.join(line + '\n' for line in frontpage)
    await self.bot.say("```" + frontpage_str + "```")
async def disable(self, ctx, command: str):
    """
    Disables a command in this server.
    In order to disable a group, simply pass in the group's name.
    For example: "d?disable feedback" will disable all feedback commands.
    """
    # Command names are handled without the bot prefix; reject prefixed input.
    if self.bot.has_prefix(command):
        await ctx.send('You must leave off the prefix.')
        return
    await self.bot.disable_command(ctx.guild, command)
    await ctx.ok()
def reddit(self, ctx):
    """
    This command group contains all commands related to Reddit feeds.
    Feeds will be updated every 30 minutes. Both self and link posts will be posted to the channel. NSFW posts will
    only be posted if the channel that the bot is posting in is NSFW. Stickied posts are never posted.
    """
    # Intentionally empty: this function only anchors its subcommands.
    return None
def __init_subclass__(cls, *, game_cls, cmd=None, aliases=(), **kwargs):
    """Build the command group and subcommands for a game subclass.

    ``game_cls`` is stored on the class as ``__game_class__``. ``cmd`` is the
    group name and defaults to the lower-cased class name; ``aliases`` is
    forwarded to the group. Remaining keyword arguments go to the parent
    ``__init_subclass__``.
    """
    super().__init_subclass__(**kwargs)
    cls.__game_class__ = game_cls
    cmd_name = cmd or cls.__name__.lower()
    # The group's help text is the docstring of _game with {name} filled in.
    group_help = inspect.getdoc(cls._game).format(name=cls.name)
    group_command = commands.group(
        name=cmd_name, aliases=aliases, help=group_help, invoke_without_command=True
    )(cls._game)
    setattr(cls, f'{cmd_name}', group_command)
    gc = group_command.command
    # Every member named _game_<x> becomes subcommand <x> of the group.
    for name, member in inspect.getmembers(cls):
        if not name.startswith('_game_'):
            continue
        name = name[6:]  # strip the '_game_' prefix
        if name in {'invite', 'create'}:
            # Special treatment is needed for these two
            continue
        help = inspect.getdoc(member).format(name=cls.name, cmd=cmd_name)
        command = gc(name=name, help=help)(member)
        setattr(cls, f'{cmd_name}_{name}', command)
    # Expose _error under the name-mangled form of '__error' for this class.
    setattr(cls, f'_{cls.__name__}__error', cls._error)
    # Deprecate create and invite
    dc = functools.partial(gc, cls=DeprecatedCommand, version='1.2')
    setattr(cls, f'{cmd_name}_create', dc(name='create', instead=f'{cmd_name}')(cls._game_create))
    setattr(cls, f'{cmd_name}_invite', dc(name='invite', instead=f'{cmd_name} @user')(cls._game_invite))
async def warn_error(self, ctx, error):
    """Relay a wrapped ``RuntimeError`` to the invoking channel.

    Only errors whose ``original`` attribute is a ``RuntimeError`` are sent;
    anything else is ignored here.
    """
    original = getattr(error, 'original', None)
    if isinstance(original, RuntimeError):
        await ctx.send(original)
# XXX: Should this be a group?
async def channel_online(self, twitch_url: str):
    """Return True when the Twitch channel behind ``twitch_url`` is live.

    Any failure (URL without a channel part, missing or undecodable API
    response) is reported as offline, on the assumption that the next check
    will most likely work.
    """
    # Everything after "twitch.tv/" is taken as the channel name.
    match = re.search(r"(?<=twitch\.tv/)(.*)", twitch_url)
    if match is None:
        # Not a twitch.tv URL: there is no channel to query.
        return False
    url = "https://api.twitch.tv/kraken/streams/{}".format(match.group(1))
    response = await utils.request(url, payload=self.params)
    # For some reason Twitch's API call is not reliable: sometimes 'stream'
    # is None, sometimes it is missing from the JSON, and sometimes the body
    # cannot be decoded at all (in which case None comes back).
    try:
        return response['stream'] is not None
    except (KeyError, TypeError):
        return False
async def twitch(self, ctx, *, member: discord.Member = None):
    """Use this command to check the twitch info of a user
    EXAMPLE: !twitch @OtherPerson
    RESULT: Information about their twitch URL"""
    await ctx.message.channel.trigger_typing()
    if member is None:
        member = ctx.message.author
    result = await utils.get_content('twitch', member.id)
    if result is None:
        await self.bot.say("{} has not saved their twitch URL yet!".format(member.name))
        return

    url = result['twitch_url']
    # Everything after "twitch.tv/" is taken as the channel name.
    # NOTE(review): assumes the saved URL always contains "twitch.tv/";
    # otherwise re.search returns None and this line raises AttributeError.
    user = re.search("(?<=twitch.tv/)(.*)", url).group(1)
    twitch_url = "https://api.twitch.tv/kraken/channels/{}".format(user)
    payload = {'client_id': self.key}
    data = await utils.request(twitch_url, payload=payload)

    embed = discord.Embed(title=data['display_name'], url=url)
    # Logo and game are optional and only shown when present.
    if data['logo']:
        embed.set_thumbnail(url=data['logo'])
    embed.add_field(name='Title', value=data['status'])
    embed.add_field(name='Followers', value=data['followers'])
    embed.add_field(name='Views', value=data['views'])
    if data['game']:
        embed.add_field(name='Game', value=data['game'])
    embed.add_field(name='Language', value=data['broadcaster_language'])
    await self.bot.say(embed=embed)
async def add_tag(self, ctx, *, result: str):
    """Use this to add a new tag that can be used in this server
    Format to add a tag is !tag add <tag> - <result>
    EXAMPLE: !tag add this is my new tag - This is what it will be
    RESULT: A tag that can be called by '!tag this is my new tag' and will output 'This is what it will be'"""
    usage = "Please provide the format for the tag in: {}tag add <tag> - <result>".format(ctx.prefix)

    # Split on " - "; the first group is greedy, so the last " - " on a line
    # separates the tag from its result.
    match = re.search("(.*) - (.*)", result)
    if match is None:
        await self.bot.say(usage)
        return
    tag = match.group(1).strip()
    tag_result = match.group(2).strip()

    # Either side empty means the wrong format was provided.
    if not tag or not tag_result:
        await self.bot.say(usage)
        return

    # Make sure the tag created does not mention everyone/here
    if '@everyone' in tag_result or '@here' in tag_result:
        await self.bot.say("You cannot create a tag that mentions everyone!")
        return

    server_id = ctx.message.server.id
    entry = {'server_id': server_id, 'tag': tag, 'result': tag_result}

    def r_filter(row):
        # `&` (not `and`) is kept from the original; presumably the rows are
        # query expressions rather than plain dicts — confirm against config.
        return (row['server_id'] == server_id) & (row['tag'] == tag)

    # Update an existing entry first; when nothing was updated, add a new one.
    if not await config.update_content('tags', entry, r_filter):
        await config.add_content('tags', entry)
    # Show the prefix actually in use instead of a hard-coded "!".
    await self.bot.say(
        "I have just updated the tag `{0}`! You can call this tag by entering {1}tag {0}".format(tag, ctx.prefix))
def add_synonym(self, other):
    """Merge ``other`` into this word's synonym group.

    The other word's synonyms are appended to this word's list, and the other
    word is then pointed at that same list object, so both share one list.
    """
    shared = self.synonyms
    shared.extend(other.synonyms)
    other.synonyms = shared
def __init__(self, argument):
    """Parse ``argument`` into a number of seconds.

    Accepts a plain integer or an ``<n>h<n>m<n>s`` form in which every part
    is optional. The raw text is kept in ``self.original`` and the total in
    ``self.seconds``.

    Raises ``commands.BadArgument`` when the text cannot be parsed, when the
    total is negative, or when it exceeds seven days.
    """
    compiled = re.compile(r"(?:(?P<hours>\d+)h)?(?:(?P<minutes>\d+)m)?(?:(?P<seconds>\d+)s)?")
    self.original = argument
    try:
        total = int(argument)
    except ValueError as e:
        parsed = compiled.match(argument)
        # Every part is optional, so an empty match means nothing was parsed.
        if parsed is None or not parsed.group(0):
            raise commands.BadArgument('Failed to parse time.') from e
        total = 0
        for unit, factor in (('hours', 3600), ('minutes', 60), ('seconds', 1)):
            amount = parsed.group(unit)
            if amount is not None:
                total += int(amount) * factor
    self.seconds = total
    if total < 0:
        raise commands.BadArgument('I don\'t do negative time.')
    if total > 604800:  # 7 days
        raise commands.BadArgument('That\'s a bit too far in the future for me.')
def convert(self, ctx, argument):
    """Resolve ``argument`` to a role of the current guild.

    An ID or a ``<@&id>`` role mention is looked up by id; anything else by
    name. When no role matches, the original argument text is returned.

    Raises ``commands.NoPrivateMessage`` outside a guild.
    """
    guild = ctx.message.guild
    if not guild:
        raise commands.NoPrivateMessage()
    found = self._get_id_match(argument) or re.match(r'<@&([0-9]+)>$', argument)
    if found:
        lookup = {'id': int(found.group(1))}
    else:
        lookup = {'name': argument}
    role = discord.utils.get(guild.roles, **lookup)
    return argument if role is None else role
async def gaf(self, ctx):
    """
    Information on The Never Ending GAF
    """
    await ctx.send("The Never Ending GAF is a small group of idiots that play games and insult each other. "
                   "\nWe reside on our Discord server, usually in the beloved cesspool intelligently "
                   "titled \"Channel 1\".\nFeel free to pop by, probably get told to piss off, and maybe "
                   "play some games."
                   "\nMore info and invite links: <http://www.neverendinggaf.com>")
def jc3(self, ctx):
    """Main command group for JoséCoin v3 commands.
    NOTE: this should be REMOVED once JoséCoin v3 becomes stable.
    """
    # Intentionally empty: this function only anchors its subcommands.
    return None
async def log(self, ctx):
    """Command group for managing logging."""
    # Invoked without a subcommand: list the available subcommands. The
    # notice is passed ttl=5 — presumably it is removed after five seconds.
    if ctx.invoked_subcommand is None:
        await edit(ctx, content='\N{HEAVY EXCLAMATION MARK SYMBOL} ``on``, ``off``, ``status``, ``show``, ``key <word>``, ``guild``, ``channel``, ``blacklist channel``, ``blacklist <word>`` or ``blacklist user <user>``', ttl=5)
# Log On
def convert(self):
    """Resolve ``self.argument`` to an emoji of the blob guild.

    The argument may be custom-emoji markup (matched by ``EMOJI_REGEX``), a
    bare numeric id, or an emoji name.

    Raises ``commands.BadArgument`` when no emoji matches.
    """
    guild = self.ctx.bot.get_server(BLOB_GUILD_ID)
    by_id = {}
    for blob in guild.emojis:
        by_id[blob.id] = blob

    parsed = EMOJI_REGEX.match(self.argument)
    if parsed is not None:
        found = by_id.get(parsed.group(1))
    elif self.argument.isdigit():
        found = by_id.get(self.argument)
    else:
        found = discord.utils.find(lambda e: e.name == self.argument, by_id.values())

    if found is None:
        raise commands.BadArgument('Not a valid blob emoji.')
    return found
def partial_emoji(argument, *, regex=EMOJI_REGEX):
    """Return the emoji id contained in ``argument`` as an int.

    A string of digits is assumed to be the id itself; otherwise the id is
    the first group matched by ``regex``.

    Raises ``commands.BadArgument`` when neither form applies.
    """
    if not argument.isdigit():
        found = regex.match(argument)
        if found is None:
            raise commands.BadArgument("That's not a custom emoji...")
        return int(found.group(1))
    # assume it's an emoji ID
    return int(argument)
def valid_rank(argument, *, _rank=_rank):
    """Parse a "<mode> <rank>[number]" argument.

    Surrounding double quotes are stripped before matching with ``_rank``.
    The mode is normalised through a table of aliases.

    Returns ``(mode, {'rank': RANK, 'number': number})`` where the rank is
    upper-cased and the number is an int when one was given.

    Raises ``commands.BadArgument`` when the text does not match or the mode
    is unknown.
    """
    parsed = _rank.match(argument.strip('"'))
    if parsed is None:
        raise commands.BadArgument('Could not figure out mode or rank.')

    # Canonical mode name -> accepted spellings.
    spellings = (
        ('Splat Zones', ('zones', 'splat zones', 'sz', 'zone', 'splat')),
        ('Tower Control', ('tower', 'control', 'tc', 'tower control')),
        ('Rainmaker', ('rain', 'rainmaker', 'rain maker', 'rm')),
    )
    aliases = {alias: canonical for canonical, names in spellings for alias in names}

    mode = parsed.group('mode')
    canonical_mode = aliases.get(mode.lower())
    if canonical_mode is None:
        raise commands.BadArgument(f'Unknown Splatoon 2 mode: {mode}') from None

    rank = parsed.group('rank').upper()
    number = parsed.group('number')
    number = int(number) if number else number
    return canonical_mode, {'rank': rank, 'number': number}
def idc_emoji_or_just_string(val):
    """Build a ``FakeEmoji`` from custom-emoji markup or from plain text.

    Markup of the form ``<[a]:name:id>`` yields the name, id and animated
    flag. Anything else is used as a name with colons removed, no id, and
    not animated.
    """
    found = re.match(r'<(?P<animated>a)?:(?P<name>[a-zA-Z0-9]+):(?P<id>[0-9]+)>$', val)
    if not found:
        # guess it's not animated
        return FakeEmoji(val.replace(':', ''), None, False)
    is_animated = bool(found.group("animated"))
    return FakeEmoji(found.group("name"), found.group("id"), is_animated)
def idc_emoji(val):
    """Build a ``FakeEmoji`` from custom-emoji markup ``<[a]:name:id>``.

    Raises ``errors.BadArgument`` when ``val`` is not such markup.
    """
    found = re.match(r'<(?P<animated>a)?:(?P<name>[a-zA-Z0-9]+):(?P<id>[0-9]+)>$', val)
    if found:
        is_animated = bool(found.group("animated"))
        return FakeEmoji(found.group("name"), found.group("id"), is_animated)
    raise errors.BadArgument("Not a valid custom emoji")
async def _new_message(self, message):
    """Finds the message and checks it for regex"""
    server = message.server
    if server is None:
        return
    if server.id not in self.json:
        return
    config = self.json[server.id]
    if config['toggle'] is not True:
        return

    user = message.author
    roles = [r.name for r in user.roles]
    bot_admin = settings.get_server_admin(server)
    bot_mod = settings.get_server_mod(server)
    # Excluded channels, the owner, admin/mod role holders, members who can
    # manage messages and the bot itself are never filtered.
    exempt = (
        message.channel.id in config['excluded_channels']
        or user.id == settings.owner
        or bot_admin in roles
        or bot_mod in roles
        or user.permissions_in(message.channel).manage_messages is True
        or user == server.me
    )
    if exempt:
        return

    if config['strict']:
        # Strict mode: any URL match that does not contain emoji_string.
        offending = any(
            self.emoji_string not in match.group(0)
            for match in self.regex_url.finditer(message.content)
        )
    else:
        offending = (
            self.regex.search(message.content) is not None
            or self.regex_discordme.search(message.content) is not None
        )
    if not offending:
        return

    # The pause was previously not awaited, so it never actually waited.
    await asyncio.sleep(0.5)
    await self.bot.delete_message(message)
    if config['dm'] is True:
        await self.bot.send_message(message.author, config['message'])
async def fetch_info(self, ctx, cmd, title):
    """Look up ``title`` on MyAnimeList and post an embed describing it.

    ``cmd`` is passed to ``get_xml`` and used as the URL path segment of the
    entry link. When several entries come back, the first ten are listed and
    the author has 15 seconds to pick one by number.
    """
    data = await self.get_xml(cmd, title)
    try:
        root = ET.fromstring(data)
    except ET.ParseError:
        return await self.bot.say("I couldn't find anything!")

    if len(root) == 1:
        entry = root[0]
    else:
        msg = "**Please choose one by giving its number.**\n"
        msg += "\n".join(['{} - {}'.format(n + 1, item[1].text)
                          for n, item in enumerate(root) if n < 10])
        await self.bot.say(msg)

        def check(m):
            return m.content.isdigit() and int(m.content) in range(1, len(root) + 1)

        resp = await self.bot.wait_for_message(timeout=15, author=ctx.message.author,
                                               check=check)
        if resp is None:
            # Nobody answered in time.
            return
        entry = root[int(resp.content) - 1]

    link = 'http://myanimelist.net/{}/{}'.format(cmd, entry.find('id').text)
    title = entry.find('title').text
    desc = "MAL [{}]({})".format(title, link)
    syn_raw = entry.find('synopsis').text
    if syn_raw:
        # HTML entities and BBCode tags mapped to their Discord equivalents.
        # The entity keys are restored here: in decoded form the apostrophe
        # key is not valid Python. The literal dash and curly-quote forms
        # shown in the source are kept as well.
        replace = {'&quot;': '\"', '<br />': '', '&mdash;': ' - ', '&#039;': '\'',
                   '&ldquo;': '\"', '&rdquo;': '\"', '—': ' - ', '“': '\"',
                   '”': '\"', '[i]': '*', '[/i]': '*', '[b]': '**',
                   '[/b]': '**', '[url=': '', ']': ' - ', '[/url]': ''}
        # Longest keys first, so longer tokens are tried before shorter ones.
        rep_sorted = sorted(replace, key=len, reverse=True)
        pattern = re.compile("|".join(re.escape(token) for token in rep_sorted), re.I)
        # Matching is case-insensitive while the keys are lower-case, so the
        # matched text is lower-cased before the lookup.
        synopsis = pattern.sub(lambda match: replace[match.group(0).lower()], syn_raw)
    else:
        synopsis = "There is not a synopsis for {}".format(title)

    # Build Embed
    embed = discord.Embed(colour=0x0066FF, description=desc)
    embed.title = title
    embed.set_thumbnail(url=entry.find('image').text)
    embed.set_footer(text=synopsis)
    for k in switcher:
        spec = entry.find(k)
        if spec is not None and spec.text is not None:
            embed.add_field(name=k.capitalize(),
                            value=html.unescape(spec.text.replace('<br />', '')))
    await self.bot.say(embed=embed)
async def process_uesp(self, search, random = False, redirect = True):
    """Look up a page on the UESP wiki and reply with a cleaned-up summary.

    search   -- text to search for (ignored when ``random`` is true)
    random   -- pick a random page instead of searching
    redirect -- follow one level of wiki redirects
    """
    # TODO: Add User-Agent
    api_url = "http://en.uesp.net/w/api.php"
    if random:
        namespaces = "0|" + '|'.join(str(i) for i in range(100, 152)) + "|200|201"
        async with clients.aiohttp_session.get(api_url, params = {"action": "query", "list": "random", "rnnamespace": namespaces, "format": "json"}) as resp:
            data = await resp.json()
        search = data["query"]["random"][0]["title"]
    else:
        async with clients.aiohttp_session.get(api_url, params = {"action": "query", "list": "search", "srsearch": search, "srinfo": "suggestion", "srlimit": 1, "format": "json"}) as resp:
            data = await resp.json()
        try:
            # Prefer the API's spelling suggestion, else the top search hit.
            search = data["query"].get("searchinfo", {}).get("suggestion") or data["query"]["search"][0]["title"]
        except IndexError:
            await self.bot.embed_reply(":no_entry: Page not found")
            return

    async with clients.aiohttp_session.get(api_url, params = {"action": "query", "redirects": "", "prop": "info|revisions|images", "titles": search, "inprop": "url", "rvprop": "content", "format": "json"}) as resp:
        data = await resp.json()
    if "pages" not in data["query"]:
        await self.bot.embed_reply(":no_entry: Error")
        return

    page_id = list(data["query"]["pages"].keys())[0]
    page = data["query"]["pages"][page_id]
    if "missing" in page:
        await self.bot.embed_reply(":no_entry: Page not found")
        return
    if "invalid" in page:
        await self.bot.embed_reply(":no_entry: Error: {}".format(page["invalidreason"]))
        return
    if redirect and "redirects" in data["query"]:
        # Follow the redirect on UESP itself. This used to call
        # process_wikipedia, sending a UESP title to a different lookup.
        await self.process_uesp(data["query"]["redirects"][-1]["to"], redirect = False)
        # TODO: Handle section links/tofragments
        return

    description = page["revisions"][0]['*']
    description = re.sub(r"\s+ \s+", ' ', description)
    # Strip templates and brace blocks, innermost first, until none remain.
    while re.search("{{[^{]+?}}", description):
        description = re.sub("{{[^{]+?}}", "", description)
    while re.search("{[^{]*?}", description):
        description = re.sub("{[^{]*?}", "", description)
    description = re.sub("<.+?>", "", description, flags = re.DOTALL)
    description = re.sub("__.+?__", "", description)
    description = description.strip()
    description = '\n'.join(line.lstrip(':') for line in description.split('\n'))
    # Drop trailing lines until the text is at most 1024 characters.
    while len(description) > 1024:
        description = '\n'.join(description.split('\n')[:-1])
    # Keep only the lead section (everything before the first heading).
    description = description.split("==")[0]
    description = re.sub(r"\[\[Category:.+?\]\]", "", description)

    def to_markdown_link(match):
        # [[Page|label]] -> wiki link; [url label] -> external link.
        if match.group(1):
            return "[{}](http://en.uesp.net/wiki/{})".format(match.group(2), match.group(1).replace(' ', '_'))
        return "[{}]({})".format(match.group(4), match.group(3))

    description = re.sub(r"\[\[(.+?)\|(.+?)\]\]|\[(.+?)[ ](.+?)\]", to_markdown_link, description)
    description = description.replace("'''", "**").replace("''", "*")
    description = re.sub("\n+", '\n', description)
    thumbnail = page.get("thumbnail")
    image_url = thumbnail["source"].replace("{}px".format(thumbnail["width"]), "1200px") if thumbnail else None
    await self.bot.embed_reply(description, title = page["title"], title_url = page["fullurl"], image_url = image_url) # canonicalurl?
async def show(self, ctx):
    """
    Display the roster
    The roster includes the name, Destiny 2 class,
    and timezone of server members. Note that only
    users who have set a role or timezone will be
    displayed on the roster.
    """
    manager = MessageManager(self.bot, ctx.author, ctx.channel, ctx.prefix, [ctx.message])
    roster = self.bot.db.get_roster(ctx.guild.id)

    roster_groups = []
    text = "```\n"
    for row in roster:
        # Add a single entry to the roster message; rows whose user is no
        # longer a member of the guild are skipped.
        member = ctx.guild.get_member(row.get('user_id'))
        if member:
            name = member.display_name
            formatted_name = (name[:16] + '..') if len(name) > 16 else name
            role = row.get('role') or "---"
            timezone = row.get('timezone') or "---"
            text += '{:18} {:6} {:7}\n'.format(formatted_name, timezone, role)
        # If the message is too big, place it into a group
        if len(text) > 2000:
            text += "```"
            roster_groups.append(text)
            text = "```\n"
    # Add any remaining entries into a roster group
    if len(text) > 5:
        text += "```"
        roster_groups.append(text)

    if roster_groups:
        # The first embed carries the plain title, later ones "(continued)".
        for position, group in enumerate(roster_groups):
            embed_msg = discord.Embed(color=constants.BLUE)
            suffix = "" if position == 0 else " (continued)"
            embed_msg.title = "{} Roster{}".format(ctx.guild.name, suffix)
            embed_msg.description = group
            await manager.say(embed_msg, embed=True, delete=False)
    else:
        # Also reached when rows exist but none resolves to a current member,
        # which used to raise IndexError on roster_groups[0].
        await manager.say("No roster exists yet. Use '{}roster settimezone' or '{}roster ".format(ctx.prefix, ctx.prefix)
                          + "setclass' to add the first entry!")
    await manager.clear()
async def _remove(self, ctx, rolename, user: discord.Member=None):
    """Removes a role from user, defaults to author
    Role name must be in quotes if there are spaces.
    You will need a 'Bot Commander' role in order to use this"""
    server = ctx.message.server
    author = ctx.message.author
    # `channel` was used below without ever being defined (NameError).
    channel = ctx.message.channel

    role = self._role_from_string(server, rolename)
    if role is None:
        await self.bot.say("Role not found.")
        return
    if user is None:
        user = author

    if role not in user.roles:
        await self.bot.send_typing(channel)
        await asyncio.sleep(1)
        await self.bot.say("User does not have that role.")
        return

    try:
        await self.bot.send_typing(channel)
        await self.bot.remove_roles(user, role)
        await asyncio.sleep(1)
        await self.bot.say("Role successfully removed.")
    except discord.Forbidden:
        await self.bot.send_typing(channel)
        await asyncio.sleep(1)
        await self.bot.say("I don't have permissions to manage roles!")
#@commands.group(pass_context=True, no_pm=True)
#@checks.mod_or_permissions()
#async def welcome(self, ctx)
#"""Shows your server's current welcome message or changes it. Bot Commander required"""
#server = ctx.message.server
#lm = load_messages()
#wlc = lm[server.id]['welcome'].format('user')
#await self.bot.say("**your server's current welcome message:** `{}`".format(wlc))
#@welcome.command(pass_context=True, no_pm=True)
#@checks.mod_or_permissions()
#async def onjoin(self, ctx, args)
#"""Sets the server's welcome message to when a new user joins the server"""
#server= ctx.message.server
#lm = load_messages()
def create_environment(cog: 'Exec', ctx: DogbotContext) -> Dict[Any, Any]:
    """Build the namespace in which evaluated code runs.

    The namespace holds the invocation context (bot, message, guild, ...),
    the discord modules, a few shortcut helpers, the cog's last result and
    previous code, and finally every module-level global. Globals are added
    last, so they replace any same-named entry.
    """

    async def upload(file_name: str) -> Message:
        """Shortcut to upload a file."""
        with open(file_name, 'rb') as fp:
            return await ctx.send(file=discord.File(fp))

    async def send(*args, **kwargs) -> Message:
        """Shortcut to send()."""
        return await ctx.send(*args, **kwargs)

    def better_dir(*args, **kwargs) -> List[str]:
        """dir(), but without magic methods."""
        names = dir(*args, **kwargs)
        return [n for n in names if not (n.endswith('__') or n.startswith('__'))]

    T = TypeVar('T')

    def grabber(lst: List[T]) -> Callable[[int], T]:
        """Returns a function that, when called, grabs an item by ID from a list of objects with an ID."""
        def _grabber_function(thing_id: int) -> T:
            return discord.utils.get(lst, id=thing_id)
        return _grabber_function

    env = {}
    # invocation context
    env['bot'] = ctx.bot
    env['ctx'] = ctx
    env['msg'] = ctx.message
    env['guild'] = ctx.guild
    env['channel'] = ctx.channel
    env['me'] = ctx.message.author
    env['cog'] = cog
    # modules
    env['discord'] = discord
    env['commands'] = commands
    env['command'] = commands.command
    env['group'] = commands.group
    # utilities
    env['_get'] = discord.utils.get
    env['_find'] = discord.utils.find
    env['_upload'] = upload
    env['_send'] = send
    # grabbers
    env['_g'] = grabber(ctx.bot.guilds)
    env['_u'] = grabber(ctx.bot.users)
    env['_c'] = grabber(list(ctx.bot.get_all_channels()))
    # last result
    env['_'] = cog.last_result
    env['_p'] = cog.previous_code
    env['dir'] = better_dir

    # add globals to environment
    env.update(globals())
    return env
def _do_command(*, thing):
    """Build the command group and its three subcommands for *thing*.

    *thing* provides ``command_name``, ``action`` and ``past_tense`` and is
    itself interpolated into two of the help texts.  Every generated callback
    forwards to the matching ``_*_config`` method of the cog it ends up on,
    passing *thing* along as a keyword argument.

    Returns the tuple ``(group, group_message, group_channel, group_delete)``.
    """
    # Help texts, listed in the order the commands are defined below.  Their
    # content is deliberately flush-left: any indentation placed inside the
    # triple quotes would become part of the text.
    toggle_doc = f"""
Sets whether or not I announce when someone {thing.action}s the server.
Specifying with no arguments will toggle it.
"""

    message_doc = f"""
Sets the bot's message when a member {thing.action}s this server.
The following special formats can be in the message:
`{{{{user}}}}` = The member that {thing.past_tense}. If one isn't placed,
it's placed at the beginning of the message.
`{{{{uid}}}}` = The ID of member that {thing.past_tense}.
`{{{{server}}}}` = The name of the server.
`{{{{count}}}}` = How many members are in the server now.
`{{{{countord}}}}` = Like `{{{{count}}}}`, but as an ordinal,
(e.g. instead of `5` it becomes `5th`.)
`{{{{time}}}}` = The date and time when the member {thing.past_tense}.
"""

    channel_doc = f"""
Sets the channel where I will {thing}.
If no arguments are given, it shows the current channel.
This **must** be specified due to the fact that default channels
are no longer a thing. ([see here]({_DEFAULT_CHANNEL_CHANGE_URL}))
If this isn't specified, or the channel was deleted, the message
will not show.
"""

    delete_after_doc = f"""
Sets the time it takes for {thing} messages to be auto-deleted.
Passing it with no arguments will return the current duration.
A number less than or equal 0 will disable automatic deletion.
"""

    # Parent command: called without a subcommand it toggles the announcement.
    @commands.group(
        name=thing.command_name,
        help=toggle_doc,
        invoke_without_command=True,
    )
    @_server_message_check()
    async def group(self, ctx, enable: bool = None):
        await self._toggle_config(ctx, enable, thing=thing)

    @group.command(name='message', help=message_doc)
    @_server_message_check()
    async def group_message(self, ctx, *, message: special_message):
        await self._message_config(ctx, message, thing=thing)

    # NOTE(review): the help texts for 'channel' and 'delete' describe a
    # no-argument form, but neither parameter has a default here -- confirm
    # whether that form is actually reachable.
    @group.command(name='channel', help=channel_doc)
    @_server_message_check()
    async def group_channel(self, ctx, *, channel: discord.TextChannel):
        await self._channel_config(ctx, channel, thing=thing)

    @group.command(name='delete', help=delete_after_doc)
    @_server_message_check()
    async def group_delete(self, ctx, *, duration: int):
        await self._delete_after_config(ctx, duration, thing=thing)

    return group, group_message, group_channel, group_delete
async def check_emotes(self, message):
    """Send the image for every emote of the server that *message* mentions.

    The message is ignored when its author is not allowed to use the bot,
    when it is empty, when it starts with a command prefix, when it was not
    sent in a server, or when emotes are switched off for that server.

    A server entry that is missing or incomplete is completed with the
    defaults (``status`` off, no ``emotes``) and saved; existing keys are
    kept as they are.

    Non-GIF images are sent as one file: the single match itself, or the
    result of ``self.imgprocess`` when there are several (presumably a
    combined image -- confirm against ``imgprocess``).  Every GIF is sent as
    its own file.
    """
    server = message.server

    # Filter unauthorized users, bots and empty messages
    if not (self.bot.user_allowed(message) and message.content):
        return

    # Don't respond to commands
    prefixes = tuple(self.bot.settings.get_prefixes(server))
    if message.content.startswith(prefixes):
        return

    # Emotes are stored per server, so a private message has nothing to
    # look up (and no ``server.id`` to look it up with).
    if server is None:
        return

    settings = self.servers.setdefault(server.id, {})
    if "status" not in settings or "emotes" not in settings:
        # Fill in only what is missing; replacing the whole entry would
        # throw away emotes that are already stored for this server.
        settings.setdefault("status", False)  # default off
        settings.setdefault("emotes", {})
        dataIO.save_json(self.data_path, self.servers)

    # emotes is off, so ignore
    if not settings["status"]:
        return

    emotes = settings["emotes"]

    # One (pattern, file name) pair per emote.  The name is escaped so that
    # punctuation in it is matched literally, and the file name travels with
    # its pattern so no second lookup by matched text is needed.
    matchers = []
    for name in sorted(emotes):
        if not name:
            continue
        # \b needs a word character next to it; names that begin with
        # punctuation are anchored with \B instead.
        lead = r"\b" if name[0].isalnum() else r"\B"
        matchers.append((re.compile(lead + re.escape(name) + r"\b"), emotes[name]))

    words = message.content.lower().split()
    listed = [
        filename
        for word, (pattern, filename) in itertools.product(words, matchers)
        if pattern.search(word)
    ]

    stills = [name for name in listed if not name.endswith('.gif')]
    gifs = [name for name in listed if name.endswith('.gif')]

    if len(stills) > 1:
        combined = self.imgprocess(stills)
        await self.bot.send_file(message.channel, self.emote + combined)
    elif stills:
        await self.bot.send_file(message.channel, self.emote + stills[0])

    for gif in gifs:
        await self.bot.send_file(message.channel, self.emote + gif)
async def add_twitch_url(self, ctx, url: str):
    """Saves your user's twitch URL

    EXAMPLE: !twitch add MyTwitchName
    RESULT: Saves your twitch URL; notifications will be sent to this server when you go live"""
    # Maintainer notes:
    # - The argument may be a bare user name or anything that contains
    #   "twitch.tv/<user>"; both are normalised to https://www.twitch.tv/<user>.
    # - The normalised URL is requested once; any status other than 200 is
    #   reported to the user and nothing is stored.
    # - Otherwise a new entry keyed by the author's ID is added, or, when
    #   utils.add_content reports failure (presumably because an entry
    #   already exists), only the stored URL is updated.
    await ctx.message.channel.trigger_typing()

    # The dot is escaped so that only a literal "twitch.tv/" counts; with a
    # bare "." an input such as "twitchxtv/..." would be turned into a URL on
    # a different host and then requested.
    match = re.search(r"(?:twitch\.tv/)+(.*)", url)
    if match is None:
        # Just a user name (or something invalid, which the status check
        # below rejects).
        url = "https://www.twitch.tv/{}".format(url)
    else:
        # Keep everything from "twitch.tv/" onwards and put a fixed scheme
        # and "www." in front of it.
        url = "https://www.{}".format(match.group(0))

    # Try to find the channel provided, we'll get a 404 response if it does not exist
    status = await utils.request(url, attr='status')
    if status != 200:
        await self.bot.say("That twitch user does not exist! "
                           "What would be the point of adding a nonexistant twitch user? Silly")
        return

    key = ctx.message.author.id
    # A fresh entry assumes the user is not live and wants notifications.
    entry = {'twitch_url': url,
             'servers': [ctx.message.server.id],
             'notifications_on': 1,
             'live': 0,
             'member_id': key}
    update = {'twitch_url': url}

    # Check to see if this user has already saved a twitch URL
    # If they have, update the URL, otherwise create a new entry
    if not await utils.add_content('twitch', entry):
        await utils.update_content('twitch', update, key)
    await self.bot.say("I have just saved your twitch url {}".format(ctx.message.author.mention))
async def picarto(self, ctx, member: discord.Member = None):
    """This command can be used to view Picarto stats about a certain member

    EXAMPLE: !picarto @otherPerson
    RESULT: Info about their picarto stream"""
    # Maintainer notes:
    # - Looks up the stored picarto URL of *member* (the author when omitted),
    #   asks the Picarto API about that channel and replies with an embed.
    # - Replies with a plain message instead when no usable URL is stored or
    #   when the API request returns nothing.

    # If member is not given, base information on the author
    member = member or ctx.message.author
    picarto_entry = await utils.get_content('picarto', member.id)
    if picarto_entry is None:
        await self.bot.say("That user does not have a picarto url setup!")
        return
    member_url = picarto_entry['picarto_url']

    # Use regex to get the actual username so that we can make a request to the API
    match = re.search(r"(?<=picarto\.tv/)(.*)", member_url)
    if match is None:
        # The stored value is not a picarto.tv link, so there is no channel
        # name to query; treat it like a missing entry instead of crashing.
        await self.bot.say("That user does not have a picarto url setup!")
        return
    stream = match.group(1)

    url = BASE_URL + '/channel/{}'.format(stream)
    payload = {'key': api_key}
    data = await utils.request(url, payload=payload)
    if data is None:
        await self.bot.say("I couldn't connect to Picarto!")
        return

    # Not everyone has all these settings, so use this as a way to print information if it does, otherwise ignore it
    things_to_print = ['channel', 'commissions_enabled', 'is_nsfw', 'program', 'tablet', 'followers',
                       'content_type']

    # NOTE(review): the embed links to the API request URL rather than to
    # member_url -- confirm this is intended.
    embed = discord.Embed(title='{}\'s Picarto'.format(data['channel']), url=url)

    avatar_url = data.get('avatar_url')
    if avatar_url:
        embed.set_thumbnail(url=avatar_url)

    for key, result in data.items():
        # Skip values whose text form is empty
        if key in things_to_print and str(result):
            embed.add_field(name=key.title().replace('_', ' '), value=str(result))

    # Social URL's can be given if a user wants them to show
    # Print them if they exist, otherwise don't try to include them
    for site, link in (data.get('social_urls') or {}).items():
        if str(link):
            embed.add_field(name=site.title(), value=link)

    await self.bot.say(embed=embed)