API Docs for: 2.2.2
Show:

File: lib/commands/kv/storevalue.js

'use strict';

var RiakObject = require('./riakobject');
var CommandBase = require('../commandbase');
var inherits = require('util').inherits;
var Joi = require('joi');

/**
 * Provides the StoreValue class, its builder, and its response.
 * @module KV
 */

/**
 * Command used to store data in Riak.
 *
 * As a convenience, a builder class is provided:
 *
 *      var storeValue = new StoreValue.Builder()
 *          .withBucket('myBucket')
 *          .withKey('myKey')
 *          .withContent(myObj)
 *          .build();
 *
 * See {{#crossLink "StoreValue.Builder"}}StoreValue.Builder{{/crossLink}}
 *
 * @class StoreValue
 * @constructor
 * @param {Object} options The options for this command.
 * @param {String} [options.bucketType=default] The bucket type in riak. If not supplied 'default' is used.
 * @param {String} options.bucket The bucket in riak.
 * @param {String} [options.key] The key for the object you want to store.
 * @param {Buffer} [options.vclock] The vector clock to use.
 * @param {RiakObject|String|Buffer|Object} options.value The value to store in Riak. JS Objects will be passed to JSON.stringify().
 * @param {Number} [options.w] The W value to use.
 * @param {Number} [options.dw] The DW value to use.
 * @param {Number} [options.pw] The PW value to use.
 * @param {Boolean} [options.returnBody] Return the stored object and meta (incl. siblings)
 * @param {Boolean} [options.returnHead] Return the metatdata only for the stored object.
 * @param {Boolean} [options.convertToJs] Convert the returned value(s) to JS objects using JSON.parse()
 * @param {Function} [options.conflictResolver] A function used to resolve siblings to a single object.
 * @param {RiakObject[]|Object[]} options.conflictResolver.objects The array of objects returned from Riak.
 * @param {Number} [options.timeout] Set a timeout for this command.
 * @param {Boolean} [options.ifNotModified] The if_not_modified flag.
 * @param {Boolean} [options.ifNoneMatch] The if_none_match flag.
 * @param {Function} callback The allback to be executed when the operation completes.
 * @param {String} callback.err An error message. Will be null if no error.
 * @param {Object} callback.response The response from Riak
 * @param {Buffer} callback.response.vclock The vector clock for this object (and its siblings)
 * @param {Object[]|RiakObject[]} callback.response.values An array of one or more values. Either RiakObjects or JS objects if convertToJs was used.
 * @param {String} callback.response.generatedKey The key generated by Riak if one was not supplied.
 * @param {Object} callback.data additional error data. Will be null if no error.
 * @extends CommandBase
 */
function StoreValue(options, callback) {
    CommandBase.call(this, 'RpbPutReq', 'RpbPutResp', callback);
    this.validateOptions(options, schema);
    var hasBucket = this.options.bucket ||
        (RiakObject.isRiakObject(options.value) && this.options.value.bucket);
    if (!hasBucket) {
        throw new Error('Must supply bucket directly or via a RiakObject');
    }
}

inherits(StoreValue, CommandBase);

StoreValue.prototype.constructPbRequest = function() {

    var protobuf = this.getPbReqBuilder();

    // RiakObject values take precidence
    var value = this.options.value;

    if (RiakObject.isRiakObject(value)) {
        if (value.hasOwnProperty('vclock')) {
            this.options.vclock = value.vclock;
        }
        if (value.hasOwnProperty('bucket')) {
            this.options.bucket = value.bucket;
        }
        if (value.hasOwnProperty('bucketType')) {
            this.options.bucketType = value.bucketType;
        }
        if (value.hasOwnProperty('key')) {
            this.options.key = value.key;
        }
    } else {
        value = new RiakObject().setValue(this.options.value);
    }

    protobuf.setBucket(new Buffer(this.options.bucket));
    protobuf.setType(new Buffer(this.options.bucketType));

    if (this.options.key) {
        protobuf.setKey(new Buffer(this.options.key));
    }

    protobuf.setVclock(this.options.vclock);
    protobuf.setW(this.options.w);
    protobuf.setDw(this.options.dw);
    protobuf.setPw(this.options.pw);
    protobuf.setReturnBody(this.options.returnBody);
    protobuf.setReturnHead(this.options.returnHead);
    protobuf.setTimeout(this.options.timeout);
    protobuf.setIfNotModified(this.options.ifNotModified);
    protobuf.setIfNoneMatch(this.options.ifNoneMatch);

    var rpbContent = RiakObject.populateRpbContentFromRiakObject(value);
    protobuf.setContent(rpbContent);

    return protobuf;

};

