网站首页 > 技术文章 正文
作者: theanarkh 来源:编程杂技
昨天分析http模块相关的代码时,遇到了一个晦涩的逻辑,看了想,想了看还是没看懂。百度、谷歌了很多帖子也没看到合适的答案。突然看到一个题目有点相似的搜索结果,点进去是Stack Overflow上的帖子,但是已经404,最后还是通过快照功能成功看到内容。这个帖子[1]和我的疑惑不相关,但是突然给了我一些灵感。沿着这个灵感去看了代码,最后下载nodejs源码,加了一些log,编译了一夜(太久了,等不及编译完成,得睡觉了)。上午起来验证,终于解开了疑惑。这个问题源于下面这段代码。
function connectionListenerInternal(server, socket) {
socket.server = server;
// 分配一个http解析器
const parser = parsers.alloc();
// 解析请求报文
parser.initialize(
HTTPParser.REQUEST,
new HTTPServerAsyncResource('HTTPINCOMINGMESSAGE', socket),
server.maxHeaderSize || 0,
server.insecureHTTPParser === undefined ?
isLenient() : server.insecureHTTPParser,
);
parser.socket = socket;
// 开始解析头部的开始时间
parser.parsingHeadersStart = nowDate();
socket.parser = parser;
const state = {
onData: null,
onEnd: null,
onClose: null,
onDrain: null,
// 同一tcp连接上,请求和响应的的队列
outgoing: [],
incoming: [],
outgoingData: 0,
keepAliveTimeoutSet: false
};
state.onData = socketOnData.bind(undefined, server, socket, parser, state);
socket.on('data', state.onData);
if (socket._handle && socket._handle.isStreamBase &&
!socket._handle._consumed) {
parser._consumed = true;
socket._handle._consumed = true;
parser.consume(socket._handle);
}
parser[kOnExecute] =
onParserExecute.bind(undefined, server, socket, parser, state);
socket._paused = false;
}
这段代码看起来很多,这是启动http服务器后,有新的tcp连接建立时执行的回调。问题在于tcp上有数据到来时,是怎么处理的,上面代码中nodejs监听了socket的data事件,同时注册了钩子kOnExecute。data事件我们都知道是流上有数据到来时触发的事件。我们看一下socketOnData做了什么事情。
function socketOnData(server, socket, parser, state, d) {
// 交给http解析器处理,返回已经解析的字节数
const ret = parser.execute(d);
onParserExecuteCommon(server, socket, parser, state, ret, d);
}
这看起来没有问题,socket上有数据,然后交给http解析器处理。几乎所有http模块源码解析的文章也是这样分析的,我第一反应也觉得这个没问题,那kOnExecute是做什么的呢?kOnExecute钩子函数的值是onParserExecute,这个看起来也是解析tcp上的数据的,看起来和onSocketData是一样的作用,难道tcp上的数据有两个消费者?我们看一下kOnExecute什么时候被回调的。
void OnStreamRead(ssize_t nread, const uv_buf_t& buf) override {
Local<Value> ret = Execute(buf.base, nread);
Local<Value> cb =
object()->Get(env()->context(), kOnExecute).ToLocalChecked();
MakeCallback(cb.As<Function>(), 1, &ret);
}
在node_http_parser.cc中的OnStreamRead中被回调,那么OnStreamRead又是什么时候被回调的呢?OnStreamRead是nodejs中c++层流操作的通用函数,当流有数据的时候就会执行该回调。而且OnStreamRead中也会把数据交给http解析器解析。这看起来真的有两个消费者?这就很奇怪,为什么一份数据会交给http解析器处理两次?这时候我的想法就是这两个地方肯定是互斥的。但是我一直没有找到是哪里做了处理。最后在connectionListenerInternal的一段代码中找到了答案。
if (socket._handle && socket._handle.isStreamBase && !socket._handle._consumed) {
parser._consumed = true;
socket._handle._consumed = true;
parser.consume(socket._handle);
}
因为tcp流是继承StreamBase类的,所以if成立(后面会具体分析)。我们看一下consume的实现。
static void Consume(const FunctionCallbackInfo<Value>& args) {
Parser* parser;
ASSIGN_OR_RETURN_UNWRAP(&parser, args.Holder());
CHECK(args[0]->IsObject());
StreamBase* stream = StreamBase::FromObjject(args[0].As<Object>());
CHECK_NOT_NULL(stream);
stream->PushStreamListener(parser);
}
http解析器把自己注册为tcp stream的一个listener。这里涉及到了c++层对流的设计。我们从头开始。看一下PushStreamListener做了什么事情。c++层中,流的操作由类StreamResource进行了封装。
class StreamResource {
public:
virtual ~StreamResource();
virtual int ReadStart() = 0;
virtual int ReadStop() = 0;
virtual int DoShutdown(ShutdownWrap* req_wrap) = 0;
virtual int DoTryWrite(uv_buf_t** bufs, size_t* count);
virtual int DoWrite(WriteWrap* w,
uv_buf_t* bufs,
size_t count,
uv_stream_t* send_handle) = 0;
void PushStreamListener(StreamListener* listener);
void RemoveStreamListener(StreamListener* listener);
protected:
uv_buf_t EmitAlloc(size_t suggested_size);
void EmitRead(ssize_t nread, const uv_buf_t& buf = uv_buf_init(nullptr, 0));
StreamListener* listener_ = nullptr;
uint64_t bytes_read_ = 0;
uint64_t bytes_written_ = 0;
friend class StreamListener;
};
我们看到StreamResource是一个基类,定义了操作流的公共方法。其中有一个成员是StreamListener类的实例。我们看看StreamListener的实现。
class StreamListener {
public:
virtual ~StreamListener();
virtual uv_buf_t OnStreamAlloc(size_t suggested_size) = 0;
virtual void OnStreamRead(ssize_t nread,
const uv_buf_t& buf) = 0;
virtual void OnStreamDestroy() {}
inline StreamResource* stream() { return stream_; }
protected:
void PassReadErrorToPreviousListener(ssize_t nread);
StreamResource* stream_ = nullptr;
StreamListener* previous_listener_ = nullptr;
friend class StreamResource;
};
StreamListener是一个负责消费流数据的类。StreamListener 和StreamResource类的关系如下。
null我们看到一个流可以注册多个listener,多个listener形成一个链表。接着我们看一下创建一个c++层的tcp对象是怎样的。下面是TCPWrap的继承关系。
class TCPWrap : public ConnectionWrap<TCPWrap, uv_tcp_t>{}
class ConnectionWrap : public LibuvStreamWrap{}
class LibuvStreamWrap : public HandleWrap, public StreamBase{}
class StreamBase : public StreamResource {}
我们看到tcp流是继承于StreamResource的。新建一个tcp的c++的对象时(tcp_wrap.cc),会不断往上调用父类的构造函数,其中在StreamBase中有一个关键的操作。
inline StreamBase::StreamBase(Environment* env) : env_(env) {
PushStreamListener(&default_listener_);
}
EmitToJSStreamListener default_listener_;
StreamBase会默认给流注册一个listener。我们看下EmitToJSStreamListener 具体的定义。
class ReportWritesToJSStreamListener : public StreamListener {
public:
void OnStreamAfterWrite(WriteWrap* w, int status) override;
void OnStreamAfterShutdown(ShutdownWrap* w, int status) override;
private:
void OnStreamAfterReqFinished(StreamReq* req_wrap, int status);
};
class EmitToJSStreamListener : public ReportWritesToJSStreamListener {
public:
uv_buf_t OnStreamAlloc(size_t suggested_size) override;
void OnStreamRead(ssize_t nread, const uv_buf_t& buf) override;
};
EmitToJSStreamListener继承StreamListener ,定义了分配内存和读取接收数据的函数。接着我们看一下PushStreamListener做了什么事情。
inline void StreamResource::PushStreamListener(StreamListener* listener) {
// 头插法
listener->previous_listener_ = listener_;
listener->stream_ = this;
listener_ = listener;
}
PushStreamListener就是构造出上图的结构。对应到创建一个c++层的tcp对象中,如下图。
然后我们看一下对于流来说,读取数据的整个链路。首先是js层调用readStart
function tryReadStart(socket) {
socket._handle.reading = true;
const err = socket._handle.readStart();
if (err)
socket.destroy(errnoException(err, 'read'));
}
// 注册等待读事件
Socket.prototype._read = function(n) {
tryReadStart(this);
};
我们看看readStart
int LibuvStreamWrap::ReadStart() {
return uv_read_start(stream(), [](uv_handle_t* handle,
size_t suggested_size,
uv_buf_t* buf) {
static_cast<LibuvStreamWrap*>(handle->data)->OnUvAlloc(suggested_size, buf);
}, [](uv_stream_t* stream, ssize_t nread, const uv_buf_t* buf) {
static_cast<LibuvStreamWrap*>(stream->data)->OnUvRead(nread, buf);
});
}
ReadStart调用libuv的uv_read_start注册等待可读事件,并且注册了两个回调函数OnUvAlloc和OnUvRead。
void LibuvStreamWrap::OnUvRead(ssize_t nread, const uv_buf_t* buf) {
EmitRead(nread, *buf);
}
inline void StreamResource::EmitRead(ssize_t nread, const uv_buf_t& buf) {
// bytes_read_表示已读的字节数
if (nread > 0)
bytes_read_ += static_cast<uint64_t>(nread);
listener_->OnStreamRead(nread, buf);
}
通过层层调用最后会调用listener_的OnStreamRead。我们看看tcp的OnStreamRead
void EmitToJSStreamListener::OnStreamRead(ssize_t nread, const uv_buf_t& buf_) {
StreamBase* stream = static_cast<StreamBase*>(stream_);
Environment* env = stream->stream_env();
HandleScope handle_scope(env->isolate());
Context::Scope context_scope(env->context());
AllocatedBuffer buf(env, buf_);
stream->CallJSOnreadMethod(nread, buf.ToArrayBuffer());
}
继续回调CallJSOnreadMethod
MaybeLocal<Value> StreamBase::CallJSOnreadMethod(ssize_t nread,
Local<ArrayBuffer> ab,
size_t offset,
StreamBaseJSChecks checks) {
Environment* env = env_;
// ...
AsyncWrap* wrap = GetAsyncWrap();
CHECK_NOT_NULL(wrap);
Local<Value> onread = wrap->object()->GetInternalField(kOnReadFunctionField);
CHECK(onread->IsFunction());
return wrap->MakeCallback(onread.As<Function>(), arraysize(argv), argv);
}
CallJSOnreadMethod会回调js层的onread回调函数。onread会把数据push到流中,然后触发data事件。这是tcp里默认的数据读取过程。而文章开头讲到的parser.consume打破了这个默认行为。stream->PushStreamListener(parser);修改了tcp流的listener链,http parser把自己作为数据的接收者。所以这时候tcp流上的数据是直接由node_http_parser.cc的OnStreamRead消费的。而不是触发socket的data事件,最后通过在nodejs源码中加log,重新编译验证的确如文中所述。最后提一个这个过程中还有一个关键的地方是调用consume函数的前提是socket._handle.isStreamBase为true。isStreamBase是在StreamBase::AddMethods中定义为true的,而tcp对象创建的过程中,调用了这个方法,所以tcp的isStreamBase是true,才会执行consume,才会执行kOnExecute回调。
- 上一篇: 摸鱼时间前端工程师经常去逛的技术网站有那些?
- 下一篇: Nodejs V18版本发布,带来多项新功能
猜你喜欢
- 2024-10-02 nodejs中使用sqlite3数据库(nodejs连接mysql数据库)
- 2024-10-02 NodeJS & Dapr Javascript SDK 官方使用指南
- 2024-10-02 专门为前端工程师设计 Nodejs+React 实战开发区块链“慕课”DApp
- 2024-10-02 若依nodejs全栈(四:用户列表增删改查接口的实现)
- 2024-10-02 系统性学习(3) Node.js——手写 Events
- 2024-10-02 完美支持Vue3,一个自带管理模板的Vue3开源组件库——vuestic
- 2024-10-02 GitHub精选 | 后台权限管理系统(基于Node.js)
- 2024-10-02 若依nodejs全栈(三:用户信息和路由接口的实现)
- 2024-10-02 技术开发者应该如何构建小团队的微服务方案?
- 2024-10-02 全局变量、事件绑定、缓存爆炸?Node.js内存泄漏问题分析
- 最近发表
- 标签列表
-
- cmd/c (57)
- c++中::是什么意思 (57)
- sqlset (59)
- ps可以打开pdf格式吗 (58)
- phprequire_once (61)
- localstorage.removeitem (74)
- routermode (59)
- vector线程安全吗 (70)
- & (66)
- java (73)
- org.redisson (64)
- log.warn (60)
- cannotinstantiatethetype (62)
- js数组插入 (83)
- resttemplateokhttp (59)
- gormwherein (64)
- linux删除一个文件夹 (65)
- mac安装java (72)
- reader.onload (61)
- outofmemoryerror是什么意思 (64)
- flask文件上传 (63)
- eacces (67)
- 查看mysql是否启动 (70)
- java是值传递还是引用传递 (58)
- 无效的列索引 (74)