Skip to content

Commit 704f6cc

Browse files
authored
Merge pull request #360 from qiniu/features/resumeable-upload
resumable upload v2
2 parents 73619fa + 6921472 commit 704f6cc

File tree

5 files changed

+319
-21
lines changed

5 files changed

+319
-21
lines changed

src/Qiniu/Storage/ResumeUploader.php

+226-14
Original file line numberDiff line numberDiff line change
@@ -22,9 +22,14 @@ final class ResumeUploader
2222
private $params;
2323
private $mime;
2424
private $contexts;
25+
private $finishedEtags;
2526
private $host;
27+
private $bucket;
2628
private $currentUrl;
2729
private $config;
30+
private $resumeRecordFile;
31+
private $version;
32+
private $partSize;
2833

2934
/**
3035
* 上传二进制流到七牛
@@ -36,6 +41,9 @@ final class ResumeUploader
3641
* @param string $params 自定义变量
3742
* @param string $mime 上传数据的mimeType
3843
* @param string $config
44+
* @param string $resumeRecordFile 断点续传的已上传的部分信息记录文件
45+
* @param string $version 分片上传版本 目前支持v1/v2版本 默认v1
46+
* @param string $partSize 分片上传v2必传字段 默认大小为4MB 分片大小范围为1 MB - 1 GB
3947
*
4048
* @link http://developer.qiniu.com/docs/v6/api/overview/up/response/vars.html#xvar
4149
*/
@@ -46,7 +54,10 @@ public function __construct(
4654
$size,
4755
$params,
4856
$mime,
49-
$config
57+
$config,
58+
$resumeRecordFile = null,
59+
$version = 'v1',
60+
$partSize = config::BLOCK_SIZE
5061
) {
5162

5263
$this->upToken = $upToken;
@@ -56,9 +67,14 @@ public function __construct(
5667
$this->params = $params;
5768
$this->mime = $mime;
5869
$this->contexts = array();
70+
$this->finishedEtags = array("etags"=>array(), "uploadId"=>"", "expiredAt"=>0, "uploaded"=>0);
5971
$this->config = $config;
72+
$this->resumeRecordFile = $resumeRecordFile ? $resumeRecordFile : null;
73+
$this->version = $version ? $version : 'v1';
74+
$this->partSize = $partSize ? $partSize : config::BLOCK_SIZE;
6075

6176
list($accessKey, $bucket, $err) = \Qiniu\explodeUpToken($upToken);
77+
$this->bucket = $bucket;
6278
if ($err != null) {
6379
return array(null, $err);
6480
}
@@ -76,14 +92,88 @@ public function __construct(
7692
public function upload($fname)
7793
{
7894
$uploaded = 0;
95+
if ($this->version == 'v2') {
96+
$partNumber = 1;
97+
$encodedObjectName = $this->key? \Qiniu\base64_urlSafeEncode($this->key) : '~';
98+
};
99+
// get upload record from resumeRecordFile
100+
if ($this->resumeRecordFile != null) {
101+
$blkputRets = null;
102+
if (file_exists($this->resumeRecordFile)) {
103+
$stream = fopen($this->resumeRecordFile, 'r');
104+
if ($stream) {
105+
$streamLen = filesize($this->resumeRecordFile);
106+
if ($streamLen > 0) {
107+
$contents = fread($stream, $streamLen);
108+
fclose($stream);
109+
if ($contents) {
110+
$blkputRets = json_decode($contents, true);
111+
if ($blkputRets === null) {
112+
error_log("resumeFile contents decode error");
113+
}
114+
} else {
115+
error_log("read resumeFile failed");
116+
}
117+
} else {
118+
error_log("resumeFile is empty");
119+
}
120+
} else {
121+
error_log("resumeFile open failed");
122+
}
123+
} else {
124+
error_log("resumeFile not exists");
125+
}
126+
127+
if ($blkputRets) {
128+
if ($this->version == 'v1') {
129+
if (isset($blkputRets['contexts']) && isset($blkputRets['uploaded']) &&
130+
is_array($blkputRets['contexts']) && is_int($blkputRets['uploaded'])) {
131+
$this->contexts = $blkputRets['contexts'];
132+
$uploaded = $blkputRets['uploaded'];
133+
}
134+
} elseif ($this->version == 'v2') {
135+
if (isset($blkputRets["etags"]) && isset($blkputRets["uploadId"]) &&
136+
isset($blkputRets["expiredAt"]) && $blkputRets["expiredAt"] > time()
137+
&& $blkputRets["uploaded"] > 0 && is_array($blkputRets["etags"]) &&
138+
is_string($blkputRets["uploadId"]) && is_int($blkputRets["expiredAt"])) {
139+
$this->finishedEtags['etags'] = $blkputRets["etags"];
140+
$this->finishedEtags["uploadId"] = $blkputRets["uploadId"];
141+
$this->finishedEtags["expiredAt"] = $blkputRets["expiredAt"];
142+
$this->finishedEtags["uploaded"] = $blkputRets["uploaded"];
143+
$uploaded = $blkputRets["uploaded"];
144+
$partNumber = count($this->finishedEtags["etags"]) + 1;
145+
} else {
146+
$this->makeInitReq($encodedObjectName);
147+
}
148+
} else {
149+
throw new \Exception("only support v1/v2 now!");
150+
}
151+
} else {
152+
if ($this->version == 'v2') {
153+
$this->makeInitReq($encodedObjectName);
154+
}
155+
}
156+
} else {
157+
// init a Multipart Upload task if choose v2
158+
if ($this->version == 'v2') {
159+
$this->makeInitReq($encodedObjectName);
160+
}
161+
}
162+
79163
while ($uploaded < $this->size) {
80164
$blockSize = $this->blockSize($uploaded);
81165
$data = fread($this->inputStream, $blockSize);
82166
if ($data === false) {
83167
throw new \Exception("file read failed", 1);
84168
}
85-
$crc = \Qiniu\crc32_data($data);
86-
$response = $this->makeBlock($data, $blockSize);
169+
if ($this->version == 'v1') {
170+
$crc = \Qiniu\crc32_data($data);
171+
$response = $this->makeBlock($data, $blockSize);
172+
} else {
173+
$md5 = md5($data);
174+
$response = $this->uploadPart($data, $partNumber, $this->finishedEtags["uploadId"], $encodedObjectName);
175+
}
176+
87177
$ret = null;
88178
if ($response->ok() && $response->json() != null) {
89179
$ret = $response->json();
@@ -93,22 +183,69 @@ public function upload($fname)
93183
if ($err != null) {
94184
return array(null, $err);
95185
}
96-
97186
$upHostBackup = $this->config->getUpBackupHost($accessKey, $bucket);
98187
$this->host = $upHostBackup;
99188
}
100-
if ($response->needRetry() || !isset($ret['crc32']) || $crc != $ret['crc32']) {
101-
$response = $this->makeBlock($data, $blockSize);
102-
$ret = $response->json();
103-
}
104189

105-
if (!$response->ok() || !isset($ret['crc32']) || $crc != $ret['crc32']) {
106-
return array(null, new Error($this->currentUrl, $response));
190+
if ($this->version == 'v1') {
191+
if ($response->needRetry() || !isset($ret['crc32']) || $crc != $ret['crc32']) {
192+
$response = $this->makeBlock($data, $blockSize);
193+
$ret = $response->json();
194+
}
195+
196+
if (!$response->ok() || !isset($ret['crc32']) || $crc != $ret['crc32']) {
197+
return array(null, new Error($this->currentUrl, $response));
198+
}
199+
array_push($this->contexts, $ret['ctx']);
200+
} else {
201+
if ($response->needRetry() || !isset($ret['md5']) || $md5 != $ret['md5']) {
202+
$response = $this->uploadPart(
203+
$data,
204+
$partNumber,
205+
$this->finishedEtags["uploadId"],
206+
$encodedObjectName
207+
);
208+
$ret = $response->json();
209+
}
210+
211+
if (!$response->ok() || !isset($ret['md5']) || $md5 != $ret['md5']) {
212+
return array(null, new Error($this->currentUrl, $response));
213+
}
214+
$blockStatus = array('etag' => $ret['etag'], 'partNumber' => $partNumber);
215+
array_push($this->finishedEtags['etags'], $blockStatus);
216+
$partNumber += 1;
107217
}
108-
array_push($this->contexts, $ret['ctx']);
218+
109219
$uploaded += $blockSize;
220+
if ($this->version == 'v2') {
221+
$this->finishedEtags['uploaded'] = $uploaded;
222+
}
223+
224+
if ($this->resumeRecordFile !== null) {
225+
if ($this->version == 'v1') {
226+
$recordData = array(
227+
'contexts' => $this->contexts,
228+
'uploaded' => $uploaded
229+
);
230+
$recordData = json_encode($recordData);
231+
} else {
232+
$recordData = json_encode($this->finishedEtags);
233+
}
234+
if ($recordData) {
235+
$isWritten = file_put_contents($this->resumeRecordFile, $recordData);
236+
if ($isWritten === false) {
237+
error_log("write resumeRecordFile failed");
238+
}
239+
} else {
240+
error_log('resumeRecordData encode failed');
241+
}
242+
}
243+
}
244+
if ($this->version == 'v1') {
245+
return $this->makeFile($fname);
246+
} else {
247+
return $this->completeParts($fname, $this->finishedEtags['uploadId'], $encodedObjectName);
110248
}
111-
return $this->makeFile($fname);
112249
}
113250

114251
/**
@@ -163,9 +300,84 @@ private function post($url, $data)
163300

164301
private function blockSize($uploaded)
165302
{
166-
if ($this->size < $uploaded + Config::BLOCK_SIZE) {
303+
if ($this->size < $uploaded + $this->partSize) {
167304
return $this->size - $uploaded;
168305
}
169-
return Config::BLOCK_SIZE;
306+
return $this->partSize;
307+
}
308+
309+
private function makeInitReq($encodedObjectName)
310+
{
311+
$res = $this->initReq($encodedObjectName);
312+
$this->finishedEtags["uploadId"] = $res['uploadId'];
313+
$this->finishedEtags["expiredAt"] = $res['expireAt'];
314+
}
315+
316+
/**
317+
* 初始化上传任务
318+
*/
319+
private function initReq($encodedObjectName)
320+
{
321+
$url = $this->host.'/buckets/'.$this->bucket.'/objects/'.$encodedObjectName.'/uploads';
322+
$headers = array(
323+
'Authorization' => 'UpToken ' . $this->upToken,
324+
'Content-Type' => 'application/json'
325+
);
326+
$response = $this->postWithHeaders($url, null, $headers);
327+
return $response->json();
328+
}
329+
330+
/**
331+
* 分块上传v2
332+
*/
333+
private function uploadPart($block, $partNumber, $uploadId, $encodedObjectName)
334+
{
335+
$headers = array(
336+
'Authorization' => 'UpToken ' . $this->upToken,
337+
'Content-Type' => 'application/octet-stream',
338+
'Content-MD5' => $block
339+
);
340+
$url = $this->host.'/buckets/'.$this->bucket.'/objects/'.$encodedObjectName.
341+
'/uploads/'.$uploadId.'/'.$partNumber;
342+
$response = $this->put($url, $block, $headers);
343+
return $response;
344+
}
345+
346+
private function completeParts($fname, $uploadId, $encodedObjectName)
347+
{
348+
$headers = array(
349+
'Authorization' => 'UpToken '.$this->upToken,
350+
'Content-Type' => 'application/json'
351+
);
352+
$etags = $this->finishedEtags['etags'];
353+
$sortedEtags = \Qiniu\arraySort($etags, 'partNumber');
354+
$body = array(
355+
'fname' => $fname,
356+
'$mimeType' => $this->mime,
357+
'customVars' => $this->params,
358+
'parts' => $sortedEtags
359+
);
360+
$jsonBody = json_encode($body);
361+
$url = $this->host.'/buckets/'.$this->bucket.'/objects/'.$encodedObjectName.'/uploads/'.$uploadId;
362+
$response = $this->postWithHeaders($url, $jsonBody, $headers);
363+
if ($response->needRetry()) {
364+
$response = $this->postWithHeaders($url, $jsonBody, $headers);
365+
}
366+
if (!$response->ok()) {
367+
return array(null, new Error($this->currentUrl, $response));
368+
}
369+
return array($response->json(), null);
370+
}
371+
372+
private function put($url, $data, $headers)
373+
{
374+
$this->currentUrl = $url;
375+
return Client::put($url, $data, $headers);
376+
}
377+
378+
private function postWithHeaders($url, $data, $headers)
379+
{
380+
$this->currentUrl = $url;
381+
return Client::post($url, $data, $headers);
170382
}
171383
}

src/Qiniu/Storage/UploadManager.php

+11-3
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,9 @@ public function put(
7272
* http://developer.qiniu.com/docs/v6/api/overview/up/response/vars.html#xvar
7373
* @param $mime 上传数据的mimeType
7474
* @param $checkCrc 是否校验crc32
75-
*
75+
* @param $version 分片上传版本 目前支持v1/v2版本 默认v1
76+
* @param $partSize 分片上传v2必传字段 默认大小为4MB 分片大小范围为1 MB - 1 GB
77+
* @param $resumeRecordFile 断点续传文件路径 默认为null
7678
* @return array 包含已上传文件的信息,类似:
7779
* [
7880
* "hash" => "<Hash string>",
@@ -85,7 +87,10 @@ public function putFile(
8587
$filePath,
8688
$params = null,
8789
$mime = 'application/octet-stream',
88-
$checkCrc = false
90+
$checkCrc = false,
91+
$resumeRecordFile = null,
92+
$version = 'v1',
93+
$partSize = config::BLOCK_SIZE
8994
) {
9095

9196
$file = fopen($filePath, 'rb');
@@ -119,7 +124,10 @@ public function putFile(
119124
$size,
120125
$params,
121126
$mime,
122-
$this->config
127+
$this->config,
128+
$resumeRecordFile,
129+
$version,
130+
$partSize
123131
);
124132
$ret = $up->upload(basename($filePath));
125133
fclose($file);

src/Qiniu/functions.php

+17
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,23 @@ function base64_urlSafeDecode($str)
6464
return base64_decode(str_replace($find, $replace, $str));
6565
}
6666

67+
/**
68+
* 二维数组根据某个字段排序
69+
* @param array $array 要排序的数组
70+
* @param string $key 要排序的键
71+
* @param string $sort 排序类型 SORT_ASC SORT_DESC
72+
* return array 排序后的数组
73+
*/
74+
function arraySort($array, $key, $sort = SORT_ASC)
75+
{
76+
$keysValue = array();
77+
foreach ($array as $k => $v) {
78+
$keysValue[$k] = $v[$key];
79+
}
80+
array_multisort($keysValue, $sort, $array);
81+
return $array;
82+
}
83+
6784
/**
6885
* Wrapper for JSON decode that implements error detection with helpful
6986
* error messages.

0 commit comments

Comments
 (0)