Interprocess communication...

David T. Lewis lewis at mail.msen.com
Fri Apr 5 11:09:42 UTC 2002


On Fri, Apr 05, 2002 at 12:11:48PM +0100, goran.hultgren at bluefish.se wrote:
> We are fiddling around with networking code and we are thinking about
> coding up classes called SharedBufferStream and
> SharedBidirectionalStream.
> 
> We have multiple Squeak processes that wants to talk with each other
> using something similar to SocketStreams (the one in Comanche). A
> SocketStream is more or less a Streamlike Socket - an object with Stream
> protocol for bidirectional communication of bytes/strings.
> 
> We have looked at SharedQueue, BufferStream and SocketStream and we
> thought that first we need a BufferStream that is "Process safe" - that
> could be used as a SharedQueue but being character/byte based instead of
> object-based.
> 
> Then by using two of those (like done in SocketStream, one for read and
> one for write) in one object which will work like a SocketStream but
> with the difference that the "guy in the other end" is another Squeak
> process and not a Socket. That would be the SharedBidirectionalStream
> giving us a bidirectional SharedBufferStream.
> 
> Does these objects sound useful? Have we missed something already
> available? Have somebody already implemented something like this?

I'm attaching a copy of InternalPipe. This is used in CommandShell
to provide behavior similar to a Unix pipe for two Smalltalk processes
within Squeak. I think it's close to what you are describing.

