forked from netzbegruenung/green-spider
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathjob.py
147 lines (114 loc) · 4.9 KB
/
job.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
"""
Dieses Script wird vom RQ worker ausgeführt, um einen einzelnen Job aus der
Spider-Warteschlange abzuarbeiten.
"""
import json
import os
from datetime import datetime
import time
import logging
import docker
from google.cloud import datastore
# Upper bound for a single job's runtime, in seconds. Read from the JOB_TIMEOUT
# environment variable (default 50) so that a second or third attempt can be
# granted more time.
TIMEOUT = int(os.environ.get("JOB_TIMEOUT", "50"))

# Image the spider runs in.
DOCKER_IMAGE = 'ghcr.io/netzbegruenung/green-spider:latest'

# Service-account key path as seen inside the spider container
# (./secrets on the host is mounted at /secrets, see `volumes` below).
CREDENTIALS_PATH = '/secrets/datastore-writer.json'

# Docker clients: the high-level one starts/lists containers, the low-level
# one is used for the stats endpoint.
client = docker.from_env()
low_level_client = docker.APIClient(base_url='unix://var/run/docker.sock')

# The same key file, read on the host relative to the working directory.
datastore_client = datastore.Client.from_service_account_json("." + CREDENTIALS_PATH)

# Host directories (under the current working directory) bind-mounted into
# every spider container: host path -> {'bind': container path, 'mode': ro/rw}.
pwd = os.path.abspath(".")
secrets_path = "%s/secrets" % pwd
chromedir_path = "%s/volumes/chrome-userdir" % pwd
screenshots_path = "%s/screenshots" % pwd
volumes = {
    secrets_path: {'bind': '/secrets', 'mode': 'ro'},
    chromedir_path: {'bind': '/opt/chrome-userdir', 'mode': 'rw'},
    screenshots_path: {'bind': '/screenshots', 'mode': 'rw'},
}

# Reuse the 'rq.worker' logger, at DEBUG verbosity.
logger = logging.getLogger('rq.worker')
logger.setLevel(logging.DEBUG)
def _build_command(job):
    """Return the spider command line for *job* as a list of arguments.

    docker-py splits a command given as a single string with shell-like
    (shlex) rules, so the former ``--job='...'`` string broke, or could gain
    extra arguments, whenever the job JSON contained a single quote. A list
    reaches the container unchanged, one element per argument.
    """
    return [
        "python",
        "cli.py",
        "--credentials-path=%s" % CREDENTIALS_PATH,
        "--loglevel=debug",
        "spider",
        "--job=%s" % json.dumps(job),
    ]


def _merge_stats(results, stats):
    """Copy the non-zero resource counters of one docker stats sample into *results*.

    Fields missing from the sample (no network attached, counters not
    reported while the container is stopping, ...) are skipped instead of
    raising or resetting a value gathered on an earlier poll.
    """
    stats = stats or {}

    # total_usage is in nanoseconds; convert to seconds.
    cpu_ns = ((stats.get('cpu_stats') or {}).get('cpu_usage') or {}).get('total_usage') or 0
    cpu_usage = cpu_ns / 1000000000.0
    if cpu_usage > 0:
        results['cpu_usage_seconds'] = round(cpu_usage)

    # Only the eth0 interface is counted.
    eth0 = (stats.get('networks') or {}).get('eth0') or {}
    network_received_bytes = eth0.get('rx_bytes') or 0
    network_transmitted_bytes = eth0.get('tx_bytes') or 0
    if network_received_bytes > 0:
        results['network_received_bytes'] = network_received_bytes
    if network_transmitted_bytes > 0:
        results['network_transmitted_bytes'] = network_transmitted_bytes

    memory_max_bytes = (stats.get('memory_stats') or {}).get('max_usage') or 0
    if memory_max_bytes > 0:
        results['memory_max_bytes'] = memory_max_bytes


def run(job):
    """
    Run a spider container for *job* and record statistics about the run.

    Starts the spider image detached, polls it about once per second and
    gathers CPU time, eth0 traffic and peak memory from docker stats while it
    runs. The figures are stored in Datastore as a 'spider-runs' entity.

    Args:
        job: JSON-serialisable job description; must contain a 'url' key.

    Returns:
        The dict of collected figures: datetime, url, success, error,
        duration_seconds, cpu_usage_seconds, network_received_bytes,
        network_transmitted_bytes, memory_max_bytes.

    Raises:
        Exception: if the container runs longer than TIMEOUT seconds (set via
            the JOB_TIMEOUT environment variable). The container is killed and
            the run is stored with success=False and error='TIMEOUT' before
            the exception is raised.
    """
    container = client.containers.run(image=DOCKER_IMAGE,
                                      command=_build_command(job),
                                      detach=True,
                                      remove=True,
                                      shm_size='2G',
                                      stdout=True,
                                      stderr=True,
                                      tty=False,
                                      volumes=volumes)
    container_id = container.id

    # Data about this spider run, to be written to datastore
    key = datastore_client.key('spider-runs')
    entity = datastore.Entity(key=key)
    results = {
        'datetime': datetime.utcnow(),
        'url': job['url'],
        'success': True,
        'error': '',
        'duration_seconds': 0,
        'cpu_usage_seconds': 0,
        'network_received_bytes': 0,
        'network_transmitted_bytes': 0,
        'memory_max_bytes': 0,
    }

    # Wait for the container to finish.
    start = datetime.utcnow()
    finished = False
    while not finished:
        time.sleep(1)

        # containers.list() shows only running containers by default and the
        # container was started with remove=True, so an empty list means done.
        running = client.containers.list(filters={'id': container_id})
        if not running:
            break

        for c in running:
            try:
                _merge_stats(results, low_level_client.stats(container_id, stream=False))
            except docker.errors.APIError as e:
                logger.error("Could not get stats: %s", e)
            except json.decoder.JSONDecodeError:
                # This means we didn't get proper stats; try again next poll.
                pass

            # total_seconds(): timedelta.seconds would silently drop whole days.
            runtime = (datetime.utcnow() - start).total_seconds()
            results['duration_seconds'] = round(runtime)

            if c.status == "exited":
                logger.debug("Container %s is exited.", c.id)
                finished = True  # a bare break would only leave the for loop
                break

            if runtime > TIMEOUT:
                try:
                    c.kill()
                except docker.errors.APIError as e:
                    # The container may have stopped between list() and kill();
                    # still record the overrun rather than losing the entity.
                    logger.error("Could not kill container %s: %s", c.id, e)
                results['success'] = False
                results['error'] = 'TIMEOUT'
                entity.update(results)
                datastore_client.put(entity)
                raise Exception("Execution took too long. Killed container after %s seconds." % TIMEOUT)

    entity.update(results)
    datastore_client.put(entity)
    return results