Reliable ethernet protocol. I still need pselect instead of select.
authorviric@mandarina
Sun, 07 Oct 2007 00:10:17 +0200
changeset 66 b2469563a1dc
parent 65 107ab713b65b
child 67 d7405e4f12e1
Reliable ethernet protocol. I still need pselect instead of select.
Makefile
app_control.c
client.c
dump.c
eth_client.c
eth_linux.c
eth_proto.c
eth_server.c
flow.c
gen_sockets.c
handlers.h
main.h
server.c
signals.c
--- a/Makefile	Sat Oct 06 21:56:22 2007 +0200
+++ b/Makefile	Sun Oct 07 00:10:17 2007 +0200
@@ -18,6 +18,7 @@
 	simple_math.o \
 	xterm.o \
 	dump.o \
+	flow.o \
 	filter.o \
 	filter_string.o \
 	filter_telnet.o $(LINUX_OBJECTS)
@@ -56,3 +57,4 @@
 eth_proto.o: eth_proto.c main.h eth_linux.h
 eth_server.o: eth_server.c main.h handlers.h
 eth_client.o: eth_client.c main.h handlers.h
+flow.o: flow.c main.h handlers.h
--- a/app_control.c	Sat Oct 06 21:56:22 2007 +0200
+++ b/app_control.c	Sun Oct 07 00:10:17 2007 +0200
@@ -183,3 +183,7 @@
             close(app_stdin);
     }
 }
+
+void app_control_avoid_sending(fd_set *read_set)
+{
+}
--- a/client.c	Sat Oct 06 21:56:22 2007 +0200
+++ b/client.c	Sun Oct 07 00:10:17 2007 +0200
@@ -36,6 +36,15 @@
         net_prepare(&read_set, &maxfd);
 
         res = select(maxfd + 1, &read_set, 0, 0, 0);
