[ENH] Monitor for higher-level process synchronization

Nathanael Schärli n.schaerli at gmx.net
Mon Jul 1 21:17:44 UTC 2002


Hey,

I haven been programming some multi-threaded applications in Squeak, and
once more I got reminded that it is really hard and cumbersome to write
safe and understandable code by just using semaphores as the only means
of synchronization.

Having a look at the class SharedQueue is a good example: At a first
glance, it seems to be doing what it is supposed to, but when you look
at it more closely, you'll notice that there are about 5 or more
synchronization bugs in it.

I believe that it is possible to avoid most of these problems by using a
higher-level synchronization mechanism, and therefore I implemented a
variant of the well-known Monitor data-structure in Squeak. (In fact, I
ported an implementation I did in VW about 2 years ago).
This monior has the following properties:

1) At any time, only one process can be executing code inside a critcal
section of a monitor.
2) A monitor is reentrant, which means that the active process in a
monitor does never get blocked when it enters a (nested) critical
section of the same monitor.
3) Inside a critcal section, a process can wait for an event that may be
coupled to a certain condition. If the condition is not fulfilled, the
process leaves the monitor temporarily (in order to let other processes
enter) and waits until another process signals the event. Then, the
original process checks the condition again (this often necessary
because the state of the monitor could have changed in the meantime) and
continues if it is fulfilled.
4) The monitor is fair, which means that the process that is waiting on
a signaled condition the longest gets activated first.
5) The monitor allows to define timeouts after which a process gets
activated automatically.

There are several important advantages of such a Monitor over a
Semaphore:
1) A process autimatically leaves the monitor when a condition is not
fulfilled.
2) When a blocked process is woken up, the condition is always
rechecked.
3) Point 1) and 2) together guarantee that the code following the
condition is only executed if the condition holds. In addition, busy
waits will never occur.
4) The monitor is reentrant


In order to illustrate how a Monitor can drastically simplify process
synchronization, let's assume that we would like to implement a
thread-safe BoundedCounter with the following properties:
- The counter provides methods #inc and #dec, which increment and
decrement the counter. 
- If the value of the counter is equal to a certain #lowerBound (e.g.
0), every process invoking #dec gets blocked until the decrementation
can be performed without getting a value less than #lowerBound.
- Similarly, inc blocks processes in order to avoid values bigger than
#upperBound (e.g. 10)

Without a monitor, we typically use 3 semaphores (mutex, incPossible,
decPossible) to safely implement this behavior. And as you can see, the
whole synchronization is rather complex because the the usage of the
semaphores have to be nested in order to avoid unsafe situations or
deadlocks:

BoundedCounter>>dec
	| done |
	done _ false.
	[done] whileFalse: [
		mutex critical: [
			value > self lowerBound ifTrue: [
				value _ value - 1.
				done _ true.
				incPossible signal]].
		done ifFalse: [decPossible wait]].

BoundedCounter>>inc
	| done |
	done _ false.
	[done] whileFalse: [
		mutex critical: [
			value < self upperBound ifTrue: [
				value _ value + 1.
				done _ true.
				decPossible signal]].
		done ifFalse: [incPossible wait]].


Instead of using 3 Semaphores, we can use just one Monitor (monitor).
Like that, the code would look as follows:

BoundedCounter>>dec
	monitor critical: [
		monitor waitUntil: [value > self lowerBound].
		value = self upperBound ifTrue: [monitor signalAll].
		value _ value - 1].

BoundedCounter>>inc
	monitor critical: [
		monitor waitUntil: [value < self upperBound].
		value = self lowerBound ifTrue: [monitor signalAll].
		value _ value + 1].

Let's have a closer look at what happens:
- Invocation of #critical: ensures that only one process can be
modifying the counter at a time.
- Invocation of #waitUntil: guarantees that the argument condition holds
when the following code is executed. As long as this condition does not
hold, the process gets blocked and leaves the critical section so that
another process can enter.
- Invocation of #signalAll wakes up all the processes that have been
blocked because there waiting condition was not fulfilled. As explained
above, the conditio gets rechecked before the following code is
executed.


As you can see, this code is already a lot nicer than the original one.
However, using more advanced features of the monitor can make it even
nicer:

BoundedCounter>>dec
	monitor critical: [
		monitor waitUntil: [value > self lowerBound] for:
#decPossible.
		monitor signal: #incPossible.
		value _ value - 1].

BoundedCounter>>inc
	monitor critical: [
		monitor waitUntil: [value < self upperBound] for:
#incPossible.
		monitor signal: #decPossible.
		value _ value + 1].

