Skip to content

Commit 8e40da1

Browse files
committed
fix function cal function
1 parent c5a0497 commit 8e40da1

File tree

5 files changed

+190
-55
lines changed

5 files changed

+190
-55
lines changed

.gitignore

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
# Composer ignores
22
/vendor
33
composer.phar
4-
composer.lock
4+
composer.lock
5+
examples/test.txt

README.md

Lines changed: 42 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -40,9 +40,9 @@ require 'vendor/autoload.php';
4040

4141
use ReactphpX\Bandwidth\Bandwidth;
4242

43-
define("BURST_RATE", 1024 * 1024 * 150); // 150KB/sec burst rate
43+
define("BURST_RATE", 1024 * 1024 * 5); // 5MB/sec burst rate
4444

45-
define("FILL_RATE", 1024 * 1024 * 50); // 50KB/sec sustained rate
45+
define("FILL_RATE", 1024 * 1024 * 1); // 1M/sec sustained rate
4646

4747
$bandwidth = new Bandwidth(BURST_RATE, FILL_RATE);
4848

@@ -70,6 +70,46 @@ $sourceStream->on('end', function () use ($bandwidthStream) {
7070

7171
```
7272

73+
### memory usage
74+
75+
```php
76+
<?php
77+
require 'vendor/autoload.php';
78+
79+
use ReactphpX\Bandwidth\Bandwidth;
80+
81+
function getMemoryUsage(): array
82+
{
83+
return [
84+
'date' => date('Y-m-d H:i:s'),
85+
'current_usage_mb' => round(memory_get_usage() / 1024 / 1024, 3),
86+
'current_usage_real_mb' => round(memory_get_usage(true) / 1024 / 1024, 3),
87+
'peak_usage_mb' => round(memory_get_peak_usage() / 1024 / 1024, 3),
88+
'peak_usage_real_mb' => round(memory_get_peak_usage(true) / 1024 / 1024, 3),
89+
];
90+
}
91+
92+
define("BURST_RATE", 1024 * 1024 * 150); // 150KB/sec burst rate
93+
define("FILL_RATE", 1024 * 1024 * 50); // 50KB/sec sustained rate
94+
95+
$bandwidth = new Bandwidth(BURST_RATE, FILL_RATE);
96+
97+
// listen starting memory
98+
$startMemory = getMemoryUsage();
99+
echo 'Start memory: ' . json_encode($startMemory, JSON_UNESCAPED_SLASHES) . PHP_EOL;
100+
101+
$stream = $bandwidth->file('test.txt');
102+
103+
$stream->on('data', function ($data) {
104+
echo $data;
105+
});
106+
107+
$stream->on('end', function () {
108+
echo "end\n";
109+
echo 'End memory: ' . json_encode(getMemoryUsage(), JSON_UNESCAPED_SLASHES) . PHP_EOL;
110+
});
111+
```
112+
73113
## License
74114

75115
MIT

composer.json

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
"reactphp-x/concurrent": "^1.0",
1717
"reactphp-x/limiter": "^1.0",
1818
"react/stream": "^1.4",
19-
"wpjscc/filesystem": "^0.2.0"
19+
"wpjscc/filesystem": "^0.2.0",
20+
"react/async": "^4.0"
2021
}
2122
}

examples/memory_usage.php

Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,87 @@
1+
<?php
2+
require __DIR__ . '/../vendor/autoload.php';
3+
4+
use ReactphpX\Bandwidth\Bandwidth;
5+
use React\Promise\Deferred;
6+
use function React\Async\async;
7+
use function React\Async\await;
8+
9+
function getMemoryUsage(): array
10+
{
11+
return [
12+
'date' => date('Y-m-d H:i:s'),
13+
'current_usage_mb' => round(memory_get_usage() / 1024 / 1024, 3),
14+
'current_usage_real_mb' => round(memory_get_usage(true) / 1024 / 1024, 3),
15+
'peak_usage_mb' => round(memory_get_peak_usage() / 1024 / 1024, 3),
16+
'peak_usage_real_mb' => round(memory_get_peak_usage(true) / 1024 / 1024, 3),
17+
];
18+
}
19+
20+
define("BURST_RATE", 1024 * 1024 * 5); // 150KB/sec burst rate
21+
define("FILL_RATE", 1024 * 1024 * 1); // 50KB/sec sustained rate
22+
23+
24+
async(function () {
25+
while (true) {
26+
echo "start\n";
27+
$deferred = new Deferred();
28+
$bandwidth = new Bandwidth(BURST_RATE, FILL_RATE);
29+
30+
// Listen starting memory
31+
echo 'Start memory: ' . json_encode(getMemoryUsage(), JSON_UNESCAPED_SLASHES) . PHP_EOL;
32+
33+
$filePath = __DIR__ . '/test.txt'; // Put a sample file here to try
34+
$stream = $bandwidth->file($filePath);
35+
36+
$stream->on('data', function ($data) {
37+
echo (strlen($data)/1024/1024 ). "MB\n";
38+
gc_collect_cycles();
39+
});
40+
41+
$stream->on('end', function () use ($deferred) {
42+
echo PHP_EOL . "end" . PHP_EOL;
43+
echo 'End memory: ' . json_encode(getMemoryUsage(), JSON_UNESCAPED_SLASHES) . PHP_EOL;
44+
$deferred->resolve(true);
45+
});
46+
47+
$stream->on('error', function ($e) use ($deferred) {
48+
fwrite(STDERR, 'Error: ' . ($e instanceof \Throwable ? $e->getMessage() : (string) $e) . PHP_EOL);
49+
echo 'Memory at error: ' . json_encode(getMemoryUsage(), JSON_UNESCAPED_SLASHES) . PHP_EOL;
50+
$deferred->reject(new \Exception($e->getMessage()));
51+
});
52+
53+
await($deferred->promise());
54+
break;
55+
56+
}
57+
})();
58+
59+
60+
async(function () {
61+
// $filesystem = \React\Filesystem\Factory::create();
62+
$filesystem = new \React\Filesystem\Fallback\Adapter;
63+
$path = __DIR__ . '/test.txt';
64+
$size = filesize($path);
65+
66+
$chunkSize = 1024 * 1024 * 1;
67+
$start = 0;
68+
while (true) {
69+
echo "start\n";
70+
await($filesystem->file($path)->getContents($start, $chunkSize));
71+
\React\Async\delay(1);
72+
$start += $chunkSize;
73+
if ($start >= $size) {
74+
break;
75+
}
76+
}
77+
});
78+
79+
$startMemory = getMemoryUsage();
80+
81+
\React\EventLoop\Loop::addPeriodicTimer(1, function () use ($startMemory) {
82+
gc_collect_cycles();
83+
echo json_encode([
84+
'start_memory' => $startMemory,
85+
'current_memory' => getMemoryUsage(),
86+
], JSON_PRETTY_PRINT) . PHP_EOL;
87+
});

src/Bandwidth.php

Lines changed: 57 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@
55
use ReactphpX\Concurrent\Concurrent;
66
use ReactphpX\Limiter\TokenBucket;
77
use React\Filesystem\Factory;
8+
use function React\Async\async;
9+
use function React\Async\await;
810

911
final class Bandwidth
1012
{
@@ -35,33 +37,37 @@ public function file(string $path, $p = 0, $length = -1, $readKB = 0)
3537

3638
$readKB = $readKB > 0 ? min($readKB, $this->KB) : $this->KB;
3739

38-
$this->filesystem->detect($path)->then(function ($node) use ($path) {
39-
if ($node instanceof \React\Filesystem\Node\FileInterface) {
40-
return $node->stat();
41-
} else {
42-
throw new \RuntimeException($path. ' is not a file');
43-
}
44-
})->then(function ($stat) use ($stream, $p, $length, $readKB) {
45-
if ($this->queue) {
46-
return $this->concurrent->concurrent(function () use ($stream, $stat, $p, $length, $readKB) {
40+
async(function ($path, $stream, $p, $length, $readKB) {
41+
try {
42+
$node = await($this->filesystem->detect($path));
43+
if (!($node instanceof \React\Filesystem\Node\FileInterface)) {
44+
throw new \RuntimeException($path . ' is not a file');
45+
}
46+
$stat = await($node->stat());
47+
48+
if ($this->queue) {
49+
await($this->concurrent->concurrent(
50+
async(function () use ($stream, $stat, $p, $length, $readKB) {
51+
$file = $this->filesystem->file($stat->path());
52+
$size = $stat->size();
53+
if ($length > 0) {
54+
$size = min($size, $p + $length);
55+
}
56+
return await($this->fileStream($file, $stream, $p, $size, $readKB));
57+
})
58+
));
59+
} else {
4760
$file = $this->filesystem->file($stat->path());
4861
$size = $stat->size();
4962
if ($length > 0) {
5063
$size = min($size, $p + $length);
5164
}
52-
return $this->fileStream($file, $stream, $p, $size, $readKB);
53-
});
54-
} else {
55-
$file = $this->filesystem->file($stat->path());
56-
$size = $stat->size();
57-
if ($length > 0) {
58-
$size = min($size, $p + $length);
65+
await($this->fileStream($file, $stream, $p, $size, $readKB));
5966
}
60-
return $this->fileStream($file, $stream, $p, $size, $readKB);
67+
} catch (\Throwable $e) {
68+
$stream->emit('error', [$e]);
6169
}
62-
}, function ($e) use ($stream) {
63-
$stream->emit('error', [$e]);
64-
});
70+
})($path, $stream, $p, $length, $readKB);
6571
return $stream;
6672
}
6773

@@ -72,11 +78,12 @@ public function stream($stream)
7278
$concurrent = $this->queue ? $this->concurrent : new Concurrent(1);
7379

7480
$stream->on('data', function ($data) use ($_stream, $concurrent) {
75-
$concurrent->concurrent(function() use ($_stream, $data){
76-
return $this->bucket->removeTokens(1024 * strlen($data))->then(function () use ($_stream, $data) {
81+
$concurrent->concurrent(
82+
async(function () use ($_stream, $data) {
83+
await($this->bucket->removeTokens(1024 * strlen($data)));
7784
$_stream->write($data);
78-
});
79-
});
85+
})
86+
);
8087
});
8188

8289
return $_stream;
@@ -85,32 +92,31 @@ public function stream($stream)
8592
protected function fileStream($file, $stream, $p, $size, $readKB)
8693
{
8794

88-
if (!$stream->isWritable()) {
89-
return \React\Promise\resolve(null);
90-
}
91-
92-
$currentSize = $size - $p;
93-
94-
if ($currentSize/1024 < $readKB) {
95-
return $this->bucket->removeTokens(1024 * 1024 * ceil($currentSize/1024))->then(function () use ($file, $stream, $p, $currentSize) {
96-
return $file->getContents($p, $currentSize)->then(function ($contents) use ($stream) {
97-
$stream->end($contents);
98-
return null;
99-
});
100-
});
101-
} else {
102-
return $this->bucket->removeTokens(1024 * 1024 * $readKB)->then(function () use ($file, $stream, $p, $size, $readKB) {
103-
return $file->getContents($p, 1024 * 1024 * $readKB)->then(function ($contents) use ($stream, $file, $p, $size, $readKB) {
104-
$p += strlen($contents);
105-
if ($p >= $size) {
106-
$stream->end($contents);
107-
return null;
108-
} else {
109-
$stream->write($contents);
110-
return $this->fileStream($file, $stream, $p, $size, $readKB);
111-
}
112-
});
113-
});
114-
}
95+
return async(function () use ($file, $stream, $p, $size, $readKB) {
96+
while (true) {
97+
if (!$stream->isWritable()) {
98+
return true;
99+
}
100+
$remaining = $size - $p;
101+
if ($remaining <= 0) {
102+
return true;
103+
}
104+
if ($remaining/1024 < $readKB) {
105+
await($this->bucket->removeTokens(1024 * 1024 * ceil($remaining/1024)));
106+
$contents = await($file->getContents($p, $remaining));
107+
$stream->end($contents);
108+
return true;
109+
} else {
110+
await($this->bucket->removeTokens(1024 * 1024 * $readKB));
111+
$contents = await($file->getContents($p, 1024 * 1024 * $readKB));
112+
$p += strlen($contents);
113+
if ($p >= $size) {
114+
$stream->end($contents);
115+
return true;
116+
}
117+
$stream->write($contents);
118+
}
119+
}
120+
})();
115121
}
116122
}

0 commit comments

Comments
 (0)