client.c
author viric@mandarina
Fri, 02 Oct 2009 19:20:41 +0200
changeset 274 62048132f95f
parent 267 11631dd11ff8
child 276 dc78c982c8c5
permissions -rw-r--r--
Making changes so it looks like the code in ts-0.6.4, which I can't find

/*
    Task Spooler - a task queue system for the unix user
    Copyright (C) 2007-2009  LluĂ­s Batlle i Rossell

    Please find the license in the provided COPYING file.
*/
#include <stdio.h>
#include <string.h>
#include <unistd.h>
#include <stdlib.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <sys/time.h>
#include "main.h"

static void c_end_of_job(const struct Result *res);
static void c_wait_job_send();
static void c_wait_running_job_send();

char *build_command_string()
{
    int size;
    int i;
    int num;
    char **array;
    char *commandstring;
    
    size = 0;
    num = command_line.command.num;
    array = command_line.command.array;

    /* Count bytes needed */
    for (i = 0; i < num; ++i)
    {
        /* The '1' is for spaces, and at the last i,
         * for the null character */
        size = size + strlen(array[i]) + 1;
    }

    /* Alloc */
    commandstring = (char *) malloc(size);
    if(commandstring == NULL)
        error("Error in malloc for commandstring");

    /* Build the command */
    strcpy(commandstring, array[0]);
    for (i = 1; i < num; ++i)
    {
        strcat(commandstring, " ");
        strcat(commandstring, array[i]);
    }

    return commandstring;
}

void c_new_job()
{
    struct msg m;
    char *new_command;
    char *myenv;

    m.type = NEWJOB;

    new_command = build_command_string();

    myenv = get_environment();

    /* global */
    m.u.newjob.command_size = strlen(new_command) + 1; /* add null */
    if (myenv)
        m.u.newjob.env_size = strlen(myenv) + 1; /* add null */
    else
        m.u.newjob.env_size = 0;
    if (command_line.label)
        m.u.newjob.label_size = strlen(command_line.label) + 1; /* add null */
    else
        m.u.newjob.label_size = 0;
    m.u.newjob.store_output = command_line.store_output;
    m.u.newjob.do_depend = command_line.do_depend;
    m.u.newjob.depend_on = command_line.depend_on;
    m.u.newjob.should_keep_finished = command_line.should_keep_finished;
    m.u.newjob.command_size = strlen(new_command) + 1; /* add null */

    /* Send the message */
    send_msg(server_socket, &m);

    /* Send the command */
    send_bytes(server_socket, new_command, m.u.newjob.command_size);

    /* Send the label */
    send_bytes(server_socket, command_line.label, m.u.newjob.label_size);

    /* Send the environment */
    send_bytes(server_socket, myenv, m.u.newjob.env_size);

    free(new_command);
    free(myenv);
}

int c_wait_newjob_ok()
{
    struct msg m;
    int res;

    res = recv_msg(server_socket, &m);
    if(res == -1)
        error("Error in wait_newjob_ok");
    if(m.type != NEWJOB_OK)
        error("Error getting the newjob_ok");

    return m.u.jobid;
}

int c_wait_server_commands()
{
    struct msg m;
    int res;

    while (1)
    {
        res = recv_msg(server_socket, &m);
        if(res == -1)
            error("Error in wait_server_commands");

        if (res == 0)
            break;
        if(res != sizeof(m))
            error("Error in wait_server_commands");
        if (m.type == RUNJOB)
        {
            struct Result res;
            res.skipped = 0;
            /* These will send RUNJOB_OK */
            if (command_line.do_depend && m.u.last_errorlevel != 0)
            {
                res.errorlevel = -1;
                res.user_ms = 0.;
                res.system_ms = 0.;
                res.real_ms = 0.;
                res.skipped = 1;
                c_send_runjob_ok(0, -1);
            }
            else
                run_job(&res);
            c_end_of_job(&res);
            return res.errorlevel;
        }
    }
    return -1;
}

