Python telegram 模块,Bot() 实例源码
我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用telegram.Bot()。
def test_requires_usergroup_no_acc(self):
    """
    Check the ``requires_usergroup`` decorator for a user without access.

    ``ownbot.auth.User`` is patched so that ``has_access`` reports False;
    the decorated handler is then expected to yield ``None`` rather than
    its own return value.
    """
    with patch("ownbot.auth.User") as patched_user:
        patched_user.return_value.has_access.return_value = False

        @ownbot.auth.requires_usergroup("foo")
        def my_command_handler(bot, update):
            """Dummy command handler"""
            print(bot, update)
            return True

        result = my_command_handler(Mock(spec=Bot),
                                    self.__get_dummy_update())
        self.assertIsNone(result)
def test_requires_usergroup_acc(self):
    """
    Test requires usergroup decorator if the user has access

    ``ownbot.auth.User`` and ``test_auth.Update`` are patched; the user
    mock reports access, so the decorated handler's own return value
    (``True``) must come back to the caller.
    """
    with patch("ownbot.auth.User") as user_mock,\
            patch("test_auth.Update"):
        user_mock = user_mock.return_value
        # Configure the real method name. A misspelt attribute would be
        # auto-created by Mock and leave ``has_access`` returning a truthy
        # Mock, so the test would pass without testing anything.
        user_mock.has_access.return_value = True

        @ownbot.auth.requires_usergroup("foo")
        def my_command_handler(bot, update):
            """Dummy command handler"""
            print(bot, update)
            return True

        bot_mock = Mock(spec=Bot)
        update = Update(1337)
        called = my_command_handler(bot_mock, update)
        self.assertTrue(called)
def test_assign_first_to(self):
    """
    Check the ``assign_first_to`` decorator when the group is empty.

    With ``group_is_empty`` reporting True, the emptiness check must have
    been consulted and the user's ``save`` must have been called.
    """
    with patch("ownbot.auth.User") as user_cls, \
            patch("test_auth.Update"), \
            patch("ownbot.auth.UserManager") as manager_cls:
        user_instance = user_cls.return_value
        manager_instance = manager_cls.return_value
        manager_instance.group_is_empty.return_value = True

        @ownbot.auth.assign_first_to("foo")
        def my_command_handler(bot, update):
            """Dummy command handler"""
            print(bot, update)

        my_command_handler(Mock(spec=Bot), Update(1337))

        self.assertTrue(manager_instance.group_is_empty.called)
        self.assertTrue(user_instance.save.called)
def test_assign_first_to_not_first(self):
    """
    Check the ``assign_first_to`` decorator when the group is not empty.

    With ``group_is_empty`` reporting False, the emptiness check must have
    been consulted but the user's ``save`` must not have been called.
    """
    with patch("ownbot.auth.User") as user_cls, \
            patch("test_auth.Update"), \
            patch("ownbot.auth.UserManager") as manager_cls:
        user_instance = user_cls.return_value
        manager_instance = manager_cls.return_value
        manager_instance.group_is_empty.return_value = False

        @ownbot.auth.assign_first_to("foo")
        def my_command_handler(bot, update):
            """Dummy command handler"""
            print(bot, update)

        my_command_handler(Mock(spec=Bot), Update(1337))

        self.assertTrue(manager_instance.group_is_empty.called)
        self.assertFalse(user_instance.save.called)
def test_assign_first_to_with_self(self):
    """
    Check the ``assign_first_to`` decorator on a method-style handler.

    The decorated handler takes ``self`` as an extra leading argument; the
    emptiness check and the user's ``save`` must still both be triggered.
    """
    with patch("ownbot.auth.User") as user_cls, \
            patch("test_auth.Update"), \
            patch("ownbot.auth.UserManager") as manager_cls:
        user_instance = user_cls.return_value
        manager_instance = manager_cls.return_value
        manager_instance.group_is_empty.return_value = True

        @ownbot.auth.assign_first_to("foo")
        def my_command_handler(self, bot, update):
            """Dummy command handler"""
            print(self, bot, update)

        my_command_handler(None, Mock(spec=Bot), Update(1337))

        self.assertTrue(manager_instance.group_is_empty.called)
        self.assertTrue(user_instance.save.called)
def admin_access():
    """
    Build a decorator that restricts a handler method to ``self.admins``.

    The wrapped method runs when the owner object has no ``admins``
    attribute (a warning is logged) or when the update's effective user id
    is listed in it. Otherwise the attempt is logged and ``None`` is
    returned without calling the method.
    """
    def access(func):
        @wraps(func)
        def wrapped(self, bot: Bot, update: Update, *args, **kwargs):
            user = update.effective_user
            allowed_ids = getattr(self, 'admins', None)

            if allowed_ids is not None and user.id not in allowed_ids:
                self.logger.info(f"Unauthorized access denied for {user}.")
                return None

            if allowed_ids is None:
                # No restriction configured: warn, then let the call through.
                self.logger.warning('Specify self.admins (list of users ids) parameter in '
                                    'manager or channel classes in order to restrict access to bot.')
            return func(self, bot, update, *args, **kwargs)
        return wrapped
    return access
