188 lines
		
	
	
		
			6.4 KiB
		
	
	
	
		
			JavaScript
		
	
	
	
	
	
			
		
		
	
	
			188 lines
		
	
	
		
			6.4 KiB
		
	
	
	
		
			JavaScript
		
	
	
	
	
	
'use strict';
 | 
						|
 | 
						|
var Queue = require('double-ended-queue');
 | 
						|
var utils = require('./utils');
 | 
						|
var Command = require('./command');
 | 
						|
 | 
						|
/**
 * Holds a queue of commands that is later sent through a client in one go.
 *
 * @param {Object} client Client that the queued commands are sent through.
 * @param {Array[]} [args] Optional commands to queue right away. Every entry is an array whose
 *     first element is the command name (either a string, or an array that starts with the
 *     name followed by leading arguments); the remaining elements are further arguments.
 *     For each entry the method of that name is invoked on this instance.
 */
function Multi (client, args) {
    this._client = client;
    this.queue = new Queue();
    var command, tmp_args;
    // Either undefined or an array. Fail hard if it's not an array
    if (args) {
        for (var i = 0; i < args.length; i++) {
            command = args[i][0];
            tmp_args = args[i].slice(1);
            if (Array.isArray(command)) {
                // Nested form: [[name, arg, ...], more_args...]
                this[command[0]].apply(this, command.slice(1).concat(tmp_args));
            } else {
                // Flat form: [name, arg, ...]
                this[command].apply(this, tmp_args);
            }
        }
    }
}
 | 
						|
 | 
						|
/**
 * Sends a single command that belongs to a transaction and intercepts its first reply.
 *
 * The command's own callback is swapped for a wrapper while the command is being queued.
 * When the wrapper runs it records a queueing error (if any), remembers the command's
 * `buffer_args` value for its position and restores the original callback on the command.
 *
 * @param {Object} self Object carrying `errors`, `wants_buffers` and `_client`.
 * @param {Object} command_obj Command to send; its `callback` property is replaced temporarily.
 * @param {number} index Position of the command in the queue, or -1 for the internally added
 *     multi command.
 */
function pipeline_transaction_command (self, command_obj, index) {
    // Queueing is done first, then the commands are executed
    var tmp = command_obj.callback;
    command_obj.callback = function (err) {
        var report_error = false;
        // Ignore the multi command. This is applied by node_redis and the user does not benefit by it
        if (index !== -1) {
            if (err) {
                // Attach the position and record the error before the user callback runs, so the
                // callback sees the position and a throwing callback cannot lose the record
                err.position = index;
                self.errors.push(err);
                report_error = true;
            }
            // Keep track of who wants buffer responses:
            // By the time the callback is called the command_obj got the buffer_args attribute attached
            self.wants_buffers[index] = command_obj.buffer_args;
        }
        command_obj.callback = tmp;
        if (report_error && tmp) {
            tmp(err);
        }
    };
    self._client.internal_send_command(command_obj);
}
 | 
						|
 | 
						|
/**
 * Executes the queued commands, choosing the execution method by queue size.
 *
 * With fewer than two queued commands `exec_batch` is used; otherwise the call is forwarded
 * to `this.exec`.
 *
 * @param {Function} [callback] Passed on unchanged to the chosen execution method.
 * @returns {*} Whatever the chosen execution method returns.
 */
Multi.prototype.exec_atomic = Multi.prototype.EXEC_ATOMIC = Multi.prototype.execAtomic = function exec_atomic (callback) {
    if (this.queue.length < 2) {
        return this.exec_batch(callback);
    }
    return this.exec(callback);
};
 | 
						|
 | 
						|
/**
 * Handles the reply of the exec command of a transaction.
 *
 * On failure the collected queueing errors are attached to the error as `err.errors` and the
 * error is handed to `self.callback`, or emitted on the client when no callback was given.
 * On success every queued command is shifted off the queue, its reply is post-processed and
 * passed to the command's own callback, and finally `self.callback` receives all replies.
 *
 * @param {Object} self Object carrying `queue`, `errors`, `wants_buffers`, `callback` and `_client`.
 * @param {Error|null} err Error of the exec command, if any.
 * @param {Array|null} replies Replies of the exec command, one per queued command. When falsy,
 *     the queue is left untouched and only `self.callback` is invoked.
 */
function multi_callback (self, err, replies) {
    var i = 0, command_obj, reply, match;

    if (err) {
        err.errors = self.errors;
        if (self.callback) {
            self.callback(err);
            // Exclude connection errors so that those errors won't be emitted twice
        } else if (err.code !== 'CONNECTION_BROKEN') {
            self._client.emit('error', err);
        }
        return;
    }

    if (replies) {
        while (command_obj = self.queue.shift()) {
            reply = replies[i];
            if (reply instanceof Error) {
                match = reply.message.match(utils.err_code);
                // LUA script could return user errors that don't behave like all other errors!
                if (match) {
                    reply.code = match[1];
                }
                reply.command = command_obj.command.toUpperCase();
                if (typeof command_obj.callback === 'function') {
                    command_obj.callback(reply);
                }
            } else {
                // If we asked for strings, even in detect_buffers mode, then return strings:
                reply = self._client.handle_reply(reply, command_obj.command, self.wants_buffers[i]);
                // The converted value replaces the raw one in the overall result as well
                replies[i] = reply;
                if (typeof command_obj.callback === 'function') {
                    command_obj.callback(null, reply);
                }
            }
            i++;
        }
    }

    if (self.callback) {
        self.callback(null, replies);
    }
}
 | 
						|
 | 
						|
