Merging the implementation I did for a proper blocking on too much enqueuing.
--- a/jobs.c Wed Oct 07 21:20:18 2009 +0200
+++ b/jobs.c Mon Jul 18 22:44:02 2011 +0200
@@ -36,6 +36,8 @@
static struct Notify *first_notify = 0;
+int max_jobs;
+
static struct Job * get_job(int jobid);
void notify_errorlevel(struct Job *p);
@@ -105,6 +107,22 @@
return 0;
}
+static struct Job * findjob_holding_client()
+{
+ struct Job *p;
+
+ /* Show Queued or Running jobs */
+ p = firstjob;
+ while(p != 0)
+ {
+ if (p->state == HOLDING_CLIENT)
+ return p;
+ p = p->next;
+ }
+
+ return 0;
+}
+
static struct Job * find_finished_job(int jobid)
{
struct Job *p;
@@ -121,6 +139,21 @@
return 0;
}
+static int count_not_finished_jobs()
+{
+ int count=0;
+ struct Job *p;
+
+ /* Show Queued or Running jobs */
+ p = firstjob;
+ while(p != 0)
+ {
+ ++count;
+ p = p->next;
+ }
+ return count;
+}
+
static void add_notify_errorlevel_to(struct Job *job, int jobid)
{
int *p;
@@ -148,6 +181,19 @@
p->state = RUNNING;
}
+/* -1 means nothing awaken, otherwise returns the jobid awaken */
+int wake_hold_client()
+{
+ struct Job *p;
+ p = findjob_holding_client();
+ if (p)
+ {
+ p->state = QUEUED;
+ return p->jobid;
+ }
+ return -1;
+}
+
const char * jstate2string(enum Jobstate s)
{
const char * jobstate;
@@ -165,6 +211,9 @@
case SKIPPED:
jobstate = "skipped";
break;
+ case HOLDING_CLIENT:
+ jobstate = "skipped";
+ break;
}
return jobstate;
}
@@ -183,9 +232,12 @@
p = firstjob;
while(p != 0)
{
- buffer = joblist_line(p);
- send_list_line(s,buffer);
- free(buffer);
+ if (p->state != HOLDING_CLIENT)
+ {
+ buffer = joblist_line(p);
+ send_list_line(s,buffer);
+ free(buffer);
+ }
p = p->next;
}
@@ -270,7 +322,10 @@
p = newjobptr();
p->jobid = jobids++;
- p->state = QUEUED;
+ if (count_not_finished_jobs() < max_jobs)
+ p->state = QUEUED;
+ else
+ p->state = HOLDING_CLIENT;
p->store_output = m->u.newjob.store_output;
p->should_keep_finished = m->u.newjob.should_keep_finished;
p->notify_errorlevel_to = 0;
@@ -539,18 +594,28 @@
return;
}
-int job_is_running(int jobid)
+static int job_is_in_state(int jobid, enum Jobstate state)
{
struct Job *p;
p = findjob(jobid);
if (p == 0)
return 0;
- if (p->state == RUNNING)
+ if (p->state == state)
return 1;
return 0;
}
+int job_is_running(int jobid)
+{
+ return job_is_in_state(jobid, RUNNING);
+}
+
+int job_is_holding_client(int jobid)
+{
+ return job_is_in_state(jobid, HOLDING_CLIENT);
+}
+
void job_finished(const struct Result *result, int jobid)
{
struct Job *p;
@@ -562,9 +627,11 @@
if (p == 0)
error("on jobid %i finished, it doesn't exist", jobid);
- if (p->state != RUNNING)
- error("on jobid %i finished, it is not running but %i",
- jobid, p->state);
+ /* The job may be not only in running state, but also in other states, as
+ * we call this to clean up the jobs list in case of the client closing the
+ * connection. */
+ if (p->state == RUNNING)
+ --busy_slots;
/* Mark state */
if (result->skipped)
@@ -614,8 +681,6 @@
*jpointer = newfirst;
}
-
- --busy_slots;
}
void s_clear_finished()
@@ -815,12 +880,13 @@
m.type = ANSWER_OUTPUT;
m.u.output.store_output = p->store_output;
m.u.output.pid = p->pid;
- if (m.u.output.store_output)
+ if (m.u.output.store_output && p->output_filename)
m.u.output.ofilename_size = strlen(p->output_filename) + 1;
else
m.u.output.ofilename_size = 0;
send_msg(s, &m);
- send_bytes(s, p->output_filename, m.u.output.ofilename_size);
+ if (m.u.output.ofilename_size > 0)
+ send_bytes(s, p->output_filename, m.u.output.ofilename_size);
}
void notify_errorlevel(struct Job *p)
--- a/main.h Wed Oct 07 21:20:18 2009 +0200
+++ b/main.h Mon Jul 18 22:44:02 2011 +0200
@@ -97,7 +97,8 @@
QUEUED,
RUNNING,
FINISHED,
- SKIPPED
+ SKIPPED,
+ HOLDING_CLIENT
};
struct msg
@@ -226,6 +227,8 @@
void s_set_max_slots(int new_max_slots);
void s_get_max_slots(int s);
int job_is_running(int jobid);
+int job_is_holding_client(int jobid);
+int wake_hold_client();
/* server.c */
void server_main(int notify_fd, char *_path);
--- a/server.c Wed Oct 07 21:20:18 2009 +0200
+++ b/server.c Mon Jul 18 22:44:02 2011 +0200
@@ -59,6 +59,9 @@
static char *path;
static int max_descriptors;
+/* in jobs.c */
+extern int max_jobs;
+
static void s_send_version(int s)
{
struct msg m;
@@ -168,6 +171,10 @@
process_type = SERVER;
max_descriptors = get_max_descriptors();
+ /* Arbitrary limit, that will block the enqueuing, but should allow space
+ * for usual ts queries */
+ max_jobs = max_descriptors - 5;
+
path = _path;
/* Move the server to the socket directory */
@@ -266,11 +273,19 @@
newjob = next_run_job();
if (newjob != -1)
{
- int conn;
+ int conn, awaken_job;
conn = get_conn_of_jobid(newjob);
/* This next marks the firstjob state to RUNNING */
s_mark_job_running(newjob);
s_runjob(newjob, conn);
+
+ while ((awaken_job = wake_hold_client()) != -1)
+ {
+ int wake_conn = get_conn_of_jobid(awaken_job);
+ if (wake_conn == -1)
+ error("The job awaken does not have a connection open");
+ s_newjob_ok(wake_conn);
+ }
}
}
@@ -307,7 +322,7 @@
{
/* Act as if the job ended. */
int jobid = client_cs[index].jobid;
- if (client_cs[index].hasjob && job_is_running(jobid))
+ if (client_cs[index].hasjob)
{
struct Result r;
@@ -369,7 +384,8 @@
case NEWJOB:
client_cs[index].jobid = s_newjob(s, &m);
client_cs[index].hasjob = 1;
- s_newjob_ok(index);
+ if (!job_is_holding_client(client_cs[index].jobid))
+ s_newjob_ok(index);
break;
case RUNJOB_OK:
{