From 08dd04666be4465a6d6b25cea168051c67617f7f Mon Sep 17 00:00:00 2001 From: Wilbur Jaywright Date: Fri, 10 May 2024 10:04:55 -0400 Subject: [PATCH] Completed #1, shortened multiple lines --- src/rumchat_actor/__init__.py | 144 +++++++++++++++++++++++++--------- 1 file changed, 107 insertions(+), 37 deletions(-) diff --git a/src/rumchat_actor/__init__.py b/src/rumchat_actor/__init__.py index ec4f6f2..614d54d 100644 --- a/src/rumchat_actor/__init__.py +++ b/src/rumchat_actor/__init__.py @@ -15,9 +15,11 @@ from selenium.webdriver.common.alert import Alert from selenium.webdriver.common.by import By from selenium.webdriver.common.keys import Keys +from selenium.webdriver.support import expected_conditions as EC +from selenium.webdriver.support.ui import WebDriverWait -#How long to wait after performing any browser action, for the webpage to load its response -BROWSER_ACTION_DELAY = 2 +#How long to wait maximum for a condition to be true in the browser +BROWSER_WAIT_TIMEOUT = 30 #How long to wait between sending messages SEND_MESSAGE_COOLDOWN = 3 @@ -52,18 +54,21 @@ class ChatCommand(): """A chat command, internal use only""" - def __init__(self, name, actor, cooldown = BROWSER_ACTION_DELAY, amount_cents = 0, exclusive = False, allowed_badges = ["subscriber"], whitelist_badges = ["moderator"], target = None): + def __init__(self, name, actor, cooldown = SEND_MESSAGE_COOLDOWN, amount_cents = 0, exclusive = False, allowed_badges = ["subscriber"], whitelist_badges = ["moderator"], target = None): """name: The !name of the command actor: The RumleChatActor host object amount_cents: The minimum cost of the command. Defaults to free exclusive: If this command can only be run by users with allowed badges. Defaults to False - allowed_badges: Badges that are allowed to run this command (if it is exclusive). Defaults to subscribers, admin is added internally. + allowed_badges: Badges that are allowed to run this command (if it is exclusive). + Defaults to subscribers, admin is added internally. whitelist_badges: Badges which if borne give the user free-of-charge command access - target: The function(message, actor) to call on successful command usage. Defaults to self.run""" + target: The command function(message, actor) to call. Defaults to self.run""" assert " " not in name, "Name cannot contain spaces" self.name = name self.actor = actor - assert cooldown >= BROWSER_ACTION_DELAY, f"Cannot set a cooldown shorter than {BROWSER_ACTION_DELAY}" + assert cooldown >= SEND_MESSAGE_COOLDOWN, \ + f"Cannot set a cooldown shorter than {SEND_MESSAGE_COOLDOWN}" + self.cooldown = cooldown self.amount_cents = amount_cents #Cost of the command self.exclusive = exclusive @@ -75,19 +80,31 @@ def __init__(self, name, actor, cooldown = BROWSER_ACTION_DELAY, amount_cents = def call(self, message): """The command was called""" #this command is exclusive, and the user does not have the required badge - if self.exclusive and not (True in [badge.slug in self.allowed_badges for badge in message.user.badges]): - self.actor.send_message(f"@{message.user.username} That command is exclusive to: " + ", ".join(self.allowed_badges)) + if self.exclusive and \ + not (True in [badge.slug in self.allowed_badges for badge in message.user.badges]): + + self.actor.send_message(f"@{message.user.username} That command is exclusive to: " + + ", ".join(self.allowed_badges) + ) + return #The command is still on cooldown if (curtime := time.time()) - self.last_use_time < self.cooldown: - self.actor.send_message(f"@{message.user.username} That command is still on cooldown. Try again in {int(self.last_use_time + self.cooldown - curtime + 0.5)} seconds.") + self.actor.send_message( + f"@{message.user.username} That command is still on cooldown. " + + f"Try again in {int(self.last_use_time + self.cooldown - curtime + 0.5)} seconds." + ) return #the user did not pay enough for the command and they do not have a free pass - if message.rant_price_cents < self.amount_cents and not (True in [badge.slug in self.whitelist_badges for badge in message.user.badges]): - self.actor.send_message(f"@{message.user.username} That command costs ${self.amount_cents/100:.2f}.") + if message.rant_price_cents < self.amount_cents and \ + not (True in [badge.slug in self.whitelist_badges for badge in message.user.badges]): + + self.actor.send_message("@" + message.user.username + + f"That command costs ${self.amount_cents/100:.2f}." + ) return #the command was called successfully @@ -103,7 +120,9 @@ def run(self, message): return #Run method was never defined - self.actor.send_message(f"@{message.user.username} Hello, this command never had a target defined. :-)") + self.actor.send_message("@" + message.user.username + + "Hello, this command never had a target defined. :-)" + ) class RumbleChatActor(): """Actor that interacts with Rumble chat""" @@ -116,7 +135,7 @@ def __init__(self, stream_id = None, init_message = "Hello, Rumble world!", prof streamer_username: The username of the person streaming streamer_channel: The channel doing the livestream is_channel_stream: If the livestream is on a channel or not - ignore_users: List of usernames, will ignore all their messages, useful if you are using TheRumbleBot""" + ignore_users: List of usernames, will ignore all their messages""" #The info of the person streaming self.__streamer_username = streamer_username @@ -133,7 +152,8 @@ def __init__(self, stream_id = None, init_message = "Hello, Rumble world!", prof if stream_id: self.stream_id, self.stream_id_b10 = utils.stream_id_36_and_10(stream_id) - #It is not our livestream or we have no Live Stream API, LS API functions are not available + #It is not our livestream or we have no Live Stream API, + #so LS API functions are not available if not self.rum_api or self.stream_id not in self.rum_api.livestreams: self.api_stream = None @@ -173,17 +193,29 @@ def __init__(self, stream_id = None, init_message = "Hello, Rumble world!", prof #We have credentials if username and password: sign_in_buttn.click() - time.sleep(BROWSER_ACTION_DELAY) - self.browser.find_element(By.ID, "login-username").send_keys(username + Keys.RETURN) + WebDriverWait(self.browser, BROWSER_WAIT_TIMEOUT).until( + EC.visibility_of_element_located(By.ID, "login-username"), + "Timed out waiting for sign-in dialouge" + ) + + uname_field = self.browser.find_element(By.ID, "login-username") + uname_field.send_keys(username + Keys.RETURN) self.browser.find_element(By.ID, "login-password").send_keys(password + Keys.RETURN) - break #We only need to do that once #We do not have credentials, ask for manual sign in self.browser.maximize_window() input("Please log in at the browser, then press enter here.") - #Wait for signed in loading to complete - time.sleep(BROWSER_ACTION_DELAY) + #Wait for signed in loading to complete + WebDriverWait(self.browser, BROWSER_WAIT_TIMEOUT).until( + EC.invisibility_of_element(uname_field), + "Timeout waiting for username field to disappear" + ) + + WebDriverWait(self.browser, BROWSER_WAIT_TIMEOUT).until( + EC.visibility_of_element_located(By.ID, "chat-message-text-input"), + "Timed out waiting for chat message field to appear" + ) #Find our username if username: @@ -198,9 +230,6 @@ def __init__(self, stream_id = None, init_message = "Hello, Rumble world!", prof #Ignore these users when processing messages self.ignore_users = ignore_users - #Wait for potential further page load? - time.sleep(BROWSER_ACTION_DELAY) - #History of the bot's messages so they do not get loop processed self.sent_messages = [] @@ -224,9 +253,11 @@ def __init__(self, stream_id = None, init_message = "Hello, Rumble world!", prof while (m := self.ssechat.get_message()).user.username != self.username: pass - assert "moderator" in m.user.badges or "admin" in m.user.badges, "Actor cannot function without being a moderator" + assert "moderator" in m.user.badges or "admin" in m.user.badges, \ + "Actor cannot function without being a moderator" - #Functions that are to be called on each message, must return False if the message was deleted + #Functions that are to be called on each message, + #must return False if the message was deleted self.message_actions = [] #Instances of RumbleChatCommand, by name @@ -252,7 +283,10 @@ def streamer_channel(self): #We are the ones streaming, and the API URL is under the channel if self.api_stream and self.rum_api.channel_name: self.__streamer_channel = self.rum_api.channel_name - #We are not the ones streaming, or the API URL was not under our channel, and we can confirm this is a channel stream + + #We are not the ones streaming, + #or the API URL was not under our channel, + #and we are sure this is a channel stream else: self.__streamer_channel = input("Enter the channel of the person streaming: ") @@ -307,7 +341,9 @@ def _sender_loop(self): def __send_message(self, text): """Send a message in chat""" - assert len(text) < MAX_MESSAGE_LEN, f"Message with prefix cannot be longer than {MAX_MESSAGE_LEN} characters" + assert len(text) < MAX_MESSAGE_LEN, \ + f"Message with prefix cannot be longer than {MAX_MESSAGE_LEN} characters" + self.sent_messages.append(text) self.last_message_send_time = time.time() self.browser.find_element(By.ID, "chat-message-text-input").send_keys(text + Keys.RETURN) @@ -327,12 +363,20 @@ def open_moderation_menu(self, message): #Find the message by ID elif isinstance(message, int): message_id = message - message_li = self.browser.find_element(By.XPATH, f"//li[@class='chat-history--row js-chat-history-item'][@data-message-id='{message_id}']") + message_li = self.browser.find_element( + By.XPATH, + "//li[@class='chat-history--row js-chat-history-item']" + + f"[@data-message-id='{message_id}']" + ) #The message has a message ID attribute elif hasattr(message, "message_id"): message_id = message.message_id - message_li = self.browser.find_element(By.XPATH, f"//li[@class='chat-history--row js-chat-history-item'][@data-message-id='{message_id}']") + message_li = self.browser.find_element( + By.XPATH, + "//li[@class='chat-history--row js-chat-history-item']" + + f"[@data-message-id='{message_id}']" + ) #Not a valid message type else: @@ -341,7 +385,10 @@ def open_moderation_menu(self, message): #Hover over the message self.hover_element(message_li) #Find the moderation menu - menu_bttn = self.browser.find_element(By.XPATH, f"//li[@class='chat-history--row js-chat-history-item'][@data-message-id='{message_id}']/button[@class='js-moderate-btn chat-history--kebab-button']") + menu_bttn = message_li.find_element( + By.XPATH, + ".//button[@class='js-moderate-btn chat-history--kebab-button']" + ) #Click the moderation menu button menu_bttn.click() @@ -350,9 +397,18 @@ def open_moderation_menu(self, message): def delete_message(self, message): """Delete a message in the chat""" m_id = self.open_moderation_menu(message) - del_bttn = self.browser.find_element(By.XPATH, f"//button[@class='cmi js-btn-delete-current'][@data-message-id='{m_id}']") + del_bttn = self.browser.find_element( + By.XPATH, + f"//button[@class='cmi js-btn-delete-current'][@data-message-id='{m_id}']" + ) + del_bttn.click() - time.sleep(BROWSER_ACTION_DELAY) + + #Wait for the confirmation to appear + WebDriverWait(self.browser, BROWSER_WAIT_TIMEOUT).until( + EC.alert_is_present(), + "Timed out waiting for deletion confirmation dialouge to appear" + ) #Confirm the confirmation dialog Alert(self.browser).accept() @@ -360,13 +416,21 @@ def delete_message(self, message): def mute_by_message(self, message, mute_level = "5"): """Mute a user by message""" self.open_moderation_menu(message) - timeout_bttn = self.browser.find_element(By.XPATH, f"//button[@class='{MUTE_LEVELS[mute_level]}']") + timeout_bttn = self.browser.find_element( + By.XPATH, + f"//button[@class='{MUTE_LEVELS[mute_level]}']" + ) + timeout_bttn.click() def mute_by_appearname(self, name, mute_level = "5"): """Mute a user by the name they are appearing with""" #Find any chat message by this user - message_li = self.browser.find_element(By.XPATH, f"//li[@class='chat-history--row js-chat-history-item'][@data-username='{name}']") + message_li = self.browser.find_element( + By.XPATH, + f"//li[@class='chat-history--row js-chat-history-item'][@data-username='{name}']" + ) + self.mute_by_message(message = message_li, mute_level = mute_level) def pin_message(self, message): @@ -378,7 +442,11 @@ def pin_message(self, message): def unpin_message(self): """Unpin the currently pinned message""" try: - unpin_bttn = self.browser.find_element(By.XPATH, "//button[@data-js='remove_pinned_message_button']") + unpin_bttn = self.browser.find_element( + By.XPATH, + "//button[@data-js='remove_pinned_message_button']" + ) + except selenium.common.exceptions.NoSuchElementException: return False #No message was pinned @@ -412,7 +480,8 @@ def register_command(self, command, name = None): """Register a command""" #Is a ChatCommand instance if isinstance(command, ChatCommand): - assert not name or name == command.name, "ChatCommand instance has different name than one passed" + assert not name or name == command.name, \ + "ChatCommand instance has different name than one passed" self.chat_commands[command.name] = command #Is a callable @@ -435,7 +504,7 @@ def __process_message(self, message): if message.text in self.sent_messages: return - #If the message is from the same account as us, reset our message cooldown from the message send time if it is newer + #If the message is from the same account as us, consider it in message send cooldown if message.user.username == self.username: self.last_message_send_time = max((self.last_message_send_time, message.time)) @@ -444,7 +513,8 @@ def __process_message(self, message): return for action in self.message_actions: - if message.message_id in self.ssechat.deleted_message_ids or not action(message, self): #The message got deleted, possibly by this action + #The message got deleted, possibly by this action + if message.message_id in self.ssechat.deleted_message_ids or not action(message, self): return self.__run_if_command(message)