-
Notifications
You must be signed in to change notification settings - Fork 2
/
pg_logfebe.c
813 lines (684 loc) · 18.6 KB
/
pg_logfebe.c
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
/*
* pg_logfebe.c
*
* Implements a module to be loaded via shared_preload_libraries.  When
* "logfebe.unix_socket" is set in postgresql.conf, log traffic is also
* written to that Unix socket on a best-effort basis.
*
* Portions Copyright (c) 1996-2012, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
* Portions Copyright (c) 2012, Heroku
*
*/
#include "postgres.h"
#include <stdint.h>
#include <sys/un.h>
#include <unistd.h>
#include "access/xact.h"
#include "funcapi.h"
#include "lib/stringinfo.h"
#include "libpq/libpq-be.h"
#include "miscadmin.h"
#include "pg_config.h"
#include "pgtime.h"
#include "storage/proc.h"
#include "tcop/tcopprot.h"
#include "utils/elog.h"
#include "utils/guc.h"
#include "utils/ps_status.h"
/*
 * Big-endian conversion helpers, as per
 * http://stackoverflow.com/questions/809902/64-bit-ntohl-in-c
 *
 * The rest of this file relies on htobe32() and htobe64() being available.
 *
 * On macOS the compiler predefines __APPLE__ (there is no predefined
 * __darwin__ macro, so testing only for that never took this branch); the
 * old spelling is still accepted in case a build system defines it.  The
 * OSSwapHostToBig* macros are used rather than unconditional swaps so the
 * result is big-endian regardless of host byte order.
 */
#if defined(__linux__)
# include <endian.h>
#elif defined(__FreeBSD__) || defined(__NetBSD__)
# include <sys/endian.h>
#elif defined(__OpenBSD__)
# include <sys/types.h>
# define be16toh(x) betoh16(x)
# define be32toh(x) betoh32(x)
# define be64toh(x) betoh64(x)
#elif defined(__APPLE__) || defined(__darwin__)
# include <libkern/OSByteOrder.h>
# define htobe32(x) OSSwapHostToBigInt32(x)
# define htobe64(x) OSSwapHostToBigInt64(x)
#endif
PG_MODULE_MAGIC;
/* Size of every buffer that holds a formatted timestamp. */
#define FORMATTED_TS_LEN 128
/*
 * Startup version string, e.g. "PG-9.2.4/logfebe-1", where the
 * "logfebe-1" indicates the pg_logfebe protocol version.  It is sent,
 * including its terminating NUL, in the 'V' frame by openSocket().
 */
#define PROTO_VERSION ("PG-" PG_VERSION "/logfebe-1")
/*
 * GUC "logfebe.unix_socket": path of the Unix socket log records are sent
 * to.  The emit hook does nothing unless this starts with '/'.
 */
static char *logUnixSocketPath = "";
/* GUC "logfebe.identity": sent in the 'I' startup frame; may be empty. */
static char *ident = "";
/* Previously installed emit_log_hook, chained to and restored on unload. */
static emit_log_hook_type prev_emit_log_hook = NULL;
/*
 * Pid observed on the last hook call.  Compared against MyProcPid to detect
 * that values inherited over fork need resetting.
 */
static int savedPid = 0;
/* Caches the formatted start time; an empty string means "not cached". */
static char cachedBackendStartTime[FORMATTED_TS_LEN];
/* Per-process log sequence number, incremented on every hook call. */
static long seqNum = 0;
/*
 * File descriptor that log records are written to.
 *
 * Negative when there is no connection; it is reset to -1 whenever a write
 * fails or the socket is otherwise closed.
 */