The difference to the implementation above are as follows:
- The method #waitUntil:for: is very similar to the #waitUntil:. The
only difference is the fact that a blocked process only gets woken up if
the event provided as the second argument is signaled.
- The method #signal: is very similar to #signalAll. The difference to
#signalAll is that
#signal: only wakes up *one* process that is waiting for the event
specified as the argument.


Well, you can find a complete documentation of the Monitor in the
changeset "Monitor.1.cs" that is attached to this email (read the class
comment). In addition, there is a changeset "MonitorTest.1.cs" that
contains some SUnit tests, and a file BoundedCounter.1.cs that contains
this example.

Please let me know if you find bugs or have suggestions for changes...


Cheers,
Nathanael

BTW: One remark for the ones that already know the Java synchronization
mechanism: This Monitor implementation can be considered as a more
powerful version of the Java synchronization mechanism. Just replace:

#critical: ...  ->  synchronized() { ... }
#wait  ->  wait()
#signal  ->  notify()
#signalAll  ->  notifyAll()
-------------- next part --------------
'From Squeak3.2gamma of 15 January 2002 [latest update: #4889] on 1 July 2002 at 11:14:34 pm'!
Object subclass: #BoundedCounter
	instanceVariableNames: 'value mutex incPossible decPossible '
	classVariableNames: ''
	poolDictionaries: ''
	category: 'Monitor-Examples'!

!BoundedCounter commentStamp: 'NS 7/1/2002 21:53' prior: 0!
Thread safe counter with upper and lower limits. This implementation uses only Semaphores and is rather clumsy. For a more understandable version have a look at MBoundedCounter.!

Object subclass: #MBoundedCounter
	instanceVariableNames: 'value monitor '
	classVariableNames: ''
	poolDictionaries: ''
	category: 'Monitor-Examples'!

!MBoundedCounter commentStamp: 'NS 7/1/2002 21:53' prior: 0!
Thread safe counter with upper and lower limits. This implementation uses a Monitor and is therefore pretty easy to understand. A possible implementation without a monitor is shown in BoundedCounter.!


!BoundedCounter methodsFor: 'initialize-release' stamp: 'NS 7/1/2002 20:26'!
initialize
	value _ 0.
	mutex _ Semaphore forMutualExclusion.
	incPossible _ Semaphore new.
	decPossible _ Semaphore new.! !

!BoundedCounter methodsFor: 'accessing' stamp: 'NS 7/1/2002 20:33'!
dec
	| done |
	done _ false.
	[done] whileFalse: [
		mutex critical: [
			value > self lowerBound ifTrue: [
				value _ value - 1.
				done _ true.
				incPossible signal]].
		done ifFalse: [decPossible wait]].! !

!BoundedCounter methodsFor: 'accessing' stamp: 'NS 7/1/2002 20:33'!
inc
	| done |
	done _ false.
	[done] whileFalse: [
		mutex critical: [
			value < self upperBound ifTrue: [
				value _ value + 1.
				done _ true.
				decPossible signal]].
		done ifFalse: [incPossible wait]].! !

!BoundedCounter methodsFor: 'accessing' stamp: 'NS 7/1/2002 20:25'!
value
	^ value! !

!BoundedCounter methodsFor: 'private' stamp: 'NS 7/1/2002 20:05'!
lowerBound
	^ 0! !

!BoundedCounter methodsFor: 'private' stamp: 'NS 7/1/2002 20:04'!
upperBound
	^ 10! !


!BoundedCounter class methodsFor: 'instance creation' stamp: 'NS 7/1/2002 20:26'!
new
	^ super new initialize! !


!MBoundedCounter methodsFor: 'initialize-release' stamp: 'NS 7/1/2002 20:37'!
initialize
	value _ 0.
	monitor _ Monitor new.! !

