Skip to content

Commit

Permalink
[fix] #35 (SSL error), video download error
Browse files Browse the repository at this point in the history
  • Loading branch information
Seng Feng committed Aug 21, 2023
1 parent b7adf3d commit 6c659c5
Show file tree
Hide file tree
Showing 5 changed files with 126 additions and 90 deletions.
Binary file modified requirements.txt
Binary file not shown.
2 changes: 2 additions & 0 deletions requirements_dev.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
black==23.7.0
pylint==2.17.5
2 changes: 1 addition & 1 deletion src/ArtStationDownloader.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
Copyright 2018 Sean Feng([email protected])
"""

__version__ = "0.3.2"
__version__ = "0.3.3"
# $Source$

import argparse
Expand Down
118 changes: 29 additions & 89 deletions src/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,15 @@
Copyright 2018-2019 Sean Feng([email protected])
"""

from enum import Enum
import os
from concurrent import futures
from enum import Enum
from multiprocessing import cpu_count
from urllib.parse import urlparse
import http.client as http_client

from bs4 import BeautifulSoup, element
import pafy
import requests
import json
from config import Config
from pytube import YouTube

from http_client import HttpClient


class DownloadSorting(Enum):
Expand All @@ -28,12 +26,13 @@ def __str__(self) -> str:

class Core:
def log(self, message):
print(message)
if self._log_print:
self._log_print(message)
else:
print(message)

def __init__(self, log_print=None):
if log_print:
global print
print = log_print
self._log_print = log_print
max_workers = cpu_count() * 4
self.executor = futures.ThreadPoolExecutor(max_workers)
self.executor_video = futures.ThreadPoolExecutor(1)
Expand All @@ -42,96 +41,36 @@ def __init__(self, log_print=None):
self.root_path: str = None
self.download_sorting: DownloadSorting = None
self.futures = []
self._session = requests.session()
self.proxy_setup()