static int outSockFd = -1;
/* Dynamic linking hooks for Postgres */
void _PG_init(void);
void _PG_fini(void);
/* Internal function declarations */
static bool formAddr(struct sockaddr_un *dst, char *path);
static bool isLogLevelOutput(int elevel, int log_min_level);
static void appendStringInfoPtr(StringInfo dst, const char *s);
static void closeSocket(int *fd);
static void fmtLogMsg(StringInfo dst, ErrorData *edata);
static void formatLogTime(char *dst, size_t dstSz, struct timeval tv);
static void formatNow(char *dst, size_t dstSz);
static void gucOnAssignCloseInvalidate(const char *newval, void *extra);
static void logfebe_emit_log_hook(ErrorData *edata);
static void openSocket(int *dst, char *path);
static void optionalGucGet(char **dest, const char *name,
const char *shortDesc);
static void reCacheBackendStartTime(void);
static void sendOrInval(int *fd, char *payload, size_t payloadSz);
/*
 * GUC assign hook used for the logfebe.* settings.
 *
 * Useful for HUP-triggered reassignment: close and invalidate the socket so
 * that the next call to the emit hook reconnects using the current path and
 * re-sends the version and identity startup frames.
 *
 * newval and extra are unused.
 */
static void
gucOnAssignCloseInvalidate(const char *newval, void *extra)
{
    /*
     * Like every other call site, only close a descriptor that is actually
     * open; otherwise this would issue a pointless close(-1).
     */
    if (outSockFd >= 0)
        closeSocket(&outSockFd);
}
/*
 * Register one string GUC with the settings shared by every option of this
 * extension: empty long description, empty boot value, changeable on SIGHUP,
 * omitted from the sample configuration, no check or show hook, and
 * gucOnAssignCloseInvalidate as the assign hook.
 *
 * dest      - variable that receives the setting's value
 * name      - full GUC name
 * shortDesc - short description of the setting
 */
static void
optionalGucGet(char **dest, const char *name,
               const char *shortDesc)
{
    const char *const noLongDesc = "";
    const char *const bootValue = "";

    DefineCustomStringVariable(name, shortDesc, noLongDesc,
                               dest, bootValue,
                               PGC_SIGHUP, GUC_NOT_IN_SAMPLE,
                               NULL, gucOnAssignCloseInvalidate, NULL);
}
/*
 * Form a sockaddr_un for communication with the socket at 'path'.
 *
 * Returns true when *dst holds the complete, NUL-terminated path.  Returns
 * false when the path (plus its terminator) does not fit in sun_path; in
 * that case sun_path is left empty and the address must not be used.
 */
static bool
formAddr(struct sockaddr_un *dst, char *path)
{
    const size_t len = strlen(path);

    /* Start from a fully defined address, whatever the outcome. */
    memset(dst, 0, sizeof *dst);
    dst->sun_family = AF_UNIX;

    /*
     * A path of exactly sizeof sun_path bytes leaves no room for the
     * terminating NUL, so it counts as too long as well.
     */
    if (len >= sizeof dst->sun_path)
        return false;

    memcpy(dst->sun_path, path, len + 1);
    return true;
}
/*
 * _PG_init() - library load-time initialization
 *
 * DO NOT make this static nor change its name!
 *
 * Registers the extension's GUCs, warns about unknown "logfebe."
 * placeholders, and then chains this module's emit_log_hook in front of
 * whatever hook was installed before.
 */
void
_PG_init(void)
{
    /* The GUCs of this extension, registered in the order listed. */
    static const struct
    {
        char      **dest;
        const char *name;
        const char *shortDesc;
    }           gucs[] = {
        {&logUnixSocketPath, "logfebe.unix_socket",
         "Unix socket to send logs to in FEBE frames."},
        {&ident, "logfebe.identity",
         "The identity of the installation of PostgreSQL."},
    };
    size_t      i;

    for (i = 0; i < sizeof gucs / sizeof gucs[0]; i++)
        optionalGucGet(gucs[i].dest, gucs[i].name, gucs[i].shortDesc);

    EmitWarningsOnPlaceholders("logfebe");

    /* Remember the previous hook so it can be chained to and restored. */
    prev_emit_log_hook = emit_log_hook;
    emit_log_hook = logfebe_emit_log_hook;
}
/*
 * Given an invalid fd in *dst, try to open a Unix-socket connection to the
 * given path and send the startup frames over it.
 *
 * On success *dst receives the connected descriptor.  On any failure (path
 * too long, socket() or connect() error, or a failed startup send) *dst
 * stays negative, and the caller may simply try again on a later call.
 *
 * errno is restored to its value on entry before returning.
 */
