diff --git a/.travis.yml b/.travis.yml index af852d9..8d95e61 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,10 +1,10 @@ # https://docs.travis-ci.com/user/reference/overview/#virtualisation-environment-vs-operating-system sudo: false -dist: trusty +dist: xenial language: go go: - - "1.10" + - "1.11" # Only clone the most recent commit. git: @@ -16,41 +16,26 @@ env: before_install: # Dump environment variables - printenv - # Download the binary to bin folder in $GOPATH + # Download dep binary to bin folder in $GOPATH - curl -L -s https://github.com/golang/dep/releases/download/v${DEP_VERSION}/dep-linux-amd64 -o $GOPATH/bin/dep # Make the binary executable - chmod +x $GOPATH/bin/dep install: - - "go get -u github.com/jteeuwen/go-bindata/..." - - "dep ensure" + - "go get -u -v github.com/kevinburke/go-bindata/..." + - "go get -u -v honnef.co/go/tools/cmd/staticcheck" + - "make ensure" before_script: - - "go-bindata - -o=\"${GOPATH}/src/github.com/boramalper/magnetico/cmd/magneticow/bindata.go\" - -prefix=\"${GOPATH}/src/github.com/boramalper/magnetico/cmd/magneticow/data/\" - ${GOPATH}/src/github.com/boramalper/magnetico/cmd/magneticow/data/..." + - "make magneticod" + - "make magneticow" script: # The Unit Tests - - "go test github.com/boramalper/magnetico/cmd/magneticod/..." - - "go test github.com/boramalper/magnetico/cmd/magneticow/..." - - "go test github.com/boramalper/magnetico/pkg/persistence/..." + - "make test" # Static Analysis (vet) - - "go vet github.com/boramalper/magnetico/cmd/magneticod/..." - - "go vet github.com/boramalper/magnetico/cmd/magneticow/..." - - "go vet github.com/boramalper/magnetico/pkg/persistence/..." + - "make vet" - # Formatting Errors - # - # Since gofmt returns zero even if there are files to be formatted, we use: - # - # ! gofmt -d ${GOPATH}/path/ 2>&1 | read - # - # to return 1 if there are files to be formatted, and 0 if not. - # - # https://groups.google.com/forum/#!topic/Golang-Nuts/pdrN4zleUio - - "! gofmt -l ${GOPATH}/src/github.com/boramalper/magnetico/cmd/magneticod/ 2>&1 | read" - - "! gofmt -l ${GOPATH}/src/github.com/boramalper/magnetico/cmd/magneticow/ 2>&1 | read" - - "! gofmt -l ${GOPATH}/src/github.com/boramalper/magnetico/pkg/persistence/ 2>&1 | read" + # Check formatting + - "make check-formatting" diff --git a/COPYING b/COPYING old mode 100755 new mode 100644 diff --git a/Makefile b/Makefile index 71bd3cf..65fab64 100644 --- a/Makefile +++ b/Makefile @@ -1,14 +1,15 @@ -.PHONY: test format magneticod magneticow ensure test-magneticod test-magneticow test-persistence image image-magneticow image-magneticod +.PHONY: test format vet staticcheck magneticod magneticow ensure image image-magneticow image-magneticod -all: ensure test magneticod magneticow +all: test magneticod magneticow magneticod: - go install --tags fts5 "-ldflags=-s -w" github.com/boramalper/magnetico/cmd/magneticod + go install --tags fts5 "-ldflags=-s -w -X main.compiledOn=`date -u +%Y-%m-%dT%H:%M:%SZ`" github.com/boramalper/magnetico/cmd/magneticod magneticow: # TODO: minify files! - go-bindata -o="cmd/magneticow/bindata.go" -prefix="cmd/magneticow/data/" -pkg="main" cmd/magneticow/data/... - go install --tags fts5 "-ldflags=-s -w" github.com/boramalper/magnetico/cmd/magneticow + # https://github.com/kevinburke/go-bindata + go-bindata -o="cmd/magneticow/bindata.go" -prefix="cmd/magneticow/data/" cmd/magneticow/data/... + go install --tags fts5 "-ldflags=-s -w -X main.compiledOn=`date -u +%Y-%m-%dT%H:%M:%SZ`" github.com/boramalper/magnetico/cmd/magneticow image-magneticod: docker build -t magneticod -f Dockerfile.magneticod . @@ -21,22 +22,33 @@ image: image-magneticod image-magneticow # Download dependencies ensure: dep ensure -v - go get -u github.com/jteeuwen/go-bindata/... -test-magneticod: - go test github.com/boramalper/magnetico/cmd/magneticod/... +vet: + go vet github.com/boramalper/magnetico/... -test-magneticow: - go-bindata -o="cmd/magneticow/bindata.go" -prefix="cmd/magneticow/data/" -pkg="main" cmd/magneticow/data/... - go test github.com/boramalper/magnetico/cmd/magneticow/... +staticcheck: + staticcheck github.com/boramalper/magnetico/... -test-persistence: - go test github.com/boramalper/magnetico/pkg/persistence/... - -test: test-persistence test-magneticod test-magneticow +test: + go test github.com/boramalper/magnetico/... format: - gofmt -w cmd/magneticod - gofmt -w cmd/magneticow - gofmt -w pkg/persistence + gofmt -w ${GOPATH}/src/github.com/boramalper/magnetico/cmd/ + gofmt -w ${GOPATH}/src/github.com/boramalper/magnetico/pkg/ +# Formatting Errors +# Since gofmt returns zero even if there are files to be formatted, we use: +# +# ! gofmt -d ${GOPATH}/path/ 2>&1 | read +# +# to return 1 if there are files to be formatted, and 0 if not. +# https://groups.google.com/forum/#!topic/Golang-Nuts/pdrN4zleUio +# +# How can I use Bash syntax in Makefile targets? +# Because `read` is a bash command. +# https://stackoverflow.com/a/589300/4466589 +# +check-formatting: SHELL:=/bin/bash # HERE: this is setting the shell for check-formatting only +check-formatting: + ! gofmt -l ${GOPATH}/src/github.com/boramalper/magnetico/cmd/ 2>&1 | tee /dev/fd/2 | read + ! gofmt -l ${GOPATH}/src/github.com/boramalper/magnetico/pkg/ 2>&1 | tee /dev/fd/2 | read diff --git a/README.md b/README.md index 359c919..fc3366e 100644 --- a/README.md +++ b/README.md @@ -1,7 +1,7 @@ # magnetico *Autonomous (self-hosted) BitTorrent DHT search engine suite.* -[![chat on gitter](https://badges.gitter.im/gitterHQ/gitter.png)](https://gitter.im/magnetico-dev/magnetico-dev) [![Build Status on Travis CI](https://travis-ci.org/boramalper/magnetico.svg?branch=go-rewrite)](https://travis-ci.org/boramalper/magnetico) [![Build status on AppVeyor](https://ci.appveyor.com/api/projects/status/u2jtbe6jutya7p0x/branch/go-rewrite?svg=true)](https://ci.appveyor.com/project/boramalper/magnetico/branch/go-rewrite) +[![chat on gitter](https://badges.gitter.im/gitterHQ/gitter.png)](https://gitter.im/magnetico-dev/magnetico-dev) [![Build Status](https://travis-ci.org/boramalper/magnetico.svg?branch=master)](https://travis-ci.org/boramalper/magnetico) [![Build status on AppVeyor](https://ci.appveyor.com/api/projects/status/u2jtbe6jutya7p0x/branch/go-rewrite?svg=true)](https://ci.appveyor.com/project/boramalper/magnetico/branch/go-rewrite) magnetico is the first autonomous (self-hosted) BitTorrent DHT search engine suite that is *designed for end-users*. The suite consists of two packages: diff --git a/cmd/magneticod/bittorrent/metadata/leech.go b/cmd/magneticod/bittorrent/metadata/leech.go index 3ca7397..fad486a 100644 --- a/cmd/magneticod/bittorrent/metadata/leech.go +++ b/cmd/magneticod/bittorrent/metadata/leech.go @@ -2,7 +2,6 @@ package metadata import ( "bytes" - "crypto/rand" "crypto/sha1" "encoding/binary" "fmt" @@ -47,6 +46,8 @@ type Leech struct { ut_metadata uint8 metadataReceived, metadataSize uint metadata []byte + + connClosed bool } type LeechEventHandlers struct { @@ -54,16 +55,13 @@ type LeechEventHandlers struct { OnError func([20]byte, error) // must be supplied. args: infohash, error } -func NewLeech(infoHash [20]byte, peerAddr *net.TCPAddr, ev LeechEventHandlers) *Leech { +func NewLeech(infoHash [20]byte, peerAddr *net.TCPAddr, clientID []byte, ev LeechEventHandlers) *Leech { l := new(Leech) l.infoHash = infoHash l.peerAddr = peerAddr + copy(l.clientID[:], clientID) l.ev = ev - if _, err := rand.Read(l.clientID[:]); err != nil { - panic(err.Error()) - } - return l } @@ -286,17 +284,26 @@ func (l *Leech) connect(deadline time.Time) error { return nil } +func (l *Leech) closeConn() { + if l.connClosed { + return + } + + if err := l.conn.Close(); err != nil { + zap.L().Panic("couldn't close leech connection!", zap.Error(err)) + return + } + + l.connClosed = true +} + func (l *Leech) Do(deadline time.Time) { err := l.connect(deadline) if err != nil { l.OnError(errors.Wrap(err, "connect")) return } - defer func() { - if err := l.conn.Close(); err != nil { - zap.L().Panic("couldn't close leech connection!", zap.Error(err)) - } - }() + defer l.closeConn() err = l.doBtHandshake() if err != nil { @@ -328,7 +335,7 @@ func (l *Leech) Do(deadline time.Time) { rExtDict := new(extDict) err = bencode.NewDecoder(rMessageBuf).Decode(rExtDict) if err != nil { - zap.L().Warn("Couldn't decode extension message in the loop!", zap.Error(err)) + l.OnError(errors.Wrap(err, "could not decode ext msg in the loop")) return } @@ -371,6 +378,10 @@ func (l *Leech) Do(deadline time.Time) { } } + // We are done with the transfer, close socket as soon as possible (i.e. NOW) to avoid hitting "too many open files" + // error. + l.closeConn() + // Verify the checksum sha1Sum := sha1.Sum(l.metadata) if !bytes.Equal(sha1Sum[:], l.infoHash[:]) { diff --git a/cmd/magneticod/bittorrent/metadata/sink.go b/cmd/magneticod/bittorrent/metadata/sink.go index 2842a9c..386a1d3 100644 --- a/cmd/magneticod/bittorrent/metadata/sink.go +++ b/cmd/magneticod/bittorrent/metadata/sink.go @@ -1,15 +1,15 @@ package metadata import ( - "crypto/rand" + "math/rand" "sync" "time" - "github.com/boramalper/magnetico/pkg/util" "go.uber.org/zap" "github.com/boramalper/magnetico/cmd/magneticod/dht/mainline" "github.com/boramalper/magnetico/pkg/persistence" + "github.com/boramalper/magnetico/pkg/util" ) type Metadata struct { @@ -24,8 +24,9 @@ type Metadata struct { } type Sink struct { - clientID []byte + PeerID []byte deadline time.Duration + maxNLeeches int drain chan Metadata incomingInfoHashes map[[20]byte]struct{} incomingInfoHashesMx sync.Mutex @@ -33,18 +34,50 @@ type Sink struct { termination chan interface{} } -func NewSink(deadline time.Duration) *Sink { +func randomID() []byte { + /* > The peer_id is exactly 20 bytes (characters) long. + * > + * > There are mainly two conventions how to encode client and client version information into the peer_id, + * > Azureus-style and Shadow's-style. + * > + * > Azureus-style uses the following encoding: '-', two characters for client id, four ascii digits for version + * > number, '-', followed by random numbers. + * > + * > For example: '-AZ2060-'... + * + * https://wiki.theory.org/index.php/BitTorrentSpecification + * + * We encode the version number as: + * - First two digits for the major version number + * - Last two digits for the minor version number + * - Patch version number is not encoded. + */ + prefix := []byte("-MC0007-") + + var rando []byte + for i := 20 - len(prefix); i >= 0; i-- { + rando = append(rando, randomDigit()) + } + + return append(prefix, rando...) +} + +func randomDigit() byte { + var max, min int + max, min = '9', '0' + return byte(rand.Intn(max-min) + min) +} + +func NewSink(deadline time.Duration, maxNLeeches int) *Sink { ms := new(Sink) - ms.clientID = make([]byte, 20) - _, err := rand.Read(ms.clientID) - if err != nil { - zap.L().Panic("sinkMetadata couldn't read 20 random bytes for client ID!", zap.Error(err)) - } + ms.PeerID = randomID() ms.deadline = deadline + ms.maxNLeeches = maxNLeeches ms.drain = make(chan Metadata) ms.incomingInfoHashes = make(map[[20]byte]struct{}) ms.termination = make(chan interface{}) + return ms } @@ -55,6 +88,11 @@ func (ms *Sink) Sink(res mainline.TrawlingResult) { ms.incomingInfoHashesMx.Lock() defer ms.incomingInfoHashesMx.Unlock() + // cap the max # of leeches + if len(ms.incomingInfoHashes) >= ms.maxNLeeches { + return + } + if _, exists := ms.incomingInfoHashes[res.InfoHash]; exists { return } @@ -66,7 +104,7 @@ func (ms *Sink) Sink(res mainline.TrawlingResult) { zap.L().Info("Sunk!", zap.Int("leeches", len(ms.incomingInfoHashes)), util.HexField("infoHash", res.InfoHash[:])) - go NewLeech(res.InfoHash, res.PeerAddr, LeechEventHandlers{ + go NewLeech(res.InfoHash, res.PeerAddr, ms.PeerID, LeechEventHandlers{ OnSuccess: ms.flush, OnError: ms.onLeechError, }).Do(time.Now().Add(ms.deadline)) @@ -88,20 +126,23 @@ func (ms *Sink) Terminate() { } func (ms *Sink) flush(result Metadata) { - if !ms.terminated { - ms.drain <- result - // Delete the infoHash from ms.incomingInfoHashes ONLY AFTER once we've flushed the - // metadata! - var infoHash [20]byte - copy(infoHash[:], result.InfoHash) - ms.incomingInfoHashesMx.Lock() - delete(ms.incomingInfoHashes, infoHash) - ms.incomingInfoHashesMx.Unlock() + if ms.terminated { + return } -} -func (ms *Sink) onLeechError(infoHash [20]byte, err error) { - zap.L().Debug("leech error", util.HexField("infoHash", infoHash[:]), zap.Error(err)) + ms.drain <- result + // Delete the infoHash from ms.incomingInfoHashes ONLY AFTER once we've flushed the + // metadata! + var infoHash [20]byte + copy(infoHash[:], result.InfoHash) + ms.incomingInfoHashesMx.Lock() + delete(ms.incomingInfoHashes, infoHash) + ms.incomingInfoHashesMx.Unlock() +} + +func (ms *Sink) onLeechError(infoHash [20]byte, err error) { + zap.L().Debug("leech error", util.HexField("infoHash", infoHash[:]), zap.Error(err)) + ms.incomingInfoHashesMx.Lock() delete(ms.incomingInfoHashes, infoHash) ms.incomingInfoHashesMx.Unlock() diff --git a/cmd/magneticod/dht/mainline/codec.go b/cmd/magneticod/dht/mainline/codec.go index 9103ebc..768be3c 100644 --- a/cmd/magneticod/dht/mainline/codec.go +++ b/cmd/magneticod/dht/mainline/codec.go @@ -10,10 +10,13 @@ import ( "fmt" "net" + "github.com/pkg/errors" + + "regexp" + "github.com/anacrolix/missinggo/iter" "github.com/anacrolix/torrent/bencode" "github.com/willf/bloom" - "regexp" ) type Message struct { @@ -251,10 +254,10 @@ func (e *Error) UnmarshalBencode(b []byte) (err error) { matches := result[0][1:] if _, err := fmt.Sscanf(string(matches[0]), "%d", &code); err != nil { - return fmt.Errorf("could not parse the error code: %s", err.Error()) + return errors.Wrap(err, "could not parse error code") } if _, err := fmt.Sscanf(string(matches[1]), "%d", &msgLen); err != nil { - return fmt.Errorf("could not parse the error message length: %s", err.Error()) + return errors.Wrap(err, "could not parse error msg length") } if len(matches[2]) != msgLen { diff --git a/cmd/magneticod/dht/mainline/service.go b/cmd/magneticod/dht/mainline/service.go index 6e2101f..4a735af 100644 --- a/cmd/magneticod/dht/mainline/service.go +++ b/cmd/magneticod/dht/mainline/service.go @@ -18,6 +18,7 @@ type TrawlingService struct { // Private protocol *Protocol started bool + interval time.Duration eventHandlers TrawlingServiceEventHandlers trueNodeID []byte @@ -34,8 +35,9 @@ type TrawlingServiceEventHandlers struct { OnResult func(TrawlingResult) } -func NewTrawlingService(laddr string, initialMaxNeighbors uint, eventHandlers TrawlingServiceEventHandlers) *TrawlingService { +func NewTrawlingService(laddr string, initialMaxNeighbors uint, interval time.Duration, eventHandlers TrawlingServiceEventHandlers) *TrawlingService { service := new(TrawlingService) + service.interval = interval service.protocol = NewProtocol( laddr, ProtocolEventHandlers{ @@ -76,7 +78,7 @@ func (s *TrawlingService) Terminate() { } func (s *TrawlingService) trawl() { - for range time.Tick(1 * time.Second) { + for range time.Tick(s.interval) { // TODO // For some reason, we can't still detect congestion and this keeps increasing... // Disable for now. diff --git a/cmd/magneticod/dht/managers.go b/cmd/magneticod/dht/managers.go index 95990d0..32a370f 100644 --- a/cmd/magneticod/dht/managers.go +++ b/cmd/magneticod/dht/managers.go @@ -1,6 +1,9 @@ package dht -import "github.com/boramalper/magnetico/cmd/magneticod/dht/mainline" +import ( + "github.com/boramalper/magnetico/cmd/magneticod/dht/mainline" + "time" +) type TrawlingManager struct { // private @@ -8,7 +11,7 @@ type TrawlingManager struct { services []*mainline.TrawlingService } -func NewTrawlingManager(mlAddrs []string) *TrawlingManager { +func NewTrawlingManager(mlAddrs []string, interval time.Duration) *TrawlingManager { manager := new(TrawlingManager) manager.output = make(chan mainline.TrawlingResult) @@ -19,6 +22,7 @@ func NewTrawlingManager(mlAddrs []string) *TrawlingManager { manager.services = append(manager.services, mainline.NewTrawlingService( addr, 2000, + interval, mainline.TrawlingServiceEventHandlers{ OnResult: manager.onResult, }, diff --git a/cmd/magneticod/main.go b/cmd/magneticod/main.go index 86e9037..1cfb1ed 100644 --- a/cmd/magneticod/main.go +++ b/cmd/magneticod/main.go @@ -1,45 +1,43 @@ package main import ( - "fmt" + "math/rand" "net" "os" "os/signal" "runtime/pprof" "time" - "github.com/boramalper/magnetico/pkg/util" + "github.com/pkg/errors" + "github.com/jessevdk/go-flags" "go.uber.org/zap" "go.uber.org/zap/zapcore" + "github.com/boramalper/magnetico/pkg/util" + "github.com/boramalper/magnetico/cmd/magneticod/bittorrent/metadata" "github.com/boramalper/magnetico/cmd/magneticod/dht" "github.com/Wessie/appdirs" + "github.com/boramalper/magnetico/pkg/persistence" ) -type cmdFlags struct { - DatabaseURL string `long:"database" description:"URL of the database."` - - TrawlerMlAddrs []string `long:"trawler-ml-addr" description:"Address(es) to be used by trawling DHT (Mainline) nodes." default:"0.0.0.0:0"` - TrawlerMlInterval uint `long:"trawler-ml-interval" description:"Trawling interval in integer deciseconds (one tenth of a second)."` - - Verbose []bool `short:"v" long:"verbose" description:"Increases verbosity."` - Profile string `long:"profile" description:"Enable profiling." choice:"cpu" choice:"memory" choice:"trace"` -} - type opFlags struct { DatabaseURL string TrawlerMlAddrs []string TrawlerMlInterval time.Duration + LeechMaxN int + Verbosity int Profile string } +var compiledOn string + func main() { loggerLevel := zap.NewAtomicLevel() // Logging levels: ("debug", "info", "warn", "error", "dpanic", "panic", and "fatal"). @@ -48,7 +46,11 @@ func main() { zapcore.Lock(os.Stderr), loggerLevel, )) - defer logger.Sync() + defer func() { + if err := logger.Sync(); err != nil { + panic(err) + } + }() zap.ReplaceGlobals(logger) // opFlags is the "operational flags" @@ -58,9 +60,10 @@ func main() { return } - zap.L().Info("magneticod v0.7.0-beta1 has been started.") - zap.L().Info("Copyright (C) 2017 Mert Bora ALPER .") + zap.L().Info("magneticod v0.7.0-beta2 has been started.") + zap.L().Info("Copyright (C) 2018 Mert Bora ALPER .") zap.L().Info("Dedicated to Cemile Binay, in whose hands I thrived.") + zap.S().Infof("Compiled on %s", compiledOn) switch opFlags.Verbosity { case 0: @@ -80,8 +83,19 @@ func main() { if err != nil { zap.L().Panic("Could not open the cpu profile file!", zap.Error(err)) } - pprof.StartCPUProfile(file) - defer file.Close() + if err = pprof.StartCPUProfile(file); err != nil { + zap.L().Fatal("Could not start CPU profiling!", zap.Error(err)) + } + defer func() { + if err = file.Sync(); err != nil { + zap.L().Fatal("Could not sync profiling file!", zap.Error(err)) + } + }() + defer func() { + if err = file.Close(); err != nil { + zap.L().Fatal("Could not close profiling file!", zap.Error(err)) + } + }() defer pprof.StopCPUProfile() case "memory": @@ -91,17 +105,22 @@ func main() { zap.L().Panic("NOT IMPLEMENTED") } + // Initialise the random number generator + rand.Seed(time.Now().UnixNano()) + // Handle Ctrl-C gracefully. - interruptChan := make(chan os.Signal) + interruptChan := make(chan os.Signal, 1) signal.Notify(interruptChan, os.Interrupt) database, err := persistence.MakeDatabase(opFlags.DatabaseURL, logger) if err != nil { - logger.Sugar().Fatalf("Could not open the database at `%s`: %s", opFlags.DatabaseURL, err.Error()) + logger.Sugar().Fatalf("Could not open the database at `%s`", opFlags.DatabaseURL, zap.Error(err)) } - trawlingManager := dht.NewTrawlingManager(opFlags.TrawlerMlAddrs) - metadataSink := metadata.NewSink(2 * time.Minute) + trawlingManager := dht.NewTrawlingManager(opFlags.TrawlerMlAddrs, opFlags.TrawlerMlInterval) + metadataSink := metadata.NewSink(2*time.Minute, opFlags.LeechMaxN) + + zap.L().Debug("Peer ID", zap.ByteString("peerID", metadataSink.PeerID)) // The Event Loop for stopped := false; !stopped; { @@ -117,8 +136,8 @@ func main() { case md := <-metadataSink.Drain(): if err := database.AddNewTorrent(md.InfoHash, md.Name, md.Files); err != nil { - logger.Sugar().Fatalf("Could not add new torrent %x to the database: %s", - md.InfoHash, err.Error()) + zap.L().Fatal("Could not add new torrent to the database", + util.HexField("infohash", md.InfoHash), zap.Error(err)) } zap.L().Info("Fetched!", zap.String("name", md.Name), util.HexField("infoHash", md.InfoHash)) @@ -134,10 +153,21 @@ func main() { } func parseFlags() (*opFlags, error) { - opF := new(opFlags) - cmdF := new(cmdFlags) + var cmdF struct { + DatabaseURL string `long:"database" description:"URL of the database."` - _, err := flags.Parse(cmdF) + TrawlerMlAddrs []string `long:"trawler-ml-addr" description:"Address(es) to be used by trawling DHT (Mainline) nodes." default:"0.0.0.0:0"` + TrawlerMlInterval uint `long:"trawler-ml-interval" description:"Trawling interval in integer seconds."` + + LeechMaxN uint `long:"leech-max-n" description:"Maximum number of leeches." default:"1000"` + + Verbose []bool `short:"v" long:"verbose" description:"Increases verbosity."` + Profile string `long:"profile" description:"Enable profiling." choice:"cpu" choice:"memory" choice:"trace"` + } + + opF := new(opFlags) + + _, err := flags.Parse(&cmdF) if err != nil { return nil, err } @@ -147,22 +177,32 @@ func parseFlags() (*opFlags, error) { "sqlite3://" + appdirs.UserDataDir("magneticod", "", "", false) + "/database.sqlite3" + - "?_journal_mode=WAL" // https://github.com/mattn/go-sqlite3#connection-string + "?_journal_mode=WAL" + // https://github.com/mattn/go-sqlite3#connection-string + "&_busy_timeout=3000" + // in milliseconds + "&_foreign_keys=true" + } else { opF.DatabaseURL = cmdF.DatabaseURL } if err = checkAddrs(cmdF.TrawlerMlAddrs); err != nil { - zap.S().Fatalf("Of argument (list) `trawler-ml-addr` %s", err.Error()) + zap.S().Fatalf("Of argument (list) `trawler-ml-addr`", zap.Error(err)) } else { opF.TrawlerMlAddrs = cmdF.TrawlerMlAddrs } - // 1 decisecond = 100 milliseconds = 0.1 seconds if cmdF.TrawlerMlInterval == 0 { - opF.TrawlerMlInterval = time.Duration(1) * 100 * time.Millisecond + opF.TrawlerMlInterval = 1 * time.Second } else { - opF.TrawlerMlInterval = time.Duration(cmdF.TrawlerMlInterval) * 100 * time.Millisecond + opF.TrawlerMlInterval = time.Duration(cmdF.TrawlerMlInterval) * time.Second + } + + opF.LeechMaxN = int(cmdF.LeechMaxN) + if opF.LeechMaxN > 1000 { + zap.S().Warnf( + "Beware that on many systems max # of file descriptors per process is limited to 1024. " + + "Setting maximum number of leeches greater than 1k might cause \"too many open files\" errors!", + ) } opF.Verbosity = len(cmdF.Verbose) @@ -178,7 +218,7 @@ func checkAddrs(addrs []string) error { // well. _, err := net.ResolveUDPAddr("udp", addr) if err != nil { - return fmt.Errorf("with %d(th) address `%s`: %s", i+1, addr, err.Error()) + return errors.Wrapf(err, "%d(th) address (%s) error", i+1, addr) } } return nil diff --git a/cmd/magneticow/api.go b/cmd/magneticow/api.go index aba6bf3..c89ee5e 100644 --- a/cmd/magneticow/api.go +++ b/cmd/magneticow/api.go @@ -48,7 +48,7 @@ func apiTorrentsHandler(w http.ResponseWriter, r *http.Request) { return } - if tq.LastID != nil && *tq.LastID < 0 { + if tq.LastID == nil { respondError(w, 400, "lastID has to be greater than or equal to zero") return } diff --git a/cmd/magneticow/handlers.go b/cmd/magneticow/handlers.go index d445c3e..9ff5993 100644 --- a/cmd/magneticow/handlers.go +++ b/cmd/magneticow/handlers.go @@ -2,6 +2,8 @@ package main import ( "encoding/hex" + "fmt" + "github.com/pkg/errors" "net/http" "strings" "time" @@ -14,73 +16,67 @@ import ( func rootHandler(w http.ResponseWriter, r *http.Request) { nTorrents, err := database.GetNumberOfTorrents() if err != nil { - panic(err.Error()) + handlerError(errors.Wrap(err, "GetNumberOfTorrents"), w) + return } - err = templates["homepage"].Execute(w, struct { + _ = templates["homepage"].Execute(w, struct { NTorrents uint }{ NTorrents: nTorrents, }) - if err != nil { - panic(err.Error()) - } } -// TODO: we might as well move torrents.html into static... func torrentsHandler(w http.ResponseWriter, r *http.Request) { data := mustAsset("templates/torrents.html") w.Header().Set("Content-Type", "text/html; charset=utf-8") // Cache static resources for a day w.Header().Set("Cache-Control", "max-age=86400") - w.Write(data) + _, _ = w.Write(data) } func torrentsInfohashHandler(w http.ResponseWriter, r *http.Request) { infoHash, err := hex.DecodeString(mux.Vars(r)["infohash"]) if err != nil { - panic(err.Error()) + handlerError(errors.Wrap(err, "cannot decode infohash"), w) + return } torrent, err := database.GetTorrent(infoHash) if err != nil { - panic(err.Error()) + handlerError(errors.Wrap(err, "cannot get torrent"), w) + return } if torrent == nil { - w.WriteHeader(404) - w.Write([]byte("torrent not found!")) + respondError(w, http.StatusNotFound, "torrent not found!") return } files, err := database.GetFiles(infoHash) if err != nil { - panic(err.Error()) + handlerError(errors.Wrap(err, "could not get files"), w) + return } if files == nil { - w.WriteHeader(500) - w.Write([]byte("files not found what!!!")) + handlerError(fmt.Errorf("could not get files"), w) return } - err = templates["torrent"].Execute(w, struct { + _ = templates["torrent"].Execute(w, struct { T *persistence.TorrentMetadata F []persistence.File }{ T: torrent, F: files, }) - if err != nil { - panic("error while executing template!") - } } -// TODO: we might as well move statistics.html into static... func statisticsHandler(w http.ResponseWriter, r *http.Request) { data := mustAsset("templates/statistics.html") w.Header().Set("Content-Type", "text/html; charset=utf-8") // Cache static resources for a day w.Header().Set("Cache-Control", "max-age=86400") - w.Write(data) + _, _ = w.Write(data) } func feedHandler(w http.ResponseWriter, r *http.Request) { @@ -111,32 +107,24 @@ func feedHandler(w http.ResponseWriter, r *http.Request) { nil, ) if err != nil { - respondError(w, 400, err.Error()) + handlerError(errors.Wrap(err, "query torrent"), w) return } // It is much more convenient to write the XML deceleration manually*, and then process the XML - // template using template/html and send, then to use encoding/xml. + // template using template/html and send, than to use encoding/xml. // // *: https://github.com/golang/go/issues/3133 // // TODO: maybe do it properly, even if it's inconvenient? - - _, err = w.Write([]byte(``)) - if err != nil { - panic(err.Error()) - } - - err = templates["feed"].Execute(w, struct { + _, _ = w.Write([]byte(``)) + _ = templates["feed"].Execute(w, struct { Title string Torrents []persistence.TorrentMetadata }{ Title: title, Torrents: torrents, }) - if err != nil { - panic(err.Error()) - } } func staticHandler(w http.ResponseWriter, r *http.Request) { @@ -155,5 +143,5 @@ func staticHandler(w http.ResponseWriter, r *http.Request) { w.Header().Set("Content-Type", contentType) // Cache static resources for a day w.Header().Set("Cache-Control", "max-age=86400") - w.Write(data) + _, _ = w.Write(data) } diff --git a/cmd/magneticow/main.go b/cmd/magneticow/main.go index 9d63451..1785c94 100644 --- a/cmd/magneticow/main.go +++ b/cmd/magneticow/main.go @@ -5,9 +5,9 @@ import ( "bytes" "encoding/hex" "fmt" + "github.com/pkg/errors" "html/template" "io" - "log" "net/http" "os" "os/signal" @@ -31,6 +31,8 @@ import ( const N_TORRENTS = 20 +var compiledOn string + // Set a Decoder instance as a package global, because it caches // meta-data about structs, and an instance can be shared safely. var decoder = schema.NewDecoder() @@ -44,6 +46,7 @@ var opts struct { Credentials map[string][]byte // TODO: encapsulate credentials and mutex for safety CredentialsRWMutex sync.RWMutex CredentialsPath string + Verbosity int } func main() { @@ -58,14 +61,27 @@ func main() { zap.ReplaceGlobals(logger) zap.L().Info("magneticow v0.7.0-beta1 has been started.") - zap.L().Info("Copyright (C) 2017 Mert Bora ALPER .") + zap.L().Info("Copyright (C) 2018 Mert Bora ALPER .") zap.L().Info("Dedicated to Cemile Binay, in whose hands I thrived.") + zap.S().Infof("Compiled on %s", compiledOn) if err := parseFlags(); err != nil { zap.L().Error("Error while initializing", zap.Error(err)) return } + switch opts.Verbosity { + case 0: + loggerLevel.SetLevel(zap.WarnLevel) + case 1: + loggerLevel.SetLevel(zap.InfoLevel) + default: // Default: i.e. in case of 2 or more. + // TODO: print the caller (function)'s name and line number! + loggerLevel.SetLevel(zap.DebugLevel) + } + + zap.ReplaceGlobals(logger) + // Reload credentials when you receive SIGHUP sighupChan := make(chan os.Signal, 1) signal.Notify(sighupChan, syscall.SIGHUP) @@ -156,7 +172,7 @@ func main() { var err error database, err = persistence.MakeDatabase(opts.Database, logger) if err != nil { - panic(err.Error()) + zap.L().Fatal("could not access to database", zap.Error(err)) } decoder.IgnoreUnknownKeys(false) @@ -178,7 +194,8 @@ func respondError(w http.ResponseWriter, statusCode int, format string, a ...int func mustAsset(name string) []byte { data, err := Asset(name) if err != nil { - log.Panicf("Could NOT access the requested resource `%s`: %s (please inform us, this is a BUG!)", name, err.Error()) + zap.L().Panic("Could NOT access the requested resource! THIS IS A BUG, PLEASE REPORT", + zap.String("name", name), zap.Error(err)) } return data } @@ -189,6 +206,8 @@ func parseFlags() error { Database string `short:"d" long:"database" description:"URL of the (magneticod) database"` Cred string `short:"c" long:"credentials" description:"Path to the credentials file"` NoAuth bool ` long:"no-auth" description:"Disables authorisation"` + + Verbose []bool `short:"v" long:"verbose" description:"Increases verbosity."` } if _, err := flags.Parse(&cmdFlags); err != nil { @@ -220,8 +239,6 @@ func parseFlags() error { opts.CredentialsPath = cmdFlags.Cred } - fmt.Printf("%v credpath %s\n", cmdFlags.NoAuth, opts.CredentialsPath) - if opts.CredentialsPath != "" { opts.Credentials = make(map[string][]byte) if err := loadCred(opts.CredentialsPath); err != nil { @@ -231,6 +248,8 @@ func parseFlags() error { opts.Credentials = nil } + opts.Verbosity = len(cmdFlags.Verbose) + return nil } @@ -250,7 +269,7 @@ func loadCred(cred string) error { if err == io.EOF { break } - return fmt.Errorf("error while reading line %d: %s", lineno, err.Error()) + return errors.Wrapf(err, "while reading line %d", lineno) } line = line[:len(line)-1] // strip '\n' @@ -316,7 +335,5 @@ func BasicAuth(handler http.HandlerFunc, realm string) http.HandlerFunc { func authenticate(w http.ResponseWriter, realm string) { w.Header().Set("WWW-Authenticate", `Basic realm="`+realm+`"`) w.WriteHeader(401) - if _, err := w.Write([]byte("Unauthorised.\n")); err != nil { - panic(err.Error()) - } + _, _ = w.Write([]byte("Unauthorised.\n")) } diff --git a/cmd/magneticow/main_test.go b/cmd/magneticow/main_test.go index 1d4fce1..b4023c3 100644 --- a/cmd/magneticow/main_test.go +++ b/cmd/magneticow/main_test.go @@ -1,7 +1,6 @@ package main import ( - "fmt" "testing" ) @@ -113,5 +112,4 @@ func TestSchemaRequired(t *testing.T) { if err == nil { t.Error("err is nil") } - fmt.Printf(err.Error()) } diff --git a/cmd/magneticow/util.go b/cmd/magneticow/util.go new file mode 100644 index 0000000..bd26c6b --- /dev/null +++ b/cmd/magneticow/util.go @@ -0,0 +1,8 @@ +package main + +import "net/http" + +func handlerError(err error, w http.ResponseWriter) { + w.WriteHeader(http.StatusInternalServerError) + _, _ = w.Write([]byte(err.Error())) +} diff --git a/pkg/README.md b/pkg/README.md new file mode 100644 index 0000000..ceacb37 --- /dev/null +++ b/pkg/README.md @@ -0,0 +1,7 @@ +# magnetico/pkg +[![GoDoc](https://godoc.org/github.com/boramalper/magnetico?status.svg)](https://godoc.org/github.com/boramalper/magnetico) + +- The most significant package is `persistence`, that abstracts access to the + magnetico databases with different engines (currently, only SQLite). + +**For REST-ful magneticow API, see [https://app.swaggerhub.com/apis/boramalper/magneticow-api/](https://app.swaggerhub.com/apis/boramalper/magneticow-api/).** diff --git a/pkg/persistence/interface.go b/pkg/persistence/interface.go index 11c034c..61986c3 100644 --- a/pkg/persistence/interface.go +++ b/pkg/persistence/interface.go @@ -4,6 +4,7 @@ import ( "encoding/hex" "encoding/json" "fmt" + "github.com/pkg/errors" "net/url" "go.uber.org/zap" @@ -104,7 +105,7 @@ func MakeDatabase(rawURL string, logger *zap.Logger) (Database, error) { url_, err := url.Parse(rawURL) if err != nil { - return nil, err + return nil, errors.Wrap(err, "url.Parse") } switch url_.Scheme { diff --git a/pkg/persistence/iso8601.go b/pkg/persistence/iso8601.go index e43f428..597b9e7 100644 --- a/pkg/persistence/iso8601.go +++ b/pkg/persistence/iso8601.go @@ -31,32 +31,44 @@ func ParseISO8601(s string) (*time.Time, Granularity, error) { if err != nil { return nil, -1, err } + t := time.Date(year, time.December, daysOfMonth(time.December, year), 23, 59, 59, 0, time.UTC) return &t, Year, nil } if matches := monthRE.FindStringSubmatch(s); len(matches) != 0 { month, err := parseMonth(matches[2]) + if err != nil { + return nil, -1, err + } year, err := parseYear(matches[1]) if err != nil { return nil, -1, err } + t := time.Date(year, month, 31, 23, 59, 59, 0, time.UTC) return &t, Month, nil } if matches := weekRE.FindStringSubmatch(s); len(matches) != 0 { week, err := parseWeek(matches[2]) + if err != nil { + return nil, -1, err + } year, err := parseYear(matches[1]) if err != nil { return nil, -1, err } + t := time.Date(year, time.January, week*7, 23, 59, 59, 0, time.UTC) return &t, Week, nil } if matches := dayRE.FindStringSubmatch(s); len(matches) != 0 { month, err := parseMonth(matches[2]) + if err != nil { + return nil, -1, err + } year, err := parseYear(matches[1]) if err != nil { return nil, -1, err @@ -65,21 +77,29 @@ func ParseISO8601(s string) (*time.Time, Granularity, error) { if err != nil { return nil, -1, err } + t := time.Date(year, month, day, 23, 59, 59, 0, time.UTC) return &t, Day, nil } if matches := hourRE.FindStringSubmatch(s); len(matches) != 0 { month, err := parseMonth(matches[2]) + if err != nil { + return nil, -1, err + } year, err := parseYear(matches[1]) if err != nil { return nil, -1, err } hour, err := parseHour(matches[4]) + if err != nil { + return nil, -1, err + } day, err := parseDay(matches[3], daysOfMonth(month, year)) if err != nil { return nil, -1, err } + t := time.Date(year, month, day, hour, 59, 59, 0, time.UTC) return &t, Hour, nil } diff --git a/pkg/persistence/sqlite3.go b/pkg/persistence/sqlite3.go index ba322c3..f7485e9 100644 --- a/pkg/persistence/sqlite3.go +++ b/pkg/persistence/sqlite3.go @@ -9,7 +9,7 @@ import ( "path" "text/template" "time" - + _ "github.com/mattn/go-sqlite3" "github.com/pkg/errors" "go.uber.org/zap" @@ -27,23 +27,44 @@ func makeSqlite3Database(url_ *url.URL) (Database, error) { dbDir, _ := path.Split(url_.Path) if err := os.MkdirAll(dbDir, 0755); err != nil { - return nil, fmt.Errorf("for directory `%s`: %s", dbDir, err.Error()) + return nil, errors.Wrapf(err, "mkdirAll error for `%s`", dbDir) } var err error url_.Scheme = "" db.conn, err = sql.Open("sqlite3", url_.String()) if err != nil { - return nil, fmt.Errorf("sql.Open: %s", err.Error()) + return nil, errors.Wrap(err, "sql.Open") } // > Open may just validate its arguments without creating a connection to the database. To // > verify that the data source Name is valid, call Ping. // https://golang.org/pkg/database/sql/#Open if err = db.conn.Ping(); err != nil { - return nil, fmt.Errorf("sql.DB.Ping: %s", err.Error()) + return nil, errors.Wrap(err, "sql.DB.Ping") } + // > After some time we receive "unable to open database file" error while trying to execute a transaction using + // > Tx.Exec(). + // -- boramalper + // + // > Not sure if this would be contributing to your issue, but one of the problems we've observed in the past is the + // > standard library's attempt to pool connections. (This makes more sense for database connections that are actual + // > network connections, as opposed to SQLite.) + // > Basically, the problem we encountered was that most pragmas (except specifically PRAGMA journal_mode=WAL, as + // > per the documentation) apply to the connection, so if the standard library is opening/closing connections + // > behind your back for pooling purposes, it can lead to unintended behavior. + // -- rittneje + // + // https://github.com/mattn/go-sqlite3/issues/618 + // + // Our solution is to set the connection max lifetime to infinity (reuse connection forever), and max open + // connections to 3 (1 causes deadlocks, unlimited is too lax!). Max idle conns are set to 3 to persist connections + // (instead of opening the database again and again). + db.conn.SetConnMaxLifetime(0) // https://golang.org/pkg/database/sql/#DB.SetConnMaxLifetime + db.conn.SetMaxOpenConns(3) + db.conn.SetMaxIdleConns(3) + if err := db.setupDatabase(); err != nil { return nil, errors.Wrap(err, "setupDatabase") } @@ -297,9 +318,9 @@ func (db *sqlite3Database) QueryTorrents( queryArgs = append(queryArgs, limit) rows, err := db.conn.Query(sqlQuery, queryArgs...) - defer rows.Close() + defer closeRows(rows) if err != nil { - return nil, fmt.Errorf("error while querying torrents: %s", err.Error()) + return nil, errors.Wrap(err, "query error") } torrents := make([]TorrentMetadata, 0) @@ -354,7 +375,7 @@ func (db *sqlite3Database) GetTorrent(infoHash []byte) (*TorrentMetadata, error) WHERE info_hash = ?`, infoHash, ) - defer rows.Close() + defer closeRows(rows) if err != nil { return nil, err } @@ -375,7 +396,7 @@ func (db *sqlite3Database) GetFiles(infoHash []byte) ([]File, error) { rows, err := db.conn.Query( "SELECT size, path FROM files, torrents WHERE files.torrent_id = torrents.id AND torrents.info_hash = ?;", infoHash) - defer rows.Close() + defer closeRows(rows) if err != nil { return nil, err } @@ -395,7 +416,7 @@ func (db *sqlite3Database) GetFiles(infoHash []byte) ([]File, error) { func (db *sqlite3Database) GetStatistics(from string, n uint) (*Statistics, error) { fromTime, gran, err := ParseISO8601(from) if err != nil { - return nil, fmt.Errorf("error while parsing from: %s", err.Error()) + return nil, errors.Wrap(err, "parsing ISO8601 error") } var toTime time.Time @@ -432,7 +453,7 @@ func (db *sqlite3Database) GetStatistics(from string, n uint) (*Statistics, erro GROUP BY dt;`, timef), fromTime.Unix(), toTime.Unix()) - defer rows.Close() + defer closeRows(rows) if err != nil { return nil, err } @@ -479,12 +500,12 @@ func (db *sqlite3Database) setupDatabase() error { PRAGMA encoding='UTF-8'; `) if err != nil { - return fmt.Errorf("sql.DB.Exec (PRAGMAs): %s", err.Error()) + return errors.Wrap(err, "sql.DB.Exec (PRAGMAs)") } tx, err := db.conn.Begin() if err != nil { - return fmt.Errorf("sql.DB.Begin: %s", err.Error()) + return errors.Wrap(err, "sql.DB.Begin") } // If everything goes as planned and no error occurs, we will commit the transaction before // returning from the function so the tx.Rollback() call will fail, trying to rollback a @@ -512,13 +533,13 @@ func (db *sqlite3Database) setupDatabase() error { ); `) if err != nil { - return fmt.Errorf("sql.Tx.Exec (v0): %s", err.Error()) + return errors.Wrap(err, "sql.Tx.Exec (v0)") } // Get the user_version: rows, err := tx.Query("PRAGMA user_version;") if err != nil { - return fmt.Errorf("sql.Tx.Query (user_version): %s", err.Error()) + return errors.Wrap(err, "sql.Tx.Query (user_version)") } defer rows.Close() var userVersion int @@ -526,7 +547,7 @@ func (db *sqlite3Database) setupDatabase() error { return fmt.Errorf("sql.Rows.Next (user_version): PRAGMA user_version did not return any rows!") } if err = rows.Scan(&userVersion); err != nil { - return fmt.Errorf("sql.Rows.Scan (user_version): %s", err.Error()) + return errors.Wrap(err, "sql.Rows.Scan (user_version)") } switch userVersion { @@ -541,7 +562,7 @@ func (db *sqlite3Database) setupDatabase() error { PRAGMA user_version = 1; `) if err != nil { - return fmt.Errorf("sql.Tx.Exec (v0 -> v1): %s", err.Error()) + return errors.Wrap(err, "sql.Tx.Exec (v0 -> v1)") } fallthrough @@ -586,7 +607,7 @@ func (db *sqlite3Database) setupDatabase() error { PRAGMA user_version = 2; `) if err != nil { - return fmt.Errorf("sql.Tx.Exec (v1 -> v2): %s", err.Error()) + return errors.Wrap(err, "sql.Tx.Exec (v1 -> v2)") } fallthrough @@ -650,7 +671,7 @@ func (db *sqlite3Database) setupDatabase() error { } if err = tx.Commit(); err != nil { - return fmt.Errorf("sql.Tx.Commit: %s", err.Error()) + return errors.Wrap(err, "sql.Tx.Commit") } return nil @@ -666,3 +687,9 @@ func executeTemplate(text string, data interface{}, funcs template.FuncMap) stri } return buf.String() } + +func closeRows(rows *sql.Rows) { + if err := rows.Close(); err != nil { + zap.L().Error("could not close row", zap.Error(err)) + } +} diff --git a/pkg/util/util.go b/pkg/util/util.go index eff2728..9c070a1 100644 --- a/pkg/util/util.go +++ b/pkg/util/util.go @@ -8,4 +8,4 @@ import ( func HexField(key string, val []byte) zapcore.Field { return zapcore.Field{Key: key, Type: zapcore.StringType, String: hex.EncodeToString(val)} -} \ No newline at end of file +}