From ac80bc9f3f8241a778869a8755389426f81f77af Mon Sep 17 00:00:00 2001
From: Florian Stecker
Date: Wed, 15 Jun 2022 15:17:54 +0200
Subject: [PATCH] new simpler approach to parallelization
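
Replace the MPI C scheduler (parallel.c/parallel.h) and the
hand-maintained host lists with two small Python scripts.
generate_commands.py prints one ./complex_anosov invocation per grid
point; runjobs.py (mpi4py) reads the file "commands", drops every job
index already recorded in a done_* file, splits the remaining jobs
evenly across the MPI ranks, and appends each rank's stdout to
result_NNN and the indices of successfully finished jobs to done_NNN.
An interrupted run can therefore simply be started again.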
---
 parallelization/allhosts             | 135 ---------
 parallelization/generate_commands.py |  19 ++
 parallelization/hostfile             |   5 -
 parallelization/hostfile_big         |  48 ----
 parallelization/localnames           |  51 ----
 parallelization/parallel.c           | 409 ---------------------------
 parallelization/parallel.h           | 118 --------
 parallelization/run_local            |   9 -
 parallelization/run_utexas           |  14 -
 parallelization/runjobs.py           |  57 ++++
 parallelization/stampede.slurm       |  20 --
 parallelization/sync_stampede        |   8 -
 parallelization/sync_utexas          |   7 -
 13 files changed, 76 insertions(+), 824 deletions(-)
 delete mode 100644 parallelization/allhosts
 create mode 100755 parallelization/generate_commands.py
 delete mode 100644 parallelization/hostfile
 delete mode 100644 parallelization/hostfile_big
 delete mode 100644 parallelization/localnames
 delete mode 100644 parallelization/parallel.c
 delete mode 100644 parallelization/parallel.h
 delete mode 100755 parallelization/run_local
 delete mode 100755 parallelization/run_utexas
 create mode 100755 parallelization/runjobs.py
 delete mode 100644 parallelization/stampede.slurm
 delete mode 100755 parallelization/sync_stampede
 delete mode 100755 parallelization/sync_utexas

diff --git a/parallelization/allhosts b/parallelization/allhosts
deleted file mode 100644
index 3462df1..0000000
--- a/parallelization/allhosts
+++ /dev/null
@@ -1,135 +0,0 @@
-
-
-euler.ma.utexas.edu
-fac1.ma.utexas.edu
-fac4.ma.utexas.edu
-fac8.ma.utexas.edu
-fac9.ma.utexas.edu
-frog.ma.utexas.edu
-gummo.ma.utexas.edu
-iguana.ma.utexas.edu
-lab10.ma.utexas.edu
-lab11.ma.utexas.edu
-lab12.ma.utexas.edu
-lab13.ma.utexas.edu
-lab14.ma.utexas.edu
-lab15.ma.utexas.edu
-lab16.ma.utexas.edu
-lab17.ma.utexas.edu
-lab18.ma.utexas.edu
-lab19.ma.utexas.edu
-lab1.ma.utexas.edu
-lab20.ma.utexas.edu
-lab21.ma.utexas.edu
-lab22.ma.utexas.edu
-lab23.ma.utexas.edu
-lab24.ma.utexas.edu
-lab25.ma.utexas.edu
-lab26.ma.utexas.edu
-lab27.ma.utexas.edu
-lab28.ma.utexas.edu
-lab29.ma.utexas.edu
-lab2.ma.utexas.edu
-lab30.ma.utexas.edu
-lab31.ma.utexas.edu
-lab32.ma.utexas.edu
-lab33.ma.utexas.edu
-lab34.ma.utexas.edu
-lab35.ma.utexas.edu
-lab36.ma.utexas.edu
-lab37.ma.utexas.edu
-lab38.ma.utexas.edu
-lab39.ma.utexas.edu
-lab3.ma.utexas.edu
-lab40.ma.utexas.edu
-lab41.ma.utexas.edu
-lab42.ma.utexas.edu
-lab43.ma.utexas.edu
-lab44.ma.utexas.edu
-lab45.ma.utexas.edu
-lab46.ma.utexas.edu
-lab47.ma.utexas.edu
-lab48.ma.utexas.edu
-lab49.ma.utexas.edu
-lab4.ma.utexas.edu
-lab50.ma.utexas.edu
-lab51.ma.utexas.edu
-lab52.ma.utexas.edu
-lab53.ma.utexas.edu
-lab54.ma.utexas.edu
-lab55.ma.utexas.edu
-lab56.ma.utexas.edu
-lab57.ma.utexas.edu
-lab58.ma.utexas.edu
-lab59.ma.utexas.edu
-lab5.ma.utexas.edu
-lab60.ma.utexas.edu
-lab61.ma.utexas.edu
-lab62.ma.utexas.edu
-lab63.ma.utexas.edu
-lab64.ma.utexas.edu
-lab65.ma.utexas.edu
-lab66.ma.utexas.edu
-lab67.ma.utexas.edu
-lab68.ma.utexas.edu
-lab69.ma.utexas.edu
-lab6.ma.utexas.edu
-lab70.ma.utexas.edu
-lab7.ma.utexas.edu
-lab8.ma.utexas.edu
-lab9.ma.utexas.edu
-linux100.ma.utexas.edu
-linux104.ma.utexas.edu
-linux110.ma.utexas.edu
-linux115.ma.utexas.edu
-linux119.ma.utexas.edu
-linux122.ma.utexas.edu
-linux149.ma.utexas.edu
-linux14.ma.utexas.edu
-linux15.ma.utexas.edu
-linux164.ma.utexas.edu
-linux169.ma.utexas.edu
-linux16.ma.utexas.edu
-linux17.ma.utexas.edu
-linux180.ma.utexas.edu
-linux181.ma.utexas.edu
-linux184.ma.utexas.edu
-linux18.ma.utexas.edu
-linux20.ma.utexas.edu
-linux21.ma.utexas.edu
-linux24.ma.utexas.edu
-linux27.ma.utexas.edu
-linux28.ma.utexas.edu
-linux29.ma.utexas.edu
-linux2.ma.utexas.edu
-linux30.ma.utexas.edu
-linux31.ma.utexas.edu
-linux32.ma.utexas.edu
-linux38.ma.utexas.edu
-linux40.ma.utexas.edu
-linux41.ma.utexas.edu
-linux46.ma.utexas.edu
-linux4.ma.utexas.edu
-linux50.ma.utexas.edu
-linux52.ma.utexas.edu
-linux54.ma.utexas.edu
-linux57.ma.utexas.edu
-linux62.ma.utexas.edu
-linux64.ma.utexas.edu
-linux66.ma.utexas.edu
-linux68.ma.utexas.edu
-linux69.ma.utexas.edu
-linux70.ma.utexas.edu
-linux71.ma.utexas.edu
-linux72.ma.utexas.edu
-linux74.ma.utexas.edu
-linux76.ma.utexas.edu
-linux79.ma.utexas.edu
-linux80.ma.utexas.edu
-linux82.ma.utexas.edu
-linux83.ma.utexas.edu
-linux86.ma.utexas.edu
-linux91.ma.utexas.edu
-linux92.ma.utexas.edu
-linux96.ma.utexas.edu
-linux9.ma.utexas.edu
diff --git a/parallelization/generate_commands.py b/parallelization/generate_commands.py
new file mode 100755
index 0000000..9e8dcd2
--- /dev/null
+++ b/parallelization/generate_commands.py
@@ -0,0 +1,19 @@
+#!/usr/bin/python
+
+wordlength = 16
+n = 101038
+res = 50
+radius = 1.0
+q = [1,1,1]
+
+denom = round(res/radius)
+
+cmd = "IDLIST=./output/idlist_{len} ./complex_anosov summary {n} {q1} {q2} {q3} {rnum}/{rden} {inum}/{iden}"
+
+for i in range(-res,res+1):
+    for j in range(0,res+1):
+        if i == 0 and j == 0:
+            continue
+        print(cmd.format(
+            len=wordlength, n=n, q1=q[0], q2=q[1], q3=q[2],
+            rnum=i, inum=j, rden=denom, iden=denom))
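
A quick sanity check of the job list (a sketch; runjobs.py below expects
the output in a file literally named "commands" in its working directory,
and the expected first line and count follow from the parameters at the
top of the script, res = 50 giving 101 * 51 - 1 = 5150 commands):

    ./generate_commands.py > commands
    head -n 1 commands
    # IDLIST=./output/idlist_16 ./complex_anosov summary 101038 1 1 1 -50/50 0/50
    wc -l commands
    # 5150 commands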
diff --git a/parallelization/hostfile b/parallelization/hostfile
deleted file mode 100644
index 064bdd1..0000000
--- a/parallelization/hostfile
+++ /dev/null
@@ -1,5 +0,0 @@
-linux50 slots=4
-linux52 slots=4
-linux57 slots=4
-linux110 slots=4
-linux115 slots=4
diff --git a/parallelization/hostfile_big b/parallelization/hostfile_big
deleted file mode 100644
index 4d22c47..0000000
--- a/parallelization/hostfile_big
+++ /dev/null
@@ -1,48 +0,0 @@
-linux100 slots=4
-linux104 slots=4
-linux110 slots=4
-linux122 slots=4
-linux149 slots=4
-linux14 slots=4
-linux15 slots=2
-linux16 slots=4
-linux17 slots=2
-linux180 slots=4
-linux181 slots=2
-linux184 slots=4
-linux18 slots=4
-linux20 slots=4
-linux21 slots=4
-linux24 slots=4
-linux27 slots=4
-linux29 slots=4
-linux2 slots=4
-linux30 slots=4
-linux31 slots=4
-linux32 slots=4
-linux38 slots=2
-linux40 slots=4
-linux41 slots=4
-linux46 slots=4
-linux4 slots=4
-linux50 slots=4
-linux52 slots=4
-linux54 slots=4
-linux57 slots=4
-linux62 slots=2
-linux64 slots=4
-linux68 slots=4
-linux69 slots=4
-linux70 slots=4
-linux71 slots=4
-linux72 slots=4
-linux74 slots=4
-linux76 slots=4
-linux79 slots=4
-linux80 slots=4
-linux83 slots=4
-linux86 slots=4
-linux91 slots=4
-linux92 slots=4
-linux96 slots=4
-linux9 slots=2
diff --git a/parallelization/localnames b/parallelization/localnames
deleted file mode 100644
index c8fbfe1..0000000
--- a/parallelization/localnames
+++ /dev/null
@@ -1,51 +0,0 @@
-linux100
-linux104
-linux110
-linux115
-linux122
-linux149
-linux14
-linux15
-linux164
-linux169
-linux16
-linux17
-linux180
-linux181
-linux184
-linux18
-linux20
-linux21
-linux24
-linux27
-linux29
-linux2
-linux30
-linux31
-linux32
-linux38
-linux40
-linux41
-linux46
-linux4
-linux50
-linux52
-linux54
-linux57
-linux62
-linux64
-linux68
-linux69
-linux70
-linux71
-linux72
-linux74
-linux76
-linux79
-linux80
-linux83
-linux86
-linux91
-linux92
-linux96
-linux9
diff --git a/parallelization/parallel.c b/parallelization/parallel.c
deleted file mode 100644
index 68bb1b4..0000000
--- a/parallelization/parallel.c
+++ /dev/null
@@ -1,409 +0,0 @@
-#include "parallel.h"
-
-#include <mpi.h>
-#include <stdio.h>
-#include <stdlib.h>
-#include <string.h>
-#include <time.h>
-#include <unistd.h>
-#include <fcntl.h>
-#include <errno.h>
-#include <sys/mman.h>
-
-//#define DEBUG INFO
-#define DEBUG(msg, ...)
-#define INFO(msg, ...) fprintf(stderr, "[%003d%10.3f] " msg, mpi_rank(0), runtime(), ##__VA_ARGS__)
-//#define DEBUG(msg, ...) fprintf(stderr, "[ %10.3f] " msg, runtime(), ##__VA_ARGS__)
-//#define DEBUG_MPI(msg, node, ...) fprintf(stderr, "[%003d%10.3f] " msg, node, runtime(), ##__VA_ARGS__)
-#define DONE(x) *((int*)(x))
-
-enum message_tag {
-    PARALLEL_ORDER = 0,
-    PARALLEL_RESULT,
-    PARALLEL_SHUTDOWN,
-    PARALLEL_GLOBAL_DATA
-};
-
-struct timespec starttime;
-
-int mpi_rank(int activate_mpi)
-{
-    static int active = 0;
-    if(activate_mpi)
-        active = 1;
-
-    if(!active)
-        return 0;
-    else {
-        int rank;
-        MPI_Comm_rank(MPI_COMM_WORLD, &rank);
-        return rank;
-    }
-}
-
-void start_timer()
-{
-    clock_gettime(CLOCK_MONOTONIC, &starttime);
-}
-
-double runtime()
-{
-    struct timespec curtime;
-    double diff;
-    clock_gettime(CLOCK_MONOTONIC, &curtime);
-    return (curtime.tv_sec - starttime.tv_sec) + (curtime.tv_nsec - starttime.tv_nsec) / 1e9;
-}
-
-parallel_context *parallel_init()
-{
-    parallel_context *ctx = malloc(sizeof(parallel_context));
-
-    if(!getenv("OMPI_COMM_WORLD_SIZE")) {
-        ctx->mpi_mode = 0;
-        DEBUG("Running standalone.\n");
-        return ctx;
-    }
-
-    ctx->mpi_mode = 1;
-    int result = MPI_Init(NULL, NULL);
-    MPI_Comm_size(MPI_COMM_WORLD, &ctx->size);
-    MPI_Comm_rank(MPI_COMM_WORLD, &ctx->rank);
-    MPI_Get_processor_name(ctx->processor_name, &ctx->processor_name_len);
-
-    mpi_rank(1); // display the rank in debug output from now on
-
-    if(ctx->rank == 0)
-        DEBUG("Running in mpi mode, %d nodes.\n", ctx->size);
-
-    return ctx;
-}
-
-void parallel_destroy(parallel_context* ctx)
-{
-    if(ctx->mpi_mode) {
-        MPI_Type_free(&ctx->order_datatype);
-        MPI_Type_free(&ctx->result_datatype);
-        MPI_Finalize();
-    }
-
-    free(ctx);
-}
-
-void parallel_set_datasize_and_callbacks(parallel_context *ctx, parallel_callback_init init, parallel_callback_job job, parallel_callback_destroy destroy, int global_data_size, int node_data_size, int input_size, int output_size)
-{
-    ctx->init = init;
-    ctx->destroy = destroy;
-    ctx->job = job;
-    ctx->global_data_size = global_data_size;
-    ctx->node_data_size = node_data_size;
-    ctx->input_size = input_size;
-    ctx->output_size = output_size;
-
-    if(ctx->mpi_mode) {
-        // create a datatype for job orders, consisting of an integer (the job id) and a user-defined section
-        int order_blocklengths[2] = {1, input_size};
-        MPI_Aint order_displacements[2] = {0, sizeof(int)};
-        MPI_Datatype order_types[2] = {MPI_INT, MPI_BYTE};
-        MPI_Type_create_struct(2, order_blocklengths, order_displacements, order_types, &ctx->order_datatype);
-        MPI_Type_commit(&ctx->order_datatype);
-
-        int result_blocklengths[2] = {1, output_size};
-        MPI_Aint result_displacements[2] = {0, sizeof(int)};
-        MPI_Datatype result_types[2] = {MPI_INT, MPI_BYTE};
-        MPI_Type_create_struct(2, result_blocklengths, result_displacements, result_types, &ctx->result_datatype);
-        MPI_Type_commit(&ctx->result_datatype);
-    }
-}
-
-int parallel_job(parallel_context *ctx, const void *global_data, void *node_data, int block)
-{
-    MPI_Status status;
-    double jobtime;
-    void *input_and_job_nr = malloc(ctx->input_size + sizeof(int));
-    void *output_and_job_nr = malloc(ctx->output_size + sizeof(int));
-    void *input = input_and_job_nr + sizeof(int);
-    int *job_nr = (int *)input_and_job_nr;
-    void *output = output_and_job_nr + sizeof(int);
-    int *output_job_nr = (int *)output_and_job_nr;
-    int result = 0;
-    int message_present;
-
-    if(block) {
-        jobtime = -MPI_Wtime();
-        MPI_Probe(0, MPI_ANY_TAG, MPI_COMM_WORLD,
-                  &status);
-        jobtime += MPI_Wtime();
-
-        INFO("TIMING: Probe() took %f seconds\n", jobtime);
-        message_present = 1;
-    } else {
-        MPI_Iprobe(0, MPI_ANY_TAG, MPI_COMM_WORLD,
-                   &message_present, &status);
-    }
-
-//  DEBUG("Message received: source = %d, tag = %d\n", status.MPI_SOURCE, status.MPI_TAG);
-
-    if(message_present) {
-        if(status.MPI_TAG == PARALLEL_SHUTDOWN) {
-            DEBUG("Shutting down\n");
-            result = 1;
-        } else if(status.MPI_TAG == PARALLEL_ORDER) {
-            MPI_Recv(input_and_job_nr,
-                     1, ctx->order_datatype,
-                     0, PARALLEL_ORDER, MPI_COMM_WORLD,
-                     &status);
-
-            DEBUG("Working on job %d\n", *job_nr);
-
-            jobtime = -MPI_Wtime();
-
-            // do the actual work
-            ctx->job(global_data, node_data, input, output);
-
-            jobtime += MPI_Wtime();
-
-            INFO("TIMING: job %d took %f seconds\n", *job_nr, jobtime);
-
-            *output_job_nr = *job_nr;
-            jobtime = -MPI_Wtime();
-            MPI_Send(output_and_job_nr,
-                     1, ctx->result_datatype,
-                     0, PARALLEL_RESULT, MPI_COMM_WORLD);
-            jobtime += MPI_Wtime();
-
-            INFO("TIMING: Send() took %f seconds\n", jobtime);
-        }
-    } else {
-        result = 2;
-    }
-
-    free(input_and_job_nr);
-    free(output_and_job_nr);
-
-    return result;
-}
-
-int parallel_work(parallel_context *ctx)
-{
-    // do nothing in non-mpi mode
-    if(ctx->mpi_mode == 0)
-        return 0;
-
-    void *global_data = malloc(ctx->global_data_size);
-    void *node_data = malloc(ctx->node_data_size);
-
-    // wait for global data
-    MPI_Bcast(global_data, ctx->global_data_size, MPI_BYTE, 0, MPI_COMM_WORLD);
-
-    DEBUG("Global data received\n");
-
-    // initialize node_data (and do once-per-node computation)
-    ctx->init(global_data, node_data);
-
-    DEBUG("Initialization completed\n");
-
-    int shutdown = 0;
-    while(shutdown != 1) {
-        shutdown = parallel_job(ctx, global_data, node_data, 1);
-    }
-
-    ctx->destroy(global_data, node_data);
-
-    free(global_data);
-    free(node_data);
-
-    return 0;
-}
-
-int parallel_run(parallel_context *ctx, const void *global_data, const void *input_array, void *output_array, unsigned int njobs, const char *_restart_filename)
-{
-    // in non-mpi-mode, just run init, forall(jobs) job
-    if(ctx->mpi_mode == 0) {
-        void *node_data = malloc(ctx->node_data_size);
-        int result = ctx->init(global_data, node_data);
-        if(result != 0)
-            goto cleanup_standalone;
-
-        for(int i = 0; i < njobs; i++) {
-            result = ctx->job(
-                global_data,
-                node_data,
-                input_array + ctx->input_size*i,
-                output_array + ctx->output_size*i);
-            if(result != 0)
-                goto cleanup_standalone;
-        }
-
-    cleanup_standalone:
-        ctx->destroy(global_data, node_data);
-        return result;
-    } else {
-        // if no restart file was specified, pick a filename
-        char *restart_filename;
-        char buffer[128];
-        int restartf;
-        if(_restart_filename == NULL) {
-            time_t t = time(NULL);
-            struct tm *loctm = localtime(&t);
-            strftime(buffer, sizeof(buffer), "restart/restart_%y%m%d_%H%M%S", loctm);
-            restart_filename = buffer;
-        } else {
-            restart_filename = (char *)_restart_filename;
-        }
-
-        // open restart file if it exists, otherwise create it
-        int continuing = 1;
-        restartf = open(restart_filename, O_RDWR);
-        if(restartf == -1 && errno == ENOENT) {
-            restartf = open(restart_filename, O_RDWR | O_CREAT, 0666);
-            continuing = 0;
-        }
-        if(restartf == -1) {
-            DEBUG("Error opening restart file: %s\n", strerror(errno));
-            exit(1);
-        }
-
-        // map restart file
-        int itemsize = (ctx->output_size + sizeof(int)); // for every job, store output, and completed flag
-        ftruncate(restartf, njobs*itemsize);
-        void *alljobs = mmap(0, njobs*itemsize, PROT_READ | PROT_WRITE, MAP_SHARED, restartf, 0);
-        if(alljobs == MAP_FAILED) {
-            DEBUG("Error mapping restart file: %s\n", strerror(errno));
-            exit(1);
-        }
-
-        // count completed jobs, or initialize jobs
-        int completed = 0;
-        if(continuing) {
-            for(int i = 0; i < njobs; i++)
-                if(DONE(alljobs + i*itemsize))
-                    completed++;
-        } else {
-            for(int i = 0; i < njobs; i++) {
-                DONE(alljobs + i*itemsize) = 0;
-                memcpy(alljobs + i*itemsize + sizeof(int), input_array + i*ctx->input_size, ctx->input_size); // copy input data
-            }
-        }
-
-        fsync(restartf);
-
-        if(continuing) {
-            INFO("Continuing from restart file, %d/%d jobs completed, %d nodes\n", completed, njobs, ctx->size);
-        } else {
-            INFO("Starting from scratch, %d jobs, %d nodes\n", njobs, ctx->size);
-        }
-
-        if(completed >= njobs)
-            goto cleanup_mpi;
-
-        /* Send global data */
-        MPI_Bcast((void*)global_data, ctx->global_data_size, MPI_BYTE, 0, MPI_COMM_WORLD);
-
-        DEBUG("Global data sent\n");
-
-        // we want to be able to run jobs ourselves, so initialize node_data
-        void *node_data = malloc(ctx->node_data_size);
-        ctx->init(global_data, node_data);
-
-        void *input_message_buffer = malloc(ctx->input_size + sizeof(int));
-        void *output_message_buffer = malloc(ctx->output_size + sizeof(int));
-        int *active_jobs = malloc(sizeof(int)*ctx->size);
-        memset(active_jobs, 0, ctx->size*sizeof(int));
-        int active_worker_nodes = ctx->size - 1; // we don't count ourselves, since we can't shut ourselves down
-
-        // find next unfinished job
-        int current = 0;
-        while(current < njobs && DONE(alljobs + current*itemsize))
-            current++;
-
-        // assign initial jobs, 2 for each worker thread
-        for(int i = 0; i < 2*ctx->size; i++) {
-            if(current >= njobs) // all jobs are assigned
-                break;
-
-            // send job id and input data
-            // send to all nodes except ourself (node 0)
-            *((int*)input_message_buffer) = current;
-            memcpy(input_message_buffer + sizeof(int), input_array + current*ctx->input_size, ctx->input_size);
-            MPI_Send(input_message_buffer, 1, ctx->order_datatype,
-                     i%ctx->size, PARALLEL_ORDER, MPI_COMM_WORLD);
-
-            DEBUG("Job %d sent to node %d\n", current, i%ctx->size);
-            active_jobs[i%ctx->size]++;
-            current++;
-        }
-
-        MPI_Status status;
-        int message_present;
-        while(active_jobs[0] != 0 || active_worker_nodes != 0) {
-            MPI_Iprobe(MPI_ANY_SOURCE, PARALLEL_RESULT, MPI_COMM_WORLD, &message_present, &status);
-            DEBUG("Message present, tag = %d, source = %d\n", status.MPI_TAG, status.MPI_SOURCE);
-            if(!message_present) {
-                // if there are no more messages to process, we can run a job ourselves before returning to managing
-                DEBUG("Start running job myself\n");
-                int result = parallel_job(ctx, global_data, node_data, 0);
-                DEBUG("Finished running job myself, result = %d\n");
-            } else if(status.MPI_TAG == PARALLEL_RESULT) {
-                MPI_Recv(output_message_buffer, 1, ctx->result_datatype,
-                         MPI_ANY_SOURCE, PARALLEL_RESULT, MPI_COMM_WORLD, &status);
-
-                DEBUG("Got message tag %d from node %d\n", status.MPI_TAG, status.MPI_SOURCE);
-
-                int node = status.MPI_SOURCE;
-                int id = *((int*)output_message_buffer);
-                memcpy(alljobs + id*itemsize + sizeof(int), output_message_buffer + sizeof(int), ctx->output_size);
-                DONE(alljobs + id*itemsize) = 1;
-                active_jobs[node]--;
-
-                // todo: deal with unresponsive nodes
-                // strategy: when no jobs left, go through unfinished list again, incrementing oversubscribe counter
-                // if oversubscribe counter is at limit, shut node down instead
-                //
-
-                if(current >= njobs) { // all jobs are assigned, try to shut down node
-                    // don't try to shut down ourselves, and only if it has no other jobs to do
-                    if(node != 0 && active_jobs[node] == 0) {
-                        MPI_Send(NULL, 0, MPI_BYTE, node, PARALLEL_SHUTDOWN, MPI_COMM_WORLD);
-                        active_worker_nodes--;
-                        INFO("job %d completed by node %d, shut down, %d workers remaining\n", id, node, active_worker_nodes);
-                    }
-                } else {
-                    *((int*)input_message_buffer) = current;
-                    memcpy(input_message_buffer + sizeof(int), input_array + current*ctx->input_size, ctx->input_size);
-                    MPI_Send(input_message_buffer, 1, ctx->order_datatype,
-                             node, PARALLEL_ORDER, MPI_COMM_WORLD);
-                    active_jobs[node]++;
-                    current++;
-
-                    if(active_jobs[node] < 3) {
-                        *((int*)input_message_buffer) = current;
-                        memcpy(input_message_buffer + sizeof(int), input_array + current*ctx->input_size, ctx->input_size);
-                        MPI_Send(input_message_buffer, 1, ctx->order_datatype,
-                                 node, PARALLEL_ORDER, MPI_COMM_WORLD);
-                        active_jobs[node]++;
-                        current++;
-
-                        INFO("job %d completed by node %d, continues with %d and %d\n", id, node, current-1, current-2);
-                    } else {
-                        INFO("job %d completed by node %d, continues with %d\n", id, node, current-1);
-                    }
-                }
-            }
-        }
-
-        for(int i = 0; i < njobs; i++) {
-            memcpy(output_array + i*ctx->output_size, alljobs + i*itemsize + sizeof(int), ctx->output_size);
-        }
-
-        free(input_message_buffer);
-        free(output_message_buffer);
-        free(node_data);
-        free(active_jobs);
-
-    cleanup_mpi:
-        munmap(alljobs, njobs*itemsize);
-        close(restartf);
-    }
-
-    return 0;
-}
diff --git a/parallelization/parallel.h b/parallelization/parallel.h
deleted file mode 100644
index 4239875..0000000
--- a/parallelization/parallel.h
+++ /dev/null
@@ -1,118 +0,0 @@
-#ifndef PARALLEL_H
-#define PARALLEL_H
-
-/*
-  this is a library to parallelize workloads which can be split up naturally
-  into a sequence of independent jobs, using MPI. A program will usually
-
-  - do precomputation
-  - fill array with input data
-  - do the parallel work
-  - print the output data
-
-  we want to enable restarts, so that only unfinshed jobs need to be repeated.
-  Further, we want to be resilient to slow/unreliable network and to losing
-  nodes. There is a main node and a number of workers. The main node does the
-  precomputation and then retires do do administrative work, and the workers
-  do the actual jobs. We also want to switch to serial mode if the program is
-  called without MPI.
-
-  The following data has to be transimitted between nodes:
-  - results of the precomputation (read-only, shared between nodes)
-  - job-specific input data, generated by main node before parallel part
-  - output data for each job
-
-  the parallel work shall be given as a callback function which takes
-  input data and precomputation data as parameter
-
-  the above program will look like this for us:
-
-  - parallel_init
-  - if we are a worker, do parallel_work(init_callback, job_callback), exit
-  - do precomputation
-  - fill array with input data
-  - output_array = parallel_run(input_array)
-  - print the output data
-
-  parallel_init:
-  - check if we're running as an mpi program
-  - init mpi, check what kind of node we are
-
-  parallel_work(init_callback1, init_callback2, job_callback):
-  - receive global_precomp (???)
-  - worker_precomp = init_callback2(global_precomp, worker_precomp)
-  - infinite loop:
-    - wait for job on network, receive input
-    - output = job_callback(global_precomp, worker_precomp, input)
-    - send output on network
-    - exit loop on shutdown signal
-
-  parallel_run(global_precomp, input_array, restart file, callbacks):
-  - check if we're running as an MPI program
-  - send global_precomp to all nodes (if MPI)
-  - if(restart file given and exists) read restart file
-  - else create new restart file
-  - until(all jobs finished):
-    - if MPI:
-      - send next job & input to appropriate node
-      - if all jobs are in work, reassign unfinished ones (up to limit)
-      - collect outputs
-    - if no MPI:
-      - worker_precomp = init_callback1
-      - worker_precomp = init_callback2(global_precomp, worker_precomp)
-      - for(j in jobs)
-        - output(j) = job_callback(global_precomp, worker_precomp, input(j))
-  - delete restart file
-  - return array of outputs
-
-  parallel_destroy():
-  - free everything
-
-  have a context? probably yes: parallel_context
-
-  plan:
-  - make interface
-  - implement no-MPI part
-  - restructure singular_values.c to use interface
-  - implement MPI part
-*/
-
-#include <mpi.h>
-#include <time.h>
-
-typedef void (*parallel_callback_destroy)(const void*, void*);
-typedef int (*parallel_callback_init)(const void*,void*);
-typedef int (*parallel_callback_job)(const void*,void*,const void*,void*);
-
-typedef struct {
-    int mpi_mode;
-    struct timespec starttime;
-    char processor_name[MPI_MAX_PROCESSOR_NAME];
-    int processor_name_len;
-    int rank;
-    int size;
-    MPI_Datatype order_datatype;
-    MPI_Datatype result_datatype;
-    parallel_callback_init init;
-    parallel_callback_job job;
-    parallel_callback_destroy destroy;
-    void *global_data;
-    void *node_data;
-    int global_data_size;
-    int node_data_size;
-    int input_size;
-    int output_size;
-} parallel_context;
-
-parallel_context *parallel_init();
-void parallel_set_datasize_and_callbacks(parallel_context *ctx, parallel_callback_init init, parallel_callback_job job, parallel_callback_destroy destroy, int global_data_size, int node_data_size, int input_size, int output_size);
-int parallel_work(parallel_context *ctx);
-int parallel_run(parallel_context *ctx, const void *global_data, const void *input_array, void *output_array, unsigned int njobs, const char *restart_filename);
-void parallel_destroy(parallel_context* ctx);
-
-int mpi_rank();
-void start_timer();
-double runtime();
-
-
-#endif
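
Unlike the binary mmap()ed restart file that parallel.c kept, the restart
state of the new scheme (runjobs.py below) is plain text, one completed
job index per line, so it can be inspected and even edited by hand (a
sketch; the done_NNN names are the ones runjobs.py writes, and the job
index 4711 is made up):

    sort -n done_* | tail -n 3   # highest completed job indices
    echo 4711 >> done_000        # manually mark job 4711 as done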
diff --git a/parallelization/run_local b/parallelization/run_local
deleted file mode 100755
index 0c23ecc..0000000
--- a/parallelization/run_local
+++ /dev/null
@@ -1,9 +0,0 @@
-#!/bin/bash
-
-# nmax=895882 # up to reflection group word length 22 ( 555 group)
-nmax=700000 # up to reflection group word length 22 ( 444 group)
-# nmax=11575 # up to reflection group word length 14
-
-#time mpirun --mca opal_warn_on_missing_libcuda 0 -x LD_LIBRARY_PATH=/home/stecker/svmpi/libs ./singular_values $nmax ejp_trg_restart test.out
-
-time mpirun --mca opal_warn_on_missing_libcuda 0 --mca mpi_yield_when_idle 1 -np 4 ./singular_values 700000 4 4 4 1 1 100 1 100 100 $1
diff --git a/parallelization/run_utexas b/parallelization/run_utexas
deleted file mode 100755
index 2a67f8b..0000000
--- a/parallelization/run_utexas
+++ /dev/null
@@ -1,14 +0,0 @@
-#!/bin/bash
-
-cd /home/stecker/svmpi/
-
-nmax=895882 # up to reflection group word length 22
-# nmax=11575 # up to reflection group word length 14
-
-outfile=result_$(date +%Y%m%d_%H%M%S).out
-
-
-unset DISPLAY
-
-make singular_values &&
-time mpirun -n 100 -x LD_LIBRARY_PATH=/home/stecker/svmpi/libs --hostfile hostfile_big ./singular_values $nmax utexas_cluster_restart $outfile
diff --git a/parallelization/runjobs.py b/parallelization/runjobs.py
new file mode 100755
index 0000000..fd1591f
--- /dev/null
+++ b/parallelization/runjobs.py
@@ -0,0 +1,57 @@
+#!/usr/bin/python
+
+from mpi4py import MPI
+import os
+import re
+import math
+import subprocess
+import time
+
+comm = MPI.COMM_WORLD
+rank = comm.Get_rank()
+nodes = comm.Get_size()
+
+# print(os.path.abspath(os.curdir))
+
+done = set()
+for f in os.listdir('.'):
+    if re.search('^done_[0-9]+', f):
+        fp = open(f, "r")
+        for x in fp:
+            done.add(int(x))
+        fp.close()
+
+f = open("commands", "r")
+idx = 0
+todo = []
+for c in f:
+    if not idx in done:
+        todo.append((idx,c))
+    idx = idx+1
+f.close()
+
+start = math.floor(len(todo)/nodes*rank)
+end = math.floor(len(todo)/nodes*(rank+1))
+if(rank == nodes-1):
+    end = len(todo)
+
+print("{n:d} commands awaiting execution, {nnode:d} of them in node {rank:d}".format(n=len(todo),nnode=end-start,rank=rank))
+
+time.sleep(1) # to make sure all nodes read the status first before more gets done
+
+outfilename = "result_{node:003d}".format(node=rank)
+donefilename = "done_{node:003d}".format(node=rank)
+outfile = open(outfilename, "a")
+donefile = open(donefilename, "a")
+
+for i in range(start, end):
+    result = subprocess.call(todo[i][1], stdout=outfile, shell=True)
+    if result == 0:
+        donefile.write(str(todo[i][0]) + '\n')
+    else:
+        print("Command failed: {cmd}".format(cmd=todo[i][1]))
+    outfile.flush()
+    donefile.flush()
+
+outfile.close()
+donefile.close()
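
A typical run of the new setup (a sketch, not part of the commit: the
rank count is arbitrary, and plain mpirun suffices because mpi4py
initializes MPI when runjobs.py imports it):

    ./generate_commands.py > commands
    mpirun -np 16 ./runjobs.py
    cat result_* > summary_all   # merge the per-rank outputs

Re-running the same mpirun line after a crash executes only the commands
whose indices are missing from the done_* files.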
diff --git a/parallelization/stampede.slurm b/parallelization/stampede.slurm
deleted file mode 100644
index 448858e..0000000
--- a/parallelization/stampede.slurm
+++ /dev/null
@@ -1,20 +0,0 @@
-#!/bin/bash
-
-#SBATCH -J ejp_trg
-#SBATCH -o logs/ejp_trg.o%j
-#SBATCH -e logs/ejp_trg.e%j
-#SBATCH -p skx-dev
-#SBATCH -N 1
-#SBATCH -n 48
-#SBATCH -t 00:05:00
-#SBATCH --mail-user=mail@florianstecker.net
-#SBATCH --mail-type=all
-
-export LD_LIBRARY_PATH=$WORK/mps/lib:$LD_LIBRARY_PATH
-
-d=$(date +%Y%m%d_%H%M%S)
-
-nmax=895882 # up to reflection group word length 22
-# nmax=11575 # up to reflection group word length 1
-
-ibrun ./singular_values $nmax $SCRATCH/ejp_trg_restart $WORK/ejp_trg/output/result_$d
diff --git a/parallelization/sync_stampede b/parallelization/sync_stampede
deleted file mode 100755
index 1a0a9c9..0000000
--- a/parallelization/sync_stampede
+++ /dev/null
@@ -1,8 +0,0 @@
-#!/bin/bash
-
-rsync -vt *.c *.h Makefile stampede.slurm stampede:work/ejp_trg/
-#rsync -lvt /usr/lib/libmps.so* /usr/include/mps utexas:work/ejp_trg/libs/
-
-# now run it with a job script
-
-# get MPSolve from https://numpi.dm.unipi.it/_media/software/mpsolve/mpsolve-3.2.1.tar.bz2
diff --git a/parallelization/sync_utexas b/parallelization/sync_utexas
deleted file mode 100755
index ce7be64..0000000
--- a/parallelization/sync_utexas
+++ /dev/null
@@ -1,7 +0,0 @@
-#!/bin/bash
-
-rsync -vt *.c *.h Makefile hostfile hostfile_big allhosts localnames run_utexas run_local utexas:svmpi/
-rsync -lvt /usr/lib/libmps.so* utexas:svmpi/libs/
-rsync -rvt /usr/include/mps utexas:svmpi/libs/
-
-# now run it with ssh utexas -t ssh linux50 svmpi/run_utexas