Reliable ethernet protocol. I still need pselect instead of select.
--- a/Makefile Sat Oct 06 21:56:22 2007 +0200
+++ b/Makefile Sun Oct 07 00:10:17 2007 +0200
@@ -18,6 +18,7 @@
simple_math.o \
xterm.o \
dump.o \
+ flow.o \
filter.o \
filter_string.o \
filter_telnet.o $(LINUX_OBJECTS)
@@ -56,3 +57,4 @@
eth_proto.o: eth_proto.c main.h eth_linux.h
eth_server.o: eth_server.c main.h handlers.h
eth_client.o: eth_client.c main.h handlers.h
+flow.o: flow.c main.h handlers.h
--- a/app_control.c Sat Oct 06 21:56:22 2007 +0200
+++ b/app_control.c Sun Oct 07 00:10:17 2007 +0200
@@ -183,3 +183,7 @@
close(app_stdin);
}
}
+
+void app_control_avoid_sending(fd_set *read_set)
+{
+}
--- a/client.c Sat Oct 06 21:56:22 2007 +0200
+++ b/client.c Sun Oct 07 00:10:17 2007 +0200
@@ -36,6 +36,15 @@
net_prepare(&read_set, &maxfd);
res = select(maxfd + 1, &read_set, 0, 0, 0);
+#ifdef linux
+ if (command_line.c_param.transport == ETHERNET)
+ /* If there isn't a good result, we quit */
+ if (!eth_proto_process_timeouts())
+ {
+ error("Connection lost (no ACK received).\n");
+ break;
+ }
+#endif
if (res == -1)
{
if (errno == EINTR)
--- a/dump.c Sat Oct 06 21:56:22 2007 +0200
+++ b/dump.c Sun Oct 07 00:10:17 2007 +0200
@@ -36,7 +36,7 @@
f = fopen("/tmp/dump.txt", "a");
if (f == 0) return;
- fprintf(f, "%s: ", head);
+ fprintf(f, "%s (%i): ", head, len);
for(i=0;i<len; ++i)
{
int c;
--- a/eth_client.c Sat Oct 06 21:56:22 2007 +0200
+++ b/eth_client.c Sun Oct 07 00:10:17 2007 +0200
@@ -33,6 +33,8 @@
{
FD_SET(myfd, read_set);
*maxfd = max(*maxfd, myfd);
+ if (!eth_proto_allow_sending())
+ avoid_sending(read_set);
}
/* Send -1 on eof */
@@ -43,8 +45,8 @@
int res;
int olen;
res = eth_proto_recv(stream_buffer, stream_buffer_size);
- if (res == -1)
- return 0;
+ if (res < 0)
+ return 1; /* ignore it */
if (res == 0) /* EOF */
{
filter_flush(client_fr, ostream_buffer, &olen);
--- a/eth_linux.c Sat Oct 06 21:56:22 2007 +0200
+++ b/eth_linux.c Sun Oct 07 00:10:17 2007 +0200
@@ -24,6 +24,7 @@
#include <linux/fs.h>
#include <sys/stat.h>
+#include "main.h"
#include "eth_linux.h"
enum {
@@ -96,7 +97,7 @@
struct sockaddr_ll sa;
int sa_len = sizeof(sa);
int res;
- dump_line("eth_recv");
+ dump_line("eth_recv\n");
res = recvfrom(fd, buf, len, 0, (struct sockaddr *) &sa, &sa_len);
if (debug) {
printf("read %d bytes\r\n", res);
@@ -113,7 +114,7 @@
struct sockaddr_ll sa;
int i;
- dump_line("eth_send");
+ dump_line("eth_send\n");
i = getindx(fd, dev);
sa.sll_family = AF_PACKET;
--- a/eth_proto.c Sat Oct 06 21:56:22 2007 +0200
+++ b/eth_proto.c Sun Oct 07 00:10:17 2007 +0200
@@ -6,23 +6,31 @@
*/
#include <netinet/in.h>
#include <string.h>
+#include <assert.h>
#include <stdio.h>
#include <errno.h>
#include "eth_linux.h"
#include "main.h"
+enum {
+ MAXPACKET = 1500,
+ MAXSEQ = 100,
+ HEAD = 9
+};
+
static struct
{
int socket;
char partner[6];
int partner_set;
+ int send_acked;
+ int send_retries_left;
unsigned int seq_send;
unsigned int seq_wait;
unsigned int wrong_recv;
- char tmp_eth_buffer[1000];
- char tmp_eth_buffer_size;
- char tmp_buffer_filled;
+ char send_buffer[MAXPACKET];
+ int send_buffer_size;
} edata;
enum Control {
@@ -31,12 +39,7 @@
INIT
};
-enum {
- MAXPACKET = 1000,
- HEAD = 9
-};
-
-static char eth_buffer[1000];
+static char eth_buffer[MAXPACKET];
static void eth_fill_mac(unsigned char *mac, const char *str);
@@ -59,13 +62,41 @@
return HEAD;
}
+int eth_proto_max_send()
+{
+ return MAXPACKET - HEAD;
+}
+
void eth_proto_init()
{
edata.socket = -1;
edata.seq_send = 0;
edata.seq_wait = 0;
+ edata.send_acked = 1; /* Fine at the beginning, as if the last data was acked */
edata.partner_set = 0;
- edata.tmp_buffer_filled = 0;
+}
+
+int eth_proto_allow_sending()
+{
+ return edata.send_acked;
+}
+
+static int seq_next(int val)
+{
+ if (val >= 0 && val < MAXSEQ)
+ val = val + 1;
+ else
+ val = 0;
+ return val;
+}
+
+static int seq_before(int val)
+{
+ if (val > 0 && val < MAXSEQ)
+ val = val - 1;
+ else
+ val = MAXSEQ;
+ return val;
}
int eth_proto_open()
@@ -83,7 +114,7 @@
make_head(eth_buffer, edata.seq_send, INIT, 0);
eth_send(command_line.eth_device, edata.partner, eth_buffer, HEAD);
- edata.seq_send++;
+ edata.seq_send = seq_next(edata.seq_send);
}
return edata.socket;
@@ -93,26 +124,15 @@
{
int res;
int seq;
+ int data_length;
enum Control c;
char partner[6];
- if (edata.tmp_buffer_filled)
- {
- int nbytes;
- nbytes = min(size, edata.tmp_eth_buffer_size);
- memcpy(data, edata.tmp_eth_buffer, nbytes);
-
- make_head(eth_buffer, seq, ACK, 0);
- eth_send(command_line.eth_device, edata.partner, eth_buffer, HEAD);
- edata.seq_wait++;
- edata.tmp_buffer_filled = 0;
- return nbytes;
- }
do {
res = eth_recv(eth_buffer, sizeof(eth_buffer), partner);
edata.partner_set = 1;
} while(res < HEAD);
- parse_head(eth_buffer, &seq, &c, &res);
+ parse_head(eth_buffer, &seq, &c, &data_length);
/* We admit any first connection */
if (seq == 0 && c == INIT)
{
@@ -120,32 +140,90 @@
memcpy(edata.partner, partner, sizeof(edata.partner));
return -1; /* Nothing the parent should care about */
}
- if (seq != edata.seq_wait || c != SEND)
+ if (c == SEND)
{
- edata.wrong_recv++;
- return -1;
- }
- /* res comes from parse_head. */
- memcpy(data, eth_buffer + HEAD, min(size, res));
+ if (seq != edata.seq_wait && seq != seq_before(edata.seq_wait))
+ {
+ dump_line("Wrong data packet seq received. Recvd: %i Expected: %i\n",
+ seq, edata.seq_wait);
+ edata.wrong_recv++;
+ return -1;
+ }
+
+ if (seq == seq_before(edata.seq_wait))
+ dump_line("Repeated data seq received: %i\n", seq);
+
+ if (seq == edata.seq_wait)
+ {
+ if (data_length == 0)
+ {
+ edata.partner_set = 0;
+ edata.seq_wait = 0;
+ edata.seq_send = 0;
+ /* We should send ACK anyway */
+ }
+ else
+ {
+ memcpy(data, eth_buffer + HEAD, data_length);
+ edata.seq_wait = seq_next(edata.seq_wait);
+ }
+ }
- make_head(eth_buffer, seq, ACK, 0);
- eth_send(command_line.eth_device, edata.partner, eth_buffer, HEAD);
- edata.seq_wait++;
-
- if (res == 0 && c == SEND)
+ /* Ack the packed we received. In these conditions:
+ * - We received the packed we expected
+ * - We received a repeat of the old packet. The
+ * ACK was lost probably, so we resend it */
+ make_head(eth_buffer, seq, ACK, 0);
+ eth_send(command_line.eth_device, edata.partner, eth_buffer, HEAD);
+ }
+ else if (c == ACK)
{
- edata.partner_set = 0;
- edata.seq_wait = 0;
- edata.seq_send = 0;
+ if (seq == edata.seq_send)
+ {
+ edata.send_acked = 1;
+ edata.seq_send = seq_next(edata.seq_send);
+ unprogram_timeout();
+ }
+ else
+ {
+ dump_line("Wrong ack received. Recvd: %i Expected: %i\n",
+ seq, edata.seq_send);
+ }
+ return -1; /* not data */
}
+ else
+ return -1;
- return min(size, res);
+ return data_length;
+}
+
+static int eth_proto_link_send()
+{
+ int sent;
+ sent = eth_send(command_line.eth_device,
+ edata.partner,
+ edata.send_buffer,
+ edata.send_buffer_size);
+
+ if (sent >= 0) /* expected */
+ {
+ edata.send_retries_left -= 1;
+ edata.send_acked = 0;
+ program_timeout(2);
+ }
+ else /* strange case, data not sent */
+ {
+ sent = 0;
+ edata.send_acked = 1;
+ }
+ return sent;
}
int eth_proto_send(const char *data, int size)
{
- int total = size;
- int retries;
+ int sent;
+
+ assert(edata.send_acked);
if (!edata.partner_set)
{
@@ -159,60 +237,40 @@
return 0;
}
- retries = 3;
- do {
- int once;
- int sent;
- int rseq;
- int res;
- enum Control rc;
+ edata.send_retries_left = 3;
- if (retries == 0)
- {
- edata.partner_set = 0;
- return -1;
- }
-
- once = min(sizeof(eth_buffer)-HEAD, size);
+ /* Prepare packet */
+ make_head(edata.send_buffer, edata.seq_send, SEND, size);
+ memcpy(edata.send_buffer+HEAD, data, size);
+ edata.send_buffer_size = size + HEAD;
- make_head(eth_buffer, edata.seq_send, SEND, once);
- memcpy(eth_buffer+HEAD, data, once);
- sent = eth_send(command_line.eth_device, edata.partner,
- eth_buffer, once+HEAD);
- sent -= HEAD;
- if (sent < 0)
- sent = 0;
+ sent = eth_proto_link_send();
+ sent -= HEAD;
+ return sent;
+}
- program_timeout(2);
- res = eth_recv(eth_buffer, HEAD, 0);
- if (res == -1 && errno == EINTR)
- {
- unprogram_timeout();
- retries--;
- continue;
- }
+int eth_proto_process_timeouts()
+{
+ if (!edata.send_acked && did_timeout_happen())
+ {
unprogram_timeout();
- retries = 3;
-
- parse_head(eth_buffer, &rseq, &rc, 0);
- if (rc == SEND)
+ if (edata.send_retries_left > 0)
{
- if (rseq != edata.seq_wait)
- edata.wrong_recv++;
- else if (!edata.tmp_buffer_filled)
- {
- memcpy(edata.tmp_eth_buffer, eth_buffer, res);
- edata.tmp_buffer_filled = 1;
- }
+ dump_line("Retrying. Left:%i\n", edata.send_retries_left);
+ eth_proto_link_send();
}
- else if (rc != ACK || rseq != edata.seq_send)
- continue;
-
- edata.seq_send++;
- size -= sent;
- data += sent;
- } while(size > 0);
- return total;
+ else
+ {
+ /* The connection has been lost */
+ dump_line("Connection lost");
+ edata.send_acked = 1;
+ edata.partner_set = 0;
+ edata.seq_wait = 0;
+ edata.seq_send = 0;
+ return 0; /* FAIL */
+ }
+ }
+ return 1; /* OK */
}
static void eth_fill_mac(unsigned char *mac, const char *str)
--- a/eth_server.c Sat Oct 06 21:56:22 2007 +0200
+++ b/eth_server.c Sun Oct 07 00:10:17 2007 +0200
@@ -31,6 +31,9 @@
{
FD_SET(myfd, read_set);
*maxfd = max(*maxfd, myfd);
+
+ if (!eth_proto_allow_sending())
+ avoid_sending(read_set);
}
void s_eth_process_read_fdset(fd_set *read_set)
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/flow.c Sun Oct 07 00:10:17 2007 +0200
@@ -0,0 +1,28 @@
+/*
+ Terminal Mixer - multi-point multi-user access to terminal applications
+ Copyright (C) 2007 LluĂs Batlle i Rossell
+
+ Please find the license in the provided COPYING file.
+*/
+#include <stdlib.h>
+#include <unistd.h>
+#include <sys/select.h>
+
+#include "main.h"
+#include "handlers.h"
+
+void avoid_sending(fd_set *read_set)
+{
+ dump_line("Avoid sending\n");
+ if (command_line.is_server)
+ {
+ if (app_stdout >= 0)
+ FD_CLR(app_stdout, read_set);
+ if (app_stderr >= 0)
+ FD_CLR(app_stderr, read_set);
+ }
+ else
+ {
+ FD_CLR(0, read_set); /* client terminal stdin */
+ }
+}
--- a/gen_sockets.c Sat Oct 06 21:56:22 2007 +0200
+++ b/gen_sockets.c Sun Oct 07 00:10:17 2007 +0200
@@ -20,9 +20,19 @@
{
if (stream_buffer == 0)
{
+ int eth_buf;
assert(command_line.buffer_size > 0);
+
stream_buffer_size = command_line.buffer_size;
- ostream_buffer_size = command_line.buffer_size;
+#ifdef linux
+ if ((command_line.is_server && command_line.s_param.serve_eth)
+ || (!command_line.is_server && command_line.c_param.transport == ETHERNET))
+ {
+ eth_buf = eth_proto_max_send();
+ stream_buffer_size = min(command_line.buffer_size, eth_buf);
+ }
+#endif
+ ostream_buffer_size = stream_buffer_size;
stream_buffer = (char *) malloc(stream_buffer_size);
ostream_buffer = (char *) malloc(ostream_buffer_size);
}
--- a/handlers.h Sat Oct 06 21:56:22 2007 +0200
+++ b/handlers.h Sun Oct 07 00:10:17 2007 +0200
@@ -49,3 +49,6 @@
void c_eth_prepare_read_fdset(fd_set *read_set, int *maxfd);
int c_eth_process_read_fdset(fd_set *read_set);
void c_eth_send_to_connected(const char *buffer, size_t size);
+
+/* flow.c */
+void avoid_sending(fd_set *read_set);
--- a/main.h Sat Oct 06 21:56:22 2007 +0200
+++ b/main.h Sun Oct 07 00:10:17 2007 +0200
@@ -43,6 +43,7 @@
void install_signal_forwarders();
void program_timeout(int secs);
void unprogram_timeout();
+int did_timeout_happen();
/* main.c */
extern int app_stdin;
@@ -85,3 +86,6 @@
int eth_proto_open();
int eth_proto_recv(char *data, int size);
int eth_proto_send(const char *data, int size);
+int eth_proto_allow_sending();
+int eth_proto_max_send();
+int eth_proto_process_timeouts();
--- a/server.c Sat Oct 06 21:56:22 2007 +0200
+++ b/server.c Sun Oct 07 00:10:17 2007 +0200
@@ -47,6 +47,10 @@
/* Will block */
res = select(maxfd + 1, &read_set, 0, 0, 0);
+#ifdef linux
+ if (command_line.s_param.serve_eth)
+ eth_proto_process_timeouts();
+#endif
if (res == -1)
{
if (errno == EINTR)
--- a/signals.c Sat Oct 06 21:56:22 2007 +0200
+++ b/signals.c Sun Oct 07 00:10:17 2007 +0200
@@ -16,6 +16,8 @@
static char old_alarm_handler_set;
static struct sigaction old_alarm_handler;
+static int timeout_timed_out;
+
static void forward_signals_to_child_handler(int val)
{
kill(child, val);
@@ -61,6 +63,7 @@
static void alarm_handler(int x)
{
+ timeout_timed_out = 1;
}
void program_timeout(int secs)
@@ -81,14 +84,22 @@
sigaction(SIGALRM, &act, old);
old_alarm_handler_set = 1;
+ timeout_timed_out = 0;
alarm(secs);
}
+int did_timeout_happen()
+{
+ return timeout_timed_out;
+}
+
void unprogram_timeout()
{
alarm(0);
+ timeout_timed_out = 0;
+
if (old_alarm_handler_set)
{
sigaction(SIGALRM, &old_alarm_handler, 0);