Improve download-server behavior with internetarchive backend #461

Open
wants to merge 12 commits into feat/fix-internetarchive-downloads
211 changes: 146 additions & 65 deletions service/downloadserver/downloadserver.go
@@ -1,36 +1,48 @@
package downloadserver

import (
"bufio"
"context"
"fmt"
"io"
"math/rand"
"net/http"
"strconv"
"strings"
"sync"
"time"

"github.com/cockroachdb/errors"
"github.com/data-preservation-programs/singularity/handler/storage"
"github.com/data-preservation-programs/singularity/model"
"github.com/data-preservation-programs/singularity/service"
"github.com/data-preservation-programs/singularity/service/contentprovider"
"github.com/data-preservation-programs/singularity/storagesystem"
"github.com/data-preservation-programs/singularity/store"
"github.com/fxamacker/cbor/v2"
"github.com/ipfs/go-cid"
"github.com/ipfs/go-log/v2"
"github.com/labstack/echo/v4"
"github.com/labstack/echo/v4/middleware"
"github.com/rjNemo/underscore"
)

const shutdownTimeout = 5 * time.Second

// Maximum retry attempts
const maxRetries = 60

// Initial wait duration before retrying (doubles each retry)
const initialBackoff = 120 * time.Second // Relax for Fail2Ban rules

var archiveReqLock sync.Mutex // Ensures only 1 API request is active at a time
var nextAllowedRequestTime time.Time
var nextAllowedRequestMu sync.Mutex
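// Together these act as a process-wide rate-limit gate for the metadata API:
// GetMetadata holds archiveReqLock for the duration of a fetch, and when the
// API answers 429 it pushes nextAllowedRequestTime into the future so that the
// next caller sleeps before issuing its request.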

type DownloadServer struct {
bind string
api string
config map[string]string
clientConfig model.ClientConfig
usageCache *UsageCache[contentprovider.PieceMetadata]

metadataCache sync.Map // Cache for ongoing metadata requests
}

type cacheItem[C any] struct {
@@ -109,113 +121,182 @@ func (c *UsageCache[C]) Done(key string) {
item.usageCount--
}

var streamLimit = make(chan struct{}, 1) // Only 1 concurrent stream
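// A buffered channel used as a counting semaphore: handleGetPiece blocks on the
// send below until a slot is free, so raising the capacity here would allow that
// many concurrent piece downloads.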

func (d *DownloadServer) handleGetPiece(c echo.Context) error {
streamLimit <- struct{}{} // Acquire semaphore (waits if limit reached)
defer func() { <-streamLimit }() // Release when done

id := c.Param("id")
pieceCid, err := cid.Parse(id)
if err != nil {
Logger.Errorw("Invalid piece CID", "id", id, "error", err)
return c.String(http.StatusBadRequest, "failed to parse piece CID: "+err.Error())
}

if pieceCid.Type() != cid.FilCommitmentUnsealed {
Logger.Warnw("Received invalid CID type", "id", id, "type", pieceCid.Type())
return c.String(http.StatusBadRequest, "CID is not a commp")
}

Logger.Infow("Processing request", "pieceCID", pieceCid.String())

// Check the usage cache first
var pieceMetadata *contentprovider.PieceMetadata
var ok bool
pieceMetadata, ok = d.usageCache.Get(pieceCid.String())

if !ok {
Logger.Infow("Metadata not found in cache, checking for ongoing fetch", "pieceCID", pieceCid.String())

// Fetch metadata
var statusCode int
pieceMetadata, statusCode, err = GetMetadata(c.Request().Context(), d.api, d.config, d.clientConfig, pieceCid.String())
// Handle metadata API errors
if err != nil {
Logger.Errorw("Failed to query metadata API", "pieceCID", pieceCid.String(), "statusCode", statusCode, "error", err)
if statusCode >= 400 {
return c.String(statusCode, "failed to query metadata API: "+err.Error())
}
return c.String(http.StatusInternalServerError, "failed to query metadata API: "+err.Error())
}

Logger.Infow("Successfully fetched metadata", "pieceCID", pieceCid.String())
d.usageCache.Set(pieceCid.String(), *pieceMetadata)
}
defer d.usageCache.Done(pieceCid.String())

// Initialize the piece reader
pieceReader, err := store.NewPieceReader(
c.Request().Context(),
pieceMetadata.Car,
pieceMetadata.Storage,
pieceMetadata.CarBlocks,
pieceMetadata.Files,
)
if err != nil {
Logger.Errorw("Failed to create piece reader", "pieceCID", pieceCid.String(), "error", err)
return c.String(http.StatusInternalServerError, "failed to create piece reader: "+err.Error())
}
defer pieceReader.Close()
contentprovider.SetCommonHeaders(c, pieceCid.String())

Logger.Infow("Serving content", "pieceCID", pieceCid.String(), "filename", pieceCid.String()+".car")

// Use a buffered writer (16MB) to improve streaming performance
bufferedWriter := bufio.NewWriterSize(c.Response().Writer, 16*1024*1024) // 16MB buffer
defer bufferedWriter.Flush()

_, err = io.Copy(bufferedWriter, pieceReader)
if err != nil {
Logger.Errorw("Error streaming content", "pieceCID", pieceCid.String(), "error", err)
return c.String(http.StatusInternalServerError, "Error streaming content: "+err.Error())
}

return nil
}


// GetMetadata fetches piece metadata for pieceCid from the metadata API,
// serializing requests through archiveReqLock and retrying with exponential
// backoff (honoring Retry-After on HTTP 429) up to maxRetries times.
func GetMetadata(
ctx context.Context,
api string,
config map[string]string,
clientConfig model.ClientConfig,
pieceCid string) (*contentprovider.PieceMetadata, int, error) {
api = strings.TrimSuffix(api, "/")

var lastErr error
var lastStatusCode int

// Lock to ensure only one metadata request is in-flight at a time
archiveReqLock.Lock()
defer archiveReqLock.Unlock()

// Wait if a previous 429 response put us under a rate limit
nextAllowedRequestMu.Lock()
waitTime := time.Until(nextAllowedRequestTime)
nextAllowedRequestMu.Unlock()

if waitTime > 0 {
Logger.Warnw("Rate limit active, waiting", "pieceCID", pieceCid, "waitTime", waitTime)
time.Sleep(waitTime)
}

for attempt := 1; attempt <= maxRetries; attempt++ {
Logger.Infow("Fetching metadata from API", "pieceCID", pieceCid, "attempt", attempt)

// Give each attempt its own 120-second request timeout
reqCtx, cancel := context.WithTimeout(ctx, 120*time.Second)
defer cancel()

req, err := http.NewRequestWithContext(reqCtx, http.MethodGet, api+"/piece/metadata/"+pieceCid, nil)
if err != nil {
Logger.Errorw("Failed to create request", "pieceCID", pieceCid, "error", err)
return nil, 0, errors.WithStack(err)
}

req.Header.Add("Accept", "application/cbor")
resp, err := http.DefaultClient.Do(req)
if err != nil {
Logger.Warnw("Failed to reach metadata API", "pieceCID", pieceCid, "attempt", attempt, "error", err)
lastErr = err
time.Sleep(exponentialBackoff(attempt))
continue // Retry
}
defer resp.Body.Close()

// Handle 429 Too Many Requests
if resp.StatusCode == http.StatusTooManyRequests {
retryAfter := resp.Header.Get("Retry-After")
retryDuration := exponentialBackoff(attempt) // Default backoff
if retryAfter != "" {
parsedWait, err := strconv.Atoi(retryAfter)
if err == nil {
retryDuration = time.Duration(parsedWait) * time.Second
}
}
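// Note: Retry-After may also be an HTTP-date; only the delta-seconds form is
// parsed here, otherwise the exponential backoff default above is used.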
// Set global rate limit timer
nextAllowedRequestMu.Lock()
nextAllowedRequestTime = time.Now().Add(retryDuration)
nextAllowedRequestMu.Unlock()

Logger.Warnw("Rate limited (429), retrying after", "pieceCID", pieceCid, "retryAfter", retryDuration)
time.Sleep(retryDuration)
continue // Retry after waiting
}

// Handle other errors
if resp.StatusCode != http.StatusOK {
Logger.Errorw("Metadata API returned error", "pieceCID", pieceCid, "statusCode", resp.StatusCode, "attempt", attempt)
lastStatusCode = resp.StatusCode
lastErr = errors.Errorf("failed to get metadata: %s", resp.Status)
time.Sleep(exponentialBackoff(attempt))
continue // Retry
}

// Decode metadata response
var pieceMetadata contentprovider.PieceMetadata
err = cbor.NewDecoder(resp.Body).Decode(&pieceMetadata)
if err != nil {
Logger.Errorw("Failed to decode metadata", "pieceCID", pieceCid, "attempt", attempt, "error", err)
lastErr = errors.Wrap(err, "failed to decode metadata")
time.Sleep(exponentialBackoff(attempt))
continue // Retry
}

Logger.Infow("Successfully fetched metadata", "pieceCID", pieceCid)
return &pieceMetadata, 0, nil
}

// If all retries fail, return the last error
Logger.Errorw("All metadata fetch attempts failed", "pieceCID", pieceCid, "error", lastErr)
return nil, lastStatusCode, lastErr
}

// exponentialBackoff returns roughly 2^attempt * initialBackoff, plus up to 500ms of jitter
func exponentialBackoff(attempt int) time.Duration {
baseDelay := time.Duration(1<<attempt) * initialBackoff
jitter := time.Duration(rand.Intn(500)) * time.Millisecond
return baseDelay + jitter
}
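
// For reference, with initialBackoff = 120s this schedule is roughly
// 4m, 8m, 16m, ... for attempts 1, 2, 3, ... (plus up to 500ms of jitter).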

func (d *DownloadServer) Start(ctx context.Context, exitErr chan<- error) error {
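
For context, here is a minimal, hypothetical client sketch for the endpoint served by handleGetPiece above. It assumes the server is reachable at http://localhost:7777 and that the route is GET /piece/<piece CID> (inferred from c.Param("id"); the actual route registration is not shown in this diff, and the CID below is a placeholder). It simply streams the CAR response to a local file:

package main

import (
	"fmt"
	"io"
	"net/http"
	"os"
)

func downloadPiece(server, pieceCid, dest string) error {
	// The /piece/<CID> path is an assumption based on handleGetPiece's "id" parameter.
	resp, err := http.Get(server + "/piece/" + pieceCid)
	if err != nil {
		return err
	}
	defer resp.Body.Close()
	if resp.StatusCode != http.StatusOK {
		return fmt.Errorf("unexpected status: %s", resp.Status)
	}
	out, err := os.Create(dest)
	if err != nil {
		return err
	}
	defer out.Close()
	// Stream the CAR body to disk; the server side already buffers its writes.
	_, err = io.Copy(out, resp.Body)
	return err
}

func main() {
	if err := downloadPiece("http://localhost:7777", "bagaExampleCommP", "piece.car"); err != nil {
		fmt.Fprintln(os.Stderr, err)
		os.Exit(1)
	}
}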
33 changes: 27 additions & 6 deletions storagesystem/rclone.go
@@ -1,6 +1,7 @@
package storagesystem

import (
"bufio"
"bytes"
"context"
"io"
@@ -199,42 +200,59 @@ func (r *readerWithRetry) Read(p []byte) (int, error) {
logger.Warnf("Read error: %s, retrying after 5s", err)
time.Sleep(5 * time.Second) // Delay before retrying to avoid hammering the backend
r.reader, err2 = r.object.Open(r.ctx, &fs.SeekOption{Offset: r.offset})
if err2 != nil {
return n, errors.Join(err, err2)
}
return n, nil
}

func (h RCloneHandler) Read(ctx context.Context, path string, offset int64, length int64) (io.ReadCloser, fs.Object, error) {
logger.Debugw("Read: reading path", "type", h.fs.Name(), "root", h.fs.Root(), "path", path, "offset", offset, "length", length)

if length == 0 {
object, err := h.fs.NewObject(ctx, path)
if err != nil {
return nil, nil, errors.Wrapf(err, "failed to open object %s", path)
}
return io.NopCloser(bytes.NewReader(nil)), object, nil
}

// Fetch object from rclone storage
object, err := h.fsNoHead.NewObject(ctx, path)
if err != nil {
return nil, nil, errors.Wrapf(err, "failed to open object %s", path)
}

option := &fs.SeekOption{Offset: offset}

// Open the object for reading
reader, err := object.Open(ctx, option)
if err != nil {
return nil, nil, errors.Wrapf(err, "failed to open object stream %s", path)
}

// Wrap the reader in a 256MB bufio.Reader; io.NopCloser gives it the Close method expected below
bufferedReader := io.NopCloser(bufio.NewReaderSize(reader, 256*1024*1024))
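// Note: this allocates a 256MB buffer per open read, and if readerWithRetry has
// to reopen the object after a read error, the replacement reader is not
// re-wrapped in a buffer (see readerWithRetry.Read above).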

readerWithRetry := &readerWithRetry{
ctx: ctx,
object: object,
reader: reader,
reader: bufferedReader, // now satisfies io.ReadCloser
offset: offset,
retryDelay: h.retryDelay,
retryBackoff: h.retryBackoff,
retryCountMax: h.retryMaxCount,
retryBackoffExponential: h.retryBackoffExponential,
}

if length < 0 {
return readerWithRetry, object, nil
}

return readCloser{
Reader: io.LimitReader(readerWithRetry, length),
Closer: readerWithRetry,
}, object, nil
}

func NewRCloneHandler(ctx context.Context, s model.Storage) (*RCloneHandler, error) {
@@ -247,6 +265,7 @@ func NewRCloneHandler(ctx context.Context, s model.Storage) (*RCloneHandler, err
ctx, _ = fs.AddConfig(ctx)
config := fs.GetConfig(ctx)
overrideConfig(config, s)
config.BufferSize = 256 * 1024 * 1024 // 256MB buffer for smoother streaming

noHeadObjectConfig := make(map[string]string)
headObjectConfig := make(map[string]string)
@@ -345,7 +364,9 @@ func overrideConfig(config *fs.ConfigInfo, s model.Storage) {
if s.ClientConfig.UseServerModTime != nil {
config.UseServerModTime = *s.ClientConfig.UseServerModTime
}
if s.ClientConfig.LowLevelRetries != nil {
config.LowLevelRetries = *s.ClientConfig.LowLevelRetries
}
config.Transfers = 1 // Only 1 file download at a time
config.Checkers = 1 // Only 1 checker (avoids excessive HTTP requests)
config.LowLevelRetries = 10
config.MaxBacklog = 1
config.BufferSize = 256 * 1024 * 1024 // 256MB buffer for smoother streaming
}