static void
openSocket(int *dst, char *path)
{
    const int   save_errno = errno;
    struct sockaddr_un addr;
    int         fd = -1;
    int         res;
    StringInfoData startup;

    /*
     * This procedure is only defined on the domain of invalidated file
     * descriptors.
     */
    Assert(*dst < 0);

    /* Form the address; give up if the path does not fit. */
    if (!formAddr(&addr, path))
        goto err;

    fd = socket(AF_LOCAL, SOCK_STREAM, 0);
    if (fd < 0)
        goto err;

    /*
     * Connect to the server, retrying if a signal interrupts the call.
     * (Previously any negative return bailed out before the EINTR check
     * was reached, so the retry could never happen.)
     */
    do
    {
        errno = 0;
        res = connect(fd, (struct sockaddr *) &addr, sizeof addr);
    } while (res < 0 && errno == EINTR);

    /*
     * EISCONN can only be seen on a retry, and means the interrupted
     * attempt completed the connection after all.
     */
    if (res < 0 && errno != EISCONN)
        goto err;

    /*
     * Connection established.
     *
     * Try to send start-up information as a service to the caller.  Should
     * this fail, sendOrInval will close and invalidate the socket.
     */
    Assert(fd >= 0);
    initStringInfo(&startup);

    /* Startup: protocol version ('V') frame; length covers itself + body */
    {
        const uint32_t nVlen = htobe32((uint32_t) (sizeof PROTO_VERSION +
                                                   sizeof(uint32_t)));

        appendStringInfoChar(&startup, 'V');
        appendBinaryStringInfo(&startup, (void *) &nVlen, sizeof nVlen);
        appendBinaryStringInfo(&startup, PROTO_VERSION, sizeof PROTO_VERSION);
    }

    /* Startup: system identification ('I') frame */
    {
        const char *payload = (ident == NULL) ? "" : ident;
        int         payloadLen;
        uint32_t    nPayloadLen;

        /*
         * The body is the identity string plus its terminating NUL: exactly
         * one extra byte.  (In C, sizeof '\0' is sizeof(int), which made the
         * old code read and send bytes beyond the end of the string.)
         */
        payloadLen = (int) strlen(payload) + 1;
        nPayloadLen = htobe32((uint32_t) (payloadLen + sizeof nPayloadLen));

        appendStringInfoChar(&startup, 'I');
        appendBinaryStringInfo(&startup, (void *) &nPayloadLen,
                               sizeof nPayloadLen);
        appendBinaryStringInfo(&startup, (void *) payload, payloadLen);
    }

    /*
     * Try to send the prepared startup packet; fd is invalidated (set
     * negative) if things go awry, and that is what the caller then sees.
     */
    sendOrInval(&fd, startup.data, startup.len);
    pfree(startup.data);

    *dst = fd;
    goto exit;

err:
    /* Close and invalidate 'fd' if it got made */
    if (fd >= 0)
    {
        closeSocket(&fd);
        Assert(fd < 0);
    }
    Assert(*dst < 0);

exit:
    /* Universal post-condition */
    errno = save_errno;
}
/*
 * Best-effort close of *fd, after which *fd is always -1.
 *
 * This exists to encapsulate EINTR handling and invalidation.  The value of
 * errno seen by the caller is unchanged.
 */
static void
closeSocket(int *fd)
{
    const int   errnoOnEntry = errno;
    const int   doomed = *fd;

    *fd = -1;

    /*
     * The result of close() is deliberately ignored and EINTR is not
     * retried, on advice from libusual's "safe_close" function:
     *
     * POSIX says close() can return EINTR but fd state is "undefined"
     * later.  Seems Linux and BSDs close the fd anyway and EINTR is
     * simply informative.  Thus retry is dangerous.
     */
    (void) close(doomed);

    errno = errnoOnEntry;
}
/*
 * Format tv into dst as "YYYY-MM-DD HH:MM:SS.mmm TZ", using log_timezone.
 *
 * dst   - output buffer
 * dstSz - size of dst; must be at least FORMATTED_TS_LEN
 * tv    - time to format; tv_usec supplies the milliseconds
 */
