triangle_reflection_complex/parallelization/parallel.c

410 lines
12 KiB
C

#include "parallel.h"
#include <mpi.h>
#include <sys/stat.h>
#include <sys/mman.h>
#include <fcntl.h>
#include <errno.h>
#include <string.h>
#include <unistd.h>
#include <malloc.h>
#include <stdlib.h>
// Logging macros.
// DEBUG is compiled out by default; uncomment one of the alternative
// definitions below (and remove the no-op one) to enable debug output.
//#define DEBUG INFO
// The disabled form expands to a void expression so that it is still a valid
// statement in every context (e.g. as the sole body of an if).
#define DEBUG(msg, ...) ((void)0)
// INFO prints to stderr, prefixed with the MPI rank and the elapsed runtime in seconds.
#define INFO(msg, ...) fprintf(stderr, "[%003d%10.3f] " msg, mpi_rank(0), runtime(), ##__VA_ARGS__)
//#define DEBUG(msg, ...) fprintf(stderr, "[ %10.3f] " msg, runtime(), ##__VA_ARGS__)
//#define DEBUG_MPI(msg, node, ...) fprintf(stderr, "[%003d%10.3f] " msg, node, runtime(), ##__VA_ARGS__)
// DONE(x) is the int stored at address x, usable as an lvalue.
// The whole expansion is parenthesised so that surrounding operators
// (e.g. postfix ++) apply to the int and not to the pointer cast.
#define DONE(x) (*((int*)(x)))
/* MPI message tags used between node 0 and the other nodes. */
enum message_tag {
    PARALLEL_ORDER       = 0, /* job order: job id followed by the job input */
    PARALLEL_RESULT      = 1, /* job result: job id followed by the job output */
    PARALLEL_SHUTDOWN    = 2, /* empty message telling a node to stop working */
    PARALLEL_GLOBAL_DATA = 3
};
// Reference point for runtime(); written by start_timer().
struct timespec starttime;
/*
 * Return the rank of this process in MPI_COMM_WORLD, or 0 as long as MPI
 * lookups have not been activated.
 *
 * activate_mpi: when nonzero, rank lookups are enabled for this and all
 *               following calls (the setting is remembered in a static flag).
 */
int mpi_rank(int activate_mpi)
{
    static int active = 0;
    int rank = 0;

    if(activate_mpi)
        active = 1;

    // only ask MPI once it has been activated
    if(active)
        MPI_Comm_rank(MPI_COMM_WORLD, &rank);

    return rank;
}
/* Record the current monotonic clock reading as the origin for runtime(). */
void start_timer()
{
    // start from the old value so that starttime is unchanged if the clock read fails
    struct timespec now = starttime;

    clock_gettime(CLOCK_MONOTONIC, &now);
    starttime = now;
}
/*
 * Return the number of seconds elapsed on the monotonic clock since the
 * time stored in starttime.
 */
double runtime()
{
    struct timespec now;

    clock_gettime(CLOCK_MONOTONIC, &now);

    // the nanosecond difference may be negative; the sum is still correct
    double whole_seconds = (double)(now.tv_sec - starttime.tv_sec);
    double nanoseconds = (double)(now.tv_nsec - starttime.tv_nsec);

    return whole_seconds + nanoseconds / 1e9;
}
parallel_context *parallel_init()
{
parallel_context *ctx = malloc(sizeof(parallel_context));
if(!getenv("OMPI_COMM_WORLD_SIZE")) {
ctx->mpi_mode = 0;
DEBUG("Running standalone.\n");
return ctx;
}
ctx->mpi_mode = 1;
int result = MPI_Init(NULL, NULL);
MPI_Comm_size(MPI_COMM_WORLD, &ctx->size);
MPI_Comm_rank(MPI_COMM_WORLD, &ctx->rank);
MPI_Get_processor_name(ctx->processor_name, &ctx->processor_name_len);
mpi_rank(1); // display the rank in debug output from now on
if(ctx->rank == 0)
DEBUG("Running in mpi mode, %d nodes.\n", ctx->size);
return ctx;
}
/*
 * Release a context obtained from parallel_init().
 *
 * In MPI mode this frees the order and result datatypes and finalizes MPI,
 * so parallel_set_datasize_and_callbacks() must have created those datatypes
 * beforehand. Passing NULL is a no-op.
 */
