Node.js 的 TCP 链接管理
在 Node.js 的微服务中,一般不同的服务模块我们会采用 TCP 进行通信,本文来简单谈一谈如何设计 TCP 服务的基础管理。
在具体设计上,本文参考了微服务框架 Seneca 所采用的通信方案 Seneca-transport,已经被实践所证明其可行性。
一提到 TCP 通信,我们肯定离不开 net
模块,事实上,借助 net
模块,我们也可以比较快速地完成一般的 TCP 通信的任务。
为了避免对基础的遗忘,我们还是先附上一个基本的 TCP 链接代码:
//server.js:
const net = require('net');
const server = net.createServer((socket) => {
socket.write('goodbye\n');
socket.on('data', (data) => {
console.log('data:', data.toString());
socket.write('goodbye\n');
})
}).on('error', (err) => {
throw err;
});
// grab an arbitrary unused port.
server.listen(8024, () => {
console.log('opened server on', server.address());
});
//client.js:
const net = require('net');
const client = net.createConnection({ port: 8024 }, () => {
//'connect' listener
console.log('connected to server!');
client.write('world!\r\n');
setInterval(() => {
client.write('world!\r\n');
}, 1000)
});
client.on('data', (data) => {
console.log(data.toString());
// client.end();
});
client.on('end', () => {
console.log('disconnected from server');
});
其实,上述已经是一个几乎最简单的客户端和服务端通信 Demo,但是并不能在实际项目中使用,首先我们需要审视,其离生产环境还差哪些内容:
- 以上要求 Server 端要在 Client 端之前启动,并且一旦因为一些错误导致 Server 端重启了并且这个时候 Client 端正好和 Server 端进行通信,那么肯定会 crash,所以,我们需要一个更为平滑兼容的方案。
- 以上 TCP 链接的 Server 部分,并没有对 connection 进行管理的能力,并且在在以上的例子中,双方都没有主动释放链接,也就是说,建立的是一个 TCP 长连接。
- 以上链接的处理数据能力有限,只能处理纯文本的内容,并且还有一定的风险性(你也许会说可以用 JSON 的序列化反序列化的方法来处理 JSON 数据,但是你别忘了
socket.on('data'...
很可能接收到的不是一个完整的 JSON,如果 JSON 较长,其可能只接收到一般的内容,这个时候如果直接JSON.parse())
很可能就会报错)。
以上三个问题,便是我们要解决的主要问题,如果你看过之后立刻知道该如何解决了,那么这篇文章可能你不需要看了,否则,我们可以一起继续探索解决方案。
使用 reconnect-core
reconnect-core 是一个和协议无关的链接重试算法,其工作方式也比较简单,当你需要在 Client 端建立链接的时候,其流程是这样的:
- 调用事先传入的链接建立函数,如果这个时候返回成功了,即成功建立链接。
- 如果第一次建立链接失败了,那么再隔一段时间建立第二次,如果第二次还是失败,那么再隔一段更长的时间建立第三次,如果还是失败,那么再隔更长的一段时间……直到到达最大的尝试次数。
实际上关于尝试的时间间隔,也会有不同的策略,比较常用的是 Fibonacci 策略和 exponential 策略。
当然,关于策略的具体实现,reconnect-core 采用了一个 backoff 的库来管理,其可以支持 Fibonacci 策略和 exponential 策略以及更多的自定义策略。
对于上面提到的 DEMO 代码。我们给出 Client 端使用 reconnect-core 的一个实现:
//client.js:
const Reconnect = require('reconnect-core');
const net = require('net');
const Ndjson = require('ndjson');
const Connect = Reconnect(function() {
var args = [].slice.call(arguments);
return net.connect.apply(null, args)
});
let connection = Connect(function(socket) {
socket.write('world!\r\n');
socket.on('data', (msg) => {
console.log('data', msg.toString());
});
socket.on('close', (msg) => {
console.log('close', msg).toString();
connection.disconnect();
});
socket.on('end', () => {
console.log('end');
});
});
connection.connect({
port: 8024
});
connection.on('reconnect', function () {
console.log('on reconnect...')
});
connection.on('error', function (err) {
console.log('error:', err);
});
connection.on('disconnect', function (err) {
console.log('disconnect:', err);
});
采用 Reconnect 实际上相比之前是多了一层内容,我们在这里需要区分 connection 实例和 socket 句柄,并且附加正确的时间监听。
现在,我们就不用担心到底是先启动服务端还是先启动客户端了,另外,就算我们的服务端在启动之后由于某些错误关闭了一会,只要没超过最大时间(而这个也是可配置的),仍然不用担心客户端与其建立连接。
给 Server 端增加管理能力
给 Server 端增加管理能力是一个比较必要的并且可以做成不同程度的,一般来说,最重要的功能则是及时清理链接,常用的做法是收到某条指令之后进行清理,或者到达一定时间之后定时清理。
这里我们可以增加一个功能,达到一定时间之后,自动清理所有链接:
//server.js
const net = require('net');
var connections = [];
const server = net.createServer((socket) => {
connections.push(socket);
socket.write('goodbye\n');
socket.on('data', (data) => {
console.log('data:', data.toString());
socket.write('goodbye\n');
})
}).on('error', (err) => {
throw err;
});
setTimeout(() => {
console.log('clear connections');
connections.forEach((connection) => {
connection.end('end')
// connection.destory()
})
}, 10000);
// grab an arbitrary unused port.
server.listen(8024, () => {
console.log('opened server on', server.address());
});
我们可以通过connection.end('end')
和 connection.destory()
来清理,一般来说,前者是正常情况下的关闭指令,需要 Client 端进行确认,而后者则是强制关闭,一般在出错的时候会这样调用。
使用 ndjson 来格式化数据
ndjson 是一个比较方便的 JSON 序列化/反序列化库,相比于我们直接用 JSON,其好处主要体现在:
- 可以同时解析多个 JSON 对象,如果是一个文件流,即其可以包含多个
{}
,但是要求则是每一个占据一行,其按行分割并且解析。 - 内部使用了 split2,好处就是其返回时可以保证该行的所有内容已经接受完毕,从而防止 ndjson 在序列化的时候出错。
关于 ndjson 的基本使用,可以根据上述链接查找文档,这里一般情况下,我们的使用方式如下(以下是一个 demo):
//server.js:
const net = require('net');
var connections = [];
const server = net.createServer((socket) => {
connections.push(socket);
socket.on('data', (data) => {
console.log('data:', data.toString());
socket.write('{"good": 1234}\r\n');
socket.write('{"good": 4567}\n\n');
})
}).on('error', (err) => {
throw err;
});
// grab an arbitrary unused port.
server.listen(8024, () => {
console.log('opened server on', server.address());
});
//client.js:
const Reconnect = require('reconnect-core');
const net = require('net');
const Ndjson = require('ndjson');
var Stream = require('stream');
const Connect = Reconnect(function() {
var args = [].slice.call(arguments);
return net.connect.apply(null, args)
});
let connection = Connect(function(socket) {
socket.write('world!\r\n');
var parser = Ndjson.parse();
var stringifier = Ndjson.stringify();
function yourhandler(){
var messager = new Stream.Duplex({ objectMode: true });
messager._read = function () {
// console.log('data:', data);
};
messager._write = function (data, enc, callback) {
console.log(typeof data, data);
// your handler
return callback()
};
return messager
}
socket // 链接句柄
.pipe(parser)
.pipe(yourhandler())
.pipe(stringifier)
.pipe(socket);
socket.on('close', (msg) => {
console.log('close', msg).toString();
connection.disconnect();
});
socket.on('end', (msg) => {
console.log('end', msg);
});
});
connection.connect({
port: 8024
});
connection.on('reconnect', function () {
console.log('on reconnect...')
});
connection.on('error', function (err) {
console.log('error:', err);
});
connection.on('disconnect', function (err) {
console.log('disconnect:', err);
});
其中,用户具体的逻辑代码,可以是 yourhandler
函数 _write
里面的一部分,其接收的是一个一个处理好的对象。