- 'use strict';
- var net = require('net');
- var tls = require('tls');
- var events = require('events');
- var util = require('util');
- var fs = require('fs');
- var logger = require('winston');
- var ByteBuffer = require('bytebuffer');
- var StartTls = require('./starttls');
- var AuthReq = require('./authreq');
- var utils = require('./utils');
- var rpb = require('../protobuf/riakprotobuf');
// Receive-buffer sizing, in bytes. DEFAULT_INIT_BUFFER is the initial
// ByteBuffer capacity; DEFAULT_MAX_BUFFER is the capacity above which the
// buffer is discarded instead of being reused (see _resetBuffer).
var DEFAULT_MAX_BUFFER = 2048 * 1024;
var DEFAULT_INIT_BUFFER = 2 * 1024;

// TODO FUTURE these are shared with RiakNode
// NOTE(review): both timeouts are read by the RiakConnection constructor but
// had no definition in this file, which makes construction throw a
// ReferenceError. The values below (milliseconds) are believed to match the
// upstream client defaults - confirm against RiakNode before relying on them.
var DEFAULT_CONNECTION_TIMEOUT = 3000;
var DEFAULT_REQUEST_TIMEOUT = 5000;

// NB: fixes GH 104
// https://github.com/basho/riak-nodejs-client/issues/104
// TODO FUTURE: remove this when Riak uses Erlang R17 or higher.
// NOTE(review): this constant is assigned to `auth.ciphers` by the constructor
// but was not defined in this file. The list below is restored from the
// upstream client as best known - confirm it against the issue linked above.
var RIAK_R16_CIPHERS = 'DHE-RSA-AES128-SHA256:DHE-RSA-AES128-SHA:DHE-RSA-AES256-SHA256:DHE-RSA-AES256-SHA:AES128-SHA256:AES128-SHA:AES256-SHA256:AES256-SHA:RC4-SHA';

// Per-remote-port counter; only used to build unique connection names that
// make debug output easier to follow.
var cid = {};
/**
 * Debug helper: logs how many listeners `conn` currently has for each of the
 * socket events this module cares about. Does nothing when the logger has no
 * `debug` method.
 *
 * @param {String} name - label used as the log line prefix.
 * @param {Object} conn - object exposing `listenerCount(eventName)`.
 */
function debugOutputConnectionListeners(name, conn) {
    if (!logger.debug) {
        return;
    }
    var socketEvents = ['close', 'connect', 'data', 'drain', 'end', 'error', 'lookup', 'timeout'];
    socketEvents.forEach(function (evt) {
        logger.debug('%s listeners for "' + evt + '" event: %d', name, conn.listenerCount(evt));
    });
}
/**
 * @module Core
 */

/**
 * Provides the RiakConnection class.
 *
 * A RiakConnection owns one socket plus a private receive buffer in which
 * length-prefixed Riak protobuf messages are reassembled. It is an
 * EventEmitter; the events emitted from this file are 'connected',
 * 'connectFailed', 'connectionClosed' and 'responseReceived'.
 *
 * @class RiakConnection
 * @constructor
 * @param {Object} options - the options to use.
 * @param {String} options.remoteAddress - address passed to socket.connect().
 * @param {Number} options.remotePort - port passed to socket.connect().
 * @param {Boolean} [options.cork] - wrap each request's writes in cork()/uncork().
 * @param {Object} [options.auth] - when set, StartTls/AuthReq are run after
 *     connecting. NB: this object is stored by reference and its `ciphers`
 *     property is overwritten.
 * @param {Object} [options.healthCheck] - command executed after connecting
 *     (only when `auth` is not set) before 'connected' is emitted.
 * @param {Number} [options.connectionTimeout] - socket timeout used until 'connected' is emitted.
 * @param {Number} [options.requestTimeout] - socket timeout used for each executed command.
 * @param {Number} [options.maxBufferSize] - buffer capacity above which the
 *     receive buffer is dropped rather than reused.
 * @param {Number} [options.initBufferSize] - initial receive buffer capacity.
 */
