diff --git a/service/downloadserver/downloadserver.go b/service/downloadserver/downloadserver.go
index 2056987a..1806c247 100644
--- a/service/downloadserver/downloadserver.go
+++ b/service/downloadserver/downloadserver.go
@@ -1,36 +1,58 @@
 package downloadserver
 
 import (
+	"bufio"
 	"context"
 	"fmt"
+	"io"
+	"math/rand"
 	"net/http"
+	"strconv"
 	"strings"
 	"sync"
 	"time"
 
 	"github.com/cockroachdb/errors"
 	"github.com/data-preservation-programs/singularity/handler/storage"
 	"github.com/data-preservation-programs/singularity/model"
 	"github.com/data-preservation-programs/singularity/service"
 	"github.com/data-preservation-programs/singularity/service/contentprovider"
 	"github.com/data-preservation-programs/singularity/storagesystem"
 	"github.com/data-preservation-programs/singularity/store"
 	"github.com/fxamacker/cbor/v2"
 	"github.com/ipfs/go-cid"
 	"github.com/ipfs/go-log/v2"
 	"github.com/labstack/echo/v4"
 	"github.com/labstack/echo/v4/middleware"
 	"github.com/rjNemo/underscore"
 )
 
 const shutdownTimeout = 5 * time.Second
 