void c_wait_server_lines()
{
    struct msg m;
    int res;

    while (1)
    {
        res = recv_msg(server_socket, &m);
        if(res == -1)
            error("Error in wait_server_lines");

        if (res == 0)
            break;
        if(res != sizeof(m))
            error("Error in wait_server_lines 2");
        if (m.type == LIST_LINE)
        {
            char * buffer;
            buffer = (char *) malloc(m.u.size);
            recv_bytes(server_socket, buffer, m.u.size);
            printf("%s", buffer);
            free(buffer);
        }
    }
}

void c_list_jobs()
{
    struct msg m;

    m.type = LIST;

    send_msg(server_socket, &m);
    send_msg(server_socket, &m);
}

/* Exits if wrong */
void c_check_version()
{
    struct msg m;
    int res;

    m.type = GET_VERSION;
    send_msg(server_socket, &m);

    /* Set up a 2 second timeout to receive the
    version msg. */
    install_sigalrm_donothing();
    alarm(2);

    res = recv_msg(server_socket, &m);
    if(res == -1 || res == 0)
    {
        fprintf(stderr,
            "Error checking version with the server. Did you forget an "
            "old ts server running?\n");
        error("Error checking version");
    }
    alarm(0);

    if (m.type != VERSION || m.u.version != PROTOCOL_VERSION)
    {
        printf("Wrong server version. Received %i, expecting %i\n",
            m.u.version, PROTOCOL_VERSION);

        error("Wrong server version. Received %i, expecting %i",
            m.u.version, PROTOCOL_VERSION);
    }
}

void c_show_info()
{
    struct msg m;
    int res;

    m.type = INFO;
    m.u.jobid = command_line.jobid;

    send_msg(server_socket, &m);

    while (1)
    {
        res = recv_msg(server_socket, &m);
        if(res == -1)
            error("Error in wait_server_lines");

        if (res == 0)
            break;
        if(res != sizeof(m))
            error("Error in wait_server_lines 2");
        if (m.type == INFO_DATA)
        {
            char * buffer;
            enum { DSIZE = 1000 };

            /* We're going to output data using the stdout fd */
            fflush(stdout);
            buffer = (char *) malloc(DSIZE);
            do
            {
                res = recv(server_socket, buffer, DSIZE, 0);
                if (res > 0)
                    write(1, buffer, res);
            } while(res > 0);
            free(buffer);
        }
    }
}

void c_send_runjob_ok(const char *ofname, int pid)
{
    struct msg m;

    /* Prepare the message */
    m.type = RUNJOB_OK;
    if (ofname) /* ofname == 0, skipped execution */
	m.u.output.store_output = command_line.store_output;
    else
	m.u.output.store_output = 0;
    m.u.output.pid = pid;
    if (m.u.output.store_output)
        m.u.output.ofilename_size = strlen(ofname) + 1;
    else
        m.u.output.ofilename_size = 0;

    send_msg(server_socket, &m);

    /* Send the filename */
    if (command_line.store_output)
        send_bytes(server_socket, ofname, m.u.output.ofilename_size);
}

static void c_end_of_job(const struct Result *res)
{
    struct msg m;

    m.type = ENDJOB;
    m.u.result = *res; /* struct copy */

    send_msg(server_socket, &m);
}

void c_shutdown_server()
{
    struct msg m;

    m.type = KILL_SERVER;
    send_msg(server_socket, &m);
}

void c_clear_finished()
{
    struct msg m;

    m.type = CLEAR_FINISHED;
    send_msg(server_socket, &m);
}