function RiakConnection(options) {
    events.EventEmitter.call(this);

    // Returns options[key] when the caller supplied it as an own property,
    // otherwise the fallback. Does not rely on `options` inheriting from
    // Object.prototype.
    function optionOrDefault(key, fallback) {
        return Object.prototype.hasOwnProperty.call(options, key) ? options[key] : fallback;
    }

    this.remoteAddress = options.remoteAddress;
    this.remotePort = options.remotePort;

    // This is to facilitate debugging
    if (!cid[this.remotePort]) {
        cid[this.remotePort] = 1;
    }
    this.name = util.format('[RiakConnection] (%s:%d-%d)',
        this.remoteAddress, this.remotePort, cid[this.remotePort]);
    cid[this.remotePort]++;

    if (options.cork) {
        this.cork = true;
    }

    if (options.auth) {
        this.auth = options.auth;
        this.auth.ciphers = RIAK_R16_CIPHERS;
    }

    if (options.healthCheck) {
        this.healthCheck = options.healthCheck;
    }

    this.connectionTimeout = optionOrDefault('connectionTimeout', DEFAULT_CONNECTION_TIMEOUT);
    this.requestTimeout = optionOrDefault('requestTimeout', DEFAULT_REQUEST_TIMEOUT);
    this.maxBufferSize = optionOrDefault('maxBufferSize', DEFAULT_MAX_BUFFER);
    var initBufferSize = optionOrDefault('initBufferSize', DEFAULT_INIT_BUFFER);

    // Emits `evt` (with this connection and `evt_args`) exactly once and then
    // tears the connection down; later calls are ignored via `closed`.
    this._emitAndClose = function(evt, evt_args) {
        if (!this.closed) {
            // NB: this can be useful
            // logger.debug("%s emitting '%s' args '%s'", this.name, evt, evt_args);
            // NB: RiakNode checks inFlight to re-try command if necessary
            // so don't set inFlight to false here, it will be set to false in close()
            this.closed = true;
            this._connection.end();
            this.emit(evt, this, evt_args);
            this.close();
        }
    };

    // Socket 'end' handler.
    this._connHandleEnd = function () {
        logger.debug('%s handling "end" event', this.name);
        this._emitAndClose('connectionClosed');
    };

    // Socket 'timeout' handler used while a command is executing; `command`
    // is pre-bound by execute().
    this._connHandleTimeout = function (command) {
        var err = util.format("%s command '%s' timed out (in-flight: %s)",
            this.name, command.name, this.inFlight);
        if (logger.debug) {
            logger.debug(err);
        }
        this._emitAndClose('connectionClosed');
    };

    // Removes the currently bound 'timeout' listener (if any) and disables
    // the socket timeout.
    this._clearSocketTimeout = function() {
        if (this._connection) {
            if (this._boundConnectionTimeout) {
                this._connection.removeListener('timeout', this._boundConnectionTimeout);
                this._boundConnectionTimeout = null;
            }
            this._connection.setTimeout(0);
        }
    };

    // buffer is private
    var buffer = null;

    // private buffer functions

    // Appends `data` to the receive buffer (creating it on first use) and
    // flips it so that reading starts at offset 0.
    function initBuffer(data) {
        // Create a new buffer to receive data if needed
        if (buffer === null) {
            buffer = new ByteBuffer(initBufferSize);
        }
        buffer.append(data);
        buffer.flip();
    }

    // Decodes every complete message currently in the buffer and returns
    // them as [{ msgCode, protobuf }]. Bytes of a trailing incomplete message
    // are kept for the next call. Iterative on purpose: one stack frame per
    // message would overflow when a single chunk carries many small messages.
    function getProtobufsFromBuffer() {
        var protobufArray = [];

        // A message is a 4-byte length followed by `length` bytes, the first
        // of which is the message code.
        while (buffer.remaining() >= 4) {
            buffer.mark();
            var messageLength = buffer.readUint32();

            if (buffer.remaining() < messageLength) {
                // Incomplete message: rewind the offset to the length prefix
                buffer.reset();
                break;
            }

            // We have a complete message from riak
            var slice = buffer.slice(undefined, buffer.offset + messageLength);
            var code = slice.readUint8();

            // Our fun API does some creative things like ... returning only
            // a code, with 0 bytes following. In those cases we want to set
            // decoded to null.
            var decoded = null;
            if (messageLength - 1 > 0) {
                var ResponseProto = rpb.getProtoFor(code);
                // GH issue #45
                // Must use 'true' as argument to force copy of data
                // otherwise, subsequent fetches will clobber data
                decoded = ResponseProto.decode(slice.toBuffer(true));
            }

            protobufArray.push({ msgCode : code, protobuf : decoded });

            // skip past message in buffer
            buffer.skip(messageLength);
        }

        // ByteBuffer's 'flip()' effectively clears the buffer which we don't
        // want. We want to flip while preserving anything in the buffer and
        // compact if necessary.
        var newOffset = buffer.remaining();
        // Compact if necessary
        if (newOffset > 0 && buffer.offset !== 0) {
            buffer.copyTo(buffer, 0);
        }
        buffer.offset = newOffset;
        buffer.limit = buffer.capacity();

        return protobufArray;
    }

    // Clears and releases the receive buffer.
    function closeBuffer() {
        if (buffer) {
            buffer.clear();
            buffer = null;
        }
    }

    // protected buffer functions
    this._closeBuffer = function () {
        closeBuffer();
    };

    // Drops the buffer only when it has grown past maxBufferSize, so that a
    // fresh, small one is allocated on the next read.
    this._resetBuffer = function () {
        if (buffer && buffer.capacity() > this.maxBufferSize) {
            closeBuffer();
        }
    };

    // Feeds `data` into the buffer and returns all messages completed by it.
    this._buildProtobufArray = function (data) {
        initBuffer(data);
        return getProtobufsFromBuffer();
    };

    // protected execute functions
    this._executeInit = function() {
        this.lastUsed = Date.now();
        this.executeDone();
    };

    this._executeStart = function(command) {
        this.command = command;
        logger.debug('%s execute command:', this.name, command.name);
        this.inFlight = true;
        this.lastUsed = Date.now();
    };

    this._executeInit();

    this.closed = false;
    this._connectedEmitted = false;

    this._connection = new net.Socket();
    if (this._connection.setKeepAlive) {
        this._connection.setKeepAlive(true, 0);
    }
    if (this._connection.setNoDelay) {
        this._connection.setNoDelay(true);
    }

    // Note: useful for debugging event issues
    /*
    debugOutputConnectionListeners(this.name, this._connection);
    this.setMaxListeners(1);
    this._connection.setMaxListeners(1);
    */

    // Only report on cork() when the caller asked for it; previously the
    // "using cork()" line was logged for every connection that had it off.
    if (this.cork) {
        if (this._connection.cork) {
            logger.debug('%s using cork() / uncork()', this.name);
        } else {
            logger.warn('%s wanted to use cork/uncork but not supported!', this.name);
            this.cork = false;
        }
    }
}