Also, have a look at Flow (http://www.netjam.org/flow) if you have not
already done so.

HTH,
Dave
-------------- next part --------------
'From Squeak3.1alpha of 27 September 2001 [latest update: #4347] on 5 April 2002 at 6:12:46 am'!
Stream subclass: #InternalPipe
	instanceVariableNames: 'queue writerClosed nonBlockingMode '
	classVariableNames: ''
	poolDictionaries: ''
	category: 'OSProcess-CommandShell'!
!InternalPipe commentStamp: '<historical>' prior: 0!
I am a first-in, first-out queue with streaming behavior. I behave similarly to an OSPipe,
but am implemented in the Smalltalk image rather than with external OS pipes. I can
behave either as a blocking pipe or as a nonblocking pipe, similar to an OS pipe with
its reader end set in blocking or nonblocking mode.!


!InternalPipe methodsFor: 'initialize-release' stamp: 'dtl 9/16/2001 22:35'!
initialize

	self queue
! !

!InternalPipe methodsFor: 'initialize-release' stamp: 'dtl 11/25/2001 14:33'!
setBlocking
	"For compatibility with OSPipe"

	self nonBlockingMode: false! !

!InternalPipe methodsFor: 'initialize-release' stamp: 'dtl 11/24/2001 15:56'!
setNonBlocking
	"For compatibility with OSPipe"

	self nonBlockingMode: true! !


!InternalPipe methodsFor: 'accessing' stamp: 'dtl 11/24/2001 21:06'!
contents
	"Answer contents of the pipe, and return the contents to the pipe so it can still be read."

	"InternalPipe new nextPutAll: 'hello'; contents"

	| s |
	s _ self next: queue size.
	self nextPutAll: s.
	^ s! !

!InternalPipe methodsFor: 'accessing' stamp: 'dtl 11/24/2001 16:19'!
next
	"Answer the next object accessible by the receiver."

	(self nonBlockingMode and: [queue size == 0])
		ifTrue: [^ nil]
		ifFalse: [^ queue next]
! !

!InternalPipe methodsFor: 'accessing' stamp: 'dtl 11/24/2001 16:23'!
next: anInteger 
	"Answer the next anInteger elements of my collection."

	| strm c |
	strm _ WriteStream on: ''.
	(1 to: anInteger) do:
		[:index |
		self atEnd
			ifTrue: [^ strm contents]
			ifFalse:
				[(c _ self next) ifNil: [^ strm contents].
				strm nextPut: c]].
	^ strm contents
! !

!InternalPipe methodsFor: 'accessing' stamp: 'dtl 9/16/2001 22:37'!
nextPut: anObject 
	"Insert the argument, anObject, as the next object accessible by the 
	receiver. Answer anObject."

	^ queue nextPut: anObject! !

!InternalPipe methodsFor: 'accessing' stamp: 'dtl 9/16/2001 22:41'!
nextPutAll: aCollection 
	"Append the elements of aCollection to the sequence of objects accessible 
	by the receiver. Answer aCollection."

	aCollection do: [:e | queue nextPut: e]
! !

!InternalPipe methodsFor: 'accessing' stamp: 'dtl 11/24/2001 15:55'!
nonBlockingMode

	^ nonBlockingMode ifNil: [nonBlockingMode _ false]
! !

!InternalPipe methodsFor: 'accessing' stamp: 'dtl 11/24/2001 15:55'!
nonBlockingMode: trueOrFalse

	nonBlockingMode _ trueOrFalse
! !

!InternalPipe methodsFor: 'accessing' stamp: 'dtl 9/16/2001 22:42'!
peek

	^ queue peek
! !

!InternalPipe methodsFor: 'accessing' stamp: 'dtl 9/16/2001 22:35'!
queue

	^ queue ifNil: [queue _ SharedQueue new]
! !

!InternalPipe methodsFor: 'accessing' stamp: 'dtl 11/25/2001 19:15'!
size
	"An InternalPipe may contain a trailing nil if it has been closed. This should
	not be counted as part of the pipe size, so use #contents to determine the size
	after stripping any trailing nil."

	"InternalPipe new nextPutAll: 'hello'; size"

	^ self closed
		ifTrue: [self contents size]
		ifFalse: [self queue size]
! !

!InternalPipe methodsFor: 'accessing' stamp: 'dtl 11/25/2001 18:14'!
upToEnd
	"Answer the remaining elements in the pipe"

	| strm c |
	strm _ WriteStream on: ''.
	[self atEnd] whileFalse:
		[c _ self next.
		c isNil
			ifTrue: [^ strm contents]
			ifFalse: [strm nextPut: c]].
	^ strm contents! !

!InternalPipe methodsFor: 'accessing' stamp: 'dtl 11/24/2001 15:01'!
writerClosed

	^ writerClosed ifNil: [writerClosed _ false]
! !

!InternalPipe methodsFor: 'accessing' stamp: 'dtl 11/24/2001 15:02'!
writerClosed: trueOrFalse

	writerClosed _ trueOrFalse
! !


!InternalPipe methodsFor: 'character writing' stamp: 'dtl 9/23/2001 16:49'!
cr
	"Append a return character to the receiver."

	self queue nextPut: Character cr! !


!InternalPipe methodsFor: 'testing' stamp: 'dtl 11/24/2001 15:04'!
atEnd
	"Answer whether the receiver can access any more objects."

	^ self writerClosed and: [queue size == 0]
! !

!InternalPipe methodsFor: 'testing' stamp: 'dtl 11/24/2001 15:03'!
closed

	^ self writerClosed! !


!InternalPipe methodsFor: 'finalization' stamp: 'dtl 1/1/2002 11:38'!
close

	self closeWriter; closeReader! !

!InternalPipe methodsFor: 'finalization' stamp: 'dtl 1/1/2002 11:38'!
closeReader
	"Nothing to do. This is for OSPipe compatibility."
! !

!InternalPipe methodsFor: 'finalization' stamp: 'dtl 12/31/2001 15:23'!
closeWriter
	"Set the writerClosed flag, and add a trailing nil to the pipe to mimic the
	behaviour of an external pipe which blocks until the writer end is closed.
	Writing a trailing nil the the queue has the side effect of waking up any
	process which is blocked waiting on the queue, which will receive the nil
	as an indication that the pipe has been closed."

	self writerClosed: true.
	self nextPut: nil! !

"-- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- "!

InternalPipe class
	instanceVariableNames: ''!

!InternalPipe class methodsFor: 'instance creation' stamp: 'dtl 12/2/2001 19:24'!
blockingPipe

	"InternalPipe blockingPipe"

	^ super basicNew initialize setBlocking
! !

!InternalPipe class methodsFor: 'instance creation' stamp: 'dtl 12/2/2001 19:25'!
new

	"InternalPipe new"

	^ self blockingPipe! !

!InternalPipe class methodsFor: 'instance creation' stamp: 'dtl 12/2/2001 19:24'!
nonBlockingPipe

	"InternalPipe nonBlockingPipe"

	^ super basicNew initialize setNonBlocking
! !


!InternalPipe class methodsFor: 'examples' stamp: 'dtl 9/19/2001 21:18'!
testPipe

	"InternalPipe testPipe inspect"

	| pipe result |
	pipe _ self new.
	pipe nextPutAll: 'string to send through an InternalPipe'.
	result _ pipe upToEnd.
	pipe close.
	^ result
! !


More information about the Squeak-dev mailing list