Skip to content

Commit

Permalink
Fix file upload race condition that might occur during reconnection
Browse files Browse the repository at this point in the history
  • Loading branch information
danog committed Sep 23, 2023
1 parent 223f8a7 commit 0a8e41a
Show file tree
Hide file tree
Showing 6 changed files with 61 additions and 50 deletions.
8 changes: 7 additions & 1 deletion src/FileCallback.php
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,10 @@

/**
* File callback interface.
*
* @template TT
*
* @implements FileCallbackInterface<TT>
*/
final class FileCallback implements FileCallbackInterface
{
Expand All @@ -34,7 +38,7 @@ final class FileCallback implements FileCallbackInterface
/**
* Construct file callback.
*
* @param mixed $file File to download/upload
* @param TT $file File to download/upload
* @param callable(float, float, float) $callback Callback
*/
public function __construct(public readonly mixed $file, callable $callback)
Expand All @@ -43,6 +47,8 @@ public function __construct(public readonly mixed $file, callable $callback)
}
/**
* Get file.
*
* @return TT
*/
public function getFile(): mixed
{
Expand Down
4 changes: 4 additions & 0 deletions src/FileCallbackInterface.php
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,15 @@

/**
* File callback interface.
*
* @template T
*/
interface FileCallbackInterface
{
/**
* Get file.
*
* @return T
*/
public function getFile(): mixed;
/**
Expand Down
14 changes: 7 additions & 7 deletions src/InternalDoc.php
Original file line number Diff line number Diff line change
Expand Up @@ -1956,13 +1956,13 @@ public function uploadEncrypted(\danog\MadelineProto\FileCallbackInterface|array
* The callable must accept two parameters: int $offset, int $size
* The callable must return a string with the contest of the file at the specified offset and size.
*
* @param mixed $callable Callable
* @param integer $size File size
* @param string $mime Mime type
* @param string $fileName File name
* @param callable $cb Callback
* @param boolean $seekable Whether chunks can be fetched out of order
* @param boolean $encrypted Whether to encrypt file for secret chats
* @param callable(int, int): string $callable Callable (offset, length) => data
* @param integer $size File size
* @param string $mime Mime type
* @param string $fileName File name
* @param callable(float, float, float): void $cb Status callback
* @param boolean $seekable Whether chunks can be fetched out of order
* @param boolean $encrypted Whether to encrypt file for secret chats
*
* @return array InputFile constructor
*/
Expand Down
5 changes: 2 additions & 3 deletions src/Ipc/Wrapper/FileCallback.php
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,11 @@

/**
* @internal
*
* @implements FileCallbackInterface<mixed>
*/
final class FileCallback extends Obj implements FileCallbackInterface
{
/**
* Get file.
*/
public function getFile(): mixed
{
return $this->__call('getFile');
Expand Down
59 changes: 25 additions & 34 deletions src/MTProtoTools/Files.php
Original file line number Diff line number Diff line change
Expand Up @@ -187,25 +187,18 @@ public function uploadFromUrl(string|FileCallbackInterface $url, int $size = 0,
* The callable must accept two parameters: int $offset, int $size
* The callable must return a string with the contest of the file at the specified offset and size.
*
* @param mixed $callable Callable
* @param integer $size File size
* @param string $mime Mime type
* @param string $fileName File name
* @param callable $cb Callback
* @param boolean $seekable Whether chunks can be fetched out of order
* @param boolean $encrypted Whether to encrypt file for secret chats
* @param callable(int, int): string $callable Callable (offset, length) => data
* @param integer $size File size
* @param string $mime Mime type
* @param string $fileName File name
* @param callable(float, float, float): void $cb Status callback
* @param boolean $seekable Whether chunks can be fetched out of order
* @param boolean $encrypted Whether to encrypt file for secret chats
*
* @return array InputFile constructor
*/
public function uploadFromCallable(callable $callable, int $size = 0, string $mime = 'application/octet-stream', string $fileName = '', ?callable $cb = null, bool $seekable = true, bool $encrypted = false): array
{
if (\is_object($callable) && $callable instanceof FileCallbackInterface) {
$cb = $callable;
$callable = $callable->getFile();
}
if (!\is_callable($callable)) {
throw new Exception('Invalid callable provided');
}
if ($cb === null) {
$cb = function (float $percent, float $speed, float $time): void {
$this->logger->logger('Upload status: '.$percent.'%', Logger::NOTICE);
Expand Down Expand Up @@ -256,13 +249,17 @@ public function uploadFromCallable(callable $callable, int $size = 0, string $mi
};
}
$totalSize = 0;
if (!$seekable) {
$nextOffset = 0;
$callable = static function (int $offset, int $size) use ($callable, &$nextOffset): string {
Assert::eq($offset, $nextOffset);
$nextOffset += $size;
return $callable($offset, $size);
};
}
$callable = static function (int $part_num) use (&$totalSize, $size, $file_id, &$part_total_num, $part_size, $callable, $ige): array {
static $offset = 0;
$oldOffset = $offset;
$offset += $part_size;

$bytes = $callable(
$oldOffset,
$part_num * $part_size,
$part_size
);
$totalSize += $bytesLen = \strlen($bytes);
Expand All @@ -275,9 +272,6 @@ public function uploadFromCallable(callable $callable, int $size = 0, string $mi
}
}

if ($bytes instanceof Future) {
$bytes = $bytes->await();
}
if ($ige) {
$bytes = $ige->encrypt(\str_pad($bytes, $part_size, \chr(0)));
}
Expand All @@ -287,18 +281,15 @@ public function uploadFromCallable(callable $callable, int $size = 0, string $mi
$resPromises = [];
$start = \microtime(true);
while ($part_num < $part_total_num || !$size) {
$writePromise = async(
$this->methodCallAsyncWrite(...),
$method,
fn () => $callable($part_num),
['heavy' => true, 'datacenter' => &$datacenter]
);
if (!$seekable) {
try {
$writePromise->await();
} catch (StreamEof) {
break;
}
try {
$writePromise = async(
$this->methodCallAsyncWrite(...),
$method,
$seekable ? fn () => $callable($part_num) : $callable($part_num),
['heavy' => true, 'datacenter' => &$datacenter]
);
} catch (StreamEof) {
break;
}
EventLoop::queue(function () use ($writePromise, $cb, $part_num, $size, &$resPromises): void {
$readFuture = $writePromise->await();
Expand Down
21 changes: 16 additions & 5 deletions src/MTProtoTools/FilesLogic.php
Original file line number Diff line number Diff line change
Expand Up @@ -399,16 +399,22 @@ public function uploadFromStream(mixed $stream, int $size = 0, string $mime = 'a
}
if ($stream instanceof File) {
$lock = new LocalMutex;
$callable = static function (int $offset, int $size) use ($stream, $seekable, $lock) {
$nextOffset = 0;
$callable = static function (int $offset, int $size) use ($stream, $seekable, $lock, &$nextOffset): string {
/** @var Lock */
$l = $lock->acquire();
try {
if ($seekable) {
while ($stream->tell() !== $offset) {
$stream->seek($offset);
}
} else {
Assert::eq($offset, $nextOffset);
$nextOffset += $size;
}
return $stream->read(null, $size);
$result = $stream->read(null, $size);
\assert($result !== null);
return $result;
} finally {
EventLoop::queue($l->release(...));
}
Expand All @@ -419,17 +425,22 @@ public function uploadFromStream(mixed $stream, int $size = 0, string $mime = 'a
$stream = ($ctx->getStream());
$created = true;
}
$callable = static function (int $offset, int $size) use ($stream) {
$nextOffset = 0;
$callable = static function (int $offset, int $size) use ($stream, &$nextOffset): string {
if (!$stream instanceof BufferedRawStream) {
throw new \InvalidArgumentException('Invalid stream type');
}
Assert::eq($offset, $nextOffset);
$nextOffset += $size;
$reader = $stream->getReadBuffer($l);
try {
return $reader->bufferRead($size);
$result = $reader->bufferRead($size);
} catch (NothingInTheSocketException $e) {
$reader = $stream->getReadBuffer($size);
return $reader->bufferRead($size);
$result = $reader->bufferRead($size);
}
\assert($result !== null);
return $result;
};
$seekable = false;
}
Expand Down

0 comments on commit 0a8e41a

Please sign in to comment.