目的

fio 本身支持使用多种 IO 引擎进行测试。为了提高测试的准确度,fio 自身的线程开销就必须足够小,线程模型也要设计得比较合适。
为此,有必要了解 fio 内部 IO 请求提交和完成的流程。

异步提交过程

fio.c main()

	fio_time_init();

	if (nr_clients) {
		set_genesis_time();

		if (fio_start_all_clients())
			goto done_key;
		ret = fio_handle_clients(&fio_client_ops);
	} else
		ret = fio_backend(NULL);

线程入口在 backend.c 的 thread_main():

	/*
	 * Entry point for the thread based jobs. The process based jobs end up
	 * here as well, after a little setup.
	 */
	static void *thread_main(void *data)
	{
		struct fork_data *fd = data;
		unsigned long long elapsed_us[DDIR_RWDIR_CNT] = { 0, };
		struct thread_data *td = fd->td;

重点在fio_backend()函数里面,主要内容如下:

int fio_backend(struct sk_out *sk_out)
{
struct thread_data *td;
int i;

if (exec_profile) {
if (load_profile(exec_profile))
return 1;
free(exec_profile);
exec_profile = NULL;
}
if (!thread_number)
return 0;

if (write_bw_log) {
struct log_params p = {
.log_type = IO_LOG_TYPE_BW,
};

setup_log(&agg_io_log[DDIR_READ], &p, "agg-read_bw.log");
setup_log(&agg_io_log[DDIR_WRITE], &p, "agg-write_bw.log");
setup_log(&agg_io_log[DDIR_TRIM], &p, "agg-trim_bw.log");
}

startup_sem = fio_sem_init(FIO_SEM_LOCKED);
if (startup_sem == NULL)
return 1;

set_genesis_time();
stat_init();
helper_thread_create(startup_sem, sk_out);

cgroup_list = smalloc(sizeof(*cgroup_list));
if (cgroup_list)
INIT_FLIST_HEAD(cgroup_list);

run_threads(sk_out);

helper_thread_exit();
......

上面主要的函数是 run_threads(),它会建立主要的IO线程:

建立IO 提交的线程 thread_main

td->rusage_sem = fio_sem_init(FIO_SEM_LOCKED);
td->update_rusage = 0;

/*
* Set state to created. Thread will transition
* to TD_INITIALIZED when it's done setting up.
*/
td_set_runstate(td, TD_CREATED);
map[this_jobs++] = td;
nr_started++;

fd = calloc(1, sizeof(*fd));
fd->td = td;
fd->sk_out = sk_out;

if (td->o.use_thread) {
int ret;

dprint(FD_PROCESS, "will pthread_create\n");
ret = pthread_create(&td->thread, NULL,
thread_main, fd);
if (ret) {
log_err("pthread_create: %s\n",
strerror(ret));
free(fd);
nr_started--;
break;
}

thread_main里和IO相关的核心函数是:do_io

while (keep_running(td)) {
uint64_t verify_bytes;

fio_gettime(&td->start, NULL);
memcpy(&td->ts_cache, &td->start, sizeof(td->start));

if (clear_state) {
clear_io_state(td, 0);

if (o->unlink_each_loop && unlink_all_files(td))
break;
}

prune_io_piece_log(td);

if (td->o.verify_only && td_write(td))
verify_bytes = do_dry_run(td);
else {
do_io(td, bytes_done);

if (!ddir_rw_sum(bytes_done)) {
fio_mark_td_terminate(td);
verify_bytes = 0;
} else {
verify_bytes = bytes_done[DDIR_WRITE] +
bytes_done[DDIR_TRIM];
}
}

分析do_io函数可以看到IO 提交和收割的过程:


/*
 * Main IO worker function. It retrieves io_u's to process and queues
 * and reaps them, checking for rate and errors along the way.
 *
 * Returns number of bytes written and trimmed.
 */
static void do_io(struct thread_data *td, uint64_t *bytes_done)
{
	unsigned int i;
	int ret = 0;
	uint64_t total_bytes, bytes_issued = 0;

	for (i = 0; i < DDIR_RWDIR_CNT; i++)
		bytes_done[i] = td->bytes_done[i];

	if (in_ramp_time(td))
		td_set_runstate(td, TD_RAMP);
	else
		td_set_runstate(td, TD_RUNNING);

	lat_target_init(td);

	total_bytes = td->o.size;
	/*
	 * Allow random overwrite workloads to write up to io_size
	 * before starting verification phase as 'size' doesn't apply.
	 */
	if (td_write(td) && td_random(td) && td->o.norandommap)
		total_bytes = max(total_bytes, (uint64_t) td->o.io_size);

	/*

重点关注下面的提交请求的函数:


		ret = io_u_submit(td, io_u);
		if (should_check_rate(td))
			td->rate_next_io_time[ddir] = usec_for_io(td, ddir);

		if (io_queue_event(td, io_u, &ret, ddir, &bytes_issued, 0,
				   &comp_time))
			break;

		/*
		 * See if we need to complete some commands. Note that
		 * we can get BUSY even without IO queued, if the
		 * system is resource starved.
		 */
reap:
		full = queue_full(td) ||
			(ret == FIO_Q_BUSY && td->cur_depth);
		if (full || io_in_polling(td))
			ret = wait_for_completions(td, &comp_time);

其中上面io_u_submit()会调用实际存储引擎注册的io_submit 函数;

wait_for_completions() 会调用后端实际存储引擎注册的 getevents 函数。其调用链为:
wait_for_completions() → io_u_queued_complete() → 循环执行
ret = td_io_getevents(td, min_evts, td->o.iodepth_batch_complete_max, tvp);
io_queue_event() 判断当前是否还有没有处理完的 io events;

特别要关注的是何时收割 event 的逻辑:
当用于提交 IO 请求的队列槽位已全部占满(queue_full,或提交返回 FIO_Q_BUSY 且仍有在途请求),或者当前处于 polling 模式(io_in_polling)时,就调用注册的存储引擎的 getevents 函数收割完成事件。