void parallel_destroy(parallel_context* ctx)
{
    if(ctx == NULL)
        return;

    if(ctx->mpi_mode) {
        MPI_Type_free(&ctx->order_datatype);
        MPI_Type_free(&ctx->result_datatype);
        MPI_Finalize();
    }

    free(ctx);
}
void parallel_set_datasize_and_callbacks(parallel_context *ctx, parallel_callback_init init, parallel_callback_job job, parallel_callback_destroy destroy, int global_data_size, int node_data_size, int input_size, int output_size)
{
ctx->init = init;
ctx->destroy = destroy;
ctx->job = job;
ctx->global_data_size = global_data_size;
ctx->node_data_size = node_data_size;
ctx->input_size = input_size;
ctx->output_size = output_size;
if(ctx->mpi_mode) {
// create a datatype for job orders, consisting of an integer (the job id) and a user-defined section
int order_blocklengths[2] = {1, input_size};
MPI_Aint order_displacements[2] = {0, sizeof(int)};
MPI_Datatype order_types[2] = {MPI_INT, MPI_BYTE};
MPI_Type_create_struct(2, order_blocklengths, order_displacements, order_types, &ctx->order_datatype);
MPI_Type_commit(&ctx->order_datatype);
int result_blocklengths[2] = {1, output_size};
MPI_Aint result_displacements[2] = {0, sizeof(int)};
MPI_Datatype result_types[2] = {MPI_INT, MPI_BYTE};
MPI_Type_create_struct(2, result_blocklengths, result_displacements, result_types, &ctx->result_datatype);
MPI_Type_commit(&ctx->result_datatype);
}
}
/*
 * Look for a message from node 0 and act on it.
 *
 * ctx:         context in MPI mode with committed order/result datatypes
 * global_data: passed through to the job callback
 * node_data:   passed through to the job callback
 * block:       nonzero to wait until a message arrives (MPI_Probe),
 *              zero to only check whether one is pending (MPI_Iprobe)
 *
 * Returns 1 if a shutdown message was received, 2 if no message was pending
 * (non-blocking mode only), and 0 otherwise (a job order was processed and
 * its result sent back to node 0, or a message with another tag was seen).
 * The return value of the job callback is not propagated.
 */
int parallel_job(parallel_context *ctx, const void *global_data, void *node_data, int block)
{
    MPI_Status status;
    int message_present;
    double jobtime;

    if(block) {
        jobtime = -MPI_Wtime();
        MPI_Probe(0, MPI_ANY_TAG, MPI_COMM_WORLD,
                  &status);
        jobtime += MPI_Wtime();
        INFO("TIMING: Probe() took %f seconds\n", jobtime);
        message_present = 1;
    } else {
        MPI_Iprobe(0, MPI_ANY_TAG, MPI_COMM_WORLD,
                   &message_present, &status);
    }
    // DEBUG("Message received: source = %d, tag = %d\n", status.MPI_SOURCE, status.MPI_TAG);

    if(!message_present)
        return 2;

    if(status.MPI_TAG == PARALLEL_SHUTDOWN) {
        // actually receive the (empty) shutdown message so that the send
        // on node 0 is matched before MPI is finalized
        MPI_Recv(NULL, 0, MPI_BYTE,
                 0, PARALLEL_SHUTDOWN, MPI_COMM_WORLD,
                 &status);
        DEBUG("Shutting down\n");
        return 1;
    }

    if(status.MPI_TAG != PARALLEL_ORDER)
        return 0;

    // message layout: [int job id][payload]; buffers are only allocated
    // once we know there is an order to process
    char *order = malloc(ctx->input_size + sizeof(int));
    char *reply = malloc(ctx->output_size + sizeof(int));
    if(order == NULL || reply == NULL) {
        INFO("Out of memory allocating job buffers\n");
        free(order);
        free(reply);
        MPI_Abort(MPI_COMM_WORLD, 1);
        exit(1);
    }
    int *job_nr = (int *)order;
    int *reply_job_nr = (int *)reply;

    MPI_Recv(order,
             1, ctx->order_datatype,
             0, PARALLEL_ORDER, MPI_COMM_WORLD,
             &status);
    DEBUG("Working on job %d\n", *job_nr);

    jobtime = -MPI_Wtime();
    // do the actual work
    ctx->job(global_data, node_data, order + sizeof(int), reply + sizeof(int));
    jobtime += MPI_Wtime();
    INFO("TIMING: job %d took %f seconds\n", *job_nr, jobtime);

    *reply_job_nr = *job_nr;

    jobtime = -MPI_Wtime();
    MPI_Send(reply,
             1, ctx->result_datatype,
             0, PARALLEL_RESULT, MPI_COMM_WORLD);
    jobtime += MPI_Wtime();
    INFO("TIMING: Send() took %f seconds\n", jobtime);

    free(order);
    free(reply);
    return 0;
}
/*
 * Worker main loop: receive the global data broadcast from node 0, run the
 * init callback, then process job orders until a shutdown message arrives,
 * and finally run the destroy callback.
 *
 * Does nothing in non-MPI mode. Returns 0; if the work buffers cannot be
 * allocated the whole MPI job is aborted.
 */
