Threads and Networking in C

As I stated in my Hello World post, I started this blog in order to increase my communications skills and be able to effectively show what I know, and what better way to do so than to make a study guide for my CS230 class?

What is a Thread?

Not as colorful as they are in the real world

Before we talk about threads we have to talk about something we should be familiar with: processes in C. Processes are ways to run the same program multiple times and make them to interesting things. They’re incredibly intensive to run because they end up cloning all the information to a child process, and can only use files or pipes to share data. What if there was a way to create a less intensive process and still be able to carry out multiple things?

The answer? Threads.

ProcesessesThreads
  • Clones the entire parent’s memoryspace
  • Threads have a direct parent and child
  • Shares the libraries, run-time heap, read-write data, and read-only code/data. Each thread is allocated it’s own stack
  • Threads are more like peers towards each other, no hierarchical structure.

Let’s look at the functions within pthread.h that allow it to work:

void *thread(void *vargp); //declare the function for the preprocessor

int main() {
  pthread_t tid; //declare a thread

  pthread_create(&tid, NULL, thread, NULL);
  pthread_join(tid, NULL);//waits for the thread to finish
  exit(0);
}
void *thread(void *vargp) {
  printf("Hello, world!\n"); 
  pthread_exit(pthread_self());
}

Unlike processes, you don’t fork a thread from a parent, rather a function is usually run separately from the main thread. In this case, we declare a thread called tid and then use pthread_create() to actually initialize the thread. The first argument in create will be the address of the thread we initialized, the second and fourth arguments are typically NULL, while the second argument will be function that will branch off and do a specific function. The function pthread_join() will wait for the thread to complete before it finishes the main thread. Finally, pthread_exit() will identify itself and finish off the thread, allowing for the main function to finish.

You may notice that some of the pthread functions seem to work in the same way as process functions, and that’s because they do! This makes it easy to implement processes since they are basically smaller versions of threads.

Thread Termination in Detail

Turns out that there are more options to end threads than we may have thought. Let’s think about these options:

  • Return result
    • If you return a result at the end of the function of a thread, it’ll automatically kill that thread.
  • pthread_exit() vs pthread_cancel()
    • Both functions work in the same way in that they will end the process, but the cancel function returns an error value if it joins another thread.
  • exit()
    • This function should be familiar to all students of C. It’ll cancel ALL threads and end the entire program.

Thread Synchronization

Threads are a great way to easily create multiple instances of functions, they’re fast, light-weight, and pretty easy to use. Although, one of the problems we may face with threads are race conditions, or basically, when threads try to access and change variables in an order we weren’t expecting.

Can we manage to synchronize our threads as well as these skaters have synced their routine?

Let’s define a little bit of vocab here. A global variable is a variable declared outside of a function, in memory there is exactly one instance of any global variable. A local variable is a variable declared within a function without the static attribute, and each thread stack contains one instance of each local variable. Take a look at the following example:

char **ptr; 
void *thread(void *vargp)
{
    int myid = (int)vargp;
    static int cnt = 0;
    
    printf("[%d]: %s (svar=%d)\n", 
         myid, ptr[myid], ++cnt);
}

int main()
{
    int i;
    pthread_t tid;
    char *msgs[2] = {
        "Hello from foo", "Hello from bar"
    };
    ptr = msgs;

    for (i = 0; i < 2; i++)
        pthread_create(&tid, NULL, thread, (void *)i);
    pthread_exit(NULL);
}

Here, char **ptr is a global variable since it is declared outside of both the main and thread functions. Within main, i and *msgs are both local variables since they are declared within the main function, so there is exactly one copy of those variables. Now, things get a little different looking at variables within the thread function. Since there are two threads from the main function, there are two instances of int myid— each belonging to one of the threads. Finally, the static int cnt variable would have one instance within memory, but is considered a local static variable.

Progress Graphs

What is this thing?

Ignore the picture for a minute. Synchronization is important in computing because if we have a global variable that is being adjusted but is read in another thread, the program may read a variable incorrectly. Progress graphs allow us to plot out when critical sections happen (areas of the code where reading and writing variables occur) and what is the best way to make sure two critical sections don’t occur at once (within an unsafe region).

  • H stands for the header region of a program in assembly
  • L is the load function of the variable being accessed
  • U is the update function of the variable
  • S is the store function
  • T is the tail of the assembly program