def admin_access(admins_list=None):
    """
    Build a decorator that restricts a handler method to a set of admins.

    :param admins_list: optional collection of allowed user ids. When it is
        ``None`` the wrapped method's owner is asked for ``self.admins``.
    :return: decorator for methods taking ``(self, bot, update, ...)``

    The wrapped method runs when no admin collection is available at all
    (a warning is logged) or when the update's effective user id is in the
    collection. Otherwise the attempt is logged and ``None`` is returned.
    """
    def access(func):
        @wraps(func)
        def wrapped(self, bot: Bot, update: Update, *args, **kwargs):
            user = update.effective_user
            # An explicit list given to the decorator takes precedence;
            # previously it was ignored and everyone was let through.
            if admins_list is not None:
                admins = admins_list
            else:
                admins = getattr(self, 'admins', None)
            if admins is None:
                self.logger.warning('Specify self.admins (list of users ids) parameter in '
                                    'manager or channel classes in order to restrict access to bot.')
                return func(self, bot, update, *args, **kwargs)
            if user.id not in admins:
                self.logger.info(f"Unauthorized access denied for {user}.")
                return
            return func(self, bot, update, *args, **kwargs)
        return wrapped
    return access
def command_accept_choice(self, bot: Bot, update: Update):
    """
    Switch the chat to the channel picked through a callback query.

    The channel previously chosen for this chat (if any) has its command
    handlers removed. The channel stored under ``query.data`` becomes the
    chosen one and has its handlers added. The query's message is edited
    to confirm the choice and the channel's help text is sent as a
    Markdown code block.
    """
    query = update.callback_query
    # because callback update doesn't have message at all
    update.message = query.message
    chat_id = update.message.chat_id

    previous_channel = self.chosen_channels.get(chat_id, None)
    if previous_channel:
        previous_channel.remove_commands_handlers(chat_id)

    chosen_channel: BaseChannel = self.channels[query.data]
    self.chosen_channels[chat_id] = chosen_channel
    chosen_channel.add_commands_handlers(chat_id)

    bot.edit_message_text(
        text=f'Chose {query.data} ({chosen_channel.channel_id}).',
        chat_id=query.message.chat_id,
        message_id=query.message.message_id,
    )
    help_text = chosen_channel.help_text()
    update.message.reply_text('```\n' + help_text + '\n```',
                              parse_mode=ParseMode.MARKDOWN)
def distribute(self, bot: Bot, update: Update, args):
    """
    Route parsed command arguments to the matching handler method.

    ``args.help`` (when truthy) is sent back as code. Otherwise
    ``args.command`` selects ``add``, ``remove`` (also for ``delete``) or
    ``show``. Any other command is logged as an error and raises
    ``Exception``.
    """
    if args.help:
        self.send_code(update, args.help)
        return

    command = args.command
    if command in ('add', 'remove', 'delete'):
        # 'delete' is an alias that shares the 'remove' subparser and handler.
        key = 'add' if command == 'add' else 'remove'
        usage = self.subparsers[key].format_usage()
        handler = self.add if key == 'add' else self.remove
        handler(bot, update, args.tags, usage)
        return

    if command == 'show':
        self.show(bot, update)
        return

    self.logger.error('Bad args: ' + str(args))
    raise Exception  # will never get this far (hopefully)
def add(self, bot: Bot, update: Update, args: List[str], usage: str):
    """
    Store one ``name -> score`` pair and show the result.

    ``args`` must hold exactly two items with the second one a decimal
    string; otherwise ``usage`` is sent back as code and nothing is
    stored.
    """
    if len(args) == 2 and args[1].isdecimal():
        name, score = args
        self.store.add({name: int(score)})
        self.show(bot, update)
        return

    self.send_code(update, usage)
def main():
    """
    Refresh the post database and send every fetched post to the chat.

    The bot token and chat id are read from the ``TOKEN`` and ``CHAT_ID``
    environment variables. A post is marked as sent only after the message
    went out; posts rejected with ``BadRequest`` are logged and skipped.
    The final log line counts the posts that were actually sent.
    """
    bot = Bot(environ.get('TOKEN'))
    chat_id = environ.get('CHAT_ID')
    post.updatedb()
    new_posts = post.get_posts(sent_only=False)
    if not new_posts:
        logger.info('No new posts at this time!')
        return

    sent_count = 0
    for p in new_posts:
        try:
            bot.sendMessage(chat_id=chat_id, text=p.text(), parse_mode='HTML')
        except BadRequest:
            # Use the same logger as the rest of the function.
            logger.info('Bad post formatting: %d', p.uid)
            continue
        post.mark_as_sent(p.uid)
        sent_count += 1
    # Count only successful sends, not every fetched post.
    logger.info('%d new post(s) have been sent!', sent_count)
def add_keyword(bot: telegram.Bot,
                update: telegram.Update,
                args: list):
    """
    Append the given keywords to ``global_vars.filter_list['keywords']``.

    Chats that are not in ``global_vars.admin_list['TG']`` are ignored.
    With no arguments a usage hint is sent. Keywords already present are
    reported and skipped. Afterwards 'Done.' is sent and ``save_data`` is
    called.
    """
    message = update.message
    if message.chat_id not in global_vars.admin_list['TG']:
        return

    if not args:
        message.reply_text('Usage: /add_keyword keyword1 keyword2 ...')
        return

    known_keywords = global_vars.filter_list['keywords']
    for keyword in args:
        logger.debug('keyword: ' + keyword)
        if keyword in known_keywords:
            message.reply_text('Keyword: "' + keyword + '" already in list')
        else:
            known_keywords.append(keyword)

    message.reply_text('Done.')
    save_data()
