Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 22 additions & 2 deletions include/libp2p/protocol/gossip/gossip.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ namespace libp2p {
} // namespace libp2p

namespace libp2p::protocol::gossip {

/// Gossip pub-sub protocol config
struct Config {
/// Network density factors for gossip meshes
Expand Down Expand Up @@ -93,6 +92,26 @@ namespace libp2p::protocol::gossip {
bool sign_messages = false;
};

/// RPC limits to control message processing
struct RPCLimits {
/// Maximum subscriptions that will be processed in a single message and the
/// rest will be ignored
size_t max_subscriptions = 5000;

/// Maximum messages that will be processed in a single message and the rest
/// will be ignored
size_t max_ihave_messages = 5000;
size_t max_iwant_messages = 5000;
size_t max_graft_messages = 5000;
size_t max_prune_messages = 5000;

/// Maximum message ids that will be processed in a single message and the
/// rest will be ignored
size_t max_ihave_message_ids = 5000;
size_t max_iwant_message_ids = 5000;
size_t max_prune_peer_infos = 16;
};

using TopicId = std::string;
using TopicList = std::vector<TopicId>;
using TopicSet = std::set<TopicId>;
Expand Down Expand Up @@ -157,6 +176,7 @@ namespace libp2p::protocol::gossip {
std::shared_ptr<peer::IdentityManager> idmgr,
std::shared_ptr<crypto::CryptoProvider> crypto_provider,
std::shared_ptr<crypto::marshaller::KeyMarshaller> key_marshaller,
Config config = Config{});
Config config = Config{},
RPCLimits limits = RPCLimits{});

} // namespace libp2p::protocol::gossip
10 changes: 7 additions & 3 deletions src/protocol/gossip/impl/connectivity.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,14 @@ namespace libp2p::protocol::gossip {
std::shared_ptr<basic::Scheduler> scheduler,
std::shared_ptr<Host> host,
std::shared_ptr<MessageReceiver> msg_receiver,
ConnectionStatusFeedback on_connected)
ConnectionStatusFeedback on_connected,
std::shared_ptr<RPCLimits> limits)
: config_(std::move(config)),
scheduler_(std::move(scheduler)),
host_(std::move(host)),
msg_receiver_(std::move(msg_receiver)),
connected_cb_(std::move(on_connected)),
limits_(std::move(limits)),
log_("gossip",
"Connectivity",
host_->getPeerInfo().id.toBase58().substr(46)) {}
Expand Down Expand Up @@ -191,7 +193,8 @@ namespace libp2p::protocol::gossip {
on_stream_event_,
*msg_receiver_,
std::move(stream),
ctx);
ctx,
limits_);

gossip_stream->read();

Expand Down Expand Up @@ -308,7 +311,8 @@ namespace libp2p::protocol::gossip {
on_stream_event_,
*msg_receiver_,
std::move(stream),
ctx);
ctx,
limits_);

gossip_stream->read();

Expand Down
4 changes: 3 additions & 1 deletion src/protocol/gossip/impl/connectivity.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,8 @@ namespace libp2p::protocol::gossip {
std::shared_ptr<basic::Scheduler> scheduler,
std::shared_ptr<Host> host,
std::shared_ptr<MessageReceiver> msg_receiver,
ConnectionStatusFeedback on_connected);
ConnectionStatusFeedback on_connected,
std::shared_ptr<RPCLimits> limits);

~Connectivity() override;

Expand Down Expand Up @@ -106,6 +107,7 @@ namespace libp2p::protocol::gossip {
ConnectionStatusFeedback connected_cb_;
Stream::Feedback on_stream_event_;
bool started_ = false;
std::shared_ptr<RPCLimits> limits_;

/// All known peers
PeerSet all_peers_;
Expand Down
15 changes: 10 additions & 5 deletions src/protocol/gossip/impl/gossip_core.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,15 @@ namespace libp2p::protocol::gossip {
std::shared_ptr<peer::IdentityManager> idmgr,
std::shared_ptr<crypto::CryptoProvider> crypto_provider,
std::shared_ptr<crypto::marshaller::KeyMarshaller> key_marshaller,
Config config) {
Config config,
RPCLimits limits) {
return std::make_shared<GossipCore>(std::move(config),
std::move(scheduler),
std::move(host),
std::move(idmgr),
std::move(crypto_provider),
std::move(key_marshaller));
std::move(key_marshaller),
std::move(limits));
}