StoreValue.prototype.onSuccess = function(rpbPutResp) {

    // unless returnHead or returnMeta were specified, or if no key was supplied
    // Riak doesn't return any data
    var response = {};
    if (rpbPutResp) {

        var responseKey = null;

        if (rpbPutResp.getKey()) {
            responseKey = rpbPutResp.getKey().toString('utf8');
        }

        var values;
        var vclock = null;
        if (rpbPutResp.getContent().length) {
            var pbContentArray = rpbPutResp.getContent();

            values = new Array(pbContentArray.length);
            vclock = rpbPutResp.getVclock().toBuffer();
            for (var i = 0; i < pbContentArray.length; i++) {
                var riakObject = RiakObject.createFromRpbContent(pbContentArray[i], this.options.convertToJs);
                riakObject.vclock = vclock;
                riakObject.bucket = this.options.bucket;
                riakObject.bucketType = this.options.bucketType;
                riakObject.key = responseKey ? responseKey : this.options.key;

                values[i] = riakObject;
            }

            if (this.options.conflictResolver) {
                values = [this.options.conflictResolver(values)];
            }

        } else {
            values =[];
        }

        response = { generatedKey: responseKey, vclock: vclock, values: values };

    } else {
        response = { generatedKey: null, vclock: null, values: [] };
    }

    this._callback(null, response);

    return true;
};

var schema = Joi.object().keys({
    bucket: Joi.string().optional(),
    bucketType: Joi.string().default('default'),
    key: Joi.binary().default(null).optional(),
    value: Joi.any().required(),
    w: Joi.number().default(null).optional(),
    dw: Joi.number().default(null).optional(),
    pw: Joi.number().default(null).optional(),
    returnBody: Joi.boolean().default(false).optional(),
    returnHead: Joi.boolean().default(false).optional(),
    timeout: Joi.number().default(null).optional(),
    ifNotModified: Joi.boolean().default(false).optional(),
    ifNoneMatch: Joi.boolean().default(false).optional(),
    vclock: Joi.binary().default(null).optional(),
    convertToJs: Joi.boolean().default(false).optional(),
    conflictResolver: Joi.func().default(null).optional()
});

/**
 * A builder for constructing StoreValue instances.
 *
 * Rather than having to manually construct the __options__ and instantiating
 * a StoreValue directly, this builder may be used.
 *
 *     var storeValue = new StoreValue.Builder()
 *          .withBucket('myBucket')
 *          .withKey('myKey')
 *          .withContent(myObj);
 *          .build();
 *
 * @class StoreValue.Builder
 * @constructor
 */
function Builder() {}