def add_channel(bot: telegram.Bot,
                update: telegram.Update):
    """
    Conversation step that collects channels from forwarded messages.

    A message forwarded from a channel adds that channel's id to
    ``global_vars.filter_list['channels']`` (saving the data) unless it is
    already listed, and returns ``CHANNEL``. A forward from a non-channel
    chat is rejected and returns ``None``. A plain '/cancel' returns
    ``ConversationHandler.END``; any other message is rejected and returns
    ``CHANNEL``.
    """
    message = update.message
    source_chat = message.forward_from_chat

    if not source_chat:
        if message.text == '/cancel':
            message.reply_text('Done.')
            return ConversationHandler.END
        message.reply_text(
            'Message type error. Please forward me a message from channel, or use /cancel to stop')
        return CHANNEL

    if source_chat.type != 'channel':  # it seems forward_from_chat can only be channels
        message.reply_text(
            'Message type error. Please forward me a message from channel, or use /cancel to stop')
        return None

    channels = global_vars.filter_list['channels']
    if source_chat.id in channels:
        message.reply_text('Already in list. Send me another or use /cancel to stop')
    else:
        channels.append(source_chat.id)
        save_data()
        message.reply_text('Okay, please send me another, or use /cancel to stop')
    return CHANNEL
def audio_from_telegram(bot: telegram.Bot,
                        update: telegram.Update):
    """
    Forward a placeholder text for a Telegram audio message to QQ.

    The forward index is looked up from the chat id, a one-element text
    entity is sent through ``send_from_tg_to_qq``, and the resulting QQ
    message id is recorded in ``global_vars.mdb`` with the Telegram
    message id.
    """
    message: telegram.Message = update.message
    tg_group_id = message.chat_id  # telegram group id
    forward_index = get_forward_index(tg_group_id=tg_group_id)

    # NOTE(review): the '??' looks like text lost to an encoding problem —
    # confirm the intended label.
    reply_entity = [{
        'type': 'text',
        'data': {'text': '[ ?? ]'}
    }]

    qq_message_id = send_from_tg_to_qq(forward_index,
                                       message=reply_entity,
                                       tg_group_id=tg_group_id,
                                       tg_user=message.from_user,
                                       tg_forward_from=message,
                                       tg_reply_to=message.reply_to_message)
    global_vars.mdb.append_message(qq_message_id, message.message_id, forward_index, 0)
def location_from_telegram(bot: telegram.Bot,
                           update: telegram.Update):
    """
    Forward a Telegram location message to QQ as text.

    The text is a fixed prefix followed by whatever
    ``get_location_from_baidu`` returns for the message's latitude and
    longitude. The resulting QQ message id is recorded in
    ``global_vars.mdb`` with the Telegram message id.
    """
    message: telegram.Message = update.message
    tg_group_id = message.chat_id  # telegram group id
    forward_index = get_forward_index(tg_group_id=tg_group_id)

    location = message.location
    latitude = location.latitude
    longitude = location.longitude

    # NOTE(review): the '????????' prefix looks like text lost to an
    # encoding problem — confirm the intended label.
    reply_entity = [{
        'type': 'text',
        'data': {'text': '????????' + get_location_from_baidu(latitude, longitude)}
    }]

    qq_message_id = send_from_tg_to_qq(forward_index,
                                       message=reply_entity,
                                       tg_group_id=tg_group_id,
                                       tg_user=message.from_user,
                                       tg_forward_from=message,
                                       tg_reply_to=message.reply_to_message)
    global_vars.mdb.append_message(qq_message_id, message.message_id, forward_index, 0)
def setup(webhook_url=None):
    """If webhook_url is not passed, run with long-polling.

    With a webhook URL a ``Dispatcher`` is started on its own thread and
    ``(update_queue, bot)`` is returned. Without one, any webhook is
    deleted and an ``Updater`` polls until ``idle()`` returns.
    """
    logging.basicConfig(level=logging.WARNING)

    if webhook_url:
        bot = Bot(TOKEN)
        update_queue = Queue()
        dp = Dispatcher(bot, update_queue)
    else:
        updater = Updater(TOKEN)
        bot, dp = updater.bot, updater.dispatcher

    dp.add_handler(MessageHandler([], example_handler))  # Remove this line
    # Add your handlers here

    if not webhook_url:
        bot.set_webhook()  # Delete webhook
        updater.start_polling()
        updater.idle()
        return None

    bot.set_webhook(webhook_url=webhook_url)
    Thread(target=dp.start, name='dispatcher').start()
    return update_queue, bot
def bot_hook():
    """Entry point for the Telegram connection.

    Builds a bot and a worker-less dispatcher, registers the departure
    and help commands under all their aliases plus the location handler,
    then processes the update found in ``request.json``.
    """
    bot = telegram.Bot(botdata['BotToken'])
    dispatcher = Dispatcher(bot, None, workers=0)

    # Same registration order as spelling each handler out one by one.
    for alias in ('Abfahrten', 'abfahrten', 'Abfahrt', 'abfahrt', 'A', 'a'):
        dispatcher.add_handler(CommandHandler(alias, abfahrten, pass_args=True))
    for alias in ('Hilfe', 'hilfe', 'help'):
        dispatcher.add_handler(CommandHandler(alias, hilfe))
    dispatcher.add_handler(MessageHandler(Filters.location, nearest_stations))

    incoming = telegram.update.Update.de_json(request.json, bot)
    dispatcher.process_update(incoming)
    return 'OK'
def authorized_only(command_handler: Callable[[Bot, Update], None]) -> Callable[..., Any]:
    """
    Decorator to check if the message comes from the correct chat_id
    :param command_handler: Telegram CommandHandler
    :return: decorated function

    Updates from any other chat are logged and dropped (the wrapper
    returns ``None``). Exceptions raised by the handler are logged and
    swallowed, in which case the wrapper also returns ``None``.
    """
    def wrapper(*args, **kwargs):
        # ``update`` may arrive by keyword or as the second positional argument.
        update = kwargs.get('update') or args[1]

        # Reject unauthorized messages
        chat_id = int(_CONF['telegram']['chat_id'])
        if int(update.message.chat_id) != chat_id:
            logger.info('Rejected unauthorized message from: %s', update.message.chat_id)
            # Return nothing; handing the wrapper itself back was a slip.
            return None

        logger.info('Executing handler: %s for chat_id: %s', command_handler.__name__, chat_id)
        try:
            return command_handler(*args, **kwargs)
        # NOTE(review): BaseException also swallows KeyboardInterrupt and
        # SystemExit — confirm that is intended.
        except BaseException:
            logger.exception('Exception occurred within Telegram module')
            return None
    return wrapper