static void
formatLogTime(char *dst, size_t dstSz, struct timeval tv)
{
    /* Where ".mmm" goes: right after the 19 bytes "YYYY-MM-DD HH:MM:SS". */
    enum
    {
        MS_OFFSET = 19,
        MS_LEN = 4
    };
    char        msbuf[8];
    struct pg_tm *tm;
    pg_time_t   stamp_time;

    stamp_time = (pg_time_t) tv.tv_sec;
    tm = pg_localtime(&stamp_time, log_timezone);

    Assert(dstSz >= FORMATTED_TS_LEN);

    /*
     * Leave room for the milliseconds: exactly five blanks follow the
     * seconds, four of which are overwritten by ".mmm" below while the
     * fifth remains as the separator before the time zone.  With fewer
     * blanks the paste would overwrite the time zone abbreviation.
     */
    pg_strftime(dst, dstSz,
                "%Y-%m-%d %H:%M:%S     %Z", tm);

    /* 'paste' milliseconds into place, without a terminating NUL */
    snprintf(msbuf, sizeof msbuf, ".%03d", (int) (tv.tv_usec / 1000));
    memcpy(dst + MS_OFFSET, msbuf, MS_LEN);
}
/*
 * Re-format MyStartTime into cachedBackendStartTime, using log_timezone.
 */
static void
reCacheBackendStartTime(void)
{
    pg_time_t   startedAt = (pg_time_t) MyStartTime;
    struct pg_tm *brokenDown;

    /*
     * Note: we expect that guc.c will ensure that log_timezone is set up (at
     * least with a minimal GMT value) before Log_line_prefix can become
     * nonempty or CSV mode can be selected.
     */
    brokenDown = pg_localtime(&startedAt, log_timezone);

    pg_strftime(cachedBackendStartTime, FORMATTED_TS_LEN,
                "%Y-%m-%d %H:%M:%S %Z", brokenDown);
}
/*
 * Format the current wall-clock time into dst via formatLogTime().
 *
 * dstSz is the size of dst and must be at least FORMATTED_TS_LEN.
 */
static void
formatNow(char *dst, size_t dstSz)
{
    struct timeval now;

    Assert(dstSz >= FORMATTED_TS_LEN);

    gettimeofday(&now, NULL);
    formatLogTime(dst, dstSz, now);
}
/*
* isLogLevelOutput -- is elevel logically >= log_min_level?
*
* We use this for tests that should consider LOG to sort out-of-order,
* between ERROR and FATAL. Generally this is the right thing for testing
* whether a message should go to the postmaster log, whereas a simple >=
* test is correct for testing whether the message should go to the client.
*/
static bool
isLogLevelOutput(int elevel, int log_min_level)
{
if (elevel == LOG || elevel == COMMERROR)
{
if (log_min_level == LOG || log_min_level <= ERROR)
return true;
}
else if (log_min_level == LOG)
{
/* elevel != LOG */
if (elevel >= FATAL)
return true;
}
/* Neither is LOG */
else if (elevel >= log_min_level)
return true;
return false;
}
/*
 * Append a nullable string to dst.
 *
 * A NULL pointer is written as 'N' followed by a NUL byte; any other string
 * is written as 'P' ("Present"), the string itself, and a NUL byte.
 */
static void
appendStringInfoPtr(StringInfo dst, const char *s)
{
    if (s != NULL)
    {
        appendStringInfoChar(dst, 'P');
        appendStringInfoString(dst, s);
        appendStringInfoChar(dst, '\0');
        return;
    }

    appendStringInfoChar(dst, 'N');
    appendStringInfoChar(dst, '\0');
}
/*
 * Append the body of one log record for edata to dst.
 *
 * The fields are appended in a fixed order, so the statement order below
 * must not be changed.  Encodings used:
 *
 *   - non-nullable string: the bytes followed by a NUL;
 *   - nullable string: 'P' + bytes + NUL when present, 'N' + NUL when
 *     absent (see appendStringInfoPtr);
 *   - integer: passed through htobe32()/htobe64() and appended raw.
 *
 * Field order: log time, user name, database name, pid, remote host[:port],
 * session id, sequence number, ps display, backend start time, virtual
 * transaction id, transaction id, error severity, SQLSTATE, message, detail,
 * hint, internal query, internal query position, context, user query, user
 * query cursor position, source location, application name.
 *
 * The caller is responsible for the frame header that precedes this body.
 */