+// maxRetries is the maximum number of metadata API attempts made for one piece.
+const maxRetries = 60
+
+// Initial wait duration before retrying (doubles each retry); long to relax for Fail2Ban rules.
+const initialBackoff = 120 * time.Second
+
+var (
+	// archiveReqLock ensures only one metadata API request is active at a time.
+	archiveReqLock sync.Mutex
+
+	// nextAllowedRequestMu guards nextAllowedRequestTime, the earliest moment the next
+	// metadata request may be sent after a 429 response.
+	nextAllowedRequestMu   sync.Mutex
+	nextAllowedRequestTime time.Time
+)
+
 type DownloadServer struct {
 	bind         string
 	api          string
 	config       map[string]string
 	clientConfig model.ClientConfig
 	usageCache   *UsageCache[contentprovider.PieceMetadata]
+
+	metadataCache sync.Map // Cache for ongoing metadata requests
 }
 
 type cacheItem[C any] struct {
@@ -109,49 +131,102 @@ func (c *UsageCache[C]) Done(key string) {
 	item.usageCount--
 }
 
+// streamLimit is a semaphore that allows only one piece to be streamed at a time.
+var streamLimit = make(chan struct{}, 1)
+
+// handleGetPiece streams the CAR file of the piece whose CID is given in the "id"
+// path parameter. Piece metadata comes from the usage cache when present and from
+// GetMetadata otherwise.
 func (d *DownloadServer) handleGetPiece(c echo.Context) error {
+	// Take the only streaming slot (blocking while another piece is being served)
+	// and give it back when this request finishes.
+	streamLimit <- struct{}{}
+	defer func() { <-streamLimit }()
+
 	id := c.Param("id")
 	pieceCid, err := cid.Parse(id)
 	if err != nil {
+		Logger.Errorw("Invalid piece CID", "id", id, "error", err)
 		return c.String(http.StatusBadRequest, "failed to parse piece CID: "+err.Error())
 	}
+
 	if pieceCid.Type() != cid.FilCommitmentUnsealed {
+		Logger.Warnw("Received invalid CID type", "id", id, "type", pieceCid.Type())
 		return c.String(http.StatusBadRequest, "CID is not a commp")
 	}
+
+	Logger.Infow("Processing request", "pieceCID", pieceCid.String())
+
+	// Check usage cache first
 	var pieceMetadata *contentprovider.PieceMetadata
 	var ok bool
 	pieceMetadata, ok = d.usageCache.Get(pieceCid.String())
+
 	if !ok {
+		Logger.Infow("Metadata not found in cache, fetching from API", "pieceCID", pieceCid.String())
+
 		var statusCode int
 		pieceMetadata, statusCode, err = GetMetadata(c.Request().Context(), d.api, d.config, d.clientConfig, pieceCid.String())
+		if err != nil {
+			Logger.Errorw("Failed to query metadata API", "pieceCID", pieceCid.String(), "statusCode", statusCode, "error", err)
+		}
+		// statusCode is 0 when the failure did not come from an HTTP response; echo
+		// must not be handed that as a status, so such errors are reported as 500.
 		if err != nil && statusCode >= 400 {
 			return c.String(statusCode, "failed to query metadata API: "+err.Error())
 		}
 		if err != nil {
 			return c.String(http.StatusInternalServerError, "failed to query metadata API: "+err.Error())
 		}
+
+		Logger.Infow("Successfully fetched metadata", "pieceCID", pieceCid.String())
 		d.usageCache.Set(pieceCid.String(), *pieceMetadata)
 	}
-	defer func() {
-		d.usageCache.Done(pieceCid.String())
-	}()
-	pieceReader, err := store.NewPieceReader(c.Request().Context(), pieceMetadata.Car, pieceMetadata.Storage, pieceMetadata.CarBlocks, pieceMetadata.Files)
+
+	defer d.usageCache.Done(pieceCid.String())
+
+	pieceReader, err := store.NewPieceReader(
+		c.Request().Context(),
+		pieceMetadata.Car,
+		pieceMetadata.Storage,
+		pieceMetadata.CarBlocks,
+		pieceMetadata.Files,
+	)
 	if err != nil {
+		Logger.Errorw("Failed to create piece reader", "pieceCID", pieceCid.String(), "error", err)
 		return c.String(http.StatusInternalServerError, "failed to create piece reader: "+err.Error())
 	}
 	defer pieceReader.Close()
-	contentprovider.SetCommonHeaders(c, pieceCid.String())
-	http.ServeContent(
-		c.Response(),
-		c.Request(),
-		pieceCid.String()+".car",
-		pieceMetadata.Car.CreatedAt,
-		pieceReader,
-	)
+
+	Logger.Infow("Serving content", "pieceCID", pieceCid.String(), "filename", pieceCid.String()+".car")
+
+	// NOTE(review): this patch removes the contentprovider.SetCommonHeaders and
+	// http.ServeContent calls, so whatever headers the former set are no longer sent and
+	// Range/conditional requests are no longer honored — confirm clients do not need them.
+	//
+	// Stream through a 16MB buffer. The buffer wraps c.Response() rather than the raw
+	// ResponseWriter so that echo records when the response has been committed.
+	bufferedWriter := bufio.NewWriterSize(c.Response(), 16*1024*1024)
+	if _, err = io.Copy(bufferedWriter, pieceReader); err == nil {
+		err = bufferedWriter.Flush()
+	}
+	if err != nil {
+		Logger.Errorw("Error streaming content", "pieceCID", pieceCid.String(), "error", err)
+		// Send an error body only if nothing has reached the client yet; otherwise the
+		// text would be appended to the partially streamed CAR data.
+		if !c.Response().Committed {
+			return c.String(http.StatusInternalServerError, "Error streaming content: "+err.Error())
+		}
+		return err
+	}
 
 	return nil
 }
 
+// GetMetadata fetches the metadata of pieceCid from the metadata API and merges the
+// storage backend defaults, the matching entries of config, and clientConfig into its
+// storage configuration. The returned int is the HTTP status of a failed API response,
+// or 0 when the failure did not come with one.
 func GetMetadata(
 	ctx context.Context,
 	api string,
@@ -159,63 +234,174 @@ func GetMetadata(
 	clientConfig model.ClientConfig,
 	pieceCid string) (*contentprovider.PieceMetadata, int, error) {
 	api = strings.TrimSuffix(api, "/")
-	req, err := http.NewRequestWithContext(ctx, http.MethodGet, api+"/piece/metadata/"+pieceCid, nil)
-	if err != nil {
-		return nil, 0, errors.WithStack(err)
-	}
-	req.Header.Add("Accept", "application/cbor")
-	resp, err := http.DefaultClient.Do(req)
-	if err != nil {
-		return nil, 0, errors.WithStack(err)
-	}
-	defer resp.Body.Close()
-	if resp.StatusCode != http.StatusOK {
-		return nil, resp.StatusCode, errors.Errorf("failed to get metadata: %s", resp.Status)
-	}
-
-	var pieceMetadata contentprovider.PieceMetadata
-	err = cbor.NewDecoder(resp.Body).Decode(&pieceMetadata)
-	if err != nil {
-		return nil, 0, errors.Wrap(err, "failed to decode metadata")
-	}
+	pieceMetadata, statusCode, err := fetchMetadataWithRetry(ctx, api+"/piece/metadata/"+pieceCid, pieceCid)
+	if err != nil {
+		return nil, statusCode, err
+	}
 
 	cfg := make(map[string]string)
 	backend, ok := storagesystem.BackendMap[pieceMetadata.Storage.Type]
 	if !ok {
 		return nil, 0, errors.Newf("storage type %s is not supported", pieceMetadata.Storage.Type)
 	}
 
 	prefix := pieceMetadata.Storage.Type + "-"
 	provider := pieceMetadata.Storage.Config["provider"]
 	providerOptions, err := underscore.Find(backend.ProviderOptions, func(providerOption storagesystem.ProviderOptions) bool {
 		return providerOption.Provider == provider
 	})
 	if err != nil {
 		return nil, 0, errors.Newf("provider '%s' is not supported", provider)
 	}
 
 	for _, option := range providerOptions.Options {
 		if option.Default != nil {
 			cfg[option.Name] = fmt.Sprintf("%v", option.Default)
 		}
 	}
 
 	for key, value := range pieceMetadata.Storage.Config {
 		cfg[key] = value
 	}
 
 	for key, value := range config {
 		if strings.HasPrefix(key, prefix) {
 			trimmed := strings.TrimPrefix(key, prefix)
 			snake := strings.ReplaceAll(trimmed, "-", "_")
 			cfg[snake] = value
 		}
 	}
 
 	pieceMetadata.Storage.Config = cfg
 	storage.OverrideStorageWithClientConfig(&pieceMetadata.Storage, clientConfig)
 	return &pieceMetadata, 0, nil
+}
+
+// fetchMetadataWithRetry requests the piece metadata at url, making up to maxRetries
+// attempts. Only one call runs at a time (archiveReqLock), and a rate-limit window set
+// by an earlier 429 response is waited out first. Network errors, 429, 5xx and decode
+// failures are retried; any other 4xx status is returned immediately. Every wait ends
+// early when ctx is done. The returned int is the HTTP status of the last failed
+// response, or 0 when the failure did not come with one.
+func fetchMetadataWithRetry(ctx context.Context, url string, pieceCid string) (contentprovider.PieceMetadata, int, error) {
+	var none contentprovider.PieceMetadata
+
+	archiveReqLock.Lock()
+	defer archiveReqLock.Unlock()
+
+	nextAllowedRequestMu.Lock()
+	waitTime := time.Until(nextAllowedRequestTime)
+	nextAllowedRequestMu.Unlock()
+	if waitTime > 0 {
+		Logger.Warnw("Rate limit active, waiting", "pieceCID", pieceCid, "waitTime", waitTime)
+		if err := sleepWithContext(ctx, waitTime); err != nil {
+			return none, 0, errors.WithStack(err)
+		}
+	}
+
+	// The request is built once and cloned per attempt; a malformed URL cannot be
+	// fixed by retrying.
+	req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
+	if err != nil {
+		return none, 0, errors.WithStack(err)
+	}
+	req.Header.Add("Accept", "application/cbor")
+
+	var lastErr error
+	var lastStatusCode int
+	for attempt := 1; attempt <= maxRetries; attempt++ {
+		Logger.Infow("Fetching metadata from API", "pieceCID", pieceCid, "attempt", attempt)
+
+		metadata, statusCode, retryAfter, err := fetchMetadataOnce(req)
+		if err == nil {
+			Logger.Infow("Successfully fetched metadata", "pieceCID", pieceCid)
+			return metadata, 0, nil
+		}
+		Logger.Warnw("Metadata request failed", "pieceCID", pieceCid, "attempt", attempt, "statusCode", statusCode, "error", err)
+		// Recorded for every failure, including 429, so an exhausted retry budget
+		// never yields a nil error.
+		lastErr, lastStatusCode = err, statusCode
+
+		// Stop if the caller is gone, or on a 4xx other than 429, which a retry
+		// will not change.
+		if ctx.Err() != nil {
+			return none, 0, errors.WithStack(ctx.Err())
+		}
+		if statusCode >= 400 && statusCode < 500 && statusCode != http.StatusTooManyRequests {
+			return none, statusCode, err
+		}
+
+		delay := exponentialBackoff(attempt)
+		if statusCode == http.StatusTooManyRequests {
+			if retryAfter > 0 {
+				delay = retryAfter
+			}
+			// Set global rate limit timer
+			nextAllowedRequestMu.Lock()
+			nextAllowedRequestTime = time.Now().Add(delay)
+			nextAllowedRequestMu.Unlock()
+		}
+		if attempt == maxRetries {
+			break
+		}
+		if err := sleepWithContext(ctx, delay); err != nil {
+			return none, lastStatusCode, errors.WithStack(err)
+		}
+	}
+
+	Logger.Errorw("All metadata fetch attempts failed", "pieceCID", pieceCid, "error", lastErr)
+	return none, lastStatusCode, lastErr
+}
+
+// fetchMetadataOnce sends one clone of req, bounded by a 120 second timeout, and
+// decodes a 200 response. It returns the HTTP status (0 when no response was received
+// or the body could not be decoded) and, for non-200 responses, the delay from a
+// numeric Retry-After header (0 when absent). The timeout is cancelled and the body
+// closed before it returns, so nothing accumulates across retries.
+func fetchMetadataOnce(req *http.Request) (contentprovider.PieceMetadata, int, time.Duration, error) {
+	var metadata contentprovider.PieceMetadata
+
+	reqCtx, cancel := context.WithTimeout(req.Context(), 120*time.Second)
+	defer cancel()
+
+	resp, err := http.DefaultClient.Do(req.Clone(reqCtx))
+	if err != nil {
+		return metadata, 0, 0, errors.WithStack(err)
+	}
+	defer resp.Body.Close()
+
+	if resp.StatusCode != http.StatusOK {
+		var retryAfter time.Duration
+		if seconds, convErr := strconv.Atoi(resp.Header.Get("Retry-After")); convErr == nil && seconds > 0 {
+			retryAfter = time.Duration(seconds) * time.Second
+		}
+		return metadata, resp.StatusCode, retryAfter, errors.Errorf("failed to get metadata: %s", resp.Status)
+	}
+
+	if err := cbor.NewDecoder(resp.Body).Decode(&metadata); err != nil {
+		return contentprovider.PieceMetadata{}, 0, 0, errors.Wrap(err, "failed to decode metadata")
+	}
+	return metadata, http.StatusOK, 0, nil
+}
+
+// sleepWithContext waits for d or until ctx is done, whichever comes first. It returns
+// ctx.Err() when the context ended the wait and nil otherwise.
+func sleepWithContext(ctx context.Context, d time.Duration) error {
+	timer := time.NewTimer(d)
+	defer timer.Stop()
+	select {
+	case <-ctx.Done():
+		return ctx.Err()
+	case <-timer.C:
+		return nil
+	}
+}
+
+// Exponential backoff function (2^attempt with jitter) - Ensure only one definition exists
+func exponentialBackoff(attempt int) time.Duration {
+	baseDelay := time.Duration(1<