def _status_table(bot: Bot, update: Update) -> None:
    """
    Handler for /status table.
    Sends the open trades as an HTML ``<pre>`` table, or a short status
    line when the trader is not running or has no open trades.
    :param bot: telegram bot
    :param update: message update
    :return: None
    """
    # Fetch open trade
    open_trades = Trade.query.filter(Trade.is_open.is_(True)).all()

    if get_state() != State.RUNNING:
        send_msg('*Status:* `trader is not running`', bot=bot)
        return
    if not open_trades:
        send_msg('*Status:* `no active order`', bot=bot)
        return

    def _row(trade):
        # calculate profit for one trade from the current bid
        current_rate = exchange.get_ticker(trade.pair)['bid']
        return [
            trade.id,
            trade.pair,
            shorten_date(arrow.get(trade.open_date).humanize(only_distance=True)),
            '{:.2f}'.format(100 * trade.calc_profit(current_rate))
        ]

    columns = ['ID', 'Pair', 'Since', 'Profit']
    rows = [_row(trade) for trade in open_trades]
    df_statuses = DataFrame.from_records(rows, columns=columns).set_index(columns[0])

    table = tabulate(df_statuses, headers='keys', tablefmt='simple')
    send_msg("<pre>{}</pre>".format(table), parse_mode=ParseMode.HTML)
def _balance(bot: Bot, update: Update) -> None:
    """
    Handler for /balance
    Sends one text section per currency whose Balance, Available or
    Pending value is truthy, or a fixed notice when there is none.
    """
    section = """*Currency*: {Currency}
*Available*: {Available}
*Balance*: {Balance}
*Pending*: {Pending}
"""
    non_zero = [
        entry for entry in exchange.get_balances()
        if entry['Balance'] or entry['Available'] or entry['Pending']
    ]
    if non_zero:
        output = ''.join(section.format(**currency) for currency in non_zero)
    else:
        output = '`All balances are zero.`'
    send_msg(output)
def _count(bot: Bot, update: Update) -> None:
    """
    Handler for /count.
    Sends the number of open trades next to the configured maximum as an
    HTML ``<pre>`` table, or a notice when the trader is not running.
    :param bot: telegram bot
    :param update: message update
    :return: None
    """
    if get_state() != State.RUNNING:
        send_msg('`trader is not running`', bot=bot)
        return

    open_trades = Trade.query.filter(Trade.is_open.is_(True)).all()
    table = tabulate(
        {
            'current': [len(open_trades)],
            'max': [_CONF['max_open_trades']]
        },
        headers=['current', 'max'],
        tablefmt='simple',
    )
    message = "<pre>{}</pre>".format(table)
    logger.debug(message)
    send_msg(message, parse_mode=ParseMode.HTML)
def _help(bot: Bot, update: Update) -> None:
    """
    Handler for /help.
    Sends the fixed list of bot commands through ``send_msg``.
    :param bot: telegram bot
    :param update: message update
    :return: None
    """
    help_text = """
*/start:* `Starts the trader`
*/stop:* `Stops the trader`
*/status [table]:* `Lists all open trades`
*table :* `will display trades in a table`
*/profit:* `Lists cumulative profit from all finished trades`
*/forcesell <trade_id>|all:* `Instantly sells the given trade or all trades, regardless of profit`
*/performance:* `Show performance of each finished trade grouped by pair`
*/count:* `Show number of trades running compared to allowed number of trades`
*/balance:* `Show account balance per currency`
*/help:* `This help message`
*/version:* `Show version`
"""
    send_msg(help_text, bot=bot)
def broadcast(bot, update):
    """
    Push the text of a /broadcast message to every eligible account.

    An account is eligible when it is not blacklisted and its balance is
    above zero. Each recipient is blacklisted just before the push and
    removed again afterwards, so a push that fails leaves the recipient
    blacklisted.
    """
    info_log(update)
    bot = Bot(api_key)
    # list users from MySQL
    accounts = mysql_select_accounts_balances()
    # some users are bugged & stop broadcast - they deleted chat with bot. So we blacklist them
    blacklist = mysql_select_blacklist()
    for account in accounts:
        user_id = account[0]
        # skip if blacklisted or without balance
        if user_id in blacklist or int(account[1]) <= 0:
            continue
        mysql_set_blacklist(user_id)
        print(user_id)
        push_simple(bot, user_id, update.message.text.replace('/broadcast ', ''))
        sleep(0.2)
        # if someone deleted chat, broadcast will fail and he will remain in blacklist
        mysql_delete_blacklist(user_id)
# bootstrap
def monitoring_block_count():
    """
    Compare the local block count with the community count.

    The community count comes from ``summary_url`` and falls back to the
    reference count when parsing raises ``ValueError``. When the absolute
    difference exceeds ``block_count_difference_threshold`` every admin
    gets a warning and ``bootstrap_multi`` is started.
    """
    # set bot
    bot = Bot(api_key)
    local_count = int(rpc({"action": "block_count"}, 'count'))
    reference_count = int(reference_block_count())

    http = urllib3.PoolManager(cert_reqs='CERT_REQUIRED', ca_certs=certifi.where())
    summary = http.request('GET', summary_url, headers=header, timeout=20.0)
    try:
        community_count = int(json.loads(summary.data)['blocks'])
    except ValueError:
        community_count = reference_count
    difference = int(math.fabs(community_count - local_count))

    raiwallet_reply = http.request('GET', block_count_url, headers=header, timeout=20.0)
    raiwallet_count = int(raiwallet_reply.data)

    if not (difference > block_count_difference_threshold):
        return

    # Warning to admins
    warning = 'Block count: {0}\nCommunity: {1}\nDifference: *{2}*\nReference: {3}\nraiwallet.info: {4}'.format(
        local_count, community_count, difference, reference_count, raiwallet_count)
    for user_id in admin_list:
        push(bot, user_id, warning)
    # trying to fix
    # NOTE(review): nesting reconstructed from a flattened listing — confirm
    # the bootstrap should only run when the threshold is exceeded.
    bootstrap_multi()