static void
fmtLogMsg(StringInfo dst, ErrorData *edata)
{
{
char formattedLogTime[FORMATTED_TS_LEN];
/* timestamp with milliseconds */
formatNow(formattedLogTime, sizeof formattedLogTime);
/*
* Always present, non-nullable; don't need to write the N/P
* header.
*/
appendStringInfoString(dst, formattedLogTime);
appendStringInfoChar(dst, '\0');
}
/* username; nullable, absent when there is no MyProcPort */
if (MyProcPort)
appendStringInfoPtr(dst, MyProcPort->user_name);
else
appendStringInfoPtr(dst, NULL);
/* database name; nullable, absent when there is no MyProcPort */
if (MyProcPort)
appendStringInfoPtr(dst, MyProcPort->database_name);
else
appendStringInfoPtr(dst, NULL);
/* Process id, as a 32-bit integer (savedPid is kept equal to MyProcPid by the hook) */
{
uint32_t nPid = htobe32(savedPid);
appendBinaryStringInfo(dst, (void *) &nPid, sizeof nPid);
}
/* Remote host and port, as one nullable string "host" or "host:port" */
if (MyProcPort && MyProcPort->remote_host)
{
/* 'present' string header, since this string is nullable */
appendStringInfoChar(dst, 'P');
appendStringInfoString(dst, MyProcPort->remote_host);
if (MyProcPort->remote_port && MyProcPort->remote_port[0] != '\0')
{
appendStringInfoChar(dst, ':');
appendStringInfoString(dst, MyProcPort->remote_port);
}
appendStringInfoChar(dst, '\0');
}
else
appendStringInfoPtr(dst, NULL);
/* session id; non-nullable: start time and pid in hex, joined by '.' */
appendStringInfo(dst, "%lx.%x", (long) MyStartTime, MyProcPid);
appendStringInfoChar(dst, '\0');
/* Line number: the per-process sequence number, as a 64-bit integer */
{
uint64_t nSeqNum = htobe64(seqNum);
appendBinaryStringInfo(dst, (void *) &nSeqNum, sizeof nSeqNum);
}
/* PS display; nullable, absent when there is no MyProcPort */
if (MyProcPort)
{
StringInfoData msgbuf;
const char *psdisp;
int displen;
initStringInfo(&msgbuf);
psdisp = get_ps_display(&displen);
/*
* Copy exactly displen bytes into a scratch buffer first, so that a
* NUL-terminated string is available to append.
*/
appendBinaryStringInfo(&msgbuf, psdisp, displen);
appendStringInfoChar(dst, 'P');
appendStringInfoString(dst, msgbuf.data);
appendStringInfoChar(dst, '\0');
pfree(msgbuf.data);
}
else
appendStringInfoPtr(dst, NULL);
/* session start timestamp */
if (cachedBackendStartTime[0] == '\0')
{
/* Rebuild the cache if it was blown */
reCacheBackendStartTime();
}
/* backend start time; non-nullable string */
appendStringInfoString(dst, cachedBackendStartTime);
appendStringInfoChar(dst, '\0');
/*
* Virtual transaction id; nullable string "backendId/lxid"
*
* keep VXID format in sync with lockfuncs.c
*/
if (MyProc != NULL && MyProc->backendId != InvalidBackendId)
{
appendStringInfoChar(dst, 'P');
appendStringInfo(dst, "%d/%u", MyProc->backendId, MyProc->lxid);
appendStringInfoChar(dst, '\0');
}
else
appendStringInfoPtr(dst, NULL);
/*
* Transaction id, widened to a 64-bit integer
*
* This seems to be a mistake both here and in elog.c; in particular, it's
* not clear how the epoch would get added here. However, leave room in
* the protocol to fix this later by upcasting.
*/
{
uint64_t nTxid = htobe64((uint64) GetTopTransactionIdIfAny());
appendBinaryStringInfo(dst, (void *) &nTxid, sizeof nTxid);
}
/* Error severity: the raw elevel, as a 32-bit integer */
{
uint32_t nelevel = htobe32(edata->elevel);
appendBinaryStringInfo(dst, (void *) &nelevel, sizeof nelevel);
}
/* SQL state code */
appendStringInfoPtr(dst, unpack_sql_state(edata->sqlerrcode));
/* errmessage */
appendStringInfoPtr(dst, edata->message);
/* errdetail or errdetail_log; detail_log wins when both are set */
if (edata->detail_log)
appendStringInfoPtr(dst, edata->detail_log);
else
appendStringInfoPtr(dst, edata->detail);
/* errhint */
appendStringInfoPtr(dst, edata->hint);
/* internal query */
appendStringInfoPtr(dst, edata->internalquery);
/*
* Internal query position, as a 32-bit integer.  The field is always
* written: the real position if an internal query was printed and the
* position is positive, otherwise -1.
*/
if (edata->internalpos > 0 && edata->internalquery != NULL)
{
uint32_t ninternalpos = htobe32(edata->internalpos);
appendBinaryStringInfo(dst, (void *) &ninternalpos,
sizeof ninternalpos);
}
else
{
uint32_t ninternalpos = htobe32(-1);
appendBinaryStringInfo(dst, (void *) &ninternalpos,
sizeof ninternalpos);
}
/* errcontext */
appendStringInfoPtr(dst, edata->context);
/*
* user query --- only reported if not disabled by the caller.
*
* Also include query position.  Both fields are always written; when the
* query is not reported they are an absent string and -1.
*/
if (isLogLevelOutput(edata->elevel, log_min_error_statement) &&
debug_query_string != NULL && !edata->hide_stmt)
{
uint32_t nCursorPos = htobe32(edata->cursorpos);
appendStringInfoPtr(dst, debug_query_string);
appendBinaryStringInfo(dst, (void *) &nCursorPos, sizeof nCursorPos);
}
else
{
uint32_t nCursorPos = htobe32(-1);
appendStringInfoPtr(dst, NULL);
appendBinaryStringInfo(dst, (void *) &nCursorPos, sizeof nCursorPos);
}
/*
* file error location; only present at verbose error verbosity.  Note
* that it is then sent as a present-but-empty string when edata has no
* filename.
*/
if (Log_error_verbosity >= PGERROR_VERBOSE)
{
StringInfoData msgbuf;
initStringInfo(&msgbuf);
if (edata->funcname && edata->filename)
appendStringInfo(&msgbuf, "%s, %s:%d",
edata->funcname, edata->filename,
edata->lineno);
else if (edata->filename)
appendStringInfo(&msgbuf, "%s:%d",
edata->filename, edata->lineno);
appendStringInfoChar(dst, 'P');
appendStringInfoString(dst, msgbuf.data);
appendStringInfoChar(dst, '\0');
pfree(msgbuf.data);
}
else
appendStringInfoPtr(dst, NULL);
/* application name */
appendStringInfoPtr(dst, application_name);
}
/*
 * Send the whole payload over *fd, or close and invalidate *fd.
 *
 * fd        - in/out: socket to write to; set to -1 if the payload could
 *             not be delivered in full
 * payload   - bytes to send
 * payloadSz - number of bytes in payload
 *
 * No confirmation of success or failure is delivered other than through
 * *fd.  errno is restored to its value on entry before returning.
 */
