This repository has been archived by the owner on Jun 2, 2023. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 7
/
Copy pathscan_iterator.c
129 lines (105 loc) · 3.14 KB
/
scan_iterator.c
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
#ifndef SCAN_ITERATOR_H
#define SCAN_ITERATOR_H
/*
* We store block number into ItemPointers during insert_tuple() but actual
* write to disk occures later (by the end of transaction in some cases). We do
* not want another process writing to the same block before us. What we do
* is allocate a single block and its number would serve as an identification
* for cryo block and be used in ItemPointers. The rest of the blocks may be
* stored separately (e.g. another concurrent writing transaction allocates its
* block it between). This cause blocks being stored separately one from
* another. Therefore we need more sofisticated way to read them from disk and
* prevent reading blocks that we have already read. To deal with that we use
* SeqScanIterator which keeps track of blocks that we have read and serves
* starting BlockNumber for next block to read.
*/
#include "postgres.h"
#include "nodes/pg_list.h"
#include "scan_iterator.h"
typedef struct
{
BlockNumber start;
BlockNumber end;
} BlockRange;
struct SeqScanIterator
{
List *ranges;
};
static void
add_range(SeqScanIterator *iter, BlockNumber start, BlockNumber end)
{
BlockRange *r = palloc(sizeof(BlockRange));
r->start = start;
r->end = end;
iter->ranges = lappend(iter->ranges, r);
}
SeqScanIterator *
cryo_seqscan_iter_create(void)
{
SeqScanIterator *iter = palloc(sizeof(SeqScanIterator));
iter->ranges = NIL;
add_range(iter, 1, InvalidBlockNumber);
return iter;
}
BlockNumber
cryo_seqscan_iter_next(SeqScanIterator *iter)
{
BlockRange *r;
BlockNumber res;
Assert(iter->ranges);
Assert(list_length(iter->ranges) > 0);
/* take the first element of the first range */
r = linitial(iter->ranges);
res = r->start++;
/*
* Remove first block from iterator
*
* XXX probably let the client code to call cryo_seqscan_iter_exclude()
* explicitly
*/
if (r->start > r->end)
list_delete_ptr(iter->ranges, r);
return res;
}
void
cryo_seqscan_iter_exclude(SeqScanIterator *iter, BlockNumber block, bool miss_ok)
{
ListCell *lc;
int pos = 0;
if (!iter)
return;
foreach (lc, iter->ranges)
{
BlockRange *r = lfirst(lc);
if (block >= r->start && block <= r->end)
{
if (block == r->start)
r->start++;
else if (block == r->end)
r->end--;
else
{
/* split range */
BlockRange *new_r = palloc(sizeof(BlockRange));
new_r->start = block + 1;
new_r->end = r->end;
r->end = block - 1;
#if PG_VERSION_NUM < 130000
lappend_cell(iter->ranges, lc, new_r);
#else
list_insert_nth(iter->ranges, pos + 1, new_r);
#endif
return;
}
if (r->start > r->end)
list_delete_ptr(iter->ranges, r);
return;
}
pos++;
}
if (!miss_ok)
elog(ERROR,
"pg_cryogen: iternal error; block %u is not the part of seqscan iterator",
block);
}
#endif /* SCAN_ITERATOR_H */