flow

package
v0.0.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 7, 2025 License: Apache-2.0, BSD-3-Clause, UNKNOWN not legal advice Imports: 0 Imported by: 0

README

Data Flow Rate Control

To download and install this package run:

go get github.com/mxk/go-flowrate/flowrate

The documentation is available at:

http://godoc.org/github.com/mxk/go-flowrate/flowrate

Documentation

Overview

Package flow provides the tools for monitoring and limiting the flow rate of an arbitrary data stream.

Index

Constants

This section is empty.

Variables

View Source
var ErrLimit = errors.New("flowrate: flow rate limit exceeded")

ErrLimit is returned by the Writer when a non-blocking write is short due to the transfer rate limit.

Functions

This section is empty.

Types

type Limiter

type Limiter interface {
	Done() int64
	Status() Status
	SetTransferSize(bytes int64)
	SetLimit(newLimit int64) (old int64)
	SetBlocking(blocking bool) (old bool)
}

Limiter is implemented by the Reader and Writer to provide a consistent interface for monitoring and controlling data transfer.

type Monitor

type Monitor struct {
	// contains filtered or unexported fields
}

Monitor monitors and limits the transfer rate of a data stream.

func New

func New(sampleRate, windowSize time.Duration) *Monitor

New creates a new flow control monitor. Instantaneous transfer rate is measured and updated for each sampleRate interval. windowSize determines the weight of each sample in the exponential moving average (EMA) calculation. The exact formulas are:

sampleTime = currentTime - prevSampleTime
sampleRate = byteCount / sampleTime
weight     = 1 - exp(-sampleTime/windowSize)
newRate    = weight*sampleRate + (1-weight)*oldRate

The default values for sampleRate and windowSize (if <= 0) are 100ms and 1s, respectively.

func (*Monitor) Done

func (m *Monitor) Done() int64

Done marks the transfer as finished and prevents any further updates or limiting. Instantaneous and current transfer rates drop to 0. Update, IO, and Limit methods become NOOPs. It returns the total number of bytes transferred.

func (*Monitor) IO

func (m *Monitor) IO(n int, err error) (int, error)

IO is a convenience method intended to wrap io.Reader and io.Writer method execution. It calls m.Update(n) and then returns (n, err) unmodified.

func (*Monitor) Limit

func (m *Monitor) Limit(want int, rate int64, block bool) (n int)

Limit restricts the instantaneous (per-sample) data flow to rate bytes per second. It returns the maximum number of bytes (0 <= n <= want) that may be transferred immediately without exceeding the limit. If block == true, the call blocks until n > 0. want is returned unmodified if want < 1, rate < 1, or the transfer is inactive (after a call to Done).

At least one byte is always allowed to be transferred in any given sampling period. Thus, if the sampling rate is 100ms, the lowest achievable flow rate is 10 bytes per second.

For usage examples, see the implementation of Reader and Writer in io.go.

func (*Monitor) SetREMA

func (m *Monitor) SetREMA(rEMA float64)

Hack to set the current rEMA.

func (*Monitor) SetTransferSize

func (m *Monitor) SetTransferSize(bytes int64)

SetTransferSize specifies the total size of the data transfer, which allows the Monitor to calculate the overall progress and time to completion.

func (*Monitor) Status

func (m *Monitor) Status() Status

Status returns current transfer status information. The returned value becomes static after a call to Done.

func (*Monitor) Update

func (m *Monitor) Update(n int) int

Update records the transfer of n bytes and returns n. It should be called after each Read/Write operation, even if n is 0.

type Percent

type Percent uint32

Percent represents a percentage in increments of 1/1000th of a percent.

func (Percent) Float

func (p Percent) Float() float64

func (Percent) String

func (p Percent) String() string

type Reader

type Reader struct {
	io.Reader // Data source
	*Monitor  // Flow control monitor
	// contains filtered or unexported fields
}

Reader implements io.ReadCloser with a restriction on the rate of data transfer.

func NewReader

func NewReader(r io.Reader, limit int64) *Reader

NewReader restricts all Read operations on r to limit bytes per second.

func (*Reader) Close

func (r *Reader) Close() error

Close closes the underlying reader if it implements the io.Closer interface.

func (*Reader) Read

func (r *Reader) Read(p []byte) (n int, err error)

Read reads up to len(p) bytes into p without exceeding the current transfer rate limit. It returns (0, nil) immediately if r is non-blocking and no new bytes can be read at this time.

func (*Reader) SetBlocking

func (r *Reader) SetBlocking(blocking bool) (old bool)

SetBlocking changes the blocking behavior and returns the previous setting. A Read call on a non-blocking reader returns immediately if no additional bytes may be read at this time due to the rate limit.

func (*Reader) SetLimit

func (r *Reader) SetLimit(newLimit int64) (old int64)

SetLimit changes the transfer rate limit to new bytes per second and returns the previous setting.

type Status

type Status struct {
	Active   bool          // Flag indicating an active transfer
	Start    time.Time     // Transfer start time
	Duration time.Duration // Time period covered by the statistics
	Idle     time.Duration // Time since the last transfer of at least 1 byte
	Bytes    int64         // Total number of bytes transferred
	Samples  int64         // Total number of samples taken
	InstRate int64         // Instantaneous transfer rate
	CurRate  int64         // Current transfer rate (EMA of InstRate)
	AvgRate  int64         // Average transfer rate (Bytes / Duration)
	PeakRate int64         // Maximum instantaneous transfer rate
	BytesRem int64         // Number of bytes remaining in the transfer
	TimeRem  time.Duration // Estimated time to completion
	Progress Percent       // Overall transfer progress
}

Status represents the current Monitor status. All transfer rates are in bytes per second rounded to the nearest byte.

type Writer

type Writer struct {
	io.Writer // Data destination
	*Monitor  // Flow control monitor
	// contains filtered or unexported fields
}

Writer implements io.WriteCloser with a restriction on the rate of data transfer.

func NewWriter

func NewWriter(w io.Writer, limit int64) *Writer

NewWriter restricts all Write operations on w to limit bytes per second. The transfer rate and the default blocking behavior (true) can be changed directly on the returned *Writer.

func (*Writer) Close

func (w *Writer) Close() error

Close closes the underlying writer if it implements the io.Closer interface.

func (*Writer) SetBlocking

func (w *Writer) SetBlocking(blocking bool) (old bool)

SetBlocking changes the blocking behavior and returns the previous setting. A Write call on a non-blocking writer returns as soon as no additional bytes may be written at this time due to the rate limit.

func (*Writer) SetLimit

func (w *Writer) SetLimit(newLimit int64) (old int64)

SetLimit changes the transfer rate limit to new bytes per second and returns the previous setting.

func (*Writer) Write

func (w *Writer) Write(p []byte) (n int, err error)

Write writes len(p) bytes from p to the underlying data stream without exceeding the current transfer rate limit. It returns (n, ErrLimit) if w is non-blocking and no additional bytes can be written at this time.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL