Received: from ims.mariposa.ca.us (root@ims.mariposa.ca.us [205.139.78.193]) by math.psu.edu (8.7.5/8.7.3) with SMTP id XAA27333 for <barr@math.psu.edu>; Fri, 24 May 1996 23:30:05 -0400 (EDT)
Received: from mgb.ims.mariposa.ca.us (mgb.ims.mariposa.ca.us [205.139.78.195]) by ims.mariposa.ca.us (8.6.11/8.6.11) with SMTP id UAA16563 for <barr@math.psu.edu>; Fri, 24 May 1996 20:30:09 -0700
Date: Fri, 24 May 1996 20:30:09 -0700
Message-Id: <199605250330.UAA16563@ims.mariposa.ca.us>
X-Sender: mgb@ims.mariposa.ca.us
X-Mailer: Windows Eudora Light Version 1.5.2
Mime-Version: 1.0
Content-Type: text/plain; charset="us-ascii"
To: Dave Barr <barr@math.psu.edu>
From: Mike Bird <mgb@ims.mariposa.ca.us>
Subject: Re: innxmit dumps core on exit

Dave,

The patch following this message is a combination of the 03/15 version of
the robust/fast streaming patch together with the 05/20 bug fix.  It is
relative to unoff4 which already includes Jerry's enhanced streaming patch.
I'll be sending the patch directly from root@udev.ims.mariposa.ca.us to
avoid line-wrap problems in Eudora.

It does not include the higher priority for handing off readers in the
control loop.  I'm hoping someone will post that soon.  If not I may try my
hand at it.

--Mike
==================================================
Here is the 03/15 inn1.4unoff4 version of the innxmit
robust/fast streaming patch as amended 05/20 to fix a
benign bug which could result in occasional spurious
"stalloc: internal error" messages.

This patch works around problems with some TCP implementations
(e.g. Linux 1.2.X) which cause innxmit to hang when streaming.
Even if the TCP stream freezes solid the -T timeout works.

It is also speeds up Jerry Aguirre's protocol somewhat by
increasing I/O overlap on the NNTP stream.

A small fix to prevent memory corruption with quoted printable
is included.

NOTE: Needs to be able to fopen STNBUF articles instead of
STNBUF/2 so you may need to reduce STNBUF.  (This is not a
problem for Linux 1.2.X with STNBUF=32.)

NOTE: Consider running this with a pre-sorting NNTPSEND
for maximum performance.  >8) 

--Mike
----------------------------------------------------------
 Mike Bird          Tel: 209-742-5000   FAX: 209-966-3177
 President          POP: 209-742-5333   NOC: 209-742-9979 
 Iron Mtn Systems      http://www.ims.mariposa.ca.us/

Received: from udev.ims.mariposa.ca.us (root@udev.ims.mariposa.ca.us [205.139.78.194]) by math.psu.edu (8.7.5/8.7.3) with SMTP id XAA27344 for <barr@math.psu.edu>; Fri, 24 May 1996 23:30:21 -0400 (EDT)
Received: (from root@localhost) by udev.ims.mariposa.ca.us (8.6.11/8.6.9) id UAA02926; Fri, 24 May 1996 20:24:43 -0700
Date: Fri, 24 May 1996 20:24:43 -0700
From: root <root@udev.ims.mariposa.ca.us>
Message-Id: <199605250324.UAA02926@udev.ims.mariposa.ca.us>
To: barr@math.psu.edu
Subject: Robust/Fast Streaming Patch
Cc: mgb@yosemite.net

--- innxmit.c.SAFE	Fri May 24 20:05:56 1996
+++ innxmit.c	Fri May 24 20:06:07 1996
@@ -2,6 +2,7 @@
 **
 **  Transmit articles to remote site.
 **  Modified for NNTP streaming: 3Jan96 Jerry Aguirre
+**  Modified for "non-blocking" NNTP streaming: 8Feb96 Mike Bird
 */
 #include "configdata.h"
 #include <stdio.h>
