Skip to content

Commit

Permalink
API cleanup: objectified Constants, docstrings
Browse files Browse the repository at this point in the history
Instead of a somewhat clumsy class-in-a-class, have Constants as a wholly
independent class with a user-specific instance constructed by the API
object. This simplifies calls to getUser<>Page.
  • Loading branch information
Noiredd committed May 17, 2020
1 parent e1decca commit 497dadf
Show file tree
Hide file tree
Showing 3 changed files with 130 additions and 105 deletions.
7 changes: 4 additions & 3 deletions filmatyk/database.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,10 @@
from math import ceil

import containers
from filmweb import ConnectionError, FilmwebAPI

class Database(object):
def __init__(self, itemtype:str, api:object, callback):
def __init__(self, itemtype:str, api:FilmwebAPI, callback:callable):
self.itemtype = itemtype
self.callback = callback
self.items = []
Expand All @@ -27,7 +28,7 @@ def __iter__(self):

# Serialization-deserialization
@staticmethod
def restoreFromString(itemtype:str, string:str, api:object, callback):
def restoreFromString(itemtype:str, string:str, api:FilmwebAPI, callback:callable):
newDatabase = Database(itemtype, api, callback)
if not string:
# simply return a raw, empty DB
Expand All @@ -47,7 +48,7 @@ def softUpdate(self):
try:
# in case there are network problems
first_request = self.api.getNumOf(self.itemtype)
except self.api.ConnectionError:
except ConnectionError:
self.callback(-1, abort=True) #hide the progress bar
return None
if first_request is None:
Expand Down
217 changes: 122 additions & 95 deletions filmatyk/filmweb.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,60 +7,77 @@

import containers

class FilmwebAPI(object):
class Constants(object):
login_path = 'https://ssl.filmweb.pl/j_login'
base_path = 'https://www.filmweb.pl'
auth_error = 'błędny e-mail lub hasło' #TODO: be a bit more intelligent here
main_class = 'userVotesPage__results'
item_class = 'userVotesPage__result'
rating_source = 'userVotes'
rating_stype = 'application/json'
m_count_span = 'blockHeader__titleInfoCount'
s_count_span = 'blockHeader__titleInfoCount'
g_count_span = 'blockHeader__titleInfoCount'
@classmethod
def getUserPage(self, username):
return self.base_path + '/user/' + username
@classmethod
def getUserMoviePage(self, username, page=1):
userpage = self.getUserPage(username)
return userpage + '/films?page={}'.format(page)
@classmethod
def getUserSeriesPage(self, username, page=1):
userpage = self.getUserPage(username)
return userpage + '/serials?page={}'.format(page)
@classmethod
def getUserGamePage(self, username, page=1):
userpage = self.getUserPage(username)
return userpage + '/games?page={}'.format(page)

ConnectionError = requests_html.requests.ConnectionError

ConnectionError = requests_html.requests.ConnectionError

class Constants():
"""URLs and HTML component names for data acquisition.
Create an instance for a given user to generate their specific URLs.
"""
login_path = 'https://ssl.filmweb.pl/j_login'
base_path = 'https://www.filmweb.pl'
main_class = 'userVotesPage__results'
item_class = 'userVotesPage__result'
rating_source = 'userVotes'
rating_stype = 'application/json'
movie_count_span = 'blockHeader__titleInfoCount'
series_count_span = 'blockHeader__titleInfoCount'
game_count_span = 'blockHeader__titleInfoCount'

def __init__(self, username):
self.username = username
self.userpage = self.getUserPage()

def getUserPage(self):
return self.base_path + '/user/' + self.username

def getUserMoviePage(self, page=1):
return self.userpage + '/films?page={}'.format(page)

def getUserSeriesPage(self, page=1):
return self.userpage + '/serials?page={}'.format(page)

def getUserGamePage(self, page=1):
return self.userpage + '/games?page={}'.format(page)


class FilmwebAPI():
"""HTML-based API for acquiring data from Filmweb."""
@staticmethod
def login(username, password):
"""Attempt to acquire an authenticated user session."""
session = requests_html.HTMLSession()
auth_package = {
'j_username': username,
'j_password': password,
}
# Catch connection errors
try:
log = session.post(
FilmwebAPI.Constants.login_path,
data={'j_username': username, 'j_password': password}
)
except FilmwebAPI.ConnectionError:
return (False, True) # login error, connection error
if FilmwebAPI.Constants.auth_error in log.text:
log = session.post(Constants.login_path, data=auth_package)
except ConnectionError:
return (False, True)
# Catch authentication errors
if len(session.cookies) == 0:
print('BŁĄD LOGOWANIA')
return (False, False)
else:
return (True, session)

