[Electron] 13.멀티스레딩 구현 with worker(2) - 레시피 일렉트론 앱

개요

  1. Worker thread가 되어줄 script를 작성한다.
  2. Worker를 생성하고 통신해줄 Class를 작성하고, 각종 리스너를 설정한다.
  3. Worker에 맞춰서 IPC및 DB 쿼리문 호출 로직을 수정한다.

Worker를 통한 검색 로직

기존에는 Renderer 단에서 검색을 요청할 시, Main에서 바로 Sqlite DB의 쿼리문을 호출 해 결과를 보냈다. Worker를 추가하게 되면 Main에서 DB 대신 Worker로 검색 결과를 요청하게 된다.

  1. Worker는 RcpIPC 인스턴스가 Main에서 생성될 때 함께 생성되고 대기 중에 있다.
  2. Worker는 RcpSqliteDB 인스턴스를 생성한다.
  3. Renderer 단에서 검색 요청 시, Main은 요청을 바로 Worker thread로 전달한다.
  4. Worker는 생성한 DB 인스턴스를 통해 쿼리문을 호출하여 처리하고 결과를 Main으로 반환한다.
  5. Main은 리스너를 통해 Worker로 부터 받은 검색 결과를 Renderer로 송신한다.

Worker Script

Worker thread가 되어줄 javascript(worker.js)를 작성한다. 해당 js 파일은 설정에 따라 main.js 처럼 번들링되어 webpack에 패키징된다.

  • src/main/main.js → .webpack/main/index.js
  • src/main/worker/worker.js → .webpack/main/worker.js
//worker.js
const { parentPort, workerData } = require('worker_threads');

import RcpSqliteDB from '../rcp_sqlite_db';

const { dbPath } = workerData;
let db = null;

function open() {
  try {
    console.log('worker db open');
    db = new RcpSqliteDB(dbPath);
    db.open();

  } catch (error) {
    console.log('db open in worker: ' + error);
  }

}

function chooseQuery(msgId, data) {
  let dbResult;
  let replyId;

  switch (msgId) {
    case 'selectRcpByKeyword':
      let keyword = data.keyword;
      let column = data.column;
      let offset = data.offset;

      dbResult = db.selectRcpByKeyword(keyword, column, offset);
      replyId = 'resp_searchRcpList'
      break;

    case 'selectRcpImageFileById':
      //data == id
      dbResult = db.selectRcpImageFileById(data);
      replyId = 'resp_rcpImageSrc'
      break;

    case 'selectRcpZipFileById':
      let id = data.id;
      let type = data.type;
      dbResult = db.selectRcpZipFileById(id);

      if (type=='download') {
        replyId = 'resp_rcpZipFile'
      } else {
        replyId = 'resp_rcpViewByDecompress';
      }

      break;
  }

  let result = {
    replyId: replyId,
    dbResult: dbResult
  }

  return result;
}

parentPort.on('message', (_workerData) => {
  const { msgId, data } = _workerData;

  try {
    if (data.keyword == 'worker_error') {
      if (process.env.NODE_ENV != 'production') {
        parentPort.error(); // forced error. (test)
      }
    }

    if (msgId == 'close') {
      db.close();
      parentPort.close();

      return;
    }

    let workerResult = chooseQuery(msgId, data);

    parentPort.postMessage(workerResult);

  } catch (e) {
    console.log('[worker] Catch Error in Worker: ');
    console.log(e);
    parentPort.close();
  }
});

open();

process.on('exit', (code) => {
  console.log(`[worker] Exit event in worker.`);
  db.close();
});

Worker thread(worker.js)와 Main process(main.js) 간의 통신은 IPC 통신, Iframe 통신과 유사하게 이뤄진다.

 

open()

  • Main에서 Worker를 생성할 때, workerData로 넘겨받은 dbPath를 통해 인스턴스를 생성하고, DB와 접속한다.

parentPort.on('message', (_workerData)⇒{ … } )

  • Main(ParentPort)에서 Worker로 보내는 요청을 수신하여 처리하는 리스너.
  • _workerData를 통해 어떤 요청인지 확인하고, 검색 요청일시 chooseQuery() 함수를 호출하여 검색 결과를 얻는다.
  • 얻은 검색결과를 다시 Main으로 송신한다.

