#ifndef PARALLEL_H
#define PARALLEL_H
/*
  This is a library to parallelize workloads which can be split up naturally
  into a sequence of independent jobs, using MPI. A program will usually

  - do precomputation
  - fill array with input data
  - do the parallel work
  - print the output data

  We want to enable restarts, so that only unfinished jobs need to be repeated.
  Further, we want to be resilient to slow/unreliable networks and to losing
  nodes. There is a main node and a number of workers. The main node does the
  precomputation and then retires to do administrative work, and the workers
  do the actual jobs. We also want to switch to serial mode if the program is
  called without MPI.

  The following data has to be transmitted between nodes:
  - results of the precomputation (read-only, shared between nodes)
  - job-specific input data, generated by the main node before the parallel part
  - output data for each job

  The parallel work shall be given as a callback function which takes
  input data and precomputation data as parameters.

  The above program will look like this for us:

  - parallel_init
  - if we are a worker, do parallel_work(init_callback, job_callback), exit
  - do precomputation
  - fill array with input data
  - output_array = parallel_run(input_array)
  - print the output data

  parallel_init:
  - check if we're running as an MPI program
  - init MPI, check what kind of node we are

  parallel_work(init_callback1, init_callback2, job_callback):
  - receive global_precomp (???)
  - worker_precomp = init_callback2(global_precomp, worker_precomp)
  - infinite loop:
    - wait for job on network, receive input
    - output = job_callback(global_precomp, worker_precomp, input)
    - send output on network
    - exit loop on shutdown signal

  parallel_run(global_precomp, input_array, restart file, callbacks):
  - check if we're running as an MPI program
  - send global_precomp to all nodes (if MPI)
  - if(restart file given and exists) read restart file
  - else create new restart file
  - until(all jobs finished):
    - if MPI:
      - send next job & input to appropriate node
      - if all jobs are in work, reassign unfinished ones (up to limit)
      - collect outputs
    - if no MPI:
      - worker_precomp = init_callback1
      - worker_precomp = init_callback2(global_precomp, worker_precomp)
      - for(j in jobs)
        - output(j) = job_callback(global_precomp, worker_precomp, input(j))
  - delete restart file
  - return array of outputs

  parallel_destroy():
  - free everything

  Have a context? Probably yes: parallel_context

  Plan:
  - make interface
  - implement no-MPI part
  - restructure singular_values.c to use the interface
  - implement MPI part
*/
#include <mpi.h>
#include <time.h>
/* Tears down per-node data created by the init callback.
   Arguments: (global_data, node_data). */
typedef void (*parallel_callback_destroy)(const void*, void*);

/* Initializes per-node (worker) data from the shared global data.
   Arguments: (global_data, node_data).
   NOTE(review): the int return presumably signals success/failure —
   confirm the convention against the implementation. */
typedef int (*parallel_callback_init)(const void*, void*);

/* Executes a single job.
   Arguments: (global_data, node_data, job_input, job_output).
   NOTE(review): the int return presumably signals success/failure —
   confirm the convention against the implementation. */
typedef int (*parallel_callback_job)(const void*, void*, const void*, void*);
typedef struct {
|
||
|
int mpi_mode;
|
||
|
struct timespec starttime;
|
||
|
char processor_name[MPI_MAX_PROCESSOR_NAME];
|
||
|
int processor_name_len;
|
||
|
int rank;
|
||
|
int size;
|
||
|
MPI_Datatype order_datatype;
|
||
|
MPI_Datatype result_datatype;
|
||
|
parallel_callback_init init;
|
||
|
parallel_callback_job job;
|
||
|
parallel_callback_destroy destroy;
|
||
|
void *global_data;
|
||
|
void *node_data;
|
||
|
int global_data_size;
|
||
|
int node_data_size;
|
||
|
int input_size;
|
||
|
int output_size;
|
||
|
} parallel_context;
|
||
|
|
||
|
parallel_context *parallel_init();
|
||
|
void parallel_set_datasize_and_callbacks(parallel_context *ctx, parallel_callback_init init, parallel_callback_job job, parallel_callback_destroy destroy, int global_data_size, int node_data_size, int input_size, int output_size);
|
||
|
int parallel_work(parallel_context *ctx);
|
||
|
int parallel_run(parallel_context *ctx, const void *global_data, const void *input_array, void *output_array, unsigned int njobs, const char *restart_filename);
|
||
|
void parallel_destroy(parallel_context* ctx);
|
||
|
|
||
|
int mpi_rank();
|
||
|
void start_timer();
|
||
|
double runtime();
|
||
|
|
||
|
|
||
|
#endif /* PARALLEL_H */