!MBoundedCounter methodsFor: 'accessing' stamp: 'NS 7/1/2002 20:40'!
dec
	monitor critical: [
		monitor waitUntil: [value > self lowerBound] for: #decPossible.
		value _ value - 1.
		monitor signal: #incPossible].! !

!MBoundedCounter methodsFor: 'accessing' stamp: 'NS 7/1/2002 20:40'!
inc
	monitor critical: [
		monitor waitUntil: [value < self upperBound] for: #incPossible.
		value _ value + 1.
		monitor signal: #decPossible].! !

!MBoundedCounter methodsFor: 'accessing' stamp: 'NS 7/1/2002 20:37'!
value
	^ value! !

!MBoundedCounter methodsFor: 'private' stamp: 'NS 7/1/2002 20:37'!
lowerBound
	^ 0! !

!MBoundedCounter methodsFor: 'private' stamp: 'NS 7/1/2002 20:37'!
upperBound
	^ 10! !


!MBoundedCounter class methodsFor: 'instance creation' stamp: 'NS 7/1/2002 20:37'!
new
	^ super new initialize! !

-------------- next part --------------
'From Squeak3.2gamma of 15 January 2002 [latest update: #4889] on 1 July 2002 at 11:14:27 pm'!
Object subclass: #Monitor
	instanceVariableNames: 'mutex ownerProcess defaultQueue queueDict '
	classVariableNames: ''
	poolDictionaries: ''
	category: 'Kernel-Processes'!

!Monitor commentStamp: 'NS 7/1/2002 22:22' prior: 0!
A monitor provides process synchronization that is more highlevel than the one provided by a Semaphore. Similar to the classical definition of a Monitor, it has the following properties:

1) At any time, only one process can be executing code inside a critcal section of a monitor.
2) A monitor is reentrant, which means that the active process in a monitor does never get blocked when it enters a (nested) critical section of the same monitor.
3) Inside a critcal section, a process can wait for an event that maybe coupled to a certain condition. If the condition is not fulfilled, the process leaves the monitor temporarily (in order to let other processes enter) and waits until another process signals the event. Then, the original process checks the condition again (this often necessary because the state of the monitor could have changed in the meantime) and continues if it is fulfilled.
4) The monitor is fair, which means that the process that is waiting on a signaled condition the longest gets activated first.
5) The monitor allows to define timeouts after which a process gets activated automatically.


Basic usage:

Monitor>>critcal: aBlock
Critical section.
Executes aBlock as a critcal section. At any time, only one process can be executing code in a critical section.
NOTE: All the following synchronization operations are only valid inside the critical section of the monitor!!

Monitor>>wait
Unconditional waiting for the default event.
The current process gets blocked and leaves the monitor, which means that the monitor allows another process to execute critical code. When the default event is signaled, the original process is resumed.

Monitor>>waitWhile: aBlock
Conditional waiting for the default event.
The current process gets blocked and leaves the monitor only if the argument block evaluates to true. This means that another process can enter the monitor. When the default event is signaled, the original process is resumed, which means that the condition (argument block) is checked again. Only if it evaluates to false, execution proceeds. Otherwise, the process gets blcoked and leaves the monitor again...

Monitor>>waitUntil: aBlock
Conditional waiting for the default event.
See Monitor>>waitWhile: aBlock.

Monitor>>signal
One process waiting for the default event is woken up.

Monitor>>signalAll
All processes waiting for the default event are woken up.


Using non-default (specific) events:

Monitor>>waitFor: aSymbol
Unconditional waiting for the non-default event represented by the argument symbol.
Same as Monitor>>wait, but the process gets only reactivated by the specific event and not the default event.

Monitor>>waitWhile: aBlock for: aSymbol
Confitional waiting for the non-default event represented by the argument symbol.
Same as Monitor>>waitWhile:for:, but the process gets only reactivated by the specific event and not the default event.

Monitor>>waitUntil: aBlock for: aSymbol
Confitional waiting for the non-default event represented by the argument symbol.
See Monitor>>waitWhile:for: aBlock.

Monitor>>signal: aSymbol
One process waiting for the given event is woken up. If there is no process waiting for this specific event, a process waiting for the default event gets resumed.

Monitor>>signalAll: aSymbol
All process waiting for the given event or the default event are woken up.

Monitor>>signalReallyAll
All processes waiting for any events (default or specific) are woken up.


Using timeouts

Monitor>>waitMaxMilliseconds: anInteger
Monitor>>waitFor: aSymbol maxMilliseconds: anInteger
Same as Monitor>>wait (resp. Monitor>>waitFor:), but the process gets automatically woken up when the specified time has passed.

Monitor>>waitWhile: aBlock maxMilliseconds: anInteger
Monitor>>waitWhile: aBlock for: aSymbol maxMilliseconds: anInteger
Same as Monitor>>waitWhile: (resp. Monitor>>waitWhile:for:), but the process gets automatically woken up when the specified time has passed.

Monitor>>waitUntil: aBlock maxMilliseconds: anInteger
Monitor>>waitUntil: aBlock for: aSymbol maxMilliseconds: anInteger
Same as Monitor>>waitUntil: (resp. Monitor>>waitUntil:for:), but the process gets automatically woken up when the specified time has passed.


Usage examples

See code in class MBoundedCounter and compare it to the clumsy BoundedCounter that iw written wihout a monitor.!


!Monitor methodsFor: 'synchronization' stamp: 'NS 7/1/2002 21:54'!
critical: aBlock
	"Critical section.
	Executes aBlock as a critcal section. At any time, only one process can be executing code 
	in a critical section.
	NOTE: All the following synchronization operations are only valid inside the critical section 
	of the monitor!!"

	[self enter.
	aBlock value] ensure: [self exit].! !

