经典并发处理问题的多种解法

2024年6月4日 作者 ScotI_Blog

首先阐释一下并发进程判断的基本原理:

Bernstein并发进程的制约关系条件

同样的,还要考虑到并发进程之间的制约关系:竞争与协作

并发进程之间的竞争和协作关系

第一种是竞争关系,一个进程的执行可能影响到同其竞争资源的其他进程,如果两个进程要访问同一资源,那么,一个进程通过操作系统分配得到该资源,另一个将不得不等待
第二种是协作关系,某些进程为完成同一任务需要分工协作,由于合作的每一个进程都是独立地以不可预知的速度推进,这就需要相互协作的进程在某些协调点上协调各自的工作。当合作进程中的一个到达协调点后,在尚未得到其伙伴进程发来的消息或信号之前应阻塞自己,直到其他合作进程发来协调信号或消息后方被唤醒并继续执行

一、银行家算法

银行家算法是一种用来避免操作系统死锁出现的有效算法,所以在引入银行家算法的解释之前,有必要简单介绍下死锁的概念。

死锁:是指两个或两个以上的进程在执行过程中,由于竞争资源或者由于彼此通信而造成的一种阻塞的现象,若无外力作用,它们都将无法推进下去。此时称系统处于死锁状态或系统产生了死锁,这些永远在互相等待的进程称为死锁进程。

死锁的发生必须具备以下四个必要条件

1)互斥条件:指进程对所分配到的资源进行排它性使用,即在一段时间内某资源只由一个进程占用。如果此时还有其它进程请求资源,则请求者只能等待,直至占有资源的进程用毕释放。
2)请求和保持条件:指进程已经保持至少一个资源,但又提出了新的资源请求,而该资源已被其它进程占有,此时请求进程阻塞,但又对自己已获得的其它资源保持不放。
3)不抢占条件:指进程已获得的资源,在未使用完之前,不能被剥夺,只能在使用完时由自己释放。
4)循环等待条件:指在发生死锁时,必然存在一个进程——资源的环形链,即进程集合{P0,P1,P2,···,Pn}中的P0正在等待一个P1占用的资源;P1正在等待P2占用的资源,……,Pn正在等待已被P0占用的资源。

所以作为解决死锁问题的著名算法之一,银行家算法的着眼点就在于确保以上四个条件之一不出现,那就不可能出现死锁,所以这更像是一种预防措施

为实现银行家算法,系统必须设置若干数据结构,同时要解释银行家算法,必须先解释操作系统安全状态和不安全状态。

安全序列:是指一个进程序列{P1,…,Pn}是安全的,即对于每一个进程Pi(1≤i≤n),它以后尚需要的资源量不超过系统当前剩余资源量与之前所有进程Pj (j < i )当前占有资源量之和。

安全状态:如果存在一个由系统中所有进程构成的安全序列P1,…,Pn,则系统处于安全状态。安全状态一定是没有死锁发生。

不安全状态:不存在一个安全序列。不安全状态不一定导致死锁。

银行家算法中设计了很多的数据结构,具体如下

数据结构
1)可利用资源向量Available
是个含有m个元素的数组,其中的每一个元素代表一类可利用的资源数目。如果Available[j]=K,则表示系统中现有Rj类资源K个。
2)最大需求矩阵Max(或者claim)
这是一个n×m的矩阵,它定义了系统中n个进程中的每一个进程对m类资源的最大需求。如果Max[i,j]=K,则表示进程i需要Rj类资源的最大数目为K。
3)分配矩阵Allocation
这也是一个n×m的矩阵,它定义了系统中每一类资源当前已分配给每一进程的资源数。如果Allocation[i,j]=K,则表示进程i当前已分得Rj类资源的 数目为K。
4)需求矩阵Need
这也是一个n×m的矩阵,用以表示每一个进程尚需的各类资源数。如果Need[i,j]=K,则表示进程i还需要Rj类资源K个,方能完成其任务。

Need[i,j]=Max[i,j]-Allocation[i,j]

银行家算法具体阐释:

Request(i)是进程Pi的请求向量,如果Request(i)[j]=k,表示进程Pi需要K个R(j)类型的资源。当Pi发现资源请求后系统将进行下列步骤

(1)如果Request(i)[j] <= Need[i,j],边转向步骤2),否则认为出错,因为它所请求的资源数已超过它所宣布的最大值。

