From 58dc24d20cb123fb1be4c311aee97db127da0639 Mon Sep 17 00:00:00 2001
From: Matthias Detsch
Date: Fri, 9 Feb 2024 16:56:15 +0100
Subject: [PATCH] feat: first working prototype downloader

---
 .gitignore      |   3 ++
 cmd/cli/main.go |  13 ++++++
 go.mod          |   6 +++
 go.sum          |  10 +++++
 lib.go          |  79 +++++++++++++++++++++++++++++++++++--
 storage.go      | 102 ++++++++++++++++++++++++++++++++++++++++++++----
 upstream.go     |   2 +-
 7 files changed, 204 insertions(+), 11 deletions(-)
 create mode 100644 cmd/cli/main.go
 create mode 100644 go.sum

diff --git a/.gitignore b/.gitignore
index 3b735ec..de01256 100644
--- a/.gitignore
+++ b/.gitignore
@@ -19,3 +19,6 @@
 
 # Go workspace file
 go.work
+
+.hibp-data
+.idea
diff --git a/cmd/cli/main.go b/cmd/cli/main.go
new file mode 100644
index 0000000..77d717d
--- /dev/null
+++ b/cmd/cli/main.go
@@ -0,0 +1,13 @@
+package main
+
+import (
+	"log"
+
+	hibpsync "github.com/exaring/go-hibp-sync"
+)
+
+func main() {
+	if err := hibpsync.Sync(); err != nil {
+		log.Fatalf("sync error: %q", err) // reports on stderr and exits with status 1
+	}
+}
diff --git a/go.mod b/go.mod
index f7bec04..f4f3df3 100644
--- a/go.mod
+++ b/go.mod
@@ -1,3 +1,9 @@
 module github.com/exaring/go-hibp-sync
 
 go 1.21.6
+
+require (
+	github.com/alitto/pond v1.8.3
+	github.com/hashicorp/go-cleanhttp v0.5.2 // indirect
+	github.com/hashicorp/go-retryablehttp v0.7.5
+)
diff --git a/go.sum b/go.sum
new file mode 100644
index 0000000..51323ed
--- /dev/null
+++ b/go.sum
@@ -0,0 +1,10 @@
+github.com/alitto/pond v1.8.3 h1:ydIqygCLVPqIX/USe5EaV/aSRXTRXDEI9JwuDdu+/xs=
+github.com/alitto/pond v1.8.3/go.mod h1:CmvIIGd5jKLasGI3D87qDkQxjzChdKMmnXMg3fG6M6Q=
+github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
+github.com/hashicorp/go-cleanhttp v0.5.2 h1:035FKYIWjmULyFRBKPs8TBQoi0x6d9G4xc9neXJWAZQ=
+github.com/hashicorp/go-cleanhttp v0.5.2/go.mod h1:kO/YDlP8L1346E6Sodw+PrpBSV4/SoxCXGY6BqNFT48=
+github.com/hashicorp/go-hclog v0.9.2/go.mod h1:5CU+agLiy3J7N7QjHK5d05KxGsuXiQLrjA0H7acj2lQ=
+github.com/hashicorp/go-retryablehttp v0.7.5 h1:bJj+Pj19UZMIweq/iie+1u5YCdGrnxCT9yvm0e+Nd5M=
+github.com/hashicorp/go-retryablehttp v0.7.5/go.mod h1:Jy/gPYAdjqffZ/yFGCFV2doI5wjtH1ewM9u8iYVjtX8=
+github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
+github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
diff --git a/lib.go b/lib.go
index 0be1a3c..8c1e807 100644
--- a/lib.go
+++ b/lib.go
@@ -1,15 +1,24 @@
 package hibpsync
 
+import (
+	"fmt"
+
+	"github.com/alitto/pond"
+	"github.com/hashicorp/go-retryablehttp"
+)
+
 const (
 	defaultDataDir   = "./.hibp-data"
 	defaultEndpoint  = "https://api.pwnedpasswords.com/range/"
 	defaultCheckETag = true
+	defaultWorkers   = 100
 )
 
 type syncConfig struct {
 	dataDir   string
 	endpoint  string
 	checkETag bool
+	worker    int
 }
 
 type SyncOption func(*syncConfig)
@@ -32,17 +41,81 @@ func WithCheckETag(checkETag bool) SyncOption {
 	}
 }
 
