#author("2018-03-21T04:34:32+00:00","default:haikikyou","haikikyou")
#contents

* PIPE [#kffd177f]

- プロセス間通信の1つで、単方向通信手段を提供する。~
多くは、forkした親と子プロセスの間で通信するようなケースで利用されることが多い。
- 名前は持たない、名前を持つものをFIFO(名前つきパイプ)と呼ぶ。
- データは、カーネル空間を経由してやりとりされる。

#ref(./pipe-image.png,100%)
* 書式 [#q16f1cc8]

#geshi(c){{{
#include <unisdt.h>

int pipe(int fildes[2]);

#include <fcntl.h>
int pipe2(int fildes[2], int flags);
}}}

pipeはファイルディスクリプタ(fd)のペアを作成する。引数には、ペアのfdを格納するint型の配列を指定する。0番目がend-of-read、1番目がwrite-endとなる。pipe関数でパイプを作成した後、forkでプロセスを生成すると、親子で共通のパイプを参照した状態となる。その後、例えば親側で0番をclose、子側で1番をcloseすれば、親から子への単方向のデータフローを実現することができる。

pipe2は、flagsを渡してファイルディスクリプタに関する属性を設定することができる。flagsが0の場合、pipeと同様となる。flagsに指定できる属性は以下である。

-''O_CLOEXEC''
--close-on-execを指定
-''O_NONBLOCK''
--ノンブロッキング属性の指定

&label(rel){関連};
#geshi(c){{{
int filedes[2];

// pipe
pipe(filedes);
fcntl(fildes[0], F_SETFL, O_NONBLOCK);
fcntl(fildes[1], F_SETFL, O_NONBLOCK);
fcntl(fildes[0], F_SETFD, FD_CLOEXEC);
fcntl(fildes[1], F_SETFD, FD_CLOEXEC);

// pipe2
pipe2(filedes, O_CLOEXEC | O_NONBLOCK);
}}}

read-endがcloseしている状態でwriteするとエラーで&code(){-1};が返る(SIGPIPE)。write-endがcloseしている状態でreadすると、&code(){0(EOF)};となる。

なお、パイプバッファのサイズは制限があり、パイプがfullの状態でwriteするとblockまたは失敗する。システムの実装によって異なるため注意。2.6.35以降では、サイズは16ページとなっている。また、fcntlで変更可能となっている。

