@@ -13,13 +13,9 @@
#include <linux/sched/mm.h>
#include <linux/percpu.h>
#include <linux/slab.h>
#include <linux/kthread.h>
#include <linux/rculist_nulls.h>
#include <linux/fs_struct.h>
#include <linux/task_work.h>
#include <linux/blk-cgroup.h>
#include <linux/audit.h>
#include <linux/cpu.h>
#include <linux/tracehook.h>
#include "../kernel/sched/sched.h"
#include "io-wq.h"
@@ -36,7 +32,6 @@ enum {
enum {
	IO_WQ_BIT_EXIT		= 0,	/* wq exiting */
	IO_WQ_BIT_ERROR		= 1,	/* error on setup */
};

enum {
@@ -57,14 +52,12 @@ struct io_worker {
	struct io_wq_work *cur_work;
	spinlock_t lock;

	struct rcu_head rcu;
	struct mm_struct *mm;
#ifdef CONFIG_BLK_CGROUP
	struct cgroup_subsys_state *blkcg_css;
#endif
	const struct cred *cur_creds;
	const struct cred *saved_creds;
	struct nsproxy *restore_nsproxy;

	struct completion ref_done;

	struct rcu_head rcu;
};

#if BITS_PER_LONG == 64
@@ -93,7 +86,6 @@ struct io_wqe {
	struct {
		raw_spinlock_t lock;
		struct io_wq_work_list work_list;
		unsigned long hash_map;
		unsigned flags;
	} ____cacheline_aligned_in_smp;
@@ -103,6 +95,8 @@ struct io_wqe {
	struct hlist_nulls_head free_list;
	struct list_head all_list;

	struct wait_queue_entry wait;

	struct io_wq *wq;
	struct io_wq_work *hash_tail[IO_WQ_NR_HASH_BUCKETS];
};
@@ -119,12 +113,15 @@ struct io_wq {
	struct task_struct *manager;
	struct user_struct *user;

	struct io_wq_hash *hash;

	refcount_t refs;
	struct completion done;

	struct hlist_node cpuhp_node;

	refcount_t use_refs;

	pid_t task_pid;
};

static enum cpuhp_state io_wq_online;
@@ -137,62 +134,7 @@ static bool io_worker_get(struct io_worker *worker)
static void io_worker_release(struct io_worker *worker)
{
	if (refcount_dec_and_test(&worker->ref))
		wake_up_process(worker->task);
}
/*
 * Note: drops the wqe->lock if returning true! The caller must re-acquire
 * the lock in that case. Some callers need to restart handling if this
 * happens, so we can't just re-acquire the lock on behalf of the caller.
 */
static bool __io_worker_unuse(struct io_wqe *wqe, struct io_worker *worker)
{
	bool dropped_lock = false;

	if (worker->saved_creds) {
		revert_creds(worker->saved_creds);
		worker->cur_creds = worker->saved_creds = NULL;
	}

	if (current->files) {
		__acquire(&wqe->lock);
		raw_spin_unlock_irq(&wqe->lock);
		dropped_lock = true;

		task_lock(current);
		current->files = NULL;
		current->nsproxy = worker->restore_nsproxy;
		task_unlock(current);
	}

	if (current->fs)
		current->fs = NULL;

	/*
	 * If we have an active mm, we need to drop the wq lock before unusing
	 * it. If we do, return true and let the caller retry the idle loop.
	 */
	if (worker->mm) {
		if (!dropped_lock) {
			__acquire(&wqe->lock);
			raw_spin_unlock_irq(&wqe->lock);
			dropped_lock = true;
		}
		__set_current_state(TASK_RUNNING);
		kthread_unuse_mm(worker->mm);
		mmput(worker->mm);
		worker->mm = NULL;
	}

#ifdef CONFIG_BLK_CGROUP
	if (worker->blkcg_css) {
		kthread_associate_blkcg(NULL);
		worker->blkcg_css = NULL;
	}
#endif
	if (current->signal->rlim[RLIMIT_FSIZE].rlim_cur != RLIM_INFINITY)
		current->signal->rlim[RLIMIT_FSIZE].rlim_cur = RLIM_INFINITY;
	return dropped_lock;
	complete(&worker->ref_done);
}
static inline struct io_wqe_acct *io_work_get_acct(struct io_wqe *wqe,
@@ -204,9 +146,10 @@ static inline struct io_wqe_acct *io_work_get_acct(struct io_wqe *wqe,
	return &wqe->acct[IO_WQ_ACCT_BOUND];
}

static inline struct io_wqe_acct *io_wqe_get_acct(struct io_wqe *wqe,
						  struct io_worker *worker)
static inline struct io_wqe_acct *io_wqe_get_acct(struct io_worker *worker)
{
	struct io_wqe *wqe = worker->wqe;

	if (worker->flags & IO_WORKER_F_BOUND)
		return &wqe->acct[IO_WQ_ACCT_BOUND];
@@ -216,39 +159,36 @@ static inline struct io_wqe_acct *io_wqe_get_acct(struct io_wqe *wqe,
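/*
 * Worker teardown now synchronizes on ref_done: the final io_worker_release()
 * completes it, and io_worker_exit() waits for that completion instead of
 * open-coding a set_current_state()/schedule() handshake.
 */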
static void io_worker_exit(struct io_worker *worker)
{
	struct io_wqe *wqe = worker->wqe;
	struct io_wqe_acct *acct = io_wqe_get_acct(wqe, worker);
	struct io_wqe_acct *acct = io_wqe_get_acct(worker);
	unsigned flags;

	/*
	 * If we're not at zero, someone else is holding a brief reference
	 * to the worker. Wait for that to go away.
	 */
	set_current_state(TASK_INTERRUPTIBLE);
	if (!refcount_dec_and_test(&worker->ref))
		schedule();
	__set_current_state(TASK_RUNNING);
	if (refcount_dec_and_test(&worker->ref))
		complete(&worker->ref_done);
	wait_for_completion(&worker->ref_done);

	preempt_disable();
	current->flags &= ~PF_IO_WORKER;
	if (worker->flags & IO_WORKER_F_RUNNING)
	flags = worker->flags;
	worker->flags = 0;
	if (flags & IO_WORKER_F_RUNNING)
		atomic_dec(&acct->nr_running);
	if (!(worker->flags & IO_WORKER_F_BOUND))
		atomic_dec(&wqe->wq->user->processes);
	worker->flags = 0;
	preempt_enable();

	if (worker->saved_creds) {
		revert_creds(worker->saved_creds);
		worker->cur_creds = worker->saved_creds = NULL;
	}

	raw_spin_lock_irq(&wqe->lock);
	hlist_nulls_del_rcu(&worker->nulls_node);
	if (flags & IO_WORKER_F_FREE)
		hlist_nulls_del_rcu(&worker->nulls_node);
	list_del_rcu(&worker->all_list);
	if (__io_worker_unuse(wqe, worker)) {
		__release(&wqe->lock);
		raw_spin_lock_irq(&wqe->lock);
	}
	acct->nr_workers--;
	raw_spin_unlock_irq(&wqe->lock);

	kfree_rcu(worker, rcu);
	if (refcount_dec_and_test(&wqe->wq->refs))
		complete(&wqe->wq->done);
	io_wq_put(wqe->wq);
}
static inline bool io_wqe_run_queue(struct io_wqe *wqe)
@@ -306,33 +246,27 @@ static void io_wqe_wake_worker(struct io_wqe *wqe, struct io_wqe_acct *acct)
		wake_up_process(wqe->wq->manager);
}
static void io_wqe_inc_running(struct io_wqe *wqe, struct io_worker *worker)
static void io_wqe_inc_running(struct io_worker *worker)
{
	struct io_wqe_acct *acct = io_wqe_get_acct(wqe, worker);
	struct io_wqe_acct *acct = io_wqe_get_acct(worker);

	atomic_inc(&acct->nr_running);
}

static void io_wqe_dec_running(struct io_wqe *wqe, struct io_worker *worker)
static void io_wqe_dec_running(struct io_worker *worker)
	__must_hold(wqe->lock)
{
	struct io_wqe_acct *acct = io_wqe_get_acct(wqe, worker);
	struct io_wqe_acct *acct = io_wqe_get_acct(worker);
	struct io_wqe *wqe = worker->wqe;

	if (atomic_dec_and_test(&acct->nr_running) && io_wqe_run_queue(wqe))
		io_wqe_wake_worker(wqe, acct);
}

static void io_worker_start(struct io_wqe *wqe, struct io_worker *worker)
static void io_worker_start(struct io_worker *worker)
{
	allow_kernel_signal(SIGINT);

	current->flags |= PF_IO_WORKER;
	current->fs = NULL;
	current->files = NULL;

	worker->flags |= (IO_WORKER_F_UP | IO_WORKER_F_RUNNING);
	worker->restore_nsproxy = current->nsproxy;
	io_wqe_inc_running(wqe, worker);
	io_wqe_inc_running(worker);
}

/*
/*
@@ -357,19 +291,17 @@ static void __io_worker_busy(struct io_wqe *wqe, struct io_worker *worker,
	worker_bound = (worker->flags & IO_WORKER_F_BOUND) != 0;
	work_bound = (work->flags & IO_WQ_WORK_UNBOUND) == 0;
	if (worker_bound != work_bound) {
		io_wqe_dec_running(wqe, worker);
		io_wqe_dec_running(worker);
		if (work_bound) {
			worker->flags |= IO_WORKER_F_BOUND;
			wqe->acct[IO_WQ_ACCT_UNBOUND].nr_workers--;
			wqe->acct[IO_WQ_ACCT_BOUND].nr_workers++;
			atomic_dec(&wqe->wq->user->processes);
		} else {
			worker->flags &= ~IO_WORKER_F_BOUND;
			wqe->acct[IO_WQ_ACCT_UNBOUND].nr_workers++;
			wqe->acct[IO_WQ_ACCT_BOUND].nr_workers--;
			atomic_inc(&wqe->wq->user->processes);
		}
		io_wqe_inc_running(wqe, worker);
		io_wqe_inc_running(worker);
	}
}
@@ -380,15 +312,17 @@ static void __io_worker_busy(struct io_wqe *wqe, struct io_worker *worker,
 * retry the loop in that case (we changed task state), we don't regrab
 * the lock if we return success.
 */
static bool __io_worker_idle(struct io_wqe *wqe, struct io_worker *worker)
static void __io_worker_idle(struct io_wqe *wqe, struct io_worker *worker)
	__must_hold(wqe->lock)
{
	if (!(worker->flags & IO_WORKER_F_FREE)) {
		worker->flags |= IO_WORKER_F_FREE;
		hlist_nulls_add_head_rcu(&worker->nulls_node, &wqe->free_list);
	}
	return __io_worker_unuse(wqe, worker);
	if (worker->saved_creds) {
		revert_creds(worker->saved_creds);
		worker->cur_creds = worker->saved_creds = NULL;
	}
}
static inline unsigned int io_get_work_hash(struct io_wq_work *work)
@@ -396,14 +330,31 @@ static inline unsigned int io_get_work_hash(struct io_wq_work *work)
	return work->flags >> IO_WQ_HASH_SHIFT;
}
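/*
 * The busy-hash map is now wq wide (wq->hash->map) rather than per wqe. When
 * the bucket we stalled on is held elsewhere, park this wqe on the shared
 * hash waitqueue; back out immediately if the bit cleared while we queued.
 */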
static void io_wait_on_hash(struct io_wqe *wqe, unsigned int hash)
{
	struct io_wq *wq = wqe->wq;

	spin_lock(&wq->hash->wait.lock);
	if (list_empty(&wqe->wait.entry)) {
		__add_wait_queue(&wq->hash->wait, &wqe->wait);
		if (!test_bit(hash, &wq->hash->map)) {
			__set_current_state(TASK_RUNNING);
			list_del_init(&wqe->wait.entry);
		}
	}
	spin_unlock(&wq->hash->wait.lock);
}
static struct io_wq_work *io_get_next_work(struct io_wqe *wqe)
	__must_hold(wqe->lock)
{
	struct io_wq_work_node *node, *prev;
	struct io_wq_work *work, *tail;
	unsigned int hash;
	unsigned int stall_hash = -1U;

	wq_list_for_each(node, prev, &wqe->work_list) {
		unsigned int hash;

		work = container_of(node, struct io_wq_work, list);

		/* not hashed, can run anytime */
@@ -412,111 +363,60 @@ static struct io_wq_work *io_get_next_work(struct io_wqe *wqe)
			return work;
		}

		/* hashed, can run if not already running */
		hash = io_get_work_hash(work);
		if (!(wqe->hash_map & BIT(hash))) {
			wqe->hash_map |= BIT(hash);
			/* all items with this hash lie in [work, tail] */
			tail = wqe->hash_tail[hash];

		/* all items with this hash lie in [work, tail] */
		tail = wqe->hash_tail[hash];

		/* hashed, can run if not already running */
		if (!test_and_set_bit(hash, &wqe->wq->hash->map)) {
			wqe->hash_tail[hash] = NULL;
			wq_list_cut(&wqe->work_list, &tail->list, prev);
			return work;
		}
		if (stall_hash == -1U)
			stall_hash = hash;
		/* fast forward to a next hash, for-each will fix up @prev */
		node = &tail->list;
	}

	return NULL;
}

static void io_wq_switch_mm(struct io_worker *worker, struct io_wq_work *work)
{
	if (worker->mm) {
		kthread_unuse_mm(worker->mm);
		mmput(worker->mm);
		worker->mm = NULL;
	if (stall_hash != -1U) {
		raw_spin_unlock(&wqe->lock);
		io_wait_on_hash(wqe, stall_hash);
		raw_spin_lock(&wqe->lock);
	}

	if (mmget_not_zero(work->identity->mm)) {
		kthread_use_mm(work->identity->mm);
		worker->mm = work->identity->mm;
		return;
	}

	/* failed grabbing mm, ensure work gets cancelled */
	work->flags |= IO_WQ_WORK_CANCEL;
	return NULL;
}
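/*
 * With workers cloned from the submitting task there is no mm, blkcg, or fs
 * state left to switch per work item; pending TIF_NOTIFY_SIGNAL notifications
 * just need any queued task_work run and the flag cleared.
 */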
static inline void io_wq_switch_blkcg(struct io_worker *worker,
				      struct io_wq_work *work)
static void io_flush_signals(void)
{
#ifdef CONFIG_BLK_CGROUP
	if (!(work->flags & IO_WQ_WORK_BLKCG))
		return;
	if (work->identity->blkcg_css != worker->blkcg_css) {
		kthread_associate_blkcg(work->identity->blkcg_css);
		worker->blkcg_css = work->identity->blkcg_css;
	if (unlikely(test_tsk_thread_flag(current, TIF_NOTIFY_SIGNAL))) {
		if (current->task_works)
			task_work_run();
		clear_tsk_thread_flag(current, TIF_NOTIFY_SIGNAL);
	}
#endif
}

static void io_wq_switch_creds(struct io_worker *worker,
			       struct io_wq_work *work)
{
	const struct cred *old_creds = override_creds(work->identity->creds);
	const struct cred *old_creds = override_creds(work->creds);

	worker->cur_creds = work->identity->creds;
	worker->cur_creds = work->creds;
	if (worker->saved_creds)
		put_cred(old_creds); /* creds set by previous switch */
	else
		worker->saved_creds = old_creds;
}
static void io_impersonate_work(struct io_worker *worker,
				struct io_wq_work *work)
{
	if ((work->flags & IO_WQ_WORK_FILES) &&
	    current->files != work->identity->files) {
		task_lock(current);
		current->files = work->identity->files;
		current->nsproxy = work->identity->nsproxy;
		task_unlock(current);
		if (!work->identity->files) {
			/* failed grabbing files, ensure work gets cancelled */
			work->flags |= IO_WQ_WORK_CANCEL;
		}
	}
	if ((work->flags & IO_WQ_WORK_FS) && current->fs != work->identity->fs)
		current->fs = work->identity->fs;
	if ((work->flags & IO_WQ_WORK_MM) && work->identity->mm != worker->mm)
		io_wq_switch_mm(worker, work);
	if ((work->flags & IO_WQ_WORK_CREDS) &&
	    worker->cur_creds != work->identity->creds)
		io_wq_switch_creds(worker, work);
	if (work->flags & IO_WQ_WORK_FSIZE)
		current->signal->rlim[RLIMIT_FSIZE].rlim_cur = work->identity->fsize;
	else if (current->signal->rlim[RLIMIT_FSIZE].rlim_cur != RLIM_INFINITY)
		current->signal->rlim[RLIMIT_FSIZE].rlim_cur = RLIM_INFINITY;
	io_wq_switch_blkcg(worker, work);
#ifdef CONFIG_AUDIT
	current->loginuid = work->identity->loginuid;
	current->sessionid = work->identity->sessionid;
#endif
}
static void io_assign_current_work(struct io_worker *worker,
				   struct io_wq_work *work)
{
	if (work) {
		/* flush pending signals before assigning new work */
		if (signal_pending(current))
			flush_signals(current);
		io_flush_signals();
		cond_resched();
	}

#ifdef CONFIG_AUDIT
	current->loginuid = KUIDT_INIT(AUDIT_UID_UNSET);
	current->sessionid = AUDIT_SID_UNSET;
#endif

	spin_lock_irq(&worker->lock);
	worker->cur_work = work;
	spin_unlock_irq(&worker->lock);
@@ -550,6 +450,7 @@ get_next:
		if (!work)
			break;
		io_assign_current_work(worker, work);
		__set_current_state(TASK_RUNNING);

		/* handle a whole dependent link */
		do {
@@ -557,7 +458,8 @@ get_next:
			unsigned int hash = io_get_work_hash(work);

			next_hashed = wq_next_work(work);
			io_impersonate_work(worker, work);
			if (work->creds && worker->cur_creds != work->creds)
				io_wq_switch_creds(worker, work);
			wq->do_work(work);
			io_assign_current_work(worker, NULL);

@@ -572,8 +474,10 @@ get_next:
				io_wqe_enqueue(wqe, linked);

			if (hash != -1U && !next_hashed) {
				clear_bit(hash, &wq->hash->map);
				if (wq_has_sleeper(&wq->hash->wait))
					wake_up(&wq->hash->wait);
				raw_spin_lock_irq(&wqe->lock);
				wqe->hash_map &= ~BIT_ULL(hash);
				wqe->flags &= ~IO_WQE_FLAG_STALLED;
				/* skip unnecessary unlock-lock wqe->lock */
				if (!work)
@@ -592,27 +496,23 @@ static int io_wqe_worker(void *data)
	struct io_wqe *wqe = worker->wqe;
	struct io_wq *wq = wqe->wq;

	io_worker_start(wqe, worker);
	io_worker_start(worker);

	while (!test_bit(IO_WQ_BIT_EXIT, &wq->state)) {
		set_current_state(TASK_INTERRUPTIBLE);
loop:
		raw_spin_lock_irq(&wqe->lock);
		if (io_wqe_run_queue(wqe)) {
			__set_current_state(TASK_RUNNING);
			io_worker_handle_work(worker);
			goto loop;
		}
		/* drops the lock on success, retry */
		if (__io_worker_idle(wqe, worker)) {
			__release(&wqe->lock);
			goto loop;
		}
		__io_worker_idle(wqe, worker);
		raw_spin_unlock_irq(&wqe->lock);
		if (signal_pending(current))
			flush_signals(current);
		io_flush_signals();
		if (schedule_timeout(WORKER_IDLE_TIMEOUT))
			continue;
		if (fatal_signal_pending(current))
			break;
		/* timed out, exit unless we're the fixed worker */
		if (test_bit(IO_WQ_BIT_EXIT, &wq->state) ||
		    !(worker->flags & IO_WORKER_F_FIXED))
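/*
 * The two scheduler hooks below now resolve the worker from
 * tsk->pf_io_worker: workers are no longer kthreads, so kthread_data()
 * does not apply, and the NULL check guards tasks (such as the manager)
 * that carry PF_IO_WORKER but have no worker attached.
 */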
@@ -636,15 +536,16 @@ loop:
 */
void io_wq_worker_running(struct task_struct *tsk)
{
	struct io_worker *worker = kthread_data(tsk);
	struct io_wqe *wqe = worker->wqe;
	struct io_worker *worker = tsk->pf_io_worker;

	if (!worker)
		return;
	if (!(worker->flags & IO_WORKER_F_UP))
		return;
	if (worker->flags & IO_WORKER_F_RUNNING)
		return;
	worker->flags |= IO_WORKER_F_RUNNING;
	io_wqe_inc_running(wqe, worker);
	io_wqe_inc_running(worker);
}
/*
@@ -654,9 +555,10 @@ void io_wq_worker_running(struct task_struct *tsk)
 */
void io_wq_worker_sleeping(struct task_struct *tsk)
{
	struct io_worker *worker = kthread_data(tsk);
	struct io_wqe *wqe = worker->wqe;
	struct io_worker *worker = tsk->pf_io_worker;

	if (!worker)
		return;
	if (!(worker->flags & IO_WORKER_F_UP))
		return;
	if (!(worker->flags & IO_WORKER_F_RUNNING))
@@ -664,32 +566,27 @@ void io_wq_worker_sleeping(struct task_struct *tsk)

	worker->flags &= ~IO_WORKER_F_RUNNING;

	raw_spin_lock_irq(&wqe->lock);
	io_wqe_dec_running(wqe, worker);
	raw_spin_unlock_irq(&wqe->lock);
	raw_spin_lock_irq(&worker->wqe->lock);
	io_wqe_dec_running(worker);
	raw_spin_unlock_irq(&worker->wqe->lock);
}
static bool create_io_worker(struct io_wq *wq, struct io_wqe *wqe, int index)
static int task_thread(void *data, int index)
{
	struct io_worker *worker = data;
	struct io_wqe *wqe = worker->wqe;
	struct io_wqe_acct *acct = &wqe->acct[index];
	struct io_worker *worker;
	struct io_wq *wq = wqe->wq;
	char buf[TASK_COMM_LEN];

	worker = kzalloc_node(sizeof(*worker), GFP_KERNEL, wqe->node);
	if (!worker)
		return false;

	sprintf(buf, "iou-wrk-%d", wq->task_pid);
	set_task_comm(current, buf);

	refcount_set(&worker->ref, 1);
	worker->nulls_node.pprev = NULL;
	worker->wqe = wqe;
	spin_lock_init(&worker->lock);

	current->pf_io_worker = worker;
	worker->task = current;
	worker->task = kthread_create_on_node(io_wqe_worker, worker, wqe->node,
				"io_wqe_worker-%d/%d", index, wqe->node);
	if (IS_ERR(worker->task)) {
		kfree(worker);
		return false;
	}
	kthread_bind_mask(worker->task, cpumask_of_node(wqe->node));
	set_cpus_allowed_ptr(current, cpumask_of_node(wqe->node));
	current->flags |= PF_NO_SETAFFINITY;

	raw_spin_lock_irq(&wqe->lock);
	hlist_nulls_add_head_rcu(&worker->nulls_node, &wqe->free_list);
@@ -702,11 +599,63 @@ static bool create_io_worker(struct io_wq *wq, struct io_wqe *wqe, int index)
	acct->nr_workers++;
	raw_spin_unlock_irq(&wqe->lock);

	if (index == IO_WQ_ACCT_UNBOUND)
		atomic_inc(&wq->user->processes);

	io_wqe_worker(data);
	do_exit(0);
}
static int task_thread_bound(void *data)
{
	return task_thread(data, IO_WQ_ACCT_BOUND);
}

static int task_thread_unbound(void *data)
{
	return task_thread(data, IO_WQ_ACCT_UNBOUND);
}
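/*
 * Create a worker as a clone of the original task: CLONE_VM plus the usual
 * thread flags means it shares mm, files, fs, and signal handling with the
 * task that set up the ring. The thread function and its argument travel in
 * .stack/.stack_size, matching the kernel-thread convention that
 * copy_thread() applies to PF_IO_WORKER tasks.
 */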
pid_t io_wq_fork_thread(int (*fn)(void *), void *arg)
{
	unsigned long flags = CLONE_FS|CLONE_FILES|CLONE_SIGHAND|CLONE_THREAD|
				CLONE_IO|SIGCHLD;
	struct kernel_clone_args args = {
		.flags		= ((lower_32_bits(flags) | CLONE_VM |
				    CLONE_UNTRACED) & ~CSIGNAL),
		.exit_signal	= (lower_32_bits(flags) & CSIGNAL),
		.stack		= (unsigned long)fn,
		.stack_size	= (unsigned long)arg,
	};

	return kernel_clone(&args);
}
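/*
 * The parent only allocates and accounts the worker; the child finishes its
 * own setup (comm name, node affinity, list insertion) in task_thread()
 * before entering the normal worker loop.
 */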
static bool create_io_worker(struct io_wq *wq, struct io_wqe *wqe, int index)
{
	struct io_worker *worker;
	pid_t pid;

	__set_current_state(TASK_RUNNING);

	worker = kzalloc_node(sizeof(*worker), GFP_KERNEL, wqe->node);
	if (!worker)
		return false;

	refcount_set(&worker->ref, 1);
	worker->nulls_node.pprev = NULL;
	worker->wqe = wqe;
	spin_lock_init(&worker->lock);
	init_completion(&worker->ref_done);

	refcount_inc(&wq->refs);

	wake_up_process(worker->task);
	if (index == IO_WQ_ACCT_BOUND)
		pid = io_wq_fork_thread(task_thread_bound, worker);
	else
		pid = io_wq_fork_thread(task_thread_unbound, worker);
	if (pid < 0) {
		io_wq_put(wq);
		kfree(worker);
		return false;
	}
	return true;
}
@@ -752,93 +701,57 @@ static bool io_wq_worker_wake(struct io_worker *worker, void *data)
	return false;
}
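/*
 * Scan all nodes and fork a bound/unbound worker wherever the accounting
 * shows demand; shared by the manager loop and the final pre-exit pass.
 */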
/*
 * Manager thread. Tasked with creating new workers, if we need them.
 */
static int io_wq_manager(void *data)
static void io_wq_check_workers(struct io_wq *wq)
{
	struct io_wq *wq = data;
	int node;

	/* create fixed workers */
	refcount_set(&wq->refs, 1);
	for_each_node(node) {
		struct io_wqe *wqe = wq->wqes[node];
		bool fork_worker[2] = { false, false };

		if (!node_online(node))
			continue;
		if (create_io_worker(wq, wq->wqes[node], IO_WQ_ACCT_BOUND))
			continue;
		set_bit(IO_WQ_BIT_ERROR, &wq->state);
		set_bit(IO_WQ_BIT_EXIT, &wq->state);
		goto out;
	}

	complete(&wq->done);

	while (!kthread_should_stop()) {
		if (current->task_works)
			task_work_run();

		for_each_node(node) {
			struct io_wqe *wqe = wq->wqes[node];
			bool fork_worker[2] = { false, false };

			if (!node_online(node))
				continue;
			raw_spin_lock_irq(&wqe->lock);
			if (io_wqe_need_worker(wqe, IO_WQ_ACCT_BOUND))
				fork_worker[IO_WQ_ACCT_BOUND] = true;
			if (io_wqe_need_worker(wqe, IO_WQ_ACCT_UNBOUND))
				fork_worker[IO_WQ_ACCT_UNBOUND] = true;
			raw_spin_unlock_irq(&wqe->lock);
			if (fork_worker[IO_WQ_ACCT_BOUND])
				create_io_worker(wq, wqe, IO_WQ_ACCT_BOUND);
			if (fork_worker[IO_WQ_ACCT_UNBOUND])
				create_io_worker(wq, wqe, IO_WQ_ACCT_UNBOUND);
		}
		set_current_state(TASK_INTERRUPTIBLE);
		schedule_timeout(HZ);
	}

	if (current->task_works)
		task_work_run();

out:
	if (refcount_dec_and_test(&wq->refs)) {
		complete(&wq->done);
		return 0;
	}
	/* if ERROR is set and we get here, we have workers to wake */
	if (test_bit(IO_WQ_BIT_ERROR, &wq->state)) {
		rcu_read_lock();
		for_each_node(node)
			io_wq_for_each_worker(wq->wqes[node], io_wq_worker_wake, NULL);
		rcu_read_unlock();
		raw_spin_lock_irq(&wqe->lock);
		if (io_wqe_need_worker(wqe, IO_WQ_ACCT_BOUND))
			fork_worker[IO_WQ_ACCT_BOUND] = true;
		if (io_wqe_need_worker(wqe, IO_WQ_ACCT_UNBOUND))
			fork_worker[IO_WQ_ACCT_UNBOUND] = true;
		raw_spin_unlock_irq(&wqe->lock);
		if (fork_worker[IO_WQ_ACCT_BOUND])
			create_io_worker(wq, wqe, IO_WQ_ACCT_BOUND);
		if (fork_worker[IO_WQ_ACCT_UNBOUND])
			create_io_worker(wq, wqe, IO_WQ_ACCT_UNBOUND);
	}
	return 0;
}
static bool io_wq_can_queue(struct io_wqe *wqe, struct io_wqe_acct *acct,
			    struct io_wq_work *work)
/*
 * Manager thread. Tasked with creating new workers, if we need them.
 */
static int io_wq_manager(void *data)
{
	bool free_worker;

	if (!(work->flags & IO_WQ_WORK_UNBOUND))
		return true;
	if (atomic_read(&acct->nr_running))
		return true;
	struct io_wq *wq = data;
	char buf[TASK_COMM_LEN];

	rcu_read_lock();
	free_worker = !hlist_nulls_empty(&wqe->free_list);
	rcu_read_unlock();
	if (free_worker)
		return true;

	sprintf(buf, "iou-mgr-%d", wq->task_pid);
	set_task_comm(current, buf);
	current->flags |= PF_IO_WORKER;
	wq->manager = current;

	if (atomic_read(&wqe->wq->user->processes) >= acct->max_workers &&
	    !(capable(CAP_SYS_RESOURCE) || capable(CAP_SYS_ADMIN)))
		return false;

	complete(&wq->done);
	return true;

	do {
		set_current_state(TASK_INTERRUPTIBLE);
		io_wq_check_workers(wq);
		schedule_timeout(HZ);
		if (fatal_signal_pending(current))
			set_bit(IO_WQ_BIT_EXIT, &wq->state);
	} while (!test_bit(IO_WQ_BIT_EXIT, &wq->state));

	io_wq_check_workers(wq);
	wq->manager = NULL;
	io_wq_put(wq);
	do_exit(0);
}
static void io_run_cancel(struct io_wq_work *work, struct io_wqe *wqe)
@@ -872,20 +785,37 @@ append:
	wq_list_add_after(&work->list, &tail->list, &wqe->work_list);
}
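/*
 * The manager is forked lazily from the task that first queues work, so it
 * (and the workers it clones) inherit that task's context. PF_IO_WORKER is
 * set around the clone so the child starts via the kernel-thread style
 * entry path in copy_thread().
 */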
static int io_wq_fork_manager(struct io_wq *wq)
{
	int ret;

	if (wq->manager)
		return 0;

	clear_bit(IO_WQ_BIT_EXIT, &wq->state);
	refcount_inc(&wq->refs);
	current->flags |= PF_IO_WORKER;
	ret = io_wq_fork_thread(io_wq_manager, wq);
	current->flags &= ~PF_IO_WORKER;
	if (ret >= 0) {
		wait_for_completion(&wq->done);
		return 0;
	}

	io_wq_put(wq);
	return ret;
}
static void io_wqe_enqueue(struct io_wqe *wqe, struct io_wq_work *work)
{
	struct io_wqe_acct *acct = io_work_get_acct(wqe, work);
	int work_flags;
	unsigned long flags;

	/*
	 * Do early check to see if we need a new unbound worker, and if we do,
	 * if we're allowed to do so. This isn't 100% accurate as there's a
	 * gap between this check and incrementing the value, but that's OK.
	 * It's close enough to not be an issue, fork() has the same delay.
	 */
	if (unlikely(!io_wq_can_queue(wqe, acct, work))) {
		io_run_cancel(work, wqe);
	/* Can only happen if manager creation fails after exec */
	if (unlikely(io_wq_fork_manager(wqe->wq))) {
		work->flags |= IO_WQ_WORK_CANCEL;
		wqe->wq->do_work(work);
		return;
	}
@@ -939,7 +869,7 @@ static bool io_wq_worker_cancel(struct io_worker *worker, void *data)
	spin_lock_irqsave(&worker->lock, flags);
	if (worker->cur_work &&
	    match->fn(worker->cur_work, match->data)) {
		send_sig(SIGINT, worker->task, 1);
		set_notify_signal(worker->task);
		match->nr_running++;
	}
	spin_unlock_irqrestore(&worker->lock, flags);
@@ -1043,6 +973,24 @@ enum io_wq_cancel io_wq_cancel_cb(struct io_wq *wq, work_cancel_fn *cancel,
	return IO_WQ_CANCEL_NOTFOUND;
}
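/*
 * Hash-bucket waitqueue callback: a contended bucket was released somewhere,
 * so take this wqe off the waitqueue and activate a free worker, or prod the
 * manager if none are idle.
 */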
static int io_wqe_hash_wake(struct wait_queue_entry *wait, unsigned mode,
			    int sync, void *key)
{
	struct io_wqe *wqe = container_of(wait, struct io_wqe, wait);
	int ret;

	list_del_init(&wait->entry);

	rcu_read_lock();
	ret = io_wqe_activate_free_worker(wqe);
	rcu_read_unlock();

	if (!ret)
		wake_up_process(wqe->wq->manager);

	return 1;
}
struct io_wq *io_wq_create(unsigned bounded, struct io_wq_data *data)
{
	int ret = -ENOMEM, node;
@@ -1063,12 +1011,11 @@ struct io_wq *io_wq_create(unsigned bounded, struct io_wq_data *data)
	if (ret)
		goto err_wqes;

	refcount_inc(&data->hash->refs);
	wq->hash = data->hash;
	wq->free_work = data->free_work;
	wq->do_work = data->do_work;

	/* caller must already hold a reference to this */
	wq->user = data->user;

	ret = -ENOMEM;
	for_each_node(node) {
		struct io_wqe *wqe;
@@ -1083,11 +1030,11 @@ struct io_wq *io_wq_create(unsigned bounded, struct io_wq_data *data)
		wqe->node = alloc_node;
		wqe->acct[IO_WQ_ACCT_BOUND].max_workers = bounded;
		atomic_set(&wqe->acct[IO_WQ_ACCT_BOUND].nr_running, 0);
		if (wq->user) {
			wqe->acct[IO_WQ_ACCT_UNBOUND].max_workers =
		wqe->acct[IO_WQ_ACCT_UNBOUND].max_workers =
					task_rlimit(current, RLIMIT_NPROC);
		}
		atomic_set(&wqe->acct[IO_WQ_ACCT_UNBOUND].nr_running, 0);
		wqe->wait.func = io_wqe_hash_wake;
		INIT_LIST_HEAD(&wqe->wait.entry);
		wqe->wq = wq;
		raw_spin_lock_init(&wqe->lock);
		INIT_WQ_LIST(&wqe->work_list);
@@ -1095,23 +1042,16 @@ struct io_wq *io_wq_create(unsigned bounded, struct io_wq_data *data)
		INIT_LIST_HEAD(&wqe->all_list);
	}
	wq->task_pid = current->pid;
	init_completion(&wq->done);
	refcount_set(&wq->refs, 1);

	wq->manager = kthread_create(io_wq_manager, wq, "io_wq_manager");
	if (!IS_ERR(wq->manager)) {
		wake_up_process(wq->manager);
		wait_for_completion(&wq->done);
		if (test_bit(IO_WQ_BIT_ERROR, &wq->state)) {
			ret = -ENOMEM;
			goto err;
		}
		refcount_set(&wq->use_refs, 1);
		reinit_completion(&wq->done);
	ret = io_wq_fork_manager(wq);
	if (!ret)
		return wq;
	}

	ret = PTR_ERR(wq->manager);
	complete(&wq->done);
	io_wq_put(wq);
	io_wq_put_hash(data->hash);
err:
	cpuhp_state_remove_instance_nocalls(io_wq_online, &wq->cpuhp_node);
	for_each_node(node)
@@ -1123,15 +1063,7 @@ err_wq:
	return ERR_PTR(ret);
}
bool io_wq_get(struct io_wq *wq, struct io_wq_data *data)
{
	if (data->free_work != wq->free_work || data->do_work != wq->do_work)
		return false;

	return refcount_inc_not_zero(&wq->use_refs);
}

static void __io_wq_destroy(struct io_wq *wq)
static void io_wq_destroy(struct io_wq *wq)
{
	int node;
@@ -1139,30 +1071,31 @@ static void __io_wq_destroy(struct io_wq *wq)
	set_bit(IO_WQ_BIT_EXIT, &wq->state);
	if (wq->manager)
		kthread_stop(wq->manager);
		wake_up_process(wq->manager);

	rcu_read_lock();
	for_each_node(node)
		io_wq_for_each_worker(wq->wqes[node], io_wq_worker_wake, NULL);
	rcu_read_unlock();

	wait_for_completion(&wq->done);
	spin_lock_irq(&wq->hash->wait.lock);
	for_each_node(node) {
		struct io_wqe *wqe = wq->wqes[node];

	for_each_node(node)
		kfree(wq->wqes[node]);
		list_del_init(&wqe->wait.entry);
		kfree(wqe);
	}
	spin_unlock_irq(&wq->hash->wait.lock);
	io_wq_put_hash(wq->hash);
	kfree(wq->wqes);
	kfree(wq);
}

void io_wq_destroy(struct io_wq *wq)
{
	if (refcount_dec_and_test(&wq->use_refs))
		__io_wq_destroy(wq);
}

struct task_struct *io_wq_get_task(struct io_wq *wq)
void io_wq_put(struct io_wq *wq)
{
	return wq->manager;
	if (refcount_dec_and_test(&wq->refs))
		io_wq_destroy(wq);
}
static bool io_wq_worker_affinity(struct io_worker *worker, void *data)