-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmerge-memop.cpp
173 lines (151 loc) · 5.29 KB
/
merge-memop.cpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
#include "mem.h"
#include "log.h"
#include <cstdio>
#include <sys/stat.h>
#include <cassert>
#include <cstring>
#include <utility>
#include <queue>
#include <vector>
#include <sstream>
using namespace std;
// #define DEBUG
#include "debug.h"
struct QueEnt {
tid_t tid;
struct wait_memop wop;
QueEnt(tid_t t, const struct wait_memop &w) : tid(t), wop(w) {}
bool operator>(const QueEnt &rhs) const {
if (wop.objid == rhs.wop.objid) {
return wop.version > rhs.wop.version;
} else {
return wop.objid > rhs.wop.objid;
}
}
};
// Priority queue of pending log records. Using greater<QueEnt> (which relies on
// QueEnt::operator>) makes top() the entry with the smallest (objid, version).
typedef priority_queue<QueEnt, vector<QueEnt>, greater<QueEnt> > LogQueue;
// Read the next wait_memop record from `log` (if any is left), advance the
// log's read cursor past it, and push it onto `pq` tagged with `tid`.
//
// `wop` is caller-provided scratch storage: it receives a copy of the record
// that was read, and is left untouched when the log is already exhausted.
static inline void enqueue_next_waitmemop(LogQueue &pq, struct mapped_log &log, struct wait_memop &wop, tid_t tid) {
    // Nothing left to read in this log.
    if (!(log.buf < log.end)) {
        return;
    }

    memcpy(&wop, log.buf, sizeof(wop));
    log.buf += sizeof(wop);
    pq.push(QueEnt(tid, wop));
}
// Merge the per-thread wait_memop logs into one log ordered by (objid, version)
// and build an index over it.
//
// Parameters:
//   log  - one mapped_log per thread, indexed by thread id; entries whose fd is
//          -1 are treated as "not opened" and skipped.
//   nthr - number of threads; must equal log.size().
//
// Output files (created through create_mapped_file):
//   LOGDIR"memop"       - one replay_wait_memop {version, memop, tid} per input
//                         record, in merged order.
//   LOGDIR"memop-index" - g_nobj pairs of ints, one pair per object id:
//                         (position of the object's first record in the merged
//                         log, number of records). Objects with no records get
//                         (-1, 0).
//
// Returns without creating any file when the opened logs are all empty.
// Exits the process if fstat fails on an opened log.
static void merge_memop(vector<struct mapped_log> &log, tid_t nthr) {
    assert((int)log.size() == nthr);
    LogQueue pq;

    // Sum the sizes of all logs that were opened successfully.
    unsigned long total_size = 0;
    for (int i = 0; i < nthr; ++i) {
        // Only add file size if it's opened correctly
        if (log[i].fd != -1) {
            struct stat sb;
            if (fstat(log[i].fd, &sb) == -1) {
                perror("fstat in merge_memop");
                exit(1);
            }
            total_size += sb.st_size;
        }
    }
    if (total_size == 0) {
        return;
    }
    assert(total_size % sizeof(struct wait_memop) == 0);

    // we need to add a tid record to each log entry
    // Keep the count in the same width as total_size so it is not narrowed
    // before being multiplied into the output file size.
    const unsigned long entrycount = total_size / sizeof(struct wait_memop);
    struct replay_wait_memop *next_mwm = (struct replay_wait_memop *)create_mapped_file(LOGDIR"memop",
        entrycount * sizeof(*next_mwm));
    DPRINTF("created memop log\n");

    // index buf contains index for an object's log and log entry count
    int *indexbuf = (int *)create_mapped_file(LOGDIR"memop-index",
        g_nobj * sizeof(int) * 2);
    DPRINTF("created memop-index log\n");

    // Scratch record shared by all enqueue calls. Zeroed so the debug print
    // below never reads indeterminate bytes when a log is open but empty.
    struct wait_memop wop;
    memset(&wop, 0, sizeof(wop));

    // Seed the queue with the first record of every opened log.
    for (int i = 0; i < nthr; ++i) {
        if (log[i].fd == -1)
            continue;
        enqueue_next_waitmemop(pq, log[i], wop, i);
        DPRINTF("Init Queue add T%d %d %d %d\n", i, (int)wop.objid, (int)wop.version,
            (int)wop.memop);
    }

    objid_t prev_id = -1;        // object id of the last record written, -1 = none yet
    int cnt = 0, prev_cnt = 0;   // records written so far / position where prev_id's run began

    while (! pq.empty()) {
        QueEnt qe = pq.top();
        pq.pop();

#ifdef DEBUG
        static version_t prev_version = -1;
        DPRINTF("T%d %d %ld %ld\n", qe.tid, qe.wop.objid, qe.wop.version, qe.wop.memop);
        assert(qe.wop.objid >= prev_id);
        if (qe.wop.objid != prev_id) {
            prev_version = -1;
        } else {
            assert(qe.wop.version >= prev_version);
        }
        prev_version = qe.wop.version;
#endif

        // Write object index if needed. Previous id has index written.
        if (prev_id != qe.wop.objid) {
            // The index file only has room for g_nobj objects; a larger id
            // would make the writes below run past the mapping.
            assert(qe.wop.objid < g_nobj);

            if (prev_id != -1) {
                // Write out previous object's log entry count
                *indexbuf = cnt - prev_cnt;
                // XXX: an object that has an index entry must own at least one
                // record; objects without records are written as (-1, 0) below.
                assert(*indexbuf > 0);
                DPRINTF("obj %d index %d log entry count %d\n", prev_id, prev_cnt, *indexbuf);
                indexbuf++;
            }

            // Write index as -1 for objid in the range of (previd + 1, curid - 1)
            for (int i = prev_id + 1; i < qe.wop.objid; ++i) {
                DPRINTF("index for obj %d is %d, empty log\n", i, -1);
                *indexbuf++ = -1; // index
                *indexbuf++ = 0;  // size
            }

            // Write out current object's index
            *indexbuf++ = cnt;
            prev_cnt = cnt;
            prev_id = qe.wop.objid;
        }

        next_mwm->version = qe.wop.version;
        next_mwm->memop = qe.wop.memop;
        next_mwm->tid = qe.tid;
        next_mwm++;

        // The following code dumps the object id in the log. But with object index,
        // this is not needed. Keep it here because this is useful info for manual inspecting
        // the log.
        /*
        memcpy(outbuf, &qe.wop, sizeof(struct wait_memop));
        *(int *)(outbuf + sizeof(struct wait_memop)) = qe.tid;
        outbuf += sizeof(struct wait_memop) + sizeof(int);
        */

        // Refill the queue from the log this record came from.
        enqueue_next_waitmemop(pq, log[qe.tid], wop, qe.tid);
        cnt++;
    }

    // Last object's log size. Only written when some object actually had its
    // index emitted; otherwise a stray count would shift every following pair
    // and push the final write past the end of the index file.
    if (prev_id != -1) {
        const int last_cnt = cnt - prev_cnt;
        *indexbuf++ = last_cnt;
        // Print the saved value: indexbuf has already moved past this slot.
        DPRINTF("obj %d index %d log entry count %d\n", prev_id, prev_cnt, last_cnt);
    }

    // Remaining objects never appeared in any log.
    for (int i = prev_id + 1; i < g_nobj; ++i) {
        *indexbuf++ = -1;
        *indexbuf++ = 0;
    }
    DPRINTF("total #wait_memop %d\n", cnt);
}
// Entry point: merge-memop <nobj> <nthr>
//
// Parses the object count into the global g_nobj and the thread count into a
// local, opens the "sorted-memop" log of every thread, and merges them with
// merge_memop. Exits with status 1 on a wrong argument count or when either
// argument is not a positive integer.
int main(int argc, char const *argv[]) {
    if (argc != 3) {
        printf("Usage: merge-memop <nobj> <nthr>\n");
        exit(1);
    }

    // Reject non-numeric input, trailing garbage and non-positive values
    // instead of silently continuing with a meaningless count.
    istringstream nobjs(argv[1]);
    if (!(nobjs >> g_nobj) || !nobjs.eof() || g_nobj <= 0) {
        fprintf(stderr, "merge-memop: invalid <nobj> '%s'\n", argv[1]);
        exit(1);
    }

    int nthr = 0;
    istringstream nthrs(argv[2]);
    if (!(nthrs >> nthr) || !nthrs.eof() || nthr <= 0) {
        fprintf(stderr, "merge-memop: invalid <nthr> '%s'\n", argv[2]);
        exit(1);
    }

    vector<struct mapped_log> log;
    struct mapped_log l;
    for (int i = 0; i < nthr; ++i) {
        // XXX Push the log structure into the vector even if open failed,
        // because the index in the vector represents thread id.
        open_mapped_log("sorted-memop", i, &l);
        log.push_back(l);
    }

    merge_memop(log, (tid_t)nthr);
    return 0;
}