REPOSTING: SharedQueue question???

Rob Withers slosher2 at
Fri Oct 27 08:52:43 UTC 2000

jchludzinski at wrote:
> Are items (Objects) placed on a SharedQueue (by multiple Processes
> using #nextPut:) guaranteed to be taken off (by multiple Processes
> using #next) in the SAME order they were placed on?
> Are there any known thread-safety issues with SharedQueues?
> ---John

Perhaps these SharedPriorityQueues would help you?  You can set the sort
block to whatever you'd like, a timestamp for instance.  Sorry about the
tests; they are somewhat weak.
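
A minimal workspace sketch of that idea, assuming the attached
SharedPriorityQueue code is filed in. The (key -> payload) Association
wrapper and the names seq, seqLock and stamp are assumptions of this sketch,
not part of the attachment. It uses a running counter rather than a clock
value because a heap does not keep equal keys in insertion order, so two
items stamped in the same millisecond could come out either way.

```smalltalk
"Sketch only: seq, seqLock and stamp are hypothetical helpers."
| queue seq seqLock stamp |
queue := SharedPriorityQueue new.

"Order entries by their key, smallest first."
queue sortBlock: [:a :b | a key <= b key].

"Hand out unique, increasing keys under a lock so that
 several producer Processes can share the counter."
seq := 0.
seqLock := Semaphore forMutualExclusion.
stamp := [:payload |
	seqLock critical: [seq := seq + 1. seq -> payload]].

queue nextPut: (stamp value: 'first').
queue nextPut: (stamp value: 'second').

"#next answers the entry with the smallest key; value unwraps the payload."
queue next value
```

With the sort block comparing keys, the last line is expected to answer
'first'. Stamping and #nextPut: are separate steps here, so with several
producers the order you get back is the order of stamping, which is not
necessarily the order of arrival in the queue.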


Smalltalking by choice.  Isn't it nice to have one!
-------------- next part --------------
'From Squeak2.9alpha of 13 June 2000 [latest update: #2570] on 27 October 2000 at 1:42:58 am'!
Object subclass: #SharedPriorityQueue
	instanceVariableNames: 'contents accessProtect readSync '
	classVariableNames: ''
	poolDictionaries: ''
	category: 'Collections-Protected'!
SharedPriorityQueue subclass: #FixedSizeSharedPriorityQueue
	instanceVariableNames: 'writeSync '
	classVariableNames: ''
	poolDictionaries: ''
	category: 'Collections-Protected'!
SharedPriorityQueue class
	instanceVariableNames: ''!

!SharedPriorityQueue methodsFor: 'access operations' stamp: 'rww 7/29/2000 22:59'!
critical: aBlock
	^ accessProtect critical: aBlock
! !

!SharedPriorityQueue methodsFor: 'access operations' stamp: 'rww 7/29/2000 23:52'!
flush
	self critical: [contents size timesRepeat: [contents removeFirst]].! !

!SharedPriorityQueue methodsFor: 'access operations' stamp: 'rww 7/29/2000 22:58'!
isEmpty
	^ self critical: [contents isEmpty]! !

!SharedPriorityQueue methodsFor: 'access operations' stamp: 'rww 7/29/2000 23:02'!
nextNoWait
	^ self critical: [contents isEmpty
			ifTrue: [nil]
			ifFalse: [contents removeFirst]].
! !

!SharedPriorityQueue methodsFor: 'access operations' stamp: 'rww 7/29/2000 23:00'!
nextPut: aValue
	self critical: [contents add: aValue].
	readSync signal.
	^ aValue
! !

!SharedPriorityQueue methodsFor: 'access operations' stamp: 'rww 7/29/2000 23:03'!
peek
	^ self critical: [contents isEmpty
		ifTrue: [nil]
		ifFalse: [contents first]]
! !

!SharedPriorityQueue methodsFor: 'access operations' stamp: 'rww 7/29/2000 22:57'!
size
	^ self critical: [contents size]! !

!SharedPriorityQueue methodsFor: 'access operations' stamp: 'rww 7/29/2000 23:03'!
sortBlock: aBlock
	self critical: [contents sortBlock: aBlock].
! !

!SharedPriorityQueue methodsFor: 'access operations' stamp: 'rww 7/29/2000 22:56'!
subsetFrom: start to: stop
	^ self critical: [contents copyFrom: start to: stop]

! !

!SharedPriorityQueue methodsFor: 'protected operations' stamp: 'rww 7/29/2000 23:07'!
init: initSize

	contents := Heap new: initSize.
	accessProtect := Semaphore forMutualExclusion.
	readSync := Semaphore new.
	self sortBlock: [:a :b | a hash < b hash].
! !

!SharedPriorityQueue methodsFor: 'protected operations' stamp: 'rww 7/29/2000 22:47'!
next
	readSync wait.
	^ self nextNoWait.
! !

!SharedPriorityQueue methodsFor: 'protected operations' stamp: 'rww 7/29/2000 20:35'!
nextTimeoutMSecs: milliSeconds
	readSync waitTimeoutMSecs: milliSeconds.
	^self nextNoWait
! !

!SharedPriorityQueue methodsFor: 'protected operations' stamp: 'rww 7/29/2000 22:55'!
subset: limit contentsDo: aBlock
	| subset |
	subset := self subsetFrom: 1 to: limit.
	subset do: [:eachElement | aBlock value: eachElement].

! !

!SharedPriorityQueue methodsFor: 'printing' stamp: 'rww 7/27/2000 04:03'!
printOn: aStream

	aStream nextPutAll: 'priority queue => '.
	self size > 0
		ifTrue: [aStream nextPutAll: self size printString , ' events remaining'.
			self printQueueContentsOn: aStream]
		ifFalse: [aStream nextPutAll: 'empty'].! !

!SharedPriorityQueue methodsFor: 'printing' stamp: 'rww 7/27/2000 04:03'!
printQueueContentsOn: aStream 
	| printSize |
	(printSize := self size) > 10 
		ifTrue: [printSize := 10].
	self
		subset: printSize 
		contentsDo: [:element | 
			aStream cr; tab; 
				nextPutAll: element printString].! !

!FixedSizeSharedPriorityQueue methodsFor: 'as yet unclassified' stamp: 'rww 7/29/2000 23:55'!
flush
	self critical: [contents size timesRepeat: [contents removeFirst. writeSync signal]].! !

!FixedSizeSharedPriorityQueue methodsFor: 'as yet unclassified' stamp: 'rww 7/29/2000 21:02'!
init: initSize
	super init: initSize.
	writeSync := Semaphore new.
	1 to: initSize do: [:each | writeSync signal].! !

!FixedSizeSharedPriorityQueue methodsFor: 'as yet unclassified' stamp: 'rww 7/29/2000 23:13'!
next
	| value |
	value := super next.
	value ifNotNil: [writeSync signal].
	^ value
! !

!FixedSizeSharedPriorityQueue methodsFor: 'as yet unclassified' stamp: 'rww 7/29/2000 21:06'!
nextPut: aValue
	writeSync wait.
	^ super nextPut: aValue
! !

!FixedSizeSharedPriorityQueue methodsFor: 'as yet unclassified' stamp: 'rww 7/29/2000 21:05'!
nextPut: aValue timeoutMSecs: milliSeconds
	writeSync waitTimeoutMSecs: milliSeconds.
	^ super nextPut: aValue
! !

!FixedSizeSharedPriorityQueue methodsFor: 'as yet unclassified' stamp: 'rww 7/29/2000 23:15'!
nextTimeoutMSecs: milliSeconds
	| value |
	value := super nextTimeoutMSecs: milliSeconds.
	value ifNotNil: [writeSync signal].
	^ value

! !

!SharedPriorityQueue class methodsFor: 'instance creation'!

new
	^self new: 10! !

!SharedPriorityQueue class methodsFor: 'instance creation'!
new: anInitSize 

	^(super new) init: anInitSize; yourself! !

!SharedPriorityQueue reorganize!
('access operations' critical: flush isEmpty nextNoWait nextPut: peek size sortBlock: subsetFrom:to:)
('protected operations' init: next nextTimeoutMSecs: subset:contentsDo:)
('printing' printOn: printQueueContentsOn:)

-------------- next part --------------
'From Squeak2.9alpha of 13 June 2000 [latest update: #2570] on 27 October 2000 at 1:43:10 am'!
TestCase subclass: #FixedSizeSharedPriorityQueueTest
	instanceVariableNames: 'queue '
	classVariableNames: ''
	poolDictionaries: ''
	category: 'Collections-Protected-Tests'!
TestCase subclass: #SharedPriorityQueueTest
	instanceVariableNames: 'queue '
	classVariableNames: ''
	poolDictionaries: ''
	category: 'Collections-Protected-Tests'!

!FixedSizeSharedPriorityQueueTest methodsFor: 'tests' stamp: 'rww 7/29/2000 23:11'!
testConcurrentPostCheckSharedQueue
	| sem1 sem2 |
	sem1 := Semaphore new.
	sem2 := Semaphore new.

	[1 to: 10 do: [:each | self queue nextPut: each]. sem1 signal] fork.
	[10 timesRepeat: [self queue next]. sem2 signal] fork. 
	sem1 wait.
	sem2 wait.

	[self queue size = 0]
		whileFalse: [(Delay forMilliseconds: 100) wait].

	self should: [self queue size = 0].

! !

!FixedSizeSharedPriorityQueueTest methodsFor: 'tests' stamp: 'rww 7/29/2000 23:11'!
testConcurrentPreCheckSharedQueue
	| sem1 sem2 |
	sem1 := Semaphore new.
	sem2 := Semaphore new.

	[1 to: 10 do: [:each | self queue nextPut: each]. sem1 signal] fork.
	[10 timesRepeat: [self queue next]. sem2 signal] fork. 

	[self queue size = 0]
		whileFalse: [(Delay forMilliseconds: 100) wait].

	sem1 wait.
	sem2 wait.

	self should: [self queue size = 0].

! !

!FixedSizeSharedPriorityQueueTest methodsFor: 'tests' stamp: 'rww 7/30/2000 05:26'!
testConcurrentPrintingSharedQueue
	| sem1 sem2 sem3 |
	sem1 := Semaphore new.
	sem2 := Semaphore new.
	sem3 := Semaphore new.

	[1 to: 10 do: [:each | (Delay forMilliseconds: 2) wait.
		self queue nextPut: each]. sem1 signal] fork.
	[5 timesRepeat: [(Delay forMilliseconds: 1) wait.
		self queue next]. sem2 signal] fork.
	[5 timesRepeat: [(Delay forMilliseconds: 1) wait.
		self queue next]. sem3 signal] fork.
	sem1 wait.
	sem2 wait.
	sem3 wait.

	self should: [self queue size = 0].! !

!FixedSizeSharedPriorityQueueTest methodsFor: 'tests' stamp: 'rww 7/29/2000 23:10'!
testConcurrentSharedQueue
	| sem1 sem2 |
	sem1 := Semaphore new.
	sem2 := Semaphore new.

	[1 to: 10 do: [:each | self queue nextPut: each]. sem1 signal] fork.
	[10 timesRepeat: [self queue next]. sem2 signal] fork. 
	sem1 wait.
	sem2 wait.

	self should: [self queue size = 0].

! !

!FixedSizeSharedPriorityQueueTest methodsFor: 'tests' stamp: 'rww 7/29/2000 23:54'!
testFlush
	| sem1 |
	sem1 := Semaphore new.

	[1 to: 10 do: [:each | self queue nextPut: each]. sem1 signal] fork.
	sem1 wait.
	self queue flush.
	self should: [self queue size = 0].

! !

!FixedSizeSharedPriorityQueueTest methodsFor: 'tests' stamp: 'rww 7/29/2000 23:10'!
testSharedQueue
	[1 to: 10 do: [:each | self queue nextPut: each]] fork.
	10 timesRepeat: [self queue next].
	self should: [self queue size = 0].! !

!FixedSizeSharedPriorityQueueTest methodsFor: 'fixtures' stamp: 'rww 7/29/2000 22:07'!
queue
	^ queue
! !

!FixedSizeSharedPriorityQueueTest methodsFor: 'fixtures' stamp: 'rww 7/29/2000 23:54'!
setUp
	queue := FixedSizeSharedPriorityQueue new: 20.
! !

!FixedSizeSharedPriorityQueueTest methodsFor: 'fixtures' stamp: 'rww 7/29/2000 22:07'!
tearDown
	queue := nil.
! !

!SharedPriorityQueueTest methodsFor: 'tests' stamp: 'rww 7/29/2000 22:34'!
testConcurrentPostCheckSharedQueue
	| sem1 sem2 |
	sem1 := Semaphore new.
	sem2 := Semaphore new.

	[1 to: 10 do: [:each | self queue nextPut: each]. sem1 signal] fork.
	[10 timesRepeat: [self queue next]. sem2 signal] fork. 
	sem1 wait.
	sem2 wait.

	[self queue size = 0]
		whileFalse: [(Delay forMilliseconds: 100) wait].

	self should: [self queue size = 0].

! !

!SharedPriorityQueueTest methodsFor: 'tests' stamp: 'rww 7/29/2000 22:34'!
testConcurrentPreCheckSharedQueue
	| sem1 sem2 |
	sem1 := Semaphore new.
	sem2 := Semaphore new.

	[1 to: 10 do: [:each | self queue nextPut: each]. sem1 signal] fork.
	[10 timesRepeat: [self queue next]. sem2 signal] fork. 

	[self queue size = 0]
		whileFalse: [(Delay forMilliseconds: 100) wait].

	sem1 wait.
	sem2 wait.

	self should: [self queue size = 0].

! !

!SharedPriorityQueueTest methodsFor: 'tests' stamp: 'rww 7/30/2000 05:25'!
testConcurrentPrintingSharedQueue
	| sem1 sem2 sem3 |
	sem1 := Semaphore new.
	sem2 := Semaphore new.
	sem3 := Semaphore new.

	[1 to: 10 do: [:each | (Delay forMilliseconds: 2) wait.
		self queue nextPut: each]. sem1 signal] fork.
	[5 timesRepeat: [(Delay forMilliseconds: 1) wait.
		self queue next]. sem2 signal] fork.
	[5 timesRepeat: [(Delay forMilliseconds: 1) wait.
		self queue next]. sem3 signal] fork.
	sem1 wait.
	sem2 wait.
	sem3 wait.

	self should: [self queue size = 0].! !

!SharedPriorityQueueTest methodsFor: 'tests' stamp: 'rww 7/29/2000 22:33'!
testConcurrentSharedQueue
	| sem1 sem2 |
	sem1 := Semaphore new.
	sem2 := Semaphore new.

	[1 to: 10 do: [:each | self queue nextPut: each]. sem1 signal] fork.
	[10 timesRepeat: [self queue next]. sem2 signal] fork. 
	sem1 wait.
	sem2 wait.

	self should: [self queue size = 0].

! !

!SharedPriorityQueueTest methodsFor: 'tests' stamp: 'rww 7/29/2000 23:53'!
testFlush
	| sem1 |
	sem1 := Semaphore new.

	[1 to: 10 do: [:each | self queue nextPut: each]. sem1 signal] fork.
	sem1 wait.
	self queue flush.
	self should: [self queue size = 0].

! !

!SharedPriorityQueueTest methodsFor: 'tests' stamp: 'rww 7/29/2000 23:09'!
testSharedQueue
	[1 to: 10 do: [:each | self queue nextPut: each]] fork.
	10 timesRepeat: [self queue next].
	self should: [self queue size = 0].! !

!SharedPriorityQueueTest methodsFor: 'fixtures' stamp: 'rww 7/29/2000 21:59'!
queue
	^ queue
! !

!SharedPriorityQueueTest methodsFor: 'fixtures' stamp: 'rww 7/29/2000 21:59'!
setUp
	queue := SharedPriorityQueue new.
! !

!SharedPriorityQueueTest methodsFor: 'fixtures' stamp: 'rww 7/29/2000 22:07'!
tearDown
	queue := nil.
! !

!SharedPriorityQueueTest reorganize!
('tests' testConcurrentPostCheckSharedQueue testConcurrentPreCheckSharedQueue testConcurrentPrintingSharedQueue testConcurrentSharedQueue testFlush testSharedQueue)
('fixtures' queue setUp tearDown)

!FixedSizeSharedPriorityQueueTest reorganize!
('tests' testConcurrentPostCheckSharedQueue testConcurrentPreCheckSharedQueue testConcurrentPrintingSharedQueue testConcurrentSharedQueue testFlush testSharedQueue)
('fixtures' queue setUp tearDown)

More information about the Squeak-dev mailing list