REPOSTING: SharedQueue question???

Rob Withers slosher2 at home.com
Fri Oct 27 08:52:43 UTC 2000


jchludzinski at worldkey.net wrote:
> 
> Are items (Objects) placed on a SharedQueue (by multiple Processes
> using #nextPut:) guaranteed to be taken off (by multiple Processes
> using #next) in the SAME order they were placed on?
> 
> Are there any known thread-safety issues with SharedQueues?
> 
> ---John


Perhaps these SharedPriorityQueues would help you?  You can set the sort
block to whatever you'd like, a timestamp for instance (the default
compares #hash).  Sorry about the tests - they are somewhat weak.
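
A minimal workspace sketch of the sort-block idea, assuming the attached
SharedPriorityQueue has been filed in.  Wrapping each item in an
Association keyed by a running sequence number is an illustrative
assumption, not part of the attachment; a counter is used instead of a
clock value because two items could receive the same timestamp, and a
heap does not promise any particular order between equal keys.

```smalltalk
"Workspace sketch. The variable names and the Association wrapper are
 hypothetical; only SharedPriorityQueue and its selectors come from
 the attachment."
| queue seq |
queue := SharedPriorityQueue new.

"Order elements by their key, smallest first."
queue sortBlock: [:a :b | a key <= b key].

"Tag each item with an increasing sequence number as it is queued."
seq := 0.
#('first' 'second' 'third') do: [:each |
	queue nextPut: (seq := seq + 1) -> each].

"Remove the element with the smallest sequence number and unwrap it."
queue next value
```

With several producer Processes the counter itself would need
protection (for example its own Semaphore forMutualExclusion), since
the increment above is not atomic.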

Rob

-- 
--------------------------------------------------
Smalltalking by choice.  Isn't it nice to have one!
-------------- next part --------------
'From Squeak2.9alpha of 13 June 2000 [latest update: #2570] on 27 October 2000 at 1:42:58 am'!
Object subclass: #SharedPriorityQueue
	instanceVariableNames: 'contents accessProtect readSync '
	classVariableNames: ''
	poolDictionaries: ''
	category: 'Collections-Protected'!
SharedPriorityQueue subclass: #FixedSizeSharedPriorityQueue
	instanceVariableNames: 'writeSync '
	classVariableNames: ''
	poolDictionaries: ''
	category: 'Collections-Protected'!
SharedPriorityQueue class
	instanceVariableNames: ''!

!SharedPriorityQueue methodsFor: 'access operations' stamp: 'rww 7/29/2000 22:59'!
critical: aBlock
	^ accessProtect critical: aBlock
! !

!SharedPriorityQueue methodsFor: 'access operations' stamp: 'rww 7/29/2000 23:52'!
flush
	self critical: [contents size timesRepeat: [contents removeFirst]].! !

!SharedPriorityQueue methodsFor: 'access operations' stamp: 'rww 7/29/2000 22:58'!
isEmpty
	^ self critical: [contents isEmpty]! !

!SharedPriorityQueue methodsFor: 'access operations' stamp: 'rww 7/29/2000 23:02'!
nextNoWait
	^ self critical: [contents isEmpty
			ifTrue: [nil]
			ifFalse: [contents removeFirst]].
! !

!SharedPriorityQueue methodsFor: 'access operations' stamp: 'rww 7/29/2000 23:00'!
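nextPut: aValue
	"Add aValue under the mutex, then signal readSync so that one waiting reader can proceed. Answer aValue."
	self critical: [contents add: aValue].
	readSync signal.
	^ aValue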
! !

!SharedPriorityQueue methodsFor: 'access operations' stamp: 'rww 7/29/2000 23:03'!
peek
	^ self critical: [contents isEmpty
		ifTrue: [nil]
		ifFalse: [contents first]]
! !

!SharedPriorityQueue methodsFor: 'access operations' stamp: 'rww 7/29/2000 22:57'!
size
	^ self critical: [contents size]! !

!SharedPriorityQueue methodsFor: 'access operations' stamp: 'rww 7/29/2000 23:03'!
sortBlock: aBlock
	self critical: [contents sortBlock: aBlock].
! !

!SharedPriorityQueue methodsFor: 'access operations' stamp: 'rww 7/29/2000 22:56'!
subsetFrom: start to: stop
	^ self critical: [contents copyFrom: start to: stop]

! !

!SharedPriorityQueue methodsFor: 'protected operations' stamp: 'rww 7/29/2000 23:07'!
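init: initSize
	"Set up the heap, the mutex, and the read semaphore. The default sort block orders elements by #hash."

	contents := Heap new: initSize.
	accessProtect := Semaphore forMutualExclusion.
	readSync := Semaphore new.
	self sortBlock: [:a :b | a hash < b hash].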
! !

!SharedPriorityQueue methodsFor: 'protected operations' stamp: 'rww 7/29/2000 22:47'!
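next
	"Wait until readSync is signaled, then remove and answer the first element according to the sort block."
	readSync wait.
	^ self nextNoWait.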
! !

!SharedPriorityQueue methodsFor: 'protected operations' stamp: 'rww 7/29/2000 20:35'!
nextTimeoutMSecs: milliSeconds
	"Wait at most milliSeconds for an element. Answer nil if the queue is still empty after the wait."
	readSync waitTimeoutMSecs: milliSeconds.
	^self nextNoWait
! !

!SharedPriorityQueue methodsFor: 'protected operations' stamp: 'rww 7/29/2000 22:55'!
subset: limit contentsDo: aBlock
	| subset |
	subset := self subsetFrom: 1 to: limit.
	subset do: [:eachElement | aBlock value: eachElement].

! !

!SharedPriorityQueue methodsFor: 'printing' stamp: 'rww 7/27/2000 04:03'!
printOn: aStream

	aStream nextPutAll: 'priority queue => '.
	self size > 0
		ifTrue: [aStream nextPutAll: self size printString , ' events remaining'.
			self printQueueContentsOn: aStream]
		ifFalse: [aStream nextPutAll: 'empty'].! !

!SharedPriorityQueue methodsFor: 'printing' stamp: 'rww 7/27/2000 04:03'!
printQueueContentsOn: aStream 
	| printSize |
	(printSize := self size) > 10 
		ifTrue: [printSize := 10].
	self 
		subset: printSize 
		contentsDo: [:element | 
			aStream cr; tab; 
				nextPutAll: element printString].! !


!FixedSizeSharedPriorityQueue methodsFor: 'as yet unclassified' stamp: 'rww 7/29/2000 23:55'!
flush
	self critical: [contents size timesRepeat: [contents removeFirst. writeSync signal]].! !

!FixedSizeSharedPriorityQueue methodsFor: 'as yet unclassified' stamp: 'rww 7/29/2000 21:02'!
init: initSize
	"Start writeSync with initSize excess signals, one per free slot."
	super init: initSize.
	writeSync := Semaphore new.
	1 to: initSize do: [:each | writeSync signal].! !

!FixedSizeSharedPriorityQueue methodsFor: 'as yet unclassified' stamp: 'rww 7/29/2000 23:13'!
next
	| value |
	value := super next.
	value ifNotNil: [writeSync signal].
	^value
! !

!FixedSizeSharedPriorityQueue methodsFor: 'as yet unclassified' stamp: 'rww 7/29/2000 21:06'!
nextPut: aValue
	writeSync wait.
	^ super nextPut: aValue
! !

!FixedSizeSharedPriorityQueue methodsFor: 'as yet unclassified' stamp: 'rww 7/29/2000 21:05'!
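nextPut: aValue timeoutMSecs: milliSeconds
	"Wait at most milliSeconds for a free slot. Note: aValue is added even if the wait times out, so the queue can then hold more than its nominal size."
	writeSync waitTimeoutMSecs: milliSeconds.
	^ super nextPut: aValue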
! !

!FixedSizeSharedPriorityQueue methodsFor: 'as yet unclassified' stamp: 'rww 7/29/2000 23:15'!
nextTimeoutMSecs: milliSeconds
	| value |
	value := super nextTimeoutMSecs: milliSeconds.
	value ifNotNil: [writeSync signal].
	^value

! !


!SharedPriorityQueue class methodsFor: 'instance creation'!
new

	^self new: 10! !

!SharedPriorityQueue class methodsFor: 'instance creation'!
new: anInitSize 

	^(super new) init: anInitSize; yourself! !


!SharedPriorityQueue reorganize!
('access operations' critical: flush isEmpty nextNoWait nextPut: peek size sortBlock: subsetFrom:to:)
('protected operations' init: next nextTimeoutMSecs: subset:contentsDo:)
('printing' printOn: printQueueContentsOn:)
!

-------------- next part --------------
'From Squeak2.9alpha of 13 June 2000 [latest update: #2570] on 27 October 2000 at 1:43:10 am'!
TestCase subclass: #FixedSizeSharedPriorityQueueTest
	instanceVariableNames: 'queue '
	classVariableNames: ''
	poolDictionaries: ''
	category: 'Collections-Protected-Tests'!
TestCase subclass: #SharedPriorityQueueTest
	instanceVariableNames: 'queue '
	classVariableNames: ''
	poolDictionaries: ''
	category: 'Collections-Protected-Tests'!

!FixedSizeSharedPriorityQueueTest methodsFor: 'tests' stamp: 'rww 7/29/2000 23:11'!
testConcurrentPostCheckSharedQueue
	| sem1 sem2 |
	sem1 := Semaphore new.
	sem2 := Semaphore new.

	[1 to: 10 do: [:each | self queue nextPut: each]. sem1 signal] fork.
	[10 timesRepeat: [self queue next]. sem2 signal] fork. 
	sem1 wait.
	sem2 wait.

	[self queue size = 0]
		whileFalse: [(Delay forMilliseconds: 100) wait].

	self should: [self queue size = 0].

! !

!FixedSizeSharedPriorityQueueTest methodsFor: 'tests' stamp: 'rww 7/29/2000 23:11'!
testConcurrentPreCheckSharedQueue
	| sem1 sem2 |
	sem1 := Semaphore new.
	sem2 := Semaphore new.

	[1 to: 10 do: [:each | self queue nextPut: each]. sem1 signal] fork.
	[10 timesRepeat: [self queue next]. sem2 signal] fork. 

	[self queue size = 0]
		whileFalse: [(Delay forMilliseconds: 100) wait].

	sem1 wait.
	sem2 wait.

	self should: [self queue size = 0].

! !

!FixedSizeSharedPriorityQueueTest methodsFor: 'tests' stamp: 'rww 7/30/2000 05:26'!
testConcurrentPrintingSharedQueue
	| sem1 sem2 sem3 |
	sem1 := Semaphore new.
	sem2 := Semaphore new.
	sem3 := Semaphore new.

	[1 to: 10 do: [:each | (Delay forMilliseconds: 2) wait.
		self queue nextPut: each]. sem1 signal] fork.
	[5 timesRepeat: [(Delay forMilliseconds: 1) wait.
		self queue next]. sem2 signal] fork.
	[5 timesRepeat: [(Delay forMilliseconds: 1) wait.
		self queue next]. sem3 signal] fork.
	sem1 wait.
	sem2 wait.
	sem3 wait.

	self should: [self queue size = 0].! !

!FixedSizeSharedPriorityQueueTest methodsFor: 'tests' stamp: 'rww 7/29/2000 23:10'!
testConcurrentSharedQueue
	| sem1 sem2 |
	sem1 := Semaphore new.
	sem2 := Semaphore new.

	[1 to: 10 do: [:each | self queue nextPut: each]. sem1 signal] fork.
	[10 timesRepeat: [self queue next]. sem2 signal] fork. 
	sem1 wait.
	sem2 wait.

	self should: [self queue size = 0].

! !

!FixedSizeSharedPriorityQueueTest methodsFor: 'tests' stamp: 'rww 7/29/2000 23:54'!
testFlush
	| sem1 |
	sem1 := Semaphore new.

	[1 to: 10 do: [:each | self queue nextPut: each]. sem1 signal] fork.
	sem1 wait.
	self queue flush.
	self should: [self queue size = 0].

! !

!FixedSizeSharedPriorityQueueTest methodsFor: 'tests' stamp: 'rww 7/29/2000 23:10'!
testSharedQueue
	[1 to: 10 do: [:each | self queue nextPut: each]] fork.
	10 timesRepeat: [self queue next].
	self should: [self queue size = 0].! !

!FixedSizeSharedPriorityQueueTest methodsFor: 'fixtures' stamp: 'rww 7/29/2000 22:07'!
queue
	^ queue
! !

!FixedSizeSharedPriorityQueueTest methodsFor: 'fixtures' stamp: 'rww 7/29/2000 23:54'!
setUp
	queue := FixedSizeSharedPriorityQueue new: 20.
! !

!FixedSizeSharedPriorityQueueTest methodsFor: 'fixtures' stamp: 'rww 7/29/2000 22:07'!
tearDown
	queue := nil.
! !


!SharedPriorityQueueTest methodsFor: 'tests' stamp: 'rww 7/29/2000 22:34'!
testConcurrentPostCheckSharedQueue
	| sem1 sem2 |
	sem1 := Semaphore new.
	sem2 := Semaphore new.

	[1 to: 10 do: [:each | self queue nextPut: each]. sem1 signal] fork.
	[10 timesRepeat: [self queue next]. sem2 signal] fork. 
	sem1 wait.
	sem2 wait.

	[self queue size = 0]
		whileFalse: [(Delay forMilliseconds: 100) wait].

	self should: [self queue size = 0].

! !

!SharedPriorityQueueTest methodsFor: 'tests' stamp: 'rww 7/29/2000 22:34'!
testConcurrentPreCheckSharedQueue
	| sem1 sem2 |
	sem1 := Semaphore new.
	sem2 := Semaphore new.

	[1 to: 10 do: [:each | self queue nextPut: each]. sem1 signal] fork.
	[10 timesRepeat: [self queue next]. sem2 signal] fork. 

	[self queue size = 0]
		whileFalse: [(Delay forMilliseconds: 100) wait].

	sem1 wait.
	sem2 wait.

	self should: [self queue size = 0].

! !

!SharedPriorityQueueTest methodsFor: 'tests' stamp: 'rww 7/30/2000 05:25'!
testConcurrentPrintingSharedQueue
	| sem1 sem2 sem3 |
	sem1 := Semaphore new.
	sem2 := Semaphore new.
	sem3 := Semaphore new.

	[1 to: 10 do: [:each | (Delay forMilliseconds: 2) wait.
		self queue nextPut: each]. sem1 signal] fork.
	[5 timesRepeat: [(Delay forMilliseconds: 1) wait.
		self queue next]. sem2 signal] fork.
	[5 timesRepeat: [(Delay forMilliseconds: 1) wait.
		self queue next]. sem3 signal] fork.
	sem1 wait.
	sem2 wait.
	sem3 wait.

	self should: [self queue size = 0].! !

!SharedPriorityQueueTest methodsFor: 'tests' stamp: 'rww 7/29/2000 22:33'!
testConcurrentSharedQueue
	| sem1 sem2 |
	sem1 := Semaphore new.
	sem2 := Semaphore new.

	[1 to: 10 do: [:each | self queue nextPut: each]. sem1 signal] fork.
	[10 timesRepeat: [self queue next]. sem2 signal] fork. 
	sem1 wait.
	sem2 wait.

	self should: [self queue size = 0].

! !

!SharedPriorityQueueTest methodsFor: 'tests' stamp: 'rww 7/29/2000 23:53'!
testFlush
	| sem1 |
	sem1 := Semaphore new.

	[1 to: 10 do: [:each | self queue nextPut: each]. sem1 signal] fork.
	sem1 wait.
	self queue flush.
	self should: [self queue size = 0].

! !

!SharedPriorityQueueTest methodsFor: 'tests' stamp: 'rww 7/29/2000 23:09'!
testSharedQueue
	[1 to: 10 do: [:each | self queue nextPut: each]] fork.
	10 timesRepeat: [self queue next].
	self should: [self queue size = 0].! !

!SharedPriorityQueueTest methodsFor: 'fixtures' stamp: 'rww 7/29/2000 21:59'!
queue
	^ queue
! !

!SharedPriorityQueueTest methodsFor: 'fixtures' stamp: 'rww 7/29/2000 21:59'!
setUp
	queue := SharedPriorityQueue new.
! !

!SharedPriorityQueueTest methodsFor: 'fixtures' stamp: 'rww 7/29/2000 22:07'!
tearDown
	queue := nil.
! !


!SharedPriorityQueueTest reorganize!
('tests' testConcurrentPostCheckSharedQueue testConcurrentPreCheckSharedQueue testConcurrentPrintingSharedQueue testConcurrentSharedQueue testFlush testSharedQueue)
('fixtures' queue setUp tearDown)
!


!FixedSizeSharedPriorityQueueTest reorganize!
('tests' testConcurrentPostCheckSharedQueue testConcurrentPreCheckSharedQueue testConcurrentPrintingSharedQueue testConcurrentSharedQueue testFlush testSharedQueue)
('fixtures' queue setUp tearDown)
!


