多线程的通信方法总线

进程通信方式

1、管道(Pipe):管道可用于具有亲缘关系进程间的通信,允许一个进程和另一个与它有共同祖先的进程之间进行通信。
2、命名管道(named pipe):命名管道克服了管道没有名字的限制,因此,除具有管道所具有的功能外,它还允许无亲缘关 系 进程间的通信。命名管道在文件系统中有对应的文件名。命名管道通过命令mkfifo或系统调用mkfifo来创建。
3、信号(Signal):信号是比较复杂的通信方式,用于通知接受进程有某种事件发生,除了用于进程间通信外,进程还可以发送 信号给进程本身;linux除了支持Unix早期信号语义函数sigal外,还支持语义符合Posix.1标准的信号函数sigaction(实际上,该 函数是基于BSD的,BSD为了实现可靠信号机制,又能够统一对外接口,用sigaction函数重新实现了signal函数)。
4、消息(Message)队列:消息队列是消息的链接表,包括Posix消息队列system V消息队列。有足够权限的进程可以向队列中添加消息,被赋予读权限的进程则可以读走队列中的消息。消息队列克服了信号承载信息量少,管道只能承载无格式字 节流以及缓冲区大小受限等缺
5、共享内存:使得多个进程可以访问同一块内存空间,是最快的可用IPC形式。是针对其他通信机制运行效率较低而设计的。往往与其它通信机制,如信号量结合使用,来达到进程间的同步及互斥。
6、内存映射(mapped memory):内存映射允许任何多个进程间通信,每一个使用该机制的进程通过把一个共享的文件映射到自己的进程地址空间来实现它。
7、信号量(semaphore):主要作为进程间以及同一进程不同线程之间的同步手段。
8、套接口(Socket):更为一般的进程间通信机制,可用于不同机器之间的进程间通信。起初是由Unix系统的BSD分支开发出来的,但现在一般可以移植到其它类Unix系统上:Linux和System V的变种都支持套接字。

 

管理通信示例

#include <stdio.h>
#include <stdlib.h>
#include <time.h>
#include <pthread.h>
#define BUFFER_SIZE 16
struct prodcons
{
    int buffer[BUFFER_SIZE];
    pthread_mutex_t lock;  //mutex ensuring exclusive access to buffer
    int readpos,writepos;  //position for reading and writing
    pthread_cond_t notempty;  //signal when buffer is not empty
    pthread_cond_t notfull;  //signal when buffer is not full
};
//initialize a buffer
void init(struct prodcons* b)
{
    pthread_mutex_init(&b->lock,NULL);
    pthread_cond_init(&b->notempty,NULL);
    pthread_cond_init(&b->notfull,NULL);
    b->readpos = 0;
    b->writepos = 0;
}
//store an integer in the buffer
void put(struct prodcons* b, int data)
{
    pthread_mutex_lock(&b->lock);
//wait until buffer is not full
    while((b->writepos+1)%BUFFER_SIZE == b->readpos)
    {
        printf("wait for not full\n");
        pthread_cond_wait(&b->notfull,&b->lock);
    }
    b->buffer[b->writepos] = data;
    b->writepos++;
    b->writepos %= BUFFER_SIZE;
    pthread_cond_signal(&b->notempty); //signal buffer is not empty
    pthread_mutex_unlock(&b->lock);
}
//read and remove an integer from the buffer
int get(struct prodcons* b)
{
    int data;
    pthread_mutex_lock(&b->lock);
//wait until buffer is not empty
    while(b->writepos == b->readpos)
    {
        printf("wait for not empty\n");
        pthread_cond_wait(&b->notempty,&b->lock);
    }
    data=b->buffer[b->readpos];
    b->readpos++;
    b->readpos %= BUFFER_SIZE;
    pthread_cond_signal(&b->notfull);  //signal buffer is not full
    pthread_mutex_unlock(&b->lock);
    return data;
}
#define OVER -1
struct prodcons buffer;
void * producer(void * data)
{
    int n;
    for(n=0; n<50; ++n)
    {
        printf("put-->%d\n",n);
        put(&buffer,n);
    }
    put(&buffer,OVER);
    printf("producer stopped\n");
    return NULL;
}
void * consumer(void * data)
{
    int n;
    while(1)
    {
        int d = get(&buffer);
        if(d == OVER) break;
        printf("get-->%d\n",d);
    }
    printf("consumer stopped\n");
    return NULL;
}
int main()
{
    pthread_t tha,thb;
    void * retval;
    init(&buffer);
    pthread_create(&tha,NULL,producer,0);
    pthread_create(&thb,NULL,consumer,0);
    pthread_join(tha,&retval);
    pthread_join(thb,&retval);
    return 0;
}

