Skip to content

Commit

Permalink
chore: format source code
Browse files Browse the repository at this point in the history
  • Loading branch information
ehds committed Apr 22, 2024
1 parent 69d60c3 commit 6df4cc9
Show file tree
Hide file tree
Showing 70 changed files with 5,865 additions and 5,259 deletions.
8 changes: 8 additions & 0 deletions .clang-format
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
# Run manually to reformat a file:
# clang-format -i --style=file <file>
# find . -iname '*.cc' -o -iname '*.h' -o -iname '*.h.in' | xargs clang-format -i --style=file
BasedOnStyle: Google
DerivePointerAlignment: false

# FIXME(hds)
IndentWidth: 4
17 changes: 9 additions & 8 deletions src/braft/ballot.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,31 +16,32 @@

#include "braft/ballot.h"

#include <algorithm>
#include <cassert>
#include <iterator>

#include "braft/configuration.h"

namespace braft {

void Ballot::init(const Configuration& conf, std::optional<const Configuration> old_conf) {
void Ballot::init(const Configuration& conf,
std::optional<const Configuration> old_conf) {
_peers.clear();
_old_peers.clear();
_quorum = 0;
_old_quorum = 0;

CHECK_GT(conf.size(), 0);
_peers.reserve(conf.size());
for (auto iter = conf.begin(); iter != conf.end(); ++iter) {
_peers.push_back(*iter);
}
std::copy(conf.begin(), conf.end(), std::back_inserter(_peers));
_quorum = _peers.size() / 2 + 1;

if (!old_conf.has_value()) {
return;
}
_old_peers.reserve(old_conf->size());
for (Configuration::const_iterator iter = old_conf->begin();
iter != old_conf->end(); ++iter) {
_old_peers.push_back(*iter);
}
std::copy(old_conf->begin(), old_conf->end(),
std::back_inserter(_old_peers));
_old_quorum = _old_peers.size() / 2 + 1;
}

Expand Down
76 changes: 41 additions & 35 deletions src/braft/ballot_box.cpp
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
// Copyright (c) 2015 Baidu.com, Inc. All Rights Reserved
//
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//
// http://www.apache.org/licenses/LICENSE-2.0
//
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
Expand All @@ -14,31 +14,30 @@

// Authors: Zhangyi Chen([email protected])

#include "braft/ballot_box.h"

#include <bthread/unstable.h>
#include <butil/scoped_lock.h>
#include <bvar/latency_recorder.h>
#include <bthread/unstable.h>

#include <cstddef>
#include <optional>
#include "braft/ballot_box.h"
#include "braft/util.h"
#include "braft/fsm_caller.h"

#include "braft/closure_queue.h"
#include "braft/fsm_caller.h"
#include "braft/util.h"

namespace braft {

BallotBox::BallotBox()
: _waiter(NULL)
, _closure_queue(NULL)
, _last_committed_index(0)
, _pending_index(0)
{
}
: _waiter(NULL),
_closure_queue(NULL),
_last_committed_index(0),
_pending_index(0) {}

BallotBox::~BallotBox() {
clear_pending_tasks();
}
BallotBox::~BallotBox() { clear_pending_tasks(); }

int BallotBox::init(const BallotBoxOptions &options) {
int BallotBox::init(const BallotBoxOptions& options) {
if (options.waiter == NULL || options.closure_queue == NULL) {
LOG(ERROR) << "waiter is NULL";
return EINVAL;
Expand All @@ -48,9 +47,9 @@ int BallotBox::init(const BallotBoxOptions &options) {
return 0;
}

int BallotBox::commit_at(
int64_t first_log_index, int64_t last_log_index, const PeerId& peer) {
// FIXME(chenzhangyi01): The cricital section is unacceptable because it
int BallotBox::commit_at(int64_t first_log_index, int64_t last_log_index,
const PeerId& peer) {
// FIXME(chenzhangyi01): The cricital section is unacceptable because it
// blocks all the other Replicators and LogManagers
std::unique_lock<raft_mutex_t> lck(_mutex);
if (_pending_index == 0) {
Expand All @@ -59,14 +58,16 @@ int BallotBox::commit_at(
if (last_log_index < _pending_index) {
return 0;
}
if (last_log_index >= _pending_index + (int64_t)_pending_meta_queue.size()) {
if (last_log_index >=
_pending_index + (int64_t)_pending_meta_queue.size()) {
return ERANGE;
}

int64_t last_committed_index = 0;
const int64_t start_at = std::max(_pending_index, first_log_index);
Ballot::PosHint pos_hint;
for (int64_t log_index = start_at; log_index <= last_log_index; ++log_index) {
for (int64_t log_index = start_at; log_index <= last_log_index;
++log_index) {
Ballot& bl = _pending_meta_queue[log_index - _pending_index];
pos_hint = bl.grant(peer, pos_hint);
if (bl.granted()) {
Expand All @@ -82,15 +83,17 @@ int BallotBox::commit_at(
// peers, the quorum would decrease by 1, e.g. 3 of 4 changes to 2 of 3. In
// this case, the log after removal may be committed before some previous
// logs, since we use the new configuration to deal the quorum of the
// removal request, we think it's safe to commit all the uncommitted
// removal request, we think it's safe to commit all the uncommitted
// previous logs, which is not well proved right now
// TODO: add vlog when committing previous logs
for (int64_t index = _pending_index; index <= last_committed_index; ++index) {
for (int64_t index = _pending_index; index <= last_committed_index;
++index) {
_pending_meta_queue.pop_front();
}

_pending_index = last_committed_index + 1;
_last_committed_index.store(last_committed_index, butil::memory_order_relaxed);
_last_committed_index.store(last_committed_index,
butil::memory_order_relaxed);
lck.unlock();
// The order doesn't matter
_waiter->on_committed(last_committed_index);
Expand All @@ -111,16 +114,17 @@ int BallotBox::clear_pending_tasks() {
int BallotBox::reset_pending_index(int64_t new_pending_index) {
BAIDU_SCOPED_LOCK(_mutex);
CHECK(_pending_index == 0 && _pending_meta_queue.empty())
<< "pending_index " << _pending_index << " pending_meta_queue "
<< "pending_index " << _pending_index << " pending_meta_queue "
<< _pending_meta_queue.size();
CHECK_GT(new_pending_index, _last_committed_index.load(
butil::memory_order_relaxed));
CHECK_GT(new_pending_index,
_last_committed_index.load(butil::memory_order_relaxed));
_pending_index = new_pending_index;
_closure_queue->reset_first_index(new_pending_index);
return 0;
}

int BallotBox::append_pending_task(const Configuration& conf, const Configuration* old_conf,
int BallotBox::append_pending_task(const Configuration& conf,
const Configuration* old_conf,
Closure* closure) {
Ballot bl;
bl.init(conf,
Expand All @@ -142,12 +146,14 @@ int BallotBox::set_last_committed_index(int64_t last_committed_index) {
<< ", parameter last_committed_index=" << last_committed_index;
return -1;
}
if (last_committed_index <
_last_committed_index.load(butil::memory_order_relaxed)) {
if (last_committed_index <
_last_committed_index.load(butil::memory_order_relaxed)) {
return EINVAL;
}
if (last_committed_index > _last_committed_index.load(butil::memory_order_relaxed)) {
_last_committed_index.store(last_committed_index, butil::memory_order_relaxed);
if (last_committed_index >
_last_committed_index.load(butil::memory_order_relaxed)) {
_last_committed_index.store(last_committed_index,
butil::memory_order_relaxed);
lck.unlock();
_waiter->on_committed(last_committed_index);
}
Expand All @@ -164,7 +170,7 @@ void BallotBox::describe(std::ostream& os, bool use_html) {
pending_queue_size = _pending_meta_queue.size();
}
lck.unlock();
const char *newline = use_html ? "<br>" : "\r\n";
const char* newline = use_html ? "<br>" : "\r\n";
os << "last_committed_index: " << committed_index << newline;
if (pending_queue_size != 0) {
os << "pending_index: " << pending_index << newline;
Expand Down
66 changes: 31 additions & 35 deletions src/braft/ballot_box.h
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
// Copyright (c) 2015 Baidu.com, Inc. All Rights Reserved
//
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//
// http://www.apache.org/licenses/LICENSE-2.0
//
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
Expand All @@ -14,42 +14,40 @@

// Authors: Zhangyi Chen([email protected])

#ifndef BRAFT_BALLOT_BOX_H
#define BRAFT_BALLOT_BOX_H
#ifndef BRAFT_BALLOT_BOX_H
#define BRAFT_BALLOT_BOX_H

#include <butil/atomicops.h> // butil::atomic
#include <stdint.h> // int64_t

#include <stdint.h> // int64_t
#include <set> // std::set
#include <deque>
#include <butil/atomicops.h> // butil::atomic
#include <set> // std::set

#include "braft/ballot.h"
#include "braft/raft.h"
#include "braft/util.h"
#include "braft/ballot.h"

namespace braft {

class FSMCaller;
class ClosureQueue;

struct BallotBoxOptions {
BallotBoxOptions()
: waiter(NULL)
, closure_queue(NULL)
{}
BallotBoxOptions() : waiter(NULL), closure_queue(NULL) {}
FSMCaller* waiter;
ClosureQueue* closure_queue;
};

struct BallotBoxStatus {
BallotBoxStatus()
: committed_index(0), pending_index(0), pending_queue_size(0)
{}
: committed_index(0), pending_index(0), pending_queue_size(0) {}
int64_t committed_index;
int64_t pending_index;
int64_t pending_queue_size;
};

class BallotBox {
public:
public:
BallotBox();
~BallotBox();

Expand All @@ -61,46 +59,44 @@ class BallotBox {
const PeerId& peer);

// Called when the leader steps down, otherwise the behavior is undefined
// When a leader steps down, the uncommitted user applications should
// When a leader steps down, the uncommitted user applications should
// fail immediately, which the new leader will deal whether to commit or
// truncate.
int clear_pending_tasks();

// Called when a candidate becomes the new leader, otherwise the behavior is
// undefined.
// According to the raft algorithm, the logs from pervious terms can't be
// committed until a log at the new term becomes committed, so
// According to the raft algorithm, the logs from pervious terms can't be
// committed until a log at the new term becomes committed, so
// |new_pending_index| should be |last_log_index| + 1.
int reset_pending_index(int64_t new_pending_index);

// Called by leader, otherwise the behavior is undefined
// Store application context before replication.
int append_pending_task(const Configuration& conf,
const Configuration* old_conf,
Closure* closure);
int append_pending_task(const Configuration& conf,
const Configuration* old_conf, Closure* closure);

// Called by follower, otherwise the behavior is undefined.
// Set committed index received from leader
int set_last_committed_index(int64_t last_committed_index);

int64_t last_committed_index()
{ return _last_committed_index.load(butil::memory_order_acquire); }
int64_t last_committed_index() {
return _last_committed_index.load(butil::memory_order_acquire);
}

void describe(std::ostream& os, bool use_html);

void get_status(BallotBoxStatus* ballot_box_status);

private:

FSMCaller* _waiter;
ClosureQueue* _closure_queue;
raft_mutex_t _mutex;
butil::atomic<int64_t> _last_committed_index;
int64_t _pending_index;
std::deque<Ballot> _pending_meta_queue;

private:
FSMCaller* _waiter;
ClosureQueue* _closure_queue;
raft_mutex_t _mutex;
butil::atomic<int64_t> _last_committed_index;
int64_t _pending_index;
std::deque<Ballot> _pending_meta_queue;
};

} // namespace braft

#endif //BRAFT_BALLOT_BOX_H
#endif // BRAFT_BALLOT_BOX_H
Loading

0 comments on commit 6df4cc9

Please sign in to comment.