@@ -46,6 +47,7 @@
 */
 
 /* max number of articles that can be streamed ahead */
+/* must be able to fopen this many articles */
 #define STNBUF 32
 
 /* Send "takethis" without "check" if this many articles were
@@ -53,13 +55,11 @@
 */
 #define STNC 16
 
-/* typical number of articles to stream  */
-/* must be able to fopen this many articles */
-#define STNBUFL (STNBUF/2)
-
 /* number of retries before requeueing to disk */
 #define STNRETRY 5
 
+enum stbufmode { Empty=0, Idle, In_Flight, Do_Check, Do_Takethis };
+
 struct stbufs {		/* for each article we are procesing */
 	char *st_fname;		/* file name */
 	char *st_id;		/* message ID */
@@ -67,9 +67,11 @@
 	int   st_age;		/* age count */
 	QIOSTATE *st_qp;	/* IO to read article contents */
 	int   st_hash;		/* hash value to speed searches */
+	enum stbufmode st_mode; /* mode (see above) */
 };
 static struct stbufs stbuf[STNBUF]; /* we keep track of this many articles */
 static int stnq;	/* current number of active entries in stbuf */
+static int stnflight;   /* current number of entries in-flight */
 static long stnofail;	/* Count of consecutive successful sends */
 
 static int TryStream = TRUE;	/* Should attempt stream negotation? */
@@ -214,12 +216,16 @@
 STATIC BOOL
 REMflush()
 {
-    int		i;
+    STATIC BOOL stream_write(char *p, int i);
+    BOOL	b;
 
     if (REMbuffptr == REMbuffer) return TRUE; /* nothing buffered */
-    i = xwrite(ToServer, REMbuffer, (int)(REMbuffptr - REMbuffer));
+    if (CanStream)
+      b = stream_write(REMbuffer, (int)(REMbuffptr - REMbuffer));
+    else
+      b = (xwrite(ToServer, REMbuffer, (int)(REMbuffptr - REMbuffer)) >= 0);
     REMbuffptr = REMbuffer;
-    return i < 0 ? FALSE : TRUE;
+    return b;
 }
 
 /*
@@ -235,7 +241,7 @@
     register int i;
 
     for (i = 0; i < STNBUF; i++) { /* linear search for ID */
-	if ((stbuf[i].st_id) && (stbuf[i].st_id[0])
+	if (stbuf[i].st_mode != Empty
 	 && (stbuf[i].st_hash == hash)) {
 	    register int n;
 
@@ -282,7 +288,7 @@
     register int i;
 
     for (i = 0; i < STNBUF; i++) {
-	if ((!stbuf[i].st_fname) || (stbuf[i].st_fname[0] == '\0')) break;
+	if (stbuf[i].st_mode == Empty) break;
     }
     if (i >= STNBUF) { /* stnq says not full but can not find unused */
 	syslog(L_ERROR, "stalloc: Internal error");
@@ -292,18 +298,13 @@
 	syslog(L_ERROR, "stalloc: filename longer than %d", SPOOLNAMEBUFF);
 	return (-1);
     }
-    /* allocate buffers on first use.
-    ** If filename ever is longer than SPOOLNAMEBUFF then code will abort.
-    ** If ID is ever longer than NNTP_STRLEN then other code would break.
-    */
-    if (!stbuf[i].st_fname) stbuf[i].st_fname = NEW(char, SPOOLNAMEBUFF);
-    if (!stbuf[i].st_id) stbuf[i].st_id = NEW(char, NNTP_STRLEN);
     (void)strcpy(stbuf[i].st_fname, Article);
     (void)strcpy(stbuf[i].st_id, MessageID);
     stbuf[i].st_qp = qp;
     stbuf[i].st_hash = hash;
     stbuf[i].st_retry = 0;
     stbuf[i].st_age = 0;
+    stbuf[i].st_mode = Idle;
     stnq++;
     return i;
 }
@@ -317,8 +318,7 @@
 	    QIOclose(stbuf[i].st_qp);
 	    stbuf[i].st_qp = 0;
 	}