运行

 

有名管道通信示例

//createfifo.c
#include<stdio.h>
#include<fcntl.h>
#include<unistd.h>
#include<sys/types.h>
#include<error.h>
#include<sys/stat.h>
#include<stdlib.h>

int main()
{
    if(mkfifo("fifo.demo", 0666) == -1)
    {
        perror("mkfifo");
        exit(-1);
    }
    return 0;
}


//writefifo.c
//先执行writefifo,不执行readfifo,则阻塞在open函数(可以增加参数,实现非阻塞)
#include<unistd.h>
#include<fcntl.h>
#include<string.h>
#include<stdlib.h>
#include<stdio.h>
#include<error.h>
#define N 64
int main()
{
    int fd;
    char buf[N];
    if((fd=open("fifo.demo", O_WRONLY)) == -1)
    {
        perror("open fifo.demo");
        exit(-1);
    }
    while(1)
    {
        if(fgets(buf, N, stdin) != NULL)
        {
            write(fd, buf, strlen(buf));
            if(strncmp(buf,"exit", 4) == 0)//比较前四个字符,buf第五个字符是‘\n‘
            {
                close(fd);
                exit(1);
            }
        }
    }
}


//readfifo.c
//先执行readfifo,不执行writefifo,则阻塞在open函数(可以增加参数,实现非阻塞)
#include<fcntl.h>
#include<stdio.h>
#include<unistd.h>
#include<stdlib.h>
#define N 64
int main()
{
    int fd;
    char buf[N];
    int n;
    if((fd=open("fifo.demo", O_RDONLY))== -1)
    {
        perror("open fifo.demo");
        exit(-1);
    }
    while(1)
    {
        if((n = read(fd, buf, N)) >= 0)
        {
            if(n == 0)
            {
                exit(1);
            }
            write(1, buf, n);//读多少,写多少
        }
    }
}

运行

 

信号量通信示例

#include <stdlib.h>
#include <stdio.h>
#include <string.h>
#include <unistd.h>
#include <fcntl.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <sys/sem.h>

union semun
{
    int val;
    struct semid_ds *buf;
    unsigned short *arry;
};

static int sem_id = 0;
static int set_semvalue();
static void del_semvalue();
static int semaphore_p();
static int semaphore_v();

int main(int argc, char *argv[])
{
    char message = X;
    int i = 0;

    //创建信号量
    sem_id = semget((key_t)1234, 1, 0666 | IPC_CREAT);

    if(argc > 1)
    {
        //程序第一次被调用,初始化信号量
        if(!set_semvalue())
        {
            fprintf(stderr, "Failed to initialize semaphore\n");
            exit(EXIT_FAILURE);
        }
        //设置要输出到屏幕中的信息,即其参数的第一个字符
        message = argv[1][0];
        sleep(2);
    }
    for(i = 0; i < 10; ++i)
    {
        //进入临界区
        if(!semaphore_p())
            exit(EXIT_FAILURE);
        //向屏幕中输出数据
        printf("%c", message);
        //清理缓冲区,然后休眠随机时间
        fflush(stdout);
        sleep(rand() % 3);
        //离开临界区前再一次向屏幕输出数据
        printf("%c", message);
        fflush(stdout);
        //离开临界区,休眠随机时间后继续循环
        if(!semaphore_v())
            exit(EXIT_FAILURE);
        sleep(rand() % 2);
    }

    sleep(10);
    printf("\n%d - finished\n", getpid());

    if(argc > 1)
    {
        //如果程序是第一次被调用,则在退出前删除信号量
        sleep(3);
        del_semvalue();
    }
    exit(EXIT_SUCCESS);
}

static int set_semvalue()
{
    //用于初始化信号量,在使用信号量前必须这样做
    union semun sem_union;

    sem_union.val = 1;
    if(semctl(sem_id, 0, SETVAL, sem_union) == -1)
        return 0;
    return 1;
}

static void del_semvalue()
{
    //删除信号量
    union semun sem_union;

    if(semctl(sem_id, 0, IPC_RMID, sem_union) == -1)
        fprintf(stderr, "Failed to delete semaphore\n");
}

