store

package
v0.0.0-...-7aac2a2 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 12, 2026 License: AGPL-3.0 Imports: 17 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var ErrBatchFull = errors.New("batch is full")

ErrBatchFull indicates that the batch is full

View Source
var ErrNotConnected = errors.New("not connected to target server/service")

ErrNotConnected - indicates that the target connection is not active.

Functions

func StreamItems

func StreamItems[I any](store Store[I], target Target, doneCh <-chan struct{}, logger logger)

StreamItems reads the keys from the store and replays the corresponding items to the target.

Types

type Batch

type Batch[I any] struct {
	sync.Mutex
	// contains filtered or unexported fields
}

Batch represents an ordered batch

func NewBatch

func NewBatch[I any](config BatchConfig[I]) *Batch[I]

NewBatch creates a new batch

func (*Batch[I]) Add

func (b *Batch[I]) Add(item I) error

Add adds the item to the batch

func (*Batch[I]) Close

func (b *Batch[I]) Close() error

Close commits the pending items and quits the goroutines

func (*Batch[_]) Len

func (b *Batch[_]) Len() int

Len returns the number of items in the batch.

type BatchConfig

type BatchConfig[I any] struct {
	Limit         uint32
	Store         Store[I]
	CommitTimeout time.Duration
	Log           logger
}

BatchConfig represents the batch config

type Key

type Key struct {
	Name      string
	Compress  bool
	Extension string
	ItemCount int
}

Key denotes the key present in the store.

func (Key) String

func (k Key) String() string

String returns the filepath name

type QueueStore

type QueueStore[_ any] struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

QueueStore - Filestore for persisting items.

func NewQueueStore

func NewQueueStore[I any](directory string, limit uint64, ext string) *QueueStore[I]

NewQueueStore - Creates an instance for QueueStore.

func (*QueueStore[_]) Del

func (store *QueueStore[_]) Del(key Key) error

Del - Deletes an entry from the store.

func (*QueueStore[_]) Delete

func (store *QueueStore[_]) Delete() error

Delete - Removes the store directory from disk.

func (*QueueStore[I]) Get

func (store *QueueStore[I]) Get(key Key) (item I, err error)

Get - gets an item from the store.

func (*QueueStore[I]) GetMultiple

func (store *QueueStore[I]) GetMultiple(key Key) (items []I, err error)

GetMultiple reads the multi-payload file and returns the items.

func (*QueueStore[I]) GetRaw

func (store *QueueStore[I]) GetRaw(key Key) (raw []byte, err error)

GetRaw - gets the raw bytes of an item from the store.

func (*QueueStore[_]) Len

func (store *QueueStore[_]) Len() int

Len returns the entry count.

func (*QueueStore[_]) List

func (store *QueueStore[_]) List() (keys []Key)

List - lists all files registered in the store.

func (*QueueStore[_]) Open

func (store *QueueStore[_]) Open() error

Open - Creates the directory if not present.

func (*QueueStore[I]) Put

func (store *QueueStore[I]) Put(item I) (Key, error)

Put - puts an item to the store.

func (*QueueStore[I]) PutMultiple

func (store *QueueStore[I]) PutMultiple(items []I) (Key, error)

PutMultiple - puts multiple items to the store.

func (*QueueStore[I]) PutRaw

func (store *QueueStore[I]) PutRaw(b []byte) (Key, error)

PutRaw - puts the raw bytes to the store

type Store

type Store[I any] interface {
	Put(item I) (Key, error)
	PutMultiple(item []I) (Key, error)
	Get(key Key) (I, error)
	GetMultiple(key Key) ([]I, error)
	GetRaw(key Key) ([]byte, error)
	PutRaw(b []byte) (Key, error)
	Len() int
	List() []Key
	Del(key Key) error
	Open() error
	Delete() error
}

Store - Used to persist items.

type Target

type Target interface {
	Name() string
	SendFromStore(key Key) error
}

Target - store target interface

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL