import enumerate.c and generators.c, prepare Makefile

Florian Stecker
2022-06-13 12:05:34 +02:00
parent 3309c37955
commit 15681c308b
12 changed files with 178 additions and 38 deletions

parallelization/parallel.c Normal file (409 lines added)

@@ -0,0 +1,409 @@
#include "parallel.h"
#include <mpi.h>
#include <sys/stat.h>
#include <sys/mman.h>
#include <fcntl.h>
#include <errno.h>
#include <string.h>
#include <unistd.h>
#include <malloc.h>
#include <stdlib.h>
#include <stdio.h>
//#define DEBUG INFO
#define DEBUG(msg, ...)
#define INFO(msg, ...) fprintf(stderr, "[%003d%10.3f] " msg, mpi_rank(0), runtime(), ##__VA_ARGS__)
//#define DEBUG(msg, ...) fprintf(stderr, "[ %10.3f] " msg, runtime(), ##__VA_ARGS__)
//#define DEBUG_MPI(msg, node, ...) fprintf(stderr, "[%003d%10.3f] " msg, node, runtime(), ##__VA_ARGS__)
#define DONE(x) *((int*)(x))
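// message tags used between the main node (rank 0) and the workers:
// PARALLEL_ORDER carries a job id plus the job input, PARALLEL_RESULT carries a
// job id plus the job output, and an empty PARALLEL_SHUTDOWN tells a worker to
// leave its work loop; PARALLEL_GLOBAL_DATA is not used in this file, the global
// data is distributed via MPI_Bcast instead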
enum message_tag {
PARALLEL_ORDER = 0,
PARALLEL_RESULT,
PARALLEL_SHUTDOWN,
PARALLEL_GLOBAL_DATA
};
struct timespec starttime;
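// rank shown in the log prefix: returns 0 until mpi_rank(1) has been called
// (done once in parallel_init() when running under MPI), the real rank afterwards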
int mpi_rank(int activate_mpi)
{
static int active = 0;
if(activate_mpi)
active = 1;
if(!active)
return 0;
else {
int rank;
MPI_Comm_rank(MPI_COMM_WORLD, &rank);
return rank;
}
}
void start_timer()
{
clock_gettime(CLOCK_MONOTONIC, &starttime);
}
double runtime()
{
struct timespec curtime;
clock_gettime(CLOCK_MONOTONIC, &curtime);
return (curtime.tv_sec - starttime.tv_sec) + (curtime.tv_nsec - starttime.tv_nsec) / 1e9;
}
parallel_context *parallel_init()
{
parallel_context *ctx = malloc(sizeof(parallel_context));
if(!getenv("OMPI_COMM_WORLD_SIZE")) {
ctx->mpi_mode = 0;
DEBUG("Running standalone.\n");
return ctx;
}
ctx->mpi_mode = 1;
int result = MPI_Init(NULL, NULL);
MPI_Comm_size(MPI_COMM_WORLD, &ctx->size);
MPI_Comm_rank(MPI_COMM_WORLD, &ctx->rank);
MPI_Get_processor_name(ctx->processor_name, &ctx->processor_name_len);
mpi_rank(1); // display the rank in debug output from now on
if(ctx->rank == 0)
DEBUG("Running in mpi mode, %d nodes.\n", ctx->size);
return ctx;
}
void parallel_destroy(parallel_context* ctx)
{
if(ctx->mpi_mode) {
MPI_Type_free(&ctx->order_datatype);
MPI_Type_free(&ctx->result_datatype);
MPI_Finalize();
}
free(ctx);
}
void parallel_set_datasize_and_callbacks(parallel_context *ctx, parallel_callback_init init, parallel_callback_job job, parallel_callback_destroy destroy, int global_data_size, int node_data_size, int input_size, int output_size)
{
ctx->init = init;
ctx->destroy = destroy;
ctx->job = job;
ctx->global_data_size = global_data_size;
ctx->node_data_size = node_data_size;
ctx->input_size = input_size;
ctx->output_size = output_size;
if(ctx->mpi_mode) {
// create a datatype for job orders, consisting of an integer (the job id) and a user-defined section
int order_blocklengths[2] = {1, input_size};
MPI_Aint order_displacements[2] = {0, sizeof(int)};
MPI_Datatype order_types[2] = {MPI_INT, MPI_BYTE};
MPI_Type_create_struct(2, order_blocklengths, order_displacements, order_types, &ctx->order_datatype);
MPI_Type_commit(&ctx->order_datatype);
int result_blocklengths[2] = {1, output_size};
MPI_Aint result_displacements[2] = {0, sizeof(int)};
MPI_Datatype result_types[2] = {MPI_INT, MPI_BYTE};
MPI_Type_create_struct(2, result_blocklengths, result_displacements, result_types, &ctx->result_datatype);
MPI_Type_commit(&ctx->result_datatype);
}
}
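// handle a single order from the main node; returns 0 after completing a job,
// 1 if a shutdown message was received, and 2 if block == 0 and no message was pending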
int parallel_job(parallel_context *ctx, const void *global_data, void *node_data, int block)
{
MPI_Status status;
double jobtime;
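// the send/receive buffers mirror order_datatype/result_datatype: a leading int
// holding the job id, followed by the user-defined input or output payload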
void *input_and_job_nr = malloc(ctx->input_size + sizeof(int));
void *output_and_job_nr = malloc(ctx->output_size + sizeof(int));
void *input = input_and_job_nr + sizeof(int);
int *job_nr = (int *)input_and_job_nr;
void *output = output_and_job_nr + sizeof(int);
int *output_job_nr = (int *)output_and_job_nr;
int result = 0;
int message_present;
if(block) {
jobtime = -MPI_Wtime();
MPI_Probe(0, MPI_ANY_TAG, MPI_COMM_WORLD,
&status);
jobtime += MPI_Wtime();
INFO("TIMING: Probe() took %f seconds\n", jobtime);
message_present = 1;
} else {
MPI_Iprobe(0, MPI_ANY_TAG, MPI_COMM_WORLD,
&message_present, &status);
}
// DEBUG("Message received: source = %d, tag = %d\n", status.MPI_SOURCE, status.MPI_TAG);
if(message_present) {
if(status.MPI_TAG == PARALLEL_SHUTDOWN) {
DEBUG("Shutting down\n");
result = 1;
} else if(status.MPI_TAG == PARALLEL_ORDER) {
MPI_Recv(input_and_job_nr,
1, ctx->order_datatype,
0, PARALLEL_ORDER, MPI_COMM_WORLD,
&status);
DEBUG("Working on job %d\n", *job_nr);
jobtime = -MPI_Wtime();
// do the actual work
ctx->job(global_data, node_data, input, output);
jobtime += MPI_Wtime();
INFO("TIMING: job %d took %f seconds\n", *job_nr, jobtime);
*output_job_nr = *job_nr;
jobtime = -MPI_Wtime();
MPI_Send(output_and_job_nr,
1, ctx->result_datatype,
0, PARALLEL_RESULT, MPI_COMM_WORLD);
jobtime += MPI_Wtime();
INFO("TIMING: Send() took %f seconds\n", jobtime);
}
} else {
result = 2;
}
free(input_and_job_nr);
free(output_and_job_nr);
return result;
}
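// worker-node side: receive the global data, initialize per-node data and process
// orders until a shutdown message arrives; does nothing in non-MPI mode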
int parallel_work(parallel_context *ctx)
{
// do nothing in non-mpi mode
if(ctx->mpi_mode == 0)
return 0;
void *global_data = malloc(ctx->global_data_size);
void *node_data = malloc(ctx->node_data_size);
// wait for global data
MPI_Bcast(global_data, ctx->global_data_size, MPI_BYTE, 0, MPI_COMM_WORLD);
DEBUG("Global data received\n");
// initialize node_data (and do once-per-node computation)
ctx->init(global_data, node_data);
DEBUG("Initialization completed\n");
int shutdown = 0;
while(shutdown != 1) {
shutdown = parallel_job(ctx, global_data, node_data, 1);
}
ctx->destroy(global_data, node_data);
free(global_data);
free(node_data);
return 0;
}
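// main-node side: distribute the jobs over all nodes (running some locally while
// idle), collect the results into output_array, and keep a memory-mapped restart
// file so an interrupted run can be resumed; in non-MPI mode all jobs are simply
// run sequentially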
int parallel_run(parallel_context *ctx, const void *global_data, const void *input_array, void *output_array, unsigned int njobs, const char *_restart_filename)
{
// in non-mpi-mode, just run init, forall(jobs) job
if(ctx->mpi_mode == 0) {
void *node_data = malloc(ctx->node_data_size);
int result = ctx->init(global_data, node_data);
if(result != 0)
goto cleanup_standalone;
for(int i = 0; i < njobs; i++) {
result = ctx->job(
global_data,
node_data,
input_array + ctx->input_size*i,
output_array + ctx->output_size*i);
if(result != 0)
goto cleanup_standalone;
}
cleanup_standalone:
ctx->destroy(global_data, node_data);
free(node_data);
return result;
} else {
// if no restart file was specified, pick a filename
char *restart_filename;
char buffer[128];
int restartf;
if(_restart_filename == NULL) {
time_t t = time(NULL);
struct tm *loctm = localtime(&t);
strftime(buffer, sizeof(buffer), "restart/restart_%y%m%d_%H%M%S", loctm);
restart_filename = buffer;
} else {
restart_filename = (char *)_restart_filename;
}
// open restart file if it exists, otherwise create it
int continuing = 1;
restartf = open(restart_filename, O_RDWR);
if(restartf == -1 && errno == ENOENT) {
restartf = open(restart_filename, O_RDWR | O_CREAT, 0666);
continuing = 0;
}
if(restartf == -1) {
DEBUG("Error opening restart file: %s\n", strerror(errno));
exit(1);
}
// map restart file
int itemsize = (ctx->output_size + sizeof(int)); // for every job, store output, and completed flag
ftruncate(restartf, njobs*itemsize);
void *alljobs = mmap(0, njobs*itemsize, PROT_READ | PROT_WRITE, MAP_SHARED, restartf, 0);
if(alljobs == MAP_FAILED) {
DEBUG("Error mapping restart file: %s\n", strerror(errno));
exit(1);
}
// count completed jobs, or initialize jobs
int completed = 0;
if(continuing) {
for(int i = 0; i < njobs; i++)
if(DONE(alljobs + i*itemsize))
completed++;
} else {
for(int i = 0; i < njobs; i++) {
DONE(alljobs + i*itemsize) = 0;
memcpy(alljobs + i*itemsize + sizeof(int), input_array + i*ctx->input_size, ctx->input_size); // copy input data (assumes input_size <= output_size)
}
}
fsync(restartf);
if(continuing) {
INFO("Continuing from restart file, %d/%d jobs completed, %d nodes\n", completed, njobs, ctx->size);
} else {
INFO("Starting from scratch, %d jobs, %d nodes\n", njobs, ctx->size);
}
if(completed >= njobs) { // all jobs were already done in a previous run: just copy the saved outputs
for(int i = 0; i < njobs; i++)
memcpy(output_array + i*ctx->output_size, alljobs + i*itemsize + sizeof(int), ctx->output_size);
goto cleanup_mpi;
}
/* Send global data */
MPI_Bcast((void*)global_data, ctx->global_data_size, MPI_BYTE, 0, MPI_COMM_WORLD);
DEBUG("Global data sent\n");
// we want to be able to run jobs ourselves, so initialize node_data
void *node_data = malloc(ctx->node_data_size);
ctx->init(global_data, node_data);
void *input_message_buffer = malloc(ctx->input_size + sizeof(int));
void *output_message_buffer = malloc(ctx->output_size + sizeof(int));
int *active_jobs = malloc(sizeof(int)*ctx->size);
memset(active_jobs, 0, ctx->size*sizeof(int));
int active_worker_nodes = ctx->size - 1; // we don't count ourselves, since we can't shut ourselves down
// find next unfinished job
int current = 0;
while(current < njobs && DONE(alljobs + current*itemsize))
current++;
// assign initial jobs, 2 per node (node 0 also works on jobs while it is idle)
for(int i = 0; i < 2*ctx->size; i++) {
if(current >= njobs) // all jobs are assigned
break;
// send job id and input data
// jobs are sent round-robin to all nodes, including ourselves (node 0)
*((int*)input_message_buffer) = current;
memcpy(input_message_buffer + sizeof(int), input_array + current*ctx->input_size, ctx->input_size);
MPI_Send(input_message_buffer, 1, ctx->order_datatype,
i%ctx->size, PARALLEL_ORDER, MPI_COMM_WORLD);
DEBUG("Job %d sent to node %d\n", current, i%ctx->size);
active_jobs[i%ctx->size]++;
current++;
}
MPI_Status status;
int message_present;
while(active_jobs[0] != 0 || active_worker_nodes != 0) {
MPI_Iprobe(MPI_ANY_SOURCE, PARALLEL_RESULT, MPI_COMM_WORLD, &message_present, &status);
DEBUG("Message present, tag = %d, source = %d\n", status.MPI_TAG, status.MPI_SOURCE);
if(!message_present) {
// if there are no more messages to process, we can run a job ourselves before returning to managing
DEBUG("Start running job myself\n");
int result = parallel_job(ctx, global_data, node_data, 0);
DEBUG("Finished running job myself, result = %d\n");
} else if(status.MPI_TAG == PARALLEL_RESULT) {
MPI_Recv(output_message_buffer, 1, ctx->result_datatype,
MPI_ANY_SOURCE, PARALLEL_RESULT, MPI_COMM_WORLD, &status);
DEBUG("Got message tag %d from node %d\n", status.MPI_TAG, status.MPI_SOURCE);
int node = status.MPI_SOURCE;
int id = *((int*)output_message_buffer);
memcpy(alljobs + id*itemsize + sizeof(int), output_message_buffer + sizeof(int), ctx->output_size);
DONE(alljobs + id*itemsize) = 1;
active_jobs[node]--;
// todo: deal with unresponsive nodes
// strategy: when no jobs left, go through unfinished list again, incrementing oversubscribe counter
// if oversubscribe counter is at limit, shut node down instead
//
if(current >= njobs) { // all jobs are assigned, try to shut down node
// don't try to shut down ourselves, and only if it has no other jobs to do
if(node != 0 && active_jobs[node] == 0) {
MPI_Send(NULL, 0, MPI_BYTE, node, PARALLEL_SHUTDOWN, MPI_COMM_WORLD);
active_worker_nodes--;
INFO("job %d completed by node %d, shut down, %d workers remaining\n", id, node, active_worker_nodes);
}
} else {
*((int*)input_message_buffer) = current;
memcpy(input_message_buffer + sizeof(int), input_array + current*ctx->input_size, ctx->input_size);
MPI_Send(input_message_buffer, 1, ctx->order_datatype,
node, PARALLEL_ORDER, MPI_COMM_WORLD);
active_jobs[node]++;
current++;
if(active_jobs[node] < 3 && current < njobs) {
*((int*)input_message_buffer) = current;
memcpy(input_message_buffer + sizeof(int), input_array + current*ctx->input_size, ctx->input_size);
MPI_Send(input_message_buffer, 1, ctx->order_datatype,
node, PARALLEL_ORDER, MPI_COMM_WORLD);
active_jobs[node]++;
current++;
INFO("job %d completed by node %d, continues with %d and %d\n", id, node, current-1, current-2);
} else {
INFO("job %d completed by node %d, continues with %d\n", id, node, current-1);
}
}
}
}
for(int i = 0; i < njobs; i++) {
memcpy(output_array + i*ctx->output_size, alljobs + i*itemsize + sizeof(int), ctx->output_size);
}
free(input_message_buffer);
free(output_message_buffer);
free(node_data);
free(active_jobs);
cleanup_mpi:
munmap(alljobs, njobs*itemsize);
close(restartf);
}
return 0;
}

parallelization/parallel.h Normal file (118 lines added)

@@ -0,0 +1,118 @@
#ifndef PARALLEL_H
#define PARALLEL_H
/*
this is a library to parallelize workloads which can be split up naturally
into a sequence of independent jobs, using MPI. A program will usually
- do precomputation
- fill array with input data
- do the parallel work
- print the output data
we want to enable restarts, so that only unfinished jobs need to be repeated.
Further, we want to be resilient to slow/unreliable network and to losing
nodes. There is a main node and a number of workers. The main node does the
precomputation and then retires to do administrative work, and the workers
do the actual jobs. We also want to switch to serial mode if the program is
called without MPI.
The following data has to be transmitted between nodes:
- results of the precomputation (read-only, shared between nodes)
- job-specific input data, generated by main node before parallel part
- output data for each job
the parallel work shall be given as a callback function which takes
input data and precomputation data as parameter
the above program will look like this for us:
- parallel_init
- if we are a worker, do parallel_work(init_callback, job_callback), exit
- do precomputation
- fill array with input data
- output_array = parallel_run(input_array)
- print the output data
parallel_init:
- check if we're running as an mpi program
- init mpi, check what kind of node we are
parallel_work(init_callback1, init_callback2, job_callback):
- receive global_precomp (???)
- worker_precomp = init_callback2(global_precomp, worker_precomp)
- infinite loop:
- wait for job on network, receive input
- output = job_callback(global_precomp, worker_precomp, input)
- send output on network
- exit loop on shutdown signal
parallel_run(global_precomp, input_array, restart file, callbacks):
- check if we're running as an MPI program
- send global_precomp to all nodes (if MPI)
- if(restart file given and exists) read restart file
- else create new restart file
- until(all jobs finished):
- if MPI:
- send next job & input to appropriate node
- if all jobs are in work, reassign unfinished ones (up to limit)
- collect outputs
- if no MPI:
- worker_precomp = init_callback1
- worker_precomp = init_callback2(global_precomp, worker_precomp)
- for(j in jobs)
- output(j) = job_callback(global_precomp, worker_precomp, input(j))
- delete restart file
- return array of outputs
parallel_destroy():
- free everything
have a context? probably yes: parallel_context
plan:
- make interface
- implement no-MPI part
- restructure singular_values.c to use interface
- implement MPI part
*/
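/*
a minimal usage sketch of the interface below (hypothetical caller, not part of
this commit; my_init/my_job/my_destroy, global_t, node_t, input_t, output_t,
NJOBS and the do_precomputation()/fill_inputs() helpers are placeholder names):

    parallel_context *ctx = parallel_init();
    parallel_set_datasize_and_callbacks(ctx, my_init, my_job, my_destroy,
        sizeof(global_t), sizeof(node_t), sizeof(input_t), sizeof(output_t));

    // worker nodes only run jobs and then exit
    if(ctx->mpi_mode && ctx->rank != 0) {
        parallel_work(ctx);
        parallel_destroy(ctx);
        return 0;
    }

    // main node (or standalone run): precompute, fill inputs, run, use outputs
    global_t global = do_precomputation();
    input_t inputs[NJOBS];
    output_t outputs[NJOBS];
    fill_inputs(inputs);
    parallel_run(ctx, &global, inputs, outputs, NJOBS, NULL); // NULL: restart file name is generated automatically
    parallel_destroy(ctx);
*/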
#include <mpi.h>
#include <time.h>
typedef void (*parallel_callback_destroy)(const void*, void*);
typedef int (*parallel_callback_init)(const void*,void*);
typedef int (*parallel_callback_job)(const void*,void*,const void*,void*);
typedef struct {
int mpi_mode;
struct timespec starttime;
char processor_name[MPI_MAX_PROCESSOR_NAME];
int processor_name_len;
int rank;
int size;
MPI_Datatype order_datatype;
MPI_Datatype result_datatype;
parallel_callback_init init;
parallel_callback_job job;
parallel_callback_destroy destroy;
void *global_data;
void *node_data;
int global_data_size;
int node_data_size;
int input_size;
int output_size;
} parallel_context;
parallel_context *parallel_init();
void parallel_set_datasize_and_callbacks(parallel_context *ctx, parallel_callback_init init, parallel_callback_job job, parallel_callback_destroy destroy, int global_data_size, int node_data_size, int input_size, int output_size);
int parallel_work(parallel_context *ctx);
int parallel_run(parallel_context *ctx, const void *global_data, const void *input_array, void *output_array, unsigned int njobs, const char *restart_filename);
void parallel_destroy(parallel_context* ctx);
int mpi_rank(int activate_mpi);
void start_timer();
double runtime();
#endif