mempool

package
v0.0.0
Warning

This package is not in the latest version of its module.

Published: Nov 5, 2024 | License: Apache-2.0, UNKNOWN | Imports: 0 | Imported by: 0

Documentation

Overview

TODO: Better handle abci client errors. (make it automatically handle connection errors)

Constants

const (
	MempoolChannel = byte(0x30)

	// UnknownPeerID is the peer ID to use when running CheckTx when there is
	// no peer (e.g. RPC)
	UnknownPeerID uint16 = 0
)

Variables

var ErrTxInCache = errors.New("Tx already exists in cache")

ErrTxInCache is returned to the client if we saw tx earlier
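Callers usually want to tell a duplicate submission apart from a real failure. The following is a minimal sketch of that check using errors.Is. To stay self-contained it does not import the mempool package: errTxInCache mirrors the sentinel above, and submit is a hypothetical stand-in for a CheckTx call, not the package's implementation.

```go
package main

import (
	"errors"
	"fmt"
)

// errTxInCache mirrors the package's ErrTxInCache sentinel so this sketch
// compiles without importing the mempool package.
var errTxInCache = errors.New("Tx already exists in cache")

// submit is a hypothetical stand-in for CheckTx: it reports the sentinel
// when the same tx bytes are seen a second time.
func submit(seen map[string]bool, tx []byte) error {
	if seen[string(tx)] {
		return errTxInCache
	}
	seen[string(tx)] = true
	return nil
}

// isDuplicate reports whether err means the tx was already seen.
func isDuplicate(err error) bool {
	return errors.Is(err, errTxInCache)
}

func main() {
	seen := map[string]bool{}
	fmt.Println(isDuplicate(submit(seen, []byte("tx1"))))
	fmt.Println(isDuplicate(submit(seen, []byte("tx1"))))
}
```

With the real package, the same comparison would be made against mempool.ErrTxInCache.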

var Package = amino.RegisterPackage(amino.NewPackage(
	"github.com/gnolang/gno/tm2/pkg/bft/mempool",
	"tm",
	amino.GetCallersDirname(),
).WithDependencies().WithTypes(
	&TxMessage{},
))

Functions

This section is empty.

Types

type CListMempool

type CListMempool struct {
	// contains filtered or unexported fields
}

CListMempool is an ordered in-memory pool for transactions before they are proposed in a consensus round. Transaction validity is checked using the CheckTx abci message before the transaction is added to the pool. The mempool uses a concurrent list structure for storing transactions that can be efficiently accessed by multiple concurrent readers.

func NewCListMempool

func NewCListMempool(
	config *cfg.MempoolConfig,
	proxyAppConn appconn.Mempool,
	height int64,
	maxTxBytes int64,
	options ...CListMempoolOption,
) *CListMempool

NewCListMempool returns a new mempool with the given configuration and connection to an application.

func (*CListMempool) CheckTx

func (mem *CListMempool) CheckTx(tx types.Tx, cb func(abci.Response)) (err error)

CheckTx blocks if we're waiting on Update() or Reap(). cb is a callback that receives the response to the CheckTx command; it gets called from another goroutine.

CONTRACT: Either cb will get called, or err returned.
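One way to consume this contract is to adapt the callback into a blocking call. The sketch below assumes hypothetical stand-ins (response for abci.Response, checkTx for the method itself) so that it runs without the real package; only the "callback or error, never both" shape is taken from the documentation above.

```go
package main

import (
	"errors"
	"fmt"
)

// response is a hypothetical stand-in for abci.Response.
type response struct{ OK bool }

// checkTx is a hypothetical stand-in that honors the documented contract:
// it either returns an error, or invokes cb from another goroutine.
func checkTx(tx []byte, cb func(response)) error {
	if len(tx) == 0 {
		return errors.New("empty tx")
	}
	go cb(response{OK: true})
	return nil
}

// checkTxSync turns the callback contract into a blocking call.
func checkTxSync(tx []byte) (response, error) {
	// Buffered so the callback goroutine never blocks on send.
	done := make(chan response, 1)
	if err := checkTx(tx, func(r response) { done <- r }); err != nil {
		// An error was returned, so cb will not be called.
		return response{}, err
	}
	return <-done, nil
}

func main() {
	res, err := checkTxSync([]byte("tx"))
	fmt.Println(res.OK, err)
}
```

Because the callback runs on another goroutine, any state it touches needs its own synchronization; the buffered channel handles that here.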

func (*CListMempool) CheckTxWithInfo

func (mem *CListMempool) CheckTxWithInfo(tx types.Tx, cb func(abci.Response), txInfo TxInfo) (err error)

func (*CListMempool) CloseWAL

func (mem *CListMempool) CloseWAL()

func (*CListMempool) EnableTxsAvailable

func (mem *CListMempool) EnableTxsAvailable()

NOTE: not thread safe - should only be called once, on startup

func (*CListMempool) Flush

func (mem *CListMempool) Flush()

func (*CListMempool) FlushAppConn

func (mem *CListMempool) FlushAppConn() error

func (*CListMempool) InitWAL

func (mem *CListMempool) InitWAL()

InitWAL panics if it can't create the directory or open the file. It is not thread safe.

func (*CListMempool) Lock

func (mem *CListMempool) Lock()

func (*CListMempool) MaxTxBytes

func (mem *CListMempool) MaxTxBytes() int64

func (*CListMempool) ReapMaxBytesMaxGas

func (mem *CListMempool) ReapMaxBytesMaxGas(maxDataBytes, maxGas int64) types.Txs
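The Mempool interface describes this method as reaping transactions up to maxDataBytes and maxGas, with negative values meaning no cap. The sketch below illustrates that selection rule on a plain slice. It rests on several assumptions: pendingTx and reap are hypothetical names, each cap is ignored independently when negative, a total equal to the cap is allowed, and encoding overhead per transaction is not counted.

```go
package main

import "fmt"

// pendingTx is a hypothetical stand-in for a mempool entry.
type pendingTx struct {
	data      []byte
	gasWanted int64
}

// reap returns txs in order until adding the next one would exceed
// maxBytes or maxGas. A negative cap is treated as "no cap" (assumption).
func reap(txs []pendingTx, maxBytes, maxGas int64) [][]byte {
	var out [][]byte
	var totalBytes, totalGas int64
	for _, tx := range txs {
		size := int64(len(tx.data))
		if maxBytes >= 0 && totalBytes+size > maxBytes {
			break
		}
		if maxGas >= 0 && totalGas+tx.gasWanted > maxGas {
			break
		}
		totalBytes += size
		totalGas += tx.gasWanted
		out = append(out, tx.data)
	}
	return out
}

func main() {
	txs := []pendingTx{
		{[]byte("aaaa"), 10},
		{[]byte("bbbb"), 10},
		{[]byte("cccc"), 10},
	}
	fmt.Println(len(reap(txs, 8, -1)), len(reap(txs, -1, 25)), len(reap(txs, -1, -1)))
}
```

Stopping at the first transaction that does not fit, rather than skipping it, preserves the pool's ordering in the returned slice.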

func (*CListMempool) ReapMaxTxs

func (mem *CListMempool) ReapMaxTxs(max int) types.Txs

func (*CListMempool) SetLogger

func (mem *CListMempool) SetLogger(l *slog.Logger)

SetLogger sets the Logger.

func (*CListMempool) Size

func (mem *CListMempool) Size() int

func (*CListMempool) TxsAvailable

func (mem *CListMempool) TxsAvailable() <-chan struct{}

func (*CListMempool) TxsBytes

func (mem *CListMempool) TxsBytes() int64

func (*CListMempool) TxsFront

func (mem *CListMempool) TxsFront() *clist.CElement

TxsFront returns the first transaction in the ordered list for peer goroutines to call .NextWait() on. FIXME: leaking implementation details!

func (*CListMempool) TxsWaitChan

func (mem *CListMempool) TxsWaitChan() <-chan struct{}

TxsWaitChan returns a channel to wait on transactions. It will be closed once the mempool is not empty (i.e., the internal `mem.txs` has at least one element).
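A channel that is closed when the pool becomes non-empty lets any number of goroutines wait without polling. The following sketch shows the idea with a hypothetical pool type; it only covers the first transition from empty to non-empty and is not the clist implementation.

```go
package main

import (
	"fmt"
	"sync"
)

// pool is a hypothetical stand-in for the mempool's internal list.
type pool struct {
	mu     sync.Mutex
	txs    [][]byte
	waitCh chan struct{} // closed once txs holds at least one element
}

func newPool() *pool { return &pool{waitCh: make(chan struct{})} }

// add appends a tx and closes waitCh on the empty-to-non-empty transition.
func (p *pool) add(tx []byte) {
	p.mu.Lock()
	defer p.mu.Unlock()
	p.txs = append(p.txs, tx)
	if len(p.txs) == 1 {
		close(p.waitCh)
	}
}

// waitChan plays the role of TxsWaitChan in this sketch.
func (p *pool) waitChan() <-chan struct{} { return p.waitCh }

// isReady reports, without blocking, whether ch has been closed.
func isReady(ch <-chan struct{}) bool {
	select {
	case <-ch:
		return true
	default:
		return false
	}
}

func main() {
	p := newPool()
	fmt.Println(isReady(p.waitChan()))
	p.add([]byte("tx"))
	fmt.Println(isReady(p.waitChan()))
}
```

A waiting goroutine would normally use a blocking receive, often inside a select alongside a quit channel.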

func (*CListMempool) Unlock

func (mem *CListMempool) Unlock()

func (*CListMempool) Update

func (mem *CListMempool) Update(
	height int64,
	txs types.Txs,
	deliverTxResponses []abci.ResponseDeliverTx,
	preCheck PreCheckFunc,
	maxTxBytes int64,
) error

type CListMempoolOption

type CListMempoolOption func(*CListMempool)

CListMempoolOption sets an optional parameter on the mempool.

func WithPreCheck

func WithPreCheck(f PreCheckFunc) CListMempoolOption

WithPreCheck sets a filter for the mempool to reject a tx if f(tx) returns an error. This is run before CheckTx.
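CListMempoolOption follows the functional options pattern: the constructor applies each option to the freshly built value. A minimal sketch of the pattern, using hypothetical lowercase stand-ins (pool, poolOption, withPreCheck, admit) rather than the package's own types:

```go
package main

import (
	"errors"
	"fmt"
)

// preCheckFunc mirrors the shape of PreCheckFunc, with []byte for types.Tx.
type preCheckFunc func(tx []byte) error

// pool and poolOption are hypothetical stand-ins for CListMempool and
// CListMempoolOption.
type pool struct{ preCheck preCheckFunc }

type poolOption func(*pool)

// withPreCheck returns an option that installs f as the pre-check filter.
func withPreCheck(f preCheckFunc) poolOption {
	return func(p *pool) { p.preCheck = f }
}

// newPool builds a pool and applies the options in order.
func newPool(opts ...poolOption) *pool {
	p := &pool{}
	for _, opt := range opts {
		opt(p)
	}
	return p
}

// admit runs the optional filter before any further validation.
func (p *pool) admit(tx []byte) error {
	if p.preCheck != nil {
		if err := p.preCheck(tx); err != nil {
			return err
		}
	}
	return nil
}

// rejectEmpty is an illustrative filter, not part of the package.
func rejectEmpty(tx []byte) error {
	if len(tx) == 0 {
		return errors.New("empty tx")
	}
	return nil
}

func main() {
	p := newPool(withPreCheck(rejectEmpty))
	fmt.Println(p.admit(nil), p.admit([]byte("tx")))
}
```

The pattern keeps the constructor signature stable as optional settings are added.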

type Mempool

type Mempool interface {
	// CheckTx executes a new transaction against the application to determine
	// its validity and whether it should be added to the mempool.
	CheckTx(tx types.Tx, callback func(abci.Response)) error

	// CheckTxWithInfo performs the same operation as CheckTx, but with extra
	// metadata about the tx.
	// Currently this metadata is the peer who sent it, used to prevent the tx
	// from being gossiped back to them.
	CheckTxWithInfo(tx types.Tx, callback func(abci.Response), txInfo TxInfo) error

	// ReapMaxBytesMaxGas reaps transactions from the mempool up to maxDataBytes
	// bytes total with the condition that the total gasWanted must be less than
	// maxGas.
	// If both maxes are negative, there is no cap on the size of all returned
	// transactions (~ all available transactions).
	ReapMaxBytesMaxGas(maxDataBytes, maxGas int64) types.Txs

	// ReapMaxTxs reaps up to max transactions from the mempool.
	// If max is negative, there is no cap on the size of all returned
	// transactions (~ all available transactions).
	ReapMaxTxs(max int) types.Txs

	// Lock locks the mempool. The consensus must be able to hold lock to safely update.
	Lock()

	// Unlock unlocks the mempool.
	Unlock()

	// Update informs the mempool that the given txs were committed and can be discarded.
	// NOTE: this should be called *after* block is committed by consensus.
	// NOTE: unsafe; Lock/Unlock must be managed by caller
	Update(blockHeight int64, blockTxs types.Txs, deliverTxResponses []abci.ResponseDeliverTx, newPreFn PreCheckFunc, maxTxBytes int64) error

	// FlushAppConn flushes the mempool connection to ensure async reqResCb calls are
	// done. E.g. from CheckTx.
	FlushAppConn() error

	// Flush removes all transactions from the mempool and cache
	Flush()

	// TxsAvailable returns a channel which fires once for every height,
	// and only when transactions are available in the mempool.
	// NOTE: the returned channel may be nil if EnableTxsAvailable was not called.
	TxsAvailable() <-chan struct{}

	// EnableTxsAvailable initializes the TxsAvailable channel, ensuring it will
	// trigger once every height when transactions are available.
	EnableTxsAvailable()

	// Size returns the number of transactions in the mempool.
	Size() int

	// TxsBytes returns the total size of all txs in the mempool.
	TxsBytes() int64

	// Maximum allowable tx size.
	MaxTxBytes() int64

	// InitWAL creates a directory for the WAL file and opens a file itself.
	InitWAL()

	// CloseWAL closes and discards the underlying WAL file.
	// Any further writes will not be relayed to disk.
	CloseWAL()
}

Mempool defines the mempool interface.

Updates to the mempool need to be synchronized with committing a block so apps can reset their transient state on Commit.
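The synchronization requirement can be sketched as a single critical section around flush, commit, and update. This is an assumption modeled on the usual Tendermint-style commit sequence, not code taken from this package: lockedUpdater is a hypothetical subset of the Mempool interface with a simplified Update signature, and commit stands in for the application's Commit call.

```go
package main

import "fmt"

// lockedUpdater is a hypothetical, simplified subset of the Mempool interface.
type lockedUpdater interface {
	Lock()
	Unlock()
	FlushAppConn() error
	Update(height int64, txs [][]byte) error
}

// commitAndUpdate holds the mempool lock across the whole sequence, so no
// CheckTx can interleave between the app commit and the mempool update.
func commitAndUpdate(m lockedUpdater, height int64, txs [][]byte, commit func() error) error {
	m.Lock()
	defer m.Unlock()
	// Drain in-flight CheckTx responses before the app state changes.
	if err := m.FlushAppConn(); err != nil {
		return err
	}
	if err := commit(); err != nil {
		return err
	}
	return m.Update(height, txs)
}

// recorder is a fake mempool that records the order of calls.
type recorder struct{ calls []string }

func (r *recorder) Lock()   { r.calls = append(r.calls, "Lock") }
func (r *recorder) Unlock() { r.calls = append(r.calls, "Unlock") }
func (r *recorder) FlushAppConn() error {
	r.calls = append(r.calls, "FlushAppConn")
	return nil
}
func (r *recorder) Update(int64, [][]byte) error {
	r.calls = append(r.calls, "Update")
	return nil
}

func main() {
	r := &recorder{}
	err := commitAndUpdate(r, 1, nil, func() error {
		r.calls = append(r.calls, "Commit")
		return nil
	})
	fmt.Println(r.calls, err)
}
```

Update is documented as unsafe without the lock, which is why the caller, not Update, manages Lock and Unlock.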

type MempoolIsFullError

type MempoolIsFullError struct {
	// contains filtered or unexported fields
}

MempoolIsFullError means Tendermint and the application can't handle that much load.

func (MempoolIsFullError) Error

func (e MempoolIsFullError) Error() string

type MempoolMessage

type MempoolMessage interface{}

MempoolMessage is a message sent or received by the Reactor.

type PeerState

type PeerState interface {
	GetHeight() int64
}

PeerState describes the state of a peer.

type PreCheckFunc

type PreCheckFunc func(types.Tx) error

PreCheckFunc is an optional filter executed before CheckTx; it rejects the transaction if a non-nil error is returned. An example would be to ensure that a transaction doesn't exceed the block size.

NOTE: there is no PostCheckFunc, for otherwise a checktx transaction that passes in the app's checktx state would increment sequence etc, causing an unexpected signature error until the next block.
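A size limit is the kind of filter a PreCheckFunc is meant for. The sketch below builds such a filter as a closure. maxBytesPreCheck is a hypothetical helper, and it uses []byte in place of types.Tx so it runs on its own; with the real package the returned function would need the PreCheckFunc signature.

```go
package main

import "fmt"

// maxBytesPreCheck returns a filter, shaped like PreCheckFunc, that rejects
// any tx whose raw length exceeds maxBytes. Encoding overhead is ignored.
func maxBytesPreCheck(maxBytes int64) func(tx []byte) error {
	return func(tx []byte) error {
		if size := int64(len(tx)); size > maxBytes {
			return fmt.Errorf("tx size %d exceeds limit %d", size, maxBytes)
		}
		return nil
	}
}

func main() {
	check := maxBytesPreCheck(4)
	fmt.Println(check([]byte("ok")))
	fmt.Println(check([]byte("too long")))
}
```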

type Reactor

type Reactor struct {
	p2p.BaseReactor
	// contains filtered or unexported fields
}

Reactor handles mempool tx broadcasting amongst peers. It maintains a map from peer ID to counter, to prevent gossiping a tx back to the peer it was received from.
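The peer-ID-to-counter map can be pictured as a small allocator that hands each peer a compact uint16, with 0 reserved for UnknownPeerID. The sketch below is an illustration under assumptions: peerIDs, idFor, and shouldGossip are hypothetical names, peers are keyed by string, and ID reuse and exhaustion are not handled.

```go
package main

import (
	"fmt"
	"sync"
)

// unknownPeerID mirrors UnknownPeerID: the sender ID for txs with no peer.
const unknownPeerID uint16 = 0

// peerIDs is a hypothetical allocator mapping peer IDs to compact counters.
type peerIDs struct {
	mu   sync.Mutex
	ids  map[string]uint16
	next uint16
}

func newPeerIDs() *peerIDs {
	// Start at 1 so that 0 stays reserved for unknownPeerID.
	return &peerIDs{ids: make(map[string]uint16), next: 1}
}

// idFor returns the peer's compact ID, allocating one on first use.
// A real implementation must also handle exhaustion and freed IDs.
func (p *peerIDs) idFor(peer string) uint16 {
	p.mu.Lock()
	defer p.mu.Unlock()
	if id, ok := p.ids[peer]; ok {
		return id
	}
	id := p.next
	p.next++
	p.ids[peer] = id
	return id
}

// shouldGossip reports whether a tx from senderID may be sent to peerID.
// Allocated IDs are never 0, so txs with an unknown sender go to everyone.
func shouldGossip(senderID, peerID uint16) bool {
	return senderID != peerID
}

func main() {
	ids := newPeerIDs()
	a, b := ids.idFor("peerA"), ids.idFor("peerB")
	fmt.Println(shouldGossip(a, a), shouldGossip(a, b), shouldGossip(unknownPeerID, b))
}
```

Storing the two-byte counter with each tx, rather than the full peer ID, is the saving that the TxInfo comment describes.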

func NewReactor

func NewReactor(config *cfg.MempoolConfig, mempool *CListMempool) *Reactor

NewReactor returns a new Reactor with the given config and mempool.

func (*Reactor) AddPeer

func (memR *Reactor) AddPeer(peer p2p.Peer)

AddPeer implements Reactor. It starts a broadcast routine ensuring all txs are forwarded to the given peer.

func (*Reactor) GetChannels

func (memR *Reactor) GetChannels() []*p2p.ChannelDescriptor

GetChannels implements Reactor. It returns the list of channels for this reactor.

func (*Reactor) OnStart

func (memR *Reactor) OnStart() error

OnStart implements p2p.BaseReactor.

func (*Reactor) Receive

func (memR *Reactor) Receive(chID byte, src p2p.Peer, msgBytes []byte)

Receive implements Reactor. It adds any received transactions to the mempool.

func (*Reactor) RemovePeer

func (memR *Reactor) RemovePeer(peer p2p.Peer, reason interface{})

RemovePeer implements Reactor.

func (*Reactor) SetLogger

func (memR *Reactor) SetLogger(l *slog.Logger)

SetLogger sets the Logger on the reactor and the underlying mempool.

type TxInfo

type TxInfo struct {
	// We don't use p2p.ID here because it's too big. The gain is to store max 2
	// bytes with each tx to identify the sender rather than 20 bytes.
	SenderID uint16
}

TxInfo are parameters that get passed when attempting to add a tx to the mempool.

type TxMessage

type TxMessage struct {
	Tx types.Tx
}

TxMessage is a MempoolMessage containing a transaction.

func (*TxMessage) String

func (m *TxMessage) String() string

String returns a string representation of the TxMessage.

type TxTooLargeError

type TxTooLargeError struct {
	// contains filtered or unexported fields
}

TxTooLargeError means the tx is too big to be sent in a message to other peers.

func (TxTooLargeError) Error

func (e TxTooLargeError) Error() string
