-
Notifications
You must be signed in to change notification settings - Fork 4
/
Copy pathlog_parser.py
45 lines (36 loc) · 1.03 KB
/
log_parser.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import List, Optional
from metaphor.common.logger import get_logger
# Module-level logger; parse_postgres_log uses it to warn about unparsable lines.
logger = get_logger()
@dataclass
class ParsedLog:
    """
    Structured view of a single PostgreSQL log line.

    Field order is part of the positional constructor signature and must not
    be changed.
    """

    # Timestamp of the log entry.
    log_time: datetime

    # User part of the "user@database" prefix field.
    user: str

    # Database part of the "user@database" prefix field.
    database: str

    # Session identifier, taken verbatim from the "[%p]" prefix field.
    session: str

    # Message type of the entry (the piece right after the prefix).
    log_level: str

    # Remaining message text, kept as the pieces left after splitting on ":".
    log_body: List[str]
def parse_postgres_log(log: str) -> Optional[ParsedLog]:
    """
    Parse one PostgreSQL log line into a ParsedLog.

    log prefix format: %t:%r:%u@%d:[%p]:
    log message format: [message type]: [log body]

    The line is split on ":". The timestamp (%t) itself contains two colons,
    so it spans the first three pieces; it is followed by %r, %u@%d, [%p], the
    message type, and finally the log body (kept as the remaining pieces).

    Args:
        log: A single raw log line.

    Returns:
        The parsed entry, or None (after logging a warning) when the line does
        not match the expected format: fewer than 8 ":"-separated pieces, a
        user/database field that does not contain exactly one "@", or a
        timestamp that cannot be parsed.
    """
    parts = log.split(":")
    if len(parts) < 8:
        # log did not match the format
        logger.warning(
            f"Not able to parse the log, not enough parts, # of parts: {len(parts)}"
        )
        return None

    # Expect exactly "user@database". Anything else (e.g. an extra ":" in an
    # earlier field shifting the pieces) is treated as a non-matching line
    # instead of raising ValueError from tuple unpacking.
    user_and_database = parts[4].split("@")
    if len(user_and_database) != 2:
        logger.warning(
            "Not able to parse the log, invalid user@database field"
        )
        return None
    user, database = user_and_database

    # The first three pieces re-joined give back "YYYY-MM-DD HH:MM:SS TZ".
    timestamp = ":".join(parts[:3])
    try:
        parsed_time = datetime.strptime(timestamp, "%Y-%m-%d %H:%M:%S %Z")
    except ValueError:
        logger.warning(
            f"Not able to parse the log, invalid timestamp: {timestamp}"
        )
        return None

    return ParsedLog(
        # NOTE(review): the zone name parsed by %Z is discarded and the time is
        # labelled as UTC -- confirm the server always logs in UTC.
        log_time=parsed_time.replace(tzinfo=timezone.utc),
        log_level=parts[6],
        session=parts[5],
        user=user,
        database=database,
        log_body=parts[7:],
    )