util.inherits(RiakConnection, events.EventEmitter);
/**
 * Begins connecting the socket to remoteAddress:remotePort.
 *
 * Registers the connect-phase 'error' handler and the connection timeout
 * first, then initiates the connect with `_connected` as the callback.
 *
 * @method connect
 */
RiakConnection.prototype.connect = function() {
    var socket = this._connection;

    this._boundConnectionError = this._connectionError.bind(this);
    socket.on('error', this._boundConnectionError);

    // This *is* the read/write timeout as well as idle timeout
    // https://nodejs.org/api/net.html#net_socket_settimeout_timeout_callback
    this._boundConnectionTimeout = this._connectionTimeout.bind(this);
    socket.setTimeout(this.connectionTimeout, this._boundConnectionTimeout);

    this._boundConnected = this._connected.bind(this);
    socket.connect(this.remotePort, this.remoteAddress, this._boundConnected);
};
/**
 * Socket 'connect' callback.
 *
 * Swaps the connect-phase 'error' handler for `_socketError`, starts
 * listening for 'end' and 'data', then either:
 *  - runs StartTls (when `auth` is set),
 *  - runs the health check command (when `healthCheck` is set), or
 *  - clears the timeout and emits 'connected' immediately.
 */
RiakConnection.prototype._connected = function() {
    var self = this;
    var socket = this._connection;

    logger.debug('%s connected', this.name);

    socket.removeListener('connect', this._boundConnected);
    this._boundConnected = null;

    socket.removeListener('error', this._boundConnectionError);
    this._boundConnectionError = this._socketError.bind(this);
    socket.on('error', this._boundConnectionError);

    socket.on('end', this._connHandleEnd.bind(this));
    socket.on('data', this._receiveData.bind(this));

    /*
     * NB: at this point, we have not yet emitted the 'connected' event,
     * so listeners will not have yet registered for 'connectionClosed'.
     * This is why the 'close' event must raise 'connectFailed' via
     * _boundConnectionError
     */
    function expectResponse(handler) {
        socket.on('close', self._boundConnectionError);
        self._boundResponseReceived = handler.bind(self);
        self.on('responseReceived', self._boundResponseReceived);
    }

    if (this.auth) {
        expectResponse(this._receiveStartTls);
        this.execute(new StartTls());
        return;
    }

    if (this.healthCheck) {
        expectResponse(this._receiveHealthCheck);
        this.execute(this.healthCheck);
        return;
    }

    // Nothing to negotiate: the connection is usable right away.
    this._clearSocketTimeout();
    socket.on('close', this._connClosed.bind(this));
    logger.debug('%s emit connected, no-auth', this.name);
    this._connectedEmitted = true;
    this.emit('connected', this);
};
/**
 * Reports a failure that happened before 'connected' was emitted: emits
 * 'connectFailed' with `err` and closes the connection.
 *
 * @param {*} err - error (or message) forwarded to 'connectFailed' listeners.
 */