static void
sendOrInval(int *fd, char *payload, size_t payloadSz)
{
    const int   saved_errno = errno;
    size_t      sent = 0;

    while (sent < payloadSz)
    {
        ssize_t     n;

        errno = 0;

        /*
         * Send with SIGPIPE suppressed, so that a peer that closed the
         * connection shows up as an error code here instead of as a
         * signal.  This is required to allow re-connection in the event
         * the server closes the connection.
         */
        n = send(*fd, payload + sent, payloadSz - sent, MSG_NOSIGNAL);

        if (n > 0)
        {
            /*
             * Possibly a short write (for instance when a signal arrives
             * after some bytes were accepted): carry on with the remainder
             * rather than dropping the connection or re-sending bytes the
             * peer already has.
             */
            sent += (size_t) n;
            continue;
        }

        /* Interrupted before anything was sent; harmless, just try again */
        if (n < 0 && errno == EINTR)
            continue;

        /*
         * Something went wrong, and the ErrorData passed to the hook goes
         * un-logged.  Close and invalidate the socket fd; a new attempt to
         * get a valid fd must come the next time the hook is called.
         */
        closeSocket(fd);
        break;
    }

    errno = saved_errno;
}
/*
 * emit_log_hook implementation: frame edata as an 'L' message and send it
 * to the configured Unix socket on a best-effort basis, then call the
 * previously installed hook, if any.
 *
 * Frame layout: the byte 'L', a 32-bit length run through htobe32() that
 * counts itself and the body but not the 'L', then the body written by
 * fmtLogMsg().
 *
 * errno is preserved across the call.
 */
