
Introduction to UCX Network Programming
Network programming is always a hassle; on the bright side, as developers, we rarely have to write network programs ourselves. In most cases, network operations are hidden behind several layers of abstraction and exposed to developers as user-friendly APIs.
Ethernet is the most widely used network in data centers today. Networks like InfiniBand are popular in high-performance computing (HPC) clusters for scientific computing and artificial intelligence (AI). Ethernet is programmed with the socket APIs. Socket programming is widely documented, and many libraries are available to hide the gory details.
Programming advanced networks like InfiniBand is much more challenging because far less documentation exists on best practices and proper usage.
We discussed different networking libraries to program HPC networks in a previous article.
We will use a simple client-server program to review the basics of the UCX library. The program can be found in the GitHub repository.
UCX Concepts
Now, let's go through some of the main concepts of the UCX API.
UCP Context
ucp_context_h provides the primary abstraction to isolate resources used by an application. Every UCX program has this structure. UCX is configured using properties during context creation. The configuration values can be in a file, defined as environment variables, or can be passed programmatically.
Communication routines associated with the same context within the same application share the resources. Two communication routines related to different contexts within the same process cannot share the resources, providing a mechanism to isolate resources within the same process.
For example, in a computer with two network interface cards (NICs), if one part of an application needs to use one NIC and another part needs to use another NIC, we can create two contexts associated with each of the NICs.
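As a rough sketch of the environment-variable route: the sample program later calls ucp_config_read with a null environment prefix, so UCX reads variables that start with UCX_. The helper below sets such a variable from inside the process before the context is created. SetUcxOption is a hypothetical name for illustration and is not part of UCX; it only wraps the POSIX setenv call.

```cpp
#include <cassert>
#include <cstdlib>
#include <string>

// Hypothetical helper (not a UCX function): sets the environment variable
// UCX_<name> so that a later ucp_config_read() call can pick it up.
// Returns true on success. Relies on POSIX setenv().
bool SetUcxOption(const std::string &name, const std::string &value) {
    const std::string variable = "UCX_" + name;
    // The last argument (1) overwrites any value that is already set.
    return setenv(variable.c_str(), value.c_str(), 1) == 0;
}
```

For instance, SetUcxOption("TLS", "tcp") asks UCX to use only the TCP transport, and SetUcxOption("NET_DEVICES", "mlx5_0:1") would restrict it to a single device. The device name here is only an assumed example; the names available on a machine can be listed with the ucx_info -d tool.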
Endpoint
An endpoint is the primary means of communication. It is an abstraction that represents a communication channel between two contexts. Underneath an endpoint, UCX can send messages over multiple connections and network interfaces. The maximum numbers of these are configured through the lanes and rails properties.
Worker
A worker provides a threading point for progressing communications. A worker can be associated with multiple network interfaces to progress its operations. To progress network interfaces from multiple concurrent threads, we must create multiple workers.
Message APIs
UCX provides the following types of messaging APIs.
- Stream-oriented send/receive operations for sending messages between connected endpoints.
- Tag-matched send/receive. Every message is associated with an integer ID, and receivers can use these tags to select messages to receive.
- Remote memory access for reading from or writing to remote memory.
- Remote atomic operations.
These APIs are available in both synchronous and asynchronous versions. In this example, we will use UCX's non-blocking tag APIs, which are among the most commonly used.
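UCX tags are 64-bit integers, and the application decides what the bits mean. The sketch below shows one possible convention: the sender id goes in the upper 32 bits and a message id in the lower 32 bits. This layout and the names MakeTag, TagSender, TagMessageId, kSenderMask, and kFullMask are assumptions made for illustration; the sample program in this article simply uses tag 0.

```cpp
#include <cassert>
#include <cstdint>

// Illustrative layout (an assumption, not something UCX mandates):
// upper 32 bits = sender id, lower 32 bits = message id.
constexpr uint64_t kSenderMask = 0xFFFFFFFF00000000ULL;  // compare sender bits only
constexpr uint64_t kFullMask = 0xFFFFFFFFFFFFFFFFULL;    // compare every bit

// Packs a sender id and a message id into one 64-bit tag.
constexpr uint64_t MakeTag(uint32_t sender, uint32_t message_id) {
    return (static_cast<uint64_t>(sender) << 32) | message_id;
}

// Extracts the sender id from a tag built with MakeTag.
constexpr uint32_t TagSender(uint64_t tag) {
    return static_cast<uint32_t>(tag >> 32);
}

// Extracts the message id from a tag built with MakeTag.
constexpr uint32_t TagMessageId(uint64_t tag) {
    return static_cast<uint32_t>(tag & 0xFFFFFFFFULL);
}
```

With this layout, a receiver that passes kSenderMask as the tag mask would accept any message id from one sender, while kFullMask requires the whole tag to match.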
The Example Program
The program consists of three classes: a base class (UCXBase) that holds the structures common to any UCX program, a server class (UCXServer), and a client class (UCXClient).
The instructions for running the program are in the README.md.
Initializing UCX
Now, let's examine how to initialize the UCX library. To do so, we create the ucp_context_h and ucp_worker_h structures.
Here is how it is initialized in the sample program. We are requesting UCP_FEATURE_TAG when initializing the context.
void UCXBase::CreateUcpContext() {
    ucp_params_t ucp_params;
    memset(&ucp_params, 0, sizeof(ucp_params));
    // field_mask tells UCX which fields of ucp_params are populated
    ucp_params.field_mask = UCP_PARAM_FIELD_FEATURES | UCP_PARAM_FIELD_MT_WORKERS_SHARED;
    // we are using the tag matching feature
    ucp_params.features = UCP_FEATURE_TAG;
    // the context may be shared by workers running on different threads
    ucp_params.mt_workers_shared = 1;
    // read the configuration first; this picks up values from the environment and
    // uses defaults for anything that is not specified
    ucp_config_t *config;
    ucs_status_t status = ucp_config_read(nullptr, nullptr, &config);
    if (status != UCS_OK) {
        throw std::runtime_error("UCX configuration failed");
    }
    // now initialize the context
    status = ucp_init(&ucp_params, config, &ucp_context_);
    ucp_config_release(config);
    if (status != UCS_OK) {
        throw std::runtime_error("UCX initialization failed");
    }
}
Then, the worker is created.
void UCXBase::CreateUcpWorker() {
    ucp_worker_params_t worker_params;
    std::memset(&worker_params, 0, sizeof(worker_params));
    // whether multiple threads can call UCX functions on this worker concurrently
    worker_params.field_mask = UCP_WORKER_PARAM_FIELD_THREAD_MODE;
    worker_params.thread_mode = UCS_THREAD_MODE_MULTI;
    ucs_status_t status = ucp_worker_create(ucp_context_, &worker_params, &ucp_worker_);
    if (status != UCS_OK) {
        throw std::runtime_error("UCX worker creation failed");
    }
}
These are the two things we need in both client and server programs.
Initializing Server
Because UCX abstracts out many networks, it has an opaque mechanism for initializing the network endpoints. In socket programming, we create a server socket and listen on it for connections. Networks like InfiniBand don't have a concept like that and need out-of-band information exchange to initialize connections.
Usually, this network information is exchanged over an Ethernet network with TCP. For simplicity, we will use the built-in UCX mechanism to establish the connections using TCP. We can also use a separate channel (apart from UCX) to exchange this information and create the endpoints.
void UCXServer::StartServer(const std::string &address_str, uint16_t server_port) {
    struct sockaddr_storage listen_addr{};
    ucp_listener_params_t params = {};
    ucs_status_t status;
    // prepare the socket address to listen for incoming connections
    PrepareSocketAddress(&listen_addr, address_str.c_str(), server_port);
    params.field_mask = UCP_LISTENER_PARAM_FIELD_SOCK_ADDR | UCP_LISTENER_PARAM_FIELD_CONN_HANDLER;
    params.sockaddr.addr = (const struct sockaddr *) &listen_addr;
    params.sockaddr.addrlen = sizeof(listen_addr);
    params.conn_handler.cb = server_conn_handle_cb;
    params.conn_handler.arg = this;
    /* Create a listener on the server side to listen on the given address. */
    status = ucp_listener_create(ucp_worker_, &params, &listener_);
    if (status != UCS_OK) {
        std::cerr << "Failed to listen " << ucs_status_string(status) << std::endl;
        throw std::runtime_error("Failed to listen");
    }
    // wait for the client to connect; this program only accepts one connection
    while (conn_request_ == nullptr) {
        ucp_worker_progress(ucp_worker_);
    }
    // create the endpoint on the server side using the connection request
    ucp_ep_params_t ep_params = {};
    ep_params.field_mask = UCP_EP_PARAM_FIELD_ERR_HANDLER | UCP_EP_PARAM_FIELD_CONN_REQUEST;
    ep_params.conn_request = conn_request_;
    ep_params.err_handler.cb = err_cb;
    ep_params.err_handler.arg = nullptr;
    status = ucp_ep_create(ucp_worker_, &ep_params, &server_ep_);
    if (status != UCS_OK) {
        std::cerr << "Failed to create an endpoint on the server: " << ucs_status_string(status) << std::endl;
        // do not report a connection when the endpoint could not be created
        throw std::runtime_error("Failed to create an endpoint on the server");
    }
    std::cout << "Connection established..." << std::endl;
}
Note: If faster networks like InfiniBand are available on the nodes, UCX uses TCP only to exchange bootstrapping information and uses the faster network for data transfers.
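The PrepareSocketAddress helper called in StartServer is not shown in this article. As a minimal sketch of what such a helper might do, the function below fills a sockaddr_storage with an IPv4 address and port using standard POSIX calls. The name FillIPv4Address and its behavior are assumptions for illustration, not the code from the repository. It accepts only numeric addresses such as "127.0.0.1"; resolving a host name like "localhost" would need getaddrinfo instead.

```cpp
#include <arpa/inet.h>
#include <netinet/in.h>
#include <sys/socket.h>

#include <cassert>
#include <cstdint>
#include <cstring>

// Hypothetical helper: fills `storage` with an IPv4 address and port.
// Returns false if `ip` is not a numeric IPv4 address such as "127.0.0.1".
bool FillIPv4Address(sockaddr_storage *storage, const char *ip, uint16_t port) {
    std::memset(storage, 0, sizeof(*storage));
    auto *addr = reinterpret_cast<sockaddr_in *>(storage);
    addr->sin_family = AF_INET;
    addr->sin_port = htons(port);  // the port is stored in network byte order
    // inet_pton returns 1 only when the text is a valid IPv4 address
    return inet_pton(AF_INET, ip, &addr->sin_addr) == 1;
}
```

The filled structure can then be cast to a sockaddr pointer and handed to the listener parameters, as StartServer does with listen_addr.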
Initializing the Client
To initialize the client, we create an endpoint pointing to the server address. The client does not need to wait for the connection to be established.
void UCXClient::Connect(const std::string &address_str, uint16_t server_port) {
    // zero-initialize so that fields not named in field_mask hold no garbage
    ucp_ep_params_t ep_params = {};
    struct sockaddr_storage connect_addr{};
    ucs_status_t status;
    SetSocketAddress(address_str.c_str(), &connect_addr, server_port);
    ep_params.field_mask = UCP_EP_PARAM_FIELD_FLAGS | UCP_EP_PARAM_FIELD_SOCK_ADDR |
                           UCP_EP_PARAM_FIELD_ERR_HANDLER | UCP_EP_PARAM_FIELD_ERR_HANDLING_MODE;
    ep_params.err_mode = UCP_ERR_HANDLING_MODE_PEER;
    ep_params.err_handler.cb = err_cb;
    ep_params.err_handler.arg = nullptr;
    ep_params.flags = UCP_EP_PARAMS_FLAGS_CLIENT_SERVER;
    ep_params.sockaddr.addr = (struct sockaddr *) &connect_addr;
    ep_params.sockaddr.addrlen = sizeof(connect_addr);
    status = ucp_ep_create(ucp_worker_, &ep_params, &client_ep_);
    if (status != UCS_OK) {
        throw std::runtime_error("failed to create an endpoint");
    }
}
Sending Messages
We use the tag message API to send a message. For sending, we use a tag of 0. The API returns a request pointer, which we use later to check the progress of the operation; the pointer is null if the operation completed immediately.
void *UCXClient::SendMessage(void *msg, size_t msg_length) {
    ucp_request_param_t param = {};
    param.op_attr_mask = UCP_OP_ATTR_FIELD_DATATYPE;
    // contiguous datatype with 1-byte elements, so msg_length is a byte count
    param.datatype = ucp_dt_make_contig(1);
    return ucp_tag_send_nbx(client_ep_, msg, msg_length, 0, &param);
}
Receiving Messages
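To receive a message, we post a receive request with a tag and a tag mask. The mask selects which bits of the tag are compared. In this case, all mask bits are set and the tag is 0, so the receive matches only messages sent with tag 0, which is the tag the client uses; a mask of 0 would match a message with any tag. Like the send API, this returns a request pointer that we use to check the progress.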
void *UCXServer::ReceiveMessage(void *msg, size_t msg_length) {
    ucp_request_param_t param = {};
    param.op_attr_mask = UCP_OP_ATTR_FIELD_DATATYPE;
    param.datatype = ucp_dt_make_contig(1);
    // tag 0 with a full mask: every bit of the incoming tag must equal 0
    return ucp_tag_recv_nbx(ucp_worker_, msg, msg_length, 0, 0xFFFFFFFFFFFFFFFFULL,
                            &param);
}
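To make the role of the tag mask concrete, the following stand-alone sketch models the matching rule as we understand it from the UCX documentation: an incoming message matches a posted receive when the two tags agree on every bit that is set in the mask. TagMatches is an illustrative function written for this article and is not part of the UCX API.

```cpp
#include <cassert>
#include <cstdint>

// Models tag matching: only the bits that are set in `mask` are compared
// between the tag of the posted receive and the tag of the incoming message.
constexpr bool TagMatches(uint64_t recv_tag, uint64_t mask, uint64_t send_tag) {
    return ((recv_tag ^ send_tag) & mask) == 0;
}
```

Under this rule, the receive posted above (tag 0, full mask) accepts the client's message sent with tag 0 and would ignore a message sent with tag 1, whereas a mask of 0 would accept both.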
Progressing
After posting the message for send or receive, we must progress the UCP worker until the pending operation is complete. Here, we first progress the worker until no more work is left and then check the individual request for completion.
ucs_status_t UCXBase::WaitForCompletion(void *request) {
    ucs_status_t status;
    /* if operation was completed immediately */
    if (request == nullptr) {
        return UCS_OK;
    }
    if (UCS_PTR_IS_ERR(request)) {
        status = UCS_PTR_STATUS(request);
        std::cerr << "Error: " << ucs_status_string(status) << std::endl;
        return status;
    }
    while (true) {
        // progress the worker until it reports no more work
        unsigned int progress_made;
        do {
            progress_made = ucp_worker_progress(ucp_worker_);
        } while (progress_made != 0);
        // then check whether this particular request has finished
        status = ucp_request_check_status(request);
        if (status == UCS_OK) {
            ucp_request_free(request);
            break;
        } else if (status != UCS_INPROGRESS) {
            // release the request before reporting the failure
            ucp_request_free(request);
            throw std::runtime_error("Failed to process request");
        }
    }
    return status;
}
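The loop above spins until the request completes, so it never returns if the peer disappears. One possible variation, sketched below with only the standard library, bounds the wait with a deadline. PollUntil is a hypothetical helper and is not part of UCX or of the sample program; the two callables stand in for the progress call and the completion check.

```cpp
#include <cassert>
#include <chrono>

// Calls `progress()` repeatedly until `is_done()` returns true or `timeout`
// elapses. Returns true if the operation finished, false if the wait timed out.
template <typename ProgressFn, typename DoneFn>
bool PollUntil(ProgressFn progress, DoneFn is_done, std::chrono::milliseconds timeout) {
    const auto deadline = std::chrono::steady_clock::now() + timeout;
    while (!is_done()) {
        if (std::chrono::steady_clock::now() >= deadline) {
            return false;  // gave up: the operation is still pending
        }
        progress();
    }
    return true;
}
```

In the sample program, progress would wrap ucp_worker_progress(ucp_worker_) and is_done would test whether ucp_request_check_status(request) no longer returns UCS_INPROGRESS. The caller still has to decide what to do with a request that timed out.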
Server Code
The server code is pretty simple. We start the server and wait until a connection is established. Then, we post the receive request and wait for it to complete.
UCXServer server;
// create a server and wait for the client to connect
server.StartServer("localhost", 12353);
while (!server.IsClientConnected()) {
    server.ProgressWorker();
}
// allocate a buffer for receiving data
std::array<uint32_t, 10> recv_data{0};
void *request = server.ReceiveMessage(recv_data.data(), 40);
server.WaitForCompletion(request);
Client Code
In the client code, we create and connect the client to the server. Then, we post a send request and wait for it to complete.
UCXClient client;
try {
    client.Connect("localhost", 12353);
    std::array<uint32_t, 10> send_data{};
    for (uint16_t i = 0; i < 10; i++) {
        send_data[i] = i + 10;
    }
    void *request = client.SendMessage(send_data.data(), 40);
    client.WaitForCompletion(request);
} catch (const std::exception &e) {
    std::cerr << "Error: " << e.what() << std::endl;
}
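Both the server and the client pass the message length as the literal 40, which is the size in bytes of ten uint32_t values. A small helper can derive that number from the buffer itself so that the two cannot drift apart. ByteLength below is a hypothetical helper for illustration and is not part of the sample program.

```cpp
#include <array>
#include <cassert>
#include <cstddef>
#include <cstdint>

// Returns the size in bytes of the data held by a std::array.
template <typename T, std::size_t N>
constexpr std::size_t ByteLength(const std::array<T, N> &) {
    return N * sizeof(T);
}
```

With it, the send call could be written as client.SendMessage(send_data.data(), ByteLength(send_data)), and the receive call on the server in the same way.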
Running the program
To run the program, we first create a conda environment with UCX installed and then use CMake to build it. After that, we run the server and the client in separate terminals.
git clone git@github.com:supunkamburugamuve/ucx_examples.git && cd ucx_examples
conda create -n ucx_env ucx cmake
conda activate ucx_env
mkdir build && cd build
cmake ..
make
# run the program
./ucx_server
# in another terminal after activating the env
./ucx_client
Summary
This article explored a simple UCX program that sends a single message between two workers. It looked at how to initialize UCX, establish connections, and send and receive a single message using the tag-matching API.
In follow-up articles, we will look at more advanced concepts, such as using GPUs, active messages, one-sided messages, and atomic operations.