-
Notifications
You must be signed in to change notification settings - Fork 9
/
Copy pathuring_cat.c
367 lines (313 loc) · 10.1 KB
/
uring_cat.c
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
#include <stdio.h>
#include <stdlib.h>
#include <sys/stat.h>
#include <sys/ioctl.h>
#include <sys/syscall.h>
#include <sys/mman.h>
#include <sys/uio.h>
#include <linux/fs.h>
#include <fcntl.h>
#include <unistd.h>
#include <string.h>
/* 如果你的编译失败是因为缺少下面的头文件,
* 您的内核可能太旧,无法支持 io_uring。
* */
#include <linux/io_uring.h>
#define QUEUE_DEPTH 1
#define BLOCK_SZ 1024
/* This is x86 specific */
#define read_barrier() __asm__ __volatile__("":::"memory")
#define write_barrier() __asm__ __volatile__("":::"memory")
struct app_io_sq_ring {
unsigned *head;
unsigned *tail;
unsigned *ring_mask;
unsigned *ring_entries;
unsigned *flags;
unsigned *array;
};
struct app_io_cq_ring {
unsigned *head;
unsigned *tail;
unsigned *ring_mask;
unsigned *ring_entries;
struct io_uring_cqe *cqes;
};
struct submitter {
int ring_fd;
struct app_io_sq_ring sq_ring;
struct io_uring_sqe *sqes;
struct app_io_cq_ring cq_ring;
};
struct file_info {
off_t file_sz;
struct iovec iovecs[]; /* Referred by readv/writev */
};
/*
* 这段代码是在没有io_uring相关系统调用的年代写的
* 标准 C 库的一部分。所以,我们推出自己的系统调用包装器.
* */
int io_uring_setup(unsigned entries, struct io_uring_params *p)
{
return (int) syscall(__NR_io_uring_setup, entries, p);
}
int io_uring_enter(int ring_fd, unsigned int to_submit,
unsigned int min_complete, unsigned int flags)
{
return (int) syscall(__NR_io_uring_enter, ring_fd, to_submit, min_complete,
flags, NULL, 0);
}
/*
* 返回传入其打开文件描述符的文件的大小。
* 正确处理常规文件和块设备。
* */
off_t get_file_size(int fd) {
struct stat st;
if(fstat(fd, &st) < 0) {
perror("fstat");
return -1;
}
if (S_ISBLK(st.st_mode)) {
unsigned long long bytes;
if (ioctl(fd, BLKGETSIZE64, &bytes) != 0) {
perror("ioctl");
return -1;
}
return bytes;
} else if (S_ISREG(st.st_mode))
return st.st_size;
return -1;
}
/*
* io_uring 需要很多设置,看起来很麻烦
* 所以 io_uring 的作者创建了 liburing,比较好用。
* 但是,您应该花时间了解此代码。
* */
int app_setup_uring(struct submitter *s) {
struct app_io_sq_ring *sring = &s->sq_ring;
struct app_io_cq_ring *cring = &s->cq_ring;
struct io_uring_params p;
void *sq_ptr, *cq_ptr;
/*
* 我们需要将 io_uring_params 结构体传递给 io_uring_setup() 去置0初始化。
* 我们可以设置任何想要的标记。
* */
memset(&p, 0, sizeof(p));
s->ring_fd = io_uring_setup(QUEUE_DEPTH, &p);
if (s->ring_fd < 0) {
perror("io_uring_setup");
return 1;
}
/*
* io_uring 通信通过 2 个共享的内核用户空间环形缓冲区进行,
* 可以在内核中通过 mmap() 调用映射。
* 虽然完成队列是直接映射进去的, 但提交队列里面有个数组,我们也把它映射进* 去
* */
int sring_sz = p.sq_off.array + p.sq_entries * sizeof(unsigned);
int cring_sz = p.cq_off.cqes + p.cq_entries * sizeof(struct io_uring_cqe);
/* 在内核版本 5.4 及以上,
* 可以使用单个 mmap() 调用同时完成两个缓冲区的映射。
* 关于内核版本,可以检查 io_uring_params 的字段,并使用 mask 获取。
* 如果 IORING_FEAT_SINGLE_MMAP 已设置,我们可以不用第二个 mmap() 去映* 射。
* */
if (p.features & IORING_FEAT_SINGLE_MMAP) {
if (cring_sz > sring_sz) {
sring_sz = cring_sz;
}
cring_sz = sring_sz;
}
/* 在提交和完成队列环形缓冲区中映射。
* 不过,较旧的内核仅映射到提交队列中。
* */
sq_ptr = mmap(0, sring_sz, PROT_READ | PROT_WRITE,
MAP_SHARED | MAP_POPULATE,
s->ring_fd, IORING_OFF_SQ_RING);
if (sq_ptr == MAP_FAILED) {
perror("mmap");
return 1;
}
if (p.features & IORING_FEAT_SINGLE_MMAP) {
cq_ptr = sq_ptr;
} else {
/* 分别映射到旧内核中的完成队列环形缓冲区 */
cq_ptr = mmap(0, cring_sz, PROT_READ | PROT_WRITE,
MAP_SHARED | MAP_POPULATE,
s->ring_fd, IORING_OFF_CQ_RING);
if (cq_ptr == MAP_FAILED) {
perror("mmap");
return 1;
}
}
/* 将有用的字段保存在全局 app_io_sq_ring 结构中以备后用
* 简单的一个参考 */
sring->head = sq_ptr + p.sq_off.head;
sring->tail = sq_ptr + p.sq_off.tail;
sring->ring_mask = sq_ptr + p.sq_off.ring_mask;
sring->ring_entries = sq_ptr + p.sq_off.ring_entries;
sring->flags = sq_ptr + p.sq_off.flags;
sring->array = sq_ptr + p.sq_off.array;
/* 映射到提交队列条目数组 */
s->sqes = mmap(0, p.sq_entries * sizeof(struct io_uring_sqe),
PROT_READ | PROT_WRITE, MAP_SHARED | MAP_POPULATE,
s->ring_fd, IORING_OFF_SQES);
if (s->sqes == MAP_FAILED) {
perror("mmap");
return 1;
}
/* 将有用的字段保存在全局 app_io_cq_ring 结构中以备后用
* 简单参考 */
cring->head = cq_ptr + p.cq_off.head;
cring->tail = cq_ptr + p.cq_off.tail;
cring->ring_mask = cq_ptr + p.cq_off.ring_mask;
cring->ring_entries = cq_ptr + p.cq_off.ring_entries;
cring->cqes = cq_ptr + p.cq_off.cqes;
return 0;
}
/*
* 输出长度为 len 的字符串到 stdout
* 我们在这里使用缓冲输出以提高效率,
* 因为我们需要逐个字符地输出。
* */
void output_to_console(char *buf, int len) {
while (len--) {
fputc(*buf++, stdout);
}
}
/*
* 从完成队列中读取。
* 在这个函数中,我们从完成队列中读取完成事件,
* 得到包含文件数据并将其打印到控制台的数据缓冲区。
* */
void read_from_cq(struct submitter *s) {
struct file_info *fi;
struct app_io_cq_ring *cring = &s->cq_ring;
struct io_uring_cqe *cqe;
unsigned head, reaped = 0;
head = *cring->head;
do {
read_barrier();
/*
* 请记住,这是一个环形缓冲区。如果头==尾,则表示
* 缓冲区为空。
* */
if (head == *cring->tail)
break;
/* 获取条目 */
cqe = &cring->cqes[head & *s->cq_ring.ring_mask];
fi = (struct file_info*) cqe->user_data;
if (cqe->res < 0)
fprintf(stderr, "Error: %s\n", strerror(abs(cqe->res)));
int blocks = (int) fi->file_sz / BLOCK_SZ;
if (fi->file_sz % BLOCK_SZ) blocks++;
for (int i = 0; i < blocks; i++)
output_to_console(fi->iovecs[i].iov_base, fi->iovecs[i].iov_len);
head++;
} while (1);
*cring->head = head;
write_barrier();
}
/*
* 提交到提交队列。
* 在这个函数中,我们将请求提交到提交队列。你可以提交
* 我们的将是 readv() 请求,通过 IORING_OP_READV 指定。
*
* */
int submit_to_sq(char *file_path, struct submitter *s) {
struct file_info *fi;
int file_fd = open(file_path, O_RDONLY);
if (file_fd < 0 ) {
perror("open");
return 1;
}
struct app_io_sq_ring *sring = &s->sq_ring;
unsigned index = 0, current_block = 0, tail = 0, next_tail = 0;
off_t file_sz = get_file_size(file_fd);
if (file_sz < 0)
return 1;
off_t bytes_remaining = file_sz;
int blocks = (int) file_sz / BLOCK_SZ;
if (file_sz % BLOCK_SZ) blocks++;
fi = malloc(sizeof(*fi) + sizeof(struct iovec) * blocks);
if (!fi) {
fprintf(stderr, "Unable to allocate memory\n");
return 1;
}
fi->file_sz = file_sz;
/*
* 对于我们需要读取的文件的每个块,我们分配一个iovec struct
* 索引到 iovecs 数组中。这个数组作为一部分提交传入。
* 如果你不明白这一点,那么你需要去
* 了解一下 readv() 和 writev() 系统调用的工作方式。
* */
while (bytes_remaining) {
off_t bytes_to_read = bytes_remaining;
if (bytes_to_read > BLOCK_SZ)
bytes_to_read = BLOCK_SZ;
fi->iovecs[current_block].iov_len = bytes_to_read;
void *buf;
if( posix_memalign(&buf, BLOCK_SZ, BLOCK_SZ)) {
perror("posix_memalign");
return 1;
}
fi->iovecs[current_block].iov_base = buf;
current_block++;
bytes_remaining -= bytes_to_read;
}
/* 将我们的提交队列条目添加到 SQE 环形缓冲区的尾部 */
next_tail = tail = *sring->tail;
next_tail++;
read_barrier();
index = tail & *s->sq_ring.ring_mask;
struct io_uring_sqe *sqe = &s->sqes[index];
sqe->fd = file_fd;
sqe->flags = 0;
sqe->opcode = IORING_OP_READV;
sqe->addr = (unsigned long) fi->iovecs;
sqe->len = blocks;
sqe->off = 0;
sqe->user_data = (unsigned long long) fi;
sring->array[index] = index;
tail = next_tail;
/* 更新尾部以便内核可以看到它 */
if(*sring->tail != tail) {
*sring->tail = tail;
write_barrier();
}
/*
* 告诉内核我们已经用 io_uring_enter() 提交了事件。
* 们还传入了 IOURING_ENTER_GETEVENTS 标志,这会导致
* io_uring_enter() 调用等待 min_complete 事件完成后返回。
* */
int ret = io_uring_enter(s->ring_fd, 1,1,
IORING_ENTER_GETEVENTS);
if(ret < 0) {
perror("io_uring_enter");
return 1;
}
return 0;
}
int main(int argc, char *argv[]) {
struct submitter *s;
if (argc < 2) {
fprintf(stderr, "Usage: %s <filename>\n", argv[0]);
return 1;
}
s = malloc(sizeof(*s));
if (!s) {
perror("malloc");
return 1;
}
memset(s, 0, sizeof(*s));
if(app_setup_uring(s)) {
fprintf(stderr, "Unable to setup uring!\n");
return 1;
}
for (int i = 1; i < argc; i++) {
if(submit_to_sq(argv[i], s)) {
fprintf(stderr, "Error reading file\n");
return 1;
}
read_from_cq(s);
}
return 0;
}