From 3dfda7c2bee877d98e04231081ed2b7f60320692 Mon Sep 17 00:00:00 2001 From: Alexey Skobkin Date: Sun, 22 Sep 2019 04:00:59 +0300 Subject: [PATCH] Beanstalkd engine implementation. --- go.mod | 5 +- pkg/README.md | 21 +++++++ pkg/persistence/beanstalkd.go | 108 ++++++++++++++++++++++++++++++++++ pkg/persistence/interface.go | 10 ++++ pkg/persistence/stdout.go | 8 +-- 5 files changed, 143 insertions(+), 9 deletions(-) create mode 100644 pkg/persistence/beanstalkd.go diff --git a/go.mod b/go.mod index df5a46d..8627b50 100644 --- a/go.mod +++ b/go.mod @@ -1,5 +1,7 @@ module github.com/boramalper/magnetico +go 1.15 + require ( github.com/Wessie/appdirs v0.0.0-20141031215813-6573e894f8e2 github.com/anacrolix/dht v1.0.1 // indirect @@ -11,6 +13,7 @@ require ( github.com/google/pprof v0.0.0-20190515194954-54271f7e092f // indirect github.com/gorilla/mux v1.7.4 github.com/gorilla/schema v1.1.0 + github.com/iwanbk/gobeanstalk v0.0.0-20160903043409-dbbb23937c31 github.com/jackc/pgx/v4 v4.9.2 github.com/jessevdk/go-flags v1.4.0 github.com/kevinburke/go-bindata v3.16.0+incompatible // indirect @@ -28,5 +31,3 @@ require ( golang.org/x/tools v0.0.0-20200221224223-e1da425f72fd // indirect honnef.co/go/tools v0.0.1-2020.1.3 // indirect ) - -go 1.13 diff --git a/pkg/README.md b/pkg/README.md index 8746f4b..fe6ed42 100644 --- a/pkg/README.md +++ b/pkg/README.md @@ -29,6 +29,27 @@ for more examples. Optional parameter `schema` was added to choose which schema will be used to store magnetico tables, sequences and indexes. +## Beanstalk MQ engine for magneticod + +[Beanstalkd](https://beanstalkd.github.io/) is very lightweight and simple MQ server implementation. +You can use it to organize delivery of the indexed data to your application. + +Use `beanstalk` URL schema to connect to beanstalkd server. For example: + +```shell +magneticod --database=beanstalkd://127.0.0.1:11300/magneticod_tube +``` + +Don't forget to [set](https://linux.die.net/man/1/beanstalkd) binlog persistence, change maximum job size +and `fsync()` period to be able to reliably save torrents with a large number of files: + +```shell +# Example settings (may not work for you) +beanstalkd -z 1048560 -b /var/lib/beanstalkd -f 2400000 +``` + +For job data example see `stdout` engine documentation below as `beanstalk` engine uses the same format. + ## Stdout Dummy Database Engine for magneticod Stdout dummy database engine for **magneticod** prints a new [JSON Line](http://jsonlines.org/) diff --git a/pkg/persistence/beanstalkd.go b/pkg/persistence/beanstalkd.go new file mode 100644 index 0000000..9497fdf --- /dev/null +++ b/pkg/persistence/beanstalkd.go @@ -0,0 +1,108 @@ +package persistence + +import ( + "encoding/hex" + "encoding/json" + "go.uber.org/zap" + "net/url" + "strings" + "time" + + "github.com/iwanbk/gobeanstalk" + "github.com/pkg/errors" +) + +func makeBeanstalkDatabase(url_ *url.URL) (Database, error) { + s := new(beanstalkd) + + var err error + s.bsQueue, err = gobeanstalk.Dial(url_.Hostname() + ":" + url_.Port()) + if err != nil { + return nil, errors.Wrap(err, "Beanstalkd connection error") + } + + tubeName := strings.TrimPrefix(url_.Path, "/") + + err = s.bsQueue.Use(tubeName) + if err != nil { + return nil, errors.Wrap(err, "Beanstalkd tube set error") + } + + zap.L().Info( + "Beanstalkd connection created", + zap.String("host", url_.Hostname()), + zap.String("port", url_.Port()), + zap.String("tube", tubeName), + ) + + return s, nil +} + +type beanstalkd struct { + bsQueue *gobeanstalk.Conn +} + +func (s *beanstalkd) Engine() databaseEngine { + return Beanstalkd +} + +func (s *beanstalkd) DoesTorrentExist(infoHash []byte) (bool, error) { + // Always say that "No the torrent does not exist" because we do not have + // a way to know if we have seen it before or not. + return false, nil +} + +func (s *beanstalkd) AddNewTorrent(infoHash []byte, name string, files []File) error { + payloadJson, err := json.Marshal(SimpleTorrentSummary{ + InfoHash: hex.EncodeToString(infoHash), + Name: name, + Files: files, + }) + + if err != nil { + return errors.Wrap(err, "DB engine beanstalkd encode error") + } + + jobId, err := s.bsQueue.Put(payloadJson, 0, 0, 30*time.Second) + + if err != nil { + return errors.Wrap(err, "DB engine beanstalkd Put() error") + } + + zap.L().Debug("New item put into the queue", zap.Uint64("job_id", jobId)) + + return nil +} + +func (s *beanstalkd) Close() error { + s.bsQueue.Quit() + return nil +} + +func (s *beanstalkd) GetNumberOfTorrents() (uint, error) { + return 0, NotImplementedError +} + +func (s *beanstalkd) QueryTorrents( + query string, + epoch int64, + orderBy OrderingCriteria, + ascending bool, + limit uint, + lastOrderedValue *float64, + lastID *uint64, +) ([]TorrentMetadata, error) { + return nil, NotImplementedError +} + +func (s *beanstalkd) GetTorrent(infoHash []byte) (*TorrentMetadata, error) { + return nil, NotImplementedError +} + +func (s *beanstalkd) GetFiles(infoHash []byte) ([]File, error) { + return nil, NotImplementedError +} + +func (s *beanstalkd) GetStatistics(from string, n uint) (*Statistics, error) { + return nil, NotImplementedError +} diff --git a/pkg/persistence/interface.go b/pkg/persistence/interface.go index 330c9ff..103f7cd 100644 --- a/pkg/persistence/interface.go +++ b/pkg/persistence/interface.go @@ -63,6 +63,7 @@ type databaseEngine uint8 const ( Sqlite3 databaseEngine = iota + 1 Postgres + Beanstalkd Stdout ) @@ -91,6 +92,12 @@ type TorrentMetadata struct { Relevance float64 `json:"relevance"` } +type SimpleTorrentSummary struct { + InfoHash string `json:"infoHash"` + Name string `json:"name"` + Files []File `json:"files"` +} + func (tm *TorrentMetadata) MarshalJSON() ([]byte, error) { type Alias TorrentMetadata return json.Marshal(&struct { @@ -122,6 +129,9 @@ func MakeDatabase(rawURL string, logger *zap.Logger) (Database, error) { case "stdout": return makeStdoutDatabase(url_) + case "beanstalk", "beanstalkd": + return makeBeanstalkDatabase(url_) + case "mysql": return nil, fmt.Errorf("mysql is not yet supported") diff --git a/pkg/persistence/stdout.go b/pkg/persistence/stdout.go index a6228c8..21c5004 100644 --- a/pkg/persistence/stdout.go +++ b/pkg/persistence/stdout.go @@ -9,12 +9,6 @@ import ( "github.com/pkg/errors" ) -type out struct { - InfoHash string `json:"infoHash"` - Name string `json:"name"` - Files []File `json:"files"` -} - func makeStdoutDatabase(_ *url.URL) (Database, error) { s := new(stdout) s.encoder = json.NewEncoder(os.Stdout) @@ -39,7 +33,7 @@ func (s *stdout) DoesTorrentExist(infoHash []byte) (bool, error) { } func (s *stdout) AddNewTorrent(infoHash []byte, name string, files []File) error { - err := s.encoder.Encode(out{ + err := s.encoder.Encode(SimpleTorrentSummary{ InfoHash: hex.EncodeToString(infoHash), Name: name, Files: files,