少女祈祷中...

进程间通信

进程之间需要某种协同, 这种协同是需要通信的

进程间通信的前提是 先让不同的进程看到 同一份资源(操作系统提供一段内存)

由于进程间具有独立性, 进程间通信需要第三方(OS) 介入

常见的进程间通信方式

管道通信:

匿名管道(Anonymous Pipe)

  • 特点:匿名管道是临时的,只能用于具有亲缘关系的进程之间(如父子进程或兄弟进程),并且通常在创建时由父进程建立,然后通过fork()系统调用传递给子进程。
  • 使用场景:常用于shell命令中的管道操作,例如 ls | grep txt,其中ls命令的输出通过管道传递给grep命令作为输入。
  • 生命周期:当所有引用该管道的文件描述符都被关闭后,管道会被自动销毁。

管道的设计为了复用代码, 只能是单向的, 但双向通信可以用两个管道

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
#include <iostream>
#include <cerrno>
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>
#include <cstring>
#include <string>

std::string getOtherMessage() {
static int cnt = 0;
std::string messageid = std::to_string(cnt);
cnt++;
pid_t self_id = getpid();
std::string stringpid = std::to_string(self_id);

std::string message = "messageid: ";
message = message + messageid + "my pid is " + stringpid;

return message;
}

void SubProcessWrite(int wfd) {
std::string message = "father, i am your son process";
while (true) {
std::string info = message + getOtherMessage();

write(wfd, info.c_str(), info.size());
sleep(1);
}
}

void FatherProcessRead(int rfd) {
char inbuffer[1024];
while (true) {
sleep(2);
size_t n = read(rfd, inbuffer, sizeof(inbuffer) - 1);
if (n > 0) {
inbuffer[n] = 0;
std::cout << "father get message: " << inbuffer << std::endl;
std::cout << n << std::endl;
} else if (n == 0) {
std::cout << "client quit\n";
break;
} else if (n < 0) {
std::cout << "child quit...\n";
}

}
}

int main() {
// 创建管道
int pipefd[2];
int n = pipe(pipefd); // 输出型参数,rfd, wfd
if (n != 0) {
std::cerr << "errno" << errno << ":errstring:" << strerror(errno) << std::endl;
}

std::cout << "pipefd: " << pipefd[0] << ' ' << pipefd[1] << std::endl;

pid_t id = fork();
if (id == 0) {
// 子进程 写
close(pipefd[0]);
SubProcessWrite(pipefd[1]);
close(pipefd[1]);
exit(0);
}
// 父进程
close(pipefd[1]);
FatherProcessRead(pipefd[0]);

close(pipefd[0]);

pid_t rid = waitpid(id, nullptr, 0);
if (rid > 0) {
std::cout << "sucess" << std::endl;
}

return 0;
}

1. 单向通信

管道是半双工的,数据只能向一个方向流动。一个管道一端用于写入,另一端用于读取。如果要实现双向通信,必须创建两个管道。

2. 基于字节流

管道传输的是无格式的字节流(a stream of bytes)。它不像消息队列那样有消息边界。读写进程必须事先约定好数据的格式(例如,用换行符\n分隔每条记录),否则无法正确解析。

3. 依赖于父子进程或兄弟进程关系

管道通常由一个进程创建(调用 pipe() 系统调用),然后该进程调用 fork() 创建子进程。由于子进程会继承父进程的文件描述符,因此父子进程(或兄弟进程)可以通过这个管道进行通信。

  • 匿名管道:主要用于有亲缘关系的进程间通信。
  • 命名管道(FIFO):通过一个特殊的文件(FIFO文件)存在于文件系统中,允许无亲缘关系的进程通过打开这个文件进行通信,突破了亲缘关系的限制。

4. 同步与互斥(进程同步)

操作系统会保证对管道的读写是互斥的。当一個进程正在读或写管道时,其他进程必须等待。同时,管道也提供了一种简单的同步机制:

  • 当读一个空管道时:读操作会被阻塞,直到有数据被写入。
  • 当写一个满管道时:写操作会被阻塞,直到有空间可写(有数据被读出)。
  • 当管道的读端被关闭后:如果继续写入,写入进程会收到 SIGPIPE 信号(通常导致进程终止)。

这种特性使得生产者进程和消费者进程能够自动协调它们的执行速度。

5. 有限的容量

管道在内核中其实是一个固定大小的缓冲区。在Linux中,这个缓冲区的大小通常是 64KB(自 Linux 2.6.11 起,可以通过 fcntl 调整)。一旦管道被写满,写操作就会进入阻塞状态。

6. 生命周期随进程

匿名管道的生命周期与使用它的进程相同。当所有指向它的文件描述符都被关闭时(即所有相关进程都终止后),管道占用的资源会被操作系统自动回收。