!Monitor methodsFor: 'waiting-basic' stamp: 'NS 7/1/2002 21:55'!
wait
	"Unconditional waiting for the default event.
	The current process gets blocked and leaves the monitor, which means that the monitor
	allows another process to execute critical code. When the default event is signaled, the
	original process is resumed."

	^ self waitMaxMilliseconds: nil! !

!Monitor methodsFor: 'waiting-basic' stamp: 'NS 7/1/2002 21:56'!
waitUntil: aBlock
	"Conditional waiting for the default event.
	See Monitor>>waitWhile: aBlock."

	^ self waitUntil: aBlock for: nil! !

!Monitor methodsFor: 'waiting-basic' stamp: 'NS 7/1/2002 21:56'!
waitWhile: aBlock
	"Conditional waiting for the default event.
	The current process gets blocked and leaves the monitor only if the argument block
	evaluates to true. This means that another process can enter the monitor. When the 
	default event is signaled, the original process is resumed, which means that the condition
	(argument block) is checked again. Only if it evaluates to false, execution proceeds.
	Otherwise, the process gets blcoked and leaves the monitor again..."

	^ self waitWhile: aBlock for: nil! !

!Monitor methodsFor: 'waiting-specific' stamp: 'NS 7/1/2002 21:58'!
waitFor: aSymbolOrNil
	"Unconditional waiting for the non-default event represented by the argument symbol.
	Same as Monitor>>wait, but the process gets only reactivated by the specific event and 
	not the default event."

	^ self waitFor: aSymbolOrNil maxMilliseconds: nil! !

!Monitor methodsFor: 'waiting-specific' stamp: 'NS 7/1/2002 22:01'!
waitUntil: aBlock for: aSymbolOrNil
	"Confitional waiting for the non-default event represented by the argument symbol.
	See Monitor>>waitWhile:for: aBlock."

	^ self waitUntil: aBlock for: aSymbolOrNil maxMilliseconds: nil! !

!Monitor methodsFor: 'waiting-specific' stamp: 'NS 7/1/2002 22:01'!
waitWhile: aBlock for: aSymbolOrNil
	"Confitional waiting for the non-default event represented by the argument symbol.
	Same as Monitor>>waitWhile:for:, but the process gets only reactivated by the specific 
	event and not the default event."

	^ self waitWhile: aBlock for: aSymbolOrNil maxMilliseconds: nil! !

!Monitor methodsFor: 'waiting-timeout' stamp: 'NS 7/1/2002 22:03'!
waitFor: aSymbolOrNil maxMilliseconds: anIntegerOrNil
	"Same as Monitor>>waitFor:, but the process gets automatically woken up when the 
	specified time has passed."

	self checkOwnerProcess.
	self waitInQueue: (self queueFor: aSymbolOrNil) maxMilliseconds: anIntegerOrNil.! !

!Monitor methodsFor: 'waiting-timeout' stamp: 'NS 7/1/2002 22:04'!
waitFor: aSymbolOrNil maxSeconds: aNumber
	"Same as Monitor>>waitFor:, but the process gets automatically woken up when the 
	specified time has passed."

	^ self waitFor: aSymbolOrNil maxMilliseconds: (aNumber * 1000) asInteger! !

!Monitor methodsFor: 'waiting-timeout' stamp: 'NS 7/1/2002 22:04'!
waitMaxMilliseconds: anIntegerOrNil
	"Same as Monitor>>wait, but the process gets automatically woken up when the 
	specified time has passed."

	^ self waitFor: nil maxMilliseconds: anIntegerOrNil! !

!Monitor methodsFor: 'waiting-timeout' stamp: 'NS 7/1/2002 22:05'!
waitMaxSeconds: aNumber
	"Same as Monitor>>wait, but the process gets automatically woken up when the 
	specified time has passed."

	^ self waitMaxMilliseconds: (aNumber * 1000) asInteger! !

!Monitor methodsFor: 'waiting-timeout' stamp: 'NS 7/1/2002 22:05'!
waitUntil: aBlock for: aSymbolOrNil maxMilliseconds: anIntegerOrNil
	"Same as Monitor>>waitUntil:for:, but the process gets automatically woken up when the 
	specified time has passed."

	^ self waitWhile: [aBlock value not] for: aSymbolOrNil maxMilliseconds: anIntegerOrNil! !