Builder.prototype = {

    /**
     * Set the bucket.
     * Note that this can also be provided via RiakObject passed in via withContent() and
     * doing so will take precidence over this method.
     * @method withBucket
     * @param {String} bucket the bucket in Riak
     * @chainable
     */
    withBucket : function(bucket) {
        this.bucket = bucket;
        return this;
    },
    /**
     * Set the bucket type.
     * If not supplied, 'default' is used.
     * Note that this can also be provided via RiakObject passed in via withContent() and
     * doing so will take precidence over this method.
     * @method withBucketType
     * @param {String} bucketType the bucket type in riak
     * @chainable
     */
    withBucketType : function(bucketType) {
        this.bucketType = bucketType;
        return this;
    },
    /**
     * Set the key.
     * If not set, riak will generate and return a key.
     * Note that this can also be provided via RiakObject passed in via withContent() and
     * doing so will take precidence over this method.
     * @method withKey
     * @param {String} key the key in riak.
     * @chainable
     */
    withKey : function(key) {
        this.key = key;
        return this;
    },
    /**
     * Set the value and its metadata to be stored in Riak.
     * If a JS object is supplied, it will be converted to JSON
     * using JSON.stringify()
     * @method withContent
     * @param {RiakObject|String|Buffer|Object} value the data to store in Riak
     * @chainable
     */
    withContent : function(value) {
        this.value = value;
        return this;
    },
    /**
     * Set the vector clock.
     * Convenience method if a RiakObject is not supplied as content.
     * Note that a vclock supplied via RiakObject in withContent() will have precendence over this.
     * @method withVClock
     * @param {Buffer} vclock a vector clock returned from a previous fetch
     */
    withVClock : function(vclock) {
        this.vclock = vclock;
        return this;
    },
    /**
    * Set the W value.
    * How many replicas to write to before returning a successful response.
    * If not set the bucket default is used.
    * @method withW
    * @param {number} w the W value.
    * @chainable
    */
    withW : function(w) {
        this.w = w ;
        return this;
    },
    /**
     * Set the DW value.
     * How many replicas to commit to durable storage before returning a successful response.
     * If not set the bucket default is used.
     * @method withDw
     * @param {number} dw the DW value.
     * @chainable
     */
    withDw : function(dw) {
        this.dw = dw;
        return this;
    },
    /**
     * Set the PW value.
     * How many primary nodes must be up when the write is attempted.
     * If not set the bucket default is used.
     * @method withPw
     * @param {number} pw the PW value.
     * @chainable
     */
    withPw : function(pw) {
        this.pw = pw;
        return this;
    },
    /**
    * Return the object after storing (including any siblings).
    * If siblings are present, an optional conflictResolver can be
    * provided to resolve them.
    * @method withReturnBody
    * @param {boolean} returnBody true to return the object.
    * @param {boolean} [convertToJs] convert the returned value(s) to JS objects using JSON.parse()
    * @param {Function} [conflictResolver] the conflict resolver
    * @param {RiakObject[]} conflictResolver.riakobjects array of RiakObjects returned by Riak.
    * @chainable
    */
    withReturnBody: function(returnBody, convertToJs, conflictResolver) {
        this.returnBody = arguments[0];
        for (var i = 1; i < arguments.length; i++) {
            var arg = arguments[i];
            if (typeof arg === 'boolean') {
                this.convertToJs = arg;
            } else {
                this.conflictResolver = arg;
            }
        }

        return this;
    },
    /**
    * Return the metadata after storing the value.
    *
    * Causes Riak to only return the metadata for the object.
    * If siblings are present, an optional conflictResolver can be
    * provided to resolve them.
    * @method withReturnHead
    * @param {Function} [conflictResolver] the conflict resolver
    * @param {RiakObject[]} conflictResolver.riakobjects array of RiakObjects returned by Riak.
    * @param {boolean} returnHead true to return only metadata.
    * @chainable
    */
    withReturnHead : function(returnHead, conflictResolver) {
        this.returnHead = returnHead;
        this.conflictResolver = conflictResolver;
        return this;
    },
    /**
    * Set the if_not_modified flag.
    *
    * Setting this to true means to store the value only if the
    * supplied vclock matches the one in the database.
    *
    * Be aware there are several cases where this may not actually happen.
    * Use of this feature is discouraged.
    * @method withIfNotModified
    * @param {Boolean} ifNotModified the if_not_modified value.
    * @chainable
    */
    withIfNotModified : function(ifNotModified) {
        this.ifNotModified = ifNotModified;
        return this;
    },
    /**
    * Set the if_none_match flag.
    *
    * Setting this to true means store the value only if this
    * bucket/key combination are not already defined.
    *
    * Be aware that there are several cases where
    * this may not actually happen. Use of this option is discouraged.
    * @method withIfNoneMatch
    * @param {boolean} ifNoneMatch the if_non-match value.
    * @chainable
    */
    withIfNoneMatch : function(ifNoneMatch) {
        this.ifNoneMatch = ifNoneMatch;
        return this;
    },
    /**
    * Set a timeout for this operation.
    * @method withTimeout
    * @param {number} timeout a timeout in milliseconds.
    * @chainable
    */
    withTimeout : function(timeout) {
        this.timeout = timeout;
        return this;
    },
    /**
     * Set the callback to be executed when the operation completes.
     * @method withCallback
     * @param {Function} callback The allback to be executed when the operation completes.
     * @param {String} callback.err An error message. Will be null if no error.
     * @param {Object} callback.response The response from Riak
     * @param {Buffer} callback.response.vclock The vector clock for this object (and its siblings)
     * @param {Object[]|RiakObject[]} callback.response.values An array of one or more values. Either RiakObjects or JS objects if convertToJs was used.
     * @param {String} callback.response.generatedKey The key generated by Riak if one was not supplied.
     * @chainable
     */
    withCallback : function(callback) {
        this.callback = callback;
        return this;
    },
    /**
     * Construct a StoreValue instance.
     * @method build
     * @return {StoreValue} a StoreValue instance
     */
    build : function() {
        var cb = this.callback;
        delete this.callback;
        return new StoreValue(this, cb);
    }

};

module.exports = StoreValue;
module.exports.Builder = Builder;