def http_client_get(self, url):
try:
headers = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:109.0) Gecko/20100101 Firefox/113.0",
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,*/*;q=0.8",
"Accept-Language": "en-US,en;q=0.5",
}
parsed_url = urlparse(url)
conn = http_client.HTTPSConnection(parsed_url.netloc)
conn.request(
"GET", parsed_url.path + "?" + parsed_url.query, headers=headers
)

resp = conn.getresponse()

except Exception as e:
print(f"Connect error [{e}]")

return resp

def http_client_get_json(self, url):
resp = self.http_client_get(url)
try:
resp_str = resp.read().decode()
json_result = json.loads(resp_str)
except json.decoder.JSONDecodeError:
print(f"json decode error\nurl:{url}\n{resp_str}")
return json_result

def http_get(self, url):
try:
resp = self._session.get(url, timeout=10)
except requests.exceptions.InvalidURL:
print(f'"{url}" is not valid url')
return
return resp

def proxy_setup(self):
session = self._session
# 设置 User Agent
session.headers.update(
{
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/101.0.4951.54 Safari/537.36"
}
)
# 设置代理
config = Config("config.ini")
http = config.get("Proxy", "http")
https = config.get("Proxy", "https")
if http or https:
proxys = {}
if http:
proxys["http"] = http
os.environ["HTTP_PROXY"] = http
if https:
proxys["https"] = https
os.environ["HTTPS_PROXY"] = https
session.proxies.update(proxys)
self.http_client = HttpClient(log_print=log_print)

def download_file(self, url, file_path, file_name):
    """Download one image asset into file_path/file_name, skipping existing files.

    Args:
        url: Asset image URL; the "/large/" variant is upgraded to "/4k/".
        file_path: Destination directory (created if missing).
        file_name: File name to save under.
    """
    # ArtStation serves a higher-resolution variant under /4k/.
    url = url.replace("/large/", "/4k/")
    file_full_path = os.path.join(file_path, file_name)
    if os.path.exists(file_full_path):
        self.log("[Exist][image][{}]".format(file_full_path))
        return
    resp = self.http_client.http_get(url)
    if resp is None:
        # http_get already logged the invalid-URL error; writing would
        # otherwise raise AttributeError on resp.content.
        return
    os.makedirs(file_path, exist_ok=True)
    with open(file_full_path, "wb") as code:
        code.write(resp.content)
    self.log("[Finish][image][{}]".format(file_full_path))

def download_video(self, youtube_id, file_path):
    """Download the YouTube video *youtube_id* as <youtube_id>.mp4 into *file_path*.

    Args:
        youtube_id: 11-character YouTube video id.
        file_path: Destination directory.
    """
    file_full_path = os.path.join(file_path, "{}.{}".format(youtube_id, "mp4"))
    if os.path.exists(file_full_path):
        self.log("[Exist][video][{}]".format(file_full_path))
        return
    try:
        yt = YouTube(f'https://www.youtube.com/watch?v={youtube_id}')
        stream = yt.streams.filter(file_extension='mp4').first()
        if stream is None:
            self.log("[Error][video][no mp4 stream for {}]".format(youtube_id))
            return
        # Save under the id-based name: pytube's default filename is the
        # video title, which would never match the Exist check above and
        # would make the [Finish] log name a file that was not created.
        stream.download(output_path=file_path, filename="{}.mp4".format(youtube_id))
        self.log("[Finish][video][{}]".format(file_full_path))
    except Exception as e:
        self.log("[Error][video][{}]".format(e))

def download_project(self, hash_id):
url = "https://www.artstation.com/projects/{}.json".format(hash_id)
resp = self.http_client_get_json(url)
resp = self.http_client.http_client_get_json(url)
j = resp
assets = j["assets"]
title = j["slug"].strip()
Expand All @@ -150,23 +89,24 @@ def download_project(self, hash_id):
file_path = user_path
if not self.no_image and asset["has_image"]: # 包含图片
url = asset["image_url"]
file_name = urlparse(url).path.split("/")[-1]
file_name = HttpClient.urlparse(url).path.split("/")[-1]
try:
self.futures.append(
self.invoke(self.download_file, url, file_path, file_name)
)
except Exception as e:
print(e)
self.log(e)
if not self.no_video and asset["has_embedded_player"]: # 包含视频
player_embedded = BeautifulSoup(asset["player_embedded"], "html.parser")
src = player_embedded.find("iframe").get("src")
if "youtube" in src:
youtube_id = self.http_client.urlparse(src).path[-11:]
try:
self.futures.append(
self.invoke_video(self.download_video, src, file_path)
self.invoke_video(self.download_video, youtube_id, file_path)
)
except Exception as e:
print(e)
self.log(e)

def get_projects(self, username) -> element.ResultSet[element.Tag]:
data = []
Expand All @@ -175,7 +115,7 @@ def get_projects(self, username) -> element.ResultSet[element.Tag]:
while True:
page += 1
url = "https://{}.artstation.com/rss?page={}".format(username, page)
resp = self.http_client_get(url)
resp = self.http_client.http_client_get(url)
if resp.status != 200:
err = "[Error] [{} {}] ".format(resp.status, resp.reason)
if resp.status == 403:
Expand Down Expand Up @@ -212,7 +152,7 @@ def download_by_username(self, username):
def download_by_usernames(
self, usernames, download_type, download_sorting: DownloadSorting
):
self.proxy_setup()
self.http_client.proxy_setup()
self.no_image = download_type == "video"
self.no_video = download_type == "image"
self.download_sorting = download_sorting
Expand Down
94 changes: 94 additions & 0 deletions src/http_client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
import http.client as http_client
import ssl
import json
import os
import ssl
from urllib.parse import urlparse, ParseResult

import requests

from config import Config


class HttpClient:
    """HTTP helper combining a raw http.client path and a shared requests session.

    The http.client path retries once with certificate verification disabled
    when the first attempt fails with ``SSLCertVerificationError`` (the #35
    SSL-error fix); the requests session carries the User-Agent and proxy
    configuration read from config.ini.
    """

    def log(self, message):
        """Write *message* through the injected log callback, or print() as a fallback."""
        if self._log_print:
            self._log_print(message)
        else:
            print(message)

    def __init__(self, log_print=None):
        """Create the client.

        Args:
            log_print: Optional callable used instead of print() for log output.
        """
        self._log_print = log_print
        self._session = requests.session()
        self.proxy_setup(self._session)

    @staticmethod
    def _request_target(parsed_url):
        """Return the HTTP request target (path, plus '?query' only when present).

        The previous inline concatenation produced e.g. '/path?' for
        query-less URLs; a bare netloc falls back to '/'.
        """
        path = parsed_url.path or "/"
        if parsed_url.query:
            return "{}?{}".format(path, parsed_url.query)
        return path

    def http_client_get(self, url, ignoreCertificateError=None):
        """GET *url* via http.client and return the raw HTTPResponse.

        Args:
            url: Absolute https URL.
            ignoreCertificateError: When truthy, skip certificate verification
                (set internally on the SSL retry; name kept for compatibility).

        Returns:
            http.client.HTTPResponse, or None after a logged connection error.
        """
        headers = {
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:109.0) Gecko/20100101 Firefox/113.0",
            "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,*/*;q=0.8",
            "Accept-Language": "en-US,en;q=0.5",
        }
        parsed_url = urlparse(url)
        if ignoreCertificateError:
            # NOTE: _create_unverified_context is a private ssl API; it is
            # the established workaround for hosts with broken cert chains.
            context = ssl._create_unverified_context()
        else:
            context = None
        try:
            conn = http_client.HTTPSConnection(parsed_url.netloc, context=context)
            conn.request("GET", self._request_target(parsed_url), headers=headers)
            resp = conn.getresponse()
        except ssl.SSLCertVerificationError:
            # Retry exactly once without verification (fix for issue #35).
            return self.http_client_get(url, ignoreCertificateError=True)
        except Exception as e:
            self.log(f"Connect error [{e}]")
            return None
        return resp

    def http_client_get_json(self, url):
        """GET *url* and decode the body as JSON.

        Returns:
            The decoded object, or None when the connection or decode failed
            (both are logged).
        """
        resp = self.http_client_get(url)
        if resp is None:
            # Connection error already logged; without this guard the old
            # code raised AttributeError on resp.read().
            return None
        resp_str = ""
        try:
            resp_str = resp.read().decode()
            return json.loads(resp_str)
        except json.decoder.JSONDecodeError:
            self.log(f"json decode error\nurl:{url}\n{resp_str}")
            return None

    def http_get(self, url):
        """GET *url* via the shared requests session (10 s timeout).

        Returns:
            requests.Response, or None for an invalid URL (logged).
        """
        try:
            resp = self._session.get(url, timeout=10)
        except requests.exceptions.InvalidURL:
            self.log(f'"{url}" is not valid url')
            return None
        return resp

    def proxy_setup(self, session=None):
        """Apply the default User-Agent and config.ini proxies to *session*.

        Args:
            session: requests session to configure; defaults to the shared one.
        """
        if not session:
            session = self._session
        # Set a browser-like User-Agent.
        session.headers.update(
            {
                "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/101.0.4951.54 Safari/537.36"
            }
        )
        # Read proxies from the [Proxy] section; also export them to the
        # environment so libraries that honor HTTP(S)_PROXY pick them up.
        config = Config("config.ini")
        http = config.get("Proxy", "http")
        https = config.get("Proxy", "https")
        if http or https:
            proxys = {}
            if http:
                proxys["http"] = http
                os.environ["HTTP_PROXY"] = http
            if https:
                proxys["https"] = https
                os.environ["HTTPS_PROXY"] = https
            session.proxies.update(proxys)

    @staticmethod
    def urlparse(url: str) -> ParseResult:
        """Expose urllib.parse.urlparse so callers need not import it themselves."""
        return urlparse(url)

0 comments on commit 6c659c5

Please sign in to comment.