阿里云-云小站(无限量代金券发放中)
【腾讯云】云服务器、云数据库、COS、CDN、短信等热卖云产品特惠抢购

N-API使用Libuv线程池解决Node.js不适合处理耗时操作问题

48次阅读
没有评论

共计 6556 个字符,预计需要花费 17 分钟才能阅读完成。

导读 Node.js 不适合处理耗时操作是一直存在的问题,为此 Node.js 提供了三种解决方案。子进程、子线程、Libuv 线程池, 前两种是开发效率比较高的,因为我们只需要写 js。但是也有些缺点. 执行 js 的成本, 虽然可以间接使用 Libuv 线程池,但是受限于 Node.js 提供的 API, 无法利用 c /c++ 层提供的解决方案 (内置或业界的)。

N-API 使用 Libuv 线程池解决 Node.js 不适合处理耗时操作问题

我们可以尝试第三种解决方案。直接通过 N -API 使用 Libuv 线程池。下面我们看看这么做。N-API 提供了几个 API。

napi_create_async_work // 创建一个 worr,但是还没有执行 
napi_delete_async_work // 释放上面创建的 work 的内存 
napi_queue_async_work // 往 Libuv 提交一个 work 
napi_cancel_async_work // 取消 Libuv 中的任务,如果已经在执行则无法取消 

接下来我们看看如何通过 N -API 使用 Libuv 线程池。首先看看 js 层。

const {submitWork} = require('./build/Release/test.node'); 
submitWork((sum) => {console.log(sum) 
})

js 提交一个任务,然后传入一个回调。接着看看 N -API 的代码。

napi_value Init(napi_env env, napi_value exports) { 
  napi_value func; 
  napi_create_function(env, 
                      NULL, 
                      NAPI_AUTO_LENGTH, 
                      submitWork, 
                      NULL, 
                      &func); 
  napi_set_named_property(env, exports, "submitWork", func); 
  return exports; 
} 
 
NAPI_MODULE(NODE_GYP_MODULE_NAME, Init)

首先定义导出的函数,接着看核心逻辑。

定义一个结构体保存上下文
struct info 
{ 
  int sum; // 保存计算结果 
  napi_ref func; // 保存回调 
  napi_async_work worker; // 保存 work 对象 
};
提交任务到 Libuv
static napi_value submitWork(napi_env env, napi_callback_info info) { 
  napi_value resource_name; 
  napi_status status; 
   
  size_t argc = 1; 
  napi_value args[1]; 
  struct info data = {0, nullptr, nullptr}; 
  struct info * ptr = &data; 
  status = napi_get_cb_info(env, info, &argc, args, NULL, NULL); 
  if (status != napi_ok) {goto done;} 
  napi_create_reference(env, args[0], 1, &ptr->func); 
  status = napi_create_string_utf8(env,"test", NAPI_AUTO_LENGTH, &resource_name); 
  if (status != napi_ok) {goto done;} 
  // 创建一个 work,ptr 保存的上下文会在 work 函数和 done 函数里使用 
  status = napi_create_async_work(env, nullptr, resource_name, work, done, (void *) ptr, &ptr->worker); 
  if (status != napi_ok) {goto done;} 
  // 提及 work 到 Libuv 
  status = napi_queue_async_work(env, ptr->worker); 
 
  done:  
    napi_value ret; 
    napi_create_int32(env, status == napi_ok ? 0 : -1, &ret); 
    return  ret; 
}

执行上面的函数,任务就会被提交到 Libuv 线程池了。

