Skip to content

Commit

Permalink
Completed #1, shortened multiple lines
Browse files Browse the repository at this point in the history
  • Loading branch information
thelabcat committed May 10, 2024
1 parent a2b84e7 commit 08dd046
Showing 1 changed file with 107 additions and 37 deletions.
144 changes: 107 additions & 37 deletions src/rumchat_actor/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,11 @@
from selenium.webdriver.common.alert import Alert
from selenium.webdriver.common.by import By
from selenium.webdriver.common.keys import Keys
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.support.ui import WebDriverWait

#How long to wait after performing any browser action, for the webpage to load its response
BROWSER_ACTION_DELAY = 2
#How long to wait maximum for a condition to be true in the browser
BROWSER_WAIT_TIMEOUT = 30

#How long to wait between sending messages
SEND_MESSAGE_COOLDOWN = 3
Expand Down Expand Up @@ -52,18 +54,21 @@

class ChatCommand():
"""A chat command, internal use only"""
def __init__(self, name, actor, cooldown = BROWSER_ACTION_DELAY, amount_cents = 0, exclusive = False, allowed_badges = ["subscriber"], whitelist_badges = ["moderator"], target = None):
def __init__(self, name, actor, cooldown = SEND_MESSAGE_COOLDOWN, amount_cents = 0, exclusive = False, allowed_badges = ["subscriber"], whitelist_badges = ["moderator"], target = None):
"""name: The !name of the command
actor: The RumleChatActor host object
amount_cents: The minimum cost of the command. Defaults to free
exclusive: If this command can only be run by users with allowed badges. Defaults to False
allowed_badges: Badges that are allowed to run this command (if it is exclusive). Defaults to subscribers, admin is added internally.
allowed_badges: Badges that are allowed to run this command (if it is exclusive).
Defaults to subscribers, admin is added internally.
whitelist_badges: Badges which if borne give the user free-of-charge command access
target: The function(message, actor) to call on successful command usage. Defaults to self.run"""
target: The command function(message, actor) to call. Defaults to self.run"""
assert " " not in name, "Name cannot contain spaces"
self.name = name
self.actor = actor
assert cooldown >= BROWSER_ACTION_DELAY, f"Cannot set a cooldown shorter than {BROWSER_ACTION_DELAY}"
assert cooldown >= SEND_MESSAGE_COOLDOWN, \
f"Cannot set a cooldown shorter than {SEND_MESSAGE_COOLDOWN}"

self.cooldown = cooldown
self.amount_cents = amount_cents #Cost of the command
self.exclusive = exclusive
Expand All @@ -75,19 +80,31 @@ def __init__(self, name, actor, cooldown = BROWSER_ACTION_DELAY, amount_cents =
def call(self, message):
"""The command was called"""
#this command is exclusive, and the user does not have the required badge
if self.exclusive and not (True in [badge.slug in self.allowed_badges for badge in message.user.badges]):
self.actor.send_message(f"@{message.user.username} That command is exclusive to: " + ", ".join(self.allowed_badges))
if self.exclusive and \
not (True in [badge.slug in self.allowed_badges for badge in message.user.badges]):

self.actor.send_message(f"@{message.user.username} That command is exclusive to: " +
", ".join(self.allowed_badges)
)

return

#The command is still on cooldown
if (curtime := time.time()) - self.last_use_time < self.cooldown:
self.actor.send_message(f"@{message.user.username} That command is still on cooldown. Try again in {int(self.last_use_time + self.cooldown - curtime + 0.5)} seconds.")
self.actor.send_message(
f"@{message.user.username} That command is still on cooldown. " +
f"Try again in {int(self.last_use_time + self.cooldown - curtime + 0.5)} seconds."
)

return

#the user did not pay enough for the command and they do not have a free pass
if message.rant_price_cents < self.amount_cents and not (True in [badge.slug in self.whitelist_badges for badge in message.user.badges]):
self.actor.send_message(f"@{message.user.username} That command costs ${self.amount_cents/100:.2f}.")
if message.rant_price_cents < self.amount_cents and \
not (True in [badge.slug in self.whitelist_badges for badge in message.user.badges]):

self.actor.send_message("@" + message.user.username +
f"That command costs ${self.amount_cents/100:.2f}."
)
return

#the command was called successfully
Expand All @@ -103,7 +120,9 @@ def run(self, message):
return

#Run method was never defined
self.actor.send_message(f"@{message.user.username} Hello, this command never had a target defined. :-)")
self.actor.send_message("@" + message.user.username +
"Hello, this command never had a target defined. :-)"
)

class RumbleChatActor():
"""Actor that interacts with Rumble chat"""
Expand All @@ -116,7 +135,7 @@ def __init__(self, stream_id = None, init_message = "Hello, Rumble world!", prof
streamer_username: The username of the person streaming
streamer_channel: The channel doing the livestream
is_channel_stream: If the livestream is on a channel or not
ignore_users: List of usernames, will ignore all their messages, useful if you are using TheRumbleBot"""
ignore_users: List of usernames, will ignore all their messages"""

#The info of the person streaming
self.__streamer_username = streamer_username
Expand All @@ -133,7 +152,8 @@ def __init__(self, stream_id = None, init_message = "Hello, Rumble world!", prof
if stream_id:
self.stream_id, self.stream_id_b10 = utils.stream_id_36_and_10(stream_id)

#It is not our livestream or we have no Live Stream API, LS API functions are not available
#It is not our livestream or we have no Live Stream API,
#so LS API functions are not available
if not self.rum_api or self.stream_id not in self.rum_api.livestreams:
self.api_stream = None

Expand Down Expand Up @@ -173,17 +193,29 @@ def __init__(self, stream_id = None, init_message = "Hello, Rumble world!", prof
#We have credentials
if username and password:
sign_in_buttn.click()
time.sleep(BROWSER_ACTION_DELAY)
self.browser.find_element(By.ID, "login-username").send_keys(username + Keys.RETURN)
WebDriverWait(self.browser, BROWSER_WAIT_TIMEOUT).until(
EC.visibility_of_element_located(By.ID, "login-username"),
"Timed out waiting for sign-in dialouge"
)

uname_field = self.browser.find_element(By.ID, "login-username")
uname_field.send_keys(username + Keys.RETURN)
self.browser.find_element(By.ID, "login-password").send_keys(password + Keys.RETURN)
break #We only need to do that once

#We do not have credentials, ask for manual sign in
self.browser.maximize_window()
input("Please log in at the browser, then press enter here.")

#Wait for signed in loading to complete
time.sleep(BROWSER_ACTION_DELAY)
#Wait for signed in loading to complete
WebDriverWait(self.browser, BROWSER_WAIT_TIMEOUT).until(
EC.invisibility_of_element(uname_field),
"Timeout waiting for username field to disappear"
)

WebDriverWait(self.browser, BROWSER_WAIT_TIMEOUT).until(
EC.visibility_of_element_located(By.ID, "chat-message-text-input"),
"Timed out waiting for chat message field to appear"
)

#Find our username
if username:
Expand All @@ -198,9 +230,6 @@ def __init__(self, stream_id = None, init_message = "Hello, Rumble world!", prof
#Ignore these users when processing messages
self.ignore_users = ignore_users

#Wait for potential further page load?
time.sleep(BROWSER_ACTION_DELAY)

#History of the bot's messages so they do not get loop processed
self.sent_messages = []

Expand All @@ -224,9 +253,11 @@ def __init__(self, stream_id = None, init_message = "Hello, Rumble world!", prof
while (m := self.ssechat.get_message()).user.username != self.username:
pass

assert "moderator" in m.user.badges or "admin" in m.user.badges, "Actor cannot function without being a moderator"
assert "moderator" in m.user.badges or "admin" in m.user.badges, \
"Actor cannot function without being a moderator"

#Functions that are to be called on each message, must return False if the message was deleted
#Functions that are to be called on each message,
#must return False if the message was deleted
self.message_actions = []

#Instances of RumbleChatCommand, by name
Expand All @@ -252,7 +283,10 @@ def streamer_channel(self):
#We are the ones streaming, and the API URL is under the channel
if self.api_stream and self.rum_api.channel_name:
self.__streamer_channel = self.rum_api.channel_name
#We are not the ones streaming, or the API URL was not under our channel, and we can confirm this is a channel stream

#We are not the ones streaming,
#or the API URL was not under our channel,
#and we are sure this is a channel stream
else:
self.__streamer_channel = input("Enter the channel of the person streaming: ")

Expand Down Expand Up @@ -307,7 +341,9 @@ def _sender_loop(self):

def __send_message(self, text):
"""Send a message in chat"""
assert len(text) < MAX_MESSAGE_LEN, f"Message with prefix cannot be longer than {MAX_MESSAGE_LEN} characters"
assert len(text) < MAX_MESSAGE_LEN, \
f"Message with prefix cannot be longer than {MAX_MESSAGE_LEN} characters"

self.sent_messages.append(text)
self.last_message_send_time = time.time()
self.browser.find_element(By.ID, "chat-message-text-input").send_keys(text + Keys.RETURN)
Expand All @@ -327,12 +363,20 @@ def open_moderation_menu(self, message):
#Find the message by ID
elif isinstance(message, int):
message_id = message
message_li = self.browser.find_element(By.XPATH, f"//li[@class='chat-history--row js-chat-history-item'][@data-message-id='{message_id}']")
message_li = self.browser.find_element(
By.XPATH,
"//li[@class='chat-history--row js-chat-history-item']" +
f"[@data-message-id='{message_id}']"
)

#The message has a message ID attribute
elif hasattr(message, "message_id"):
message_id = message.message_id
message_li = self.browser.find_element(By.XPATH, f"//li[@class='chat-history--row js-chat-history-item'][@data-message-id='{message_id}']")
message_li = self.browser.find_element(
By.XPATH,
"//li[@class='chat-history--row js-chat-history-item']" +
f"[@data-message-id='{message_id}']"
)

#Not a valid message type
else:
Expand All @@ -341,7 +385,10 @@ def open_moderation_menu(self, message):
#Hover over the message
self.hover_element(message_li)
#Find the moderation menu
menu_bttn = self.browser.find_element(By.XPATH, f"//li[@class='chat-history--row js-chat-history-item'][@data-message-id='{message_id}']/button[@class='js-moderate-btn chat-history--kebab-button']")
menu_bttn = message_li.find_element(
By.XPATH,
".//button[@class='js-moderate-btn chat-history--kebab-button']"
)
#Click the moderation menu button
menu_bttn.click()

Expand All @@ -350,23 +397,40 @@ def open_moderation_menu(self, message):
def delete_message(self, message):
"""Delete a message in the chat"""
m_id = self.open_moderation_menu(message)
del_bttn = self.browser.find_element(By.XPATH, f"//button[@class='cmi js-btn-delete-current'][@data-message-id='{m_id}']")
del_bttn = self.browser.find_element(
By.XPATH,
f"//button[@class='cmi js-btn-delete-current'][@data-message-id='{m_id}']"
)

del_bttn.click()
time.sleep(BROWSER_ACTION_DELAY)

#Wait for the confirmation to appear
WebDriverWait(self.browser, BROWSER_WAIT_TIMEOUT).until(
EC.alert_is_present(),
"Timed out waiting for deletion confirmation dialouge to appear"
)

#Confirm the confirmation dialog
Alert(self.browser).accept()

def mute_by_message(self, message, mute_level = "5"):
"""Mute a user by message"""
self.open_moderation_menu(message)
timeout_bttn = self.browser.find_element(By.XPATH, f"//button[@class='{MUTE_LEVELS[mute_level]}']")
timeout_bttn = self.browser.find_element(
By.XPATH,
f"//button[@class='{MUTE_LEVELS[mute_level]}']"
)

timeout_bttn.click()

def mute_by_appearname(self, name, mute_level = "5"):
"""Mute a user by the name they are appearing with"""
#Find any chat message by this user
message_li = self.browser.find_element(By.XPATH, f"//li[@class='chat-history--row js-chat-history-item'][@data-username='{name}']")
message_li = self.browser.find_element(
By.XPATH,
f"//li[@class='chat-history--row js-chat-history-item'][@data-username='{name}']"
)

self.mute_by_message(message = message_li, mute_level = mute_level)

def pin_message(self, message):
Expand All @@ -378,7 +442,11 @@ def pin_message(self, message):
def unpin_message(self):
"""Unpin the currently pinned message"""
try:
unpin_bttn = self.browser.find_element(By.XPATH, "//button[@data-js='remove_pinned_message_button']")
unpin_bttn = self.browser.find_element(
By.XPATH,
"//button[@data-js='remove_pinned_message_button']"
)

except selenium.common.exceptions.NoSuchElementException:
return False #No message was pinned

Expand Down Expand Up @@ -412,7 +480,8 @@ def register_command(self, command, name = None):
"""Register a command"""
#Is a ChatCommand instance
if isinstance(command, ChatCommand):
assert not name or name == command.name, "ChatCommand instance has different name than one passed"
assert not name or name == command.name, \
"ChatCommand instance has different name than one passed"
self.chat_commands[command.name] = command

#Is a callable
Expand All @@ -435,7 +504,7 @@ def __process_message(self, message):
if message.text in self.sent_messages:
return

#If the message is from the same account as us, reset our message cooldown from the message send time if it is newer
#If the message is from the same account as us, consider it in message send cooldown
if message.user.username == self.username:
self.last_message_send_time = max((self.last_message_send_time, message.time))

Expand All @@ -444,7 +513,8 @@ def __process_message(self, message):
return

for action in self.message_actions:
if message.message_id in self.ssechat.deleted_message_ids or not action(message, self): #The message got deleted, possibly by this action
#The message got deleted, possibly by this action
if message.message_id in self.ssechat.deleted_message_ids or not action(message, self):
return

self.__run_if_command(message)
Expand Down

0 comments on commit 08dd046

Please sign in to comment.