目录
stream_context_create
创建并返回一个资源流上下文,该资源流中包含了 options 中提前设定的所有参数的值。
文档 stream_context_create
- Return Values
A stream context resource.
worker 初始化非空时,会先建立一个资源流上下文
$this->_context = \stream_context_create($context_option);
stream_context_set_option
给指定的上下文设置参数。参数 value 是设置 wrapper 的 option 参数的值。
文档 stream_context_set_option
- Return Values
Returns true on success or false on failure.
reusePort
\stream_context_set_option($context, 'socket', 'so_reuseport', 1);
stream_socket_server
创建 Internet 或 Unix 域服务器套接字
文档 stream_socket_server
- Return Value
Returns the created stream, or false on error.
\stream_socket_server($local_socket, $errno, $errmsg, $flags, $this->_context);
文档中第四个参数是没有说明的,这里是可以传入由stream_context_create 创建的stream context resource
stream_socket_enable_crypto
在已连接的套接字上打开/关闭加密
文档 stream_socket_enable_crypto
- Return Values
Returns true on success, false if negotiation has failed or 0 if there isn’t enough data and you should try again (only for non-blocking sockets).
初始化是 先设定为 false
if ($this->transport === 'ssl') {
\stream_socket_enable_crypto($this->_mainSocket, false);
}
ssl握手时
if($async){
$type = \STREAM_CRYPTO_METHOD_SSLv2_CLIENT | \STREAM_CRYPTO_METHOD_SSLv23_CLIENT;
}else{
$type = \STREAM_CRYPTO_METHOD_SSLv2_SERVER | \STREAM_CRYPTO_METHOD_SSLv23_SERVER;
}
$ret = \stream_socket_enable_crypto($socket, true, $type);
stream_set_blocking
为资源流设置阻塞或者阻塞模式
文档 stream_set_blocking
\stream_set_blocking($stream, false);
在 Windows 系统上,这对本地文件没有影响。Windows 不支持本地文件的非阻塞 IO。
stream_socket_accept
接受由stream_socket_server()创建的套接字上的连接
文档 stream_socket_server
- Return Values
Returns a stream to the accepted socket connection or false on failure.
Warning
This function should not be used with UDP server sockets. Instead, use stream_socket_recvfrom() and stream_socket_sendto().
acceptConnection
$new_socket = \stream_socket_accept($socket, 0, $remote_address);
stream_set_read_buffer
在给定的流上设置读取文件缓冲
文档 stream_set_read_buffer
- size
要缓冲的字节数。如果size 为 0,则读取操作是无缓冲的。这确保在允许其他进程从该输入流中读取之前,所有使用fread() 的读取都已完成。
if (\function_exists('stream_set_read_buffer')) {
\stream_set_read_buffer($this->_socket, 0);
}
fwrite
把 string 的内容写入 文件指针 handle 处。
文档 fwrite
- 返回值
返回写入的字符数,出现错误时则返回 false
TCP连接中往socket连接写入数据
if ($this->transport === 'ssl') {
$len = @\fwrite($this->_socket, $this->_sendBuffer, 8192);
} else {
$len = @\fwrite($this->_socket, $this->_sendBuffer);
}
fread
从文件指针 handle 读取最多 length 个字节
文档 fread
- 返回值
返回所读取的字符串, 或者在失败时返回 false。
TCP连接中从socket连接读取数据
$buffer = @\fread($socket, self::READ_BUFFER_SIZE);
fclose
关闭一个已打开的文件指针
stream_socket_recvfrom
从套接字接收数据
文档 stream_socket_recvfrom
。
接受UDP 数据
\stream_socket_recvfrom($socket, static::MAX_UDP_PACKAGE_SIZE, 0, $remote_address)
Returns the read data, as a string
stream_socket_sendto
向套接字发送消息,无论它是否已连接
文档 stream_socket_sendto
发送UDP 数据
\stream_socket_sendto($this->_socket, $send_buffer, 0, $this->_remoteAddress)
- Return Values
Returns a result code, as an integer.
socket_import_stream
将一个将套接字封装到套接字扩展资源中的流导入。
文档 socket_import_stream
$socket = \socket_import_stream($context);
\socket_set_option($socket, \SOL_SOCKET, \SO_KEEPALIVE, 1);
\socket_set_option($socket, \SOL_TCP, \TCP_NODELAY, 1);
stream_select
Runs the equivalent of the select() system call on the given arrays of streams with a timeout specified by tv_sec and tv_usec
The stream_select() function accepts arrays of streams and waits for them to change status. Its operation is equivalent to that of the socket_select() function except in that it acts on streams
文档 stream_select
while (1) {
if(\DIRECTORY_SEPARATOR === '/') {
\pcntl_signal_dispatch();
}
$read = $this->_readFds;
$write = $this->_writeFds;
$except = $this->_exceptFds;
if ($read || $write || $except) {
try {
$ret = @stream_select($read, $write, $except, 0, $this->_selectTimeout);
} catch (\Exception $e) {} catch (\Error $e) {}
} else {
usleep($this->_selectTimeout);
$ret = false;
}
if (!$this->_scheduler->isEmpty()) {
$this->tick();
}
if (!$ret) {
continue;
}
if ($read) {
foreach ($read as $fd) {
$fd_key = (int)$fd;
if (isset($this->_allEvents[$fd_key][self::EV_READ])) {
\call_user_func_array($this->_allEvents[$fd_key][self::EV_READ][0],
array($this->_allEvents[$fd_key][self::EV_READ][1]));
}
}
}
if ($write) {
foreach ($write as $fd) {
$fd_key = (int)$fd;
if (isset($this->_allEvents[$fd_key][self::EV_WRITE])) {
\call_user_func_array($this->_allEvents[$fd_key][self::EV_WRITE][0],
array($this->_allEvents[$fd_key][self::EV_WRITE][1]));
}
}
}
if($except) {
foreach($except as $fd) {
$fd_key = (int) $fd;
if(isset($this->_allEvents[$fd_key][self::EV_EXCEPT])) {
\call_user_func_array($this->_allEvents[$fd_key][self::EV_EXCEPT][0],
array($this->_allEvents[$fd_key][self::EV_EXCEPT][1]));
}
}
}
}
当已初始化的流数组 $read 可供读取 或 $write 可写入不会阻塞 时,响应可用流对应的事件
即:
Worker::$globalEvent->add($this->_socket, EventInterface::EV_WRITE, array($this, 'baseWrite'));
Worker::$globalEvent->add($this->_socket, EventInterface::EV_READ, array($this, 'baseRead'));