From 1b69de4ff07730bd9d4a283cd939e3e91b62d166 Mon Sep 17 00:00:00 2001 From: CryptoWhizzard <36404435+cryptowhizzard@users.noreply.github.com> Date: Tue, 4 Mar 2025 13:18:09 +0100 Subject: [PATCH 01/12] More logging for the downloadserver as requested in #459 --- service/downloadserver/downloadserver.go | 100 ++++++++++++++--------- 1 file changed, 60 insertions(+), 40 deletions(-) diff --git a/service/downloadserver/downloadserver.go b/service/downloadserver/downloadserver.go index 2056987a..2787c6ba 100644 --- a/service/downloadserver/downloadserver.go +++ b/service/downloadserver/downloadserver.go @@ -110,46 +110,66 @@ func (c *UsageCache[C]) Done(key string) { } func (d *DownloadServer) handleGetPiece(c echo.Context) error { - id := c.Param("id") - pieceCid, err := cid.Parse(id) - if err != nil { - return c.String(http.StatusBadRequest, "failed to parse piece CID: "+err.Error()) - } - if pieceCid.Type() != cid.FilCommitmentUnsealed { - return c.String(http.StatusBadRequest, "CID is not a commp") - } - var pieceMetadata *contentprovider.PieceMetadata - var ok bool - pieceMetadata, ok = d.usageCache.Get(pieceCid.String()) - if !ok { - var statusCode int - pieceMetadata, statusCode, err = GetMetadata(c.Request().Context(), d.api, d.config, d.clientConfig, pieceCid.String()) - if err != nil && statusCode >= 400 { - return c.String(statusCode, "failed to query metadata API: "+err.Error()) - } - if err != nil { - return c.String(http.StatusInternalServerError, "failed to query metadata API: "+err.Error()) - } - d.usageCache.Set(pieceCid.String(), *pieceMetadata) - } - defer func() { - d.usageCache.Done(pieceCid.String()) - }() - pieceReader, err := store.NewPieceReader(c.Request().Context(), pieceMetadata.Car, pieceMetadata.Storage, pieceMetadata.CarBlocks, pieceMetadata.Files) - if err != nil { - return c.String(http.StatusInternalServerError, "failed to create piece reader: "+err.Error()) - } - defer pieceReader.Close() - contentprovider.SetCommonHeaders(c, pieceCid.String()) - http.ServeContent( - c.Response(), - c.Request(), - pieceCid.String()+".car", - pieceMetadata.Car.CreatedAt, - pieceReader, - ) - - return nil + id := c.Param("id") + pieceCid, err := cid.Parse(id) + if err != nil { + Logger.Errorw("Invalid piece CID", "id", id, "error", err) + return c.String(http.StatusBadRequest, "failed to parse piece CID: "+err.Error()) + } + + if pieceCid.Type() != cid.FilCommitmentUnsealed { + Logger.Warnw("Received invalid CID type", "id", id, "type", pieceCid.Type()) + return c.String(http.StatusBadRequest, "CID is not a commp") + } + + var pieceMetadata *contentprovider.PieceMetadata + var ok bool + pieceMetadata, ok = d.usageCache.Get(pieceCid.String()) + if !ok { + var statusCode int + Logger.Infow("Fetching metadata from API", "pieceCID", pieceCid.String()) + + pieceMetadata, statusCode, err = GetMetadata(c.Request().Context(), d.api, d.config, d.clientConfig, pieceCid.String()) + if err != nil { + Logger.Errorw("Failed to query metadata API", "pieceCID", pieceCid.String(), "statusCode", statusCode, "error", err) + if statusCode >= 400 { + return c.String(statusCode, "failed to query metadata API: "+err.Error()) + } + return c.String(http.StatusInternalServerError, "failed to query metadata API: "+err.Error()) + } + d.usageCache.Set(pieceCid.String(), *pieceMetadata) + } + + defer func() { + d.usageCache.Done(pieceCid.String()) + }() + + // Log the CAR file and storage details + Logger.Infow("Attempting to create piece reader", + "pieceCID", pieceCid.String(), + "carFile", pieceMetadata.Car, + "storageType", pieceMetadata.Storage.Type) + + pieceReader, err := store.NewPieceReader(c.Request().Context(), pieceMetadata.Car, pieceMetadata.Storage, pieceMetadata.CarBlocks, pieceMetadata.Files) + if err != nil { + Logger.Errorw("Failed to create piece reader", "pieceCID", pieceCid.String(), "error", err) + return c.String(http.StatusInternalServerError, "failed to create piece reader: "+err.Error()) + } + defer pieceReader.Close() + + Logger.Infow("Serving content", "pieceCID", pieceCid.String(), "filename", pieceCid.String()+".car") + + contentprovider.SetCommonHeaders(c, pieceCid.String()) + + http.ServeContent( + c.Response(), + c.Request(), + pieceCid.String()+".car", + pieceMetadata.Car.CreatedAt, + pieceReader, + ) + + return nil } func GetMetadata( From 94f1185e98c71aa46d2fe556defc0abc32441bdf Mon Sep 17 00:00:00 2001 From: CryptoWhizzard <36404435+cryptowhizzard@users.noreply.github.com> Date: Tue, 4 Mar 2025 13:32:06 +0100 Subject: [PATCH 02/12] Added exponential backoff timeout's to avoid fail2ban errors --- service/downloadserver/downloadserver.go | 260 ++++++++++++++--------- 1 file changed, 154 insertions(+), 106 deletions(-) diff --git a/service/downloadserver/downloadserver.go b/service/downloadserver/downloadserver.go index 2787c6ba..9b2fd6c6 100644 --- a/service/downloadserver/downloadserver.go +++ b/service/downloadserver/downloadserver.go @@ -2,6 +2,7 @@ package downloadserver import ( "context" + "math/rand" "fmt" "net/http" "strings" @@ -25,6 +26,12 @@ import ( const shutdownTimeout = 5 * time.Second +// Maximum retry attempts +const maxRetries = 5 + +// Initial wait duration before retrying (doubles each retry) +const initialBackoff = 120 * time.Second // Relax for Fail2Ban rules + type DownloadServer struct { bind string api string @@ -110,66 +117,66 @@ func (c *UsageCache[C]) Done(key string) { } func (d *DownloadServer) handleGetPiece(c echo.Context) error { - id := c.Param("id") - pieceCid, err := cid.Parse(id) - if err != nil { - Logger.Errorw("Invalid piece CID", "id", id, "error", err) - return c.String(http.StatusBadRequest, "failed to parse piece CID: "+err.Error()) - } - - if pieceCid.Type() != cid.FilCommitmentUnsealed { - Logger.Warnw("Received invalid CID type", "id", id, "type", pieceCid.Type()) - return c.String(http.StatusBadRequest, "CID is not a commp") - } - - var pieceMetadata *contentprovider.PieceMetadata - var ok bool - pieceMetadata, ok = d.usageCache.Get(pieceCid.String()) - if !ok { - var statusCode int - Logger.Infow("Fetching metadata from API", "pieceCID", pieceCid.String()) - - pieceMetadata, statusCode, err = GetMetadata(c.Request().Context(), d.api, d.config, d.clientConfig, pieceCid.String()) - if err != nil { - Logger.Errorw("Failed to query metadata API", "pieceCID", pieceCid.String(), "statusCode", statusCode, "error", err) - if statusCode >= 400 { - return c.String(statusCode, "failed to query metadata API: "+err.Error()) - } - return c.String(http.StatusInternalServerError, "failed to query metadata API: "+err.Error()) - } - d.usageCache.Set(pieceCid.String(), *pieceMetadata) - } - - defer func() { - d.usageCache.Done(pieceCid.String()) - }() - - // Log the CAR file and storage details - Logger.Infow("Attempting to create piece reader", - "pieceCID", pieceCid.String(), - "carFile", pieceMetadata.Car, - "storageType", pieceMetadata.Storage.Type) - - pieceReader, err := store.NewPieceReader(c.Request().Context(), pieceMetadata.Car, pieceMetadata.Storage, pieceMetadata.CarBlocks, pieceMetadata.Files) - if err != nil { - Logger.Errorw("Failed to create piece reader", "pieceCID", pieceCid.String(), "error", err) - return c.String(http.StatusInternalServerError, "failed to create piece reader: "+err.Error()) - } - defer pieceReader.Close() - - Logger.Infow("Serving content", "pieceCID", pieceCid.String(), "filename", pieceCid.String()+".car") - - contentprovider.SetCommonHeaders(c, pieceCid.String()) - - http.ServeContent( - c.Response(), - c.Request(), - pieceCid.String()+".car", - pieceMetadata.Car.CreatedAt, - pieceReader, - ) - - return nil + id := c.Param("id") + pieceCid, err := cid.Parse(id) + if err != nil { + Logger.Errorw("Invalid piece CID", "id", id, "error", err) + return c.String(http.StatusBadRequest, "failed to parse piece CID: "+err.Error()) + } + + if pieceCid.Type() != cid.FilCommitmentUnsealed { + Logger.Warnw("Received invalid CID type", "id", id, "type", pieceCid.Type()) + return c.String(http.StatusBadRequest, "CID is not a commp") + } + + var pieceMetadata *contentprovider.PieceMetadata + var ok bool + pieceMetadata, ok = d.usageCache.Get(pieceCid.String()) + if !ok { + var statusCode int + Logger.Infow("Fetching metadata from API", "pieceCID", pieceCid.String()) + + pieceMetadata, statusCode, err = GetMetadata(c.Request().Context(), d.api, d.config, d.clientConfig, pieceCid.String()) + if err != nil { + Logger.Errorw("Failed to query metadata API", "pieceCID", pieceCid.String(), "statusCode", statusCode, "error", err) + if statusCode >= 400 { + return c.String(statusCode, "failed to query metadata API: "+err.Error()) + } + return c.String(http.StatusInternalServerError, "failed to query metadata API: "+err.Error()) + } + d.usageCache.Set(pieceCid.String(), *pieceMetadata) + } + + defer func() { + d.usageCache.Done(pieceCid.String()) + }() + + // Log the CAR file and storage details + Logger.Infow("Attempting to create piece reader", + "pieceCID", pieceCid.String(), + "carFile", pieceMetadata.Car, + "storageType", pieceMetadata.Storage.Type) + + pieceReader, err := store.NewPieceReader(c.Request().Context(), pieceMetadata.Car, pieceMetadata.Storage, pieceMetadata.CarBlocks, pieceMetadata.Files) + if err != nil { + Logger.Errorw("Failed to create piece reader", "pieceCID", pieceCid.String(), "error", err) + return c.String(http.StatusInternalServerError, "failed to create piece reader: "+err.Error()) + } + defer pieceReader.Close() + + Logger.Infow("Serving content", "pieceCID", pieceCid.String(), "filename", pieceCid.String()+".car") + + contentprovider.SetCommonHeaders(c, pieceCid.String()) + + http.ServeContent( + c.Response(), + c.Request(), + pieceCid.String()+".car", + pieceMetadata.Car.CreatedAt, + pieceReader, + ) + + return nil } func GetMetadata( @@ -179,65 +186,106 @@ func GetMetadata( clientConfig model.ClientConfig, pieceCid string) (*contentprovider.PieceMetadata, int, error) { api = strings.TrimSuffix(api, "/") - req, err := http.NewRequestWithContext(ctx, http.MethodGet, api+"/piece/metadata/"+pieceCid, nil) - if err != nil { - return nil, 0, errors.WithStack(err) - } - req.Header.Add("Accept", "application/cbor") - resp, err := http.DefaultClient.Do(req) - if err != nil { - return nil, 0, errors.WithStack(err) - } - defer resp.Body.Close() - if resp.StatusCode != http.StatusOK { - return nil, resp.StatusCode, errors.Errorf("failed to get metadata: %s", resp.Status) - } + var lastErr error + var lastStatusCode int - var pieceMetadata contentprovider.PieceMetadata - err = cbor.NewDecoder(resp.Body).Decode(&pieceMetadata) - if err != nil { - return nil, 0, errors.Wrap(err, "failed to decode metadata") - } + for attempt := 1; attempt <= maxRetries; attempt++ { + Logger.Infow("Fetching metadata from API", "pieceCID", pieceCid, "attempt", attempt) - cfg := make(map[string]string) - backend, ok := storagesystem.BackendMap[pieceMetadata.Storage.Type] - if !ok { - return nil, 0, errors.Newf("storage type %s is not supported", pieceMetadata.Storage.Type) - } + // Set request timeout (increase timeout to 120s per attempt) + reqCtx, cancel := context.WithTimeout(ctx, 120*time.Second) + defer cancel() - prefix := pieceMetadata.Storage.Type + "-" - provider := pieceMetadata.Storage.Config["provider"] - providerOptions, err := underscore.Find(backend.ProviderOptions, func(providerOption storagesystem.ProviderOptions) bool { - return providerOption.Provider == provider - }) - if err != nil { - return nil, 0, errors.Newf("provider '%s' is not supported", provider) - } + req, err := http.NewRequestWithContext(reqCtx, http.MethodGet, api+"/piece/metadata/"+pieceCid, nil) + if err != nil { + Logger.Errorw("Failed to create request", "pieceCID", pieceCid, "error", err) + return nil, 0, errors.WithStack(err) + } - for _, option := range providerOptions.Options { - if option.Default != nil { - cfg[option.Name] = fmt.Sprintf("%v", option.Default) + req.Header.Add("Accept", "application/cbor") + resp, err := http.DefaultClient.Do(req) + if err != nil { + Logger.Warnw("Failed to reach metadata API", "pieceCID", pieceCid, "attempt", attempt, "error", err) + lastErr = err + time.Sleep(exponentialBackoff(attempt)) + continue // Retry + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + Logger.Errorw("Metadata API returned error", "pieceCID", pieceCid, "statusCode", resp.StatusCode, "attempt", attempt) + lastStatusCode = resp.StatusCode + lastErr = errors.Errorf("failed to get metadata: %s", resp.Status) + time.Sleep(exponentialBackoff(attempt)) + continue // Retry } - } - for key, value := range pieceMetadata.Storage.Config { - cfg[key] = value - } + // Decode metadata response + var pieceMetadata contentprovider.PieceMetadata + err = cbor.NewDecoder(resp.Body).Decode(&pieceMetadata) + if err != nil { + Logger.Errorw("Failed to decode metadata", "pieceCID", pieceCid, "attempt", attempt, "error", err) + lastErr = errors.Wrap(err, "failed to decode metadata") + time.Sleep(exponentialBackoff(attempt)) + continue // Retry + } + + // Process metadata: Override storage config with client config + cfg := make(map[string]string) + backend, ok := storagesystem.BackendMap[pieceMetadata.Storage.Type] + if !ok { + return nil, 0, errors.Newf("storage type %s is not supported", pieceMetadata.Storage.Type) + } - for key, value := range config { - if strings.HasPrefix(key, prefix) { - trimmed := strings.TrimPrefix(key, prefix) - snake := strings.ReplaceAll(trimmed, "-", "_") - cfg[snake] = value + prefix := pieceMetadata.Storage.Type + "-" + provider := pieceMetadata.Storage.Config["provider"] + providerOptions, err := underscore.Find(backend.ProviderOptions, func(providerOption storagesystem.ProviderOptions) bool { + return providerOption.Provider == provider + }) + if err != nil { + return nil, 0, errors.Newf("provider '%s' is not supported", provider) } + + for _, option := range providerOptions.Options { + if option.Default != nil { + cfg[option.Name] = fmt.Sprintf("%v", option.Default) + } + } + + for key, value := range pieceMetadata.Storage.Config { + cfg[key] = value + } + + for key, value := range config { + if strings.HasPrefix(key, prefix) { + trimmed := strings.TrimPrefix(key, prefix) + snake := strings.ReplaceAll(trimmed, "-", "_") + cfg[snake] = value + } + } + + pieceMetadata.Storage.Config = cfg + storage.OverrideStorageWithClientConfig(&pieceMetadata.Storage, clientConfig) + + // Successfully fetched metadata, return it + Logger.Infow("Successfully fetched metadata", "pieceCID", pieceCid) + return &pieceMetadata, 0, nil } - pieceMetadata.Storage.Config = cfg - storage.OverrideStorageWithClientConfig(&pieceMetadata.Storage, clientConfig) - return &pieceMetadata, 0, nil + // If all retries fail, return the last error + return nil, lastStatusCode, lastErr } +// Exponential backoff function (2^attempt with jitter) +func exponentialBackoff(attempt int) time.Duration { + baseDelay := time.Duration(1< Date: Tue, 4 Mar 2025 13:44:35 +0100 Subject: [PATCH 03/12] Final version with caching to relax fail2ban --- service/downloadserver/downloadserver.go | 44 ++++++++++++++++++++---- 1 file changed, 38 insertions(+), 6 deletions(-) diff --git a/service/downloadserver/downloadserver.go b/service/downloadserver/downloadserver.go index 9b2fd6c6..c031b800 100644 --- a/service/downloadserver/downloadserver.go +++ b/service/downloadserver/downloadserver.go @@ -33,11 +33,13 @@ const maxRetries = 5 const initialBackoff = 120 * time.Second // Relax for Fail2Ban rules type DownloadServer struct { - bind string - api string - config map[string]string - clientConfig model.ClientConfig - usageCache *UsageCache[contentprovider.PieceMetadata] + bind string + api string + config map[string]string + clientConfig model.ClientConfig + usageCache *UsageCache[contentprovider.PieceMetadata] + + metadataCache sync.Map // Cache for ongoing metadata requests } type cacheItem[C any] struct { @@ -129,24 +131,54 @@ func (d *DownloadServer) handleGetPiece(c echo.Context) error { return c.String(http.StatusBadRequest, "CID is not a commp") } + Logger.Infow("Processing request", "pieceCID", pieceCid.String()) + + // Check usage cache first var pieceMetadata *contentprovider.PieceMetadata var ok bool pieceMetadata, ok = d.usageCache.Get(pieceCid.String()) + if !ok { + Logger.Infow("Metadata not found in cache, fetching from API", "pieceCID", pieceCid.String()) + + // Check if another request is already fetching this metadata + if result, exists := d.metadataCache.Load(pieceCid.String()); exists { + Logger.Infow("Waiting for ongoing metadata fetch", "pieceCID", pieceCid.String()) + metadataChan := result.(chan *contentprovider.PieceMetadata) + pieceMetadata, ok = <-metadataChan + if ok { + Logger.Infow("Received metadata from another request", "pieceCID", pieceCid.String()) + d.usageCache.Set(pieceCid.String(), *pieceMetadata) + goto ServeContent + } + } + + // Create a new channel for waiting requests + metadataChan := make(chan *contentprovider.PieceMetadata, 1) + d.metadataCache.Store(pieceCid.String(), metadataChan) + defer d.metadataCache.Delete(pieceCid.String()) // Remove from cache once fetched + + // Fetch metadata (call standalone function) var statusCode int Logger.Infow("Fetching metadata from API", "pieceCID", pieceCid.String()) - pieceMetadata, statusCode, err = GetMetadata(c.Request().Context(), d.api, d.config, d.clientConfig, pieceCid.String()) + if err != nil { Logger.Errorw("Failed to query metadata API", "pieceCID", pieceCid.String(), "statusCode", statusCode, "error", err) + close(metadataChan) // Close channel so waiting goroutines don't hang if statusCode >= 400 { return c.String(statusCode, "failed to query metadata API: "+err.Error()) } return c.String(http.StatusInternalServerError, "failed to query metadata API: "+err.Error()) } + + Logger.Infow("Successfully fetched metadata", "pieceCID", pieceCid.String()) d.usageCache.Set(pieceCid.String(), *pieceMetadata) + metadataChan <- pieceMetadata + close(metadataChan) } +ServeContent: defer func() { d.usageCache.Done(pieceCid.String()) }() From d06e3bd51e089872190fae70122c4dbc8247d369 Mon Sep 17 00:00:00 2001 From: CryptoWhizzard <36404435+cryptowhizzard@users.noreply.github.com> Date: Tue, 4 Mar 2025 14:22:13 +0100 Subject: [PATCH 04/12] More fixes for buffers and avoiding duplicates --- service/downloadserver/downloadserver.go | 118 +++++++++-------------- 1 file changed, 48 insertions(+), 70 deletions(-) diff --git a/service/downloadserver/downloadserver.go b/service/downloadserver/downloadserver.go index c031b800..919ba44f 100644 --- a/service/downloadserver/downloadserver.go +++ b/service/downloadserver/downloadserver.go @@ -3,25 +3,24 @@ package downloadserver import ( "context" "math/rand" - "fmt" "net/http" "strings" "sync" "time" + "io" + "bufio" + "github.com/cockroachdb/errors" - "github.com/data-preservation-programs/singularity/handler/storage" "github.com/data-preservation-programs/singularity/model" "github.com/data-preservation-programs/singularity/service" "github.com/data-preservation-programs/singularity/service/contentprovider" - "github.com/data-preservation-programs/singularity/storagesystem" "github.com/data-preservation-programs/singularity/store" "github.com/fxamacker/cbor/v2" "github.com/ipfs/go-cid" "github.com/ipfs/go-log/v2" "github.com/labstack/echo/v4" "github.com/labstack/echo/v4/middleware" - "github.com/rjNemo/underscore" ) const shutdownTimeout = 5 * time.Second @@ -133,39 +132,47 @@ func (d *DownloadServer) handleGetPiece(c echo.Context) error { Logger.Infow("Processing request", "pieceCID", pieceCid.String()) - // Check usage cache first + // **Check usage cache first** var pieceMetadata *contentprovider.PieceMetadata var ok bool pieceMetadata, ok = d.usageCache.Get(pieceCid.String()) if !ok { - Logger.Infow("Metadata not found in cache, fetching from API", "pieceCID", pieceCid.String()) + Logger.Infow("Metadata not found in cache, checking for ongoing fetch", "pieceCID", pieceCid.String()) + + // **Check if another request is already fetching this metadata** + metadataChan := make(chan *contentprovider.PieceMetadata, 1) + actual, loaded := d.metadataCache.LoadOrStore(pieceCid.String(), metadataChan) - // Check if another request is already fetching this metadata - if result, exists := d.metadataCache.Load(pieceCid.String()); exists { + if loaded { + // **Wait for the existing metadata fetch** Logger.Infow("Waiting for ongoing metadata fetch", "pieceCID", pieceCid.String()) - metadataChan := result.(chan *contentprovider.PieceMetadata) + metadataChan = actual.(chan *contentprovider.PieceMetadata) pieceMetadata, ok = <-metadataChan - if ok { + if pieceMetadata != nil { Logger.Infow("Received metadata from another request", "pieceCID", pieceCid.String()) d.usageCache.Set(pieceCid.String(), *pieceMetadata) goto ServeContent + } else { + Logger.Errorw("Metadata fetch failed, returning error", "pieceCID", pieceCid.String()) + return c.String(http.StatusInternalServerError, "Failed to fetch metadata") } } - // Create a new channel for waiting requests - metadataChan := make(chan *contentprovider.PieceMetadata, 1) - d.metadataCache.Store(pieceCid.String(), metadataChan) - defer d.metadataCache.Delete(pieceCid.String()) // Remove from cache once fetched - - // Fetch metadata (call standalone function) + // **Fetch metadata (call standalone function)** var statusCode int Logger.Infow("Fetching metadata from API", "pieceCID", pieceCid.String()) + pieceMetadata, statusCode, err = GetMetadata(c.Request().Context(), d.api, d.config, d.clientConfig, pieceCid.String()) + // **Handle errors** if err != nil { Logger.Errorw("Failed to query metadata API", "pieceCID", pieceCid.String(), "statusCode", statusCode, "error", err) - close(metadataChan) // Close channel so waiting goroutines don't hang + + // **Remove metadata cache entry to allow retries** + d.metadataCache.Delete(pieceCid.String()) + + close(metadataChan) // **Close channel so waiting goroutines don't hang** if statusCode >= 400 { return c.String(statusCode, "failed to query metadata API: "+err.Error()) } @@ -173,9 +180,11 @@ func (d *DownloadServer) handleGetPiece(c echo.Context) error { } Logger.Infow("Successfully fetched metadata", "pieceCID", pieceCid.String()) + + // **Ensure metadata is cached before notifying other requests** d.usageCache.Set(pieceCid.String(), *pieceMetadata) metadataChan <- pieceMetadata - close(metadataChan) + close(metadataChan) // Close channel after setting metadata } ServeContent: @@ -183,10 +192,10 @@ ServeContent: d.usageCache.Done(pieceCid.String()) }() - // Log the CAR file and storage details - Logger.Infow("Attempting to create piece reader", - "pieceCID", pieceCid.String(), - "carFile", pieceMetadata.Car, + // **Log CAR file and storage details** + Logger.Infow("Attempting to create piece reader", + "pieceCID", pieceCid.String(), + "carFile", pieceMetadata.Car, "storageType", pieceMetadata.Storage.Type) pieceReader, err := store.NewPieceReader(c.Request().Context(), pieceMetadata.Car, pieceMetadata.Storage, pieceMetadata.CarBlocks, pieceMetadata.Files) @@ -196,17 +205,19 @@ ServeContent: } defer pieceReader.Close() - Logger.Infow("Serving content", "pieceCID", pieceCid.String(), "filename", pieceCid.String()+".car") + Logger.Infow("Serving content with increased buffer", "pieceCID", pieceCid.String(), "filename", pieceCid.String()+".car") contentprovider.SetCommonHeaders(c, pieceCid.String()) - http.ServeContent( - c.Response(), - c.Request(), - pieceCid.String()+".car", - pieceMetadata.Car.CreatedAt, - pieceReader, - ) + // **Use buffered writer (16MB buffer) to improve streaming performance** + bufferedWriter := bufio.NewWriterSize(c.Response().Writer, 16*1024*1024) // 16MB buffer + defer bufferedWriter.Flush() + + _, err = io.Copy(bufferedWriter, pieceReader) + if err != nil { + Logger.Errorw("Error streaming content", "pieceCID", pieceCid.String(), "error", err) + return c.String(http.StatusInternalServerError, "Error streaming content: "+err.Error()) + } return nil } @@ -245,6 +256,12 @@ func GetMetadata( } defer resp.Body.Close() + // Handle known HTTP errors (don't retry on these) + if resp.StatusCode == http.StatusForbidden || resp.StatusCode == http.StatusNotFound { + Logger.Warnw("Permanent metadata API error", "pieceCID", pieceCid, "statusCode", resp.StatusCode) + return nil, resp.StatusCode, errors.Errorf("permanent error: %s", resp.Status) + } + if resp.StatusCode != http.StatusOK { Logger.Errorw("Metadata API returned error", "pieceCID", pieceCid, "statusCode", resp.StatusCode, "attempt", attempt) lastStatusCode = resp.StatusCode @@ -263,49 +280,12 @@ func GetMetadata( continue // Retry } - // Process metadata: Override storage config with client config - cfg := make(map[string]string) - backend, ok := storagesystem.BackendMap[pieceMetadata.Storage.Type] - if !ok { - return nil, 0, errors.Newf("storage type %s is not supported", pieceMetadata.Storage.Type) - } - - prefix := pieceMetadata.Storage.Type + "-" - provider := pieceMetadata.Storage.Config["provider"] - providerOptions, err := underscore.Find(backend.ProviderOptions, func(providerOption storagesystem.ProviderOptions) bool { - return providerOption.Provider == provider - }) - if err != nil { - return nil, 0, errors.Newf("provider '%s' is not supported", provider) - } - - for _, option := range providerOptions.Options { - if option.Default != nil { - cfg[option.Name] = fmt.Sprintf("%v", option.Default) - } - } - - for key, value := range pieceMetadata.Storage.Config { - cfg[key] = value - } - - for key, value := range config { - if strings.HasPrefix(key, prefix) { - trimmed := strings.TrimPrefix(key, prefix) - snake := strings.ReplaceAll(trimmed, "-", "_") - cfg[snake] = value - } - } - - pieceMetadata.Storage.Config = cfg - storage.OverrideStorageWithClientConfig(&pieceMetadata.Storage, clientConfig) - - // Successfully fetched metadata, return it Logger.Infow("Successfully fetched metadata", "pieceCID", pieceCid) return &pieceMetadata, 0, nil } // If all retries fail, return the last error + Logger.Errorw("All metadata fetch attempts failed", "pieceCID", pieceCid, "error", lastErr) return nil, lastStatusCode, lastErr } @@ -316,8 +296,6 @@ func exponentialBackoff(attempt int) time.Duration { return baseDelay + jitter } - - func (d *DownloadServer) Start(ctx context.Context, exitErr chan<- error) error { e := echo.New() e.Use(middleware.GzipWithConfig(middleware.GzipConfig{})) From 3239606a0e982c8c4e3fee706536b63dbb5bc824 Mon Sep 17 00:00:00 2001 From: CryptoWhizzard <36404435+cryptowhizzard@users.noreply.github.com> Date: Tue, 4 Mar 2025 14:23:14 +0100 Subject: [PATCH 05/12] Proper buffers for Rclone ( 256 MB ) --- storagesystem/rclone.go | 22 +++++++++++++++++++--- 1 file changed, 19 insertions(+), 3 deletions(-) diff --git a/storagesystem/rclone.go b/storagesystem/rclone.go index c3913b94..bce38e2e 100644 --- a/storagesystem/rclone.go +++ b/storagesystem/rclone.go @@ -4,6 +4,7 @@ import ( "bytes" "context" "io" + "bufio" "strings" "sync" "time" @@ -205,6 +206,7 @@ func (r *readerWithRetry) Read(p []byte) (int, error) { func (h RCloneHandler) Read(ctx context.Context, path string, offset int64, length int64) (io.ReadCloser, fs.Object, error) { logger.Debugw("Read: reading path", "type", h.fs.Name(), "root", h.fs.Root(), "path", path, "offset", offset, "length", length) + if length == 0 { object, err := h.fs.NewObject(ctx, path) if err != nil { @@ -212,29 +214,43 @@ func (h RCloneHandler) Read(ctx context.Context, path string, offset int64, leng } return io.NopCloser(bytes.NewReader(nil)), object, nil } + + // Fetch object from rclone storage object, err := h.fsNoHead.NewObject(ctx, path) if err != nil { return nil, nil, errors.Wrapf(err, "failed to open object %s", path) } + option := &fs.SeekOption{Offset: offset} + + // Open the object for reading reader, err := object.Open(ctx, option) + if err != nil { + return nil, nil, errors.Wrapf(err, "failed to open object stream %s", path) + } + + // Apply a 256MB buffer & wrap it in io.NopCloser to add a Close method + bufferedReader := io.NopCloser(bufio.NewReaderSize(reader, 256*1024*1024)) // ✅ Wrapped with io.NopCloser + readerWithRetry := &readerWithRetry{ ctx: ctx, object: object, - reader: reader, + reader: bufferedReader, // ✅ Now implements io.ReadCloser offset: offset, retryDelay: h.retryDelay, retryBackoff: h.retryBackoff, retryCountMax: h.retryMaxCount, retryBackoffExponential: h.retryBackoffExponential, } + if length < 0 { - return readerWithRetry, object, errors.WithStack(err) + return readerWithRetry, object, nil } + return readCloser{ Reader: io.LimitReader(readerWithRetry, length), Closer: readerWithRetry, - }, object, errors.WithStack(err) + }, object, nil } func NewRCloneHandler(ctx context.Context, s model.Storage) (*RCloneHandler, error) { From f19e1f958a2b5bea534467afdb45de2da1c2cf0a Mon Sep 17 00:00:00 2001 From: CryptoWhizzard <36404435+cryptowhizzard@users.noreply.github.com> Date: Tue, 4 Mar 2025 14:33:55 +0100 Subject: [PATCH 06/12] Hold off on 429's --- service/downloadserver/downloadserver.go | 22 ++++++++++++++++------ 1 file changed, 16 insertions(+), 6 deletions(-) diff --git a/service/downloadserver/downloadserver.go b/service/downloadserver/downloadserver.go index 919ba44f..6ba6db6b 100644 --- a/service/downloadserver/downloadserver.go +++ b/service/downloadserver/downloadserver.go @@ -9,7 +9,7 @@ import ( "time" "io" "bufio" - + "strconv" "github.com/cockroachdb/errors" "github.com/data-preservation-programs/singularity/model" @@ -256,12 +256,22 @@ func GetMetadata( } defer resp.Body.Close() - // Handle known HTTP errors (don't retry on these) - if resp.StatusCode == http.StatusForbidden || resp.StatusCode == http.StatusNotFound { - Logger.Warnw("Permanent metadata API error", "pieceCID", pieceCid, "statusCode", resp.StatusCode) - return nil, resp.StatusCode, errors.Errorf("permanent error: %s", resp.Status) + // Handle 429 Too Many Requests + if resp.StatusCode == http.StatusTooManyRequests { + retryAfter := resp.Header.Get("Retry-After") + waitTime := exponentialBackoff(attempt) // Default backoff + if retryAfter != "" { + parsedWait, err := strconv.Atoi(retryAfter) + if err == nil { + waitTime = time.Duration(parsedWait) * time.Second + } + } + Logger.Warnw("Rate limited (429), retrying after", "pieceCID", pieceCid, "retryAfter", waitTime) + time.Sleep(waitTime) + continue // Retry after waiting } + // Handle other errors if resp.StatusCode != http.StatusOK { Logger.Errorw("Metadata API returned error", "pieceCID", pieceCid, "statusCode", resp.StatusCode, "attempt", attempt) lastStatusCode = resp.StatusCode @@ -289,7 +299,7 @@ func GetMetadata( return nil, lastStatusCode, lastErr } -// Exponential backoff function (2^attempt with jitter) +// Exponential backoff function (2^attempt with jitter) - Ensure only one definition exists func exponentialBackoff(attempt int) time.Duration { baseDelay := time.Duration(1< Date: Tue, 4 Mar 2025 14:39:09 +0100 Subject: [PATCH 07/12] gofmt --- service/downloadserver/downloadserver.go | 18 +++++++++--------- storagesystem/rclone.go | 2 +- 2 files changed, 10 insertions(+), 10 deletions(-) diff --git a/service/downloadserver/downloadserver.go b/service/downloadserver/downloadserver.go index 6ba6db6b..451a34f0 100644 --- a/service/downloadserver/downloadserver.go +++ b/service/downloadserver/downloadserver.go @@ -1,15 +1,15 @@ package downloadserver import ( + "bufio" "context" + "io" "math/rand" "net/http" + "strconv" "strings" "sync" "time" - "io" - "bufio" - "strconv" "github.com/cockroachdb/errors" "github.com/data-preservation-programs/singularity/model" @@ -32,13 +32,13 @@ const maxRetries = 5 const initialBackoff = 120 * time.Second // Relax for Fail2Ban rules type DownloadServer struct { - bind string - api string - config map[string]string - clientConfig model.ClientConfig - usageCache *UsageCache[contentprovider.PieceMetadata] + bind string + api string + config map[string]string + clientConfig model.ClientConfig + usageCache *UsageCache[contentprovider.PieceMetadata] - metadataCache sync.Map // Cache for ongoing metadata requests + metadataCache sync.Map // Cache for ongoing metadata requests } type cacheItem[C any] struct { diff --git a/storagesystem/rclone.go b/storagesystem/rclone.go index bce38e2e..b8132e80 100644 --- a/storagesystem/rclone.go +++ b/storagesystem/rclone.go @@ -1,10 +1,10 @@ package storagesystem import ( + "bufio" "bytes" "context" "io" - "bufio" "strings" "sync" "time" From f6c5b2bfd74fc649fef5840a48cd875ff47e6634 Mon Sep 17 00:00:00 2001 From: CryptoWhizzard <36404435+cryptowhizzard@users.noreply.github.com> Date: Tue, 4 Mar 2025 15:50:44 +0100 Subject: [PATCH 08/12] More ratelimits --- service/downloadserver/downloadserver.go | 25 +++++++++++++++--------- 1 file changed, 16 insertions(+), 9 deletions(-) diff --git a/service/downloadserver/downloadserver.go b/service/downloadserver/downloadserver.go index 451a34f0..38eab078 100644 --- a/service/downloadserver/downloadserver.go +++ b/service/downloadserver/downloadserver.go @@ -1,15 +1,15 @@ package downloadserver import ( - "bufio" "context" - "io" "math/rand" "net/http" - "strconv" "strings" "sync" "time" + "io" + "bufio" + "strconv" "github.com/cockroachdb/errors" "github.com/data-preservation-programs/singularity/model" @@ -32,13 +32,13 @@ const maxRetries = 5 const initialBackoff = 120 * time.Second // Relax for Fail2Ban rules type DownloadServer struct { - bind string - api string - config map[string]string - clientConfig model.ClientConfig - usageCache *UsageCache[contentprovider.PieceMetadata] + bind string + api string + config map[string]string + clientConfig model.ClientConfig + usageCache *UsageCache[contentprovider.PieceMetadata] - metadataCache sync.Map // Cache for ongoing metadata requests + metadataCache sync.Map // Cache for ongoing metadata requests } type cacheItem[C any] struct { @@ -117,6 +117,9 @@ func (c *UsageCache[C]) Done(key string) { item.usageCount-- } +var rateLimiter = time.Tick(60 * time.Second) // Allow 1 request per second + + func (d *DownloadServer) handleGetPiece(c echo.Context) error { id := c.Param("id") pieceCid, err := cid.Parse(id) @@ -159,6 +162,10 @@ func (d *DownloadServer) handleGetPiece(c echo.Context) error { } } + // **Rate limit metadata fetch** + Logger.Infow("Waiting for rate limiter before fetching metadata", "pieceCID", pieceCid.String()) + <-rateLimiter // **Wait before sending request to avoid 429 errors** + // **Fetch metadata (call standalone function)** var statusCode int Logger.Infow("Fetching metadata from API", "pieceCID", pieceCid.String()) From 72591408fe6b8a9d114efb12eae9a750040e2060 Mon Sep 17 00:00:00 2001 From: CryptoWhizzard <36404435+cryptowhizzard@users.noreply.github.com> Date: Tue, 4 Mar 2025 15:51:55 +0100 Subject: [PATCH 09/12] More ratelimits --- storagesystem/rclone.go | 109 +++++++++++++++++++++++++++------------- 1 file changed, 74 insertions(+), 35 deletions(-) diff --git a/storagesystem/rclone.go b/storagesystem/rclone.go index b8132e80..bca204cf 100644 --- a/storagesystem/rclone.go +++ b/storagesystem/rclone.go @@ -8,7 +8,8 @@ import ( "strings" "sync" "time" - + + "strconv" "github.com/cockroachdb/errors" "github.com/data-preservation-programs/singularity/model" "github.com/gammazero/workerpool" @@ -171,37 +172,69 @@ func (r *readerWithRetry) Close() error { } func (r *readerWithRetry) Read(p []byte) (int, error) { - if r.ctx.Err() != nil { - return 0, r.ctx.Err() - } - n, err := r.reader.Read(p) - r.offset += int64(n) - //nolint:errorlint - if err == io.EOF || err == nil { - return n, err - } - - if r.retryCount >= r.retryCountMax { - return n, err - } + if r.ctx.Err() != nil { + return 0, r.ctx.Err() + } + + n, err := r.reader.Read(p) + r.offset += int64(n) + + //nolint:errorlint + if err == io.EOF || err == nil { + return n, err + } + + if r.retryCount >= r.retryCountMax { + return n, err + } + + // Handle rate limiting (429 Too Many Requests) + if strings.Contains(err.Error(), "429 Too Many Requests") { + retryAfter := extractRetryAfter(err.Error()) + if retryAfter > 0 { + logger.Warnf("Rate limited (429), retrying after %ds", retryAfter) + time.Sleep(time.Duration(retryAfter) * time.Second) + } else { + time.Sleep(r.retryDelay) + } + } else { + logger.Warnf("Read error: %s, retrying after %s", err, r.retryDelay) + select { + case <-r.ctx.Done(): + return n, errors.Join(err, r.ctx.Err()) + case <-time.After(r.retryDelay): + } + } + + // Increment retry count and apply exponential backoff + r.retryCount++ + r.retryDelay = time.Duration(float64(r.retryDelay) * r.retryBackoffExponential) + r.retryDelay += r.retryBackoff + + // Close and reopen reader + r.reader.Close() + var err2 error + r.reader, err2 = r.object.Open(r.ctx, &fs.SeekOption{Offset: r.offset}) + if err2 != nil { + return n, errors.Join(err, err2) + } + + return n, nil +} - // error is not EOF - logger.Warnf("Read error: %s, retrying after %s", err, r.retryDelay) - select { - case <-r.ctx.Done(): - return n, errors.Join(err, r.ctx.Err()) - case <-time.After(r.retryDelay): - } - r.retryCount += 1 - r.retryDelay = time.Duration(float64(r.retryDelay) * r.retryBackoffExponential) - r.retryDelay += r.retryBackoff - r.reader.Close() - var err2 error - r.reader, err2 = r.object.Open(r.ctx, &fs.SeekOption{Offset: r.offset}) - if err2 != nil { - return n, errors.Join(err, err2) - } - return n, nil +func extractRetryAfter(errorMsg string) int { + parts := strings.Split(errorMsg, " ") + for i, part := range parts { + if part == "429" && i+2 < len(parts) && parts[i+1] == "Too" && parts[i+2] == "Many" { + if i+3 < len(parts) { + retryTime, err := strconv.Atoi(parts[i+3]) + if err == nil { + return retryTime + } + } + } + } + return 0 // Default to no Retry-After found } func (h RCloneHandler) Read(ctx context.Context, path string, offset int64, length int64) (io.ReadCloser, fs.Object, error) { @@ -264,6 +297,12 @@ func NewRCloneHandler(ctx context.Context, s model.Storage) (*RCloneHandler, err config := fs.GetConfig(ctx) overrideConfig(config, s) + config.Transfers = 1 // Limit number of concurrent transfers + config.MultiThreadStreams = 1 // One parallel chunk per file + config.BufferSize = 256 * 1024 * 1024 // 256MB buffer for smoother streaming + config.LowLevelRetries = 5 // Reduce retry attempts to avoid API bans + config.TPSLimit = 1 // Throttle API requests + noHeadObjectConfig := make(map[string]string) headObjectConfig := make(map[string]string) for k, v := range s.Config { @@ -292,10 +331,10 @@ func NewRCloneHandler(ctx context.Context, s model.Storage) (*RCloneHandler, err name: s.Name, fs: headFS, fsNoHead: noHeadFS, - retryMaxCount: 10, - retryDelay: time.Second, - retryBackoff: time.Second, - retryBackoffExponential: 1.0, + retryMaxCount: 60, + retryDelay: 60, + retryBackoff: 120, + retryBackoffExponential: 2.0, scanConcurrency: scanConcurrency, } From ed69b5642087149909087a18c8dddc16a9d9a513 Mon Sep 17 00:00:00 2001 From: Arkadiy Kukarkin Date: Tue, 4 Mar 2025 18:18:28 +0100 Subject: [PATCH 10/12] gofmt --- service/downloadserver/downloadserver.go | 19 ++-- storagesystem/rclone.go | 134 +++++++++++------------ 2 files changed, 76 insertions(+), 77 deletions(-) diff --git a/service/downloadserver/downloadserver.go b/service/downloadserver/downloadserver.go index 38eab078..fed0f713 100644 --- a/service/downloadserver/downloadserver.go +++ b/service/downloadserver/downloadserver.go @@ -1,15 +1,15 @@ package downloadserver import ( + "bufio" "context" + "io" "math/rand" "net/http" + "strconv" "strings" "sync" "time" - "io" - "bufio" - "strconv" "github.com/cockroachdb/errors" "github.com/data-preservation-programs/singularity/model" @@ -32,13 +32,13 @@ const maxRetries = 5 const initialBackoff = 120 * time.Second // Relax for Fail2Ban rules type DownloadServer struct { - bind string - api string - config map[string]string - clientConfig model.ClientConfig - usageCache *UsageCache[contentprovider.PieceMetadata] + bind string + api string + config map[string]string + clientConfig model.ClientConfig + usageCache *UsageCache[contentprovider.PieceMetadata] - metadataCache sync.Map // Cache for ongoing metadata requests + metadataCache sync.Map // Cache for ongoing metadata requests } type cacheItem[C any] struct { @@ -119,7 +119,6 @@ func (c *UsageCache[C]) Done(key string) { var rateLimiter = time.Tick(60 * time.Second) // Allow 1 request per second - func (d *DownloadServer) handleGetPiece(c echo.Context) error { id := c.Param("id") pieceCid, err := cid.Parse(id) diff --git a/storagesystem/rclone.go b/storagesystem/rclone.go index bca204cf..668c779c 100644 --- a/storagesystem/rclone.go +++ b/storagesystem/rclone.go @@ -8,8 +8,7 @@ import ( "strings" "sync" "time" - - "strconv" + "github.com/cockroachdb/errors" "github.com/data-preservation-programs/singularity/model" "github.com/gammazero/workerpool" @@ -18,6 +17,7 @@ import ( "github.com/rclone/rclone/fs/config/configmap" "github.com/rclone/rclone/fs/object" "golang.org/x/exp/slices" + "strconv" ) var logger = log.Logger("storage") @@ -172,69 +172,69 @@ func (r *readerWithRetry) Close() error { } func (r *readerWithRetry) Read(p []byte) (int, error) { - if r.ctx.Err() != nil { - return 0, r.ctx.Err() - } - - n, err := r.reader.Read(p) - r.offset += int64(n) - - //nolint:errorlint - if err == io.EOF || err == nil { - return n, err - } - - if r.retryCount >= r.retryCountMax { - return n, err - } - - // Handle rate limiting (429 Too Many Requests) - if strings.Contains(err.Error(), "429 Too Many Requests") { - retryAfter := extractRetryAfter(err.Error()) - if retryAfter > 0 { - logger.Warnf("Rate limited (429), retrying after %ds", retryAfter) - time.Sleep(time.Duration(retryAfter) * time.Second) - } else { - time.Sleep(r.retryDelay) - } - } else { - logger.Warnf("Read error: %s, retrying after %s", err, r.retryDelay) - select { - case <-r.ctx.Done(): - return n, errors.Join(err, r.ctx.Err()) - case <-time.After(r.retryDelay): - } - } - - // Increment retry count and apply exponential backoff - r.retryCount++ - r.retryDelay = time.Duration(float64(r.retryDelay) * r.retryBackoffExponential) - r.retryDelay += r.retryBackoff - - // Close and reopen reader - r.reader.Close() - var err2 error - r.reader, err2 = r.object.Open(r.ctx, &fs.SeekOption{Offset: r.offset}) - if err2 != nil { - return n, errors.Join(err, err2) - } - - return n, nil + if r.ctx.Err() != nil { + return 0, r.ctx.Err() + } + + n, err := r.reader.Read(p) + r.offset += int64(n) + + //nolint:errorlint + if err == io.EOF || err == nil { + return n, err + } + + if r.retryCount >= r.retryCountMax { + return n, err + } + + // Handle rate limiting (429 Too Many Requests) + if strings.Contains(err.Error(), "429 Too Many Requests") { + retryAfter := extractRetryAfter(err.Error()) + if retryAfter > 0 { + logger.Warnf("Rate limited (429), retrying after %ds", retryAfter) + time.Sleep(time.Duration(retryAfter) * time.Second) + } else { + time.Sleep(r.retryDelay) + } + } else { + logger.Warnf("Read error: %s, retrying after %s", err, r.retryDelay) + select { + case <-r.ctx.Done(): + return n, errors.Join(err, r.ctx.Err()) + case <-time.After(r.retryDelay): + } + } + + // Increment retry count and apply exponential backoff + r.retryCount++ + r.retryDelay = time.Duration(float64(r.retryDelay) * r.retryBackoffExponential) + r.retryDelay += r.retryBackoff + + // Close and reopen reader + r.reader.Close() + var err2 error + r.reader, err2 = r.object.Open(r.ctx, &fs.SeekOption{Offset: r.offset}) + if err2 != nil { + return n, errors.Join(err, err2) + } + + return n, nil } func extractRetryAfter(errorMsg string) int { - parts := strings.Split(errorMsg, " ") - for i, part := range parts { - if part == "429" && i+2 < len(parts) && parts[i+1] == "Too" && parts[i+2] == "Many" { - if i+3 < len(parts) { - retryTime, err := strconv.Atoi(parts[i+3]) - if err == nil { - return retryTime - } - } - } - } - return 0 // Default to no Retry-After found + parts := strings.Split(errorMsg, " ") + for i, part := range parts { + if part == "429" && i+2 < len(parts) && parts[i+1] == "Too" && parts[i+2] == "Many" { + if i+3 < len(parts) { + retryTime, err := strconv.Atoi(parts[i+3]) + if err == nil { + return retryTime + } + } + } + } + return 0 // Default to no Retry-After found } func (h RCloneHandler) Read(ctx context.Context, path string, offset int64, length int64) (io.ReadCloser, fs.Object, error) { @@ -297,11 +297,11 @@ func NewRCloneHandler(ctx context.Context, s model.Storage) (*RCloneHandler, err config := fs.GetConfig(ctx) overrideConfig(config, s) - config.Transfers = 1 // Limit number of concurrent transfers - config.MultiThreadStreams = 1 // One parallel chunk per file - config.BufferSize = 256 * 1024 * 1024 // 256MB buffer for smoother streaming - config.LowLevelRetries = 5 // Reduce retry attempts to avoid API bans - config.TPSLimit = 1 // Throttle API requests + config.Transfers = 1 // Limit number of concurrent transfers + config.MultiThreadStreams = 1 // One parallel chunk per file + config.BufferSize = 256 * 1024 * 1024 // 256MB buffer for smoother streaming + config.LowLevelRetries = 5 // Reduce retry attempts to avoid API bans + config.TPSLimit = 1 // Throttle API requests noHeadObjectConfig := make(map[string]string) headObjectConfig := make(map[string]string) From 100759cf8052a5ed03bd943c9b7f2425baed3e19 Mon Sep 17 00:00:00 2001 From: CryptoWhizzard <36404435+cryptowhizzard@users.noreply.github.com> Date: Thu, 6 Mar 2025 12:21:18 +0100 Subject: [PATCH 11/12] Cleaning up --- storagesystem/rclone.go | 72 +++++++++++------------------------------ 1 file changed, 19 insertions(+), 53 deletions(-) diff --git a/storagesystem/rclone.go b/storagesystem/rclone.go index 668c779c..b8e48ebc 100644 --- a/storagesystem/rclone.go +++ b/storagesystem/rclone.go @@ -17,7 +17,6 @@ import ( "github.com/rclone/rclone/fs/config/configmap" "github.com/rclone/rclone/fs/object" "golang.org/x/exp/slices" - "strconv" ) var logger = log.Logger("storage") @@ -175,10 +174,8 @@ func (r *readerWithRetry) Read(p []byte) (int, error) { if r.ctx.Err() != nil { return 0, r.ctx.Err() } - n, err := r.reader.Read(p) r.offset += int64(n) - //nolint:errorlint if err == io.EOF || err == nil { return n, err @@ -188,55 +185,27 @@ func (r *readerWithRetry) Read(p []byte) (int, error) { return n, err } - // Handle rate limiting (429 Too Many Requests) - if strings.Contains(err.Error(), "429 Too Many Requests") { - retryAfter := extractRetryAfter(err.Error()) - if retryAfter > 0 { - logger.Warnf("Rate limited (429), retrying after %ds", retryAfter) - time.Sleep(time.Duration(retryAfter) * time.Second) - } else { - time.Sleep(r.retryDelay) - } - } else { - logger.Warnf("Read error: %s, retrying after %s", err, r.retryDelay) - select { - case <-r.ctx.Done(): - return n, errors.Join(err, r.ctx.Err()) - case <-time.After(r.retryDelay): - } + // error is not EOF + logger.Warnf("Read error: %s, retrying after %s", err, r.retryDelay) + select { + case <-r.ctx.Done(): + return n, errors.Join(err, r.ctx.Err()) + case <-time.After(r.retryDelay): } - - // Increment retry count and apply exponential backoff - r.retryCount++ + r.retryCount += 1 r.retryDelay = time.Duration(float64(r.retryDelay) * r.retryBackoffExponential) r.retryDelay += r.retryBackoff - - // Close and reopen reader r.reader.Close() var err2 error r.reader, err2 = r.object.Open(r.ctx, &fs.SeekOption{Offset: r.offset}) if err2 != nil { return n, errors.Join(err, err2) + logger.Warnf("Read error: %s, retrying after 5s", err) + time.Sleep(5 * time.Second) // 🔥 Add delay before retrying 🔥 } - return n, nil } -func extractRetryAfter(errorMsg string) int { - parts := strings.Split(errorMsg, " ") - for i, part := range parts { - if part == "429" && i+2 < len(parts) && parts[i+1] == "Too" && parts[i+2] == "Many" { - if i+3 < len(parts) { - retryTime, err := strconv.Atoi(parts[i+3]) - if err == nil { - return retryTime - } - } - } - } - return 0 // Default to no Retry-After found -} - func (h RCloneHandler) Read(ctx context.Context, path string, offset int64, length int64) (io.ReadCloser, fs.Object, error) { logger.Debugw("Read: reading path", "type", h.fs.Name(), "root", h.fs.Root(), "path", path, "offset", offset, "length", length) @@ -296,12 +265,7 @@ func NewRCloneHandler(ctx context.Context, s model.Storage) (*RCloneHandler, err ctx, _ = fs.AddConfig(ctx) config := fs.GetConfig(ctx) overrideConfig(config, s) - - config.Transfers = 1 // Limit number of concurrent transfers - config.MultiThreadStreams = 1 // One parallel chunk per file - config.BufferSize = 256 * 1024 * 1024 // 256MB buffer for smoother streaming - config.LowLevelRetries = 5 // Reduce retry attempts to avoid API bans - config.TPSLimit = 1 // Throttle API requests + config.BufferSize = 256 * 1024 * 1024 // 256MB buffer for smoother streaming noHeadObjectConfig := make(map[string]string) headObjectConfig := make(map[string]string) @@ -331,10 +295,10 @@ func NewRCloneHandler(ctx context.Context, s model.Storage) (*RCloneHandler, err name: s.Name, fs: headFS, fsNoHead: noHeadFS, - retryMaxCount: 60, - retryDelay: 60, - retryBackoff: 120, - retryBackoffExponential: 2.0, + retryMaxCount: 10, + retryDelay: time.Second, + retryBackoff: time.Second, + retryBackoffExponential: 1.0, scanConcurrency: scanConcurrency, } @@ -400,7 +364,9 @@ func overrideConfig(config *fs.ConfigInfo, s model.Storage) { if s.ClientConfig.UseServerModTime != nil { config.UseServerModTime = *s.ClientConfig.UseServerModTime } - if s.ClientConfig.LowLevelRetries != nil { - config.LowLevelRetries = *s.ClientConfig.LowLevelRetries - } + config.Transfers = 1 // Only 1 file download at a time + config.Checkers = 1 // Only 1 checker (avoids excessive HTTP requests) + config.LowLevelRetries = 10 + config.MaxBacklog = 1 + config.BufferSize = 256 * 1024 * 1024 // 256MB buffer for smoother streaming } From 77e4881525eabca8125afc12afcf4b55491767db Mon Sep 17 00:00:00 2001 From: CryptoWhizzard <36404435+cryptowhizzard@users.noreply.github.com> Date: Thu, 6 Mar 2025 12:22:08 +0100 Subject: [PATCH 12/12] Cleaning up --- service/downloadserver/downloadserver.go | 125 ++++++++++------------- 1 file changed, 56 insertions(+), 69 deletions(-) diff --git a/service/downloadserver/downloadserver.go b/service/downloadserver/downloadserver.go index fed0f713..1806c247 100644 --- a/service/downloadserver/downloadserver.go +++ b/service/downloadserver/downloadserver.go @@ -14,8 +14,8 @@ import ( "github.com/cockroachdb/errors" "github.com/data-preservation-programs/singularity/model" "github.com/data-preservation-programs/singularity/service" - "github.com/data-preservation-programs/singularity/service/contentprovider" "github.com/data-preservation-programs/singularity/store" + "github.com/data-preservation-programs/singularity/service/contentprovider" "github.com/fxamacker/cbor/v2" "github.com/ipfs/go-cid" "github.com/ipfs/go-log/v2" @@ -26,11 +26,15 @@ import ( const shutdownTimeout = 5 * time.Second // Maximum retry attempts -const maxRetries = 5 +const maxRetries = 60 // Initial wait duration before retrying (doubles each retry) const initialBackoff = 120 * time.Second // Relax for Fail2Ban rules +var archiveReqLock sync.Mutex // Ensures only 1 API request is active at a time +var nextAllowedRequestTime time.Time +var nextAllowedRequestMu sync.Mutex + type DownloadServer struct { bind string api string @@ -117,9 +121,12 @@ func (c *UsageCache[C]) Done(key string) { item.usageCount-- } -var rateLimiter = time.Tick(60 * time.Second) // Allow 1 request per second +var streamLimit = make(chan struct{}, 1) // Only 1 concurrent stream func (d *DownloadServer) handleGetPiece(c echo.Context) error { + streamLimit <- struct{}{} // Acquire semaphore (waits if limit reached) + defer func() { <-streamLimit }() // Release when done + id := c.Param("id") pieceCid, err := cid.Parse(id) if err != nil { @@ -142,78 +149,37 @@ func (d *DownloadServer) handleGetPiece(c echo.Context) error { if !ok { Logger.Infow("Metadata not found in cache, checking for ongoing fetch", "pieceCID", pieceCid.String()) - // **Check if another request is already fetching this metadata** - metadataChan := make(chan *contentprovider.PieceMetadata, 1) - actual, loaded := d.metadataCache.LoadOrStore(pieceCid.String(), metadataChan) - - if loaded { - // **Wait for the existing metadata fetch** - Logger.Infow("Waiting for ongoing metadata fetch", "pieceCID", pieceCid.String()) - metadataChan = actual.(chan *contentprovider.PieceMetadata) - pieceMetadata, ok = <-metadataChan - if pieceMetadata != nil { - Logger.Infow("Received metadata from another request", "pieceCID", pieceCid.String()) - d.usageCache.Set(pieceCid.String(), *pieceMetadata) - goto ServeContent - } else { - Logger.Errorw("Metadata fetch failed, returning error", "pieceCID", pieceCid.String()) - return c.String(http.StatusInternalServerError, "Failed to fetch metadata") - } - } - - // **Rate limit metadata fetch** - Logger.Infow("Waiting for rate limiter before fetching metadata", "pieceCID", pieceCid.String()) - <-rateLimiter // **Wait before sending request to avoid 429 errors** - - // **Fetch metadata (call standalone function)** + // **Fetch metadata** var statusCode int - Logger.Infow("Fetching metadata from API", "pieceCID", pieceCid.String()) - pieceMetadata, statusCode, err = GetMetadata(c.Request().Context(), d.api, d.config, d.clientConfig, pieceCid.String()) // **Handle errors** if err != nil { Logger.Errorw("Failed to query metadata API", "pieceCID", pieceCid.String(), "statusCode", statusCode, "error", err) - - // **Remove metadata cache entry to allow retries** - d.metadataCache.Delete(pieceCid.String()) - - close(metadataChan) // **Close channel so waiting goroutines don't hang** - if statusCode >= 400 { - return c.String(statusCode, "failed to query metadata API: "+err.Error()) - } - return c.String(http.StatusInternalServerError, "failed to query metadata API: "+err.Error()) + return c.String(statusCode, "failed to query metadata API: "+err.Error()) } Logger.Infow("Successfully fetched metadata", "pieceCID", pieceCid.String()) - - // **Ensure metadata is cached before notifying other requests** d.usageCache.Set(pieceCid.String(), *pieceMetadata) - metadataChan <- pieceMetadata - close(metadataChan) // Close channel after setting metadata } -ServeContent: - defer func() { - d.usageCache.Done(pieceCid.String()) - }() - - // **Log CAR file and storage details** - Logger.Infow("Attempting to create piece reader", - "pieceCID", pieceCid.String(), - "carFile", pieceMetadata.Car, - "storageType", pieceMetadata.Storage.Type) + defer d.usageCache.Done(pieceCid.String()) - pieceReader, err := store.NewPieceReader(c.Request().Context(), pieceMetadata.Car, pieceMetadata.Storage, pieceMetadata.CarBlocks, pieceMetadata.Files) + // **Initialize Piece Reader - Corrected** + pieceReader, err := store.NewPieceReader( + c.Request().Context(), + pieceMetadata.Car, + pieceMetadata.Storage, + pieceMetadata.CarBlocks, + pieceMetadata.Files, + ) if err != nil { Logger.Errorw("Failed to create piece reader", "pieceCID", pieceCid.String(), "error", err) return c.String(http.StatusInternalServerError, "failed to create piece reader: "+err.Error()) } defer pieceReader.Close() - Logger.Infow("Serving content with increased buffer", "pieceCID", pieceCid.String(), "filename", pieceCid.String()+".car") - - contentprovider.SetCommonHeaders(c, pieceCid.String()) + Logger.Infow("Serving content", "pieceCID", pieceCid.String(), "filename", pieceCid.String()+".car") // **Use buffered writer (16MB buffer) to improve streaming performance** bufferedWriter := bufio.NewWriterSize(c.Response().Writer, 16*1024*1024) // 16MB buffer @@ -228,6 +194,7 @@ ServeContent: return nil } + func GetMetadata( ctx context.Context, api string, @@ -239,10 +206,24 @@ func GetMetadata( var lastErr error var lastStatusCode int + // Lock to ensure only one request is in-flight + archiveReqLock.Lock() + defer archiveReqLock.Unlock() + + // Wait if we are rate-limited + nextAllowedRequestMu.Lock() + waitTime := time.Until(nextAllowedRequestTime) + nextAllowedRequestMu.Unlock() + + if waitTime > 0 { + Logger.Warnw("Rate limit active, waiting", "pieceCID", pieceCid, "waitTime", waitTime) + time.Sleep(waitTime) + } + for attempt := 1; attempt <= maxRetries; attempt++ { Logger.Infow("Fetching metadata from API", "pieceCID", pieceCid, "attempt", attempt) - // Set request timeout (increase timeout to 120s per attempt) + // Set request timeout (increase timeout per attempt) reqCtx, cancel := context.WithTimeout(ctx, 120*time.Second) defer cancel() @@ -262,21 +243,27 @@ func GetMetadata( } defer resp.Body.Close() - // Handle 429 Too Many Requests - if resp.StatusCode == http.StatusTooManyRequests { - retryAfter := resp.Header.Get("Retry-After") - waitTime := exponentialBackoff(attempt) // Default backoff - if retryAfter != "" { - parsedWait, err := strconv.Atoi(retryAfter) - if err == nil { - waitTime = time.Duration(parsedWait) * time.Second - } + // Handle 429 Too Many Requests + if resp.StatusCode == http.StatusTooManyRequests { + retryAfter := resp.Header.Get("Retry-After") + retryDuration := exponentialBackoff(attempt) // Default backoff + if retryAfter != "" { + parsedWait, err := strconv.Atoi(retryAfter) + if err == nil { + retryDuration = time.Duration(parsedWait) * time.Second } - Logger.Warnw("Rate limited (429), retrying after", "pieceCID", pieceCid, "retryAfter", waitTime) - time.Sleep(waitTime) - continue // Retry after waiting } + // Set global rate limit timer + nextAllowedRequestMu.Lock() + nextAllowedRequestTime = time.Now().Add(retryDuration) + nextAllowedRequestMu.Unlock() + + Logger.Warnw("Rate limited (429), retrying after", "pieceCID", pieceCid, "retryAfter", retryDuration) + time.Sleep(retryDuration) + continue // Retry after waiting + } + // Handle other errors if resp.StatusCode != http.StatusOK { Logger.Errorw("Metadata API returned error", "pieceCID", pieceCid, "statusCode", resp.StatusCode, "attempt", attempt)