命名管道(FIFO) 的生命周期则与文件系统绑定,除非被显式删除,否则会一直存在。

管道的应用场景,进程池

ProcessPool.cc

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
#include <iostream>
#include <string>
#include <unistd.h>
#include <vector>
#include <sys/types.h>
#include <sys/wait.h>
#include "Task.hpp"

// void work(int rfd) {
// while (1) {
// int command = 0;
// int n = read(rfd, &command, sizeof(command));
// if (n == sizeof(int)) {
// ExcuteTask(command);
// } else if (n == 0) {
// // 写端关闭, 结束任务
// break;
// }
// }
// }

// void work() {
// while (1) {
// int command = 0;
// int n = read(0, &command, sizeof(command));
// if (n == sizeof(int)) {
// ExcuteTask(command);
// } else if (n == 0) {
// // 写端关闭, 结束任务
// break;
// }
// }
// }

class Channel {
public:
Channel(int wfd, pid_t id, const std::string &name):_wfd(wfd), _subprocessid(id), _name(name) {}

int GetWfd() { return _wfd; }
pid_t GetProcessId() { return _subprocessid; }
std::string GetName() { return _name; }

void Closechannel() {
close(_wfd);
}
void wait() {
pid_t rid = waitpid(_subprocessid, nullptr, 0);
if (rid > 0) {
std::cout << "sucess wait" << rid << std::endl;
}
}

~Channel() {}

private:
int _wfd;
pid_t _subprocessid;
std::string _name;
};

void CreateChannelAndSub(std::vector<Channel> *channels, int num, task_t task) {
for (int i = 0; i < num; i++) {
// 创建管道
int pipefd[2];
int n = pipe(pipefd);
if (n < 0) {
exit(1);
}
// 创建子进程
pid_t id = fork();
if (id == 0) {
close(pipefd[1]);
// 直接重定向,
dup2(pipefd[0], 0);

task();
// work(rfd);
close(pipefd[0]);
exit(0);
}
close(pipefd[0]);
std::string Channel_name = "Channel-" + std::to_string(i);
channels->push_back(Channel(pipefd[1], id, Channel_name));
}
}

int NextChannel(int channelnum) {
static int next = 0;
int channel = next;
next++;
next %= channelnum;
return channel;
}

void SendTaskCommand(Channel &channel, int taskcommand) {
write(channel.GetWfd(), &taskcommand, sizeof(taskcommand));
}

void ctrlprocess(std::vector<Channel>& channels)
{
int cnt = 5;
while (cnt--) {
int taskcommand = SelectTask();
int curr_channel_index = NextChannel(channels.size());
// std::cout << taskcommand << ' ' << curr_channel_index << std::endl;

// 发送任务
SendTaskCommand(channels[curr_channel_index], taskcommand);
sleep(1);
}
}

void Cleanchannel(std::vector<Channel>& channels)
{
// 注意这里两个循环要分开写,否则会阻塞
for(auto& channel : channels) {
channel.Closechannel();
}
for(auto& channel : channels) {
channel.wait();
}
}

int main(int argc, char* argv[]) {
if (argc != 2) {
std::cerr << "Usage: " << argv[0] << " processunm" << std::endl;
return 0;
}
int num = std::stoi(argv[1]);
std::vector<Channel> channels;
InitTask();
// 创建信道和子进程
CreateChannelAndSub(&channels, num, work);


// for (auto channel : channels) {
// std::cout << channel.GetName() << ' ' << channel.GetWfd() << ' ' << channel.GetProcessId() << '\n';
// }
// sleep(100);

//控制
ctrlprocess(channels);

//回收管道和子进程, 关闭写端
Cleanchannel(channels);

return 0;
}

Task.hpp

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
#pragma once

#include <iostream>
#include <ctime>
#include <cstdlib>
#include <sys/types.h>
#include <unistd.h>
#define TaskNum 3

typedef void (*task_t) ();

task_t tasks[TaskNum];

void Print() {
std::cout << "I am Print task" << std::endl;
}

void DownLoad() {
std::cout << "I am DownLoad task" << std::endl;
}

void Flush() {
std::cout << "I am Flush task" << std::endl;
}


void InitTask() {
srand(time(0) ^ getpid() ^ 17777);

tasks[0] = Print;
tasks[1] = DownLoad;
tasks[2] = Flush;
}

void ExcuteTask(int number) {
if (number < 0 || number >= 3) {
return;
}
// std::cout << number << std::endl;
tasks[number]();
}
int SelectTask() {
return rand() % TaskNum;
}

void work() {
while (1) {
int command = 0;
int n = read(0, &command, sizeof(command));
if (n == sizeof(int)) {
ExcuteTask(command);
} else if (n == 0) {
// 写端关闭, 结束任务
break;
}
}
}