static int semaphore_p()
{
    //对信号量做减1操作,即等待P(sv)
    struct sembuf sem_b;
    sem_b.sem_num = 0;
    sem_b.sem_op = -1;//P()
    sem_b.sem_flg = SEM_UNDO;
    if(semop(sem_id, &sem_b, 1) == -1)
    {
        fprintf(stderr, "semaphore_p failed\n");
        return 0;
    }
    return 1;
}

static int semaphore_v()
{
    //这是一个释放操作,它使信号量变为可用,即发送信号V(sv)
    struct sembuf sem_b;
    sem_b.sem_num = 0;
    sem_b.sem_op = 1;//V()
    sem_b.sem_flg = SEM_UNDO;
    if(semop(sem_id, &sem_b, 1) == -1)
    {
        fprintf(stderr, "semaphore_v failed\n");
        return 0;
    }
    return 1;
}

运行

 

消息队列通信示例

#include<stdio.h>
#include<stdlib.h>
#include<string.h>
#include<sys/types.h>
#include<sys/ipc.h>
#include<sys/msg.h>

#define BUFSZ 512
#define TYPE 100

struct msgbuf
{
    long mtype;
    char mtext[BUFSZ];
};

int main()
{
    int qid, len;
    key_t key;
    struct msgbuf msg;
    /*根据不同的路径和关键字产生key*/
    if((key = ftok(".", a)) == -1)
    {
        perror("ftok");
        exit(-1);
    }
    /*创建消息队列*/
    if((qid = msgget(key, IPC_CREAT|0666)) == -1)
    {
        perror("mesgget");
        exit(-1);
    }
    printf("opened queue %d\n", qid);
    printf("please input the message to queue:");
    if((fgets((&msg)->mtext, BUFSZ, stdin)) == NULL)
    {
        puts("no message\n");
        exit(-1);
    }
    msg.mtype = TYPE;
    len = strlen(msg.mtext) + 1;
    /*添加消息到消息队列*/
    if(msgsnd(qid, &msg, len, 0) < 0)
    {
        perror("msgsnd");
        exit(-1);
    }
    /*从消息队列读取消息*/
    if(msgrcv(qid, &msg, BUFSZ, 0, 0) < 0)
    {
        perror("msgrcv");
        exit(-1);
    }
    printf("message is :%s\n",msg.mtext);
    /*从系统中删除消息队列*/
    if(msgctl(qid, IPC_RMID, NULL) < 0)
    {
        perror("msgctl");
        exit(-1);
    }
    return 0;
}

运行

 

共享内存示例

//writer.c  
#include <stdio.h>  
#include <unistd.h>  
#include <signal.h>  
#include <stdlib.h>  
#include <errno.h>  
#include <sys/ipc.h>  
#include <sys/types.h>  
#include <sys/shm.h>  
#include <string.h>  
  
#define N 1024  
  
typedef struct  
{  
    pid_t pid;  //最先打开的进程会把自己的pid记录在此,对其进行初始化  
    char buf[N];  
}SHM;  
  
void handler(int signo)   //用于发给唤醒pause  
{  
    
}  
  
int main()  
{  
    key_t key;  
    int shmid;  
    SHM *shmaddr;  
    pid_t peerpid;  
  
    signal(SIGUSR1, handler);  
  
    if ((key = ftok(".", a)) == -1)  
    {  
        perror("ftok");  
        exit(-1);  
    }  
   
//创建共享内存  
    if ((shmid = shmget(key, sizeof(SHM), 0666 | IPC_CREAT | IPC_EXCL)) == -1)//IPC_EXCL同open的EXCL,用于判断是哪个进程最先打开  
    {  
        if (errno == EEXIST)  
        {  
            shmid = shmget(key, sizeof(SHM), 0666);  
      
            if ((shmaddr = (SHM *)shmat(shmid, NULL, 0)) == (SHM *)-1)  
            {  
                perror("shmat");  
                exit(-1);  
            }  
            peerpid = shmaddr->pid;  
            shmaddr->pid = getpid();  
            kill(peerpid, SIGUSR1);  
        }  
        else  
        {  
            perror("shmget");  
            exit(-1);  
        }  
    }  
    else //first  
    {  
        if ((shmaddr = (SHM *)shmat(shmid, NULL, 0)) == (SHM *)-1)//添加映射  
        {  
            perror("shmat");  
            exit(-1);  
        }  
  
        shmaddr->pid = getpid();  
        pause();  
        peerpid = shmaddr->pid;  
    }  
  
    while (1)  
    {  
        fgets(shmaddr->buf, N, stdin);  
        kill(peerpid, SIGUSR1);  
        if (strncmp(shmaddr->buf, "quit", 4) == 0)  
            break;  
        pause();  
    }
    
  
    if (shmdt(shmaddr) == -1) //解除映射  
    {  
        perror("shmdt");  
        exit(-1);  
    }  
  
    if (shmctl(shmid, IPC_RMID, NULL) == -1)//释放共享内存  
    {  
        perror("RM");  
        exit(-1);  
    }
  
    exit(0);  
}


