Compare commits
1 Commits
Author | SHA1 | Date |
---|---|---|
Alexei | 34fdfa2c82 | 2 years ago |
4 changed files with 195 additions and 3 deletions
@ -0,0 +1,141 @@ |
|||||||
|
#-*-coding utf-8-*- |
||||||
|
# Общественное достояние, 2023, Алексей Безбородов (Alexei Bezborodov) <AlexeiBv+mirocod_platform_bot@narod.ru> |
||||||
|
|
||||||
|
from bot_sys import interfaces, bot_bd, keyboard, user_access, test_log |
||||||
|
|
||||||
|
import asyncio |
||||||
|
|
||||||
|
class UserData(): |
||||||
|
def __init__(self, a_id, a_username, a_first_name, a_last_name, a_is_bot, a_language_code): |
||||||
|
self.id = a_id |
||||||
|
self.username = a_username |
||||||
|
self.first_name = a_first_name |
||||||
|
self.last_name = a_last_name |
||||||
|
self.is_bot = a_is_bot |
||||||
|
self.language_code = a_language_code |
||||||
|
|
||||||
|
class UserPhoto(): |
||||||
|
def __init__(self, a_PhotoID): |
||||||
|
self.file_id = a_PhotoID |
||||||
|
|
||||||
|
class UserMessage(): |
||||||
|
def __init__(self, a_UserData, a_Message, a_PhotoID): |
||||||
|
self.from_user = a_UserData |
||||||
|
self.text = a_Message |
||||||
|
if not a_PhotoID: |
||||||
|
self.photo = None |
||||||
|
self.photo = [UserPhoto(a_PhotoID)] |
||||||
|
|
||||||
|
class BotMessage(): |
||||||
|
def __init__(self, a_Message, a_PhotoIDs, a_KeyboardButtons, a_InlineKeyboardButtons, parse_mode = None): |
||||||
|
self.m_Message = a_Message |
||||||
|
self.m_PhotoIDs = a_PhotoIDs |
||||||
|
self.m_KeyboardButtons = a_KeyboardButtons |
||||||
|
self.m_InlineKeyboardButtons = a_InlineKeyboardButtons |
||||||
|
self.m_parse_mode = parse_mode |
||||||
|
|
||||||
|
class BotMessageHandler(): |
||||||
|
def __init__(self, a_MessageHandler, a_CheckFunc=None, commands=None, regexp=None, content_types=None, state=None): |
||||||
|
self.m_MessageHandler = a_MessageHandler |
||||||
|
self.m_CheckFunc = a_CheckFunc |
||||||
|
self.m_commands = commands |
||||||
|
self.m_regexp = regexp |
||||||
|
self.m_content_types = content_types |
||||||
|
self.m_state = state |
||||||
|
|
||||||
|
class TestBot(interfaces.IBot): |
||||||
|
def __init__(self, a_BDFileName, a_RootIDs, a_Log): |
||||||
|
self.m_BDFileName = a_BDFileName |
||||||
|
self.m_RootIDs = a_RootIDs |
||||||
|
self.m_Log = a_Log |
||||||
|
self.m_UserMessage = {} |
||||||
|
self.m_BotMessage = {} |
||||||
|
self.m_BotMessageHandlers = [] |
||||||
|
|
||||||
|
def SendUserMessage(self, a_UserData : UserData, a_Message, a_PhotoIDs): |
||||||
|
user_id = a_UserData.id |
||||||
|
if not self.m_UserMessage.get(user_id, None): |
||||||
|
self.m_UserMessage[user_id] = [] |
||||||
|
self.m_UserMessage[user_id] += [UserMessage(a_UserData, a_Message, a_PhotoIDs)] |
||||||
|
|
||||||
|
def GetBotMessage(self, a_UserID): # return [BotMessage] |
||||||
|
result = self.m_BotMessage.get(a_UserID, None) |
||||||
|
self.m_BotMessage[a_UserID] = [] |
||||||
|
return result |
||||||
|
|
||||||
|
def GetRootIDs(self): |
||||||
|
return self.m_RootIDs |
||||||
|
|
||||||
|
def GetLog(self): |
||||||
|
return self.m_Log |
||||||
|
|
||||||
|
def SQLRequest(self, a_Request : str, commit = False, return_error = False, param = None): |
||||||
|
return bot_bd.SQLRequest(self.m_Log, self.m_BDFileName, a_Request, commit = commit, return_error = return_error, param = param) |
||||||
|
|
||||||
|
async def SendMessage(self, a_UserID, a_Message, a_PhotoIDs, a_KeyboardButtons, a_InlineKeyboardButtons, parse_mode = None): |
||||||
|
self.m_BotMessage[a_UserID] = [BotMessage(a_Message, a_PhotoIDs, a_KeyboardButtons, a_InlineKeyboardButtons, parse_mode = parse_mode)] |
||||||
|
|
||||||
|
async def SendDocument(self, a_UserID, a_Document, a_Caption, a_KeyboardButtons, a_InlineKeyboardButtons): |
||||||
|
pass |
||||||
|
|
||||||
|
def RegisterMessageHandler(self, a_MessageHandler, a_CheckFunc=None, commands=None, regexp=None, content_types=None, state=None): |
||||||
|
self.m_BotMessageHandlers += [BotMessageHandler(a_MessageHandler, a_CheckFunc=a_CheckFunc, commands=commands, regexp=regexp, content_types=content_types, state=state)] |
||||||
|
|
||||||
|
def RegisterCallbackHandler(self, a_CallbackHandler, a_CheckFunc=None, commands=None, regexp=None, content_types=None, state=None): |
||||||
|
pass |
||||||
|
|
||||||
|
def StartPolling(self): |
||||||
|
for user_id, user_msgs in self.m_UserMessage.items(): |
||||||
|
for user_msg in user_msgs: |
||||||
|
for message_handler in self.m_BotMessageHandlers: |
||||||
|
check = False |
||||||
|
if message_handler.m_CheckFunc: |
||||||
|
check = m_CheckFunc(user_msg) |
||||||
|
cmds = message_handler.m_commands |
||||||
|
if cmds: |
||||||
|
for c in cmds: |
||||||
|
if '/'+ c == user_msg.text: |
||||||
|
check = True |
||||||
|
print('user_msg.text', user_msg.text, c, check) |
||||||
|
# TODO : добавить обработку regexp content_types state |
||||||
|
if check: |
||||||
|
print('message_handler', message_handler.m_MessageHandler, user_msg) |
||||||
|
asyncio.run(message_handler.m_MessageHandler(user_msg, state = message_handler.m_state)) |
||||||
|
self.m_UserMessage = {} |
||||||
|
|
||||||
|
def Test(): |
||||||
|
print('Test') |
||||||
|
log = test_log.TestLog() |
||||||
|
root_id = '234353425' |
||||||
|
a_RootIDs = [root_id] |
||||||
|
a_BDFileName = 'test_bd.bd' |
||||||
|
bot = TestBot(a_BDFileName, a_RootIDs, log) |
||||||
|
|
||||||
|
user_kb = [['user_kb']] |
||||||
|
user_ikb = [['user_ikb']] |
||||||
|
user_parse_mode = 'html' |
||||||
|
async def TestMessageHandler(a_Message, state = None): |
||||||
|
user_id = str(a_Message.from_user.id) |
||||||
|
print('user_id', user_id) |
||||||
|
await bot.SendMessage(user_id, a_Message.text, a_Message.photo[0].file_id, user_kb, user_ikb, user_parse_mode) |
||||||
|
|
||||||
|
bot.RegisterMessageHandler(TestMessageHandler, commands = ['start']) |
||||||
|
user_photo_id = '435456567' |
||||||
|
user_message = '/start' |
||||||
|
|
||||||
|
user_data = UserData(root_id, 'a_username', 'a_first_name', 'a_last_name', 'False', 'ru') |
||||||
|
bot.SendUserMessage(user_data, user_message, user_photo_id) |
||||||
|
|
||||||
|
bot.StartPolling() |
||||||
|
|
||||||
|
bot_messages = bot.GetBotMessage(root_id) |
||||||
|
|
||||||
|
assert len(bot_messages) == 1 |
||||||
|
bot_message = bot_messages[0] |
||||||
|
|
||||||
|
assert bot_message.m_Message == user_message |
||||||
|
assert bot_message.m_PhotoIDs == user_photo_id |
||||||
|
assert bot_message.m_KeyboardButtons[0][0] == user_kb[0][0] |
||||||
|
assert bot_message.m_InlineKeyboardButtons[0][0] == user_ikb[0][0] |
||||||
|
assert bot_message.m_parse_mode == user_parse_mode |
||||||
|
|
@ -0,0 +1,51 @@ |
|||||||
|
#-*-coding utf-8-*- |
||||||
|
# Общественное достояние 2023, Алексей Безбородов (Alexei Bezborodov) <AlexeiBv+mirocod_platform_bot@narod.ru> |
||||||
|
|
||||||
|
# --------------------------------------------------------- |
||||||
|
# Логирование событий в список |
||||||
|
|
||||||
|
# Четыре типа уведомлений: |
||||||
|
# Info - Информация. |
||||||
|
# Warn - Предупреждение. |
||||||
|
# Error - Ошибка. |
||||||
|
# Success - Успех. |
||||||
|
|
||||||
|
class TestLog: |
||||||
|
def __init__(self): |
||||||
|
self.m_LogMessage = [] |
||||||
|
|
||||||
|
def GetLogMessage(): |
||||||
|
result = self.m_LogMessage |
||||||
|
self.m_LogMessage = [] |
||||||
|
return result |
||||||
|
|
||||||
|
def GetFileName(self): |
||||||
|
return self.m_FileName |
||||||
|
|
||||||
|
def GetTimeNow(self): |
||||||
|
return datetime.datetime.now() |
||||||
|
|
||||||
|
def GetTime(self): |
||||||
|
now = self.GetTimeNow() |
||||||
|
time = now.strftime(f"[%d.%m.%Y, %H:%M]") |
||||||
|
return time |
||||||
|
|
||||||
|
def Info(self, a_LogMessage): |
||||||
|
time = self.GetTime() |
||||||
|
self.WriteToList(f'{time} | {a_LogMessage}') |
||||||
|
|
||||||
|
def Warn(self, a_LogMessage): |
||||||
|
time = self.GetTime() |
||||||
|
self.WriteToList(f'{time} | {a_LogMessage}') |
||||||
|
|
||||||
|
def Error(self, a_LogMessage): |
||||||
|
time = self.GetTime() |
||||||
|
self.WriteToList(f'{time} | {a_LogMessage}') |
||||||
|
|
||||||
|
def Success(self, a_LogMessage): |
||||||
|
time = self.GetTime() |
||||||
|
self.WriteToList(f'{time} | {a_LogMessage}') |
||||||
|
|
||||||
|
def WriteToList(self, a_LogMessage): |
||||||
|
self.m_LogMessage += [a_LogMessage] |
||||||
|
|
Loading…
Reference in new issue