手把手教你玩转Node中的集群

javascriptjavascript 2023-08-29 16:26:37 1161
摘要: 本篇文章带大家详细了解一下Node.js中的集群,介绍一下cluster事件,希望对大家有所帮助!一、介绍Node在v0.8时直接引入了cluster模块,用以解决多核CPU的利用率问题,同时也提供了较完善的API,用以处理进程的健壮性问...

本篇文章带大家详细了解一下Node.js中的集群,介绍一下cluster 事件,希望对大家有所帮助!

一、介绍

Node 在 v0.8 时直接引入了 cluster 模块,用以解决多核 CPU 的利用率问题,同时也提供了较完善的 API,用以处理进程的健壮性问题。

cluster 模块调用 fork 方法来创建子进程,该方法与 child_process 中的 fork 是同一个方法。 cluster 模块采用的是经典的主从模型,cluster 会创建一个 master,然后根据你指定的数量复制出多个子进程,可以使用cluster.isMaster 属性判断当前进程是 master 还是 worker (工作进程)。由 master 进程来管理所有的子进程,主进程不负责具体的任务处理,主要工作是负责调度和管理。

cluster 模块使用内置的负载均衡来更好地处理线程之间的压力,该负载均衡使用了 Round-robin 算法(也被称之为循环算法)。当使用 Round-robin 调度策略时,master accepts() 所有传入的连接请求,然后将相应的TCP请求处理发送给选中的工作进程(该方式仍然通过 IPC 来进行通信)。 官方使用实例如下所示

const cluster = require('cluster');
const cpuNums = require('os').cpus().length;
const http = require('http');

if (cluster.isMaster) {
  for (let i = 0; i < cpuNums; i++){
    cluster.fork();
  }
  // 子进程退出监听
  cluster.on('exit', (worker,code,signal) => {
    console.log('worker process died,id',worker.process.pid)
  })
} else {
  // 给子进程标注进程名
  process.title = `cluster 子进程 ${process.pid}`;
  // Worker可以共享同一个 TCP 连接,这里是一个 http 服务器
  http.createServer((req, res)=> {
    res.end(`response from worker ${process.pid}`);
  }).listen(3000);
  console.log(`Worker ${process.pid} started`);
}

其实,cluster 模块由 child_process 和 net 模块的组合应用,cluster 启动时,会在内部启动 TCP 服务器,在 cluster.fork() 子进程时,将这个 TCP 服务器端 socket 的文件描述符发送给工作进程。如果工作进程是通过 cluster.fork() 复制出来的,那么它的环境变量里就存在 NODE_UNIQUE_ID,如果工作进程中存在 listen() 侦听网络端口的调用,它将拿到文件描述符,通过 SO_REUSEADDR 端口重用,从而实现多个子进程共享端口。

二、cluster 事件

  • fork:复制一个工作进程后触发该事件;

  • online:复制好一个工作进程后,工作进程主动发送一条 online 消息给主进程,主进程收到消息后,触发该事件;

  • listening:工作进程中调用 listen() (共享了服务器端 Socket)后,发送一条 listening 消息给主进程,主进程收到消息后,触发该事件;

  • disconnect:主进程和工作进程之间 IPC 通道断开后会触发该事件;

  • exit:有工作进程退出时会触发该事件;

  • setup:cluster.setupMaster() 执行完后触发该事件;

这些事件大多跟 child_process 模块的事件相关,在进程间消息传递的基础上完成的封装。

cluster.on('fork', ()=> {
  console.log('fork 事件... ');
})

cluster.on('online', ()=> {
  console.log('online 事件... ');
})

cluster.on('listening', ()=> {
  console.log('listening 事件... ');
})

cluster.on('disconnect', ()=> {
  console.log('disconnect 事件... ');
})

cluster.on('exit', ()=> {
  console.log('exit 事件... ');
})

cluster.on('setup', ()=> {
  console.log('setup 事件... ');
})

三、master 与 worker 通信

由以上可知,master 进程通过 cluster.fork() 来创建 worker 进程,其实,cluster.fork() 内部是通过 child_process.fork() 来创建子进程。也就是说:master 与 worker 进程是父、子进程的关系;其跟 child_process 创建的父子进程一样是通过 IPC 通道进行通信的。

IPC 的全称是 Inter-Process Communication,即进程间通信,进程间通信的目的是为了让不同的进程能够互相访问资源并进行协调工作。Node 中实现 IPC 通道的是管道(pipe)技术,具体实现由 libuv 提供,在 Windows 下由命名管道(named pipe)实现,*nix 系统则采用 Unix Domain Socket 实现。其变现在应用层上的进程间通信只有简单的 message 事件和 send 方法,使用十分简单。

1.png

父进程在实际创建子进程之前,会创建 IPC 通道并监听它,然后才真正创建出子进程,并通过环境变量(NODE_CHANNEL_FD)告诉子进程这个 IPC 通道的文件描述符。子进程在启动过程中,根据文件描述符去连接这个已存在的 IPC 通道,从而完成父子进程之间的连接。

2.png

建立连接之后的父子进程就可以进行自由通信了。由于 IPC 通道是用命名管道或 Domain Socket 创建的,它们与网络 socket 的行为比较类似,属于双向通信。不同的是它们在系统内核中就完成了进程间的通信,而不用经过实际的网络层,非常高效。在 Node 中,IPC 通道被抽象为 Stream 对象,在调用 send 时发送数据(类似于 write ),接收到的消息会通过 message 事件(类似于 data)触发给应用层。

master 和 worker 进程在 server 实例的创建过程中,是通过 IPC 通道进行通信的,那会不会对我们的开发造成干扰呢?比如,收到一堆其实并不需要关心的消息?答案肯定是不会?那么是怎么做到的呢?

