This repository has been archived by the owner on Sep 10, 2023. It is now read-only.

Merge pull request #41 from ConorIA/code_cleanup
Code cleanup (replaces #8, #34, #39, #40, fixes #35, #38)

Mega PR from @ConorIA. Glad to merge. Thanks again!
tommeagher authored Nov 11, 2017
2 parents e081b1c + 3af89c6 commit 2decedc
Showing 5 changed files with 155 additions and 93 deletions.
20 changes: 18 additions & 2 deletions README.md
@@ -25,7 +25,7 @@ This project should work in the latest releases of Python 2.7 and Python 3. By d

## Configuring

There are several parameters that control the behavior of the bot. You can adjust them by setting them in your `local_settings.py` file.

```
ODDS = 8
@@ -42,14 +42,30 @@ ORDER = 2

The `ORDER` variable represents the Markov order (index), a measure of associativity in the generated Markov chains. An order of 2 is generally more incoherent and 3 or 4 is more lucid. I tend to stick with 2.
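
As a rough illustration of what the order controls (a toy sketch, not the bot's actual implementation, which lives in `markov.py`), an order-2 chain maps each pair of consecutive words to the words seen after that pair, then walks those pairs to build a new sentence:

```
import random

text = "the cat sat on the mat. the cat ran away."
words = text.split()

# Order 2: each key is a pair of consecutive words, each value is every word seen after that pair.
chain = {}
for a, b, c in zip(words, words[1:], words[2:]):
    chain.setdefault((a, b), []).append(c)

# Walk the chain from a random starting pair until we fall off the end (or hit a length cap).
pair = random.choice(list(chain.keys()))
out = list(pair)
while pair in chain and len(out) < 30:
    nxt = random.choice(chain[pair])
    out.append(nxt)
    pair = (pair[1], nxt)
print(" ".join(out))
```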

### Additional sources

This bot was originally designed to pull tweets from a Twitter account; however, it can also process comma-separated text from a local file or scrape content from the web.

#### Static Text
To use a local text file, set `STATIC_TEST = True` and specify the name of a text file containing comma-separated "tweets" as `TEST_SOURCE`.
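
For example, to generate from the included test corpus (any comma-separated text file works; each line is split on commas into individual "tweets"), the relevant lines of `local_settings.py` would be:

```
STATIC_TEST = True
TEST_SOURCE = "testcorpus.txt"
```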

#### Web Content
To scrape content from the web, set `SCRAPE_URL` to `True`. This bot makes use of the [`find_all()` method](https://www.crummy.com/software/BeautifulSoup/bs4/doc/#find-all) of the BeautifulSoup library, which requires three inputs to be defined in `local_settings.py` (see the example after the list below).

1. A list of URLs to scrape as `SRC_URL`.
2. A list, `WEB_CONTEXT`, of the [names](https://www.crummy.com/software/BeautifulSoup/bs4/doc/#id11) of the elements to extract from the corresponding URL. This can be "div", "h1" for level-one headings, "a" for links, etc. If you wish to search for more than one name for a single page, repeat the URL in the `SRC_URL` list for as many names as you wish to extract.
3. A list, `WEB_ATTRIBUTES`, of dictionaries containing [attributes](https://www.crummy.com/software/BeautifulSoup/bs4/doc/#attrs) to filter by. For instance, to limit the search to divs of class "title", one would pass the dictionary `{"class": "title"}`. Use an empty dictionary, `{}`, for any page and name for which you don't wish to specify attributes.
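
For instance, to grab the spans of class `example-text` from the first page and the level-two headings from the second, the settings might look like this (the URLs are placeholders; the values mirror `local_settings_example.py`):

```
SCRAPE_URL = True
SRC_URL = ['http://www.example.com/one', 'https://www.example.com/two']
WEB_CONTEXT = ['span', 'h2']
WEB_ATTRIBUTES = [{'class': 'example-text'}, {}]
```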

__Note:__ Web scraping is experimental and may give you unexpected results. Make sure to test the bot in debugging mode before publishing.

## Debugging

If you want to test the script or debug the tweet generation, you can skip the random-number check and keep the resulting tweets from being published to Twitter.

First, adjust the `DEBUG` variable in `local_settings.py`.

```
DEBUG = True
```

After that, commit the change and `git push heroku master`. Then run the command `heroku run worker` on the command line and watch what happens.
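
If you would rather test without pushing to Heroku, you should also be able to run the script directly once the project's Python dependencies are installed locally:

```
python ebooks.py
```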
177 changes: 108 additions & 69 deletions ebooks.py
@@ -3,21 +3,25 @@
import sys
import twitter
import markov
from bs4 import BeautifulSoup
try:
# Python 3
from html.entities import name2codepoint as n2c
from urllib.request import urlopen
except ImportError:
# Python 2
from htmlentitydefs import name2codepoint as n2c
from urllib2 import urlopen
chr = unichr
from local_settings import *


def connect():
api = twitter.Api(consumer_key=MY_CONSUMER_KEY,
consumer_secret=MY_CONSUMER_SECRET,
access_token_key=MY_ACCESS_TOKEN_KEY,
access_token_secret=MY_ACCESS_TOKEN_SECRET)
return api
return twitter.Api(consumer_key=MY_CONSUMER_KEY,
consumer_secret=MY_CONSUMER_SECRET,
access_token_key=MY_ACCESS_TOKEN_KEY,
access_token_secret=MY_ACCESS_TOKEN_SECRET)


def entity(text):
if text[:2] == "&#":
@@ -34,119 +34,154 @@ def entity(text):
try:
text = chr(numero)
except KeyError:
pass
return text


def filter_tweet(tweet):
tweet.text = re.sub(r'\b(RT|MT) .+','',tweet.text) #take out anything after RT or MT
tweet.text = re.sub(r'(\#|@|(h\/t)|(http))\S+','',tweet.text) #Take out URLs, hashtags, hts, etc.
tweet.text = re.sub(r'\n','', tweet.text) #take out new lines.
tweet.text = re.sub(r'\"|\(|\)', '', tweet.text) #take out quotes.
tweet.text = re.sub(r'\s+\(?(via|says)\s@\w+\)?', '', tweet.text) # remove attribution
tweet.text = re.sub(r'\b(RT|MT) .+', '', tweet.text) # take out anything after RT or MT
tweet.text = re.sub(r'(\#|@|(h\/t)|(http))\S+', '', tweet.text) # Take out URLs, hashtags, hts, etc.
tweet.text = re.sub('\s+', ' ', tweet.text) # collapse consecutive whitespace into single spaces.
tweet.text = re.sub(r'\"|\(|\)', '', tweet.text) # take out quotes.
tweet.text = re.sub(r'\s+\(?(via|says)\s@\w+\)?', '', tweet.text) # remove attribution
htmlsents = re.findall(r'&\w+;', tweet.text)
if len(htmlsents) > 0 :
for item in htmlsents:
tweet.text = re.sub(item, entity(item), tweet.text)
tweet.text = re.sub(r'\xe9', 'e', tweet.text) #take out accented e
for item in htmlsents:
tweet.text = tweet.text.replace(item, entity(item))
tweet.text = re.sub(r'\xe9', 'e', tweet.text) # take out accented e
return tweet.text





def scrape_page(src_url, web_context, web_attributes):
tweets = []
last_url = ""
for i in range(len(src_url)):
if src_url[i] != last_url:
last_url = src_url[i]
print(">>> Scraping {0}".format(src_url[i]))
try:
page = urlopen(src_url[i])
except Exception:
last_url = "ERROR"
import traceback
print(">>> Error scraping {0}:".format(src_url[i]))
print(traceback.format_exc())
continue
soup = BeautifulSoup(page, 'html.parser')
hits = soup.find_all(web_context[i], attrs=web_attributes[i])
if not hits:
print(">>> No results found!")
continue
else:
errors = 0
for hit in hits:
try:
tweet = str(hit.text).strip()
except (UnicodeEncodeError, UnicodeDecodeError):
errors += 1
continue
if tweet:
tweets.append(tweet)
if errors > 0:
print(">>> We had trouble reading {} result{}.".format(errors, "s" if errors > 1 else ""))
return(tweets)


def grab_tweets(api, max_id=None):
source_tweets=[]
source_tweets = []
user_tweets = api.GetUserTimeline(screen_name=user, count=200, max_id=max_id, include_rts=True, trim_user=True, exclude_replies=True)
max_id = user_tweets[len(user_tweets)-1].id-1
max_id = user_tweets[-1].id - 1
for tweet in user_tweets:
tweet.text = filter_tweet(tweet)
if re.search(SOURCE_EXCLUDE, tweet.text):
continue
if len(tweet.text) != 0:
if tweet.text:
source_tweets.append(tweet.text)
return source_tweets, max_id

if __name__=="__main__":

if __name__ == "__main__":
order = ORDER
if DEBUG==False:
guess = random.choice(range(ODDS))
else:
guess = 0
guess = 0
if ODDS and not DEBUG:
guess = random.randint(0, ODDS - 1)

if guess == 0:
if STATIC_TEST==True:
if guess:
print(str(guess) + " No, sorry, not this time.") # message if the random number fails.
sys.exit()
else:
api = connect()
source_tweets = []
if STATIC_TEST:
file = TEST_SOURCE
print(">>> Generating from {0}".format(file))
string_list = open(file).readlines()
for item in string_list:
source_tweets = item.split(",")
else:
source_tweets = []
source_tweets += item.split(",")
if SCRAPE_URL:
source_tweets += scrape_page(SRC_URL, WEB_CONTEXT, WEB_ATTRIBUTES)
if SOURCE_ACCOUNTS and len(SOURCE_ACCOUNTS[0]) > 0:
twitter_tweets = []
for handle in SOURCE_ACCOUNTS:
user=handle
api=connect()
user = handle
handle_stats = api.GetUser(screen_name=user)
status_count = handle_stats.statuses_count
max_id=None
if status_count<3200:
my_range = (status_count/200) + 1
else:
my_range = 17
for x in range(my_range)[1:]:
source_tweets_iter, max_id = grab_tweets(api,max_id)
source_tweets += source_tweets_iter
print("{0} tweets found in {1}".format(len(source_tweets), handle))
if len(source_tweets) == 0:
max_id = None
my_range = min(17, int((status_count/200) + 1))
for x in range(1, my_range):
twitter_tweets_iter, max_id = grab_tweets(api, max_id)
twitter_tweets += twitter_tweets_iter
print("{0} tweets found in {1}".format(len(twitter_tweets), handle))
if not twitter_tweets:
print("Error fetching tweets from Twitter. Aborting.")
sys.exit()
else:
source_tweets += twitter_tweets
mine = markov.MarkovChainer(order)
for tweet in source_tweets:
if re.search('([\.\!\?\"\']$)', tweet):
pass
else:
tweet+="."
if not re.search('([\.\!\?\"\']$)', tweet):
tweet += "."
mine.add_text(tweet)
for x in range(0,10):

for x in range(0, 10):
ebook_tweet = mine.generate_sentence()

#randomly drop the last word, as Horse_ebooks appears to do.
if random.randint(0,4) == 0 and re.search(r'(in|to|from|for|with|by|our|of|your|around|under|beyond)\s\w+$', ebook_tweet) != None:
print("Losing last word randomly")
ebook_tweet = re.sub(r'\s\w+.$','',ebook_tweet)
print(ebook_tweet)
#if a tweet is very short, this will randomly add a second sentence to it.
if ebook_tweet != None and len(ebook_tweet) < 40:
rando = random.randint(0,10)
if rando == 0 or rando == 7:
# randomly drop the last word, as Horse_ebooks appears to do.
if random.randint(0, 4) == 0 and re.search(r'(in|to|from|for|with|by|our|of|your|around|under|beyond)\s\w+$', ebook_tweet) is not None:
print("Losing last word randomly")
ebook_tweet = re.sub(r'\s\w+.$', '', ebook_tweet)
print(ebook_tweet)

# if a tweet is very short, this will randomly add a second sentence to it.
if ebook_tweet is not None and len(ebook_tweet) < 40:
rando = random.randint(0, 10)
if rando == 0 or rando == 7:
print("Short tweet. Adding another sentence randomly")
newer_tweet = mine.generate_sentence()
if newer_tweet != None:
if newer_tweet is not None:
ebook_tweet += " " + mine.generate_sentence()
else:
ebook_tweet = ebook_tweet
elif rando == 1:
#say something crazy/prophetic in all caps
# say something crazy/prophetic in all caps
print("ALL THE THINGS")
ebook_tweet = ebook_tweet.upper()

#throw out tweets that match anything from the source account.
if ebook_tweet != None and len(ebook_tweet) < 110:
# throw out tweets that match anything from the source account.
if ebook_tweet is not None and len(ebook_tweet) < 110:
for tweet in source_tweets:
if ebook_tweet[:-1] not in tweet:
continue
else:
print("TOO SIMILAR: " + ebook_tweet)
sys.exit()
if DEBUG == False:

if not DEBUG:
status = api.PostUpdate(ebook_tweet)
print(status.text.encode('utf-8'))
else:
print(ebook_tweet)

elif ebook_tweet == None:
elif not ebook_tweet:
print("Tweet is empty, sorry.")
else:
print("TOO LONG: " + ebook_tweet)
else:
print(str(guess) + " No, sorry, not this time.") #message if the random number fails.
22 changes: 13 additions & 9 deletions local_settings_example.py
@@ -2,17 +2,21 @@
Local Settings for a heroku_ebooks account. #fill in the name of the account you're tweeting from here.
'''

#configuration
# Configuration
MY_CONSUMER_KEY = 'Your Twitter API Consumer Key'
MY_CONSUMER_SECRET = 'Your Consumer Secret Key'
MY_ACCESS_TOKEN_KEY = 'Your Twitter API Access Token Key'
MY_ACCESS_TOKEN_SECRET = 'Your Access Token Secret'

SOURCE_ACCOUNTS = [""] #A list of comma-separated, quote-enclosed Twitter handles of account that you'll generate tweets based on. It should look like ["account1", "account2"]. If you want just one account, no comma needed.
ODDS = 8 #How often do you want this to run? 1/8 times?
ORDER = 2 #how closely do you want this to hew to sensical? 2 is low and 4 is high.
SOURCE_EXCLUDE = r'^$' #Source tweets that match this regexp will not be added to the Markov chain. You might want to filter out inappropriate words for example.
DEBUG = True #Set this to False to start Tweeting live
STATIC_TEST = False #Set this to True if you want to test Markov generation from a static file instead of the API.
TEST_SOURCE = ".txt" #The name of a text file of a string-ified list for testing. To avoid unnecessarily hitting Twitter API. You can use the included testcorpus.txt, if needed.
TWEET_ACCOUNT = "" #The name of the account you're tweeting to.
SOURCE_ACCOUNTS = [""] # A list of comma-separated, quote-enclosed Twitter handles of account that you'll generate tweets based on. It should look like ["account1", "account2"]. If you want just one account, no comma needed.
ODDS = 8 # How often do you want this to run? 1/8 times?
ORDER = 2 # How closely do you want this to hew to sensical? 2 is low and 4 is high.
SOURCE_EXCLUDE = r'^$' # Source tweets that match this regexp will not be added to the Markov chain. You might want to filter out inappropriate words for example.
DEBUG = True # Set this to False to start Tweeting live
STATIC_TEST = False # Set this to True if you want to test Markov generation from a static file instead of the API.
TEST_SOURCE = ".txt" # The name of a text file of a string-ified list for testing. To avoid unnecessarily hitting Twitter API. You can use the included testcorpus.txt, if needed.
SCRAPE_URL = False # Set this to true to scrape a webpage.
SRC_URL = ['http://www.example.com/one', 'https://www.example.com/two'] # A comma-separated list of URLs to scrape
WEB_CONTEXT = ['span', 'h2'] # A comma-separated list of the tag or object to search for in each page above.
WEB_ATTRIBUTES = [{'class': 'example-text'}, {}] # A list of dictionaries containing the attributes for each page.
TWEET_ACCOUNT = "" # The name of the account you're tweeting to.
26 changes: 14 additions & 12 deletions markov.py
@@ -1,23 +1,24 @@
import random
import re


class MarkovChainer(object):
def __init__(self, order):
self.order=order
self.order = order
self.beginnings = []
self.freq = {}

#pass a string with a terminator to the function to add it to the markov lists.
# pass a string with a terminator to the function to add it to the markov lists.
def add_sentence(self, string, terminator):
data = "".join(string)
words = data.split()
buf = []
if len(words) > self.order:
words.append(terminator)
self.beginnings.append(words[0:self.order])
else:
pass

for word in words:
buf.append(word)
if len(buf) == self.order + 1:
@@ -44,21 +45,21 @@ def add_text(self, text):
else:
sentence = piece

#Generate the goofy sentences that become your tweet.
# Generate the goofy sentences that become your tweet.
def generate_sentence(self):
res = random.choice(self.beginnings)
res = res[:]
if len(res)==self.order:
if len(res) == self.order:
nw = True
while nw != None:
while nw is not None:
restup = (res[-2], res[-1])
try:
nw = self.next_word_for(restup)
if nw != None:
if nw is not None:
res.append(nw)
else:
continue
except:
except Exception:
nw = False
new_res = res[0:-2]
if new_res[0].istitle() or new_res[0].isupper():
@@ -68,7 +69,7 @@ def generate_sentence(self):
sentence = ""
for word in new_res:
sentence += word + " "
sentence += res[-2] + res[-1]
sentence += res[-2] + ("" if res[-1] in ".!?;:" else " ") + res[-1]

else:
sentence = None
Expand All @@ -79,8 +80,9 @@ def next_word_for(self, words):
arr = self.freq[words]
next_words = random.choice(arr)
return next_words
except:
return None
except Exception:
return None


if __name__ == "__main__":
print("Try running ebooks.py first")