def get_telegram_bot():
    """
    Creates a telegram bot instance.

    Returns:
        telegram.Bot: The bot instance.

    Raises:
        TelegramConfigurationError: If the telegram bot was not properly
            configured. The original error is attached as the cause.
    """
    try:
        return Bot(Config().telegram_token)
    except (FileNotFoundError, ValueError, TelegramError) as exc:
        logging.getLogger().error(
            "Telegram token not present or invalid: '%s'", str(exc))
        # Chain explicitly so the traceback shows the underlying failure
        # as the direct cause.
        raise TelegramConfigurationError("Telegram token not "
                                         "present or invalid.") from exc
def connect(self):
    """
    Read the access token and set up the Telegram bot and updater.

    Only the 'telegram' chat network is supported; any other value is
    logged and ``-1`` is returned. A failure to read the keys file or to
    create the bot is logged and ends the process via ``sys.exit(1)``.
    """
    config = ConfigParser.ConfigParser()
    if self.chatnetwork != 'telegram':
        self.logger.error('Chatnetwork name not correct, found %s', self.chatnetwork)
        return -1

    # ``except ... as`` parses on Python 2.6+ and Python 3; the old comma
    # form is a SyntaxError on Python 3.
    try:
        config.read(self.keyslocation)
        self.telegram_access_token = config.get('telegram', 'access_token')
        self.logger.debug("Successfully read access keys")
    except Exception as e:
        self.logger.error("Could not read access keys: %s", e)
        sys.exit(1)

    try:
        self.telegram_bot = telegram.Bot(token=self.telegram_access_token)
        self.telegram_updater = Updater(token=self.telegram_access_token)
        self.telegram_dispatcher = self.telegram_updater.dispatcher
        self.telegram_dispatcher.addTelegramMessageHandler(self.respond)
    except Exception as e:
        self.logger.error("Telegram bot connect failed: %s", e)
        sys.exit(1)
def cmd_list_tokens(bot: telegram.Bot, update: telegram.Update):
    """
    Reply with a numbered list of the tokens stored for a poll tag.

    Nothing is sent when ``generic_checks`` fails. The tag is everything
    after the first space of the message text.
    """
    cid = update.message.chat_id
    if not generic_checks(bot, update, "listtokens", "", 1):
        return

    # Extract value
    _, tag = update.message.text.split(" ", 1)
    tokens = tokens_db.search(Query().poll_tag == tag)

    header = "There have been %i tokens generated for your poll:\n" % len(
        tokens)
    numbered = ["%i: %s" % (position, entry['token'])
                for position, entry in enumerate(tokens, 1)]
    bot.sendMessage(cid, header + "\n".join(numbered))
def user_is_in_chat(uid: int, cid: int, bot: telegram.Bot):
    """
    Return True if ``bot.get_chat_member`` succeeds for the user and chat.

    Returns False when the call raises ``BadRequest`` with a message
    containing "chat not found" or "user not found" (case-insensitive).
    Per the original author's note this covers: the bot not being in the
    chat, the chat not existing, the user not being in the chat, and the
    user not existing. Any other ``BadRequest`` is logged and re-raised.
    """
    try:
        bot.get_chat_member(chat_id=cid, user_id=uid)
    except telegram.error.BadRequest as err:
        reason = err.message.lower()
        if "chat not found" in reason:
            logger.info(err_chat_not_found % (uid, cid))
        elif "user not found" in reason:
            logger.info(err_user_not_found % (uid, cid))
        else:
            logger.exception("Unhandled exception")
            raise
        return False

    logger.info("User %s is in chat %s" % (uid, cid))
    return True
def cmd_vote(bot: telegram.Bot, update: telegram.Update):
    """
    Start the voting conversation by offering the user's eligible polls.

    Returns ``ConversationHandler.END`` when the chat is not private or
    the user has no eligible polls (a notice is sent in both cases).
    Otherwise sends a one-time keyboard with one poll per row and returns
    ``state_vote_1``.
    """
    cid = update.message.chat_id
    uid = update.message.from_user.id

    if update.message.chat.type != "private":
        bot.sendMessage(cid, text_private_chat_only)
        return ConversationHandler.END

    # Look up polls only after the private-chat guard, so group chats do
    # not trigger a lookup whose result would be discarded.
    polls = get_polls(uid, bot)
    if not polls:
        bot.sendMessage(cid, "You aren't eligible to vote in any polls.")
        # There is nothing to choose from, so end the conversation.
        return ConversationHandler.END

    # keyboard array is a list of lists
    # because each list represents a new row
    # and we want each button on a separate row
    keyboard_array = [[p["tag"] + ": " + p["title"]] for p in polls]
    keyboard = ReplyKeyboardMarkup(keyboard_array,
                                   one_time_keyboard=True)
    bot.sendMessage(cid,
                    "Click the button for the poll you would like to vote in.",
                    reply_markup=keyboard)
    return state_vote_1