!Monitor methodsFor: 'waiting-timeout' stamp: 'NS 7/1/2002 22:05'!
waitUntil: aBlock for: aSymbolOrNil maxSeconds: aNumber
	"Same as Monitor>>waitUntil:for:, but the process gets automatically woken up when the 
	specified time has passed."

	^ self waitUntil: aBlock for: aSymbolOrNil maxMilliseconds: (aNumber * 1000) asInteger! !

!Monitor methodsFor: 'waiting-timeout' stamp: 'NS 7/1/2002 22:05'!
waitUntil: aBlock maxMilliseconds: anIntegerOrNil
	"Same as Monitor>>waitUntil:, but the process gets automatically woken up when the 
	specified time has passed."

	^ self waitUntil: aBlock for: nil maxMilliseconds: anIntegerOrNil! !

!Monitor methodsFor: 'waiting-timeout' stamp: 'NS 7/1/2002 22:06'!
waitUntil: aBlock maxSeconds: aNumber
	"Same as Monitor>>waitUntil:, but the process gets automatically woken up when the 
	specified time has passed."

	^ self waitUntil: aBlock maxMilliseconds: (aNumber * 1000) asInteger! !

!Monitor methodsFor: 'waiting-timeout' stamp: 'NS 7/1/2002 22:06'!
waitWhile: aBlock for: aSymbolOrNil maxMilliseconds: anIntegerOrNil
	"Same as Monitor>>waitWhile:for:, but the process gets automatically woken up when the 
	specified time has passed."

	self checkOwnerProcess.
	self waitWhile: aBlock inQueue: (self queueFor: aSymbolOrNil) maxMilliseconds: anIntegerOrNil.! !

!Monitor methodsFor: 'waiting-timeout' stamp: 'NS 7/1/2002 22:06'!
waitWhile: aBlock for: aSymbolOrNil maxSeconds: aNumber
	"Same as Monitor>>waitWhile:for:, but the process gets automatically woken up when the 
	specified time has passed."

	^ self waitWhile: aBlock for: aSymbolOrNil maxMilliseconds: (aNumber * 1000) asInteger! !

!Monitor methodsFor: 'waiting-timeout' stamp: 'NS 7/1/2002 22:06'!
waitWhile: aBlock maxMilliseconds: anIntegerOrNil
	"Same as Monitor>>waitWhile:, but the process gets automatically woken up when the 
	specified time has passed."

	^ self waitWhile: aBlock for: nil maxMilliseconds: anIntegerOrNil! !

!Monitor methodsFor: 'waiting-timeout' stamp: 'NS 7/1/2002 22:06'!
waitWhile: aBlock maxSeconds: aNumber
	"Same as Monitor>>waitWhile:, but the process gets automatically woken up when the 
	specified time has passed."

	^ self waitWhile: aBlock maxMilliseconds: (aNumber * 1000) asInteger! !

!Monitor methodsFor: 'signaling-default' stamp: 'NS 7/1/2002 21:57'!
signal
	"One process waiting for the default event is woken up."

	^ self signal: nil! !

!Monitor methodsFor: 'signaling-default' stamp: 'NS 7/1/2002 21:57'!
signalAll
	"All processes waiting for the default event are woken up."

	^ self signalAll: nil! !

!Monitor methodsFor: 'signaling-specific' stamp: 'NS 7/1/2002 22:02'!
signal: aSymbolOrNil
	"One process waiting for the given event is woken up. If there is no process waiting 
	for this specific event, a process waiting for the default event gets resumed."

	| queue |
	self checkOwnerProcess.
	queue _ self queueFor: aSymbolOrNil.
	((self normalizeQueueAndReturnIfEmpty: queue) and: [queue ~~ self defaultQueue])
		ifTrue: [queue _ self defaultQueue].
	self signalQueue: queue.! !

!Monitor methodsFor: 'signaling-specific' stamp: 'NS 7/1/2002 22:02'!
signalAll: aSymbolOrNil
	"All process waiting for the given event or the default event are woken up."

	| queue |
	self checkOwnerProcess.
	queue _ self queueFor: aSymbolOrNil.
	self signalAllInQueue: self defaultQueue.
	queue ~~ self defaultQueue ifTrue: [self signalAllInQueue: queue].! !

!Monitor methodsFor: 'signaling-specific' stamp: 'NS 7/1/2002 22:02'!
signalReallyAll
	"All processes waiting for any events (default or specific) are woken up."

	self checkOwnerProcess.
	self signalAll.
	self queueDict valuesDo: [:queue |
		self signalAllInQueue: queue].! !

!Monitor methodsFor: 'accessing' stamp: 'NS 7/1/2002 20:02'!
cleanup
	self checkOwnerProcess.
	self critical: [self privateCleanup].! !

