WorkerPool.js 9.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372
  1. 'use strict';
  2. Object.defineProperty(exports, "__esModule", {
  3. value: true
  4. });
  5. var _child_process = require('child_process');
  6. var _child_process2 = _interopRequireDefault(_child_process);
  7. var _queue = require('neo-async/queue');
  8. var _queue2 = _interopRequireDefault(_queue);
  9. var _mapSeries = require('neo-async/mapSeries');
  10. var _mapSeries2 = _interopRequireDefault(_mapSeries);
  11. var _readBuffer = require('./readBuffer');
  12. var _readBuffer2 = _interopRequireDefault(_readBuffer);
  13. var _WorkerError = require('./WorkerError');
  14. var _WorkerError2 = _interopRequireDefault(_WorkerError);
  /**
   * Interop helper for `require`d modules.
   *
   * Modules flagged with `__esModule` are handed back untouched; any other
   * value (including `null`/`undefined`) is wrapped so that it can be read
   * through a `.default` property.
   *
   * @param {*} obj - The value returned by `require`.
   * @returns {*} `obj` itself, or `{ default: obj }`.
   */
  function _interopRequireDefault(obj) {
    if (obj && obj.__esModule) {
      return obj;
    }
    return { default: obj };
  }
  // Resolved path of the './worker' module; PoolWorker passes it as the script
  // argument when spawning a worker process.
  const workerPath = require.resolve('./worker'); /* eslint-disable no-console */
  // Module-wide counter; each PoolWorker takes the current value as its id and
  // then increments it.
  let workerId = 0;
  /**
   * One child process of the pool plus the framing protocol used to talk to it.
   *
   * The child is spawned with five stdio slots; slots 3 and 4 are extra pipes
   * used as the read and write channel for messages. Every message is a 4-byte
   * big-endian length followed by that many bytes of UTF-8 JSON. A `job` reply
   * may be followed by raw buffers whose lengths are listed in `message.data`.
   */
  class PoolWorker {
    /**
     * Spawns the worker process and starts reading messages from it.
     *
     * @param {{nodeArgs: (string[]|undefined), parallelJobs: *}} options -
     *   `nodeArgs` are placed before the worker script path, `parallelJobs` is
     *   passed to the worker script as its argument.
     * @param {Function} onJobDone - Called (without arguments) each time a job
     *   of this worker has completed.
     * @throws {Error} When the spawned process has no `stdio` array.
     */
    constructor(options, onJobDone) {
      this.disposed = false;
      this.nextJobId = 0;
      // Prototype-less dictionary: jobId -> { data, callback }.
      this.jobs = Object.create(null);
      this.activeJobs = 0;
      this.onJobDone = onJobDone;
      this.id = workerId;
      workerId += 1;
      this.worker = _child_process2.default.spawn(process.execPath, [].concat(options.nodeArgs || []).concat(workerPath, options.parallelJobs), {
        detached: true,
        stdio: ['ignore', 'pipe', 'pipe', 'pipe', 'pipe']
      });
      this.worker.unref();
      // This prevents a problem where the worker stdio can be undefined
      // when the kernel hits the limit of open files.
      // More info can be found on: https://github.com/webpack-contrib/thread-loader/issues/2
      if (!this.worker.stdio) {
        // Report this worker's own id; the module counter was already advanced.
        throw new Error(`Failed to create the worker pool with workerId: ${this.id} and configuration: ${JSON.stringify(options)}. Please verify if you hit the OS open files limit.`);
      }
      const [,,, readPipe, writePipe] = this.worker.stdio;
      this.readPipe = readPipe;
      this.writePipe = writePipe;
      this.listenStdOutAndErrFromWorker(this.worker.stdout, this.worker.stderr);
      this.readNextMessage();
    }

    /**
     * Starts forwarding the worker's stdout/stderr to this process.
     *
     * The listeners are closures stored on the instance: an event emitter
     * invokes listeners with `this` set to the emitter, so passing the bare
     * methods would make their `this.disposed` check look at the stream.
     *
     * @param {?Object} workerStdout - Readable stream or a falsy value.
     * @param {?Object} workerStderr - Readable stream or a falsy value.
     */
    listenStdOutAndErrFromWorker(workerStdout, workerStderr) {
      if (!this.stdoutListener) {
        this.stdoutListener = data => this.writeToStdout(data);
      }
      if (!this.stderrListener) {
        this.stderrListener = data => this.writeToStderr(data);
      }
      if (workerStdout) {
        workerStdout.on('data', this.stdoutListener);
      }
      if (workerStderr) {
        workerStderr.on('data', this.stderrListener);
      }
    }

    /**
     * Stops forwarding the worker's stdout/stderr (removes the listeners that
     * `listenStdOutAndErrFromWorker` registered).
     *
     * @param {?Object} workerStdout - Readable stream or a falsy value.
     * @param {?Object} workerStderr - Readable stream or a falsy value.
     */
    ignoreStdOutAndErrFromWorker(workerStdout, workerStderr) {
      if (workerStdout && this.stdoutListener) {
        workerStdout.removeListener('data', this.stdoutListener);
      }
      if (workerStderr && this.stderrListener) {
        workerStderr.removeListener('data', this.stderrListener);
      }
    }

    /**
     * Writes a chunk to this process' stdout unless the worker is disposed.
     * @param {Buffer|string} data
     */
    writeToStdout(data) {
      if (!this.disposed) {
        process.stdout.write(data);
      }
    }

    /**
     * Writes a chunk to this process' stderr unless the worker is disposed.
     * @param {Buffer|string} data
     */
    writeToStderr(data) {
      if (!this.disposed) {
        process.stderr.write(data);
      }
    }

    /**
     * Registers a job and sends it to the worker.
     *
     * @param {*} data - Job payload; it is JSON-serialised for the worker and
     *   kept locally so later `resolve`/`emitWarning`/`emitError` messages can
     *   call the functions it carries.
     * @param {Function} callback - `(err, result)` invoked when the worker
     *   reports the job as finished.
     */
    run(data, callback) {
      const jobId = this.nextJobId;
      this.nextJobId += 1;
      this.jobs[jobId] = { data, callback };
      this.activeJobs += 1;
      this.writeJson({
        type: 'job',
        id: jobId,
        data
      });
    }

    /**
     * Sends a `warmup` message carrying `requires` to the worker.
     * @param {*} requires
     */
    warmup(requires) {
      this.writeJson({
        type: 'warmup',
        requires
      });
    }

    /**
     * Sends one framed message: 4-byte big-endian length, then UTF-8 JSON.
     * @param {*} data - JSON-serialisable message.
     */
    writeJson(data) {
      const lengthBuffer = Buffer.alloc(4);
      const messageBuffer = Buffer.from(JSON.stringify(data), 'utf-8');
      lengthBuffer.writeInt32BE(messageBuffer.length, 0);
      this.writePipe.write(lengthBuffer);
      this.writePipe.write(messageBuffer);
    }

    /**
     * Sends a frame with length 0 and no payload.
     * NOTE(review): presumably the worker treats this as "no more messages" —
     * confirm against the worker script.
     */
    writeEnd() {
      const lengthBuffer = Buffer.alloc(4);
      lengthBuffer.writeInt32BE(0, 0);
      this.writePipe.write(lengthBuffer);
    }

    /**
     * Reads one framed message from the worker, dispatches it and schedules
     * the next read. `this.state` records the current step. Any failure is
     * logged and ends the read loop.
     */
    readNextMessage() {
      this.state = 'read length';
      this.readBuffer(4, (lengthReadError, lengthBuffer) => {
        if (lengthReadError) {
          console.error(`Failed to communicate with worker (read length) ${lengthReadError}`);
          return;
        }
        this.state = 'length read';
        const length = lengthBuffer.readInt32BE(0);
        this.state = 'read message';
        this.readBuffer(length, (messageError, messageBuffer) => {
          if (messageError) {
            console.error(`Failed to communicate with worker (read message) ${messageError}`);
            return;
          }
          this.state = 'message read';
          let message;
          try {
            message = JSON.parse(messageBuffer.toString('utf-8'));
          } catch (parseError) {
            // A malformed frame is handled like every other communication
            // failure instead of throwing out of the stream callback.
            console.error(`Failed to communicate with worker (parse message) ${parseError}`);
            return;
          }
          this.state = 'process message';
          this.onWorkerMessage(message, err => {
            if (err) {
              console.error(`Failed to communicate with worker (process message) ${err}`);
              return;
            }
            this.state = 'soon next';
            setImmediate(() => this.readNextMessage());
          });
        });
      });
    }

    /**
     * Handles one decoded message from the worker.
     *
     * - `job`: reads the trailing buffers announced in `message.data`, puts
     *   them back into `result.result` and completes the job.
     * - `resolve`: calls the job's `data.resolve` and sends the outcome back
     *   as a `result` message.
     * - `emitWarning` / `emitError`: forwards to the job's `data.emitWarning`
     *   / `data.emitError`.
     *
     * Messages that refer to a job id that is not (or no longer) registered
     * are logged and skipped instead of throwing.
     *
     * @param {Object} message - Parsed message with at least `type` and `id`.
     * @param {Function} finalCallback - Called once the message is handled so
     *   the next message can be read.
     */
    onWorkerMessage(message, finalCallback) {
      const { type, id } = message;
      const job = this.jobs[id];
      switch (type) {
        case 'job':
          {
            const { data, error, result } = message;
            // The buffers are always consumed, even for an unknown job, so
            // the pipe stays aligned on the next frame.
            (0, _mapSeries2.default)(data, (length, callback) => this.readBuffer(length, callback), (eachErr, buffers) => {
              const jobCallback = job && job.callback;
              const callback = (err, arg) => {
                if (jobCallback) {
                  delete this.jobs[id];
                  this.activeJobs -= 1;
                  this.onJobDone();
                  if (err) {
                    jobCallback(err instanceof Error ? err : new Error(err), arg);
                  } else {
                    jobCallback(null, arg);
                  }
                }
                finalCallback();
              };
              if (eachErr) {
                callback(eachErr);
                return;
              }
              let bufferPosition = 0;
              if (result && result.result) {
                // Entries flagged with `buffer` were sent out-of-band; take
                // them from `buffers` in order, decoding those flagged
                // `string` back to text.
                result.result = result.result.map(r => {
                  if (r.buffer) {
                    const buffer = buffers[bufferPosition];
                    bufferPosition += 1;
                    if (r.string) {
                      return buffer.toString('utf-8');
                    }
                    return buffer;
                  }
                  return r.data;
                });
              }
              if (error) {
                callback(this.fromErrorObj(error), result);
                return;
              }
              callback(null, result);
            });
            break;
          }
        case 'resolve':
          {
            if (!job) {
              console.error(`Unexpected worker message ${type} for unknown job ${id} in WorkerPool.`);
              finalCallback();
              break;
            }
            const { context, request, questionId } = message;
            job.data.resolve(context, request, (error, result) => {
              this.writeJson({
                type: 'result',
                id: questionId,
                error: error ? {
                  message: error.message,
                  details: error.details,
                  missing: error.missing
                } : null,
                result
              });
            });
            // The answer is written asynchronously; reading continues now.
            finalCallback();
            break;
          }
        case 'emitWarning':
          {
            if (!job) {
              console.error(`Unexpected worker message ${type} for unknown job ${id} in WorkerPool.`);
              finalCallback();
              break;
            }
            job.data.emitWarning(this.fromErrorObj(message.data));
            finalCallback();
            break;
          }
        case 'emitError':
          {
            if (!job) {
              console.error(`Unexpected worker message ${type} for unknown job ${id} in WorkerPool.`);
              finalCallback();
              break;
            }
            job.data.emitError(this.fromErrorObj(message.data));
            finalCallback();
            break;
          }
        default:
          {
            console.error(`Unexpected worker message ${type} in WorkerPool.`);
            finalCallback();
            break;
          }
      }
    }

    /**
     * Builds a WorkerError for this worker.
     * @param {string|Object} arg - A message string (wrapped as
     *   `{ message }`) or an error-like object passed through as is.
     * @returns {Error} The WorkerError instance.
     */
    fromErrorObj(arg) {
      let obj;
      if (typeof arg === 'string') {
        obj = { message: arg };
      } else {
        obj = arg;
      }
      return new _WorkerError2.default(obj, this.id);
    }

    /**
     * Reads `length` bytes from the worker's read pipe.
     * @param {number} length
     * @param {Function} callback - `(err, buffer)`.
     */
    readBuffer(length, callback) {
      (0, _readBuffer2.default)(this.readPipe, length, callback);
    }

    /**
     * Marks the worker as disposed, stops forwarding its output and sends the
     * zero-length end frame. Calling it again is a no-op.
     */
    dispose() {
      if (!this.disposed) {
        this.disposed = true;
        this.ignoreStdOutAndErrFromWorker(this.worker.stdout, this.worker.stderr);
        this.writeEnd();
      }
    }
  }
  /**
   * A pool of PoolWorker processes fed through a job queue.
   *
   * Jobs are pushed onto a queue whose concurrency is `poolParallelJobs`; each
   * queued job is given to the worker with the fewest active jobs, spawning
   * new workers until `numberOfWorkers` is reached. When the pool has been
   * idle for `poolTimeout` ms the workers are disposed (and, unless
   * `poolRespawn` is set, the pool is terminated for good).
   */
  class WorkerPool {
    /**
     * @param {Object} [options]
     * @param {number} [options.numberOfWorkers] - Upper bound of workers.
     * @param {number} [options.poolTimeout] - Idle time in ms before workers
     *   are disposed; a non-finite value disables the idle timer.
     * @param {string[]} [options.workerNodeArgs] - Passed to each worker as
     *   `nodeArgs`.
     * @param {*} [options.workerParallelJobs] - Passed to each worker as
     *   `parallelJobs`.
     * @param {number} [options.poolParallelJobs] - Concurrency of the queue.
     * @param {boolean} [options.poolRespawn] - When falsy, the idle timeout
     *   terminates the pool instead of only disposing its workers.
     */
    constructor(options) {
      // Read every setting through the defaulted object so that a missing
      // `options` argument does not throw.
      this.options = options || {};
      this.numberOfWorkers = this.options.numberOfWorkers;
      this.poolTimeout = this.options.poolTimeout;
      this.workerNodeArgs = this.options.workerNodeArgs;
      this.workerParallelJobs = this.options.workerParallelJobs;
      this.workers = new Set();
      this.activeJobs = 0;
      this.timeout = null;
      this.poolQueue = (0, _queue2.default)(this.distributeJob.bind(this), this.options.poolParallelJobs);
      this.terminated = false;
      this.setupLifeCycle();
    }

    /**
     * @returns {boolean} `true` until the pool has been terminated.
     */
    isAbleToRun() {
      return !this.terminated;
    }

    /**
     * Permanently shuts the pool down: kills the queue, disposes all workers,
     * cancels a pending idle timer and detaches the process `exit` listener.
     * Calling it again is a no-op.
     */
    terminate() {
      if (this.terminated) {
        return;
      }
      this.terminated = true;
      if (this.exitListener) {
        // Without this every pool would leave one `exit` listener behind.
        process.removeListener('exit', this.exitListener);
        this.exitListener = null;
      }
      if (this.timeout) {
        // An armed idle timer has nothing left to do and would only keep the
        // event loop alive.
        clearTimeout(this.timeout);
        this.timeout = null;
      }
      this.poolQueue.kill();
      this.disposeWorkers(true);
    }

    /**
     * Makes sure the pool is terminated when the process exits.
     */
    setupLifeCycle() {
      this.exitListener = () => {
        this.terminate();
      };
      process.on('exit', this.exitListener);
    }

    /**
     * Queues a job; a pending idle timer is cancelled.
     *
     * @param {*} data - Job payload handed to a worker.
     * @param {Function} callback - `(err, result)` for the finished job.
     */
    run(data, callback) {
      if (this.timeout) {
        clearTimeout(this.timeout);
        this.timeout = null;
      }
      this.activeJobs += 1;
      this.poolQueue.push(data, callback);
    }

    /**
     * Queue worker function: picks a worker for the job.
     *
     * The least-loaded worker is reused when it is idle or when the pool is
     * already at `numberOfWorkers`; otherwise a new worker is created.
     *
     * @param {*} data
     * @param {Function} callback
     */
    distributeJob(data, callback) {
      // use worker with the fewest jobs
      let bestWorker;
      for (const worker of this.workers) {
        if (!bestWorker || worker.activeJobs < bestWorker.activeJobs) {
          bestWorker = worker;
        }
      }
      if (bestWorker && (bestWorker.activeJobs === 0 || this.workers.size >= this.numberOfWorkers)) {
        bestWorker.run(data, callback);
        return;
      }
      const newWorker = this.createWorker();
      newWorker.run(data, callback);
    }

    /**
     * Spawns a new worker and adds it to the pool.
     * @returns {PoolWorker} The new worker.
     */
    createWorker() {
      // spin up a new worker
      const newWorker = new PoolWorker({
        nodeArgs: this.workerNodeArgs,
        parallelJobs: this.workerParallelJobs
      }, () => this.onJobDone());
      this.workers.add(newWorker);
      return newWorker;
    }

    /**
     * Fills the pool up to `numberOfWorkers` and sends each new worker a
     * warmup message with `requires`.
     * @param {*} requires
     */
    warmup(requires) {
      while (this.workers.size < this.numberOfWorkers) {
        this.createWorker().warmup(requires);
      }
    }

    /**
     * Bookkeeping after a job has finished; arms the idle timer once no jobs
     * are left and `poolTimeout` is finite.
     */
    onJobDone() {
      this.activeJobs -= 1;
      // The global `isFinite` is kept on purpose: it coerces its argument, so
      // numeric strings keep being accepted as before.
      if (this.activeJobs === 0 && isFinite(this.poolTimeout)) {
        if (this.timeout) {
          clearTimeout(this.timeout);
        }
        this.timeout = setTimeout(() => {
          this.timeout = null;
          this.disposeWorkers();
        }, this.poolTimeout);
      }
    }

    /**
     * Disposes the workers of the pool.
     *
     * Called from the idle timer (no argument): without `poolRespawn` the
     * whole pool is terminated; with it, the workers are disposed only when
     * no job is active. Called from `terminate` (`fromTerminate` truthy): the
     * workers are always disposed.
     *
     * @param {boolean} [fromTerminate]
     */
    disposeWorkers(fromTerminate) {
      if (!this.options.poolRespawn && !fromTerminate) {
        this.terminate();
        return;
      }
      if (this.activeJobs === 0 || fromTerminate) {
        for (const worker of this.workers) {
          worker.dispose();
        }
        this.workers.clear();
      }
    }
  }
  325. exports.default = WorkerPool;