chooseQuery(msgId, data)

  • msgId를 기준으로 쿼리문을 선택해 호출하고 결과를 반환한다.

process.on('exit', (code) => { … } )

  • Worker 종료시에 실행되는 리스너
  • DB도 함께 종료한다.

RcpWorker

Worker thread가 되어줄 Script의 경로를 지정하여 Worker 인스턴스를 생성하는 클래스. Worker 생성 외에도 Worker에서 메시지를 수신하여 처리하는 리스너와 Worker가 종료되었을 때의 예외처리를 수행하는 리스너를 다룬다.

//rcp_worker.js
const { nativeImage, app, dialog } = require('electron');
const path = require('path');
const fs = require('fs');
const { Worker } = require('worker_threads');

import RcpDecompress from './rcp_decompress';
import RcpSetting from '../rcp_setting';

export default class RcpWorker {
  mainWindow;
  modalWindow;

  worker;
  workerOnListener;
  workerExitListener;

  constructor(_mainWindow, _modalWindow) {
    this.mainWindow = _mainWindow;
    this.modalWindow = _modalWindow;

    this.setWorker();
    this.setWorkerOnListener();
    this.setWorkerExitListener();
  }

  setWorker() {
    //webpack path
    let workerPath = path.resolve(__dirname, './worker.js');
    let _dbPath = RcpSetting.getDatabasePath();

    console.log('Create new worker');
    const dbPathObj = { dbPath: _dbPath };
    this.worker = new Worker(workerPath, { workerData: dbPathObj });
  }

  setWorkerOnListener() {
    this.workerOnListener = this.worker.on('message', (workerResult) => {

      if (workerResult.replyId == 'resp_rcpImageSrc') {
        let imageResult = workerResult.dbResult[0];
        imageResult.image_file = nativeImage.createFromBuffer(imageResult.image_file).toDataURL();

        this.mainWindow.webContents.postMessage(workerResult.replyId, imageResult);

        return;

      } else if (workerResult.replyId == 'resp_rcpZipFile') {
        let zipFileResult = workerResult.dbResult[0];
        this.showSaveAsRcpZipFile(zipFileResult);

        return;

      } else if (workerResult.replyId == 'resp_rcpViewByDecompress') {
        let zipFileResult = workerResult.dbResult[0];
        let rcpDecompress = new RcpDecompress();
        rcpDecompress.decompressRecipeZipFile(zipFileResult, this.viewRcpDetail, this.modalWindow);
      }

      this.mainWindow.webContents.postMessage(workerResult.replyId, workerResult.dbResult);
    });
  }

  setWorkerExitListener() {
    this.exitListener = this.worker.on('exit', (code) => {
      console.log(`IPC: Worker exited with code ${code}`);
      this.worker = null;
      this.onListener = null;
      this.exitListener = null;

      // Code value to determine whether to go to the main page. 
      // 1 : call of terminate()
      // 0 : anything else
      if (code == 1) {
        console.log('terminate!!! create new worker and listener');
        return;
      }

      let timer = setInterval(() => {
        dialog.showMessageBox(this.mainWindow, { type: 'error', title: 'Error', message: 'An unknown error occurred in program. \\nReturn to main window. ' })
          .then(() => {
            this.mainWindow.loadURL(MAIN_WINDOW_WEBPACK_ENTRY);
          });
        clearInterval(timer);
      }, 500);

    });

  }

  postMessageToWorker(msgId, data) {
    if (this.worker == null) {
      console.log('This worker and listenr are null. create new worker and listener');
      this.setWorker();
      this.setWorkerOnListener();
      this.setWorkerExitListener();
    }

    const workerData = {
      msgId: msgId,
      data: data
    }

    this.worker.postMessage(workerData);
  }

...

}

 

constructor(_mainWindow, _modalWindow)

  • 요청한 주체에 따라 Window를 선택해 응답을 송신해야 하므로, 생성자에서 window를 인자로 받는다.
  • 인스턴스 생성과 동시에 Worker를 생성하고, 리스너들을 등록한다.

