diff --git a/.github/workflows/erlang.yml b/.github/workflows/erlang.yml index 6debfd95c..525857930 100644 --- a/.github/workflows/erlang.yml +++ b/.github/workflows/erlang.yml @@ -25,6 +25,7 @@ jobs: image: erlang:${{ matrix.otp }} steps: + - uses: lukka/get-cmake@latest - uses: actions/checkout@v2 - name: Compile run: ./rebar3 compile diff --git a/eqc/crdt_statem_eqc.erl b/eqc/crdt_statem_eqc.erl index 51b498ff8..3247fb7a9 100644 --- a/eqc/crdt_statem_eqc.erl +++ b/eqc/crdt_statem_eqc.erl @@ -62,7 +62,7 @@ next_state(#state{vnodes=VNodes0, mod_state=Expected, mod=Mod}=S,V, VNodes = lists:keyreplace(ID, 1, VNodes0, {ID, V}), S#state{vnodes=VNodes, mod_state=Mod:update_expected(ID, Op, Expected)}; next_state(#state{vnodes=VNodes0, mod_state=Expected0, mod=Mod}=S,V, - {call,?MODULE, merge, [_Mod, {IDS, _C}=_Source, {ID, _C}=_Dest]}) -> + {call,?MODULE, merge, [_Mod, {IDS, C}=_Source, {ID, C}=_Dest]}) -> VNodes = lists:keyreplace(ID, 1, VNodes0, {ID, V}), Expected = Mod:update_expected(ID, {merge, IDS}, Expected0), S#state{vnodes=VNodes, mod_state=Expected}; diff --git a/priv/riak_kv.schema b/priv/riak_kv.schema index f19835e88..8be962113 100644 --- a/priv/riak_kv.schema +++ b/priv/riak_kv.schema @@ -66,7 +66,6 @@ ]}. %% @doc A path under which the repl real-time overload queue will be stored. -%% @doc A path under which the reaper overload queue will be stored. {mapping, "replrtq_dataroot", "riak_kv.replrtq_dataroot", [ {default, "$(platform_data_dir)/kv_replrtqsrc"}, {datatype, directory} @@ -108,8 +107,8 @@ %% @doc Parallel key store type %% When running in parallel mode, which will be the default if the backend does -%% not support native tictac aae (i.e. is not leveled), what type of parallel -%% key store should be kept - leveled_ko (leveled and key-ordered), or +%% not support native tictac aae (i.e. is not leveled), what type of parallel +%% key store should be kept - leveled_ko (leveled and key-ordered), or %% leveled_so (leveled and segment ordered). %% When running in native mode, this setting is ignored {mapping, "tictacaae_parallelstore", "riak_kv.tictacaae_parallelstore", [ @@ -119,7 +118,7 @@ ]}. %% @doc Minimum Rebuild Wait -%% The minimum number of hours to wait between rebuilds. Default value is 2 +%% The minimum number of hours to wait between rebuilds. Default value is 2 %% weeks {mapping, "tictacaae_rebuildwait", "riak_kv.tictacaae_rebuildwait", [ {datatype, integer}, @@ -128,8 +127,8 @@ %% @doc Maximum Rebuild Delay %% The number of seconds which represents the length of the period in which the -%% next rebuild will be scheduled. So if all vnodes are scheduled to rebuild -%% at the same time, they will actually rebuild randomly between 0 an this +%% next rebuild will be scheduled. So if all vnodes are scheduled to rebuild +%% at the same time, they will actually rebuild randomly between 0 an this %% value (in seconds) after the rebuild time. Default value is 4 days {mapping, "tictacaae_rebuilddelay", "riak_kv.tictacaae_rebuilddelay", [ {datatype, integer}, @@ -137,9 +136,9 @@ ]}. %% @doc Store heads in parallel key stores -%% If running a parallel key store, the whole "head" object may be stored to -%% allow for fold_heads queries to be run against the parallel store. -%% Alternatively, the cost of the parallel key store can be reduced by storing +%% If running a parallel key store, the whole "head" object may be stored to +%% allow for fold_heads queries to be run against the parallel store. +%% Alternatively, the cost of the parallel key store can be reduced by storing %% only a minimal data set necessary for AAE and monitoring {mapping, "tictacaae_storeheads", "riak_kv.tictacaae_storeheads", [ {datatype, {flag, enabled, disabled}}, @@ -151,10 +150,10 @@ %% The number of milliseconds which the vnode must wait between self-pokes to %% maybe prompt the next exchange. Default is 8 minutes - check all partitions %% when n=3 once every hour (in each direction). A cycle of exchanges will -%% take (n - 1) * n + 1 exchange ticks for each nval. +%% take (n - 1) * n + 1 exchange ticks for each nval. %% Note if this is to be reduced further the riak_core vnode_inactivity_timeout -%% should also be reduced or handoffs may be blocked. To be safe the -%% vnode_inactivity_timeout must be < 0.5 * the tictacaae_exchangetick. +%% should also be reduced or handoffs may be blocked. To be safe the +%% vnode_inactivity_timeout must be < 0.5 * the tictacaae_exchangetick. {mapping, "tictacaae_exchangetick", "riak_kv.tictacaae_exchangetick", [ {datatype, integer}, {default, 480000}, @@ -180,7 +179,7 @@ %% faster by doubling. There are 1M segments in a standard tree overall. %% Performance tuning can also be made by adjusting the `tictacaae_repairloops` %% and `tictacaae_rangeboost` - but `tictacaae_maxresults` is the simplest -%% factor that is likely to result in a relatively predictable (and linear) +%% factor that is likely to result in a relatively predictable (and linear) %% outcome in terms of both CPU cost and repair speed. {mapping, "tictacaae_maxresults", "riak_kv.tictacaae_maxresults", [ {datatype, integer}, @@ -204,18 +203,18 @@ hidden ]}. -%% @doc Multiplier to the `tictcaaae_maxresults` when following an initial AAE +%% @doc Multiplier to the `tictcaaae_maxresults` when following an initial AAE %% exchange with a range-limited exchange. %% After each exchange, where sufficient deltas are discovered there will be a %% `tictacaae_repairloops` number of range-limited queries (assuming %% sufficient results continue to be found). Each of these may have the -%% the number of max results boosted by this integer factor. +%% the number of max results boosted by this integer factor. %% For example, if `tictacaae_maxresuts` is set to 64, and %% `tictacaae_repairloops` is set to 4, and the `tictacaae_rangeboost` is set %% to 2 - the initial loop will use `tictacaae_maxresuts` of 64, but any %% AAE exchanges on loops 1 to 4 will use 128. %% Exchanges with range-limited queries are more efficient, and so more tree -%% segments can be fetched without creating significant CPU overheads, hence +%% segments can be fetched without creating significant CPU overheads, hence %% the use of this boost to maxresults. {mapping, "tictacaae_rangeboost", "riak_kv.tictacaae_rangeboost", [ {datatype, integer}, @@ -249,7 +248,7 @@ %% Separate assured forwarding pools will be used of `af_worker_pool_size` for %% informational aae_folds (find_keys, object_stats) and functional folds %% (merge_tree_range, fetch_clock_range). The be_pool is used only for tictac -%% AAE rebuilds at present +%% AAE rebuilds at present {mapping, "node_worker_pool_size", "riak_kv.node_worker_pool_size", [ {datatype, integer}, {default, 4} @@ -470,7 +469,10 @@ {translation, "riak_kv.aae_throttle_limits", - riak_core_throttle:create_limits_translator_fun("anti_entropy", "mailbox_size") + begin + lists:foreach(fun code:add_path/1, filelib:wildcard("lib/*/ebin")), + riak_core_throttle:create_limits_translator_fun("anti_entropy", "mailbox_size") + end }. %% @see leveldb.bloomfilter @@ -943,11 +945,11 @@ ]}. -%% @doc For Tictac full-sync does all data need to be sync'd, or should a -%% specific bucket be sync'd (bucket), or a specific bucket type (type). +%% @doc For Tictac full-sync does all data need to be sync'd, or should a +%% specific bucket be sync'd (bucket), or a specific bucket type (type). %% Note that in most cases sync of all data is lower overhead than sync of %% a subset of data - as cached AAE trees will be used. -%% TODO: type is not yet implemented. +%% TODO: type is not yet implemented. {mapping, "ttaaefs_scope", "riak_kv.ttaaefs_scope", [ {datatype, {enum, [all, bucket, type, disabled]}}, {default, disabled} @@ -993,7 +995,7 @@ %% If using range_check to speed-up repairs, this can be reduced as the %% range_check maxresults will be boosted by the ttaaefs_rangeboost When using %% range_check a value of 64 is recommended, which may be reduced to 32 or 16 -%% if the cluster has a very large volume of keys and/or limited capacity. +%% if the cluster has a very large volume of keys and/or limited capacity. %% Only reduce below 16 in exceptional circumstances. %% More capacity to process sync queries can be added by increaseing the af2 %% and af3 queue sizes - but this will be at the risk of there being a bigger @@ -1008,21 +1010,21 @@ %% ttaaefs_max results * ttaaefs_rangeboost. %% When using range_check, a small maxresults can be used, in effect using %% other *_check syncs as discovery queries (to find the range_check for the -%% range_check to do the heavy lifting) +%% range_check to do the heavy lifting) {mapping, "ttaaefs_rangeboost", "riak_kv.ttaaefs_rangeboost", [ {datatype, integer}, {default, 16} ]}. %% @doc For Tictac bucket full-sync which bucket should be sync'd by this -%% node. Only ascii string bucket definitions supported (which will be -%% converted using list_to_binary). +%% node. Only ascii string bucket definitions supported (which will be +%% converted using list_to_binary). {mapping, "ttaaefs_bucketfilter_name", "riak_kv.ttaaefs_bucketfilter_name", [ {datatype, string}, {commented, "sample_bucketname"} ]}. -%% @doc For Tictac bucket full-sync what is the bucket type of the bucket name. +%% @doc For Tictac bucket full-sync what is the bucket type of the bucket name. %% Only ascii string type bucket definitions supported (these %% definitions will be converted to binary using list_to_binary) {mapping, "ttaaefs_bucketfilter_type", "riak_kv.ttaaefs_bucketfilter_type", [ @@ -1030,7 +1032,7 @@ {commented, "default"} ]}. -%% @doc For Tictac bucket-type full-sync what is the bucket type to be sync'd. +%% @doc For Tictac bucket-type full-sync what is the bucket type to be sync'd. %% Only ascii string type bucket definitions supported (these %% definitions will be converted to binary using list_to_binary). %% TODO: Type-based filtering is not yet supported @@ -1131,7 +1133,7 @@ %% The af3_queue size, and the ttaaefs_maxresults, both need to be tuned to %% ensure that the allcheck can run wihtin the 30 minute timeout. %% For per-bucket replication all is a reference to all of the data for that -%% bucket, and warnings about sizing are specially relevant. +%% bucket, and warnings about sizing are specially relevant. {mapping, "ttaaefs_allcheck", "riak_kv.ttaaefs_allcheck", [ {datatype, integer}, {default, 0} @@ -1177,7 +1179,7 @@ %% @doc How many times per 24hour period should the a range_check be run. The %% range_check is intended to be a smart check, in that it will: %% - use a last_modified range starting from the last successful check as its -%% range if the last check was successful (i.e. showed the clusters to be +%% range if the last check was successful (i.e. showed the clusters to be %% in sync); %% - use a range identified by the last check (a last modified range, and %% perhaps also a specific Bucket) if a range to limit the issues has been @@ -1404,7 +1406,7 @@ %% @doc Enable the `recalc` compaction strategy within the leveled backend in %% riak. The default (when disabled) is `retain`, but this will leave -%% uncollected garbage within the, journal. +%% uncollected garbage within the, journal. %% It is now recommended from Riak KV 2.9.2 to consider the `recalc` strategy. %% This strategy has a side effect of slower startups, and slower recovery %% from a wiped ledger - but it will not keep an overhead of garbage within @@ -1439,7 +1441,7 @@ %% each worker is taking per query in microseconds, so the overall queries %% per second supported will be: %% (1000000 div worker_vnode_pool_worktime) * n_val * worker_count -%% It should normally be possible to support >> 100 queries per second with +%% It should normally be possible to support >> 100 queries per second with %% just a single worker per vnode. %% The statistic worker_vnode_pool_queuetime_mean will track the average time %% a query is spending on a queue, should the vnode pool be exhausted. diff --git a/rebar.config b/rebar.config index e4c528584..251d12678 100644 --- a/rebar.config +++ b/rebar.config @@ -1,3 +1,4 @@ +%% -*- mode: erlang -*- {minimum_otp_vsn, "22.0"}. {src_dirs, ["./priv/tracers", "./src"]}. @@ -42,16 +43,16 @@ ]}. {deps, [ - {riak_core, {git, "https://github.com/basho/riak_core.git", {branch, "develop"}}}, - {sidejob, {git, "https://github.com/basho/sidejob.git", {branch, "develop"}}}, + {redbug, "2.0.8"}, + {sext, "1.8.0"}, + {sidejob, "2.1.0"}, + {recon, "2.5.2"}, + {hyper, {git, "https://github.com/basho/hyper", {tag, "1.1.0"}}}, + {riak_core, {git, "https://github.com/TI-Tokyo/riak_core.git", {branch, "develop"}}}, {bitcask, {git, "https://github.com/basho/bitcask.git", {branch, "develop"}}}, - {redbug, {git, "https://github.com/shiguredo/redbug", {branch, "otp-25"}}}, - {recon, {git, "https://github.com/ferd/recon", {tag, "2.5.2"}}}, - {sext, {git, "https://github.com/uwiger/sext.git", {tag, "1.8.0"}}}, {riak_pipe, {git, "https://github.com/basho/riak_pipe.git", {branch, "develop"}}}, {riak_dt, {git, "https://github.com/basho/riak_dt.git", {branch, "develop"}}}, {riak_api, {git, "https://github.com/basho/riak_api.git", {branch, "develop"}}}, - {hyper, {git, "https://github.com/basho/hyper", {tag, "1.1.0"}}}, - {kv_index_tictactree, {git, "https://github.com/martinsumner/kv_index_tictactree.git", {branch, "develop-3.1"}}}, - {rhc, {git, "https://github.com/basho/riak-erlang-http-client", {branch, "develop-3.2-otp24"}}} + {kv_index_tictactree, {git, "https://github.com/martinsumner/kv_index_tictactree.git", {tag, "1.0.2"}}}, + {riakhttpc, {git, "https://github.com/basho/riak-erlang-http-client", {tag, "3.0.10"}}} ]}. diff --git a/src/riak_kv.app.src b/src/riak_kv.app.src index 663237879..36deaa90a 100644 --- a/src/riak_kv.app.src +++ b/src/riak_kv.app.src @@ -26,7 +26,7 @@ redbug, recon, riakc, - rhc + riakhttpc ]}, {registered, []}, {mod, {riak_kv_app, []}}, diff --git a/src/riak_kv_bucket.erl b/src/riak_kv_bucket.erl index e4a2f1502..ede072c1e 100644 --- a/src/riak_kv_bucket.erl +++ b/src/riak_kv_bucket.erl @@ -1263,7 +1263,7 @@ immutable_consistent(undefined, _N, undefined, _Bad) -> immutable_consistent(true, _N, undefined, _Bad) -> %% consistent still set to true and n_val not modified true; -immutable_consistent(Consistent, _N, _N, _Bad) when Consistent =:= undefined orelse +immutable_consistent(Consistent, N, N, _Bad) when Consistent =:= undefined orelse Consistent =:= true -> %% consistent not modified or still set to true and n_val %% modified but set to same value @@ -1306,10 +1306,10 @@ undefined_props(Names, Props, Errors) -> immutable_dt(_NewDT=undefined, _NewAllowMult=undefined, _ExistingDT, _Bad) -> %% datatype and allow_mult are not being modified, so its valid true; -immutable_dt(_Datatype, undefined, _Datatype, _Bad) -> +immutable_dt(Datatype, undefined, Datatype, _Bad) -> %% data types from new and existing match and allow mult not modified, valid true; -immutable_dt(_Datatype, true, _Datatype, _Bad) -> +immutable_dt(Datatype, true, Datatype, _Bad) -> %% data type from new and existing match and allow mult still set to true, %% valid true; @@ -1325,7 +1325,7 @@ immutable_dt(_Datatype, true, _Datatype2, Bad) -> immutable_dt(_Datatype, false, undefined, Bad) -> %% datatype defined when it wasn't before has_datatype(Bad); -immutable_dt(_Datatype, false, _Datatype, Bad) -> +immutable_dt(Datatype, false, Datatype, Bad) -> %% attempt to set allow_mult to false when data type set is invalid, datatype not modified has_allow_mult(Bad); immutable_dt(undefined, false, _Datatype, Bad) -> @@ -1337,7 +1337,7 @@ immutable_dt(_Datatype, false, _Datatype2, Bad) -> immutable_dt(undefined, _, _Datatype, Bad) -> %% datatype not modified but allow_mult is invalid has_allow_mult(Bad); -immutable_dt(_Datatype, _, _Datatype, Bad) -> +immutable_dt(Datatype, _, Datatype, Bad) -> %% allow mult is invalid but data types still match has_allow_mult(Bad); immutable_dt(_, _, _, Bad) -> diff --git a/src/riak_kv_clusteraae_fsm.erl b/src/riak_kv_clusteraae_fsm.erl index 66b9f779e..11b99e8e9 100644 --- a/src/riak_kv_clusteraae_fsm.erl +++ b/src/riak_kv_clusteraae_fsm.erl @@ -18,7 +18,7 @@ %% %% ------------------------------------------------------------------- -%% @doc The AAE fold FSM allows for coverage folds acrosss Tictac AAE +%% @doc The AAE fold FSM allows for coverage folds acrosss Tictac AAE %% Controllers -module(riak_kv_clusteraae_fsm). @@ -42,10 +42,10 @@ -define(EMPTY, <<>>). --define(NVAL_QUERIES, +-define(NVAL_QUERIES, [merge_root_nval, merge_branch_nval, fetch_clocks_nval, list_buckets]). --define(RANGE_QUERIES, +-define(RANGE_QUERIES, [merge_tree_range, fetch_clocks_range, repl_keys_range, repair_keys_range, find_keys, object_stats, @@ -90,8 +90,8 @@ %% the cluster and increase parallelistaion of the process. %% The count change_method() will perform no reaps/deletes - but will %% simply count the matching keys - this is cheaper than runnning - %% find_tombs/find_keys to accumulate/sort a large list for counting. --type query_types() :: + %% find_tombs/find_keys to accumulate/sort a large list for counting. +-type query_types() :: merge_root_nval|merge_branch_nval|fetch_clocks_nval| merge_tree_range|fetch_clocks_range|repl_keys_range|repair_keys_range| find_keys|object_stats| @@ -100,7 +100,7 @@ -type query_definition() :: % Use of these folds depends on the Tictac AAE being enabled in either - % native mode, or in parallel mode with key_order being used. + % native mode, or in parallel mode with key_order being used. % N-val AAE (using cached trees) {merge_root_nval, n_val()}| @@ -114,19 +114,19 @@ {fetch_clocks_nval, n_val(), segment_filter()}| {fetch_clocks_nval, n_val(), segment_filter(), modified_range()}| % Scan over all the keys for a given n_val in the tictac AAE key store - % (which for native stores will be the actual key store), skipping + % (which for native stores will be the actual key store), skipping % those blocks of the store not containing keys in the segment filter, % returning a list of keys and clocks for that n_val within the - % cluster. This is a background operation, but will have lower + % cluster. This is a background operation, but will have lower % overheads than traditional store folds, subject to the size of the % segment filter being small - ideally o(10) or smaller % Variant supported with a modified range, which will be converted into % a fetch_clocks_range % Range-based AAE (requiring folds over native/parallel AAE key stores) - {merge_tree_range, + {merge_tree_range, bucket(), - key_range(), + key_range(), tree_size(), {segments, segment_filter(), tree_size()} | all, modified_range() | all, @@ -139,7 +139,7 @@ % Different size trees can be requested. Smaller tree sizes are more % likely to lead to false negative results, but are more efficient % to calculate and have a reduced load on the network - % + % % A segment_filter() may be passed. For example, if a tree comparison % has been done between two clusters, it might be preferable to confirm % the differences before fetching clocks. This can be done by @@ -169,9 +169,9 @@ % which are either: % - pre_hash (use the default pre-calculated hash) % - {rehash, IV} rehash the vector clock concatenated with an integer - {fetch_clocks_range, + {fetch_clocks_range, bucket(), - key_range(), + key_range(), {segments, segment_filter(), tree_size()} | all, modified_range() | all}| % Return the keys and clocks in the given bucket and key range. @@ -197,13 +197,13 @@ % % The leveled backend supports a max_key_count which could be used to % provide a loose_limit on the results returned. However, there are - % issues with this and segment_ordered backends, as well as extra + % issues with this and segment_ordered backends, as well as extra % complexity curtailing the results (and signalling the results are % curtailed). The main downside of large result sets is network over % use. Perhaps compressing the payload may be a better answer? - {repl_keys_range, + {repl_keys_range, bucket(), - key_range(), + key_range(), modified_range() | all, riak_kv_replrtq_src:queue_name()}| % Replicate all the objects in a given key and modified range. By @@ -228,7 +228,7 @@ % participating in coverage % Operational support functions - {find_keys, + {find_keys, bucket(), key_range(), modified_range() | all, @@ -244,11 +244,11 @@ % is the pre-calculated size stored in the aae key store as % metadata. % - % The query returns a list of [{Key, SiblingCount}] tuples or - % [{Key, ObjectSize}] tuples depending on the filter requested. The + % The query returns a list of [{Key, SiblingCount}] tuples or + % [{Key, ObjectSize}] tuples depending on the filter requested. The % cost of this operation will increase with the size of the range - % - % It would be beneficial to use the results of object_stats (or + % + % It would be beneficial to use the results of object_stats (or % knowledge of the application) to ensure that the result size of % this query is reasonably bounded (e.g. don't set too low an object % size). If only interested in the outcom of recent modifications, @@ -259,15 +259,15 @@ % - the total count of objects in the key range % - the accumulated total size of all objects in the range % - a list [{Magnitude, ObjectCount}] tuples where Magnitude represents - % the order of magnitude of the size of the object (e.g. 1KB is objects + % the order of magnitude of the size of the object (e.g. 1KB is objects % from 100 bytes to 1KB, 10KB is objects from 1KB to 10KB etc) % - a list of [{SiblingCount, ObjectCount}] tuples where Sibling Count % is the number of siblings the object has. % - sample portion - (n_val * sample_size) / ring_size % e.g. - % [{total_count, 1000}, - % {total_size, 1000000}, - % {sizes, [{1, 800}, {2, 180}, {3, 20}]}, + % [{total_count, 1000}, + % {total_size, 1000000}, + % {sizes, [{1, 800}, {2, 180}, {3, 20}]}, % {siblings, [{1, 1000}]}] % % If only interested in the outcome of recent modifications, @@ -275,7 +275,7 @@ {find_tombs, bucket(), - key_range(), + key_range(), {segments, segment_filter(), tree_size()} | all, modified_range() | all} | % Find all tombstones in the range that match the criteria, and @@ -355,16 +355,16 @@ -endif. -spec init(from(), inbound_api()) -> init_response(). -%% @doc -%% Return a tuple containing the ModFun to call per vnode, the number of -%% primary preflist vnodes the operation should cover, the service to use to -%% check for available nodes,and the registered name to use to access the +%% @doc +%% Return a tuple containing the ModFun to call per vnode, the number of +%% primary preflist vnodes the operation should cover, the service to use to +%% check for available nodes,and the registered name to use to access the %% vnode master process. init(From={_, _, _}, [Query, Timeout]) -> % Get the bucket n_val for use in creating a coverage plan QueryType = element(1, Query), - NVal = - case {lists:member(QueryType, ?NVAL_QUERIES), + NVal = + case {lists:member(QueryType, ?NVAL_QUERIES), lists:member(QueryType, ?RANGE_QUERIES)} of {true, false} -> element(2, Query); @@ -382,7 +382,7 @@ init(From={_, _, _}, [Query, Timeout]) -> merge_root_nval -> ?EMPTY; merge_branch_nval -> - lists:map(fun(X) -> {X, ?EMPTY} end, + lists:map(fun(X) -> {X, ?EMPTY} end, element(3, Query)); merge_tree_range -> TreeSize = element(4, Query), @@ -392,7 +392,7 @@ init(From={_, _, _}, [Query, Timeout]) -> repair_keys_range -> {[], 0, element(5, Query), ?REPAIR_BATCH_SIZE}; object_stats -> - [{total_count, 0}, + [{total_count, 0}, {total_size, 0}, {sizes, []}, {siblings, []}]; @@ -415,28 +415,28 @@ init(From={_, _, _}, [Query, Timeout]) -> end end, - Req = riak_kv_requests:new_aaefold_request(Query, InitAcc, NVal), + Req = riak_kv_requests:new_aaefold_request(Query, InitAcc, NVal), - State = #state{from = From, - acc = InitAcc, + State = #state{from = From, + acc = InitAcc, start_time = os:timestamp(), query_type = QueryType}, ?LOG_INFO("AAE fold prompted of type=~w", [QueryType]), - {Req, all, NVal, 1, - riak_kv, riak_kv_vnode_master, - Timeout, + {Req, all, NVal, 1, + riak_kv, riak_kv_vnode_master, + Timeout, State}. - + process_results({error, Reason}, _State) -> ?LOG_WARNING("Failure to process fold results due to ~w", [Reason]), {error, Reason}; process_results(Results, State) -> - % Results are received as a one-off for each vnode in this case, and so + % Results are received as a one-off for each vnode in this case, and so % once results are merged work is always done. Acc = State#state.acc, QueryType = State#state.query_type, - UpdAcc = + UpdAcc = case lists:member(QueryType, ?LIST_ACCUMULATE_QUERIES) of true -> case QueryType of @@ -472,15 +472,15 @@ process_results(Results, State) -> {_EL, AccCount, all, RBS} = Acc, {[], AccCount + Count, all, RBS}; object_stats -> - [{total_count, R_TC}, + [{total_count, R_TC}, {total_size, R_TS}, {sizes, R_SzL}, {siblings, R_SbL}] = Results, - [{total_count, A_TC}, + [{total_count, A_TC}, {total_size, A_TS}, {sizes, A_SzL}, {siblings, A_SbL}] = Acc, - [{total_count, R_TC + A_TC}, + [{total_count, R_TC + A_TC}, {total_size, R_TS + A_TS}, {sizes, merge_countinlists(A_SzL, R_SzL)}, {siblings, merge_countinlists(A_SbL, R_SbL)}]; @@ -503,7 +503,7 @@ process_results(Results, State) -> %% Once the coverage FSM has received done for all vnodes (as an output from %% process_results), then it will call finish(clean, State) and so the results -%% can be sent to the client, and the FSM can be stopped. +%% can be sent to the client, and the FSM can be stopped. finish({error, Error}, State=#state{from={raw, ReqId, ClientPid}}) -> % Notify the requesting client that an error % occurred or the timeout has elapsed. @@ -511,10 +511,10 @@ finish({error, Error}, State=#state{from={raw, ReqId, ClientPid}}) -> ClientPid ! {ReqId, {error, Error}}, {stop, normal, State}; finish(clean, State=#state{from={raw, ReqId, ClientPid}}) -> - % The client doesn't expect results in increments only the final result, + % The client doesn't expect results in increments only the final result, % so no need for a seperate send of a 'done' message QueryDuration = timer:now_diff(os:timestamp(), State#state.start_time), - ?LOG_INFO("Finished aaefold of type=~w with fold_time=~w seconds", + ?LOG_INFO("Finished aaefold of type=~w with fold_time=~w seconds", [State#state.query_type, QueryDuration/1000000]), Results = case State#state.query_type of @@ -530,7 +530,7 @@ finish(clean, State=#state{from={raw, ReqId, ClientPid}}) -> end; false -> ok - end, + end, Count; _ -> State#state.acc @@ -614,14 +614,14 @@ pb_encode_results(merge_tree_range, QD, Tree) -> %% TODO: %% Using leveled_tictac:export_tree/1 requires unnecessary base64 encoding %% and decoding. Add a leveled_tictac:export_tree_raw fun to avoid this - {struct, - [{<<"level1">>, EncodedL1}, + {struct, + [{<<"level1">>, EncodedL1}, {<<"level2">>, {struct, EncodedL2}}]} = leveled_tictac:export_tree(Tree), L2 = - lists:map(fun({I, CB}) -> + lists:map(fun({I, CB}) -> CBDecoded = base64:decode(CB), - Iint = binary_to_integer(I), + Iint = binary_to_integer(I), <> end, EncodedL2), @@ -636,32 +636,32 @@ pb_encode_results(fetch_clocks_range, _QD, KeysNClocks) -> keys_value = lists:map(fun pb_encode_bucketkeyclock/1, KeysNClocks)}; pb_encode_results(repl_keys_range, _QD, ReplResult) -> R = element(2, ReplResult), - #rpbaaefoldkeycountresp{response_type = <<"repl_keys">>, + #rpbaaefoldkeycountresp{response_type = <<"repl_keys">>, keys_count = [#rpbkeyscount{tag = <<"dispatched_count">>, count = R}]}; pb_encode_results(repair_keys_range, _QD, ReplResult) -> R = element(2, ReplResult), - #rpbaaefoldkeycountresp{response_type = <<"repair_keys">>, + #rpbaaefoldkeycountresp{response_type = <<"repair_keys">>, keys_count = [#rpbkeyscount{tag = <<"dispatched_count">>, count = R}]}; pb_encode_results(find_keys, _QD, Results) -> - KeyCountMap = + KeyCountMap = fun({_B, K, V}) -> #rpbkeyscount{tag = K, count = V} end, - #rpbaaefoldkeycountresp{response_type = <<"find_keys">>, + #rpbaaefoldkeycountresp{response_type = <<"find_keys">>, keys_count = lists:map(KeyCountMap, Results)}; pb_encode_results(find_tombs, QD, Results) -> pb_encode_results(find_keys, QD, Results); pb_encode_results(reap_tombs, _QD, Count) -> - #rpbaaefoldkeycountresp{response_type = <<"reap_tombs">>, + #rpbaaefoldkeycountresp{response_type = <<"reap_tombs">>, keys_count = [#rpbkeyscount{tag = <<"dispatched_count">>, count = Count}]}; pb_encode_results(erase_keys, _QD, Count) -> - #rpbaaefoldkeycountresp{response_type = <<"erase_keys">>, + #rpbaaefoldkeycountresp{response_type = <<"erase_keys">>, keys_count = [#rpbkeyscount{tag = <<"dispatched_count">>, count = Count}]}; @@ -680,7 +680,7 @@ pb_encode_results(object_stats, _QD, Results) -> end, SzL0 = lists:map(EncodeIdxL(sizes), SzL), SbL0 = lists:map(EncodeIdxL(siblings), SbL), - KeysCount = + KeysCount = [#rpbkeyscount{tag = atom_to_binary(total_count, unicode), count = TC}, #rpbkeyscount{tag = atom_to_binary(total_size, unicode), @@ -732,7 +732,7 @@ encode_find_key(Key, Value) -> {<<"value">>, Value}]. encode_bucket({Type, Bucket}) -> - {struct, + {struct, [{<<"bucket-type">>, Type}, {<<"bucket">>, Bucket}]}; encode_bucket(Bucket) -> {struct, [{<<"bucket">>, Bucket}]}. @@ -761,7 +761,7 @@ hash_function({rehash, InitialisationVector}) -> %% @doc -%% Send requests to the reaper, but every batch size get the reaper stats (a +%% Send requests to the reaper, but every batch size get the reaper stats (a %% sync operation) to avoid mailbox overload. -spec handle_in_batches(reap_tombs|erase_keys, list(riak_kv_reaper:reap_reference())| @@ -771,7 +771,7 @@ handle_in_batches(_Type, [], _BatchCount, _Worker) -> ok; handle_in_batches(Type, RefList, BatchCount, Worker) when BatchCount >= ?DELETE_BATCH_SIZE -> - + case Type of reap_tombs -> _ = riak_kv_reaper:reap_stats(Worker); @@ -792,8 +792,8 @@ handle_in_batches(Type, [Ref|RestRefs], BatchCount, Worker) -> %% Internal functions %% =================================================================== --spec merge_countinlists(list({integer(), integer()}), - list({integer(), integer()})) +-spec merge_countinlists(list({integer(), integer()}), + list({integer(), integer()})) -> list({integer(), integer()}). %% @doc %% Take two lists with {IntegerId, Count} tuples and return a list where the @@ -810,7 +810,7 @@ merge_countinlists(ResultList, AccList) -> end end, AccList0 = lists:map(MapFun, AccList), - lists:ukeymerge(1, + lists:ukeymerge(1, lists:ukeysort(1, AccList0), lists:ukeysort(1, ResultList)). @@ -842,7 +842,7 @@ is_nval(_) -> is_segment_list(L) when is_list(L) -> lists:all(fun is_integer/1, L); is_segment_list(_) -> - false. + false. is_segment_filter({segments, SegmentList, TreeSize}) -> IsSegmentList = is_segment_list(SegmentList), @@ -862,12 +862,12 @@ is_modrange(_) -> false. -convert_modrange({date, {LowDate, LowTime}, {HighDate, HighTime}}) +convert_modrange({date, {LowDate, LowTime}, {HighDate, HighTime}}) when is_tuple(LowDate), is_tuple(LowTime), is_tuple(HighDate), is_tuple(HighTime) -> EpochTime = calendar:datetime_to_gregorian_seconds({{1970,1,1},{0,0,0}}), - LowTS = + LowTS = calendar:datetime_to_gregorian_seconds({LowDate, LowTime}) - EpochTime, HighTS = @@ -989,12 +989,12 @@ json_encode_tictac_withentries_test() -> encode_results_ofsize(TreeSize) -> Tree = leveled_tictac:new_tree(tictac_folder_test, TreeSize), ExtractFun = fun(K, V) -> {K, V} end, - FoldFun = + FoldFun = fun({Key, Value}, AccTree) -> leveled_tictac:add_kv(AccTree, Key, Value, ExtractFun) end, - KVList = [{<<"key1">>, <<"value1">>}, - {<<"key2">>, <<"value2">>}, + KVList = [{<<"key1">>, <<"value1">>}, + {<<"key2">>, <<"value2">>}, {<<"key3">>, <<"value3">>}], Tree0 = lists:foldl(FoldFun, Tree, KVList), JsonTree = json_encode_results(merge_tree_range, Tree0), @@ -1027,7 +1027,7 @@ convert_validate_test() -> ?assertMatch(true, is_valid_fold(convert_fold(F))) end, [Q1, Q2, Q3, Q4, Q5, Q6, Q7]), - + IQ1 = {objects_stats, {<<"T">>, <<"B">>}, all, all}, IQ2 = {object_stats, all, all, all}, IQ3 = {object_stats, <<"B">>, {<<"SK">>, <<"EK">>}, all}, @@ -1041,4 +1041,3 @@ convert_validate_test() -> [IQ1, IQ2, IQ3, IQ4, IQ5, IQ6, IQ7]). -endif. - diff --git a/src/riak_kv_console.erl b/src/riak_kv_console.erl index e6cb38eea..ad8ea8222 100644 --- a/src/riak_kv_console.erl +++ b/src/riak_kv_console.erl @@ -692,7 +692,7 @@ repair_2i(Args) -> error; {error, Reason} -> io:format("Error: ~p\n", [Reason]), - io:format("Usage: riak-admin repair-2i [--speed [1-100]] ...\n", []), + io:format("Usage: riak admin repair-2i [--speed [1-100]] ...\n", []), io:format("Speed defaults to 100 (full speed)\n", []), io:format("If no partitions are given, all partitions in the\n" "node are repaired\n", []), diff --git a/src/riak_kv_index_hashtree.erl b/src/riak_kv_index_hashtree.erl index 7c74af222..ba4f5ec25 100644 --- a/src/riak_kv_index_hashtree.erl +++ b/src/riak_kv_index_hashtree.erl @@ -591,7 +591,7 @@ load_built(#state{trees=Trees}) -> %% Generate a hash value for a `riak_object' -spec hash_object({riak_object:bucket(), riak_object:key()}, - riak_object_t2b() | + riak_object_t2b() | riak_object:riak_object() | riak_object:proxy_object(), version()) -> binary(). hash_object({Bucket, Key}, RObj, Version) -> @@ -616,10 +616,10 @@ fold_keys(Partition, HashtreePid, Index, HasIndexTree) -> FoldFun = fold_fun(HashtreePid, HasIndexTree), {Limit, Wait} = get_build_throttle(), ?LOG_INFO("Making fold request to reconstruct AAE tree idx=~p" - ++ " with version ~w", + ++ " with version ~w", [Partition, Version]), - Opts = - case Version of + Opts = + case Version of legacy -> [aae_reconstruction, {iterator_refresh, true}]; _ -> @@ -637,13 +637,13 @@ fold_keys(Partition, HashtreePid, Index, HasIndexTree) -> %% The accumulator in the fold is the number of bytes hashed %% modulo the "build limit" size. If we get an int back, everything is ok -handle_fold_keys_result({Result, {Limit, Delay}}, HashtreePid, Index) +handle_fold_keys_result({Result, {Limit, Delay}}, HashtreePid, Index) when is_integer(Result) -> - ?LOG_INFO("Finished AAE tree build idx=~p limit ~w delay ~w", + ?LOG_INFO("Finished AAE tree build idx=~p limit ~w delay ~w", [Index, Limit, Delay]), gen_server:cast(HashtreePid, build_finished); handle_fold_keys_result(Result, HashtreePid, Index) -> - ?LOG_ERROR("Failed to build hashtree for idx=~p. Result was: ~p", + ?LOG_ERROR("Failed to build hashtree for idx=~p. Result was: ~p", [Index, Result]), gen_server:cast(HashtreePid, build_failed). @@ -866,7 +866,7 @@ expand_items(HasIndex, Items, Version) -> expand_item(Has2ITree, {object, BKey, RObj}, Version, Others) -> IndexN = riak_kv_util:get_index_n(BKey), BinBKey = term_to_binary(BKey), - ObjHash = + ObjHash = try hash_object(BKey, RObj, Version) catch Error:Reason -> diff --git a/src/riak_kv_overflow_queue.erl b/src/riak_kv_overflow_queue.erl index 5d69b64d3..b08ceb0a7 100644 --- a/src/riak_kv_overflow_queue.erl +++ b/src/riak_kv_overflow_queue.erl @@ -52,7 +52,7 @@ -type filename() :: file:filename()|none. -type queue_stats() :: list({pos_integer(), non_neg_integer()}). --record(overflowq, +-record(overflowq, { mqueues :: list({priority(), queue:queue()}) | not_logged, @@ -134,7 +134,7 @@ log(Type, JobID, Attempts, Aborts, Queue) -> OverflowLengths, DiscardCounts]), [Type, JobID, Attempts, Aborts]), - + ResetDiscards = lists:map(fun({P, _L}) -> {P, 0} end, Queue#overflowq.overflow_discards), @@ -155,13 +155,13 @@ stats(Queue) -> addto_queue(Priority, Item, FlowQ) -> MQueueLimit = FlowQ#overflowq.mqueue_limit, MQueueLengths = FlowQ#overflowq.mqueue_lengths, - {Priority, CurrentQL} = + {Priority, CurrentQL} = lists:keyfind(Priority, 1, MQueueLengths), OverflowFiles = FlowQ#overflowq.overflow_files, MQueues = FlowQ#overflowq.mqueues, {Priority, {OverflowFile, OverflowC}} = lists:keyfind(Priority, 1, OverflowFiles), - + case {OverflowFile, CurrentQL} of {none, CurrentQL} when CurrentQL < MQueueLimit -> UpdQueueLengths = @@ -272,7 +272,7 @@ close(FilePath, FlowQ) -> fetch_batch(Priority, MaxBatchSize, FlowQ) -> UpdFlowQ = maybereload_queue(Priority, MaxBatchSize, FlowQ), case lists:keyfind(Priority, 1, UpdFlowQ#overflowq.mqueue_lengths) of - {Priority, 0} -> + {Priority, 0} -> {empty, UpdFlowQ}; {Priority, MQueueL} -> BatchSize = min(MQueueL, MaxBatchSize), @@ -315,7 +315,7 @@ maybereload_queue(Priority, BatchSize, FlowQ) -> %% There are enough items on the queue, don't reload FlowQ; {{Priority, _N}, {Priority, {none, start}}} -> - %% There are no overflow files to reload from + %% There are no overflow files to reload from FlowQ; {{Priority, N}, {Priority, {File, Continuation}}} -> %% Attempt to refill the queue from the overflow file @@ -364,7 +364,7 @@ maybereload_queue(Priority, BatchSize, FlowQ) -> mqueues = UpdMQueues, mqueue_lengths = UpdMQueueCounts, overflow_lengths = UpdOverflowCounts, - overflow_files = UpdOverflowFiles} + overflow_files = UpdOverflowFiles} end end. @@ -439,16 +439,16 @@ basic_inmemory_test() -> ?assertMatch([1], B2), {mqueue_lengths, MQL2} = lists:keyfind(mqueue_lengths, 1, stats(FlowQ3)), ?assertMatch([{1, 99}, {2, 0}], MQL2), - + {B3, FlowQ4} = fetch_batch(1, 99, FlowQ3), ExpB = lists:seq(2, 100), ?assertMatch(ExpB, B3), {empty, _FlowQ5} = fetch_batch(1, 1, FlowQ4), - + ok = filelib:ensure_dir(RootPath), {ok, Files} = file:list_dir(RootPath), - + ?assertMatch([], Files). basic_overflow_test() -> @@ -473,12 +473,12 @@ basic_overflow_test() -> ok = filelib:ensure_dir(RootPath), {ok, Files1} = file:list_dir(RootPath), ?assertMatch(0, length(Files1)), - + FlowQ_NEW = new([1, 2], RootPath, 1000, 5000), {mqueue_lengths, MQL2} = lists:keyfind(mqueue_lengths, 1, stats(FlowQ_NEW)), ?assertMatch([{1, 0}, {2, 0}], MQL2), - + ok = filelib:ensure_dir(RootPath), {ok, Files2} = file:list_dir(RootPath), ?assertMatch(0, length(Files2)). @@ -527,7 +527,7 @@ underover_overflow_test() -> Refs2 = lists:seq(7001, 8000), FlowQ8 = lists:foldl(fun(R, FQ) -> addto_queue(1, R, FQ) end, FlowQ7, Refs2), - + {B7, FlowQ9} = fetch_batch(1, 1200, FlowQ8), ExpB7 = lists:seq(1801, 3000), ?assertMatch(ExpB7, B7), @@ -569,7 +569,7 @@ underover_overflow_test() -> Refs3 = lists:seq(8001, 10000), FlowQ15 = lists:foldl(fun(R, FQ) -> addto_queue(1, R, FQ) end, FlowQ14, Refs3), - + {mqueue_lengths, MQL2} = lists:keyfind(mqueue_lengths, 1, stats(FlowQ15)), ?assertMatch([{1, 1000}, {2, 0}], MQL2), @@ -583,4 +583,4 @@ underover_overflow_test() -> close(RootPath, FlowQ15). --endif. \ No newline at end of file +-endif. diff --git a/src/riak_kv_reaper.erl b/src/riak_kv_reaper.erl index 305694d44..5d9a655a1 100644 --- a/src/riak_kv_reaper.erl +++ b/src/riak_kv_reaper.erl @@ -98,7 +98,7 @@ request_reap(Pid, ReapReference) -> list({atom(), non_neg_integer()|riak_kv_overflow_queue:queue_stats()}). reap_stats() -> reap_stats(?MODULE). --spec reap_stats(pid()|module()) -> +-spec reap_stats(pid()|module()) -> list({atom(), non_neg_integer()|riak_kv_overflow_queue:queue_stats()}). reap_stats(Pid) -> riak_kv_queue_manager:stats(Pid). diff --git a/src/riak_kv_replrtq_peer.erl b/src/riak_kv_replrtq_peer.erl index e6871d85b..6946271a5 100644 --- a/src/riak_kv_replrtq_peer.erl +++ b/src/riak_kv_replrtq_peer.erl @@ -81,19 +81,19 @@ update_workers(WorkerCount, PerPeerLimit) -> init([]) -> case application:get_env(riak_kv, replrtq_peer_discovery, false) of - true -> + true -> SinkPeers = application:get_env(riak_kv, replrtq_sinkpeers, ""), DefaultQueue = app_helper:get_env(riak_kv, replrtq_sinkqueue), SnkQueuePeerInfo = riak_kv_replrtq_snk:tokenise_peers(DefaultQueue, SinkPeers), - MinDelay = + MinDelay = application:get_env(riak_kv, replrtq_prompt_min_seconds, ?AUTO_DISCOVERY_MINIMUM_SECONDS), lists:foreach( - fun({QueueName, _PeerInfo}) -> + fun({QueueName, _PeerInfo}) -> _ = schedule_discovery(QueueName, self(), MinDelay) end, SnkQueuePeerInfo), @@ -105,9 +105,8 @@ init([]) -> handle_call({update_discovery, QueueName}, _From, State) -> case lists:keyfind(QueueName, 1, State#state.discovery_peers) of false -> - ?LOG_INFO( - "Type=~w discovery for unconfigured QueueName=~w", - [update, QueueName]), + ?LOG_INFO("Type=~w discovery for unconfigured QueueName=~w", + [update, QueueName]), {reply, false, State}; {QueueName, PeerInfo} -> R = do_discovery(QueueName, PeerInfo, update), @@ -133,7 +132,7 @@ handle_cast({prompt_discovery, QueueName}, State) -> handle_info({scheduled_discovery, QueueName}, State) -> ok = prompt_discovery(QueueName), - MinDelay = + MinDelay = application:get_env( riak_kv, replrtq_prompt_min_seconds, @@ -158,7 +157,7 @@ code_change(_OldVsn, State, _Extra) -> %%%============================================================================ %% @doc -%% Prompt the riak_kv_replrtq_peer to discover peers for a given queue name +%% Prompt the riak_kv_replrtq_peer to discover peers for a given queue name -spec prompt_discovery(riak_kv_replrtq_snk:queue_name()) -> ok. prompt_discovery(QueueName) -> gen_server:cast(?MODULE, {prompt_discovery, QueueName}). @@ -180,7 +179,7 @@ schedule_discovery(QueueName, DiscoveryPid, SecondsDelay) -> do_discovery(QueueName, PeerInfo, Type) -> {SnkWorkerCount, PerPeerLimit} = riak_kv_replrtq_snk:get_worker_counts(), StartDelayMS = riak_kv_replrtq_snk:starting_delay(), - CurrentPeers = + CurrentPeers = case Type of count_change -> %% Ignore current peers, to update worker counts, so all @@ -213,8 +212,8 @@ do_discovery(QueueName, PeerInfo, Type) -> ok; CurrentPeers when is_list(CurrentPeers) -> ?LOG_INFO( - "Type=~w discovery old_peers=~w new_peers=~w", - [Type, length(CurrentPeers), length(DiscoveredPeers)]) + "Type=~w discovery old_peers=~w new_peers=~w", + [Type, length(CurrentPeers), length(DiscoveredPeers)]) end, riak_kv_replrtq_snk:add_snkqueue(QueueName, DiscoveredPeers, @@ -235,7 +234,7 @@ discover_peers(PeerInfo, StartingDelayMS) -> end, lists:usort(lists:foldl(ConvertToPeerInfoFun, [], Peers)). - + -spec discover_from_peer( riak_kv_replrtq_snk:peer_info(), list({binary(), pos_integer(), pb|http})) @@ -244,7 +243,7 @@ discover_from_peer(PeerInfo, Acc) -> {_PeerID, _Delay, Host, Port, Protocol} = PeerInfo, RemoteGenFun = riak_kv_replrtq_snk:remote_client_fun(Protocol, Host, Port), RemoteFun = RemoteGenFun(), - UpdAcc = + UpdAcc = try case RemoteFun(peer_discovery) of {ok, IPPorts} -> @@ -265,5 +264,3 @@ discover_from_peer(PeerInfo, Acc) -> end, RemoteFun(close), UpdAcc. - - \ No newline at end of file diff --git a/src/riak_kv_replrtq_snk.erl b/src/riak_kv_replrtq_snk.erl index db45cb4f2..16deb8ccc 100644 --- a/src/riak_kv_replrtq_snk.erl +++ b/src/riak_kv_replrtq_snk.erl @@ -47,7 +47,7 @@ add_snkqueue/4, current_peers/1]). --export([repl_fetcher/1, +-export([repl_fetcher/1, tokenise_peers/2, get_worker_counts/0, set_worker_counts/2, @@ -210,7 +210,7 @@ add_snkqueue(QueueName, Peers, WorkerCount) -> %% number of workers overall -spec add_snkqueue(queue_name(), list(peer_info()), pos_integer(), pos_integer()) -> ok. -add_snkqueue(QueueName, Peers, WorkerCount, PerPeerLimit) +add_snkqueue(QueueName, Peers, WorkerCount, PerPeerLimit) when PerPeerLimit =< WorkerCount -> gen_server:call(?MODULE, {add, QueueName, Peers, WorkerCount, PerPeerLimit}). @@ -218,7 +218,7 @@ add_snkqueue(QueueName, Peers, WorkerCount, PerPeerLimit) %% @doc %% Return the current list of peers being used by this snk host, and the -%% settings currently being used for this host and he workers per peer. +%% settings currently being used for this host and he workers per peer. %% Returns undefined if there are currently no peers defined. -spec current_peers(queue_name()) -> list(peer_info())|undefined. current_peers(QueueName) -> @@ -435,7 +435,7 @@ handle_info({prompt_requeue, WorkItem}, State) -> terminate(_Reason, State) -> WorkItems = lists:map(fun(SW) -> element(3, SW) end, State#state.work), - CloseFun = + CloseFun = fun(SinkWork) -> lists:foreach( fun({{_QN, _Iter, _Peer}, _LocalC, RemoteFun, _RCF}) -> @@ -558,7 +558,7 @@ map_peer_to_wi_fun({QueueName, Iteration, PeerInfo}) -> %% @doc %% Return a function which when called will enclose a remote_fun for sending %% requests with a reusable client (if required) --spec remote_client_fun(http|pb, string(), pos_integer()) -> +-spec remote_client_fun(http|pb, string(), pos_integer()) -> fun(() -> remote_fun()). remote_client_fun(http, Host, Port) -> InitClientFun = client_start(http, Host, Port, []), @@ -582,9 +582,9 @@ remote_client_fun(pb, Host, Port) -> app_helper:get_env(riak_kv, repl_cert_filename), KeyFilename = app_helper:get_env(riak_kv, repl_key_filename), - SecuritySitename = + SecuritySitename = app_helper:get_env(riak_kv, repl_username), - Opts = + Opts = case CaCertificateFilename of undefined -> [{silence_terminate_crash, true}]; @@ -621,7 +621,7 @@ remote_client_fun(pb, Host, Port) -> end end. --spec client_start(pb|http, string(), pos_integer(), list()) +-spec client_start(pb|http, string(), pos_integer(), list()) -> fun(() -> rhc:rhc()|pid()|no_pid). client_start(pb, Host, Port, Opts) -> fun() -> @@ -789,7 +789,7 @@ add_failure({S, {failure, Failure}, FT, PT, RT, MT}) -> -spec add_repltime(queue_stats(), {integer(), integer(), integer()}) -> queue_stats(). -add_repltime({S, +add_repltime({S, F, {replfetch_time, FT}, {replpush_time, PT}, {replmod_time, RT}, MT}, diff --git a/src/riak_kv_replrtq_src.erl b/src/riak_kv_replrtq_src.erl index 20d9336cb..b029685d5 100644 --- a/src/riak_kv_replrtq_src.erl +++ b/src/riak_kv_replrtq_src.erl @@ -161,7 +161,7 @@ % If a priority 3 item is pushed to the queue and the length of the % queue_cache is less than the object limit, and the overflowq is empty % for this priority, the item will be added to the queue_cache. - % + % % If a priority 3 item is pushed to the queue and the length of the % queue_cache is at/over the object limit, or the overflowq is non-empty % then the item will be added to the overflowq, and if the object_ref is @@ -171,7 +171,7 @@ % added first to the overflowq . % % If a fetch request is received and the priority 3 queue_cache is - % non-empty then the next entry from this queue will be returned. + % non-empty then the next entry from this queue will be returned. % If the overflow queue is empty, then an attempt will be made to % return a batch from the overflowq, to add to the queue_cache. % @@ -333,6 +333,8 @@ init([FilePath]) -> ?LOG_TIMER_SECONDS * 1000), erlang:send_after(LogFreq, self(), log_queue), + {OL, QL} = get_limits(), + {ok, #state{queue_filtermap = QFM, queue_overflow = QO, queue_local = QC, @@ -828,7 +830,7 @@ empty_local_queue() -> {{queue:new(), 0, 0}, {queue:new(), 0, 0}, {queue:new(), 0, 0}}. -spec empty_overflow_queue(queue_name(), string()) - -> riak_kv_overflow_queue:overflowq(). + -> riak_kv_overflow_queue:overflowq(). empty_overflow_queue(QueueName, FilePath) -> {_OL, QL} = get_limits(), Priorities = [?FLD_PRIORITY, ?AAE_PRIORITY, ?RTQ_PRIORITY], @@ -1079,7 +1081,7 @@ limit_aaefold_test() -> ok = replrtq_aaefold(?QN1, Grp4), ?assertMatch({?QN1, {100000, 0, 2000}}, length_rtq(?QN1)), - + lists:foreach(fun(_I) -> _ = popfrom_rtq(?QN1) end, lists:seq(1, 4000)), ?assertMatch({?QN1, {98000, 0, 0}}, length_rtq(?QN1)), @@ -1106,7 +1108,7 @@ limit_ttaaefs_test() -> ok = replrtq_ttaaefs(?QN1, Grp4), ?assertMatch({?QN1, {0, 100000, 2000}}, length_rtq(?QN1)), - + lists:foreach(fun(_I) -> _ = popfrom_rtq(?QN1) end, lists:seq(1, 4000)), ?assertMatch({?QN1, {0, 98000, 0}}, length_rtq(?QN1)), @@ -1118,7 +1120,7 @@ limit_ttaaefs_test() -> ok = replrtq_ttaaefs(?QN1, Grp4), ?assertMatch({?QN1, {0, 2000, 0}}, length_rtq(?QN1)), - + lists:foreach(fun(_I) -> _ = popfrom_rtq(?QN1) end, lists:seq(1, 2000)), ?assertMatch({?QN1, {0, 0, 0}}, length_rtq(?QN1)), diff --git a/src/riak_kv_stat_bc.erl b/src/riak_kv_stat_bc.erl index 1353fc054..84c20a16d 100644 --- a/src/riak_kv_stat_bc.erl +++ b/src/riak_kv_stat_bc.erl @@ -23,7 +23,7 @@ %% @doc riak_kv_stat_bc is a module that maps the new riak_kv_stats metrics %% to the old set of stats. It exists to maintain backwards compatibility for -%% those using the `/stats' endpoint and `riak-admin status'. This module +%% those using the `/stats' endpoint and `riak admin status'. This module %% should be considered soon to be deprecated and temporary. %% %% Legacy stats: diff --git a/src/riak_kv_ttaaefs_manager.erl b/src/riak_kv_ttaaefs_manager.erl index 6fceee1bc..2cd742824 100644 --- a/src/riak_kv_ttaaefs_manager.erl +++ b/src/riak_kv_ttaaefs_manager.erl @@ -143,7 +143,7 @@ start_link() -> gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). - + %% @doc %% Override shcedule and process an individual work_item. If called from %% riak_client an integer ReqID is passed to allow for a response to be @@ -160,7 +160,7 @@ process_workitem(WorkItem, ReqID, From, Now) -> gen_server:cast(?MODULE, {WorkItem, ReqID, From, Now}). %% @doc -%% Pause the management of full-sync from this node +%% Pause the management of full-sync from this node -spec pause() -> ok|{error, already_paused}. pause() -> gen_server:call(?MODULE, pause). @@ -217,12 +217,12 @@ init([]) -> RangeCheck = app_helper:get_env(riak_kv, ttaaefs_rangecheck), AutoCheck = app_helper:get_env(riak_kv, ttaaefs_autocheck), - {SliceCount, Schedule} = + {SliceCount, Schedule} = case Scope of disabled -> {24, [{no_check, 24}, - {all_check, 0}, + {all_check, 0}, {day_check, 0}, {hour_check, 0}, {range_check, 0}, @@ -238,11 +238,11 @@ init([]) -> {range_check, RangeCheck}, {auto_check, AutoCheck}]} end, - + CheckWindow = app_helper:get_env(riak_kv, ttaaefs_allcheck_window), - State1 = + State1 = case Scope of all -> LocalNVal = app_helper:get_env(riak_kv, ttaaefs_localnval), @@ -257,7 +257,7 @@ init([]) -> B = app_helper:get_env(riak_kv, ttaaefs_bucketfilter_name), T = app_helper:get_env(riak_kv, ttaaefs_bucketfilter_type), B0 = - case is_binary(B) of + case is_binary(B) of true -> B; false -> @@ -283,8 +283,8 @@ init([]) -> slice_count = SliceCount, slot_info_fun = fun get_slotinfo/0} end, - - + + % Fetch connectivity information for remote cluster PeerIP = app_helper:get_env(riak_kv, ttaaefs_peerip), PeerPort = app_helper:get_env(riak_kv, ttaaefs_peerport), @@ -295,9 +295,9 @@ init([]) -> app_helper:get_env(riak_kv, repl_cert_filename), KeyFilename = app_helper:get_env(riak_kv, repl_key_filename), - SecuritySitename = + SecuritySitename = app_helper:get_env(riak_kv, repl_username), - SSLEnabled = + SSLEnabled = (CaCertificateFilename =/= undefined) and (CertificateFilename =/= undefined) and (KeyFilename =/= undefined) and @@ -312,13 +312,13 @@ init([]) -> false -> undefined end, - + % Queue name to be used for AAE exchanges on this cluster SrcQueueName = app_helper:get_env(riak_kv, ttaaefs_queuename), PeerQueueName = application:get_env(riak_kv, ttaaefs_queuename_peer, disabled), - State2 = + State2 = State1#state{peer_ip = PeerIP, peer_port = PeerPort, peer_protocol = PeerProtocol, @@ -326,7 +326,7 @@ init([]) -> queue_name = SrcQueueName, peer_queue_name = PeerQueueName, check_window = CheckWindow}, - + ?LOG_INFO("Initiated Tictac AAE Full-Sync Mgr with scope=~w", [Scope]), {ok, State2, ?INITIAL_TIMEOUT}. @@ -334,7 +334,7 @@ handle_call(pause, _From, State) -> case State#state.is_paused of true -> {reply, {error, already_paused}, State}; - false -> + false -> PausedSchedule = [{no_check, State#state.slice_count}, {all_check, 0}, @@ -369,7 +369,7 @@ handle_call(resume, _From, State) -> {reply, {error, not_paused}, State, ?INITIAL_TIMEOUT} end; handle_call({set_sink, Protocol, PeerIP, PeerPort}, _From, State) -> - State0 = + State0 = State#state{peer_ip = PeerIP, peer_port = PeerPort, peer_protocol = Protocol}, @@ -398,7 +398,7 @@ handle_call({set_bucketsync, BucketList}, _From, State) -> handle_cast({reply_complete, ReqID, Result}, State) -> LastExchangeStart = State#state.last_exchange_start, Duration = timer:now_diff(os:timestamp(), LastExchangeStart), - {Pause, State0} = + {Pause, State0} = case Result of {waiting_all_results, _Deltas} -> % If the exchange ends with waiting all results, then consider @@ -414,7 +414,7 @@ handle_cast({reply_complete, ReqID, Result}, State) -> SyncState == branch_compare -> riak_kv_stat:update({ttaaefs, sync_sync, Duration}), ?LOG_INFO( - "exchange=~w complete result=~w in duration=~w s" ++ + "exchange=~w complete result=~w in duration=~w s" " sync_state=true", [ReqID, Result, Duration div 1000000]), disable_tree_repairs(), @@ -447,7 +447,7 @@ handle_cast({all_check, ReqID, From, _Now}, State) -> none, undefined, full}; bucket -> [H|T] = State#state.bucket_list, - {range, range, + {range, range, {filter, H, all, large, all, all, pre_hash}, T ++ [H], partial} @@ -472,7 +472,7 @@ handle_cast({day_check, ReqID, From, Now}, State) -> State#state.local_nval, State#state.remote_nval, Filter, - undefined, + undefined, full, State, day_check), @@ -508,14 +508,14 @@ handle_cast({hour_check, ReqID, From, Now}, State) -> State#state.local_nval, State#state.remote_nval, Filter, - undefined, + undefined, full, State, hour_check), {noreply, State0, Timeout}; bucket -> [H|T] = State#state.bucket_list, - + % Note that the tree size is amended as well as the time range. % The bigger the time range, the bigger the tree. Bigger trees % are less efficient when there is little change, but can more @@ -577,7 +577,7 @@ handle_cast({range_check, ReqID, From, _Now}, State) -> State#state.local_nval, State#state.remote_nval, Filter, - undefined, + undefined, full, State, range_check), @@ -651,7 +651,7 @@ handle_info(timeout, State) -> [OldInfo, SlotInfo]), {[], undefined} end, - {WorkItem, Wait, RemainingSlices, ScheduleStartTime} = + {WorkItem, Wait, RemainingSlices, ScheduleStartTime} = take_next_workitem(Allocations, State#state.schedule, StartTime, @@ -696,7 +696,7 @@ code_change(_OldVsn, State, _Extra) -> set_range(Bucket, KeyRange, LowDate, HighDate) -> EpochTime = calendar:datetime_to_gregorian_seconds({{1970,1,1},{0,0,0}}), - LowTS = + LowTS = calendar:datetime_to_gregorian_seconds(LowDate) - EpochTime, HighTS = calendar:datetime_to_gregorian_seconds(HighDate) - EpochTime, @@ -710,7 +710,7 @@ clear_range() -> application:set_env(riak_kv, ttaaefs_check_range, none). -spec get_range() -> - none|{riak_object:bucket()|all, + none|{riak_object:bucket()|all, {riak_object:key(), riak_object:key()}|all, pos_integer(), pos_integer()}. get_range() -> @@ -781,7 +781,7 @@ sync_clusters(From, ReqID, LNVal, RNVal, Filter, NextBucketList, StopFun = fun() -> stop_client(RemoteClient, RemoteMod) end, RemoteSendFun = generate_sendfun({RemoteClient, RemoteMod}, RNVal), LocalSendFun = generate_sendfun(local, LNVal), - ReqID0 = + ReqID0 = case ReqID of no_reply -> erlang:phash2({self(), os:timestamp()}); @@ -790,7 +790,7 @@ sync_clusters(From, ReqID, LNVal, RNVal, Filter, NextBucketList, end, ReplyFun = generate_replyfun(ReqID == no_reply, ReqID0, From, StopFun), - + MaxResults = case WorkType of range_check -> @@ -806,7 +806,7 @@ sync_clusters(From, ReqID, LNVal, RNVal, Filter, NextBucketList, ttaaefs_maxresults, ?MAX_RESULTS) end, - + LocalRepairFun = fun(RepairList) -> riak_kv_replrtq_src:replrtq_ttaaefs( @@ -859,17 +859,17 @@ sync_clusters(From, ReqID, LNVal, RNVal, Filter, NextBucketList, [{RemoteSendFun, all}], RepairFun, ReplyFun, - Filter, + Filter, [{transition_pause_ms, ExchangePause}, {max_results, MaxResults}, {scan_timeout, ?CRASH_TIMEOUT div 2}, {purpose, WorkType}]), - - ?LOG_INFO("Starting ~w full-sync work_item=~w " ++ + + ?LOG_INFO("Starting ~w full-sync work_item=~w " ++ "reqid=~w exchange id=~s pid=~w", [Ref, WorkType, ReqID0, ExID, ExPid]), riak_kv_stat:update({ttaaefs, WorkType}), - + {State#state{bucket_list = NextBucketList, last_exchange_start = os:timestamp()}, ?CRASH_TIMEOUT} @@ -902,7 +902,7 @@ get_slotinfo() -> lists:sort([node()|UpNodes]) end, NotMe = lists:takewhile(fun(N) -> N /= node() end, UpNodes0), - ClusterSlice = + ClusterSlice = max(min(app_helper:get_env(riak_kv, ttaaefs_cluster_slice, 1), 4), 1), {length(NotMe) + 1, length(UpNodes0), ClusterSlice}. @@ -910,17 +910,17 @@ get_slotinfo() -> %% Return a function which will send aae_exchange messages to a remote %% cluster, and return the response. The function should make an async call %% to try and make the remote and local cluster sends happen as close to -%% parallel as possible. +%% parallel as possible. -spec generate_sendfun({rhc:rhc(), rhc}|{pid(), riakc_pb_socket}|local, nval()) -> aae_exchange:send_fun(). generate_sendfun(SendClient, NVal) -> fun(Msg, all, Colour) -> AAE_Exchange = self(), - ReturnFun = - fun(R) -> + ReturnFun = + fun(R) -> aae_exchange:reply(AAE_Exchange, R, Colour) end, - SendFun = + SendFun = case SendClient of local -> C = riak_client:new(node(), undefined), @@ -952,7 +952,7 @@ init_client(pb, IP, Port, undefined) -> Options = [{auto_reconnect, true}], init_pbclient(IP, Port, Options); init_client(pb, IP, Port, Credentials) -> - SecurityOpts = + SecurityOpts = [{cacertfile, element(1, Credentials)}, {certfile, element(2, Credentials)}, {keyfile, element(3, Credentials)}, @@ -977,7 +977,7 @@ init_pbclient(IP, Port, Options) -> ?LOG_INFO("Cannot reach remote cluster ~p ~p as ~p", [IP, Port, Reason]), {no_client, riakc_pb_socket} - catch + catch _Exception:Reason -> ?LOG_WARNING("Cannot reach remote cluster ~p ~p exception ~p", [IP, Port, Reason]), @@ -995,7 +995,7 @@ local_sender({fetch_clocks, SegmentIDs}, C, ReturnFun, NVal) -> local_sender({fetch_clocks, SegmentIDs, MR}, C, ReturnFun, NVal) -> %% riak_client expects modified range of form %% {date, non_neg_integer(), non_neg_integer()} - %% where as the riak erlang clients just expect + %% where as the riak erlang clients just expect %% {non_neg_integer(), non_neg_integer()} %% They keyword all must also be supported LMR = localise_modrange(MR), @@ -1010,12 +1010,12 @@ local_sender({fetch_clocks_range, B0, KR, SF, MR}, C, ReturnFun, _NVal) -> -spec run_localfold(riak_kv_clusteraae_fsm:query_definition(), riak_client:riak_client(), - fun((any()) -> ok)) -> + fun((any()) -> ok)) -> fun(() -> ok). run_localfold(Query, Client, ReturnFun) -> fun() -> case riak_client:aae_fold(Query, Client) of - {ok, R} -> + {ok, R} -> ReturnFun(R); {error, Error} -> ReturnFun({error, Error}) @@ -1126,7 +1126,7 @@ generate_replyfun(Clientless, ReqID, From, StopClientFun) -> % Reply to riak_client From ! {ReqID, Result} end, - gen_server:cast(?MODULE, {reply_complete, ReqID, Result}), + gen_server:cast(?MODULE, {reply_complete, ReqID, Result}), StopClientFun() end. @@ -1172,11 +1172,11 @@ generate_repairfun(LocalRepairFun, RemoteRepairFun, MaxResults, LogInfo) -> {SrcRepair, SnkRepair} = lists:foldl(FoldFun, {[], []}, RepairList), ?LOG_INFO( "AAE reqid=~w work_item=~w scope=~w shows sink ahead " ++ - "for key_count=~w keys limited by max_results=~w", + "for key_count=~w keys limited by max_results=~w", [ExchangeID, WorkItem, WorkScope, length(SnkRepair), MaxResults]), ?LOG_INFO( "AAE reqid=~w work_item=~w scope=~w shows source ahead " ++ - "for key_count=~w keys limited by max_results=~w", + "for key_count=~w keys limited by max_results=~w", [ExchangeID, WorkItem, WorkScope, length(SrcRepair), MaxResults]), riak_kv_stat:update({ttaaefs, snk_ahead, length(SnkRepair)}), riak_kv_stat:update({ttaaefs, src_ahead, length(SrcRepair)}), @@ -1192,9 +1192,9 @@ generate_repairfun(LocalRepairFun, RemoteRepairFun, MaxResults, LogInfo) -> %% @doc Examine the number of repairs, and the repair summary and determine -%% what to do next e.g. set a range for the next range_check +%% what to do next e.g. set a range for the next range_check -spec determine_next_action( - non_neg_integer(), + non_neg_integer(), pos_integer(), work_scope(), work_item(), list(repair_summary())) -> ok. @@ -1267,7 +1267,7 @@ decode_clock(EncodedClock) -> -spec summarise_repairs(integer(), list(repair_reference()), work_scope(), - work_item()) -> + work_item()) -> list(repair_summary()). summarise_repairs(ExchangeID, RepairList, WorkScope, WorkItem) -> FoldFun = @@ -1285,7 +1285,7 @@ summarise_repairs(ExchangeID, RepairList, WorkScope, WorkItem) -> LogFun = fun({B, C, MinDT, MaxDT}) -> ?LOG_INFO( - "AAE exchange=~w work_item=~w type=~w repaired " ++ + "AAE exchange=~w work_item=~w type=~w repaired " ++ "key_count=~w for bucket=~p with low date ~p high date ~p", [ExchangeID, WorkScope, WorkItem, C, B, MinDT, MaxDT]) end, @@ -1296,7 +1296,7 @@ summarise_repairs(ExchangeID, RepairList, WorkScope, WorkItem) -> %% Take the next work item from the list of allocations, assuming that the %% starting time for that work item has not alreasy passed. If there are no %% more items queue, start a new queue based on the wants for the schedule. --spec take_next_workitem(list(allocation()), +-spec take_next_workitem(list(allocation()), schedule_wants(), erlang:timestamp()|undefined, node_info(), @@ -1307,7 +1307,7 @@ take_next_workitem([], Wants, ScheduleStartTime, SlotInfo, SliceCount) -> NewAllocations = choose_schedule(Wants), % Should be 24 hours after ScheduleStartTime - so add 24 hours to % ScheduleStartTime - RevisedStartTime = + RevisedStartTime = case ScheduleStartTime of undefined -> beginning_of_next_period(os:timestamp(), SliceCount); @@ -1375,7 +1375,7 @@ beginning_of_next_period({Mega, Sec, _Micro}, SlotCount) -> SlotsPassed = (NowGS - TopOfDayGS) div SlotSize, NextPeriodGS = TopOfDayGS + SlotSize * (SlotsPassed + 1), EpochSeconds = Mega * ?MEGA + Sec + NextPeriodGS - NowGS, - {EpochSeconds div ?MEGA, EpochSeconds rem ?MEGA, 0}. + {EpochSeconds div ?MEGA, EpochSeconds rem ?MEGA, 0}. %% @doc @@ -1506,8 +1506,8 @@ choose_schedule_test() -> AllSyncAll = choose_schedule(AllSyncAllSchedule), ExpAllSyncAll = lists:map(fun(I) -> {I, all_check} end, lists:seq(1, 100)), ?assertMatch(AllSyncAll, ExpAllSyncAll), - - MixedSyncSchedule = + + MixedSyncSchedule = [{no_check, 6}, {all_check, 1}, {auto_check, 3}, @@ -1521,14 +1521,14 @@ choose_schedule_test() -> HourWorkload = lists:map(SliceForHourFun, lists:filter(IsSyncFun, MixedSync)), ?assertMatch(84, length(lists:usort(HourWorkload))), - FoldFun = + FoldFun = fun(I, Acc) -> true = I > Acc, I end, BiggestI = lists:foldl(FoldFun, 0, HourWorkload), ?assertMatch(true, BiggestI >= 84), - + CountFun = fun({_I, Type}, Acc) -> {Type, CD} = lists:keyfind(Type, 1, Acc), @@ -1543,7 +1543,7 @@ take_first_workitem_test() -> SC = 48, Wants = [{no_check, SC}, - {all_check, 0}, + {all_check, 0}, {auto_check, 0}, {day_check, 0}, {hour_check, 0}, @@ -1562,26 +1562,26 @@ take_first_workitem_test() -> beginning_of_next_period({Mega, Sec, Micro}, SC), % 24 hours on, the new scheudle start time should be the same it would be % if we started now - {no_check, PromptSeconds, SchedRem, ScheduleStartTime} = + {no_check, PromptSeconds, SchedRem, ScheduleStartTime} = take_next_workitem([], Wants, OrigStartTime, {1, 8, 1}, SC), ?assertMatch(true, ScheduleStartTime > {Mega, Sec, Micro}), ?assertMatch(true, PromptSeconds > 0), - {no_check, PromptMoreSeconds, SchedRem, ScheduleStartTime} = + {no_check, PromptMoreSeconds, SchedRem, ScheduleStartTime} = take_next_workitem([], Wants, OrigStartTime, {2, 8, 1}, SC), ?assertMatch(true, PromptMoreSeconds > PromptSeconds), - {no_check, PromptEvenMoreSeconds, SchedRem, ScheduleStartTime} = + {no_check, PromptEvenMoreSeconds, SchedRem, ScheduleStartTime} = take_next_workitem([], Wants, OrigStartTime, {7, 8, 1}, SC), ?assertMatch(true, PromptEvenMoreSeconds > PromptMoreSeconds), - {no_check, PromptYetMoreSeconds, _T0, ScheduleStartTime} = + {no_check, PromptYetMoreSeconds, _T0, ScheduleStartTime} = take_next_workitem(SchedRem, Wants, ScheduleStartTime, {1, 8, 1}, SC), ?assertMatch(true, PromptYetMoreSeconds > PromptEvenMoreSeconds), - {no_check, PromptS2YetMoreSeconds, _, ScheduleStartTime} = + {no_check, PromptS2YetMoreSeconds, _, ScheduleStartTime} = take_next_workitem(SchedRem, Wants, ScheduleStartTime, {1, 8, 2}, SC), - {no_check, PromptS3YetMoreSeconds, _, ScheduleStartTime} = + {no_check, PromptS3YetMoreSeconds, _, ScheduleStartTime} = take_next_workitem(SchedRem, Wants, ScheduleStartTime, {1, 8, 3}, SC), - {no_check, PromptS4YetMoreSeconds, _, ScheduleStartTime} = + {no_check, PromptS4YetMoreSeconds, _, ScheduleStartTime} = take_next_workitem(SchedRem, Wants, ScheduleStartTime, {1, 8, 4}, SC), - {no_check, PromptN2YetMoreSeconds, _, ScheduleStartTime} = + {no_check, PromptN2YetMoreSeconds, _, ScheduleStartTime} = take_next_workitem(SchedRem, Wants, ScheduleStartTime, {2, 8, 1}, SC), ?assertMatch(true, PromptS4YetMoreSeconds > PromptS3YetMoreSeconds), ?assertMatch(true, PromptS3YetMoreSeconds > PromptS2YetMoreSeconds), @@ -1599,7 +1599,7 @@ window_test() -> ?assert(in_window(Now0, {0, 0})), ?assertNot(in_window(Now0, {1, 1})), ?assertNot(in_window(Now0, {23, 23})), - + NowSecs1 = calendar:datetime_to_gregorian_seconds( {{2000, 1, 1}, {23, 59, 59}}), diff --git a/src/riak_kv_vnode.erl b/src/riak_kv_vnode.erl index 016ed4698..a63bc0b35 100644 --- a/src/riak_kv_vnode.erl +++ b/src/riak_kv_vnode.erl @@ -4490,9 +4490,9 @@ log_key_amnesia(VId, Obj, InEpoch, InCntr, Coord, LocalEpoch, LocalCntr) -> % so a read repair is prompted riak_kv_reader:request_read({B, K}) end, - ?LOG_WARNING("Inbound clock entry for ~p in ~p/~p greater than local." ++ - "Epochs: {In:~p Local:~p}. Counters: {In:~p Local:~p}.", - [VId, B, K, InEpoch, LocalEpoch, InCntr, LocalCntr]). + ?LOG_WARNING("Inbound clock entry for ~p in ~p/~p greater than local." + "Epochs: {In:~p Local:~p}. Counters: {In:~p Local:~p}.", + [VId, B, K, InEpoch, LocalEpoch, InCntr, LocalCntr]). %% @private generate an epoch actor, and update the vnode state. -spec new_key_epoch(#state{}) -> {EpochActor :: binary(), #state{}}.