-
Notifications
You must be signed in to change notification settings - Fork 0
/
tamandua_manager.py
executable file
·211 lines (151 loc) · 5.4 KB
/
tamandua_manager.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
#!/usr/bin/env python3
"""Tamandua Manager provides administrative tools."""
import os
import sys
import atexit

BASEDIR = os.path.abspath(os.path.dirname(__file__))
sys.path.append(BASEDIR)

# The "if True:" wrapper lets these imports appear after the sys.path tweak
# above with one blanket noqa instead of a per-line E402 suppression.
if True:  # noqa: E402
    import click
    from datetime import datetime, timedelta
    import subprocess
    from src.repository.factory import RepositoryFactory
    from src.repository.misc import SearchScope
    from src.config import Config
    from src.constants import CONFIGFILE
    from src.expression.builder import ExpressionBuilder

# Load the application configuration once at import time.
Config().setup(
    os.path.join(BASEDIR, CONFIGFILE),
    BASEDIR
)

# Lock file guarding against concurrent tamandua runs.
pidfile = os.path.join(BASEDIR, 'tamandua.pid')

# Whether remove_pid() may delete the pid file at exit; flipped to False
# when another instance owns the lock.  NOTE: the original module-level
# ``global cleanupPID`` statement was a no-op (``global`` only has meaning
# inside a function body) and has been dropped.
cleanupPID = True
def exit_if_already_running():
    """Exit with status 1 if another tamandua process holds the pid file.

    The pid file is created atomically with O_CREAT|O_EXCL, closing the
    TOCTOU race in the original ``os.path.exists`` + ``open`` sequence:
    two processes starting simultaneously can no longer both pass the
    existence check and overwrite each other's pid.
    """
    global cleanupPID
    try:
        fd = os.open(pidfile, os.O_WRONLY | os.O_CREAT | os.O_EXCL)
    except FileExistsError:
        print('There is already a tamandua process running. Exiting.')
        # The pid file belongs to the other process; do not delete it
        # from our atexit handler.
        cleanupPID = False
        sys.exit(1)
    with os.fdopen(fd, 'w') as f:
        f.write(str(os.getpid()))
def run_remotesshwrapper_command(
    command: str, args: list = None, stdout: object = subprocess.PIPE
) -> subprocess.Popen:
    """Run a remotesshwrapper command on the log host via ssh.

    Args:
        command: remotesshwrapper sub-command to execute remotely.
        args: optional positional arguments, joined with spaces.
        stdout: destination for the child's stdout (default: a pipe).

    Returns:
        The started ``subprocess.Popen`` handle; the caller is
        responsible for ``wait()``/``communicate()``.
    """
    # ``None`` sentinel replaces the original mutable default ``[]``
    # (shared-across-calls pitfall); a fresh list is built per call.
    if args is None:
        args = []
    return subprocess.Popen(
        [
            'ssh',
            '-o',
            'IdentitiesOnly=yes',
            '-i',
            '~/.ssh/remotesshwrapper',
            'root@phd-systemxen',
            '/usr/local/bin/remotesshwrapper',
            command,
            ' '.join(args)
        ],
        stdout=stdout
    )
@atexit.register
def remove_pid():
    """atexit hook: remove the pid file and the transferred logfile.

    Skipped entirely when another instance owns the pid file
    (``cleanupPID`` is False).  Files that are already gone are ignored
    so a partially cleaned-up run cannot raise at interpreter shutdown.
    """
    global cleanupPID
    if cleanupPID:
        try:
            os.remove(pidfile)
        except FileNotFoundError:
            # Consistent with the logfile handling below: a file that is
            # already gone is not an error during cleanup.
            pass
        try:
            os.remove(os.path.join(BASEDIR, 'logfile'))
        except FileNotFoundError:
            pass
@click.group()
def cli():
    """Tamandua manager with admin tools."""
    # Root click group; the admin commands below register themselves
    # via @cli.command().  (The docstring is the CLI help text.)
    pass
@cli.command()
def reset_logfile_pos():
    """Reset the reader position of the logfile to 0."""
    # Rewind the persisted byte offset so the next run re-reads the
    # logfile from the very beginning.
    repo = RepositoryFactory.create_repository()
    repo.save_position_of_last_read_byte(0)
    print('Reset last logfile position to 0 successful.')