// clang-format off
Expand All @@ -41,8 +43,10 @@ namespace libp2p::protocol::gossip {
std::shared_ptr<Host> host,
std::shared_ptr<peer::IdentityManager> idmgr,
std::shared_ptr<crypto::CryptoProvider> crypto_provider,
std::shared_ptr<crypto::marshaller::KeyMarshaller> key_marshaller)
std::shared_ptr<crypto::marshaller::KeyMarshaller> key_marshaller,
RPCLimits limits)
: config_(std::move(config)),
limits_(std::move(limits)),
create_message_id_([](const Bytes &from, const Bytes &seq,
const Bytes &data){
return createMessageId(from, seq, data);
Expand All @@ -62,7 +66,7 @@ namespace libp2p::protocol::gossip {
onLocalSubscriptionChanged(subscribe, topic);
}
)),
msg_seq_(scheduler_->now().count()),
msg_seq_(scheduler_->now().count()),
log_("gossip", "Gossip", local_peer_id_.toBase58().substr(46)) {}
// clang-format on

Expand Down Expand Up @@ -100,7 +104,8 @@ namespace libp2p::protocol::gossip {
shared_from_this(),
[this](bool connected, const PeerContextPtr &ctx) {
onPeerConnection(connected, ctx);
}
},
std::make_shared<RPCLimits>(limits_)
);
// clang-format on

Expand Down
6 changes: 5 additions & 1 deletion src/protocol/gossip/impl/gossip_core.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,8 @@ namespace libp2p::protocol::gossip {
std::shared_ptr<Host> host,
std::shared_ptr<peer::IdentityManager> idmgr,
std::shared_ptr<crypto::CryptoProvider> crypto_provider,
std::shared_ptr<crypto::marshaller::KeyMarshaller> key_marshaller);
std::shared_ptr<crypto::marshaller::KeyMarshaller> key_marshaller,
RPCLimits limits);

~GossipCore() override = default;

