Added wait for jobs. v0.2
authorviric@llimona
Sun, 25 Mar 2007 20:52:04 +0200
changeset 42 f093d0964cf5
parent 41 cad41574feda
child 43 288102ceed5c
Added wait for jobs.
TODO
client.c
jobs.c
main.c
main.h
msg.h
server.c
--- a/TODO	Sun Mar 25 19:55:42 2007 +0200
+++ b/TODO	Sun Mar 25 20:52:04 2007 +0200
@@ -9,7 +9,7 @@
  -* Allow killing the running job (Allowed with -p easily)
  -* Allow tailing any job
  -* Allow removing a job from the queue
- - Allow waiting any job
+ -* Allow waiting any job
 v0.1:
  -* The clients should _always_ go into background. *
  -* Allow to receive more parameters in the command line, and them be joined
--- a/client.c	Sun Mar 25 19:55:42 2007 +0200
+++ b/client.c	Sun Mar 25 20:52:04 2007 +0200
@@ -269,3 +269,34 @@
     }
     /* This will never be reached */
 }
+
+void c_wait_job()
+{
+    struct msg m;
+    int res;
+    char *string = 0;
+
+    /* Send the request */
+    m.type = WAITJOB;
+    m.u.jobid = command_line.jobid;
+    send_msg(server_socket, &m);
+
+    /* Receive the answer */
+    res = recv_msg(server_socket, &m);
+    assert(res == sizeof(m));
+    switch(m.type)
+    {
+    case WAITJOB_OK:
+        return;
+        /* WILL NOT GO FURTHER */
+    case LIST_LINE: /* Only ONE line accepted */
+        string = (char *) malloc(m.u.line_size);
+        res = recv_bytes(server_socket, string, m.u.line_size);
+        assert(res == m.u.line_size);
+        fprintf(stderr, "Error in the request: %s", 
+                string);
+        exit(-1);
+        /* WILL NOT GO FURTHER */
+    }
+    /* This will never be reached */
+}
--- a/jobs.c	Sun Mar 25 19:55:42 2007 +0200
+++ b/jobs.c	Sun Mar 25 20:52:04 2007 +0200
@@ -12,21 +12,30 @@
 
 struct Job
 {
+    struct Job *next;
     int jobid;
     char *command;
     enum Jobstate state;
     int errorlevel;
-    struct Job *next;
     char *output_filename;
     int store_output;
     int pid;
 };
 
+struct Notify
+{
+    int socket;
+    int jobid;
+    struct Notify *next;
+};
+
 /* Globals */
 static struct Job *firstjob = 0;
 static struct Job *first_finished_job = 0;
 static jobids = 0;
 
+static struct Notify *first_notify = 0;
+
 static void send_list_line(int s, const char * str)
 {
     struct msg m;
@@ -446,3 +455,164 @@
     }
     send_msg(s, &m);
 }
+
+static void add_to_notify_list(int s, int jobid)
+{
+    struct Notify *n;
+    struct Notify *new;
+
+    new = (struct Notify *) malloc(sizeof(*n));
+
+    new->socket = s;
+    new->jobid = jobid;
+    new->next = 0;
+
+    n = first_notify;
+    if (n == 0)
+    {
+        first_notify = new;
+        return;
+    }
+
+    while(n->next != 0)
+        n = n->next;
+
+    n->next = new;
+}
+
+static void send_waitjob_ok(int s)
+{
+    struct msg m;
+
+    m.type = WAITJOB_OK;
+    send_msg(s, &m);
+}
+
+static enum Jobstate
+get_job_state(int jobid)
+{
+    struct Job *j;
+
+    j = findjob(jobid);
+    if (j != NULL)
+        return j->state;
+
+    j = find_finished_job(jobid);
+
+    if (j != NULL)
+        return j->state;
+
+    return -1;
+}
+
+/* Don't complain, if the socket doesn't exist */
+void s_remove_notification(int s)
+{
+    struct Notify *n;
+    struct Notify *previous;
+    n = first_notify;
+    while (n != 0 && n->socket != s)
+        n = n->next;
+    if (n == 0)
+        return;
+
+    /* Remove the notification */
+    previous = first_notify;
+    if (n == previous)
+    {
+        free(first_notify);
+        first_notify = 0;
+        return;
+    }
+
+    /* if not the first... */
+    while(previous->next != n)
+        previous = previous->next;
+
+    previous->next = n->next;
+    free(n);
+}
+
+/* This is called when a job finishes */
+void check_notify_list(int jobid)
+{
+    struct Notify *n;
+    struct Notify *previous;
+    enum Jobstate s;
+
+    n = first_notify;
+    while (n != 0 && n->jobid != jobid)
+    {
+        n = n->next;
+    }
+
+    if (n == 0)
+    {
+        return;
+    }
+
+    s = get_job_state(jobid);
+    /* If the job finishes, notify the waiter */
+    if (s == FINISHED)
+    {
+        send_waitjob_ok(n->socket);
+        s_remove_notification(n->socket);
+    }
+}
+
+void s_wait_job(int s, int jobid)
+{
+    struct Job *p = 0;
+    struct msg m;
+
+    if (jobid == -1)
+    {
+        /* Find the last job added */
+        p = firstjob;
+
+        if (p != 0)
+            while (p->next != 0)
+                p = p->next;
+
+        /* Look in finished jobs if needed */
+        if (p == 0)
+        {
+            p = first_finished_job;
+            if (p != 0)
+                while (p->next != 0)
+                    p = p->next;
+        }
+    }
+    else
+    {
+        p = firstjob;
+        while (p != 0 && p->jobid != jobid)
+            p = p->next;
+
+        /* Look in finished jobs if needed */
+        if (p == 0)
+        {
+            p = first_finished_job;
+            while (p != 0 && p->jobid != jobid)
+                p = p->next;
+        }
+    }
+
+    if (p == 0)
+    {
+        char tmp[50];
+        if (jobid == -1)
+            sprintf(tmp, "The last job cannot be waited.\n", jobid);
+        else
+            sprintf(tmp, "The job %i cannot be waited.\n", jobid);
+        send_list_line(s, tmp);
+        return;
+    }
+
+    if (p->state == FINISHED)
+    {
+        send_waitjob_ok(s);
+    }
+    else
+        add_to_notify_list(s, p->jobid);
+}
--- a/main.c	Sun Mar 25 19:55:42 2007 +0200
+++ b/main.c	Sun Mar 25 20:52:04 2007 +0200
@@ -15,7 +15,7 @@
 /* Allocated in get_command() */
 char *new_command;
 
