Реактивное программирование: теория и практика
Введение
Вообще, реактивное программирование — не самая сложная штука. Поэтому я постарался сконцентрироваться на реальном его применении в Node.js, и поэтому здесь будет больше кода, чем текста :)
Что такое реактивное программирование?
Если по-простому, то реактивное программирование — это парадигма программирования, выстроенная вокруг событий и реагирования на них. Было бы гораздо проще показать примеры реактивного программирования с использованием EventEmitter (о нем у меня тоже будет статья, не сомневайтесь), но мне было интереснее разобраться с такой штукой, как Node.js Streams. К тому же, получившийся пример вполне себе применим на практике.
Вообще, Streams обычно используются для чтения и записи буферизированных данных (бинарных файлов и всего такого). Но, подумал я, что, если с помощью него работать с API? :)
В примере мы читаем данные из настоящего API, получая данные порционно (имитируя пагинацию), и сохраняем их в массив. Понятное дело, что никто не мешает нам их обрабатывать и сохранять в базу, да и вообще делать с ними что угодно.
Кому не терпится посмотреть на код, то — вот. Остальных приглашаю разобраться вместе со мной :)
API
Для того, чтобы продемонстрировать чтение из API стримами, я набросал простенькое API на Fastify. Вот оно:
import Fastify, { FastifyInstance, RouteShorthandOptions, RouteHandlerMethod } from 'fastify'
const srv: FastifyInstance = Fastify({})
interface Objects {
[key: string]: number,
}
interface APIResponse {
objects: Objects,
start?: number,
end?: number,
}
interface RequestQuery {
offset?: number,
limit?: number,
}
const opts: RouteShorthandOptions = {
schema: {
querystring: {
type: 'object',
properties: {
offset: { type: 'number'},
limit: {type: 'number'}
},
}
},
}
const handler: RouteHandlerMethod = async (req, _res) => {
const response: APIResponse = {
objects: {}
};
const query = req.query as RequestQuery;
const offset = Number(query?.offset) || 0;
const limit = Number(query?.limit) || 50;
const finalNum = 10000;
const end = (Number(offset) + Number(limit)) < finalNum
? (Number(offset) + Number(limit))
: finalNum;
for (let i = offset; i <= end; i++) {
response.objects[i] = i;
}
response.start = offset;
response.end = end;
return response;
};
srv.get('/', opts, handler);
const start = async () => {
try {
await srv.listen({ port: 3000 });
} catch (err) {
srv.log.error(err);
process.exit(1);
}
}
start();
Как мы видим, оно включает в себя всего один эндпойнт, который может отдать объекты вида [key: string]: number
и поддерживает пагинацию (то есть, мы можем указать, с какого и по какое число нам сгенерировать объекты). Есть и лимит: числа больше 10000
он не отдаст (сделано это для того, чтобы чтение из API всегда было конечно). Все, больше ничего интересного: все самое интересное — в клиенте.
Клиент
Клиент основан на Node.js-стримах и содержит два класса, наследующихся от Readable и Writable соответственно. Тот, что Readable
, порционно читает данные из API, соблюдая пагинацию — а тот, что Writable
, эти данные так же порционнно собирает в массив.
Это немного напоминает очереди сообщений, о которых тоже будет статья. Давайте смотреть код:
import { Writable, Readable } from 'node:stream';
const dbArr: object[] = [];
interface Objects {
[key: string]: number;
}
interface APIResponse {
start: number,
end: number,
objects: Objects,
}
class ApiReadable extends Readable {
private uri: string;
private offset: number;
private limit: number
constructor() {
super({ objectMode: true });
this.uri = 'http://localhost:3000/';
this.offset = 0;
this.limit = 100;
}
async getAPI(): Promise<APIResponse|null> {
const response = await fetch(`${this.uri}?limit=${this.limit}&offset=${this.offset}`);
const result = await response.json();
// кончились данные в API? Вернем null;
if (result?.end && (result.end - result.start) < this.limit ) {
return null;
}
// вернем ответ API в виде JSON
return result;
}
// переопределим встроенный в Readable метод _read
override async _read() {
// заберем нужное количество данных из API
const result = await this.getAPI();
// если данных нет, отдадим null во Writable — это сгенерит событие end
if (!result) {
this.push(null);
return;
} else {
// а если данные есть, пихнем их во Writable
this.push(result.objects);
}
// сместим пагинацию вперед
this.offset = result.end > this.offset ? result.end + 1 : result.end;
}
}
class ObjWritable extends Writable {
private dbArr: object[];
// конструктор принимает архив, в который будем пихать данные, полученные от Readable
constructor(dbArr: object[]) {
super({ highWaterMark: 5, objectMode: true });
this.dbArr = dbArr;
}
writeToObj(chunk: object) {
this.dbArr.push(chunk);
}
// переопределим метод Writable.prototype._write - теперь он пишет в наш массив полученные данные и исполняет свой дефолтный коллбек
override _write(chunk: object, _encoding: string, callback: Function) {
this.writeToObj(chunk);
callback();
}
// то же самое произойдет и с дефолтным методом _writev, который пишет несколько кусков (chunks) данных за раз
override _writev(chunks: object[], callback: Function) {
this.dbArr.push(...chunks);
callback();
}
}
// создадим экземпляры наших классов
const objWritable = new ObjWritable(dbArr);
const apiReadable = new ApiReadable();
apiReadable.pipe(objWritable); // и самое интересное: подпишем Writable на Readable
apiReadable.on('end', () => console.log(dbArr));
С комментами понятнее? Да, там внутри EventEmitter :) но как красиво! Забираем данные из API порционнно, порционно их куда угодно пишем. Обратите внимание на параметр highWaterMark
у Writable
— это количество одновременно хранимых данных. Так можно и rate limiting безо всяких брокеров сообщений соорудить :)
В общем, мне кажется, это отличный и небанальный пример реактивного программирования на Node.js.
Выводы
Вообще, JavaScript и на фронте, и на бэке прекрасно подходит для реализации event-driven архитектуры. И реактивное программирование может быть не какой-то далекой от реальной разработки теорией, а вполне себе применимой штукой.
Stay tuned! Хоть блог и для новичков по большей части, но иногда я буду писать и что-то повеселее :)
Интересный пост?
Вот еще похожие:
- Событийно-ориентированная архитектура и Node.js Events
- Как и зачем писать тесты?
- Функциональное программирование. Что это и зачем?
- Профилирование Node.js-приложений
- Docker: что, зачем и почему