aboutsummaryrefslogtreecommitdiffstats
path: root/ncpd
diff options
context:
space:
mode:
authorFritz Elfert <felfert@to.com>2002-03-05 17:58:11 +0000
committerFritz Elfert <felfert@to.com>2002-03-05 17:58:11 +0000
commitcb2577b29fe7b93e9b168ded7f35da748fdeaf1d (patch)
treed7cf962ead89069f885f8da7137feb94acb3dfec /ncpd
parent8f9ae0a93ba3ea860a28933c2a411eae9365c859 (diff)
downloadplptools-cb2577b29fe7b93e9b168ded7f35da748fdeaf1d.tar.gz
plptools-cb2577b29fe7b93e9b168ded7f35da748fdeaf1d.tar.bz2
plptools-cb2577b29fe7b93e9b168ded7f35da748fdeaf1d.zip
- Re-Implemented lower levels of ncpd (packet and link).
ncpd is now multithreaded. Results in much better performance and less CPU usage.
Diffstat (limited to 'ncpd')
-rw-r--r--ncpd/Makefile.am4
-rw-r--r--ncpd/link.cc592
-rw-r--r--ncpd/link.h172
-rw-r--r--ncpd/linkchan.cc5
-rw-r--r--ncpd/linkchan.h4
-rw-r--r--ncpd/main.cc156
-rw-r--r--ncpd/mp_serial.c11
-rw-r--r--ncpd/ncp.cc109
-rw-r--r--ncpd/ncp.h20
-rw-r--r--ncpd/packet.cc610
-rw-r--r--ncpd/packet.h120
11 files changed, 1131 insertions, 672 deletions
diff --git a/ncpd/Makefile.am b/ncpd/Makefile.am
index 1ead85c..ca715b2 100644
--- a/ncpd/Makefile.am
+++ b/ncpd/Makefile.am
@@ -1,10 +1,12 @@
# $Id$
#
INCLUDES=-I$(top_srcdir)/lib
+CFLAGS += -D_REENTRANT
+CXXFLAGS += -D_REENTRANT
sbin_PROGRAMS = ncpd
-ncpd_LDADD = $(top_srcdir)/lib/libplp.la $(LIBCCMALLOC_CXX)
+ncpd_LDADD = $(top_srcdir)/lib/libplp.la -lpthread $(LIBCCMALLOC_CXX)
ncpd_SOURCES = channel.cc link.cc linkchan.cc main.cc \
ncp.cc packet.cc socketchan.cc mp_serial.c
EXTRA_DIST = channel.h link.h linkchan.h mp_serial.h ncp.h packet.h socketchan.h
diff --git a/ncpd/link.cc b/ncpd/link.cc
index b7d5fb7..228b878 100644
--- a/ncpd/link.cc
+++ b/ncpd/link.cc
@@ -4,7 +4,7 @@
* This file is part of plptools.
*
* Copyright (C) 1999 Philip Proudman <philip.proudman@btinternet.com>
- * Copyright (C) 1999-2001 Fritz Elfert <felfert@to.com>
+ * Copyright (C) 1999-2002 Fritz Elfert <felfert@to.com>
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
@@ -27,278 +27,490 @@
#include <stream.h>
#include <stdlib.h>
+#include <unistd.h>
#include <stdio.h>
+#include <sys/time.h>
+#include <plp_inttypes.h>
#include "link.h"
#include "packet.h"
+#include "ncp.h"
#include "bufferstore.h"
#include "bufferarray.h"
-link::link(const char *fname, int baud, IOWatch *iow, unsigned short _verbose)
+extern "C" {
+ static void *expire_check(void *arg)
+ {
+ Link *l = (Link *)arg;
+ pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, NULL);
+ while (1) {
+ usleep(l->retransTimeout * 500);
+ l->retransmit();
+ }
+ }
+};
+
+Link::Link(const char *fname, int baud, ncp *_ncp, unsigned short _verbose)
{
- p = new packet(fname, baud, iow);
+ p = new packet(fname, baud, this, _verbose);
+ retransTimeout = ((unsigned long)baud * 1000 / 13200) + 200;
+ theNCP = _ncp;
verbose = _verbose;
- idSent = 0;
- idLastGot = -1;
- newLink = true;
- somethingToSend = false;
- timesSent = 0;
+ txSequence = 1;
+ rxSequence = -1;
failed = false;
+ seqMask = 7;
+ maxOutstanding = 1;
for (int i = 0; i < 256; i++)
xoff[i] = false;
+ // generate magic number for sendCon()
+ srandom(time(NULL));
+ conMagic = random();
+
+ pthread_mutex_init(&queueMutex, NULL);
+ pthread_create(&checkthread, NULL, expire_check, this);
+
+ // submit a link request
+ bufferStore blank;
+ transmit(blank);
}
-link::~link()
+Link::~Link()
{
flush();
+ pthread_cancel(checkthread);
+ pthread_mutex_destroy(&queueMutex);
delete p;
}
-void link::
+void Link::
reset() {
- idSent = 0;
- idLastGot = -1;
- newLink = true;
- somethingToSend = false;
- timesSent = 0;
+ txSequence = 1;
+ rxSequence = -1;
failed = false;
+
+ pthread_mutex_lock(&queueMutex);
+ ackWaitQueue.clear();
+ holdQueue.clear();
+ pthread_mutex_unlock(&queueMutex);
for (int i = 0; i < 256; i++)
xoff[i] = false;
+
+ // submit a link request
+ bufferStore blank;
+ transmit(blank);
}
-short int link::
+unsigned short Link::
getVerbose()
{
return verbose;
}
-void link::
-setVerbose(short int _verbose)
+void Link::
+setVerbose(unsigned short _verbose)
{
verbose = _verbose;
+ p->setVerbose(verbose);
}
-short int link::
-getPktVerbose()
+void Link::
+send(const bufferStore & buff)
{
- return p->getVerbose();
+ if (buff.getLen() > 300) {
+ failed = true;
+ } else
+ transmit(buff);
}
-void link::
-setPktVerbose(short int _verbose)
+void Link::
+purgeQueue(int channel)
{
- p->setVerbose(_verbose);
+ pthread_mutex_lock(&queueMutex);
+ vector<ackWaitQueueElement>::iterator i;
+ for (i = ackWaitQueue.begin(); i != ackWaitQueue.end(); i++)
+ if (i->data.getByte(0) == channel) {
+ ackWaitQueue.erase(i);
+ i--;
+ }
+ vector<bufferStore>::iterator j;
+ for (j = holdQueue.begin(); j != holdQueue.end(); j++)
+ if (j->getByte(0) == channel) {
+ holdQueue.erase(j);
+ j--;
+ }
+ pthread_mutex_unlock(&queueMutex);
}
-void link::
-send(const bufferStore & buff)
+void Link::
+sendAck(int seq)
{
- if (buff.getLen() > 300)
- failed = true;
- else
- sendQueue += buff;
+ if (hasFailed())
+ return;
+ bufferStore tmp;
+ if (verbose & LNK_DEBUG_LOG)
+ cout << "Link: >> ack seq=" << seq << endl;
+ if (seq > 7) {
+ int hseq = seq >> 3;
+ int lseq = (seq & 7) | 8;
+ seq = (hseq << 8) + lseq;
+ tmp.prependWord(seq);
+ } else
+ tmp.prependByte(seq);
+ p->send(tmp);
}
-void link::
-purgeQueue(int channel)
+void Link::
+sendCon()
{
- bufferArray hsendQueue;
- bufferStore b;
-
- while (!sendQueue.empty()) {
- b = sendQueue.pop();
- if (b.getByte(0) != channel)
- hsendQueue += b;
- }
- sendQueue = hsendQueue;
+ if (hasFailed())
+ return;
+ bufferStore tmp;
+ if (verbose & LNK_DEBUG_LOG)
+ cout << "Link: >> con seq=4" << endl;
+ tmp.addByte(0x24);
+ tmp.addDWord(conMagic);
+ ackWaitQueueElement e;
+ e.seq = 0; // expected ACK is 0, _NOT_ 4!
+ gettimeofday(&e.stamp, NULL);
+ e.data = tmp;
+ e.txcount = 4;
+ pthread_mutex_lock(&queueMutex);
+ ackWaitQueue.push_back(e);
+ pthread_mutex_unlock(&queueMutex);
+ p->send(tmp);
}
-bufferArray link::
-poll()
+void Link::
+receive(bufferStore buff)
{
- bufferArray ret;
- bufferStore buff;
- unsigned char type;
-
- // RX loop
- while (p->get(type, buff)) {
- int seq = type & 0x0f;
- bufferStore blank;
- type &= 0xf0;
-
- // Support for incoming extended sequence numbers
- if (seq & 0x08) {
- int tseq = buff.getByte(0);
- buff.discardFirstBytes(1);
- seq = (tseq << 3) | (seq & 0x07);
- }
+ vector<ackWaitQueueElement>::iterator i;
+ bool ackFound;
+ bool conFound;
+ int type = buff.getByte(0);
+ int seq = type & 0x0f;
- switch (type) {
- case 0x30:
- // Normal data
- if (verbose & LNK_DEBUG_LOG) {
- cout << "link: << dat seq=" << seq ;
- if (verbose & LNK_DEBUG_DUMP)
- cout << " " << buff << endl;
- else
- cout << " len=" << buff.getLen() << endl;
- }
- // Send ack
- if (idLastGot != seq) {
- idLastGot = seq;
- // Must check for XOFF/XON ncp frames HERE!
- if ((buff.getLen() == 3) && (buff.getByte(0) == 0)) {
- switch (buff.getByte(2)) {
- case 1:
- // XOFF
- xoff[buff.getByte(1)] = true;
- if (verbose & LNK_DEBUG_LOG)
- cout << "link: got XOFF for channel " << buff.getByte(1) << endl;
- break;
- case 2:
- // XON
- xoff[buff.getByte(1)] = false;
- if (verbose & LNK_DEBUG_LOG)
- cout << "link: got XON for channel " << buff.getByte(1) << endl;
- break;
- default:
- ret += buff;
- }
- } else
- ret += buff;
- } else {
- if (verbose & LNK_DEBUG_LOG)
- cout << "link: DUP\n";
- }
- if (verbose & LNK_DEBUG_LOG)
- cout << "link: >> ack seq=" << seq << endl;
- blank.init();
-
- // Support for incoming extended sequence numbers
- if (seq > 7) {
- blank.addByte(seq >> 3);
- seq &= 0x07;
- seq |= 0x08;
- }
+ type &= 0xf0;
+ // Support for incoming extended sequence numbers
+ if (seq & 8) {
+ int tseq = buff.getByte(1);
+ buff.discardFirstBytes(2);
+ seq = (tseq << 3) + (seq & 0x07);
+ } else
+ buff.discardFirstBytes(1);
+
+ switch (type) {
+ case 0x30:
+ // Normal data
+ if (verbose & LNK_DEBUG_LOG) {
+ cout << "Link: << dat seq=" << seq ;
+ if (verbose & LNK_DEBUG_DUMP)
+ cout << " " << buff << endl;
+ else
+ cout << " len=" << buff.getLen() << endl;
+ }
+ sendAck((rxSequence+1) & seqMask);
+
+ if (((rxSequence + 1) & seqMask) == seq) {
+ rxSequence++;
+ rxSequence &= seqMask;
- p->send(seq, blank);
- break;
+ // Must check for XOFF/XON ncp frames HERE!
+ if ((buff.getLen() == 3) && (buff.getByte(0) == 0)) {
+ switch (buff.getByte(2)) {
+ case 1:
+ // XOFF
+ xoff[buff.getByte(1)] = true;
+ if (verbose & LNK_DEBUG_LOG)
+ cout << "Link: got XOFF for channel "
+ << buff.getByte(1) << endl;
+ break;
+ case 2:
+ // XON
+ xoff[buff.getByte(1)] = false;
+ if (verbose & LNK_DEBUG_LOG)
+ cout << "Link: got XON for channel "
+ << buff.getByte(1) << endl;
+ // Transmit packets on hold queue
+ transmitHoldQueue(buff.getByte(1));
+ break;
+ default:
+ theNCP->receive(buff);
+ }
+ } else
+ theNCP->receive(buff);
- case 0x00:
- // Incoming ack
- if (seq == idSent) {
+ } else {
+ if (verbose & LNK_DEBUG_LOG)
+ cout << "Link: DUP\n";
+ }
+ break;
+
+ case 0x00:
+ // Incoming ack
+ // Find corresponding packet in ackWaitQueue
+ ackFound = false;
+ struct timeval refstamp;
+ pthread_mutex_lock(&queueMutex);
+ for (i = ackWaitQueue.begin(); i != ackWaitQueue.end(); i++)
+ if (i->seq == seq) {
+ ackFound = true;
+ refstamp = i->stamp;
+ ackWaitQueue.erase(i);
if (verbose & LNK_DEBUG_LOG) {
- cout << "link: << ack seq=" << seq ;
+ cout << "Link: << ack seq=" << seq ;
if (verbose & LNK_DEBUG_DUMP)
cout << " " << buff;
cout << endl;
}
- somethingToSend = false;
- timesSent = 0;
+ break;
}
- break;
+ pthread_mutex_unlock(&queueMutex);
+ if (ackFound)
+ // Older packets implicitely ack'ed
+ multiAck(refstamp);
+ else {
+ if (verbose & LNK_DEBUG_LOG) {
+ cout << "Link: << UNMATCHED ack seq=" << seq ;
+ if (verbose & LNK_DEBUG_DUMP)
+ cout << " " << buff;
+ cout << endl;
+ }
+ }
+ break;
- case 0x20:
- // New link
+ case 0x20:
+ // New link
+ conFound = false;
+ if (seq > 3) {
+ // May be a link confirm packet (EPOC)
+ pthread_mutex_lock(&queueMutex);
+ for (i = ackWaitQueue.begin(); i != ackWaitQueue.end(); i++)
+ if ((i->seq > 0) && (i->seq < 4) &&
+ (i->data.getByte(0) & 0xf0) == 0x20) {
+ ackWaitQueue.erase(i);
+ conFound = true;
+ // EPOC can handle extended sequence numbers
+ seqMask = 0x7ff;
+ // EPOC can handle up to 8 unacknowledged packets
+ maxOutstanding = 8;
+ p->setEpoc(true);
+ if (verbose & LNK_DEBUG_LOG) {
+ cout << "Link: << con seq=" << seq ;
+ if (verbose & LNK_DEBUG_DUMP)
+ cout << " " << buff;
+ cout << endl;
+ }
+ break;
+ }
+ pthread_mutex_unlock(&queueMutex);
+ }
+ if (conFound) {
+ rxSequence = 0;
+ txSequence = 1;
+ sendAck(rxSequence);
+ } else {
if (verbose & LNK_DEBUG_LOG) {
- cout << "link: << lrq seq=" << seq;
+ cout << "Link: << req seq=" << seq;
if (verbose & LNK_DEBUG_DUMP)
cout << " " << buff;
cout << endl;
}
- idLastGot = 0;
- if (verbose & LNK_DEBUG_LOG)
- cout << "link: >> lack seq=" << seq << endl;
- somethingToSend = false;
- blank.init();
- p->send(idLastGot, blank);
- break;
-
- case 0x10:
- // Disconnect
- if (verbose & LNK_DEBUG_LOG)
- cout << "link: << DISC" << endl;
- failed = true;
- return ret;
- }
- }
+ rxSequence = txSequence = 0;
+ if (seq > 0) {
+ // EPOC can handle extended sequence numbers
+ seqMask = 0x7ff;
+ // EPOC can handle up to 8 unacknowledged packets
+ maxOutstanding = 8;
+ p->setEpoc(true);
+ sendCon();
+ } else
+ sendAck(rxSequence);
+ }
+ break;
- if (p->linkFailed()) {
- failed = true;
- return ret;
+ case 0x10:
+ // Disconnect
+ if (verbose & LNK_DEBUG_LOG)
+ cout << "Link: << DISC" << endl;
+ failed = true;
+ break;
+
+ default:
+ cerr << "Link: FATAL: Unknown packet type " << type << endl;
}
+}
+
+void Link::
+transmitHoldQueue(int channel)
+{
+ vector<bufferStore> tmpQueue;
+ vector<bufferStore>::iterator i;
- if (!somethingToSend) {
- countToResend = 0;
- if (newLink) {
- somethingToSend = true;
- toSend.init();
- newLink = false;
- idSent = 0;
+ // First, move desired packets to a temporary queue
+ pthread_mutex_lock(&queueMutex);
+ for (i = holdQueue.begin(); i != holdQueue.end(); i++)
+ if (i->getByte(0) == channel) {
+ tmpQueue.push_back(*i);
+ holdQueue.erase(i);
+ i--;
+ }
+ pthread_mutex_unlock(&queueMutex);
+
+ // ... then transmit the moved packets
+ for (i = tmpQueue.begin(); i != tmpQueue.end(); i++)
+ transmit(*i);
+}
+
+void Link::
+transmit(bufferStore buf)
+{
+ if (hasFailed())
+ return;
+
+ int remoteChan = buf.getByte(0);
+ if (xoff[remoteChan]) {
+ pthread_mutex_lock(&queueMutex);
+ holdQueue.push_back(buf);
+ pthread_mutex_unlock(&queueMutex);
+ } else {
+
+ // Wait, until backlog is drained.
+ int ql;
+ do {
+ pthread_mutex_lock(&queueMutex);
+ ql = ackWaitQueue.size();
+ pthread_mutex_unlock(&queueMutex);
+ if (ql >= maxOutstanding)
+ usleep(100000);
+ } while (ql >= maxOutstanding);
+
+ ackWaitQueueElement e;
+ e.seq = txSequence++;
+ txSequence &= seqMask;
+ gettimeofday(&e.stamp, NULL);
+ // An empty buffer is considered a new link request
+ if (buf.empty()) {
+ // Request for new link
+ e.txcount = 4;
+ if (verbose & LNK_DEBUG_LOG)
+ cout << "Link: >> req seq=" << e.seq << endl;
+ buf.prependByte(0x20 + e.seq);
} else {
- bufferArray hsendQueue;
-
- while (!sendQueue.empty()) {
- toSend = sendQueue.pop();
- int remoteChan = toSend.getByte(0);
- if (xoff[remoteChan])
- hsendQueue += toSend;
- else {
- somethingToSend = true;
- idSent++;
- if (idSent > 7)
- idSent = 0;
- break;
- }
+ e.txcount = 8;
+ if (verbose & LNK_DEBUG_LOG) {
+ cout << "Link: >> dat seq=" << e.seq;
+ if (verbose & LNK_DEBUG_DUMP)
+ cout << " " << buf;
+ cout << endl;
}
- sendQueue = hsendQueue + sendQueue;
+ if (e.seq > 7) {
+ int hseq = e.seq >> 3;
+ int lseq = 0x30 + ((e.seq & 7) | 8);
+ int seq = (hseq << 8) + lseq;
+ buf.prependWord(seq);
+ } else
+ buf.prependByte(0x30 + e.seq);
}
+ e.data = buf;
+ pthread_mutex_lock(&queueMutex);
+ ackWaitQueue.push_back(e);
+ pthread_mutex_unlock(&queueMutex);
+ p->send(buf);
}
+}
- if (somethingToSend) {
- if (countToResend == 0) {
- timesSent++;
- if (timesSent == 5) {
- failed = true;
- } else {
- if (toSend.empty()) {
- // Request for new link
- if (verbose & LNK_DEBUG_LOG)
- cout << "link: >> lrq seq=" << idSent << endl;
- p->send(0x20 + idSent, toSend);
- } else {
- if (verbose & LNK_DEBUG_LOG) {
- cout << "link: >> data seq=" << idSent;
- if (verbose & LNK_DEBUG_DUMP)
- cout << " " << toSend;
- cout << endl;
- }
- p->send(0x30 + idSent, toSend);
- }
- countToResend = 5;
- }
- } else
- countToResend--;
+static void
+timesub(struct timeval *tv, unsigned long millisecs)
+{
+ uint64_t micros = tv->tv_sec;
+ uint64_t sub = millisecs;
+
+ micros <<= 32;
+ micros += tv->tv_usec;
+ micros -= (sub * 1000);
+ tv->tv_usec = micros & 0xffffffff;
+ tv->tv_sec = (micros >>= 32) & 0xffffffff;
+}
+
+static bool
+olderthan(struct timeval t1, struct timeval t2)
+{
+ uint64_t m1 = t1.tv_sec;
+ uint64_t m2 = t2.tv_sec;
+ m1 <<= 32;
+ m2 <<= 32;
+ m1 += t1.tv_usec;
+ m2 += t2.tv_usec;
+ return (m1 < m2);
+}
+
+void Link::
+multiAck(struct timeval refstamp)
+{
+ vector<ackWaitQueueElement>::iterator i;
+ pthread_mutex_lock(&queueMutex);
+ for (i = ackWaitQueue.begin(); i != ackWaitQueue.end(); i++)
+ if (olderthan(i->stamp, refstamp)) {
+ ackWaitQueue.erase(i);
+ i--;
+ }
+ pthread_mutex_unlock(&queueMutex);
+}
+
+void Link::
+retransmit()
+{
+
+ if (hasFailed()) {
+ pthread_mutex_lock(&queueMutex);
+ ackWaitQueue.clear();
+ holdQueue.clear();
+ pthread_mutex_unlock(&queueMutex);
+ return;
}
- return ret;
+ pthread_mutex_lock(&queueMutex);
+ vector<ackWaitQueueElement>::iterator i;
+ struct timeval now;
+ gettimeofday(&now, NULL);
+ struct timeval expired = now;
+ timesub(&expired, retransTimeout);
+ for (i = ackWaitQueue.begin(); i != ackWaitQueue.end(); i++)
+ if (olderthan(i->stamp, expired)) {
+ if (i->txcount-- == 0) {
+ // timeout, remove packet
+ if (verbose & LNK_DEBUG_LOG)
+ cout << "Link: >> TRANSMIT timeout seq=" << i->seq << endl;
+ ackWaitQueue.erase(i);
+ i--;
+ } else {
+ // retransmit it
+ i->stamp = now;
+ if (verbose & LNK_DEBUG_LOG)
+ cout << "Link: >> RETRANSMIT seq=" << i->seq << endl;
+ p->send(i->data);
+ }
+ }
+ pthread_mutex_unlock(&queueMutex);
}
-void link::
+void Link::
flush() {
- while ((!failed) && stuffToSend())
- poll();
+ while (stuffToSend())
+ sleep(1);
}
-bool link::
+bool Link::
stuffToSend()
{
- return (!failed && (somethingToSend || !sendQueue.empty()));
+ return ((!failed) && (!ackWaitQueue.empty()));
}
-bool link::
+bool Link::
hasFailed()
{
+ failed |= p->linkFailed();
return failed;
}
diff --git a/ncpd/link.h b/ncpd/link.h
index 4fbd391..0830bbc 100644
--- a/ncpd/link.h
+++ b/ncpd/link.h
@@ -4,7 +4,7 @@
* This file is part of plptools.
*
* Copyright (C) 1999 Philip Proudman <philip.proudman@btinternet.com>
- * Copyright (C) 1999-2001 Fritz Elfert <felfert@to.com>
+ * Copyright (C) 1999-2002 Fritz Elfert <felfert@to.com>
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
@@ -27,44 +27,148 @@
#ifdef HAVE_CONFIG_H
#include <config.h>
#endif
+#include <pthread.h>
+
#include "bufferstore.h"
#include "bufferarray.h"
+#include <vector>
-#define LNK_DEBUG_LOG 1
-#define LNK_DEBUG_DUMP 2
+#define LNK_DEBUG_LOG 4
+#define LNK_DEBUG_DUMP 8
+class ncp;
class packet;
-class IOWatch;
-
-class link {
- public:
- link(const char *fname, int baud, IOWatch *iow, unsigned short _verbose = 0);
- ~link();
- void send(const bufferStore &buff);
- bufferArray poll();
- bool stuffToSend();
- bool hasFailed();
- void reset();
- void flush();
- void purgeQueue(int);
- void setVerbose(short int);
- short int getVerbose();
- void setPktVerbose(short int);
- short int getPktVerbose();
-
- private:
- packet *p;
- int idSent;
- int countToResend;
- int timesSent;
- bufferArray sendQueue;
- bufferStore toSend;
- int idLastGot;
- bool newLink;
- unsigned short verbose;
- bool somethingToSend;
- bool failed;
- bool xoff[256];
+
+/**
+ * Describes a transmitted packet which has not yet
+ * been acknowledged by the peer.
+ */
+typedef struct {
+ /**
+ * Original sequence number.
+ */
+ int seq;
+ /**
+ * Number of remaining transmit retries.
+ */
+ int txcount;
+ /**
+ * Time of last transmit.
+ */
+ struct timeval stamp;
+ /**
+ * Packet content.
+ */
+ bufferStore data;
+} ackWaitQueueElement;
+
+extern "C" {
+ static void *expire_check(void *);
+}
+
+class Link {
+public:
+
+ /**
+ * Construct a new link instance.
+ *
+ * @param fname Name of serial device.
+ * @param baud Speed of serial device.
+ * @param ncp The calling ncp instance.
+ * @_verbose Verbosity (for debugging/troubleshooting)
+ */
+ Link(const char *fname, int baud, ncp *_ncp, unsigned short _verbose = 0);
+
+ /**
+ * Disconnects from device and destroys instance.
+ */
+ ~Link();
+
+ /**
+ * Send a PLP packet to the Peer.
+ *
+ * @param buff The contents of the PLP packet.
+ */
+ void send(const bufferStore &buff);
+
+ /**
+ * Query outstanding packets.
+ *
+ * @returns true, if packets are outstanding (not yet acknowledged), false
+ * otherwise.
+ */
+ bool stuffToSend();
+
+ /**
+ * Query connection failure.
+ *
+ * @returns true, if the peer could not be contacted or did not response,
+ * false if everything is ok.
+ */
+ bool hasFailed();
+
+ /**
+ * Reset connection and attempt to reconnect to the peer.
+ */
+ void reset();
+
+ /**
+ * Wait, until all outstanding packets are acknowledged or timed out.
+ */
+ void flush();
+
+ /**
+ * Purge all outstanding packets for a specified remote channel.
+ *
+ * @param channel The of the channel for which to remove outstanding
+ * packets.
+ */
+ void purgeQueue(int channel);
+
+ /**
+ * Set verbosity of Link and underlying packet instance.
+ *
+ * @param _verbose Verbosity (a bitmapped value, see LINK_DEBUG_.. constants)
+ */
+ void setVerbose(unsigned short _verbose);
+
+ /**
+ * Get current verbosity of Link.
+ *
+ * @returns The verbosity, specified at construction or last call to
+ * setVerbosity();
+ */
+ unsigned short getVerbose();
+
+private:
+ friend class packet;
+ friend void * ::expire_check(void *);
+
+ void receive(bufferStore buf);
+ void transmit(bufferStore buf);
+ void sendAck(int seq);
+ void sendCon();
+ void multiAck(struct timeval);
+ void retransmit();
+ void transmitHoldQueue(int channel);
+
+ pthread_t checkthread;
+ pthread_mutex_t queueMutex;
+
+ ncp *theNCP;
+ packet *p;
+ int txSequence;
+ int rxSequence;
+ int seqMask;
+ int maxOutstanding;
+ unsigned long retransTimeout;
+ unsigned long conMagic;
+ unsigned short verbose;
+ bool failed;
+
+ vector<ackWaitQueueElement> ackWaitQueue;
+ vector<bufferStore> holdQueue;
+ bool xoff[256];
};
#endif
diff --git a/ncpd/linkchan.cc b/ncpd/linkchan.cc
index b95ec64..850b959 100644
--- a/ncpd/linkchan.cc
+++ b/ncpd/linkchan.cc
@@ -26,12 +26,15 @@
#include <string>
#include "linkchan.h"
+#include "ncp.h"
#include "bufferstore.h"
#include "bufferarray.h"
-linkChan::linkChan(ncp * _ncpController):channel(_ncpController)
+linkChan::linkChan(ncp * _ncpController, int _ncpChannel):channel(_ncpController)
{
registerSer = 0x1234;
+ if (_ncpChannel != -1)
+ setNcpChannel(_ncpChannel);
ncpConnect();
}
diff --git a/ncpd/linkchan.h b/ncpd/linkchan.h
index 31df803..6a073c4 100644
--- a/ncpd/linkchan.h
+++ b/ncpd/linkchan.h
@@ -32,8 +32,8 @@
class linkChan : public channel {
public:
- linkChan(ncp *ncpController);
-
+ linkChan(ncp *ncpController, int ncpChannel = -1);
+
void ncpDataCallback(bufferStore &a);
char *getNcpRegisterName();
void ncpConnectAck();
diff --git a/ncpd/main.cc b/ncpd/main.cc
index b2f550f..279f0f5 100644
--- a/ncpd/main.cc
+++ b/ncpd/main.cc
@@ -34,6 +34,7 @@
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
+#include <pthread.h>
#include "ncp.h"
#include "bufferstore.h"
@@ -47,6 +48,15 @@
static bool verbose = false;
static bool active = true;
+static bool autoexit = false;
+
+static ncp *theNCP = NULL;
+static IOWatch iow;
+static IOWatch accept_iow;
+static ppsocket skt;
+static int numScp = 0;
+static socketChan *scp[257]; // MAX_CHANNELS_PSION + 1
+
static RETSIGTYPE
term_handler(int)
@@ -65,15 +75,19 @@ int_handler(int)
};
void
-checkForNewSocketConnection(ppsocket & skt, int &numScp, socketChan ** scp, ncp * a)
+checkForNewSocketConnection()
{
string peer;
- ppsocket *next = skt.accept(&peer);
+ if (accept_iow.watch(5,0) <= 0) {
+ return;
+ }
+ ppsocket *next = skt.accept(&peer, &iow);
if (next != NULL) {
+ next->setWatch(&iow);
// New connect
if (verbose)
cout << "New socket connection from " << peer << endl;
- if ((numScp >= a->maxLinks()) || (!a->gotLinkChannel())) {
+ if ((numScp >= theNCP->maxLinks()) || (!theNCP->gotLinkChannel())) {
bufferStore a;
// Give the client time to send it's version request.
@@ -87,49 +101,68 @@ checkForNewSocketConnection(ppsocket & skt, int &numScp, socketChan ** scp, ncp
if (verbose)
cout << "rejected" << endl;
} else
- scp[numScp++] = new socketChan(next, a);
+ scp[numScp++] = new socketChan(next, theNCP);
}
}
-void
-pollSocketConnections(int &numScp, socketChan ** scp)
+void *
+pollSocketConnections(void *)
{
- for (int i = 0; i < numScp; i++) {
- scp[i]->socketPoll();
- if (scp[i]->terminate()) {
- // Requested channel termination
- delete scp[i];
- numScp--;
- for (int j = i; j < numScp; j++)
- scp[j] = scp[j + 1];
- i--;
- }
+ while (active) {
+ iow.watch(0, 10000);
+ for (int i = 0; i < numScp; i++) {
+ scp[i]->socketPoll();
+ if (scp[i]->terminate()) {
+ // Requested channel termination
+ delete scp[i];
+ numScp--;
+ for (int j = i; j < numScp; j++)
+ scp[j] = scp[j + 1];
+ i--;
+ }
+ }
}
+ return NULL;
}
void
usage()
{
- cerr << "Usage : ncpd [-V] [-v logclass] [-d] [-e] [-p <port>] [-s <device>] [-b <baudrate>]\n";
+ cerr << "Usage : ncpd [-V] [-v logclass] [-d] [-e] [-p [<host>:]<port>] [-s <device>] [-b <baudrate>]\n";
exit(1);
}
+static void *
+link_thread(void *arg)
+{
+ while (active) {
+ // psion
+ iow.watch(1, 0);
+ if (theNCP->hasFailed()) {
+ if (autoexit) {
+ active = false;
+ break;
+ }
+ iow.watch(5, 0);
+ if (verbose)
+ cout << "ncp: restarting\n";
+ theNCP->reset();
+ }
+ }
+ return NULL;
+}
+
int
main(int argc, char **argv)
{
- ppsocket skt;
- IOWatch iow;
int pid;
bool dofork = true;
- bool autoexit = false;
int sockNum = DPORT;
int baudRate = DSPEED;
const char *host = "127.0.0.1";
const char *serialDevice = NULL;
- short int nverbose = 0;
- short int pverbose = 0;
- short int lverbose = 0;
+ unsigned short nverbose = 0;
struct servent *se = getservbyname("psion", "tcp");
endservent();
@@ -169,22 +202,21 @@ main(int argc, char **argv)
if (!strcmp(argv[i], "nd"))
nverbose |= NCP_DEBUG_DUMP;
if (!strcmp(argv[i], "ll"))
- lverbose |= LNK_DEBUG_LOG;
+ nverbose |= LNK_DEBUG_LOG;
if (!strcmp(argv[i], "ld"))
- lverbose |= LNK_DEBUG_DUMP;
+ nverbose |= LNK_DEBUG_DUMP;
if (!strcmp(argv[i], "pl"))
- pverbose |= PKT_DEBUG_LOG;
+ nverbose |= PKT_DEBUG_LOG;
if (!strcmp(argv[i], "pd"))
- pverbose |= PKT_DEBUG_DUMP;
+ nverbose |= PKT_DEBUG_DUMP;
if (!strcmp(argv[i], "ph"))
- pverbose |= PKT_DEBUG_HANDSHAKE;
+ nverbose |= PKT_DEBUG_HANDSHAKE;
if (!strcmp(argv[i], "m"))
verbose = true;
if (!strcmp(argv[i], "all")) {
- nverbose = NCP_DEBUG_LOG | NCP_DEBUG_DUMP;
- lverbose = LNK_DEBUG_LOG | LNK_DEBUG_DUMP;
- pverbose = PKT_DEBUG_LOG | PKT_DEBUG_DUMP |
- PKT_DEBUG_HANDSHAKE;
+ nverbose = NCP_DEBUG_LOG | NCP_DEBUG_DUMP |
+ LNK_DEBUG_LOG | LNK_DEBUG_DUMP |
+ PKT_DEBUG_LOG | PKT_DEBUG_DUMP | PKT_DEBUG_HANDSHAKE;
verbose = true;
}
} else if (!strcmp(argv[i], "-b") && i + 1 < argc) {
@@ -217,9 +249,10 @@ main(int argc, char **argv)
case 0:
signal(SIGTERM, term_handler);
signal(SIGINT, int_handler);
- skt.setWatch(&iow);
+ skt.setWatch(&accept_iow);
if (!skt.listen(host, sockNum))
- cerr << "listen on " << host << ":" << sockNum << ": " << strerror(errno) << endl;
+ cerr << "listen on " << host << ":" << sockNum << ": "
+ << strerror(errno) << endl;
else {
if (dofork || autoexit) {
logbuf dlog(LOG_DEBUG);
@@ -230,8 +263,8 @@ main(int argc, char **argv)
cerr = lerr;
openlog("ncpd", LOG_CONS|LOG_PID, LOG_DAEMON);
syslog(LOG_INFO,
- "daemon started. Listening at %s:%d, using device %s\n",
- host, sockNum, serialDevice);
+ "daemon started. Listening at %s:%d, "
+ "using device %s\n", host, sockNum, serialDevice);
setsid();
chdir("/");
int devnull =
@@ -244,37 +277,28 @@ main(int argc, char **argv)
close(devnull);
}
}
- ncp *a = new ncp(serialDevice, baudRate, &iow);
- int numScp = 0;
- socketChan *scp[257]; // MAX_CHANNELS_PSION + 1
-
- a->setVerbose(nverbose);
- a->setLinkVerbose(lverbose);
- a->setPktVerbose(pverbose);
- while (active) {
- // sockets
- pollSocketConnections(numScp, scp);
- checkForNewSocketConnection(skt, numScp, scp, a);
-
- // psion
- a->poll();
-
- if (a->stuffToSend())
- iow.watch(0, 100000);
- else
- iow.watch(1, 0);
-
- if (a->hasFailed()) {
- if (autoexit)
- break;
-
- iow.watch(5, 0);
- if (verbose)
- cout << "ncp: restarting\n";
- a->reset();
- }
+ memset(scp, 0, sizeof(scp));
+ theNCP = new ncp(serialDevice, baudRate, nverbose);
+ if (!theNCP) {
+ cerr << "Could not create NCP object" << endl;
+ exit(-1);
+ }
+ pthread_t thr_a, thr_b;
+ if (pthread_create(&thr_a, NULL, link_thread, NULL) != 0) {
+ cerr << "Could not create Link thread" << endl;
+ exit(-1);
+ }
+ if (pthread_create(&thr_a, NULL,
+ pollSocketConnections, NULL) != 0) {
+ cerr << "Could not create Socket thread" << endl;
+ exit(-1);
}
- delete a;
+ while (active)
+ checkForNewSocketConnection();
+ void *ret;
+ pthread_join(thr_a, &ret);
+ pthread_join(thr_b, &ret);
+ delete theNCP;
}
skt.closeSocket();
break;
diff --git a/ncpd/mp_serial.c b/ncpd/mp_serial.c
index 155dea1..7f6a792 100644
--- a/ncpd/mp_serial.c
+++ b/ncpd/mp_serial.c
@@ -67,7 +67,7 @@
int
init_serial(const char *dev, int speed, int debug)
{
- int fd, baud, clocal;
+ int fd, baud;
int uid, euid;
struct termios ti;
#ifdef hpux
@@ -127,12 +127,11 @@ init_serial(const char *dev, int speed, int debug)
#define seteuid(a) setresuid(-1, a, -1)
#endif
- clocal = CLOCAL;
if (seteuid(uid)) {
perror("seteuid");
exit(1);
}
- if ((fd = open(dev, O_RDWR | O_NDELAY | O_NOCTTY, 0)) < 0) {
+ if ((fd = open(dev, O_RDWR /*FRITZTEST | O_NDELAY */ | O_NOCTTY, 0)) < 0) {
perror(dev);
exit(1);
}
@@ -150,12 +149,12 @@ init_serial(const char *dev, int speed, int debug)
memset(&ti, 0, sizeof(struct termios));
#if defined(hpux) || defined(_IBMR2)
- ti.c_cflag = CS8 | HUPCL | clocal | CREAD;
+ ti.c_cflag = CS8 | HUPCL | CLOCAL | CREAD;
#endif
#if defined(sun) || defined(linux) || defined(__sgi) || \
defined(__NetBSD__) || defined(__FreeBSD__)
- ti.c_cflag = CS8 | HUPCL | clocal | CRTSCTS | CREAD;
- ti.c_iflag = IGNBRK | IGNPAR | IXON | IXOFF;
+ ti.c_cflag = CS8 | HUPCL | CLOCAL | CRTSCTS | CREAD;
+ ti.c_iflag = IGNBRK | IGNPAR /*| IXON | IXOFF */;
ti.c_cc[VMIN] = 1;
ti.c_cc[VTIME] = 0;
#endif
diff --git a/ncpd/ncp.cc b/ncpd/ncp.cc
index 031487f..76f4d63 100644
--- a/ncpd/ncp.cc
+++ b/ncpd/ncp.cc
@@ -39,15 +39,17 @@
#define MAX_CHANNELS_SIBO 8
#define NCP_SENDLEN 250
-ncp::ncp(const char *fname, int baud, IOWatch *iow)
+ncp::ncp(const char *fname, int baud, unsigned short _verbose)
{
channelPtr = new channel*[MAX_CHANNELS_PSION + 1];
+ assert(channelPtr);
messageList = new bufferStore[MAX_CHANNELS_PSION + 1];
+ assert(messageList);
remoteChanList = new int[MAX_CHANNELS_PSION + 1];
+ assert(remoteChanList);
- l = new link(fname, baud, iow);
failed = false;
- verbose = 0;
+ verbose = _verbose;
// until detected on receipt of INFO we use these.
maxChannels = MAX_CHANNELS_SIBO;
@@ -56,13 +58,16 @@ ncp::ncp(const char *fname, int baud, IOWatch *iow)
// init channels
for (int i = 0; i < MAX_CHANNELS_PSION; i++)
channelPtr[i] = NULL;
+
+ l = new Link(fname, baud, this, verbose);
+ assert(l);
}
ncp::~ncp()
{
bufferStore b;
for (int i = 0; i < maxLinks(); i++) {
- if (channelPtr[i]) {
+ if (isValidChannel(i)) {
bufferStore b2;
b2.addByte(remoteChanList[i]);
controlChannel(i, NCON_MSG_CHANNEL_DISCONNECT, b2);
@@ -84,7 +89,7 @@ maxLinks() {
void ncp::
reset() {
for (int i = 0; i < maxLinks(); i++) {
- if (channelPtr[i])
+ if (isValidChannel(i))
channelPtr[i]->terminateWhenAsked();
channelPtr[i] = NULL;
}
@@ -94,40 +99,17 @@ reset() {
l->reset();
}
-short int ncp::
+unsigned short ncp::
getVerbose()
{
return verbose;
}
void ncp::
-setVerbose(short int _verbose)
+setVerbose(unsigned short _verbose)
{
verbose = _verbose;
-}
-
-short int ncp::
-getLinkVerbose()
-{
- return l->getVerbose();
-}
-
-void ncp::
-setLinkVerbose(short int _verbose)
-{
- l->setVerbose(_verbose);
-}
-
-short int ncp::
-getPktVerbose()
-{
- return l->getPktVerbose();
-}
-
-void ncp::
-setPktVerbose(short int _verbose)
-{
- l->setPktVerbose(_verbose);
+ l->setVerbose(verbose);
}
short int ncp::
@@ -137,34 +119,29 @@ getProtocolVersion()
}
void ncp::
-poll()
-{
- bufferArray res(l->poll());
- while (!res.empty()) {
- bufferStore s = res.pop();
- if (s.getLen() > 1) {
- int channel = s.getByte(0);
- s.discardFirstBytes(1);
- if (channel == 0) {
- decodeControlMessage(s);
+receive(bufferStore s) {
+ if (s.getLen() > 1) {
+ int channel = s.getByte(0);
+ s.discardFirstBytes(1);
+ if (channel == 0) {
+ decodeControlMessage(s);
+ } else {
+ int allData = s.getByte(1);
+ s.discardFirstBytes(2);
+ if (!isValidChannel(channel)) {
+ cerr << "ncp: Got message for unknown channel\n";
} else {
- int allData = s.getByte(1);
- s.discardFirstBytes(2);
- if (channelPtr[channel] == NULL) {
- cerr << "ncp: Got message for unknown channel\n";
- } else {
- messageList[channel].addBuff(s);
- if (allData == LAST_MESS) {
- channelPtr[channel]->ncpDataCallback(messageList[channel]);
- messageList[channel].init();
- } else if (allData != NOT_LAST_MESS) {
- cerr << "ncp: bizarre third byte!\n";
- }
+ messageList[channel].addBuff(s);
+ if (allData == LAST_MESS) {
+ channelPtr[channel]->ncpDataCallback(messageList[channel]);
+ messageList[channel].init();
+ } else if (allData != NOT_LAST_MESS) {
+ cerr << "ncp: bizarre third byte!\n";
}
}
- } else
- cerr << "Got null message\n";
- }
+ }
+ } else
+ cerr << "Got null message\n";
}
void ncp::
@@ -206,7 +183,7 @@ decodeControlMessage(bufferStore & buff)
// Ack with connect response
localChan = getFirstUnusedChan();
b.addByte(remoteChan);
- b.addByte(0x0);
+ b.addByte(0);
controlChannel(localChan, NCON_MSG_CONNECT_RESPONSE, b);
// NOTE: we don't allow connections from the
@@ -219,10 +196,9 @@ decodeControlMessage(bufferStore & buff)
failed = true;
if (verbose & NCP_DEBUG_LOG)
cout << "ncp: Link UP" << endl;
- channelPtr[localChan] = lChan = new linkChan(this);
- lChan->setNcpChannel(localChan);
- lChan->ncpConnectAck();
+ channelPtr[localChan] = lChan = new linkChan(this, localChan);
lChan->setVerbose(verbose);
+ lChan->ncpConnectAck();
} else {
if (verbose & NCP_DEBUG_LOG)
cout << "ncp: REJECT connect" << endl;
@@ -242,7 +218,7 @@ decodeControlMessage(bufferStore & buff)
if (buff.getByte(1) == 0) {
if (verbose & NCP_DEBUG_LOG)
cout << "OK" << endl;
- if (channelPtr[forChan]) {
+ if (isValidChannel(forChan)) {
remoteChanList[forChan] = remoteChan;
channelPtr[forChan]->ncpConnectAck();
} else {
@@ -252,7 +228,7 @@ decodeControlMessage(bufferStore & buff)
} else {
if (verbose & NCP_DEBUG_LOG)
cout << "Unknown " << (int) buff.getByte(1) << endl;
- if (channelPtr[forChan])
+ if (isValidChannel(forChan))
channelPtr[forChan]->ncpConnectNak();
}
break;
@@ -322,12 +298,19 @@ getFirstUnusedChan()
if (channelPtr[cNum] == NULL) {
if (verbose & NCP_DEBUG_LOG)
cout << "ncp: getFirstUnusedChan=" << cNum << endl;
+ channelPtr[cNum] = (channel *)0xdeadbeef;
return cNum;
}
}
return 0;
}
+bool ncp::
+isValidChannel(int channel)
+{
+ return (channelPtr[channel] && ((int)channelPtr[channel] != 0xdeadbeef));
+}
+
void ncp::
RegisterAck(int chan, const char *name)
{
@@ -415,7 +398,7 @@ send(int channel, bufferStore & a)
void ncp::
disconnect(int channel)
{
- if (channelPtr[channel] == NULL) {
+ if (!isValidChannel(channel)) {
cerr << "ncp: Ignored disconnect for unknown channel #" << channel << endl;
return;
}
diff --git a/ncpd/ncp.h b/ncpd/ncp.h
index c0c5abe..72c5ca8 100644
--- a/ncpd/ncp.h
+++ b/ncpd/ncp.h
@@ -29,16 +29,15 @@
#endif
#include "bufferstore.h"
#include "linkchan.h"
-class link;
+class Link;
class channel;
-class IOWatch;
#define NCP_DEBUG_LOG 1
#define NCP_DEBUG_DUMP 2
class ncp {
public:
- ncp(const char *fname, int baud, IOWatch *iow);
+ ncp(const char *fname, int baud, unsigned short _verbose = 0);
~ncp();
int connect(channel *c); // returns channel, or -1 if failure
@@ -46,22 +45,19 @@ public:
void RegisterAck(int, const char *);
void disconnect(int channel);
void send(int channel, bufferStore &a);
- void poll();
void reset();
int maxLinks();
bool stuffToSend();
bool hasFailed();
bool gotLinkChannel();
- void setVerbose(short int);
- short int getVerbose();
- void setLinkVerbose(short int);
- short int getLinkVerbose();
- void setPktVerbose(short int);
- short int getPktVerbose();
+ void setVerbose(unsigned short);
+ unsigned short getVerbose();
short int getProtocolVersion();
private:
+ friend class Link;
+
enum c { MAX_LEN = 200, LAST_MESS = 1, NOT_LAST_MESS = 2 };
enum interControllerMessageType {
// Inter controller message types
@@ -75,12 +71,14 @@ private:
NCON_MSG_NCP_END=8
};
enum protocolVersionType { PV_SERIES_5 = 6, PV_SERIES_3 = 3 };
+ void receive(bufferStore s);
int getFirstUnusedChan();
+ bool isValidChannel(int);
void decodeControlMessage(bufferStore &buff);
void controlChannel(int chan, enum interControllerMessageType t, bufferStore &command);
char * ctrlMsgName(unsigned char);
- link *l;
+ Link *l;
unsigned short verbose;
channel **channelPtr;
bufferStore *messageList;
diff --git a/ncpd/packet.cc b/ncpd/packet.cc
index fc0ce11..84f0f66 100644
--- a/ncpd/packet.cc
+++ b/ncpd/packet.cc
@@ -36,329 +36,433 @@
#include <errno.h>
#include <sys/ioctl.h>
#include <termios.h>
+#include <signal.h>
extern "C" {
#include "mp_serial.h"
}
-#include "bufferstore.h"
#include "packet.h"
-#include "iowatch.h"
+#include "link.h"
-#define BUFFERLEN 2000
+#define BUFLEN 4096 // Must be a power of 2
+#define BUFMASK (BUFLEN-1)
+#define hasSpace(dir) (((dir##Write + 1) & BUFMASK) != dir##Read)
+#define hasData(dir) (dir##Write != dir##Read)
+#define inca(idx,amount) do { \
+ idx = (idx + amount) & BUFMASK; \
+} while (0)
+#define inc1(idx) inca(idx, 1)
+#define normalize(idx) do { idx &= BUFMASK; } while (0)
-packet::packet(const char *fname, int _baud, IOWatch *_iow, short int _verbose = 0):
-iow(_iow)
+static unsigned short pumpverbose = 0;
+
+extern "C" {
+/**
+ * Signal handler does nothing. It just exists
+ * for having the select() below return an
+ * interrupted system call.
+ */
+static void usr1handler(int sig)
{
- verbose = _verbose;
- devname = strdup(fname);
- baud = _baud;
- inPtr = inBuffer = new unsigned char[BUFFERLEN + 1];
- outPtr = outBuffer = new unsigned char[BUFFERLEN + 1];
- inLen = outLen = termLen = 0;
- foundSync = 0;
- esc = false;
- lastFatal = false;
- iowLocal = false;
- serialStatus = -1;
- crcIn = crcOut = 0;
+ signal(SIGUSR1, usr1handler);
+}
- fd = init_serial(devname, baud, 0);
- if (!_iow) {
- iow = new IOWatch();
- iowLocal = true;
+
+static void *pump_run(void *arg)
+{
+ packet *p = (packet *)arg;
+ pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, NULL);
+ while (1) {
+ if (p->fd != -1) {
+ fd_set r_set;
+ fd_set w_set;
+ int res;
+ int count;
+
+ FD_ZERO(&r_set);
+ w_set = r_set;
+ if (hasSpace(p->in))
+ FD_SET(p->fd, &r_set);
+ if (hasData(p->out))
+ FD_SET(p->fd, &w_set);
+ res = select(p->fd+1, &r_set, &w_set, NULL, NULL);
+ switch (res) {
+ case 0:
+ break;
+ case -1:
+ break;
+ default:
+ if (FD_ISSET(p->fd, &w_set)) {
+ count = p->outWrite - p->outRead;
+ if (count < 0)
+ count = (BUFLEN - p->outRead);
+ res = write(p->fd, &p->outBuffer[p->outRead], count);
+ if (res > 0) {
+ if (pumpverbose & PKT_DEBUG_DUMP) {
+ int i;
+ printf("pump: wrote %d bytes: (", res);
+ for (i = 0; i<res; i++)
+ printf("%02x ",
+ p->outBuffer[p->outRead + i]);
+ printf(")\n");
+ }
+ int hadSpace = hasSpace(p->out);
+ inca(p->outRead, res);
+ if (!hadSpace)
+ pthread_kill(p->thisThread, SIGUSR1);
+ }
+ }
+ if (FD_ISSET(p->fd, &r_set)) {
+ count = p->inRead - p->inWrite;
+ if (count <= 0)
+ count = (BUFLEN - p->inWrite);
+ res = read(p->fd, &p->inBuffer[p->inWrite], count);
+ if (res > 0) {
+ if (pumpverbose & PKT_DEBUG_DUMP) {
+ int i;
+ printf("pump: read %d bytes: (", res);
+ for (i = 0; i<res; i++)
+ printf("%02x ", p->inBuffer[p->inWrite + i]);
+ printf(")\n");
+ }
+ inca(p->inWrite, res);
+ p->findSync();
+ }
+ } else {
+ if (hasData(p->in))
+ p->findSync();
+ }
+ break;
+ }
}
- if (fd == -1)
- lastFatal = true;
- else
- iow->addIO(fd);
+ }
}
-void packet::reset()
+//static pthread_mutex_t outMutex;
+//static pthread_mutex_t inMutex;
+};
+
+packet::
+packet(const char *fname, int _baud, Link *_link, unsigned short _verbose)
{
- if (verbose & PKT_DEBUG_LOG)
- cout << "resetting serial connection" << endl;
- if (fd != -1) {
- iow->remIO(fd);
- ser_exit(fd);
- }
- usleep(100000);
- inLen = outLen = termLen = 0;
- inPtr = inBuffer;
- outPtr = outBuffer;
- foundSync = 0;
- esc = false;
+ verbose = pumpverbose = _verbose;
+ devname = strdup(fname);
+ assert(devname);
+ baud = _baud;
+ theLINK = _link;
+ isEPOC = false;
+
+ // Initialize CRC table
+ crc_table[0] = 0;
+ for (int i = 0; i < 128; i++) {
+ unsigned int carry = crc_table[i] & 0x8000;
+ unsigned int tmp = (crc_table[i] << 1) & 0xffff;
+ crc_table[i * 2 + (carry ? 0 : 1)] = tmp ^ 0x1021;
+ crc_table[i * 2 + (carry ? 1 : 0)] = tmp;
+ }
+
+ inRead = inWrite = outRead = outWrite = 0;
+ inBuffer = new unsigned char[BUFLEN + 1];
+ outBuffer = new unsigned char[BUFLEN + 1];
+ assert(inBuffer);
+ assert(outBuffer);
+
+ esc = false;
+ lastFatal = false;
+ serialStatus = -1;
+ lastSYN = startPkt = -1;
+ crcIn = crcOut = 0;
+
+ thisThread = pthread_self();
+ fd = init_serial(devname, baud, 0);
+ if (fd == -1)
+ lastFatal = true;
+ else {
+// pthread_mutex_init(&inMutex, NULL);
+// pthread_mutex_init(&outMutex, NULL);
+ signal(SIGUSR1, usr1handler);
+ pthread_create(&datapump, NULL, pump_run, this);
+ }
+}
+
+packet::
+~packet()
+{
+ if (fd != -1) {
+ pthread_cancel(datapump);
+ ser_exit(fd);
+ }
+ fd = -1;
+ delete []inBuffer;
+ delete []outBuffer;
+ free(devname);
+}
+
+void packet::
+reset()
+{
+ if (verbose & PKT_DEBUG_LOG)
+ cout << "resetting serial connection" << endl;
+ if (fd != -1) {
+ pthread_cancel(datapump);
+ ser_exit(fd);
+ fd = -1;
+ }
+ usleep(100000);
+ inRead = inWrite = outRead = outWrite = 0;
+ esc = false;
+ lastFatal = false;
+ serialStatus = -1;
+ lastSYN = startPkt = -1;
+ crcIn = crcOut = 0;
+ fd = init_serial(devname, baud, 0);
+ if (fd != -1)
lastFatal = false;
- serialStatus = -1;
- crcIn = crcOut = 0;
- fd = init_serial(devname, baud, 0);
- if (fd != -1) {
- iow->addIO(fd);
- lastFatal = false;
- }
- if (verbose & PKT_DEBUG_LOG)
- cout << "serial connection reset, fd=" << fd << endl;
- sleep(1);
+ else {
+// pthread_mutex_init(&inMutex, NULL);
+// pthread_mutex_init(&outMutex, NULL);
+ pthread_create(&datapump, NULL, pump_run, this);
+ }
+ if (verbose & PKT_DEBUG_LOG)
+ cout << "serial connection reset, fd=" << fd << endl;
}
short int packet::
getVerbose()
{
- return verbose;
+ return verbose;
}
void packet::
setVerbose(short int _verbose)
{
- verbose = _verbose;
+ verbose = pumpverbose = _verbose;
}
-packet::~packet()
+void packet::
+setEpoc(bool _epoc)
{
- if (fd != -1) {
- iow->remIO(fd);
- ser_exit(fd);
- }
- usleep(100000);
- delete[]inBuffer;
- delete[]outBuffer;
- free(devname);
- if (iowLocal)
- delete iow;
+ isEPOC = _epoc;
}
void packet::
-send(unsigned char type, const bufferStore & b)
+send(bufferStore &b)
{
- if (verbose & PKT_DEBUG_LOG) {
- cout << "packet: type " << hex << (int) type << " >> ";
- if (verbose & PKT_DEBUG_DUMP)
- cout << b << endl;
- else
- cout << "len=" << b.getLen() << endl;
- }
- opByte(0x16);
- opByte(0x10);
- opByte(0x02);
+ opByte(0x16);
+ opByte(0x10);
+ opByte(0x02);
- crcOut = 0;
- opByte(type);
- addToCrc(type, &crcOut);
+ crcOut = 0;
+ long len = b.getLen();
- long len = b.getLen();
- for (int i = 0; i < len; i++) {
- unsigned char c = b.getByte(i);
- if (c == 0x10)
- opByte(c);
- opByte(c);
- addToCrc(c, &crcOut);
- }
-
- opByte(0x10);
- opByte(0x03);
-
- opByte(crcOut >> 8);
- opByte(crcOut & 0xff);
- realWrite();
-}
+ if (verbose & PKT_DEBUG_LOG) {
+ cout << "packet: >> ";
+ if (verbose & PKT_DEBUG_DUMP)
+ cout << b;
+ else
+ cout << " len=" << dec << len;
+ cout << endl;
+ }
-void packet::
-addToCrc(unsigned short c, unsigned short *crc)
-{
- c <<= 8;
- for (int i = 0; i < 8; i++) {
- if ((*crc ^ c) & 0x8000)
- *crc = (*crc << 1) ^ 0x1021;
- else
- *crc <<= 1;
- c <<= 1;
+ for (int i = 0; i < len; i++) {
+ unsigned char c = b.getByte(i);
+ switch (c) {
+ case 0x03:
+ if (isEPOC) {
+ opByte(0x10);
+ opByte(0x04);
+ addToCrc(0x03, &crcOut);
+ } else
+ opCByte(c, &crcOut);
+ break;
+ case 0x10:
+ opByte(0x10);
+ // fall thru
+ default:
+ opCByte(c, &crcOut);
}
+ }
+ opByte(0x10);
+ opByte(0x03);
+ opByte(crcOut >> 8);
+ opByte(crcOut & 0xff);
+ realWrite();
}
void packet::
opByte(unsigned char a)
{
- *outPtr++ = a;
- outLen++;
- if (outLen >= BUFFERLEN)
- realWrite();
+ if (!hasSpace(out))
+ realWrite();
+ outBuffer[outWrite] = a;
+ inc1(outWrite);
}
void packet::
-realWrite()
+opCByte(unsigned char a, unsigned short *crc)
{
- outPtr = outBuffer;
- while (outLen > 0) {
- int r = write(fd, outPtr, outLen);
- if (verbose & PKT_DEBUG_LOG)
- cout << "packet: WR=" << dec << r << endl;
- if (r > 0) {
- outLen -= r;
- outPtr += r;
- }
- }
- outPtr = outBuffer;
+ addToCrc(a, crc);
+ if (!hasSpace(out))
+ realWrite();
+ outBuffer[outWrite] = a;
+ inc1(outWrite);
}
-bool packet::
-get(unsigned char &type, bufferStore & ret)
+void packet::
+realWrite()
{
- while (!terminated()) {
- if (linkFailed())
- return false;
- int res = read(fd, inPtr, BUFFERLEN - inLen);
- if (res > 0) {
- if (verbose & PKT_DEBUG_LOG)
- cout << "packet: rcv " << dec << res << endl;
- inPtr += res;
- inLen += res;
- }
- if (res < 0)
- return false;
- // XXX Solaris returns 0 on non blocking TTY lines
- // even when VMIN > 0
- if( res == 0 && inLen == 0 )
- return false;
- if (inLen >= BUFFERLEN) {
- cerr << "packet: input buffer overflow!!!!" << endl;
- inLen = 0;
- inPtr = inBuffer;
- return false;
- }
- }
- if (verbose & PKT_DEBUG_LOG) {
- cout << "packet: get ";
- if (verbose & PKT_DEBUG_DUMP) {
- for (int i = foundSync - 3; i < termLen; i++)
- cout << hex << setw(2) << setfill('0') << (int) inBuffer[i] << " ";
- } else
- cout << "len=" << dec << termLen;
- cout << endl;
- }
- inLen -= termLen;
- termLen = 0;
- foundSync = 0;
- bool crcOk = (endPtr[0] == ((crcIn >> 8) & 0xff) && endPtr[1] == (crcIn & 0xff));
- if (inLen > 0)
- memmove(inBuffer, &endPtr[2], inLen);
- inPtr = inBuffer + inLen;
- if (crcOk) {
- type = rcv.getByte(0);
- ret = rcv;
- ret.discardFirstBytes(1);
- return true;
- } else {
- if (verbose & PKT_DEBUG_LOG)
- cout << "packet: BAD CRC" << endl;
+ pthread_kill(datapump, SIGUSR1);
+ while (!hasSpace(out)) {
+ sigset_t sigs;
+ int dummy;
+ sigemptyset(&sigs);
+ sigaddset(&sigs, SIGUSR1);
+ sigwait(&sigs, &dummy);
}
- return false;
}
-bool packet::
-terminated()
+void packet::
+findSync()
{
- unsigned char *p;
- int l;
+ int inw = inWrite;
+ int p;
- if (inLen < 6)
- return false;
- p = inBuffer + termLen;
- if (!foundSync) {
- while (!foundSync && (inLen - termLen >= 6))
- {
- termLen++;
- if (*p++ != 0x16)
+ outerLoop:
+ p = (lastSYN >= 0) ? lastSYN : inRead;
+ if (startPkt < 0) {
+ while (p != inw) {
+ normalize(p);
+ if (inBuffer[p++] != 0x16)
continue;
- termLen++;
- if (*p++ != 0x10)
+ lastSYN = p - 1;
+ normalize(p);
+ if (inBuffer[p++] != 0x10)
continue;
- termLen++;
- if (*p++ != 0x02)
+ normalize(p);
+ if (inBuffer[p++] != 0x02)
continue;
- foundSync = termLen;
- }
- if (!foundSync)
- return false;
-
- if (verbose & PKT_DEBUG_LOG) {
- if (foundSync != 3)
- cout << "packet: terminated found sync at " << foundSync << endl;
+ normalize(p);
+ lastSYN = startPkt = p;
+ crcIn = inCRCstate = 0;
+ rcv.init();
+ esc = false;
+ break;
}
- esc = false;
- // termLen = 3;
- crcIn = 0;
- rcv.init();
}
- for (l = termLen; l < inLen - 2; p++, l++) {
- if (esc) {
- esc = false;
- if (*p == 0x03) {
- endPtr = p + 1;
- termLen = l + 3;
- return true;
- }
- addToCrc(*p, &crcIn);
- rcv.addByte(*p);
- } else {
- if (*p == 0x10)
- esc = true;
- else {
- addToCrc(*p, &crcIn);
- rcv.addByte(*p);
+ if (startPkt >= 0) {
+ while (p != inw) {
+ unsigned char c = inBuffer[p];
+ switch (inCRCstate) {
+ case 0:
+ if (esc) {
+ esc = false;
+ switch (c) {
+ case 0x03:
+ inCRCstate = 1;
+ break;
+ case 0x04:
+ addToCrc(0x03, &crcIn);
+ rcv.addByte(0x03);
+ break;
+ default:
+ addToCrc(c, &crcIn);
+ rcv.addByte(c);
+ break;
+ }
+ } else {
+ if (c == 0x10)
+ esc = true;
+ else {
+ addToCrc(c, &crcIn);
+ rcv.addByte(c);
+ }
+ }
+ break;
+ case 1:
+ receivedCRC = c;
+ receivedCRC <<= 8;
+ inCRCstate = 2;
+ break;
+ case 2:
+ receivedCRC |= c;
+ inc1(p);
+ inRead = p;
+ startPkt = lastSYN = -1;
+ inCRCstate = 0;
+ if (receivedCRC != crcIn) {
+ if (verbose & PKT_DEBUG_LOG)
+ cout << "packet: BAD CRC" << endl;
+ } else {
+ // inQueue += rcv;
+ if (verbose & PKT_DEBUG_LOG) {
+ cout << "packet: << ";
+ if (verbose & PKT_DEBUG_DUMP)
+ cout << rcv;
+ else
+ cout << "len=" << dec << rcv.getLen();
+ cout << endl;
+ }
+ theLINK->receive(rcv);
+ }
+ rcv.init();
+ if (hasData(out))
+ return;
+ goto outerLoop;
}
+ inc1(p);
}
+ lastSYN = p;
}
- termLen = l;
- return false;
}
bool packet::
linkFailed()
{
- int arg;
- int res;
- bool failed = false;
+ int arg;
+ int res;
+ bool failed = false;
- if (lastFatal)
- reset();
- res = ioctl(fd, TIOCMGET, &arg);
- if (res < 0)
+ if (lastFatal)
+ reset();
+ res = ioctl(fd, TIOCMGET, &arg);
+ if (res < 0)
+ lastFatal = true;
+ if ((serialStatus == -1) || (arg != serialStatus)) {
+ if (verbose & PKT_DEBUG_HANDSHAKE)
+ cout << "packet: < DTR:" << ((arg & TIOCM_DTR)?1:0)
+ << " RTS:" << ((arg & TIOCM_RTS)?1:0)
+ << " DCD:" << ((arg & TIOCM_CAR)?1:0)
+ << " DSR:" << ((arg & TIOCM_DSR)?1:0)
+ << " CTS:" << ((arg & TIOCM_CTS)?1:0) << endl;
+ if (!((arg & TIOCM_RTS) && (arg & TIOCM_DTR))) {
+ arg |= (TIOCM_DTR | TIOCM_RTS);
+ res = ioctl(fd, TIOCMSET, &arg);
+ if (res < 0)
lastFatal = true;
- if ((serialStatus == -1) || (arg != serialStatus)) {
- if (verbose & PKT_DEBUG_HANDSHAKE)
- cout << "packet: < DTR:" << ((arg & TIOCM_DTR)?1:0)
- << " RTS:" << ((arg & TIOCM_RTS)?1:0)
- << " DCD:" << ((arg & TIOCM_CAR)?1:0)
- << " DSR:" << ((arg & TIOCM_DSR)?1:0)
- << " CTS:" << ((arg & TIOCM_CTS)?1:0) << endl;
- if (!((arg & TIOCM_RTS) && (arg & TIOCM_DTR))) {
- arg |= (TIOCM_DTR | TIOCM_RTS);
- res = ioctl(fd, TIOCMSET, &arg);
- if (res < 0)
- lastFatal = true;
- if (verbose & PKT_DEBUG_HANDSHAKE)
- cout << "packet: > DTR:" << ((arg & TIOCM_DTR)?1:0)
- << " RTS:" << ((arg & TIOCM_RTS)?1:0)
- << " DCD:" << ((arg & TIOCM_CAR)?1:0)
- << " DSR:" << ((arg & TIOCM_DSR)?1:0)
- << " CTS:" << ((arg & TIOCM_CTS)?1:0) << endl;
- }
- serialStatus = arg;
+ if (verbose & PKT_DEBUG_HANDSHAKE)
+ cout << "packet: > DTR:" << ((arg & TIOCM_DTR)?1:0)
+ << " RTS:" << ((arg & TIOCM_RTS)?1:0)
+ << " DCD:" << ((arg & TIOCM_CAR)?1:0)
+ << " DSR:" << ((arg & TIOCM_DSR)?1:0)
+ << " CTS:" << ((arg & TIOCM_CTS)?1:0) << endl;
}
- if (((arg & TIOCM_CTS) == 0)
+ serialStatus = arg;
+ }
+ if (((arg & TIOCM_CTS) == 0)
#ifndef sun
- || ((arg & TIOCM_DSR) == 0)
+ || ((arg & TIOCM_DSR) == 0)
#endif
- ) {
- // eat possible junk on line
- while (read(fd, &res, sizeof(res)) > 0)
- ;
- failed = true;
- }
- if ((verbose & PKT_DEBUG_LOG) && lastFatal)
- cout << "packet: linkFATAL\n";
- if ((verbose & PKT_DEBUG_LOG) && failed)
- cout << "packet: linkFAILED\n";
- return lastFatal || failed;
+ ) {
+ // eat possible junk on line
+ //while (read(fd, &res, sizeof(res)) > 0)
+ // ;
+ failed = true;
+ }
+ if ((verbose & PKT_DEBUG_LOG) && lastFatal)
+ cout << "packet: linkFATAL\n";
+ if ((verbose & PKT_DEBUG_LOG) && failed)
+ cout << "packet: linkFAILED\n";
+ return lastFatal || failed;
}
/*
diff --git a/ncpd/packet.h b/ncpd/packet.h
index f80e4c9..e4d7381 100644
--- a/ncpd/packet.h
+++ b/ncpd/packet.h
@@ -28,52 +28,82 @@
#include <config.h>
#endif
#include <stdio.h>
+#include <pthread.h>
-class bufferStore;
-class IOWatch;
-
-#define PKT_DEBUG_LOG 1
-#define PKT_DEBUG_DUMP 2
-#define PKT_DEBUG_HANDSHAKE 4
-
-class packet {
- public:
- packet(const char *fname, int baud, IOWatch *iow, short int verbose = 0);
- ~packet();
- void send(unsigned char type, const bufferStore &b);
- bool get(unsigned char &type, bufferStore &b);
- void setVerbose(short int);
- short int getVerbose();
- bool linkFailed();
- void reset();
-
- private:
- bool terminated();
- void addToCrc(unsigned short a, unsigned short *crc);
- void opByte(unsigned char a);
- void realWrite();
-
- unsigned short crcOut;
- unsigned short crcIn;
- unsigned char *inPtr;
- unsigned char *outPtr;
- unsigned char *endPtr;
- unsigned char *inBuffer;
- unsigned char *outBuffer;
- bufferStore rcv;
- int inLen;
- int outLen;
- int termLen;
- int foundSync;
- int fd;
- int serialStatus;
- short int verbose;
- bool esc;
- bool lastFatal;
- bool iowLocal;
- char *devname;
- int baud;
- IOWatch *iow;
+#include "bufferstore.h"
+#include "bufferarray.h"
+
+#define PKT_DEBUG_LOG 16
+#define PKT_DEBUG_DUMP 32
+#define PKT_DEBUG_HANDSHAKE 64
+
+extern "C" {
+ static void *pump_run(void *);
+}
+
+class Link;
+
+class packet
+{
+public:
+ packet(const char *fname, int baud, Link *_link, unsigned short verbose = 0);
+ ~packet();
+
+ /**
+ * Send a buffer out to serial line
+ */
+ void send(bufferStore &b);
+
+ void setEpoc(bool);
+ void setVerbose(short int);
+ short int getVerbose();
+ bool linkFailed();
+ void reset();
+
+private:
+ friend void * ::pump_run(void *);
+
+ inline void addToCrc(unsigned char a, unsigned short *crc) {
+ *crc = (*crc << 8) ^ crc_table[((*crc >> 8) ^ a) & 0xff];
+ }
+
+ void findSync();
+ void opByte(unsigned char a);
+ void opCByte(unsigned char a, unsigned short *crc);
+ void realWrite();
+
+ Link *theLINK;
+ pthread_t datapump;
+ pthread_t thisThread;
+ unsigned int crc_table[256];
+
+ unsigned short crcOut;
+ unsigned short crcIn;
+ unsigned short receivedCRC;
+ unsigned short inCRCstate;
+
+ unsigned char *inBuffer;
+ int inWrite;
+ int inRead;
+
+ unsigned char *outBuffer;
+ int outWrite;
+ int outRead;
+
+ int startPkt;
+ int lastSYN;
+
+ bufferArray inQueue;
+ bufferStore rcv;
+ int foundSync;
+ int fd;
+ int serialStatus;
+ short int verbose;
+ bool esc;
+ bool lastFatal;
+ bool isEPOC;
+ char *devname;
+ int baud;
};
#endif