diff --git a/include/raft.h b/include/raft.h
index 7a8953c7..4b326a8e 100644
--- a/include/raft.h
+++ b/include/raft.h
@@ -240,14 +240,14 @@ typedef int (
raft_node_t* node
);
-/** Callback for detecting when non-voting nodes have obtained enough logs.
+/** Callback for detecting when non-voting nodes have obtained enough entries.
* This triggers only when there are no pending configuration changes.
* @param[in] raft The Raft server making this callback
* @param[in] user_data User data that is passed from Raft server
* @param[in] node The node
* @return 0 does not want to be notified again; otherwise -1 */
typedef int (
-*func_node_has_sufficient_logs_f
+*func_node_has_sufficient_entries_f
) (
raft_server_t* raft,
void *user_data,
@@ -303,6 +303,21 @@ typedef int (
int vote
);
+/** Callback for applying log entries to the state machine.
+ * @param[in] raft The Raft server making this callback
+ * @param[in] user_data User data that is passed from Raft server
+ * @param[in] entries Array of entries to be applied
+ * @param[in] n_entries Number of entries to be applied
+ * @return 0 on success */
+typedef int (
+*func_apply_entries_f
+) (
+ raft_server_t* raft,
+ void *user_data,
+ raft_entry_t* entries,
+ int n_entries
+ );
+
/** Callback for saving log entry changes.
*
* This callback is used for:
@@ -322,8 +337,19 @@ typedef int (
* memory pointed to in the raft_entry_data_t struct. This MUST be done if
* the memory is temporary.
* @param[in] entry_idx The entries index in the log
+ * @param[in] n_entries The numberof entries.
* @return 0 on success */
typedef int (
+*func_logentries_event_f
+) (
+ raft_server_t* raft,
+ void *user_data,
+ raft_entry_t *entry,
+ int entry_idx,
+ int n_entries
+ );
+
+typedef int (
*func_logentry_event_f
) (
raft_server_t* raft,
@@ -343,11 +369,6 @@ typedef struct
/** Callback for notifying user that a node needs a snapshot sent */
func_send_snapshot_f send_snapshot;
- /** Callback for finite state machine application
- * Return 0 on success.
- * Return RAFT_ERR_SHUTDOWN if you want the server to shutdown. */
- func_logentry_event_f applylog;
-
/** Callback for persisting vote data
* For safety reasons this callback MUST flush the change to disk. */
func_persist_vote_f persist_vote;
@@ -357,10 +378,40 @@ typedef struct
* disk atomically. */
func_persist_term_f persist_term;
- /** Callback for adding an entry to the log
+ /** Callback for adding entries to the log
* For safety reasons this callback MUST flush the change to disk.
* Return 0 on success.
* Return RAFT_ERR_SHUTDOWN if you want the server to shutdown. */
+ func_logentries_event_f offer_entries;
+
+ /** Callback for removing the oldest entries from the log
+ * For safety reasons this callback MUST flush the change to disk.
+ * @note If memory was malloc'd in log_offer then this should be the right
+ * time to free the memory. */
+ func_logentries_event_f poll_entries;
+
+ /** Callback for removing the youngest entries from the log
+ * For safety reasons this callback MUST flush the change to disk.
+ * @note If memory was malloc'd in log_offer then this should be the right
+ * time to free the memory. */
+ func_logentries_event_f pop_entries;
+
+ /** Callback for determining which node this configuration log entry
+ * affects. This call only applies to configuration change log entries.
+ * @return the node ID of the node */
+ func_logentry_event_f entry_get_node_id;
+
+ /** Callback for detecting when a non-voting node has sufficient entries. */
+ func_node_has_sufficient_entries_f node_has_sufficient_entries;
+
+ /** Callback for catching debugging log messages
+ * This callback is optional */
+ func_log_f log;
+
+ /** WARNING: the below callbacks are deprecated */
+
+ /** Callback for adding an entry to the log
+ * For safety reasons this callback MUST flush the change to disk. */
func_logentry_event_f log_offer;
/** Callback for removing the oldest entry from the log
@@ -375,17 +426,18 @@ typedef struct
* time to free the memory. */
func_logentry_event_f log_pop;
+ /** Callback for finite state machine application
+ * Return 0 on success.
+ * Return RAFT_ERR_SHUTDOWN if you want the server to shutdown. */
+ func_logentry_event_f applylog;
+
+ /** Callback for detecting when a non-voting node has sufficient entries. */
+ func_node_has_sufficient_entries_f node_has_sufficient_logs;
+
/** Callback for determining which node this configuration log entry
* affects. This call only applies to configuration change log entries.
* @return the node ID of the node */
func_logentry_event_f log_get_node_id;
-
- /** Callback for detecting when a non-voting node has sufficient logs. */
- func_node_has_sufficient_logs_f node_has_sufficient_logs;
-
- /** Callback for catching debugging log messages
- * This callback is optional */
- func_log_f log;
} raft_cbs_t;
typedef struct
@@ -477,12 +529,12 @@ int raft_periodic(raft_server_t* me, int msec_elapsed);
*
* Might call malloc once to increase the log entry array size.
*
- * The log_offer callback will be called.
+ * The offer_entries callback will be called.
*
* @note The memory pointer (ie. raft_entry_data_t) for each msg_entry_t is
* copied directly. If the memory is temporary you MUST either make the
* memory permanent (ie. via malloc) OR re-assign the memory within the
- * log_offer callback.
+ * offer_entries callback.
*
* @param[in] node The node who sent us this message
* @param[in] ae The appendentries message
@@ -535,12 +587,12 @@ int raft_recv_requestvote_response(raft_server_t* me,
*
* Might call malloc once to increase the log entry array size.
*
- * The log_offer callback will be called.
+ * The offer_entries callback will be called.
*
* @note The memory pointer (ie. raft_entry_data_t) in msg_entry_t is
* copied directly. If the memory is temporary you MUST either make the
* memory permanent (ie. via malloc) OR re-assign the memory within the
- * log_offer callback.
+ * offer_entries callback.
*
* Will fail:
*
@@ -710,6 +762,8 @@ void raft_set_commit_idx(raft_server_t* me, int commit_idx);
* RAFT_ERR_NOMEM memory allocation failure */
int raft_append_entry(raft_server_t* me, raft_entry_t* ety);
+int raft_append_entries(raft_server_t* me_, raft_entry_t* etys, int n_etys);
+
/** Confirm if a msg_entry_response has been committed.
* @param[in] r The response we want to check */
int raft_msg_entry_response_committed(raft_server_t* me_,
@@ -736,9 +790,11 @@ void raft_node_set_voting(raft_node_t* node, int voting);
* @return 1 if this is a voting node. Otherwise 0. */
int raft_node_is_voting(raft_node_t* me_);
-/** Check if a node has sufficient logs to be able to join the cluster.
+/** Check if a node has sufficient entries to be able to join the cluster.
**/
-int raft_node_has_sufficient_logs(raft_node_t* me_);
+int raft_node_has_sufficient_entries(raft_node_t* me_);
+
+#define raft_node_has_sufficient_logs raft_node_has_sufficient_entries
/** Apply all entries up to the commit index
* @return
@@ -803,11 +859,17 @@ int raft_get_snapshot_entry_idx(raft_server_t *me_);
int raft_snapshot_is_in_progress(raft_server_t *me_);
/** Remove the first log entry.
- * This should be used for compacting logs.
+ * This should be used for compacting entries.
* @return 0 on success
**/
int raft_poll_entry(raft_server_t* me_, raft_entry_t **ety);
+/** Remove the first log entries.
+ * This should be used for compacting entries.
+ * @return 0 on success
+ **/
+int raft_poll_entries(raft_server_t* me_, raft_entry_t **ety, const int len);
+
/** Get last applied entry
**/
raft_entry_t *raft_get_last_applied_entry(raft_server_t *me_);
@@ -819,8 +881,8 @@ int raft_get_first_entry_idx(raft_server_t* me_);
* This is usually the result of a snapshot being loaded.
* We need to send an appendentries response.
*
- * @param[in] last_included_term Term of the last log of the snapshot
- * @param[in] last_included_index Index of the last log of the snapshot
+ * @param[in] last_included_term Term of the last entry of the snapshot
+ * @param[in] last_included_index Index of the last entry of the snapshot
*
* @return
* 0 on success
diff --git a/include/raft_private.h b/include/raft_private.h
index fd341b19..95e125ba 100644
--- a/include/raft_private.h
+++ b/include/raft_private.h
@@ -63,11 +63,11 @@ typedef struct {
/* my node ID */
raft_node_t* node;
- /* the log which has a voting cfg change, otherwise -1 */
+ /* log entry which has a voting cfg change, otherwise -1 */
int voting_cfg_change_log_idx;
- /* Our membership with the cluster is confirmed (ie. configuration log was
- * committed) */
+ /* Our membership with the cluster is confirmed (ie. configuration log
+ * entry was committed) */
int connected;
int snapshot_in_progress;
@@ -122,15 +122,21 @@ void raft_node_vote_for_me(raft_node_t* me_, const int vote);
int raft_node_has_vote_for_me(raft_node_t* me_);
-void raft_node_set_has_sufficient_logs(raft_node_t* me_);
+void raft_node_set_has_sufficient_entries(raft_node_t* me_);
int raft_votes_is_majority(const int nnodes, const int nvotes);
+/* DEPRECATED */
void raft_offer_log(raft_server_t* me_, raft_entry_t* ety, const int idx);
+/* DEPRECATED */
void raft_pop_log(raft_server_t* me_, raft_entry_t* ety, const int idx);
-int raft_get_num_snapshottable_logs(raft_server_t* me_);
+void raft_offer_entries(raft_server_t* me_, raft_entry_t* ety, const int idx, int len);
+
+void raft_pop_entries(raft_server_t* me_, raft_entry_t* ety, const int idx, int len);
+
+int raft_get_num_snapshottable_entries(raft_server_t* me_);
int raft_node_is_active(raft_node_t* me_);
diff --git a/src/raft_log.c b/src/raft_log.c
index f8dfa56b..65655418 100644
--- a/src/raft_log.c
+++ b/src/raft_log.c
@@ -4,7 +4,7 @@
* found in the LICENSE file.
*
* @file
- * @brief ADT for managing Raft log entries (aka entries)
+ * @brief ADT for managing Raft log entries
* @author Willem Thiart himself@willemthiart.com
*/
@@ -136,7 +136,6 @@ void log_clear(log_t* me_)
me->base = 0;
}
-/** TODO: rename log_append */
int log_append_entry(log_t* me_, raft_entry_t* ety)
{
log_private_t* me = (log_private_t*)me_;
@@ -165,6 +164,11 @@ int log_append_entry(log_t* me_, raft_entry_t* ety)
return 0;
}
+int log_append_entries(log_t* me_, raft_entry_t* ety, int len)
+{
+ return 0;
+}
+
raft_entry_t* log_get_from_idx(log_t* me_, int idx, int *n_etys)
{
log_private_t* me = (log_private_t*)me_;
@@ -274,6 +278,11 @@ int log_poll(log_t * me_, void** etyp)
return 0;
}
+int log_poll_entries(log_t * me_, void** etyp, const int len)
+{
+ return 0;
+}
+
raft_entry_t *log_peektail(log_t * me_)
{
log_private_t* me = (log_private_t*)me_;
diff --git a/src/raft_node.c b/src/raft_node.c
index 82a654a7..51ca71dd 100644
--- a/src/raft_node.c
+++ b/src/raft_node.c
@@ -16,12 +16,12 @@
#include "raft.h"
-#define RAFT_NODE_VOTED_FOR_ME (1 << 0)
-#define RAFT_NODE_VOTING (1 << 1)
-#define RAFT_NODE_HAS_SUFFICIENT_LOG (1 << 2)
-#define RAFT_NODE_INACTIVE (1 << 3)
-#define RAFT_NODE_VOTING_COMMITTED (1 << 4)
-#define RAFT_NODE_ADDITION_COMMITTED (1 << 5)
+#define RAFT_NODE_VOTED_FOR_ME (1 << 0)
+#define RAFT_NODE_VOTING (1 << 1)
+#define RAFT_NODE_HAS_SUFFICIENT_ENTRIES (1 << 2)
+#define RAFT_NODE_INACTIVE (1 << 3)
+#define RAFT_NODE_VOTING_COMMITTED (1 << 4)
+#define RAFT_NODE_ADDITION_COMMITTED (1 << 5)
typedef struct
{
@@ -127,16 +127,16 @@ int raft_node_is_voting(raft_node_t* me_)
return (me->flags & RAFT_NODE_VOTING) != 0;
}
-int raft_node_has_sufficient_logs(raft_node_t* me_)
+int raft_node_has_sufficient_entries(raft_node_t* me_)
{
raft_node_private_t* me = (raft_node_private_t*)me_;
- return (me->flags & RAFT_NODE_HAS_SUFFICIENT_LOG) != 0;
+ return (me->flags & RAFT_NODE_HAS_SUFFICIENT_ENTRIES) != 0;
}
-void raft_node_set_has_sufficient_logs(raft_node_t* me_)
+void raft_node_set_has_sufficient_entries(raft_node_t* me_)
{
raft_node_private_t* me = (raft_node_private_t*)me_;
- me->flags |= RAFT_NODE_HAS_SUFFICIENT_LOG;
+ me->flags |= RAFT_NODE_HAS_SUFFICIENT_ENTRIES;
}
void raft_node_set_active(raft_node_t* me_, int active)
diff --git a/src/raft_server.c b/src/raft_server.c
index fad848e1..7078ad4e 100644
--- a/src/raft_server.c
+++ b/src/raft_server.c
@@ -82,6 +82,15 @@ void raft_set_callbacks(raft_server_t* me_, raft_cbs_t* funcs, void* udata)
raft_server_private_t* me = (raft_server_private_t*)me_;
memcpy(&me->cb, funcs, sizeof(raft_cbs_t));
+
+ /* WARNING: node_has_sufficient_logs is deprecated, use node_has_sufficient_entries */
+ if (me->cb.node_has_sufficient_logs && !me->cb.node_has_sufficient_entries)
+ me->cb.node_has_sufficient_entries = me->cb.node_has_sufficient_logs;
+
+ /* WARNING: log_get_node is deprecated, use entry_get_node_id */
+ if (me->cb.log_get_node_id && !me->cb.entry_get_node_id)
+ me->cb.entry_get_node_id= me->cb.log_get_node_id;
+
me->udata = udata;
log_set_callbacks(me->log, &me->cb, me_);
}
@@ -322,13 +331,13 @@ int raft_recv_appendentries_response(raft_server_t* me_,
!raft_voting_change_is_in_progress(me_) &&
raft_get_current_idx(me_) <= r->current_idx + 1 &&
!raft_node_is_voting_committed(node) &&
- me->cb.node_has_sufficient_logs &&
- 0 == raft_node_has_sufficient_logs(node)
+ me->cb.node_has_sufficient_entries &&
+ 0 == raft_node_has_sufficient_entries(node)
)
{
- int e = me->cb.node_has_sufficient_logs(me_, me->udata, node);
+ int e = me->cb.node_has_sufficient_entries(me_, me->udata, node);
if (0 == e)
- raft_node_set_has_sufficient_logs(node);
+ raft_node_set_has_sufficient_entries(node);
}
/* Update commit idx */
@@ -787,7 +796,7 @@ int raft_apply_entry(raft_server_t* me_)
if (!raft_entry_is_cfg_change(ety))
return 0;
- int node_id = me->cb.log_get_node_id(me_, raft_get_udata(me_), ety, log_idx);
+ int node_id = me->cb.entry_get_node_id(me_, raft_get_udata(me_), ety, log_idx);
raft_node_t* node = raft_get_node(me_, node_id);
switch (ety->type) {
@@ -795,7 +804,7 @@ int raft_apply_entry(raft_server_t* me_)
raft_node_set_addition_committed(node, 1);
raft_node_set_voting_committed(node, 1);
/* Membership Change: confirm connection with cluster */
- raft_node_set_has_sufficient_logs(node);
+ raft_node_set_has_sufficient_entries(node);
if (node_id == raft_get_nodeid(me_))
me->connected = RAFT_NODE_STATUS_CONNECTED;
break;
@@ -1054,7 +1063,7 @@ void raft_offer_log(raft_server_t* me_, raft_entry_t* ety, const int idx)
if (!raft_entry_is_cfg_change(ety))
return;
- int node_id = me->cb.log_get_node_id(me_, raft_get_udata(me_), ety, idx);
+ int node_id = me->cb.entry_get_node_id(me_, raft_get_udata(me_), ety, idx);
raft_node_t* node = raft_get_node(me_, node_id);
int is_self = node_id == raft_get_nodeid(me_);
@@ -1103,7 +1112,7 @@ void raft_pop_log(raft_server_t* me_, raft_entry_t* ety, const int idx)
if (!raft_entry_is_cfg_change(ety))
return;
- int node_id = me->cb.log_get_node_id(me_, raft_get_udata(me_), ety, idx);
+ int node_id = me->cb.entry_get_node_id(me_, raft_get_udata(me_), ety, idx);
switch (ety->type)
{
@@ -1156,6 +1165,21 @@ int raft_poll_entry(raft_server_t* me_, raft_entry_t **ety)
return 0;
}
+void raft_offer_entries(raft_server_t* me_, raft_entry_t* ety, const int idx, const int len)
+{
+
+}
+
+void raft_pop_entries(raft_server_t* me_, raft_entry_t* ety, const int idx, const int len)
+{
+
+}
+
+int raft_poll_entries(raft_server_t* me_, raft_entry_t **ety, const int len)
+{
+ return 0;
+}
+
int raft_get_first_entry_idx(raft_server_t* me_)
{
raft_server_private_t* me = (raft_server_private_t*)me_;
@@ -1168,7 +1192,7 @@ int raft_get_first_entry_idx(raft_server_t* me_)
return me->snapshot_last_idx;
}
-int raft_get_num_snapshottable_logs(raft_server_t *me_)
+int raft_get_num_snapshottable_entries(raft_server_t *me_)
{
raft_server_private_t* me = (raft_server_private_t*)me_;
if (raft_get_log_count(me_) <= 1)
@@ -1180,7 +1204,7 @@ int raft_begin_snapshot(raft_server_t *me_)
{
raft_server_private_t* me = (raft_server_private_t*)me_;
- if (raft_get_num_snapshottable_logs(me_) == 0)
+ if (raft_get_num_snapshottable_entries(me_) == 0)
return -1;
int snapshot_target = raft_get_commit_idx(me_);
@@ -1205,7 +1229,7 @@ int raft_begin_snapshot(raft_server_t *me_)
"begin snapshot sli:%d slt:%d slogs:%d\n",
me->snapshot_last_idx,
me->snapshot_last_term,
- raft_get_num_snapshottable_logs(me_));
+ raft_get_num_snapshottable_entries(me_));
return 0;
}
@@ -1307,7 +1331,7 @@ int raft_begin_load_snapshot(
"loaded snapshot sli:%d slt:%d slogs:%d\n",
me->snapshot_last_idx,
me->snapshot_last_term,
- raft_get_num_snapshottable_logs(me_));
+ raft_get_num_snapshottable_entries(me_));
return 0;
}
@@ -1324,7 +1348,7 @@ int raft_end_load_snapshot(raft_server_t *me_)
raft_node_set_voting_committed(node, raft_node_is_voting(node));
raft_node_set_addition_committed(node, 1);
if (raft_node_is_voting(node))
- raft_node_set_has_sufficient_logs(node);
+ raft_node_set_has_sufficient_entries(node);
}
return 0;
diff --git a/tests/test_snapshotting.c b/tests/test_snapshotting.c
index ce3347dc..1f05bab2 100644
--- a/tests/test_snapshotting.c
+++ b/tests/test_snapshotting.c
@@ -233,7 +233,7 @@ void TestRaft_leader_snapshot_end_succeeds_if_log_compacted(CuTest * tc)
raft_recv_entry(r, &ety, &cr);
raft_set_commit_idx(r, 1);
CuAssertIntEquals(tc, 2, raft_get_log_count(r));
- CuAssertIntEquals(tc, 1, raft_get_num_snapshottable_logs(r));
+ CuAssertIntEquals(tc, 1, raft_get_num_snapshottable_entries(r));
CuAssertIntEquals(tc, 0, raft_begin_snapshot(r));
@@ -243,7 +243,7 @@ void TestRaft_leader_snapshot_end_succeeds_if_log_compacted(CuTest * tc)
CuAssertIntEquals(tc, 0, raft_poll_entry(r, &_ety));
CuAssertIntEquals(tc, 0, raft_end_snapshot(r));
- CuAssertIntEquals(tc, 0, raft_get_num_snapshottable_logs(r));
+ CuAssertIntEquals(tc, 0, raft_get_num_snapshottable_entries(r));
CuAssertIntEquals(tc, 1, raft_get_log_count(r));
CuAssertIntEquals(tc, 1, raft_get_commit_idx(r));
CuAssertIntEquals(tc, 1, raft_get_last_applied_idx(r));
@@ -284,7 +284,7 @@ void TestRaft_leader_snapshot_end_succeeds_if_log_compacted2(CuTest * tc)
raft_recv_entry(r, &ety, &cr);
raft_set_commit_idx(r, 2);
CuAssertIntEquals(tc, 3, raft_get_log_count(r));
- CuAssertIntEquals(tc, 2, raft_get_num_snapshottable_logs(r));
+ CuAssertIntEquals(tc, 2, raft_get_num_snapshottable_entries(r));
CuAssertIntEquals(tc, 0, raft_begin_snapshot(r));
@@ -294,7 +294,7 @@ void TestRaft_leader_snapshot_end_succeeds_if_log_compacted2(CuTest * tc)
CuAssertIntEquals(tc, 0, raft_poll_entry(r, &_ety));
CuAssertIntEquals(tc, 0, raft_end_snapshot(r));
- CuAssertIntEquals(tc, 0, raft_get_num_snapshottable_logs(r));
+ CuAssertIntEquals(tc, 0, raft_get_num_snapshottable_entries(r));
CuAssertIntEquals(tc, 1, raft_get_log_count(r));
CuAssertIntEquals(tc, 2, raft_get_commit_idx(r));
CuAssertIntEquals(tc, 2, raft_get_last_applied_idx(r));
@@ -331,7 +331,7 @@ void TestRaft_joinee_needs_to_get_snapshot(CuTest * tc)
raft_recv_entry(r, &ety, &cr);
raft_set_commit_idx(r, 1);
CuAssertIntEquals(tc, 2, raft_get_log_count(r));
- CuAssertIntEquals(tc, 1, raft_get_num_snapshottable_logs(r));
+ CuAssertIntEquals(tc, 1, raft_get_num_snapshottable_entries(r));
CuAssertIntEquals(tc, 0, raft_begin_snapshot(r));
CuAssertIntEquals(tc, 1, raft_get_last_applied_idx(r));
@@ -363,7 +363,7 @@ void TestRaft_follower_load_from_snapshot(CuTest * tc)
CuAssertIntEquals(tc, 0, raft_begin_load_snapshot(r, 5, 5));
CuAssertIntEquals(tc, 0, raft_end_load_snapshot(r));
CuAssertIntEquals(tc, 1, raft_get_log_count(r));
- CuAssertIntEquals(tc, 0, raft_get_num_snapshottable_logs(r));
+ CuAssertIntEquals(tc, 0, raft_get_num_snapshottable_entries(r));
CuAssertIntEquals(tc, 5, raft_get_commit_idx(r));
CuAssertIntEquals(tc, 5, raft_get_last_applied_idx(r));
@@ -403,7 +403,7 @@ void TestRaft_follower_load_from_snapshot_fails_if_already_loaded(CuTest * tc)
CuAssertIntEquals(tc, 0, raft_begin_load_snapshot(r, 5, 5));
CuAssertIntEquals(tc, 0, raft_end_load_snapshot(r));
CuAssertIntEquals(tc, 1, raft_get_log_count(r));
- CuAssertIntEquals(tc, 0, raft_get_num_snapshottable_logs(r));
+ CuAssertIntEquals(tc, 0, raft_get_num_snapshottable_entries(r));
CuAssertIntEquals(tc, 5, raft_get_commit_idx(r));
CuAssertIntEquals(tc, 5, raft_get_last_applied_idx(r));