24 #include "partest_util.h" 27 test_single_mpi(
char *filename,
36 double starttime, write_starttime, read_starttime, unlinktime, gunlinktime;
37 double timings[TIMINGS_MAX_NUM],ftimings[TIMINGS_MAX_NUM],gtimings[TIMINGS_MAX_NUM];
38 sion_int64 stats[STATS_MAX_NUM], fstats[STATS_MAX_NUM], gstats[STATS_MAX_NUM];
41 sion_int64 bsumwrote, bsumread;
42 size_t bwrite, bwrote, btoread, bread, chunkcnt;
43 double checksum_fp, checksum_read_fp;
44 int ser_count,ser_step,ser_done;
49 if (communicators->work_size == -1) {
54 if(options->use_posix) {
60 for(i=0;i<TIMINGS_MAX_NUM;i++) timings[i]=ftimings[i]=gtimings[i]=0.0;
61 for(i=0;i<STATS_MAX_NUM;i++) stats[i]=fstats[i]=gstats[i]=0;
63 do_debug=(((options->debug && communicators->all_rank == 0)) || ((options->Debug && (communicators->all_rank+1) == communicators->all_size)));
65 sprintf(fname,
"%s.%06d", filename, communicators->work_rank);
67 if(options->do_write) {
72 barrier_after_start(communicators->
local);
74 write_starttime = starttime = MPI_Wtime();
79 fprintf(stderr,
"cannot open %s for writing, aborting ...\n", fname);
80 MPI_Abort(communicators->
all, 1);
82 stats[STATS_WR_NUM_FILES]=communicators->work_size;
83 timings[TIMINGS_WR_CREATE] = MPI_Wtime()-starttime;
85 starttime = MPI_Wtime();
86 barrier_after_open(communicators->
work);
87 timings[TIMINGS_WR_CREATE_BARR_OPEN] = MPI_Wtime()-starttime;
89 starttime = MPI_Wtime();
91 timings[TIMINGS_WR_CREATE_CLOSE] = MPI_Wtime()-starttime;
93 starttime = MPI_Wtime();
94 barrier_after_open(communicators->
work);
95 timings[TIMINGS_WR_CREATE_BARR_CLOSE] = MPI_Wtime()-starttime;
98 starttime = MPI_Wtime();
102 fprintf(stderr,
"cannot open %s for writing, aborting ...\n", fname);
103 MPI_Abort(communicators->
all, 1);
105 timings[TIMINGS_WR_OPEN] = MPI_Wtime()-starttime;
107 starttime = MPI_Wtime();
108 barrier_after_open(communicators->
work);
109 timings[TIMINGS_WR_OPEN_BARR_GLOBAL] = MPI_Wtime()-starttime;
111 starttime = MPI_Wtime();
112 if(options->serialize_blocknum>0) ser_step=options->serialize_blocknum;
113 else ser_step=communicators->local_size;
115 timings[TIMINGS_WR_WRITE_SYNC] += MPI_Wtime()-starttime;
118 for(ser_count=0;ser_count<communicators->local_size;ser_count+=ser_step) {
120 if ((!ser_done) && communicators->local_rank<(ser_count+ser_step)) {
123 starttime = MPI_Wtime();
124 left = options->totalsize;
129 bwrite = options->bufsize;
134 fprintf(stderr,
"timings[%03d] write %lld bytes\n", communicators->all_rank, (sion_int64) bwrite);
139 left -= (sion_int64) bwrote;
140 bsumwrote += (sion_int64) bwrote;
144 if(!options->suppress_checksum) {
146 for (i = 0; i < bwrote; i++)
147 checksum_fp += (
double) localbuffer[i];
152 fprintf(stderr,
"timings[%03d] wrote (%lld bytes) %lld bytes (%10.4f MB) (%lld left)\n",
153 communicators->all_rank, (sion_int64) bwrote, bsumwrote, bsumwrote / 1024.0 / 1024.0, (sion_int64) left);
158 timings[TIMINGS_WR_WRITE] += MPI_Wtime()-starttime;
159 stats[STATS_BYTES_WR_WROTE]+=bsumwrote;
160 stats[STATS_BYTES_WR_NUM_CHUNKS]+=chunkcnt;
164 starttime = MPI_Wtime();
165 barrier_after_write(communicators->
local);
166 timings[TIMINGS_WR_WRITE_BARR_FILE] += MPI_Wtime()-starttime;
170 starttime = MPI_Wtime();
171 barrier_after_write(communicators->
work);
172 timings[TIMINGS_WR_WRITE_BARR_GLOBAL] = MPI_Wtime()-starttime;
174 starttime = MPI_Wtime();
176 timings[TIMINGS_WR_CLOSE] = MPI_Wtime()-starttime;
178 starttime = MPI_Wtime();
179 barrier_after_close(communicators->
work);
180 timings[TIMINGS_WR_CLOSE_BARR_GLOBAL] = MPI_Wtime()-starttime;
182 timings[TIMINGS_WR_TOTAL] = MPI_Wtime()-write_starttime;
185 if (options->verbose) {
186 write_timings(
"TASK",TIMINGS_METHOD_WRITE,timings,stats,communicators,options,1);
190 write_timings(
"TASK",TIMINGS_METHOD_WRITE,timings,stats,communicators,options,0);
193 MPI_Reduce(timings, gtimings, TIMINGS_MAX_NUM, MPI_DOUBLE, MPI_MAX, 0, communicators->
work);
194 MPI_Reduce(stats, gstats, STATS_MAX_NUM, SION_MPI_INT64, MPI_SUM, 0, communicators->
work);
196 if (communicators->work_rank == 0) {
197 fprintf(stderr,
"------------------------------------------------------------------------------------------\n");
198 write_timings(
"TOTAL",TIMINGS_METHOD_WRITE,gtimings,gstats,communicators,options,0);
199 fprintf(stderr,
"------------------------------------------------------------------------------------------\n");
205 barrier_after_close(communicators->
work);
207 if(options->do_read) {
210 sprintf(fname,
"%s.%06d", filename, communicators->workread_rank);
215 barrier_after_start(communicators->
local);
217 read_starttime = starttime = MPI_Wtime();
221 fprintf(stderr,
"cannot open %s for writing, aborting ...\n", fname);
222 MPI_Abort(communicators->
all, 1);
224 stats[STATS_RD_NUM_FILES]=communicators->work_size;
225 timings[TIMINGS_RD_OPEN] = MPI_Wtime()-starttime;
227 starttime = MPI_Wtime();
228 barrier_after_open(communicators->
work);
229 timings[TIMINGS_RD_OPEN_BARR_GLOBAL] = MPI_Wtime()-starttime;
232 starttime = MPI_Wtime();
233 if(options->serialize_blocknum>0) ser_step=options->serialize_blocknum;
234 else ser_step=communicators->local_size;
236 timings[TIMINGS_RD_READ_SYNC] += MPI_Wtime()-starttime;
238 for(ser_count=0;ser_count<communicators->local_size;ser_count+=ser_step) {
240 if ((!ser_done) && communicators->local_rank<(ser_count+ser_step)) {
243 starttime = MPI_Wtime();
244 left = options->totalsize;
249 btoread = options->bufsize;
260 if(!options->suppress_checksum) {
261 checksum_read_fp=0.0;
262 for (i = 0; i < bread; i++)
263 checksum_read_fp += (
double) localbuffer[i];
268 fprintf(stderr,
"timings[%03d] read (%lld bytes) %lld bytes (%10.4f MB) (%lld left)\n", communicators->all_rank, (sion_int64) bread, bsumread,
269 bsumread / 1024.0 / 1024.0, (sion_int64) left);
274 timings[TIMINGS_RD_READ] += MPI_Wtime()-starttime;
276 stats[STATS_BYTES_RD_READ]+=bsumread;
277 stats[STATS_BYTES_RD_NUM_CHUNKS]+=chunkcnt;
279 starttime = MPI_Wtime();
280 barrier_after_read(communicators->
local);
281 timings[TIMINGS_RD_READ_BARR_FILE] += MPI_Wtime()-starttime;
287 starttime = MPI_Wtime();
288 barrier_after_read(communicators->
workread);
289 timings[TIMINGS_RD_READ_BARR_GLOBAL] = MPI_Wtime()-starttime;
291 starttime = MPI_Wtime();
293 timings[TIMINGS_RD_CLOSE] = MPI_Wtime()-starttime;
295 starttime = MPI_Wtime();
296 barrier_after_close(communicators->
workread);
297 timings[TIMINGS_RD_CLOSE_BARR_GLOBAL] = MPI_Wtime()-starttime;
298 timings[TIMINGS_RD_TOTAL] = MPI_Wtime()-read_starttime;
300 starttime = MPI_Wtime();
302 if (timings[TIMINGS_RD_TOTAL] == 0) timings[TIMINGS_RD_TOTAL] = -1;
304 if (options->verbose) {
305 write_timings(
"TASK",TIMINGS_METHOD_READ,timings,stats,communicators,options,1);
309 write_timings(
"TASK",TIMINGS_METHOD_READ,timings,stats,communicators,options,0);
312 timings[TIMINGS_MSGS] += MPI_Wtime()-starttime;
313 if (options->numfiles > 1) {
315 MPI_Reduce(timings, ftimings, TIMINGS_MAX_NUM, MPI_DOUBLE, MPI_MAX, 0, communicators->
local);
316 MPI_Reduce(stats, fstats, STATS_MAX_NUM, SION_MPI_INT64, MPI_SUM, 0, communicators->
local);
318 if (communicators->local_rank == 0) {
319 write_timings(
"FILE",TIMINGS_METHOD_READ,ftimings,fstats,communicators,options,0);
324 MPI_Reduce(timings, gtimings, TIMINGS_MAX_NUM, MPI_DOUBLE, MPI_MAX, 0, communicators->
workread);
325 MPI_Reduce(stats, gstats, STATS_MAX_NUM, SION_MPI_INT64, MPI_SUM, 0, communicators->
workread);
327 DPRINTFTS(communicators->all_rank,
"after red.");
328 if (communicators->workread_rank == 0) {
329 fprintf(stderr,
"------------------------------------------------------------------------------------------\n");
330 write_timings(
"TOTAL",TIMINGS_METHOD_READ,gtimings,gstats,communicators,options,0);
331 fprintf(stderr,
"------------------------------------------------------------------------------------------\n");
335 if(!options->suppress_checksum) {
336 if (fabs(checksum_fp - checksum_read_fp) > 1e-5) {
337 fprintf(stderr,
"timings[%03d] ERROR in double checksum %14.10f==%14.10f, diff=%14.10f\n", communicators->all_rank,
338 checksum_fp, checksum_read_fp, checksum_fp - checksum_read_fp);
345 if(options->unlink_files) {
346 starttime = MPI_Wtime();
347 barrier_before_unlink(communicators->
work);
349 barrier_after_unlink(communicators->
work);
350 unlinktime = MPI_Wtime() - starttime;
351 MPI_Reduce(&unlinktime, &gunlinktime, 1, MPI_DOUBLE, MPI_MAX, 0, communicators->
work);
352 if (communicators->work_rank == 0) {
353 fprintf(stderr,
"partest result: ultime=%10.6fs unlink %s\n", gunlinktime, fname);
sion_int64 _sion_file_write(const void *data, sion_int64 bytes, _sion_fileptr *sion_fileptr)
Write data to file.
_sion_fileptr * _sion_file_open(const char *fname, unsigned int flags, unsigned int addflags)
Create and open a new file for writing.
#define SION_FILE_FLAG_WRITE
sion_int64 _sion_file_read(void *data, sion_int64 bytes, _sion_fileptr *sion_fileptr)
Read data from file.
#define SION_FILE_FLAG_READ
#define SION_FILE_FLAG_POSIX
#define SION_FILE_FLAG_ANSI
#define SION_FILE_FLAG_CREATE
int _sion_file_close(_sion_fileptr *sion_fileptr)
Close file and destroys fileptr structure.