This commit is contained in:
Jan Grewe 2018-12-30 10:48:31 +01:00
commit 16596806d3
20 changed files with 349 additions and 185 deletions

View File

@ -1,10 +1,10 @@
# https://docs.travis-ci.com/user/reference/overview/#virtualisation-environment-vs-operating-system
sudo: false
dist: trusty
dist: xenial
language: go
go:
- "1.10"
- "1.11"
# Only clone the most recent commit.
git:
@ -16,41 +16,26 @@ env:
before_install:
# Dump environment variables
- printenv
# Download the binary to bin folder in $GOPATH
# Download dep binary to bin folder in $GOPATH
- curl -L -s https://github.com/golang/dep/releases/download/v${DEP_VERSION}/dep-linux-amd64 -o $GOPATH/bin/dep
# Make the binary executable
- chmod +x $GOPATH/bin/dep
install:
- "go get -u github.com/jteeuwen/go-bindata/..."
- "dep ensure"
- "go get -u -v github.com/kevinburke/go-bindata/..."
- "go get -u -v honnef.co/go/tools/cmd/staticcheck"
- "make ensure"
before_script:
- "go-bindata
-o=\"${GOPATH}/src/github.com/boramalper/magnetico/cmd/magneticow/bindata.go\"
-prefix=\"${GOPATH}/src/github.com/boramalper/magnetico/cmd/magneticow/data/\"
${GOPATH}/src/github.com/boramalper/magnetico/cmd/magneticow/data/..."
- "make magneticod"
- "make magneticow"
script:
# The Unit Tests
- "go test github.com/boramalper/magnetico/cmd/magneticod/..."
- "go test github.com/boramalper/magnetico/cmd/magneticow/..."
- "go test github.com/boramalper/magnetico/pkg/persistence/..."
- "make test"
# Static Analysis (vet)
- "go vet github.com/boramalper/magnetico/cmd/magneticod/..."
- "go vet github.com/boramalper/magnetico/cmd/magneticow/..."
- "go vet github.com/boramalper/magnetico/pkg/persistence/..."
- "make vet"
# Formatting Errors
#
# Since gofmt returns zero even if there are files to be formatted, we use:
#
# ! gofmt -d ${GOPATH}/path/ 2>&1 | read
#
# to return 1 if there are files to be formatted, and 0 if not.
#
# https://groups.google.com/forum/#!topic/Golang-Nuts/pdrN4zleUio
- "! gofmt -l ${GOPATH}/src/github.com/boramalper/magnetico/cmd/magneticod/ 2>&1 | read"
- "! gofmt -l ${GOPATH}/src/github.com/boramalper/magnetico/cmd/magneticow/ 2>&1 | read"
- "! gofmt -l ${GOPATH}/src/github.com/boramalper/magnetico/pkg/persistence/ 2>&1 | read"
# Check formatting
- "make check-formatting"

0
COPYING Executable file → Normal file
View File

View File

@ -1,14 +1,15 @@
.PHONY: test format magneticod magneticow ensure test-magneticod test-magneticow test-persistence image image-magneticow image-magneticod
.PHONY: test format vet staticcheck magneticod magneticow ensure image image-magneticow image-magneticod
all: ensure test magneticod magneticow
all: test magneticod magneticow
magneticod:
go install --tags fts5 "-ldflags=-s -w" github.com/boramalper/magnetico/cmd/magneticod
go install --tags fts5 "-ldflags=-s -w -X main.compiledOn=`date -u +%Y-%m-%dT%H:%M:%SZ`" github.com/boramalper/magnetico/cmd/magneticod
magneticow:
# TODO: minify files!
go-bindata -o="cmd/magneticow/bindata.go" -prefix="cmd/magneticow/data/" -pkg="main" cmd/magneticow/data/...
go install --tags fts5 "-ldflags=-s -w" github.com/boramalper/magnetico/cmd/magneticow
# https://github.com/kevinburke/go-bindata
go-bindata -o="cmd/magneticow/bindata.go" -prefix="cmd/magneticow/data/" cmd/magneticow/data/...
go install --tags fts5 "-ldflags=-s -w -X main.compiledOn=`date -u +%Y-%m-%dT%H:%M:%SZ`" github.com/boramalper/magnetico/cmd/magneticow
image-magneticod:
docker build -t magneticod -f Dockerfile.magneticod .
@ -21,22 +22,33 @@ image: image-magneticod image-magneticow
# Download dependencies
ensure:
dep ensure -v
go get -u github.com/jteeuwen/go-bindata/...
test-magneticod:
go test github.com/boramalper/magnetico/cmd/magneticod/...
vet:
go vet github.com/boramalper/magnetico/...
test-magneticow:
go-bindata -o="cmd/magneticow/bindata.go" -prefix="cmd/magneticow/data/" -pkg="main" cmd/magneticow/data/...
go test github.com/boramalper/magnetico/cmd/magneticow/...
staticcheck:
staticcheck github.com/boramalper/magnetico/...
test-persistence:
go test github.com/boramalper/magnetico/pkg/persistence/...
test: test-persistence test-magneticod test-magneticow
test:
go test github.com/boramalper/magnetico/...
format:
gofmt -w cmd/magneticod
gofmt -w cmd/magneticow
gofmt -w pkg/persistence
gofmt -w ${GOPATH}/src/github.com/boramalper/magnetico/cmd/
gofmt -w ${GOPATH}/src/github.com/boramalper/magnetico/pkg/
# Formatting Errors
# Since gofmt returns zero even if there are files to be formatted, we use:
#
# ! gofmt -d ${GOPATH}/path/ 2>&1 | read
#
# to return 1 if there are files to be formatted, and 0 if not.
# https://groups.google.com/forum/#!topic/Golang-Nuts/pdrN4zleUio
#
# How can I use Bash syntax in Makefile targets?
# Because `read` is a bash command.
# https://stackoverflow.com/a/589300/4466589
#
check-formatting: SHELL:=/bin/bash # HERE: this is setting the shell for check-formatting only
check-formatting:
! gofmt -l ${GOPATH}/src/github.com/boramalper/magnetico/cmd/ 2>&1 | tee /dev/fd/2 | read
! gofmt -l ${GOPATH}/src/github.com/boramalper/magnetico/pkg/ 2>&1 | tee /dev/fd/2 | read

