Vana Blog

Node.js Design Pattern - 스트림 코딩(2)

May 24, 2019

스트림을 사용한 비동기 제어 흐름

제어 흐름은 프로그램이 실행 중일 때 함수 호출, 명령문 및 명령문이 실행되거나 평가되는 순서이다. 송신 컴퓨터가 수신 컴퓨터보다 빠르기 때문에 데이터 통신에 흐름 제어가 필요하다. 데이터를 너무 빨리 보내면 오류가 발생하거나 데이터가 손실된다.

스트림 작업을 위한 Through2와 from2(stream2)

through2는 Transform스트림의 성성을 단순화 한다. through2를 사용하면 간단한 함수를 호출하여 새로운 Transform 스트림을 만들 수 있다.

const transform = through2([options], [_transform], [_flush])

비슷한 방법으로 from2를 사용하면 다음과 같은 코드를 사용하여 쉽고 간결하게 Readable 스트림을 만들 수 있다.

const readable = from2([options], _read)

순차 실행

기본적으로 스트림은 순차적으로 데이터를 처리한다. 비동기 작업을 순차적으로 처리하는데 스트림을 어떻게 사용할 수 있는지 예제.

const fromArray = require('from2-array');
const through = require('through2');
const fs = require('fs');
function concatFiles(destination, files, callback) {
const destStream = fs.createWriteStream(destination);
fromArray.obj(files) // from2-array를 사용하여 파일 배열에서 Readable 스트림을 만든다.
.pipe(through.obj((file, enc, done) => {
// 순차적으로 파일을 처리하기 위해 through(Transform)스트림을 생성.
// 각 파일에 대해 Readable 스트림을 만들고, 이를 출력 파일을 나타내는 destStream으로 연결(pipe)
// pipe 옵션으로 {end:false}를 정의함으로써 소스 파일의 읽기를 완료한 후에도 destStream을 닫지 않도록 한다.
const src = fs.createReadStream(file);
src.pipe(destStream, {end: false});
src.on('end', done) // 모든 내용이 destStream으로 전달 되면 through에 done함수를 호출하여 현재 처리가 완료됨을 알림.
}))
.on('finish', () => {
// 모든 파일이 처리되면 finish 이벤트 시작. destStream을 종료하고 concatFiles()의 callback()함수 호출하여 완료.
destStream.end();
callback();
});
}
module.exports = concatFiles;
view raw concatFile.js hosted with ❤ by GitHub

위의 예제를 다음과 같이 실행할 수 있다. 모듈 실행

