|
EXTREME
LINUX
Parallel
netCDF
by Forrest Hoffman
As clusters grow and computational
applications
scale to more and more processors, input and output (I/O) frequently
becomes a performance-limiting bottleneck. While hardware manufacturers
continue to improve I/O bandwidth between memory and disk, the
distributed memory environment of Linux clusters poses unique
challenges for obtaining high performance I/O.
Parallel and
cluster file systems attempt to provide the needed scalability by
distributing file data on disks around the cluster. In addition,
parallel I/O is accessible to applications through MPI-IO, the
I/O layer specified in the MPI-2
standard. However, most model developers want a high-level interface
that can deal with the details of collective I/O and distributed file
systems no matter what they are.
Parallel file systems are finally coming of age, and
MPI-IO is available in both MPICH and LAM/MPI, the two
most popular MPI implementations for Linux clusters. [The Parallel
Virtual File System (PVFS) and rudimentary MPI-IO use were first
discussed in this column in July and August of 2002, available online
at http://www.linuxmagazine.com/2002-07/extreme_01.html.]
Now, PVFS2 is also available for testing, and the ROMIO
implementation of MPI-IO is available in MPICH2 (see sidebar).
Other file system solutions are also available, including Lustre,
the Global File System (GFS) from Red Hat, and CXFS
from Silicon Graphics (SGI).
In
general, you won't want to use a programming interface to any one of
these file systems since it ties your application to a specific file
system for I/O. Instead, MPI-IO should be used to access these file
systems, because the MPI-IO implementation provides native access to
the file systems anyway. Using MPI-IO makes applications portable to
any kind of parallel environment, while providing at least rudimentary
parallel I/O.
In many cases, the model developer may prefer to
perform I/O using an interface developed specifically for the file
format in common use. These domain-specific file programming interfaces
are typically written for sequential I/O, often for use only in serial
applications.
What's needed is a parallel implementation of
such file interfaces, preferably layered on top of MPI-IO so that they
can take advantage of its collective operations and its interfaces to
parallel, cluster, and global file systems.
Enter parallel netCDF
and parallel HDF5, versions of the network Common Data Form
and the Hierarchical Data Format, respectively. netCDF
was developed by the University Corporation for Atmospheric Research
(UCAR) and is maintained by its Unidata branch (http://www.unidata.ucar.edu/packages/netcdf).
netCDF
is used by atmospheric scientists and climate modelers. HDF, developed
by the National Center for Supercomputing Application (NCSA) at the
University of Illinois at Urbana-Champaign (http://hdf.ncsa.uiuc.edu), is
commonly used by the satellite and remote sensing communities.
Both HDF and netCDF
consist of application programming interfaces (APIs) and
self-describing file formats containing metadata and data all in a
single file. HDF files contain various objects or files within them
making it a more complex and potentially more powerful method for
storing data. netCDF,
on the other hand, is simpler. It contains a header where metadata is
stored and a subsequent data section containing the array-based data
frequently used by earth scientists and modelers.
Both formats are binary, yet are portable to any computer
platform. Only parallel netCDF
is described here.
The Standard netCDF Interface
The netCDF
interface and file format has been around for many years. netCDF
data is self-describing since information about the data is contained
in the same file, and over time, many conventions have developed
describing standard tags and attributes that should be contained in
netCDF
headers. netCDF
data is architecture-independent because data is represented in a
standard binary format called eXternal Data Representation, or
XDR.
The standard netCDF
API is available for AIX, HPUX, IRIX, Linux, Mac OS, OSF1, SunOS,
UNICOS, and various versions of Windows. It can be used
with C, C++, FORTRAN, FORTRAN-90, as well as other programming
languages.
Data can be accessed directly and efficiently through the
netCDF
API, and data can be appended to a netCDF
dataset at a later time. Normally, one process can write to a netCDF
file while multiple processes simultaneously read the same file.
However, for most parallel applications this is insufficient.
To use the standard netCDF
API in a parallel application, one must usually ship data to a single
process that performs all I/O operations. This is inefficient and
cumbersome, particularly if the machine on which the process runs has
insufficient memory to hold the otherwise-distributed data. In such a
case, data must be brought in a block at a time and written to disk,
thus slowing computational processing.
Listing One contains an example program called sequential.c
that demonstrates how data generated in parallel is typically written
to a netCDF
file using the standard interface. (As is usual for examples, very
little error checking is done for the sake of brevity.) This program
assumes it will be run on an even number of MPI processes so that data
is evenly divided among them.
Listing One: sequential.c
captures data that's generated in parallel
#include <stdio.h> #include <time.h> #include <strings.h> #include <netcdf.h> #include <mpi.h>
#define NLON 128 #define NLAT 64
double *make_temp(int len) { int i; double *temp;
if (!(temp = (double *)malloc(len * sizeof(double)))) { perror("temp"); } for (i = 0; i < len; i++) temp[i] = 278.15 + (i * 0.001);
return temp; }
void handle_error(int status) { if (status != NC_NOERR) { fprintf(stderr, "%s\n", nc_strerror(status)); exit(1); } return; }
int main(int argc, char** argv) { int rank, size, namelen; char processor_name[MPI_MAX_PROCESSOR_NAME]; int ncid, lonid, latid, timeid, Tid, Tdimids[3], len; size_t counts[3], starts[3]; char *Tname = "temperature", *Tunits = "K"; double Trange[] = { 0.0, 500.0 }, *temp, *alltemps; char *title = "Global surface temperature data", history[256], buf[26]; time_t tm;
MPI_Init(&argc, &argv); MPI_Comm_rank(MPI_COMM_WORLD, &rank); MPI_Comm_size(MPI_COMM_WORLD, &size); MPI_Get_processor_name(processor_name, &namelen); printf("Rank %d of %d started on %s\n", rank, size, processor_name);
if (!rank) { /* Create the netCDF dataset and define everything */ handle_error(nc_create("global_data_seq.nc", 0, &ncid)); nc_def_dim(ncid, "lon", NLON, &lonid); nc_def_dim(ncid, "lat", NLAT, &latid); nc_def_dim(ncid, "time", NC_UNLIMITED, &timeid); Tdimids[0] = timeid, Tdimids[2] = lonid, Tdimids[1] = latid; nc_def_var(ncid, "T", NC_DOUBLE, 3, Tdimids, &Tid); nc_put_att_text(ncid, Tid, "long_name", strlen(Tname), Tname); nc_put_att_text(ncid, Tid, "units", strlen(Tunits), Tunits); nc_put_att_double(ncid, Tid, "valid_range", NC_DOUBLE, 2, Trange); nc_put_att_text(ncid, NC_GLOBAL, "title", strlen(title), title); tm = time((time_t *)NULL); (void)ctime_r(&tm, buf); if (buf[24] = '\n') buf[24] = '\0'; sprintf(history, "Created %s", buf); nc_put_att_text(ncid, NC_GLOBAL, "history", strlen(history), history); nc_enddef(ncid); }
len = NLON * NLAT / size; /* Read or compute temperatures */ temp = make_temp(len); /* Gather temperatures to master process for writing */ if (!rank) { if (!(alltemps = (double *)malloc(NLON * NLAT * sizeof(double)))) perror("alltemps"); } if (MPI_Gather(temp, len, MPI_DOUBLE, alltemps, len, MPI_DOUBLE, 0, MPI_COMM_WORLD) != MPI_SUCCESS) fprintf(stderr, "%d: Error in MPI_Gather()\n", rank); if (!rank) { starts[0] = starts[1] = starts[2] = 0; counts[0] = 1, counts[1] = NLAT, counts[2] = NLON; handle_error(nc_put_vara_double(ncid, Tid, starts, counts, alltemps)); nc_close(ncid); free(alltemps); }
free(temp); MPI_Finalize(); }
|
In sequential.c., MPI is initialized using MPI_Init()
just inside main(). The process rank is collected using MPI_
Comm_rank(); the number of processes involved in the computation
is found using MPI_Comm_size(); and the processor name using MPI_Get_processor_name().
Then the first MPI process creates the netCDF
output file using nc_create() and defines all the dimensions,
variables, and attributes to be contained in the file. The call to nc_enddef()
moves the file from define mode to data mode.
At
this point, the vector lengths for each process are computed and
temperature vectors are constructed and filled in the call to make_temp().
Upon return from that call, the first process allocates memory to hold
the temperatures for the entire globe from all processes. Next, MPI_Gather()
is called to transfer the subsets of temperatures from all processes to
the first process. The first process sets start and count vectors and
calls nc_put_vara_double() with the alltemps buffer
containing all the temperatures to store the data into the netCDF
file. Then the first process closes the netCDF
file by calling nc_close() and frees the temporary global
temperature array alltemps. Finally, all processes free their
smaller vector of temperatures before calling MPI_Finalize()
and exiting.
Figure One shows the results of compiling and running
sequential.c with two processes using MPICH. The ncdump
program, a standard netCDF
utility, is used to view the contents of the netCDF
file. All the header information looks correct, and the data section
contains values that are expected.
Figure One: Running Listing One
[forrest@node01 pnetcdf]$ mpicc -O -o sequential sequential.c -lnetcdf
[forrest@node01 pnetcdf]$ time mpirun -np 2 sequential
Rank 0 of 2 started on node01.cluster.ornl.gov
Rank 1 of 2 started on node02.cluster.ornl.gov
real 0m0.706s
user 0m0.045s
sys 0m0.172s
[forrest@node01 pnetcdf]$ ncdump global_data_seq.nc | more netcdf global_data_seq { dimensions: lon = 128 ; lat = 64 ; time = UNLIMITED ; // (1 currently) variables: double T(time, lat, lon) ; T:long_name = "temperature" ; T:units = "K" ; T:valid_range = 0., 500. ;
// global attributes: :title = "Global surface temperature data" ; :history = "Created Tue Apr 13 01:32:12 2004" ; data:
T = 278.15, 278.151, 278.152, 278.153, 278.154, 278.155, 278.156, 278.157, 278.158, 278.159, 278.16, 278.161, 278.162, 278.163, 278.164, 278.165, 278.166, 278.167, 278.168, 278.169, 278.17, 278.171, 278.172, 278.173, ... 282.23, 282.231, 282.232, 282.233, 282.234, 282.235, 282.236, 282.237, 282.238, 282.239, 282.24, 282.241, 282.242, 282.243, 282.244, 282.245 ; }
|
A Parallel Implementation
While programs like sequential.c
work well for small numbers of nodes, they do not scale well. The
process performing I/O becomes a serious bottleneck, and memory
limitations can make the problem worse.
To solve these problems, researchers at Argonne National
Laboratory have developed a parallel implementation of the netCDF
API that retains complete file compatibility with the standard
(sequential) implementation (http://www-unix.mcs.anl.gov/parallel-netcdf/).
Since it's written on top of MPI-IO, this new parallel implementation
of the netCDF
library allows the developer to leverage the collective operations and
parallel file system access (if appropriate) in the MPI-IO layer.
Just as with the serial interface, netCDF
file access occurs in one of two modes: define mode for
manipulating the metadata header or declaring new variables, and data
mode
for reading and writing array data. The parallel implementation uses
many of the library calls familiar to developers; however, the prefix
to these function names is changed to ncmpi_ for C and nfmpi_
for Fortran to avoid name clashes.
In data mode, the parallel netCDF
implementation has two modes of operation: collective data mode
and independent data mode. In collective data mode (the
default), all processes must call the same function on the same netCDF
file handle at the same point in the code; however, different
parameters for these calls are acceptable. In independent data mode,
each process can perform different I/O operations independently. It's
expected that most developers will want to use collective data mode.
Two varieties of library calls are available for storing
data when in data mode: the high level data mode and the flexible
data mode. The high level data mode interface more closely
resembles the original netCDF
functions, but the flexible data mode
interfaces takes advantage of MPI functionality by providing better
handling of internal data. In both cases, the collective functions all
end in _all.
Listing Two contains a program called collective.c
that's very much like Listing One. However, in this program,
all the netCDF
calls have been replaced with matching parallel netCDF
calls, which begin with ncmpi_. In collective.c, all
MPI processes call the parallel netCDF
functions to define the dataset and store data into it. Since headers
are assumed to be small, a single process actually performs the I/O
operations in define mode once ncmpi_enddef() is called.
Listing Two: Capturing data in
parallel
#include <stdio.h> #include <time.h> #include <strings.h> #include <pnetcdf.h> #include <mpi.h>
#define NLON 128 #define NLAT 64
double *make_temp(int len) { int i; double *temp;
if (!(temp = (double *)malloc(len * sizeof(double)))) { perror("temp"); } for (i = 0; i < len; i++) temp[i] = 278.15 + (i * 0.001);
return temp; }
void handle_error(int status) { if (status != NC_NOERR) { fprintf(stderr, "%s\n", ncmpi_strerror(status)); exit(1); } return; }
int main(int argc, char** argv) { int rank, size, namelen; char processor_name[MPI_MAX_PROCESSOR_NAME]; int ncid, lonid, latid, timeid, Tid, Tdimids[3], len; size_t counts[3], starts[3]; char *Tname = "temperature", *Tunits = "K"; double Trange[] = { 0.0, 500.0 }, *temp, *alltemps; char *title = "Global surface temperature data", history[256], buf[26]; time_t tm;
MPI_Init(&argc, &argv); MPI_Comm_rank(MPI_COMM_WORLD, &rank); MPI_Comm_size(MPI_COMM_WORLD, &size); MPI_Get_processor_name(processor_name, &namelen); printf("Rank %d of %d started on %s\n", rank, size, processor_name);
/* Create the netCDF dataset and define everything */ handle_error(ncmpi_create(MPI_COMM_WORLD, "global_data_col.nc", 0, MPI_INFO_NULL, &ncid)); ncmpi_def_dim(ncid, "lon", NLON, &lonid); ncmpi_def_dim(ncid, "lat", NLAT, &latid); ncmpi_def_dim(ncid, "time", NC_UNLIMITED, &timeid); Tdimids[0] = timeid, Tdimids[2] = lonid, Tdimids[1] = latid; ncmpi_def_var(ncid, "T", NC_DOUBLE, 3, Tdimids, &Tid); ncmpi_put_att_text(ncid, Tid, "long_name", strlen(Tname), Tname); ncmpi_put_att_text(ncid, Tid, "units", strlen(Tunits), Tunits); ncmpi_put_att_double(ncid, Tid, "valid_range", NC_DOUBLE, 2, Trange); ncmpi_put_att_text(ncid, NC_GLOBAL, "title", strlen(title), title); tm = time((time_t *)NULL); (void)ctime_r(&tm, buf); if (buf[24] = '\n') buf[24] = '\0'; sprintf(history, "Created %s", buf); ncmpi_put_att_text(ncid, NC_GLOBAL, "history", strlen(history), history); ncmpi_enddef(ncid);
len = NLON * NLAT / size; /* Read or compute temperatures */ temp = make_temp(len); /* Don't gather temperatures, just do a collective write */ starts[0] = starts[2] = 0; starts[1] = NLAT / size * rank; counts[0] = 1, counts[1] = NLAT / size, counts[2] = NLON; handle_error(ncmpi_put_vara_all(ncid, Tid, starts, counts, temp, len, MPI_DOUBLE)); ncmpi_close(ncid);
free(temp); MPI_Finalize(); }
|
Just as before, each process calls make_temp()
to generate a subset of the global temperature data. But upon return,
it isn't necessary for a single process to allocate more memory and
gather the data from all other processes. Instead, the processes figure
out where their data should start and how large it is (storing these
values in the starts and counts vectors). Then all
processes call ncmpi_put_vara_all() to store their data into
the netCDF
dataset. Next, all processes call ncmpi_close() to close the
file, free() to free their subset of data, and MPI_Finalize()
to stop MPI.
The ncmpi_put_vara_all() call is a flexible
data mode function. It's passed not only the pointer to the buffer,
but also the length and its MPI data type (MPI_DOUBLE).
Similar high level data mode functions may also be used to
store data.
Notice that the counts and starts
vectors in collective.c are of type size_t as they
were in sequential.c. Some of the parallel netCDF
documentation states that these vectors should be of type MPI_Offset
(which is long long) to support large files. However, in the
version of the library presently available (0.9.3), these are still of
type size_t.
Figure Two contains the results of compiling and
running collective.c on two nodes. To ensure that the same
results were obtained, the ncdump utility was used to convert
the netCDF
files into ASCII. Then diff was used to compare the two ASCII
files. As can be seen in Figure Two, only the dataset names and
creation times differ. Otherwise, the two files are identical.
Figure Two: the output of running
Listing Two
[forrest@node01 pnetcdf]$ mpicc -O \
-I/usr/local/parallel-netcdf-0.9.3/include -o collective collective.c \
-L/usr/local/parallel-netcdf-0.9.3/lib -lpnetcdf
[forrest@node01 pnetcdf]$ time mpirun -np 2 collective
Rank 0 of 2 started on node01.cluster.ornl.gov
Rank 1 of 2 started on node02.cluster.ornl.gov
real 0m0.336s
user 0m0.023s
sys 0m0.121s
[forrest@node01 pnetcdf]$ ncdump global_data_seq.nc > seq
[forrest@node01 pnetcdf]$ ncdump global_data_col.nc > col
[forrest@node01 pnetcdf]$ iff seq col
1c1
< netcdf global_data_seq { --- > netcdf global_data_col { 14c14 <:history = "Created Tue Apr 13 01:32:12 2004" ; --- >:history = "Created Tue Apr 13 01:34:48 2004" ;
|
While
these example programs are too small for benchmarking performance, it's
apparent that collective I/O offers better performance for large
problems.
Moreover, the parallel netCDF
library allows model developers to avoid allocating temporary memory
and performing gather operations on big array just to store the data to
disk.
While the documentation on the Argonne website is a
bit sketchy at present and the library is not yet up to rev 1.0, you
might try it out on your own applications. It could save you
programming time as well as run time.
At the recent ClusterWorld Conference and Expo -- held
April 5-8 in
San Jose -- the latest hardware and software products for Linux
clusters were shown off by vendors and project leaders. In only its
second year, ClusterWorld is attracting individuals leading the high
performance computing revolution with Linux.
As a part of the conference, a variety of open source
developers
presented information about how their software can be used in a
clustered environment. A number of presentations focused on grid
computing, bioinformatics, digital content creation, automotive and
aerospace engineering, finance, and petroleum and geophysical
exploration applications using Linux clusters.
Presentations and tutorials covered building and using
system monitoring and management tools like Ganglia and xCAT,
installing a cluster using OSCAR, and getting started with the Globus
Toolkit
for grid applications. Rusty Lusk and Bill Gropp discussed their latest
MPI-2 implementation, called MPICH2, which is in beta. It will soon be
ready for production use.
Graham Fagg presented a talk on MPI and fault
tolerance. The Portland Group discussed their compilers for the Intel Pentium
4 and the AMD Opteron, and an Etnus representative gave a
presentation on debugging memory problems using TotalView.
Greg Lindahl, now with Pathscale, discussed benchmarks
and the new Pathscale Opteron compilers.
Presentations also covered PVFS2, the second
generation
parallel virtual file system from Argonne National Laboratory and
Clemson University, and Lustre, a parallel filesystem based on ext3.
These two parallel filesystem solutions, along with GFS
(Global File System) from Red Hat, are implemented differently, but
attempt to provide high performance I/O in large, parallel compute
environments.
It will be very interesting to see how these competing
solutions
evolve in the coming years. All are available in some form at present
so you can see if any one of them improves I/O performance for your
parallel applications now.
Interconnects were another hot topic. Myricom Myrinet,
Quadrics QsNetII, and Dolphin SCI were well represented
at the Expo.
However, the latest kid on the block is Infiniband.
[See the story "Pipe Dreams" at http://www.linux-mag.com/2003-07/infiniband_01.html
for information on Infiniband.]
This interconnect solution is based on open standards
instead of
proprietary hardware, offers very high bandwidth, and is finally
available from vendors like Voltaire and others.
Cluster vendors are already offering Infiniband
solutions to their
customers. The evolution of interconnects is another important area to
watch in the coming months and years.
The venerable Thomas Sterling gave one of the many
keynote speeches,
and Jon "maddog" Hall organized a panel discussion on the future of
Linux cluster distributions.
ClusterWorld should continue to grow as Linux clusters
gain
popularity in both high performance computing and enterprise IT
environments.
|
Forrest Hoffman is a computer modeling and simulation
researcher at Oak Ridge National Laboratory. He can be reached at forrest@climate.ornl.gov.
You can download this month's code from http://www.linuxmagazine.com/downloads/2004-07/extreme.
|