-
Notifications
You must be signed in to change notification settings - Fork 1
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Updates to fix issues caused by TikTok updates (#4)
* Isolate captcha-related functions * Refactor to fix issues caused by TikTok changes * Add test cases for future CI * Updated PyPI version
- Loading branch information
Showing
5 changed files
with
371 additions
and
372 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,93 @@ | ||
import unittest | ||
from datetime import datetime | ||
|
||
from tiktokdl.post_data import TikTokSlide, TikTokVideo | ||
import asyncio | ||
from tiktokdl.download_post import get_post | ||
|
||
|
||
class Test_TestTikTokURL(unittest.TestCase):
    """Check that ``get_post`` resolves shortened TikTok links to the right post.

    Every test calls ``get_post(url=..., download=False)`` and compares the
    result with a hand-written expectation.  Only ``post_id``, ``timestamp``
    and ``author_username`` are asserted (see ``assert_data``); the remaining
    fields of the expected objects are filled in only to construct them.

    NOTE(review): ``get_post`` presumably talks to the live TikTok site, so
    these tests likely need network access -- confirm before wiring into CI.
    """

    # Constant fixtures live on the class rather than in an overridden
    # ``__init__``: unittest instantiates the case once per test method, and
    # plain class attributes avoid depending on the TestCase constructor
    # signature.  ``self.url_N`` keeps working unchanged.
    url_1 = "https://vm.tiktok.com/ZGeYy3Ekf/"
    url_2 = "https://vm.tiktok.com/ZGeY1YeXk/"
    url_3 = "https://vm.tiktok.com/ZGeYA9CJ4/"  # slideshow

    def _fetch_post(self, url):
        """Run ``get_post`` for *url* without downloading and return its result."""
        return asyncio.run(get_post(url=url, download=False))

    def assert_data(self, expected_data, actual_data):
        """Assert that *actual_data* exists and matches the identifying fields.

        Only the post id, the timestamp and the author username are compared.
        """
        self.assertIsNotNone(actual_data)

        self.assertEqual(expected_data.post_id, actual_data.post_id)
        self.assertEqual(expected_data.timestamp, actual_data.timestamp)
        self.assertEqual(expected_data.author_username,
                         actual_data.author_username)

    def test_url_1(self):
        """A regular video post resolves to the expected id/author/timestamp."""
        expected_data = TikTokVideo(
            url="https://tiktok.com/@jimdoga/video/7302355630109773057",
            post_id="7302355630109773057",
            author_username="jimdoga",
            author_display_name="Jimdoga",
            author_avatar="some-url",
            author_url="https://tiktok.com/@jimdoga",
            post_download_setting=0,
            post_description="never joining an smp late again... #minecraft "
            "#minecraftmoment #fyp #mcyt #herobrine #foryou "
            "#foryoupage",
            timestamp=datetime(2023, 11, 17, 10, 9, 26),
            like_count=205900,
            share_count=1572,
            comment_count=778,
            view_count=1900000,
            video_thumbnail="some-url",
            file_path=None)

        actual_data = self._fetch_post(self.url_1)

        self.assert_data(expected_data, actual_data)

    def test_url_2(self):
        """A second video post resolves to the expected id/author/timestamp."""
        expected_data = TikTokVideo(
            url="https://tiktok.com/@dafuqboom_94/video/7307437770128067871",
            post_id="7307437770128067871",
            author_username="dafuqboom_94",
            author_display_name="Dafuq!?Boom!",
            author_avatar="some-url",
            author_url="https://tiktok.com/@dafuqboom_94",
            post_download_setting=0,
            post_description="The End 🤣🤣 #funny #funnyvideos #animals #dog "
            "#cat #pet #viarl #foryou #fyp (540)",
            timestamp=datetime(2023, 12, 1, 2, 50, 51),
            like_count=211500,
            share_count=8454,
            comment_count=2016,
            view_count=3500000,
            video_thumbnail="some-url",
            file_path=None)

        actual_data = self._fetch_post(self.url_2)

        self.assert_data(expected_data, actual_data)

    def test_url_3(self):
        """A slideshow post resolves to the expected id/author/timestamp."""
        expected_data = TikTokSlide(
            url="https://tiktok.com/@the.finals.content/video/7316903402024324384",
            post_id="7316903402024324384",
            author_username="the.finals.content",
            author_display_name="TheFinals Content",
            author_avatar="some-url",
            author_url="https://tiktok.com/@the.finals.content",
            post_download_setting=0,
            post_description="... #thefinals #thefinalsgame "
            "#thefinalsgameplay ",
            timestamp=datetime(2023, 12, 26, 15, 2, 17),
            like_count=411500,
            share_count=15000,
            comment_count=6382,
            view_count=4400000,
            images=[])

        actual_data = self._fetch_post(self.url_3)

        self.assert_data(expected_data, actual_data)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,126 @@ | ||
import random | ||
import time | ||
from urllib.parse import parse_qs, urlparse | ||
|
||
from playwright.async_api import Error as PlaywrightError | ||
from playwright.async_api import Page, Request | ||
from playwright.async_api import TimeoutError as PlaywrightTimeoutError | ||
|
||
from tiktokdl.image_processing import find_position, image_from_url | ||
|
||
|
||
def __parse_captcha_params_from_url(url: str) -> dict:
    """Return the query parameters of *url* as a flat ``{name: value}`` dict.

    The URL is parsed with fragment handling disabled, so a ``#`` is kept as
    part of the query string instead of being split off.  When a parameter
    occurs more than once, only its first value is kept.  Parameters with a
    blank value are omitted (``parse_qs`` default behaviour).
    """
    query = urlparse(url, allow_fragments=False).query
    # parse_qs maps every name to a list of values; keep the first of each.
    return {name: values[0] for name, values in parse_qs(query).items()}
|
||
|
||
def __get_captcha_response_params(url: str) -> dict:
    """Build the query parameters for the captcha verification request.

    Starts from the query parameters parsed out of *url* and adds a ``tmp``
    entry made of the current ``time.time()`` value immediately followed by a
    random three-digit integer.
    """
    query_params = __parse_captcha_params_from_url(url)
    now = time.time()
    suffix = random.randint(111, 999)
    query_params["tmp"] = f"{now}{suffix}"
    return query_params
|
||
|
||
async def __get_captcha_response_headers(request: Request) -> dict:
    """Return the headers of *request* with a JSON ``content-type`` forced in.

    All headers reported by ``request.all_headers()`` are kept; the
    ``content-type`` entry is set (or overwritten) to
    ``application/json;charset=UTF-8``.
    """
    headers = await request.all_headers()
    headers.update({"content-type": "application/json;charset=UTF-8"})
    return headers
|
||
|
||
def __generate_random_captcha_steps(piece_position: tuple[int, int],
                                    tip_y_value: int):
    """Generate a randomised list of drag steps ending at the piece's x position.

    Args:
        piece_position: Position of the puzzle piece; only its first element
            (the x coordinate) is used.
        tip_y_value: Value stored as ``"y"`` in every generated step.

    Returns:
        A non-empty list of dicts with the keys ``"relative_time"``, ``"x"``
        and ``"y"``.  ``"x"`` grows by 1-4 per step and ``"relative_time"`` by
        6-9 per step (starting from a random 100-300 offset).  The last step
        always has ``"x"`` equal to the target x position.
    """
    x_position = piece_position[0]

    steps = []
    current_distance = 0
    relative_time = random.randint(100, 300)
    while current_distance < x_position:
        current_distance += random.randint(1, 4)
        relative_time += random.randint(6, 9)
        steps.append({
            "relative_time": relative_time,
            "x": current_distance,
            "y": tip_y_value,
        })

    # The loop may overshoot the target, and it produces no steps at all when
    # the target is <= 0.  In both cases finish with one step that lands
    # exactly on the target; checking ``not steps`` first avoids the
    # IndexError that ``steps[-1]`` would raise on an empty list.
    if not steps or steps[-1]["x"] != x_position:
        steps.append({
            "relative_time": relative_time + random.randint(6, 9),
            "x": x_position,
            "y": tip_y_value,
        })

    return steps
|
||
|
||
def __calculate_captcha_solution(captcha_get_data: dict) -> dict:
    """Build the request body that answers a slide captcha.

    Reads ``data.question`` from *captcha_get_data*, loads the two images
    referenced by ``url1`` and ``url2``, locates the piece within the
    background via ``find_position`` and turns that position (together with
    ``tip_y``) into a list of drag steps.

    Returns:
        A dict with the keys ``"modified_img_width"``, ``"id"``, ``"mode"``
        and ``"reply"``.
    """
    payload = captcha_get_data.get("data")
    question = payload.get("question")

    background_url = question.get("url1")
    puzzle_piece_url = question.get("url2")
    tip_y = question.get("tip_y")

    background = image_from_url(background_url)
    puzzle_piece = image_from_url(puzzle_piece_url)
    piece_position = find_position(background, puzzle_piece)

    captcha_id = captcha_get_data.get("data").get("id")
    drag_steps = __generate_random_captcha_steps(piece_position, tip_y)

    return {
        "modified_img_width": 552,
        "id": captcha_id,
        "mode": "slide",
        "reply": drag_steps,
    }
|
||
|
||
async def handle_captcha(playwright_page: Page,
                         attempts: int = 3,
                         timeout: float | None = 5000) -> bool:
    """Try to solve the slide captcha shown on *playwright_page*.

    For each attempt the function waits for a request whose URL contains
    ``/captcha/get?``, computes a solution from its JSON response and posts
    it to ``/captcha/verify`` on the same host, re-using the query parameters
    and headers of the captured request.

    Args:
        playwright_page: Page on which the captcha may appear.
        attempts: Maximum number of solve attempts.
        timeout: Timeout passed to ``expect_request`` while waiting for the
            captcha request.

    Returns:
        ``True`` when the verify endpoint answers with the message
        ``"Verification complete"``, or when a Playwright timeout occurs
        inside an attempt (most notably when no captcha request is seen).
        ``False`` when the captcha response is missing, the verify call does
        not return HTTP 200, another Playwright error occurs, or all
        attempts are used up without success.
    """
    captcha_success_status = False
    attempt_count = 0

    while not captcha_success_status and attempt_count < attempts:
        try:
            async with playwright_page.expect_request(
                    lambda x: "/captcha/get?" in x.url,
                    timeout=timeout) as request:
                await playwright_page.wait_for_load_state("networkidle")
            request_value = await request.value
            response = await request_value.response()
            # Request.response() yields None when no response was received;
            # report that as a failure instead of letting an AttributeError
            # escape from the ``.json()`` call below.
            if response is None:
                return False
            response_data = await response.json()

            captcha_solution = __calculate_captcha_solution(response_data)
            post_url_query_params = __get_captcha_response_params(
                request_value.url)
            post_headers = await __get_captcha_response_headers(
                request_value)
            base_url = urlparse(request_value.url).netloc
            api_request_context = playwright_page.request

            # Short pause before submitting the solution.
            await playwright_page.wait_for_timeout(1000)
            captcha_status = await api_request_context.post(
                f"https://{base_url}/captcha/verify",
                data=captcha_solution,
                headers=post_headers,
                params=post_url_query_params)

            if captcha_status.status != 200:
                return False

            captcha_status_data = await captcha_status.json()
            captcha_success_status = captcha_status_data.get(
                "message") == "Verification complete"
            attempt_count += 1
            await playwright_page.locator("#verify-bar-close").click()

        except PlaywrightTimeoutError:
            # A timeout is treated as "nothing to solve".
            return True
        except PlaywrightError:
            return False

    return captcha_success_status
Oops, something went wrong.