[ALPHA][FIX] Unix socket plugin Re: [UNIX][BUG] Sockets broken at startup

Rob Withers rwithers at banana.exobox.com
Tue Oct 17 22:01:28 UTC 2000


Hi Ned,

Attached you will find a complete rewrite of sqUnixNetwork.c (it can be
renamed to sqUnixSocket.c for the 2.8+ VM).  It is ALPHA and has a few
bugs, which we are trying to identify with existing TestCases from John
McIntosh and with new TestCases.  We have 20+ people running it.  I
tested PWS and it seems to work (based on a call to shutDown).  I wasn't
able to test Comanche.  The plugin you are using has no code to destroy
all open sockets, because the privateStructures aren't held by the
plugin; they are held by the Squeak socket.  This newer version uses a
state machine for handling events and keeps a linked list of the private
socket structures, so that on sqNetworkShutdown the plugin destroys
every socket still held by a Squeak socket.
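
As a rough illustration of the plugin-owned list idea (this is a sketch,
not the attached code; the names node, registry_add, registry_remove,
registry_count and registry_destroy_all are made up for the example), a
circular list with a sentinel head lets a shutdown routine walk and free
everything that is still registered:

```c
#include <assert.h>
#include <stdlib.h>

/* Hypothetical stand-in for the plugin's private per-socket structure. */
typedef struct node {
  struct node *next;
  struct node *prev;
  int fd;
} node;

/* Sentinel head of a circular list; the list is empty when it points at itself. */
static node registry = { &registry, &registry, -1 };

/* Allocate a node for fd and link it in right after the sentinel. */
node *registry_add(int fd) {
  node *n = malloc(sizeof *n);
  if (n == NULL)
    return NULL;
  n->fd = fd;
  n->next = registry.next;
  n->prev = &registry;
  registry.next->prev = n;
  registry.next = n;
  return n;
}

/* Unlink one node from the list and free it. */
void registry_remove(node *n) {
  assert(n != &registry);   /* the sentinel itself is never removed */
  n->prev->next = n->next;
  n->next->prev = n->prev;
  free(n);
}

/* Count the nodes currently registered. */
int registry_count(void) {
  int count = 0;
  node *n;
  for (n = registry.next; n != &registry; n = n->next)
    count++;
  return count;
}

/* Destroy every registered node, as a network shutdown would.
   A real plugin would also close each descriptor here. */
int registry_destroy_all(void) {
  int destroyed = 0;
  while (registry.next != &registry) {
    registry_remove(registry.next);
    destroyed++;
  }
  return destroyed;
}
```

Because the plugin owns the list, the shutdown path does not depend on
the image handing back valid private pointers.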

The main reason for the rewrite was to replace the aioHandlers scheme
with a state machine that can handle any state/event combination at
any time (the old handlers were repeatedly being disabled and enabled).
Additionally, it supports the full close semantics of sockets, with the
FIN/ACK exchange from each side.  We shut down only the write channel in
sqSocketCloseConnection, so we can still read data.
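
To make the two ideas above concrete, here is a small sketch under my
own assumptions (the state names, the conn structure, dispatch and
close_write_side are hypothetical and are not taken from the attached
file).  Handlers live in a table indexed by state and select event, so
nothing is ever enabled or disabled, and closing uses the POSIX
shutdown() call with SHUT_WR so the read side stays usable:

```c
#include <assert.h>
#include <stddef.h>
#include <sys/socket.h>
#include <unistd.h>

/* Hypothetical states and events; the attached file uses its own names. */
enum { ST_ESTABLISHED, ST_THIS_END_CLOSED, ST_OTHER_END_CLOSED, ST_CLOSED,
       NUM_STATES };
enum { EV_READ, EV_WRITE, EV_EXCEPTION, NUM_EVENTS };

typedef struct conn {
  int fd;
  int state;
  char last[64];     /* bytes from the most recent read */
  ssize_t lastLen;   /* result of the most recent read() */
} conn;

typedef void (*handler)(conn *);

/* Readable event: either data, or end of file once the peer has closed. */
static void on_readable(conn *c) {
  c->lastLen = read(c->fd, c->last, sizeof c->last);
  if (c->lastLen > 0)
    return;
  /* 0 means the peer sent FIN; this sketch treats a read error the same way */
  if (c->state == ST_THIS_END_CLOSED) {
    close(c->fd);                 /* both directions are finished */
    c->state = ST_CLOSED;
  } else {
    c->state = ST_OTHER_END_CLOSED;
  }
}

/* One row per state, one column per select event; NULL means "ignore". */
static const handler handlers[NUM_STATES][NUM_EVENTS] = {
  [ST_ESTABLISHED]     = { on_readable, NULL, NULL },
  [ST_THIS_END_CLOSED] = { on_readable, NULL, NULL },
};

/* Look up the handler for the current state and event and run it, if any. */
void dispatch(conn *c, int event) {
  assert(event >= 0 && event < NUM_EVENTS);
  handler h = handlers[c->state][event];
  if (h != NULL)
    h(c);
}

/* Half-close: stop sending but keep the read side open. */
void close_write_side(conn *c) {
  if (c->state == ST_ESTABLISHED) {
    shutdown(c->fd, SHUT_WR);
    c->state = ST_THIS_END_CLOSED;
  } else if (c->state == ST_OTHER_END_CLOSED) {
    close(c->fd);                 /* peer already closed, so we are done */
    c->state = ST_CLOSED;
  }
}
```

After close_write_side, a readable event is still dispatched to
on_readable, so data the peer sends before its own FIN is not lost.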

Please try this and send me any problems/TestCases.

regards,
Rob

Ned Konz wrote:
> 
> Under 2.9apre1, Linux, Comanche 4.6:
> 
> I had a running Comanche, saved it. The ComancheNetService that was running the
> swiki was running when I saved it, and its autoStart instVar was set true, as
> well.
> 
> Started back up, immediately sent "stop" to the ComancheNetService that's
> running
> the swiki.
> 
> got a segfault in _aioDisable:
> 
> _aioDisable
> sqSocketCloseConnection
> sqSocketAbortConnection
> sqSocketDestroy
> SocketPlugin_primitiveSocketDestroy
> (primitive)
> 
> because the privateSocketStruct *pss arg to _aioDisable looks like:
> 
> {s = 1085636546, connSema = 489, readSema = 544, writeSema = 493, sockState =
> 562, sockError = 0, peer = {sin_family = 65280, sin_port = 255, sin_addr =
> {s_addr = 1}, sin_zero = "H@\037@H@\037@"}, multiListen = 16777215,
> acceptedSock = 16777215, pendingEvents = 16777215}
> 
> That is, pss->s (which should be a valid index into sockets[]) is a really big
> number.
> 
> Of course, another problem is that I can't stop the server to fix it (it
> just hangs if I do nothing).
> 
> I tried making a script to kill all the ComancheNetServices at startup,
> but startup scripts are processed after the class start up list, and so it's
> too late.
> 
> I tried adding a SIGINT handler that sets interruptPending, but no one
> is checking interruptPending after the image hangs.
> 
> Anyway, I'll build a new image, but what about the broken Socket startup
> behavior?
> 
> --
> Ned Konz
> currently: Stanwood, WA
> email:     ned at bike-nomad.com
> homepage:  http://bike-nomad.com
-------------- next part --------------
/* sqUnixSocket.c -- Unix socket support
 *      blah
 */


/* Author: Rob Withers   withers at vnet.net
 * 
 * Support for "accept" primitives contributed by:
 *	Lex Spoon <lex at cc.gatech.edu>
 * 
 * Notes:
 *
 * 1. 	UDP - no handlers implemented  (bottom).
 * 
 * 2.	Sockets are completely(???) asynchronous, but the resolver is still
 *	synchronous.
 *
 * 3.	Due to bugs in Solaris and HP-UX (and possibly others) the SIGIO
 *	mechanism is no longer used.  Instead aioPollForIO() is periodically
 *	called from HandleEvents in sqXWindow.c in order to check for I/O
 *	completion using select().
 *
 *
 *          NEW!  -> protocol machine implementation <-  
 *  Author:  Rob Withers
 *
 *  Completely rewritten to use protocol machines for each type.  These 
 *  machines are stored in the linked list 
 *          <protocolMachineNode *protocolMachines>
 * Each machine has the event handlers for nonblocking selects, and the 
 * commands to drive the state machine.  There may be a little squeak API 
 * knowledge in the TCP handlers, specifically the squeakState.  There are
 * two separate states stored in each <descriptorNode>, the internal state 
 * machine state and the external squeak state.  Do you need to ask why?  
 * Squeak implements a very strange behavior contract.  The worst is the 
 * semaphore control.  The contract says that only if we are waiting on a 
 * semaphore, should we signal that semaphore when an event comes in.  
 * Instead, we should queue incoming and outgoing events, and always signal 
 * and wait on the semaphore.  We have good places to signal that we need 
 * to signal (ironically enough) for read and write semaphores, but we don't 
 * have one for connection semaphores.  I've decided that we will always 
 * signal the connection semaphore, and if that breaks something, then we'll 
 * fix it.  The ringBuffer is hardcoded to be whatever # MAXBUFSIZE is.  We 
 * can switch that to the primitive parameter later.  There is a ringBuffer 
 * for incoming messages, but the outgoing is direct.  
 *
 *
 *
 *
 * BUGS:    The Challenge!   Find some!   Kick the tires hard!
 */


/* TO DEBUG, uncomment the line below */
// # define DEBUG


# ifndef MAXBUFSIZE
# define MAXBUFSIZE 8192
# endif


/************* Begin Includes and Defines *************/
#include "sq.h"
#include "SocketPlugin.h"


#ifdef ACORN
/* you know, I can't help thinking that a lot of this nonsense would
   simply vanish if Acorn used configure like the rest of us */
# include <time.h>
# define __time_t
# include <signal.h>
# include "inetlib.h"
# include "socklib.h"
# include "netdb.h"
# include "unixlib.h"
# include "sys/ioctl.h"
# include "sys/fcntl.h"
# include "sys/errno.h"
# define h_errno errno
# define MAXHOSTNAMELEN 256
# define socklen_t int
# define strncpy(dst, src, len) copyNCharsFromTo(len, src, dst)

#else /* !ACORN */

# ifdef HAVE_UNISTD_H
#   include <sys/types.h>
#   include <unistd.h>
# endif /* HAVE_UNISTD_H */
  
# ifdef NEED_GETHOSTNAME_P
    extern int gethostname();
# endif
  
# include <netdb.h>
# include <signal.h>
# include <sys/param.h>
# include <sys/socket.h>
# include <sys/ioctl.h>
# include <sys/fcntl.h>
# include <errno.h>
# include <netinet/in.h>
# include <netinet/udp.h>
# include <netinet/tcp.h>
# include <arpa/inet.h>
  
# ifdef HAVE_SYS_TIME_H
#   include <sys/time.h>
# else
#   include <time.h>
# endif
  
# ifdef HAS_SYS_SELECT_H
#   include <sys/select.h>
# endif
  
# ifndef FIONBIO
#   ifdef HAVE_SYS_FILIO_H
#     include <sys/filio.h>
#   endif
#   ifndef FIONBIO
#     ifdef FIOSNBIO
#       define FIONBIO FIOSNBIO
#     else
#       error: FIONBIO is not defined
#     endif
#   endif
# endif

#endif /* !ACORN */

/* debugging stuff. can probably be deleted */

#ifdef DEBUG
# ifdef ACORN
#   define FPRINTF(s) \
    { \
      extern os_error privateErr; \
      extern void platReportError(os_error *e); \
      privateErr.errnum = (bits)0; \
      sprintf s; \
      platReportError((os_error *)&privateErr); \
    };
# else /* !ACORN */
#   define FPRINTF(X) fprintf X
# endif
  char *ticks= "-\\|/";
  char *ticker= "";
  #define DO_TICK() \
  { \
    fprintf(stderr, "\r%c\r", *ticker); \
    if (!*ticker++) ticker= ticks; \
  }
#else /* !DEBUG */
# define FPRINTF(X)
# define DO_TICK()
#endif

# ifndef min
# define min(x, y) (((x) < (y)) ? (x) : (y))
# endif

# ifndef max
# define max(x, y) (((x) > (y)) ? (x) : (y))
# endif
/************* End Includes and Defines *************/





/************* Begin State Defines *************/
/* Socket types */
#define TCPSocketType	 	0
#define UDPSocketType	 	1

/* Resolver states */
#define ResolverUninitialised	0
#define ResolverSuccess		1
#define ResolverBusy		2
#define ResolverError		3

/* Squeak Socket states */
#define Unconnected		0x00
#define WaitingForConnection	0x01
#define Connected		0x02
#define OtherEndClosed		0x03
#define ThisEndClosed		0x04

/* TCP Internal Socket states */
# define NumberOfStatesTCP           7
# define TCPUnconnected              0
# define TCPWaitingForConnection     1
# define TCPListen                   2
# define TCPEstablished              3
# define TCPThisEndClosed            4
# define TCPOtherEndClosed           5
# define TCPClosed                   6

/* UDP Internal Socket states */
# define NumberOfStatesUDP       2
# define UDPClosed               0
# define UDPOpen                 1

/* Select Events */
# define SEL_READ                0
# define SEL_WRITE               1
# define SEL_EXCEPTION           2

#define CONN_NOTIFY	(1<<0)
#define READ_NOTIFY	(1<<1)
#define WRITE_NOTIFY	(1<<2)
#define CHECK_OOB	(1<<3)
/************* End State Defines *************/




/************* Begin Structure Typedefs *************/
typedef struct _ringBuffer {
  char *buf;
  char *readPtr;
  char *writePtr;
  char *endPtr;
  int byteCount;
  int ringSize;
} ringBuffer;

typedef void (*socketPollFunction_t)(int, int);

typedef struct _descriptorNode  {
  struct _protocolMachineNode *classStructure;
  struct _descriptorNode *next;
  SocketPtr squeakSocketPtr;
  int fd;         /* descriptor */
  int type;       /* the type of descriptor */
  int state;
  int squeakState;
  ringBuffer *readBuffer;
  struct sockaddr_in peer;      /* default send/recv address for UDP */
  int connSema;			/* connection io notification semaphore */
  int readSema;			/* read io notification semaphore */
  int writeSema;		/* write io notification semaphore */
  int backlogSize;
  int multiListen;
  int acceptedSocket;  /* a connection that has been accepted */
  int pendingEvents;
  int fdFlags;
} descriptorNode;

typedef void (*eventHandler)(descriptorNode*);
typedef eventHandler handlerSlice[3];

typedef struct _protocolMachineNode {
  int descriptorType;
  struct _protocolMachineNode* next;
  /****  event handlers  ****/
  int numberOfSlices;
  handlerSlice* slices;
  /****  operations  ****/
  int (*isValid)(descriptorNode *node);
  int (*isReadable)(descriptorNode *node);
  int (*isWritable)(descriptorNode *node);
  int (*getError)(descriptorNode *node);
  char* (*stateString)(descriptorNode *node);
  descriptorNode* (*create)(int readBufferSize, int connSemIndex, 
			    int readSemIndex, int writeSemIndex);
  descriptorNode* (*accept)(descriptorNode *node, int readBufferSize, 
			    int connSemIndex, int readSemIndex, int writeSemIndex);
  int (*listen)(descriptorNode *node, int port, int backlogSize);
  int (*connect)(descriptorNode *node, int addr, int port);
  int (*read)(descriptorNode *node, char *buf, int size);
  int (*write)(descriptorNode *node, char *buf, int size);
  int (*recvfrom)(descriptorNode *node, char *buf, int size, int *addr, int *port);
  int (*sendto)(descriptorNode *node, char *buf, int size, int addr, int port);
  int (*shutdown)(descriptorNode *node);
  int (*abort)(descriptorNode *node);
  int (*destroy)(descriptorNode *node);
  void (*eventCallback)(descriptorNode *node, int event);
  void (*installEventRegistrations)(descriptorNode *node, fd_set *rset, fd_set *wset, fd_set *exset);
} protocolMachineNode;
/************* End Structure Typedefs *************/


/*************  Begin Message Sending Macros *****************/
# define SEND( SELF, SELECTOR) \
       (((SELF)->classStructure->SELECTOR)(SELF))
# define SEND1( SELF, SELECTOR, ARG1) \
       (((SELF)->classStructure->SELECTOR)((SELF), (ARG1)))
# define SEND2( SELF, SELECTOR, ARG1, ARG2) \
       (((SELF)->classStructure->SELECTOR)((SELF), (ARG1), (ARG2)))
# define SEND3( SELF, SELECTOR, ARG1, ARG2, ARG3) \
       (((SELF)->classStructure->SELECTOR)((SELF), (ARG1), (ARG2), (ARG3)))
# define SEND4( SELF, SELECTOR, ARG1, ARG2, ARG3, ARG4) \
       (((SELF)->classStructure->SELECTOR)((SELF), (ARG1), (ARG2), (ARG3), (ARG4)))

/*************  End Message Sending Macros *****************/


/************* Begin Variable Declarations *************/
extern struct VirtualMachine *interpreterProxy;
extern socketPollFunction_t socketPollFunction;
static int thisNetSession= 0;
static descriptorNode descriptors;
static protocolMachineNode protocolMachines;
static int maxDescriptor;

/**** Resolver ****/
static char lastName[MAXHOSTNAMELEN+1];
static int  lastAddr= 0;
static int  lastError= 0;
static int  resolverSema;
static char   localHostName[MAXHOSTNAMELEN];
static u_long localHostAddress;	/* GROSS IPv4 ASSUMPTION! */
/************* End Variable Declarations *************/




/************* Begin Internal Function Prototypes *************/
/**** Initialize ****/
void sig_pipe(int sigio);
void initializeDescriptorList();
void initializeMachineList();
void setupTCPHandlers();
void setupUDPHandlers();

/**** Naming ****/
static const char *addrToName(int netAddress);
static int nameToAddr(char *hostName);
/**** Helper ****/
descriptorNode *socketPtrToDescriptor(SocketPtr s);
int socketValid(SocketPtr s);
void notify(descriptorNode *node, int eventMask);
/**** Debug ****/
char* squeakStateString(descriptorNode *node);
void printDescriptorTable();
void printDescriptorNode(descriptorNode* sock);
void printProtocolMachines();
void printProtocolMachineNode( protocolMachineNode* pmNode);


/**** TCP Commands ****/
int isTCPValid(descriptorNode *node);
int isTCPReadable(descriptorNode *node);
int isTCPWritable(descriptorNode *node);
int getTCPError(descriptorNode *node);
char *stateStringTCP(descriptorNode *node);
descriptorNode *createTCP(int readBufferSize, int connSemIndex, 
			  int readSemIndex, int writeSemIndex);
descriptorNode *acceptTCP(descriptorNode *node, int readBufferSize, 
			  int connSemIndex, int readSemIndex, int writeSemIndex);
int listenTCP(descriptorNode *node, int port, int backlogSize);
int connectTCP(descriptorNode *node, int addr, int port);
int readTCP(descriptorNode *node, char *buf, int size);
int writeTCP(descriptorNode *node, char *buf, int size);
int recvfromTCP(descriptorNode *node, char *buf, int size, int *addr, int *port);
int sendtoTCP(descriptorNode *node, char *buf, int size, int addr, int port);
int shutdownTCP(descriptorNode *node);
int abortTCP(descriptorNode *node);
int destroyTCP(descriptorNode *node);
void installTCPEventRegistrations( descriptorNode *node, fd_set *rset, fd_set *wset, fd_set *exset);
/**** TCP Handlers ****/
void handleTCPUnconnectedEx(descriptorNode* node);
void handleTCPConnectionWait(descriptorNode* node);
void handleTCPConnectionWaitEx(descriptorNode* node);
void handleTCPListen(descriptorNode* node);
void handleTCPListenEx(descriptorNode* node);
void handleTCPEstablishedRd(descriptorNode* node);
void handleTCPEstablishedWr(descriptorNode* node);
void handleTCPEstablishedEx(descriptorNode* node);
void handleTCPOtherEndClosedWr(descriptorNode* node);
void handleTCPOtherEndClosedEx(descriptorNode* node);
void handleTCPThisEndClosedRd(descriptorNode* node);
void handleTCPThisEndClosedEx(descriptorNode* node);
void handleTCPClosedEx(descriptorNode* node);


/**** UDP Commands ****/
int isUDPValid(descriptorNode *node);
int isUDPReadable(descriptorNode *node);
int isUDPWritable(descriptorNode *node);
int getUDPError(descriptorNode *node);
char *stateStringUDP(descriptorNode *node);
descriptorNode *createUDP(int readBufferSize, int connSemIndex, 
			  int readSemIndex, int writeSemIndex);
descriptorNode *acceptUDP(descriptorNode *node, int readBufferSize, 
			  int connSemIndex, int readSemIndex, int writeSemIndex);
int listenUDP(descriptorNode *node, int port, int backlogSize);
int connectUDP(descriptorNode *node, int addr, int port);
int shutdownUDP(descriptorNode *node);
int abortUDP(descriptorNode *node);
int destroyUDP(descriptorNode *node);
int readUDP(descriptorNode *node, char *buf, int size);
int writeUDP(descriptorNode *node, char *buf, int size);
int recvfromUDP(descriptorNode *node, char *buf, int size, int *addr, int *port);
int sendtoUDP(descriptorNode *node, char *buf, int size, int addr, int port);
void installUDPEventRegistrations( descriptorNode *node, fd_set *rset, fd_set *wset, fd_set *exset);
/**** UDP Handlers ****/
void handleUDPClosed(descriptorNode* node);
void handleUDPOpenRd(descriptorNode* node);
void handleUDPOpenWr(descriptorNode* node);
void handleUDPOpenEx(descriptorNode* node);


/**** Polling ****/
void pollForIO(int microSeconds, int extraFd);
int computeMaxDescriptor();
void installEventDependencies(fd_set *rset, fd_set *wset, fd_set *exset);
void handleSelectEventForDescriptor( descriptorNode* node, int isRead, int isWrite, int isException);

/**** Protocol Machine Management ****/
protocolMachineNode* createProtocolMachine(int type, int numberOfSlices);
void destroyProtocolMachine(protocolMachineNode* machine);
void registerProtocolMachine(protocolMachineNode* machine);
void deregisterProtocolMachine(protocolMachineNode* machine);
protocolMachineNode* getProtocolMachine(int type);

/**** Descriptor Management ****/
descriptorNode* createEmptyDescriptor(int type, int fd, int readBufferSize,
	     int connSemIndex, int readSemIndex, int writeSemIndex);
void destroyDescriptor(descriptorNode* fdNode);
void registerDescriptor(descriptorNode* fdNode);
void deregisterDescriptor(descriptorNode* fdNode);
descriptorNode* getDescriptor(int fd);

/**** Handler Management ****/
eventHandler getHandler(protocolMachineNode* mach, int state, int event);
void setHandler(protocolMachineNode* mach, int state, int event, eventHandler function);

/****  RingBuffer  ****/
ringBuffer* createRingBuffer(int size);
void destroyRingBuffer(ringBuffer *theBuffer);
int readFromRingBuffer(ringBuffer *rb, char *lb, int size);
int writeToRingBuffer(ringBuffer *rb, char *lb, int size);
int bytesToWriteInRingBuffer(ringBuffer *rb);
int bytesToReadInRingBuffer(ringBuffer *rb);

/************* End Internal Function Prototypes *************/



/************* Begin Squeak Helper Declarations *************/
void sig_pipe(int sigio) {
  signal(SIGPIPE, sig_pipe);
  FPRINTF((stderr, "SIGPIPE received\n"));
  return;
}

descriptorNode *socketPtrToDescriptor(SocketPtr s) {
  if (!socketValid(s))
    return 0;
  return (descriptorNode*)s->privateSocketPtr;
}

int socketValid(SocketPtr s) {
  if ((s != 0) 
      && (s->sessionID == thisNetSession)
      && (s->privateSocketPtr != 0)
      && (SEND((descriptorNode*)s->privateSocketPtr, isValid)))
    return true;
  interpreterProxy->success(false);
  return false;
}



void notify(descriptorNode *node, int eventMask) {
  int mask = node->pendingEvents & eventMask;
  if (mask & CONN_NOTIFY) {
    FPRINTF((stderr, "NOTIFY: %d for connection event\n", node->fd));
    interpreterProxy->signalSemaphoreWithIndex(node->connSema);
  }
  if (mask & READ_NOTIFY) {
    FPRINTF((stderr, "NOTIFY: %d for read event\n", node->fd));
    /* clear only the read bit; XORing the whole mask here and again
       below would set both bits back when read and write fire together */
    node->pendingEvents &= ~READ_NOTIFY;
    interpreterProxy->signalSemaphoreWithIndex(node->readSema);
  }
  if (mask & WRITE_NOTIFY) {
    FPRINTF((stderr, "NOTIFY: %d for write event\n", node->fd));
    node->pendingEvents &= ~WRITE_NOTIFY;
    interpreterProxy->signalSemaphoreWithIndex(node->writeSema);
  }
  /* always signal the connection semaphore */
}
/************* End Squeak Helper Declarations *************/




/************* Begin Module Initialization Declarations *************/
int socketInit(void) {
  FPRINTF((stderr, "socketInit\n"));
  signal(SIGPIPE, sig_pipe);
  initializeMachineList();
  initializeDescriptorList();
  setupTCPHandlers();
  setupUDPHandlers();
  socketPollFunction = pollForIO;
  return 1;
}

int socketShutdown(void) {
  /* disable the polling function */
  socketPollFunction = (socketPollFunction_t)0;
  /* shutdown the network */
  sqNetworkShutdown();
  return 1;
}
/************* End Module Initialization Declarations *************/



/************* Begin Squeak Initialization Declarations *************/
int sqNetworkInit(int resolverSemaIndex)
{
  FPRINTF((stderr, "netInit\n"));
  if (0 != thisNetSession)
    return 0;
  gethostname(localHostName, MAXHOSTNAMELEN);
  FPRINTF((stderr, "local host name = %s\n", localHostName));
  localHostAddress = nameToAddr(localHostName);
  thisNetSession = clock() + time(0);
  if (0 == thisNetSession)
    thisNetSession = 1;
  resolverSema = resolverSemaIndex;
# ifndef ACORN
  signal(SIGPIPE, SIG_IGN);
# endif
  return 0;
}

void sqNetworkShutdown(void) {
  descriptorNode *node;
  thisNetSession= 0;
  resolverSema= 0;
  node = &descriptors;
  while (node->next != &descriptors) {
    node->next->squeakSocketPtr->privateSocketPtr = 0;
    destroyDescriptor(node->next);
  }
  /* TODO:  close and destroy all sockets */
}
/************* End Squeak Initialization Declarations *************/



/************* Begin Squeak Resolver Declarations *************/
/* Note: the Mac and Win32 implementations implement asynchronous lookups
 * in the DNS.  I can't think of an easy way to do this in Unix without
 * going totally ott with threads or somesuch.  If anyone knows differently,
 * please tell me about it. - Ian
 */

void sqResolverAbort(void) {}

void sqResolverStartAddrLookup(int address)
{
  const char *res;
  res= addrToName(address);
  strncpy(lastName, res, MAXHOSTNAMELEN);
  FPRINTF((stderr, "startAddrLookup %s\n", lastName));
}

int sqResolverStatus(void)
{
  if(!thisNetSession)
    return ResolverUninitialised;
  if(lastError != 0)
    return ResolverError;
  return ResolverSuccess;
}

int sqResolverAddrLookupResultSize(void)	{ return strlen(lastName); }
int sqResolverError(void)			{ return lastError; }
int sqResolverLocalAddress(void)		{ return nameToAddr(localHostName); }
int sqResolverNameLookupResult(void)		{ return lastAddr; }

void sqResolverAddrLookupResult(char *nameForAddress, int nameSize)
{
  memcpy(nameForAddress, lastName, nameSize);
}

void sqResolverStartNameLookup(char *hostName, int nameSize) {
  int len= (nameSize < MAXHOSTNAMELEN) ? nameSize : MAXHOSTNAMELEN;
  memcpy(lastName, hostName, len);
  lastName[len]= lastError= 0;
  FPRINTF((stderr, "name lookup %s\n", lastName));
  lastAddr= nameToAddr(lastName);
  /* we're done before we even started */
  interpreterProxy->signalSemaphoreWithIndex(resolverSema);
}
/************* End Squeak Resolver Declarations *************/








/************* Begin Squeak Socket Helper Declarations *************/
void sqSocketDisplay(SocketPtr s) {
  descriptorNode *node = socketPtrToDescriptor(s);
  printDescriptorNode(node);
}

void sqSocketDisplayAll() {
  printDescriptorTable();
}

int sqSocketConnectionStatus(SocketPtr s) {
  descriptorNode *node = socketPtrToDescriptor(s);
  if (node == 0) {
    interpreterProxy->success(false);
    return -1;
  }
  FPRINTF((stderr, "\tQUERY fd: %d squeakState: %s internalState: %s\n", 
	   node->fd, squeakStateString(node), SEND(node, stateString)));
  return node->squeakState;
}

int sqSocketReceiveDataAvailable(SocketPtr s) {
  descriptorNode *node = socketPtrToDescriptor(s);
  if (node == 0) {
    interpreterProxy->success(false);
    return false;
  }
  FPRINTF((stderr, "\tQUERY fd: %d readable: %d\n", node->fd, SEND(node, isReadable)));
  if (SEND(node, isReadable))
    return true;
  node->pendingEvents |= READ_NOTIFY;
  return false;
}


int sqSocketSendDone(SocketPtr s) {
  descriptorNode *node = socketPtrToDescriptor(s);
  if (node == 0) {
    interpreterProxy->success(false);
    return false;
  }
  FPRINTF((stderr, "\tQUERY fd: %d writable: %d\n", node->fd, SEND(node, isWritable)));
  if (SEND(node, isWritable))
    return true;
  node->pendingEvents |= WRITE_NOTIFY;
  return false;
}


int sqSocketError(SocketPtr s) {
  descriptorNode *node = socketPtrToDescriptor(s);
  if (node == 0) {
    interpreterProxy->success(false);
    return EBADF;
  }
  FPRINTF((stderr, "\tQUERY fd: %d error: %d\n", node->fd, SEND(node, getError)));
  return SEND(node, getError);
}

int sqSocketLocalAddress(SocketPtr s) {
  struct sockaddr_in saddr;
  socklen_t saddrSize= sizeof(saddr);
  descriptorNode *node = socketPtrToDescriptor(s);
  if (!node) return false;
  if (getsockname(node->fd, (struct sockaddr *)&saddr, &saddrSize)
      || (AF_INET != saddr.sin_family))
    return 0;
  return ntohl(saddr.sin_addr.s_addr);
}

int sqSocketRemoteAddress(SocketPtr s) {
  struct sockaddr_in saddr;
  socklen_t saddrSize= sizeof(saddr);
  descriptorNode *node = socketPtrToDescriptor(s);
  if (!node) return false;
  if (TCPSocketType == node->type) {
      /* --- TCP --- */
      if (getpeername(node->fd, (struct sockaddr *)&saddr, &saddrSize)
	  || (AF_INET != saddr.sin_family))
	return 0;
      return ntohl(saddr.sin_addr.s_addr);
    }
  /* --- UDP --- */
  return ntohl(node->peer.sin_addr.s_addr);
}

int sqSocketLocalPort(SocketPtr s) {
  struct sockaddr_in saddr;
  socklen_t saddrSize= sizeof(saddr);
  descriptorNode *node = socketPtrToDescriptor(s);
  if (!node) return false;
  if (getsockname(node->fd, (struct sockaddr *)&saddr, &saddrSize)
      || (AF_INET != saddr.sin_family))
    return 0;
  return ntohs(saddr.sin_port);
}

int sqSocketRemotePort(SocketPtr s) {
  struct sockaddr_in saddr;
  socklen_t saddrSize= sizeof(saddr);
  descriptorNode *node = socketPtrToDescriptor(s);
  if (!node) return false;
  if (TCPSocketType == node->type)
    {
      /* --- TCP --- */
      if (getpeername(node->fd, (struct sockaddr *)&saddr, &saddrSize)
	  || (AF_INET != saddr.sin_family))
	return 0;
      return ntohs(saddr.sin_port);
    }
  /* --- UDP --- */
  return ntohs(node->peer.sin_port);
}
/************* End Squeak Socket Helper Declarations *************/



/************* Begin Squeak Socket Commands Declarations *************/
void sqSocketCreateNetTypeSocketTypeRecvBytesSendBytesSemaID
    (SocketPtr s, int netType, int socketType,
     int recvBufSize, int sendBufSize, int semaIndex) {
  sqSocketCreateNetTypeSocketTypeRecvBytesSendBytesSemaIDReadSemaIDWriteSemaID
    (s, netType, socketType,recvBufSize, sendBufSize,
     semaIndex, semaIndex, semaIndex);
}

void sqSocketCreateNetTypeSocketTypeRecvBytesSendBytesSemaIDReadSemaIDWriteSemaID
    (SocketPtr s, int netType, int socketType,
     int recvBufSize, int sendBufSize,
     int semaIndex, int readSemaIndex, int writeSemaIndex) {
  protocolMachineNode *mach;
  descriptorNode *node;
  /* we can't get the machine from the INVALID descriptor, so use the direct access by type  */
  mach = getProtocolMachine(socketType);
  if  (mach == 0) {
    interpreterProxy->success(false);
    return;
  }
  node = mach->create(MAXBUFSIZE, semaIndex, readSemaIndex, writeSemaIndex);
  if (node == 0) {
    interpreterProxy->success(false);
    return;
  }
  FPRINTF((stderr, "COMMAND create fd: %d\n", node->fd));
  s->sessionID = thisNetSession;
  s->socketType = socketType;
  s->privateSocketPtr = node;
  node->squeakSocketPtr = s;
}

void sqSocketListenOnPort(SocketPtr s, int port) {
  sqSocketListenOnPortBacklogSize(s, port, 1);
}

void sqSocketListenOnPortBacklogSize(SocketPtr s, int port, int backlogSize) {
  int result;
  descriptorNode *node = socketPtrToDescriptor(s);
  if (node == 0) {
    interpreterProxy->success(false);
    return;
  }
  FPRINTF((stderr, "COMMAND listen fd: %d port: %d backlog: %d\n", node->fd, port, backlogSize));
  result = SEND2(node, listen, port, backlogSize);
  if (result == -1)
    interpreterProxy->success(false);
}

void sqSocketConnectToPort(SocketPtr s, int addr, int port) {
  int result;
  descriptorNode *node = socketPtrToDescriptor(s);
  if (node == 0) {
    interpreterProxy->success(false);
    return;
  }
  FPRINTF((stderr, "COMMAND connect fd: %d addr: %lx port: %d\n", node->fd, (unsigned long)addr, port));
  result = SEND2(node, connect, addr, port);
  if (result == -1)
    interpreterProxy->success(false);
}

void sqSocketAcceptFromRecvBytesSendBytesSemaID
    (SocketPtr s, SocketPtr serverSocket,
     int recvBufSize, int sendBufSize, int semaIndex) {
  sqSocketAcceptFromRecvBytesSendBytesSemaIDReadSemaIDWriteSemaID
    (s, serverSocket, recvBufSize, sendBufSize, 
     semaIndex, semaIndex, semaIndex);
}


void sqSocketAcceptFromRecvBytesSendBytesSemaIDReadSemaIDWriteSemaID
    (SocketPtr s, SocketPtr serverSocket,
     int recvBufSize, int sendBufSize,
     int semaIndex, int readSemaIndex, int writeSemaIndex) {
  descriptorNode *connNode;
  descriptorNode *node = socketPtrToDescriptor(serverSocket);
  if (node == 0) {
    interpreterProxy->success(false);
    return;
  }
  if (!node->multiListen) {
    interpreterProxy->success(false);
    return;
  }
  connNode = SEND4(node, accept, MAXBUFSIZE, semaIndex, readSemaIndex, writeSemaIndex);
  if (connNode == 0) {
    interpreterProxy->success(false);
    return;
  }
  FPRINTF((stderr, "COMMAND accept fd: %d -> newFd: %d\n", node->fd, connNode->fd));
  s->privateSocketPtr = connNode;
  s->socketType = node->type;
  s->sessionID = thisNetSession;
  /* the back-pointer belongs on the accepted node, not the listener */
  connNode->squeakSocketPtr = s;
}

void sqSocketCloseConnection(SocketPtr s) {
  descriptorNode *node = socketPtrToDescriptor(s);
  if (node == 0) {
    interpreterProxy->success(false);
    return;
  }
  FPRINTF((stderr, "COMMAND close fd: %d\n", node->fd));
  if(SEND(node, shutdown))
    interpreterProxy->success(false);
}

void sqSocketAbortConnection(SocketPtr s) {
  descriptorNode *node = socketPtrToDescriptor(s);
  if (node == 0) {
    interpreterProxy->success(false);
    return;
  }
  FPRINTF((stderr, "COMMAND abort fd: %d\n", node->fd));
  if(SEND(node, abort))
    interpreterProxy->success(false);
}


/* Release the resources associated with this socket. 
   If a connection is open, abort it.*/

void sqSocketDestroy(SocketPtr s) {
  descriptorNode *node = socketPtrToDescriptor(s);
  if (node == 0) {
    interpreterProxy->success(false);
    return;
  }
  FPRINTF((stderr, "COMMAND destroy fd: %d\n", node->fd));
  if (SEND(node, destroy))
    interpreterProxy->success(false);
  s->privateSocketPtr = 0;
}

int sqSocketReceiveDataBufCount(SocketPtr s, int buf, int bufSize) {
  int bytesRead = 0;
  descriptorNode *node = socketPtrToDescriptor(s);
  if (node == 0) {
    interpreterProxy->success(false);
    return 0;
  }
  FPRINTF((stderr, "COMMAND read fd: %d bytes: %d\n", node->fd, bufSize));
  bytesRead = SEND2(node, read, (char*)buf, bufSize);
  FPRINTF((stderr, "\t\tread bytes: %d\n", bytesRead));
  if (bytesRead == -1) {
    interpreterProxy->success(false);
    return 0;
  }
  return bytesRead;
}

int sqSocketSendDataBufCount(SocketPtr s, int buf, int bufSize) {
  int bytesWritten = 0;
  descriptorNode *node = socketPtrToDescriptor(s);
  if (node == 0) {
    interpreterProxy->success(false);
    return 0;
  }
  FPRINTF((stderr, "COMMAND write fd: %d bytes: %d\n", node->fd, bufSize));
  bytesWritten = SEND2(node, write, (char*)buf, bufSize);
  FPRINTF((stderr, "\t\twrote bytes: %d\n", bytesWritten));
  if (bytesWritten == -1) {
    interpreterProxy->success(false);
    return 0;
  }
  return bytesWritten;
}

int sqSocketReceiveUDPDataBufCountaddressportmoreFlag ( 
        SocketPtr s, int buf, int bufSize, int *address,  
	int *port, int *moreFlag) {
  int bytesRead;
  descriptorNode *node = socketPtrToDescriptor(s);
  if (node == 0) {
    interpreterProxy->success(false);
    return 0;
  }
  bytesRead = SEND4(node, recvfrom, (char*)buf, bufSize, address, port);
  FPRINTF((stderr, "COMMAND recvfrom fd: %d bytes: %d\n", node->fd, bytesRead));
  if (bytesRead == -1) {
    interpreterProxy->success(false);
    return 0;
  }
  return bytesRead;
}


int sqSockettoHostportSendDataBufCount ( 
	SocketPtr s, int address, int port,
	int buf, int bufSize) {
  int bytesWritten;
  descriptorNode *node = socketPtrToDescriptor(s);
  if (node == 0) {
    interpreterProxy->success(false);
    return 0;
  }
  bytesWritten = SEND4(node, sendto, (char*)buf, bufSize, address, port);
  FPRINTF((stderr, "COMMAND sendto fd: %d bytes: %d\n", node->fd, bytesWritten));
  if (bytesWritten == -1) {
    interpreterProxy->success(false);
    return 0;
  }
  return bytesWritten;
}
/************* End Squeak Socket Commands Declarations *************/





/************* Begin Socket Options Declarations *************/
/*** socket options ***/


/* NOTE: we only support the portable options here as an incentive for
         people to write portable Squeak programs.  If you need
         non-portable socket options then go write yourself a plugin
         specific to your platform.  This decision is unilateral and
         non-negotiable.  - ikp
   NOTE: we only support the integer-valued options because the code
	 in SocketPlugin doesn't seem able to cope with the others.
	 (Personally I think that things like SO_SNDTIMEO et al would
	 be far more interesting than the majority of things on this
	 list, but there you go...)
   NOTE: if your build fails because of a missing option in this list,
	 simply DELETE THE OPTION (or comment it out) and then send
	 me mail (ian.piumarta at inria.fr) to let me know about it.
 */
typedef struct
{
  char *name;		/* name as known to Squeak */
  int   optlevel;	/* protocol level */
  int   optname;	/* name as known to Unix */
} socketOption;

#ifndef SOL_IP
# define SOL_IP IPPROTO_IP
#endif

#ifndef SOL_UDP
# define SOL_UDP IPPROTO_UDP
#endif

#ifndef SOL_TCP
# define SOL_TCP IPPROTO_TCP
#endif

static socketOption socketOptions[]= {
  { "SO_DEBUG",				SOL_SOCKET,	SO_DEBUG },
  { "SO_REUSEADDR",			SOL_SOCKET,	SO_REUSEADDR },
  { "SO_DONTROUTE",			SOL_SOCKET,	SO_DONTROUTE },
  { "SO_BROADCAST",			SOL_SOCKET,	SO_BROADCAST },
  { "SO_SNDBUF",			SOL_SOCKET,	SO_SNDBUF },
  { "SO_RCVBUF",			SOL_SOCKET,	SO_RCVBUF },
  { "SO_KEEPALIVE",			SOL_SOCKET,	SO_KEEPALIVE },
  { "SO_OOBINLINE",			SOL_SOCKET,	SO_OOBINLINE },
  { "SO_LINGER",			SOL_SOCKET,	SO_LINGER },
  { "IP_TTL",				SOL_IP,		IP_TTL },
  { "IP_HDRINCL",			SOL_IP,		IP_HDRINCL },
  { "IP_MULTICAST_IF",			SOL_IP,		IP_MULTICAST_IF },
  { "IP_MULTICAST_TTL",			SOL_IP,		IP_MULTICAST_TTL },
  { "IP_MULTICAST_LOOP",		SOL_IP,		IP_MULTICAST_LOOP },
  { "TCP_MAXSEG",			SOL_TCP,	TCP_MAXSEG },
  { "TCP_NODELAY",			SOL_TCP,	TCP_NODELAY },
# if 0 /*** deliberately unsupported options -- do NOT enable these! ***/
  { "SO_REUSEPORT",			SOL_SOCKET,	SO_REUSEPORT },
  { "SO_PRIORITY",			SOL_SOCKET,	SO_PRIORITY },
  { "SO_RCVLOWAT",			SOL_SOCKET,	SO_RCVLOWAT },
  { "SO_SNDLOWAT",			SOL_SOCKET,	SO_SNDLOWAT },
  { "IP_RCVOPTS",			SOL_IP,		IP_RCVOPTS },
  { "IP_RCVDSTADDR",			SOL_IP,		IP_RCVDSTADDR },
  { "UDP_CHECKSUM",			SOL_UDP,	UDP_CHECKSUM },
  { "TCP_ABORT_THRESHOLD",		SOL_TCP,	TCP_ABORT_THRESHOLD },
  { "TCP_CONN_NOTIFY_THRESHOLD",	SOL_TCP,	TCP_CONN_NOTIFY_THRESHOLD },
  { "TCP_CONN_ABORT_THRESHOLD",		SOL_TCP,	TCP_CONN_ABORT_THRESHOLD },
  { "TCP_NOTIFY_THRESHOLD",		SOL_TCP,	TCP_NOTIFY_THRESHOLD },
  { "TCP_URGENT_PTR_TYPE",		SOL_TCP,	TCP_URGENT_PTR_TYPE },
# endif
  { (char *)0,				0,		0 }
};


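static socketOption *findOption(char *name, size_t nameSize)
{
  socketOption *opt= 0;
  char buf[32];
  /* name is a counted string, not NUL-terminated: reject anything that
     would not fit in buf together with its terminator */
  if (nameSize >= sizeof(buf))
    return 0;
  strncpy(buf, name, nameSize);
  buf[nameSize]= '\0';
  for (opt= socketOptions; opt->name != 0; ++opt)
    if (!strcmp(buf, opt->name))
      return opt;
  return 0;
}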
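
/* Illustrative sketch, not part of the plugin: option names are passed
   as a start address plus a size, with no terminating NUL, so one way
   to look them up is to compare length and bytes directly, leaving no
   intermediate buffer to overflow.  The demoOption type, the table
   contents and findDemoOption are hypothetical names used only here.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

// Hypothetical stand-in for the socketOption table; values are arbitrary.
typedef struct { const char *name; int value; } demoOption;

static const demoOption demoOptions[] = {
  { "SO_DEBUG",     1 },
  { "SO_REUSEADDR", 2 },
  { "TCP_NODELAY",  3 },
  { NULL,           0 }
};

// Look up a counted (not NUL-terminated) name without copying it.
// Returns NULL unless an entry matches the name exactly.
static const demoOption *findDemoOption(const char *name, size_t nameSize)
{
  const demoOption *opt;
  for (opt = demoOptions; opt->name != NULL; ++opt)
    if (strlen(opt->name) == nameSize
        && memcmp(opt->name, name, nameSize) == 0)
      return opt;
  return NULL;
}
```

   Only the first nameSize bytes of the argument are examined, so
   trailing bytes after the counted name do not affect the match.
*/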


/* set the given option for the socket.
 */
int sqSocketSetOptionsoptionNameStartoptionNameSizeoptionValueStartoptionValueSizereturnedValue
    (SocketPtr s,int optionName, int optionNameSize,
     int optionValue, int optionValueSize, int *result) {
  interpreterProxy->success(false);
  return false;
}


/* query the socket for the given option.  */
int sqSocketGetOptionsoptionNameStartoptionNameSizereturnedValue
    (SocketPtr s,int optionName, int optionNameSize, int *result)
{
  interpreterProxy->success(false);
  return errno;
}
/************* End Socket Options Declarations *************/




/************* Begin Initialize Declarations *************/
void initializeDescriptorList() {
  memset(&descriptors, 0, sizeof(descriptorNode));
  descriptors.next = &descriptors;
  descriptors.fd = -1;
  descriptors.type = -1;
  computeMaxDescriptor();
}
void initializeMachineList() {
  protocolMachines.next = &protocolMachines;
  protocolMachines.descriptorType = -1;
  protocolMachines.slices = 0;
}


void setupTCPHandlers() {
  protocolMachineNode* mach = createProtocolMachine(TCPSocketType, NumberOfStatesTCP);  

  mach->isValid = isTCPValid;
  mach->isReadable = isTCPReadable;
  mach->isWritable = isTCPWritable;
  mach->getError = getTCPError;
  mach->stateString = stateStringTCP;
  mach->create = createTCP;
  mach->accept = acceptTCP;
  mach->listen = listenTCP;
  mach->connect = connectTCP;
  mach->read = readTCP;
  mach->write = writeTCP;
  mach->recvfrom = recvfromTCP;
  mach->sendto = sendtoTCP;
  mach->shutdown = shutdownTCP;
  mach->abort = abortTCP;
  mach->destroy = destroyTCP;
  mach->eventCallback = notify;
  mach->installEventRegistrations = installTCPEventRegistrations;

  registerProtocolMachine(mach);

  setHandler(mach, TCPUnconnected, SEL_READ, handleTCPUnconnectedEx);
  setHandler(mach, TCPUnconnected, SEL_WRITE, handleTCPUnconnectedEx);
  setHandler(mach, TCPUnconnected, SEL_EXCEPTION, handleTCPUnconnectedEx);
  setHandler(mach, TCPWaitingForConnection, SEL_READ, handleTCPConnectionWait);
  setHandler(mach, TCPWaitingForConnection, SEL_WRITE, handleTCPConnectionWait);
  setHandler(mach, TCPWaitingForConnection, SEL_EXCEPTION, handleTCPConnectionWaitEx);
  setHandler(mach, TCPListen, SEL_READ, handleTCPListen);
  setHandler(mach, TCPListen, SEL_WRITE, handleTCPListen);
  setHandler(mach, TCPListen, SEL_EXCEPTION, handleTCPListenEx);
  setHandler(mach, TCPEstablished, SEL_READ, handleTCPEstablishedRd);
  setHandler(mach, TCPEstablished, SEL_WRITE, handleTCPEstablishedWr);
  setHandler(mach, TCPEstablished, SEL_EXCEPTION, handleTCPEstablishedEx);
  setHandler(mach, TCPOtherEndClosed, SEL_READ, handleTCPOtherEndClosedEx);
  setHandler(mach, TCPOtherEndClosed, SEL_WRITE, handleTCPOtherEndClosedWr);
  setHandler(mach, TCPOtherEndClosed, SEL_EXCEPTION, handleTCPOtherEndClosedEx);
  setHandler(mach, TCPThisEndClosed, SEL_READ, handleTCPThisEndClosedRd);
  setHandler(mach, TCPThisEndClosed, SEL_WRITE, handleTCPThisEndClosedEx);
  setHandler(mach, TCPThisEndClosed, SEL_EXCEPTION, handleTCPThisEndClosedEx);
  setHandler(mach, TCPClosed, SEL_READ, handleTCPClosedEx);
  setHandler(mach, TCPClosed, SEL_WRITE, handleTCPClosedEx);
  setHandler(mach, TCPClosed, SEL_EXCEPTION, handleTCPClosedEx);
}

void setupUDPHandlers() {
  protocolMachineNode* mach = createProtocolMachine(UDPSocketType, NumberOfStatesUDP);

  mach->isValid = isUDPValid;
  mach->isReadable = isUDPReadable;
  mach->isWritable = isUDPWritable;
  mach->getError = getUDPError;
  mach->stateString = stateStringUDP;
  mach->create = createUDP;
  mach->accept = acceptUDP;
  mach->listen = listenUDP;
  mach->connect = connectUDP;
  mach->read = readUDP;
  mach->write = writeUDP;
  mach->recvfrom = recvfromUDP;
  mach->sendto = sendtoUDP;
  mach->shutdown = shutdownUDP;
  mach->abort = abortUDP;
  mach->destroy = destroyUDP;
  mach->eventCallback = notify;
  mach->installEventRegistrations = installUDPEventRegistrations;

  registerProtocolMachine(mach);

  setHandler(mach, UDPClosed, SEL_READ, &handleUDPClosed);
  setHandler(mach, UDPClosed, SEL_WRITE, &handleUDPClosed);
  setHandler(mach, UDPClosed, SEL_EXCEPTION, &handleUDPClosed);
  setHandler(mach, UDPOpen, SEL_READ, &handleUDPOpenRd);
  setHandler(mach, UDPOpen, SEL_WRITE, &handleUDPOpenWr);
  setHandler(mach, UDPOpen, SEL_EXCEPTION, &handleUDPOpenEx);
}
/************* End Initialize Declarations *************/
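
/* Illustrative sketch, not part of the plugin: the setup functions above
   fill one handler per (state, event) pair, so that every combination
   has somewhere to go at all times.  A minimal version of that idea is
   a two-dimensional table of function pointers.  All demo* names and
   the two-state machine are hypothetical; the real slices, setHandler
   and getHandler are defined elsewhere in this file and may differ.

```c
#include <assert.h>

enum { DemoClosed, DemoOpen, DemoStateCount };
enum { DemoRead, DemoWrite, DemoException, DemoEventCount };

typedef struct demoNode { int state; int lastHandled; } demoNode;
typedef void (*demoHandler)(demoNode *);

static demoHandler demoTable[DemoStateCount][DemoEventCount];

static void onOpenRead(demoNode *n)  { n->lastHandled = 1; }
static void onOpenWrite(demoNode *n) { n->lastHandled = 2; }
static void onIgnored(demoNode *n)   { n->lastHandled = 0; }

// Fill every slot first so no state/event pair is left without a handler.
static void demoSetup(void)
{
  int s, e;
  for (s = 0; s < DemoStateCount; ++s)
    for (e = 0; e < DemoEventCount; ++e)
      demoTable[s][e] = onIgnored;
  demoTable[DemoOpen][DemoRead]  = onOpenRead;
  demoTable[DemoOpen][DemoWrite] = onOpenWrite;
}

// Dispatch is a plain table lookup on the node's current state.
static void demoDispatch(demoNode *n, int event)
{
  demoTable[n->state][event](n);
}

// Helper: run one event through a fresh node and report which handler ran.
static int demoRun(int state, int event)
{
  demoNode n;
  n.state = state;
  n.lastHandled = -1;
  demoSetup();
  demoDispatch(&n, event);
  return n.lastHandled;
}
```

   Because the table is complete, enabling and disabling handlers is
   never needed; a state that does not care about an event simply maps
   it to a handler that does nothing.
*/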



/************* Begin TCP Commands Declarations *************/
int isTCPValid(descriptorNode* node) {
  if (node == 0)
    return 0;
  if (node->fd >= 0)    
    return 1;
  return 0;
}

int isTCPReadable(descriptorNode* node) {
  if (!SEND(node, isValid))
    return 0;
  if ((node->state == TCPEstablished) 
      || (node->state == TCPThisEndClosed)
      || (node->state == TCPOtherEndClosed)) {
    return bytesToReadInRingBuffer(node->readBuffer) > 0;
  }
  return 0;
}

int isTCPWritable (descriptorNode* node) {
  struct timeval tv= { 0, 0 };
  fd_set fds;
  if (!SEND(node, isValid))
    return 0;
  if ((node->state == TCPEstablished) 
      || (node->state == TCPOtherEndClosed)) {
    FD_ZERO(&fds);
    FD_SET(node->fd, &fds);
    return select(node->fd + 1, 0, &fds, 0, &tv) > 0;
  }
  return 0;
}
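
/* Illustrative sketch, not part of the plugin: isTCPWritable asks the
   kernel about a single descriptor by calling select() with a zero
   timeout, which returns immediately.  The same probe is shown below on
   a pipe, used here only because it needs no network; the demo* names
   are hypothetical.

```c
#include <assert.h>
#include <sys/select.h>
#include <sys/time.h>
#include <unistd.h>

// Returns 1 if fd can be written without blocking, 0 if not, -1 on error.
static int demoIsWritable(int fd)
{
  struct timeval tv = { 0, 0 };   // zero timeout: poll, do not wait
  fd_set fds;
  int n;
  FD_ZERO(&fds);
  FD_SET(fd, &fds);
  n = select(fd + 1, NULL, &fds, NULL, &tv);
  if (n < 0)
    return -1;
  return (n > 0) && FD_ISSET(fd, &fds);
}

// Same probe for the read direction.
static int demoIsReadable(int fd)
{
  struct timeval tv = { 0, 0 };
  fd_set fds;
  int n;
  FD_ZERO(&fds);
  FD_SET(fd, &fds);
  n = select(fd + 1, &fds, NULL, NULL, &tv);
  if (n < 0)
    return -1;
  return (n > 0) && FD_ISSET(fd, &fds);
}

// Probes a fresh pipe and returns a bitmask:
//   bit 0: read end readable while the pipe is still empty
//   bit 1: write end writable
//   bit 2: read end readable after one byte has been written
static int demoProbePipe(void)
{
  int fds[2], result = 0;
  if (pipe(fds) != 0)
    return -1;
  if (demoIsReadable(fds[0]) == 1) result |= 1;
  if (demoIsWritable(fds[1]) == 1) result |= 2;
  if (write(fds[1], "x", 1) == 1 && demoIsReadable(fds[0]) == 1)
    result |= 4;
  close(fds[0]);
  close(fds[1]);
  return result;
}
```

   An empty pipe is writable but not readable, and becomes readable once
   a byte is queued, so bits 1 and 2 are set and bit 0 is not.
*/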

int getTCPError(descriptorNode *node) {
  int localError = 0;
  socklen_t size;		/* getsockopt() takes a socklen_t *, not an int * */
  int value;

  if (!SEND(node, isValid))
    return EBADF;
  if ((node->state == TCPEstablished) 
      || (node->state == TCPThisEndClosed)
      || (node->state == TCPListen)
      || (node->state == TCPWaitingForConnection)
      || (node->state == TCPOtherEndClosed)) {
    size = sizeof(localError);
    value = getsockopt(node->fd, SOL_SOCKET, SO_ERROR, &localError, &size);
    if (value == -1) {
      return errno;
    }
    return localError;
  } else {
    return ENOTCONN;
  }
}

char *stateStringTCP(descriptorNode *node) {
  char *stateStr;
  switch (node->state) {
    case TCPUnconnected:
      stateStr = "TCPUnconnected";
      break;
    case TCPWaitingForConnection:
      stateStr = "TCPWaitingForConnection";
      break;
    case TCPListen:
      stateStr = "TCPListen";
      break;
    case TCPEstablished:
      stateStr = "TCPEstablished";
      break;
    case TCPThisEndClosed:
      stateStr = "TCPThisEndClosed";
      break;
    case TCPOtherEndClosed:
      stateStr = "TCPOtherEndClosed";
      break;
    case TCPClosed:
      stateStr = "TCPClosed";
      break;
    default:
      stateStr = "TCPInvalidState";
      break;
  }
  return stateStr;
}

descriptorNode *createTCP(int readBufferSize, int connSemIndex, int readSemIndex, int writeSemIndex) {
  int fd;
  descriptorNode* node;

  fd = socket(AF_INET, SOCK_STREAM, 0);
  if (fd == -1) {
    return 0;
  }
  node = createEmptyDescriptor(TCPSocketType, fd, 
	      readBufferSize, connSemIndex, readSemIndex, writeSemIndex);
  if (node == 0) {
    close(fd);			/* do not leak the socket if allocation failed */
    return 0;
  }
  node->state = TCPUnconnected;
  node->squeakState = Unconnected;
  registerDescriptor(node);
  return node;
}

descriptorNode* acceptTCP(descriptorNode *node, int readBufferSize, int connSemIndex, int readSemIndex, int writeSemIndex) {
  descriptorNode* connNode;

  if (!SEND(node, isValid)) {
    node->acceptedSocket = -1;
    return 0;
  }
  if (node->acceptedSocket < 0)
    return node;
  connNode = createEmptyDescriptor(TCPSocketType, node->acceptedSocket, 
		  readBufferSize, connSemIndex, readSemIndex, writeSemIndex);
  if (connNode == 0)
    return 0;
  node->acceptedSocket = -1;
  node->squeakState = WaitingForConnection;
  connNode->state = TCPEstablished;
  connNode->squeakState = Connected;
  registerDescriptor(connNode);
  return connNode;
}

int listenTCP(descriptorNode *node, int port, int backlogSize) {
  int on = 1;
  int val;

  if (!SEND(node, isValid))
    return 0;
  node->multiListen = backlogSize > 1;
  node->peer.sin_port = htons(port);
  val = setsockopt(node->fd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on));
  if (val == -1) {
    if ((errno != EWOULDBLOCK)
	&& (errno != EINPROGRESS)) {
      return val;
    }
  }
  val = bind(node->fd, (struct sockaddr *)&(node->peer), sizeof(node->peer));
  if (val == -1) {
    if ((errno != EWOULDBLOCK)
	&& (errno != EINPROGRESS)) {
      return val;
    }
  }
  val = ioctl(node->fd, FIONBIO, (char *)&on);
  if (val == -1) {
    if ((errno != EWOULDBLOCK)
	&& (errno != EINPROGRESS)) {
      return val;
    }
  }
  val = listen(node->fd, backlogSize);
  if (val == -1) {
    if ((errno != EWOULDBLOCK)
	&& (errno != EINPROGRESS)) {
      return val;
    }
  }
  node->state = TCPListen;
  node->squeakState = WaitingForConnection;
  return 0;
}

int connectTCP(descriptorNode *node, int addr, int port) {
  int val;
  int on = 1;

  if (!SEND(node, isValid))
    return 0;
  node->peer.sin_addr.s_addr = htonl(addr);
  node->peer.sin_port        = htons(port);
  fcntl(node->fd, F_SETFL, node->fdFlags | O_NONBLOCK);
  val = connect(node->fd, (struct sockaddr *)&(node->peer), sizeof(node->peer));
  FPRINTF((stderr, "Connect return val: %d errno: %d\n", val, errno));
  if (val == 0) {
    node->state = TCPEstablished;
    node->squeakState = Connected;
    fcntl(node->fd, F_SETFL, node->fdFlags);
    ioctl(node->fd, FIONBIO, (char *)&on);
    return 0;
  }
  if (val == -1) {
    if ((errno != EWOULDBLOCK)
	&& (errno != EINPROGRESS)) {
      return -1;
    }
  }
  node->state = TCPWaitingForConnection;
  node->squeakState = WaitingForConnection;
  return 0;
}

int shutdownTCP(descriptorNode *node) {
  struct linger linger= { 0, 0 };

  if (!SEND(node, isValid))
    return 0;
  if ((node->state == TCPOtherEndClosed) 
      || (node->state == TCPWaitingForConnection)
      || (node->state == TCPUnconnected)
      || (node->state == TCPListen)) {
    setsockopt(node->fd, SOL_SOCKET, SO_LINGER,
	       (char *)&linger, sizeof(linger));
    close(node->fd);
    node->state = TCPClosed;
    node->squeakState = Unconnected;
  } else if (node->state == TCPEstablished) {
    shutdown(node->fd, SHUT_WR);
    node->state = TCPThisEndClosed;
    node->squeakState = ThisEndClosed;
  }
  return 0;
}
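
/* Illustrative sketch, not part of the plugin: shutdownTCP closes only
   the write direction of an established connection, so the peer sees
   end-of-file while this end can still read whatever the peer sends
   before it closes in turn.  The sketch below shows that behaviour on
   an AF_UNIX socketpair, which is an assumption made so that it runs
   without a network; demoHalfClose is a hypothetical name.

```c
#include <assert.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <unistd.h>

// Returns 0 if the half-close behaves as described, otherwise the
// number of the step that failed.
static int demoHalfClose(void)
{
  int sv[2];
  int step = 0;
  char c = 0;

  if (socketpair(AF_UNIX, SOCK_STREAM, 0, sv) != 0)
    return 1;
  // This end has finished sending: close only its write direction.
  if (shutdown(sv[0], SHUT_WR) != 0)
    step = 2;
  // The peer now reads end-of-file (a read of zero bytes)...
  else if (read(sv[1], &c, 1) != 0)
    step = 3;
  // ...but the peer can still send in the other direction...
  else if (write(sv[1], "y", 1) != 1)
    step = 4;
  // ...and this end can still receive it.
  else if (read(sv[0], &c, 1) != 1 || c != 'y')
    step = 5;
  close(sv[0]);
  close(sv[1]);
  return step;
}
```

   On a TCP socket the same shutdown(fd, SHUT_WR) call sends the FIN for
   this side only, which is why the state machine keeps a separate
   TCPThisEndClosed state with its own read handler.
*/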

int abortTCP(descriptorNode *node) {
  struct linger linger= { 0, 0 };

  if (!SEND(node, isValid))
    return 0;
  setsockopt(node->fd, SOL_SOCKET, SO_LINGER,
             (char *)&linger, sizeof(linger));
  close(node->fd);
  node->state = TCPClosed;
  node->squeakState = Unconnected;
  return 0;
}

int destroyTCP(descriptorNode *node) {
  struct linger linger= { 0, 0 };

  if (!SEND(node, isValid))
    return 0;
  if (node->state != TCPClosed) {
    setsockopt(node->fd, SOL_SOCKET, SO_LINGER,
	       (char *)&linger, sizeof(linger));
    close(node->fd);
    node->state = TCPClosed;
    node->squeakState = Unconnected;
  }
  destroyDescriptor(node);
  return 0;
}

int readTCP(descriptorNode *node, char *buf, int size) {
  int bytesRead;
  if (!SEND(node, isReadable))
    return -1;
  bytesRead = readFromRingBuffer(node->readBuffer, buf, size);
  if ((node->state == TCPOtherEndClosed)
      && (bytesToReadInRingBuffer(node->readBuffer) == 0))
    node->squeakState = OtherEndClosed;
  return bytesRead;
}

int writeTCP(descriptorNode *node, char *buf, int size) {
  int bytesWritten = 0;
  if (!SEND(node, isWritable))
    return -1;

  bytesWritten = write(node->fd, buf, size);
  if (bytesWritten < 0) {
    if ((errno != EWOULDBLOCK)
	&& (errno != EINPROGRESS)) {
      return -1;
    }
  } else if (bytesWritten == 0) {
    if (node->state == TCPEstablished) {
      node->state = TCPOtherEndClosed;
      node->squeakState = OtherEndClosed;      
    } else if (node->state == TCPOtherEndClosed) {
      node->state = TCPClosed;
      node->squeakState = Unconnected;      
    }
  } else {
    return bytesWritten;
  }
  return 0;
}

int recvfromTCP(descriptorNode *node, char *buf, int size, int *addr, int *port) {
  return -1;
}

int sendtoTCP(descriptorNode *node, char *buf, int size, int addr, int port) {
  return -1;
}

void installTCPEventRegistrations(descriptorNode *node, fd_set *rset, fd_set *wset, fd_set *exset) {
  switch(node->state) {
    case TCPUnconnected:
      break;
    case TCPWaitingForConnection:
      FD_SET(node->fd, rset);
      FD_SET(node->fd, wset);
      if (node->pendingEvents & CHECK_OOB)
        FD_SET(node->fd, exset);
      break;
    case TCPListen:
      FD_SET(node->fd, rset);
      if (node->pendingEvents & CHECK_OOB)
        FD_SET(node->fd, exset);
      break;
    case TCPEstablished:
      if (bytesToWriteInRingBuffer(node->readBuffer) > 0)
        FD_SET(node->fd, rset);
      if (node->pendingEvents & WRITE_NOTIFY)
        FD_SET(node->fd, wset);
      if (node->pendingEvents & CHECK_OOB)
	FD_SET(node->fd, exset);
      break;
    case TCPOtherEndClosed:
      if (node->pendingEvents & WRITE_NOTIFY)
        FD_SET(node->fd, wset);
      if (node->pendingEvents & CHECK_OOB)
	FD_SET(node->fd, exset);
      break;
    case TCPThisEndClosed:
      FD_SET(node->fd, rset);
      break;
    case TCPClosed:
      break;
    default:
      break;
  }
}
/************* End TCP Commands Declarations *************/


/************* Begin TCP Handlers Declarations *************/
void handleTCPUnconnectedEx(descriptorNode* node) {
  FPRINTF((stderr, "EVENT exception fd: %d state: %s\n", node->fd, SEND(node, stateString)));
  SEND1(node, eventCallback, CONN_NOTIFY);
}
void handleTCPConnectionWait(descriptorNode* node) {
  int on = 1;
  struct linger ling;

  FPRINTF((stderr, "EVENT fd: %d state: %s\n", node->fd, SEND(node, stateString)));
  if(SEND(node, getError)) {
    node->state = TCPClosed;
    node->squeakState = Unconnected;
    SEND1(node, eventCallback, CONN_NOTIFY);
    return;
  }
  node->state = TCPEstablished;
  node->squeakState = Connected;
  ling.l_onoff = 1;               
  ling.l_linger = 0;
  setsockopt(node->fd, SOL_SOCKET, SO_LINGER, &ling, sizeof(ling));  /* cause RST to be sent on close() */
  ioctl(node->fd, FIONBIO, (char *)&on);   /* cause non-blocking behavior */
  SEND1(node, eventCallback, CONN_NOTIFY);
}
void handleTCPConnectionWaitEx(descriptorNode* node) {
  FPRINTF((stderr, "EVENT exception fd: %d state: %s\n", node->fd, SEND(node, stateString)));
  SEND1(node, eventCallback, CONN_NOTIFY);
}
void handleTCPListen(descriptorNode* node) {
  int connfd;

  FPRINTF((stderr, "EVENT fd: %d state: %s\n", node->fd, SEND(node, stateString)));
  if (node->acceptedSocket >= 0) {
    SEND1(node, eventCallback, CONN_NOTIFY);
    return;
  }
  connfd = accept(node->fd, 0, 0);
  if (connfd < 0) {
    if ((errno == EWOULDBLOCK)
	|| (errno == EINTR)
	|| (errno == ECONNABORTED)
	|| (errno == EPROTO))
      return;
  } else {
    node->squeakState = Connected;
    if (node->multiListen) {
      node->acceptedSocket = connfd;
    } else {
      close(node->fd);
      node->fd = connfd;
      node->state = TCPEstablished;
    }
  }
  SEND1(node, eventCallback, CONN_NOTIFY);
}
void handleTCPListenEx(descriptorNode* node) {
  FPRINTF((stderr, "EVENT exception fd: %d state: %s\n", node->fd, SEND(node, stateString)));
  SEND1(node, eventCallback, CONN_NOTIFY);
}
void handleTCPEstablishedRd(descriptorNode* node) {
  int bufSize = MAXBUFSIZE;
  char buffer[MAXBUFSIZE];
  int bytesRead;
  FPRINTF((stderr, "EVENT read fd: %d state: %s\n", node->fd, SEND(node, stateString)));
  bytesRead = read(node->fd, buffer,   
		  min(bufSize, 
		      node->readBuffer->ringSize - node->readBuffer->byteCount));
  if ( bytesRead < 0) {
    if (errno != EWOULDBLOCK)
      return;
  } else if (bytesRead == 0) {
    node->state = TCPOtherEndClosed;
    if (bytesToReadInRingBuffer(node->readBuffer) == 0) {
      node->squeakState = OtherEndClosed;
      SEND1(node, eventCallback, CONN_NOTIFY);
    }
  } else {
    writeToRingBuffer(node->readBuffer, buffer, bytesRead);
    SEND1(node, eventCallback, READ_NOTIFY);
  }
}

void handleTCPEstablishedWr(descriptorNode* node) {
  FPRINTF((stderr, "EVENT write fd: %d state: %s\n", node->fd, SEND(node, stateString)));
  SEND1(node, eventCallback, WRITE_NOTIFY);
}
void handleTCPEstablishedEx(descriptorNode* node) {
  FPRINTF((stderr, "EVENT exception fd: %d state: %s\n", node->fd, SEND(node, stateString)));
  SEND1(node, eventCallback, CONN_NOTIFY);
}
void handleTCPOtherEndClosedWr(descriptorNode* node) {
  FPRINTF((stderr, "EVENT write fd: %d state: %s\n", node->fd, SEND(node, stateString)));
  SEND1(node, eventCallback, WRITE_NOTIFY);
}
void handleTCPOtherEndClosedEx(descriptorNode* node) {
  FPRINTF((stderr, "EVENT exception fd: %d state: %s\n", node->fd, SEND(node, stateString)));
  SEND1(node, eventCallback, CONN_NOTIFY);
}
void handleTCPThisEndClosedRd(descriptorNode* node) {
  int bufSize = MAXBUFSIZE;
  char buffer[MAXBUFSIZE];
  int bytesRead;
  FPRINTF((stderr, "EVENT read fd: %d state: %s\n", node->fd, SEND(node, stateString)));
  bytesRead = read(node->fd, buffer,   
		  min(bufSize, 
		      node->readBuffer->ringSize - node->readBuffer->byteCount));
  if ( bytesRead < 0) {
    if (errno != EWOULDBLOCK) {
      return;
    }
  } else if (bytesRead == 0) {
    shutdown(node->fd, SHUT_RD);
    node->state = TCPClosed;
    node->squeakState = Unconnected;
    getProtocolMachine(node->type)->eventCallback(node, CONN_NOTIFY);
  } else {
    writeToRingBuffer(node->readBuffer, buffer, bytesRead);
    getProtocolMachine(node->type)->eventCallback(node, READ_NOTIFY);
  }
}
void handleTCPThisEndClosedEx(descriptorNode* node) {
  FPRINTF((stderr, "EVENT exception fd: %d state: %s\n", node->fd, SEND(node, stateString)));
  SEND1(node, eventCallback, CONN_NOTIFY);
}
void handleTCPClosedEx(descriptorNode* node) {
  FPRINTF((stderr, "EVENT exception fd: %d state: %s\n", node->fd, SEND(node, stateString)));
}

/************* End TCP Handlers Declarations *************/





/************* Begin UDP Commands Declarations *************/
int isUDPValid(descriptorNode* node) {
  if (node == 0)
    return 0;
  if (node->fd >= 0)
    return 1;
  return 0;
}

int isUDPReadable(descriptorNode* node) {
  struct timeval tv= { 0, 0 };
  fd_set fds;

  if (!SEND(node, isValid))
    return 0;
  if (node->state == UDPOpen) {
    FD_ZERO(&fds);
    FD_SET(node->fd, &fds);
    return select(node->fd + 1, &fds, 0, 0, &tv) > 0;
  }
  return 0;
}

int isUDPWritable (descriptorNode* node) {
  struct timeval tv= { 0, 0 };
  fd_set fds;

  if (!SEND(node, isValid))
    return 0;
  if (node->state == UDPOpen) {
    FD_ZERO(&fds);
    FD_SET(node->fd, &fds);
    return select(node->fd + 1, 0, &fds, 0, &tv) > 0;
  }
  return 0;
}

int getUDPError(descriptorNode *node) {
  int localError = 0;
  socklen_t size;		/* getsockopt() takes a socklen_t *, not an int * */
  int value;

  if (!SEND(node, isValid))
    return EBADF;
  if (node->state == UDPOpen) {
    size = sizeof(localError);
    value = getsockopt(node->fd, SOL_SOCKET, SO_ERROR, &localError, &size);
    FPRINTF((stderr, "descriptor: %d error: %d\n", node->fd, localError));
    if (value == -1) {
      return errno;
    }
    return localError;
  } else {
    return ENOTCONN;
  }
}

void installUDPEventRegistrations(descriptorNode *node, fd_set *rset, fd_set *wset, fd_set *exset) {
  switch(node->state) {
    case UDPClosed:
      break;
    case UDPOpen:
      FD_SET(node->fd, rset);
      if (node->pendingEvents & WRITE_NOTIFY)
        FD_SET(node->fd, wset);
      if (node->pendingEvents & CHECK_OOB)
        FD_SET(node->fd, exset);
      break;
    default:
      break;
  }
}

descriptorNode *createUDP(int readBufferSize, int connSemIndex, 
			  int readSemIndex, int writeSemIndex) {
  int fd;
  descriptorNode* node;

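  fd = socket(AF_INET, SOCK_DGRAM, 0);
  if (fd == -1) {
    return 0;
  }
  node = createEmptyDescriptor(UDPSocketType, fd,  
	      readBufferSize, connSemIndex, readSemIndex, writeSemIndex);
  if (node == 0) {
    close(fd);			/* do not leak the socket if allocation failed */
    return 0;
  }
  node->state = UDPOpen;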
  node->squeakState = Connected;
  node->peer.sin_family = AF_INET;
  node->peer.sin_addr.s_addr = INADDR_ANY;
  node->peer.sin_port = 0;

  registerDescriptor(node);
  return node;
}

descriptorNode* acceptUDP(descriptorNode *listenNode, int readBufferSize, 
			  int connSemIndex, int readSemIndex, int writeSemIndex) {
  return 0;
}

int listenUDP(descriptorNode *node, int port, int backlogSize) {
  int on = 1;
  node->peer.sin_port = htons(port);
  setsockopt(node->fd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on));
  bind(node->fd, (struct sockaddr *)&(node->peer), sizeof(node->peer));

  node->state = UDPOpen;
  node->squeakState = Connected;
  node->multiListen = 0;
  return 0;
}

int connectUDP(descriptorNode *node, int addr, int port) {
  node->peer.sin_port = htons(port);
  node->peer.sin_addr.s_addr = htonl(addr);

  node->state = UDPOpen;
  node->squeakState = Connected;
  return 0;
}

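int shutdownUDP(descriptorNode *node) {
  /* close only once: destroyUDP calls this again after an explicit
     shutdown or abort, and by then the fd number may have been reused */
  if (node->state != UDPClosed)
    close(node->fd);
  node->state = UDPClosed;
  node->squeakState = ThisEndClosed;
  return 0;
}

int abortUDP(descriptorNode *node) {
  if (node->state != UDPClosed)
    close(node->fd);
  node->state = UDPClosed;
  node->squeakState = ThisEndClosed;
  return 0;
}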

int destroyUDP(descriptorNode *node) {
  shutdownUDP(node);
  destroyDescriptor(node);
  return 0;
}

int readUDP(descriptorNode *node, char *buf, int size) {
  int bytesRead;
  socklen_t addrsize;
  if (!SEND(node, isReadable))
    return -1;
  addrsize = sizeof(node->peer);
  bytesRead =  recvfrom(node->fd, buf, size, 0, 
			(struct sockaddr*)&(node->peer), &addrsize);
  return bytesRead;
}

int writeUDP(descriptorNode *node, char *buf, int size) {
  int bytesWritten;
  int addrsize;
  if (!SEND(node, isWritable))
    return -1;
  addrsize = sizeof(node->peer);
  bytesWritten = sendto(node->fd, buf, size, 0, 
			(struct sockaddr*)&(node->peer), addrsize);
  if (bytesWritten < 0) {
    if ((errno != EWOULDBLOCK)
	&& (errno != EINPROGRESS)) {
      return -1;
    }
  } else if (bytesWritten == 0) {
  } else {
    return bytesWritten;
  }
  return 0;
}

int recvfromUDP(descriptorNode *node, char *buf, int size, int *addr, int *port) {
  int bytesRead;
  socklen_t addrsize;
  if (!SEND(node, isReadable))
    return -1;
  addrsize = sizeof(node->peer);
  bytesRead =  recvfrom(node->fd, buf, size, 0, 
			(struct sockaddr*)&(node->peer), &addrsize);
  if (bytesRead >= 0) {
    /* addr and port are outputs: report the sender in host byte order */
    *addr = ntohl(node->peer.sin_addr.s_addr);
    *port = ntohs(node->peer.sin_port);
  }
  return bytesRead;
}

int sendtoUDP(descriptorNode *node, char *buf, int size, int addr, int port) {
  int bytesWritten;
  int addrsize;
  if (!SEND(node, isWritable))
    return -1;
  node->peer.sin_addr.s_addr = htonl(addr);
  node->peer.sin_port        = htons(port);
  addrsize = sizeof(node->peer);
  bytesWritten = sendto(node->fd, buf, size, 0, 
			(struct sockaddr*)&(node->peer), addrsize);
  return bytesWritten;
}

char *stateStringUDP(descriptorNode *node) {
  char *stateStr;
  switch (node->state) {
    case UDPClosed:
      stateStr = "UDPClosed";
      break;
    case UDPOpen:
      stateStr = "UDPOpen";
      break;
    default:
      stateStr = "UDPInvalidState";
      break;
  }
  return stateStr;
}
/************* End UDP Commands Declarations *************/


/************* Begin UDP Handlers Declarations *************/
void handleUDPClosed(descriptorNode* node) {
  FPRINTF((stderr, "EVENT fd: %d state: %s\n", node->fd, SEND(node, stateString)));
  SEND1(node, eventCallback, CONN_NOTIFY);
}
void handleUDPOpenRd(descriptorNode* node) {
  FPRINTF((stderr, "EVENT read fd: %d state: %s\n", node->fd, SEND(node, stateString)));
  SEND1(node, eventCallback, READ_NOTIFY);
}
void handleUDPOpenWr(descriptorNode* node) {
  FPRINTF((stderr, "EVENT write fd: %d state: %s\n", node->fd, SEND(node, stateString)));
  SEND1(node, eventCallback, WRITE_NOTIFY);
}
void handleUDPOpenEx(descriptorNode* node) {
  FPRINTF((stderr, "EVENT exception fd: %d state: %s\n", node->fd, SEND(node, stateString)));
  SEND1(node, eventCallback, CONN_NOTIFY);
}
/************* End UDP Handlers Declarations *************/









/************* Begin Polling Declarations *************/
/* poll for io activity and call the appropriate handler(s) *
 *
 * Note: this can be called from ioProcessEvents with a zero timeout
 *       and from ioRelinquishProcessor with a non-zero timeout.
 *
 *	 "extraFd" is a file descriptor that is polled for reading but
 *	 never handled -- this allows a relinquished CPU to return
 *	 early if there is mouse or keyboard input activity.  Essential
 *	 for (e.g.) handling keyboard interrupts during i/o wait.
 */
void pollForIO(int microSeconds, int extraFd)
{
  descriptorNode *nextNode = &descriptors;
  int result, limit;
  int isRead, isWrite, isException;
  fd_set rset, wset, exset;
  struct timeval tv;

  computeMaxDescriptor();
  /* get out early if there is no pending i/o and no need to relinquish cpu */
  if ((maxDescriptor == 0) && (microSeconds == 0))
    return;

  FD_ZERO(&rset);
  FD_ZERO(&wset);
  FD_ZERO(&exset);

  tv.tv_sec =  microSeconds / 1000000;
  tv.tv_usec = microSeconds % 1000000;

  installEventDependencies( &rset, &wset, &exset);
  if (extraFd)
    FD_SET(extraFd, &rset);
  limit = (extraFd > maxDescriptor ? extraFd : maxDescriptor) + 1;

  do {
    result = select(limit, &rset, &wset, &exset, &tv);
  } while ((result < 0) && (errno == EINTR));

  if (result < 0 ) {
    perror("select");
    exit (1);
  }
  if (result > 0) {
    nextNode = &descriptors;
    while (nextNode->next != &descriptors) {
      if (SEND(nextNode->next, isValid)) {
        isRead = FD_ISSET(nextNode->next->fd, &rset);
        isWrite = FD_ISSET(nextNode->next->fd , &wset);
        isException = FD_ISSET(nextNode->next->fd, &exset);
        handleSelectEventForDescriptor(nextNode->next, isRead, 
				       isWrite, isException);
      }
      nextNode = nextNode->next;
    }
  }
}

int computeMaxDescriptor() {
  int newMax = 0;
  descriptorNode* nextNode = &descriptors;

  while (nextNode->next != &descriptors) {
    if(SEND(nextNode->next, isValid)) {
      if (newMax < nextNode->next->fd)
        newMax = nextNode->next->fd;
    }
    nextNode = nextNode->next;
  }
  maxDescriptor = newMax;
  return newMax;
}

void installEventDependencies( fd_set *rset, fd_set *wset, fd_set *exset) {
  descriptorNode *node, *next;
  protocolMachineNode *mach;

  next = &descriptors;
  while ((node = next->next) != &descriptors) {
    mach = getProtocolMachine(node->type);
    if(mach != 0)
      mach->installEventRegistrations(node, rset, wset, exset);
    next = node;
  }
}

void handleSelectEventForDescriptor(descriptorNode* node, 
				    int isRead, int isWrite, int isException) {
  protocolMachineNode *mach;
  eventHandler handler;
  int eventCode;

  /* dispatch at most one event per poll, in priority order:
     exception (2), then read (0), then write (1) */
  if (isException)
    eventCode = 2;
  else if (isRead)
    eventCode = 0;
  else if (isWrite)
    eventCode = 1;
  else
    return;
  /* stop watching for exceptions once one has been delivered;
     re-arm the check after any ordinary read or write event */
  if (isException)
    node->pendingEvents &= ~CHECK_OOB;
  else
    node->pendingEvents |= CHECK_OOB;
  mach = getProtocolMachine(node->type);
  if (mach == 0)
    return;
  handler = getHandler(mach, node->state, eventCode);
  if (handler != 0)
    handler(node);
}
/************* End Polling Declarations *************/




/************* Begin Naming Declarations *************/
static const char *addrToName(int netAddress)
{
  struct in_addr nAddr;		/* always 4 bytes, unlike u_long */
  struct hostent *he;

  lastError= 0;			/* for the resolver */
  nAddr.s_addr= htonl(netAddress);
  if ((he= gethostbyaddr((char *)&nAddr, sizeof(nAddr), AF_INET)))
    return he->h_name;
  lastError= h_errno;		/* ditto */
  return "";
}

static int nameToAddr(char *hostName)
{
  struct hostent *he;
  struct in_addr nAddr;

  lastError= 0;			/* ditto */
  if ((he= gethostbyname(hostName))) {
    /* copy rather than cast: h_addr_list entries carry no alignment
       guarantee and a long may be wider than an IPv4 address */
    memcpy(&nAddr, he->h_addr_list[0], sizeof(nAddr));
    return ntohl(nAddr.s_addr);
  }
  lastError= h_errno;		/* and one more ditto */
  return 0;
}
/************* End Naming Declarations *************/
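
/* Illustrative sketch, not part of the plugin: the naming helpers above
   exchange IPv4 addresses with the image as plain integers in host byte
   order, while the resolver and the socket calls use network byte
   order, hence the htonl/ntohl pairs.  The conversion is shown below on
   a dotted-quad string so that no name lookup is needed;
   demoHostOrder is a hypothetical name.

```c
#include <assert.h>
#include <arpa/inet.h>
#include <netinet/in.h>
#include <stdint.h>

// Parse a dotted-quad IPv4 address and return it as a host-order
// 32-bit integer, or 0 if the string is not a valid address.
static uint32_t demoHostOrder(const char *dotted)
{
  struct in_addr in;
  if (inet_pton(AF_INET, dotted, &in) != 1)
    return 0;
  // in.s_addr is in network byte order; convert for arithmetic use.
  return ntohl(in.s_addr);
}
```

   In host order the first octet is the most significant byte on every
   platform, so "127.0.0.1" becomes 0x7F000001 regardless of endianness.
*/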





/************* Begin Descriptor Management Declarations *************/
descriptorNode* createEmptyDescriptor(int type, int fd, int bufferSize,
               int connSemIndex, int readSemIndex, int writeSemIndex) {
  descriptorNode *node;
  /* calloc already zero-fills the node, so no separate memset is needed */
  node = (descriptorNode*)calloc(1, sizeof(descriptorNode));
  if (node == 0)
    return 0;			/* out of memory */
  node->type = type;
  node->fd = fd;
  node->pendingEvents = 0 | CONN_NOTIFY | CHECK_OOB;
  node->acceptedSocket = -1;
  node->next = 0;
  node->classStructure = getProtocolMachine(type);
  node->connSema = connSemIndex;
  node->readSema = readSemIndex;
  node->writeSema = writeSemIndex;
  node->peer.sin_family = AF_INET;
  node->peer.sin_addr.s_addr = INADDR_ANY;
  node->peer.sin_port = 0;
  node->fdFlags = fcntl(node->fd, F_GETFL, 0); 
  if (bufferSize > 0)
    node->readBuffer = createRingBuffer(bufferSize);
  return node;
}

void destroyDescriptor(descriptorNode* node) {
  deregisterDescriptor(node);
  node->type = -1;
  node->fd = -1;
  if (node->readBuffer)
    destroyRingBuffer(node->readBuffer);
  free(node);
}

void registerDescriptor(descriptorNode* fdNode) {
  descriptorNode *tempNode;
  tempNode = getDescriptor(fdNode->fd);
  if (tempNode != 0)
    return;

  tempNode = descriptors.next;
  descriptors.next = fdNode;
  fdNode->next = tempNode;
}

void deregisterDescriptor(descriptorNode* fdNode) {
  descriptorNode *tempNode;

  /* unlink by node identity rather than by fd value: the kernel reuses
     fd numbers after close(), so a match on fd alone could unlink a
     different node that happens to carry the same number */
  tempNode = &descriptors;
  while (tempNode->next != &descriptors) {
    if (tempNode->next == fdNode) {
      tempNode->next = fdNode->next;
      fdNode->next = 0;
      return;
    }
    tempNode = tempNode->next;
  }
}

descriptorNode* getDescriptor(int fd) {
  descriptorNode* nextNode = &descriptors;

  while (nextNode->next != &descriptors) {
    if (nextNode->next->fd == fd)
      return nextNode->next;
    nextNode = nextNode->next;
  }
  return 0;
}
/************* End Descriptor Management Declarations *************/



/************* Begin Protocol Machine Management Declarations *************/
protocolMachineNode* createProtocolMachine(int type, int slices) {
  protocolMachineNode* node;

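  node = (protocolMachineNode*)calloc(1, sizeof(protocolMachineNode));
  if (node == NULL)
    return NULL;
  node->descriptorType = type;
  node->next = 0;
  node->numberOfSlices = slices;
  node->slices = calloc(slices, sizeof(handlerSlice));
  if (node->slices == NULL && slices > 0) {
    /* could not allocate the handler table; do not return a half-built machine */
    free(node);
    return NULL;
  }
  return node;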
}

void destroyProtocolMachine(protocolMachineNode* machine) {
  machine->next = 0;
  machine->descriptorType = -1;
  machine->numberOfSlices = 0;
  if (machine->slices != NULL)
    free(machine->slices);
  free(machine);
}

void registerProtocolMachine(protocolMachineNode* machine) {
  protocolMachineNode *tempMachine;
  tempMachine = getProtocolMachine(machine->descriptorType);
  if (tempMachine != 0)
    return;

  tempMachine = protocolMachines.next;
  protocolMachines.next = machine;
  machine->next = tempMachine;
}

void deregisterProtocolMachine(protocolMachineNode* machine) {
  protocolMachineNode *tempMachine, *removedMachine;
  tempMachine = getProtocolMachine(machine->descriptorType);
  if (tempMachine == 0)
    return;

  tempMachine = &protocolMachines;
  while (tempMachine->next != &protocolMachines) {
    if (tempMachine->next->descriptorType == machine->descriptorType) {
      removedMachine = tempMachine->next;
      tempMachine->next = removedMachine->next;
      removedMachine->next = 0;
      return;
    }
    tempMachine = tempMachine->next;
  }
}

protocolMachineNode* getProtocolMachine(int type) {
  protocolMachineNode *tempMachine = &protocolMachines;

  while (tempMachine->next != &protocolMachines) {
    if (tempMachine->next->descriptorType == type)
      return tempMachine->next;
    tempMachine = tempMachine->next;
  }
  return 0;
}
/************* End Protocol Machine Management Declarations *************/



/************* Begin Handler Management Declarations *************/
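/* Handlers are stored in a table indexed by [state][event].  Neither
   accessor checks its indices, so callers must pass values that lie
   within the machine's table. */
eventHandler getHandler(protocolMachineNode* mach, int state, int event) {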
  return mach->slices[state][event];
}

void setHandler(protocolMachineNode* mach, int state, int event, eventHandler function) {
  mach->slices[state][event] = function;
}
/************* End Handler Management Declarations *************/




/************* Begin Debug Declarations *************/
void printDescriptorTable() {
  descriptorNode* nextNode;

  fprintf(stderr, "Printing Descriptors...\n");
  nextNode = &descriptors;
  while (nextNode->next != &descriptors) {
    printDescriptorNode( nextNode->next);
    nextNode = nextNode->next;
  }
}

char *squeakStateString(descriptorNode *node) {
  char *stateStr;
  switch (node->squeakState) {
    case Unconnected:
      stateStr = "Unconnected";
      break;
    case WaitingForConnection:
      stateStr = "WaitingForConnection";
      break;
    case Connected:
      stateStr = "Connected";
      break;
    case OtherEndClosed:
      stateStr = "OtherEndClosed";
      break;
    case ThisEndClosed:
      stateStr = "ThisEndClosed";
      break;
    default:
      stateStr = "Invalid";
      break;
  }
  return stateStr;
}

void printDescriptorNode( descriptorNode* node) {
  if (!SEND(node, isValid)) 
    fprintf(stderr, "  descriptor(***invalid***)\n");
  else {
    fprintf(stderr, "  socket descriptor(fd: %d    squeakState: %s   internalState: %s)\n",
            node->fd, squeakStateString(node), 
	    SEND(node, stateString));
  }
}

void printProtocolMachines() {
  protocolMachineNode* nextNode;

  fprintf(stderr, "Printing Protocol Machines...\n");
  nextNode = &protocolMachines;
  while (nextNode->next != &protocolMachines) {
    printProtocolMachineNode( nextNode->next);
    nextNode = nextNode->next;
  }
}

void printProtocolMachineNode( protocolMachineNode* pmNode) {
  fprintf(stderr, "  protocol machine(type:%d  #slices:%d)\n",
          pmNode->descriptorType, pmNode->numberOfSlices);
}
/************* End Debug Declarations *************/




/************* Begin RingBuffer Declarations *************/
ringBuffer* createRingBuffer(int size) {
  ringBuffer *theBuffer;

  theBuffer = (ringBuffer*)calloc(1, sizeof(ringBuffer));
  if (theBuffer == NULL)
    return NULL;
  theBuffer->buf = (char*) calloc(size, sizeof(char));
  if (theBuffer->buf == NULL) {
    free(theBuffer);
    return NULL;
  }
  theBuffer->byteCount = 0;
  theBuffer->ringSize = size;
  theBuffer->readPtr = theBuffer->buf;
  theBuffer->writePtr = theBuffer->buf;
  theBuffer->endPtr = theBuffer->buf + size - 1;
  return theBuffer;
}
void destroyRingBuffer(ringBuffer *theBuffer) {
  free(theBuffer->buf);
  free(theBuffer);
}
int readFromRingBuffer(ringBuffer *rb, char *lb, int size) {
  int wrapped;
  int firstChunkSize;
  int remainingSize;

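  if (rb->byteCount <= 0 || size <= 0)
    return 0;
  /* With data present, writePtr == readPtr means the buffer is full,
     so the readable bytes run to endPtr and continue from buf. */
  wrapped = rb->writePtr <= rb->readPtr;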
  if (!wrapped) {
    firstChunkSize = min( rb->writePtr - rb->readPtr, size);
    memcpy(lb, rb->readPtr, firstChunkSize);
    rb->byteCount -= firstChunkSize;
    rb->readPtr += firstChunkSize;
    return firstChunkSize;
  }

  firstChunkSize = min(rb->endPtr - rb->readPtr + 1, size);
  memcpy(lb, rb->readPtr, firstChunkSize);
  rb->byteCount -= firstChunkSize;
  if (rb->readPtr + firstChunkSize <= rb->endPtr) {
    rb->readPtr += firstChunkSize;
    return firstChunkSize;
  }

  rb->readPtr = rb->buf;

  if (firstChunkSize >= size)
    return size;

  remainingSize = size - firstChunkSize;
  return firstChunkSize + 
    readFromRingBuffer(rb, lb+firstChunkSize, remainingSize);
}
int writeToRingBuffer(ringBuffer *rb, char *lb, int size) {
  int spaceAvailable;
  int wrapped;
  int firstChunkSize;
  int remainingSize;

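  spaceAvailable = rb->ringSize - rb->byteCount;
  if (spaceAvailable <= 0 || size <= 0)
    return 0;
  /* With space left, writePtr < readPtr means the free region is the
     single gap between them; otherwise it runs to endPtr first. */
  wrapped = rb->writePtr < rb->readPtr;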
  if (wrapped) {
    firstChunkSize = min(spaceAvailable, size);
    memcpy(rb->writePtr, lb, firstChunkSize);
    rb->byteCount += firstChunkSize;
    rb->writePtr += firstChunkSize;
    return firstChunkSize;
  }

  firstChunkSize = min(rb->endPtr - rb->writePtr + 1, size);
  memcpy(rb->writePtr, lb, firstChunkSize);
  if (rb->writePtr + firstChunkSize > rb->endPtr)
    rb->writePtr = rb->buf;
  else
    rb->writePtr += firstChunkSize;
  rb->byteCount += firstChunkSize;

  if (firstChunkSize >= size)
    return size;

  remainingSize = size - firstChunkSize;
  return firstChunkSize + 
    writeToRingBuffer(rb, lb+firstChunkSize, remainingSize);
}
int bytesToWriteInRingBuffer(ringBuffer *rb) {
  return rb->ringSize - rb->byteCount;
}
int bytesToReadInRingBuffer(ringBuffer *rb) {
  return rb->byteCount;
}
/************* End RingBuffer Declarations *************/
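
The wrap-around behaviour of the ring buffer above may be easier to follow in a small standalone sketch. It is not the plugin's code: the type demoRing and the functions demoCreate, demoDestroy, demoWrite and demoRead are hypothetical names made up for illustration, and the sketch tracks positions with integer indices instead of the readPtr/writePtr/endPtr pointers used in sqUnixNetwork.c. The real ringBuffer declaration lives earlier in the file and is not reproduced here.

```c
#include <assert.h>
#include <stdlib.h>
#include <string.h>

/* Hypothetical stand-in for the plugin's ringBuffer; the field names are
   assumptions, not the declarations used in sqUnixNetwork.c. */
typedef struct {
  char *buf;
  int   ringSize;   /* capacity in bytes */
  int   byteCount;  /* bytes currently stored */
  int   readIdx;    /* index of the next byte to read */
  int   writeIdx;   /* index of the next free slot */
} demoRing;

demoRing *demoCreate(int size) {
  demoRing *rb;
  assert(size > 0);
  rb = calloc(1, sizeof(demoRing));
  if (rb == NULL)
    return NULL;
  rb->buf = calloc((size_t)size, 1);
  if (rb->buf == NULL) {
    free(rb);
    return NULL;
  }
  rb->ringSize = size;
  return rb;
}

void demoDestroy(demoRing *rb) {
  if (rb == NULL)
    return;
  free(rb->buf);
  free(rb);
}

/* Store up to size bytes; returns the number actually stored. */
int demoWrite(demoRing *rb, const char *src, int size) {
  int space = rb->ringSize - rb->byteCount;
  int n = size < space ? size : space;
  int first;
  if (n <= 0)
    return 0;
  first = rb->ringSize - rb->writeIdx;   /* room before the end of storage */
  if (first > n)
    first = n;
  memcpy(rb->buf + rb->writeIdx, src, (size_t)first);
  memcpy(rb->buf, src + first, (size_t)(n - first));  /* wrapped part, may be 0 bytes */
  rb->writeIdx = (rb->writeIdx + n) % rb->ringSize;
  rb->byteCount += n;
  return n;
}

/* Fetch up to size bytes; returns the number actually copied out. */
int demoRead(demoRing *rb, char *dst, int size) {
  int n = size < rb->byteCount ? size : rb->byteCount;
  int first;
  if (n <= 0)
    return 0;
  first = rb->ringSize - rb->readIdx;    /* bytes available before the end */
  if (first > n)
    first = n;
  memcpy(dst, rb->buf + rb->readIdx, (size_t)first);
  memcpy(dst + first, rb->buf, (size_t)(n - first));  /* wrapped part, may be 0 bytes */
  rb->readIdx = (rb->readIdx + n) % rb->ringSize;
  rb->byteCount -= n;
  return n;
}
```

With an eight-byte ring, writing six bytes, reading four, and then writing six more makes the second write wrap: two bytes land at the end of the storage and four at the start. A further write into the now-full ring stores nothing, and an eight-byte read returns the data in the order it was written. Using two memcpy calls per operation avoids the recursion in the plugin version, at the cost of an occasional zero-length copy.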




















