Skip to content
Open
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion src/main/php/peer/http/Authorizations.class.php
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
<?php namespace peer\http;

use lang\Object;
use lang\XPClass;
use lang\reflect\TargetInvocationException;
use util\Secret;
Expand Down
83 changes: 83 additions & 0 deletions src/main/php/peer/http/Channel.class.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
<?php namespace peer\http;

use peer\SocketInputStream;

class Channel implements \io\streams\InputStream {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This class needs some tests!

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Script used for integration testing timeouts and reconnection behaviors:

<?php namespace keep_alive;

use peer\http\HttpConnection;
use util\cmd\Console;
use io\streams\Streams;
use lang\Throwable;

$c= new HttpConnection($argv[1]);

for ($i= 0; $i < 10; $i++) {
  try {
    $r= $c->get([], ['Connection' => 'keep-alive']);
    Console::writeLine(date('r'), ': ', $r);
    Streams::readAll($r->in());
  } catch (Throwable $t) {
    Console::writeLine(date('r'), ': ', $t);
  }

  $delay= rand(100, 6000);
  Console::writeLinef('ZZ %.3f seconds', $delay / 1000);
  usleep($delay * 1000);
}

private $socket;
private $reuseable= null;

/** @param peer.Socket */
public function __construct($socket) {
$this->socket= $socket;
}

/** @return peer.Socket */
public function socket() { return $this->socket; }

/**
* Rebinds to a new socket, closing the existing one if necessary
*
* @param peer.Socket
* @return void
*/
public function bind($socket) {
if ($this->socket->isConnected()) {
$this->socket->close();
}
$this->socket= $socket;
}

/**
* Connect (if necessary)
*
* @param float $connectTimeout
* @param float $readTimeout
* @return void
*/
public function connect($connectTimeout, $readTimeout) {
if (false === $this->reuseable) {
$this->socket->close();
} else if ($this->socket->isConnected()) {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this check enough? If the server closes the connection (e.g. because of a timeout on a reused connection), we would probably receive an EOF here, but isConnected() would still be true,

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, this actually is a problem; and it seems eof() could help here. In need of some more real-life testing!

return;
}

$this->socket->setTimeout($readTimeout);
$this->socket->connect($connectTimeout);
}

/**
* Disconnect (if necessary)
*
* @return void
*/
public function disconnect() {
$this->socket->isConnected() && $this->socket->close();
}

/**
* Sends a request and returns the response
*
* @param peer.http.HttpRequest $request
* @return peer.http.HttpResponse
*/
public function send($request) {
$this->socket->write($request->getRequestString());
$this->reuseable= false;
return new HttpResponse($this, true, function() { $this->reuseable= true; });
}

/** @return int */
public function available() {
return $this->socket->eof() ? 0 : 1;
}

/** @return string */
public function read($limit= 8192) {
return $this->socket->readBinary($limit);
}

/** @return void */
public function close() {
// NOOP
}
}
8 changes: 7 additions & 1 deletion src/main/php/peer/http/HttpConnection.class.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

use peer\URL;
use util\log\Traceable;
use lang\Closeable;

/**
* HTTP connection
Expand All @@ -26,7 +27,7 @@
* @see rfc://2616
* @test xp://net.xp_framework.unittest.peer.HttpTest
*/
class HttpConnection implements Traceable {
class HttpConnection implements Traceable, Closeable {
protected
$url = null,
$transport = null,
Expand Down Expand Up @@ -260,4 +261,9 @@ public function trace($arg= null, $headers= []) {
public function setTrace($cat) {
$this->transport->setTrace($cat);
}

/** @return void */
public function close() {
$this->transport->close();
}
}
110 changes: 55 additions & 55 deletions src/main/php/peer/http/HttpInputStream.class.php
Original file line number Diff line number Diff line change
Expand Up @@ -8,74 +8,74 @@
* @test xp://peer.http.unittest.HttpInputStreamTest
*/
class HttpInputStream implements InputStream {
protected
$response = null,
$buffer = '',
$available = 0;

private $stream;
private $buffer= '';

/**
* Constructor
*
* @param peer.http.HttpResponse $response
* @param io.streams.InputStream
* @param callable $consumed Optional callback when stream is completely consumed
*/
public function __construct(HttpResponse $response) {
$this->response= $response;
public function __construct(InputStream $stream, $consumed= null) {
$this->stream= $stream;
$this->consumed= $consumed;
}

/**
* Buffer a chunk if necessary
*
* @return int available
*/
protected function buffer() {
if (($l= strlen($this->buffer)) > 0) return $l;
if (false === ($chunk= $this->response->readData(8192, true))) {
$this->available= -1;
return 0;

/** @param callable $consumed */
public function callback($consumed) {
$this->consumed= $consumed;
}

/** @return void */
public function consumed() {
if ($f= $this->consumed) $f();
}

/** @return bool */
public function available() {
if ('' === $this->buffer) {
return $this->stream->available();
} else {
$this->buffer.= $chunk;
$this->available= strlen($this->buffer);
return $this->available;
return strlen($this->buffer);
}
}

/**
* Read a string
*
* @param int $limit default 8192
* @return string
*/
/** @return string */
public function read($limit= 8192) {
if (-1 === $this->available) return null; // At end
$this->buffer();
$b= substr($this->buffer, 0, $limit);
$this->buffer= substr($this->buffer, $limit);
return $b;
if (null === $this->buffer) {
return null; // EOF
} else if ('' === $this->buffer) {
$chunk= $this->stream->read($limit);
return '' == $chunk ? null : $chunk;
} else {
$return= substr($this->buffer, 0, $limit);
$this->buffer= (string)substr($this->buffer, $limit);
return $return;
}
}

/**
* Returns the number of bytes that can be read from this stream
* without blocking.
*
* @return int
*/
public function available() {
return (-1 === $this->available) ? 0 : $this->buffer();
}
/** @return string */
public function readLine() {
if (null === $this->buffer) return null; // EOF

/**
* Close this buffer
*/
public function close() {
$this->response->closeStream();
while (false === ($p= strpos($this->buffer, "\r\n"))) {
$chunk= $this->stream->read();
if ('' == $chunk) {
$return= $this->buffer;
$this->buffer= null;
return $return;
}
$this->buffer.= $chunk;
}

$return= substr($this->buffer, 0, $p);
$this->buffer= substr($this->buffer, $p + 2);
return $return;
}

/**
* Creates a string representation of this Http
*
* @return string
*/
public function toString() {
return nameof($this).'<'.$this->response->toString().'>';
/** @return voud */
public function close() {
$this->stream->close();
}
}
}
Loading