setWorker()

  • workrer.js의 경로와 DB 경로를 인자로 worker 인스턴스를 생성한다.
  • worker.js의 경로는 현재 sourece 기준이 아닌 번들링된 worker.js를 기준으로 지정한다. (설정 파일 참고)

setWorkerOnListener()

  • Worker에서 보내는 메시지를 수신하여 처리하는 리스너를 설정한다.
  • 응답 ID (replyId)에 따라 알맞은 로직을 수행한다. (이미지, 다운로드, 압축 해제, 단순 검색)

setWorkerExitListener()

  • Worker의 종료를 감지하여 예외처리를 해주는 리스너를 설정한다.

postMessageToWorker(msgId, data)

  • Worker로 메시지를 송신한다
  • Worker가 종료되어 없을 시에는 새롭게 생성하고 리스너를 설정하도록 한다.

RcpIPC

이미 생성한 RcpIPC 클래스를 수정한다.

//rcp_ipc.js
const { ipcMain} = require('electron');
import RcpWorker from './rcp_worker';

export default class RcpIPC {
  mainWindow;
  modalWindow;
  rcpWorker;

  constructor(_mainWindow, _modalWindow) {
    this.mainWindow = _mainWindow;
    this.modalWindow = _modalWindow;
    this.rcpWorker = new RcpWorker(_mainWindow, _modalWindow);
  }

...

  addSearchRcpList() {
    ipcMain.on('req_searchRcpList', (event, keyword, column, pageNum) => {
      try {
        keyword = keyword.trim();
        let offset = (pageNum > 1) ? (pageNum - 1) * 10 : 0;

        let data = {
          keyword: keyword,
          column: column,
          offset: offset
        };

        this.rcpWorker.postMessageToWorker('selectRcpByKeyword', data);

      } catch (error) {
        console.log(error);
        event.reply('resp_searchRcpList', 'error');
      }

    });

  }

}

생성자에서 RcpWorker 인스턴스를 생성하도록 한다. 또한 Renderer 단에서 요청을 수신하고, 바로 DB의 쿼리문을 호출하여 결과를 응답하던 부분을 수정한다. DB의 쿼리문을 직접 호출하지 않고, Worker로 데이터를 송신하기만 한다.

 

constructor(_mainWindow, _modalWindow)

  • _db 를 인자로 받아 설정하던 로직을 삭제한다.
  • worker 인스턴스를 생성한다.

그 외 함수들

  • DB의 쿼리문을 호출하고 결과를 받아 응답하던 부분을 수정한다.
  • DB 쿼리문 대신 RcpWorker 인스턴스의 postMessageToWorker()를 호출하여 요청을 Worker로 전달한다.

RcpSqliteDB

RcpSetting 클래스에 저장되어있는 DB 경로를 가져오도록 하여 접속하는 로직을 수정한다.

//rcp_sqlite_db.js
import DatabaseConstructor, { Database } from 'better-sqlite3';
const log = require('electron-log');
const fs = require('fs');

export default class RcpSqliteDB {
  db = null;
  dbPath;
  is_open = false;

  constructor(_dbPath) {
    this.dbPath = _dbPath;
  }

  open() {
    let existDb = fs.existsSync(this.dbPath);

    if (existDb) {
      this.db = new DatabaseConstructor(this.dbPath);

      this.db.prepare("SELECT * FROM tb_recipe LIMIT 1").all();

      this.is_open = true;
    }

    console.log('db is open: ' + this.dbPath);

    return this.is_open;
  }

...

}

RcpSetting 클래스에 저장되어있던 DB 경로 대신 생성자의 인자로 경로를 받아 DB와 접속한다.

Worker 사용

이제 앱에서 검색을 실행하면 관련 로직은 모두 Worker thread에서 처리한다. 겉으로 볼 때 변한 것은 없지만, 더 큰 대용량의 데이터 및 CPU 작업을 실행할 때의 속도는 확실하게 차이나며, 처리할 동안 앱에서 다른 작업도 수행할 수 있다. (창 이동, 최소화, 최대화 등)