const concatFiles = require('./concatFiles);
concatFiles(process.argv[2], process.argv.slice(3), () => {
  console.log('Files concatenated successfully');
})

스트림 또는 스트림 조합을 사용하여 일련의 비동기 작업을 순차적으로 쉽게 반복할 수 있다.

비순차 병렬 실행

순차로 처리하는 것은 때때로 Node.js 동시성을 최대한 활용하지 못하기 때문에 병목 현상이 있을 수 있다. 모든 데이터 덩어리들에 대해 느린 비동기 작업을 실행해야 하는 경우, 실행을 병렬화하고 전체 프로세스의 속도를 높이는 것이 유리할 수 있다. 이 패턴은 각각의 데이터 덩어리들이 서로 관계가 없는 경우에만 적용될 수 있다. 병렬 스트림은 데이터가 처리되는 순서가 중요한 경우에는 사용할 수 없다.

const stream = require('stream');
class ParallelStream extends stream.Transform {
constructor(userTransform) {
super({objectMode: true});
this.userTransform = userTransform;
this.running = 0;
this.terminateCallback = null;
}
_transform(chunk, enc, done) {
this.running++;
this.userTransform(chunk, enc, this.push.bind(this),
this._onComplete.bind(this));
done(); // userTransform이 완료되는걸 기다리지 않고 바로 호출함으로써 다른 항목의 처리를 시작할 수 있다.
}
_flush(done) { // 스트림이 끝나기 직전에 호출된다.
if(this.running> 0) { // 실행중인 작업이 있을 경우 done콜백을 호출하지 않도록 하여 finish 이벤트의 발생을 보류.
this.terminateCallback = done;
} else {
done();
}
}
_onComplete(err) { // 비동기 작업이 완료될 때마다 호출.
this.running--;
if(err) {
return this.emit('error', err);
}
if(this.running === 0) { // 싫행 중인 작업이 없으면 스트림을 종료시키고 _flush에서 보류된 이벤트 발생.
this.terminateCallback && this.terminateCallback();
}
}
}
module.exports = ParallelStream;
view raw paralelStream.js hosted with ❤ by GitHub
주의해야 할 점은 항목들을 받은 순서대로 보존하지 않는다.

URL 상태 모니터링 어플리케이션의 구현.

const fs = require('fs');
const split = require('split');
const request = require('request');
const ParallelStream = require('./parallelStream');
fs.createReadStream(process.argv[2]) //입력으로 주어진 파일로부터 Readable 스트림 생성
.pipe(split()) //각각의 라인을 서로 다른 데이터 덩어리로 출력하는 Transform스트림인 split을 통해 입력 파일의 내용을 연결(pipe)한다.
.pipe(new ParallelStream((url, enc, push, done) => {
//ParallelStream을 사용하여 요청 헤더를 보내고 응답을 기다려 URL을 검사한다.
// 콜백이 호출될 때 작업 결과를 스트림으로 밀어낸다.
if(!url) return done();
request.head(url, (err, response) => {
push(url + ' is ' + (err? 'down' : 'up') + '\n');
done();
});
}))
.pipe(fs.createWriteStream('results.txt')) //모든 결과가 results.txt파일에 파이프 된다.
.on('finish', () => console.log('All urls were checked'));
view raw checkUrls.js hosted with ❤ by GitHub
Transform 스트림 split 결과가 작성되는 순서가 URL이 파일에 기록된 순서와 다를 확률이 높다.

제한된 비순차 병렬 실행

위의 예제를 수천 또는 수백만 개의 URL이 포함된 파일에 대해 사용하게 되면 한 번에 감당할 수 없는 연결을 한꺼번에 생성하고 데이터를 동시에 보냄으로써 어플리케이션에 문제를 줄 수 있다. 부하와 리소스 사용을 제어하는 방법은 병렬 작업의 동시 실행을 제한하는 것이다. paralelStream를 다음과 같이 동시 실행 제한을 주어 변경함.

const stream = require('stream');
class LimitedParallelStream extends stream.Transform {
constructor(concurrency, userTransform) { // 동시 실행 제한을 입력받음.
super({objectMode: true});
this.concurrency = concurrency;
this.userTransform = userTransform;
this.running = 0;
this.terminateCallback = null; // _flush 메소드
this.continueCallback = null; // _transform 메소드
}
_transform(chunk, enc, done) {
this.running++;
this.userTransform(chunk, enc, this._onComplete.bind(this));
if(this.running < this.concurrency) { // done을 호출하기 전에 예비 슬롯이 남아있는지 확인.
done();
} else { // 최대 동시 실행 스트림 수에 도달한 경우
this.continueCallback = done;
}
}
_flush(done) {
if(this.running> 0) {
this.terminateCallback = done;
} else {
done();
}
}
_onComplete(err) {
this.running--;
if(err) {
return this.emit('error', err);
}
const tmpCallback = this.continueCallback;
this.continueCallback = null;
tmpCallback && tmpCallback(); // 작업이 완료될 때마다 스트림의 차단을 해제할 continueCallback호출.
if(this.running === 0) {
this.terminateCallback && this.terminateCallback();
}
}
}
module.exports = LimitedParallelStream;

순차 병렬 실행

위의 병렬 예제들은 순서를 지키지 않았지만 순서를 지키면서도 transform함수를 병렬로 실행하는 방법이 있다. 이것은 각 작업에 의해 발생한 데이터들을 정렬하는 것이다. 이런 패키지가 through2-paralllel이다. 이를 재사용 한 예제는 다음과 같다.

const fs = require('fs');
const split = require('split');
const request = require('request');
const throughParallel = require('through2-parallel');
fs.createReadStream(process.argv[2])
.pipe(split())
.pipe(throughParallel.obj({concurrency:2}, (url, enc, done) => { // 동시 실행 제한을 지정할 수 있음.
if(!url) return done();
request.head(url, (err, response) => {
push(url + ' is ' + (err? 'down' : 'up') + '\n');
done();
});
}))
.pipe(fs.createWriteStream('results.txt'))
.on('finish', () => console.log('All urls were checked'));
view raw through2-parallel.js hosted with ❤ by GitHub
입력 파일에 기록된 것과 같은 순서로 결과가 나오는 것을 확인할 수 있다.


Vana Yun

Written by Vana Yun