[Pharo-project] [squeak-dev] SharedQueue does scale (was: Re: SharedQueue doesn't scale)

Levente Uzonyi leves at elte.hu
Tue Oct 19 03:29:57 UTC 2010


On Tue, 19 Oct 2010, Igor Stasenko wrote:

> On 19 October 2010 03:42, Levente Uzonyi <leves at elte.hu> wrote:
>> On Tue, 19 Oct 2010, Igor Stasenko wrote:
>>
>>> On 19 October 2010 02:06, Levente Uzonyi <leves at elte.hu> wrote:
>>>>
>>>> On Mon, 18 Oct 2010, Igor Stasenko wrote:
>>>>
>>>> snip
>>>>
>>>>>
>>>>
>>>> Thanks, Levente, for giving feedback.
>>>> Please feel free to shape my classes into a more complete form (such as
>>>> proper naming),
>>>> to make them ready for inclusion in both the Squeak and Pharo cores.
>>>> I propose the following names:
>>>> AtomicQueue (base class) -> AtomicCollection
>>>> FIFOQueue -> AtomicQueue
>>>> LIFOQueue -> AtomicStack
>>>> If you, or anyone else, have better suggestions, speak now :)
>>>>
>>>>
>>>>
>>>> I think these should be the names:
>>>>
>>>> FIFOQueue -> SharedQueue
>>>
>>> this name is already used by Kernel.
>>> So, unless my class fully replaces it, I see no way I could
>>> use this name in a separate package.
>>
>> Yes, IMHO it would be good to replace the implementation. The API seems
>> complete to me (except for copying ;)).
>>
> You mean this:
>
> copy
> 	^ self errorDontCopy
>
> errorDontCopy
> 	"copying a structure involved in concurrent operations is a bad idea"
> 	^ self error: 'Copying not available'
>
> :)
>
> See how Squeak's EventSensor does the right thing to make a 'copy':
>
> EventSensor>>flushAllButDandDEvents
> 	| newQueue oldQueue  |
>
> 	newQueue := SharedQueue new.
> 	self eventQueue ifNil:
> 		[eventQueue := newQueue.
> 		^self].
> 	oldQueue := self eventQueue.
> 	[oldQueue size > 0] whileTrue:
> 		[| item type |
> 		item := oldQueue next.
> 		type := item at: 1.
> 		type = EventTypeDragDropFiles ifTrue: [ newQueue nextPut: item]].
> 	eventQueue := newQueue.
>
> Well, you might be right that #copy could be implemented as:
>
> copy
>   | copy |
>   copy := self class new.
>   [ copy put: (self nextIfNone: [ ^ copy ] ) ] repeat
>
> if that makes any sense to anyone...
>
> It's hard to imagine a situation where one would need to copy an existing queue,
> because one can simply keep using the original.

I didn't say that I find copying useful, but the API is different.

>
>
>>>
>>> Also, I must stress that the behavior of FIFOQueue only attempts to
>>> closely resemble the SharedQueue behavior.
>>> However, there are situations (related to how scheduling works and
>>> the use of #yield) where using them could lead to deadlocks.
>>> In this regard, AtomicSharedQueue (a subclass of FIFOQueue) is a much
>>> better analogue of the current SharedQueue.
>>
>> If you mean the case: "if process A tries to read from an empty queue, later
>> process B tries to do the same, then process A is guaranteed to read before
>> process B", then that shouldn't be a problem. It would require an external
>> synchronization step to make use of this feature with the current
>> implementation. I doubt that anyone has ever written such code.
>>
> No. The problem is not that. The problem is related to scheduling and
> how the #yield primitive works.
>
> Here is the VM's primitiveYield:
>
> primitiveYield
> "primitively do the equivalent of Process>yield"
> 	| activeProc priority processLists processList |
> 	activeProc := self fetchPointer: ActiveProcessIndex
> 						 ofObject: self schedulerPointer.
> 	priority := self quickFetchInteger: PriorityIndex ofObject: activeProc.
> 	processLists := self fetchPointer: ProcessListsIndex ofObject: self
> 						 schedulerPointer.
> 	processList := self fetchPointer: priority - 1 ofObject: processLists.
>
> 	(self isEmptyList: processList) ifFalse:[
> 		self addLastLink: activeProc toList: processList.
> 		self transferTo: self wakeHighestPriority]
>
> Note #wakeHighestPriority.
>
> So a fetcher (which uses #yield in a spin loop) with a priority higher
> than the pusher process will loop infinitely,
> blocking the pusher and all lower-priority processes from advancing.
>
> To avoid this problem, one should make sure that the process pushing
> new items to the queue
> has either a higher or the same priority as any fetching process(es)
> using the same queue.
> Or use wait-free access to the queue (avoid #next, use #nextOrNil instead).
>
> That's why, in the subclass AtomicSharedQueue, I use a semaphore to
> work around this issue.
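
To make that concrete for the archive, here is a minimal sketch of the
starvation scenario (do NOT evaluate it in a live image -- it will starve
everything below the fetcher's priority, including the UI process). The
selectors #put: and #nextOrNil are the ones from your mails above; treat the
rest as pseudo-code:

| queue |
queue := FIFOQueue new.

"Pusher at a lower priority than the fetcher."
[ queue put: #work ] forkAt: Processor userSchedulingPriority - 1.

"Fetcher spinning on #yield at a higher priority. Because primitiveYield only
hands control to processes at the fetcher's own (highest runnable) priority,
the lower-priority pusher never runs, so this loop never terminates."
[ | item |
  [ (item := queue nextOrNil) isNil ] whileTrue: [ Processor yield ].
  Transcript show: item printString; cr ]
	forkAt: Processor userSchedulingPriority + 1.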

Okay. So AtomicSharedQueue is the class which can be used to replace 
SharedQueue. So the names could be:

LIFOQueue -> AtomicStack
FIFOQueue -> AtomicQueue
AtomicSharedQueue -> SharedQueue

>
> And potentially, it would be good some day to have a way to tell the scheduler:
> please stop the current process and see if you can run anything with a lower priority.

That would break the current scheduling policy.


Levente

> (Another reason to move scheduling to the language side, so we are free to
> modify it in whatever way we like ;).
>
>>>
>>> As I noted in another mail, I think we might also provide a
>>> separate wait-free interface. Then we can guarantee
>>> that if you use only the wait-free interface, a queue can never be the
>>> cause of a deadlock.
>>
>> That's great, and it can be a future addition even if we push the current
>> implementation to the Trunk.
>>
>
> Yes. I think that for most cases in a concurrent environment,
> wait-free access is the preferable way to work with queues.
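
For the archive again, a minimal sketch of such wait-free use (same assumed
#put: / #nextOrNil selectors as above; the Delay back-off is just one possible
polling policy, not part of your API):

| queue |
queue := FIFOQueue new.

"Consumer: #nextOrNil never blocks and never spins, so even at a higher
priority it cannot starve the producer -- when the queue is empty it backs
off on a Delay, which lets lower-priority processes run."
[ [ true ] whileTrue: [
	queue nextOrNil
		ifNil: [ (Delay forMilliseconds: 5) wait ]
		ifNotNil: [ :item | Transcript show: item printString; cr ] ] ]
	forkAt: Processor userSchedulingPriority + 1.

"Producer at a lower priority still makes progress."
[ queue put: #job1. queue put: #job2 ] forkAt: Processor userSchedulingPriority - 1.
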
>
>
>>
>> Levente
>>
>
>
>
> -- 
> Best regards,
> Igor Stasenko AKA sig.
>
>


