#include "parallel.h" #include #include #include #include #include #include #include #include #include #define DEBUG(msg, ...) #define INFO(msg, ...) fprintf(stderr, "[%003d%10.3f] " msg, mpi_rank(0), runtime(), ##__VA_ARGS__) //#define DEBUG(msg, ...) fprintf(stderr, "[ %10.3f] " msg, runtime(), ##__VA_ARGS__) //#define DEBUG_MPI(msg, node, ...) fprintf(stderr, "[%003d%10.3f] " msg, node, runtime(), ##__VA_ARGS__) #define DONE(x) *((int*)(x)) enum message_tag { PARALLEL_ORDER, PARALLEL_RESULT, PARALLEL_SHUTDOWN, PARALLEL_GLOBAL_DATA }; struct timespec starttime; int mpi_rank(int activate_mpi) { static int active = 0; if(activate_mpi) active = 1; if(!active) return 0; else { int rank; MPI_Comm_rank(MPI_COMM_WORLD, &rank); return rank; } } void start_timer() { clock_gettime(CLOCK_MONOTONIC, &starttime); } double runtime() { struct timespec curtime; double diff; clock_gettime(CLOCK_MONOTONIC, &curtime); return (curtime.tv_sec - starttime.tv_sec) + (curtime.tv_nsec - starttime.tv_nsec) / 1e9; } parallel_context *parallel_init() { parallel_context *ctx = malloc(sizeof(parallel_context)); if(!getenv("OMPI_COMM_WORLD_SIZE")) { ctx->mpi_mode = 0; DEBUG("Running standalone.\n"); return ctx; } ctx->mpi_mode = 1; int result = MPI_Init(NULL, NULL); MPI_Comm_size(MPI_COMM_WORLD, &ctx->size); MPI_Comm_rank(MPI_COMM_WORLD, &ctx->rank); MPI_Get_processor_name(ctx->processor_name, &ctx->processor_name_len); mpi_rank(1); // display the rank in debug output from now on if(ctx->rank == 0) DEBUG("Running in mpi mode, %d nodes.\n", ctx->size); return ctx; } void parallel_destroy(parallel_context* ctx) { if(ctx->mpi_mode) { MPI_Type_free(&ctx->order_datatype); MPI_Type_free(&ctx->result_datatype); MPI_Finalize(); } free(ctx); } void parallel_set_datasize_and_callbacks(parallel_context *ctx, parallel_callback_init init, parallel_callback_job job, parallel_callback_destroy destroy, int global_data_size, int node_data_size, int input_size, int output_size) { ctx->init = init; ctx->destroy = destroy; ctx->job = job; ctx->global_data_size = global_data_size; ctx->node_data_size = node_data_size; ctx->input_size = input_size; ctx->output_size = output_size; if(ctx->mpi_mode) { // create a datatype for job orders, consisting of an integer (the job id) and a user-defined section int order_blocklengths[2] = {1, input_size}; MPI_Aint order_displacements[2] = {0, sizeof(int)}; MPI_Datatype order_types[2] = {MPI_INT, MPI_BYTE}; MPI_Type_create_struct(2, order_blocklengths, order_displacements, order_types, &ctx->order_datatype); MPI_Type_commit(&ctx->order_datatype); int result_blocklengths[2] = {1, output_size}; MPI_Aint result_displacements[2] = {0, sizeof(int)}; MPI_Datatype result_types[2] = {MPI_INT, MPI_BYTE}; MPI_Type_create_struct(2, result_blocklengths, result_displacements, result_types, &ctx->result_datatype); MPI_Type_commit(&ctx->result_datatype); } } int parallel_work(parallel_context *ctx) { // do nothing in non-mpi mode if(ctx->mpi_mode == 0) return 0; MPI_Status status; void *global_data = malloc(ctx->global_data_size); void *node_data = malloc(ctx->node_data_size); void *input_and_job_nr = malloc(ctx->input_size + sizeof(int)); void *input = input_and_job_nr + sizeof(int); int *job_nr = (int *)input_and_job_nr; void *output_and_job_nr = malloc(ctx->output_size + sizeof(int)); void *output = output_and_job_nr + sizeof(int); int *output_job_nr = (int *)output_and_job_nr; double jobtime; // wait for global data MPI_Bcast(global_data, ctx->global_data_size, MPI_BYTE, 0, MPI_COMM_WORLD); DEBUG("Global data 
received\n"); // initialize node_data (and do once-per-node computation) ctx->init(global_data, node_data); DEBUG("Initialization completed\n"); while(1) { MPI_Probe(0, MPI_ANY_TAG, MPI_COMM_WORLD, &status); // DEBUG("Message received: source = %d, tag = %d\n", status.MPI_SOURCE, status.MPI_TAG); if(status.MPI_TAG == PARALLEL_SHUTDOWN) { DEBUG("Shutting down\n"); break; } else if(status.MPI_TAG == PARALLEL_ORDER) { MPI_Recv(input_and_job_nr, 1, ctx->order_datatype, 0, PARALLEL_ORDER, MPI_COMM_WORLD, &status); DEBUG("Working on job %d\n", *job_nr); jobtime = -MPI_Wtime(); // do the actual work ctx->job(global_data, node_data, input, output); jobtime += MPI_Wtime(); DEBUG("Finished job %d in %f seconds\n", *job_nr, jobtime); *output_job_nr = *job_nr; MPI_Send(output_and_job_nr, 1, ctx->result_datatype, 0, PARALLEL_RESULT, MPI_COMM_WORLD); } } ctx->destroy(global_data, node_data); free(global_data); free(node_data); free(input_and_job_nr); free(output_and_job_nr); return 0; } int parallel_run(parallel_context *ctx, const void *global_data, const void *input_array, void *output_array, unsigned int njobs, const char *_restart_filename) { // in non-mpi-mode, just run init1, init2, forall(jobs) job if(ctx->mpi_mode == 0) { int result; void *node_data = malloc(ctx->node_data_size); result = ctx->init(global_data, node_data); if(result != 0) goto cleanup_standalone; for(int i = 0; i < njobs; i++) { result = ctx->job( global_data, node_data, input_array + ctx->input_size*i, output_array + ctx->output_size*i); if(result != 0) goto cleanup_standalone; } cleanup_standalone: ctx->destroy(global_data, node_data); return result; } else { // if no restart file was specified, pick a filename char *restart_filename; char buffer[128]; int restartf; if(_restart_filename == NULL) { time_t t = time(NULL); struct tm *loctm = localtime(&t); strftime(buffer, sizeof(buffer), "restart/restart_%y%m%d_%H%M%S", loctm); restart_filename = buffer; } else { restart_filename = (char *)_restart_filename; } // open restart file if it exists, otherwise create it int continuing = 1; restartf = open(restart_filename, O_RDWR); if(restartf == -1 && errno == ENOENT) { restartf = open(restart_filename, O_RDWR | O_CREAT, 0666); continuing = 0; } if(restartf == -1) { DEBUG("Error opening restart file: %s\n", strerror(errno)); exit(1); } // map restart file int itemsize = (ctx->output_size + sizeof(int)); // for every job, store output, and completed flag ftruncate(restartf, njobs*itemsize); void *alljobs = mmap(0, njobs*itemsize, PROT_READ | PROT_WRITE, MAP_SHARED, restartf, 0); if(alljobs == MAP_FAILED) { DEBUG("Error mapping restart file: %s\n", strerror(errno)); exit(1); } // count completed jobs, or initialize jobs int completed = 0; if(continuing) { for(int i = 0; i < njobs; i++) if(DONE(alljobs + i*itemsize)) completed++; } else { for(int i = 0; i < njobs; i++) { DONE(alljobs + i*itemsize) = 0; memcpy(alljobs + i*itemsize + sizeof(int), input_array + i*ctx->input_size, ctx->input_size); // copy input data } } fsync(restartf); if(continuing) { INFO("Continuing from restart file, %d/%d jobs completed, %d nodes\n", completed, njobs, ctx->size); } else { INFO("Starting from scratch, %d jobs, %d nodes\n", njobs, ctx->size); } if(completed >= njobs) goto cleanup_mpi; /* Send global data */ MPI_Bcast((void*)global_data, ctx->global_data_size, MPI_BYTE, 0, MPI_COMM_WORLD); DEBUG("Global data sent\n"); void *input_message_buffer = malloc(ctx->input_size + sizeof(int)); void *output_message_buffer = malloc(ctx->output_size + sizeof(int)); 

		// find next unfinished job
		int current = 0;
		while(current < njobs && DONE(alljobs + current*itemsize))
			current++;

		// assign initial jobs, 2 for each worker node
		for(int i = 0; i < 2*(ctx->size-1); i++) {
			if(current >= njobs) // all jobs are assigned
				break;

			// send job id and input data
			// send to all nodes except ourself (node 0)
			*((int*)input_message_buffer) = current;
			memcpy(input_message_buffer + sizeof(int), input_array + current*ctx->input_size, ctx->input_size);
			MPI_Send(input_message_buffer, 1, ctx->order_datatype, i%(ctx->size-1)+1, PARALLEL_ORDER, MPI_COMM_WORLD);

			DEBUG("Job %d sent to node %d\n", current, i%(ctx->size-1)+1);

			current++;
		}

		MPI_Status status;
		int active_worker_nodes = ctx->size - 1;
		while(1) {
			MPI_Probe(MPI_ANY_SOURCE, MPI_ANY_TAG, MPI_COMM_WORLD, &status);
			if(status.MPI_TAG == PARALLEL_RESULT) {
				MPI_Recv(output_message_buffer, 1, ctx->result_datatype, MPI_ANY_SOURCE, PARALLEL_RESULT, MPI_COMM_WORLD, &status);

				int id = *((int*)output_message_buffer);
				memcpy(alljobs + id*itemsize + sizeof(int), output_message_buffer + sizeof(int), ctx->output_size);
				DONE(alljobs + id*itemsize) = 1;
				completed++;

				// todo: deal with unresponsive nodes
				// strategy: when no jobs left, go through unfinished list again, incrementing oversubscribe counter
				// if oversubscribe counter is at limit, shut node down instead

				if(current >= njobs) { // all jobs are assigned, shut down node
					INFO("job %d completed by node %d, shut down\n", id, status.MPI_SOURCE);
					MPI_Send(NULL, 0, MPI_BYTE, status.MPI_SOURCE, PARALLEL_SHUTDOWN, MPI_COMM_WORLD);
					active_worker_nodes--;
					if(!active_worker_nodes)
						break;
				} else {
					INFO("job %d completed by node %d, continues with %d\n", id, status.MPI_SOURCE, current);
					*((int*)input_message_buffer) = current;
					memcpy(input_message_buffer + sizeof(int), input_array + current*ctx->input_size, ctx->input_size);
					MPI_Send(input_message_buffer, 1, ctx->order_datatype, status.MPI_SOURCE, PARALLEL_ORDER, MPI_COMM_WORLD);
					current++;
				}
			}
		}

		for(int i = 0; i < njobs; i++) {
			memcpy(output_array + i*ctx->output_size, alljobs + i*itemsize + sizeof(int), ctx->output_size);
		}

		free(input_message_buffer);
		free(output_message_buffer);

	cleanup_mpi:
		munmap(alljobs, njobs*itemsize);
		close(restartf);
	}

	return 0;
}
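
/*
 * Example usage (illustrative sketch only, not part of this file). The
 * callback names (my_init, my_job, my_destroy), the data types (global_t,
 * node_t, job_input, job_output) and NJOBS are hypothetical placeholders;
 * the sketch only shows the calling sequence suggested by the functions
 * above: every rank builds a context and registers the same callbacks and
 * sizes, worker ranks block in parallel_work() until rank 0 shuts them
 * down, and rank 0 (or a standalone process) drives parallel_run(), which
 * also maintains the restart file.
 *
 *   int main() {
 *       start_timer();
 *       parallel_context *ctx = parallel_init();
 *       parallel_set_datasize_and_callbacks(ctx, my_init, my_job, my_destroy,
 *           sizeof(global_t), sizeof(node_t),
 *           sizeof(job_input), sizeof(job_output));
 *
 *       if(ctx->mpi_mode && ctx->rank != 0) {
 *           parallel_work(ctx);            // worker: loop until shutdown
 *       } else {
 *           global_t global = ...;         // broadcast to all ranks by parallel_run
 *           job_input in[NJOBS] = ...;     // one input per job
 *           job_output out[NJOBS];
 *           parallel_run(ctx, &global, in, out, NJOBS, NULL);
 *       }
 *
 *       parallel_destroy(ctx);
 *       return 0;
 *   }
 */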