Commit f04356cb authored by J. R. Okajima

aufs: workqueue



Aufs uses the workqueue both synchronously and asynchronously.
For the synchronous case, aufs uses its own dedicated wkq since it does
not want to be disturbed by other tasks on the system. For the
asynchronous case, aufs uses the system global workqueue.
Aufs has to prevent itself from being unmounted while an async task is
still queued.

See also the document in this commit.
Signed-off-by: J. R. Okajima <hooanon05g@gmail.com>
parent 2460d8e6
......@@ -37,3 +37,31 @@ among multiple writable branches, XIB files, pseudo-links and kobject.
See below in detail.
About the policies which supports copy-down a directory, see
wbr_policy.txt too.
Workqueue
----------------------------------------------------------------------
Aufs sometimes requires privileged access to a branch, for instance in a
copy-up/down operation. When a user process is going to modify a file
which exists only in the lower readonly branch, one of the ancestor
directories may not be writable by that process. Here aufs copies up the
file along with its ancestors, and setting their owner/group/mode/etc.
may require privilege.
This is a typical example of the 'application character' of aufs (see
Introduction).
Aufs uses the workqueue synchronously for this case. It creates its own
workqueue; the workqueue is a kernel thread and runs with privilege.
Aufs passes it a request to call mkdir or write (for example) and waits
for its completion. This approach also handles signals simply: if aufs
did not use the workqueue and instead raised the privilege of the user
process itself, that process might receive an unexpected SIGXFSZ or
other signal.
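The synchronous pattern can be pictured with a small sketch. It is not
part of the aufs sources: the argument struct and the mkdir job below
are illustrative only, while au_wkq_wait() is the helper added by this
commit. The caller packs its arguments, hands them to the privileged
workqueue thread, and sleeps until the job completes.

	struct mkdir_args {
		struct inode *dir;	/* parent dir on the writable branch */
		struct dentry *dentry;	/* directory to create */
		umode_t mode;
		int err;		/* result handed back to the caller */
	};

	static void privileged_mkdir(void *_args)
	{
		struct mkdir_args *a = _args;

		/* runs in the aufs kernel thread, i.e. with full privilege */
		a->err = vfs_mkdir(a->dir, a->dentry, a->mode);
	}

	static int copyup_mkdir(struct inode *dir, struct dentry *dentry,
				umode_t mode)
	{
		struct mkdir_args args = {
			.dir	= dir,
			.dentry	= dentry,
			.mode	= mode
		};
		int err;

		/* queue on the aufs workqueue and wait for its completion */
		err = au_wkq_wait(privileged_mkdir, &args);
		if (!err)
			err = args.err;
		return err;
	}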
Aufs also uses the system global workqueue (the "events" kernel thread)
for asynchronous tasks, such as handling inotify/fsnotify events,
re-creating a whiteout base, etc. This is unrelated to privilege.
Most aufs operations try to acquire the rw_semaphore for the aufs
superblock at their beginning, and at the same time wait for the
completion of all queued asynchronous tasks.
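As a rough usage sketch (again not part of the aufs sources; the
whiteout job is hypothetical, while au_wkq_nowait(), si_read_lock() and
AuLock_FLUSH are introduced by this commit), an asynchronous job bumps
si_nowait.nw_len and pins the sbinfo kobject, and a later operation
passing AuLock_FLUSH waits for that counter to reach zero before taking
si_rwsem.

	static void reval_whiteout_base(void *args)
	{
		/* re-create the whiteout base etc.; no privilege required */
	}

	static int queue_async_job(struct super_block *sb, void *args)
	{
		/*
		 * increments sbinfo->si_nowait.nw_len and takes a reference
		 * on the sbinfo kobject, so the mount cannot go away while
		 * the job is queued
		 */
		return au_wkq_nowait(reval_whiteout_base, args, sb, /*flags*/0);
	}

	static void some_aufs_operation(struct super_block *sb)
	{
		/*
		 * AuLock_FLUSH makes si_read_lock() call au_nwt_flush()
		 * first, i.e. sleep until nw_len drops to zero, before
		 * acquiring si_rwsem
		 */
		si_read_lock(sb, AuLock_FLUSH);
		/* ... the body of the operation ... */
		si_read_unlock(sb);
	}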
......@@ -11,7 +11,7 @@ ccflags-y += -include ${srctree}/include/uapi/linux/aufs_type.h
obj-$(CONFIG_AUFS_FS) += aufs.o
aufs-y := module.o sbinfo.o super.o branch.o opts.o \
vfsub.o dcsub.o \
wkq.o vfsub.o dcsub.o \
cpup.o \
dinfo.o \
iinfo.o inode.o
......
......@@ -34,6 +34,7 @@
#include "rwsem.h"
#include "super.h"
#include "vfsub.h"
#include "wkq.h"
#endif /* __KERNEL__ */
#endif /* __AUFS_H__ */
......@@ -203,8 +203,8 @@ void au_dpri_sb(struct super_block *sb)
sbinfo = au_sbi(sb);
if (!sbinfo)
return;
dpri("gen %u, kobj %d\n",
sbinfo->si_generation,
dpri("nw %d, gen %u, kobj %d\n",
atomic_read(&sbinfo->si_nowait.nw_len), sbinfo->si_generation,
kref_read(&sbinfo->si_kobj.kref));
for (bindex = 0; bindex <= sbinfo->si_bbot; bindex++)
do_pri_br(bindex, sbinfo->si_branch[0 + bindex]);
......@@ -265,3 +265,15 @@ void au_dbg_verify_gen(struct dentry *parent, unsigned int sigen)
}
au_dpages_free(&dpages);
}
void au_dbg_verify_kthread(void)
{
if (au_wkq_test()) {
/* au_dbg_blocked(); re-commit later */
/*
* It may be recursive, but udba=notify between two aufs mounts,
* where a single ro branch is shared, is not a problem.
*/
/* WARN_ON(1); */
}
}
......@@ -83,6 +83,7 @@ void au_dpri_sb(struct super_block *sb);
#define au_dbg_verify_dinode(d) __au_dbg_verify_dinode(d, __func__, __LINE__)
void __au_dbg_verify_dinode(struct dentry *dentry, const char *func, int line);
void au_dbg_verify_gen(struct dentry *parent, unsigned int sigen);
void au_dbg_verify_kthread(void);
#define AuDbgInode(i) do { \
mutex_lock(&au_dbg_mtx); \
......@@ -114,6 +115,7 @@ void au_dbg_verify_gen(struct dentry *parent, unsigned int sigen);
#else
AuStubVoid(au_dbg_verify_dinode, struct dentry *dentry)
AuStubVoid(au_dbg_verify_gen, struct dentry *parent, unsigned int sigen)
AuStubVoid(au_dbg_verify_kthread, void)
#define AuDbgInode(i) do {} while (0)
#define AuDbgDAlias(i) do {} while (0)
......
......@@ -120,13 +120,19 @@ static int __init aufs_init(void)
memset(au_cache, 0, sizeof(au_cache));
err = au_cache_init();
err = au_wkq_init();
if (unlikely(err))
goto out;
err = au_cache_init();
if (unlikely(err))
goto out_wkq;
/* since we define pr_fmt, call printk directly */
printk(KERN_INFO AUFS_NAME " " AUFS_VERSION "\n");
goto out; /* success */
out_wkq:
au_wkq_fin();
out:
return err;
}
......
......@@ -18,6 +18,7 @@ void au_si_free(struct kobject *kobj)
struct au_sbinfo *sbinfo;
sbinfo = container_of(kobj, struct au_sbinfo, si_kobj);
AuDebugOn(atomic_read(&sbinfo->si_nowait.nw_len));
au_rw_write_lock(&sbinfo->si_rwsem);
au_br_free(sbinfo);
......@@ -44,6 +45,7 @@ int au_si_alloc(struct super_block *sb)
if (unlikely(!sbinfo->si_branch))
goto out_sbinfo;
au_nwt_init(&sbinfo->si_nowait);
au_rw_init_wlock(&sbinfo->si_rwsem);
sbinfo->si_bbot = -1;
......@@ -116,3 +118,26 @@ aufs_bindex_t au_new_br_id(struct super_block *sb)
return -1;
}
/* ---------------------------------------------------------------------- */
/* it is ok that new 'nwt' tasks are appended while we are sleeping */
int si_read_lock(struct super_block *sb, int flags)
{
if (au_ftest_lock(flags, FLUSH))
au_nwt_flush(&au_sbi(sb)->si_nowait);
si_noflush_read_lock(sb);
return 0; /* re-commit later */
}
int si_write_lock(struct super_block *sb, int flags)
{
if (au_ftest_lock(flags, FLUSH))
au_nwt_flush(&au_sbi(sb)->si_nowait);
si_noflush_write_lock(sb);
return 0; /* re-commit later */
}
......@@ -15,9 +15,13 @@
#include <linux/fs.h>
#include <linux/kobject.h>
#include "rwsem.h"
#include "wkq.h"
struct au_branch;
struct au_sbinfo {
/* nowait tasks in the system-wide workqueue */
struct au_nowait_tasks si_nowait;
/*
* tried sb->s_umount, but failed due to the dependency between i_mutex.
* rwsem for au_sbinfo is necessary.
......@@ -49,10 +53,11 @@ struct au_sbinfo {
/* ---------------------------------------------------------------------- */
/* flags for __si_read_lock()/aufs_read_lock()/di_read_lock() */
/* flags for si_read_lock()/aufs_read_lock()/di_read_lock() */
#define AuLock_DW 1 /* write-lock dentry */
#define AuLock_IR (1 << 1) /* read-lock inode */
#define AuLock_IW (1 << 2) /* write-lock inode */
#define AuLock_FLUSH (1 << 3) /* wait for 'nowait' tasks */
#define au_ftest_lock(flags, name) ((flags) & AuLock_##name)
#define au_fset_lock(flags, name) \
do { (flags) |= AuLock_##name; } while (0)
......@@ -72,6 +77,9 @@ int au_sbr_realloc(struct au_sbinfo *sbinfo, int nbr, int may_shrink);
unsigned int au_sigen_inc(struct super_block *sb);
aufs_bindex_t au_new_br_id(struct super_block *sb);
int si_read_lock(struct super_block *sb, int flags);
int si_write_lock(struct super_block *sb, int flags);
/* ---------------------------------------------------------------------- */
static inline struct au_sbinfo *au_sbi(struct super_block *sb)
......@@ -101,6 +109,65 @@ static inline struct au_sbinfo *au_sbi(struct super_block *sb)
#define SiMustAnyLock(sb) AuRwMustAnyLock(&au_sbi(sb)->si_rwsem)
#define SiMustWriteLock(sb) AuRwMustWriteLock(&au_sbi(sb)->si_rwsem)
static inline void si_noflush_read_lock(struct super_block *sb)
{
__si_read_lock(sb);
/* re-commit later */
}
static inline int si_noflush_read_trylock(struct super_block *sb)
{
return __si_read_trylock(sb); /* re-commit later */
}
static inline void si_noflush_write_lock(struct super_block *sb)
{
__si_write_lock(sb);
/* re-commit later */
}
static inline int si_noflush_write_trylock(struct super_block *sb)
{
return __si_write_trylock(sb); /* re-commit later */
}
#if 0 /* reserved */
static inline int si_read_trylock(struct super_block *sb, int flags)
{
if (au_ftest_lock(flags, FLUSH))
au_nwt_flush(&au_sbi(sb)->si_nowait);
return si_noflush_read_trylock(sb);
}
#endif
static inline void si_read_unlock(struct super_block *sb)
{
/* re-commit later */
__si_read_unlock(sb);
}
#if 0 /* reserved */
static inline int si_write_trylock(struct super_block *sb, int flags)
{
if (au_ftest_lock(flags, FLUSH))
au_nwt_flush(&au_sbi(sb)->si_nowait);
return si_noflush_write_trylock(sb);
}
#endif
static inline void si_write_unlock(struct super_block *sb)
{
/* re-commit later */
__si_write_unlock(sb);
}
#if 0 /* reserved */
static inline void si_downgrade_lock(struct super_block *sb)
{
__si_downgrade_lock(sb);
}
#endif
/* ---------------------------------------------------------------------- */
static inline aufs_bindex_t au_sbbot(struct super_block *sb)
......
// SPDX-License-Identifier: GPL-2.0
/*
* Copyright (C) 2005-2019 Junjiro R. Okajima
*/
/*
* workqueue for asynchronous/super-io operations
* todo: try new credential scheme
*/
#include <linux/module.h>
#include <linux/sched/signal.h>
#include "aufs.h"
/* internal workqueue named AUFS_WKQ_NAME */
static struct workqueue_struct *au_wkq;
struct au_wkinfo {
struct work_struct wk;
struct kobject *kobj;
unsigned int flags; /* see wkq.h */
au_wkq_func_t func;
void *args;
#ifdef CONFIG_LOCKDEP
int dont_check;
struct held_lock **hlock;
#endif
struct completion *comp;
};
/* ---------------------------------------------------------------------- */
/*
* Aufs passes some operations to the workqueue such as the internal copyup.
* This scheme looks rather unnatural for LOCKDEP debugging feature, since the
* job run by workqueue depends upon the locks acquired in the other task.
* Delegating a small operation to the workqueue, aufs passes its lockdep
* information too. And the job in the workqueue restores the info in order to
* pretend as if it acquired those locks. This is just to make LOCKDEP work
* correctly and expectedly.
*/
#ifndef CONFIG_LOCKDEP
AuStubInt0(au_wkq_lockdep_alloc, struct au_wkinfo *wkinfo);
AuStubVoid(au_wkq_lockdep_free, struct au_wkinfo *wkinfo);
AuStubVoid(au_wkq_lockdep_pre, struct au_wkinfo *wkinfo);
AuStubVoid(au_wkq_lockdep_post, struct au_wkinfo *wkinfo);
AuStubVoid(au_wkq_lockdep_init, struct au_wkinfo *wkinfo);
#else
static void au_wkq_lockdep_init(struct au_wkinfo *wkinfo)
{
wkinfo->hlock = NULL;
wkinfo->dont_check = 0;
}
/*
* 1: matched
* 0: unmatched
*/
static int au_wkq_lockdep_test(struct lock_class_key *key, const char *name)
{
static DEFINE_SPINLOCK(spin);
static struct {
char *name;
struct lock_class_key *key;
} a[] = {
{ .name = "&sbinfo->si_rwsem" },
{ .name = "&finfo->fi_rwsem" },
{ .name = "&dinfo->di_rwsem" },
{ .name = "&iinfo->ii_rwsem" }
};
static int set;
int i;
/* lockless read from 'set.' see below */
if (set == ARRAY_SIZE(a)) {
for (i = 0; i < ARRAY_SIZE(a); i++)
if (a[i].key == key)
goto match;
goto unmatch;
}
spin_lock(&spin);
if (set)
for (i = 0; i < ARRAY_SIZE(a); i++)
if (a[i].key == key) {
spin_unlock(&spin);
goto match;
}
for (i = 0; i < ARRAY_SIZE(a); i++) {
if (a[i].key) {
if (unlikely(a[i].key == key)) { /* rare but possible */
spin_unlock(&spin);
goto match;
} else
continue;
}
if (strstr(a[i].name, name)) {
/*
* the order of these three lines is important for the
* lockless read above.
*/
a[i].key = key;
spin_unlock(&spin);
set++;
/* AuDbg("%d, %s\n", set, name); */
goto match;
}
}
spin_unlock(&spin);
goto unmatch;
match:
return 1;
unmatch:
return 0;
}
static int au_wkq_lockdep_alloc(struct au_wkinfo *wkinfo)
{
int err, n;
struct task_struct *curr;
struct held_lock **hl, *held_locks, *p;
err = 0;
curr = current;
wkinfo->dont_check = lockdep_recursing(curr);
if (wkinfo->dont_check)
goto out;
n = curr->lockdep_depth;
if (!n)
goto out;
err = -ENOMEM;
wkinfo->hlock = kmalloc_array(n + 1, sizeof(*wkinfo->hlock), GFP_NOFS);
if (unlikely(!wkinfo->hlock))
goto out;
err = 0;
#if 0
if (0 && au_debug_test()) /* left for debugging */
lockdep_print_held_locks(curr);
#endif
held_locks = curr->held_locks;
hl = wkinfo->hlock;
while (n--) {
p = held_locks++;
if (au_wkq_lockdep_test(p->instance->key, p->instance->name))
*hl++ = p;
}
*hl = NULL;
out:
return err;
}
static void au_wkq_lockdep_free(struct au_wkinfo *wkinfo)
{
au_kfree_try_rcu(wkinfo->hlock);
}
static void au_wkq_lockdep_pre(struct au_wkinfo *wkinfo)
{
struct held_lock *p, **hl = wkinfo->hlock;
int subclass;
if (wkinfo->dont_check)
lockdep_off();
if (!hl)
return;
while ((p = *hl++)) { /* assignment */
subclass = lockdep_hlock_class(p)->subclass;
/* AuDbg("%s, %d\n", p->instance->name, subclass); */
if (p->read)
rwsem_acquire_read(p->instance, subclass, 0,
/*p->acquire_ip*/_RET_IP_);
else
rwsem_acquire(p->instance, subclass, 0,
/*p->acquire_ip*/_RET_IP_);
}
}
static void au_wkq_lockdep_post(struct au_wkinfo *wkinfo)
{
struct held_lock *p, **hl = wkinfo->hlock;
if (wkinfo->dont_check)
lockdep_on();
if (!hl)
return;
while ((p = *hl++)) /* assignment */
rwsem_release(p->instance, 0, /*p->acquire_ip*/_RET_IP_);
}
#endif
static void wkq_func(struct work_struct *wk)
{
struct au_wkinfo *wkinfo = container_of(wk, struct au_wkinfo, wk);
AuDebugOn(!uid_eq(current_fsuid(), GLOBAL_ROOT_UID));
AuDebugOn(rlimit(RLIMIT_FSIZE) != RLIM_INFINITY);
au_wkq_lockdep_pre(wkinfo);
wkinfo->func(wkinfo->args);
au_wkq_lockdep_post(wkinfo);
if (au_ftest_wkq(wkinfo->flags, WAIT))
complete(wkinfo->comp);
else {
kobject_put(wkinfo->kobj);
module_put(THIS_MODULE); /* todo: ?? */
au_kfree_rcu(wkinfo);
}
}
/*
* Since struct completion is large, try allocating it dynamically.
*/
#if 1 /* defined(CONFIG_4KSTACKS) || defined(AuTest4KSTACKS) */
#define AuWkqCompDeclare(name) struct completion *comp = NULL
static int au_wkq_comp_alloc(struct au_wkinfo *wkinfo, struct completion **comp)
{
*comp = kmalloc(sizeof(**comp), GFP_NOFS);
if (*comp) {
init_completion(*comp);
wkinfo->comp = *comp;
return 0;
}
return -ENOMEM;
}
static void au_wkq_comp_free(struct completion *comp)
{
au_kfree_rcu(comp);
}
#else
/* no braces */
#define AuWkqCompDeclare(name) \
DECLARE_COMPLETION_ONSTACK(_ ## name); \
struct completion *comp = &_ ## name
static int au_wkq_comp_alloc(struct au_wkinfo *wkinfo, struct completion **comp)
{
wkinfo->comp = *comp;
return 0;
}
static void au_wkq_comp_free(struct completion *comp __maybe_unused)
{
/* empty */
}
#endif /* 4KSTACKS */
static void au_wkq_run(struct au_wkinfo *wkinfo)
{
au_dbg_verify_kthread();
if (au_ftest_wkq(wkinfo->flags, WAIT)) {
INIT_WORK_ONSTACK(&wkinfo->wk, wkq_func);
queue_work(au_wkq, &wkinfo->wk);
} else {
INIT_WORK(&wkinfo->wk, wkq_func);
schedule_work(&wkinfo->wk);
}
}
/*
* Be careful. It is easy to make deadlock happen.
* processA: lock, wkq and wait
* processB: wkq and wait, lock in wkq
* --> deadlock
*/
int au_wkq_do_wait(unsigned int flags, au_wkq_func_t func, void *args)
{
int err;
AuWkqCompDeclare(comp);
struct au_wkinfo wkinfo = {
.flags = flags,
.func = func,
.args = args
};
err = au_wkq_comp_alloc(&wkinfo, &comp);
if (unlikely(err))
goto out;
err = au_wkq_lockdep_alloc(&wkinfo);
if (unlikely(err))
goto out_comp;
if (!err) {
au_wkq_run(&wkinfo);
/* no timeout, no interrupt */
wait_for_completion(wkinfo.comp);
}
au_wkq_lockdep_free(&wkinfo);
out_comp:
au_wkq_comp_free(comp);
out:
destroy_work_on_stack(&wkinfo.wk);
return err;
}
/*
* Note: dget/dput() in func for aufs dentries are not supported. It will be a
* problem in a concurrent umounting.
*/
int au_wkq_nowait(au_wkq_func_t func, void *args, struct super_block *sb,
unsigned int flags)
{
int err;
struct au_wkinfo *wkinfo;
atomic_inc(&au_sbi(sb)->si_nowait.nw_len);
/*
* wkq_func() must free this wkinfo.
* it highly depends upon the implementation of workqueue.
*/
err = 0;
wkinfo = kmalloc(sizeof(*wkinfo), GFP_NOFS);
if (wkinfo) {
wkinfo->kobj = &au_sbi(sb)->si_kobj;
wkinfo->flags = flags & ~AuWkq_WAIT;
wkinfo->func = func;
wkinfo->args = args;
wkinfo->comp = NULL;
au_wkq_lockdep_init(wkinfo);
kobject_get(wkinfo->kobj);
__module_get(THIS_MODULE); /* todo: ?? */
au_wkq_run(wkinfo);
} else {
err = -ENOMEM;
au_nwt_done(&au_sbi(sb)->si_nowait);
}
return err;
}
/* ---------------------------------------------------------------------- */
void au_nwt_init(struct au_nowait_tasks *nwt)
{
atomic_set(&nwt->nw_len, 0);
/* smp_mb(); */ /* atomic_set */
init_waitqueue_head(&nwt->nw_wq);
}
void au_wkq_fin(void)
{
destroy_workqueue(au_wkq);
}
int __init au_wkq_init(void)
{
int err;
err = 0;
au_wkq = alloc_workqueue(AUFS_WKQ_NAME, 0, WQ_DFL_ACTIVE);
if (IS_ERR(au_wkq))
err = PTR_ERR(au_wkq);
else if (!au_wkq)
err = -ENOMEM;
return err;
}
/* SPDX-License-Identifier: GPL-2.0 */
/*
* Copyright (C) 2005-2019 Junjiro R. Okajima
*/
/*
* workqueue for asynchronous/super-io operations
* todo: try new credentials management scheme
*/
#ifndef __AUFS_WKQ_H__
#define __AUFS_WKQ_H__
#ifdef __KERNEL__
#include <linux/wait.h>
struct super_block;
/* ---------------------------------------------------------------------- */
/*
* in the next operation, wait for the 'nowait' tasks in system-wide workqueue
*/
struct au_nowait_tasks {
atomic_t nw_len;
wait_queue_head_t nw_wq;
};
/* ---------------------------------------------------------------------- */
typedef void (*au_wkq_func_t)(void *args);
/* wkq flags */
#define AuWkq_WAIT 1
#define au_ftest_wkq(flags, name) ((flags) & AuWkq_##name)
#define au_fset_wkq(flags, name) \
do { (flags) |= AuWkq_##name; } while (0)
#define au_fclr_wkq(flags, name) \
do { (flags) &= ~AuWkq_##name; } while (0)
/* wkq.c */
int au_wkq_do_wait(unsigned int flags, au_wkq_func_t func, void *args);
int au_wkq_nowait(au_wkq_func_t func, void *args, struct super_block *sb,
unsigned int flags);
void au_nwt_init(struct au_nowait_tasks *nwt);
int __init au_wkq_init(void);
void au_wkq_fin(void);
/* ---------------------------------------------------------------------- */
static inline int au_wkq_test(void)
{
return current->flags & PF_WQ_WORKER;
}
static inline int au_wkq_wait(au_wkq_func_t func, void *args)
{
return au_wkq_do_wait(AuWkq_WAIT, func, args);
}
static inline void au_nwt_done(struct au_nowait_tasks *nwt)
{
if (atomic_dec_and_test(&nwt->nw_len))
wake_up_all(&nwt->nw_wq);
}
static inline int au_nwt_flush(struct au_nowait_tasks *nwt)
{
wait_event(nwt->nw_wq, !atomic_read(&nwt->nw_len));
return 0;
}
#endif /* __KERNEL__ */
#endif /* __AUFS_WKQ_H__ */
......@@ -54,6 +54,12 @@ typedef int16_t aufs_bindex_t;
#endif
#endif /* __KERNEL__ */
/* ---------------------------------------------------------------------- */
#define AUFS_FSTYPE AUFS_NAME
#define AUFS_WKQ_NAME AUFS_NAME "d"
/* branch permissions and attributes */
#define AUFS_BRPERM_RO "ro"
......