-static char version[] = "Task Spooler v0.1";
+static char version[] = "Task Spooler v0.2";
 
 static void default_command_line()
 {
@@ -58,7 +58,7 @@
 
     /* Parse options */
     while(1) {
-        c = getopt(argc, argv, ":VhKClnfr:t:c:o:p:");
+        c = getopt(argc, argv, ":VhKClnfr:t:c:o:p:w:");
 
         if (c == -1)
             break;
@@ -106,6 +106,10 @@
                 command_line.request = c_REMOVEJOB;
                 command_line.jobid = atoi(optarg);
                 break;
+            case 'w':
+                command_line.request = c_WAITJOB;
+                command_line.jobid = atoi(optarg);
+                break;
             case ':':
                 switch(optopt)
                 {
@@ -130,6 +134,11 @@
                         command_line.jobid = -1; /* This means the 'last'
                                                     added job */
                         break;
+                    case 'w':
+                        command_line.request = c_WAITJOB;
+                        command_line.jobid = -1; /* This means the 'last'
+                                                    added job */
+                        break;
                     default:
                         fprintf(stderr, "Option %c missing argument: %s\n",
                                 optopt, optarg);
@@ -177,14 +186,16 @@
 
 static void print_help(const char *cmd)
 {
-    printf("usage: %s < -K | -C | -l | -t [id] | -c [id] | -p [id] > "
-            "[-n] [ -f ] [cmd...]\n", cmd);
+    printf("usage: %s < -K | -C | -l | -t [id] | -c [id] | -p [id] | -r [id] >\n"
+           "       [-n] [ -f ] [cmd...]\n", cmd);
     printf("  -K       kill the task spooler server\n");
     printf("  -C       clear the list of finished jobs\n");
     printf("  -l       show the job list (default action)\n");
     printf("  -t [id]  tail -f the output of the job. Last if not specified.\n");
     printf("  -c [id]  cat the output of the job. Last if not specified.\n");
     printf("  -p [id]  show the pid of the job. Last if not specified.\n");
+    printf("  -r [id]  remove a job. The last added, if not specified.\n");
+    printf("  -w [id]  wait for a job. The last added, if not specified.\n");
     printf("  -h       show this help\n");
     printf("  -V       show the program version\n");
     printf("Adding jobs:\n");
@@ -264,6 +275,10 @@
         assert(command_line.need_server);
         c_remove_job();
         break;
+    case c_WAITJOB:
+        assert(command_line.need_server);
+        c_wait_job();
+        break;
     }
 
     if (command_line.need_server)
--- a/main.h	Sun Mar 25 19:55:42 2007 +0200
+++ b/main.h	Sun Mar 25 20:52:04 2007 +0200
@@ -10,7 +10,8 @@
     c_CAT,
     c_SHOW_OUTPUT_FILE,
     c_SHOW_PID,
-    c_REMOVEJOB
+    c_REMOVEJOB,
+    c_WAITJOB
 };
 
 struct Command_line {
@@ -50,6 +51,7 @@
 void s_process_runjob_ok(int jobid, char *oname, int pid);
 void s_send_output(int socket, int jobid);
 void s_remove_job(int s, int jobid);
+void s_remove_notification(int s);
 
 /* msgdump.c */
 void msgdump(const struct msg *m);
--- a/msg.h	Sun Mar 25 19:55:42 2007 +0200
+++ b/msg.h	Sun Mar 25 20:52:04 2007 +0200
@@ -20,7 +20,9 @@
     ASK_OUTPUT,
     ANSWER_OUTPUT,
     REMOVEJOB,
-    REMOVEJOB_OK
+    REMOVEJOB_OK,
+    WAITJOB,
+    WAITJOB_OK,
 };
 
 struct msg
--- a/server.c	Sun Mar 25 19:55:42 2007 +0200
+++ b/server.c	Sun Mar 25 20:52:04 2007 +0200
@@ -227,6 +227,7 @@
     if (m.type == ENDJOB)
     {
         job_finished(m.u.errorlevel);
+        check_notify_list(client_cs[index].jobid);
     }
 
     if (m.type == CLEAR_FINISHED)
@@ -244,6 +245,11 @@
         s_remove_job(client_cs[index].socket, m.u.jobid);
     }
 
+    if (m.type == WAITJOB)
+    {
+        s_wait_job(client_cs[index].socket, m.u.jobid);
+    }
+
     return NOBREAK; /* normal */
 }