Documentation ¶
Index ¶
- type BaseReactor
- func (*BaseReactor) AddPeer(_ PeerConn)
- func (*BaseReactor) GetChannels() []*conn.ChannelDescriptor
- func (*BaseReactor) InitPeer(peer PeerConn) PeerConn
- func (*BaseReactor) Receive(_ byte, _ PeerConn, _ []byte)
- func (*BaseReactor) RemovePeer(_ PeerConn, _ any)
- func (br *BaseReactor) SetSwitch(sw Switch)
- type ChannelDescriptor
- type ConnConfig
- type ConnInfo
- type ConnectionStatus
- type MultiplexSwitch
- func (sw *MultiplexSwitch) Broadcast(chID byte, data []byte)
- func (sw *MultiplexSwitch) DialPeers(peerAddrs ...*types.NetAddress)
- func (sw *MultiplexSwitch) OnStart() error
- func (sw *MultiplexSwitch) OnStop()
- func (sw *MultiplexSwitch) Peers() PeerSet
- func (sw *MultiplexSwitch) StopPeerForError(peer PeerConn, err error)
- func (sw *MultiplexSwitch) Subscribe(filterFn events.EventFilter) (<-chan events.Event, func())
- type MultiplexTransport
- func (mt *MultiplexTransport) Accept(ctx context.Context, behavior PeerBehavior) (PeerConn, error)
- func (mt *MultiplexTransport) Close() error
- func (mt *MultiplexTransport) Dial(ctx context.Context, addr types.NetAddress, behavior PeerBehavior) (PeerConn, error)
- func (mt *MultiplexTransport) Listen(addr types.NetAddress) error
- func (mt *MultiplexTransport) NetAddress() types.NetAddress
- func (mt *MultiplexTransport) Remove(p PeerConn)
- type PeerBehavior
- type PeerConn
- type PeerSet
- type Reactor
- type Switch
- type SwitchOption
- type Transport
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type BaseReactor ¶
type BaseReactor struct { service.BaseService // Provides Start, Stop, Quit Switch Switch }
func NewBaseReactor ¶
func NewBaseReactor(name string, impl Reactor) *BaseReactor
func (*BaseReactor) AddPeer ¶
func (*BaseReactor) AddPeer(_ PeerConn)
func (*BaseReactor) GetChannels ¶
func (*BaseReactor) GetChannels() []*conn.ChannelDescriptor
func (*BaseReactor) InitPeer ¶
func (*BaseReactor) InitPeer(peer PeerConn) PeerConn
func (*BaseReactor) RemovePeer ¶
func (*BaseReactor) RemovePeer(_ PeerConn, _ any)
func (*BaseReactor) SetSwitch ¶
func (br *BaseReactor) SetSwitch(sw Switch)
type ChannelDescriptor ¶
type ChannelDescriptor = conn.ChannelDescriptor
type ConnConfig ¶
type ConnConfig struct { MConfig conn.MConnConfig ReactorsByCh map[byte]Reactor ChDescs []*conn.ChannelDescriptor OnPeerError func(PeerConn, error) }
type ConnInfo ¶
type ConnInfo struct { Outbound bool // flag indicating if the connection is dialed Persistent bool // flag indicating if the connection is persistent Private bool // flag indicating if the peer is private (not shared) Conn net.Conn // the source connection RemoteIP net.IP // the remote IP of the peer SocketAddr *types.NetAddress }
ConnInfo wraps the remote peer connection
type ConnectionStatus ¶
type ConnectionStatus = conn.ConnectionStatus
type MultiplexSwitch ¶
type MultiplexSwitch struct { service.BaseService // contains filtered or unexported fields }
MultiplexSwitch handles peer connections and exposes an API to receive incoming messages on `Reactors`. Each `Reactor` is responsible for handling incoming messages of one or more `Channels`. So while sending outgoing messages is typically performed on the peer, incoming messages are received on the reactor.
func NewMultiplexSwitch ¶
func NewMultiplexSwitch( transport Transport, opts ...SwitchOption, ) *MultiplexSwitch
NewMultiplexSwitch creates a new MultiplexSwitch that uses the given transport, configured with the given options.
func (*MultiplexSwitch) Broadcast ¶
func (sw *MultiplexSwitch) Broadcast(chID byte, data []byte)
Broadcast broadcasts the given data to the given channel, across the entire switch peer set, without blocking
func (*MultiplexSwitch) DialPeers ¶
func (sw *MultiplexSwitch) DialPeers(peerAddrs ...*types.NetAddress)
DialPeers adds the peers to the dial queue for async dialing. To monitor dial progress, subscribe to the relevant p2p MultiplexSwitch events.
func (*MultiplexSwitch) OnStart ¶
func (sw *MultiplexSwitch) OnStart() error
OnStart implements BaseService. It starts all the reactors and peers.
func (*MultiplexSwitch) OnStop ¶
func (sw *MultiplexSwitch) OnStop()
OnStop implements BaseService. It stops all peers and reactors.
func (*MultiplexSwitch) Peers ¶
func (sw *MultiplexSwitch) Peers() PeerSet
Peers returns the set of peers that are connected to the switch.
func (*MultiplexSwitch) StopPeerForError ¶
func (sw *MultiplexSwitch) StopPeerForError(peer PeerConn, err error)
StopPeerForError disconnects from a peer due to an external error. If the peer is persistent, it will attempt to reconnect.
func (*MultiplexSwitch) Subscribe ¶
func (sw *MultiplexSwitch) Subscribe(filterFn events.EventFilter) (<-chan events.Event, func())
Subscribe registers to live events happening on the p2p Switch. Returns the notification channel, along with an unsubscribe method
type MultiplexTransport ¶
type MultiplexTransport struct {
// contains filtered or unexported fields
}
MultiplexTransport accepts and dials tcp connections and upgrades them to multiplexed peers.
func NewMultiplexTransport ¶
func NewMultiplexTransport( nodeInfo types.NodeInfo, nodeKey types.NodeKey, mConfig conn.MConnConfig, logger *slog.Logger, ) *MultiplexTransport
NewMultiplexTransport returns a new MultiplexTransport, which accepts and dials tcp connections and upgrades them to multiplexed peers.
func (*MultiplexTransport) Accept ¶
func (mt *MultiplexTransport) Accept(ctx context.Context, behavior PeerBehavior) (PeerConn, error)
Accept waits for a verified inbound Peer to connect, and returns it [BLOCKING]
func (*MultiplexTransport) Close ¶
func (mt *MultiplexTransport) Close() error
Close stops the multiplex transport
func (*MultiplexTransport) Dial ¶
func (mt *MultiplexTransport) Dial( ctx context.Context, addr types.NetAddress, behavior PeerBehavior, ) (PeerConn, error)
Dial creates an outbound Peer connection, and verifies it (performs handshaking) [BLOCKING]
func (*MultiplexTransport) Listen ¶
func (mt *MultiplexTransport) Listen(addr types.NetAddress) error
Listen starts an active process of listening for incoming connections [NON-BLOCKING]
func (*MultiplexTransport) NetAddress ¶
func (mt *MultiplexTransport) NetAddress() types.NetAddress
NetAddress returns the transport's listen address (for p2p connections)
func (*MultiplexTransport) Remove ¶
func (mt *MultiplexTransport) Remove(p PeerConn)
Remove removes the peer resources from the transport
type PeerBehavior ¶
type PeerBehavior interface { // ReactorChDescriptors returns the Reactor channel descriptors ReactorChDescriptors() []*conn.ChannelDescriptor // Reactors returns the node's active p2p Reactors (modules) Reactors() map[byte]Reactor // HandlePeerError propagates a peer connection error for further processing HandlePeerError(PeerConn, error) // IsPersistentPeer returns a flag indicating if the given peer is persistent IsPersistentPeer(types.ID) bool // IsPrivatePeer returns a flag indicating if the given peer is private IsPrivatePeer(types.ID) bool }
PeerBehavior wraps the Reactor and MultiplexSwitch information a Transport would need when dialing or accepting new Peer connections. It is worth noting that the only reason why this information is required in the first place, is because Peers expose an API through which different TM modules can interact with them. In the future™, modules should not directly "Send" anything to Peers, but instead communicate through other mediums, such as the P2P module
type PeerConn ¶
type PeerConn interface { service.Service FlushStop() ID() types.ID // peer's cryptographic ID RemoteIP() net.IP // remote IP of the connection RemoteAddr() net.Addr // remote address of the connection IsOutbound() bool // did we dial the peer IsPersistent() bool // do we redial this peer when we disconnect IsPrivate() bool // do we share the peer CloseConn() error // close original connection NodeInfo() types.NodeInfo // peer's info Status() ConnectionStatus SocketAddr() *types.NetAddress // actual address of the socket Send(byte, []byte) bool TrySend(byte, []byte) bool Set(string, any) Get(string) any }
PeerConn is a wrapper for a connected peer
type PeerSet ¶
type PeerSet interface { Add(peer PeerConn) Remove(key types.ID) bool Has(key types.ID) bool Get(key types.ID) PeerConn List() []PeerConn NumInbound() uint64 // returns the number of connected inbound nodes NumOutbound() uint64 // returns the number of connected outbound nodes }
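PeerSet is a collection of connected peers. It supports adding and removing peers, looking them up by ID, listing them, and counting inbound and outbound connections.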
type Reactor ¶
type Reactor interface { service.Service // Start, Stop // SetSwitch allows setting a switch. SetSwitch(Switch) // GetChannels returns the list of MConnection.ChannelDescriptor. Make sure // that each ID is unique across all the reactors added to the switch. GetChannels() []*conn.ChannelDescriptor // InitPeer is called by the switch before the peer is started. Use it to // initialize data for the peer (e.g. peer state). // // NOTE: The switch won't call AddPeer nor RemovePeer if it fails to start // the peer. Do not store any data associated with the peer in the reactor // itself unless you don't want to have a state, which is never cleaned up. InitPeer(peer PeerConn) PeerConn // AddPeer is called by the switch after the peer is added and successfully // started. Use it to start goroutines communicating with the peer. AddPeer(peer PeerConn) // RemovePeer is called by the switch when the peer is stopped (due to error // or other reason). RemovePeer(peer PeerConn, reason interface{}) // Receive is called by the switch when msgBytes is received from the peer. // // NOTE reactor can not keep msgBytes around after Receive completes without // copying. // // CONTRACT: msgBytes are not nil. Receive(chID byte, peer PeerConn, msgBytes []byte) }
Reactor is responsible for handling incoming messages on one or more Channels. MultiplexSwitch calls GetChannels when a reactor is added to it. When a new peer joins our node, InitPeer and AddPeer are called. RemovePeer is called when the peer is stopped. Receive is called when a message is received on a channel associated with this reactor.
PeerConn#Send or PeerConn#TrySend should be used to send the message to a peer.
type Switch ¶
type Switch interface { // Broadcast publishes data on the given channel, to all peers Broadcast(chID byte, data []byte) // Peers returns the latest peer set Peers() PeerSet // Subscribe subscribes to active switch events Subscribe(filterFn events.EventFilter) (<-chan events.Event, func()) // StopPeerForError stops the peer with the given reason StopPeerForError(peer PeerConn, err error) // DialPeers marks the given peers as ready for async dialing DialPeers(peerAddrs ...*types.NetAddress) }
Switch is the abstraction in the p2p module that handles and manages peer connections through a Transport.
type SwitchOption ¶
type SwitchOption func(*MultiplexSwitch)
SwitchOption is a callback used for configuring the p2p MultiplexSwitch
func WithMaxInboundPeers ¶
func WithMaxInboundPeers(maxInbound uint64) SwitchOption
WithMaxInboundPeers sets the p2p switch's maximum inbound peer limit
func WithMaxOutboundPeers ¶
func WithMaxOutboundPeers(maxOutbound uint64) SwitchOption
WithMaxOutboundPeers sets the p2p switch's maximum outbound peer limit
func WithPersistentPeers ¶
func WithPersistentPeers(peerAddrs []*types.NetAddress) SwitchOption
WithPersistentPeers sets the p2p switch's persistent peer set
func WithPrivatePeers ¶
func WithPrivatePeers(peerIDs []types.ID) SwitchOption
WithPrivatePeers sets the p2p switch's private peer set
func WithReactor ¶
func WithReactor(name string, reactor Reactor) SwitchOption
WithReactor sets the p2p switch reactors
type Transport ¶
type Transport interface { // NetAddress returns the Transport's dial address NetAddress() types.NetAddress // Accept returns a newly connected inbound peer Accept(context.Context, PeerBehavior) (PeerConn, error) // Dial dials a peer, and returns it Dial(context.Context, types.NetAddress, PeerBehavior) (PeerConn, error) // Remove drops any resources associated // with the PeerConn in the transport Remove(PeerConn) }
Transport handles peer dialing and connection acceptance. Additionally, it is also responsible for any custom connection mechanisms (like handshaking). Peers returned by the transport are considered to be verified and sound