RiakConnection.prototype._connectionError = function(err) {
    var failureEvent = 'connectFailed';
    this._emitAndClose(failureEvent, err);
};
/**
 * Socket 'timeout' handler for the connect/negotiation phase. Forwards to
 * `_connectionError`, substituting a generic message when no error is given.
 *
 * @param {*} [err] - optional error; a falsy value is replaced by a default message.
 */
RiakConnection.prototype._connectionTimeout = function(err) {
    this._connectionError(err || 'timed out or other error trying to connect');
};
/**
 * Socket 'error' handler installed once the socket has connected.
 *
 * Only logs: per the Node.js docs the 'close' event is emitted directly
 * after 'error', and the 'close' handler does the actual teardown.
 * https://nodejs.org/api/net.html#net_event_error
 *
 * @param {Error} err - the socket error.
 */
RiakConnection.prototype._socketError = function(err) {
    // This is only called if we have an error after a successful connection
    // log only because close will be called right after
    if (err) {
        // Pass the error itself to the logger; without it the message ends at
        // the colon and the cause is lost.
        logger.error('%s _socketError:', this.name, err);
    }
};
/**
 * 'responseReceived' handler for the health check command sent right after
 * connecting. Restores the normal 'close' handling and then lets
 * utils.handleRiakResponse decide between failure ('connectFailed') and
 * success ('connected').
 *
 * @param {RiakConnection} conn - the connection that received the response.
 * @param {Object} command - the command that was executed.
 * @param {Number} code - the response message code.
 * @param {Object} decoded - the decoded protobuf, or null.
 */
RiakConnection.prototype._receiveHealthCheck = function(conn, command, code, decoded) {
    // NB: this function is similar to _responseReceived in RiakNode
    var connection = this;

    logger.debug('%s receive health check response', connection.name);
    connection.executeDone();

    // This handler is one-shot.
    connection.removeListener('responseReceived', connection._boundResponseReceived);
    connection._boundResponseReceived = null;

    // From here on a socket 'close' means 'connectionClosed'.
    connection._connection.removeListener('close', connection._boundConnectionError);
    connection._boundConnectionError = null;
    connection._connection.on('close', connection._connClosed.bind(connection));

    var response = {
        conn: conn,
        command: command,
        code: code,
        decoded: decoded,
        shouldCallback: true
    };

    utils.handleRiakResponse(response,
        function onError(err) {
            logger.error(err.msg);
            connection._connectionError(err.msg);
        },
        function onSuccess() {
            logger.debug('%s health check, emit connected', connection.name);
            connection._connectedEmitted = true;
            connection.emit('connected', connection);
        });
};
/**
 * 'responseReceived' handler for the StartTls command. On success the plain
 * socket is wrapped in a tls.TLSSocket, handlers are re-attached to the new
 * socket and an AuthReq command is executed; on failure 'connectFailed' is
 * raised through `_connectionError`.
 *
 * @param {RiakConnection} conn - the connection that received the response.
 * @param {Object} command - the command that was executed.
 * @param {Number} code - the response message code.
 * @param {Object} decoded - the decoded protobuf, or null.
 */