(2)如果Request(i)[j] <= Available[i,j],便转向步骤3),否则,表示尚无足够资源,Pi需等待。

(3)系统试探着把资源分配给进程Pi,并需要修改下面数据结构中的数值;

Available[j] = Available[j] - Request(i)[j];

Allocation[i,j] = Allocation[i,j] + Request(i)[j];

Need[i,j] = Need[i,j] - Request(i)[j];

两道例题

从图中数据我们可以利用银行家算法的四个数据结构,来描述当前的系统状态

MaxAllocationNeedAvailable
 ABCABCABC A     B     C
P1559212347 2     3      3
P2536402134 
P34011405006 
P4425204221 
P5424314110

因为系统资源R=(17,5,20)而系统分配给这几个线程的资源为Allocation=(15,2,17) 则可以求出Available=(2,3,3)

(1)在T0时刻,由于Availabel大于等于Need中 P5 所在行的向量,因此Availabel能满足 P5 的运行,在 P5 运行后,系统的状态变更为如下图所示:

 WorkNeedAllocationWork+Allocationfinsh
 ABCABCABCABC
P5233110314547true
P45472212047411true
P3741100640511416true
P21141613440215418true
P1154834721217520true

因此,在T0时刻,存在安全序列:P5,P4,P3,P2,P1(并不唯一)

(2)P2请求资源,P2发出请求向量Request(i)(0,3,4),系统根据银行家算法进行检查;

 ① P2 申请资源Reuqest(i)(0,3,4)<=Need中 P2 所在行向量Need(i)(1,3,4)

 ② P2 申请资源Reuqest(i)(0,3,4)>=可以利用资源向量Availabel(2,3,3),所以,该申请不给于分配