static char * get_output_file(int *pid)
{
    struct msg m;
    int res;
    char *string = 0;

    /* Send the request */
    m.type = ASK_OUTPUT;
    m.u.jobid = command_line.jobid;
    send_msg(server_socket, &m);

    /* Receive the answer */
    res = recv_msg(server_socket, &m);
    if(res != sizeof(m))
        error("Error in get_output_file");
    switch(m.type)
    {
    case ANSWER_OUTPUT:
        if (m.u.output.store_output)
        {
            /* Receive the output file name */
            string = 0;
            if (m.u.output.ofilename_size > 0)
            {
                string = (char *) malloc(m.u.output.ofilename_size);
                recv_bytes(server_socket, string, m.u.output.ofilename_size);
            }
            *pid = m.u.output.pid;
            return string;
        }
        *pid = m.u.output.pid;
        return 0;
        /* WILL NOT GO FURTHER */
    case LIST_LINE: /* Only ONE line accepted */
        string = (char *) malloc(m.u.size);
        res = recv_bytes(server_socket, string, m.u.size);
        if(res != m.u.size)
            error("Error in get_output_file line size");
        fprintf(stderr, "Error in the request: %s", 
                string);
        exit(-1);
        /* WILL NOT GO FURTHER */
    default:
        warning("Wrong internal message in get_output_file line size");
    }
    /* This will never be reached */
    return 0;
}

int c_tail()
{
    char *str;
    int pid;
    str = get_output_file(&pid);
    if (str == 0)
    {
        fprintf(stderr, "The output is not stored. Cannot tail.\n");
        exit(-1);
    }

    c_wait_running_job_send();

    return tail_file(str, 10 /* Last lines to show */);
}

int c_cat()
{
    char *str;
    int pid;
    str = get_output_file(&pid);
    if (str == 0)
    {
        fprintf(stderr, "The output is not stored. Cannot cat.\n");
        exit(-1);
    }
    c_wait_running_job_send();

    return tail_file(str, -1 /* All the lines */);
}

void c_show_output_file()
{
    char *str;
    int pid;
    /* This will exit if there is any error */
    str = get_output_file(&pid);
    if (str == 0)
    {
        fprintf(stderr, "The output is not stored.\n");
        exit(-1);
    }
    printf("%s\n", str);
    free(str);
}

void c_show_pid()
{
    char *str;
    int pid;
    /* This will exit if there is any error */
    str = get_output_file(&pid);
    printf("%i\n", pid);
}

void c_remove_job()
{
    struct msg m;
    int res;
    char *string = 0;

    /* Send the request */
    m.type = REMOVEJOB;
    m.u.jobid = command_line.jobid;
    send_msg(server_socket, &m);

    /* Receive the answer */
    res = recv_msg(server_socket, &m);
    if(res != sizeof(m))
        error("Error in remove_job");
    switch(m.type)
    {
    case REMOVEJOB_OK:
        return;
        /* WILL NOT GO FURTHER */
    case LIST_LINE: /* Only ONE line accepted */
        string = (char *) malloc(m.u.size);
        res = recv_bytes(server_socket, string, m.u.size);
        fprintf(stderr, "Error in the request: %s", 
                string);
        exit(-1);
        /* WILL NOT GO FURTHER */
    default:
        warning("Wrong internal message in remove_job");
    }
    /* This will never be reached */
}

int c_wait_job_recv()
{
    struct msg m;
    int res;
    char *string = 0;

    /* Receive the answer */
    res = recv_msg(server_socket, &m);
    if(res != sizeof(m))
        error("Error in wait_job");
    switch(m.type)
    {
    case WAITJOB_OK:
        return m.u.result.errorlevel;
        /* WILL NOT GO FURTHER */
    case LIST_LINE: /* Only ONE line accepted */
        string = (char *) malloc(m.u.size);
        res = recv_bytes(server_socket, string, m.u.size);
        if(res != m.u.size)
            error("Error in wait_job - line size");
        fprintf(stderr, "Error in the request: %s", 
                string);
        exit(-1);
        /* WILL NOT GO FURTHER */
    default:
        warning("Wrong internal message in c_wait_job");
    }
    /* This will never be reached */
    return -1;
}

static void c_wait_job_send()
{
    struct msg m;

    /* Send the request */
    m.type = WAITJOB;
    m.u.jobid = command_line.jobid;
    send_msg(server_socket, &m);
}

static void c_wait_running_job_send()
{
    struct msg m;

    /* Send the request */
    m.type = WAIT_RUNNING_JOB;
    m.u.jobid = command_line.jobid;
    send_msg(server_socket, &m);
}