/**
 * Sends all queued commands wrapped in a multi / exec pair.
 *
 * The queued commands stay in the queue while they are sent; they are consumed later by the
 * handler of the exec reply. If this instance or its client is flagged as monitoring, nothing
 * is sent and an EXECABORT error is reported through `utils.reply_in_order` instead.
 *
 * @param {Function} [callback] Stored as `this.callback` and handed to the exec reply handler.
 * @returns {*} In monitor mode the result of `utils.reply_in_order`; otherwise `true` when the
 *     client's `should_buffer` flag is falsy and `false` when it is truthy.
 */
Multi.prototype.exec_transaction = function exec_transaction (callback) {
    if (this.monitoring || this._client.monitoring) {
        var err = new RangeError(
            'Using transaction with a client that is in monitor mode does not work due to faulty return values of Redis.'
        );
        err.command = 'EXEC';
        err.code = 'EXECABORT';
        return utils.reply_in_order(this._client, callback, err);
    }
    var self = this;
    var len = self.queue.length;
    self.errors = [];
    self.callback = callback;
    // Everything between cork and uncork is handed to the client as one group
    self._client.cork();
    self.wants_buffers = new Array(len);
    // -1 marks the internally added multi command
    pipeline_transaction_command(self, new Command('multi', []), -1);
    // Drain queue, callback will catch 'QUEUED' or error
    for (var index = 0; index < len; index++) {
        // The commands may not be shifted off, since they are needed in the result handler
        pipeline_transaction_command(self, self.queue.get(index), index);
    }

    self._client.internal_send_command(new Command('exec', [], function (err, replies) {
        multi_callback(self, err, replies);
    }));
    self._client.uncork();
    return !self._client.should_buffer;
};
 | 
						|
 | 
						|
/**
 * Wraps the callback of a batched command so that its outcome is also stored in `self.results`.
 *
 * @param {Object} self Object whose `results` array collects the outcome of every command.
 * @param {Function} cb Original callback of the command; always invoked with the same arguments.
 * @param {number} i Slot in `self.results` that belongs to the command.
 * @returns {Function} Callback taking `(err, res)`; an error is stored in the slot (with its
 *     `position` set to `i`), otherwise the result is stored.
 */
function batch_callback (self, cb, i) {
    return function batch_callback (err, res) {
        if (err) {
            // Add the position to the error
            err.position = i;
            self.results[i] = err;
        } else {
            self.results[i] = res;
        }
        cb(err, res);
    };
}
 | 
						|
 | 
						|
/**
 * Sends all queued commands to the client without wrapping them in a transaction.
 *
 * The queue is emptied while sending. Without a callback the commands are sent as they are.
 * With a callback every command's outcome (result or error) is collected in `this.results`,
 * and once the last command has replied the callback is invoked as `callback(null, results)`.
 * Errors are never passed as first argument; they show up inside the results instead.
 *
 * @param {Function} [callback] Receives the collected results after the last command replied.
 *     With an empty queue it is handed to `utils.reply_in_order` together with an empty array.
 * @returns {boolean} `true` when the client's `should_buffer` flag is falsy, `false` otherwise.
 */
Multi.prototype.exec = Multi.prototype.EXEC = Multi.prototype.exec_batch = function exec_batch (callback) {
    var self = this;
    var len = self.queue.length;
    var index = 0;
    var command_obj;
    if (len === 0) {
        utils.reply_in_order(self._client, callback, null, []);
        return !self._client.should_buffer;
    }
    self._client.cork();
    if (!callback) {
        // Nobody is interested in the combined result, so the commands go out untouched
        while (command_obj = self.queue.shift()) {
            self._client.internal_send_command(command_obj);
        }
        self._client.uncork();
        return !self._client.should_buffer;
    }
    // Used for commands that came without a callback of their own
    var callback_without_own_cb = function (err, res) {
        if (err) {
            self.results.push(err);
            // Add the position to the error
            var i = self.results.length - 1;
            self.results[i].position = i;
        } else {
            self.results.push(res);
        }
        // Do not emit an error here. Otherwise each error would result in one emit.
        // The errors will be returned in the result anyway
    };
    // Wraps the callback of the last command so that the overall callback fires afterwards
    var last_callback = function (cb) {
        return function (err, res) {
            cb(err, res);
            callback(null, self.results);
        };
    };
    self.results = [];
    while (command_obj = self.queue.shift()) {
        if (typeof command_obj.callback === 'function') {
            command_obj.callback = batch_callback(self, command_obj.callback, index);
        } else {
            command_obj.callback = callback_without_own_cb;
        }
        if (typeof callback === 'function' && index === len - 1) {
            command_obj.callback = last_callback(command_obj.callback);
        }
        self._client.internal_send_command(command_obj);
        index++;
    }
    self._client.uncork();
    return !self._client.should_buffer;
};
 | 
						|
 | 
						|
// The Multi constructor is the only export of this module
module.exports = Multi;
 |