Robert R. McCormick School of Engineering and Applied Science Electrical Engineering and Computer Science Department Center for Ultra-scale Computing and Information Security at Northwestern University

Linux Magazine July 2004
Copyright Linux Magazine ©2004 EXTREME LINUX
Parallel netCDF
by Forrest Hoffman

As clusters grow and computational applications scale to more and more processors, input and output (I/O) frequently becomes a performance-limiting bottleneck. While hardware manufacturers continue to improve I/O bandwidth between memory and disk, the distributed memory environment of Linux clusters poses unique challenges for obtaining high performance I/O.

Parallel and cluster file systems attempt to provide the needed scalability by distributing file data on disks around the cluster. In addition, parallel I/O is accessible to applications through MPI-IO, the I/O layer specified in the MPI-2 standard. However, most model developers want a high-level interface that can deal with the details of collective I/O and distributed file systems no matter what they are.

Parallel file systems are finally coming of age, and MPI-IO is available in both MPICH and LAM/MPI, the two most popular MPI implementations for Linux clusters. [The Parallel Virtual File System (PVFS) and rudimentary MPI-IO use were first discussed in this column in July and August of 2002, available online at http://www.linuxmagazine.com/2002-07/extreme_01.html.] Now, PVFS2 is also available for testing, and the ROMIO implementation of MPI-IO is available in MPICH2 (see sidebar). Other file system solutions are also available, including Lustre, the Global File System (GFS) from Red Hat, and CXFS from Silicon Graphics (SGI).

In general, you won't want to use a programming interface to any one of these file systems since it ties your application to a specific file system for I/O. Instead, MPI-IO should be used to access these file systems, because the MPI-IO implementation provides native access to the file systems anyway. Using MPI-IO makes applications portable to any kind of parallel environment, while providing at least rudimentary parallel I/O.

In many cases, the model developer may prefer to perform I/O using an interface developed specifically for the file format in common use. These domain-specific file programming interfaces are typically written for sequential I/O, often for use only in serial applications.

What's needed is a parallel implementation of such file interfaces, preferably layered on top of MPI-IO so that they can take advantage of its collective operations and its interfaces to parallel, cluster, and global file systems.

Enter parallel netCDF and parallel HDF5, versions of the network Common Data Form and the Hierarchical Data Format, respectively. netCDF was developed by the University Corporation for Atmospheric Research (UCAR) and is maintained by its Unidata branch (http://www.unidata.ucar.edu/packages/netcdf). netCDF is used by atmospheric scientists and climate modelers. HDF, developed by the National Center for Supercomputing Application (NCSA) at the University of Illinois at Urbana-Champaign (http://hdf.ncsa.uiuc.edu), is commonly used by the satellite and remote sensing communities.

Both HDF and netCDF consist of application programming interfaces (APIs) and self-describing file formats containing metadata and data all in a single file. HDF files contain various objects or files within them making it a more complex and potentially more powerful method for storing data. netCDF, on the other hand, is simpler. It contains a header where metadata is stored and a subsequent data section containing the array-based data frequently used by earth scientists and modelers.

Both formats are binary, yet are portable to any computer platform. Only parallel netCDF is described here.

The Standard netCDF Interface

The netCDF interface and file format has been around for many years. netCDF data is self-describing since information about the data is contained in the same file, and over time, many conventions have developed describing standard tags and attributes that should be contained in netCDF headers. netCDF data is architecture-independent because data is represented in a standard binary format called eXternal Data Representation, or XDR.

The standard netCDF API is available for AIX, HPUX, IRIX, Linux, Mac OS, OSF1, SunOS, UNICOS, and various versions of Windows. It can be used with C, C++, FORTRAN, FORTRAN-90, as well as other programming languages.

Data can be accessed directly and efficiently through the netCDF API, and data can be appended to a netCDF dataset at a later time. Normally, one process can write to a netCDF file while multiple processes simultaneously read the same file. However, for most parallel applications this is insufficient.

To use the standard netCDF API in a parallel application, one must usually ship data to a single process that performs all I/O operations. This is inefficient and cumbersome, particularly if the machine on which the process runs has insufficient memory to hold the otherwise-distributed data. In such a case, data must be brought in a block at a time and written to disk, thus slowing computational processing.

Listing One contains an example program called sequential.c that demonstrates how data generated in parallel is typically written to a netCDF file using the standard interface. (As is usual for examples, very little error checking is done for the sake of brevity.) This program assumes it will be run on an even number of MPI processes so that data is evenly divided among them.

Listing One: sequential.c captures data that's generated in parallel

#include <stdio.h>
#include <time.h>
#include <strings.h>
#include <netcdf.h>
#include <mpi.h>

#define NLON 128
#define NLAT 64

double *make_temp(int len)
{
int i;
double *temp;

if (!(temp = (double *)malloc(len * sizeof(double)))) {
perror("temp");
}
for (i = 0; i < len; i++)
temp[i] = 278.15 + (i * 0.001);

return temp;
}

void handle_error(int status)
{
if (status != NC_NOERR) {
fprintf(stderr, "%s\n", nc_strerror(status));
exit(1);
}
return;
}

int main(int argc, char** argv)
{
int rank, size, namelen;
char processor_name[MPI_MAX_PROCESSOR_NAME];
int ncid, lonid, latid, timeid, Tid, Tdimids[3], len;
size_t counts[3], starts[3];
char *Tname = "temperature", *Tunits = "K";
double Trange[] = { 0.0, 500.0 }, *temp, *alltemps;
char *title = "Global surface temperature data", history[256], buf[26];
time_t tm;

MPI_Init(&argc, &argv);
MPI_Comm_rank(MPI_COMM_WORLD, &rank);
MPI_Comm_size(MPI_COMM_WORLD, &size);
MPI_Get_processor_name(processor_name, &namelen);
printf("Rank %d of %d started on %s\n", rank, size, processor_name);

if (!rank) {
/* Create the netCDF dataset and define everything */
handle_error(nc_create("global_data_seq.nc", 0, &ncid));
nc_def_dim(ncid, "lon", NLON, &lonid);
nc_def_dim(ncid, "lat", NLAT, &latid);
nc_def_dim(ncid, "time", NC_UNLIMITED, &timeid);
Tdimids[0] = timeid, Tdimids[2] = lonid, Tdimids[1] = latid;
nc_def_var(ncid, "T", NC_DOUBLE, 3, Tdimids, &Tid);
nc_put_att_text(ncid, Tid, "long_name", strlen(Tname), Tname);
nc_put_att_text(ncid, Tid, "units", strlen(Tunits), Tunits);
nc_put_att_double(ncid, Tid, "valid_range", NC_DOUBLE, 2, Trange);
nc_put_att_text(ncid, NC_GLOBAL, "title", strlen(title), title);
tm = time((time_t *)NULL);
(void)ctime_r(&tm, buf);
if (buf[24] = '\n') buf[24] = '\0';
sprintf(history, "Created %s", buf);
nc_put_att_text(ncid, NC_GLOBAL, "history", strlen(history), history);
nc_enddef(ncid);
}

len = NLON * NLAT / size;
/* Read or compute temperatures */
temp = make_temp(len);
/* Gather temperatures to master process for writing */
if (!rank) {
if (!(alltemps = (double *)malloc(NLON * NLAT * sizeof(double))))
perror("alltemps");
}
if (MPI_Gather(temp, len, MPI_DOUBLE, alltemps, len, MPI_DOUBLE, 0,
MPI_COMM_WORLD) != MPI_SUCCESS)
fprintf(stderr, "%d: Error in MPI_Gather()\n", rank);
if (!rank) {
starts[0] = starts[1] = starts[2] = 0;
counts[0] = 1, counts[1] = NLAT, counts[2] = NLON;
handle_error(nc_put_vara_double(ncid, Tid, starts, counts, alltemps));
nc_close(ncid);
free(alltemps);
}

free(temp);
MPI_Finalize();
}

In sequential.c., MPI is initialized using MPI_Init() just inside main(). The process rank is collected using MPI_ Comm_rank(); the number of processes involved in the computation is found using MPI_Comm_size(); and the processor name using MPI_Get_processor_name(). Then the first MPI process creates the netCDF output file using nc_create() and defines all the dimensions, variables, and attributes to be contained in the file. The call to nc_enddef() moves the file from define mode to data mode.

At this point, the vector lengths for each process are computed and temperature vectors are constructed and filled in the call to make_temp(). Upon return from that call, the first process allocates memory to hold the temperatures for the entire globe from all processes. Next, MPI_Gather() is called to transfer the subsets of temperatures from all processes to the first process. The first process sets start and count vectors and calls nc_put_vara_double() with the alltemps buffer containing all the temperatures to store the data into the netCDF file. Then the first process closes the netCDF file by calling nc_close() and frees the temporary global temperature array alltemps. Finally, all processes free their smaller vector of temperatures before calling MPI_Finalize() and exiting.

Figure One shows the results of compiling and running sequential.c with two processes using MPICH. The ncdump program, a standard netCDF utility, is used to view the contents of the netCDF file. All the header information looks correct, and the data section contains values that are expected.

Figure One: Running Listing One

[forrest@node01 pnetcdf]$ mpicc -O -o sequential sequential.c -lnetcdf
[forrest@node01 pnetcdf]$ time mpirun -np 2 sequential
Rank 0 of 2 started on node01.cluster.ornl.gov
Rank 1 of 2 started on node02.cluster.ornl.gov

real    0m0.706s
user    0m0.045s
sys     0m0.172s
[forrest@node01 pnetcdf]$ ncdump global_data_seq.nc | more
netcdf global_data_seq {
dimensions:
lon = 128 ;
lat = 64 ;
time = UNLIMITED ; // (1 currently)
variables:
double T(time, lat, lon) ;
T:long_name = "temperature" ;
T:units = "K" ;
T:valid_range = 0., 500. ;

// global attributes:
:title = "Global surface temperature data" ;
:history = "Created Tue Apr 13 01:32:12 2004" ;
data:

T =278.15, 278.151, 278.152, 278.153, 278.154, 278.155, 278.156, 278.157,
278.158, 278.159, 278.16, 278.161, 278.162, 278.163, 278.164, 278.165,
278.166, 278.167, 278.168, 278.169, 278.17, 278.171, 278.172, 278.173,
...
282.23, 282.231, 282.232, 282.233, 282.234, 282.235, 282.236, 282.237,
282.238, 282.239, 282.24, 282.241, 282.242, 282.243, 282.244, 282.245 ;
}

A Parallel Implementation

While programs like sequential.c work well for small numbers of nodes, they do not scale well. The process performing I/O becomes a serious bottleneck, and memory limitations can make the problem worse.

To solve these problems, researchers at Argonne National Laboratory have developed a parallel implementation of the netCDF API that retains complete file compatibility with the standard (sequential) implementation (http://www-unix.mcs.anl.gov/parallel-netcdf/). Since it's written on top of MPI-IO, this new parallel implementation of the netCDF library allows the developer to leverage the collective operations and parallel file system access (if appropriate) in the MPI-IO layer.

Just as with the serial interface, netCDF file access occurs in one of two modes: define mode for manipulating the metadata header or declaring new variables, and data mode for reading and writing array data. The parallel implementation uses many of the library calls familiar to developers; however, the prefix to these function names is changed to ncmpi_ for C and nfmpi_ for Fortran to avoid name clashes.

In data mode, the parallel netCDF implementation has two modes of operation: collective data mode and independent data mode. In collective data mode (the default), all processes must call the same function on the same netCDF file handle at the same point in the code; however, different parameters for these calls are acceptable. In independent data mode, each process can perform different I/O operations independently. It's expected that most developers will want to use collective data mode.

Two varieties of library calls are available for storing data when in data mode: the high level data mode and the flexible data mode. The high level data mode interface more closely resembles the original netCDF functions, but the flexible data mode interfaces takes advantage of MPI functionality by providing better handling of internal data. In both cases, the collective functions all end in _all.

Listing Two contains a program called collective.c that's very much like Listing One. However, in this program, all the netCDF calls have been replaced with matching parallel netCDF calls, which begin with ncmpi_. In collective.c, all MPI processes call the parallel netCDF functions to define the dataset and store data into it. Since headers are assumed to be small, a single process actually performs the I/O operations in define mode once ncmpi_enddef() is called.

Listing Two: Capturing data in parallel

#include <stdio.h>
#include <time.h>
#include <strings.h>
#include <pnetcdf.h>
#include <mpi.h>

#define NLON 128
#define NLAT 64

double *make_temp(int len)
{
int i;
double *temp;

if (!(temp = (double *)malloc(len * sizeof(double)))) {
perror("temp");
}
for (i = 0; i < len; i++)
temp[i] = 278.15 + (i * 0.001);

return temp;
}

void handle_error(int status)
{
if (status != NC_NOERR) {
fprintf(stderr, "%s\n", ncmpi_strerror(status));
exit(1);
}
return;
}

int main(int argc, char** argv)
{
int rank, size, namelen;
char processor_name[MPI_MAX_PROCESSOR_NAME];
int ncid, lonid, latid, timeid, Tid, Tdimids[3], len;
size_t counts[3], starts[3];
char *Tname = "temperature", *Tunits = "K";
double Trange[] = { 0.0, 500.0 }, *temp, *alltemps;
char *title = "Global surface temperature data", history[256], buf[26];
time_t tm;

MPI_Init(&argc, &argv);
MPI_Comm_rank(MPI_COMM_WORLD, &rank);
MPI_Comm_size(MPI_COMM_WORLD, &size);
MPI_Get_processor_name(processor_name, &namelen);
printf("Rank %d of %d started on %s\n", rank, size, processor_name);

/* Create the netCDF dataset and define everything */
handle_error(ncmpi_create(MPI_COMM_WORLD, "global_data_col.nc", 0,
MPI_INFO_NULL, &ncid));
ncmpi_def_dim(ncid, "lon", NLON, &lonid);
ncmpi_def_dim(ncid, "lat", NLAT, &latid);
ncmpi_def_dim(ncid, "time", NC_UNLIMITED, &timeid);
Tdimids[0] = timeid, Tdimids[2] = lonid, Tdimids[1] = latid;
ncmpi_def_var(ncid, "T", NC_DOUBLE, 3, Tdimids, &Tid);
ncmpi_put_att_text(ncid, Tid, "long_name", strlen(Tname), Tname);
ncmpi_put_att_text(ncid, Tid, "units", strlen(Tunits), Tunits);
ncmpi_put_att_double(ncid, Tid, "valid_range", NC_DOUBLE, 2, Trange);
ncmpi_put_att_text(ncid, NC_GLOBAL, "title", strlen(title), title);
tm = time((time_t *)NULL);
(void)ctime_r(&tm, buf);
if (buf[24] = '\n') buf[24] = '\0';
sprintf(history, "Created %s", buf);
ncmpi_put_att_text(ncid, NC_GLOBAL, "history", strlen(history), history);
ncmpi_enddef(ncid);

len = NLON * NLAT / size;
/* Read or compute temperatures */
temp = make_temp(len);
/* Don't gather temperatures, just do a collective write */
starts[0] = starts[2] = 0;
starts[1] = NLAT / size * rank;
counts[0] = 1, counts[1] = NLAT / size, counts[2] = NLON;
handle_error(ncmpi_put_vara_all(ncid, Tid, starts, counts, temp, len, MPI_DOUBLE));
ncmpi_close(ncid);

free(temp);
MPI_Finalize();
}

Just as before, each process calls make_temp() to generate a subset of the global temperature data. But upon return, it isn't necessary for a single process to allocate more memory and gather the data from all other processes. Instead, the processes figure out where their data should start and how large it is (storing these values in the starts and counts vectors). Then all processes call ncmpi_put_vara_all() to store their data into the netCDF dataset. Next, all processes call ncmpi_close() to close the file, free() to free their subset of data, and MPI_Finalize() to stop MPI.

The ncmpi_put_vara_all() call is a flexible data mode function. It's passed not only the pointer to the buffer, but also the length and its MPI data type (MPI_DOUBLE). Similar high level data mode functions may also be used to store data.

Notice that the counts and starts vectors in collective.c are of type size_t as they were in sequential.c. Some of the parallel netCDF documentation states that these vectors should be of type MPI_Offset (which is long long) to support large files. However, in the version of the library presently available (0.9.3), these are still of type size_t.

Figure Two contains the results of compiling and running collective.c on two nodes. To ensure that the same results were obtained, the ncdump utility was used to convert the netCDF files into ASCII. Then diff was used to compare the two ASCII files. As can be seen in Figure Two, only the dataset names and creation times differ. Otherwise, the two files are identical.

Figure Two: the output of running Listing Two

[forrest@node01 pnetcdf]$ mpicc -O \
  -I/usr/local/parallel-netcdf-0.9.3/include -o    collective collective.c \
  -L/usr/local/parallel-netcdf-0.9.3/lib -lpnetcdf
[forrest@node01 pnetcdf]$ time mpirun -np 2 collective
Rank 0 of 2 started on node01.cluster.ornl.gov
Rank 1 of 2 started on node02.cluster.ornl.gov
real    0m0.336s
user    0m0.023s
sys     0m0.121s
[forrest@node01 pnetcdf]$ ncdump global_data_seq.nc > seq
[forrest@node01 pnetcdf]$ ncdump global_data_col.nc > col
[forrest@node01 pnetcdf]$ iff seq col
1c1
< netcdf global_data_seq {
---
> netcdf global_data_col {
14c14
<:history = "Created Tue Apr 13 01:32:12 2004" ;
---
>:history = "Created Tue Apr 13 01:34:48 2004" ;

While these example programs are too small for benchmarking performance, it's apparent that collective I/O offers better performance for large problems.

Moreover, the parallel netCDF library allows model developers to avoid allocating temporary memory and performing gather operations on big array just to store the data to disk.

While the documentation on the Argonne website is a bit sketchy at present and the library is not yet up to rev 1.0, you might try it out on your own applications. It could save you programming time as well as run time.

Report from the Front

At the recent ClusterWorld Conference and Expo -- held April 5-8 in San Jose -- the latest hardware and software products for Linux clusters were shown off by vendors and project leaders. In only its second year, ClusterWorld is attracting individuals leading the high performance computing revolution with Linux.

As a part of the conference, a variety of open source developers presented information about how their software can be used in a clustered environment. A number of presentations focused on grid computing, bioinformatics, digital content creation, automotive and aerospace engineering, finance, and petroleum and geophysical exploration applications using Linux clusters.

Presentations and tutorials covered building and using system monitoring and management tools like Ganglia and xCAT, installing a cluster using OSCAR, and getting started with the Globus Toolkit for grid applications. Rusty Lusk and Bill Gropp discussed their latest MPI-2 implementation, called MPICH2, which is in beta. It will soon be ready for production use.

Graham Fagg presented a talk on MPI and fault tolerance. The Portland Group discussed their compilers for the Intel Pentium 4 and the AMD Opteron, and an Etnus representative gave a presentation on debugging memory problems using TotalView.

Greg Lindahl, now with Pathscale, discussed benchmarks and the new Pathscale Opteron compilers.

Presentations also covered PVFS2, the second generation parallel virtual file system from Argonne National Laboratory and Clemson University, and Lustre, a parallel filesystem based on ext3.

These two parallel filesystem solutions, along with GFS (Global File System) from Red Hat, are implemented differently, but attempt to provide high performance I/O in large, parallel compute environments.

It will be very interesting to see how these competing solutions evolve in the coming years. All are available in some form at present so you can see if any one of them improves I/O performance for your parallel applications now.

Interconnects were another hot topic. Myricom Myrinet, Quadrics QsNetII, and Dolphin SCI were well represented at the Expo.

However, the latest kid on the block is Infiniband. [See the story "Pipe Dreams" at http://www.linux-mag.com/2003-07/infiniband_01.html for information on Infiniband.]

This interconnect solution is based on open standards instead of proprietary hardware, offers very high bandwidth, and is finally available from vendors like Voltaire and others.

Cluster vendors are already offering Infiniband solutions to their customers. The evolution of interconnects is another important area to watch in the coming months and years.

The venerable Thomas Sterling gave one of the many keynote speeches, and Jon "maddog" Hall organized a panel discussion on the future of Linux cluster distributions.

ClusterWorld should continue to grow as Linux clusters gain popularity in both high performance computing and enterprise IT environments.


Forrest Hoffman is a computer modeling and simulation researcher at Oak Ridge National Laboratory. He can be reached at forrest@climate.ornl.gov. You can download this month's code from http://www.linuxmagazine.com/downloads/2004-07/extreme.


Linux Magazine July 2004
Copyright Linux Magazine ©2004