#@staticmethod -- but not actually a static method, see:
# https://stackoverflow.com/q/21382801/6919631
# https://stackoverflow.com/q/11058686/6919631
#in short: THIS METHOD SHOULD NEVER BE CALLED DIRECTLY
def enforceSession(fun):
#this decorator makes the function being called cause the caller
#(assumed to be the first object in the argument list) to call the
#session check method (which in turn can request a new session)
"""Decorator to mark API functions that require a live session.
It will perform a session check before calling the actual function.
Because it assumes that the first argument of the wrapped function is
a bound FilmwebAPI instance ("self"), it shall only be used with FilmwebAPI
methods.
Because it is meant to be used as a class-level function decorator, it has
no real "self" argument. It is effectively something like a static method.
See the following links for more info:
https://stackoverflow.com/q/21382801/6919631
https://stackoverflow.com/q/11058686/6919631
The bottom line is that it should NEVER be called directly.
"""
def wrapper(*args, **kwargs):
self = args[0]
if self.checkSession():
Expand All @@ -69,43 +86,48 @@ def wrapper(*args, **kwargs):
return None
return wrapper

def __init__(self, callback, username:str=''):
def __init__(self, login_handler, username:str=''):
self.username = username
self.requestLogin = callback
self.constants = Constants(username)
self.login_handler = login_handler
self.session = None
self.parsingRules = {}
self.__cacheAllParsingRules()
for container in containers.classByString.keys():
self.__cacheParsingRules(container)
# bind specific methods and constants to their item types
self.urlGenerationMethods = {
'Movie': self.Constants.getUserMoviePage,
'Series': self.Constants.getUserSeriesPage,
'Game': self.Constants.getUserGamePage
'Movie': self.constants.getUserMoviePage,
'Series': self.constants.getUserSeriesPage,
'Game': self.constants.getUserGamePage
}
self.countSpanClasses = {
'Movie': self.Constants.m_count_span,
'Series': self.Constants.s_count_span,
'Game': self.Constants.g_count_span
'Movie': self.constants.movie_count_span,
'Series': self.constants.series_count_span,
'Game': self.constants.game_count_span
}

def __cacheAllParsingRules(self):
for container in containers.classByString.keys():
self.__cacheParsingRules(container)

def __cacheParsingRules(self, itemtype:str):
#get all the blueprints of a given class
"""Converts parsing rules for a given type into a neater representation.
The rules for each Blueprint are expressed in a human-readable and human-
writable form that makes them easy to modify if need be, but not very
convenient for the parser. This method groups rules in a parser-friendly
representation that makes its job easier.
"""
# Get all the blueprints of a given class
rawRules = {}
for key, val in containers.classByString[itemtype].blueprints.items():
rawRules[key] = val.getParsing()
#convert them to a parsing tree
# Convert them to a parsing tree
pTree = {}
classes = set(rule['tag'] for rule in rawRules.values() if rule is not None)
for c in classes:
pTree[c] = {}
for key, rule in rawRules.items():
#ignore any non-parsable fields
# Ignore any non-parsable fields
if rule is None:
continue
#process only the rules of class c
# Process only the rules of class c
if rule['tag'] != c:
continue
pClass = rule['class']
Expand All @@ -116,44 +138,49 @@ def __cacheParsingRules(self, itemtype:str):
'attr': rule['attr'] if 'attr' in rule.keys() else None,
'type': rule['type'] if 'type' in rule.keys() else None
}
#store the result
# Bind the result to a type name
self.parsingRules[itemtype] = pTree

def checkSession(self):
#check if there is a session and acquire one if not
"""Check if there exists a live session and acquire a new one if not.
#TODO: now with improved session handling we need something smarter
(cause we'll nearly always have a session, except it might sometimes get stale
resulting in an acquisition failure)
"""
if not self.session:
self.requestSession()
#check again - in case the user cancelled a login
# Check again - in case the user cancelled a login
if not self.session:
return False
#at this point everything is definitely safe
# At this point everything is definitely safe
return True