!Monitor methodsFor: 'private' stamp: 'NS 7/1/2002 12:38'!
checkOwnerProcess
	(ownerProcess == Processor activeProcess)
		ifFalse: [self error: 'Monitor access violation'].! !

!Monitor methodsFor: 'private' stamp: 'NS 7/1/2002 12:37'!
clearOwnerProcess
	ownerProcess _ nil.! !

!Monitor methodsFor: 'private' stamp: 'NS 7/1/2002 15:06'!
defaultQueue
	defaultQueue ifNil: [defaultQueue _ OrderedCollection new].
	^ defaultQueue! !

!Monitor methodsFor: 'private' stamp: 'NS 7/1/2002 12:36'!
enter
	self isOwnerProcess ifTrue: [^ self].
	mutex wait.
	self setOwnerProcess.! !

!Monitor methodsFor: 'private' stamp: 'NS 7/1/2002 12:36'!
exit
	self clearOwnerProcess.
	mutex signal.! !

!Monitor methodsFor: 'private' stamp: 'NS 7/1/2002 14:52'!
exitAndWaitInQueue: anOrderedCollection maxMilliseconds: anIntegerOrNil
	| lock |
	anOrderedCollection isEmpty ifTrue: [
		lock _ anOrderedCollection addLast: Semaphore new.
	] ifFalse: [
		lock _ anOrderedCollection last.
		(anIntegerOrNil notNil and: [lock isEmpty not]) ifTrue: [
			lock _ anOrderedCollection addLast: Semaphore new.
			anOrderedCollection addLast: Semaphore new]].
	
	self exit.
	anIntegerOrNil isNil
		ifTrue: [lock wait]
		ifFalse: [lock waitTimeoutMSecs: anIntegerOrNil].
	self enter.! !

!Monitor methodsFor: 'private' stamp: 'NS 7/1/2002 15:42'!
isOwnerProcess
	^ Processor activeProcess == ownerProcess! !

!Monitor methodsFor: 'private' stamp: 'NS 7/1/2002 13:13'!
lastSemaphoreInQueue: anOrderedCollection
	| last |
	anOrderedCollection isEmpty ifFalse: [last _ anOrderedCollection last].
	^ (last isKindOf: Semaphore)
		ifTrue: [last]
		ifFalse: [anOrderedCollection addLast: (Semaphore new)].! !

!Monitor methodsFor: 'private' stamp: 'NS 7/1/2002 15:51'!
normalizeQueue: anOrderedCollectionOrNil
	[anOrderedCollectionOrNil isEmptyOrNil not and: [anOrderedCollectionOrNil first isEmpty]]
		whileTrue: [anOrderedCollectionOrNil removeFirst].! !

!Monitor methodsFor: 'private' stamp: 'NS 7/1/2002 16:01'!
normalizeQueueAndReturnIfEmpty: anOrderedCollectionOrNil
	self normalizeQueue: anOrderedCollectionOrNil.
	^ anOrderedCollectionOrNil isEmptyOrNil! !

!Monitor methodsFor: 'private' stamp: 'NS 7/1/2002 17:08'!
privateCleanup
	(self normalizeQueueAndReturnIfEmpty: defaultQueue) ifTrue: [defaultQueue _ nil].
	queueDict ifNotNil: [
		queueDict copy keysAndValuesDo: [:id :queue | 
			(self normalizeQueueAndReturnIfEmpty: queue)
				ifTrue: [queueDict removeKey: id]].
		queueDict isEmpty ifTrue: [queueDict _ nil]].! !

!Monitor methodsFor: 'private' stamp: 'NS 7/1/2002 15:10'!
queueDict
	queueDict ifNil: [queueDict _ IdentityDictionary new].
	^ queueDict.! !

!Monitor methodsFor: 'private' stamp: 'NS 7/1/2002 15:12'!
queueFor: aSymbol
	aSymbol ifNil: [^ self defaultQueue].
	^ self queueDict 
		at: aSymbol 
		ifAbsent: [self queueDict at: aSymbol put: OrderedCollection new].! !

!Monitor methodsFor: 'private' stamp: 'NS 7/1/2002 12:37'!
setOwnerProcess
	ownerProcess _ Processor activeProcess.! !

!Monitor methodsFor: 'private' stamp: 'NS 7/1/2002 15:22'!
signalAllInQueue: anOrderedCollection
	anOrderedCollection do: [:lock |
		[lock isEmpty] whileFalse: [lock signal]].
	anOrderedCollection removeAllSuchThat: [:each | true].! !

!Monitor methodsFor: 'private' stamp: 'NS 7/1/2002 16:02'!
signalQueue: anOrderedCollection
	| lock |
	(self normalizeQueueAndReturnIfEmpty: anOrderedCollection) ifTrue: [^ self].
	lock _ anOrderedCollection first.
	lock signal.
	lock isEmpty ifTrue: [anOrderedCollection removeFirst].! !

