Skip to content

Commit

Permalink
feat: implement streams for client
Browse files Browse the repository at this point in the history
Closes: #42
  • Loading branch information
o-rumiantsev committed Dec 31, 2018
1 parent 05c9945 commit 430684e
Show file tree
Hide file tree
Showing 3 changed files with 23 additions and 16 deletions.
Empty file removed justtest.js
Empty file.
35 changes: 21 additions & 14 deletions lib/client.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ const fs = require('./fs-interface');
const ERR_TARGET_DIR_IS_REQUIRED = 'Target directory is required';

const onMessage = Symbol('onMessage');
const onStream = Symbol('onStream');
const onSync = Symbol('onSync');

// Client class used to synchronize local directory with remote one,
Expand Down Expand Up @@ -37,6 +38,7 @@ class Client extends EventEmitter {
this.connection.on('error', err => this.emit('error', err));
this.connection.on('close', () => this.emit('close'));
this.connection.on('message', this[onMessage].bind(this));
this.connection.on('stream', this[onStream].bind(this));
this.connection.transport.on('connect', () => this.emit('connect'));
this.connection.transport.connect({ port, host });
}
Expand Down Expand Up @@ -75,31 +77,35 @@ class Client extends EventEmitter {
// event - <string>, type of event
// e. g. 'sync', 'inspect' - Server events
// 'create', 'delete', 'change' - Watcher events
// type - <string>, type of entity: 'dir' or 'file'
// path - <string>, path to entity
// data - <Buffer> | <string[]> | <Object[]>,
// changed data | directory structure | fileSubsystem
[onMessage](message) {
const { event, type, path } = message;
let { data } = message;
const { event, type, path, data } = message;
const callback = err => {
if (err) this.emit('error', err);
};
const fullPath = path ?
fs.replacePath(path, this.targetDir, this.sourceDir) : this.targetDir;

if (data && data.type === 'Buffer') data = fs.postprocess(data);

if (event === 'error') callback(new Error(data));
else if (event === 'sync') this[onSync](data);
else if (event === 'inspect') this.emit('inspect', data);
else if (event === 'remove') fs.remove[type](fullPath, callback);
else if (event === 'update') fs.update(fullPath, data, callback);
else if (event === 'create') {
else if (event === 'create') fs.createDir(fullPath, callback);
else if (event === 'remove')
type === 'file' ?
fs.create.file(fullPath, data, callback) :
fs.create.dir(fullPath, callback);
}
fs.removeFile(fullPath, callback) : fs.removeDir(fullPath, callback);
}

// Handles server streams
// stream - <Readable>
// info - <Object>, additional info
// path - <string>, file path
[onStream](stream, info) {
if (info.preloading) return;
const { path } = info;
const fullPath = path ?
fs.replacePath(path, this.targetDir, this.sourceDir) : this.targetDir;
const writable = fs.writable(fullPath);
stream.pipe(writable);
}

// Builds remote synced directory`s file subsystem
Expand All @@ -109,9 +115,10 @@ class Client extends EventEmitter {
// children - <Object[]>, subdirectories, same structure as fileSubsystem
[onSync](fileSubsystem) {
this.sourceDir = fileSubsystem.path;
fs.postprocessTree(fileSubsystem);
const streams = this.connection.streams;
fs.buildFileSubsystem(
fileSubsystem,
streams,
this.targetDir,
this.sourceDir,
err => {
Expand Down
4 changes: 2 additions & 2 deletions lib/server.js
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ class Server extends EventEmitter {
if (event === 'update') {
info.type = 'file';
info.path = dirent;
} else if (dirent) {
} else {
info.type = dirent.isFile() ? 'file' : 'dir';
info.path = dirent.name;
}
Expand Down Expand Up @@ -171,7 +171,7 @@ class Server extends EventEmitter {
const streamFiles = node => {
node.files = node.files.map(p => {
const stream = readable(p);
return [p, connection.stream(stream)];
return [p, connection.stream(stream, { preloading: true })];
});
node.children.forEach(streamFiles);
};
Expand Down

0 comments on commit 430684e

Please sign in to comment.