API Docs for: 2.2.2
Show:

File: lib/commands/ts/store.js

'use strict';

var util = require('util');
var Joi = require('joi');
var logger = require('winston');

var ByteBuffer = require('bytebuffer');

var CommandBase = require('../commandbase');
var tsdata = require('./data');
var tsutils = require('./utils');
var utils = require('../../utils');

var rpb = require('../../protobuf/riakprotobuf');
var TsColumnType = rpb.getProtoFor('TsColumnType');
var TsColumnDescription = rpb.getProtoFor('TsColumnDescription');
var TsRow = rpb.getProtoFor('TsRow');
var TsCell = rpb.getProtoFor('TsCell');

/**
 * Provides the Store class, its builder, and its response.
 * @module TS
 */

/**
 * Command used to store timeseries data in Riak.
 *
 * As a convenience, a builder class is provided:
 *
 *      var storeValue = new Store.Builder()
 *          .withTable('table')
 *          .withColumns(cols)
 *          .withRows(rows)
 *          .build();
 *
 * See {{#crossLink "Store.Builder"}}Store.Builder{{/crossLink}}
 *
 * @class Store
 * @constructor
 * @param {Object} options The options for this command.
 * @param {String} options.table The timeseries table in Riak.
 * @param {Object[]} options.columns The timeseries columns in Riak.
 * @param {Object[]} options.rows The timeseries rows in Riak.
 * @param {Function} callback The allback to be executed when the operation completes.
 * @param {String} callback.err An error message. Will be null if no error.
 * @param {Object} callback.response Will be either true or false.
 * @param {Object} callback.data additional error data. Will be null if no error.
 * @extends CommandBase
 */
function Store(options, callback) {
    CommandBase.call(this, 'TsPutReq', 'TsPutResp', callback);
    this.validateOptions(options, schema);
}

util.inherits(Store, CommandBase);

Store.prototype.constructPbRequest = function() {
    var protobuf = this.getPbReqBuilder();
    protobuf.setTable(new Buffer(this.options.table));

    var i, ct;
    if (this.options.columns) {
        // Use column descriptions to populate cells
        var columnTypes = [];
        for (i = 0; i < this.options.columns.length; i++) {
            var col = this.options.columns[i];

            var tscd = new TsColumnDescription();
            tscd.setName(new Buffer(col.name));

            ct = col.type;
            columnTypes.push(ct);
            tscd.setType(ct);

            protobuf.columns.push(tscd);
        }

        for (i = 0; i < this.options.rows.length; i++) {
            var tsr = new TsRow();
            var row = this.options.rows[i];
            for (var j = 0; j < row.length; j++) {
                var tsc = new TsCell();
                ct = columnTypes[j];
                var cell = row[j];
                if (!utils.isNullOrUndefined(cell)) {
                    switch (ct) {
                        case TsColumnType.VARCHAR:
                            if (Buffer.isBuffer(cell)) {
                                tsc.setVarcharValue(cell);
                            } else if (ByteBuffer.isByteBuffer(cell)) {
                                tsc.setVarcharValue(cell.toBuffer());
                            } else {
                                tsc.setVarcharValue(new Buffer(cell));
                            }
                            break;
                        case TsColumnType.SINT64:
                            // NB: must be convertible to Long to serialize as sint64
                            tsc.setSint64Value(cell);
                            break;
                        case TsColumnType.DOUBLE:
                            tsc.setDoubleValue(cell);
                            break;
                        case TsColumnType.TIMESTAMP:
                            // NB: must be convertible to Long to serialize as sint64
                            var val = tsutils.maybeConvertToLong(cell);
                            tsc.setTimestampValue(val);
                            break;
                        case TsColumnType.BOOLEAN:
                            tsc.setBooleanValue(cell);
                            break;
                        default:
                            var msg = util.format("could not serialize: %s, column type: %d",
                                            JSON.stringify(cell), ct);
                            throw new Error(msg);
                    }
                }
                tsr.cells.push(tsc);
            }
            protobuf.rows.push(tsr);
        }
    } else {
        this.options.rows.forEach(function (row) {
            var cells = tsutils.convertToTsCells(row);
            var tsr = new TsRow();
            Array.prototype.push.apply(tsr.cells, cells);
            protobuf.rows.push(tsr);
        });
    }

    return protobuf;
};

Store.prototype.onSuccess = function(rpbPutResp) {
    this._callback(null, true);
    return true;
};

var columnTypeSchema = Joi.number().valid(tsdata.validColumnTypes);

var columnSchema = Joi.object().keys({
    name: Joi.string().required(),
    type: columnTypeSchema.required(),
    complexType: Joi.array().items(columnTypeSchema).optional(),
});

var schema = Joi.object().keys({
    table: Joi.string().required(),
    columns: Joi.array().items(columnSchema).optional(),
    rows: Joi.array().items(Joi.array().sparse()).required(),
});

/**
 * A builder for constructing Store instances.
 *
 * Rather than having to manually construct the __options__ and instantiating
 * a Store directly, this builder may be used.
 *
 *     var storeValue = new Store.Builder()
 *          .withTable('table')
 *          .withColumns(columns)
 *          .withRows(rows)
 *          .build();
 *
 * @class Store.Builder
 * @constructor
 */
function Builder() {}

Builder.prototype = {
    /**
     * Set the table.
     * @method withTable
     * @param {String} table the table in Riak
     * @chainable
     */
    withTable : function(table) {
        this.table = table;
        return this;
    },
    /**
     * Set the columns (optional).
     * @method withColumns
     * @param {Object[]} columns the timeseries columns in Riak
     * @chainable
     */
    withColumns : function(columns) {
        this.columns = columns;
        return this;
    },
    /**
     * Set the rows.
     * @method withRows
     * @param {Object[]} rows the timeseries row data
     * @chainable
     */
    withRows : function(rows) {
        this.rows = rows;
        return this;
    },
    /**
     * Set the callback to be executed when the operation completes.
     * @method withCallback
     * @param {Function} callback The allback to be executed when the operation completes.
     * @param {String} callback.err An error message. Will be null if no error.
     * @param {Object} callback.response The response from Riak
     * @chainable
     */
    withCallback : function(callback) {
        this.callback = callback;
        return this;
    },
    /**
     * Construct a Store instance.
     * @method build
     * @return {Store} a Store instance
     */
    build : function() {
        var cb = this.callback;
        delete this.callback;
        return new Store(this, cb);
    }
};

module.exports = Store;
module.exports.Builder = Builder;