命名管道

命名管道(Named Pipe),也称为 FIFO(First In, First Out),是 Unix/Linux 系统中一种重要的**进程间通信(IPC, Inter-Process Communication)**机制。它允许**不相关的进程**通过一个特殊的文件进行数据交换。

makefile

1
2
3
4
5
6
7
8
9
10
.PHONY:all
aLL:client server

client:client.cc
g++ -o $@ $^ -std=c++11
server:server.cc
g++ -o $@ $^ -std=c++11
.PHONY:clean
clean:
rm -f client server

client.cc

1
2
3
4
5
6
7
8
9
10
11
12
13
14
#include "namedPipe.hpp"

int main() {
NamePiped fifo(comm_path, User);
if (fifo.OpenForWrite()) {
while (1) {
std::cout << "please Enter> ";
std::string message;
std::getline(std::cin, message);
fifo.WriteNamedPipe(message);
}
}
return 0;
}

server.cc

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
#include "namedPipe.hpp"

int main() {
NamePiped fifo(comm_path, Creater);
if (fifo.OpenforRead()) {
while (1) {
std::string message;
int n = fifo.ReadNamedPipe(&message);
if (n > 0) {
std::cout << "client say>" << message << std::endl;
} else if (n == 0) {
std::cout << "client quit, server too!" << std::endl;
break;
} else {
std::cout << "fifo.ReadNamePipe Error" << std::endl;
break;
}

}
}
// sleep(5);
return 0;
}

NamedPipe.hpp

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
#pragma once

#include <iostream>
#include <sys/types.h>
#include <sys/stat.h>
#include <cstring>
#include <cstdio>
#include <cerrno>
#include <unistd.h>
#include <fcntl.h>

const std::string comm_path = "./myfifo";
#define Creater 1
#define User 2
#define DefaultFd -1
#define Read O_RDONLY
#define Write O_WRONLY
#define BaseSize 4096

class NamePiped {
private:
bool OpenNamedPipe(int mode) {
_fd = open(_fifo_path.c_str(), mode);
if (_fd < 0) {
return false;
}
return true;
}

public:
NamePiped(const std::string& path, int who)
:_fifo_path(path), _id(who), _fd(DefaultFd)
{
if (_id == Creater) {
int res = mkfifo(path.c_str(), 0666);
if (res != 0) {
perror("mkfifo");
}
std::cout << "Creater create named pipe" << std::endl;
}
}

bool OpenforRead() {
return OpenNamedPipe(Read);
}

bool OpenForWrite() {
return OpenNamedPipe(Write);
}

// 输出型
int ReadNamedPipe(std::string *out) {
char buffer[BaseSize];
int n = read(_fd, buffer, sizeof(buffer));
if (n > 0) {
buffer[n] = 0;
*out = buffer;
}
return n;
}

void WriteNamedPipe(const std::string& in) {
write(_fd, in.c_str(), in.size());
}

~NamePiped() {
if (_id == Creater) {
int res = unlink(_fifo_path.c_str());
if (res != 0) {
perror("unlink");
}
std::cout << "Creater free named pipe" << std::endl;
}
if (_fd != DefaultFd) {
// std::cout << "close" << std::endl;
close(_fd);
}
}
private:
const std::string _fifo_path;
int _id;
int _fd;
};

信号

信号是一种非常重要的进程间通信机制,在操作系统中起着重要的作用。它能够用于进程控制、错误处理、事件通知等。在编程中,信号能够通过 signal()sigaction() 来捕获和处理。理解信号的机制能够帮助我们更好地控制进程行为和处理异常。

信号产生的异步性是指信号的产生和处理与进程的正常执行流程是独立的,且不需要进程主动触发或干预。信号是由操作系统或外部事件触发的,并且它们的处理可以在任何时刻打断进程的正常执行流程,导致进程对信号的响应。

观察代码,执行kill - 2 [pid] 命令

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
#include <iostream>
#include <signal.h>
#include <unistd.h>

void hander(int sig) {
std::cout << "get a sig" << std::endl;
}

int main() {
signal(2, hander);
while (1) {
std::cout << "hello sig " << getpid() << std::endl;
sleep(1);
}
return 0;
}

信号产生:kill + 信号,ctrl + c等

进程 —— task_struct —- struct —– 成员变量 —– 保存用位图来保存收到的信号的

发送信号:修改指定进程pcb中信号的指定位图, 0 =>1

core, tern 进程异常时, 如果是core, 操作系统会把进程相关数据存起来,core文件,核心转储

信号的保存

信号从产生到递达的状态, 未决状态。

进程可以选择阻塞某个信号