多线程编程实践:覆盖 80% 的应用场景#
1. 线程创建与销毁(2 个)#
pthread_create()#
int pthread_create(pthread_t *thread, const pthread_attr_t *attr,
void *(*start_routine)(void *), void *arg);
功能:创建新线程
特点:
新线程立即开始执行
start_routinearg传递给线程函数的参数线程属性
attr通常设为 NULL 使用默认值
示例:
void* thread_func(void* arg) {
int* num = (int*)arg;
printf("Thread received: %d\n", *num);
return NULL;
}
int main() {
pthread_t tid;
int value = 42;
pthread_create(&tid, NULL, thread_func, &value);
// ...
}
pthread_join()#
int pthread_join(pthread_t thread, void **retval);
功能:等待线程结束并回收资源
特点:
阻塞调用线程直到目标线程结束
可以获取线程的返回值
必须调用,否则可能产生僵尸线程
注意事项:
每个线程只能被 join 一次
不 join 会导致内存泄漏
分离线程不需要 join
示例:
void* thread_func(void* arg) {
int* result = malloc(sizeof(int));
*result = 100;
return result;
}
int main() {
pthread_t tid;
pthread_create(&tid, NULL, thread_func, NULL);
void* retval;
pthread_join(tid, &retval);
printf("Thread returned: %d\n", *(int*)retval);
free(retval);
}
2. 互斥锁操作(4 个)#
pthread_mutex_init()#
int pthread_mutex_init(pthread_mutex_t *mutex,
const pthread_mutexattr_t *attr);
功能:初始化互斥锁
特点:
静态初始化可用
PTHREAD_MUTEX_INITIALIZER属性
attr通常设为 NULL
pthread_mutex_destroy()#
int pthread_mutex_destroy(pthread_mutex_t *mutex);
功能:销毁互斥锁
注意事项:
确保没有线程持有锁
不能销毁已初始化的锁再次使用
pthread_mutex_lock() / pthread_mutex_unlock()#
int pthread_mutex_lock(pthread_mutex_t *mutex);
int pthread_mutex_unlock(pthread_mutex_t *mutex);
功能:加锁和解锁
示例:
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
int shared_data = 0;
void* increment(void* arg) {
for (int i = 0; i < 100000; i++) {
pthread_mutex_lock(&mutex);
shared_data++;
pthread_mutex_unlock(&mutex);
}
return NULL;
}
注意事项:
锁顺序:避免死锁,按固定顺序加锁
锁粒度:锁的粒度要合适,太细增加开销,太粗降低并发
RAII 模式:C++ 中使用
std::lock_guard
3. 条件变量操作(5 个)#
pthread_cond_init() / pthread_cond_destroy()#
int pthread_cond_init(pthread_cond_t *cond,
const pthread_condattr_t *attr);
int pthread_cond_destroy(pthread_cond_t *cond);
功能:初始化/销毁条件变量
特点:
静态初始化可用
PTHREAD_COND_INITIALIZER
pthread_cond_wait()#
int pthread_cond_wait(pthread_cond_t *cond, pthread_mutex_t *mutex);
功能:等待条件变量
特点:
原子地释放锁并进入等待
被唤醒时重新获得锁
pthread_cond_signal() / pthread_cond_broadcast()#
int pthread_cond_signal(pthread_cond_t *cond);
int pthread_cond_broadcast(pthread_cond_t *cond);
功能:唤醒等待的线程
区别:
signal唤醒至少一个线程broadcast唤醒所有等待线程
经典生产者-消费者示例:
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
queue_t queue;
// 生产者
void producer() {
pthread_mutex_lock(&mutex);
// 生产数据
queue_push(&queue, data);
pthread_cond_signal(&cond); // 通知消费者
pthread_mutex_unlock(&mutex);
}
// 消费者
void consumer() {
pthread_mutex_lock(&mutex);
while (queue_empty(&queue)) { // 必须用 while,防止虚假唤醒
pthread_cond_wait(&cond, &mutex);
}
data = queue_pop(&queue);
pthread_mutex_unlock(&mutex);
}
关键注意事项总结#
1. 虚假唤醒(Spurious Wakeup)#
条件变量等待必须使用 while 循环检查条件:
// 错误:if 判断
if (condition_is_false) {
pthread_cond_wait(&cond, &mutex);
}
// 正确:while 循环
while (condition_is_false) {
pthread_cond_wait(&cond, &mutex);
}
2. 资源管理#
动态初始化的 mutex/cond 必须销毁
使用 RAII(Resource Acquisition Is Initialization)模式
3. 错误处理#
所有 pthread 函数都有返回值,生产代码应该检查:
int ret = pthread_create(&tid, NULL, func, NULL);
if (ret != 0) {
// 错误处理
}
4. 线程安全设计原则#
共享数据最小化:减少需要锁保护的数据
锁的持有时间尽量短
避免在持有锁时调用未知函数(可能造成死锁)
使用更高级的同步原语(如信号量、读写锁)时需谨慎
实际工程建议#
在实际 C++ 项目中,更推荐使用 C++11 的 <thread>、<mutex>、<condition_variable>,它们提供了更好的类型安全和 RAII 支持。POSIX thread 函数是底层基础,理解它们有助于深入理解多线程编程的本质。
这 11 个函数确实是多线程编程的基石,掌握它们就能解决大部分线程同步问题。muduo 网络库正是基于这些基本原语构建了高效的多线程架构。
核心 POSIX 线程函数总结#
类别 |
函数 |
功能描述 |
关键注意事项 |
|---|---|---|---|
线程创建与销毁 |
|
创建新线程并开始执行 |
传递参数给线程函数;线程属性通常为 NULL |
|
等待线程结束并回收资源 |
避免僵尸线程;获取返回值;每个线程只能 join 一次 |
|
互斥锁操作 |
|
初始化互斥锁 |
可使用 |
|
销毁互斥锁 |
确保无线程持有锁;不可重复销毁 |
|
|
对互斥锁加锁 |
注意锁顺序以避免死锁 |
|
|
对互斥锁解锁 |
确保与加锁配对使用 |
|
条件变量操作 |
|
初始化条件变量 |
可使用 |
|
销毁条件变量 |
确保无线程等待 |
|
|
等待条件变量 |
必须与互斥锁配合使用;总是用 while 循环检查条件 |
|
|
唤醒至少一个等待线程 |
通常在持有互斥锁时调用 |
|
|
唤醒所有等待线程 |
唤醒多个等待者时使用 |
示例#
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <fcntl.h>
#include <errno.h>
// 定义线程参数结构体
typedef struct {
int fd; // 文件描述符
char* buffer; // 缓冲区指针
int size; // 缓冲区大小
volatile int running; // 运行状态标志(volatile 确保可见性)
pthread_mutex_t mutex; // 互斥锁保护共享数据
pthread_cond_t cond; // 条件变量用于线程同步
} thread_args_t;
// 线程函数:使用 while 循环检查条件,避免虚假唤醒
void* uart_thread_func(void* arg) {
thread_args_t* args = (thread_args_t*)arg;
printf("UART thread started (fd: %d)\n", args->fd);
while (1) {
pthread_mutex_lock(&args->mutex);
// 检查停止条件(使用 while 循环防止虚假唤醒)
while (args->running == 0) {
printf("Thread exiting...\n");
pthread_mutex_unlock(&args->mutex);
pthread_exit(NULL);
}
pthread_mutex_unlock(&args->mutex);
// 读取数据
int ret = read(args->fd, args->buffer, args->size - 1);
if (ret > 0) {
// 成功读取数据
args->buffer[ret] = '\0'; // 确保字符串结束
pthread_mutex_lock(&args->mutex);
printf("UART received (%d bytes): %s\n", ret, args->buffer);
pthread_mutex_unlock(&args->mutex);
// 通知主线程数据已就绪(如果有需要)
pthread_cond_signal(&args->cond);
} else if (ret == 0) {
// EOF,设备可能断开
printf("UART device closed or EOF reached\n");
usleep(100000); // 100ms 延时
} else {
if (errno == EAGAIN || errno == EWOULDBLOCK) {
// 非阻塞读取,没有数据可用
usleep(50000); // 50ms 延时,避免忙等待
} else {
// 读取错误
perror("UART read error");
usleep(100000); // 100ms 延时后重试
}
}
}
return NULL;
}
// 初始化 UART 接收线程(使用 RAII 思想进行资源管理)
int uart_recv_thread_init(int fd, char* buffer, int size, thread_args_t** args_ptr) {
if (fd < 0 || buffer == NULL || size <= 0) {
fprintf(stderr, "Invalid parameters\n");
return -1;
}
// 分配参数内存
thread_args_t* args = (thread_args_t*)calloc(1, sizeof(thread_args_t));
if (!args) {
perror("malloc failed");
return -1;
}
// 初始化参数
args->fd = fd;
args->buffer = buffer;
args->size = size;
args->running = 1;
// 初始化互斥锁和条件变量
int ret = pthread_mutex_init(&args->mutex, NULL);
if (ret != 0) {
fprintf(stderr, "pthread_mutex_init failed: %s\n", strerror(ret));
free(args);
return -1;
}
ret = pthread_cond_init(&args->cond, NULL);
if (ret != 0) {
fprintf(stderr, "pthread_cond_init failed: %s\n", strerror(ret));
pthread_mutex_destroy(&args->mutex);
free(args);
return -1;
}
// 创建线程
pthread_t thread;
ret = pthread_create(&thread, NULL, uart_thread_func, (void*)args);
if (ret != 0) {
fprintf(stderr, "pthread_create failed: %s\n", strerror(ret));
pthread_cond_destroy(&args->cond);
pthread_mutex_destroy(&args->mutex);
free(args);
return -1;
}
// 设置线程为分离状态(自动回收资源)
ret = pthread_detach(thread);
if (ret != 0) {
fprintf(stderr, "pthread_detach failed: %s\n", strerror(ret));
// 继续执行,因为线程已创建成功
}
// 返回参数指针供外部使用
if (args_ptr) {
*args_ptr = args;
}
printf("UART receive thread created successfully\n");
return 0;
}
// 线程清理函数(安全的资源释放)
void uart_recv_thread_cleanup(thread_args_t* args) {
if (!args) {
return;
}
printf("Cleaning up UART thread...\n");
// 首先停止线程运行
pthread_mutex_lock(&args->mutex);
args->running = 0;
pthread_mutex_unlock(&args->mutex);
// 发送条件信号,唤醒可能正在等待的线程
pthread_cond_signal(&args->cond);
// 等待一小段时间让线程退出(实际应用中可能需要更精确的同步)
usleep(100000); // 100ms
// 销毁同步原语
pthread_cond_destroy(&args->cond);
pthread_mutex_destroy(&args->mutex);
// 释放内存
free(args);
printf("UART thread cleanup completed\n");
}
// 等待数据就绪的辅助函数
int wait_for_data(thread_args_t* args, int timeout_ms) {
if (!args) {
return -1;
}
struct timespec ts;
clock_gettime(CLOCK_REALTIME, &ts);
ts.tv_sec += timeout_ms / 1000;
ts.tv_nsec += (timeout_ms % 1000) * 1000000;
pthread_mutex_lock(&args->mutex);
int ret = pthread_cond_timedwait(&args->cond, &args->mutex, &ts);
pthread_mutex_unlock(&args->mutex);
return ret;
}
// 主函数示例
int main() {
// 打开串口设备(示例)
int fd = open("/dev/ttyUSB0", O_RDWR | O_NOCTTY | O_NONBLOCK);
if (fd < 0) {
// 尝试备用设备
fd = open("/dev/ttyS0", O_RDWR | O_NOCTTY | O_NONBLOCK);
if (fd < 0) {
perror("Failed to open UART device");
return -1;
}
}
// 配置串口参数(实际应用中需要根据硬件配置)
// struct termios options;
// tcgetattr(fd, &options);
// cfsetispeed(&options, B9600);
// cfsetospeed(&options, B9600);
// options.c_cflag &= ~PARENB; // 无奇偶校验
// options.c_cflag &= ~CSTOPB; // 1位停止位
// options.c_cflag &= ~CSIZE;
// options.c_cflag |= CS8; // 8位数据位
// tcsetattr(fd, TCSANOW, &options);
char buffer[1024] = {0};
thread_args_t* thread_args = NULL;
// 初始化接收线程
if (uart_recv_thread_init(fd, buffer, sizeof(buffer), &thread_args) != 0) {
close(fd);
return -1;
}
printf("Main thread running. Press Enter to exit...\n");
// 主线程工作循环
for (int i = 0; i < 10; i++) {
printf("Main thread working... (%d/10)\n", i + 1);
sleep(1);
// 可以在这里等待数据或进行其他处理
// if (wait_for_data(thread_args, 500) == 0) {
// printf("Data received in main thread\n");
// }
}
printf("Press Enter to clean up and exit...\n");
getchar();
// 清理资源
uart_recv_thread_cleanup(thread_args);
close(fd);
printf("Program exited successfully\n");
return 0;
}