-func Sync(options ...SyncOption) {
+func WithWorkers(workers int) SyncOption {
+	return func(c *syncConfig) {
+		c.worker = workers
+	}
+}
+
+func Sync(options ...SyncOption) error {
 	config := &syncConfig{
 		dataDir:   defaultDataDir,
 		endpoint:  defaultEndpoint,
 		checkETag: defaultCheckETag,
+		worker:    defaultWorkers,
 	}
 
 	for _, option := range options {
 		option(config)
 	}
 
-	// TODO: Implement sync
-	// We want to use a pool of workers that draw their range from
+	rG, err := newRangeGenerator(0x00000, 0xFFFFF, "")
+	if err != nil {
+		return fmt.Errorf("creating range generator: %w", err)
+	}
+
+	retryClient := retryablehttp.NewClient() //TODO: add dnscache, timeout
+	retryClient.RetryMax = 10
+	retryClient.Logger = nil
+
+	hc := hibpClient{
+		endpoint:   config.endpoint,
+		httpClient: retryClient.StandardClient(),
+	}
+
+	storage := fsStorage{
+		dataDir: config.dataDir,
+	}
+
+	pool := pond.New(config.worker, 0, pond.MinWorkers(config.worker))
+	defer pool.StopAndWait() // Stop() alone would return while the last downloads are still running
+
+	for {
+		rangeIndex, ok, err := rG.Next()
+		if err != nil {
+			return fmt.Errorf("getting next range: %w", err)
+		}
+
+		if !ok {
+			break
+		}
+
+		if rangeIndex%100 == 0 || rangeIndex < 10 {
+			fmt.Printf("processing range %d\n", rangeIndex)
+		}
+
+		pool.Submit(func() {
+			rangePrefix := toRangeString(rangeIndex)
+			etag, err := storage.LoadETag(rangePrefix)
+			if err != nil {
+				fmt.Printf("error loading etag for range %q: %v\n", rangePrefix, err)
+				return
+			}
+
+			resp, err := hc.RequestRange(rangePrefix, etag)
+			if err != nil {
+				fmt.Printf("error requesting range %q: %v\n", rangePrefix, err)
+				return
+			}
+
+			if resp.NotModified {
+				return
+			}
+			if err := storage.Save(rangePrefix, resp.ETag, resp.Data); err != nil {
+				fmt.Printf("error saving range %q: %v\n", rangePrefix, err)
+			}
+		})
+	}
+
+	return nil
 }
diff --git a/storage.go b/storage.go
index 73731b6..432085d 100644
--- a/storage.go
+++ b/storage.go
@@ -1,6 +1,19 @@
 package hibpsync
 
-import "sync"
+import (
+	"bufio"
+	"errors"
+	"fmt"
+	"io"
+	"os"
+	"path"
+	"sync"
+)
+
+const (
+	fileMode = 0666 // TODO ???
+	dirMode  = 0744 // TODO ???
+)
 
 type fsStorage struct {
 	dataDir   string
@@ -13,19 +26,94 @@ func (f *fsStorage) Save(key, etag string, data []byte) error {
 	f.writeLock.Lock()
 	defer f.writeLock.Unlock()
 
-	// TODO: Implement Save
+	if err := os.MkdirAll(f.subDir(key), dirMode); err != nil {
+		return fmt.Errorf("creating data directory: %w", err)
+	}
+
+	filePath := f.filePath(key)
+	file, err := os.Create(filePath)
+	if err != nil {
+		return fmt.Errorf("creating file %q: %w", filePath, err)
+	}
+	defer file.Close()
+
+	if _, err := file.WriteString(etag + "\n"); err != nil {
+		return fmt.Errorf("writing etag to file %q: %w", filePath, err)
+	}
+
+	if _, err := file.Write(data); err != nil {
+		return fmt.Errorf("writing data to file %q: %w", filePath, err)
+	}
 
 	return nil
 }
 
 func (f *fsStorage) LoadETag(key string) (string, error) {
-	// TODO: Implement LoadETag
+	file, err := os.Open(f.filePath(key))
+	if err != nil {
+		if errors.Is(err, os.ErrNotExist) {
+			return "", nil
+		}
+		return "", fmt.Errorf("opening file %q: %w", f.filePath(key), err)
+	}
+	defer file.Close()
+
+	etag, err := bufio.NewReader(file).ReadString('\n')
+	if err != nil {
+		return "", fmt.Errorf("reading etag from file %q: %w", f.filePath(key), err)
+	}
+
+	// Remove the newline character from the etag
+	return etag[:len(etag)-1], nil
+}
+
+func (f *fsStorage) LoadData(key string) (io.ReadCloser, error) {
+	file, err := os.Open(f.filePath(key))
+	if err != nil {
+		if errors.Is(err, os.ErrNotExist) {
+			return nil, nil
+		}
+		return nil, fmt.Errorf("opening file %q: %w", f.filePath(key), err)
+	}
+
+	if err := skipLine(file); err != nil {
+		file.Close()
+		return nil, fmt.Errorf("skipping etag line in file %q: %w", f.filePath(key), err)
+	}
 
-	return "", nil
+	return file, nil
 }
 
-func (f *fsStorage) LoadData(key string) ([]byte, error) {
-	// TODO: Implement LoadData
+// skipLine advances reader to the first byte after the next '\n', or to the
+// end of the input when no newline is left.
+func skipLine(reader io.ReadSeeker) error {
+	// Remember where the line starts: bufio reads ahead, so the underlying
+	// position after ReadString is the end of its buffer, not of the line.
+	start, err := reader.Seek(0, io.SeekCurrent)
+	if err != nil {
+		return err
+	}
+
+	// Read until the first newline character
+	line, err := bufio.NewReader(reader).ReadString('\n')
+	if err != nil && err != io.EOF {
+		return err
+	}
+
+	// Reposition the underlying reader directly behind the consumed line
+	end := start + int64(len(line))
+	if _, err := reader.Seek(end, io.SeekStart); err != nil {
+		return err
+	}
+
+	return nil
+}
+
+func (f *fsStorage) subDir(key string) string {
+	subDir := key[:2]
+	return path.Join(f.dataDir, subDir)
+}
 
-	return nil, nil
+func (f *fsStorage) filePath(key string) string {
+	return path.Join(f.subDir(key), key)
 }
diff --git a/upstream.go b/upstream.go
index f0eba51..15e6718 100644
--- a/upstream.go
+++ b/upstream.go
@@ -8,7 +8,7 @@ import (
 
 type hibpClient struct {
 	endpoint   string
-	httpClient http.Client
+	httpClient *http.Client
 }
 
 type hibpResponse struct {