13 #define _XOPEN_SOURCE 700
29 #include "partest_util.h"
/*
 * test_single_mpi - benchmark in which every task works on its own file,
 * named "<filename>.<rank>" (rank zero-padded to six digits).
 *
 * Depending on the options, the visible code runs a timed write phase
 * (options->do_write), a timed read phase (options->do_read), a comparison
 * of the write and read checksums (unless options->suppress_checksum) and a
 * timed unlink phase (options->unlink_files). Per-task timings and statistics
 * are reduced with MPI_Reduce and printed through write_timings().
 *
 * NOTE(review): this listing is an excerpt. The return type, most parameters,
 * several declarations, the file open/write/read/close/unlink calls and all
 * closing braces are not visible. The comments below describe only the
 * statements that are shown.
 */
32 test_single_mpi(
char *filename,
/* starttime is reset before each measured step; write_starttime and
 * read_starttime mark the beginning of the whole write/read phase. */
41 double starttime, write_starttime, read_starttime, unlinktime, gunlinktime;
/* timings/stats hold this task's values; the f* arrays receive the reduction
 * over communicators->local (printed as "FILE") and the g* arrays the
 * reduction over the work/workread communicator (printed as "TOTAL"). */
42 double timings[TIMINGS_MAX_NUM],ftimings[TIMINGS_MAX_NUM],gtimings[TIMINGS_MAX_NUM];
43 sion_int64 stats[STATS_MAX_NUM], fstats[STATS_MAX_NUM], gstats[STATS_MAX_NUM];
/* Running byte totals for the write and read phases. */
46 sion_int64 bsumwrote, bsumread;
47 size_t bwrite, bwrote, btoread, bread, chunkcnt;
/* Checksums accumulated while writing and while reading; compared at the end. */
48 double checksum_fp, checksum_read_fp;
/* Loop state for the serialized (batched) write/read loops below. */
49 int ser_count,ser_step,ser_done;
54 if (communicators->work_size == -1) {
/* Pick the file API flag from the options. The SIONFWD and IME-native
 * branches are compiled only when the corresponding macro is defined. */
59 if(options->use_posix) {
62 #if defined(_SION_SIONFWD)
63 else if (options->use_sionfwd) {
64 api_to_use = SION_FILE_FLAG_SIONFWD;
67 #ifdef _SION_IME_NATIVE
68 else if (options->use_ime_native) {
69 api_to_use = SION_FILE_FLAG_IME_NATIVE;
/* Zero every timing and statistics slot (task, file and global arrays). */
76 for(i=0;i<TIMINGS_MAX_NUM;i++) timings[i]=ftimings[i]=gtimings[i]=0.0;
77 for(i=0;i<STATS_MAX_NUM;i++) stats[i]=fstats[i]=gstats[i]=0;
/* Debug output is on for rank 0 of 'all' when options->debug is set, or for
 * the last rank of 'all' when options->Debug is set. */
79 do_debug=(((options->debug && communicators->all_rank == 0)) || ((options->Debug && (communicators->all_rank+1) == communicators->all_size)));
/* File name for the write phase, built from this task's work_rank.
 * NOTE(review): sprintf is unbounded and the size of fname is not visible
 * here - confirm it cannot overflow. */
81 sprintf(fname,
"%s.%06d", filename, communicators->work_rank);
/* ---------------- write phase ---------------- */
83 if(options->do_write) {
88 barrier_after_start(communicators->
local);
/* Start the clock for the whole write phase and for the first step. */
90 write_starttime = starttime = MPI_Wtime();
/* Error path: report the file name and abort all tasks. The failing call and
 * its condition are not visible in this listing. */
95 fprintf(stderr,
"cannot open %s for writing, aborting ...\n", fname);
96 MPI_Abort(communicators->
all, 1);
/* Number of files counted for this task is the size of the work communicator. */
98 stats[STATS_WR_NUM_FILES]=communicators->work_size;
99 timings[TIMINGS_WR_CREATE] = MPI_Wtime()-starttime;
/* Each barrier is timed on its own so waiting time is reported separately. */
101 starttime = MPI_Wtime();
102 barrier_after_open(communicators->
work);
103 timings[TIMINGS_WR_CREATE_BARR_OPEN] = MPI_Wtime()-starttime;
105 starttime = MPI_Wtime();
107 timings[TIMINGS_WR_CREATE_CLOSE] = MPI_Wtime()-starttime;
/* NOTE(review): the time is stored as ..._BARR_CLOSE but the call is
 * barrier_after_open - confirm this is intended. */
109 starttime = MPI_Wtime();
110 barrier_after_open(communicators->
work);
111 timings[TIMINGS_WR_CREATE_BARR_CLOSE] = MPI_Wtime()-starttime;
114 starttime = MPI_Wtime();
/* Second error path with the same message and abort; its condition is not
 * visible in this listing. */
118 fprintf(stderr,
"cannot open %s for writing, aborting ...\n", fname);
119 MPI_Abort(communicators->
all, 1);
121 timings[TIMINGS_WR_OPEN] = MPI_Wtime()-starttime;
123 starttime = MPI_Wtime();
124 barrier_after_open(communicators->
work);
125 timings[TIMINGS_WR_OPEN_BARR_GLOBAL] = MPI_Wtime()-starttime;
/* Batch size for serialized writing: options->serialize_blocknum when it is
 * positive, otherwise the full local communicator size, which makes the loop
 * below run a single iteration. */
127 starttime = MPI_Wtime();
128 if(options->serialize_blocknum>0) ser_step=options->serialize_blocknum;
129 else ser_step=communicators->local_size;
131 timings[TIMINGS_WR_WRITE_SYNC] += MPI_Wtime()-starttime;
/* Walk the local communicator in steps of ser_step; a task that is not yet
 * done takes its turn once its local_rank falls below ser_count+ser_step. */
134 for(ser_count=0;ser_count<communicators->local_size;ser_count+=ser_step) {
136 if ((!ser_done) && communicators->local_rank<(ser_count+ser_step)) {
/* 'left' starts at the total amount requested per task. */
139 starttime = MPI_Wtime();
140 left = options->totalsize;
/* Amount to write in one call starts from the configured buffer size. */
145 bwrite = options->bufsize;
150 fprintf(stderr,
"timings[%03d] write %lld bytes\n", communicators->all_rank, (sion_int64) bwrite);
/* Account for what was actually written (bwrote). */
155 left -= (sion_int64) bwrote;
156 bsumwrote += (sion_int64) bwrote;
/* Write checksum: add the first bwrote elements of localbuffer as doubles. */
160 if(!options->suppress_checksum) {
162 for (i = 0; i < bwrote; i++)
163 checksum_fp += (
double) localbuffer[i];
168 fprintf(stderr,
"timings[%03d] wrote (%lld bytes) %lld bytes (%10.4f MB) (%lld left)\n",
169 communicators->all_rank, (sion_int64) bwrote, bsumwrote, bsumwrote / 1024.0 / 1024.0, (sion_int64) left);
/* Add this task's write time, byte total and chunk count to its results. */
174 timings[TIMINGS_WR_WRITE] += MPI_Wtime()-starttime;
175 stats[STATS_BYTES_WR_WROTE]+=bsumwrote;
176 stats[STATS_BYTES_WR_NUM_CHUNKS]+=chunkcnt;
/* Barrier on the local communicator; its time is accumulated with +=. */
180 starttime = MPI_Wtime();
181 barrier_after_write(communicators->
local);
182 timings[TIMINGS_WR_WRITE_BARR_FILE] += MPI_Wtime()-starttime;
/* Barrier across the whole work communicator after writing. */
186 starttime = MPI_Wtime();
187 barrier_after_write(communicators->
work);
188 timings[TIMINGS_WR_WRITE_BARR_GLOBAL] = MPI_Wtime()-starttime;
190 starttime = MPI_Wtime();
192 timings[TIMINGS_WR_CLOSE] = MPI_Wtime()-starttime;
194 starttime = MPI_Wtime();
195 barrier_after_close(communicators->
work);
196 timings[TIMINGS_WR_CLOSE_BARR_GLOBAL] = MPI_Wtime()-starttime;
/* Total write time is measured from write_starttime, so it includes barriers. */
198 timings[TIMINGS_WR_TOTAL] = MPI_Wtime()-write_starttime;
/* Per-task report. The two calls differ only in the last argument (1 vs 0);
 * the condition selecting the second call is not visible in this listing. */
201 if (options->verbose) {
202 write_timings(
"TASK",TIMINGS_METHOD_WRITE,timings,stats,communicators,options,1);
206 write_timings(
"TASK",TIMINGS_METHOD_WRITE,timings,stats,communicators,options,0);
/* Global result on rank 0 of 'work': maximum of each timing, sum of each stat. */
209 MPI_Reduce(timings, gtimings, TIMINGS_MAX_NUM, MPI_DOUBLE, MPI_MAX, 0, communicators->
work);
210 MPI_Reduce(stats, gstats, STATS_MAX_NUM, SION_MPI_INT64, MPI_SUM, 0, communicators->
work);
/* Only the reduction root prints the "TOTAL" line, framed by separators. */
212 if (communicators->work_rank == 0) {
213 fprintf(stderr,
"------------------------------------------------------------------------------------------\n");
214 write_timings(
"TOTAL",TIMINGS_METHOD_WRITE,gtimings,gstats,communicators,options,0);
215 fprintf(stderr,
"------------------------------------------------------------------------------------------\n");
221 barrier_after_close(communicators->
work);
/* ---------------- read phase ---------------- */
223 if(options->do_read) {
/* The read phase rebuilds the file name from workread_rank instead of
 * work_rank. NOTE(review): same unbounded sprintf as above. */
226 sprintf(fname,
"%s.%06d", filename, communicators->workread_rank);
231 barrier_after_start(communicators->
local);
/* Start the clock for the whole read phase and for the first step. */
233 read_starttime = starttime = MPI_Wtime();
/* NOTE(review): this is the read path (the time below is stored as
 * TIMINGS_RD_OPEN) but the message says "for writing" - confirm whether the
 * text should say "for reading". */
237 fprintf(stderr,
"cannot open %s for writing, aborting ...\n", fname);
238 MPI_Abort(communicators->
all, 1);
/* NOTE(review): uses work_size although the read barriers and reductions
 * below use the workread communicator - confirm the two sizes always agree. */
240 stats[STATS_RD_NUM_FILES]=communicators->work_size;
241 timings[TIMINGS_RD_OPEN] = MPI_Wtime()-starttime;
243 starttime = MPI_Wtime();
244 barrier_after_open(communicators->
work);
245 timings[TIMINGS_RD_OPEN_BARR_GLOBAL] = MPI_Wtime()-starttime;
/* Same batch-size rule as in the write phase. */
248 starttime = MPI_Wtime();
249 if(options->serialize_blocknum>0) ser_step=options->serialize_blocknum;
250 else ser_step=communicators->local_size;
252 timings[TIMINGS_RD_READ_SYNC] += MPI_Wtime()-starttime;
/* Same batched traversal of the local communicator as in the write phase. */
254 for(ser_count=0;ser_count<communicators->local_size;ser_count+=ser_step) {
256 if ((!ser_done) && communicators->local_rank<(ser_count+ser_step)) {
259 starttime = MPI_Wtime();
260 left = options->totalsize;
/* Amount to read in one call starts from the configured buffer size. */
265 btoread = options->bufsize;
/* Read checksum: reset to 0.0 here, then add the first 'bread' elements of
 * localbuffer as doubles. */
276 if(!options->suppress_checksum) {
277 checksum_read_fp=0.0;
278 for (i = 0; i < bread; i++)
279 checksum_read_fp += (
double) localbuffer[i];
284 fprintf(stderr,
"timings[%03d] read (%lld bytes) %lld bytes (%10.4f MB) (%lld left)\n", communicators->all_rank, (sion_int64) bread, bsumread,
285 bsumread / 1024.0 / 1024.0, (sion_int64) left);
/* Add this task's read time, byte total and chunk count to its results. */
290 timings[TIMINGS_RD_READ] += MPI_Wtime()-starttime;
292 stats[STATS_BYTES_RD_READ]+=bsumread;
293 stats[STATS_BYTES_RD_NUM_CHUNKS]+=chunkcnt;
/* Barrier on the local communicator; its time is accumulated with +=. */
295 starttime = MPI_Wtime();
296 barrier_after_read(communicators->
local);
297 timings[TIMINGS_RD_READ_BARR_FILE] += MPI_Wtime()-starttime;
/* Barrier across the workread communicator after reading. */
303 starttime = MPI_Wtime();
304 barrier_after_read(communicators->
workread);
305 timings[TIMINGS_RD_READ_BARR_GLOBAL] = MPI_Wtime()-starttime;
307 starttime = MPI_Wtime();
309 timings[TIMINGS_RD_CLOSE] = MPI_Wtime()-starttime;
311 starttime = MPI_Wtime();
312 barrier_after_close(communicators->
workread);
313 timings[TIMINGS_RD_CLOSE_BARR_GLOBAL] = MPI_Wtime()-starttime;
/* Total read time is measured from read_starttime, so it includes barriers. */
314 timings[TIMINGS_RD_TOTAL] = MPI_Wtime()-read_starttime;
/* From here the clock measures the time spent producing the report
 * (added to TIMINGS_MSGS below). */
316 starttime = MPI_Wtime();
/* A total of exactly zero is replaced by -1; presumably this guards a later
 * division by the total - TODO confirm against write_timings(). */
318 if (timings[TIMINGS_RD_TOTAL] == 0) timings[TIMINGS_RD_TOTAL] = -1;
/* Per-task report, same pattern as in the write phase (last argument 1 vs 0). */
320 if (options->verbose) {
321 write_timings(
"TASK",TIMINGS_METHOD_READ,timings,stats,communicators,options,1);
325 write_timings(
"TASK",TIMINGS_METHOD_READ,timings,stats,communicators,options,0);
328 timings[TIMINGS_MSGS] += MPI_Wtime()-starttime;
/* With more than one file, also reduce over the local communicator and let
 * its rank 0 print a "FILE" line. */
329 if (options->numfiles > 1) {
331 MPI_Reduce(timings, ftimings, TIMINGS_MAX_NUM, MPI_DOUBLE, MPI_MAX, 0, communicators->
local);
332 MPI_Reduce(stats, fstats, STATS_MAX_NUM, SION_MPI_INT64, MPI_SUM, 0, communicators->
local);
334 if (communicators->local_rank == 0) {
335 write_timings(
"FILE",TIMINGS_METHOD_READ,ftimings,fstats,communicators,options,0);
/* Global result on rank 0 of 'workread': maximum of each timing, sum of each stat. */
340 MPI_Reduce(timings, gtimings, TIMINGS_MAX_NUM, MPI_DOUBLE, MPI_MAX, 0, communicators->
workread);
341 MPI_Reduce(stats, gstats, STATS_MAX_NUM, SION_MPI_INT64, MPI_SUM, 0, communicators->
workread);
343 DPRINTFTS(communicators->all_rank,
"after red.");
/* Only the reduction root prints the "TOTAL" line, framed by separators. */
344 if (communicators->workread_rank == 0) {
345 fprintf(stderr,
"------------------------------------------------------------------------------------------\n");
346 write_timings(
"TOTAL",TIMINGS_METHOD_READ,gtimings,gstats,communicators,options,0);
347 fprintf(stderr,
"------------------------------------------------------------------------------------------\n");
/* Report an error when the write and read checksums differ by more than 1e-5.
 * NOTE(review): no initialisation of checksum_fp is visible in this listing -
 * confirm it has a defined value when the write phase did not run. */
351 if(!options->suppress_checksum) {
352 if (fabs(checksum_fp - checksum_read_fp) > 1e-5) {
353 fprintf(stderr,
"timings[%03d] ERROR in double checksum %14.10f==%14.10f, diff=%14.10f\n", communicators->all_rank,
354 checksum_fp, checksum_read_fp, checksum_fp - checksum_read_fp);
/* ---------------- unlink phase ---------------- */
/* The time between the two barriers is measured; the removal call itself is
 * not visible in this listing. The maximum over 'work' is printed by rank 0. */
361 if(options->unlink_files) {
362 starttime = MPI_Wtime();
363 barrier_before_unlink(communicators->
work);
365 barrier_after_unlink(communicators->
work);
366 unlinktime = MPI_Wtime() - starttime;
367 MPI_Reduce(&unlinktime, &gunlinktime, 1, MPI_DOUBLE, MPI_MAX, 0, communicators->
work);
368 if (communicators->work_rank == 0) {
369 fprintf(stderr,
"partest result: ultime=%10.6fs unlink %s\n", gunlinktime, fname);
int _sion_file_close(_sion_fileptr *sion_fileptr)
Closes the file and destroys the fileptr structure.
sion_int64 _sion_file_write(const void *data, sion_int64 bytes, _sion_fileptr *sion_fileptr)
Write data to file.
_sion_fileptr * _sion_file_open(const char *fname, unsigned int flags, unsigned int addflags)
Create and open a new file for writing.
sion_int64 _sion_file_read(void *data, sion_int64 bytes, _sion_fileptr *sion_fileptr)
Read data from file.
#define SION_FILE_FLAG_READ
#define SION_FILE_FLAG_CREATE
#define SION_FILE_FLAG_ANSI
#define SION_FILE_FLAG_WRITE
#define SION_FILE_FLAG_POSIX