//reader.c  
#include <stdio.h>  
#include <unistd.h>  
#include <signal.h>  
#include <stdlib.h>  
#include <errno.h>  
#include <sys/ipc.h>  
#include <sys/types.h>  
#include <sys/shm.h>  
#include <string.h>  
  
#define N 1024  
  
typedef struct  
{  
    pid_t pid;  
    char buf[N];  
}SHM;  
  
void handler(int signo)  
{  
}  
  
int main()  
{  
    key_t key;  
    int shmid;  
    SHM *shmaddr;  
    pid_t peerpid;  
  
    signal(SIGUSR1, handler);  
  
    if ((key = ftok(".", a)) == -1)  
    {  
        perror("ftok");  
        exit(-1);  
    }  
  
    if ((shmid = shmget(key, sizeof(SHM), 0666 | IPC_CREAT | IPC_EXCL)) == -1)  
    {  
        if (errno == EEXIST)  
        {  
            shmid = shmget(key, sizeof(SHM), 0666);  
      
            if ((shmaddr = (SHM *)shmat(shmid, NULL, 0)) == (SHM *)-1)  
            {  
                perror("shmat");  
                exit(-1);  
            }  
            peerpid = shmaddr->pid;  
            shmaddr->pid = getpid();  
            kill(peerpid, SIGUSR1);  
        }  
        else  
        {  
            perror("shmget");  
            exit(-1);  
        }  
    }  
    else //first  
    {  
        if ((shmaddr = (SHM *)shmat(shmid, NULL, 0)) == (SHM *)-1)  
        {  
            perror("shmat");  
            exit(-1);  
        }  
  
        shmaddr->pid = getpid();  
        pause();  
        peerpid = shmaddr->pid;  
    }  
  
    while (1)  
    {  
        pause();  
        printf("%s", shmaddr->buf);  
        if (strncmp(shmaddr->buf, "quit", 4) == 0)  
            break;  
        kill(peerpid, SIGUSR1);  
    }  
  
    if (shmdt(shmaddr) == -1)  
    {  
        perror("shmdt");  
        exit(-1);  
    }  
  
    exit(0);  
}

运行

 

信号通信示例

#include <stdio.h>
#include <signal.h>
#include <sys/types.h>
#include <unistd.h>
#include <stdlib.h>
#include <sys/wait.h>

pid_t pid;
void conductor_handler(int signo);
void driver_handler(int signo);

int main()
{
    if((pid = fork()) < 0)
    {
        perror("fork error.\n");
    }
    else if(pid == 0)
    {
        signal(SIGTSTP,SIG_IGN);
        signal(SIGINT,conductor_handler);
        signal(SIGQUIT,conductor_handler);
        signal(SIGUSR1,conductor_handler);
        while(1)
        {
            pause();
        }
    }
    else
    {
        signal(SIGINT,SIG_IGN);
        signal(SIGQUIT,SIG_IGN);
        signal(SIGTSTP,driver_handler);
        signal(SIGUSR1,driver_handler);
        signal(SIGUSR2,driver_handler);
        while(1)
        {
            pause();
        }
    }
    return 0;
}

void conductor_handler(int signo)
{
    switch(signo)
    {
        case SIGINT :
            kill(getppid(),SIGUSR1);
            break;
        case SIGQUIT:
            kill(getppid(),SIGUSR2);
            break;
        case SIGUSR1:
            printf("Final station ,all get off.\n");
            exit(0);
    }
}

void driver_handler(int signo)
{
    switch(signo)
    {
        case SIGTSTP :
            kill(pid,SIGUSR1);
            wait(NULL);
            exit(0);
        case SIGUSR1 :
            printf("bus will run...\n");
            break;
        case SIGUSR2 :
            printf("bus will stop...\n");
            break;
    }
}

运行

郑重声明:本站内容如果来自互联网及其他传播媒体,其版权均属原媒体及文章作者所有。转载目的在于传递更多信息及用于网络分享,并不代表本站赞同其观点和对其真实性负责,也不构成任何其他建议。