L, U, and S are considered critical regions of a program– that is where a thread is trying to access variables and actively update them. A program like the one below might look like it works fine, but will actually print out the incorrect result sometimes. Why is that? Where did it go wrong?

volatile int cnt = 0; /* global */
//use volatile to tell the compiler that multiple 
//threads will access this variable

/* Thread routine */
void *thread (void *vargp) {
  int i;
  int niters = *((int *)(vargp));

  for (i = 0; i < niters; i++) {
    cnt++;                   
  }
  return NULL;
}
int main (int argc, char **argv) {
  int niters = atoi(argv[1]);
  pthread_t tid1, tid2;

  pthread_create(&tid1, NULL,          
                 thread, &niters);
  pthread_create(&tid2, NULL, 
                 thread, &niters);
  pthread_join(tid1, NULL);
  pthread_join(tid2, NULL);

  /* Check result */
  if (cnt != (2 * niters)) {
    printf("BOOM! cnt=%d\n”, cnt);
  } else {
    printf("OK cnt=%d\n", cnt);
  }
  exit(0);
}

If this program works correctly, it should theoretically print out twice the original number, and in some cases it does. Let’s say you enter pass in the number 10,000, most likely you’ll get a cnt of 13051. Why is this number so off? In C, you don’t actually have control over how the processor will schedule certain threads. In the best case scenario, thread 1 will complete entirely, and then move on to thread 2 to give the correct result. But this isn’t guaranteed. Sometimes the processor will schedule to load the variable of thread 1, and then load the variable for thread 2. This causes thread 1 and 2 to load in the same number, update them, and load them back into the original program, basically showing no progress. How can we fix this code?

Semaphores and Mutexes

A semaphore is an integer-like function that allows for synchronization of threads. They work by decremented the semaphore, which basically “grabs the resource” available and tells every other thread that it is being used, and by increment the semaphore, which would give back the resource to allow another thread to use it. If you surround critical sections of code that read and update variables (the cnt++ command in the thread function) it will slow down the processes by allowing only one thread to occur at a time. So let’s try to fix the code above.


// Thread routine prototype.
void *thread(void *vargp);

// Global shared variable.
volatile int cnt = 0;

sem_t mutex;//declare semaphore

int main(int argc, char **argv) {
  // The number of iterations.
  int niters;
  sem_init(&mutex, 0, 1);//initialize binary semaphore

  // We are creating two threads, so we create two thread ids.
  pthread_t tid1, tid2;

  // Check input arguments.
  if (argc != 2) {
    printf("usage: %s <niters>\n", argv[0]);
    exit(0);
  }

  // Convert the string form of the number of iterations argument to an int.
  niters = atoi(argv[1]);

  // Create threads and wait for them to finish.
  pthread_create(&tid1, NULL, thread, &niters);
  pthread_create(&tid2, NULL, thread, &niters);
  pthread_join(tid1, NULL);
  pthread_join(tid2, NULL);

  // Check the result.
  if (cnt != (2 * niters)) {
    printf("BOOM! cnt=%d\n", cnt);
    printf("It should have been %d\n", (2 * niters));
    printf("A difference of %d. Yikes!\n", (2 * niters) - cnt);
  } else {
    printf("OK cnt=%d\n", cnt);
  }
  exit(0);
}

// The thread routine.
// A thread will iterate from 0 to niters - 1 incrementing a globally
// shared count variable. Eventually this causes a problem :-(.
void *thread(void *vargp) {
  // Cast and dereference the void *.
  int niters = *((int *)(vargp));
  int i;

  for (i = 0; i < niters; i++) {
	sem_wait(&mutex);//tell the processor that we are processing one thread currently
    cnt++;
	sem_post(&mutex);//tell the processor we are finished
  }

  return NULL;
}

The updated code uses sem_t to declare the function, and sem_init() to initialize it. The 0 within the init function indicates that we want this function to be shared between threads, not processes, and the 1 is the value of threads we can process at a time. In this case since it’s 1, only one thread can be processed before another one is. In the thread function, we surround cnt++ with two semaphore operations, because this is the critical section of our code that could break. sem_wait(&mutex) will decrement that 1 value we set earlier and will indicate that it can’t handle any more threads. It will then increment cnt, and then the sem_post(&mutex) will increment that value in the semaphore the let the processor know that it can handle another thread. If you run this code, you’ll find that the race condition will never occur, and the result will always be twice the original number.

Inter Process Communication

Pipes

Up to this point you may have heard about pipes. They’re a form of IPC that allows for the output of one process to be used as the input to another process. Pipes are typically unidirectional, and have a limited capacity– meaning they have a buffer maintained in kernel memory. If this capacity is full, the write end of the pipe will block until the reader removes some data from the pipe. Take a look at the following code:


int child_task(int read_pipe) {
  char to_msg[25];
  int nr;

  if ((nr = read(read_pipe, to_msg, 25)) == -1) {
    perror("child failed to read from pipe");
    return 1;
  }

  printf("child received message from parent.\n");
  printf("bytes: %d\n", nr);
  printf("message received: %s\n", to_msg);

  return 0;
}

int parent_task(int write_pipe) {
  int nw;
  char from_msg[] = "hello from parent.";

  if ((nw = write(write_pipe, from_msg, strlen(from_msg) + 1)) == -1) {
    perror("parent failed to write to pipe");
    exit(1);
  }

  printf("parent sent message to child.\n");
  printf("message sent: %s.\n", from_msg);
  printf("bytes written: %d\n", nw);

  return 0;
}

int main() {
  int parent_to_child_pipe[2];

  // Create the pipe
  if (pipe(parent_to_child_pipe) == -1) {
    perror("could not create pipe");
    exit(1);
  }

  pid_t pid;
  if ((pid = fork()) == 0) {
    // Child process...
    // First we close the write end of the pipe.
    close(parent_to_child_pipe[1]);
    exit(child_task(parent_to_child_pipe[0]));
  } else {
    // Parent process...
    // First, we close the read end of the pipe.
    close(parent_to_child_pipe[0]);
    exit(parent_task(parent_to_child_pipe[1]));

    // Wait on child process to complete
    wait(NULL);
    printf("parent process completing.\n");
  }
}

We start off by creating an int array that will contain the file descriptor write(0) and read(1) for the pipes. Using the pipe() function we attempt to create a pipe. We then fork our process to create a child and parent process that will be able to talk to each other. In the child process we make sure to close the write end of the pipe, and exit the process while calling the child_task() function. The child will then begin to read from the pipe. Meanwhile, the parent process closes the read end of the pipe, and calls on its parent_task() to write into that buffer. Now we should ask, is there a way to have the child respond to the parent? Yes, but it’ll require a little bit of work.

int child_task(int read_pipe, int write_pipe) {
  char to_msg[25];
  char from_msg[] = "hello from child";
  int nr, nw;

  if ((nr = read(read_pipe, to_msg, 25)) == -1) {
    perror("child failed to read from pipe");
    return 1;
  }

  printf("child received message from parent.\n");
  printf("bytes: %d\n", nr);
  printf("message received: %s\n", to_msg);


  if ((nw = write(write_pipe, from_msg, strlen(from_msg) + 1)) == -1) {
	  perror("child failed to write to pipe");
	  exit(1);
  }
  printf("child sent message to parent.\n");
  printf("message sent: %s.\n", from_msg);
  printf("bytes written: %d\n", nw);

  return 0;
}

int parent_task(int read_pipe, int write_pipe) {
  int nw, nr;
  char from_msg[] = "hello from parent.";
  char to_msg[25];

  if ((nw = write(write_pipe, from_msg, strlen(from_msg) + 1)) == -1) {
    perror("parent failed to write to pipe");
    exit(1);
  }

  printf("parent sent message to child.\n");
  printf("message sent: %s.\n", from_msg);
  printf("bytes written: %d\n", nw);

  if ((nr = read(read_pipe, to_msg, 25)) == -1) {
	  perror("parent failed to read from pipe");
  }

  printf("parent received message from child\n");
  printf("bytes: %d\n", nr);
  printf("message received: %s\n", to_msg);

  return 0;
}

int main() {
  int parent_to_child_pipe[2];

  // Create the pipe
  if (pipe(parent_to_child_pipe) == -1) {
    perror("could not create pipe");
    exit(1);
  }

  pid_t pid;
  if ((pid = fork()) == 0) {
    // Child process...
    exit(child_task(parent_to_child_pipe[0], parent_to_child_pipe[1]));
  } else {
    // Parent process...
    exit(parent_task(parent_to_child_pipe[0], parent_to_child_pipe[1]));

    // Wait on child process to complete
    wait(NULL);
    printf("parent process completing.\n");
  }
}

As you go through the code there should only be some slight, but familiar changes. The first change is that in the main function the close functions were removed because both the parent and child are writing and reading to the same pipe. In child_task an extra block of code was added to read from the pipe, and vice versa for parent_task.

I think at this point it’s important for me to relay the reason why we do generally close file descriptors when they aren’t being used. If the reading process doesn’t close the write end of the pipe, then after the other process closes its write descriptor, the reader won’t see the end-of-file mark, even after it has read all the data from the pipe. This occurs because a read() would block waiting for data, because the kernel knows that there is still at least one write descriptor open for the pipe. If you try to write to a pipe where there is no read process, a SIGPIPE signal will be sent killing the process unless we provide a handler

popen() and pclose()

popen() is a function that takes in a shell command and a read or write variable. It will end up opening another shell and executing that command. For example, popen("ls", "r") would open a shell process and call ls, and uses the pipe created to read or write into it, so in this case read.

When the mode is r, the command is fed into the read section of the pipe, whereas when it is w, it is fed into the write section

Networking

The wonderful world of networking is more than just LinkedIn

Physical Media

I kind of have to rush here, but I’m going to assume a rudimentary understanding of networking that should be good enough. Let’s talk about the lowest levels of physical media required to make physical connections. First is Ethernet segments which are collections of hosts connected by wires to a hub. Their function is to send bits to any other hosts in chunks called frames. The hub will copy each bit from each port and transmit it to every other port. Every host can see every bit. These aren’t that common anymore because bridges(switches/router) became cheap enough to replace hubs, and the advantage of a bridge is that they don’t broadcast traffic to all hosts.

This shows an example of Ethernet segments

Above this level are bridged Ethernet segments. They typically span rooms, buildings, or a campus.The bridges learn which hosts are reachable from which ports and then selectively copy frames from port to port. The next level would be Internets which are multiple incompatible or distant LANs that can be physically connected by routers.

An example of a bridged ethernet segment
An example of an Internet, kept together by routers.

Protocols

Now that we have access to this infrastructure connecting computers together, let’s try to communicate using some commonly known protocols. Internet Protocol (IP) provides a naming scheme and delivery mechanism across the internet. An IP address are 32 bit numbers that represent the numbers in a dotted decimal notation. Unreliable Datagram Protocol (UDP) is an example of a connectionless protocol. Transmission Control Protocol (TCP) is a connection-oriented protocol that requires a handshake between the initiator and destination. TCP is more commonly used even though it’s slower because it guarantee the destination will receive the message. Many other protocols such as FTP are built on top of TCP to ensure a reliable connection.

Sockets

In C, the sockets.h library allows the user to interface with online servers and clients. It operates similarly to a pipe, but the fundamental difference is that a socket can interconnect two processes that execute on arbitrary machines, whereas a pipe is unidirectional and work on one kernel. Let’s explore how a client interacts with a server using the sockets API.

#define MAXLINE 8192 /* Max text line length */

int open_clientfd(char *hostname, int port) {
  // The client's socket file descriptor.
  int clientfd;

  // The hostent struct is used to get the IP address of the server
  // using DNS.
  struct hostent *hp;

  // serveraddr is used to record the server information (IP address
  // and port number).
  struct sockaddr_in serveraddr;

  printf("Echo Client is creating a socket.\n");

  // First, we create the socket file descriptor with the given
  // protocol and protocol family.
  if ((clientfd = socket(AF_INET, SOCK_STREAM, 0)) < 0) return -1;

  // Query DNS for the host (server) information.
  if ((hp = gethostbyname(hostname)) == NULL) return -2;

  // The socket API requires that you zero out the bytes!
  bzero((char *)&serveraddr, sizeof(serveraddr));

  // Record the protocol family we are using to connect.
  serveraddr.sin_family = AF_INET;

  // Copy the IP address provided by DNS to our server address
  // structure.
  bcopy((char *)hp->h_addr_list[0], (char *)&serveraddr.sin_addr.s_addr,
        hp->h_length);

  // Convert the port from host byte order to network byte order and
  // store this in the server address structure.
  serveraddr.sin_port = htons(port);

  printf("Echo Client is trying to connect to %s (%s:%d).\n", hostname,
         inet_ntoa(serveraddr.sin_addr), port);

  // Establish a connection with the server.
  if (connect(clientfd, (struct sockaddr *)&serveraddr, sizeof(serveraddr)) < 0)
    return -1;

  printf("Echo Client connected.\n");

  // Return the connected file descriptor.
  return clientfd;
}//end of open_clientfd

int main(int argc, char **argv) {
  // The client socket file descriptor.
  int clientfd;

  // The port number.
  int port;

  // Variable to store the host/server domain name.
  char *host;

  // A buffer to receive data from the server.
  char buf[MAXLINE];

  // First, we check the program arguments:
  if (argc != 3) {
    fprintf(stderr, "usage: %s <host> <port>\n", argv[0]);
    exit(0);
  }

  // First argument is host, second is port:
  host = argv[1];
  port = atoi(argv[2]);

  // Open the client socket file descriptor given the host and port:
  clientfd = open_clientfd(host, port);

  // Print "type: " and fflush to stdout:
  printf("type: ");
  fflush(stdout);

  // Continue to read stdin into `buf` until C-d:
  while (fgets(buf, MAXLINE, stdin) != NULL) {
    // Remove the newline from fgets:
    buf[strcspn(buf, "\n")] = '\0';

    ssize_t ns = send(clientfd, buf, strlen(buf), 0);
    ssize_t nr = recv(clientfd, buf, MAXLINE, 0);

    // Add a newline back on:
    buf[strlen(buf) + 1] = '\0';
    buf[strlen(buf)] = '\n';

    // Display and read again:
    printf("echo (%u bytes): ", nr);
    fputs(buf, stdout);
    printf("type: ");
    fflush(stdout);
  }

  // Close the file descriptor:
  close(clientfd);
  exit(0);
}

To boil this down to the essentials, open_clientfd() is the function that tries to establish a connection to the server. First, we call socket() with the argument AF_INET which indicates we can interact with IPv4 addresses, and the SOCK_STREAM argument lets the socket know we want to use TCP (if we wanted to use UDP we would use SOCK_DGRAM). Later in the function we try to establish a connection with connect() by passing in the socket file descriptor; the serveraddr, which records the information about the network we’re connecting to, and its size. In the main function we send information to the client, and then receive it. Pretty simple, right? Now let’s look at the server this example communicates with.

#define MAXLINE 8192 /* Max text line length */
#define LISTENQ 1024 /* Second argument to listen() */

/* Destructively modify string to be upper case */
void upper_case(char *s) {
  while (*s) {
    *s = toupper(*s);
    s++;
  }
}

void echo(int connfd) {
  // Local variable declarations:
  size_t n;
  char buf[MAXLINE];

  // Keep reading lines until client closes connection:
  while ((n = recv(connfd, buf, MAXLINE, 0)) != 0) {
    printf("Echo Server received %u (%s) incoming bytes.\n", n, buf);
    printf("Echo Server is making those bytes upper case.\n");
    upper_case(buf);
    printf("Echo Server is sending %u bytes back to client (%s).\n", n, buf);
    send(connfd, buf, n, 0);
  }
}

int open_listenfd(int port) {
  int listenfd;    // the listening file descriptor
  int optval = 1;  // options for setsockopt

  struct sockaddr_in serveraddr;

  // Create a socket descriptor.
  if ((listenfd = socket(AF_INET, SOCK_STREAM, 0)) < 0) return -1;

  // Eliminates "Address already in use" error from bind.
  if (setsockopt(listenfd, SOL_SOCKET, SO_REUSEADDR, (const void *)&optval,
                 sizeof(int)) < 0)
    return -1;

  /* listenfd will be an endpoint for all requests to port
     on any IP address for this host */

  // The socket API says that you need to zero out the bytes first!
  bzero((char *)&serveraddr, sizeof(serveraddr));

  // Set the AF_INET protocol family.
  serveraddr.sin_family = AF_INET;

  // Allow incoming connections from any IP address.
  serveraddr.sin_addr.s_addr = htonl(INADDR_ANY);

  // Indicate the port number to "listen" on.
  serveraddr.sin_port = htons((unsigned short)port);

  // Bind the server information (options and port number) to the
  // listening socket file descriptor.
  if (bind(listenfd, (struct sockaddr *)&serveraddr, sizeof(serveraddr)) < 0)
    return -1;

  // Make it a listening socket ready to accept connection requests.
  if (listen(listenfd, LISTENQ) < 0) return -1;

  // Return the prepared listen socket file descriptor.
  return listenfd;
}

int main(int argc, char **argv) {
  int listenfd;  // listening file descriptor
  int connfd;    // the connection file descriptor
  int port;      // the port number

  // clientaddr records the address information of the client program
  // that connects to this server.
  struct sockaddr_in clientaddr;

  // clientlen is used to record the length of the client adress. This
  // is necessary for calling the accept() function.
  socklen_t clientlen;

  // hp is used to record the client host information through DNS. We
  // will use this to lookup the host information using
  // gethostbyaddr().
  struct hostent *hp;

  // haddrp is used to remember the domain name of the host.
  char *haddrp;

  // client_port is the ephemeral port used by the client.
  unsigned short client_port;

  // First, check the command line arguments.
  if (argc != 2) {
    fprintf(stderr, "usage: %s <port>\n", argv[0]);
    exit(0);
  }

  // Next, convert the port number as a string to an integer.
  port = atoi(argv[1]);

  // Now, create a socket file descriptor to listen for incoming
  // connections.
  listenfd = open_listenfd(port);

  printf("Echo Server is listening on port %d.\n", port);

  // As is the case for all servers - run forever! It is only these
  // programs that you want to go into an infinte loop!
  while (1) {
    // Record the size of the clienaddr (struct sockaddr_in) structure.
    clientlen = sizeof(clientaddr);

    printf("Echo Server is accepting incoming connections on port %d.\n", port);

    // Block on accepting incoming connections. When we have an
    // incoming connection accept() will fill in the given sockaddr.
    //
    // So, why do we cast a sockaddr_in to a sockaddr:
    //
    // struct sockaddr {
    //   unsigned short    sa_family;    // address family, AF_xxx
    //   char              sa_data[14];  // 14 bytes of protocol address
    // };
    connfd = accept(listenfd, (struct sockaddr *)(&clientaddr), &clientlen);

    // determine the domain name and IP address of the client
    hp = gethostbyaddr((const char *)(&clientaddr.sin_addr.s_addr),
                       sizeof(clientaddr.sin_addr.s_addr), AF_INET);

    // Need to convert the network byte order IP address to dotted IP string.
    haddrp = inet_ntoa(clientaddr.sin_addr);

    // Convert the port number from network byte order to host byte order.
    client_port = ntohs(clientaddr.sin_port);

    // Print an information message.
    printf(
        "Echo Server received a connection to %s (%s).\n"
        "Echo Server is using port %u and client has an ephemeral port of "
        "%u.\n",
        hp->h_name, haddrp, port, client_port);

    // Service the connection.
    echo(connfd);

    printf("Echo Server is closing the connection on %s (%s).\n", hp->h_name,
           haddrp);

    // Close the connection.
    close(connfd);
  }

  exit(0);
}

In the function open_listenfd() we start off by creating the same socket we did in the client. Near the end we bind the server information to the listening socket file descriptor, and listen for any connection requests. Once we have a connection, we need to accept it, and once we’re done, close the file descriptor.

I have to go to CS230 for this exam, so this is pretty poorly put together, but I may update it with anything else if I feel the need to.

Leave a Reply

Your email address will not be published. Required fields are marked *