(3)P4请求资源,P4发出请求向量Request(i)(2,0,1),系统根据银行家算法进行检查;

 ①Reuqest(i)(2,0,1)<= Need(i)(2,2,1)

 ② Reuqest(i)(2,0,1 <= Availabel(2,3,3)

 ③对 P4 的申请(2,0,1)进行预分配后,系统的状态为:

 MaxAllocationNeedAvailable
 ABCABCABCA    B   C
P1559212347 0   3    2
P2536402134 
P34011405006 
P4425405020 
P5424314110

可利用资源向量Availabel=(0,3,2),大于Need中 P4 所在行的向量(0,2,0),因此可以满足 P4 的运行。P4 运行结束后,系统的状态变为:

 WorkNeedAllocationWork+Allocationfinsh
 ABCABCABCABC
P4032020405437true
P54371103147411true
P3741100640511416true
P21141613440215418true
P11541834721217520true

 同理依次推导,可计算出存在安全序列P4,P5,P3,P2,P1(并不唯一)

(4)P1请求资源,P1发出请求向量Request(i)(0,2,0),系统根据银行家算法进行检查;

 ①Request(i)(0,2,0)<= Need(i)(3,4,7)

 ② Request(i)(0,2,0)<= Availabel(2,3,3)

 ③对 P1 的申请(0,2,0)进行预分配后,系统的状态为:

 MaxAllocationNeedAvailable
 ABCABCABCA  B  C
P15592323270  1  2  
P2536402134 
P34011405006 
P4425204221 
P5424314110

由于Availabel不大于等于 P1 到 P5 任一进程在Need中的需求向量,因此系统进行预分配后

处于不安全状态,所以对于 P1 申请资源(0,2,0)不给予分配。

注意:因为(4)是建立在第(3)问的基础上的所以Available=(0,3,2)-(0,2,0)=(0,1,2)

二、生产者消费者问题

生产者消费者(producer—customer)问题是一个非常著名的进程同步问题。它描述的是:生产者进程在生产“产品”,消费者进程则消费这些“产品”,在生产者进程和消费者进程之间存在一个缓冲池(生产者将生产的产品放入该缓冲池,消费者则消费该缓冲池中的产品)。所有生产者进程、消费者进程都是相互独立的(即以异步的方式运行),但他们之间同时必须保持同步(即不允许消费者进程到空缓冲区取产品,也不允许生产者进程向满缓冲区存入产品)

我们使用队列(先进先出表)Queue来表示上述具有n个缓冲区的缓冲池。每投入(取出)一个产品,缓存池中就相应的插入(删除)一个节点。由于该缓冲池是被组织成队列的形式,因此,队空队满的判断条件分别如下:

q->front == q->rear;    //队空
q->front == (q->rear+1)%SIZE;    //队满

        此外,我们引入一个整型变量num,置其初值为0,当生产者(消费者)进程向缓冲池投入(取走)一个产品时,num对应的加一(减一)。

所以最后大概构建出如下的解决思路:

void producer()
{
    sem_wait();
    lockf();
    QueueFull();
    Enqueue();
    unlockf();
    sem_post();
}
void customer()
{
    sem_wait();
    lockf();
    QueueEmpty();
    Dequeue();
    unlockf();
    sem_post();
}

但是,仔细思考就会发现,仅仅只是这样是远远不够的。我们还必须保证队满不入、队空不取。因此,在上述结构的基础上,我们加入条件判断机制。

#include <stdio.h>
#include <pthread.h>
#include <semaphore.h>
#include <string.h>
#include <time.h>
#include <stdlib.h>
#include <unistd.h>
 
#define TRUE  1
#define FALSE 0
#define SIZE 11
 
typedef int QueueData;	//定义一个整型变量QueueData,与为基本数据类型定义新的别名方法一样 
 
typedef struct _queue   //队列结构体
{
	int data[SIZE];
	int front;     // 指向队头的下标
	int rear;      // 指向队尾的下标
}Queue;
 
struct data             //信号量结构体
{
	sem_t count;
	Queue q;
};
 
struct data sem;
pthread_mutex_t mutex;	//互斥变量使用特定的数据类型 
int num = 0; 
 
int InitQueue (Queue *q)   // 队列初始化
{
	if (q == NULL)
	{
		return FALSE;
	}
	q->front = 0;
	q->rear  = 0;
	return TRUE;
}
 
int QueueEmpty (Queue *q)      //判断空队情况
{
	if (q == NULL)
	{
		return FALSE;
	}
	return q->front == q->rear;
}
 
int QueueFull (Queue *q)     //判断队满的情况
{
	if (q == NULL)
	{
		return FALSE;
	}
	return q->front == (q->rear+1)%SIZE;
} 
 
int DeQueue (Queue *q, int x)   //出队函数
{
	if (q == NULL)
	{
		return FALSE;
	}
	if (QueueEmpty(q))
	{
		return FALSE;
	}
	q->front = (q->front + 1) % SIZE;
	x = q->data[q->front];
	return TRUE;
}
 
int EnQueue (Queue *q, int x)   //进队函数
{
	if (q == NULL)
	{
		return FALSE;
	}	
	if (QueueFull(q))
	{
		return FALSE;
	}	
	q->rear = (q->rear+1) % SIZE;
	q->data[q->rear] = x;
	return TRUE;
}
 
void *Producer()
{
	int i=0;
	while(i<10)
	{
		i++;
		int time = rand() % 10 + 1;          //随机使程序睡眠0点几秒           
		usleep(time * 100000);
		                                      
		sem_wait(&sem.count);                 //信号量的P操作(使信号量的值减一) 
		pthread_mutex_lock(&mutex);           //互斥锁上锁
		if(!QueueFull(&sem.q))					//若队未满 
		{
			num++;                                
			EnQueue (&sem.q, num);              //消息进队
			printf("生产了一条消息,count=%d\n", num); 
		}                                      
		else	printf("Full\n");                                       
		pthread_mutex_unlock(&mutex);         //互斥锁解锁
		sem_post(&sem.count);                  //信号量的V操作(使信号量的值加一) 
	}
	printf("i(producer)=%d\n",i);
}
 
void *Customer()
{
	int i=0;
	while(i<10)
	{
		i++;
		int time = rand() % 10 + 1;   //随机使程序睡眠0点几秒
		usleep(time * 100000);
		
		sem_wait(&sem.count);           //信号量的P操作
		pthread_mutex_lock(&mutex);    //互斥锁上锁
		
		if(!QueueEmpty(&sem.q))			//若队未空 
		{
			num--;
			DeQueue (&sem.q, num);       //消息出队
			printf("消费了一条消息,count=%d\n",num);
		}
		else	printf("Empty\n");
		pthread_mutex_unlock(&mutex);  //互斥锁解锁
		sem_post(&sem.count);         //信号量的V操作
	}
	printf("i(customer)=%d\n",i);
}
 
int main()
{
	srand((unsigned int)time(NULL));
	
	//信号量地址,信号量在线程间共享,信号量的初始值 
	sem_init(&sem.count, 0, 10);    //信号量初始化(做多容纳10条消息,容纳了10条生产者将不会生产消息)
	
	pthread_mutex_init(&mutex, NULL);  //互斥锁初始化
	
	InitQueue(&(sem.q));   //队列初始化
	
	pthread_t producid;
	pthread_t consumid;
	
	pthread_create(&producid, NULL, Producer, NULL);   //创建生产者线程
	pthread_create(&consumid, NULL, Customer, NULL);   //创建消费者线程
	
	pthread_join(consumid, NULL);    //线程等待,如果没有这一步,主程序会直接结束,导致线程也直接退出。
	
	sem_destroy(&sem.count);         //信号量的销毁 
	
	pthread_mutex_destroy(&mutex);   //互斥锁的销毁
	
	return 0;
}

三、哲学家进餐问题

  该问题描述的是五个哲学家共用一张圆桌,分别坐在周围的五张椅子上,在圆桌上有五个碗和五只筷子,他们的生活方式是交替的进行思考和进餐。平时,一个哲学家进行思考,饥饿时便试图取用其左右最靠近他的筷子,只有在他拿到两只筷子时才能进餐。进餐完毕,放下筷子继续思考。

在哲学家问题中常常会遇到死锁的问题,所以这道题的核心点在于把筷子作为临界资源处理

经过分析可知,放在桌子上的5根筷子属于临界资源,在一段时间内只允许一位哲学家使用。为了实现对筷子的互斥使用,可以用一个信号量来表示一根筷子,由这5个信号量组成一个信号量数组。其描述如下:

semaphore chopstick[5]={1,1,1,1,1);

所有的信号量初始化为1,第 i 位哲学家的行为可描述如下:

while(1)
{
    think();
    hungry();
    wait(chopstick[i]);
    wait(chopstick[(i+1)%5]);
    eat();
    signal(chopstick[i]);
    signal(chopstick[(i+1)%5]);
}

这样的分开取筷子的方式就可能导致死锁

几种可能的方式:

(1) 至多允许四个哲学家同时取叉子 (C. A. R. Hoare方案)
(2) 奇数号先取左手边的叉子,偶数号先取右手边的叉子
(3) 每个哲学家取到手边的两把叉子才吃,否则一把叉子也不取

 semaphore fork[5];
 for (int i=0;i<5;i++)
     fork[i]= 1;
 semaphore room=4;   //该解法突破了进入临界区的进程容量为1的限制
//增加一个侍者,设想有两个房间1号房间是会议室,2号房间是餐厅
cobegin
   process philosopher_i( ){/*i=0,1,2,3, 4 */
      while(true) {
      think( );
      P(room);  //控制最多允许4位哲学家进入2号房间餐厅取叉子
      P(fork[i]);
      P(fork[(i+1)%5]); 
      eat( );
      V(fork[i]);
      V(fork[(i+1)%5]);
      V(room);
   }
 }
 coend

仅当哲学家左右两只筷子同时可用时,才允许他拿起筷子。即:要求每个哲学家先获得两个临界资源后方能进餐;这在本质上就是AND信号量机制。

while (1) {think();hungry();Swait(chopsticks[i],chopsticks[(i+1)%5]);eat();Signal(chopsticks[i],chopsticks[(i+1)%5]);   }

AND信号量的实质就是锁住全部的临界资源,当一个哲学家企图拿筷子的时候,就将所有的资源锁住,然后让他去拿他需要的筷子,等他取到他需要的筷子之后,就解锁,然后让其他哲学家取筷子。由于C语言库中并没有Swait的相关函数,因此,我们利用全局互斥变量来完成这一目的。

四、读者写者问题

一个数据问价或记录可以被多个进程共享,我们把只读该文件的进程称为“读者进程”,其他进程为“写者进程”。允许多个进程同时读一个共享对象,但不允许一个写者进程和其他写者进程或读者进程同时访问共享对象。即:保证一个写者进程必须与其他进程互斥的访问共享对象的同步问题;读者-写者问题常用来测试新同步原语

思路:读者和写者是互斥的,写者和写者是互斥的,读者和读者不互斥;两类进程,一种是写者,另一种是读者。写者很好实现,因为它和其他任何进程都是互斥的,因此对每一个写者进程都给一个互斥信号量的P、V操作即可;而读者进程的问题就较为复杂,它与写者进程是互斥的,而又与其他的读者进程是同步的,因此不能简单的利用P、V操作解决。

(一)读者优先算法

Print Friendly, PDF & Email