@cli.command()
def reset_logfile_size():
    """Reset the last size of the logfile to 0."""
    # Forget the remembered remote logfile size; the next run will treat
    # whatever it sees as growth.
    repo = RepositoryFactory.create_repository()
    repo.save_size_of_last_logfile(0)
    print('Reset last logfile size to 0 successful.')
@cli.command()
def remove_incomplete():
    """Delete all incomplete data."""
    # Empty filter ({}) matches every document in the incomplete scope.
    repo = RepositoryFactory.create_repository()
    repo.delete({}, SearchScope.INCOMPLETE)
    print('Successfully deleted all incomplete data.')
@cli.command()
def remove_complete():
    """Delete all complete data."""
    # Empty filter ({}) matches every document in the complete scope.
    repo = RepositoryFactory.create_repository()
    repo.delete({}, SearchScope.COMPLETE)
    print('Successfully deleted all complete data.')
@cli.command()
@click.pass_context
def remove_all(ctx):
    """Delete all data and reset the reader position."""
    # Delegate to the individual maintenance commands, in order.
    for subcommand in (remove_complete, remove_incomplete,
                       reset_logfile_pos, reset_logfile_size):
        ctx.invoke(subcommand)
@cli.command()
def cache_document_keys():
    """Build cache with all distinct keys of documents."""
    # get_all_keys(True) forces a (re)build of the key cache.
    repo = RepositoryFactory.create_repository()
    repo.get_all_keys(True)
    print('Successfully built cache of unique document keys.')
@cli.command()
@click.option('--days', help='Number of days to keep', default=30, show_default=True)
def cleanup(days):
    """Deletes entries which are older than n days."""
    # Everything dated before the cutoff matches the delete expression.
    cutoff = datetime.today() - timedelta(days=days)
    expression = ExpressionBuilder().set_end_datetime(cutoff).expression
    RepositoryFactory.create_repository().delete(expression, SearchScope.ALL)
    print(f'Deleted all data older than {days} days successful.')
@cli.command()
@click.pass_context
def run(ctx):
    """Get logfile and run the parser. This can be used within cronjobs."""
    repository = RepositoryFactory.create_repository()
    logfilename = os.path.join(BASEDIR, 'logfile')
    # Probe the remote logfile size and compare it with the size seen on
    # the previous run to detect log rotation.
    lastlogfilesize = repository.get_size_of_last_logfile()
    process = run_remotesshwrapper_command('getmaillogsize')
    currlogfilesize = int(process.communicate()[0].decode('utf-8'))
    print('Last logfile size: %d' % lastlogfilesize)
    print('Current logfile size: %d' % currlogfilesize)
    if currlogfilesize < lastlogfilesize:
        # The remote file shrank: presumably it was rotated, so restart
        # reading from byte 0.
        print('New logfile detected, reading from beginning.')
        ctx.invoke(reset_logfile_pos)
    repository.save_size_of_last_logfile(currlogfilesize)
    currByte = repository.get_position_of_last_read_byte()
    print('Position of last read byte: %d' % currByte)
    if currlogfilesize < currByte:
        # Defensive re-check: a stored offset beyond the current file size
        # is stale, so reset it and resume from the start.
        print('Logfile is smaller than last position, reading from beginning.')
        ctx.invoke(reset_logfile_pos)
        currByte = 0
    with open(logfilename, 'wb') as f:
        print('Start transferring logfile diff to local machine\n')
        # Stream only the unread tail (starting at currByte) straight
        # into the local file via the remote wrapper.
        process = run_remotesshwrapper_command('tamandua', args=[str(currByte)], stdout=f)
        process.wait()
    # Imported lazily so the parser machinery is only loaded when a run
    # actually happens.
    from tamandua_parser import main as tamandua_main
    from tamandua_parser import DefaultArgs
    args = DefaultArgs()
    args.logfile = logfilename
    args.printmsgs = True
    print('\nStart reading the logfile')
    tamandua_main(args)
    # Prune old entries using cleanup's default retention (--days default).
    ctx.invoke(cleanup)
    repository.save_time_of_last_run(datetime.now())
    os.remove(logfilename)
if __name__ == '__main__':
    # Acquire the single-instance pid lock before dispatching to click.
    exit_if_already_running()
    cli()