/* Returns the errorlevel */
int c_wait_job()
{
    c_wait_job_send();
    return c_wait_job_recv();
}

/* Returns the errorlevel */
int c_wait_running_job()
{
    c_wait_running_job_send();
    return c_wait_job_recv();
}

void c_send_max_slots(int max_slots)
{
    struct msg m;

    /* Send the request */
    m.type = SET_MAX_SLOTS;
    m.u.max_slots = command_line.max_slots;
    send_msg(server_socket, &m);
}

void c_get_max_slots()
{
    struct msg m;
    int res;

    /* Send the request */
    m.type = GET_MAX_SLOTS;
    m.u.max_slots = command_line.max_slots;
    send_msg(server_socket, &m);

    /* Receive the answer */
    res = recv_msg(server_socket, &m);
    if(res != sizeof(m))
        error("Error in move_urgent");
    switch(m.type)
    {
        case GET_MAX_SLOTS_OK:
            printf("%i\n", m.u.max_slots);
            return;
        default:
            warning("Wrong internal message in get_max_slots");
    }
}

void c_move_urgent()
{
    struct msg m;
    int res;
    char *string = 0;

    /* Send the request */
    m.type = URGENT;
    m.u.jobid = command_line.jobid;
    send_msg(server_socket, &m);

    /* Receive the answer */
    res = recv_msg(server_socket, &m);
    if(res != sizeof(m))
        error("Error in move_urgent");
    switch(m.type)
    {
    case URGENT_OK:
        return;
        /* WILL NOT GO FURTHER */
    case LIST_LINE: /* Only ONE line accepted */
        string = (char *) malloc(m.u.size);
        res = recv_bytes(server_socket, string, m.u.size);
        if(res != m.u.size)
            error("Error in move_urgent - line size");
        fprintf(stderr, "Error in the request: %s", 
                string);
        exit(-1);
        /* WILL NOT GO FURTHER */
    default:
        warning("Wrong internal message in move_urgent");
    }
    /* This will never be reached */
    return;
}

void c_get_state()
{
    struct msg m;
    int res;
    char *string = 0;

    /* Send the request */
    m.type = GET_STATE;
    m.u.jobid = command_line.jobid;
    send_msg(server_socket, &m);

    /* Receive the answer */
    res = recv_msg(server_socket, &m);
    if(res != sizeof(m))
        error("Error in get_state - line size");
    switch(m.type)
    {
    case ANSWER_STATE:
        printf("%s\n", jstate2string(m.u.state));
        return;
        /* WILL NOT GO FURTHER */
    case LIST_LINE: /* Only ONE line accepted */
        string = (char *) malloc(m.u.size);
        res = recv_bytes(server_socket, string, m.u.size);
        if(res != m.u.size)
            error("Error in get_state - line size");
        fprintf(stderr, "Error in the request: %s", 
                string);
        exit(-1);
        /* WILL NOT GO FURTHER */
    default:
        warning("Wrong internal message in get_state");
    }
    /* This will never be reached */
    return;
}

void c_swap_jobs()
{
    struct msg m;
    int res;
    char *string = 0;

    /* Send the request */
    m.type = SWAP_JOBS;
    m.u.swap.jobid1 = command_line.jobid;
    m.u.swap.jobid2 = command_line.jobid2;
    send_msg(server_socket, &m);

    /* Receive the answer */
    res = recv_msg(server_socket, &m);
    if(res != sizeof(m))
        error("Error in swap_jobs");
    switch(m.type)
    {
    case SWAP_JOBS_OK:
        return;
        /* WILL NOT GO FURTHER */
    case LIST_LINE: /* Only ONE line accepted */
        string = (char *) malloc(m.u.size);
        res = recv_bytes(server_socket, string, m.u.size);
        if(res != m.u.size)
            error("Error in swap_jobs - line size");
        fprintf(stderr, "Error in the request: %s", 
                string);
        exit(-1);
        /* WILL NOT GO FURTHER */
    default:
        warning("Wrong internal message in swap_jobs");
    }
    /* This will never be reached */
    return;
}