C语言的高并发案例
由于C语言的实现是最复杂的,先来看结构体设计和他的注释:
typedef struct {
char label[16];//消息内容
sem_t emptySem;//此信号量代表队列的可写状态
sem_t fullSem;//此信号量代表队列的可读状态
pthread_mutex_t queueMutex;//此互斥体为保证消息不会被误修改,保证线程程安全
int fullSlot;//队尾位置
int emptySlot;//队头位置
int queueSize;#队列长度
int numOfThreads;//同时操作的线程数量
pthread_t * qthread;//线程指针
SSchedMsg * queue;//队列指针
} SSchedQueue;
再来看Shceduler初始化函数,这里需要特别说明的是,两个信号量的创建,其中emptySem是队列的可写状态,初始化时其值为queueSize,即初始时队列可写,可接受消息长度为队列长度,fullSem是队列的可读状态,初始化时其值为0,即初始时队列不可读。具体代码及我的注释如下:
void *taosInitScheduler(int queueSize, int numOfThreads, char *label) {
pthread_attr_t attr;
SSchedQueue * pSched = (SSchedQueue *)malloc(sizeof(SSchedQueue));
memset(pSched, 0, sizeof(SSchedQueue));
pSched->queueSize = queueSize;
pSched->numOfThreads = numOfThreads;
strcpy(pSched->label, label);
if (pthread_mutex_init(&pSched->queueMutex, NULL) < 0) {
pError("init %s:queueMutex failed, reason:%s", pSched->label, strerror(errno));
goto _error;
}
//emptySem是队列的可写状态,初始化时其值为queueSize,即初始时队列可写,可接受消息长度为队列长度。
if (sem_init(&pSched->emptySem, 0, (unsigned int)pSched->queueSize) != 0) {
pError("init %s:empty semaphore failed, reason:%s", pSched->label, strerror(errno));
goto _error;
}
//fullSem是队列的可读状态,初始化时其值为0,即初始时队列不可读
if (sem_init(&pSched->fullSem, 0, 0) != 0) {
pError("init %s:full semaphore failed, reason:%s", pSched->label, strerror(errno));
goto _error;
}
if ((pSched->queue = (SSchedMsg *)malloc((size_t)pSched->queueSize * sizeof(SSchedMsg))) == NULL) {
pError("%s: no enough memory for queue, reason:%s", pSched->label, strerror(errno));
goto _error;
}
memset(pSched->queue, 0, (size_t)pSched->queueSize * sizeof(SSchedMsg));
pSched->fullSlot = 0;//实始化时队列为空,故队头和队尾的位置都是0
pSched->emptySlot = 0;//实始化时队列为空,故队头和队尾的位置都是0
pSched->qthread = malloc(sizeof(pthread_t) * (size_t)pSched->numOfThreads);
pthread_attr_init(&attr);
pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
for (int i = 0; i < pSched->numOfThreads; ++i) {
if (pthread_create(pSched->qthread + i, &attr, taosProcessSchedQueue, (void *)pSched) != 0) {
pError("%s: failed to create rpc thread, reason:%s", pSched->label, strerror(errno));
goto _error;
}
pTrace("%s scheduler is initialized, numOfThreads:%d", pSched->label, pSched->numOfThreads);
return (void *)pSched;
_error:
taosCleanUpScheduler(pSched);
return NULL;
}
再来看读消息的taosProcessSchedQueue函数这其实是消费者一方的实现,这个函数的主要逻辑是
1.使用无限循环,只要队列可读即sem_wait(&pSched->fullSem)不再阻塞就继续向下处理
2.在操作msg前,加入互斥体防止msg被误用。
3.读操作完毕后修改fullSlot的值,注意这为避免fullSlot溢出,需要对于queueSize取余。同时退出互斥体。
4.对emptySem进行post操作,即把emptySem的值加1,如emptySem原值为5,读取一个消息后,emptySem的值为6,即可写状态,且能接受的消息数量为6
具体代码及注释如下:
void *taosProcessSchedQueue(void *param) {
SSchedMsg msg;
SSchedQueue *pSched = (SSchedQueue *)param;
//注意这里是个无限循环,只要队列可读即sem_wait(&pSched->fullSem)不再阻塞就继续处理
while (1) {
if (sem_wait(&pSched->fullSem) != 0) {
pError("wait %s fullSem failed, errno:%d, reason:%s", pSched->label, errno, strerror(errno));
if (errno == EINTR) {
/* sem_wait is interrupted by interrupt, ignore and continue */
continue;
}
}
//加入互斥体防止msg被误用。
if (pthread_mutex_lock(&pSched->queueMutex) != 0)
pError("lock %s queueMutex failed, reason:%s", pSched->label, strerror(errno));
msg = pSched->queue[pSched->fullSlot];
memset(pSched->queue + pSched->fullSlot, 0, sizeof(SSchedMsg));
//读取完毕修改fullSlot的值,注意这为避免fullSlot溢出,需要对于queueSize取余。
pSched->fullSlot = (pSched->fullSlot + 1) % pSched->queueSize;
//读取完毕修改退出互斥体
if (pthread_mutex_unlock(&pSched->queueMutex) != 0)
pError("unlock %s queueMutex failed, reason:%s\n", pSched->label, strerror(errno));
//读取完毕对emptySem进行post操作,即把emptySem的值加1,如emptySem原值为5,读取一个消息后,emptySem的值为6,即可写状态,且能接受的消息数量为6
if (sem_post(&pSched->emptySem) != 0)
pError("post %s emptySem failed, reason:%s\n", pSched->label, strerror(errno));
if (msg.fp)
(*(msg.fp))(&msg);
else if (msg.tfp)
(*(msg.tfp))(msg.ahandle, msg.thandle);
}
}
最后写消息的taosScheduleTask函数也就是生产的实现,其基本逻辑是
1.写队列前先对emptySem进行减1操作,如emptySem原值为1,那么减1后为0,也就是队列已满,必须在读取消息后,即emptySem进行post操作后,队列才能进行可写状态。
2.加入互斥体防止msg被误操作,写入完成后退出互斥体
3.写队列完成后对fullSem进行加1操作,如fullSem原值为0,那么加1后为1,也就是队列可读,咱们上面介绍的读取taosProcessSchedQueue中sem_wait(&pSched->fullSem)不再阻塞就继续向下。
int taosScheduleTask(void *qhandle, SSchedMsg *pMsg) {
SSchedQueue *pSched = (SSchedQueue *)qhandle;
if (pSched == NULL) {
pError("sched is not ready, msg:%p is dropped", pMsg);
return 0;
}
//在写队列前先对emptySem进行减1操作,如emptySem原值为1,那么减1后为0,也就是队列已满,必须在读取消息后,即emptySem进行post操作后,队列才能进行可写状态。
if (sem_wait(&pSched->emptySem) != 0) pError("wait %s emptySem failed, reason:%s", pSched->label, strerror(errno));
//加入互斥体防止msg被误操作
if (pthread_mutex_lock(&pSched->queueMutex) != 0)
pError("lock %s queueMutex failed, reason:%s", pSched->label, strerror(errno));
pSched->queue[pSched->emptySlot] = *pMsg;
pSched->emptySlot = (pSched->emptySlot + 1) % pSched->queueSize;
if (pthread_mutex_unlock(&pSched->queueMutex) != 0)
pError("unlock %s queueMutex failed, reason:%s", pSched->label, strerror(errno));
//在写队列前先对fullSem进行加1操作,如fullSem原值为0,那么加1后为1,也就是队列可读,咱们上面介绍的读取函数可以进行处理。
if (sem_post(&pSched->fullSem) != 0) pError("post %s fullSem failed, reason:%s", pSched->label, strerror(errno));
return 0;
}
Java的高并发实现
从并发模型来看,Go和Rust都有channel这个概念,也都是通过Channel来实现线(协)程间的同步,由于channel带有读写状态且保证数据顺序,而且channel的封装程度和效率明显可以做的更高,因此Go和Rust官方都会建议使用channel(通信)来共享内存,而不是使用共享内存来通信。
为了让帮助大家找到区别,我们先以Java为例来,看一下没有channel的高级语言Java,生产者消费者该如何实现,代码及注释如下:
public class Storage {
// 仓库最大存储量
private final int MAX_SIZE = 10;
// 仓库存储的载体
private LinkedList<Object> list = new LinkedList<Object>();
// 锁
private final Lock lock = new ReentrantLock();
// 仓库满的信号量
private final Condition full = lock.newCondition();
// 仓库空的信号量
private final Condition empty = lock.newCondition();
public void produce()
{
// 获得锁
lock.lock();
while (list.size() + 1 > MAX_SIZE) {
System.out.println("【生产者" + Thread.currentThread().getName()
+ "】仓库已满");
try {
full.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
list.add(new Object());
System.out.println("【生产者" + Thread.currentThread().getName()
+ "】生产一个产品,现库存" + list.size());
empty.signalAll();
lock.unlock();
}
public void consume()
{
// 获得锁
lock.lock();
while (list.size() == 0) {
System.out.println("【消费者" + Thread.currentThread().getName()
+ "】仓库为空");
try {
empty.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
list.remove();
System.out.println("【消费者" + Thread.currentThread().getName()
+ "】消费一个产品,现库存" + list.size());
full.signalAll();
lock.unlock();
}
}
在Java、C#这种面向对象,但是没有channel语言中,生产者、消费者模式至少要借助一个lock和两个信号量共同完成。其中锁的作用是保证同是时间,仓库中只有一个用户进行数据的修改,而还需要表示仓库满的信号量,一旦达到仓库满的情况则将此信号量置为阻塞状态,从而阻止其它生产者再向仓库运商品了,反之仓库空的信号量也是一样,一旦仓库空了,也要阻其它消费者再前来消费了。
Go的高并发实现
我们刚刚也介绍过了Go语言中官方推荐使用channel来实现协程间通信,所以不需要再添加lock和信号量就能实现模式了,以下代码中我们通过子goroutine完成了生产者的功能,在在另一个子goroutine中实现了消费者的功能,注意要阻塞主goroutine以确保子goroutine能够执行,从而轻而易举的就这完成了生产者消费者模式。下面我们就通过具体实践中来看一下生产者消费者模型的实现。
package main
import (
"fmt"
"time"
)
func Product(ch chan<- int) { //生产者
for i := 0; i < 3; i++ {
fmt.Println("Product produceed", i)
ch <- i //由于channel是goroutine安全的,所以此处没有必要必须加锁或者加lock操作.
}
}
func Consumer(ch <-chan int) {
for i := 0; i < 3; i++ {
j := <-ch //由于channel是goroutine安全的,所以此处没有必要必须加锁或者加lock操作.
fmt.Println("Consmuer consumed ", j)
}
}
func main() {
ch := make(chan int)
go Product(ch)//注意生产者与消费者放在不同goroutine中
go Consumer(ch)//注意生产者与消费者放在不同goroutine中
time.Sleep(time.Second * 1)//防止主goroutine退出
/*运行结果并不确定,可能为
Product produceed 0
Product produceed 1
Consmuer consumed 0
Consmuer consumed 1
Product produceed 2
Consmuer consumed 2
*/
}
可以看到和Java比起来使用GO来实现并发式的生产者消费者模式的确是更为清爽了。
Rust的高并发实现
不得不说Rust的难度实在太高了,虽然笔者之前在汇编、C、Java等方面的经验可以帮助我快速掌握Go语言。但是假期看了两天Rust真想大呼告辞,这尼玛也太劝退了。在Rust官方提供的功能中,其实并不包括多生产者、多消费者的channel,std:sync空间下只有一个多生产者单消费者(mpsc)的channel。其样例实现如下:
use std::sync::mpsc;
use std::thread;
use std::time::Duration;
fn main() {
let (tx, rx) = mpsc::channel();
let tx1 = mpsc::Sender::clone(&tx);
let tx2 = mpsc::Sender::clone(&tx);
thread::spawn(move || {
let vals = vec![
String::from("1"),
String::from("3"),
String::from("5"),
String::from("7"),
];
for val in vals {
tx1.send(val).unwrap();
thread::sleep(Duration::from_secs(1));
}
});
thread::spawn(move || {
let vals = vec![
String::from("11"),
String::from("13"),
String::from("15"),
String::from("17"),
];
for val in vals {
tx.send(val).unwrap();
thread::sleep(Duration::from_secs(1));
}
});
thread::spawn(move || {
let vals = vec![
String::from("21"),
String::from("23"),
String::from("25"),
String::from("27"),
];
for val in vals {
tx2.send(val).unwrap();
thread::sleep(Duration::from_secs(1));
}
});
for rec in rx {
println!("Got: {}", rec);
}
}
可以看到在Rust下实现生产者消费者是不难的,但是生产者可以clone多个,不过消费者却只能有一个,究其原因是因为Rust下没有GC也就是垃圾回收功能,而想保证安全Rust就必须要对于变更使用权限进行严格管理。在Rust下使用move关键字进行变更的所有权转移,但是按照Rust对于变更生产周期的管理规定,线程间权限转移的所有权接收者在同一时间只能有一个,这也是Rust官方只提供MPSC的原因,
use std::thread;
fn main() {
let s = "hello";
let handle = thread::spawn(move || {
println!("{}", s);
});
handle.join().unwrap();
}
本站文章版权归原作者及原出处所有 。内容为作者个人观点, 并不代表本站赞同其观点和对其真实性负责,本站只提供参考并不构成任何投资及应用建议。本站是一个个人学习交流的平台,网站上部分文章为转载,并不用于任何商业目的,我们已经尽可能的对作者和来源进行了通告,但是能力有限或疏忽,造成漏登,请及时联系我们,我们将根据著作权人的要求,立即更正或者删除有关内容。本站拥有对此声明的最终解释权。