View File

@ -1,7 +1,7 @@
# magnetico
*Autonomous (self-hosted) BitTorrent DHT search engine suite.*
[![chat on gitter](https://badges.gitter.im/gitterHQ/gitter.png)](https://gitter.im/magnetico-dev/magnetico-dev) [![Build Status on Travis CI](https://travis-ci.org/boramalper/magnetico.svg?branch=go-rewrite)](https://travis-ci.org/boramalper/magnetico) [![Build status on AppVeyor](https://ci.appveyor.com/api/projects/status/u2jtbe6jutya7p0x/branch/go-rewrite?svg=true)](https://ci.appveyor.com/project/boramalper/magnetico/branch/go-rewrite)
[![chat on gitter](https://badges.gitter.im/gitterHQ/gitter.png)](https://gitter.im/magnetico-dev/magnetico-dev) [![Build Status](https://travis-ci.org/boramalper/magnetico.svg?branch=master)](https://travis-ci.org/boramalper/magnetico) [![Build status on AppVeyor](https://ci.appveyor.com/api/projects/status/u2jtbe6jutya7p0x/branch/go-rewrite?svg=true)](https://ci.appveyor.com/project/boramalper/magnetico/branch/go-rewrite)
magnetico is the first autonomous (self-hosted) BitTorrent DHT search engine suite that is *designed
for end-users*. The suite consists of two packages:

View File

@ -2,7 +2,6 @@ package metadata
import (
"bytes"
"crypto/rand"
"crypto/sha1"
"encoding/binary"
"fmt"
@ -47,6 +46,8 @@ type Leech struct {
ut_metadata uint8
metadataReceived, metadataSize uint
metadata []byte
connClosed bool
}
type LeechEventHandlers struct {
@ -54,16 +55,13 @@ type LeechEventHandlers struct {
OnError func([20]byte, error) // must be supplied. args: infohash, error
}
func NewLeech(infoHash [20]byte, peerAddr *net.TCPAddr, ev LeechEventHandlers) *Leech {
func NewLeech(infoHash [20]byte, peerAddr *net.TCPAddr, clientID []byte, ev LeechEventHandlers) *Leech {
l := new(Leech)
l.infoHash = infoHash
l.peerAddr = peerAddr
copy(l.clientID[:], clientID)
l.ev = ev
if _, err := rand.Read(l.clientID[:]); err != nil {
panic(err.Error())
}
return l
}
@ -286,17 +284,26 @@ func (l *Leech) connect(deadline time.Time) error {
return nil
}
func (l *Leech) closeConn() {
if l.connClosed {
return
}
if err := l.conn.Close(); err != nil {
zap.L().Panic("couldn't close leech connection!", zap.Error(err))
return
}
l.connClosed = true
}
func (l *Leech) Do(deadline time.Time) {
err := l.connect(deadline)
if err != nil {
l.OnError(errors.Wrap(err, "connect"))
return
}
defer func() {
if err := l.conn.Close(); err != nil {
zap.L().Panic("couldn't close leech connection!", zap.Error(err))
}
}()
defer l.closeConn()
err = l.doBtHandshake()
if err != nil {
@ -328,7 +335,7 @@ func (l *Leech) Do(deadline time.Time) {
rExtDict := new(extDict)
err = bencode.NewDecoder(rMessageBuf).Decode(rExtDict)
if err != nil {
zap.L().Warn("Couldn't decode extension message in the loop!", zap.Error(err))
l.OnError(errors.Wrap(err, "could not decode ext msg in the loop"))
return
}
@ -371,6 +378,10 @@ func (l *Leech) Do(deadline time.Time) {
}
}
// We are done with the transfer, close socket as soon as possible (i.e. NOW) to avoid hitting "too many open files"
// error.
l.closeConn()
// Verify the checksum
sha1Sum := sha1.Sum(l.metadata)
if !bytes.Equal(sha1Sum[:], l.infoHash[:]) {

View File

@ -1,15 +1,15 @@
package metadata
import (
"crypto/rand"
"math/rand"
"sync"
"time"
"github.com/boramalper/magnetico/pkg/util"
"go.uber.org/zap"
"github.com/boramalper/magnetico/cmd/magneticod/dht/mainline"
"github.com/boramalper/magnetico/pkg/persistence"
"github.com/boramalper/magnetico/pkg/util"
)
type Metadata struct {
@ -24,8 +24,9 @@ type Metadata struct {
}
type Sink struct {
clientID []byte
PeerID []byte
deadline time.Duration
maxNLeeches int
drain chan Metadata
incomingInfoHashes map[[20]byte]struct{}
incomingInfoHashesMx sync.Mutex
@ -33,18 +34,50 @@ type Sink struct {
termination chan interface{}
}
func NewSink(deadline time.Duration) *Sink {
func randomID() []byte {
/* > The peer_id is exactly 20 bytes (characters) long.
* >
* > There are mainly two conventions how to encode client and client version information into the peer_id,
* > Azureus-style and Shadow's-style.
* >
* > Azureus-style uses the following encoding: '-', two characters for client id, four ascii digits for version
* > number, '-', followed by random numbers.
* >
* > For example: '-AZ2060-'...
*
* https://wiki.theory.org/index.php/BitTorrentSpecification
*
* We encode the version number as:
* - First two digits for the major version number
* - Last two digits for the minor version number
* - Patch version number is not encoded.
*/
prefix := []byte("-MC0007-")
var rando []byte
for i := 20 - len(prefix); i >= 0; i-- {
rando = append(rando, randomDigit())
}
return append(prefix, rando...)
}
func randomDigit() byte {
var max, min int
max, min = '9', '0'
return byte(rand.Intn(max-min) + min)
}
func NewSink(deadline time.Duration, maxNLeeches int) *Sink {
ms := new(Sink)
ms.clientID = make([]byte, 20)
_, err := rand.Read(ms.clientID)
if err != nil {
zap.L().Panic("sinkMetadata couldn't read 20 random bytes for client ID!", zap.Error(err))
}
ms.PeerID = randomID()
ms.deadline = deadline
ms.maxNLeeches = maxNLeeches
ms.drain = make(chan Metadata)
ms.incomingInfoHashes = make(map[[20]byte]struct{})
ms.termination = make(chan interface{})
return ms
}
@ -55,6 +88,11 @@ func (ms *Sink) Sink(res mainline.TrawlingResult) {
ms.incomingInfoHashesMx.Lock()
defer ms.incomingInfoHashesMx.Unlock()
// cap the max # of leeches
if len(ms.incomingInfoHashes) >= ms.maxNLeeches {
return
}
if _, exists := ms.incomingInfoHashes[res.InfoHash]; exists {
return
}
@ -66,7 +104,7 @@ func (ms *Sink) Sink(res mainline.TrawlingResult) {
zap.L().Info("Sunk!", zap.Int("leeches", len(ms.incomingInfoHashes)), util.HexField("infoHash", res.InfoHash[:]))
go NewLeech(res.InfoHash, res.PeerAddr, LeechEventHandlers{
go NewLeech(res.InfoHash, res.PeerAddr, ms.PeerID, LeechEventHandlers{
OnSuccess: ms.flush,
OnError: ms.onLeechError,
}).Do(time.Now().Add(ms.deadline))
@ -88,20 +126,23 @@ func (ms *Sink) Terminate() {
}
func (ms *Sink) flush(result Metadata) {
if !ms.terminated {
ms.drain <- result
// Delete the infoHash from ms.incomingInfoHashes ONLY AFTER once we've flushed the
// metadata!
var infoHash [20]byte
copy(infoHash[:], result.InfoHash)
ms.incomingInfoHashesMx.Lock()
delete(ms.incomingInfoHashes, infoHash)
ms.incomingInfoHashesMx.Unlock()
if ms.terminated {
return
}
}
func (ms *Sink) onLeechError(infoHash [20]byte, err error) {
zap.L().Debug("leech error", util.HexField("infoHash", infoHash[:]), zap.Error(err))
ms.drain <- result
// Delete the infoHash from ms.incomingInfoHashes ONLY AFTER once we've flushed the
// metadata!
var infoHash [20]byte
copy(infoHash[:], result.InfoHash)
ms.incomingInfoHashesMx.Lock()
delete(ms.incomingInfoHashes, infoHash)
ms.incomingInfoHashesMx.Unlock()
}
func (ms *Sink) onLeechError(infoHash [20]byte, err error) {
zap.L().Debug("leech error", util.HexField("infoHash", infoHash[:]), zap.Error(err))
ms.incomingInfoHashesMx.Lock()
delete(ms.incomingInfoHashes, infoHash)
ms.incomingInfoHashesMx.Unlock()

View File

@ -10,10 +10,13 @@ import (
"fmt"
"net"
"github.com/pkg/errors"
"regexp"
"github.com/anacrolix/missinggo/iter"
"github.com/anacrolix/torrent/bencode"
"github.com/willf/bloom"
"regexp"
)
type Message struct {
@ -251,10 +254,10 @@ func (e *Error) UnmarshalBencode(b []byte) (err error) {
matches := result[0][1:]
if _, err := fmt.Sscanf(string(matches[0]), "%d", &code); err != nil {
return fmt.Errorf("could not parse the error code: %s", err.Error())
return errors.Wrap(err, "could not parse error code")
}
if _, err := fmt.Sscanf(string(matches[1]), "%d", &msgLen); err != nil {
return fmt.Errorf("could not parse the error message length: %s", err.Error())
return errors.Wrap(err, "could not parse error msg length")
}
if len(matches[2]) != msgLen {

View File

@ -18,6 +18,7 @@ type TrawlingService struct {
// Private
protocol *Protocol
started bool
interval time.Duration
eventHandlers TrawlingServiceEventHandlers
trueNodeID []byte
@ -34,8 +35,9 @@ type TrawlingServiceEventHandlers struct {
OnResult func(TrawlingResult)
}
func NewTrawlingService(laddr string, initialMaxNeighbors uint, eventHandlers TrawlingServiceEventHandlers) *TrawlingService {
func NewTrawlingService(laddr string, initialMaxNeighbors uint, interval time.Duration, eventHandlers TrawlingServiceEventHandlers) *TrawlingService {
service := new(TrawlingService)
service.interval = interval
service.protocol = NewProtocol(
laddr,
ProtocolEventHandlers{
@ -76,7 +78,7 @@ func (s *TrawlingService) Terminate() {
}
func (s *TrawlingService) trawl() {
for range time.Tick(1 * time.Second) {
for range time.Tick(s.interval) {
// TODO
// For some reason, we can't still detect congestion and this keeps increasing...
// Disable for now.

View File

@ -1,6 +1,9 @@
package dht
import "github.com/boramalper/magnetico/cmd/magneticod/dht/mainline"
import (
"github.com/boramalper/magnetico/cmd/magneticod/dht/mainline"
"time"
)
type TrawlingManager struct {
// private
@ -8,7 +11,7 @@ type TrawlingManager struct {
services []*mainline.TrawlingService
}
func NewTrawlingManager(mlAddrs []string) *TrawlingManager {
func NewTrawlingManager(mlAddrs []string, interval time.Duration) *TrawlingManager {
manager := new(TrawlingManager)
manager.output = make(chan mainline.TrawlingResult)
@ -19,6 +22,7 @@ func NewTrawlingManager(mlAddrs []string) *TrawlingManager {
manager.services = append(manager.services, mainline.NewTrawlingService(
addr,
2000,
interval,
mainline.TrawlingServiceEventHandlers{
OnResult: manager.onResult,
},

View File

@ -1,45 +1,43 @@
package main
import (
"fmt"
"math/rand"
"net"
"os"
"os/signal"
"runtime/pprof"
"time"
"github.com/boramalper/magnetico/pkg/util"
"github.com/pkg/errors"
"github.com/jessevdk/go-flags"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
"github.com/boramalper/magnetico/pkg/util"
"github.com/boramalper/magnetico/cmd/magneticod/bittorrent/metadata"
"github.com/boramalper/magnetico/cmd/magneticod/dht"
"github.com/Wessie/appdirs"
"github.com/boramalper/magnetico/pkg/persistence"
)
type cmdFlags struct {
DatabaseURL string `long:"database" description:"URL of the database."`
TrawlerMlAddrs []string `long:"trawler-ml-addr" description:"Address(es) to be used by trawling DHT (Mainline) nodes." default:"0.0.0.0:0"`
TrawlerMlInterval uint `long:"trawler-ml-interval" description:"Trawling interval in integer deciseconds (one tenth of a second)."`
Verbose []bool `short:"v" long:"verbose" description:"Increases verbosity."`
Profile string `long:"profile" description:"Enable profiling." choice:"cpu" choice:"memory" choice:"trace"`
}
type opFlags struct {
DatabaseURL string
TrawlerMlAddrs []string
TrawlerMlInterval time.Duration
LeechMaxN int
Verbosity int
Profile string
}
var compiledOn string
func main() {
loggerLevel := zap.NewAtomicLevel()
// Logging levels: ("debug", "info", "warn", "error", "dpanic", "panic", and "fatal").
@ -48,7 +46,11 @@ func main() {
zapcore.Lock(os.Stderr),
loggerLevel,
))
defer logger.Sync()
defer func() {
if err := logger.Sync(); err != nil {
panic(err)
}
}()
zap.ReplaceGlobals(logger)
// opFlags is the "operational flags"
@ -58,9 +60,10 @@ func main() {
return
}
zap.L().Info("magneticod v0.7.0-beta1 has been started.")
zap.L().Info("Copyright (C) 2017 Mert Bora ALPER <bora@boramalper.org>.")
zap.L().Info("magneticod v0.7.0-beta2 has been started.")
zap.L().Info("Copyright (C) 2018 Mert Bora ALPER <bora@boramalper.org>.")
zap.L().Info("Dedicated to Cemile Binay, in whose hands I thrived.")
zap.S().Infof("Compiled on %s", compiledOn)
switch opFlags.Verbosity {
case 0:
@ -80,8 +83,19 @@ func main() {
if err != nil {
zap.L().Panic("Could not open the cpu profile file!", zap.Error(err))
}
pprof.StartCPUProfile(file)
defer file.Close()
if err = pprof.StartCPUProfile(file); err != nil {
zap.L().Fatal("Could not start CPU profiling!", zap.Error(err))
}
defer func() {
if err = file.Sync(); err != nil {
zap.L().Fatal("Could not sync profiling file!", zap.Error(err))
}
}()
defer func() {
if err = file.Close(); err != nil {
zap.L().Fatal("Could not close profiling file!", zap.Error(err))
}
}()
defer pprof.StopCPUProfile()
case "memory":
@ -91,17 +105,22 @@ func main() {
zap.L().Panic("NOT IMPLEMENTED")
}
// Initialise the random number generator
rand.Seed(time.Now().UnixNano())
// Handle Ctrl-C gracefully.
interruptChan := make(chan os.Signal)
interruptChan := make(chan os.Signal, 1)
signal.Notify(interruptChan, os.Interrupt)
database, err := persistence.MakeDatabase(opFlags.DatabaseURL, logger)
if err != nil {
logger.Sugar().Fatalf("Could not open the database at `%s`: %s", opFlags.DatabaseURL, err.Error())
logger.Sugar().Fatalf("Could not open the database at `%s`", opFlags.DatabaseURL, zap.Error(err))
}
trawlingManager := dht.NewTrawlingManager(opFlags.TrawlerMlAddrs)
metadataSink := metadata.NewSink(2 * time.Minute)
trawlingManager := dht.NewTrawlingManager(opFlags.TrawlerMlAddrs, opFlags.TrawlerMlInterval)
metadataSink := metadata.NewSink(2*time.Minute, opFlags.LeechMaxN)
zap.L().Debug("Peer ID", zap.ByteString("peerID", metadataSink.PeerID))
// The Event Loop
for stopped := false; !stopped; {
@ -117,8 +136,8 @@ func main() {
case md := <-metadataSink.Drain():
if err := database.AddNewTorrent(md.InfoHash, md.Name, md.Files); err != nil {
logger.Sugar().Fatalf("Could not add new torrent %x to the database: %s",
md.InfoHash, err.Error())
zap.L().Fatal("Could not add new torrent to the database",
util.HexField("infohash", md.InfoHash), zap.Error(err))
}
zap.L().Info("Fetched!", zap.String("name", md.Name), util.HexField("infoHash", md.InfoHash))
@ -134,10 +153,21 @@ func main() {
}
func parseFlags() (*opFlags, error) {
opF := new(opFlags)
cmdF := new(cmdFlags)
var cmdF struct {
DatabaseURL string `long:"database" description:"URL of the database."`
_, err := flags.Parse(cmdF)
TrawlerMlAddrs []string `long:"trawler-ml-addr" description:"Address(es) to be used by trawling DHT (Mainline) nodes." default:"0.0.0.0:0"`
TrawlerMlInterval uint `long:"trawler-ml-interval" description:"Trawling interval in integer seconds."`
LeechMaxN uint `long:"leech-max-n" description:"Maximum number of leeches." default:"1000"`
Verbose []bool `short:"v" long:"verbose" description:"Increases verbosity."`
Profile string `long:"profile" description:"Enable profiling." choice:"cpu" choice:"memory" choice:"trace"`
}
opF := new(opFlags)
_, err := flags.Parse(&cmdF)
if err != nil {
return nil, err
}
@ -147,22 +177,32 @@ func parseFlags() (*opFlags, error) {
"sqlite3://" +
appdirs.UserDataDir("magneticod", "", "", false) +
"/database.sqlite3" +
"?_journal_mode=WAL" // https://github.com/mattn/go-sqlite3#connection-string
"?_journal_mode=WAL" + // https://github.com/mattn/go-sqlite3#connection-string
"&_busy_timeout=3000" + // in milliseconds
"&_foreign_keys=true"
} else {
opF.DatabaseURL = cmdF.DatabaseURL
}
if err = checkAddrs(cmdF.TrawlerMlAddrs); err != nil {
zap.S().Fatalf("Of argument (list) `trawler-ml-addr` %s", err.Error())
zap.S().Fatalf("Of argument (list) `trawler-ml-addr`", zap.Error(err))
} else {
opF.TrawlerMlAddrs = cmdF.TrawlerMlAddrs
}
// 1 decisecond = 100 milliseconds = 0.1 seconds
if cmdF.TrawlerMlInterval == 0 {
opF.TrawlerMlInterval = time.Duration(1) * 100 * time.Millisecond
opF.TrawlerMlInterval = 1 * time.Second
} else {
opF.TrawlerMlInterval = time.Duration(cmdF.TrawlerMlInterval) * 100 * time.Millisecond
opF.TrawlerMlInterval = time.Duration(cmdF.TrawlerMlInterval) * time.Second
}
opF.LeechMaxN = int(cmdF.LeechMaxN)
if opF.LeechMaxN > 1000 {
zap.S().Warnf(
"Beware that on many systems max # of file descriptors per process is limited to 1024. " +
"Setting maximum number of leeches greater than 1k might cause \"too many open files\" errors!",
)
}
opF.Verbosity = len(cmdF.Verbose)
@ -178,7 +218,7 @@ func checkAddrs(addrs []string) error {
// well.
_, err := net.ResolveUDPAddr("udp", addr)
if err != nil {
return fmt.Errorf("with %d(th) address `%s`: %s", i+1, addr, err.Error())
return errors.Wrapf(err, "%d(th) address (%s) error", i+1, addr)
}
}
return nil

View File

@ -48,7 +48,7 @@ func apiTorrentsHandler(w http.ResponseWriter, r *http.Request) {
return
}
if tq.LastID != nil && *tq.LastID < 0 {
if tq.LastID == nil {
respondError(w, 400, "lastID has to be greater than or equal to zero")
return
}

View File

@ -2,6 +2,8 @@ package main
import (
"encoding/hex"
"fmt"
"github.com/pkg/errors"
"net/http"
"strings"
"time"
@ -14,73 +16,67 @@ import (
func rootHandler(w http.ResponseWriter, r *http.Request) {
nTorrents, err := database.GetNumberOfTorrents()
if err != nil {
panic(err.Error())
handlerError(errors.Wrap(err, "GetNumberOfTorrents"), w)
return
}
err = templates["homepage"].Execute(w, struct {
_ = templates["homepage"].Execute(w, struct {
NTorrents uint
}{
NTorrents: nTorrents,
})
if err != nil {
panic(err.Error())
}
}
// TODO: we might as well move torrents.html into static...
func torrentsHandler(w http.ResponseWriter, r *http.Request) {
data := mustAsset("templates/torrents.html")
w.Header().Set("Content-Type", "text/html; charset=utf-8")
// Cache static resources for a day
w.Header().Set("Cache-Control", "max-age=86400")
w.Write(data)
_, _ = w.Write(data)
}
func torrentsInfohashHandler(w http.ResponseWriter, r *http.Request) {
infoHash, err := hex.DecodeString(mux.Vars(r)["infohash"])
if err != nil {
panic(err.Error())
handlerError(errors.Wrap(err, "cannot decode infohash"), w)
return
}
torrent, err := database.GetTorrent(infoHash)
if err != nil {
panic(err.Error())
handlerError(errors.Wrap(err, "cannot get torrent"), w)
return
}
if torrent == nil {
w.WriteHeader(404)
w.Write([]byte("torrent not found!"))
respondError(w, http.StatusNotFound, "torrent not found!")
return
}
files, err := database.GetFiles(infoHash)
if err != nil {
panic(err.Error())
handlerError(errors.Wrap(err, "could not get files"), w)
return
}
if files == nil {
w.WriteHeader(500)
w.Write([]byte("files not found what!!!"))
handlerError(fmt.Errorf("could not get files"), w)
return
}
err = templates["torrent"].Execute(w, struct {
_ = templates["torrent"].Execute(w, struct {
T *persistence.TorrentMetadata
F []persistence.File
}{
T: torrent,
F: files,
})
if err != nil {
panic("error while executing template!")
}
}
// TODO: we might as well move statistics.html into static...
func statisticsHandler(w http.ResponseWriter, r *http.Request) {
data := mustAsset("templates/statistics.html")
w.Header().Set("Content-Type", "text/html; charset=utf-8")
// Cache static resources for a day
w.Header().Set("Cache-Control", "max-age=86400")
w.Write(data)
_, _ = w.Write(data)
}
func feedHandler(w http.ResponseWriter, r *http.Request) {
@ -111,32 +107,24 @@ func feedHandler(w http.ResponseWriter, r *http.Request) {
nil,
)
if err != nil {
respondError(w, 400, err.Error())
handlerError(errors.Wrap(err, "query torrent"), w)
return
}
// It is much more convenient to write the XML deceleration manually*, and then process the XML
// template using template/html and send, then to use encoding/xml.
// template using template/html and send, than to use encoding/xml.
//
// *: https://github.com/golang/go/issues/3133
//
// TODO: maybe do it properly, even if it's inconvenient?
_, err = w.Write([]byte(`<?xml version="1.0" encoding="utf-8" standalone="yes"?>`))
if err != nil {
panic(err.Error())
}
err = templates["feed"].Execute(w, struct {
_, _ = w.Write([]byte(`<?xml version="1.0" encoding="utf-8" standalone="yes"?>`))
_ = templates["feed"].Execute(w, struct {
Title string
Torrents []persistence.TorrentMetadata
}{
Title: title,
Torrents: torrents,
})
if err != nil {
panic(err.Error())
}
}
func staticHandler(w http.ResponseWriter, r *http.Request) {
@ -155,5 +143,5 @@ func staticHandler(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", contentType)
// Cache static resources for a day
w.Header().Set("Cache-Control", "max-age=86400")
w.Write(data)
_, _ = w.Write(data)
}

View File

@ -5,9 +5,9 @@ import (
"bytes"
"encoding/hex"
"fmt"
"github.com/pkg/errors"
"html/template"
"io"
"log"
"net/http"
"os"
"os/signal"
@ -31,6 +31,8 @@ import (
const N_TORRENTS = 20
var compiledOn string
// Set a Decoder instance as a package global, because it caches
// meta-data about structs, and an instance can be shared safely.
var decoder = schema.NewDecoder()
@ -44,6 +46,7 @@ var opts struct {
Credentials map[string][]byte // TODO: encapsulate credentials and mutex for safety
CredentialsRWMutex sync.RWMutex
CredentialsPath string
Verbosity int
}
func main() {
@ -58,14 +61,27 @@ func main() {
zap.ReplaceGlobals(logger)
zap.L().Info("magneticow v0.7.0-beta1 has been started.")
zap.L().Info("Copyright (C) 2017 Mert Bora ALPER <bora@boramalper.org>.")
zap.L().Info("Copyright (C) 2018 Mert Bora ALPER <bora@boramalper.org>.")
zap.L().Info("Dedicated to Cemile Binay, in whose hands I thrived.")
zap.S().Infof("Compiled on %s", compiledOn)
if err := parseFlags(); err != nil {
zap.L().Error("Error while initializing", zap.Error(err))
return
}
switch opts.Verbosity {
case 0:
loggerLevel.SetLevel(zap.WarnLevel)
case 1:
loggerLevel.SetLevel(zap.InfoLevel)
default: // Default: i.e. in case of 2 or more.
// TODO: print the caller (function)'s name and line number!
loggerLevel.SetLevel(zap.DebugLevel)
}
zap.ReplaceGlobals(logger)
// Reload credentials when you receive SIGHUP
sighupChan := make(chan os.Signal, 1)
signal.Notify(sighupChan, syscall.SIGHUP)
@ -156,7 +172,7 @@ func main() {
var err error
database, err = persistence.MakeDatabase(opts.Database, logger)
if err != nil {
panic(err.Error())
zap.L().Fatal("could not access to database", zap.Error(err))
}
decoder.IgnoreUnknownKeys(false)
@ -178,7 +194,8 @@ func respondError(w http.ResponseWriter, statusCode int, format string, a ...int
func mustAsset(name string) []byte {
data, err := Asset(name)
if err != nil {
log.Panicf("Could NOT access the requested resource `%s`: %s (please inform us, this is a BUG!)", name, err.Error())
zap.L().Panic("Could NOT access the requested resource! THIS IS A BUG, PLEASE REPORT",
zap.String("name", name), zap.Error(err))
}
return data
}
@ -189,6 +206,8 @@ func parseFlags() error {
Database string `short:"d" long:"database" description:"URL of the (magneticod) database"`
Cred string `short:"c" long:"credentials" description:"Path to the credentials file"`
NoAuth bool ` long:"no-auth" description:"Disables authorisation"`
Verbose []bool `short:"v" long:"verbose" description:"Increases verbosity."`
}
if _, err := flags.Parse(&cmdFlags); err != nil {
@ -220,8 +239,6 @@ func parseFlags() error {
opts.CredentialsPath = cmdFlags.Cred
}
fmt.Printf("%v credpath %s\n", cmdFlags.NoAuth, opts.CredentialsPath)
if opts.CredentialsPath != "" {
opts.Credentials = make(map[string][]byte)
if err := loadCred(opts.CredentialsPath); err != nil {
@ -231,6 +248,8 @@ func parseFlags() error {
opts.Credentials = nil
}
opts.Verbosity = len(cmdFlags.Verbose)
return nil
}
@ -250,7 +269,7 @@ func loadCred(cred string) error {
if err == io.EOF {
break
}
return fmt.Errorf("error while reading line %d: %s", lineno, err.Error())
return errors.Wrapf(err, "while reading line %d", lineno)
}
line = line[:len(line)-1] // strip '\n'
@ -316,7 +335,5 @@ func BasicAuth(handler http.HandlerFunc, realm string) http.HandlerFunc {
func authenticate(w http.ResponseWriter, realm string) {
w.Header().Set("WWW-Authenticate", `Basic realm="`+realm+`"`)
w.WriteHeader(401)
if _, err := w.Write([]byte("Unauthorised.\n")); err != nil {
panic(err.Error())
}
_, _ = w.Write([]byte("Unauthorised.\n"))
}

View File

@ -1,7 +1,6 @@
package main
import (
"fmt"
"testing"
)
@ -113,5 +112,4 @@ func TestSchemaRequired(t *testing.T) {
if err == nil {
t.Error("err is nil")
}
fmt.Printf(err.Error())
}

8
cmd/magneticow/util.go Normal file
View File

@ -0,0 +1,8 @@
package main
import "net/http"
func handlerError(err error, w http.ResponseWriter) {
w.WriteHeader(http.StatusInternalServerError)
_, _ = w.Write([]byte(err.Error()))
}

7
pkg/README.md Normal file
View File

@ -0,0 +1,7 @@
# magnetico/pkg
[![GoDoc](https://godoc.org/github.com/boramalper/magnetico?status.svg)](https://godoc.org/github.com/boramalper/magnetico)
- The most significant package is `persistence`, that abstracts access to the
magnetico databases with different engines (currently, only SQLite).
**For REST-ful magneticow API, see [https://app.swaggerhub.com/apis/boramalper/magneticow-api/](https://app.swaggerhub.com/apis/boramalper/magneticow-api/).**

View File

@ -4,6 +4,7 @@ import (
"encoding/hex"
"encoding/json"
"fmt"
"github.com/pkg/errors"
"net/url"
"go.uber.org/zap"
@ -104,7 +105,7 @@ func MakeDatabase(rawURL string, logger *zap.Logger) (Database, error) {
url_, err := url.Parse(rawURL)
if err != nil {
return nil, err
return nil, errors.Wrap(err, "url.Parse")
}
switch url_.Scheme {

View File

@ -31,32 +31,44 @@ func ParseISO8601(s string) (*time.Time, Granularity, error) {
if err != nil {
return nil, -1, err
}
t := time.Date(year, time.December, daysOfMonth(time.December, year), 23, 59, 59, 0, time.UTC)
return &t, Year, nil
}
if matches := monthRE.FindStringSubmatch(s); len(matches) != 0 {
month, err := parseMonth(matches[2])
if err != nil {
return nil, -1, err
}
year, err := parseYear(matches[1])
if err != nil {
return nil, -1, err
}
t := time.Date(year, month, 31, 23, 59, 59, 0, time.UTC)
return &t, Month, nil
}
if matches := weekRE.FindStringSubmatch(s); len(matches) != 0 {
week, err := parseWeek(matches[2])
if err != nil {
return nil, -1, err
}
year, err := parseYear(matches[1])
if err != nil {
return nil, -1, err
}
t := time.Date(year, time.January, week*7, 23, 59, 59, 0, time.UTC)
return &t, Week, nil
}
if matches := dayRE.FindStringSubmatch(s); len(matches) != 0 {
month, err := parseMonth(matches[2])
if err != nil {
return nil, -1, err
}
year, err := parseYear(matches[1])
if err != nil {
return nil, -1, err
@ -65,21 +77,29 @@ func ParseISO8601(s string) (*time.Time, Granularity, error) {
if err != nil {
return nil, -1, err
}
t := time.Date(year, month, day, 23, 59, 59, 0, time.UTC)
return &t, Day, nil
}
if matches := hourRE.FindStringSubmatch(s); len(matches) != 0 {
month, err := parseMonth(matches[2])
if err != nil {
return nil, -1, err
}
year, err := parseYear(matches[1])
if err != nil {
return nil, -1, err
}
hour, err := parseHour(matches[4])
if err != nil {
return nil, -1, err
}
day, err := parseDay(matches[3], daysOfMonth(month, year))
if err != nil {
return nil, -1, err
}
t := time.Date(year, month, day, hour, 59, 59, 0, time.UTC)
return &t, Hour, nil
}

View File

@ -9,7 +9,7 @@ import (
"path"
"text/template"
"time"
_ "github.com/mattn/go-sqlite3"
"github.com/pkg/errors"
"go.uber.org/zap"
@ -27,23 +27,44 @@ func makeSqlite3Database(url_ *url.URL) (Database, error) {
dbDir, _ := path.Split(url_.Path)
if err := os.MkdirAll(dbDir, 0755); err != nil {
return nil, fmt.Errorf("for directory `%s`: %s", dbDir, err.Error())
return nil, errors.Wrapf(err, "mkdirAll error for `%s`", dbDir)
}
var err error
url_.Scheme = ""
db.conn, err = sql.Open("sqlite3", url_.String())
if err != nil {
return nil, fmt.Errorf("sql.Open: %s", err.Error())
return nil, errors.Wrap(err, "sql.Open")
}
// > Open may just validate its arguments without creating a connection to the database. To
// > verify that the data source Name is valid, call Ping.
// https://golang.org/pkg/database/sql/#Open
if err = db.conn.Ping(); err != nil {
return nil, fmt.Errorf("sql.DB.Ping: %s", err.Error())
return nil, errors.Wrap(err, "sql.DB.Ping")
}
// > After some time we receive "unable to open database file" error while trying to execute a transaction using
// > Tx.Exec().
// -- boramalper
//
// > Not sure if this would be contributing to your issue, but one of the problems we've observed in the past is the
// > standard library's attempt to pool connections. (This makes more sense for database connections that are actual
// > network connections, as opposed to SQLite.)
// > Basically, the problem we encountered was that most pragmas (except specifically PRAGMA journal_mode=WAL, as
// > per the documentation) apply to the connection, so if the standard library is opening/closing connections
// > behind your back for pooling purposes, it can lead to unintended behavior.
// -- rittneje
//
// https://github.com/mattn/go-sqlite3/issues/618
//
// Our solution is to set the connection max lifetime to infinity (reuse connection forever), and max open
// connections to 3 (1 causes deadlocks, unlimited is too lax!). Max idle conns are set to 3 to persist connections
// (instead of opening the database again and again).
db.conn.SetConnMaxLifetime(0) // https://golang.org/pkg/database/sql/#DB.SetConnMaxLifetime
db.conn.SetMaxOpenConns(3)
db.conn.SetMaxIdleConns(3)
if err := db.setupDatabase(); err != nil {
return nil, errors.Wrap(err, "setupDatabase")
}
@ -297,9 +318,9 @@ func (db *sqlite3Database) QueryTorrents(
queryArgs = append(queryArgs, limit)
rows, err := db.conn.Query(sqlQuery, queryArgs...)
defer rows.Close()
defer closeRows(rows)
if err != nil {
return nil, fmt.Errorf("error while querying torrents: %s", err.Error())
return nil, errors.Wrap(err, "query error")
}
torrents := make([]TorrentMetadata, 0)
@ -354,7 +375,7 @@ func (db *sqlite3Database) GetTorrent(infoHash []byte) (*TorrentMetadata, error)
WHERE info_hash = ?`,
infoHash,
)
defer rows.Close()
defer closeRows(rows)
if err != nil {
return nil, err
}
@ -375,7 +396,7 @@ func (db *sqlite3Database) GetFiles(infoHash []byte) ([]File, error) {
rows, err := db.conn.Query(
"SELECT size, path FROM files, torrents WHERE files.torrent_id = torrents.id AND torrents.info_hash = ?;",
infoHash)
defer rows.Close()
defer closeRows(rows)
if err != nil {
return nil, err
}
@ -395,7 +416,7 @@ func (db *sqlite3Database) GetFiles(infoHash []byte) ([]File, error) {
func (db *sqlite3Database) GetStatistics(from string, n uint) (*Statistics, error) {
fromTime, gran, err := ParseISO8601(from)
if err != nil {
return nil, fmt.Errorf("error while parsing from: %s", err.Error())
return nil, errors.Wrap(err, "parsing ISO8601 error")
}
var toTime time.Time
@ -432,7 +453,7 @@ func (db *sqlite3Database) GetStatistics(from string, n uint) (*Statistics, erro
GROUP BY dt;`,
timef),
fromTime.Unix(), toTime.Unix())
defer rows.Close()
defer closeRows(rows)
if err != nil {
return nil, err
}
@ -479,12 +500,12 @@ func (db *sqlite3Database) setupDatabase() error {
PRAGMA encoding='UTF-8';
`)
if err != nil {
return fmt.Errorf("sql.DB.Exec (PRAGMAs): %s", err.Error())
return errors.Wrap(err, "sql.DB.Exec (PRAGMAs)")
}
tx, err := db.conn.Begin()
if err != nil {
return fmt.Errorf("sql.DB.Begin: %s", err.Error())
return errors.Wrap(err, "sql.DB.Begin")
}
// If everything goes as planned and no error occurs, we will commit the transaction before
// returning from the function so the tx.Rollback() call will fail, trying to rollback a
@ -512,13 +533,13 @@ func (db *sqlite3Database) setupDatabase() error {
);
`)
if err != nil {
return fmt.Errorf("sql.Tx.Exec (v0): %s", err.Error())
return errors.Wrap(err, "sql.Tx.Exec (v0)")
}
// Get the user_version:
rows, err := tx.Query("PRAGMA user_version;")
if err != nil {
return fmt.Errorf("sql.Tx.Query (user_version): %s", err.Error())
return errors.Wrap(err, "sql.Tx.Query (user_version)")
}
defer rows.Close()
var userVersion int
@ -526,7 +547,7 @@ func (db *sqlite3Database) setupDatabase() error {
return fmt.Errorf("sql.Rows.Next (user_version): PRAGMA user_version did not return any rows!")
}
if err = rows.Scan(&userVersion); err != nil {
return fmt.Errorf("sql.Rows.Scan (user_version): %s", err.Error())
return errors.Wrap(err, "sql.Rows.Scan (user_version)")
}
switch userVersion {
@ -541,7 +562,7 @@ func (db *sqlite3Database) setupDatabase() error {
PRAGMA user_version = 1;
`)
if err != nil {
return fmt.Errorf("sql.Tx.Exec (v0 -> v1): %s", err.Error())
return errors.Wrap(err, "sql.Tx.Exec (v0 -> v1)")
}
fallthrough
@ -586,7 +607,7 @@ func (db *sqlite3Database) setupDatabase() error {
PRAGMA user_version = 2;
`)
if err != nil {
return fmt.Errorf("sql.Tx.Exec (v1 -> v2): %s", err.Error())
return errors.Wrap(err, "sql.Tx.Exec (v1 -> v2)")
}
fallthrough
@ -650,7 +671,7 @@ func (db *sqlite3Database) setupDatabase() error {
}
if err = tx.Commit(); err != nil {
return fmt.Errorf("sql.Tx.Commit: %s", err.Error())
return errors.Wrap(err, "sql.Tx.Commit")
}
return nil
@ -666,3 +687,9 @@ func executeTemplate(text string, data interface{}, funcs template.FuncMap) stri
}
return buf.String()
}
func closeRows(rows *sql.Rows) {
if err := rows.Close(); err != nil {
zap.L().Error("could not close row", zap.Error(err))
}
}

View File

@ -8,4 +8,4 @@ import (
func HexField(key string, val []byte) zapcore.Field {
return zapcore.Field{Key: key, Type: zapcore.StringType, String: hex.EncodeToString(val)}
}
}