REPOSTING: SharedQueue question???
Rob Withers
slosher2 at home.com
Fri Oct 27 08:52:43 UTC 2000
jchludzinski at worldkey.net wrote:
>
> Are items (Objects) place on a SharedQueue (by multiple Processes
> using #nextPut:) guarenteed to be taken off (by multiple Processes
> using #next) in the SAME orden they were placed on?
>
> Are there any known thread-safe issues about SharedQueues?
>
> ---John
Perhaps these SharedPriorityQueues would help you? You can set the sort
block to whatever you'd like, timestamp for instance. Sorry about the
tests - they are somewhat weak.
Rob
--
--------------------------------------------------
Smalltalking by choice. Isn't it nice to have one!
-------------- next part --------------
'From Squeak2.9alpha of 13 June 2000 [latest update: #2570] on 27 October 2000 at 1:42:58 am'!
Object subclass: #SharedPriorityQueue
instanceVariableNames: 'contents accessProtect readSync '
classVariableNames: ''
poolDictionaries: ''
category: 'Collections-Protected'!
SharedPriorityQueue subclass: #FixedSizeSharedPriorityQueue
instanceVariableNames: 'writeSync '
classVariableNames: ''
poolDictionaries: ''
category: 'Collections-Protected'!
SharedPriorityQueue class
instanceVariableNames: ''!
!SharedPriorityQueue methodsFor: 'access operations' stamp: 'rww 7/29/2000 22:59'!
critical: aBlock
^ accessProtect critical: aBlock
! !
!SharedPriorityQueue methodsFor: 'access operations' stamp: 'rww 7/29/2000 23:52'!
flush
self critical: [contents size timesRepeat: [contents removeFirst]].! !
!SharedPriorityQueue methodsFor: 'access operations' stamp: 'rww 7/29/2000 22:58'!
isEmpty
^ self critical: [contents isEmpty]! !
!SharedPriorityQueue methodsFor: 'access operations' stamp: 'rww 7/29/2000 23:02'!
nextNoWait
^ self critical: [contents isEmpty
ifTrue: [nil]
ifFalse: [contents removeFirst]].
! !
!SharedPriorityQueue methodsFor: 'access operations' stamp: 'rww 7/29/2000 23:00'!
nextPut: aValue
self critical: [contents add: aValue].
readSync signal.
^ aValue
! !
!SharedPriorityQueue methodsFor: 'access operations' stamp: 'rww 7/29/2000 23:03'!
peek
^ self critical: [contents isEmpty
ifTrue: [nil]
ifFalse: [contents first]]
! !
!SharedPriorityQueue methodsFor: 'access operations' stamp: 'rww 7/29/2000 22:57'!
size
^ self critical: [contents size]! !
!SharedPriorityQueue methodsFor: 'access operations' stamp: 'rww 7/29/2000 23:03'!
sortBlock: aBlock
self critical: [contents sortBlock: aBlock].
! !
!SharedPriorityQueue methodsFor: 'access operations' stamp: 'rww 7/29/2000 22:56'!
subsetFrom: start to: stop
^ self critical: [contents copyFrom: start to: stop]
! !
!SharedPriorityQueue methodsFor: 'protected operations' stamp: 'rww 7/29/2000 23:07'!
init: initSize
contents := Heap new: initSize.
accessProtect := Semaphore forMutualExclusion.
readSync := Semaphore new.
self sortBlock: [:a :b | a hash < b hash].
! !
!SharedPriorityQueue methodsFor: 'protected operations' stamp: 'rww 7/29/2000 22:47'!
next
readSync wait.
^ self nextNoWait.
! !
!SharedPriorityQueue methodsFor: 'protected operations' stamp: 'rww 7/29/2000 20:35'!
nextTimeoutMSecs: milliSeconds
readSync waitTimeoutMSecs: milliSeconds.
^self nextNoWait
! !
!SharedPriorityQueue methodsFor: 'protected operations' stamp: 'rww 7/29/2000 22:55'!
subset: limit contentsDo: aBlock
| subset |
subset := self subsetFrom: 1 to: limit.
subset do: [:eachElement | aBlock value: eachElement].
! !
!SharedPriorityQueue methodsFor: 'printing' stamp: 'rww 7/27/2000 04:03'!
printOn: aStream
aStream nextPutAll: 'priority queue => '.
self size > 0
ifTrue: [aStream nextPutAll: self size printString , ' events remaining'.
self printQueueContentsOn: aStream]
ifFalse: [aStream nextPutAll: 'empty'].! !
!SharedPriorityQueue methodsFor: 'printing' stamp: 'rww 7/27/2000 04:03'!
printQueueContentsOn: aStream
| printSize |
(printSize := self size) > 10
ifTrue: [printSize := 10].
self
subset: printSize
contentsDo: [:element |
aStream cr; tab;
nextPutAll: element printString].! !
!FixedSizeSharedPriorityQueue methodsFor: 'as yet unclassified' stamp: 'rww 7/29/2000 23:55'!
flush
self critical: [contents size timesRepeat: [contents removeFirst. writeSync signal]].! !
!FixedSizeSharedPriorityQueue methodsFor: 'as yet unclassified' stamp: 'rww 7/29/2000 21:02'!
init: initSize
super init: initSize.
writeSync := Semaphore new.
1 to: initSize do: [:each | writeSync signal].! !
!FixedSizeSharedPriorityQueue methodsFor: 'as yet unclassified' stamp: 'rww 7/29/2000 23:13'!
next
| value |
value := super next.
value ifNotNil: [writeSync signal].
^value
! !
!FixedSizeSharedPriorityQueue methodsFor: 'as yet unclassified' stamp: 'rww 7/29/2000 21:06'!
nextPut: aValue
writeSync wait.
^ super nextPut: aValue
! !
!FixedSizeSharedPriorityQueue methodsFor: 'as yet unclassified' stamp: 'rww 7/29/2000 21:05'!
nextPut: aValue timeoutMSecs: milliSeconds
writeSync waitTimeoutMSecs: milliSeconds.
^ super nextPut: aValue
! !
!FixedSizeSharedPriorityQueue methodsFor: 'as yet unclassified' stamp: 'rww 7/29/2000 23:15'!
nextTimeoutMSecs: milliSeconds
| value |
value := super nextTimeoutMSecs: milliSeconds.
value ifNotNil: [writeSync signal].
^value
! !
!SharedPriorityQueue class methodsFor: 'instance creation'!
new
^self new: 10! !
!SharedPriorityQueue class methodsFor: 'instance creation'!
new: anInitSize
^(super new) init: anInitSize; yourself! !
!SharedPriorityQueue reorganize!
('access operations' critical: flush isEmpty nextNoWait nextPut: peek size sortBlock: subsetFrom:to:)
('protected operations' init: next nextTimeoutMSecs: subset:contentsDo:)
('printing' printOn: printQueueContentsOn:)
!
-------------- next part --------------
'From Squeak2.9alpha of 13 June 2000 [latest update: #2570] on 27 October 2000 at 1:43:10 am'!
TestCase subclass: #FixedSizeSharedPriorityQueueTest
instanceVariableNames: 'queue '
classVariableNames: ''
poolDictionaries: ''
category: 'Collections-Protected-Tests'!
TestCase subclass: #SharedPriorityQueueTest
instanceVariableNames: 'queue '
classVariableNames: ''
poolDictionaries: ''
category: 'Collections-Protected-Tests'!
!FixedSizeSharedPriorityQueueTest methodsFor: 'tests' stamp: 'rww 7/29/2000 23:11'!
testConcurrentPostCheckSharedQueue
| sem1 sem2 |
sem1 := Semaphore new.
sem2 := Semaphore new.
[1 to: 10 do: [:each | self queue nextPut: each]. sem1 signal] fork.
[10 timesRepeat: [self queue next]. sem2 signal] fork.
sem1 wait.
sem2 wait.
[self queue size = 0]
whileFalse: [(Delay forMilliseconds: 100) wait].
self should: [self queue size = 0].
! !
!FixedSizeSharedPriorityQueueTest methodsFor: 'tests' stamp: 'rww 7/29/2000 23:11'!
testConcurrentPreCheckSharedQueue
| sem1 sem2 |
sem1 := Semaphore new.
sem2 := Semaphore new.
[1 to: 10 do: [:each | self queue nextPut: each]. sem1 signal] fork.
[10 timesRepeat: [self queue next]. sem2 signal] fork.
[self queue size = 0]
whileFalse: [(Delay forMilliseconds: 100) wait].
sem1 wait.
sem2 wait.
self should: [self queue size = 0].
! !
!FixedSizeSharedPriorityQueueTest methodsFor: 'tests' stamp: 'rww 7/30/2000 05:26'!
testConcurrentPrintingSharedQueue
| sem1 sem2 sem3 |
sem1 := Semaphore new.
sem2 := Semaphore new.
sem3 := Semaphore new.
[1 to: 10 do: [:each | (Delay forMilliseconds: 2) wait.
self queue nextPut: each]. sem1 signal] fork.
[5 timesRepeat: [(Delay forMilliseconds: 1) wait.
self queue next]. sem2 signal] fork.
[5 timesRepeat: [(Delay forMilliseconds: 1) wait.
self queue next]. sem3 signal] fork.
sem1 wait.
sem2 wait.
sem3 wait.
self should: [self queue size = 0].! !
!FixedSizeSharedPriorityQueueTest methodsFor: 'tests' stamp: 'rww 7/29/2000 23:10'!
testConcurrentSharedQueue
| sem1 sem2 |
sem1 := Semaphore new.
sem2 := Semaphore new.
[1 to: 10 do: [:each | self queue nextPut: each]. sem1 signal] fork.
[10 timesRepeat: [self queue next]. sem2 signal] fork.
sem1 wait.
sem2 wait.
self should: [self queue size = 0].
! !
!FixedSizeSharedPriorityQueueTest methodsFor: 'tests' stamp: 'rww 7/29/2000 23:54'!
testFlush
| sem1 |
sem1 := Semaphore new.
[1 to: 10 do: [:each | self queue nextPut: each]. sem1 signal] fork.
sem1 wait.
self queue flush.
self should: [self queue size = 0].
! !
!FixedSizeSharedPriorityQueueTest methodsFor: 'tests' stamp: 'rww 7/29/2000 23:10'!
testSharedQueue
[1 to: 10 do: [:each | self queue nextPut: each]] fork.
10 timesRepeat: [self queue next].
self should: [self queue size = 0].! !
!FixedSizeSharedPriorityQueueTest methodsFor: 'fixtures' stamp: 'rww 7/29/2000 22:07'!
queue
^ queue
! !
!FixedSizeSharedPriorityQueueTest methodsFor: 'fixtures' stamp: 'rww 7/29/2000 23:54'!
setUp
queue := FixedSizeSharedPriorityQueue new: 20.
! !
!FixedSizeSharedPriorityQueueTest methodsFor: 'fixtures' stamp: 'rww 7/29/2000 22:07'!
tearDown
queue := nil.
! !
!SharedPriorityQueueTest methodsFor: 'tests' stamp: 'rww 7/29/2000 22:34'!
testConcurrentPostCheckSharedQueue
| sem1 sem2 |
sem1 := Semaphore new.
sem2 := Semaphore new.
[1 to: 10 do: [:each | self queue nextPut: each]. sem1 signal] fork.
[10 timesRepeat: [self queue next]. sem2 signal] fork.
sem1 wait.
sem2 wait.
[self queue size = 0]
whileFalse: [(Delay forMilliseconds: 100) wait].
self should: [self queue size = 0].
! !
!SharedPriorityQueueTest methodsFor: 'tests' stamp: 'rww 7/29/2000 22:34'!
testConcurrentPreCheckSharedQueue
| sem1 sem2 |
sem1 := Semaphore new.
sem2 := Semaphore new.
[1 to: 10 do: [:each | self queue nextPut: each]. sem1 signal] fork.
[10 timesRepeat: [self queue next]. sem2 signal] fork.
[self queue size = 0]
whileFalse: [(Delay forMilliseconds: 100) wait].
sem1 wait.
sem2 wait.
self should: [self queue size = 0].
! !
!SharedPriorityQueueTest methodsFor: 'tests' stamp: 'rww 7/30/2000 05:25'!
testConcurrentPrintingSharedQueue
| sem1 sem2 sem3 |
sem1 := Semaphore new.
sem2 := Semaphore new.
sem3 := Semaphore new.
[1 to: 10 do: [:each | (Delay forMilliseconds: 2) wait.
self queue nextPut: each]. sem1 signal] fork.
[5 timesRepeat: [(Delay forMilliseconds: 1) wait.
self queue next]. sem2 signal] fork.
[5 timesRepeat: [(Delay forMilliseconds: 1) wait.
self queue next]. sem3 signal] fork.
sem1 wait.
sem2 wait.
sem3 wait.
self should: [self queue size = 0].! !
!SharedPriorityQueueTest methodsFor: 'tests' stamp: 'rww 7/29/2000 22:33'!
testConcurrentSharedQueue
| sem1 sem2 |
sem1 := Semaphore new.
sem2 := Semaphore new.
[1 to: 10 do: [:each | self queue nextPut: each]. sem1 signal] fork.
[10 timesRepeat: [self queue next]. sem2 signal] fork.
sem1 wait.
sem2 wait.
self should: [self queue size = 0].
! !
!SharedPriorityQueueTest methodsFor: 'tests' stamp: 'rww 7/29/2000 23:53'!
testFlush
| sem1 |
sem1 := Semaphore new.
[1 to: 10 do: [:each | self queue nextPut: each]. sem1 signal] fork.
sem1 wait.
self queue flush.
self should: [self queue size = 0].
! !
!SharedPriorityQueueTest methodsFor: 'tests' stamp: 'rww 7/29/2000 23:09'!
testSharedQueue
[1 to: 10 do: [:each | self queue nextPut: each]] fork.
10 timesRepeat: [self queue next].
self should: [self queue size = 0].! !
!SharedPriorityQueueTest methodsFor: 'fixtures' stamp: 'rww 7/29/2000 21:59'!
queue
^ queue
! !
!SharedPriorityQueueTest methodsFor: 'fixtures' stamp: 'rww 7/29/2000 21:59'!
setUp
queue := SharedPriorityQueue new.
! !
!SharedPriorityQueueTest methodsFor: 'fixtures' stamp: 'rww 7/29/2000 22:07'!
tearDown
queue := nil.
! !
!SharedPriorityQueueTest reorganize!
('tests' testConcurrentPostCheckSharedQueue testConcurrentPreCheckSharedQueue testConcurrentPrintingSharedQueue testConcurrentSharedQueue testFlush testSharedQueue)
('fixtures' queue setUp tearDown)
!
!FixedSizeSharedPriorityQueueTest reorganize!
('tests' testConcurrentPostCheckSharedQueue testConcurrentPreCheckSharedQueue testConcurrentPrintingSharedQueue testConcurrentSharedQueue testFlush testSharedQueue)
('fixtures' queue setUp tearDown)
!
More information about the Squeak-dev
mailing list
|