+#ifdef linux
+        if (command_line.c_param.transport == ETHERNET)
+            /* If there isn't a good result, we quit */
+            if (!eth_proto_process_timeouts())
+            {
+                    error("Connection lost (no ACK received).\n");
+                    break;
+            }
+#endif
         if (res == -1)
         {
             if (errno == EINTR)
--- a/dump.c	Sat Oct 06 21:56:22 2007 +0200
+++ b/dump.c	Sun Oct 07 00:10:17 2007 +0200
@@ -36,7 +36,7 @@
     f = fopen("/tmp/dump.txt", "a");
     if (f == 0) return;
 
-    fprintf(f, "%s: ", head);
+    fprintf(f, "%s (%i): ", head, len);
     for(i=0;i<len; ++i)
     {
         int c;
--- a/eth_client.c	Sat Oct 06 21:56:22 2007 +0200
+++ b/eth_client.c	Sun Oct 07 00:10:17 2007 +0200
@@ -33,6 +33,8 @@
 {
     FD_SET(myfd, read_set);
     *maxfd = max(*maxfd, myfd);
+    if (!eth_proto_allow_sending())
+        avoid_sending(read_set);
 }
 
 /* Send -1 on eof */
@@ -43,8 +45,8 @@
         int res;
         int olen;
         res = eth_proto_recv(stream_buffer, stream_buffer_size);
-        if (res == -1)
-            return 0;
+        if (res < 0)
+            return 1; /* ignore it */
         if (res == 0) /* EOF */
         {
             filter_flush(client_fr, ostream_buffer, &olen);
--- a/eth_linux.c	Sat Oct 06 21:56:22 2007 +0200
+++ b/eth_linux.c	Sun Oct 07 00:10:17 2007 +0200
@@ -24,6 +24,7 @@
 #include <linux/fs.h>
 #include <sys/stat.h>
 
+#include "main.h"
 #include "eth_linux.h"
 
 enum {
@@ -96,7 +97,7 @@
     struct sockaddr_ll sa;
     int sa_len = sizeof(sa);
     int res;
-    dump_line("eth_recv");
+    dump_line("eth_recv\n");
     res = recvfrom(fd, buf, len, 0, (struct sockaddr *) &sa, &sa_len);
     if (debug) {
         printf("read %d bytes\r\n", res);
@@ -113,7 +114,7 @@
     struct sockaddr_ll sa;
     int i;
 
-    dump_line("eth_send");
+    dump_line("eth_send\n");
 
     i = getindx(fd, dev);
     sa.sll_family = AF_PACKET;
--- a/eth_proto.c	Sat Oct 06 21:56:22 2007 +0200
+++ b/eth_proto.c	Sun Oct 07 00:10:17 2007 +0200
@@ -6,23 +6,31 @@
 */
 #include <netinet/in.h>
 #include <string.h>
+#include <assert.h>
 #include <stdio.h>
 #include <errno.h>
 
 #include "eth_linux.h"
 #include "main.h"
 
+enum {
+    MAXPACKET = 1500,
+    MAXSEQ = 100,
+    HEAD = 9
+};
+
 static struct
 {
     int socket;
     char partner[6];
     int partner_set;
+    int send_acked;
+    int send_retries_left;
     unsigned int seq_send;
     unsigned int seq_wait;
     unsigned int wrong_recv;
-    char tmp_eth_buffer[1000];
-    char tmp_eth_buffer_size;
-    char tmp_buffer_filled;
+    char send_buffer[MAXPACKET];
+    int send_buffer_size;
 } edata;
 
 enum Control {
@@ -31,12 +39,7 @@
     INIT
 };
 
-enum {
-    MAXPACKET = 1000,
-    HEAD = 9
-};
-
-static char eth_buffer[1000];
+static char eth_buffer[MAXPACKET];
 
 static void eth_fill_mac(unsigned char *mac, const char *str);
 
@@ -59,13 +62,41 @@
     return HEAD;
 }
 
+int eth_proto_max_send()
+{
+    return MAXPACKET - HEAD;
+}
+
 void eth_proto_init()
 {
     edata.socket = -1;
     edata.seq_send = 0;
     edata.seq_wait = 0;
+    edata.send_acked = 1; /* Fine at the beginning, as if the last data was acked */
     edata.partner_set = 0;
-    edata.tmp_buffer_filled = 0;
+}
+
+int eth_proto_allow_sending()
+{
+    return edata.send_acked;
+}
+
+static int seq_next(int val)
+{
+    if (val >= 0 && val < MAXSEQ)
+        val = val + 1;
+    else
+        val = 0;
+    return val;
+}
+
+static int seq_before(int val)
+{
+    if (val > 0 && val < MAXSEQ)
+        val = val - 1;
+    else
+        val = MAXSEQ;
+    return val;
 }
 
 int eth_proto_open()
@@ -83,7 +114,7 @@
 
         make_head(eth_buffer, edata.seq_send, INIT, 0);
         eth_send(command_line.eth_device, edata.partner, eth_buffer, HEAD);
-        edata.seq_send++;
+        edata.seq_send = seq_next(edata.seq_send);
     }
 
     return edata.socket;
@@ -93,26 +124,15 @@
 {
     int res;
     int seq;
+    int data_length;
     enum Control c;
     char partner[6];
 
-    if (edata.tmp_buffer_filled)
-    {
-        int nbytes;
-        nbytes = min(size, edata.tmp_eth_buffer_size);
-        memcpy(data, edata.tmp_eth_buffer, nbytes);
-
-        make_head(eth_buffer, seq, ACK, 0);
-        eth_send(command_line.eth_device, edata.partner, eth_buffer, HEAD);
-        edata.seq_wait++;
-        edata.tmp_buffer_filled = 0;
-        return nbytes;
-    }
     do {
             res = eth_recv(eth_buffer, sizeof(eth_buffer), partner);
             edata.partner_set = 1;
     } while(res < HEAD);
-    parse_head(eth_buffer, &seq, &c, &res);
+    parse_head(eth_buffer, &seq, &c, &data_length);
     /* We admit any first connection */
     if (seq == 0 && c == INIT)
     {
@@ -120,32 +140,90 @@
       memcpy(edata.partner, partner, sizeof(edata.partner));
       return -1; /* Nothing the parent should care about */
     }
-    if (seq != edata.seq_wait || c != SEND)
+    if (c == SEND)
     {
-        edata.wrong_recv++;
-        return -1;
-    }
-    /* res comes from parse_head. */
-    memcpy(data, eth_buffer + HEAD, min(size, res));
+        if (seq != edata.seq_wait && seq != seq_before(edata.seq_wait))
+        {
+            dump_line("Wrong data packet seq received. Recvd: %i Expected: %i\n",
+                    seq, edata.seq_wait);
+            edata.wrong_recv++;
+            return -1;
+        }
+
+        if (seq == seq_before(edata.seq_wait))
+            dump_line("Repeated data seq received: %i\n", seq);
+
+        if (seq == edata.seq_wait)
+        {
+            if (data_length == 0)
+            {
+                edata.partner_set = 0;
+                edata.seq_wait = 0;
+                edata.seq_send = 0;
+                /* We should send ACK anyway */
+            }
+            else
+            {
+                memcpy(data, eth_buffer + HEAD, data_length);
+                edata.seq_wait = seq_next(edata.seq_wait);
+            }
+        }
 
-    make_head(eth_buffer, seq, ACK, 0);
-    eth_send(command_line.eth_device, edata.partner, eth_buffer, HEAD);
-    edata.seq_wait++;
-
-    if (res == 0 && c == SEND)
+        /* Ack the packed we received. In these conditions:
+         * - We received the packed we expected
+         * - We received a repeat of the old packet. The
+         *   ACK was lost probably, so we resend it */
+        make_head(eth_buffer, seq, ACK, 0);
+        eth_send(command_line.eth_device, edata.partner, eth_buffer, HEAD);
+    }
+    else if (c == ACK)
     {
-        edata.partner_set = 0;
-        edata.seq_wait = 0;
-        edata.seq_send = 0;
+        if (seq == edata.seq_send)
+        {
+            edata.send_acked = 1;
+            edata.seq_send = seq_next(edata.seq_send);
+            unprogram_timeout();
+        }
+        else
+        {
+            dump_line("Wrong ack received. Recvd: %i Expected: %i\n",
+                    seq, edata.seq_send);
+        }
+        return -1; /* not data */
     }
+    else
+        return -1;
 
-    return min(size, res);
+    return data_length;
+}
+
+static int eth_proto_link_send()
+{
+    int sent;
+    sent = eth_send(command_line.eth_device,
+            edata.partner,
+            edata.send_buffer,
+            edata.send_buffer_size);
+
+    if (sent >= 0) /* expected */
+    {
+        edata.send_retries_left -= 1;
+        edata.send_acked = 0;
+        program_timeout(2);
+    }
+    else /* strange case, data not sent */
+    {
+        sent = 0;
+        edata.send_acked = 1;
+    }
+    return sent;
 }
 
 int eth_proto_send(const char *data, int size)
 {
-    int total = size;
-    int retries;
+    int sent;
+
+    assert(edata.send_acked);
 
     if (!edata.partner_set)
     {
@@ -159,60 +237,40 @@
             return 0;
     }
 
-    retries = 3;
-    do {
-        int once;
-        int sent;
-        int rseq;
-        int res;
-        enum Control rc;
+    edata.send_retries_left = 3;
 
-        if (retries == 0)
-        {
-            edata.partner_set = 0;
-            return -1;
-        }
-
-        once = min(sizeof(eth_buffer)-HEAD, size);
+    /* Prepare packet */
+    make_head(edata.send_buffer, edata.seq_send, SEND, size);
+    memcpy(edata.send_buffer+HEAD, data, size);
+    edata.send_buffer_size = size + HEAD;
 
-        make_head(eth_buffer, edata.seq_send, SEND, once);
-        memcpy(eth_buffer+HEAD, data, once);
-        sent = eth_send(command_line.eth_device, edata.partner,
-                eth_buffer, once+HEAD);
-        sent -= HEAD;
-        if (sent < 0)
-            sent = 0;
+    sent = eth_proto_link_send();
+    sent -= HEAD;
+    return sent;
+}
 
-        program_timeout(2);
-        res = eth_recv(eth_buffer, HEAD, 0);
-        if (res == -1 && errno == EINTR)
-        {
-            unprogram_timeout();
-            retries--;
-            continue;
-        }
+int eth_proto_process_timeouts()
+{
+    if (!edata.send_acked && did_timeout_happen())
+    {
         unprogram_timeout();
-        retries = 3;
-
-        parse_head(eth_buffer, &rseq, &rc, 0);
-        if (rc == SEND)
+        if (edata.send_retries_left > 0)
         {
-            if (rseq != edata.seq_wait)
-                edata.wrong_recv++;
-            else if (!edata.tmp_buffer_filled)
-            {
-                memcpy(edata.tmp_eth_buffer, eth_buffer, res);
-                edata.tmp_buffer_filled = 1;
-            }
+            dump_line("Retrying. Left:%i\n", edata.send_retries_left);
+            eth_proto_link_send();
         }
-        else if (rc != ACK || rseq != edata.seq_send)
-            continue;
-
-        edata.seq_send++;
-        size -= sent;
-        data += sent;
-    } while(size > 0);
-    return total;
+        else
+        {
+            /* The connection has been lost */
+            dump_line("Connection lost");
+            edata.send_acked = 1;
+            edata.partner_set = 0;
+            edata.seq_wait = 0;
+            edata.seq_send = 0;
+            return 0; /* FAIL */
+        }
+    }
+    return 1; /* OK */
 }
 
 static void eth_fill_mac(unsigned char *mac, const char *str)
--- a/eth_server.c	Sat Oct 06 21:56:22 2007 +0200
+++ b/eth_server.c	Sun Oct 07 00:10:17 2007 +0200
@@ -31,6 +31,9 @@
 {
     FD_SET(myfd, read_set);
     *maxfd = max(*maxfd, myfd);
+
+    if (!eth_proto_allow_sending())
+        avoid_sending(read_set);
 }
 
 void s_eth_process_read_fdset(fd_set *read_set)
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/flow.c	Sun Oct 07 00:10:17 2007 +0200
@@ -0,0 +1,28 @@
+/*
+    Terminal Mixer - multi-point multi-user access to terminal applications
+    Copyright (C) 2007  LluĂ­s Batlle i Rossell
+
+    Please find the license in the provided COPYING file.
+*/
+#include <stdlib.h>
+#include <unistd.h>
+#include <sys/select.h>
+
+#include "main.h"
+#include "handlers.h"
+
+void avoid_sending(fd_set *read_set)
+{
+    dump_line("Avoid sending\n");
+    if (command_line.is_server)
+    {
+        if (app_stdout >= 0)
+            FD_CLR(app_stdout, read_set);
+        if (app_stderr >= 0)
+            FD_CLR(app_stderr, read_set);
+    }
+    else
+    {
+            FD_CLR(0, read_set); /* client terminal stdin */
+    }
+}
--- a/gen_sockets.c	Sat Oct 06 21:56:22 2007 +0200
+++ b/gen_sockets.c	Sun Oct 07 00:10:17 2007 +0200
@@ -20,9 +20,19 @@
 {
     if (stream_buffer == 0)
     {
+        int eth_buf;
         assert(command_line.buffer_size > 0);
+
         stream_buffer_size = command_line.buffer_size;
-        ostream_buffer_size = command_line.buffer_size;
+#ifdef linux
+        if ((command_line.is_server && command_line.s_param.serve_eth)
+                || (!command_line.is_server && command_line.c_param.transport == ETHERNET))
+        {
+            eth_buf = eth_proto_max_send();
+            stream_buffer_size = min(command_line.buffer_size, eth_buf);
+        }
+#endif
+        ostream_buffer_size = stream_buffer_size;
         stream_buffer = (char *) malloc(stream_buffer_size);
         ostream_buffer = (char *) malloc(ostream_buffer_size);
     }
--- a/handlers.h	Sat Oct 06 21:56:22 2007 +0200
+++ b/handlers.h	Sun Oct 07 00:10:17 2007 +0200
@@ -49,3 +49,6 @@
 void c_eth_prepare_read_fdset(fd_set *read_set, int *maxfd);
 int  c_eth_process_read_fdset(fd_set *read_set);
 void c_eth_send_to_connected(const char *buffer, size_t size);
+
+/* flow.c */
+void avoid_sending(fd_set *read_set);
--- a/main.h	Sat Oct 06 21:56:22 2007 +0200
+++ b/main.h	Sun Oct 07 00:10:17 2007 +0200
@@ -43,6 +43,7 @@
 void install_signal_forwarders();
 void program_timeout(int secs);
 void unprogram_timeout();
+int did_timeout_happen();
 
 /* main.c */
 extern int app_stdin;
@@ -85,3 +86,6 @@
 int eth_proto_open();
 int eth_proto_recv(char *data, int size);
 int eth_proto_send(const char *data, int size);
+int eth_proto_allow_sending();
+int eth_proto_max_send();
+int eth_proto_process_timeouts();
--- a/server.c	Sat Oct 06 21:56:22 2007 +0200
+++ b/server.c	Sun Oct 07 00:10:17 2007 +0200
@@ -47,6 +47,10 @@
 
         /* Will block */
         res = select(maxfd + 1, &read_set, 0, 0, 0);
+#ifdef linux
+        if (command_line.s_param.serve_eth)
+            eth_proto_process_timeouts();
+#endif
         if (res == -1)
         {
             if (errno == EINTR)
--- a/signals.c	Sat Oct 06 21:56:22 2007 +0200
+++ b/signals.c	Sun Oct 07 00:10:17 2007 +0200
@@ -16,6 +16,8 @@
 static char old_alarm_handler_set;
 static struct sigaction old_alarm_handler;
 
+static int timeout_timed_out; 
+
 static void forward_signals_to_child_handler(int val)
 {
     kill(child, val);
@@ -61,6 +63,7 @@
 
 static void alarm_handler(int x)
 {
+    timeout_timed_out = 1;
 }
 
 void program_timeout(int secs)
@@ -81,14 +84,22 @@
   sigaction(SIGALRM, &act, old);
 
   old_alarm_handler_set = 1;
+  timeout_timed_out = 0;
 
   alarm(secs);
 }
 
+int did_timeout_happen()
+{
+    return timeout_timed_out;
+}
+
 void unprogram_timeout()
 {
     alarm(0);
 
+    timeout_timed_out = 0;
+
     if (old_alarm_handler_set)
     {
         sigaction(SIGALRM, &old_alarm_handler, 0);