API Docs for: 2.2.2
Show:

File: lib/core/riakconnection.js

  1. 'use strict';
  2.  
  3. var net = require('net');
  4. var tls = require('tls');
  5. var events = require('events');
  6. var util = require('util');
  7. var fs = require('fs');
  8. var logger = require('winston');
  9.  
  10. var ByteBuffer = require('bytebuffer');
  11. var StartTls = require('./starttls');
  12. var AuthReq = require('./authreq');
  13. var utils = require('./utils');
  14.  
  15. var rpb = require('../protobuf/riakprotobuf');
  16.  
  17. var DEFAULT_MAX_BUFFER = 2048 * 1024;
  18. var DEFAULT_INIT_BUFFER = 2 * 1024;
  19.  
  20. // TODO FUTURE these are shared with RiakNode
  21. var DEFAULT_CONNECTION_TIMEOUT = 3000;
  22. var DEFAULT_REQUEST_TIMEOUT = 5000;
  23.  
  24. // NB: fixes GH 104
  25. // https://github.com/basho/riak-nodejs-client/issues/104
  26. // TODO FUTURE: remove this when Riak uses Erlang R17 or higher.
  27. var RIAK_R16_CIPHERS = 'DHE-RSA-AES128-SHA256:DHE-RSA-AES128-SHA:DHE-RSA-AES256-SHA256:DHE-RSA-AES256-SHA:AES128-SHA256:AES128-SHA:AES256-SHA256:AES256-SHA:RC4-SHA';
  28.  
  29. var cid = {};
  30.  
  31. function debugOutputConnectionListeners(name, conn) {
  32. if (logger.debug) {
  33. logger.debug('%s listeners for "close" event: %d', name, conn.listenerCount('close'));
  34. logger.debug('%s listeners for "connect" event: %d', name, conn.listenerCount('connect'));
  35. logger.debug('%s listeners for "data" event: %d', name, conn.listenerCount('data'));
  36. logger.debug('%s listeners for "drain" event: %d', name, conn.listenerCount('drain'));
  37. logger.debug('%s listeners for "end" event: %d', name, conn.listenerCount('end'));
  38. logger.debug('%s listeners for "error" event: %d', name, conn.listenerCount('error'));
  39. logger.debug('%s listeners for "lookup" event: %d', name, conn.listenerCount('lookup'));
  40. logger.debug('%s listeners for "timeout" event: %d', name, conn.listenerCount('timeout'));
  41. }
  42. }
  43.  
  44. /**
  45. * @module Core
  46. */
  47.  
  48. /**
  49. * Provides the RiakConnection class.
  50. * @class RiakConnection
  51. * @constructor
  52. * @param {Object} options - the options to use.
  53. */
  54. function RiakConnection(options) {
  55. events.EventEmitter.call(this);
  56.  
  57. this.remoteAddress = options.remoteAddress;
  58. this.remotePort = options.remotePort;
  59.  
  60. // This is to facilitate debugging
  61. if (!cid[this.remotePort]) {
  62. cid[this.remotePort] = 1;
  63. }
  64. this.name = util.format('[RiakConnection] (%s:%d-%d)',
  65. this.remoteAddress, this.remotePort, cid[this.remotePort]);
  66. cid[this.remotePort]++;
  67.  
  68. if (options.cork) {
  69. this.cork = true;
  70. }
  71.  
  72. if (options.auth) {
  73. this.auth = options.auth;
  74. this.auth.ciphers = RIAK_R16_CIPHERS;
  75. }
  76.  
  77. if (options.healthCheck) {
  78. this.healthCheck = options.healthCheck;
  79. }
  80.  
  81. this.connectionTimeout = DEFAULT_CONNECTION_TIMEOUT;
  82. if (options.hasOwnProperty('connectionTimeout')) {
  83. this.connectionTimeout = options.connectionTimeout;
  84. }
  85.  
  86. this.requestTimeout = DEFAULT_REQUEST_TIMEOUT;
  87. if (options.hasOwnProperty('requestTimeout')) {
  88. this.requestTimeout = options.requestTimeout;
  89. }
  90.  
  91. this.maxBufferSize = DEFAULT_MAX_BUFFER;
  92. if (options.hasOwnProperty('maxBufferSize')) {
  93. this.maxBufferSize = options.maxBufferSize;
  94. }
  95.  
  96. var initBufferSize = DEFAULT_INIT_BUFFER;
  97. if (options.hasOwnProperty('initBufferSize')) {
  98. initBufferSize = options.initBufferSize;
  99. }
  100.  
  101. this._emitAndClose = function(evt, evt_args) {
  102. if (!this.closed) {
  103. // NB: this can be useful
  104. // logger.debug("%s emitting '%s' args '%s'", this.name, evt, evt_args);
  105. // NB: RiakNode checks inFlight to re-try command if necessary
  106. // so don't set inFlight to false here, it will be set to false in close()
  107. this.closed = true;
  108. this._connection.end();
  109. this.emit(evt, this, evt_args);
  110. this.close();
  111. }
  112. };
  113.  
  114. this._connHandleEnd = function () {
  115. logger.debug('%s handling "end" event', this.name);
  116. this._emitAndClose('connectionClosed');
  117. };
  118.  
  119. this._connHandleTimeout = function (command) {
  120. var err = util.format("%s command '%s' timed out (in-flight: %s)",
  121. this.name, command.name, this.inFlight);
  122. if (logger.debug) {
  123. logger.debug(err);
  124. }
  125. this._emitAndClose('connectionClosed');
  126. };
  127.  
  128. this._clearSocketTimeout = function() {
  129. if (this._connection) {
  130. if (this._boundConnectionTimeout) {
  131. this._connection.removeListener('timeout', this._boundConnectionTimeout);
  132. this._boundConnectionTimeout = null;
  133. }
  134. this._connection.setTimeout(0);
  135. }
  136. };
  137.  
  138. // buffer is private
  139. var buffer = null;
  140.  
  141. // private buffer functions
  142. function initBuffer(data) {
  143. // Create a new buffer to receive data if needed
  144. if (buffer === null) {
  145. buffer = new ByteBuffer(initBufferSize);
  146. }
  147. buffer.append(data);
  148. buffer.flip();
  149. }
  150.  
  151. function getProtobufsFromBuffer(protobufArray) {
  152. if (arguments.length === 0) {
  153. protobufArray = [];
  154. }
  155.  
  156. if (buffer.remaining() >= 4) {
  157. buffer.mark();
  158. var messageLength = buffer.readUint32();
  159.  
  160. // See if we have the complete message
  161. if (buffer.remaining() >= messageLength) {
  162. // We have a complete message from riak
  163. var slice = buffer.slice(undefined, buffer.offset + messageLength);
  164. var code = slice.readUint8();
  165.  
  166. // Our fun API does some creative things like ... returning only
  167. // a code, with 0 bytes following. In those cases we want to set
  168. // decoded to null.
  169. var decoded = null;
  170. if (messageLength - 1 > 0) {
  171. var ResponseProto = rpb.getProtoFor(code);
  172. // GH issue #45
  173. // Must use 'true' as argument to force copy of data
  174. // otherwise, subsequent fetches will clobber data
  175. decoded = ResponseProto.decode(slice.toBuffer(true));
  176. }
  177.  
  178. protobufArray[protobufArray.length] = { msgCode : code, protobuf : decoded };
  179. // skip past message in buffer
  180. buffer.skip(messageLength);
  181. // recursively call this until we are out of messages
  182. return getProtobufsFromBuffer(protobufArray);
  183. } else {
  184. // rewind the offset
  185. buffer.reset();
  186. }
  187. }
  188.  
  189. // ByteBuffer's 'flip()' effectively clears the buffer which we don't
  190. // want. We want to flip while preserving anything in the buffer and
  191. // compact if necessary.
  192.  
  193. var newOffset = buffer.remaining();
  194. // Compact if necessary
  195. if (newOffset > 0 && buffer.offset !== 0) {
  196. buffer.copyTo(buffer, 0);
  197. }
  198. buffer.offset = newOffset;
  199. buffer.limit = buffer.capacity();
  200.  
  201. return protobufArray;
  202. }
  203.  
  204. function closeBuffer() {
  205. if (buffer) {
  206. buffer.clear();
  207. buffer = null;
  208. }
  209. }
  210.  
  211. // protected buffer functions
  212. this._closeBuffer = function () {
  213. closeBuffer();
  214. };
  215.  
  216. this._resetBuffer = function () {
  217. if (buffer && buffer.capacity() > this.maxBufferSize) {
  218. closeBuffer();
  219. }
  220. };
  221.  
  222. this._buildProtobufArray = function (data) {
  223. initBuffer(data);
  224. return getProtobufsFromBuffer();
  225. };
  226.  
  227. // protected execute functions
  228. this._executeInit = function() {
  229. this.lastUsed = Date.now();
  230. this.executeDone();
  231. };
  232.  
  233. this._executeStart = function(command) {
  234. this.command = command;
  235. logger.debug('%s execute command:', this.name, command.name);
  236. this.inFlight = true;
  237. this.lastUsed = Date.now();
  238. };
  239.  
  240. this._executeInit();
  241.  
  242. this.closed = false;
  243. this._connectedEmitted = false;
  244.  
  245. this._connection = new net.Socket();
  246. if (this._connection.setKeepAlive) {
  247. this._connection.setKeepAlive(true, 0);
  248. }
  249. if (this._connection.setNoDelay) {
  250. this._connection.setNoDelay(true);
  251. }
  252.  
  253. // Note: useful for debugging event issues
  254. /*
  255. debugOutputConnectionListeners(this.name, this._connection);
  256. this.setMaxListeners(1);
  257. this._connection.setMaxListeners(1);
  258. */
  259.  
  260. if (this.cork && !this._connection.cork) {
  261. logger.warn('%s wanted to use cork/uncork but not supported!', this.name);
  262. this.cork = false;
  263. } else {
  264. logger.debug('%s using cork() / uncork()', this.name);
  265. }
  266. }
  267.  
  268. util.inherits(RiakConnection, events.EventEmitter);
  269.  
  270. RiakConnection.prototype.connect = function() {
  271. this._boundConnectionError = this._connectionError.bind(this);
  272. this._connection.on('error', this._boundConnectionError);
  273.  
  274. // This *is* the read/write timeout as well as idle timeout
  275. // https://nodejs.org/api/net.html#net_socket_settimeout_timeout_callback
  276. this._boundConnectionTimeout = this._connectionTimeout.bind(this);
  277. this._connection.setTimeout(this.connectionTimeout, this._boundConnectionTimeout);
  278.  
  279. this._boundConnected = this._connected.bind(this);
  280. this._connection.connect(this.remotePort, this.remoteAddress, this._boundConnected);
  281. };
  282.  
  283. RiakConnection.prototype._connected = function() {
  284. logger.debug('%s connected', this.name);
  285. this._connection.removeListener('connect', this._boundConnected);
  286. this._boundConnected = null;
  287.  
  288. this._connection.removeListener('error', this._boundConnectionError);
  289. this._boundConnectionError = this._socketError.bind(this);
  290. this._connection.on('error', this._boundConnectionError);
  291.  
  292. this._connection.on('end', this._connHandleEnd.bind(this));
  293. this._connection.on('data', this._receiveData.bind(this));
  294.  
  295. if (this.auth) {
  296. /*
  297. * NB: at this point, we have not yet emitted the 'connected' event,
  298. * so listeners will not have yet registered for 'connectionClosed'.
  299. * This is why the 'close' event must raise 'connectFailed' via
  300. * _boundConnectionError
  301. */
  302. this._connection.on('close', this._boundConnectionError);
  303. this._boundResponseReceived = this._receiveStartTls.bind(this);
  304. this.on('responseReceived', this._boundResponseReceived);
  305. var command = new StartTls();
  306. this.execute(command);
  307. } else if (this.healthCheck) {
  308. // NB: see above comment re: 'close' event
  309. this._connection.on('close', this._boundConnectionError);
  310. this._boundResponseReceived = this._receiveHealthCheck.bind(this);
  311. this.on('responseReceived', this._boundResponseReceived);
  312. this.execute(this.healthCheck);
  313. } else {
  314. this._clearSocketTimeout();
  315. this._connection.on('close', this._connClosed.bind(this));
  316. logger.debug('%s emit connected, no-auth', this.name);
  317. this._connectedEmitted = true;
  318. this.emit('connected', this);
  319. }
  320. };
  321.  
  322. RiakConnection.prototype._connectionError = function(err) {
  323. this._emitAndClose('connectFailed', err);
  324. };
  325.  
  326. RiakConnection.prototype._connectionTimeout = function(err) {
  327. if (!err) {
  328. err = 'timed out or other error trying to connect';
  329. }
  330. this._connectionError(err);
  331. };
  332.  
  333. RiakConnection.prototype._socketError = function(err) {
  334. // This is only called if we have an error after a successful connection
  335. // log only because close will be called right after
  336. // https://nodejs.org/api/net.html#net_event_error
  337. if (err) {
  338. logger.error('%s _socketError:', this.name);
  339. }
  340. };
  341.  
  342. RiakConnection.prototype._receiveHealthCheck = function(conn, command, code, decoded) {
  343. // NB: this function is similar to _responseReceived in RiakNode
  344. logger.debug('%s receive health check response', this.name);
  345. this.executeDone();
  346. this.removeListener('responseReceived', this._boundResponseReceived);
  347. this._boundResponseReceived = null;
  348. this._connection.removeListener('close', this._boundConnectionError);
  349. this._boundConnectionError = null;
  350. this._connection.on('close', this._connClosed.bind(this));
  351.  
  352. var self = this;
  353. function onError(err) {
  354. logger.error(err.msg);
  355. self._connectionError(err.msg);
  356. }
  357. function onSuccess() {
  358. logger.debug('%s health check, emit connected', self.name);
  359. self._connectedEmitted = true;
  360. self.emit('connected', self);
  361. }
  362. var data = {
  363. conn: conn,
  364. command: command,
  365. code: code,
  366. decoded: decoded,
  367. shouldCallback: true
  368. };
  369. utils.handleRiakResponse(data, onError, onSuccess);
  370. };
  371.  
  372. RiakConnection.prototype._receiveStartTls = function(conn, command, code, decoded) {
  373. // NB: this function is similar to _responseReceived in RiakNode
  374. logger.debug('%s receive StartTls response', this.name);
  375. this.executeDone();
  376. this.removeListener('responseReceived', this._boundResponseReceived);
  377. this._boundResponseReceived = null;
  378.  
  379. var self = this;
  380. function onError(err) {
  381. logger.error(err.msg);
  382. self._connectionError(err.msg);
  383. }
  384. function onSuccess() {
  385. var tls_secure_context = tls.createSecureContext(self.auth);
  386. var tls_socket_options = {
  387. isServer: false, // NB: required
  388. secureContext: tls_secure_context
  389. };
  390.  
  391. self._connection.removeListener('error', self._boundConnectionError);
  392. self._connection = new tls.TLSSocket(self._connection, tls_socket_options);
  393. // NB: *must* re-register for data event!
  394. self._connection.on('data', self._receiveData.bind(self));
  395. self._connection.on('error', self._boundConnectionError);
  396.  
  397. // NB: this is necessary since we have not yet emitted the 'connected' event
  398. // in which case the execute method would set up the timeout
  399. // _receiveData clears the timeout as well as the bound method
  400. self._boundConnectionTimeout = self._connectionTimeout.bind(self);
  401. self._connection.setTimeout(self.connectionTimeout, self._boundConnectionTimeout);
  402.  
  403. var auth_options = {
  404. user: self.auth.user,
  405. password: self.auth.password
  406. };
  407.  
  408. // On responseReceived event, move to next sequence in TLS negotiation
  409. self._boundResponseReceived = self._receiveAuthResp.bind(self);
  410. self.on('responseReceived', self._boundResponseReceived);
  411.  
  412. // Execute AuthReq command
  413. /*
  414. if (logger.debug) {
  415. debugOutputConnectionListeners(self.name, self._connection);
  416. }
  417. */
  418. var command = new AuthReq(auth_options);
  419. self.execute(command);
  420. }
  421. var data = {
  422. conn: conn,
  423. command: command,
  424. code: code,
  425. decoded: decoded,
  426. shouldCallback: true
  427. };
  428. utils.handleRiakResponse(data, onError, onSuccess);
  429. };
  430.  
  431. RiakConnection.prototype._receiveAuthResp = function(conn, command, code, decoded) {
  432. logger.debug('%s receive RpbAuthResp', this.name);
  433. this.executeDone();
  434. this.removeListener('responseReceived', this._boundResponseReceived);
  435. this._boundResponseReceived = null;
  436. this._connection.removeListener('close', this._boundConnectionError);
  437. this._boundConnectionError = null;
  438. this._connection.on('close', this._connClosed.bind(this));
  439.  
  440. var self = this;
  441. function onError(err) {
  442. logger.error(err.msg);
  443. self._connectionError(err.msg);
  444. }
  445. function onSuccess() {
  446. logger.debug('%s emit connected, with-auth', self.name);
  447. self._connectedEmitted = true;
  448. self.emit('connected', self);
  449. }
  450. var data = {
  451. conn: conn,
  452. command: command,
  453. code: code,
  454. decoded: decoded,
  455. shouldCallback: true
  456. };
  457. utils.handleRiakResponse(data, onError, onSuccess);
  458. };
  459.  
  460. RiakConnection.prototype._receiveData = function(data) {
  461. var protobufArray = this._buildProtobufArray(data);
  462. for (var i = 0; i < protobufArray.length; i++) {
  463. this._clearSocketTimeout();
  464. this.emit('responseReceived', this,
  465. this.command, protobufArray[i].msgCode, protobufArray[i].protobuf);
  466. }
  467. };
  468.  
  469. // TODO FUTURE: what does "had_error" really mean?
  470. RiakConnection.prototype._connClosed = function(had_error) {
  471. this._emitAndClose('connectionClosed');
  472. };
  473.  
  474. RiakConnection.prototype.close = function() {
  475. this.closed = true;
  476. this.executeDone();
  477. this.removeAllListeners();
  478. this._closeBuffer();
  479. if (this._connection) {
  480. this._connection.end();
  481. this._connection.removeAllListeners();
  482. this._connection.on('error', function (err) {
  483. if (err) {
  484. logger.error('%s error AFTER close:', this.name, err);
  485. }
  486. });
  487. this._connection = null;
  488. }
  489. logger.debug('%s closed', this.name);
  490. };
  491.  
  492. RiakConnection.prototype.executeDone = function() {
  493. this.inFlight = false;
  494. this.command = {
  495. name: 'no-command'
  496. };
  497. this._resetBuffer();
  498. };
  499.  
  500. // command includes user callback
  501. RiakConnection.prototype.execute = function(command) {
  502. if (this.inFlight === true) {
  503. logger.error('%s attempted to run command "%s" on in-use connection',
  504. this.name, command.name);
  505. return false;
  506. }
  507.  
  508. this._executeStart(command);
  509.  
  510. // write PB to socket
  511. var message = command.getRiakMessage();
  512.  
  513. /*
  514. * NB: only bind to 'timeout' if 'connected' event has been emitted.
  515. * Initial connection, health check and starting TLS bind 'timeout'
  516. * to a handler that will raise 'connectFailed' on timeout
  517. */
  518. if (this._connectedEmitted) {
  519. if (this._boundConnectionTimeout) {
  520. this._connection.removeListener('timeout', this._boundConnectionTimeout);
  521. }
  522. this._boundConnectionTimeout = this._connHandleTimeout.bind(this, command);
  523. this._connection.setTimeout(this.requestTimeout, this._boundConnectionTimeout);
  524. }
  525.  
  526. /*
  527. * Use of cork()/uncork() suggested by Doug Luce
  528. * https://github.com/dougluce
  529. * https://github.com/basho/riak-nodejs-client/pull/56
  530. * https://github.com/basho/riak-nodejs-client/pull/57
  531. */
  532. if (this.cork) {
  533. this._connection.cork();
  534. }
  535.  
  536. this._connection.write(message.header);
  537.  
  538. if (message.protobuf) {
  539. this._connection.write(message.protobuf);
  540. }
  541.  
  542. if (this.cork) {
  543. this._connection.uncork();
  544. }
  545.  
  546. return true;
  547. };
  548.  
  549. module.exports = RiakConnection;
  550.