int parallel_work(parallel_context *ctx)
{
    // do nothing in non-mpi mode
    if(ctx->mpi_mode == 0)
        return 0;

    void *global_data = malloc(ctx->global_data_size);
    void *node_data = malloc(ctx->node_data_size);
    // malloc(0) may legitimately return NULL, so only treat NULL as an
    // error when a nonzero size was requested
    if((global_data == NULL && ctx->global_data_size > 0) ||
       (node_data == NULL && ctx->node_data_size > 0)) {
        INFO("Out of memory allocating worker data\n");
        free(global_data);
        free(node_data);
        MPI_Abort(MPI_COMM_WORLD, 1);
        exit(1);
    }

    // wait for global data
    MPI_Bcast(global_data, ctx->global_data_size, MPI_BYTE, 0, MPI_COMM_WORLD);
    DEBUG("Global data received\n");

    // initialize node_data (and do once-per-node computation)
    ctx->init(global_data, node_data);
    DEBUG("Initialization completed\n");

    // parallel_job() returns 1 once the shutdown message has arrived
    int shutdown = 0;
    while(shutdown != 1) {
        shutdown = parallel_job(ctx, global_data, node_data, 1);
    }

    ctx->destroy(global_data, node_data);
    free(global_data);
    free(node_data);
    return 0;
}
int parallel_run(parallel_context *ctx, const void *global_data, const void *input_array, void *output_array, unsigned int njobs, const char *_restart_filename)
{
// in non-mpi-mode, just run init, forall(jobs) job
if(ctx->mpi_mode == 0) {
void *node_data = malloc(ctx->node_data_size);
int result = ctx->init(global_data, node_data);
if(result != 0)
goto cleanup_standalone;
for(int i = 0; i < njobs; i++) {
result = ctx->job(
global_data,
node_data,
input_array + ctx->input_size*i,
output_array + ctx->output_size*i);
if(result != 0)
goto cleanup_standalone;
}
cleanup_standalone:
ctx->destroy(global_data, node_data);
return result;
} else {
// if no restart file was specified, pick a filename
char *restart_filename;
char buffer[128];
int restartf;
if(_restart_filename == NULL) {
time_t t = time(NULL);
struct tm *loctm = localtime(&t);
strftime(buffer, sizeof(buffer), "restart/restart_%y%m%d_%H%M%S", loctm);
restart_filename = buffer;
} else {
restart_filename = (char *)_restart_filename;
}
// open restart file if it exists, otherwise create it
int continuing = 1;
restartf = open(restart_filename, O_RDWR);
if(restartf == -1 && errno == ENOENT) {
restartf = open(restart_filename, O_RDWR | O_CREAT, 0666);
continuing = 0;
}
if(restartf == -1) {
DEBUG("Error opening restart file: %s\n", strerror(errno));
exit(1);
}
// map restart file
int itemsize = (ctx->output_size + sizeof(int)); // for every job, store output, and completed flag
ftruncate(restartf, njobs*itemsize);
void *alljobs = mmap(0, njobs*itemsize, PROT_READ | PROT_WRITE, MAP_SHARED, restartf, 0);
if(alljobs == MAP_FAILED) {
DEBUG("Error mapping restart file: %s\n", strerror(errno));
exit(1);
}
// count completed jobs, or initialize jobs
int completed = 0;
if(continuing) {
for(int i = 0; i < njobs; i++)
if(DONE(alljobs + i*itemsize))
completed++;
} else {
for(int i = 0; i < njobs; i++) {
DONE(alljobs + i*itemsize) = 0;
memcpy(alljobs + i*itemsize + sizeof(int), input_array + i*ctx->input_size, ctx->input_size); // copy input data
}
}
fsync(restartf);
if(continuing) {
INFO("Continuing from restart file, %d/%d jobs completed, %d nodes\n", completed, njobs, ctx->size);
} else {
INFO("Starting from scratch, %d jobs, %d nodes\n", njobs, ctx->size);
}
if(completed >= njobs)
goto cleanup_mpi;
/* Send global data */
MPI_Bcast((void*)global_data, ctx->global_data_size, MPI_BYTE, 0, MPI_COMM_WORLD);
DEBUG("Global data sent\n");
// we want to be able to run jobs ourselves, so initialize node_data
void *node_data = malloc(ctx->node_data_size);
ctx->init(global_data, node_data);
void *input_message_buffer = malloc(ctx->input_size + sizeof(int));
void *output_message_buffer = malloc(ctx->output_size + sizeof(int));
int *active_jobs = malloc(sizeof(int)*ctx->size);
memset(active_jobs, 0, ctx->size*sizeof(int));
int active_worker_nodes = ctx->size - 1; // we don't count ourselves, since we can't shut ourselves down
// find next unfinished job
int current = 0;
while(current < njobs && DONE(alljobs + current*itemsize))
current++;
// assign initial jobs, 2 for each worker thread
for(int i = 0; i < 2*ctx->size; i++) {
if(current >= njobs) // all jobs are assigned
break;
// send job id and input data
// send to all nodes except ourself (node 0)
*((int*)input_message_buffer) = current;
memcpy(input_message_buffer + sizeof(int), input_array + current*ctx->input_size, ctx->input_size);
MPI_Send(input_message_buffer, 1, ctx->order_datatype,
i%ctx->size, PARALLEL_ORDER, MPI_COMM_WORLD);
DEBUG("Job %d sent to node %d\n", current, i%ctx->size);
active_jobs[i%ctx->size]++;
current++;
}
MPI_Status status;
int message_present;
while(active_jobs[0] != 0 || active_worker_nodes != 0) {
MPI_Iprobe(MPI_ANY_SOURCE, PARALLEL_RESULT, MPI_COMM_WORLD, &message_present, &status);
DEBUG("Message present, tag = %d, source = %d\n", status.MPI_TAG, status.MPI_SOURCE);
if(!message_present) {
// if there are no more messages to process, we can run a job ourselves before returning to managing
DEBUG("Start running job myself\n");
int result = parallel_job(ctx, global_data, node_data, 0);
DEBUG("Finished running job myself, result = %d\n");
} else if(status.MPI_TAG == PARALLEL_RESULT) {
MPI_Recv(output_message_buffer, 1, ctx->result_datatype,
MPI_ANY_SOURCE, PARALLEL_RESULT, MPI_COMM_WORLD, &status);
DEBUG("Got message tag %d from node %d\n", status.MPI_TAG, status.MPI_SOURCE);
int node = status.MPI_SOURCE;
int id = *((int*)output_message_buffer);
memcpy(alljobs + id*itemsize + sizeof(int), output_message_buffer + sizeof(int), ctx->output_size);
DONE(alljobs + id*itemsize) = 1;
active_jobs[node]--;
// todo: deal with unresponsive nodes
// strategy: when no jobs left, go through unfinished list again, incrementing oversubscribe counter
// if oversubscribe counter is at limit, shut node down instead
//
if(current >= njobs) { // all jobs are assigned, try to shut down node
// don't try to shut down ourselves, and only if it has no other jobs to do
if(node != 0 && active_jobs[node] == 0) {
MPI_Send(NULL, 0, MPI_BYTE, node, PARALLEL_SHUTDOWN, MPI_COMM_WORLD);
active_worker_nodes--;
INFO("job %d completed by node %d, shut down, %d workers remaining\n", id, node, active_worker_nodes);
}
} else {
*((int*)input_message_buffer) = current;
memcpy(input_message_buffer + sizeof(int), input_array + current*ctx->input_size, ctx->input_size);
MPI_Send(input_message_buffer, 1, ctx->order_datatype,
node, PARALLEL_ORDER, MPI_COMM_WORLD);
active_jobs[node]++;
current++;
if(active_jobs[node] < 3) {
*((int*)input_message_buffer) = current;
memcpy(input_message_buffer + sizeof(int), input_array + current*ctx->input_size, ctx->input_size);
MPI_Send(input_message_buffer, 1, ctx->order_datatype,
node, PARALLEL_ORDER, MPI_COMM_WORLD);
active_jobs[node]++;
current++;
INFO("job %d completed by node %d, continues with %d and %d\n", id, node, current-1, current-2);
} else {
INFO("job %d completed by node %d, continues with %d\n", id, node, current-1);
}
}
}
}
for(int i = 0; i < njobs; i++) {
memcpy(output_array + i*ctx->output_size, alljobs + i*itemsize + sizeof(int), ctx->output_size);
}
free(input_message_buffer);
free(output_message_buffer);
free(node_data);
free(active_jobs);
cleanup_mpi:
munmap(alljobs, njobs*itemsize);
close(restartf);
}
return 0;
}