aboutsummaryrefslogtreecommitdiffstats
path: root/ncpd/link.cc
diff options
context:
space:
mode:
Diffstat (limited to 'ncpd/link.cc')
-rw-r--r--ncpd/link.cc592
1 files changed, 402 insertions, 190 deletions
diff --git a/ncpd/link.cc b/ncpd/link.cc
index b7d5fb7..228b878 100644
--- a/ncpd/link.cc
+++ b/ncpd/link.cc
@@ -4,7 +4,7 @@
* This file is part of plptools.
*
* Copyright (C) 1999 Philip Proudman <philip.proudman@btinternet.com>
- * Copyright (C) 1999-2001 Fritz Elfert <felfert@to.com>
+ * Copyright (C) 1999-2002 Fritz Elfert <felfert@to.com>
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
@@ -27,278 +27,490 @@
#include <stream.h>
#include <stdlib.h>
+#include <unistd.h>
#include <stdio.h>
+#include <sys/time.h>
+#include <plp_inttypes.h>
#include "link.h"
#include "packet.h"
+#include "ncp.h"
#include "bufferstore.h"
#include "bufferarray.h"
-link::link(const char *fname, int baud, IOWatch *iow, unsigned short _verbose)
+extern "C" {
+ static void *expire_check(void *arg)
+ {
+ Link *l = (Link *)arg;
+ pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, NULL);
+ while (1) {
+ usleep(l->retransTimeout * 500);
+ l->retransmit();
+ }
+ }
+};
+
+Link::Link(const char *fname, int baud, ncp *_ncp, unsigned short _verbose)
{
- p = new packet(fname, baud, iow);
+ p = new packet(fname, baud, this, _verbose);
+ retransTimeout = ((unsigned long)baud * 1000 / 13200) + 200;
+ theNCP = _ncp;
verbose = _verbose;
- idSent = 0;
- idLastGot = -1;
- newLink = true;
- somethingToSend = false;
- timesSent = 0;
+ txSequence = 1;
+ rxSequence = -1;
failed = false;
+ seqMask = 7;
+ maxOutstanding = 1;
for (int i = 0; i < 256; i++)
xoff[i] = false;
+ // generate magic number for sendCon()
+ srandom(time(NULL));
+ conMagic = random();
+
+ pthread_mutex_init(&queueMutex, NULL);
+ pthread_create(&checkthread, NULL, expire_check, this);
+
+ // submit a link request
+ bufferStore blank;
+ transmit(blank);
}
-link::~link()
+Link::~Link()
{
flush();
+ pthread_cancel(checkthread);
+ pthread_mutex_destroy(&queueMutex);
delete p;
}
-void link::
+void Link::
reset() {
- idSent = 0;
- idLastGot = -1;
- newLink = true;
- somethingToSend = false;
- timesSent = 0;
+ txSequence = 1;
+ rxSequence = -1;
failed = false;
+
+ pthread_mutex_lock(&queueMutex);
+ ackWaitQueue.clear();
+ holdQueue.clear();
+ pthread_mutex_unlock(&queueMutex);
for (int i = 0; i < 256; i++)
xoff[i] = false;
+
+ // submit a link request
+ bufferStore blank;
+ transmit(blank);
}
-short int link::
+unsigned short Link::
getVerbose()
{
return verbose;
}
-void link::
-setVerbose(short int _verbose)
+void Link::
+setVerbose(unsigned short _verbose)
{
verbose = _verbose;
+ p->setVerbose(verbose);
}
-short int link::
-getPktVerbose()
+void Link::
+send(const bufferStore & buff)
{
- return p->getVerbose();
+ if (buff.getLen() > 300) {
+ failed = true;
+ } else
+ transmit(buff);
}
-void link::
-setPktVerbose(short int _verbose)
+void Link::
+purgeQueue(int channel)
{
- p->setVerbose(_verbose);
+ pthread_mutex_lock(&queueMutex);
+ vector<ackWaitQueueElement>::iterator i;
+ for (i = ackWaitQueue.begin(); i != ackWaitQueue.end(); i++)
+ if (i->data.getByte(0) == channel) {
+ ackWaitQueue.erase(i);
+ i--;
+ }
+ vector<bufferStore>::iterator j;
+ for (j = holdQueue.begin(); j != holdQueue.end(); j++)
+ if (j->getByte(0) == channel) {
+ holdQueue.erase(j);
+ j--;
+ }
+ pthread_mutex_unlock(&queueMutex);
}
-void link::
-send(const bufferStore & buff)
+void Link::
+sendAck(int seq)
{
- if (buff.getLen() > 300)
- failed = true;
- else
- sendQueue += buff;
+ if (hasFailed())
+ return;
+ bufferStore tmp;
+ if (verbose & LNK_DEBUG_LOG)
+ cout << "Link: >> ack seq=" << seq << endl;
+ if (seq > 7) {
+ int hseq = seq >> 3;
+ int lseq = (seq & 7) | 8;
+ seq = (hseq << 8) + lseq;
+ tmp.prependWord(seq);
+ } else
+ tmp.prependByte(seq);
+ p->send(tmp);
}
-void link::
-purgeQueue(int channel)
+void Link::
+sendCon()
{
- bufferArray hsendQueue;
- bufferStore b;
-
- while (!sendQueue.empty()) {
- b = sendQueue.pop();
- if (b.getByte(0) != channel)
- hsendQueue += b;
- }
- sendQueue = hsendQueue;
+ if (hasFailed())
+ return;
+ bufferStore tmp;
+ if (verbose & LNK_DEBUG_LOG)
+ cout << "Link: >> con seq=4" << endl;
+ tmp.addByte(0x24);
+ tmp.addDWord(conMagic);
+ ackWaitQueueElement e;
+ e.seq = 0; // expected ACK is 0, _NOT_ 4!
+ gettimeofday(&e.stamp, NULL);
+ e.data = tmp;
+ e.txcount = 4;
+ pthread_mutex_lock(&queueMutex);
+ ackWaitQueue.push_back(e);
+ pthread_mutex_unlock(&queueMutex);
+ p->send(tmp);
}
-bufferArray link::
-poll()
+void Link::
+receive(bufferStore buff)
{
- bufferArray ret;
- bufferStore buff;
- unsigned char type;
-
- // RX loop
- while (p->get(type, buff)) {
- int seq = type & 0x0f;
- bufferStore blank;
- type &= 0xf0;
-
- // Support for incoming extended sequence numbers
- if (seq & 0x08) {
- int tseq = buff.getByte(0);
- buff.discardFirstBytes(1);
- seq = (tseq << 3) | (seq & 0x07);
- }
+ vector<ackWaitQueueElement>::iterator i;
+ bool ackFound;
+ bool conFound;
+ int type = buff.getByte(0);
+ int seq = type & 0x0f;
- switch (type) {
- case 0x30:
- // Normal data
- if (verbose & LNK_DEBUG_LOG) {
- cout << "link: << dat seq=" << seq ;
- if (verbose & LNK_DEBUG_DUMP)
- cout << " " << buff << endl;
- else
- cout << " len=" << buff.getLen() << endl;
- }
- // Send ack
- if (idLastGot != seq) {
- idLastGot = seq;
- // Must check for XOFF/XON ncp frames HERE!
- if ((buff.getLen() == 3) && (buff.getByte(0) == 0)) {
- switch (buff.getByte(2)) {
- case 1:
- // XOFF
- xoff[buff.getByte(1)] = true;
- if (verbose & LNK_DEBUG_LOG)
- cout << "link: got XOFF for channel " << buff.getByte(1) << endl;
- break;
- case 2:
- // XON
- xoff[buff.getByte(1)] = false;
- if (verbose & LNK_DEBUG_LOG)
- cout << "link: got XON for channel " << buff.getByte(1) << endl;
- break;
- default:
- ret += buff;
- }
- } else
- ret += buff;
- } else {
- if (verbose & LNK_DEBUG_LOG)
- cout << "link: DUP\n";
- }
- if (verbose & LNK_DEBUG_LOG)
- cout << "link: >> ack seq=" << seq << endl;
- blank.init();
-
- // Support for incoming extended sequence numbers
- if (seq > 7) {
- blank.addByte(seq >> 3);
- seq &= 0x07;
- seq |= 0x08;
- }
+ type &= 0xf0;
+ // Support for incoming extended sequence numbers
+ if (seq & 8) {
+ int tseq = buff.getByte(1);
+ buff.discardFirstBytes(2);
+ seq = (tseq << 3) + (seq & 0x07);
+ } else
+ buff.discardFirstBytes(1);
+
+ switch (type) {
+ case 0x30:
+ // Normal data
+ if (verbose & LNK_DEBUG_LOG) {
+ cout << "Link: << dat seq=" << seq ;
+ if (verbose & LNK_DEBUG_DUMP)
+ cout << " " << buff << endl;
+ else
+ cout << " len=" << buff.getLen() << endl;
+ }
+ sendAck((rxSequence+1) & seqMask);
+
+ if (((rxSequence + 1) & seqMask) == seq) {
+ rxSequence++;
+ rxSequence &= seqMask;
- p->send(seq, blank);
- break;
+ // Must check for XOFF/XON ncp frames HERE!
+ if ((buff.getLen() == 3) && (buff.getByte(0) == 0)) {
+ switch (buff.getByte(2)) {
+ case 1:
+ // XOFF
+ xoff[buff.getByte(1)] = true;
+ if (verbose & LNK_DEBUG_LOG)
+ cout << "Link: got XOFF for channel "
+ << buff.getByte(1) << endl;
+ break;
+ case 2:
+ // XON
+ xoff[buff.getByte(1)] = false;
+ if (verbose & LNK_DEBUG_LOG)
+ cout << "Link: got XON for channel "
+ << buff.getByte(1) << endl;
+ // Transmit packets on hold queue
+ transmitHoldQueue(buff.getByte(1));
+ break;
+ default:
+ theNCP->receive(buff);
+ }
+ } else
+ theNCP->receive(buff);
- case 0x00:
- // Incoming ack
- if (seq == idSent) {
+ } else {
+ if (verbose & LNK_DEBUG_LOG)
+ cout << "Link: DUP\n";
+ }
+ break;
+
+ case 0x00:
+ // Incoming ack
+ // Find corresponding packet in ackWaitQueue
+ ackFound = false;
+ struct timeval refstamp;
+ pthread_mutex_lock(&queueMutex);
+ for (i = ackWaitQueue.begin(); i != ackWaitQueue.end(); i++)
+ if (i->seq == seq) {
+ ackFound = true;
+ refstamp = i->stamp;
+ ackWaitQueue.erase(i);
if (verbose & LNK_DEBUG_LOG) {
- cout << "link: << ack seq=" << seq ;
+ cout << "Link: << ack seq=" << seq ;
if (verbose & LNK_DEBUG_DUMP)
cout << " " << buff;
cout << endl;
}
- somethingToSend = false;
- timesSent = 0;
+ break;
}
- break;
+ pthread_mutex_unlock(&queueMutex);
+ if (ackFound)
+ // Older packets implicitely ack'ed
+ multiAck(refstamp);
+ else {
+ if (verbose & LNK_DEBUG_LOG) {
+ cout << "Link: << UNMATCHED ack seq=" << seq ;
+ if (verbose & LNK_DEBUG_DUMP)
+ cout << " " << buff;
+ cout << endl;
+ }
+ }
+ break;
- case 0x20:
- // New link
+ case 0x20:
+ // New link
+ conFound = false;
+ if (seq > 3) {
+ // May be a link confirm packet (EPOC)
+ pthread_mutex_lock(&queueMutex);
+ for (i = ackWaitQueue.begin(); i != ackWaitQueue.end(); i++)
+ if ((i->seq > 0) && (i->seq < 4) &&
+ (i->data.getByte(0) & 0xf0) == 0x20) {
+ ackWaitQueue.erase(i);
+ conFound = true;
+ // EPOC can handle extended sequence numbers
+ seqMask = 0x7ff;
+ // EPOC can handle up to 8 unacknowledged packets
+ maxOutstanding = 8;
+ p->setEpoc(true);
+ if (verbose & LNK_DEBUG_LOG) {
+ cout << "Link: << con seq=" << seq ;
+ if (verbose & LNK_DEBUG_DUMP)
+ cout << " " << buff;
+ cout << endl;
+ }
+ break;
+ }
+ pthread_mutex_unlock(&queueMutex);
+ }
+ if (conFound) {
+ rxSequence = 0;
+ txSequence = 1;
+ sendAck(rxSequence);
+ } else {
if (verbose & LNK_DEBUG_LOG) {
- cout << "link: << lrq seq=" << seq;
+ cout << "Link: << req seq=" << seq;
if (verbose & LNK_DEBUG_DUMP)
cout << " " << buff;
cout << endl;
}
- idLastGot = 0;
- if (verbose & LNK_DEBUG_LOG)
- cout << "link: >> lack seq=" << seq << endl;
- somethingToSend = false;
- blank.init();
- p->send(idLastGot, blank);
- break;
-
- case 0x10:
- // Disconnect
- if (verbose & LNK_DEBUG_LOG)
- cout << "link: << DISC" << endl;
- failed = true;
- return ret;
- }
- }
+ rxSequence = txSequence = 0;
+ if (seq > 0) {
+ // EPOC can handle extended sequence numbers
+ seqMask = 0x7ff;
+ // EPOC can handle up to 8 unacknowledged packets
+ maxOutstanding = 8;
+ p->setEpoc(true);
+ sendCon();
+ } else
+ sendAck(rxSequence);
+ }
+ break;
- if (p->linkFailed()) {
- failed = true;
- return ret;
+ case 0x10:
+ // Disconnect
+ if (verbose & LNK_DEBUG_LOG)
+ cout << "Link: << DISC" << endl;
+ failed = true;
+ break;
+
+ default:
+ cerr << "Link: FATAL: Unknown packet type " << type << endl;
}
+}
+
+void Link::
+transmitHoldQueue(int channel)
+{
+ vector<bufferStore> tmpQueue;
+ vector<bufferStore>::iterator i;
- if (!somethingToSend) {
- countToResend = 0;
- if (newLink) {
- somethingToSend = true;
- toSend.init();
- newLink = false;
- idSent = 0;
+ // First, move desired packets to a temporary queue
+ pthread_mutex_lock(&queueMutex);
+ for (i = holdQueue.begin(); i != holdQueue.end(); i++)
+ if (i->getByte(0) == channel) {
+ tmpQueue.push_back(*i);
+ holdQueue.erase(i);
+ i--;
+ }
+ pthread_mutex_unlock(&queueMutex);
+
+ // ... then transmit the moved packets
+ for (i = tmpQueue.begin(); i != tmpQueue.end(); i++)
+ transmit(*i);
+}
+
+void Link::
+transmit(bufferStore buf)
+{
+ if (hasFailed())
+ return;
+
+ int remoteChan = buf.getByte(0);
+ if (xoff[remoteChan]) {
+ pthread_mutex_lock(&queueMutex);
+ holdQueue.push_back(buf);
+ pthread_mutex_unlock(&queueMutex);
+ } else {
+
+ // Wait, until backlog is drained.
+ int ql;
+ do {
+ pthread_mutex_lock(&queueMutex);
+ ql = ackWaitQueue.size();
+ pthread_mutex_unlock(&queueMutex);
+ if (ql >= maxOutstanding)
+ usleep(100000);
+ } while (ql >= maxOutstanding);
+
+ ackWaitQueueElement e;
+ e.seq = txSequence++;
+ txSequence &= seqMask;
+ gettimeofday(&e.stamp, NULL);
+ // An empty buffer is considered a new link request
+ if (buf.empty()) {
+ // Request for new link
+ e.txcount = 4;
+ if (verbose & LNK_DEBUG_LOG)
+ cout << "Link: >> req seq=" << e.seq << endl;
+ buf.prependByte(0x20 + e.seq);
} else {
- bufferArray hsendQueue;
-
- while (!sendQueue.empty()) {
- toSend = sendQueue.pop();
- int remoteChan = toSend.getByte(0);
- if (xoff[remoteChan])
- hsendQueue += toSend;
- else {
- somethingToSend = true;
- idSent++;
- if (idSent > 7)
- idSent = 0;
- break;
- }
+ e.txcount = 8;
+ if (verbose & LNK_DEBUG_LOG) {
+ cout << "Link: >> dat seq=" << e.seq;
+ if (verbose & LNK_DEBUG_DUMP)
+ cout << " " << buf;
+ cout << endl;
}
- sendQueue = hsendQueue + sendQueue;
+ if (e.seq > 7) {
+ int hseq = e.seq >> 3;
+ int lseq = 0x30 + ((e.seq & 7) | 8);
+ int seq = (hseq << 8) + lseq;
+ buf.prependWord(seq);
+ } else
+ buf.prependByte(0x30 + e.seq);
}
+ e.data = buf;
+ pthread_mutex_lock(&queueMutex);
+ ackWaitQueue.push_back(e);
+ pthread_mutex_unlock(&queueMutex);
+ p->send(buf);
}
+}
- if (somethingToSend) {
- if (countToResend == 0) {
- timesSent++;
- if (timesSent == 5) {
- failed = true;
- } else {
- if (toSend.empty()) {
- // Request for new link
- if (verbose & LNK_DEBUG_LOG)
- cout << "link: >> lrq seq=" << idSent << endl;
- p->send(0x20 + idSent, toSend);
- } else {
- if (verbose & LNK_DEBUG_LOG) {
- cout << "link: >> data seq=" << idSent;
- if (verbose & LNK_DEBUG_DUMP)
- cout << " " << toSend;
- cout << endl;
- }
- p->send(0x30 + idSent, toSend);
- }
- countToResend = 5;
- }
- } else
- countToResend--;
+static void
+timesub(struct timeval *tv, unsigned long millisecs)
+{
+ uint64_t micros = tv->tv_sec;
+ uint64_t sub = millisecs;
+
+ micros <<= 32;
+ micros += tv->tv_usec;
+ micros -= (sub * 1000);
+ tv->tv_usec = micros & 0xffffffff;
+ tv->tv_sec = (micros >>= 32) & 0xffffffff;
+}
+
+static bool
+olderthan(struct timeval t1, struct timeval t2)
+{
+ uint64_t m1 = t1.tv_sec;
+ uint64_t m2 = t2.tv_sec;
+ m1 <<= 32;
+ m2 <<= 32;
+ m1 += t1.tv_usec;
+ m2 += t2.tv_usec;
+ return (m1 < m2);
+}
+
+void Link::
+multiAck(struct timeval refstamp)
+{
+ vector<ackWaitQueueElement>::iterator i;
+ pthread_mutex_lock(&queueMutex);
+ for (i = ackWaitQueue.begin(); i != ackWaitQueue.end(); i++)
+ if (olderthan(i->stamp, refstamp)) {
+ ackWaitQueue.erase(i);
+ i--;
+ }
+ pthread_mutex_unlock(&queueMutex);
+}
+
+void Link::
+retransmit()
+{
+
+ if (hasFailed()) {
+ pthread_mutex_lock(&queueMutex);
+ ackWaitQueue.clear();
+ holdQueue.clear();
+ pthread_mutex_unlock(&queueMutex);
+ return;
}
- return ret;
+ pthread_mutex_lock(&queueMutex);
+ vector<ackWaitQueueElement>::iterator i;
+ struct timeval now;
+ gettimeofday(&now, NULL);
+ struct timeval expired = now;
+ timesub(&expired, retransTimeout);
+ for (i = ackWaitQueue.begin(); i != ackWaitQueue.end(); i++)
+ if (olderthan(i->stamp, expired)) {
+ if (i->txcount-- == 0) {
+ // timeout, remove packet
+ if (verbose & LNK_DEBUG_LOG)
+ cout << "Link: >> TRANSMIT timeout seq=" << i->seq << endl;
+ ackWaitQueue.erase(i);
+ i--;
+ } else {
+ // retransmit it
+ i->stamp = now;
+ if (verbose & LNK_DEBUG_LOG)
+ cout << "Link: >> RETRANSMIT seq=" << i->seq << endl;
+ p->send(i->data);
+ }
+ }
+ pthread_mutex_unlock(&queueMutex);
}
-void link::
+void Link::
flush() {
- while ((!failed) && stuffToSend())
- poll();
+ while (stuffToSend())
+ sleep(1);
}
-bool link::
+bool Link::
stuffToSend()
{
- return (!failed && (somethingToSend || !sendQueue.empty()));
+ return ((!failed) && (!ackWaitQueue.empty()));
}
-bool link::
+bool Link::
hasFailed()
{
+ failed |= p->linkFailed();
return failed;
}