!Monitor methodsFor: 'private' stamp: 'NS 7/1/2002 13:17'!
waitInQueue: anOrderedCollection maxMilliseconds: anIntegerOrNil
	self exitAndWaitInQueue: anOrderedCollection maxMilliseconds: anIntegerOrNil.! !

!Monitor methodsFor: 'private' stamp: 'NS 7/1/2002 13:17'!
waitWhile: aBlock inQueue: anOrderedCollection maxMilliseconds: anIntegerOrNil
	[aBlock value] whileTrue: [self exitAndWaitInQueue: anOrderedCollection maxMilliseconds: anIntegerOrNil].! !

!Monitor methodsFor: 'initialize-release' stamp: 'NS 7/1/2002 15:10'!
initialize
	mutex _ Semaphore forMutualExclusion.! !


!Monitor class methodsFor: 'instance creation' stamp: 'NS 7/1/2002 15:33'!
new
	^ super new initialize! !

Monitor removeSelector: #addSynchProcess!
Monitor removeSelector: #checkSynchProcess!
Monitor removeSelector: #checkValidProcess!
Monitor removeSelector: #isQueueEmpty:!
Monitor removeSelector: #normalizeQueueAndReturnWhetherEmpty:!
Monitor removeSelector: #registerSynchProcess!
Monitor removeSelector: #removeSynchProcess!
Monitor removeSelector: #synchronized:!
Monitor removeSelector: #waitInQueue:!
Monitor removeSelector: #waitUntil:inQueue:!
Monitor removeSelector: #waitWhile:inQueue:!

!Monitor reorganize!
('synchronization' critical:)
('waiting-basic' wait waitUntil: waitWhile:)
('waiting-specific' waitFor: waitUntil:for: waitWhile:for:)
('waiting-timeout' waitFor:maxMilliseconds: waitFor:maxSeconds: waitMaxMilliseconds: waitMaxSeconds: waitUntil:for:maxMilliseconds: waitUntil:for:maxSeconds: waitUntil:maxMilliseconds: waitUntil:maxSeconds: waitWhile:for:maxMilliseconds: waitWhile:for:maxSeconds: waitWhile:maxMilliseconds: waitWhile:maxSeconds:)
('signaling-default' signal signalAll)
('signaling-specific' signal: signalAll: signalReallyAll)
('accessing' cleanup)
('private' checkOwnerProcess clearOwnerProcess defaultQueue enter exit exitAndWaitInQueue:maxMilliseconds: isOwnerProcess lastSemaphoreInQueue: normalizeQueue: normalizeQueueAndReturnIfEmpty: privateCleanup queueDict queueFor: setOwnerProcess signalAllInQueue: signalQueue: waitInQueue:maxMilliseconds: waitWhile:inQueue:maxMilliseconds:)
('initialize-release' initialize)
!

-------------- next part --------------
'From Squeak3.2gamma of 15 January 2002 [latest update: #4889] on 1 July 2002 at 11:14:30 pm'!
TestCase subclass: #MonitorTest
	instanceVariableNames: 'm out '
	classVariableNames: ''
	poolDictionaries: ''
	category: 'Kernel-Processes'!

!MonitorTest methodsFor: 'Testing' stamp: 'NS 7/1/2002 20:02'!
testDefaultQueue1
	10 timesRepeat: [[m critical: [m wait. out add: 1]] forkAt: Processor userInterruptPriority].
	self assert: out isEmpty.
	1 to: 10 do: [:index | 
		m critical: [m signal].
		self assert: out size = index].
	self assert: out size = 10! !

!MonitorTest methodsFor: 'Testing' stamp: 'NS 7/1/2002 20:02'!
testDefaultQueue2
	10 timesRepeat: [[m critical: [m wait. out add: 1]] forkAt: Processor userInterruptPriority].
	self assert: out isEmpty.
	m critical: [m signalAll].
	self assert: out size = 10! !

!MonitorTest methodsFor: 'Testing' stamp: 'NS 7/1/2002 20:02'!
testDefaultQueue3
	10 timesRepeat: [[m critical: [m wait. out add: 1]] forkAt: Processor userInterruptPriority].
	self assert: out isEmpty.
	1 to: 10 do: [:index | 
		m critical: [m signal: ('a', index asString) asSymbol].
		self assert: out size = index].
	self assert: out size = 10.! !

