A Python class for POSIX-flavor condition variables

Tim Peters (tim@ksr.com)
Thu, 12 May 94 03:04:24 -0400

> [guido]
> ...
> I guess it's time to get modern and add condition variables to
> thread.c and threadmodule.c so we don't HAVE to write code like that

> [tim, makes a pitch for doing these the way posix does 'em]

nothing-like-a-prototype<wink>-ly y'rs - tim

Tim Peters tim@ksr.com
not speaking for Kendall Square Research Corp

# Defines class `condition', instances of which are POSIX-like
# synchronization objects. Note that use of this module requires that
# your Python support threads.
#
# A condition object is created via
#     import this_module
#     your_condition_object = this_module.condition()
#
# Methods:
#     .acquire()
#         acquire the lock associated with the condition
#     .release()
#         release the lock associated with the condition
#     .wait()
#         block the thread until such time as some other thread does a
#         .signal or .broadcast on the same condition, and release the
#         lock associated with the condition. The lock associated with
#         the condition MUST be in the acquired state at the time
#         .wait is invoked.
#     .signal()
#         wake up exactly one thread (if any) that previously did a .wait
#         on the condition; that thread will awaken with the lock associated
#         with the condition in the acquired state. If no threads are
#         .wait'ing, this is a nop. If more than one thread is .wait'ing on
#         the condition, any of them may be awakened.
#     .broadcast()
#         wake up all threads (if any) that are .wait'ing on the condition;
#         the threads are woken up serially, each with the lock in the
#         acquired state, so should .release() as soon as possible. If no
#         threads are .wait'ing, this is a nop.
#
# Note that if a thread does a .wait *while* a signal/broadcast is
# in progress, it's guaranteed to block until a subsequent
# signal/broadcast.
#
# Secret feature: `broadcast' actually takes an integer argument,
# and will wake up exactly that many waiting threads (or the total
# number waiting, if that's less). Use of this is dubious, though,
# and probably won't be supported if this form of condition is
# reimplemented in C.
#
# DIFFERENCES FROM POSIX
#
# + A separate mutex is not needed to guard condition data. Instead, a
#   condition object can (must) be .acquire'ed and .release'ed directly.
#   This eliminates a common error in using POSIX conditions.
#
# + Because of implementation difficulties, a POSIX `signal' wakes up
#   _at least_ one .wait'ing thread. Race conditions make it difficult
#   to stop that. This implementation guarantees to wake up only one,
#   but you probably shouldn't rely on that.
#
# PROTOCOL
#
# Condition objects are used to block threads until "some condition" is
# true. E.g., a thread may wish to wait until a producer pumps out data
# for it to consume, or a server may wish to wait until someone requests
# its services, or perhaps a whole bunch of threads want to wait until a
# preceding pass over the data is complete. Early models for conditions
# relied on some other thread figuring out when a blocked thread's
# condition was true, and made the other thread responsible both for
# waking up the blocked thread and guaranteeing that it woke up with all
# data in a correct state. This proved to be very delicate in practice,
# and gave conditions a bad name in some circles.
#
# The POSIX model addresses these problems by making a thread responsible
# for ensuring that its own state is correct when it wakes, and relies
# on a rigid protocol to make this easy; so long as you stick to the
# protocol, POSIX conditions are easy to "get right":
#
# A) The thread that's waiting for some arbitrarily-complex condition
#    (ACC) to become true does:
#
#    condition.acquire()
#    while not (code to evaluate the ACC):
#        condition.wait()
#        # That blocks the thread, *and* releases the lock. When a
#        # condition.signal() happens, it will wake up some thread that
#        # did a .wait, *and* acquire the lock again before .wait
#        # returns.
#        #
#        # Because the lock is acquired at this point, the state used
#        # in evaluating the ACC is frozen, so it's safe to go back &
#        # reevaluate the ACC.
#
#    # At this point, ACC is true, and the thread has the condition
#    # locked.
#    # So code here can safely muck with the shared state that
#    # went into evaluating the ACC -- if it wants to.
#    # When done mucking with the shared state, do
#    condition.release()
#
# B) Threads that are mucking with shared state that may affect the
#    ACC do:
#
#    condition.acquire()
#    # muck with shared state
#    condition.release()
#    if it's possible that ACC is true now:
#        condition.signal() # or .broadcast()
#
# TRICK OF THE TRADE
#
# With simpler forms of conditions, it can be impossible to know when
# a thread that's supposed to do a .wait has actually done it. But
# because this form of condition releases a lock as _part_ of doing a
# wait, the state of that lock can be used to guarantee it. E.g.,
# in thread A we may have:
#
#     cond = condition()
#     threadB_is_waiting_on_cond = condition()
#     ...
#     threadB_is_waiting_on_cond.acquire()
#     threadB_is_waiting_on_cond.wait()
#     threadB_is_waiting_on_cond.release()
#     # since cond is acquire'd at the time thread B does
#     #     threadB_is_waiting_on_cond.signal()
#     # _our_ attempt to acquire cond can't succeed until thread B's
#     # cond.wait() released the lock
#     cond.acquire(); cond.release()
#     # now we know for sure that B executed its cond.wait()
#
# And in thread B:
#     ....
#     cond.acquire()
#     threadB_is_waiting_on_cond.signal()
#     cond.wait()
#     ....
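
For readers on a current Python, the PROTOCOL steps (A) and (B) can be sketched with the standard library's `threading.Condition`, which postdates this module. This is only an illustration, not this module's API: `items`, `results`, `consumer` and `producer` are made-up names, `notify()` stands in for `.signal()`, and, unlike the class defined here, `threading.Condition` requires the lock to be held while `notify()` is called.

```python
import threading

cond = threading.Condition()
items = []      # shared state guarded by cond (hypothetical name)
results = []    # what the consumer took out (hypothetical name)

def consumer():
    # Step A: acquire, loop on the predicate, then use the shared state.
    with cond:
        while not items:        # the ACC: "there is something to consume"
            cond.wait()         # releases the lock while blocked
        # The lock is held again here, so the ACC is known to be true.
        results.append(items.pop(0))

def producer(value):
    # Step B: change the shared state, then wake a waiter.
    # threading.Condition wants notify() called with the lock held.
    with cond:
        items.append(value)
        cond.notify()

t = threading.Thread(target=consumer)
t.start()
producer(42)
t.join()
print(results)
```

Because the consumer re-evaluates its predicate under the lock, the sketch behaves the same whether the producer runs before or after the consumer reaches `wait()`.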

InternalError = 'internal condition error'
import thread

class condition:
    def __init__(self):
        # the lock actually used by .acquire() and .release()
        self.mutex = thread.allocate_lock()

        # number waiting (at checkout) for next signal
        self.waiting = 0
        self.checkout = thread.allocate_lock()
        self.checkout.acquire()

        # number that entered (& waiting at checkin) during release
        self.pending = 0
        self.checkin = thread.allocate_lock()
        self.checkin.acquire()

        # lock to prevent signals from releasing concurrently
        self.go = thread.allocate_lock()

        # number to retain after a signal
        self.retain = None

    def acquire(self):
        self.mutex.acquire()

    def release(self):
        self.mutex.release()

    def wait(self):
        mutex, checkin, checkout = self.mutex, self.checkin, self.checkout
        if not mutex.locked():
            raise ValueError, \
                  "condition must be .acquire'd when .wait() invoked"
        self.pending = self.pending + 1
        mutex.release()

        # hang here until a signal
        checkin.acquire()

        # let one in
        if not mutex.locked():
            raise InternalError, 'mutex not locked during checkin'
        if not self.go.locked():
            raise InternalError, 'go not locked during checkin'
        if not checkout.locked():
            raise InternalError, 'checkout not locked during checkin'

        self.pending = self.pending - 1
        if self.pending:
            checkin.release() # let next one in
        else:
            mutex.release() # unblock .wait
            # let them out, but keep checkin locked so new
            # .wait'ers hang there
            checkout.release()

        # hang here until checkin of pending threads complete
        checkout.acquire()

        # let one out
        if not self.go.locked():
            raise InternalError, 'go not locked during checkout'
        if not checkin.locked():
            raise InternalError, 'checkin not locked during checkout'
        mutex.acquire() # for our caller
        self.waiting = self.waiting - 1
        if self.waiting > self.retain:
            checkout.release() # let another out
        else:
            # keep checkout locked so nobody else leaves
            # & allow another signal to release
            self.go.release()

    def signal(self):
        self.broadcast(1)

    def broadcast(self, num = -1):
        if num < -1:
            raise ValueError, '.signal called with num ' + `num`
        if num == 0:
            return
        # block next signal here, until release complete
        self.go.acquire()
        # block .wait, until checkin of pending complete
        self.mutex.acquire()
        if not self.checkin.locked():
            raise InternalError, 'checkin not locked during signal'
        if not self.checkout.locked():
            raise InternalError, 'checkout not locked during signal'
        self.waiting = self.waiting + self.pending
        if self.waiting == 0:
            self.go.release()
            self.mutex.release()
            return
        if num == -1:
            num = self.waiting
        self.retain = self.waiting - min(self.waiting, num)
        if self.pending:
            self.checkin.release()
        else:
            self.mutex.release()
            self.checkout.release()
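
The `num` argument to `broadcast` has a rough counterpart in the modern `threading.Condition.notify(n)`, which wakes at most `n` waiters. The sketch below is an illustration under that assumption, not a port of the class above; `permits`, `left`, `waiter` and `let_out` are hypothetical names. Each waiter re-checks its own predicate, as the PROTOCOL section recommends, so exactly `n` threads get through no matter which ones happen to be woken.

```python
import threading

cond = threading.Condition()
permits = 0     # shared state: how many waiters may still leave
left = 0        # shared state: how many waiters have left so far

def waiter():
    global permits, left
    with cond:
        while permits == 0:     # predicate re-checked after every wakeup
            cond.wait()
        permits -= 1
        left += 1
        cond.notify_all()       # lets the releasing thread re-check `left`

def let_out(n):
    """Let n waiters through; return how many have left in total."""
    global permits
    with cond:
        target = left + n
        permits += n
        cond.notify(n)          # comparable to broadcast(n) above
        cond.wait_for(lambda: left >= target)
        return left

threads = [threading.Thread(target=waiter) for _ in range(3)]
for t in threads:
    t.start()
after_two = let_out(2)          # two of the three waiters get through
after_all = let_out(1)          # the last one follows
for t in threads:
    t.join()
print(after_two, after_all)
```

Counting permits in shared state, rather than trusting the number of wakeups, is the design choice that keeps the sketch correct even where a `notify` may wake more threads than asked for.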

# the test is not intended to illustrate intelligent usage; it's intended
# to stress the mechanisms by overusing them in brain-dead ways

def _test_task(ident):
    global num
    while 1:
        cond.acquire()
        print 'task', ident, 'waiting'
        tdone.acquire(); num = num - 1; tdone.release(); tdone.signal()
        cond.wait()
        print 'task', ident, 'leaving'
        cond.release()

        nextiter.acquire()
        tdone.acquire(); num = num - 1; tdone.release(); tdone.signal()
        nextiter.wait()
        nextiter.release()

def _test_iter(i):
    global num, N
    import time, whrandom

    tdone.acquire()
    while num:
        tdone.wait()
    num = N
    tdone.release()

    # add another thread, to make life more interesting
    tdone.acquire()
    num = 1
    _test_make_task()
    while num:
        tdone.wait()
    num = N
    tdone.release()

    # guarantee all in cond.wait
    cond.acquire(); cond.release()
    print 'all threads waiting'
    print '*** iter', i+1

    for j in range(2):
        tdone.acquire()
        if num == 0:
            tdone.release()
            break
        release = whrandom.randint(0, num>>1)
        tdone.release()
        print 'letting', release, 'out'
        if release == 1:
            cond.signal()
        else:
            cond.broadcast(release)
        time.sleep(2)

    tdone.acquire()
    if num:
        print 'letting the rest out'
        cond.broadcast()
    tdone.release()

    tdone.acquire()
    while num:
        tdone.wait()
    num = N
    tdone.release()

    # guarantee all in nextiter.wait
    nextiter.acquire(); nextiter.release()
    print 'all out!'

def _test_make_task():
    global N
    print 'creating new task', N
    thread.start_new_thread(_test_task, (N,))
    N = N + 1

def test():
    global N, num, cond, tdone, nextiter
    TRIPS = 10

    cond = condition()
    tdone = condition() # 'num' is shared data owned by this
    nextiter = condition()

    N = 0
    for i in range(3): _test_make_task()
    num = N

    for i in range(TRIPS):
        _test_iter(i)
        if i < TRIPS-1:
            nextiter.broadcast()
    print 'test may have passed!'

if __name__ == '__main__':
    test()

# end of module
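
The `cond.acquire(); cond.release()` lines in `_test_iter` are the TRICK OF THE TRADE from the header comment. A hedged sketch of the same idea with the modern `threading.Condition` follows. The names `b_announced`, `announced`, `go` and `log` are invented for the illustration, the main thread plays thread A, and predicates are added around each wait so the sketch does not depend on who runs first.

```python
import threading

cond = threading.Condition()
b_announced = threading.Condition()
announced = False   # guarded by b_announced
go = False          # guarded by cond
log = []

def thread_b():
    global announced
    with cond:                  # B holds cond's lock from here on...
        with b_announced:
            announced = True
            log.append("B announced")
            b_announced.notify()
        while not go:
            cond.wait()         # ...until wait() releases it
        log.append("B woke up")

b = threading.Thread(target=thread_b)
b.start()

# Thread A (the main thread): wait for B's announcement first.
with b_announced:
    while not announced:
        b_announced.wait()

# B announced while holding cond's lock, so this acquire cannot succeed
# until B's cond.wait() has released that lock.
with cond:
    log.append("A knows B is waiting")
    go = True
    cond.notify()

b.join()
print(log)
```

The log always comes out in the order B announced, A knows B is waiting, B woke up, because A cannot get `cond` before B has entered `wait()`, and B cannot return from `wait()` before A lets go of `cond`.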