&label(warn){参考}; [[PIPE(7)>http://man7.org/linux/man-pages/man7/pipe.7.html]] - &size(11){&color(gray){on http://man7.org/linux/man-pages/man7/pipe.7.html};};
* 返り値 [#xcc74fa9]

成功時は&code(){0};を返す。失敗時は&code(){-1};を返し、&code(){errno};に以下の値がセットされる。

* エラー [#tf91a7bc]

- ''EFAULT''
-- &code(){fildes};が無効なアドレスを参照している
- ''EMFILE''
-- オープンされているファイル数が上限に達している
- ''ENFILE''
-- システムでオープンされているファイル数が上限に達している
- ''ENOMEM''
-- カーネルにパイプを作成する十分なメモリがない

pipe2()

- ''EINVAL''
-- flagsが不正な値である


* 例 [#h3847514]

** 親プロセスと子プロセスでpipeを経由して通信 [#o354f79f]

#geshi(c,number){{{
#include <stdio.h>
#include <unistd.h>
#include <sys/types.h>
#include <stdlib.h>
#include <string.h>

/*
 * Pipe
 */
static int pipe_fds[2] = {-1, -1}; /* read-end, write-end */


int main(int argc, char *argv[])
{
	pid_t pid;
	int status;
	ssize_t ret;
	char buf[16];
	char *message = "hello";

	if (pipe(pipe_fds) < 0)
	{
		perror("failed to create pipe");
		exit(1);
	}

	pid = fork();

	if (pid < 0)
	{
		perror("failed to do fork");
		exit(1);
	}
	else if (pid == 0)
	{
		close(pipe_fds[1]);
		sleep(1);
		ret = read(pipe_fds[0], buf, sizeof(buf));
		if (ret < 0)
		{
			perror("faild to read");
			exit(1);
		}
		buf[sizeof(buf) - 1] = '\0';
		printf("child read: %s\n", buf);
		exit(0);
	}

	close(pipe_fds[0]);

	printf("parent write: %s\n", message);
	ret = write(pipe_fds[1], message, strlen(message));
	if (ret < 0)
	{
		perror("faild to write");
	}

	waitpid(pid, &status, 0);
	exit(0);
}


/*
出力例:
parent write: hello
child read: hello
*/
}}}

**&label(study){実験}; pipeごとのバッファ、パイプサイズの確認 [#b233adf7]

#geshi(c,number){{{
#include <stdio.h>
#include <unistd.h>
#include <fcntl.h>
#include <errno.h>
#include <stdlib.h>
#include <limits.h>
#include <string.h>

#define CHILD_NUM		5

#define PIPE_BUFFER		65537

/*
 * Pipe
 */
int pipe_fds[CHILD_NUM][2];
int mypid;

pid_t child_procs[CHILD_NUM];

static pid_t spawn(int id);
static void do_it(void);

static pid_t spawn(int id)
{
	pid_t pid;

	pid = fork();

	if (pid < 0) {
		perror("failed to fork");
		exit(1);
	} else if (pid == 0) {
		close(pipe_fds[id][1]);
		sleep(1);
		do_it();
	}

	return pid;
}

static void do_it(void)
{
	while (1)
	{
		if (getppid() != mypid)
			break;
 		sleep(5);
	}
	exit(0);
}


int main(int argc, char *argv[])
{
	int i, j;
	ssize_t ret;

	mypid = getpid();

	for (i = 0; i < CHILD_NUM; ++i)
	{
		if (pipe(pipe_fds[i]) < 0)
		{
			perror("faild to create a pipe");
			exit(1);
		}

		/*
		 * Set fd to non-blocking mode.
		 */
		if (fcntl(pipe_fds[i][0], F_SETFL, O_NONBLOCK) == -1)
		{
			perror("faild to fcntl");
			exit(1);
		}

		if (fcntl(pipe_fds[i][1], F_SETFL, O_NONBLOCK) == -1)
		{
			perror("faild to fcntl");
			exit(1);
		}

		child_procs[i] = spawn(i);
	}

	for (i = 0; i < CHILD_NUM; ++i)
	{
		for (j = 1; j <= PIPE_BUFFER; j++)
		{
			ret = write(pipe_fds[i][1], "a", 1);

			if (j >= PIPE_BUFFER - 1)
			{
				printf("Proc no.%d: writing %d byte ... ", i, j);

				if (ret < 0)
				{
					fprintf(stderr, "error : %s\n", strerror(errno));
					break;
				}
				else
					printf("ok\n");
			}
		}
	}

	exit(0);
}


/*
出力例: maxOS High Seirra 10.13.3
Proc no.0: writing 65536 byte ... ok
Proc no.0: writing 65537 byte ... error : Resource temporarily unavailable
Proc no.1: writing 65536 byte ... ok
Proc no.1: writing 65537 byte ... error : Resource temporarily unavailable
Proc no.2: writing 65536 byte ... ok
Proc no.2: writing 65537 byte ... error : Resource temporarily unavailable
Proc no.3: writing 65536 byte ... ok
Proc no.3: writing 65537 byte ... error : Resource temporarily unavailable
Proc no.4: writing 65536 byte ... ok
Proc no.4: writing 65537 byte ... error : Resource temporarily unavailable
*/
}}}

** PostgreSQLのpostmasterダウン検知 [#e74cee6e]

PostgreSQLでは、postmasterプロセスのダウン検知をpipeを使って実装している。子プロセスは、親プロセスのディスクリプタをチェックし、親プロセスの生死を確認している。以下はイメージソースである(大体イメージソースのようなことをしている)。

&label(warn){参考};[[PostmasterIsAlive()>https://git.postgresql.org/gitweb/?p=postgresql.git;a=blob;f=src/backend/storage/ipc/pmsignal.c;h=85db6b21f83ff6ed1caebbb74c77f87f649f66cf;hb=ad4fb805ad08c86dd6389e6755081dfd7c864416#l268]] - &size(11){&color(gray){https://git.postgresql.org/gitweb/};};

#geshi(c, number){{{
#include <stdio.h>
#include <unistd.h>
#include <fcntl.h>
#include <errno.h>
#include <stdlib.h>


#define CHILD_NUM 	3

/*
 * Pipe
 */
int	postmaster_alive_fds[2] = {-1, -1};

pid_t child_procs[CHILD_NUM];

static pid_t spawn(void);
static void do_it(void);

static pid_t spawn(void)
{
	pid_t pid;

	pid = fork();

	if (pid < 0) {
		perror("failed to fork");
		exit(1);
	} else if (pid == 0) {
		close(postmaster_alive_fds[1]);
		do_it();
	}

	return pid;
}

static void do_it(void)
{
	ssize_t ret;
	char c;

	while (1)
	{
		ret = read(postmaster_alive_fds[0], &c, 1);
		if (ret < 0)
		{
			if (errno == EAGAIN || errno == EWOULDBLOCK)
				goto RETRY; /* postmaster is alive */
			else
			{
				perror("failed to read");
				break;
			}
		}
		else if (ret > 0)
		{
			printf("read invalid data %c\n", c);
			break;
		}
		else
			break; /* postmaster may be dead */
RETRY:
		sleep(5);
	}

	exit(0);
}


int main(int argc, char *argv[])
{
	int i;

	if (pipe(postmaster_alive_fds) < 0)
	{
		perror("faild to create a pipe");
		exit(1);
	}

	/*
	 * Set fd to non-blocking mode.
	 */
	if (fcntl(postmaster_alive_fds[0], F_SETFL, O_NONBLOCK) == -1)
	{
		perror("faild to fcntl");
		exit(1);
	}

	for (i = 0; i < CHILD_NUM; ++i)
	{
		child_procs[i] = spawn();
	}

	close(postmaster_alive_fds[0]);

	sleep(3);
	printf("postmaster process down\n");

	exit(0);
}
}}}

** 共通の親を持つ子プロセス間同士で通信 [#z2dac64c]

#geshi(c,number){{{
#include <stdio.h>
#include <string.h>
#include <unistd.h>
#include <errno.h>
#include <stdlib.h>


#define READ_END 	0
#define WRITE_END 	1

/*
 * Pipe
 */
int	pipe_fds[2] = {-1, -1};


int main(int argc, char *argv[])
{
	int status;
	pid_t proc1;
	pid_t proc2;
	pid_t pid;

	if (pipe(pipe_fds) < 0)
	{
		perror("faild to create a pipe");
		exit(1);
	}

	/*
	 * Proc1
	 */
	proc1 = fork();

	if (proc1 < 0)
	{
		perror("failed to do fork");
		exit(1);
	}
	else if (proc1 == 0)
	{
		char buf[32] = {'\0'};
		size_t ret;

		close(pipe_fds[WRITE_END]);
		ret = read(pipe_fds[READ_END], buf, sizeof(buf));
		if (ret == 0)
		{
			perror("proc1: failed to read from pipe");
			exit(1);
		}

		printf("proc1 read: %s\n", buf);
		exit(0);
	}

	/*
	 * Proc2
	 */
	proc2 = fork();

	if (proc2 < 0)
	{
		perror("failed to do fork");
		exit(1);
	}
	else if (proc2 == 0)
	{
		close(pipe_fds[READ_END]);
		char *message = "hello";
		write(pipe_fds[WRITE_END], message, strlen(message) + 1);
		exit(0);
	}

	close(pipe_fds[READ_END]);
	close(pipe_fds[WRITE_END]);
	
	do {
		pid = waitpid(-1, &status, 0);
	} while (pid > 0 || errno == EINTR);

	puts("done");
	exit(0);
}
}}}
* 参考リンク [#a6344c70]

- [[BSD - PIPE(2)>https://www.freebsd.org/cgi/man.cgi?query=pipe&apropos=0&sektion=2&manpath=FreeBSD+12-current&arch=default&format=html]] - &size(11){&color(gray){on https://www.freebsd.org/cgi/man.cgi};};
- [[Linux - PIPE(7)>http://man7.org/linux/man-pages/man7/pipe.7.html]] - &size(11){&color(gray){on http://man7.org/linux/man-pages/man7/pipe.7.html};};
- [[Linux - Man page of PIPE(2)>https://linuxjm.osdn.jp/html/LDP_man-pages/man2/pipe.2.html]] - &size(11){&color(gray){on https://linuxjm.osdn.jp/html/LDP_man-pages/man2/pipe.2.html};};
- [[Linux - Man page of PIPE(7)>https://linuxjm.osdn.jp/html/LDP_man-pages/man7/pipe.7.html]] - &size(11){&color(gray){on https://linuxjm.osdn.jp/html/LDP_man-pages/man7/pipe.7.html};};
- [[http://kernhack.hatenablog.com/entry/2014/05/31/190317]]
- [[linux/v4.15.9/source/fs/pipe.c>https://elixir.bootlin.com/linux/latest/source/fs/pipe.c]] - &size(11){&color(gray){on https://elixir.bootlin.com/linux/v4.15.9/source/fs/pipe.c};};
- [[UNIXネットワークプログラミング〈Vol.2〉IPC:プロセス間通信>http://www.amazon.co.jp/exec/obidos/ASIN/4894712571/haikikyou-22/ref=nosim/]] - &size(11){&color(gray){on http://www.amazon.co.jp/};};

トップ   一覧 単語検索 最終更新   ヘルプ   最終更新のRSS
目次
ダブルクリックで閉じるTOP | 閉じる
GO TO TOP