vx32

Local 9vx git repository for patches.
git clone git://r-36.net/vx32
Log | Files | Refs

rudp.c (20818B)


      1 /*
      2  *  Reliable User Datagram Protocol, currently only for IPv4.
      3  *  This protocol is compatible with UDP's packet format.
      4  *  It could be done over UDP if need be.
      5  */
      6 #include	"u.h"
      7 #include	"lib.h"
      8 #include	"mem.h"
      9 #include	"dat.h"
     10 #include	"fns.h"
     11 #include	"error.h"
     12 
     13 #include	"ip.h"
     14 
     15 #define DEBUG	0
     16 #define DPRINT if(DEBUG)print
     17 
     18 #define SEQDIFF(a,b) ( (a)>=(b)?\
     19 			(a)-(b):\
     20 			0xffffffffUL-((b)-(a)) )
     21 #define INSEQ(a,start,end) ( (start)<=(end)?\
     22 				((a)>(start)&&(a)<=(end)):\
     23 				((a)>(start)||(a)<=(end)) )
     24 #define UNACKED(r) SEQDIFF(r->sndseq, r->ackrcvd)
     25 #define NEXTSEQ(a) ( (a)+1 == 0 ? 1 : (a)+1 )
     26 
     27 enum
     28 {
     29 	UDP_PHDRSIZE	= 12,	/* pseudo header */
     30 //	UDP_HDRSIZE	= 20,	/* pseudo header + udp header */
     31 	UDP_RHDRSIZE	= 36,	/* pseudo header + udp header + rudp header */
     32 	UDP_IPHDR	= 8,	/* ip header */
     33 	IP_UDPPROTO	= 254,
     34 	UDP_USEAD7	= 52,	/* size of new ipv6 headers struct */
     35 
     36 	Rudprxms	= 200,
     37 	Rudptickms	= 50,
     38 	Rudpmaxxmit	= 10,
     39 	Maxunacked	= 100,
     40 };
     41 
     42 #define Hangupgen	0xffffffff	/* used only in hangup messages */
     43 
     44 typedef struct Udphdr Udphdr;
     45 struct Udphdr
     46 {
     47 	/* ip header */
     48 	uchar	vihl;		/* Version and header length */
     49 	uchar	tos;		/* Type of service */
     50 	uchar	length[2];	/* packet length */
     51 	uchar	id[2];		/* Identification */
     52 	uchar	frag[2];	/* Fragment information */
     53 
     54 	/* pseudo header starts here */
     55 	uchar	Unused;
     56 	uchar	udpproto;	/* Protocol */
     57 	uchar	udpplen[2];	/* Header plus data length */
     58 	uchar	udpsrc[4];	/* Ip source */
     59 	uchar	udpdst[4];	/* Ip destination */
     60 
     61 	/* udp header */
     62 	uchar	udpsport[2];	/* Source port */
     63 	uchar	udpdport[2];	/* Destination port */
     64 	uchar	udplen[2];	/* data length */
     65 	uchar	udpcksum[2];	/* Checksum */
     66 };
     67 
     68 typedef struct Rudphdr Rudphdr;
     69 struct Rudphdr
     70 {
     71 	/* ip header */
     72 	uchar	vihl;		/* Version and header length */
     73 	uchar	tos;		/* Type of service */
     74 	uchar	length[2];	/* packet length */
     75 	uchar	id[2];		/* Identification */
     76 	uchar	frag[2];	/* Fragment information */
     77 
     78 	/* pseudo header starts here */
     79 	uchar	Unused;
     80 	uchar	udpproto;	/* Protocol */
     81 	uchar	udpplen[2];	/* Header plus data length */
     82 	uchar	udpsrc[4];	/* Ip source */
     83 	uchar	udpdst[4];	/* Ip destination */
     84 
     85 	/* udp header */
     86 	uchar	udpsport[2];	/* Source port */
     87 	uchar	udpdport[2];	/* Destination port */
     88 	uchar	udplen[2];	/* data length (includes rudp header) */
     89 	uchar	udpcksum[2];	/* Checksum */
     90 
     91 	/* rudp header */
     92 	uchar	relseq[4];	/* id of this packet (or 0) */
     93 	uchar	relsgen[4];	/* generation/time stamp */
     94 	uchar	relack[4];	/* packet being acked (or 0) */
     95 	uchar	relagen[4];	/* generation/time stamp */
     96 };
     97 
     98 
     99 /*
    100  *  one state structure per destination
    101  */
    102 typedef struct Reliable Reliable;
    103 struct Reliable
    104 {
    105 	Ref;
    106 
    107 	Reliable *next;
    108 
    109 	uchar	addr[IPaddrlen];	/* always V6 when put here */
    110 	ushort	port;
    111 
    112 	Block	*unacked;	/* unacked msg list */
    113 	Block	*unackedtail;	/*  and its tail */
    114 
    115 	int	timeout;	/* time since first unacked msg sent */
    116 	int	xmits;		/* number of times first unacked msg sent */
    117 
    118 	ulong	sndseq;		/* next packet to be sent */
    119 	ulong	sndgen;		/*  and its generation */
    120 
    121 	ulong	rcvseq;		/* last packet received */
    122 	ulong	rcvgen;		/*  and its generation */
    123 
    124 	ulong	acksent;	/* last ack sent */
    125 	ulong	ackrcvd;	/* last msg for which ack was rcvd */
    126 
    127 	/* flow control */
    128 	QLock	lock;
    129 	Rendez	vous;
    130 	int	blocked;
    131 };
    132 
    133 
    134 
    135 /* MIB II counters */
    136 typedef struct Rudpstats Rudpstats;
    137 struct Rudpstats
    138 {
    139 	ulong	rudpInDatagrams;
    140 	ulong	rudpNoPorts;
    141 	ulong	rudpInErrors;
    142 	ulong	rudpOutDatagrams;
    143 };
    144 
    145 typedef struct Rudppriv Rudppriv;
    146 struct Rudppriv
    147 {
    148 	Ipht	ht;
    149 
    150 	/* MIB counters */
    151 	Rudpstats	ustats;
    152 
    153 	/* non-MIB stats */
    154 	ulong	csumerr;		/* checksum errors */
    155 	ulong	lenerr;			/* short packet */
    156 	ulong	rxmits;			/* # of retransmissions */
    157 	ulong	orders;			/* # of out of order pkts */
    158 
    159 	/* keeping track of the ack kproc */
    160 	int	ackprocstarted;
    161 	QLock	apl;
    162 };
    163 
    164 
    165 static ulong generation = 0;
    166 static Rendez rend;
    167 
    168 /*
    169  *  protocol specific part of Conv
    170  */
    171 typedef struct Rudpcb Rudpcb;
    172 struct Rudpcb
    173 {
    174 	QLock;
    175 	uchar	headers;
    176 	uchar	randdrop;
    177 	Reliable *r;
    178 };
    179 
    180 /*
    181  * local functions 
    182  */
    183 void	relsendack(Conv*, Reliable*, int);
    184 int	reliput(Conv*, Block*, uchar*, ushort);
    185 Reliable *relstate(Rudpcb*, uchar*, ushort, char*);
    186 void	relput(Reliable*);
    187 void	relforget(Conv *, uchar*, int, int);
    188 void	relackproc(void *);
    189 void	relackq(Reliable *, Block*);
    190 void	relhangup(Conv *, Reliable*);
    191 void	relrexmit(Conv *, Reliable*);
    192 void	relput(Reliable*);
    193 void	rudpkick(void *x);
    194 
    195 static void
    196 rudpstartackproc(Proto *rudp)
    197 {
    198 	Rudppriv *rpriv;
    199 	char kpname[KNAMELEN];
    200 
    201 	rpriv = rudp->priv;
    202 	if(rpriv->ackprocstarted == 0){
    203 		qlock(&rpriv->apl);
    204 		if(rpriv->ackprocstarted == 0){
    205 			sprint(kpname, "#I%drudpack", rudp->f->dev);
    206 			kproc(kpname, relackproc, rudp);
    207 			rpriv->ackprocstarted = 1;
    208 		}
    209 		qunlock(&rpriv->apl);
    210 	}
    211 }
    212 
    213 static char*
    214 rudpconnect(Conv *c, char **argv, int argc)
    215 {
    216 	char *e;
    217 	Rudppriv *upriv;
    218 
    219 	upriv = c->p->priv;
    220 	rudpstartackproc(c->p);
    221 	e = Fsstdconnect(c, argv, argc);
    222 	Fsconnected(c, e);
    223 	iphtadd(&upriv->ht, c);
    224 
    225 	return e;
    226 }
    227 
    228 
    229 static int
    230 rudpstate(Conv *c, char *state, int n)
    231 {
    232 	Rudpcb *ucb;
    233 	Reliable *r;
    234 	int m;
    235 
    236 	m = snprint(state, n, "%s", c->inuse?"Open":"Closed");
    237 	ucb = (Rudpcb*)c->ptcl;
    238 	qlock(ucb);
    239 	for(r = ucb->r; r; r = r->next)
    240 		m += snprint(state+m, n-m, " %I/%ld", r->addr, UNACKED(r));
    241 	m += snprint(state+m, n-m, "\n");
    242 	qunlock(ucb);
    243 	return m;
    244 }
    245 
    246 static char*
    247 rudpannounce(Conv *c, char** argv, int argc)
    248 {
    249 	char *e;
    250 	Rudppriv *upriv;
    251 
    252 	upriv = c->p->priv;
    253 	rudpstartackproc(c->p);
    254 	e = Fsstdannounce(c, argv, argc);
    255 	if(e != nil)
    256 		return e;
    257 	Fsconnected(c, nil);
    258 	iphtadd(&upriv->ht, c);
    259 
    260 	return nil;
    261 }
    262 
    263 static void
    264 rudpcreate(Conv *c)
    265 {
    266 	c->rq = qopen(64*1024, Qmsg, 0, 0);
    267 	c->wq = qopen(64*1024, Qkick, rudpkick, c);
    268 }
    269 
    270 static void
    271 rudpclose(Conv *c)
    272 {
    273 	Rudpcb *ucb;
    274 	Reliable *r, *nr;
    275 	Rudppriv *upriv;
    276 
    277 	upriv = c->p->priv;
    278 	iphtrem(&upriv->ht, c);
    279 
    280 	/* force out any delayed acks */
    281 	ucb = (Rudpcb*)c->ptcl;
    282 	qlock(ucb);
    283 	for(r = ucb->r; r; r = r->next){
    284 		if(r->acksent != r->rcvseq)
    285 			relsendack(c, r, 0);
    286 	}
    287 	qunlock(ucb);
    288 
    289 	qclose(c->rq);
    290 	qclose(c->wq);
    291 	qclose(c->eq);
    292 	ipmove(c->laddr, IPnoaddr);
    293 	ipmove(c->raddr, IPnoaddr);
    294 	c->lport = 0;
    295 	c->rport = 0;
    296 
    297 	ucb->headers = 0;
    298 	ucb->randdrop = 0;
    299 	qlock(ucb);
    300 	for(r = ucb->r; r; r = nr){
    301 		if(r->acksent != r->rcvseq)
    302 			relsendack(c, r, 0);
    303 		nr = r->next;
    304 		relhangup(c, r);
    305 		relput(r);
    306 	}
    307 	ucb->r = 0;
    308 
    309 	qunlock(ucb);
    310 }
    311 
    312 /*
    313  *  randomly don't send packets
    314  */
    315 static void
    316 doipoput(Conv *c, Fs *f, Block *bp, int x, int ttl, int tos)
    317 {
    318 	Rudpcb *ucb;
    319 
    320 	ucb = (Rudpcb*)c->ptcl;
    321 	if(ucb->randdrop && nrand(100) < ucb->randdrop)
    322 		freeblist(bp);
    323 	else
    324 		ipoput4(f, bp, x, ttl, tos, nil);
    325 }
    326 
    327 int
    328 flow(void *v)
    329 {
    330 	Reliable *r = v;
    331 
    332 	return UNACKED(r) <= Maxunacked;
    333 }
    334 
    335 void
    336 rudpkick(void *x)
    337 {
    338 	Conv *c = x;
    339 	Udphdr *uh;
    340 	ushort rport;
    341 	uchar laddr[IPaddrlen], raddr[IPaddrlen];
    342 	Block *bp;
    343 	Rudpcb *ucb;
    344 	Rudphdr *rh;
    345 	Reliable *r;
    346 	int dlen, ptcllen;
    347 	Rudppriv *upriv;
    348 	Fs *f;
    349 
    350 	upriv = c->p->priv;
    351 	f = c->p->f;
    352 
    353 	netlog(c->p->f, Logrudp, "rudp: kick\n");
    354 	bp = qget(c->wq);
    355 	if(bp == nil)
    356 		return;
    357 
    358 	ucb = (Rudpcb*)c->ptcl;
    359 	switch(ucb->headers) {
    360 	case 7:
    361 		/* get user specified addresses */
    362 		bp = pullupblock(bp, UDP_USEAD7);
    363 		if(bp == nil)
    364 			return;
    365 		ipmove(raddr, bp->rp);
    366 		bp->rp += IPaddrlen;
    367 		ipmove(laddr, bp->rp);
    368 		bp->rp += IPaddrlen;
    369 		/* pick interface closest to dest */
    370 		if(ipforme(f, laddr) != Runi)
    371 			findlocalip(f, laddr, raddr);
    372 		bp->rp += IPaddrlen;		/* Ignore ifc address */
    373 		rport = nhgets(bp->rp);
    374 		bp->rp += 2+2;			/* Ignore local port */
    375 		break;
    376 	default:
    377 		ipmove(raddr, c->raddr);
    378 		ipmove(laddr, c->laddr);
    379 		rport = c->rport;
    380 		break;
    381 	}
    382 
    383 	dlen = blocklen(bp);
    384 
    385 	/* Make space to fit rudp & ip header */
    386 	bp = padblock(bp, UDP_IPHDR+UDP_RHDRSIZE);
    387 	if(bp == nil)
    388 		return;
    389 
    390 	uh = (Udphdr *)(bp->rp);
    391 	uh->vihl = IP_VER4;
    392 
    393 	rh = (Rudphdr*)uh;
    394 
    395 	ptcllen = dlen + (UDP_RHDRSIZE-UDP_PHDRSIZE);
    396 	uh->Unused = 0;
    397 	uh->udpproto = IP_UDPPROTO;
    398 	uh->frag[0] = 0;
    399 	uh->frag[1] = 0;
    400 	hnputs(uh->udpplen, ptcllen);
    401 	switch(ucb->headers){
    402 	case 7:
    403 		v6tov4(uh->udpdst, raddr);
    404 		hnputs(uh->udpdport, rport);
    405 		v6tov4(uh->udpsrc, laddr);
    406 		break;
    407 	default:
    408 		v6tov4(uh->udpdst, c->raddr);
    409 		hnputs(uh->udpdport, c->rport);
    410 		if(ipcmp(c->laddr, IPnoaddr) == 0)
    411 			findlocalip(f, c->laddr, c->raddr);
    412 		v6tov4(uh->udpsrc, c->laddr);
    413 		break;
    414 	}
    415 	hnputs(uh->udpsport, c->lport);
    416 	hnputs(uh->udplen, ptcllen);
    417 	uh->udpcksum[0] = 0;
    418 	uh->udpcksum[1] = 0;
    419 
    420 	qlock(ucb);
    421 	r = relstate(ucb, raddr, rport, "kick");
    422 	r->sndseq = NEXTSEQ(r->sndseq);
    423 	hnputl(rh->relseq, r->sndseq);
    424 	hnputl(rh->relsgen, r->sndgen);
    425 
    426 	hnputl(rh->relack, r->rcvseq);  /* ACK last rcvd packet */
    427 	hnputl(rh->relagen, r->rcvgen);
    428 
    429 	if(r->rcvseq != r->acksent)
    430 		r->acksent = r->rcvseq;
    431 
    432 	hnputs(uh->udpcksum, ptclcsum(bp, UDP_IPHDR, dlen+UDP_RHDRSIZE));
    433 
    434 	relackq(r, bp);
    435 	qunlock(ucb);
    436 
    437 	upriv->ustats.rudpOutDatagrams++;
    438 
    439 	DPRINT("sent: %lud/%lud, %lud/%lud\n", 
    440 		r->sndseq, r->sndgen, r->rcvseq, r->rcvgen);
    441 
    442 	doipoput(c, f, bp, 0, c->ttl, c->tos);
    443 
    444 	if(waserror()) {
    445 		relput(r);
    446 		qunlock(&r->lock);
    447 		nexterror();
    448 	}
    449 
    450 	/* flow control of sorts */
    451 	qlock(&r->lock);
    452 	if(UNACKED(r) > Maxunacked){
    453 		r->blocked = 1;
    454 		sleep(&r->vous, flow, r);
    455 		r->blocked = 0;
    456 	}
    457 
    458 	qunlock(&r->lock);
    459 	relput(r);
    460 	poperror();
    461 }
    462 
    463 void
    464 rudpiput(Proto *rudp, Ipifc *ifc, Block *bp)
    465 {
    466 	int len, olen, ottl;
    467 	Udphdr *uh;
    468 	Conv *c;
    469 	Rudpcb *ucb;
    470 	uchar raddr[IPaddrlen], laddr[IPaddrlen];
    471 	ushort rport, lport;
    472 	Rudppriv *upriv;
    473 	Fs *f;
    474 	uchar *p;
    475 
    476 	upriv = rudp->priv;
    477 	f = rudp->f;
    478 
    479 	upriv->ustats.rudpInDatagrams++;
    480 
    481 	uh = (Udphdr*)(bp->rp);
    482 
    483 	/* Put back pseudo header for checksum 
    484 	 * (remember old values for icmpnoconv()) 
    485 	 */
    486 	ottl = uh->Unused;
    487 	uh->Unused = 0;
    488 	len = nhgets(uh->udplen);
    489 	olen = nhgets(uh->udpplen);
    490 	hnputs(uh->udpplen, len);
    491 
    492 	v4tov6(raddr, uh->udpsrc);
    493 	v4tov6(laddr, uh->udpdst);
    494 	lport = nhgets(uh->udpdport);
    495 	rport = nhgets(uh->udpsport);
    496 
    497 	if(nhgets(uh->udpcksum)) {
    498 		if(ptclcsum(bp, UDP_IPHDR, len+UDP_PHDRSIZE)) {
    499 			upriv->ustats.rudpInErrors++;
    500 			upriv->csumerr++;
    501 			netlog(f, Logrudp, "rudp: checksum error %I\n", raddr);
    502 			DPRINT("rudp: checksum error %I\n", raddr);
    503 			freeblist(bp);
    504 			return;
    505 		}
    506 	}
    507 
    508 	qlock(rudp);
    509 
    510 	c = iphtlook(&upriv->ht, raddr, rport, laddr, lport);
    511 	if(c == nil){
    512 		/* no conversation found */
    513 		upriv->ustats.rudpNoPorts++;
    514 		qunlock(rudp);
    515 		netlog(f, Logudp, "udp: no conv %I!%d -> %I!%d\n", raddr, rport,
    516 			laddr, lport);
    517 		uh->Unused = ottl;
    518 		hnputs(uh->udpplen, olen);
    519 		icmpnoconv(f, bp);
    520 		freeblist(bp);
    521 		return;
    522 	}
    523 	ucb = (Rudpcb*)c->ptcl;
    524 	qlock(ucb);
    525 	qunlock(rudp);
    526 
    527 	if(reliput(c, bp, raddr, rport) < 0){
    528 		qunlock(ucb);
    529 		freeb(bp);
    530 		return;
    531 	}
    532 
    533 	/*
    534 	 * Trim the packet down to data size
    535 	 */
    536 
    537 	len -= (UDP_RHDRSIZE-UDP_PHDRSIZE);
    538 	bp = trimblock(bp, UDP_IPHDR+UDP_RHDRSIZE, len);
    539 	if(bp == nil) {
    540 		netlog(f, Logrudp, "rudp: len err %I.%d -> %I.%d\n", 
    541 			raddr, rport, laddr, lport);
    542 		DPRINT("rudp: len err %I.%d -> %I.%d\n", 
    543 			raddr, rport, laddr, lport);
    544 		upriv->lenerr++;
    545 		return;
    546 	}
    547 
    548 	netlog(f, Logrudpmsg, "rudp: %I.%d -> %I.%d l %d\n", 
    549 		raddr, rport, laddr, lport, len);
    550 
    551 	switch(ucb->headers){
    552 	case 7:
    553 		/* pass the src address */
    554 		bp = padblock(bp, UDP_USEAD7);
    555 		p = bp->rp;
    556 		ipmove(p, raddr); p += IPaddrlen;
    557 		ipmove(p, laddr); p += IPaddrlen;
    558 		ipmove(p, ifc->lifc->local); p += IPaddrlen;
    559 		hnputs(p, rport); p += 2;
    560 		hnputs(p, lport);
    561 		break;
    562 	default:
    563 		/* connection oriented rudp */
    564 		if(ipcmp(c->raddr, IPnoaddr) == 0){
    565 			/* save the src address in the conversation */
    566 		 	ipmove(c->raddr, raddr);
    567 			c->rport = rport;
    568 
    569 			/* reply with the same ip address (if not broadcast) */
    570 			if(ipforme(f, laddr) == Runi)
    571 				ipmove(c->laddr, laddr);
    572 			else
    573 				v4tov6(c->laddr, ifc->lifc->local);
    574 		}
    575 		break;
    576 	}
    577 	if(bp->next)
    578 		bp = concatblock(bp);
    579 
    580 	if(qfull(c->rq)) {
    581 		netlog(f, Logrudp, "rudp: qfull %I.%d -> %I.%d\n", raddr, rport,
    582 			laddr, lport);
    583 		freeblist(bp);
    584 	}
    585 	else
    586 		qpass(c->rq, bp);
    587 	
    588 	qunlock(ucb);
    589 }
    590 
    591 static char *rudpunknown = "unknown rudp ctl request";
    592 
    593 char*
    594 rudpctl(Conv *c, char **f, int n)
    595 {
    596 	Rudpcb *ucb;
    597 	uchar ip[IPaddrlen];
    598 	int x;
    599 
    600 	ucb = (Rudpcb*)c->ptcl;
    601 	if(n < 1)
    602 		return rudpunknown;
    603 
    604 	if(strcmp(f[0], "headers") == 0){
    605 		ucb->headers = 7;		/* new headers format */
    606 		return nil;
    607 	} else if(strcmp(f[0], "hangup") == 0){
    608 		if(n < 3)
    609 			return "bad syntax";
    610 		if (parseip(ip, f[1]) == -1)
    611 			return Ebadip;
    612 		x = atoi(f[2]);
    613 		qlock(ucb);
    614 		relforget(c, ip, x, 1);
    615 		qunlock(ucb);
    616 		return nil;
    617 	} else if(strcmp(f[0], "randdrop") == 0){
    618 		x = 10;			/* default is 10% */
    619 		if(n > 1)
    620 			x = atoi(f[1]);
    621 		if(x > 100 || x < 0)
    622 			return "illegal rudp drop rate";
    623 		ucb->randdrop = x;
    624 		return nil;
    625 	}
    626 	return rudpunknown;
    627 }
    628 
    629 void
    630 rudpadvise(Proto *rudp, Block *bp, char *msg)
    631 {
    632 	Udphdr *h;
    633 	uchar source[IPaddrlen], dest[IPaddrlen];
    634 	ushort psource, pdest;
    635 	Conv *s, **p;
    636 
    637 	h = (Udphdr*)(bp->rp);
    638 
    639 	v4tov6(dest, h->udpdst);
    640 	v4tov6(source, h->udpsrc);
    641 	psource = nhgets(h->udpsport);
    642 	pdest = nhgets(h->udpdport);
    643 
    644 	/* Look for a connection */
    645 	for(p = rudp->conv; *p; p++) {
    646 		s = *p;
    647 		if(s->rport == pdest)
    648 		if(s->lport == psource)
    649 		if(ipcmp(s->raddr, dest) == 0)
    650 		if(ipcmp(s->laddr, source) == 0){
    651 			qhangup(s->rq, msg);
    652 			qhangup(s->wq, msg);
    653 			break;
    654 		}
    655 	}
    656 	freeblist(bp);
    657 }
    658 
    659 int
    660 rudpstats(Proto *rudp, char *buf, int len)
    661 {
    662 	Rudppriv *upriv;
    663 
    664 	upriv = rudp->priv;
    665 	return snprint(buf, len, "%lud %lud %lud %lud %lud %lud\n",
    666 		upriv->ustats.rudpInDatagrams,
    667 		upriv->ustats.rudpNoPorts,
    668 		upriv->ustats.rudpInErrors,
    669 		upriv->ustats.rudpOutDatagrams,
    670 		upriv->rxmits,
    671 		upriv->orders);
    672 }
    673 
    674 void
    675 rudpinit(Fs *fs)
    676 {
    677 
    678 	Proto *rudp;
    679 
    680 	rudp = smalloc(sizeof(Proto));
    681 	rudp->priv = smalloc(sizeof(Rudppriv));
    682 	rudp->name = "rudp";
    683 	rudp->connect = rudpconnect;
    684 	rudp->announce = rudpannounce;
    685 	rudp->ctl = rudpctl;
    686 	rudp->state = rudpstate;
    687 	rudp->create = rudpcreate;
    688 	rudp->close = rudpclose;
    689 	rudp->rcv = rudpiput;
    690 	rudp->advise = rudpadvise;
    691 	rudp->stats = rudpstats;
    692 	rudp->ipproto = IP_UDPPROTO;
    693 	rudp->nc = 16;
    694 	rudp->ptclsize = sizeof(Rudpcb);
    695 
    696 	Fsproto(fs, rudp);
    697 }
    698 
    699 /*********************************************/
    700 /* Here starts the reliable helper functions */
    701 /*********************************************/
    702 /*
    703  *  Enqueue a copy of an unacked block for possible retransmissions
    704  */
    705 void
    706 relackq(Reliable *r, Block *bp)
    707 {
    708 	Block *np;
    709 
    710 	np = copyblock(bp, blocklen(bp));
    711 	if(r->unacked)
    712 		r->unackedtail->list = np;
    713 	else {
    714 		/* restart timer */
    715 		r->timeout = 0;
    716 		r->xmits = 1;
    717 		r->unacked = np;
    718 	}
    719 	r->unackedtail = np;
    720 	np->list = nil;
    721 }
    722 
    723 /*
    724  *  retransmit unacked blocks
    725  */
    726 void
    727 relackproc(void *a)
    728 {
    729 	Rudpcb *ucb;
    730 	Proto *rudp;
    731 	Reliable *r;
    732 	Conv **s, *c;
    733 
    734 	rudp = (Proto *)a;
    735 
    736 loop:
    737 	tsleep(&up->sleep, return0, 0, Rudptickms);
    738 
    739 	for(s = rudp->conv; *s; s++) {
    740 		c = *s;
    741 		ucb = (Rudpcb*)c->ptcl;
    742 		qlock(ucb);
    743 
    744 		for(r = ucb->r; r; r = r->next) {
    745 			if(r->unacked != nil){
    746 				r->timeout += Rudptickms;
    747 				if(r->timeout > Rudprxms*r->xmits)
    748 					relrexmit(c, r);
    749 			}
    750 			if(r->acksent != r->rcvseq)
    751 				relsendack(c, r, 0);
    752 		}
    753 		qunlock(ucb);
    754 	}
    755 	goto loop;
    756 }
    757 
    758 /*
    759  *  get the state record for a conversation
    760  */
    761 Reliable*
    762 relstate(Rudpcb *ucb, uchar *addr, ushort port, char *from)
    763 {
    764 	Reliable *r, **l;
    765 
    766 	l = &ucb->r;
    767 	for(r = *l; r; r = *l){
    768 		if(memcmp(addr, r->addr, IPaddrlen) == 0 && 
    769 		    port == r->port)
    770 			break;
    771 		l = &r->next;
    772 	}
    773 
    774 	/* no state for this addr/port, create some */
    775 	if(r == nil){
    776 		while(generation == 0)
    777 			generation = rand();
    778 
    779 		DPRINT("from %s new state %lud for %I!%ud\n", 
    780 		        from, generation, addr, port);
    781 
    782 		r = smalloc(sizeof(Reliable));
    783 		memmove(r->addr, addr, IPaddrlen);
    784 		r->port = port;
    785 		r->unacked = 0;
    786 		if(generation == Hangupgen)
    787 			generation++;
    788 		r->sndgen = generation++;
    789 		r->sndseq = 0;
    790 		r->ackrcvd = 0;
    791 		r->rcvgen = 0;
    792 		r->rcvseq = 0;
    793 		r->acksent = 0;
    794 		r->xmits = 0;
    795 		r->timeout = 0;
    796 		r->ref = 0;
    797 		incref(r);	/* one reference for being in the list */
    798 
    799 		*l = r;
    800 	}
    801 
    802 	incref(r);
    803 	return r;
    804 }
    805 
    806 void
    807 relput(Reliable *r)
    808 {
    809 	if(decref(r) == 0)
    810 		free(r);
    811 }
    812 
    813 /*
    814  *  forget a Reliable state
    815  */
    816 void
    817 relforget(Conv *c, uchar *ip, int port, int originator)
    818 {
    819 	Rudpcb *ucb;
    820 	Reliable *r, **l;
    821 
    822 	ucb = (Rudpcb*)c->ptcl;
    823 
    824 	l = &ucb->r;
    825 	for(r = *l; r; r = *l){
    826 		if(ipcmp(ip, r->addr) == 0 && port == r->port){
    827 			*l = r->next;
    828 			if(originator)
    829 				relsendack(c, r, 1);
    830 			relhangup(c, r);
    831 			relput(r);	/* remove from the list */
    832 			break;
    833 		}
    834 		l = &r->next;
    835 	}
    836 }
    837 
    838 /* 
    839  *  process a rcvd reliable packet. return -1 if not to be passed to user process,
    840  *  0 therwise.
    841  *
    842  *  called with ucb locked.
    843  */
    844 int
    845 reliput(Conv *c, Block *bp, uchar *addr, ushort port)
    846 {
    847 	Block *nbp;
    848 	Rudpcb *ucb;
    849 	Rudppriv *upriv;
    850 	Udphdr *uh;
    851 	Reliable *r;
    852 	Rudphdr *rh;
    853 	ulong seq, ack, sgen, agen, ackreal;
    854 	int rv = -1;
    855 
    856 	/* get fields */
    857 	uh = (Udphdr*)(bp->rp);
    858 	rh = (Rudphdr*)uh;
    859 	seq = nhgetl(rh->relseq);
    860 	sgen = nhgetl(rh->relsgen);
    861 	ack = nhgetl(rh->relack);
    862 	agen = nhgetl(rh->relagen);
    863 
    864 	upriv = c->p->priv;
    865 	ucb = (Rudpcb*)c->ptcl;
    866 	r = relstate(ucb, addr, port, "input");
    867 
    868 	DPRINT("rcvd %lud/%lud, %lud/%lud, r->sndgen = %lud\n", 
    869 		seq, sgen, ack, agen, r->sndgen);
    870 
    871 	/* if acking an incorrect generation, ignore */
    872 	if(ack && agen != r->sndgen)
    873 		goto out;
    874 
    875 	/* Look for a hangup */
    876 	if(sgen == Hangupgen) {
    877 		if(agen == r->sndgen)
    878 			relforget(c, addr, port, 0);
    879 		goto out;
    880 	}
    881 
    882 	/* make sure we're not talking to a new remote side */
    883 	if(r->rcvgen != sgen){
    884 		if(seq != 0 && seq != 1)
    885 			goto out;
    886 
    887 		/* new connection */
    888 		if(r->rcvgen != 0){
    889 			DPRINT("new con r->rcvgen = %lud, sgen = %lud\n", r->rcvgen, sgen);
    890 			relhangup(c, r);
    891 		}
    892 		r->rcvgen = sgen;
    893 	}
    894 
    895 	/* dequeue acked packets */
    896 	if(ack && agen == r->sndgen){
    897 		ackreal = 0;
    898 		while(r->unacked != nil && INSEQ(ack, r->ackrcvd, r->sndseq)){
    899 			nbp = r->unacked;
    900 			r->unacked = nbp->list;
    901 			DPRINT("%lud/%lud acked, r->sndgen = %lud\n", 
    902 			       ack, agen, r->sndgen);
    903 			freeb(nbp);
    904 			r->ackrcvd = NEXTSEQ(r->ackrcvd);
    905 			ackreal = 1;
    906 		}
    907 
    908 		/* flow control */
    909 		if(UNACKED(r) < Maxunacked/8 && r->blocked)
    910 			wakeup(&r->vous);
    911 
    912 		/*
    913 		 *  retransmit next packet if the acked packet
    914 		 *  was transmitted more than once
    915 		 */
    916 		if(ackreal && r->unacked != nil){
    917 			r->timeout = 0;
    918 			if(r->xmits > 1){
    919 				r->xmits = 1;
    920 				relrexmit(c, r);
    921 			}
    922 		}
    923 		
    924 	}
    925 
    926 	/* no message or input queue full */
    927 	if(seq == 0 || qfull(c->rq))
    928 		goto out;
    929 
    930 	/* refuse out of order delivery */
    931 	if(seq != NEXTSEQ(r->rcvseq)){
    932 		relsendack(c, r, 0);	/* tell him we got it already */
    933 		upriv->orders++;
    934 		DPRINT("out of sequence %lud not %lud\n", seq, NEXTSEQ(r->rcvseq));
    935 		goto out;
    936 	}
    937 	r->rcvseq = seq;
    938 
    939 	rv = 0;
    940 out:
    941 	relput(r);
    942 	return rv;
    943 }
    944 
    945 void
    946 relsendack(Conv *c, Reliable *r, int hangup)
    947 {
    948 	Udphdr *uh;
    949 	Block *bp;
    950 	Rudphdr *rh;
    951 	int ptcllen;
    952 	Fs *f;
    953 
    954 	bp = allocb(UDP_IPHDR + UDP_RHDRSIZE);
    955 	if(bp == nil)
    956 		return;
    957 	bp->wp += UDP_IPHDR + UDP_RHDRSIZE;
    958 	f = c->p->f;
    959 	uh = (Udphdr *)(bp->rp);
    960 	uh->vihl = IP_VER4;
    961 	rh = (Rudphdr*)uh;
    962 
    963 	ptcllen = (UDP_RHDRSIZE-UDP_PHDRSIZE);
    964 	uh->Unused = 0;
    965 	uh->udpproto = IP_UDPPROTO;
    966 	uh->frag[0] = 0;
    967 	uh->frag[1] = 0;
    968 	hnputs(uh->udpplen, ptcllen);
    969 
    970 	v6tov4(uh->udpdst, r->addr);
    971 	hnputs(uh->udpdport, r->port);
    972 	hnputs(uh->udpsport, c->lport);
    973 	if(ipcmp(c->laddr, IPnoaddr) == 0)
    974 		findlocalip(f, c->laddr, c->raddr);
    975 	v6tov4(uh->udpsrc, c->laddr);
    976 	hnputs(uh->udplen, ptcllen);
    977 
    978 	if(hangup)
    979 		hnputl(rh->relsgen, Hangupgen);
    980 	else
    981 		hnputl(rh->relsgen, r->sndgen);
    982 	hnputl(rh->relseq, 0);
    983 	hnputl(rh->relagen, r->rcvgen);
    984 	hnputl(rh->relack, r->rcvseq);
    985 
    986 	if(r->acksent < r->rcvseq)
    987 		r->acksent = r->rcvseq;
    988 
    989 	uh->udpcksum[0] = 0;
    990 	uh->udpcksum[1] = 0;
    991 	hnputs(uh->udpcksum, ptclcsum(bp, UDP_IPHDR, UDP_RHDRSIZE));
    992 
    993 	DPRINT("sendack: %lud/%lud, %lud/%lud\n", 0L, r->sndgen, r->rcvseq, r->rcvgen);
    994 	doipoput(c, f, bp, 0, c->ttl, c->tos);
    995 }
    996 
    997 
    998 /*
    999  *  called with ucb locked (and c locked if user initiated close)
   1000  */
   1001 void
   1002 relhangup(Conv *c, Reliable *r)
   1003 {
   1004 	int n;
   1005 	Block *bp;
   1006 	char hup[ERRMAX];
   1007 
   1008 	n = snprint(hup, sizeof(hup), "hangup %I!%d", r->addr, r->port);
   1009 	qproduce(c->eq, hup, n);
   1010 
   1011 	/*
   1012 	 *  dump any unacked outgoing messages
   1013 	 */
   1014 	for(bp = r->unacked; bp != nil; bp = r->unacked){
   1015 		r->unacked = bp->list;
   1016 		bp->list = nil;
   1017 		freeb(bp);
   1018 	}
   1019 
   1020 	r->rcvgen = 0;
   1021 	r->rcvseq = 0;
   1022 	r->acksent = 0;
   1023 	if(generation == Hangupgen)
   1024 		generation++;
   1025 	r->sndgen = generation++;
   1026 	r->sndseq = 0;
   1027 	r->ackrcvd = 0;
   1028 	r->xmits = 0;
   1029 	r->timeout = 0;
   1030 	wakeup(&r->vous);
   1031 }
   1032 
   1033 /*
   1034  *  called with ucb locked
   1035  */
   1036 void
   1037 relrexmit(Conv *c, Reliable *r)
   1038 {
   1039 	Rudppriv *upriv;
   1040 	Block *np;
   1041 	Fs *f;
   1042 
   1043 	upriv = c->p->priv;
   1044 	f = c->p->f;
   1045 	r->timeout = 0;
   1046 	if(r->xmits++ > Rudpmaxxmit){
   1047 		relhangup(c, r);
   1048 		return;
   1049 	}
   1050 
   1051 	upriv->rxmits++;
   1052 	np = copyblock(r->unacked, blocklen(r->unacked));
   1053 	DPRINT("rxmit r->ackrvcd+1 = %lud\n", r->ackrcvd+1);
   1054 	doipoput(c, f, np, 0, c->ttl, c->tos);
   1055 }