def get_polls(user_id: int, bot: telegram.Bot):
    """
    Return the polls the given user is allowed to vote in.

    A poll qualifies when it is active and ``user_is_in_chat`` confirms
    the user for the poll's ``target_chat``. Each distinct target chat of
    an active poll is checked once.
    """
    def _is_active(poll):
        return poll['active'] == True

    all_polls = polls_db.all()
    # A set, so each chat is checked only once.
    target_chats = {poll['target_chat'] for poll in all_polls
                    if _is_active(poll)}
    # Ask telegram API - is user 123 in chat -456?
    chats_user_is_in = [chat for chat in target_chats
                        if user_is_in_chat(user_id, chat, bot)]
    return [poll for poll in all_polls
            if poll['target_chat'] in chats_user_is_in
            and _is_active(poll)]
def cmd_set_generic(bot: telegram.Bot, update: telegram.Update, field_name: str,
                    cmd_name: str, example: str):
    """
    Shared implementation of the "set <field>" poll commands.

    When ``generic_checks`` passes, the message text is split into
    command, tag and value. The value is stored in ``field_name`` of the
    poll, a confirmation is sent, and an activation hint follows if
    ``check_if_complete`` reports the poll complete.
    """
    if not generic_checks(bot, update, cmd_name, example, 2):
        # A generic check failed.
        return

    # Extract value
    _, tag, value = update.message.text.split(" ", 2)

    # Apply edit and send "success" feedback message
    edit_poll(tag, field_name, value)
    cid = update.message.chat_id
    bot.sendMessage(cid, text_edit_successful.format(p=get_poll(tag)))

    # Check if complete, and suggest to activate
    if check_if_complete(tag):
        bot.sendMessage(cid, text_activate_suggestion.format(t=tag),
                        parse_mode="Markdown")
    return