Node 引入进程间发送句柄的功能,send 方法除了能通过 IPC 发送数据外,还能发送句柄,第二个参数为句柄,如下所示

child.send(meeage, [sendHandle])

句柄是一种可以用来标识资源的引用,它的内部包含了指向对象的文件描述符。例如句柄可以用来标识一个服务器端 socket 对象、一个客户端 socket 对象、一个 UDP 套接字、一个管道等。 那么句柄发送跟我们直接将服务器对象发送给子进程有没有什么差别?它是否真的将服务器对象发送给子进程?

其实 send() 方法在将消息发送到 IPC 管道前,将消息组装成两个对象,一个参数是 handle,另一个是 message,message 参数如下所示

{
  cmd: 'NODE_HANDLE',
  type: 'net.Server',
  msg: message
}

发送到 IPC 管道中的实际上是要发送的句柄文件描述符,其为一个整数值。这个 message 对象在写入到 IPC 管道时会通过 JSON.stringify 进行序列化,转化为字符串。子进程通过连接 IPC 通道读取父进程发送来的消息,将字符串通过 JSON.parse 解析还原为对象后,才触发 message 事件将消息体传递给应用层使用。在这个过程中,消息对象还要被进行过滤处理,message.cmd 的值如果以 NODE_ 为前缀,它将响应一个内部事件 internalMessage ,如果 message.cmd 值为 NODE_HANDLE,它将取出 message.type 值和得到的文件描述符一起还原出一个对应的对象。这个过程的示意图如下所示

3.png

在 cluster 中,以 worker 进程通知 master 进程创建 server 实例为例子。worker 伪代码如下:

// woker进程
const message = {
  cmd: 'NODE_CLUSTER',
  type: 'net.Server',
  msg: message
};
process.send(message);

master 伪代码如下:

worker.process.on('internalMessage', fn);

四、如何实现端口共享

在前面的例子中,多个 woker 中创建的 server 监听了同个端口 3000,通常来说,多个进程监听同个端口,系统会报 EADDRINUSE 异常。为什么 cluster 没问题呢?

因为独立启动的进程中,TCP 服务器端 socket 套接字的文件描述符并不相同,导致监听到相同的端口时会抛出异常。但对于 send() 发送的句柄还原出来的服务而言,它们的文件描述符是相同的,所以监听相同端口不会引起异常。

这里需要注意的是,多个应用监听相同端口时,文件描述符同一时间只能被某个进程所用,换言之就是网络请求向服务器端发送时,只有一个幸运的进程能够抢到连接,也就是说只有它能为这个请求进行服务,这些进程服务是抢占式的。

五、如何将请求分发到多个worker

  • 每当 worker 进程创建 server 实例来监听请求,都会通过 IPC 通道,在 master 上进行注册。当客户端请求到达,master 会负责将请求转发给对应的 worker;
  • 具体转发给哪个 worker?这是由转发策略决定的,可以通过环境变量 NODE_CLUSTER_SCHED_POLICY 设置,也可以在 cluster.setupMaster(options) 时传入,默认的转发策略是轮询(SCHED_RR);
  • 当有客户请求到达,master 会轮询一遍 worker 列表,找到第一个空闲的 worker,然后将该请求转发给该worker;

六、pm2 工作原理

pm2 是 node 进程管理工具,可以利用它来简化很多 node 应用管理的繁琐任务,如性能监控、自动重启、负载均衡等。

pm2 自身是基于 cluster 模块进行封装的, 本节我们主要 pm2 的 Satan 进程、God Daemon 守护进程 以及两者之间的进程间远程调用 RPC。

4.png

其中 Satan.js 提供程序的退出、杀死等方法,God.js 负责维持进程的正常运行,God 进程启动后一直运行,相当于 cluster 中的 Master进程,维持 worker 进程的正常运行。

RPC(Remote Procedure Call Protocol)是指远程过程调用,也就是说两台服务器A,B,一个应用部署在A 服务器上,想要调用 B 服务器上应用提供的函数/方法,由于不在一个内存空间,不能直接调用,需要通过网络来表达调用的语义和传达调用的数据。同一机器不同进程间的方法调用也属于 rpc 的作用范畴。 执行流程如下所示

5.png

每次命令行的输入都会执行一次 satan 程序,如果 God 进程不在运行,首先需要启动 God 进程。然后根据指令,Satan 通过 rpc 调用 God 中对应的方法执行相应的逻辑。

pm2 start app.js -i 4 为例,God 在初次执行时会配置 cluster,同时监听 cluster 中的事件:

// 配置cluster
cluster.setupMaster({
  exec : path.resolve(path.dirname(module.filename), 'ProcessContainer.js')
});

// 监听cluster事件
(function initEngine() {
  cluster.on('online', function(clu) {
    // worker进程在执行
    God.clusters_db[clu.pm_id].status = 'online';
  });
  
  // 命令行中 kill pid 会触发exit事件,process.kill不会触发exit
  cluster.on('exit', function(clu, code, signal) {
    // 重启进程 如果重启次数过于频繁直接标注为stopped
    God.clusters_db[clu.pm_id].status = 'starting';
    // 逻辑
    // ...
  });
})();

在 God 启动后, 会建立 Satan 和 God 的rpc链接,然后调用 prepare 方法,prepare 方法会调用 cluster.fork 来完成集群的启动

God.prepare = function(opts, cb) {
  // ...
  return execute(opts, cb);
};

function execute(env, cb) {
  // ...
  var clu = cluster.fork(env);
  // ...
  God.clusters_db[id] = clu;
  
  clu.once('online', function() {
    God.clusters_db[id].status = 'online';
    if (cb) return cb(null, clu);
    return true;
  });
  return clu;
}

七、总结

更多node相关知识,请访问:nodejs 教程!!