RiakConnection.prototype._receiveStartTls = function(conn, command, code, decoded) {
    // NB: this function is similar to _responseReceived in RiakNode
    var connection = this;

    logger.debug('%s receive StartTls response', connection.name);
    connection.executeDone();

    // This handler is one-shot.
    connection.removeListener('responseReceived', connection._boundResponseReceived);
    connection._boundResponseReceived = null;

    function upgradeAndAuthenticate() {
        var secureContext = tls.createSecureContext(connection.auth);
        var tlsOptions = {
            isServer: false, // NB: required
            secureContext: secureContext
        };

        connection._connection.removeListener('error', connection._boundConnectionError);
        connection._connection = new tls.TLSSocket(connection._connection, tlsOptions);

        // NB: *must* re-register for data event!
        connection._connection.on('data', connection._receiveData.bind(connection));
        connection._connection.on('error', connection._boundConnectionError);

        // NB: this is necessary since we have not yet emitted the 'connected' event
        // in which case the execute method would set up the timeout
        // _receiveData clears the timeout as well as the bound method
        connection._boundConnectionTimeout = connection._connectionTimeout.bind(connection);
        connection._connection.setTimeout(connection.connectionTimeout,
            connection._boundConnectionTimeout);

        var credentials = {
            user: connection.auth.user,
            password: connection.auth.password
        };

        // On responseReceived event, move to next sequence in TLS negotiation
        connection._boundResponseReceived = connection._receiveAuthResp.bind(connection);
        connection.on('responseReceived', connection._boundResponseReceived);

        // Execute AuthReq command
        /*
        if (logger.debug) {
            debugOutputConnectionListeners(connection.name, connection._connection);
        }
        */
        connection.execute(new AuthReq(credentials));
    }

    function failConnect(err) {
        logger.error(err.msg);
        connection._connectionError(err.msg);
    }

    var response = {
        conn: conn,
        command: command,
        code: code,
        decoded: decoded,
        shouldCallback: true
    };

    utils.handleRiakResponse(response, failConnect, upgradeAndAuthenticate);
};
/**
 * 'responseReceived' handler for the AuthReq command, the last step of the
 * TLS/auth negotiation. Restores the normal 'close' handling and then lets
 * utils.handleRiakResponse decide between failure ('connectFailed') and
 * success ('connected').
 *
 * @param {RiakConnection} conn - the connection that received the response.
 * @param {Object} command - the command that was executed.
 * @param {Number} code - the response message code.
 * @param {Object} decoded - the decoded protobuf, or null.
 */
RiakConnection.prototype._receiveAuthResp = function(conn, command, code, decoded) {
    var connection = this;

    logger.debug('%s receive RpbAuthResp', connection.name);
    connection.executeDone();

    // This handler is one-shot.
    connection.removeListener('responseReceived', connection._boundResponseReceived);
    connection._boundResponseReceived = null;

    // From here on a socket 'close' means 'connectionClosed'.
    connection._connection.removeListener('close', connection._boundConnectionError);
    connection._boundConnectionError = null;
    connection._connection.on('close', connection._connClosed.bind(connection));

    var response = {
        conn: conn,
        command: command,
        code: code,
        decoded: decoded,
        shouldCallback: true
    };

    utils.handleRiakResponse(response,
        function onError(err) {
            logger.error(err.msg);
            connection._connectionError(err.msg);
        },
        function onSuccess() {
            logger.debug('%s emit connected, with-auth', connection.name);
            connection._connectedEmitted = true;
            connection.emit('connected', connection);
        });
};
/**
 * Socket 'data' handler. Feeds the chunk into the receive buffer and, for
 * every message completed by it, clears the socket timeout and emits
 * 'responseReceived' with (connection, current command, message code,
 * decoded protobuf).
 *
 * @param {Buffer} data - bytes read from the socket.
 */
