An implementation that looks like working for the queue limit and client queuelimit2
authorviric <viriketo@gmail.com>
Mon, 18 Jul 2011 22:22:55 +0200
branchqueuelimit2
changeset 287 b3c38ff8f41a
parent 284 6292b04b194b
child 288 8459747e1a73
An implementation that looks like working for the queue limit and client blocking.
jobs.c
main.h
server.c
--- a/jobs.c	Wed Oct 07 21:20:18 2009 +0200
+++ b/jobs.c	Mon Jul 18 22:22:55 2011 +0200
@@ -36,6 +36,8 @@
 
 static struct Notify *first_notify = 0;
 
+int max_jobs;
+
 static struct Job * get_job(int jobid);
 void notify_errorlevel(struct Job *p);
 
@@ -105,6 +107,22 @@
     return 0;
 }
 
+static struct Job * findjob_holding_client()
+{
+    struct Job *p;
+
+    /* Show Queued or Running jobs */
+    p = firstjob;
+    while(p != 0)
+    {
+        if (p->state == HOLDING_CLIENT)
+            return p;
+        p = p->next;
+    }
+
+    return 0;
+}
+
 static struct Job * find_finished_job(int jobid)
 {
     struct Job *p;
@@ -121,6 +139,21 @@
     return 0;
 }
 
+static int count_not_finished_jobs()
+{
+    int count=0;
+    struct Job *p;
+
+    /* Show Queued or Running jobs */
+    p = firstjob;
+    while(p != 0)
+    {
+        ++count;
+        p = p->next;
+    }
+    return count;
+}
+
 static void add_notify_errorlevel_to(struct Job *job, int jobid)
 {
     int *p;
@@ -148,6 +181,19 @@
     p->state = RUNNING;
 }
 
+/* -1 means nothing awaken, otherwise returns the jobid awaken */
+int wake_hold_client()
+{
+    struct Job *p;
+    p = findjob_holding_client();
+    if (p)
+    {
+        p->state = QUEUED;
+        return p->jobid;
+    }
+    return -1;
+}
+
 const char * jstate2string(enum Jobstate s)
 {
     const char * jobstate;
@@ -165,6 +211,9 @@
         case SKIPPED:
             jobstate = "skipped";
             break;
+        case HOLDING_CLIENT:
+            jobstate = "skipped";
+            break;
     }
     return jobstate;
 }
@@ -183,9 +232,12 @@
     p = firstjob;
     while(p != 0)
     {
-        buffer = joblist_line(p);
-        send_list_line(s,buffer);
-        free(buffer);
+        if (p->state != HOLDING_CLIENT)
+        {
+            buffer = joblist_line(p);
+            send_list_line(s,buffer);
+            free(buffer);
+        }
         p = p->next;
     }
 
@@ -270,7 +322,10 @@
     p = newjobptr();
 
     p->jobid = jobids++;
-    p->state = QUEUED;
+    if (count_not_finished_jobs() < max_jobs)
+        p->state = QUEUED;
+    else
+        p->state = HOLDING_CLIENT;
     p->store_output = m->u.newjob.store_output;
     p->should_keep_finished = m->u.newjob.should_keep_finished;
     p->notify_errorlevel_to = 0;
@@ -539,18 +594,28 @@
     return;
 }
 
-int job_is_running(int jobid)
+static int job_is_in_state(int jobid, enum Jobstate state)
 {
     struct Job *p;
 
     p = findjob(jobid);
     if (p == 0)
         return 0;
-    if (p->state == RUNNING)
+    if (p->state == state)
         return 1;
     return 0;
 }
 
+int job_is_running(int jobid)
+{
+    return job_is_in_state(jobid, RUNNING);
+}
+
+int job_is_holding_client(int jobid)
+{
+    return job_is_in_state(jobid, HOLDING_CLIENT);
+}
+
 void job_finished(const struct Result *result, int jobid)
 {
     struct Job *p;
--- a/main.h	Wed Oct 07 21:20:18 2009 +0200
+++ b/main.h	Mon Jul 18 22:22:55 2011 +0200
@@ -97,7 +97,8 @@
     QUEUED,
     RUNNING,
     FINISHED,
-    SKIPPED
+    SKIPPED,
+    HOLDING_CLIENT
 };
 
 struct msg
@@ -226,6 +227,8 @@
 void s_set_max_slots(int new_max_slots);
 void s_get_max_slots(int s);
 int job_is_running(int jobid);
+int job_is_holding_client(int jobid);
+int wake_hold_client();
 
 /* server.c */
 void server_main(int notify_fd, char *_path);
--- a/server.c	Wed Oct 07 21:20:18 2009 +0200
+++ b/server.c	Mon Jul 18 22:22:55 2011 +0200
@@ -59,6 +59,9 @@
 static char *path;
 static int max_descriptors;
 
+/* in jobs.c */
+extern int max_jobs;
+
 static void s_send_version(int s)
 {
     struct msg m;
@@ -168,6 +171,10 @@
     process_type = SERVER;
     max_descriptors = get_max_descriptors();
 
+    /* Arbitrary limit, that will block the enqueuing, but should allow space
+     * for usual ts queries */
+    max_jobs = max_descriptors - 5;
+
     path = _path;
 
     /* Move the server to the socket directory */
@@ -266,11 +273,19 @@
         newjob = next_run_job();
         if (newjob != -1)
         {
-            int conn;
+            int conn, awaken_job;
             conn = get_conn_of_jobid(newjob);
             /* This next marks the firstjob state to RUNNING */
             s_mark_job_running(newjob);
             s_runjob(newjob, conn);
+
+            while ((awaken_job = wake_hold_client()) != -1)
+            {
+                int wake_conn = get_conn_of_jobid(awaken_job);
+                if (wake_conn == -1)
+                    error("The job awaken does not have a connection open");
+                s_newjob_ok(wake_conn);
+            }
         }
     }
 
@@ -369,7 +384,8 @@
         case NEWJOB:
             client_cs[index].jobid = s_newjob(s, &m);
             client_cs[index].hasjob = 1;
-            s_newjob_ok(index);
+            if (!job_is_holding_client(client_cs[index].jobid))
+                s_newjob_ok(index);
             break;
         case RUNJOB_OK:
             {