def requestSession(self):
#call the GUI callback for login handling
#it will halt execution until the user logs in or cancels
session, username = self.requestLogin(self.username)
#if the session wasn't acquired, don't care about username
#but for good session, check if it agrees with the existing one (or set it)
"""Call the GUI to handle a login and bind a session object to self."""
# This pauses execution until the user logs in or cancels
session, username = self.login_handler(self.username)
if session:
# Set the username in case it's a first run (it will be empty)
if not self.username:
self.username = username
else:
# If it's not the first log in, make sure the user has logged as the
# same user. If the GUI is to be trusted, it shouldn't be possible, but
# we can still check in case of an accident during external usage.
# Returned value isn't important. *NOT* setting self.session is.
if username != self.username:
#this should never happen, if GUI is to be trusted
#returning is not as important as *not* setting self.session
return None
self.session = session

@enforceSession
def getNumOf(self, itemtype:str):
#return a tuple: (number of rated movies, number of movies per page)
try:
getURL = self.urlGenerationMethods[itemtype]
spanClass = self.countSpanClasses[itemtype]
except KeyError:
return 0, 0 # should never happen though
url = getURL(self.username)
"""Return the number of items of a given type that the user has rated.
Returns a tuple: (number of rated items, number of items per page).
"""
getURL = self.urlGenerationMethods[itemtype]
spanClass = self.countSpanClasses[itemtype]
url = getURL()
page = self.fetchPage(url)
# TODO: in principle, this page could be cached for some small time
#the number of user's movies is inside a span of a specific class
Expand All @@ -169,21 +196,21 @@ def getNumOf(self, itemtype:str):
for div in page.body.find_all('div'):
if not div.has_attr('data-id') or not div.has_attr('class'):
continue
if not self.Constants.item_class in div.attrs['class']:
if not self.constants.item_class in div.attrs['class']:
continue
per_page += 1
return items, per_page

@enforceSession
def getItemsPage(self, itemtype:str, page:int=1):
# Get the URL of a page containing items of requested type, fetch and parse
# it and return data (as list of objects). Instead of several ifs, retrieve
# the proper methods from a dict prepared during init.
try:
getURL = self.urlGenerationMethods[itemtype]
except KeyError:
return [] # should never happen though
url = getURL(self.username, page)
"""Acquire items of a given type from a given page number.
The user's ratings are displayed on pages. This fetches a page by number,
parses it and returns a list of Item-based objects. URL is delivered by a
cached dict, binding URL-generating functions to respective item types.
"""
getURL = self.urlGenerationMethods[itemtype]
url = getURL(page)
page = self.fetchPage(url)
data = self.parsePage(page, itemtype)
return data
Expand Down Expand Up @@ -216,11 +243,11 @@ def parsePage(self, page, itemtype:str):

def extractDataSource(self, page):
"""Extract the div that holds all the data."""
return page.find('div', attrs={'class': self.Constants.main_class})
return page.find('div', attrs={'class': self.constants.main_class})

def extractItems(self, div):
"""From the main div, extract all divs holding item details."""
sub_divs = div.find_all('div', attrs={'class': self.Constants.item_class})
sub_divs = div.find_all('div', attrs={'class': self.constants.item_class})
sub_divs = [div for div in sub_divs if div.has_attr('data-id')]
return sub_divs

Expand All @@ -229,8 +256,8 @@ def extractRatings(self, div):
They're held in a specific span as <script> contents.
"""
span = div.find('span', attrs={'data-source': self.Constants.rating_source})
scripts = span.find_all('script', attrs={'type': self.Constants.rating_stype})
span = div.find('span', attrs={'data-source': self.constants.rating_source})
scripts = span.find_all('script', attrs={'type': self.constants.rating_stype})
ratings = [script.getText() for script in scripts]
return ratings

Expand Down Expand Up @@ -278,19 +305,19 @@ def parseRating(self, text):
Item's addRating method.
"""
origDict = json.loads(text)
#ensure all date keys are present
# Ensure all date keys are present
try:
date_ = origDict['d']
except KeyError:
# once I've seen a bugged span in which this key was not present
# I've seen a bugged span once, which didn't have this key
date_ = {'y':2000, 'm':1, 'd':1}
if 'm' not in date_.keys():
date_['m'] = 1
if 'd' not in date_.keys():
date_['d'] = 1
# unescape HTML-coded characters from the comment
# Unescape HTML-coded characters from the comment
comment = html.unescape(origDict['c'] if 'c' in origDict.keys() else '')
# translate that dict to more readable standard
# Translate that dict to a more readable standard
iid = origDict['eId']
isFaved = origDict['f'] if 'f' in origDict.keys() else 0
ratingDict = {
Expand Down
Loading

0 comments on commit 497dadf

Please sign in to comment.