Expand Down Expand Up @@ -90,6 +91,9 @@ namespace libp2p::protocol::gossip {
/// Configuration parameters
const Config config_;

/// RPC Parsing limits
const RPCLimits limits_;

/// Message ID function
MessageIdFn create_message_id_;

Expand Down
54 changes: 53 additions & 1 deletion src/protocol/gossip/impl/message_parser.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@

#include <generated/protocol/gossip/protobuf/rpc.pb.h>

#include "peer_context.hpp"

namespace libp2p::protocol::gossip {

namespace {
Expand All @@ -23,7 +25,8 @@ namespace libp2p::protocol::gossip {

// need to define default ctor/dtor here in translation unit due to unique_ptr
// to type which is incomplete in header
MessageParser::MessageParser() = default;
MessageParser::MessageParser(std::shared_ptr<RPCLimits> limits)
: limits_(std::move(limits)) {}
MessageParser::~MessageParser() = default;

bool MessageParser::parse(BytesIn bytes) {
Expand All @@ -42,49 +45,91 @@ namespace libp2p::protocol::gossip {
return;
}

size_t curr_subscriptions = 0;

for (const auto &s : pb_msg_->subscriptions()) {
if (!s.has_subscribe() || !s.has_topicid()) {
continue;
}

if (curr_subscriptions == limits_->max_subscriptions) {
break;
}
receiver.onSubscription(from, s.subscribe(), s.topicid());

curr_subscriptions++;
}

if (pb_msg_->has_control()) {
const auto &c = pb_msg_->control();
size_t curr_ihave_messages = 0;
size_t curr_iwant_messages = 0;
size_t curr_graft_messages = 0;
size_t curr_prune_messages = 0;

for (const auto &h : c.ihave()) {
if (curr_ihave_messages == limits_->max_ihave_messages) {
break;
}
size_t curr_ihave_message_ids = 0;
if (!h.has_topicid() || h.messageids_size() == 0) {
continue;
}
const TopicId &topic = h.topicid();
for (const auto &msg_id : h.messageids()) {
if (curr_ihave_message_ids == limits_->max_ihave_message_ids) {
break;
}
if (msg_id.empty()) {
continue;
}
receiver.onIHave(from, topic, fromString(msg_id));

curr_ihave_message_ids++;
}

curr_ihave_messages++;
}

for (const auto &w : c.iwant()) {
if (curr_iwant_messages == limits_->max_iwant_messages) {
break;
}
size_t curr_iwant_message_ids = 0;
if (w.messageids_size() == 0) {
continue;
}
for (const auto &msg_id : w.messageids()) {
if (curr_iwant_message_ids == limits_->max_iwant_message_ids) {
break;
}
if (msg_id.empty()) {
continue;
}
receiver.onIWant(from, fromString(msg_id));

curr_iwant_message_ids++;
}
curr_iwant_messages++;
}

for (const auto &gr : c.graft()) {
if (curr_graft_messages == limits_->max_graft_messages) {
break;
}
if (!gr.has_topicid()) {
continue;
}
receiver.onGraft(from, gr.topicid());

curr_graft_messages++;
}

for (const auto &pr : c.prune()) {
if (curr_prune_messages == limits_->max_prune_messages) {
break;
}
size_t curr_prune_peer_infos = 0;
if (!pr.has_topicid()) {
continue;
}
Expand All @@ -95,13 +140,20 @@ namespace libp2p::protocol::gossip {
log()->debug(
"prune backoff={}, {} peers", backoff_time, pr.peers_size());
for (const auto &peer : pr.peers()) {
if (curr_prune_peer_infos == limits_->max_prune_peer_infos) {
break;
}
// TODO(artem): meshsub 1.1.0 + signed peer records NYI

log()->debug("peer id size={}, signed peer record size={}",
peer.peerid().size(),
peer.signedpeerrecord().size());

curr_prune_peer_infos++;
}
receiver.onPrune(from, pr.topicid(), backoff_time);

curr_prune_messages++;
}
}

Expand Down
6 changes: 5 additions & 1 deletion src/protocol/gossip/impl/message_parser.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@

#include "common.hpp"

#include <libp2p/protocol/gossip/gossip.hpp>

namespace pubsub::pb {
// protobuf message forward declaration
class RPC;
Expand All @@ -20,7 +22,7 @@ namespace libp2p::protocol::gossip {
/// Protobuf message parser.
class MessageParser {
public:
MessageParser();
MessageParser(std::shared_ptr<RPCLimits> limits);

~MessageParser();

Expand All @@ -33,6 +35,8 @@ namespace libp2p::protocol::gossip {
private:
/// Parsed protobuf message
std::unique_ptr<pubsub::pb::RPC> pb_msg_;
/// Initialised RPC Limits
std::shared_ptr<RPCLimits> limits_;
};

} // namespace libp2p::protocol::gossip
9 changes: 6 additions & 3 deletions src/protocol/gossip/impl/stream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,13 @@
#include <libp2p/basic/varint_reader.hpp>
#include <libp2p/basic/write.hpp>

#include "message_parser.hpp"
#include "peer_context.hpp"

#define TRACE_ENABLED 0
#include <libp2p/common/trace.hpp>

#include "message_parser.hpp"

namespace libp2p::protocol::gossip {

Stream::Stream(size_t stream_id,
Expand All @@ -26,7 +27,8 @@ namespace libp2p::protocol::gossip {
const Feedback &feedback,
MessageReceiver &msg_receiver,
std::shared_ptr<connection::Stream> stream,
PeerContextPtr peer)
PeerContextPtr peer,
std::shared_ptr<RPCLimits> limits)
: stream_id_(stream_id),
timeout_(config.rw_timeout_msec),
scheduler_(scheduler),
Expand All @@ -35,6 +37,7 @@ namespace libp2p::protocol::gossip {
msg_receiver_(msg_receiver),
stream_(std::move(stream)),
peer_(std::move(peer)),
limits_(std::move(limits)),
read_buffer_(std::make_shared<std::vector<uint8_t>>()) {
assert(feedback_);
assert(stream_);
Expand Down Expand Up @@ -109,7 +112,7 @@ namespace libp2p::protocol::gossip {
peer_->str,
stream_id_);

MessageParser parser;
MessageParser parser{limits_};
if (!parser.parse(*read_buffer_)) {
feedback_(peer_, Error::MESSAGE_PARSE_ERROR);
return;
Expand Down
4 changes: 3 additions & 1 deletion src/protocol/gossip/impl/stream.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,8 @@ namespace libp2p::protocol::gossip {
const Feedback &feedback,
MessageReceiver &msg_receiver,
std::shared_ptr<connection::Stream> stream,
PeerContextPtr peer);
PeerContextPtr peer,
std::shared_ptr<RPCLimits> limits);

/// Begins reading messages from stream
void read();
Expand Down Expand Up @@ -64,6 +65,7 @@ namespace libp2p::protocol::gossip {
MessageReceiver &msg_receiver_;
std::shared_ptr<connection::Stream> stream_;
PeerContextPtr peer_;
std::shared_ptr<RPCLimits> limits_;

std::deque<SharedBuffer> pending_buffers_;

Expand Down
8 changes: 8 additions & 0 deletions test/libp2p/protocol/gossip/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -20,3 +20,11 @@ target_link_libraries(gossip_local_subs_test
p2p_gossip
p2p_testutil_peer
)

addtest(gossip_rpc_limits_test
gossip_rpc_limits_test.cpp
)
target_link_libraries(gossip_rpc_limits_test
p2p_gossip
p2p_testutil_peer
)
Loading
Loading