RiakConnection.prototype._receiveData = function(data) {
    var self = this;
    this._buildProtobufArray(data).forEach(function (message) {
        self._clearSocketTimeout();
        // self.command is read per message on purpose: a listener may have
        // replaced it (e.g. through executeDone) while handling the previous one.
        self.emit('responseReceived', self,
            self.command, message.msgCode, message.protobuf);
    });
};
// TODO FUTURE: what does "had_error" really mean?
/**
 * Socket 'close' handler used once the connection is established: emits
 * 'connectionClosed' and tears the connection down.
 *
 * @param {Boolean} had_error - argument supplied by the socket 'close'
 *     event; currently ignored.
 */
RiakConnection.prototype._connClosed = function(had_error) {
    var closedEvent = 'connectionClosed';
    this._emitAndClose(closedEvent);
};
/**
 * Closes the connection and releases its resources: marks it closed, resets
 * command state, removes every listener from this emitter and from the
 * socket, drops the receive buffer and ends the socket. Safe to call more
 * than once; the socket part is skipped when it was already released.
 *
 * @method close
 */
RiakConnection.prototype.close = function() {
    // Captured up front: inside the socket's 'error' handler below `this` is
    // the socket, not this RiakConnection, so `this.name` would be wrong there.
    var name = this.name;

    this.closed = true;
    this.executeDone();
    this.removeAllListeners();
    this._closeBuffer();

    if (this._connection) {
        this._connection.end();
        this._connection.removeAllListeners();
        // Keep one 'error' listener so an error arriving after close is
        // logged instead of being thrown as an unhandled 'error' event.
        this._connection.on('error', function (err) {
            if (err) {
                logger.error('%s error AFTER close:', name, err);
            }
        });
        this._connection = null;
    }

    logger.debug('%s closed', name);
};
/**
 * Marks the connection as idle: clears the in-flight flag, replaces the
 * current command with a 'no-command' placeholder and gives the receive
 * buffer a chance to be released if it has grown too large.
 *
 * @method executeDone
 */
RiakConnection.prototype.executeDone = function() {
    var idleCommand = { name: 'no-command' };

    this.inFlight = false;
    this.command = idleCommand;
    this._resetBuffer();
};
/**
 * Writes a command to the socket.
 *
 * @method execute
 * @param {Object} command - the command to run; must provide `name` and
 *     `getRiakMessage()`, which returns `{ header, protobuf }`. The command
 *     includes the user callback.
 * @return {Boolean} false when another command is still in flight (nothing
 *     is written), true once the message has been handed to the socket.
 */
RiakConnection.prototype.execute = function(command) {
    if (this.inFlight === true) {
        logger.error('%s attempted to run command "%s" on in-use connection',
            this.name, command.name);
        return false;
    }

    this._executeStart(command);

    // write PB to socket
    var riakMessage = command.getRiakMessage();
    var socket = this._connection;

    /*
     * NB: only bind to 'timeout' if 'connected' event has been emitted.
     * Initial connection, health check and starting TLS bind 'timeout'
     * to a handler that will raise 'connectFailed' on timeout
     */
    if (this._connectedEmitted) {
        var previousHandler = this._boundConnectionTimeout;
        if (previousHandler) {
            socket.removeListener('timeout', previousHandler);
        }
        this._boundConnectionTimeout = this._connHandleTimeout.bind(this, command);
        socket.setTimeout(this.requestTimeout, this._boundConnectionTimeout);
    }

    /*
     * Use of cork()/uncork() suggested by Doug Luce
     * https://github.com/dougluce
     * https://github.com/basho/riak-nodejs-client/pull/56
     * https://github.com/basho/riak-nodejs-client/pull/57
     */
    if (this.cork) {
        socket.cork();
    }

    // The header is always sent; the protobuf body only when there is one.
    var chunks = riakMessage.protobuf ?
        [riakMessage.header, riakMessage.protobuf] :
        [riakMessage.header];
    chunks.forEach(function (chunk) {
        socket.write(chunk);
    });

    if (this.cork) {
        socket.uncork();
    }

    return true;
};
- module.exports = RiakConnection;