rudp.c (20818B)
1 /* 2 * Reliable User Datagram Protocol, currently only for IPv4. 3 * This protocol is compatible with UDP's packet format. 4 * It could be done over UDP if need be. 5 */ 6 #include "u.h" 7 #include "lib.h" 8 #include "mem.h" 9 #include "dat.h" 10 #include "fns.h" 11 #include "error.h" 12 13 #include "ip.h" 14 15 #define DEBUG 0 16 #define DPRINT if(DEBUG)print 17 18 #define SEQDIFF(a,b) ( (a)>=(b)?\ 19 (a)-(b):\ 20 0xffffffffUL-((b)-(a)) ) 21 #define INSEQ(a,start,end) ( (start)<=(end)?\ 22 ((a)>(start)&&(a)<=(end)):\ 23 ((a)>(start)||(a)<=(end)) ) 24 #define UNACKED(r) SEQDIFF(r->sndseq, r->ackrcvd) 25 #define NEXTSEQ(a) ( (a)+1 == 0 ? 1 : (a)+1 ) 26 27 enum 28 { 29 UDP_PHDRSIZE = 12, /* pseudo header */ 30 // UDP_HDRSIZE = 20, /* pseudo header + udp header */ 31 UDP_RHDRSIZE = 36, /* pseudo header + udp header + rudp header */ 32 UDP_IPHDR = 8, /* ip header */ 33 IP_UDPPROTO = 254, 34 UDP_USEAD7 = 52, /* size of new ipv6 headers struct */ 35 36 Rudprxms = 200, 37 Rudptickms = 50, 38 Rudpmaxxmit = 10, 39 Maxunacked = 100, 40 }; 41 42 #define Hangupgen 0xffffffff /* used only in hangup messages */ 43 44 typedef struct Udphdr Udphdr; 45 struct Udphdr 46 { 47 /* ip header */ 48 uchar vihl; /* Version and header length */ 49 uchar tos; /* Type of service */ 50 uchar length[2]; /* packet length */ 51 uchar id[2]; /* Identification */ 52 uchar frag[2]; /* Fragment information */ 53 54 /* pseudo header starts here */ 55 uchar Unused; 56 uchar udpproto; /* Protocol */ 57 uchar udpplen[2]; /* Header plus data length */ 58 uchar udpsrc[4]; /* Ip source */ 59 uchar udpdst[4]; /* Ip destination */ 60 61 /* udp header */ 62 uchar udpsport[2]; /* Source port */ 63 uchar udpdport[2]; /* Destination port */ 64 uchar udplen[2]; /* data length */ 65 uchar udpcksum[2]; /* Checksum */ 66 }; 67 68 typedef struct Rudphdr Rudphdr; 69 struct Rudphdr 70 { 71 /* ip header */ 72 uchar vihl; /* Version and header length */ 73 uchar tos; /* Type of service */ 74 uchar length[2]; /* packet length */ 75 uchar id[2]; /* Identification */ 76 uchar frag[2]; /* Fragment information */ 77 78 /* pseudo header starts here */ 79 uchar Unused; 80 uchar udpproto; /* Protocol */ 81 uchar udpplen[2]; /* Header plus data length */ 82 uchar udpsrc[4]; /* Ip source */ 83 uchar udpdst[4]; /* Ip destination */ 84 85 /* udp header */ 86 uchar udpsport[2]; /* Source port */ 87 uchar udpdport[2]; /* Destination port */ 88 uchar udplen[2]; /* data length (includes rudp header) */ 89 uchar udpcksum[2]; /* Checksum */ 90 91 /* rudp header */ 92 uchar relseq[4]; /* id of this packet (or 0) */ 93 uchar relsgen[4]; /* generation/time stamp */ 94 uchar relack[4]; /* packet being acked (or 0) */ 95 uchar relagen[4]; /* generation/time stamp */ 96 }; 97 98 99 /* 100 * one state structure per destination 101 */ 102 typedef struct Reliable Reliable; 103 struct Reliable 104 { 105 Ref; 106 107 Reliable *next; 108 109 uchar addr[IPaddrlen]; /* always V6 when put here */ 110 ushort port; 111 112 Block *unacked; /* unacked msg list */ 113 Block *unackedtail; /* and its tail */ 114 115 int timeout; /* time since first unacked msg sent */ 116 int xmits; /* number of times first unacked msg sent */ 117 118 ulong sndseq; /* next packet to be sent */ 119 ulong sndgen; /* and its generation */ 120 121 ulong rcvseq; /* last packet received */ 122 ulong rcvgen; /* and its generation */ 123 124 ulong acksent; /* last ack sent */ 125 ulong ackrcvd; /* last msg for which ack was rcvd */ 126 127 /* flow control */ 128 QLock lock; 129 Rendez vous; 130 int blocked; 131 }; 132 133 134 135 /* MIB II counters */ 136 typedef struct Rudpstats Rudpstats; 137 struct Rudpstats 138 { 139 ulong rudpInDatagrams; 140 ulong rudpNoPorts; 141 ulong rudpInErrors; 142 ulong rudpOutDatagrams; 143 }; 144 145 typedef struct Rudppriv Rudppriv; 146 struct Rudppriv 147 { 148 Ipht ht; 149 150 /* MIB counters */ 151 Rudpstats ustats; 152 153 /* non-MIB stats */ 154 ulong csumerr; /* checksum errors */ 155 ulong lenerr; /* short packet */ 156 ulong rxmits; /* # of retransmissions */ 157 ulong orders; /* # of out of order pkts */ 158 159 /* keeping track of the ack kproc */ 160 int ackprocstarted; 161 QLock apl; 162 }; 163 164 165 static ulong generation = 0; 166 static Rendez rend; 167 168 /* 169 * protocol specific part of Conv 170 */ 171 typedef struct Rudpcb Rudpcb; 172 struct Rudpcb 173 { 174 QLock; 175 uchar headers; 176 uchar randdrop; 177 Reliable *r; 178 }; 179 180 /* 181 * local functions 182 */ 183 void relsendack(Conv*, Reliable*, int); 184 int reliput(Conv*, Block*, uchar*, ushort); 185 Reliable *relstate(Rudpcb*, uchar*, ushort, char*); 186 void relput(Reliable*); 187 void relforget(Conv *, uchar*, int, int); 188 void relackproc(void *); 189 void relackq(Reliable *, Block*); 190 void relhangup(Conv *, Reliable*); 191 void relrexmit(Conv *, Reliable*); 192 void relput(Reliable*); 193 void rudpkick(void *x); 194 195 static void 196 rudpstartackproc(Proto *rudp) 197 { 198 Rudppriv *rpriv; 199 char kpname[KNAMELEN]; 200 201 rpriv = rudp->priv; 202 if(rpriv->ackprocstarted == 0){ 203 qlock(&rpriv->apl); 204 if(rpriv->ackprocstarted == 0){ 205 sprint(kpname, "#I%drudpack", rudp->f->dev); 206 kproc(kpname, relackproc, rudp); 207 rpriv->ackprocstarted = 1; 208 } 209 qunlock(&rpriv->apl); 210 } 211 } 212 213 static char* 214 rudpconnect(Conv *c, char **argv, int argc) 215 { 216 char *e; 217 Rudppriv *upriv; 218 219 upriv = c->p->priv; 220 rudpstartackproc(c->p); 221 e = Fsstdconnect(c, argv, argc); 222 Fsconnected(c, e); 223 iphtadd(&upriv->ht, c); 224 225 return e; 226 } 227 228 229 static int 230 rudpstate(Conv *c, char *state, int n) 231 { 232 Rudpcb *ucb; 233 Reliable *r; 234 int m; 235 236 m = snprint(state, n, "%s", c->inuse?"Open":"Closed"); 237 ucb = (Rudpcb*)c->ptcl; 238 qlock(ucb); 239 for(r = ucb->r; r; r = r->next) 240 m += snprint(state+m, n-m, " %I/%ld", r->addr, UNACKED(r)); 241 m += snprint(state+m, n-m, "\n"); 242 qunlock(ucb); 243 return m; 244 } 245 246 static char* 247 rudpannounce(Conv *c, char** argv, int argc) 248 { 249 char *e; 250 Rudppriv *upriv; 251 252 upriv = c->p->priv; 253 rudpstartackproc(c->p); 254 e = Fsstdannounce(c, argv, argc); 255 if(e != nil) 256 return e; 257 Fsconnected(c, nil); 258 iphtadd(&upriv->ht, c); 259 260 return nil; 261 } 262 263 static void 264 rudpcreate(Conv *c) 265 { 266 c->rq = qopen(64*1024, Qmsg, 0, 0); 267 c->wq = qopen(64*1024, Qkick, rudpkick, c); 268 } 269 270 static void 271 rudpclose(Conv *c) 272 { 273 Rudpcb *ucb; 274 Reliable *r, *nr; 275 Rudppriv *upriv; 276 277 upriv = c->p->priv; 278 iphtrem(&upriv->ht, c); 279 280 /* force out any delayed acks */ 281 ucb = (Rudpcb*)c->ptcl; 282 qlock(ucb); 283 for(r = ucb->r; r; r = r->next){ 284 if(r->acksent != r->rcvseq) 285 relsendack(c, r, 0); 286 } 287 qunlock(ucb); 288 289 qclose(c->rq); 290 qclose(c->wq); 291 qclose(c->eq); 292 ipmove(c->laddr, IPnoaddr); 293 ipmove(c->raddr, IPnoaddr); 294 c->lport = 0; 295 c->rport = 0; 296 297 ucb->headers = 0; 298 ucb->randdrop = 0; 299 qlock(ucb); 300 for(r = ucb->r; r; r = nr){ 301 if(r->acksent != r->rcvseq) 302 relsendack(c, r, 0); 303 nr = r->next; 304 relhangup(c, r); 305 relput(r); 306 } 307 ucb->r = 0; 308 309 qunlock(ucb); 310 } 311 312 /* 313 * randomly don't send packets 314 */ 315 static void 316 doipoput(Conv *c, Fs *f, Block *bp, int x, int ttl, int tos) 317 { 318 Rudpcb *ucb; 319 320 ucb = (Rudpcb*)c->ptcl; 321 if(ucb->randdrop && nrand(100) < ucb->randdrop) 322 freeblist(bp); 323 else 324 ipoput4(f, bp, x, ttl, tos, nil); 325 } 326 327 int 328 flow(void *v) 329 { 330 Reliable *r = v; 331 332 return UNACKED(r) <= Maxunacked; 333 } 334 335 void 336 rudpkick(void *x) 337 { 338 Conv *c = x; 339 Udphdr *uh; 340 ushort rport; 341 uchar laddr[IPaddrlen], raddr[IPaddrlen]; 342 Block *bp; 343 Rudpcb *ucb; 344 Rudphdr *rh; 345 Reliable *r; 346 int dlen, ptcllen; 347 Rudppriv *upriv; 348 Fs *f; 349 350 upriv = c->p->priv; 351 f = c->p->f; 352 353 netlog(c->p->f, Logrudp, "rudp: kick\n"); 354 bp = qget(c->wq); 355 if(bp == nil) 356 return; 357 358 ucb = (Rudpcb*)c->ptcl; 359 switch(ucb->headers) { 360 case 7: 361 /* get user specified addresses */ 362 bp = pullupblock(bp, UDP_USEAD7); 363 if(bp == nil) 364 return; 365 ipmove(raddr, bp->rp); 366 bp->rp += IPaddrlen; 367 ipmove(laddr, bp->rp); 368 bp->rp += IPaddrlen; 369 /* pick interface closest to dest */ 370 if(ipforme(f, laddr) != Runi) 371 findlocalip(f, laddr, raddr); 372 bp->rp += IPaddrlen; /* Ignore ifc address */ 373 rport = nhgets(bp->rp); 374 bp->rp += 2+2; /* Ignore local port */ 375 break; 376 default: 377 ipmove(raddr, c->raddr); 378 ipmove(laddr, c->laddr); 379 rport = c->rport; 380 break; 381 } 382 383 dlen = blocklen(bp); 384 385 /* Make space to fit rudp & ip header */ 386 bp = padblock(bp, UDP_IPHDR+UDP_RHDRSIZE); 387 if(bp == nil) 388 return; 389 390 uh = (Udphdr *)(bp->rp); 391 uh->vihl = IP_VER4; 392 393 rh = (Rudphdr*)uh; 394 395 ptcllen = dlen + (UDP_RHDRSIZE-UDP_PHDRSIZE); 396 uh->Unused = 0; 397 uh->udpproto = IP_UDPPROTO; 398 uh->frag[0] = 0; 399 uh->frag[1] = 0; 400 hnputs(uh->udpplen, ptcllen); 401 switch(ucb->headers){ 402 case 7: 403 v6tov4(uh->udpdst, raddr); 404 hnputs(uh->udpdport, rport); 405 v6tov4(uh->udpsrc, laddr); 406 break; 407 default: 408 v6tov4(uh->udpdst, c->raddr); 409 hnputs(uh->udpdport, c->rport); 410 if(ipcmp(c->laddr, IPnoaddr) == 0) 411 findlocalip(f, c->laddr, c->raddr); 412 v6tov4(uh->udpsrc, c->laddr); 413 break; 414 } 415 hnputs(uh->udpsport, c->lport); 416 hnputs(uh->udplen, ptcllen); 417 uh->udpcksum[0] = 0; 418 uh->udpcksum[1] = 0; 419 420 qlock(ucb); 421 r = relstate(ucb, raddr, rport, "kick"); 422 r->sndseq = NEXTSEQ(r->sndseq); 423 hnputl(rh->relseq, r->sndseq); 424 hnputl(rh->relsgen, r->sndgen); 425 426 hnputl(rh->relack, r->rcvseq); /* ACK last rcvd packet */ 427 hnputl(rh->relagen, r->rcvgen); 428 429 if(r->rcvseq != r->acksent) 430 r->acksent = r->rcvseq; 431 432 hnputs(uh->udpcksum, ptclcsum(bp, UDP_IPHDR, dlen+UDP_RHDRSIZE)); 433 434 relackq(r, bp); 435 qunlock(ucb); 436 437 upriv->ustats.rudpOutDatagrams++; 438 439 DPRINT("sent: %lud/%lud, %lud/%lud\n", 440 r->sndseq, r->sndgen, r->rcvseq, r->rcvgen); 441 442 doipoput(c, f, bp, 0, c->ttl, c->tos); 443 444 if(waserror()) { 445 relput(r); 446 qunlock(&r->lock); 447 nexterror(); 448 } 449 450 /* flow control of sorts */ 451 qlock(&r->lock); 452 if(UNACKED(r) > Maxunacked){ 453 r->blocked = 1; 454 sleep(&r->vous, flow, r); 455 r->blocked = 0; 456 } 457 458 qunlock(&r->lock); 459 relput(r); 460 poperror(); 461 } 462 463 void 464 rudpiput(Proto *rudp, Ipifc *ifc, Block *bp) 465 { 466 int len, olen, ottl; 467 Udphdr *uh; 468 Conv *c; 469 Rudpcb *ucb; 470 uchar raddr[IPaddrlen], laddr[IPaddrlen]; 471 ushort rport, lport; 472 Rudppriv *upriv; 473 Fs *f; 474 uchar *p; 475 476 upriv = rudp->priv; 477 f = rudp->f; 478 479 upriv->ustats.rudpInDatagrams++; 480 481 uh = (Udphdr*)(bp->rp); 482 483 /* Put back pseudo header for checksum 484 * (remember old values for icmpnoconv()) 485 */ 486 ottl = uh->Unused; 487 uh->Unused = 0; 488 len = nhgets(uh->udplen); 489 olen = nhgets(uh->udpplen); 490 hnputs(uh->udpplen, len); 491 492 v4tov6(raddr, uh->udpsrc); 493 v4tov6(laddr, uh->udpdst); 494 lport = nhgets(uh->udpdport); 495 rport = nhgets(uh->udpsport); 496 497 if(nhgets(uh->udpcksum)) { 498 if(ptclcsum(bp, UDP_IPHDR, len+UDP_PHDRSIZE)) { 499 upriv->ustats.rudpInErrors++; 500 upriv->csumerr++; 501 netlog(f, Logrudp, "rudp: checksum error %I\n", raddr); 502 DPRINT("rudp: checksum error %I\n", raddr); 503 freeblist(bp); 504 return; 505 } 506 } 507 508 qlock(rudp); 509 510 c = iphtlook(&upriv->ht, raddr, rport, laddr, lport); 511 if(c == nil){ 512 /* no conversation found */ 513 upriv->ustats.rudpNoPorts++; 514 qunlock(rudp); 515 netlog(f, Logudp, "udp: no conv %I!%d -> %I!%d\n", raddr, rport, 516 laddr, lport); 517 uh->Unused = ottl; 518 hnputs(uh->udpplen, olen); 519 icmpnoconv(f, bp); 520 freeblist(bp); 521 return; 522 } 523 ucb = (Rudpcb*)c->ptcl; 524 qlock(ucb); 525 qunlock(rudp); 526 527 if(reliput(c, bp, raddr, rport) < 0){ 528 qunlock(ucb); 529 freeb(bp); 530 return; 531 } 532 533 /* 534 * Trim the packet down to data size 535 */ 536 537 len -= (UDP_RHDRSIZE-UDP_PHDRSIZE); 538 bp = trimblock(bp, UDP_IPHDR+UDP_RHDRSIZE, len); 539 if(bp == nil) { 540 netlog(f, Logrudp, "rudp: len err %I.%d -> %I.%d\n", 541 raddr, rport, laddr, lport); 542 DPRINT("rudp: len err %I.%d -> %I.%d\n", 543 raddr, rport, laddr, lport); 544 upriv->lenerr++; 545 return; 546 } 547 548 netlog(f, Logrudpmsg, "rudp: %I.%d -> %I.%d l %d\n", 549 raddr, rport, laddr, lport, len); 550 551 switch(ucb->headers){ 552 case 7: 553 /* pass the src address */ 554 bp = padblock(bp, UDP_USEAD7); 555 p = bp->rp; 556 ipmove(p, raddr); p += IPaddrlen; 557 ipmove(p, laddr); p += IPaddrlen; 558 ipmove(p, ifc->lifc->local); p += IPaddrlen; 559 hnputs(p, rport); p += 2; 560 hnputs(p, lport); 561 break; 562 default: 563 /* connection oriented rudp */ 564 if(ipcmp(c->raddr, IPnoaddr) == 0){ 565 /* save the src address in the conversation */ 566 ipmove(c->raddr, raddr); 567 c->rport = rport; 568 569 /* reply with the same ip address (if not broadcast) */ 570 if(ipforme(f, laddr) == Runi) 571 ipmove(c->laddr, laddr); 572 else 573 v4tov6(c->laddr, ifc->lifc->local); 574 } 575 break; 576 } 577 if(bp->next) 578 bp = concatblock(bp); 579 580 if(qfull(c->rq)) { 581 netlog(f, Logrudp, "rudp: qfull %I.%d -> %I.%d\n", raddr, rport, 582 laddr, lport); 583 freeblist(bp); 584 } 585 else 586 qpass(c->rq, bp); 587 588 qunlock(ucb); 589 } 590 591 static char *rudpunknown = "unknown rudp ctl request"; 592 593 char* 594 rudpctl(Conv *c, char **f, int n) 595 { 596 Rudpcb *ucb; 597 uchar ip[IPaddrlen]; 598 int x; 599 600 ucb = (Rudpcb*)c->ptcl; 601 if(n < 1) 602 return rudpunknown; 603 604 if(strcmp(f[0], "headers") == 0){ 605 ucb->headers = 7; /* new headers format */ 606 return nil; 607 } else if(strcmp(f[0], "hangup") == 0){ 608 if(n < 3) 609 return "bad syntax"; 610 if (parseip(ip, f[1]) == -1) 611 return Ebadip; 612 x = atoi(f[2]); 613 qlock(ucb); 614 relforget(c, ip, x, 1); 615 qunlock(ucb); 616 return nil; 617 } else if(strcmp(f[0], "randdrop") == 0){ 618 x = 10; /* default is 10% */ 619 if(n > 1) 620 x = atoi(f[1]); 621 if(x > 100 || x < 0) 622 return "illegal rudp drop rate"; 623 ucb->randdrop = x; 624 return nil; 625 } 626 return rudpunknown; 627 } 628 629 void 630 rudpadvise(Proto *rudp, Block *bp, char *msg) 631 { 632 Udphdr *h; 633 uchar source[IPaddrlen], dest[IPaddrlen]; 634 ushort psource, pdest; 635 Conv *s, **p; 636 637 h = (Udphdr*)(bp->rp); 638 639 v4tov6(dest, h->udpdst); 640 v4tov6(source, h->udpsrc); 641 psource = nhgets(h->udpsport); 642 pdest = nhgets(h->udpdport); 643 644 /* Look for a connection */ 645 for(p = rudp->conv; *p; p++) { 646 s = *p; 647 if(s->rport == pdest) 648 if(s->lport == psource) 649 if(ipcmp(s->raddr, dest) == 0) 650 if(ipcmp(s->laddr, source) == 0){ 651 qhangup(s->rq, msg); 652 qhangup(s->wq, msg); 653 break; 654 } 655 } 656 freeblist(bp); 657 } 658 659 int 660 rudpstats(Proto *rudp, char *buf, int len) 661 { 662 Rudppriv *upriv; 663 664 upriv = rudp->priv; 665 return snprint(buf, len, "%lud %lud %lud %lud %lud %lud\n", 666 upriv->ustats.rudpInDatagrams, 667 upriv->ustats.rudpNoPorts, 668 upriv->ustats.rudpInErrors, 669 upriv->ustats.rudpOutDatagrams, 670 upriv->rxmits, 671 upriv->orders); 672 } 673 674 void 675 rudpinit(Fs *fs) 676 { 677 678 Proto *rudp; 679 680 rudp = smalloc(sizeof(Proto)); 681 rudp->priv = smalloc(sizeof(Rudppriv)); 682 rudp->name = "rudp"; 683 rudp->connect = rudpconnect; 684 rudp->announce = rudpannounce; 685 rudp->ctl = rudpctl; 686 rudp->state = rudpstate; 687 rudp->create = rudpcreate; 688 rudp->close = rudpclose; 689 rudp->rcv = rudpiput; 690 rudp->advise = rudpadvise; 691 rudp->stats = rudpstats; 692 rudp->ipproto = IP_UDPPROTO; 693 rudp->nc = 16; 694 rudp->ptclsize = sizeof(Rudpcb); 695 696 Fsproto(fs, rudp); 697 } 698 699 /*********************************************/ 700 /* Here starts the reliable helper functions */ 701 /*********************************************/ 702 /* 703 * Enqueue a copy of an unacked block for possible retransmissions 704 */ 705 void 706 relackq(Reliable *r, Block *bp) 707 { 708 Block *np; 709 710 np = copyblock(bp, blocklen(bp)); 711 if(r->unacked) 712 r->unackedtail->list = np; 713 else { 714 /* restart timer */ 715 r->timeout = 0; 716 r->xmits = 1; 717 r->unacked = np; 718 } 719 r->unackedtail = np; 720 np->list = nil; 721 } 722 723 /* 724 * retransmit unacked blocks 725 */ 726 void 727 relackproc(void *a) 728 { 729 Rudpcb *ucb; 730 Proto *rudp; 731 Reliable *r; 732 Conv **s, *c; 733 734 rudp = (Proto *)a; 735 736 loop: 737 tsleep(&up->sleep, return0, 0, Rudptickms); 738 739 for(s = rudp->conv; *s; s++) { 740 c = *s; 741 ucb = (Rudpcb*)c->ptcl; 742 qlock(ucb); 743 744 for(r = ucb->r; r; r = r->next) { 745 if(r->unacked != nil){ 746 r->timeout += Rudptickms; 747 if(r->timeout > Rudprxms*r->xmits) 748 relrexmit(c, r); 749 } 750 if(r->acksent != r->rcvseq) 751 relsendack(c, r, 0); 752 } 753 qunlock(ucb); 754 } 755 goto loop; 756 } 757 758 /* 759 * get the state record for a conversation 760 */ 761 Reliable* 762 relstate(Rudpcb *ucb, uchar *addr, ushort port, char *from) 763 { 764 Reliable *r, **l; 765 766 l = &ucb->r; 767 for(r = *l; r; r = *l){ 768 if(memcmp(addr, r->addr, IPaddrlen) == 0 && 769 port == r->port) 770 break; 771 l = &r->next; 772 } 773 774 /* no state for this addr/port, create some */ 775 if(r == nil){ 776 while(generation == 0) 777 generation = rand(); 778 779 DPRINT("from %s new state %lud for %I!%ud\n", 780 from, generation, addr, port); 781 782 r = smalloc(sizeof(Reliable)); 783 memmove(r->addr, addr, IPaddrlen); 784 r->port = port; 785 r->unacked = 0; 786 if(generation == Hangupgen) 787 generation++; 788 r->sndgen = generation++; 789 r->sndseq = 0; 790 r->ackrcvd = 0; 791 r->rcvgen = 0; 792 r->rcvseq = 0; 793 r->acksent = 0; 794 r->xmits = 0; 795 r->timeout = 0; 796 r->ref = 0; 797 incref(r); /* one reference for being in the list */ 798 799 *l = r; 800 } 801 802 incref(r); 803 return r; 804 } 805 806 void 807 relput(Reliable *r) 808 { 809 if(decref(r) == 0) 810 free(r); 811 } 812 813 /* 814 * forget a Reliable state 815 */ 816 void 817 relforget(Conv *c, uchar *ip, int port, int originator) 818 { 819 Rudpcb *ucb; 820 Reliable *r, **l; 821 822 ucb = (Rudpcb*)c->ptcl; 823 824 l = &ucb->r; 825 for(r = *l; r; r = *l){ 826 if(ipcmp(ip, r->addr) == 0 && port == r->port){ 827 *l = r->next; 828 if(originator) 829 relsendack(c, r, 1); 830 relhangup(c, r); 831 relput(r); /* remove from the list */ 832 break; 833 } 834 l = &r->next; 835 } 836 } 837 838 /* 839 * process a rcvd reliable packet. return -1 if not to be passed to user process, 840 * 0 therwise. 841 * 842 * called with ucb locked. 843 */ 844 int 845 reliput(Conv *c, Block *bp, uchar *addr, ushort port) 846 { 847 Block *nbp; 848 Rudpcb *ucb; 849 Rudppriv *upriv; 850 Udphdr *uh; 851 Reliable *r; 852 Rudphdr *rh; 853 ulong seq, ack, sgen, agen, ackreal; 854 int rv = -1; 855 856 /* get fields */ 857 uh = (Udphdr*)(bp->rp); 858 rh = (Rudphdr*)uh; 859 seq = nhgetl(rh->relseq); 860 sgen = nhgetl(rh->relsgen); 861 ack = nhgetl(rh->relack); 862 agen = nhgetl(rh->relagen); 863 864 upriv = c->p->priv; 865 ucb = (Rudpcb*)c->ptcl; 866 r = relstate(ucb, addr, port, "input"); 867 868 DPRINT("rcvd %lud/%lud, %lud/%lud, r->sndgen = %lud\n", 869 seq, sgen, ack, agen, r->sndgen); 870 871 /* if acking an incorrect generation, ignore */ 872 if(ack && agen != r->sndgen) 873 goto out; 874 875 /* Look for a hangup */ 876 if(sgen == Hangupgen) { 877 if(agen == r->sndgen) 878 relforget(c, addr, port, 0); 879 goto out; 880 } 881 882 /* make sure we're not talking to a new remote side */ 883 if(r->rcvgen != sgen){ 884 if(seq != 0 && seq != 1) 885 goto out; 886 887 /* new connection */ 888 if(r->rcvgen != 0){ 889 DPRINT("new con r->rcvgen = %lud, sgen = %lud\n", r->rcvgen, sgen); 890 relhangup(c, r); 891 } 892 r->rcvgen = sgen; 893 } 894 895 /* dequeue acked packets */ 896 if(ack && agen == r->sndgen){ 897 ackreal = 0; 898 while(r->unacked != nil && INSEQ(ack, r->ackrcvd, r->sndseq)){ 899 nbp = r->unacked; 900 r->unacked = nbp->list; 901 DPRINT("%lud/%lud acked, r->sndgen = %lud\n", 902 ack, agen, r->sndgen); 903 freeb(nbp); 904 r->ackrcvd = NEXTSEQ(r->ackrcvd); 905 ackreal = 1; 906 } 907 908 /* flow control */ 909 if(UNACKED(r) < Maxunacked/8 && r->blocked) 910 wakeup(&r->vous); 911 912 /* 913 * retransmit next packet if the acked packet 914 * was transmitted more than once 915 */ 916 if(ackreal && r->unacked != nil){ 917 r->timeout = 0; 918 if(r->xmits > 1){ 919 r->xmits = 1; 920 relrexmit(c, r); 921 } 922 } 923 924 } 925 926 /* no message or input queue full */ 927 if(seq == 0 || qfull(c->rq)) 928 goto out; 929 930 /* refuse out of order delivery */ 931 if(seq != NEXTSEQ(r->rcvseq)){ 932 relsendack(c, r, 0); /* tell him we got it already */ 933 upriv->orders++; 934 DPRINT("out of sequence %lud not %lud\n", seq, NEXTSEQ(r->rcvseq)); 935 goto out; 936 } 937 r->rcvseq = seq; 938 939 rv = 0; 940 out: 941 relput(r); 942 return rv; 943 } 944 945 void 946 relsendack(Conv *c, Reliable *r, int hangup) 947 { 948 Udphdr *uh; 949 Block *bp; 950 Rudphdr *rh; 951 int ptcllen; 952 Fs *f; 953 954 bp = allocb(UDP_IPHDR + UDP_RHDRSIZE); 955 if(bp == nil) 956 return; 957 bp->wp += UDP_IPHDR + UDP_RHDRSIZE; 958 f = c->p->f; 959 uh = (Udphdr *)(bp->rp); 960 uh->vihl = IP_VER4; 961 rh = (Rudphdr*)uh; 962 963 ptcllen = (UDP_RHDRSIZE-UDP_PHDRSIZE); 964 uh->Unused = 0; 965 uh->udpproto = IP_UDPPROTO; 966 uh->frag[0] = 0; 967 uh->frag[1] = 0; 968 hnputs(uh->udpplen, ptcllen); 969 970 v6tov4(uh->udpdst, r->addr); 971 hnputs(uh->udpdport, r->port); 972 hnputs(uh->udpsport, c->lport); 973 if(ipcmp(c->laddr, IPnoaddr) == 0) 974 findlocalip(f, c->laddr, c->raddr); 975 v6tov4(uh->udpsrc, c->laddr); 976 hnputs(uh->udplen, ptcllen); 977 978 if(hangup) 979 hnputl(rh->relsgen, Hangupgen); 980 else 981 hnputl(rh->relsgen, r->sndgen); 982 hnputl(rh->relseq, 0); 983 hnputl(rh->relagen, r->rcvgen); 984 hnputl(rh->relack, r->rcvseq); 985 986 if(r->acksent < r->rcvseq) 987 r->acksent = r->rcvseq; 988 989 uh->udpcksum[0] = 0; 990 uh->udpcksum[1] = 0; 991 hnputs(uh->udpcksum, ptclcsum(bp, UDP_IPHDR, UDP_RHDRSIZE)); 992 993 DPRINT("sendack: %lud/%lud, %lud/%lud\n", 0L, r->sndgen, r->rcvseq, r->rcvgen); 994 doipoput(c, f, bp, 0, c->ttl, c->tos); 995 } 996 997 998 /* 999 * called with ucb locked (and c locked if user initiated close) 1000 */ 1001 void 1002 relhangup(Conv *c, Reliable *r) 1003 { 1004 int n; 1005 Block *bp; 1006 char hup[ERRMAX]; 1007 1008 n = snprint(hup, sizeof(hup), "hangup %I!%d", r->addr, r->port); 1009 qproduce(c->eq, hup, n); 1010 1011 /* 1012 * dump any unacked outgoing messages 1013 */ 1014 for(bp = r->unacked; bp != nil; bp = r->unacked){ 1015 r->unacked = bp->list; 1016 bp->list = nil; 1017 freeb(bp); 1018 } 1019 1020 r->rcvgen = 0; 1021 r->rcvseq = 0; 1022 r->acksent = 0; 1023 if(generation == Hangupgen) 1024 generation++; 1025 r->sndgen = generation++; 1026 r->sndseq = 0; 1027 r->ackrcvd = 0; 1028 r->xmits = 0; 1029 r->timeout = 0; 1030 wakeup(&r->vous); 1031 } 1032 1033 /* 1034 * called with ucb locked 1035 */ 1036 void 1037 relrexmit(Conv *c, Reliable *r) 1038 { 1039 Rudppriv *upriv; 1040 Block *np; 1041 Fs *f; 1042 1043 upriv = c->p->priv; 1044 f = c->p->f; 1045 r->timeout = 0; 1046 if(r->xmits++ > Rudpmaxxmit){ 1047 relhangup(c, r); 1048 return; 1049 } 1050 1051 upriv->rxmits++; 1052 np = copyblock(r->unacked, blocklen(r->unacked)); 1053 DPRINT("rxmit r->ackrvcd+1 = %lud\n", r->ackrcvd+1); 1054 doipoput(c, f, np, 0, c->ttl, c->tos); 1055 }