def main():
    """
    Run the echo bot forever, reading the token from ``../token.txt``.

    The first line of the token file is used as the bot token. The global
    ``update_id`` starts at the first pending update, or ``None`` when
    there is none.
    """
    global update_id
    # Telegram Bot Authorization Token
    # ``with`` closes the file even on error; ``strip`` removes the line
    # ending that ``readline`` keeps, which would corrupt the token.
    with open("../token.txt", "r") as token_file:
        token = token_file.readline().strip()
    bot = telegram.Bot(token=token)
    # get the first pending update_id, this is so we can skip over it in case
    # we get an "Unauthorized" exception.
    try:
        update_id = bot.getUpdates()[0].update_id
    except IndexError:
        update_id = None
    logging.basicConfig(format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
    while True:
        try:
            echo(bot)
        except NetworkError:
            sleep(1)
        except Unauthorized:
            # The user has removed or blocked the bot.
            update_id += 1
def execute(self, chat_id, bot: Bot, update: Update):
    """
    Offer the classrooms of the building named in the message text.

    Returns False without sending anything when the text is not a key of
    ``self._buildings_dict``. Otherwise sends a reply keyboard with the
    classroom commands in rows of up to three, followed by a
    "/buildings" row, and returns True.
    """
    requested = update.message.text
    if requested not in self._buildings_dict:
        return False

    building_identifier = self._buildings_dict[requested]
    classrooms = self.get_classroom_source().get_classrooms_in_building(building_identifier)

    buttons = [KeyboardButton("/" + classroom.get_name().lower())
               for classroom in classrooms]
    # Slice the flat button list into rows of three.
    keyboard_rows = [buttons[start:start + 3]
                     for start in range(0, len(buttons), 3)]
    keyboard_rows.append(["/buildings"])

    bot.send_message(chat_id=chat_id, text="Available classrooms",
                     reply_markup=ReplyKeyboardMarkup(keyboard_rows))
    return True
def help_about(bot: telegram.Bot, update: telegram.Update):
    """
    Send the help message, then track and log the command.

    The log record carries the update, chat id and message id under
    ``extra['telegram']`` and an md5 of chat id + message id under
    ``extra['id']``.
    """
    message = update.message

    bot.send_message(
        chat_id=message.chat_id,
        text=strings.HELP_MESSAGE,
        parse_mode='HTML',
        disable_web_page_preview=True,
        reply_markup=strings.HELP_MESSAGE_KEYBOARD
    )
    bot_types.Botan.track(
        uid=message.chat_id,
        message=message.to_dict(),
        name='help.about'
    )

    telegram_context = {
        'update': update.to_dict(),
        'chat_id': message.chat_id,
        'message_id': message.message_id,
    }
    record_id = extentions.TextHelper.get_md5(str(message.chat_id) + str(message.message_id))
    logging.info('Command help: about.', extra={
        'telegram': telegram_context,
        'id': record_id
    })
def settings_message(bot: telegram.Bot, update: telegram.Update):
    """
    Show the chat settings to a permitted sender, then track and log.

    The settings are sent when the chat is not admin-only or the sender
    is the stored admin.
    """
    message = update.message
    sender = message.from_user
    chat_settings = bot_types.ChatSettings.from_db(db, message.chat, admin_id=sender.id,
                                                   admin_name=sender.first_name)

    if not chat_settings.admin_only or chat_settings.admin_id == message.from_user.id:
        send_settings_message(bot, chat_settings)

    # NOTE(review): nesting reconstructed from a flattened listing — confirm
    # tracking/logging should run even when the settings are not shown.
    bot_types.Botan.track(
        uid=message.chat_id,
        message=message.to_dict(),
        name='settings.main'
    )

    telegram_context = {
        'update': update.to_dict(),
        'chat_id': message.chat_id,
        'message_id': message.message_id,
    }
    record_id = extentions.TextHelper.get_md5(str(message.chat_id) + str(message.message_id))
    logging.info('Command settings: main.', extra={
        'telegram': telegram_context,
        'id': record_id
    })
def send_settings_message(bot: telegram.Bot, chat_settings: bot_types.ChatSettings):
    """
    Send the settings overview for a chat as an HTML message.

    ``strings.SETTINGS_MESSAGE`` is filled with twelve values taken from
    ``chat_settings``, in the order listed below.
    """
    # NOTE(review): the '?' runs look like text lost to an encoding problem —
    # they are runtime strings and are kept exactly as found.
    substitutions = (
        chat_settings.admin_name,
        str(chat_settings.mode),
        '???????' if chat_settings.quiet else '????????',
        '?????? ????????????? ????' if chat_settings.admin_only else '??? ??????',
        chat_settings.yandex_key or '?? ??????????',
        str(chat_settings.voice),
        str(chat_settings.emotion),
        str(chat_settings.speed),
        '???????????' if chat_settings.as_audio else '????????? ?????????',
        '????????? ?????????' if chat_settings.as_audio else '???????????',
        '?????????' if chat_settings.quiet else '????????',
        '?????' if chat_settings.admin_only else '?????? ????????????? ????'
    )
    bot.send_message(
        chat_id=chat_settings.id,
        text=strings.SETTINGS_MESSAGE % substitutions,
        parse_mode='HTML'
    )
# endregion
# region settings voice
def settings_voice_message(bot: telegram.Bot, update: telegram.Update):
    """
    Show the voice settings to a permitted sender, then track and log.

    The voice settings are sent when the chat is not admin-only or the
    sender is the stored admin.
    """
    message = update.message
    sender = message.from_user
    chat_settings = bot_types.ChatSettings.from_db(db, message.chat, admin_id=sender.id,
                                                   admin_name=sender.first_name)

    if not chat_settings.admin_only or chat_settings.admin_id == message.from_user.id:
        send_settings_voice_message(bot, message.chat_id)

    # NOTE(review): nesting reconstructed from a flattened listing — confirm
    # tracking/logging should run even when the settings are not shown.
    bot_types.Botan.track(
        uid=message.chat_id,
        message=message.to_dict(),
        name='settings.voice.get'
    )

    telegram_context = {
        'update': update.to_dict(),
        'chat_id': message.chat_id,
        'message_id': message.message_id,
    }
    record_id = extentions.TextHelper.get_md5(str(message.chat_id) + str(message.message_id))
    logging.info('Command settings: voice.', extra={
        'telegram': telegram_context,
        'id': record_id
    })
def send_settings_voice_message_callback(bot: telegram.Bot, voice: bot_types.Voice,
                                         callback_query_id: str, quiet: bool):
    """
    Answer a callback query after the voice setting changed.

    In quiet mode the query is answered without text; otherwise the
    answer carries ``strings.NEW_VOICE_MESSAGE`` filled with the voice.
    """
    answer = {'callback_query_id': callback_query_id}
    if not quiet:
        answer['text'] = strings.NEW_VOICE_MESSAGE % str(voice)
    bot.answer_callback_query(**answer)
# endregion
# region settings emotion
def settings_emotion_message(bot: telegram.Bot, update: telegram.Update):
    """
    Show the emotion settings to a permitted sender, then track and log.

    The emotion settings are sent when the chat is not admin-only or the
    sender is the stored admin.
    """
    message = update.message
    sender = message.from_user
    chat_settings = bot_types.ChatSettings.from_db(db, message.chat, admin_id=sender.id,
                                                   admin_name=sender.first_name)

    if not chat_settings.admin_only or chat_settings.admin_id == message.from_user.id:
        send_settings_emotion_message(bot, message.chat_id)

    # NOTE(review): nesting reconstructed from a flattened listing — confirm
    # tracking/logging should run even when the settings are not shown.
    bot_types.Botan.track(
        uid=message.chat_id,
        message=message.to_dict(),
        name='settings.emotions.get'
    )

    telegram_context = {
        'update': update.to_dict(),
        'chat_id': message.chat_id,
        'message_id': message.message_id,
    }
    record_id = extentions.TextHelper.get_md5(str(message.chat_id) + str(message.message_id))
    logging.info('Command settings: emotion.', extra={
        'telegram': telegram_context,
        'id': record_id
    })
def send_settings_emotion_message_callback(bot: telegram.Bot, emotion: bot_types.Emotion,
                                           callback_query_id: str, quiet: bool):
    """
    Answer a callback query after the emotion setting changed.

    In quiet mode the query is answered without text; otherwise the
    answer carries ``strings.NEW_EMOTION_MESSAGE`` filled with the emotion.
    """
    answer = {'callback_query_id': callback_query_id}
    if not quiet:
        answer['text'] = strings.NEW_EMOTION_MESSAGE % str(emotion)
    bot.answer_callback_query(**answer)
# endregion
# region settings mode
def send_settings_mode_message_callback(bot: telegram.Bot, mode: bot_types.Mode,
                                        callback_query_id: str, quiet: bool):
    """
    Answer a callback query after the mode setting changed.

    In quiet mode the query is answered without text; otherwise the
    answer carries ``strings.NEW_MODE_MESSAGE`` filled with the mode.
    """
    answer = {'callback_query_id': callback_query_id}
    if not quiet:
        answer['text'] = strings.NEW_MODE_MESSAGE % str(mode)
    bot.answer_callback_query(**answer)
# endregion
# region settings audio
def set_auto(bot: Bot, update: Update):
    """
    Handles /auto command
    Flips the ``auto`` flag of the chat's conversation and reports the new
    state; sends an "account_not_setup" error when the chat has no
    conversation.
    :param bot:
    :param update:
    :return:
    """
    global data
    chat_id = update.message.chat_id

    if chat_id not in data.conversations:
        send_error(bot=bot, chat_id=chat_id, name="account_not_setup")
        return

    conversation = data.conversations[chat_id]
    conversation.auto = not conversation.auto
    message = "Automatic mode enabled." if conversation.auto else "Automatic mode disabled."
    send_custom_message(bot=bot, chat_id=chat_id, message=message)
def send_matches(bot: Bot, update: Update):
    """
    Send list of matches (pictures) to private chat
    Each match's first photo is sent to the sender with a caption of name
    plus a running ID. If a photo cannot be delivered privately the chat
    is asked to start a private chat and the loop stops. The ID advances
    only after a successful send.
    :param bot:
    :param update:
    :return:
    """
    global data
    chat_id = update.message.chat_id
    sender_id = update.message.from_user.id

    if chat_id not in data.conversations:
        send_error(bot=bot, chat_id=chat_id, name="account_not_setup")
        return

    try:
        matches = data.conversations[chat_id].get_matches()
        match_number = 0
        # TODO cache the photo urls for some time
        for match in matches:
            photo = match.user.get_photos(width='172')[0]
            try:
                caption = match.user.name + " ID=" + str(match_number)
                delivered = send_private_photo(bot=bot, caption=caption, url=photo,
                                               user_id=sender_id)
                if not delivered:
                    notify_start_private_chat(bot=bot,
                                              chat_id=chat_id,
                                              incoming_message=update.message)
                    break
                match_number += 1
            except error.BadRequest:
                send_custom_message(bot=bot, chat_id=sender_id,
                                    message="One match could not be loaded: %s" % photo)
            # NOTE(review): nesting reconstructed from a flattened listing —
            # the pause is assumed to follow every match; confirm.
            time.sleep(0.1)
    except AttributeError:
        send_custom_message(bot=bot, chat_id=sender_id, message="An error happened.")
def unlink(bot: Bot, update: Update):
    """
    Handles /unlink command
    Removes the chat's conversation when the sender is its owner;
    otherwise sends a "command_not_allowed" error, or
    "account_not_setup" when the chat has no conversation.
    :param bot:
    :param update:
    :return:
    """
    global data
    sender = update.message.from_user.id
    chat_id = update.message.chat_id

    if chat_id not in data.conversations:
        send_error(bot, chat_id=chat_id, name="account_not_setup")
        return

    if sender != data.conversations[chat_id].owner:
        send_error(bot, chat_id=chat_id, name="command_not_allowed")
        return

    del data.conversations[chat_id]
    send_message(bot, chat_id, name="account_unlinked")
def test_admin_help(self):
    """
    Check that the admin help command sends a message through the bot.
    """
    bot_double = Mock(spec=Bot)
    dummy_update = self.__get_dummy_update()

    AdminCommands._AdminCommands__admin_help(  # pylint: disable=no-member, protected-access
        bot_double, dummy_update)

    self.assertTrue(bot_double.sendMessage.called)
def test_get_users_no_config(self):
    """
    Check the get users command when the user manager config is empty.

    A message must still be sent through the bot.
    """
    bot_double = Mock(spec=Bot)
    dummy_update = self.__get_dummy_update()

    with patch("ownbot.admincommands.UserManager") as manager_cls:
        manager_cls.return_value.config = {}
        AdminCommands._AdminCommands__get_users(  # pylint: disable=no-member, protected-access
            bot_double, dummy_update)

    self.assertTrue(bot_double.sendMessage.called)
def test_get_users(self):
    """
    Check the get users command with one group configured.

    The group holds one verified and one unverified user; a message must
    be sent through the bot.
    """
    bot_double = Mock(spec=Bot)
    dummy_update = self.__get_dummy_update()

    group_config = {
        "foogroup": {
            "users": [{"id": 1337, "username": "@foouser"}],
            "unverified": ["@baruser"],
        }
    }
    with patch("ownbot.admincommands.UserManager") as manager_cls:
        manager_cls.return_value.config = group_config
        AdminCommands._AdminCommands__get_users(  # pylint: disable=no-member, protected-access
            bot_double, dummy_update)

    self.assertTrue(bot_double.sendMessage.called)
def test_adduser_no_args(self):
    """
    Check that adduser answers with its usage text when given no args.
    """
    bot_double = Mock(spec=Bot)
    dummy_update = self.__get_dummy_update()

    AdminCommands._AdminCommands__add_user(  # pylint: disable=no-member, protected-access
        bot_double, dummy_update, [])

    bot_double.sendMessage.assert_called_with(
        chat_id=1, text="Usage: adduser <user> <group>")
def test_adduser_usr_already_in_grp(self):
    """
    Check adduser when the user manager reports the user was not added.

    ``add_user`` returning False must yield the "already in the group"
    message.
    """
    bot_double = Mock(spec=Bot)
    dummy_update = self.__get_dummy_update()

    with patch("ownbot.admincommands.UserManager") as manager_cls:
        manager_cls.return_value.add_user.return_value = False
        AdminCommands._AdminCommands__add_user(  # pylint: disable=no-member, protected-access
            bot_double, dummy_update, ["@foouser", "foogroup"])

    bot_double.sendMessage.assert_called_with(
        chat_id=1,
        text="The user '@foouser' is already in the group 'foogroup'!")
def test_rmuser_no_args(self):
    """
    Check that rmuser answers with its usage text when given no args.
    """
    bot_double = Mock(spec=Bot)
    dummy_update = self.__get_dummy_update()

    AdminCommands._AdminCommands__rm_user(  # pylint: disable=no-member, protected-access
        bot_double, dummy_update, [])

    bot_double.sendMessage.assert_called_with(chat_id=1,
                                              text="Usage: rmuser <user> <group>")