-	if (stbuf[i].st_id) stbuf[i].st_id[0] = '\0';
-	if (stbuf[i].st_fname) stbuf[i].st_fname[0] = '\0';
+	stbuf[i].st_mode = Empty;
 	stnq--;
 }
 
@@ -391,12 +391,12 @@
     register int	prev;
 
     /* Buffer too full? */
-    if (REMbuffend - REMbuffptr < i + 3) {
+    if (REMbuffend - REMbuffptr < i*3 + 3) {
 	if (!REMflush())
 	    return FALSE;
-	if (REMbuffend - REMbuffer < i + 3) {
+	if (REMbuffend - REMbuffer < i*3 + 3) {
 	    /* Line too long -- grow buffer. */
-	    size = i * 2;
+	    size = i*3 * 2;
 	    RENEW(REMbuffer, char, size);
 	    REMbuffend = &REMbuffer[size];
 	}
@@ -449,6 +449,9 @@
     double		usertime;
     double		systime;
 
+    STATIC BOOL stream_term();
+    stream_term();
+
     if (!Purging) {
 	(void)REMwrite(QUIT, STRLEN(QUIT), FALSE);
 	(void)REMflush();
@@ -566,10 +569,11 @@
 	register int i;
 
 	for (i = 0; i < STNBUF; i++) {    /* requeue unacknowledged articles */
-	    if ((stbuf[i].st_fname) && (stbuf[i].st_fname[0] != '\0')) {
+	    if (stbuf[i].st_mode != Empty) {
 		if (Debug)
-		    (void)fprintf(stderr, "stbuf[%d]= %s, %s\n",
-			    i, stbuf[i].st_fname, stbuf[i].st_id);
+		    (void)fprintf(stderr, "stbuf[%d](%c)= %s, %s\n",
+			    i, "EIFCT"[stbuf[i].st_mode],
+			    stbuf[i].st_fname, stbuf[i].st_id);
 		Requeue(stbuf[i].st_fname, stbuf[i].st_id);
 		if (Article == stbuf[i].st_fname) Article = NULL;
 		strel(i); /* release entry */
@@ -955,6 +959,8 @@
 	return TRUE;
     }
     STAToffered++;
+    stbuf[i].st_mode = In_Flight;
+    stnflight++;
     if (Debug) {
 	if (stbuf[i].st_retry)
 	    (void)fprintf(stderr, "> %s (retry %d)\n", buff, stbuf[i].st_retry);
@@ -1005,6 +1011,8 @@
     if (!REMsendarticle(stbuf[i].st_fname, stbuf[i].st_id,
 	    stbuf[i].st_qp))
 	return TRUE;
+    stbuf[i].st_mode = In_Flight;
+    stnflight++;
     QIOclose(stbuf[i].st_qp);	/* should not need file again */
     stbuf[i].st_qp = 0;		/* so close to free descriptor */
     stbuf[i].st_age = 0;
@@ -1013,33 +1021,28 @@
 }
 
 
-/* listen for responses.  Process acknowledgments to remove items from
-** the queue.  Also sends the articles on request.  Returns TRUE on error.
+/* Process acknowledgments to remove items from
+** the queue.  Also sends the articles on request. 
 ** return TRUE on failure.
 */
 STATIC BOOL
-strlisten()
+strlisten(buff, writing_p)
+     char *buff;
+     BOOL writing_p;
 {
     int		resp;
-    int		i;
+    int		i, j;
     char	*id, *p;
-    char	buff[NNTP_STRLEN];
     int		hash;
 
-    while(TRUE) {
-	if (!REMread(buff, (int)sizeof buff)) {
-	    (void)fprintf(stderr, "No reply to check, %s\n", strerror(errno));
-	    return TRUE;
-	}
-	if (GotInterrupt)
-	    Interrupted((char *)0, (char *)0);
+    {
 	if (Debug)
 	    (void)fprintf(stderr, "< %s", buff);
 
 	/* Parse the reply. */
 	resp =  atoi(buff);
 	/* Skip the 1XX informational messages */
-	if ((resp >= 100) && (resp < 200)) continue;
+	if ((resp >= 100) && (resp < 200)) return FALSE;
 	switch (resp) { /* first time is to verify it */
 	case NNTP_ERR_GOTID_VAL:
 	case NNTP_OK_SENDID_VAL:
@@ -1055,6 +1058,8 @@
 		    syslog(L_NOTICE, CANT_FINDIT, REMhost, REMclean(buff));
 		    return (TRUE); /* can't find it! */
 		}
+		stbuf[i].st_mode = Idle;
+		--stnflight;
 	    } else {
 		syslog(L_NOTICE, CANT_PARSEIT, REMhost, REMclean(buff));
 		return (TRUE);
@@ -1076,7 +1081,10 @@
 	case NNTP_RESENDID_VAL:	/* remote wants it later */
 	    /* try again now because time has passed */
 	    if (stbuf[i].st_retry < STNRETRY) {
-		if (check(i)) return TRUE;
+	        if (writing_p)
+		  stbuf[i].st_mode = Do_Check;
+		else if (check(i))
+		  return TRUE;
 		stbuf[i].st_retry++;
 		stbuf[i].st_age = 0;
 	    } else { /* requeue to disk for later */
@@ -1091,7 +1099,10 @@
 	    break;
 		
 	case NNTP_OK_SENDID_VAL:	/* remote wants article */
-	    if (takethis(i)) return TRUE;
+	    if (writing_p)
+	      stbuf[i].st_mode = Do_Takethis;
+	    else if (takethis(i))
+	      return TRUE;
 	    stnofail++;
 	    break;
 
@@ -1106,8 +1117,9 @@
 	    stnofail = 0;
 	    break;
 	}
-	break;
     }
+    if (Debug)
+      (void)fprintf(stderr, " ... OK\n");
     return (FALSE);
 }
 
@@ -1147,6 +1159,9 @@
     unsigned int	TotalTimeout;
     extern char		*nntp_port;
 
+    STATIC BOOL stream_init();
+    STATIC BOOL stream_process(BOOL writing_p);
+
     /* Set defaults. */
     ConnectTimeout = 0;
     TotalTimeout = 0;
@@ -1338,11 +1353,18 @@
 		int i;
 
 		for (i = 0; i < STNBUF; i++) { /* reset buffers */
-		    stbuf[i].st_fname = 0;
-		    stbuf[i].st_id = 0;
+		    /*
+		    ** If filename ever is longer than SPOOLNAMEBUFF then code will abort.
+		    ** If ID is ever longer than NNTP_STRLEN then other code would break.
+		    */
+		    stbuf[i].st_fname = NEW(char, SPOOLNAMEBUFF);
+		    stbuf[i].st_id = NEW(char, NNTP_STRLEN);
 		    stbuf[i].st_qp = 0;
+		    stbuf[i].st_mode = Empty;
 		}
 		stnq = 0;
+		stnflight = 0;
+		stream_init();
 	    }
 	}
     }
@@ -1505,16 +1527,19 @@
 		QIOclose(qp);
 		continue;
 	    }
-	    /* This code tries to optimize by sending a burst of "check"
-	     * commands before flushing the buffer.  This should result
-	     * in several being sent in one packet reducing the network
-	     * overhead.
-	     */
-	    if (DoCheck && (stnofail < STNC)) lim = STNBUF;
-	    else                              lim = STNBUFL;
-	    if (stnq >= lim) { /* need to empty a buffer */
-		while (stnq >= STNBUFL) { /* or several */
-		    if (strlisten()) {
+	    /* 18May96 mgb@yosemite.net fixed bug identified by
+	       fubar@yoda.fdt.net which caused stalloc:Internal error
+	       if alarm went off while all buffers were in use */
+	    while (stnq >= STNBUF) { /* need to empty a buffer */
+	        {
+		    if (GotAlarm) {
+		        (void)fprintf(stderr, "Timed out\n");
+		        /* Don't resend the current article. */
+		        RequeueRestAndExit((char *)NULL, (char *)NULL);
+		    }
+		    if (GotInterrupt)
+		        Interrupted(Article, MessageID);
+		    if (!stream_process(FALSE)) {
 			RequeueRestAndExit(Article, MessageID);
 		    }
 		}
@@ -1534,24 +1559,6 @@
 		    RequeueRestAndExit((char *)NULL, (char *)NULL);
 		}
 	    }
-	    /* check for need to resend any IDs */
-	    for (i = 0; i < STNBUF; i++) {
-		if ((stbuf[i].st_fname) && (stbuf[i].st_fname[0] != '\0')) {
-		    if (stbuf[i].st_age++ > stnq) {
-			/* This should not happen but just in case ... */
-			if (stbuf[i].st_retry < STNRETRY) {
-			    if (check(i)) /* resend check */
-				RequeueRestAndExit((char *)NULL, (char *)NULL);
-			    retries++;
-			    stbuf[i].st_retry++;
-			    stbuf[i].st_age = 0;
-			} else { /* requeue to disk for later */
-			    Requeue(stbuf[i].st_fname, stbuf[i].st_id);
-			    strel(i); /* release entry */
-			}
-		    }
-		}
-	    }
 	    continue; /* next article */
 	}
 	(void)sprintf(buff, "%s %s", Slavish ? "xreplic" : "ihave", MessageID);
@@ -1610,8 +1617,8 @@
 	QIOclose(qp);
     }
     if (CanStream) { /* need to wait for rest of ACKs */
-	while (stnq > 0) {
-	    if (strlisten()) {
+	while (stnq > 0 && !GotInterrupt && !GotAlarm) {
+	    if (!stream_process(FALSE)) {
 		RequeueRestAndExit((char *)NULL, (char *)NULL);
 	    }
 	}
@@ -1625,4 +1632,174 @@
 		BATCHtemp, strerror(errno));
     ExitWithStats(0);
     /* NOTREACHED */
+}
+
+/*
+ * MGB: Here is a "non-blocking" I/O interface for streaming
+ *      Beware that it can't write from within a write
+ */
+
+STATIC char stream_command_buf[BUFSIZ];
+
+STATIC char stream_read_buf[BUFSIZ];
+STATIC char *stream_read_ptr;
+STATIC int stream_read_left;
+
+STATIC char *stream_write_ptr;
+STATIC int stream_write_left;
+
+STATIC BOOL
+stream_init()
+{
+  if (SetNonBlocking(FromServer) < 0 || SetNonBlocking(ToServer) < 0)
+    return FALSE;
+  stream_read_ptr = stream_read_buf;
+  stream_read_left = BUFSIZ-2;
+  stream_write_left = 0;
+}
+
+STATIC void
+stream_term()
+{}
+
+STATIC BOOL
+stream_respond()
+{
+  BOOL busy_p;
+  do {
+    int i;
+    busy_p = FALSE;
+    for (i = 0; i < STNBUF; ++i) {
+      if (GotInterrupt || GotAlarm)
+	break;
+      switch (stbuf[i].st_mode) {
+      case Do_Check:
+	if (check(i))
+	  return FALSE;
+	busy_p = TRUE;
+	break;
+      case Do_Takethis:
+	if (takethis(i))
+	  return FALSE;
+	busy_p = TRUE;
+	break;
+      default: break;
+      }
+    }
+  } while (busy_p);
+  return TRUE;
+}
+
+STATIC BOOL
+stream_process(writing_p)
+     BOOL writing_p;
+{
+  int max_fd, i;
+  fd_set inp_set, out_set, exc_set;
+  struct timeval timeout;
+  if (!writing_p && (!stream_respond() || !REMflush()))
+    return FALSE;
+  max_fd = FromServer;
+  if (ToServer > max_fd)
+    max_fd = ToServer;
+  FD_ZERO(&inp_set);
+  FD_ZERO(&out_set);
+  FD_ZERO(&exc_set);
+  FD_SET(FromServer, &exc_set);
+  FD_SET(FromServer, &inp_set);
+  FD_SET(ToServer, &exc_set);
+  if (stream_write_left)
+    FD_SET(ToServer, &out_set);
+  timeout.tv_sec = 10;
+  timeout.tv_usec = 0;
+  if (Debug)
+    fprintf(stderr, "stream_process::select R%c, queued=%d, buffered=%dB, in-flight=%d\n",
+	    writing_p ? 'W' : 'O', stnq-stnflight, stream_write_left, stnflight);
+  if ((i = select(max_fd+1, &inp_set, &out_set, &exc_set, &timeout)) < 0)
+    return FALSE;
+
+  /* INTERRUPT OR TIMEOUT */
+      
+  if (i == 0 || GotInterrupt || GotAlarm)
+    return TRUE;
+    
+  /* PROBLEM CONNECTION? */
+      
+  if (FD_ISSET(FromServer, &exc_set) || FD_ISSET(ToServer, &exc_set))
+    return FALSE;
+    
+  /* INPUT AVAILABLE? */
+      
+  if (FD_ISSET(FromServer, &inp_set)) {
+    int i, m, n;
+    char *p, *q;
+    if (stream_read_left == 0) {
+      /* Overlong response - continually overwrite last character */
+      stream_read_ptr--;
+      stream_read_left = 1;
+    }
+    i = read(FromServer, stream_read_ptr, stream_read_left);
+    if (i < 0 && i != EAGAIN)
+      return FALSE;
+    if (i > 0) {
+      stream_read_ptr += i;
+      stream_read_left -= i;
+      *stream_read_ptr = '\0';
+      if (Debug)
+	fprintf(stderr, "read (%d) %s\n", i, "" /*stream_read_ptr-i*/);
+      while ((p = strchr(stream_read_buf, '\n')) != NULL) {
+	if (p > stream_read_buf && p[-1] == '\r')
+	  p[-1] = '\0';
+	*p = '\0';
+	m = (p+1)-stream_read_buf;
+	/* Handle the dot escape. */
+	if (stream_read_buf[0] == '.') {
+	  if (stream_read_buf[1] == '\0')
+	    /* EOF. */
+	    return FALSE;
+	  memmove(stream_command_buf, stream_read_buf+1, m-1);
+	} else
+	  memmove(stream_command_buf, stream_read_buf, m);
+	m = (p+1)-stream_read_buf;
+	n = BUFSIZ-m;
+	memmove(stream_read_buf, p+1, n);
+	stream_read_ptr -= m;
+	stream_read_left += m;
+	if (strlisten(stream_command_buf, writing_p))
+	  return FALSE;
+      }
+    }
+  }
+  
+    
+  /* OUTPUT POSSIBLE? */
+      
+  if (FD_ISSET(ToServer, &out_set) && stream_write_left) {
+    int i = write(ToServer, stream_write_ptr, stream_write_left);
+    if (i < 0 && errno != EAGAIN)
+      return FALSE;
+    if (i > 0) {
+      if (Debug)
+	fprintf(stderr, "wrote %d\n", i);
+      stream_write_ptr += i;
+      stream_write_left -= i;
+    }
+  }
+
+  return TRUE;
+}
+
+STATIC BOOL
+stream_write(p, i)
+  char	*p;
+  int	i;
+{
+  if (Debug)
+    fprintf(stderr, "stream_write (%d) %s\n", i, "" /*p*/);
+  stream_write_ptr = p;
+  stream_write_left = i;
+  while (stream_write_left && !GotInterrupt && !GotAlarm)
+    if (!stream_process(TRUE))
+      return FALSE;
+  return TRUE;
 }
