Skip to content

refactor: upload file to server directly #45

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: multipart-upload
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 32 additions & 14 deletions source/ftrack_api/accessor/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import os
import hashlib
import base64
from typing import BinaryIO

import requests

Expand All @@ -23,15 +24,16 @@ def __init__(self, resource_identifier, session, mode="rb"):
self.resource_identifier = resource_identifier
self._session = session
self._has_read = False
self._has_uploaded = False

super(ServerFile, self).__init__()

def flush(self):
    """Flush all changes.

    After flushing the local buffer, a file opened in ``"wb"`` mode is also
    sent to the server, unless its content has already been uploaded.
    """
    super(ServerFile, self).flush()

    # Upload at most once per batch of writes: the ``_has_uploaded`` flag
    # guards against sending the same content again on a repeated flush.
    if not self._has_uploaded and self.mode == "wb":
        self._flush_to_server()

def read(self, limit=None):
"""Read file."""
Expand All @@ -41,6 +43,12 @@ def read(self, limit=None):

return super(ServerFile, self).read(limit)

def write(self, content):
    """Write *content* to file.

    Return whatever the parent implementation of ``write`` returns.

    Raise :exc:`AssertionError` if ``_has_uploaded`` is anything other than
    ``False``, i.e. the content was already sent to the server.
    """
    # Raise explicitly instead of using an ``assert`` statement: assert
    # statements are removed when Python runs with ``-O``, which would
    # silently disable this guard.
    if self._has_uploaded is not False:
        raise AssertionError("Cannot write to file after upload.")

    return super().write(content)

def _read(self):
"""Read all remote content from key into wrapped_file."""
position = self.tell()
Expand All @@ -66,13 +74,22 @@ def _read(self):
for block in response.iter_content(ftrack_api.symbol.CHUNK_SIZE):
self.wrapped_file.write(block)

self.flush()
self.seek(position)
self._has_uploaded = False

def _write(self):
def _flush_to_server(self):
"""Write current data to remote key."""
position = self.tell()

self.upload_to_server(self.wrapped_file)

self.seek(position)

def upload_to_server(self, source_file: "BinaryIO"):
"""
Upload *source_file* directly to the server.
Use with caution: once the upload has run, further writes are rejected until the file is read again.
"""
# Retrieve component from cache to construct a filename.
component = self._session.get("FileComponent", self.resource_identifier)
if not component:
Expand All @@ -92,28 +109,29 @@ def _write(self):
self._session,
component_id=self.resource_identifier,
file_name=name,
file_size=self._get_size(),
file=self.wrapped_file,
checksum=self._compute_checksum(),
file_size=self._get_size(source_file),
file=source_file,
checksum=self._compute_checksum(source_file),
)
uploader.start()
except Exception as error:
raise ftrack_api.exception.AccessorOperationFailedError(
"Failed to put file to server: {0}.".format(error)
)

self.seek(position)
self._has_uploaded = True

def _get_size(self):
@staticmethod
def _get_size(file: "BinaryIO"):
"""Return size of file in bytes."""
position = self.tell()
length = self.seek(0, os.SEEK_END)
self.seek(position)
position = file.tell()
length = file.seek(0, os.SEEK_END)
file.seek(position)
return length

def _compute_checksum(self):
@staticmethod
def _compute_checksum(fp: "BinaryIO"):
"""Return checksum for file."""
fp = self.wrapped_file
buf_size = ftrack_api.symbol.CHUNK_SIZE
hash_obj = hashlib.md5()
spos = fp.tell()
Expand Down
22 changes: 15 additions & 7 deletions source/ftrack_api/entity/location.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
import collections.abc
import functools

import ftrack_api.data
import ftrack_api.accessor.server
import ftrack_api.entity.base
import ftrack_api.exception
import ftrack_api.event.base
Expand Down Expand Up @@ -346,13 +348,19 @@ def _add_data(self, component, resource_identifier, source):
)
target_data = self.accessor.open(resource_identifier, "wb")

# Read/write data in chunks to avoid reading all into memory at the
# same time.
chunked_read = functools.partial(
source_data.read, ftrack_api.symbol.CHUNK_SIZE
)
for chunk in iter(chunked_read, b""):
target_data.write(chunk)
if isinstance(source_data, ftrack_api.data.File) and isinstance(
target_data, ftrack_api.accessor.server.ServerFile
):
# If source is a file and target is a server, use the server's upload method
target_data.upload_to_server(source_data.wrapped_file)
else:
# Read/write data in chunks to avoid reading all into memory at the
# same time.
chunked_read = functools.partial(
source_data.read, ftrack_api.symbol.CHUNK_SIZE
)
for chunk in iter(chunked_read, b""):
target_data.write(chunk)

target_data.close()
source_data.close()
Expand Down
4 changes: 2 additions & 2 deletions source/ftrack_api/uploader.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import logging
import math
import os
from typing import IO, Awaitable, Callable, TYPE_CHECKING, List, Optional
from typing import BinaryIO, Awaitable, Callable, TYPE_CHECKING, List, Optional
import typing
import anyio

Expand Down Expand Up @@ -66,7 +66,7 @@ def __init__(
component_id: str,
file_name: str,
file_size: int,
file: "IO",
file: "BinaryIO",
checksum: Optional[str],
):
self.session = session
Expand Down
Loading