#include "parallel.h"

#include <mpi.h>

#include <errno.h>
#include <fcntl.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/mman.h>
#include <time.h>
#include <unistd.h>

//#define DEBUG INFO
#define DEBUG(msg, ...)
#define INFO(msg, ...) fprintf(stderr, "[%03d%10.3f] " msg, mpi_rank(0), runtime(), ##__VA_ARGS__)
//#define DEBUG(msg, ...) fprintf(stderr, "[ %10.3f] " msg, runtime(), ##__VA_ARGS__)
//#define DEBUG_MPI(msg, node, ...) fprintf(stderr, "[%03d%10.3f] " msg, node, runtime(), ##__VA_ARGS__)

// per-job record in the restart file: an int "done" flag followed by the job output
#define DONE(x) *((int*)(x))

enum message_tag {
	PARALLEL_ORDER = 0,
	PARALLEL_RESULT,
	PARALLEL_SHUTDOWN,
	PARALLEL_GLOBAL_DATA
};

struct timespec starttime;

int mpi_rank(int activate_mpi)
{
	static int active = 0;
	if(activate_mpi)
		active = 1;

	if(!active) {
		return 0;
	} else {
		int rank;
		MPI_Comm_rank(MPI_COMM_WORLD, &rank);
		return rank;
	}
}

void start_timer()
{
	clock_gettime(CLOCK_MONOTONIC, &starttime);
}

double runtime()
{
	struct timespec curtime;
	clock_gettime(CLOCK_MONOTONIC, &curtime);
	return (curtime.tv_sec - starttime.tv_sec) + (curtime.tv_nsec - starttime.tv_nsec) / 1e9;
}

parallel_context *parallel_init()
{
	parallel_context *ctx = malloc(sizeof(parallel_context));

	if(!getenv("OMPI_COMM_WORLD_SIZE")) {
		ctx->mpi_mode = 0;
		ctx->rank = 0; // sensible defaults so callers can branch on rank/size in standalone mode
		ctx->size = 1;
		DEBUG("Running standalone.\n");
		return ctx;
	}

	ctx->mpi_mode = 1;
	MPI_Init(NULL, NULL);
	MPI_Comm_size(MPI_COMM_WORLD, &ctx->size);
	MPI_Comm_rank(MPI_COMM_WORLD, &ctx->rank);
	MPI_Get_processor_name(ctx->processor_name, &ctx->processor_name_len);

	mpi_rank(1); // display the rank in debug output from now on

	if(ctx->rank == 0)
		DEBUG("Running in MPI mode, %d nodes.\n", ctx->size);

	return ctx;
}

void parallel_destroy(parallel_context* ctx)
{
	if(ctx->mpi_mode) {
		MPI_Type_free(&ctx->order_datatype);
		MPI_Type_free(&ctx->result_datatype);
		MPI_Finalize();
	}

	free(ctx);
}

void parallel_set_datasize_and_callbacks(parallel_context *ctx, parallel_callback_init init, parallel_callback_job job, parallel_callback_destroy destroy, int global_data_size, int node_data_size, int input_size, int output_size)
{
	ctx->init = init;
	ctx->destroy = destroy;
	ctx->job = job;
	ctx->global_data_size = global_data_size;
	ctx->node_data_size = node_data_size;
	ctx->input_size = input_size;
	ctx->output_size = output_size;

	if(ctx->mpi_mode) {
		// create a datatype for job orders, consisting of an integer (the job id) and a user-defined section
		int order_blocklengths[2] = {1, input_size};
		MPI_Aint order_displacements[2] = {0, sizeof(int)};
		MPI_Datatype order_types[2] = {MPI_INT, MPI_BYTE};
		MPI_Type_create_struct(2, order_blocklengths, order_displacements, order_types, &ctx->order_datatype);
		MPI_Type_commit(&ctx->order_datatype);

		// same layout for results: the job id followed by the user-defined output
		int result_blocklengths[2] = {1, output_size};
		MPI_Aint result_displacements[2] = {0, sizeof(int)};
		MPI_Datatype result_types[2] = {MPI_INT, MPI_BYTE};
		MPI_Type_create_struct(2, result_blocklengths, result_displacements, result_types, &ctx->result_datatype);
		MPI_Type_commit(&ctx->result_datatype);
	}
}

int parallel_job(parallel_context *ctx, const void *global_data, void *node_data, int block)
{
	MPI_Status status;
	double jobtime;
	void *input_and_job_nr = malloc(ctx->input_size + sizeof(int));
	void *output_and_job_nr = malloc(ctx->output_size + sizeof(int));
	void *input = input_and_job_nr + sizeof(int);
	int *job_nr = (int *)input_and_job_nr;
	void *output = output_and_job_nr + sizeof(int);
	int *output_job_nr = (int *)output_and_job_nr;
	int result = 0;
	int message_present;

	if(block) {
		jobtime = -MPI_Wtime();
		MPI_Probe(0, MPI_ANY_TAG, MPI_COMM_WORLD, &status);
		jobtime += MPI_Wtime();
		INFO("TIMING: Probe() took %f seconds\n", jobtime);
		message_present = 1;
	} else {
		MPI_Iprobe(0, MPI_ANY_TAG, MPI_COMM_WORLD, &message_present, &status);
	}

//	DEBUG("Message received: source = %d, tag = %d\n", status.MPI_SOURCE, status.MPI_TAG);

	if(message_present) {
		if(status.MPI_TAG == PARALLEL_SHUTDOWN) {
			// consume the empty shutdown message so it is not left pending at MPI_Finalize
			MPI_Recv(NULL, 0, MPI_BYTE, 0, PARALLEL_SHUTDOWN, MPI_COMM_WORLD, &status);
			DEBUG("Shutting down\n");
			result = 1;
		} else if(status.MPI_TAG == PARALLEL_ORDER) {
			MPI_Recv(input_and_job_nr, 1, ctx->order_datatype, 0, PARALLEL_ORDER, MPI_COMM_WORLD, &status);
			DEBUG("Working on job %d\n", *job_nr);

			jobtime = -MPI_Wtime();

			// do the actual work
			ctx->job(global_data, node_data, input, output);

			jobtime += MPI_Wtime();
			INFO("TIMING: job %d took %f seconds\n", *job_nr, jobtime);

			*output_job_nr = *job_nr;

			jobtime = -MPI_Wtime();
			MPI_Send(output_and_job_nr, 1, ctx->result_datatype, 0, PARALLEL_RESULT, MPI_COMM_WORLD);
			jobtime += MPI_Wtime();
			INFO("TIMING: Send() took %f seconds\n", jobtime);
		}
	} else {
		result = 2;
	}

	free(input_and_job_nr);
	free(output_and_job_nr);

	return result;
}

int parallel_work(parallel_context *ctx)
{
	// do nothing in non-mpi mode
	if(ctx->mpi_mode == 0)
		return 0;

	void *global_data = malloc(ctx->global_data_size);
	void *node_data = malloc(ctx->node_data_size);

	// wait for global data
	MPI_Bcast(global_data, ctx->global_data_size, MPI_BYTE, 0, MPI_COMM_WORLD);

	DEBUG("Global data received\n");

	// initialize node_data (and do once-per-node computation)
	ctx->init(global_data, node_data);

	DEBUG("Initialization completed\n");

	// keep processing orders until a shutdown message arrives
	int shutdown = 0;
	while(shutdown != 1)
		shutdown = parallel_job(ctx, global_data, node_data, 1);

	ctx->destroy(global_data, node_data);
	free(global_data);
	free(node_data);

	return 0;
}

int parallel_run(parallel_context *ctx, const void *global_data, const void *input_array, void *output_array, unsigned int njobs, const char *_restart_filename)
{
	// in non-mpi mode, just run init, then every job in sequence
	if(ctx->mpi_mode == 0) {
		void *node_data = malloc(ctx->node_data_size);
		int result = ctx->init(global_data, node_data);
		if(result != 0)
			goto cleanup_standalone;

		for(int i = 0; i < njobs; i++) {
			result = ctx->job(
				global_data,
				node_data,
				input_array + ctx->input_size*i,
				output_array + ctx->output_size*i);
			if(result != 0)
				goto cleanup_standalone;
		}

	cleanup_standalone:
		ctx->destroy(global_data, node_data);
		free(node_data);
		return result;
	} else {
		// if no restart file was specified, pick a filename based on the current time
		char *restart_filename;
		char buffer[128];
		int restartf;

		if(_restart_filename == NULL) {
			time_t t = time(NULL);
			struct tm *loctm = localtime(&t);
			strftime(buffer, sizeof(buffer), "restart/restart_%y%m%d_%H%M%S", loctm);
			restart_filename = buffer;
		} else {
			restart_filename = (char *)_restart_filename;
		}

		// open the restart file if it exists, otherwise create it
		int continuing = 1;
		restartf = open(restart_filename, O_RDWR);
		if(restartf == -1 && errno == ENOENT) {
			restartf = open(restart_filename, O_RDWR | O_CREAT, 0666);
			continuing = 0;
		}
		if(restartf == -1) {
			DEBUG("Error opening restart file: %s\n", strerror(errno));
			exit(1);
		}

		// map the restart file; for every job it stores a completed flag and the output
		int itemsize = (ctx->output_size + sizeof(int));
		ftruncate(restartf, njobs*itemsize);
		void *alljobs = mmap(0, njobs*itemsize, PROT_READ | PROT_WRITE, MAP_SHARED, restartf, 0);
		if(alljobs == MAP_FAILED) {
			DEBUG("Error mapping restart file: %s\n", strerror(errno));
			exit(1);
		}

		// count completed jobs, or initialize the job records
		int completed = 0;
		if(continuing) {
			for(int i = 0; i < njobs; i++)
				if(DONE(alljobs + i*itemsize))
					completed++;
		} else {
			for(int i = 0; i < njobs; i++) {
				DONE(alljobs + i*itemsize) = 0;
				memcpy(alljobs + i*itemsize + sizeof(int), input_array + i*ctx->input_size, ctx->input_size); // copy input data
			}
		}

		fsync(restartf);

		if(continuing) {
			INFO("Continuing from restart file, %d/%d jobs completed, %d nodes\n", completed, njobs, ctx->size);
		} else {
			INFO("Starting from scratch, %d jobs, %d nodes\n", njobs, ctx->size);
		}

		if(completed >= njobs) {
			// everything is already done: still broadcast the global data and shut the
			// workers down, so they do not stay blocked in MPI_Bcast in parallel_work()
			MPI_Bcast((void*)global_data, ctx->global_data_size, MPI_BYTE, 0, MPI_COMM_WORLD);
			for(int i = 1; i < ctx->size; i++)
				MPI_Send(NULL, 0, MPI_BYTE, i, PARALLEL_SHUTDOWN, MPI_COMM_WORLD);
			goto cleanup_mpi;
		}

		/* Send global data */
		MPI_Bcast((void*)global_data, ctx->global_data_size, MPI_BYTE, 0, MPI_COMM_WORLD);

		DEBUG("Global data sent\n");

		// we want to be able to run jobs ourselves, so initialize node_data
		void *node_data = malloc(ctx->node_data_size);
		ctx->init(global_data, node_data);

		void *input_message_buffer = malloc(ctx->input_size + sizeof(int));
		void *output_message_buffer = malloc(ctx->output_size + sizeof(int));

		int *active_jobs = malloc(sizeof(int)*ctx->size);
		memset(active_jobs, 0, ctx->size*sizeof(int));
		int active_worker_nodes = ctx->size - 1; // we don't count ourselves, since we can't shut ourselves down

		// find the first unfinished job
		int current = 0;
		while(current < njobs && DONE(alljobs + current*itemsize))
			current++;

		// assign initial jobs, 2 per node; node 0 (ourselves) also runs jobs while it has nothing to manage
		for(int i = 0; i < 2*ctx->size; i++) {
			if(current >= njobs) // all jobs are assigned
				break;

			// send job id and input data
			*((int*)input_message_buffer) = current;
			memcpy(input_message_buffer + sizeof(int), input_array + current*ctx->input_size, ctx->input_size);
			MPI_Send(input_message_buffer, 1, ctx->order_datatype, i%ctx->size, PARALLEL_ORDER, MPI_COMM_WORLD);
			DEBUG("Job %d sent to node %d\n", current, i%ctx->size);
			active_jobs[i%ctx->size]++;
			current++;
		}

		MPI_Status status;
		int message_present;
		while(active_jobs[0] != 0 || active_worker_nodes != 0) {
			MPI_Iprobe(MPI_ANY_SOURCE, PARALLEL_RESULT, MPI_COMM_WORLD, &message_present, &status);
			DEBUG("Message present, tag = %d, source = %d\n", status.MPI_TAG, status.MPI_SOURCE);

			if(!message_present) {
				// no results to process right now, so run a job ourselves before returning to managing
				DEBUG("Start running job myself\n");
				int result = parallel_job(ctx, global_data, node_data, 0);
				DEBUG("Finished running job myself, result = %d\n", result);
			} else if(status.MPI_TAG == PARALLEL_RESULT) {
				MPI_Recv(output_message_buffer, 1, ctx->result_datatype, MPI_ANY_SOURCE, PARALLEL_RESULT, MPI_COMM_WORLD, &status);
				DEBUG("Got message tag %d from node %d\n", status.MPI_TAG, status.MPI_SOURCE);

				// record the result in the restart file
				int node = status.MPI_SOURCE;
				int id = *((int*)output_message_buffer);
				memcpy(alljobs + id*itemsize + sizeof(int), output_message_buffer + sizeof(int), ctx->output_size);
				DONE(alljobs + id*itemsize) = 1;
				active_jobs[node]--;

				// todo: deal with unresponsive nodes
				// strategy: when no jobs left, go through unfinished list again, incrementing oversubscribe counter
				// if oversubscribe counter is at limit, shut node down instead

				if(current >= njobs) { // all jobs are assigned, try to shut the node down,
					// but don't shut down ourselves, and only if it has no other jobs to do
					if(node != 0 && active_jobs[node] == 0) {
						MPI_Send(NULL, 0, MPI_BYTE, node, PARALLEL_SHUTDOWN, MPI_COMM_WORLD);
						active_worker_nodes--;
						INFO("job %d completed by node %d, shut down, %d workers remaining\n", id, node, active_worker_nodes);
					}
				} else {
					// hand the node a new job right away
					*((int*)input_message_buffer) = current;
					memcpy(input_message_buffer + sizeof(int), input_array + current*ctx->input_size, ctx->input_size);
					MPI_Send(input_message_buffer, 1, ctx->order_datatype, node, PARALLEL_ORDER, MPI_COMM_WORLD);
					active_jobs[node]++;
					current++;

					// top the node up to two outstanding jobs if more work is available
					if(current < njobs && active_jobs[node] < 3) {
						*((int*)input_message_buffer) = current;
						memcpy(input_message_buffer + sizeof(int), input_array + current*ctx->input_size, ctx->input_size);
						MPI_Send(input_message_buffer, 1, ctx->order_datatype, node, PARALLEL_ORDER, MPI_COMM_WORLD);
						active_jobs[node]++;
						current++;

						INFO("job %d completed by node %d, continues with %d and %d\n", id, node, current-2, current-1);
					} else {
						INFO("job %d completed by node %d, continues with %d\n", id, node, current-1);
					}
				}
			}
		}

		free(input_message_buffer);
		free(output_message_buffer);
		free(node_data);
		free(active_jobs);

	cleanup_mpi:
		// copy the results from the memory-mapped restart file into the caller's output array
		for(int i = 0; i < njobs; i++) {
			memcpy(output_array + i*ctx->output_size, alljobs + i*itemsize + sizeof(int), ctx->output_size);
		}

		munmap(alljobs, njobs*itemsize);
		close(restartf);
	}

	return 0;
}
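/*
 * Usage sketch (not part of the library): a hypothetical driver showing how the
 * API above is meant to be called. It assumes the callback typedefs declared in
 * parallel.h match the way the callbacks are invoked in this file, i.e. roughly
 *
 *     typedef int  (*parallel_callback_init)(const void *global, void *node);
 *     typedef int  (*parallel_callback_job)(const void *global, void *node, const void *input, void *output);
 *     typedef void (*parallel_callback_destroy)(const void *global, void *node);
 *
 * The callback names, the PARALLEL_EXAMPLE_MAIN guard and the data layout are
 * illustrative only. Build with mpicc and run either standalone or via mpirun.
 */
#ifdef PARALLEL_EXAMPLE_MAIN

// trivial callbacks: the global data is a single scaling factor, each job scales one double
static int example_init(const void *global, void *node) { (void)global; (void)node; return 0; }
static void example_destroy(const void *global, void *node) { (void)global; (void)node; }
static int example_job(const void *global, void *node, const void *input, void *output)
{
	(void)node;
	*(double*)output = (*(const double*)global) * (*(const double*)input);
	return 0;
}

int main(void)
{
	start_timer();
	parallel_context *ctx = parallel_init();
	parallel_set_datasize_and_callbacks(ctx, example_init, example_job, example_destroy,
		sizeof(double), 1, sizeof(double), sizeof(double));

	if(ctx->mpi_mode && ctx->rank != 0) {
		// worker ranks block here until rank 0 shuts them down
		parallel_work(ctx);
	} else {
		double factor = 2.0;
		double input[100], output[100];
		for(int i = 0; i < 100; i++)
			input[i] = i;

		// rank 0 (or the standalone process) distributes the jobs;
		// the restart file is created if it does not exist yet
		parallel_run(ctx, &factor, input, output, 100, "example.restart");

		INFO("output[10] = %f\n", output[10]);
	}

	parallel_destroy(ctx);
	return 0;
}

#endif /* PARALLEL_EXAMPLE_MAIN */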