!MonitorTest methodsFor: 'Testing' stamp: 'NS 7/1/2002 20:02'!
testDefaultQueueUntil1
	| a |
	a _ ValueHolder new contents: 0.
	3 timesRepeat: [
		[m critical: [
			m waitUntil: [a contents > 10].
			out add: a contents.
			a contents: 0]] forkAt: Processor userInterruptPriority].
	self assert: out isEmpty.
	a contents: 11.
	self assert: out isEmpty.
	m critical: [m signal].
	self assert: (out size = 1 and: [out last = 11]).
	m critical: [m signal].
	self assert: out size = 1.
	a contents: 12.
	self assert: out size = 1.
	m critical: [m signal. Processor yield. a contents: 13. m signal].
	self assert: (out size = 2 and: [out last = 13]).
	m critical: [a contents: 14. m signal].
	self assert: (out size = 3 and: [out last = 14])! !

!MonitorTest methodsFor: 'Testing' stamp: 'NS 7/1/2002 20:02'!
testDefaultQueueUntilWithTimeout1
	| a |
	a _ ValueHolder new contents: 0.
	[m critical: [m waitUntil: [a contents > 10]. out add: 1]] 
		forkAt: Processor userInterruptPriority.
	[m critical: [m waitUntil: [a contents > 20] maxMilliseconds: 10. out add: 2]] 
		forkAt: Processor userInterruptPriority.
	m critical: [m signal].
	a contents: 15.
	self assert: out isEmpty.
	(Delay forMilliseconds: 11) wait.
	self assert: out isEmpty.
	m critical: [m signal].
	a contents: 22.
	self assert: (out size = 1 and: [out last = 1]).
	(Delay forMilliseconds: 11) wait.
	self assert: (out size = 2 and: [out last = 2]).! !

!MonitorTest methodsFor: 'Testing' stamp: 'NS 7/1/2002 20:02'!
testDefaultQueueWithTimeout1
	[m critical: [m wait. out add: 1]] forkAt: Processor userInterruptPriority.
	[m critical: [m waitMaxMilliseconds: 10. out add: 2]] forkAt: Processor userInterruptPriority.
	[m critical: [m wait. out add: 3]] forkAt: Processor userInterruptPriority.
	[m critical: [m waitMaxMilliseconds: 30. out add: 4]] forkAt: Processor userInterruptPriority.
	[m critical: [m waitMaxMilliseconds: 20. out add: 5]] forkAt: Processor userInterruptPriority.
	[m critical: [m waitMaxSeconds: 1. out add: 6]] forkAt: Processor userInterruptPriority.
	self assert: out isEmpty.
	(Delay forMilliseconds: 11) wait.
	self assert: (out size = 1 and: [out last = 2]).
	(Delay forMilliseconds: 11) wait.
	self assert: (out size = 2 and: [out last = 5]).
	(Delay forMilliseconds: 11) wait.
	self assert: (out size = 3 and: [out last = 4]).
	m critical: [m signal].
	self assert: (out size = 4 and: [out last = 1]).
	m critical: [m signal].
	self assert: (out size = 5 and: [out last = 3]).
	m critical: [m signal].
	self assert: (out size = 6 and: [out last = 6]).	! !

!MonitorTest methodsFor: 'Testing' stamp: 'NS 7/1/2002 20:03'!
testQueue1
	10 timesRepeat: [
		[m critical: [m waitFor: #event. out add: 2]] forkAt: Processor userInterruptPriority.
		[m critical: [m wait. out add: 1]] forkAt: Processor userInterruptPriority].
	self assert: out isEmpty.
	1 to: 10 do: [:index | 
		m critical: [m signal].
		self assert: out size = index].
	1 to: 10 do: [:index | 
		m critical: [m signal: #event].
		self assert: out size = (10 + index)].
	1 to: 20 do: [:index | 
		index <= 10 
			ifTrue: [self assert: (out at: index) = 1]
			ifFalse: [self assert: (out at: index) = 2]].! !

!MonitorTest methodsFor: 'Running' stamp: 'NS 7/1/2002 16:36'!
setUp
	m _ Monitor new.
	out _ OrderedCollection new.! !

MonitorTest removeSelector: #testDefaultQueue4!
MonitorTest removeSelector: #testDefaultQueueBasic!
MonitorTest removeSelector: #testDefaultQueueUntilWithTimeout!
MonitorTest removeSelector: #testDefaultQueueWithTimeout!
MonitorTest removeSelector: #testTest!

!MonitorTest reorganize!
('Testing' testDefaultQueue1 testDefaultQueue2 testDefaultQueue3 testDefaultQueueUntil1 testDefaultQueueUntilWithTimeout1 testDefaultQueueWithTimeout1 testQueue1)
('Running' setUp)
!



More information about the Squeak-dev mailing list