Libuv 子线程执行任务
void work(napi_env env, void* data) {struct info *arg = (struct info *)data; 
  printf("doing...\n"); 
  int sum = 0; 
  for (int i = 0; i sum = sum;}

很简单,计算几个数。并且保存结果。

回调 js
void done(napi_env env, napi_status status, void* data) {struct info *arg = (struct info *)data; 
  if (status == napi_cancelled) {printf("cancel..."); 
  } else if (status == napi_ok) {printf("done...\n"); 
    napi_value callback; 
    napi_value global;   
    napi_value result; 
    napi_value sum; 
    // 拿到结果 
    napi_create_int32(env, arg->sum, &sum); 
    napi_get_reference_value(env, arg->func, &callback); 
    napi_get_global(env, &global); 
    // 回调 js 
    napi_call_function(env, global, callback, 1, &sum, &result); 
    // 清理 
    napi_delete_reference(env, arg->func); 
    napi_delete_async_work(env, arg->worker); 
  } 
}

并且执行后,我们看到输出了 45。接下来我们分析大致的过程。首先我呢看看 ThreadPoolWork,ThreadPoolWork 是对 Libuv work 的封装。

class ThreadPoolWork { 
 public: 
  explicit inline ThreadPoolWork(Environment* env) : env_(env) {CHECK_NOT_NULL(env); 
  } 
  inline virtual ~ThreadPoolWork() = default; 
 
  inline void ScheduleWork(); 
  inline int CancelWork(); 
 
  virtual void DoThreadPoolWork() = 0; 
  virtual void AfterThreadPoolWork(int status) = 0; 
 
  Environment* env() const { return env_;} 
 
 private: 
  Environment* env_; 
  uv_work_t work_req_; 
};

类的定义很简单,主要是封装了 uv_work_t。我们看看每个函数的意义。DoThreadPoolWork 和 AfterThreadPoolWork 是虚函数,由子类实现,我们一会看子类的时候再分析。我们看看 ScheduleWork

void ThreadPoolWork::ScheduleWork() {env_->IncreaseWaitingRequestCounter(); 
  int status = uv_queue_work(env_->event_loop(), 
      &work_req_, 
      // Libuv 子线程里执行的任务函数 
      [](uv_work_t* req) {ThreadPoolWork* self = ContainerOf(&ThreadPoolWork::work_req_, req); 
        self->DoThreadPoolWork();}, 
      // 任务处理完后的回调 
      [](uv_work_t* req, int status) {ThreadPoolWork* self = ContainerOf(&ThreadPoolWork::work_req_, req); 
        self->env_->DecreaseWaitingRequestCounter(); 
        self->AfterThreadPoolWork(status); 
      }); 
  CHECK_EQ(status, 0); 
}

ScheduleWork 是负责给 Libuv 提交任务的函数。接着看看 CancelWork。

int ThreadPoolWork::CancelWork() {return uv_cancel(reinterpret_cast(&work_req_)); 
} 

直接调用 Libuv 的函数取消任务。看完父类,我们看看子类的定义,子类在 N -API 里实现。

class Work : public node::AsyncResource, public node::ThreadPoolWork { 
 private: 
  explicit Work(node_napi_env env, 
                v8::Local<:object> async_resource, 
                v8::Local<:string> async_resource_name, 
                napi_async_execute_callback execute, 
                napi_async_complete_callback complete = nullptr, 
                void* data = nullptr) 
    : AsyncResource(env->isolate, 
                    async_resource, 
                    *v8::String::Utf8Value(env->isolate, async_resource_name)), 
      ThreadPoolWork(env->node_env()), 
      _env(env), 
      _data(data), 
      _execute(execute), 
      _complete(complete) { } 
 
  ~Work() override = default; 
 
 public: 
  static Work* New(node_napi_env env, 
                   v8::Local<:object> async_resource, 
                   v8::Local<:string> async_resource_name, 
                   napi_async_execute_callback execute, 
                   napi_async_complete_callback complete, 
                   void* data) { 
    return new Work(env, async_resource, async_resource_name, 
                    execute, complete, data); 
  } 
  // 释放该类对象的内存 
  static void Delete(Work* work) {delete work;} 
  // 执行用户设置的函数 
  void DoThreadPoolWork() override {_execute(_env, _data); 
  } 
 
  void AfterThreadPoolWork(int status) override { 
   // 执行用户设置的回调 
    _complete(env, ConvertUVErrorCode(status), _data); 
  } 
 
 private: 
  node_napi_env _env; 
  // 用户设置的数据,用于保存执行结果等 
  void* _data; 
  // 执行任务的函数 
  napi_async_execute_callback _execute; 
  // 任务处理完的回调 
  napi_async_complete_callback _complete; 
}; 

在 Work 类我们看到了虚函数 DoThreadPoolWork 和 AfterThreadPoolWork 的实现,没有太多逻辑。最后我们看看 N -API 提供的 API 的实现。

napi_status napi_create_async_work(napi_env env, 
                                   napi_value async_resource, 
                                   napi_value async_resource_name, 
                                   napi_async_execute_callback execute, 
                                   napi_async_complete_callback complete, 
                                   void* data, 
                                   napi_async_work* result) {v8::Local<:context> context = env->context(); 
 
  v8::Local<:object> resource; 
  if (async_resource != nullptr) {CHECK_TO_OBJECT(env, context, resource, async_resource); 
  } else {resource = v8::Object::New(env->isolate); 
  } 
 
  v8::Local<:string> resource_name; 
  CHECK_TO_STRING(env, context, resource_name, async_resource_name); 
 
  uvimpl::Work* work = uvimpl::Work::New(reinterpret_cast(env), 
                                         resource, 
                                         resource_name, 
                                         execute, 
                                         complete, 
                                         data); 
 
  *result = reinterpret_cast(work); 
 
  return napi_clear_last_error(env); 
} 

napi_create_async_work 本质上是对 Work 的简单封装,创建一个 Work 并返回给用户。

napi_delete_async_work
napi_status napi_delete_async_work(napi_env env, napi_async_work work) {CHECK_ENV(env); 
  CHECK_ARG(env, work); 
 
  uvimpl::Work::Delete(reinterpret_cast<:work>(work)); 
 
  return napi_clear_last_error(env); 
} 

napi_delete_async_work 用于任务执行完后释放 Work 对应的内存。

napi_queue_async_work
napi_status napi_queue_async_work(napi_env env, napi_async_work work) {CHECK_ENV(env); 
  CHECK_ARG(env, work); 
 
  napi_status status; 
  uv_loop_t* event_loop = nullptr; 
  status = napi_get_uv_event_loop(env, &event_loop); 
  if (status != napi_ok) 
    return napi_set_last_error(env, status); 
 
  uvimpl::Work* w = reinterpret_cast<:work>(work); 
 
  w->ScheduleWork(); 
 
  return napi_clear_last_error(env); 
} 

napi_queue_async_work 是对 ScheduleWork 的封装,作用是给 Libuv 线程池提交任务。

napi_cancel_async_work
napi_status napi_cancel_async_work(napi_env env, napi_async_work work) {CHECK_ENV(env); 
  CHECK_ARG(env, work); 
 
  uvimpl::Work* w = reinterpret_cast<:work>(work); 
 
  CALL_UV(env, w->CancelWork()); 
 
  return napi_clear_last_error(env); 
} 

napi_cancel_async_work 是对 CancelWork 的封装,即取消 Libuv 线程池的任务。我们看到一层层套,没有太多逻辑,主要是要符合 N -API 的规范。

总结:通过 N -API 提供的 API,使得我们不再受限于 Nod.js 本身提供的一些异步接口 (使用 Libuv 线程池的接口),而是直接使用 Libuv 线程池,这样我们不仅可以自己写 c /c++,还可以复用业界的一些解决方案解决 Node.js 里的一些耗时任务。

仓库:https://github.com/theanarkh/learn-to-write-nodejs-addons

阿里云 2 核 2G 服务器 3M 带宽 61 元 1 年,有高配

腾讯云新客低至 82 元 / 年,老客户 99 元 / 年

代金券:在阿里云专用满减优惠券

正文完
星哥说事-微信公众号
post-qrcode
 0
星锅
版权声明:本站原创文章,由 星锅 于2024-07-25发表,共计6556字。
转载说明:除特殊说明外本站文章皆由CC-4.0协议发布,转载请注明出处。
【腾讯云】推广者专属福利,新客户无门槛领取总价值高达2860元代金券,每种代金券限量500张,先到先得。
阿里云-最新活动爆款每日限量供应
评论(没有评论)
验证码
【腾讯云】云服务器、云数据库、COS、CDN、短信等云产品特惠热卖中