static void
logfebe_emit_log_hook(ErrorData *edata)
{
    int         save_errno;
    StringInfoData buf;

    /*
     * This is one of the few places where we'd rather not inherit a static
     * variable's value from the postmaster.  But since we will, reset it
     * when MyProcPid changes.
     */
    if (savedPid != MyProcPid)
    {
        savedPid = MyProcPid;

        /* Invalidate all inherited values */
        seqNum = 0;
        cachedBackendStartTime[0] = '\0';

        if (outSockFd >= 0)
            closeSocket(&outSockFd);
    }

    /*
     * Increment log sequence number
     *
     * Done early on so this happens regardless of whether there are
     * problems emitting the log.
     */
    seqNum += 1;

    /*
     * Early exit if the socket path is unset, empty, or not an absolute
     * path (an empty string fails the leading-'/' test as well).
     *
     * The empty identity ("ident") is a valid one, so it is not rejected in
     * the same way an empty logUnixSocketPath is.
     */
    if (logUnixSocketPath == NULL || logUnixSocketPath[0] != '/')
    {
        /*
         * Unsetting the GUCs via SIGHUP would leave a connection dangling;
         * if one exists, close it.
         */
        if (outSockFd >= 0)
            closeSocket(&outSockFd);

        goto quickExit;
    }

    save_errno = errno;

    /*
     * Initialize the StringInfoData early, because pfree is called
     * unconditionally at exit.
     */
    initStringInfo(&buf);

    if (outSockFd < 0)
    {
        openSocket(&outSockFd, logUnixSocketPath);

        /* Couldn't get a valid socket; give up */
        if (outSockFd < 0)
            goto exit;
    }

    /*
     * Make room for message type byte and length header.  The length header
     * is overwritten with the correct value once the body is known.
     */
    {
        const char  logHdr[5] = {'L', '\0', '\0', '\0', '\0'};

        appendBinaryStringInfo(&buf, logHdr, sizeof logHdr);
    }

    /*
     * Format the output, figure out how long it is, and frame it for the
     * protocol.
     */
    {
        uint32_t    nMsgSize;

        fmtLogMsg(&buf, edata);

        /* Check that buf still starts with the header reserved above. */
        Assert(buf.len > 5);
        Assert(buf.data[0] == 'L');

        /*
         * The length excludes the one-byte message type, hence the
         * subtraction.  It is pasted in with memcpy because buf.data + 1
         * is not suitably aligned for a uint32_t store.
         */
        nMsgSize = htobe32((uint32_t) (buf.len - 1));
        memcpy(buf.data + 1, &nMsgSize, sizeof nMsgSize);
    }

    /* Finally: try to send the constructed message */
    sendOrInval(&outSockFd, buf.data, buf.len);

exit:
    pfree(buf.data);
    errno = save_errno;

quickExit:
    /* Call a previous hook, should it exist */
    if (prev_emit_log_hook != NULL)
        prev_emit_log_hook(edata);
}
/*
* Module unload callback
*/
void
_PG_fini(void)
{
/* Uninstall hook */
emit_log_hook = prev_emit_log_hook;
}