diff --git a/parallel.c b/parallel.c
index 4136baf..95e2bca 100644
--- a/parallel.c
+++ b/parallel.c
@@ -10,14 +10,14 @@
 #include
 #include
 
-#define DEBUG(msg, ...)
+#define DEBUG INFO
 #define INFO(msg, ...) fprintf(stderr, "[%003d%10.3f] " msg, mpi_rank(0), runtime(), ##__VA_ARGS__)
 //#define DEBUG(msg, ...) fprintf(stderr, "[   %10.3f] " msg, runtime(), ##__VA_ARGS__)
 //#define DEBUG_MPI(msg, node, ...) fprintf(stderr, "[%003d%10.3f] " msg, node, runtime(), ##__VA_ARGS__)
 #define DONE(x) *((int*)(x))
 
 enum message_tag {
-	PARALLEL_ORDER,
+	PARALLEL_ORDER = 0,
 	PARALLEL_RESULT,
 	PARALLEL_SHUTDOWN,
 	PARALLEL_GLOBAL_DATA
@@ -114,42 +114,34 @@ void parallel_set_datasize_and_callbacks(parallel_context *ctx, parallel_callbac
 	}
 }
 
-int parallel_work(parallel_context *ctx)
+int parallel_job(parallel_context *ctx, const void *global_data, void *node_data, int block)
 {
-	// do nothing in non-mpi mode
-	if(ctx->mpi_mode == 0)
-		return 0;
-
 	MPI_Status status;
-	void *global_data = malloc(ctx->global_data_size);
-	void *node_data = malloc(ctx->node_data_size);
+	double jobtime;
 	void *input_and_job_nr = malloc(ctx->input_size + sizeof(int));
+	void *output_and_job_nr = malloc(ctx->output_size + sizeof(int));
 	void *input = input_and_job_nr + sizeof(int);
 	int *job_nr = (int *)input_and_job_nr;
 
-	void *output_and_job_nr = malloc(ctx->output_size + sizeof(int));
 	void *output = output_and_job_nr + sizeof(int);
 	int *output_job_nr = (int *)output_and_job_nr;
-	double jobtime;
+	int result = 0;
+	int message_present;
 
-	// wait for global data
-	MPI_Bcast(global_data, ctx->global_data_size, MPI_BYTE, 0, MPI_COMM_WORLD);
-
-	DEBUG("Global data received\n");
-
-	// initialize node_data (and do once-per-node computation)
-	ctx->init(global_data, node_data);
-
-	DEBUG("Initialization completed\n");
-
-	while(1) {
+	if(block) {
 		MPI_Probe(0, MPI_ANY_TAG, MPI_COMM_WORLD, &status);
+		message_present = 1;
+	} else {
+		MPI_Iprobe(0, MPI_ANY_TAG, MPI_COMM_WORLD,
+			&message_present, &status);
+	}
 
 //		DEBUG("Message received: source = %d, tag = %d\n", status.MPI_SOURCE, status.MPI_TAG);
 
+	if(message_present) {
 		if(status.MPI_TAG == PARALLEL_SHUTDOWN) {
 			DEBUG("Shutting down\n");
-			break;
+			result = 1;
 		} else if(status.MPI_TAG == PARALLEL_ORDER) {
 			MPI_Recv(input_and_job_nr,
 				1, ctx->order_datatype,
@@ -172,14 +164,44 @@ int parallel_work(parallel_context *ctx)
 				1, ctx->result_datatype,
 				0, PARALLEL_RESULT, MPI_COMM_WORLD);
 		}
+	} else {
+		result = 2;
+	}
+
+	free(input_and_job_nr);
+	free(output_and_job_nr);
+
+	return result;
+}
+
+int parallel_work(parallel_context *ctx)
+{
+	// do nothing in non-mpi mode
+	if(ctx->mpi_mode == 0)
+		return 0;
+
+	void *global_data = malloc(ctx->global_data_size);
+	void *node_data = malloc(ctx->node_data_size);
+
+	// wait for global data
+	MPI_Bcast(global_data, ctx->global_data_size, MPI_BYTE, 0, MPI_COMM_WORLD);
+
+	DEBUG("Global data received\n");
+
+	// initialize node_data (and do once-per-node computation)
+	ctx->init(global_data, node_data);
+
+	DEBUG("Initialization completed\n");
+
+	int shutdown = 0;
+	while(shutdown != 1) {
+		shutdown = parallel_job(ctx, global_data, node_data, 1);
 	}
 
 	ctx->destroy(global_data, node_data);
 	free(global_data);
 	free(node_data);
 
-	free(input_and_job_nr);
-	free(output_and_job_nr);
 
 	return 0;
 }
@@ -188,9 +210,8 @@ int parallel_run(parallel_context *ctx, const void *global_data, const void *inp
 {
 	// in non-mpi-mode, just run init1, init2, forall(jobs) job
 	if(ctx->mpi_mode == 0) {
-		int result;
 		void *node_data = malloc(ctx->node_data_size);
-		result = ctx->init(global_data, node_data);
+		int result = ctx->init(global_data, node_data);
 		if(result != 0)
 			goto cleanup_standalone;
 
@@ -271,8 +292,15 @@ int parallel_run(parallel_context *ctx, const void *global_data, const void *inp
 
 	DEBUG("Global data sent\n");
 
+	// we want to be able to run jobs ourselves, so initialize node_data
+	void *node_data = malloc(ctx->node_data_size);
+	ctx->init(global_data, node_data);
+
 	void *input_message_buffer = malloc(ctx->input_size + sizeof(int));
 	void *output_message_buffer = malloc(ctx->output_size + sizeof(int));
+	int *active_jobs = malloc(sizeof(int)*ctx->size);
+	memset(active_jobs, 0, ctx->size*sizeof(int));
+	int active_worker_nodes = ctx->size - 1; // we don't count ourselves, since we can't shut ourselves down
 
 	// find next unfinished job
 	int current = 0;
@@ -280,7 +308,7 @@ int parallel_run(parallel_context *ctx, const void *global_data, const void *inp
 		current++;
 
 	// assign initial jobs, 2 for each worker thread
-	for(int i = 0; i < 2*(ctx->size-1); i++) {
+	for(int i = 0; i < 2*ctx->size; i++) {
 		if(current >= njobs) // all jobs are assigned
 			break;
 
@@ -289,42 +317,54 @@ int parallel_run(parallel_context *ctx, const void *global_data, const void *inp
 		*((int*)input_message_buffer) = current;
 		memcpy(input_message_buffer + sizeof(int), input_array + current*ctx->input_size, ctx->input_size);
 		MPI_Send(input_message_buffer, 1, ctx->order_datatype,
-			i%(ctx->size-1)+1, PARALLEL_ORDER, MPI_COMM_WORLD);
+			i%ctx->size, PARALLEL_ORDER, MPI_COMM_WORLD);
 
-		DEBUG("Job %d sent to node %d\n", current, i%(ctx->size-1)+1);
+		DEBUG("Job %d sent to node %d\n", current, i%ctx->size);
+		active_jobs[i%ctx->size]++;
 		current++;
 	}
 
 	MPI_Status status;
-	int active_worker_nodes = ctx->size - 1;
-	while(1) {
-		MPI_Probe(MPI_ANY_SOURCE, MPI_ANY_TAG, MPI_COMM_WORLD, &status);
-		if(status.MPI_TAG == PARALLEL_RESULT) {
+	int message_present;
+	while(active_jobs[0] != 0 || active_worker_nodes != 0) {
+		MPI_Iprobe(MPI_ANY_SOURCE, PARALLEL_RESULT, MPI_COMM_WORLD, &message_present, &status);
+		DEBUG("Message present, tag = %d, source = %d\n", status.MPI_TAG, status.MPI_SOURCE);
+		if(!message_present) {
+			// if there are no more messages to process, we can run a job ourselves before returning to managing
+			DEBUG("Start running job myself\n");
+			int result = parallel_job(ctx, global_data, node_data, 0);
+			DEBUG("Finished running job myself, result = %d\n", result);
+		} else if(status.MPI_TAG == PARALLEL_RESULT) {
 			MPI_Recv(output_message_buffer,
 				1, ctx->result_datatype,
 				MPI_ANY_SOURCE, PARALLEL_RESULT, MPI_COMM_WORLD,
 				&status);
+			DEBUG("Got message tag %d from node %d\n", status.MPI_TAG, status.MPI_SOURCE);
+
+			int node = status.MPI_SOURCE;
 			int id = *((int*)output_message_buffer);
 			memcpy(alljobs + id*itemsize + sizeof(int), output_message_buffer + sizeof(int), ctx->output_size);
 			DONE(alljobs + id*itemsize) = 1;
-			completed++;
+			active_jobs[node]--;
 
 			// todo: deal with unresponsive nodes
 			// strategy: when no jobs left, go through unfinished list again, incrementing oversubscribe counter
 			// if oversubscribe counter is at limit, shut node down instead
 			//
-			if(current >= njobs) { // all jobs are assigned, shut down node
-				INFO("job %d completed by node %d, shut down\n", id, status.MPI_SOURCE);
-				MPI_Send(NULL, 0, MPI_BYTE, status.MPI_SOURCE, PARALLEL_SHUTDOWN, MPI_COMM_WORLD);
-				active_worker_nodes--;
-				if(!active_worker_nodes)
-					break;
+			if(current >= njobs) { // all jobs are assigned, try to shut down node
+				// don't try to shut down ourselves, and only if it has no other jobs to do
+				if(node != 0 && active_jobs[node] == 0) {
+					MPI_Send(NULL, 0, MPI_BYTE, node, PARALLEL_SHUTDOWN, MPI_COMM_WORLD);
+					active_worker_nodes--;
+					INFO("job %d completed by node %d, shut down, %d workers remaining\n", id, node, active_worker_nodes);
+				}
 			} else {
-				INFO("job %d completed by node %d, continues with %d\n", id, status.MPI_SOURCE, current);
+				INFO("job %d completed by node %d, continues with %d\n", id, node, current);
 				*((int*)input_message_buffer) = current;
 				memcpy(input_message_buffer + sizeof(int), input_array + current*ctx->input_size, ctx->input_size);
 				MPI_Send(input_message_buffer, 1, ctx->order_datatype,
-					status.MPI_SOURCE, PARALLEL_ORDER, MPI_COMM_WORLD);
+					node, PARALLEL_ORDER, MPI_COMM_WORLD);
+				active_jobs[node]++;
 				current++;
 			}
 		}
@@ -336,6 +376,8 @@ int parallel_run(parallel_context *ctx, const void *global_data, const void *inp
 
 	free(input_message_buffer);
 	free(output_message_buffer);
+	free(node_data);
+	free(active_jobs);
 
 cleanup_mpi:
 	munmap(alljobs, njobs*itemsize);
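
For context, the pattern this patch moves parallel_run to (the rank-0 manager polls with MPI_Iprobe and runs a job itself whenever no result is pending, while workers block in MPI_Probe) can be shown with a self-contained toy program. The sketch below is not part of parallel.c and does not use its API; the tag values, the square_job helper, and the fixed job count are invented for the example, and it assumes there are at least as many jobs as MPI ranks so every worker eventually receives a shutdown message.

#include <mpi.h>
#include <stdio.h>

#define TAG_ORDER    0
#define TAG_RESULT   1
#define TAG_SHUTDOWN 2

/* stand-in for the per-job callback */
static int square_job(int x) { return x * x; }

int main(int argc, char **argv)
{
	int rank, size, njobs = 32;
	MPI_Init(&argc, &argv);
	MPI_Comm_rank(MPI_COMM_WORLD, &rank);
	MPI_Comm_size(MPI_COMM_WORLD, &size);

	if(rank == 0) {
		int next = 0, done = 0, buf[2];
		MPI_Status status;

		// hand one initial job to every worker (assumes njobs >= size)
		for(int node = 1; node < size && next < njobs; node++) {
			MPI_Send(&next, 1, MPI_INT, node, TAG_ORDER, MPI_COMM_WORLD);
			next++;
		}

		while(done < njobs) {
			int message_present;
			MPI_Iprobe(MPI_ANY_SOURCE, TAG_RESULT, MPI_COMM_WORLD, &message_present, &status);

			if(!message_present) {
				// no result pending: run a job ourselves instead of idling
				if(next < njobs) {
					printf("job %d = %d (manager)\n", next, square_job(next));
					next++;
					done++;
				}
				continue;
			}

			// collect a result, then hand out the next job or shut the worker down
			MPI_Recv(buf, 2, MPI_INT, status.MPI_SOURCE, TAG_RESULT, MPI_COMM_WORLD, &status);
			printf("job %d = %d (node %d)\n", buf[0], buf[1], status.MPI_SOURCE);
			done++;
			if(next < njobs) {
				MPI_Send(&next, 1, MPI_INT, status.MPI_SOURCE, TAG_ORDER, MPI_COMM_WORLD);
				next++;
			} else {
				MPI_Send(NULL, 0, MPI_BYTE, status.MPI_SOURCE, TAG_SHUTDOWN, MPI_COMM_WORLD);
			}
		}
	} else {
		while(1) {
			int buf[2];
			MPI_Status status;
			MPI_Probe(0, MPI_ANY_TAG, MPI_COMM_WORLD, &status);
			if(status.MPI_TAG == TAG_SHUTDOWN) {
				MPI_Recv(NULL, 0, MPI_BYTE, 0, TAG_SHUTDOWN, MPI_COMM_WORLD, &status);
				break;
			}
			MPI_Recv(&buf[0], 1, MPI_INT, 0, TAG_ORDER, MPI_COMM_WORLD, &status);
			buf[1] = square_job(buf[0]);
			MPI_Send(buf, 2, MPI_INT, 0, TAG_RESULT, MPI_COMM_WORLD);
		}
	}

	MPI_Finalize();
	return 0;
}

Unlike the patch, this sketch never sends order messages to rank 0 itself and simply busy-polls when all jobs are handed out; it is only meant to illustrate why the destination arithmetic changes from i%(ctx->size-1)+1 to i%ctx->size once the manager also counts as a job-running node.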