Documentation
¶
Index ¶
- type BaseReactor
- func (*BaseReactor) AddPeer(_ PeerConn)
- func (*BaseReactor) GetChannels() []*conn.ChannelDescriptor
- func (*BaseReactor) InitPeer(peer PeerConn) PeerConn
- func (*BaseReactor) Receive(_ byte, _ PeerConn, _ []byte)
- func (*BaseReactor) RemovePeer(_ PeerConn, _ any)
- func (br *BaseReactor) SetSwitch(sw Switch)
- type ChannelDescriptor
- type ConnConfig
- type ConnInfo
- type ConnectionStatus
- type MultiplexSwitch
- func (sw *MultiplexSwitch) Broadcast(chID byte, data []byte)
- func (sw *MultiplexSwitch) DialPeers(peerAddrs ...*types.NetAddress)
- func (sw *MultiplexSwitch) OnStart() error
- func (sw *MultiplexSwitch) OnStop()
- func (sw *MultiplexSwitch) Peers() PeerSet
- func (sw *MultiplexSwitch) StopPeerForError(peer PeerConn, err error)
- func (sw *MultiplexSwitch) Subscribe(filterFn events.EventFilter) (<-chan events.Event, func())
- type MultiplexTransport
- func (mt *MultiplexTransport) Accept(ctx context.Context, behavior PeerBehavior) (PeerConn, error)
- func (mt *MultiplexTransport) Close() error
- func (mt *MultiplexTransport) Dial(ctx context.Context, addr types.NetAddress, behavior PeerBehavior) (PeerConn, error)
- func (mt *MultiplexTransport) Listen(addr types.NetAddress) error
- func (mt *MultiplexTransport) NetAddress() types.NetAddress
- func (mt *MultiplexTransport) Remove(p PeerConn)
- type PeerBehavior
- type PeerConn
- type PeerSet
- type Reactor
- type Switch
- type SwitchOption
- type Transport
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type BaseReactor ¶
type BaseReactor struct {
service.BaseService // Provides Start, Stop, Quit
Switch Switch
}
func NewBaseReactor ¶
func NewBaseReactor(name string, impl Reactor) *BaseReactor
func (*BaseReactor) AddPeer ¶
func (*BaseReactor) AddPeer(_ PeerConn)
func (*BaseReactor) GetChannels ¶
func (*BaseReactor) GetChannels() []*conn.ChannelDescriptor
func (*BaseReactor) InitPeer ¶
func (*BaseReactor) InitPeer(peer PeerConn) PeerConn
func (*BaseReactor) RemovePeer ¶
func (*BaseReactor) RemovePeer(_ PeerConn, _ any)
func (*BaseReactor) SetSwitch ¶
func (br *BaseReactor) SetSwitch(sw Switch)
type ChannelDescriptor ¶
type ChannelDescriptor = conn.ChannelDescriptor
type ConnConfig ¶
type ConnConfig struct {
MConfig conn.MConnConfig
ReactorsByCh map[byte]Reactor
ChDescs []*conn.ChannelDescriptor
OnPeerError func(PeerConn, error)
}
type ConnInfo ¶
type ConnInfo struct {
Outbound bool // flag indicating if the connection is dialed
Persistent bool // flag indicating if the connection is persistent
Private bool // flag indicating if the peer is private (not shared)
Conn net.Conn // the source connection
RemoteIP net.IP // the remote IP of the peer
SocketAddr *types.NetAddress
}
ConnInfo wraps the remote peer connection
type ConnectionStatus ¶
type ConnectionStatus = conn.ConnectionStatus
type MultiplexSwitch ¶
type MultiplexSwitch struct {
service.BaseService
// contains filtered or unexported fields
}
MultiplexSwitch handles peer connections and exposes an API to receive incoming messages on `Reactors`. Each `Reactor` is responsible for handling incoming messages of one or more `Channels`. So while sending outgoing messages is typically performed on the peer, incoming messages are received on the reactor.
func NewMultiplexSwitch ¶
func NewMultiplexSwitch( transport Transport, opts ...SwitchOption, ) *MultiplexSwitch
NewMultiplexSwitch creates a new MultiplexSwitch with the given transport and switch options.
func (*MultiplexSwitch) Broadcast ¶
func (sw *MultiplexSwitch) Broadcast(chID byte, data []byte)
Broadcast broadcasts the given data to the given channel, across the entire switch peer set, without blocking
func (*MultiplexSwitch) DialPeers ¶
func (sw *MultiplexSwitch) DialPeers(peerAddrs ...*types.NetAddress)
DialPeers adds the peers to the dial queue for async dialing. To monitor dial progress, subscribe to the relevant p2p MultiplexSwitch events.
func (*MultiplexSwitch) OnStart ¶
func (sw *MultiplexSwitch) OnStart() error
OnStart implements BaseService. It starts all the reactors and peers.
func (*MultiplexSwitch) OnStop ¶
func (sw *MultiplexSwitch) OnStop()
OnStop implements BaseService. It stops all peers and reactors.
func (*MultiplexSwitch) Peers ¶
func (sw *MultiplexSwitch) Peers() PeerSet
Peers returns the set of peers that are connected to the switch.
func (*MultiplexSwitch) StopPeerForError ¶
func (sw *MultiplexSwitch) StopPeerForError(peer PeerConn, err error)
StopPeerForError disconnects from a peer due to an external error. If the peer is persistent, the switch will attempt to reconnect to it.
func (*MultiplexSwitch) Subscribe ¶
func (sw *MultiplexSwitch) Subscribe(filterFn events.EventFilter) (<-chan events.Event, func())
Subscribe registers to live events happening on the p2p Switch. Returns the notification channel, along with an unsubscribe method
type MultiplexTransport ¶
type MultiplexTransport struct {
// contains filtered or unexported fields
}
MultiplexTransport accepts and dials tcp connections and upgrades them to multiplexed peers.
func NewMultiplexTransport ¶
func NewMultiplexTransport( nodeInfo types.NodeInfo, nodeKey types.NodeKey, mConfig conn.MConnConfig, logger *slog.Logger, ) *MultiplexTransport
NewMultiplexTransport returns a new MultiplexTransport, which accepts and dials tcp connections and upgrades them to multiplexed peers.
func (*MultiplexTransport) Accept ¶
func (mt *MultiplexTransport) Accept(ctx context.Context, behavior PeerBehavior) (PeerConn, error)
Accept waits for a verified inbound Peer to connect, and returns it [BLOCKING]
func (*MultiplexTransport) Close ¶
func (mt *MultiplexTransport) Close() error
Close stops the multiplex transport
func (*MultiplexTransport) Dial ¶
func (mt *MultiplexTransport) Dial( ctx context.Context, addr types.NetAddress, behavior PeerBehavior, ) (PeerConn, error)
Dial creates an outbound Peer connection, and verifies it (performs handshaking) [BLOCKING]
func (*MultiplexTransport) Listen ¶
func (mt *MultiplexTransport) Listen(addr types.NetAddress) error
Listen starts an active process of listening for incoming connections [NON-BLOCKING]
func (*MultiplexTransport) NetAddress ¶
func (mt *MultiplexTransport) NetAddress() types.NetAddress
NetAddress returns the transport's listen address (for p2p connections)
func (*MultiplexTransport) Remove ¶
func (mt *MultiplexTransport) Remove(p PeerConn)
Remove removes the peer resources from the transport
type PeerBehavior ¶
type PeerBehavior interface {
// ReactorChDescriptors returns the Reactor channel descriptors
ReactorChDescriptors() []*conn.ChannelDescriptor
// Reactors returns the node's active p2p Reactors (modules)
Reactors() map[byte]Reactor
// HandlePeerError propagates a peer connection error for further processing
HandlePeerError(PeerConn, error)
// IsPersistentPeer returns a flag indicating if the given peer is persistent
IsPersistentPeer(types.ID) bool
// IsPrivatePeer returns a flag indicating if the given peer is private
IsPrivatePeer(types.ID) bool
}
PeerBehavior wraps the Reactor and MultiplexSwitch information a Transport would need when dialing or accepting new Peer connections. It is worth noting that the only reason why this information is required in the first place, is because Peers expose an API through which different TM modules can interact with them. In the future™, modules should not directly "Send" anything to Peers, but instead communicate through other mediums, such as the P2P module
type PeerConn ¶
type PeerConn interface {
service.Service
FlushStop()
ID() types.ID // peer's cryptographic ID
RemoteIP() net.IP // remote IP of the connection
RemoteAddr() net.Addr // remote address of the connection
IsOutbound() bool // did we dial the peer
IsPersistent() bool // do we redial this peer when we disconnect
IsPrivate() bool // do we share the peer
CloseConn() error // close original connection
NodeInfo() types.NodeInfo // peer's info
Status() ConnectionStatus
SocketAddr() *types.NetAddress // actual address of the socket
Send(byte, []byte) bool
TrySend(byte, []byte) bool
Set(string, any)
Get(string) any
}
PeerConn is a wrapper for a connected peer
type PeerSet ¶
type PeerSet interface {
Add(peer PeerConn)
Remove(key types.ID) bool
Has(key types.ID) bool
Get(key types.ID) PeerConn
List() []PeerConn
NumInbound() uint64 // returns the number of connected inbound nodes
NumOutbound() uint64 // returns the number of connected outbound nodes
}
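PeerSet defines the operations available on a set of connected peers: adding, removing, looking up and listing peers, as well as counting inbound and outbound connections.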
type Reactor ¶
type Reactor interface {
service.Service // Start, Stop
// SetSwitch allows setting a switch.
SetSwitch(Switch)
// GetChannels returns the list of MConnection.ChannelDescriptor. Make sure
// that each ID is unique across all the reactors added to the switch.
GetChannels() []*conn.ChannelDescriptor
// InitPeer is called by the switch before the peer is started. Use it to
// initialize data for the peer (e.g. peer state).
//
// NOTE: The switch won't call AddPeer nor RemovePeer if it fails to start
// the peer. Do not store any data associated with the peer in the reactor
// itself, otherwise you end up with state that is never cleaned up.
InitPeer(peer PeerConn) PeerConn
// AddPeer is called by the switch after the peer is added and successfully
// started. Use it to start goroutines communicating with the peer.
AddPeer(peer PeerConn)
// RemovePeer is called by the switch when the peer is stopped (due to error
// or other reason).
RemovePeer(peer PeerConn, reason any)
// Receive is called by the switch when msgBytes is received from the peer.
//
// NOTE: the reactor cannot keep msgBytes around after Receive completes
// without copying it.
//
// CONTRACT: msgBytes are not nil.
Receive(chID byte, peer PeerConn, msgBytes []byte)
}
Reactor is responsible for handling incoming messages on one or more Channels. MultiplexSwitch calls GetChannels when a reactor is added to it. When a new peer joins our node, InitPeer and AddPeer are called. RemovePeer is called when the peer is stopped. Receive is called when a message is received on a channel associated with this reactor.
PeerConn#Send or PeerConn#TrySend should be used to send the message to a peer.
type Switch ¶
type Switch interface {
// Broadcast publishes data on the given channel, to all peers
Broadcast(chID byte, data []byte)
// Peers returns the latest peer set
Peers() PeerSet
// Subscribe subscribes to active switch events
Subscribe(filterFn events.EventFilter) (<-chan events.Event, func())
// StopPeerForError stops the peer with the given reason
StopPeerForError(peer PeerConn, err error)
// DialPeers marks the given peers as ready for async dialing
DialPeers(peerAddrs ...*types.NetAddress)
}
Switch is the abstraction in the p2p module that handles and manages peer connections through a Transport.
type SwitchOption ¶
type SwitchOption func(*MultiplexSwitch)
SwitchOption is a callback used for configuring the p2p MultiplexSwitch
func WithMaxInboundPeers ¶
func WithMaxInboundPeers(maxInbound uint64) SwitchOption
WithMaxInboundPeers sets the p2p switch's maximum inbound peer limit
func WithMaxOutboundPeers ¶
func WithMaxOutboundPeers(maxOutbound uint64) SwitchOption
WithMaxOutboundPeers sets the p2p switch's maximum outbound peer limit
func WithPersistentPeers ¶
func WithPersistentPeers(peerAddrs []*types.NetAddress) SwitchOption
WithPersistentPeers sets the p2p switch's persistent peer set
func WithPrivatePeers ¶
func WithPrivatePeers(peerIDs []types.ID) SwitchOption
WithPrivatePeers sets the p2p switch's private peer set
func WithReactor ¶
func WithReactor(name string, reactor Reactor) SwitchOption
WithReactor sets the p2p switch reactors
type Transport ¶
type Transport interface {
// NetAddress returns the Transport's dial address
NetAddress() types.NetAddress
// Accept returns a newly connected inbound peer
Accept(context.Context, PeerBehavior) (PeerConn, error)
// Dial dials a peer, and returns it
Dial(context.Context, types.NetAddress, PeerBehavior) (PeerConn, error)
// Remove drops any resources associated
// with the PeerConn in the transport
Remove(PeerConn)
}
Transport handles peer dialing and connection acceptance. Additionally, it is also responsible for any custom connection mechanisms (like handshaking). Peers returned by the transport are considered to be verified and sound