diff --git a/phirehose/OauthPhirehose.php b/phirehose/OauthPhirehose.php new file mode 100644 index 0000000..c45a8bc --- /dev/null +++ b/phirehose/OauthPhirehose.php @@ -0,0 +1,157 @@ +consumerKey?$this->consumerKey:TWITTER_CONSUMER_KEY; + $oauth['oauth_nonce'] = md5(uniqid(rand(), true)); + $oauth['oauth_signature_method'] = 'HMAC-SHA1'; + $oauth['oauth_timestamp'] = time(); + $oauth['oauth_version'] = '1.0'; + $oauth['oauth_token'] = $this->username; + if (isset($params['oauth_verifier'])) + { + $oauth['oauth_verifier'] = $params['oauth_verifier']; + unset($params['oauth_verifier']); + } + // encode all oauth values + foreach ($oauth as $k => $v) + $oauth[$k] = $this->encode_rfc3986($v); + + // encode all non '@' params + // keep sigParams for signature generation (exclude '@' params) + // rename '@key' to 'key' + $sigParams = array(); + $hasFile = false; + if (is_array($params)) + { + foreach ($params as $k => $v) + { + if (strncmp('@', $k, 1) !== 0) + { + $sigParams[$k] = $this->encode_rfc3986($v); + $params[$k] = $this->encode_rfc3986($v); + } + else + { + $params[substr($k, 1)] = $v; + unset($params[$k]); + $hasFile = true; + } + } + + if ($hasFile === true) + $sigParams = array(); + } + + $sigParams = array_merge($oauth, (array) $sigParams); + + // sorting + ksort($sigParams); + + // signing + $oauth['oauth_signature'] = $this->encode_rfc3986($this->generateSignature($method, $url, $sigParams)); + return array('request' => $params, 'oauth' => $oauth); + } + + protected function encode_rfc3986($string) + { + return str_replace('+', ' ', str_replace('%7E', '~', rawurlencode(($string)))); + } + + protected function generateSignature($method = null, $url = null, + $params = null) + { + if (empty($method) || empty($url)) + return false; + + // concatenating and encode + $concat = ''; + foreach ((array) $params as $key => $value) + $concat .= "{$key}={$value}&"; + $concat = substr($concat, 0, -1); + $concatenatedParams = $this->encode_rfc3986($concat); + + // normalize url + $urlParts = parse_url($url); + $scheme = strtolower($urlParts['scheme']); + $host = strtolower($urlParts['host']); + $port = isset($urlParts['port']) ? intval($urlParts['port']) : 0; + $retval = strtolower($scheme) . '://' . strtolower($host); + if (!empty($port) && (($scheme === 'http' && $port != 80) || ($scheme === 'https' && $port != 443))) + $retval .= ":{$port}"; + + $retval .= $urlParts['path']; + if (!empty($urlParts['query'])) + $retval .= "?{$urlParts['query']}"; + + $normalizedUrl = $this->encode_rfc3986($retval); + $method = $this->encode_rfc3986($method); // don't need this but why not? + + $signatureBaseString = "{$method}&{$normalizedUrl}&{$concatenatedParams}"; + + # sign the signature string + $key = $this->encode_rfc3986($this->consumerSecret?$this->consumerSecret:TWITTER_CONSUMER_SECRET) . '&' . $this->encode_rfc3986($this->password); + return base64_encode(hash_hmac('sha1', $signatureBaseString, $key, true)); + } + + protected function getOAuthHeader($method, $url, $params = array()) + { + $params = $this->prepareParameters($method, $url, $params); + $oauthHeaders = $params['oauth']; + + $urlParts = parse_url($url); + $oauth = 'OAuth realm="",'; + foreach ($oauthHeaders as $name => $value) + { + $oauth .= "{$name}=\"{$value}\","; + } + $oauth = substr($oauth, 0, -1); + + return $oauth; + } + + protected function getAuthorizationHeader() + { + $url = self::URL_BASE . $this->method . '.' . $this->format; + $urlParts = parse_url($url); + + // Setup params appropriately + $requestParams = array('delimited' => 'length'); + + // Filter takes additional parameters + if (count($this->trackWords) > 0) + { + $requestParams['track'] = implode(',', $this->trackWords); + } + if (count($this->followIds) > 0) + { + $requestParams['follow'] = implode(',', $this->followIds); + } + + return $this->getOAuthHeader('POST', $url, $requestParams); + } +} diff --git a/phirehose/Phirehose.php b/phirehose/Phirehose.php index c2e481a..5f57b4b 100644 --- a/phirehose/Phirehose.php +++ b/phirehose/Phirehose.php @@ -1,13 +1,12 @@ - * @version 0.2.4 ($Id$) + * @version 0.2.gitmaster */ abstract class Phirehose { @@ -15,20 +14,13 @@ abstract class Phirehose /** * Class constants */ - const URL_BASE = 'http://stream.twitter.com/1/statuses/'; + const URL_BASE = 'https://stream.twitter.com/1/statuses/'; const FORMAT_JSON = 'json'; const FORMAT_XML = 'xml'; const METHOD_FILTER = 'filter'; const METHOD_SAMPLE = 'sample'; const METHOD_RETWEET = 'retweet'; const METHOD_FIREHOSE = 'firehose'; - const USER_AGENT = 'Phirehose/0.2.4 +http://code.google.com/p/phirehose/'; - const FILTER_CHECK_MIN = 5; - const FILTER_UPD_MIN = 120; - const TCP_BACKOFF = 1; - const TCP_BACKOFF_MAX = 16; - const HTTP_BACKOFF = 10; - const HTTP_BACKOFF_MAX = 240; const EARTH_RADIUS_KM = 6371; @@ -39,7 +31,7 @@ abstract class Phirehose protected $password; protected $method; protected $format; - protected $count; + protected $count; //Can be -150,000 to 150,000. @see http://dev.twitter.com/pages/streaming_api_methods#count protected $followIds; protected $trackWords; protected $locationBoxes; @@ -49,23 +41,119 @@ abstract class Phirehose // State vars protected $filterChanged; protected $reconnect; + + /** + * The number of tweets received per second in previous minute; calculated fresh + * just before each call to statusUpdate() + * I.e. if fewer than 30 tweets in last minute then this will be zero; if 30 to 90 then it + * will be 1, if 90 to 150 then 2, etc. + * + * @var integer + */ protected $statusRate; + protected $lastErrorNo; protected $lastErrorMsg; + + /** + * Number of tweets received. + * + * Note: by default this is the sum for last 60 seconds, and is therefore + * reset every 60 seconds. + * To change this behaviour write a custom statusUpdate() function. + * + * @var integer + */ + protected $statusCount=0; + + /** + * The number of calls to $this->checkFilterPredicates(). + * + * By default it is called every 5 seconds, so if doing statusUpdates every + * 60 seconds and then resetting it, this will usually be 12. + * + * @var integer + */ + protected $filterCheckCount=0; + + /** + * Total number of seconds (fractional) spent in the enqueueStatus() calls (i.e. the customized + * function that handles each received tweet). + * + * @var float + */ + protected $enqueueSpent=0; + + /** + * Total number of seconds (fractional) spent in the checkFilterPredicates() calls + * + * @var float + */ + protected $filterCheckSpent=0; + + /** + * Number of seconds since the last tweet arrived (or the keep-alive newline) + * + * @var integer + */ + protected $idlePeriod=0; + + /** + * The maximum value $this->idlePeriod has reached. + * + * @var integer + */ + protected $maxIdlePeriod=0; + + /** + * Time spent on each call to enqueueStatus() (i.e. average time spent, in milliseconds, + * spent processing received tweet). + * + * Simply: enqueueSpent divided by statusCount + * Note: by default, calculated fresh for past 60 seconds, every 60 seconds. + * + * @var float + */ + protected $enqueueTimeMS=0; + + /** + * Like $enqueueTimeMS but for the checkFilterPredicates() function. + * @var float + */ + protected $filterCheckTimeMS=0; + + /** + * Seconds since the last call to statusUpdate() + * + * Reset to zero after each call to statusUpdate() + * Highest value it should ever reach is $this->avgPeriod + * + * @var integer + */ + protected $avgElapsed=0; + // Config type vars - override in subclass if desired - protected $connectFailuresMax = 20; + protected $connectFailuresMax = 20; protected $connectTimeout = 5; protected $readTimeout = 5; protected $idleReconnectTimeout = 90; protected $avgPeriod = 60; - + protected $status_length_base = 10; + protected $userAgent = 'Phirehose/0.2.gitmaster +https://github.com/fennb/phirehose'; + protected $filterCheckMin = 5; + protected $filterUpdMin = 120; + protected $tcpBackoff = 1; + protected $tcpBackoffMax = 16; + protected $httpBackoff = 10; + protected $httpBackoffMax = 240; + /** - * Create a new Phirehose object attached to the appropriate twitter stream method. + * Create a new Phirehose object attached to the appropriate twitter stream method. * Methods are: METHOD_FIREHOSE, METHOD_RETWEET, METHOD_SAMPLE, METHOD_FILTER * Formats are: FORMAT_JSON, FORMAT_XML * @see Phirehose::METHOD_SAMPLE * @see Phirehose::FORMAT_JSON - * + * * @param string $username Any twitter username * @param string $password Any twitter password * @param string $method @@ -80,17 +168,17 @@ public function __construct($username, $password, $method = Phirehose::METHOD_SA } /** - * Returns public statuses from or in reply to a set of users. Mentions ("Hello @user!") and implicit replies + * Returns public statuses from or in reply to a set of users. Mentions ("Hello @user!") and implicit replies * ("@user Hello!" created without pressing the reply button) are not matched. It is up to you to find the integer * IDs of each twitter user. * Applies to: METHOD_FILTER - * + * * @param array $userIds Array of Twitter integer userIDs */ public function setFollow($userIds) { $userIds = ($userIds === NULL) ? array() : $userIds; - sort($userIds); // Non-optimal but necessary + sort($userIds); // Non-optimal but necessary if ($this->followIds != $userIds) { $this->filterChanged = TRUE; } @@ -108,10 +196,10 @@ public function getFollow() } /** - * Specifies keywords to track. Track keywords are case-insensitive logical ORs. Terms are exact-matched, ignoring + * Specifies keywords to track. Track keywords are case-insensitive logical ORs. Terms are exact-matched, ignoring * punctuation. Phrases, keywords with spaces, are not supported. Queries are subject to Track Limitations. * Applies to: METHOD_FILTER - * + * * See: http://apiwiki.twitter.com/Streaming-API-Documentation#TrackLimiting * * @param array $trackWords @@ -127,7 +215,7 @@ public function setTrack($trackWords) } /** - * Returns an array of keywords being tracked + * Returns an array of keywords being tracked * * @return array */ @@ -137,24 +225,24 @@ public function getTrack() } /** - * Specifies a set of bounding boxes to track as an array of 4 element lon/lat pairs denoting , + * Specifies a set of bounding boxes to track as an array of 4 element lon/lat pairs denoting , * . Only tweets that are both created using the Geotagging API and are placed from within a tracked * bounding box will be included in the stream. The user's location field is not used to filter tweets. Bounding boxes - * are logical ORs and must be less than or equal to 1 degree per side. A locations parameter may be combined with + * are logical ORs and must be less than or equal to 1 degree per side. A locations parameter may be combined with * track parameters, but note that all terms are logically ORd. - * + * * NOTE: The argument order is Longitude/Latitude (to match the Twitter API and GeoJSON specifications). - * + * * Applies to: METHOD_FILTER - * + * * See: http://apiwiki.twitter.com/Streaming-API-Documentation#locations * - * Eg: + * Eg: * setLocations(array( * array(-122.75, 36.8, -121.75, 37.8), // San Francisco - * array(-74, 40, -73, 41), // New York + * array(-74, 40, -73, 41), // New York * )); - * + * * @param array $boundingBoxes */ public function setLocations($boundingBoxes) @@ -167,7 +255,7 @@ public function setLocations($boundingBoxes) // Sanity check if (count($boundingBox) != 4) { // Invalid - Not much we can do here but log error - $this->log('Invalid location bounding box: [' . implode(', ', $boundingBox) . ']'); + $this->log('Invalid location bounding box: [' . implode(', ', $boundingBox) . ']','error'); return FALSE; } // Append this lat/lon pairs to flattened array @@ -182,7 +270,7 @@ public function setLocations($boundingBoxes) } /** - * Returns an array of 4 element arrays that denote the monitored location bounding boxes for tweets using the + * Returns an array of 4 element arrays that denote the monitored location bounding boxes for tweets using the * Geotagging API. * * @see setLocations() @@ -201,19 +289,19 @@ public function getLocations() { } /** - * Convenience method that sets location bounding boxes by an array of lon/lat/radius sets, rather than manually + * Convenience method that sets location bounding boxes by an array of lon/lat/radius sets, rather than manually * specified bounding boxes. Each array element should contain 3 element subarray containing a latitude, longitude and * radius. Radius is specified in kilometers and is approximate (as boxes are square). * * NOTE: The argument order is Longitude/Latitude (to match the Twitter API and GeoJSON specifications). - * - * Eg: + * + * Eg: * setLocationsByCircle(array( * array(144.9631, -37.8142, 30), // Melbourne, 3km radius - * array(-0.1262, 51.5001, 25), // London 10km radius + * array(-0.1262, 51.5001, 25), // London 10km radius * )); - * - * + * + * * @see setLocations() * @param array */ @@ -223,7 +311,7 @@ public function setLocationsByCircle($locations) { // Sanity check if (count($locTriplet) != 3) { // Invalid - Not much we can do here but log error - $this->log('Invalid location triplet for ' . __METHOD__ . ': [' . implode(', ', $locTriplet) . ']'); + $this->log('Invalid location triplet for ' . __METHOD__ . ': [' . implode(', ', $locTriplet) . ']','error'); return FALSE; } list($lon, $lat, $radius) = $locTriplet; @@ -237,8 +325,8 @@ public function setLocationsByCircle($locations) { // Add to bounding box array $boundingBoxes[] = array($minLon, $minLat, $maxLon, $maxLat); // Debugging is handy - $this->log('Resolved location circle [' . $lon . ', ' . $lat . ', r: ' . $radius . '] -> bbox: [' . $minLon . - ', ' . $minLat . ', ' . $maxLon . ', ' . $maxLat . ']'); + $this->log('Resolved location circle [' . $lon . ', ' . $lat . ', r: ' . $radius . '] -> bbox: [' . $minLon . + ', ' . $minLat . ', ' . $maxLon . ', ' . $maxLat . ']'); } // Set by bounding boxes $this->setLocations($boundingBoxes); @@ -248,20 +336,20 @@ public function setLocationsByCircle($locations) { * Sets the number of previous statuses to stream before transitioning to the live stream. Applies only to firehose * and filter + track methods. This is generally used internally and should not be needed by client applications. * Applies to: METHOD_FILTER, METHOD_FIREHOSE - * + * * @param integer $count */ public function setCount($count) { - $this->count = $count; + $this->count = $count; } /** * Connects to the stream API and consumes the stream. Each status update in the stream will cause a call to the * handleStatus() method. - * + * * @see handleStatus() - * @param boolean $reconnect Reconnects as per recommended + * @param boolean $reconnect Reconnects as per recommended * @throws ErrorException */ public function consume($reconnect = TRUE) @@ -276,7 +364,6 @@ public function consume($reconnect = TRUE) $this->reconnect(); // Init state - $statusCount = $filterCheckCount = $enqueueSpent = $filterCheckSpent = $idlePeriod = $maxIdlePeriod = 0; $lastAverage = $lastFilterCheck = $lastFilterUpd = $lastStreamActivity = time(); $fdw = $fde = NULL; // Placeholder write/error file descriptors for stream_select @@ -288,7 +375,7 @@ public function consume($reconnect = TRUE) */ if ((time() - $lastStreamActivity) > $this->idleReconnectTimeout) { $this->log('Idle timeout: No stream activity for > ' . $this->idleReconnectTimeout . ' seconds. ' . - ' Reconnecting.'); + ' Reconnecting.','info'); $this->reconnect(); $lastStreamActivity = time(); continue; @@ -300,14 +387,14 @@ public function consume($reconnect = TRUE) continue; // We need a newline } // Track maximum idle period - $idlePeriod = (time() - $lastStreamActivity); - $maxIdlePeriod = ($idlePeriod > $maxIdlePeriod) ? $idlePeriod : $maxIdlePeriod; + $this->idlePeriod = (time() - $lastStreamActivity); + $this->maxIdlePeriod = ($this->idlePeriod > $this->maxIdlePeriod) ? $this->idlePeriod : $this->maxIdlePeriod; // We got a newline, this is stream activity $lastStreamActivity = time(); // Read status length delimiter $delimiter = substr($this->buff, 0, $eol); $this->buff = substr($this->buff, $eol + 2); // consume off buffer, + 2 = "\r\n" - $statusLength = intval($delimiter); + $statusLength = intval($delimiter, $this->status_length_base); if ($statusLength > 0) { // Read status bytes and enqueue $bytesLeft = $statusLength - strlen($this->buff); @@ -317,42 +404,38 @@ public function consume($reconnect = TRUE) $bytesLeft = ($statusLength - strlen($this->buff)); } // Accrue/enqueue and track time spent enqueing - $statusCount ++; $enqueueStart = microtime(TRUE); $this->enqueueStatus($this->buff); - $enqueueSpent += (microtime(TRUE) - $enqueueStart); + $this->enqueueSpent += (microtime(TRUE) - $enqueueStart); + $this->statusCount++; } else { // Timeout/no data after readTimeout seconds } - // Calc counter averages - $avgElapsed = time() - $lastAverage; - if ($avgElapsed >= $this->avgPeriod) { - // Calc tweets-per-second - $this->statusRate = round($statusCount / $avgElapsed, 0); + // Calc counter averages + $this->avgElapsed = time() - $lastAverage; + if ($this->avgElapsed >= $this->avgPeriod) { + $this->statusRate = round($this->statusCount / $this->avgElapsed, 0); // Calc tweets-per-second // Calc time spent per enqueue in ms - $enqueueTimeMS = ($statusCount > 0) ? round($enqueueSpent / $statusCount * 1000, 2) : 0; + $this->enqueueTimeMS = ($this->statusCount > 0) ? round($this->enqueueSpent / $this->statusCount * 1000, 2) : 0; // Calc time spent total in filter predicate checking - $filterCheckTimeMS = ($filterCheckCount > 0) ? round($filterCheckSpent / $filterCheckCount * 1000, 2) : 0; - $this->log('Consume rate: ' . $this->statusRate . ' status/sec (' . $statusCount . ' total), avg ' . - 'enqueueStatus(): ' . $enqueueTimeMS . 'ms, avg checkFilterPredicates(): ' . $filterCheckTimeMS . 'ms (' . - $filterCheckCount . ' total) over ' . $this->avgPeriod . ' seconds, max stream idle period: ' . - $maxIdlePeriod . ' seconds.'); - // Reset - $statusCount = $filterCheckCount = $enqueueSpent = $filterCheckSpent = $idlePeriod = $maxIdlePeriod = 0; + $this->filterCheckTimeMS = ($this->filterCheckCount > 0) ? round($this->filterCheckSpent / $this->filterCheckCount * 1000, 2) : 0; + + $this->heartbeat(); + $this->statusUpdate(); $lastAverage = time(); } // Check if we're ready to check filter predicates - if ($this->method == self::METHOD_FILTER && (time() - $lastFilterCheck) >= self::FILTER_CHECK_MIN) { - $filterCheckCount ++; + if ($this->method == self::METHOD_FILTER && (time() - $lastFilterCheck) >= $this->filterCheckMin) { + $this->filterCheckCount++; $lastFilterCheck = time(); $filterCheckStart = microtime(TRUE); $this->checkFilterPredicates(); // This should be implemented in subclass if required - $filterCheckSpent += (microtime(TRUE) - $filterCheckStart); + $this->filterCheckSpent += (microtime(TRUE) - $filterCheckStart); } // Check if filter is ready + allowed to be updated (reconnect) - if ($this->filterChanged == TRUE && (time() - $lastFilterUpd) >= self::FILTER_UPD_MIN) { - $this->log('Reconnecting due to changed filter predicates.'); + if ($this->filterChanged == TRUE && (time() - $lastFilterUpd) >= $this->filterUpdMin) { + $this->log('Reconnecting due to changed filter predicates.','info'); $this->reconnect(); $lastFilterUpd = time(); } @@ -362,15 +445,31 @@ public function consume($reconnect = TRUE) // Some sort of socket error has occured $this->lastErrorNo = is_resource($this->conn) ? @socket_last_error($this->conn) : NULL; $this->lastErrorMsg = ($this->lastErrorNo > 0) ? @socket_strerror($this->lastErrorNo) : 'Socket disconnected'; - $this->log('Phirehose connection error occured: ' . $this->lastErrorMsg); + $this->log('Phirehose connection error occured: ' . $this->lastErrorMsg,'error'); - // Reconnect + // Reconnect } while ($this->reconnect); // Exit $this->log('Exiting.'); } + + + /** + * Called every $this->avgPeriod (default=60) seconds, and this default implementation + * calculates some rates, logs them, and resets the counters. + */ + protected function statusUpdate() + { + $this->log('Consume rate: ' . $this->statusRate . ' status/sec (' . $this->statusCount . ' total), avg ' . + 'enqueueStatus(): ' . $this->enqueueTimeMS . 'ms, avg checkFilterPredicates(): ' . $this->filterCheckTimeMS . 'ms (' . + $this->filterCheckCount . ' total) over ' . $this->avgElapsed . ' seconds, max stream idle period: ' . + $this->maxIdlePeriod . ' seconds.'); + // Reset + $this->statusCount = $this->filterCheckCount = $this->enqueueSpent = 0; + $this->filterCheckSpent = $this->idlePeriod = $this->maxIdlePeriod = 0; + } /** * Returns the last error message (TCP or HTTP) that occured with the streaming API or client. State is cleared upon @@ -383,11 +482,11 @@ public function getLastErrorMsg() } /** - * Returns the last error number that occured with the streaming API or client. Numbers correspond to either the + * Returns the last error number that occured with the streaming API or client. Numbers correspond to either the * fsockopen() error states (in the case of TCP errors) or HTTP error codes from Twitter (in the case of HTTP errors). - * + * * State is cleared upon successful reconnect. - * + * * @return string */ public function getLastErrorNo() @@ -400,13 +499,13 @@ public function getLastErrorNo() * Connects to the stream URL using the configured method. * @throws ErrorException */ - protected function connect() + protected function connect() { // Init state $connectFailures = 0; - $tcpRetry = self::TCP_BACKOFF / 2; - $httpRetry = self::HTTP_BACKOFF / 2; + $tcpRetry = $this->tcpBackoff / 2; + $httpRetry = $this->httpBackoff / 2; // Keep trying until connected (or max connect failures exceeded) do { @@ -419,7 +518,6 @@ protected function connect() // Construct URL/HTTP bits $url = self::URL_BASE . $this->method . '.' . $this->format; $urlParts = parse_url($url); - $authCredentials = base64_encode($this->username . ':' . $this->password); // Setup params appropriately $requestParams = array('delimited' => 'length'); @@ -429,33 +527,34 @@ protected function connect() $requestParams['track'] = implode(',', $this->trackWords); } if ($this->method == self::METHOD_FILTER && count($this->followIds) > 0) { - $requestParams['follow'] = implode(',', $this->followIds); + $requestParams['follow'] = implode(',', $this->followIds); } if ($this->method == self::METHOD_FILTER && count($this->locationBoxes) > 0) { - $requestParams['locations'] = implode(',', $this->locationBoxes); + $requestParams['locations'] = implode(',', $this->locationBoxes); } - if ($this->count > 0) { + if ($this->count <> 0) { $requestParams['count'] = $this->count; } // Debugging is useful - $this->log('Connecting to twitter stream: ' . $url . ' with params: ' . str_replace("\n", '', + $this->log('Connecting to twitter stream: ' . $url . ' with params: ' . str_replace("\n", '', var_export($requestParams, TRUE))); /** - * Open socket connection to make POST request. It'd be nice to use stream_context_create with the native + * Open socket connection to make POST request. It'd be nice to use stream_context_create with the native * HTTP transport but it hides/abstracts too many required bits (like HTTP error responses). */ $errNo = $errStr = NULL; $scheme = ($urlParts['scheme'] == 'https') ? 'ssl://' : 'tcp://'; + $port = ($urlParts['scheme'] == 'https') ? 443 : 80; /** - * We must perform manual host resolution here as Twitter's IP regularly rotates (ie: DNS TTL of 60 seconds) and + * We must perform manual host resolution here as Twitter's IP regularly rotates (ie: DNS TTL of 60 seconds) and * PHP appears to cache it the result if in a long running process (as per Phirehose). */ $streamIPs = gethostbynamel($urlParts['host']); - if (count($streamIPs) == 0) { - throw new ErrorException("Unable to resolve hostname: '" . $urlParts['host'] . '"'); + if(empty($streamIPs)) { + throw new PhirehoseNetworkException("Unable to resolve hostname: '" . $urlParts['host'] . '"'); } // Choose one randomly (if more than one) @@ -463,22 +562,22 @@ protected function connect() $streamIP = $streamIPs[rand(0, (count($streamIPs) - 1))]; $this->log('Connecting to ' . $streamIP); - @$this->conn = fsockopen($scheme . $streamIP, 80, $errNo, $errStr, $this->connectTimeout); + @$this->conn = fsockopen($scheme . $streamIP, $port, $errNo, $errStr, $this->connectTimeout); // No go - handle errors/backoff if (!$this->conn || !is_resource($this->conn)) { $this->lastErrorMsg = $errStr; $this->lastErrorNo = $errNo; - $connectFailures ++; + $connectFailures++; if ($connectFailures > $this->connectFailuresMax) { $msg = 'TCP failure limit exceeded with ' . $connectFailures . ' failures. Last error: ' . $errStr; - $this->log($msg); - throw new ErrorException($msg, $errNo); // Throw an exception for other code to handle + $this->log($msg,'error'); + throw new PhirehoseConnectLimitExceeded($msg, $errNo); // Throw an exception for other code to handle } // Increase retry/backoff up to max - $tcpRetry = ($tcpRetry < self::TCP_BACKOFF_MAX) ? $tcpRetry * 2 : self::TCP_BACKOFF_MAX; + $tcpRetry = ($tcpRetry < $this->tcpBackoffMax) ? $tcpRetry * 2 : $this->tcpBackoffMax; $this->log('TCP failure ' . $connectFailures . ' of ' . $this->connectFailuresMax . ' connecting to stream: ' . - $errStr . ' (' . $errNo . '). Sleeping for ' . $tcpRetry . ' seconds.'); + $errStr . ' (' . $errNo . '). Sleeping for ' . $tcpRetry . ' seconds.','info'); sleep($tcpRetry); continue; } @@ -492,20 +591,35 @@ protected function connect() stream_set_blocking($this->conn, 1); // Encode request data - $postData = http_build_query($requestParams); + $postData = http_build_query($requestParams, NULL, '&'); + $postData = str_replace('+','%20',$postData); //Change it from RFC1738 to RFC3986 (see + //enc_type parameter in http://php.net/http_build_query and note that enc_type is + //not available as of php 5.3) + $authCredentials = $this->getAuthorizationHeader(); // Do it fwrite($this->conn, "POST " . $urlParts['path'] . " HTTP/1.0\r\n"); - fwrite($this->conn, "Host: " . $urlParts['host'] . "\r\n"); + fwrite($this->conn, "Host: " . $urlParts['host'] . ':' . $port . "\r\n"); fwrite($this->conn, "Content-type: application/x-www-form-urlencoded\r\n"); fwrite($this->conn, "Content-length: " . strlen($postData) . "\r\n"); fwrite($this->conn, "Accept: */*\r\n"); - fwrite($this->conn, 'Authorization: Basic ' . $authCredentials . "\r\n"); - fwrite($this->conn, 'User-Agent: ' . self::USER_AGENT . "\r\n"); + fwrite($this->conn, 'Authorization: ' . $authCredentials . "\r\n"); + fwrite($this->conn, 'User-Agent: ' . $this->userAgent . "\r\n"); fwrite($this->conn, "\r\n"); fwrite($this->conn, $postData . "\r\n"); fwrite($this->conn, "\r\n"); + $this->log("POST " . $urlParts['path'] . " HTTP/1.0\r\n"); + $this->log("Host: " . $urlParts['host'] . ':' . $port . "\r\n"); + $this->log("Content-type: application/x-www-form-urlencoded\r\n"); + $this->log("Content-length: " . strlen($postData) . "\r\n"); + $this->log("Accept: */*\r\n"); + $this->log('Authorization: ' . $authCredentials . "\r\n"); + $this->log('User-Agent: ' . $this->userAgent . "\r\n"); + $this->log("\r\n"); + $this->log($postData . "\r\n"); + $this->log("\r\n"); + // First line is response list($httpVer, $httpCode, $httpMessage) = preg_split('/\s+/', trim(fgets($this->conn, 1024)), 3); @@ -519,7 +633,7 @@ protected function connect() // If we got a non-200 response, we need to backoff and retry if ($httpCode != 200) { - $connectFailures ++; + $connectFailures++; // Twitter will disconnect on error, but we want to consume the rest of the response body (which is useful) while ($bLine = trim(fgets($this->conn, 4096))) { @@ -527,7 +641,7 @@ protected function connect() } // Construct error - $errStr = 'HTTP ERROR ' . $httpCode . ': ' . $httpMessage . ' (' . $respBody . ')'; + $errStr = 'HTTP ERROR ' . $httpCode . ': ' . $httpMessage . ' (' . $respBody . ')'; // Set last error state $this->lastErrorMsg = $errStr; @@ -536,13 +650,13 @@ protected function connect() // Have we exceeded maximum failures? if ($connectFailures > $this->connectFailuresMax) { $msg = 'Connection failure limit exceeded with ' . $connectFailures . ' failures. Last error: ' . $errStr; - $this->log($msg); - throw new ErrorException($msg); // We eventually throw an exception for other code to handle + $this->log($msg,'error'); + throw new PhirehoseConnectLimitExceeded($msg, $httpCode); // We eventually throw an exception for other code to handle } // Increase retry/backoff up to max - $httpRetry = ($httpRetry < self::HTTP_BACKOFF_MAX) ? $httpRetry * 2 : self::HTTP_BACKOFF_MAX; + $httpRetry = ($httpRetry < $this->httpBackoffMax) ? $httpRetry * 2 : $this->httpBackoffMax; $this->log('HTTP failure ' . $connectFailures . ' of ' . $this->connectFailuresMax . ' connecting to stream: ' . - $errStr . '. Sleeping for ' . $httpRetry . ' seconds.'); + $errStr . '. Sleeping for ' . $httpRetry . ' seconds.','info'); sleep($httpRetry); continue; @@ -556,7 +670,7 @@ protected function connect() $this->lastErrorMsg = NULL; $this->lastErrorNo = NULL; - // Switch to non-blocking to consume the stream (important) + // Switch to non-blocking to consume the stream (important) stream_set_blocking($this->conn, 0); // Connect always causes the filterChanged status to be cleared @@ -567,18 +681,24 @@ protected function connect() $this->buff = ''; } + + protected function getAuthorizationHeader() + { + $authCredentials = base64_encode($this->username . ':' . $this->password); + return "Basic: ".$authCredentials; + } /** * Method called as frequently as practical (every 5+ seconds) that is responsible for checking if filter predicates * (ie: track words or follow IDs) have changed. If they have, they should be set using the setTrack() and setFollow() - * methods respectively within the overridden implementation. - * + * methods respectively within the overridden implementation. + * * Note that even if predicates are changed every 5 seconds, an actual reconnect will not happen more frequently than * every 2 minutes (as per Twitter Streaming API documentation). - * - * Note also that this method is called upon every connect attempt, so if your predicates are causing connection + * + * Note also that this method is called upon every connect attempt, so if your predicates are causing connection * errors, they should be checked here and corrected. - * + * * This should be implemented/overridden in any subclass implementing the FILTER method. * * @see setTrack() @@ -596,8 +716,13 @@ protected function checkFilterPredicates() * * @see error_log() * @param string $messages + * @param String $level 'error', 'info', 'notice'. Defaults to 'notice', so you should set this + * parameter on the more important error messages. + * 'info' is used for problems that the class should be able to recover from automatically. + * 'error' is for exceptional conditions that may need human intervention. (For instance, emailing + * them to a system administrator may make sense.) */ - protected function log($message) + protected function log($message,$level='notice') { @error_log('Phirehose: ' . $message, 0); } @@ -624,15 +749,26 @@ private function reconnect() $reconnect = $this->reconnect; $this->disconnect(); // Implicitly sets reconnect to FALSE $this->reconnect = $reconnect; // Restore state to prev - $this->connect(); + $this->connect(); } /** * This is the one and only method that must be implemented additionally. As per the streaming API documentation, - * statuses should NOT be processed within the same process that is performing collection + * statuses should NOT be processed within the same process that is performing collection * * @param string $status */ abstract public function enqueueStatus($status); - -} // End of class \ No newline at end of file + + /** + * Reports a periodic heartbeat. Keep execution time minimal. + * + * @return NULL + */ + public function heartbeat() {} + +} // End of class + +class PhirehoseException extends Exception {} +class PhirehoseNetworkException extends PhirehoseException {} +class PhirehoseConnectLimitExceeded extends PhirehoseException {} diff --git a/phirehose/UserstreamPhirehose.php b/phirehose/UserstreamPhirehose.php new file mode 100644 index 0000000..81d9c93 --- /dev/null +++ b/phirehose/UserstreamPhirehose.php @@ -0,0 +1,502 @@ +auth_method = $auth_method; + } + + + protected function connect() { + if($this->auth_method === UserstreamPhirehose::CONNECT_OAUTH) { + $this->connect_oauth(); + } else { + $this->connect_basic(); + } + } + + + /** + * Connects to the stream URL using the configured method. + */ + protected function connect_basic() { + + // Init state + $connectFailures = 0; + $tcpRetry = $this->tcpBackoff / 2; + $httpRetry = $this->httpBackoff / 2; + + // Keep trying until connected (or max connect failures exceeded) + do { + + // Check filter predicates for every connect (for filter method) + if ($this->method == self::METHOD_FILTER) { + $this->checkFilterPredicates(); + } + + // Construct URL/HTTP bits + $url = self::URL_BASE . $this->method . '.' . $this->format; + $urlParts = parse_url($url); + $authCredentials = base64_encode($this->username . ':' . $this->password); + + // Setup params appropriately + $requestParams = array('delimited' => 'length'); + + // Filter takes additional parameters + if ($this->method == self::METHOD_USER && count($this->trackWords) > 0) { + $requestParams['track'] = implode(',', $this->trackWords); + } + if ($this->method == self::METHOD_USER && count($this->followIds) > 0) { + $requestParams['follow'] = implode(',', $this->followIds); + } + + + // Debugging is useful + $this->log('Connecting to twitter stream: ' . $url . ' with params: ' . str_replace("\n", '', + var_export($requestParams, TRUE))); + + /** + * Open socket connection to make POST request. It'd be nice to use stream_context_create with the native + * HTTP transport but it hides/abstracts too many required bits (like HTTP error responses). + */ + $errNo = $errStr = NULL; + $scheme = ($urlParts['scheme'] == 'https') ? 'ssl://' : 'tcp://'; + $port = ($urlParts['scheme'] == 'https') ? 443 : 80; + + /** + * We must perform manual host resolution here as Twitter's IP regularly rotates (ie: DNS TTL of 60 seconds) and + * PHP appears to cache it the result if in a long running process (as per Phirehose). + */ + $streamIPs = gethostbynamel($urlParts['host']); + if (empty($streamIPs)) { + throw new PhirehoseNetworkException("Unable to resolve hostname: '" . $urlParts['host'] . '"'); + } + + // Choose one randomly (if more than one) + $this->log('Resolved host ' . $urlParts['host'] . ' to ' . implode(', ', $streamIPs)); + $streamIP = $streamIPs[rand(0, (count($streamIPs) - 1))]; + $this->log('Connecting to ' . $streamIP); + + @$this->conn = fsockopen($scheme . $streamIP, $port, $errNo, $errStr, $this->connectTimeout); + + // No go - handle errors/backoff + if (!$this->conn || !is_resource($this->conn)) { + $this->lastErrorMsg = $errStr; + $this->lastErrorNo = $errNo; + $connectFailures ++; + if ($connectFailures > $this->connectFailuresMax) { + $msg = 'TCP failure limit exceeded with ' . $connectFailures . ' failures. Last error: ' . $errStr; + $this->log($msg,'error'); + throw new PhirehoseConnectLimitExceeded($msg, $errNo); // Throw an exception for other code to handle + } + // Increase retry/backoff up to max + $tcpRetry = ($tcpRetry < $this->tcpBackoffMax) ? $tcpRetry * 2 : $this->tcpBackoffMax; + $this->log('TCP failure ' . $connectFailures . ' of ' . $this->connectFailuresMax . ' connecting to stream: ' . + $errStr . ' (' . $errNo . '). Sleeping for ' . $tcpRetry . ' seconds.','info'); + sleep($tcpRetry); + continue; + } + + // TCP connect OK, clear last error (if present) + $this->log('Connection established to ' . $streamIP); + $this->lastErrorMsg = NULL; + $this->lastErrorNo = NULL; + + // If we have a socket connection, we can attempt a HTTP request - Ensure blocking read for the moment + stream_set_blocking($this->conn, 1); + + // Encode request data + $postData = http_build_query($requestParams); + + // Do it + fwrite($this->conn, "POST " . $urlParts['path'] . " HTTP/1.0\r\n"); + fwrite($this->conn, "Host: " . $urlParts['host'] . "\r\n"); + fwrite($this->conn, "Content-type: application/x-www-form-urlencoded\r\n"); + fwrite($this->conn, "Content-length: " . strlen($postData) . "\r\n"); + fwrite($this->conn, "Accept: */*\r\n"); + fwrite($this->conn, 'Authorization: Basic ' . $authCredentials . "\r\n"); + fwrite($this->conn, 'User-Agent: ' . self::USER_AGENT . "\r\n"); + fwrite($this->conn, "\r\n"); + fwrite($this->conn, $postData . "\r\n"); + fwrite($this->conn, "\r\n"); + + // First line is response + list($httpVer, $httpCode, $httpMessage) = preg_split('/\s+/', trim(fgets($this->conn, 1024)), 3); + + // Response buffers + $respHeaders = $respBody = ''; + + // Consume each header response line until we get to body + while ($hLine = trim(fgets($this->conn, 4096))) { + $respHeaders .= $hLine; + } + + // If we got a non-200 response, we need to backoff and retry + if ($httpCode != 200) { + $connectFailures ++; + + // Twitter will disconnect on error, but we want to consume the rest of the response body (which is useful) + while ($bLine = trim(fgets($this->conn, 4096))) { + $respBody .= $bLine; + } + + // Construct error + $errStr = 'HTTP ERROR ' . $httpCode . ': ' . $httpMessage . ' (' . $respBody . ')'; + + // Set last error state + $this->lastErrorMsg = $errStr; + $this->lastErrorNo = $httpCode; + + // Have we exceeded maximum failures? + if ($connectFailures > $this->connectFailuresMax) { + $msg = 'Connection failure limit exceeded with ' . $connectFailures . ' failures. Last error: ' . $errStr; + $this->log($msg,'error'); + throw new PhirehoseConnectLimitExceeded($msg, $httpCode); // We eventually throw an exception for other code to handle + } + // Increase retry/backoff up to max + $httpRetry = ($httpRetry < $this->httpBackoffMax) ? $httpRetry * 2 : $this->httpBackoffMax; + $this->log('HTTP failure ' . $connectFailures . ' of ' . $this->connectFailuresMax . ' connecting to stream: ' . + $errStr . '. Sleeping for ' . $httpRetry . ' seconds.','info'); + sleep($httpRetry); + continue; + + } // End if not http 200 + + // Loop until connected OK + } while (!is_resource($this->conn) || $httpCode != 200); + + // Connected OK, reset connect failures + $connectFailures = 0; + $this->lastErrorMsg = NULL; + $this->lastErrorNo = NULL; + + // Switch to non-blocking to consume the stream (important) + stream_set_blocking($this->conn, 0); + + // Connect always causes the filterChanged status to be cleared + $this->filterChanged = FALSE; + + // Flush stream buffer & (re)assign fdrPool (for reconnect) + $this->fdrPool = array($this->conn); + $this->buff = ''; + + } + + + protected function connect_oauth() { + + // Init state + $connectFailures = 0; + $tcpRetry = $this->tcpBackoff / 2; + $httpRetry = $this->httpBackoff / 2; + + // Keep trying until connected (or max connect failures exceeded) + do { + + // Check filter predicates for every connect (for filter method) + if ($this->method == self::METHOD_FILTER) { + $this->checkFilterPredicates(); + } + + // Construct URL/HTTP bits + $url = self::URL_BASE . $this->method . '.' . $this->format; + $urlParts = parse_url($url); + $authCredentials = base64_encode($this->username . ':' . $this->password); + + // Setup params appropriately + $requestParams = array('delimited' => 'length'); + + // Filter takes additional parameters + if ($this->method == self::METHOD_USER && count($this->trackWords) > 0) { + $requestParams['track'] = implode(',', $this->trackWords); + } + if ($this->method == self::METHOD_USER && count($this->followIds) > 0) { + $requestParams['follow'] = implode(',', $this->followIds); + } + + + // Debugging is useful + $this->log('Connecting to twitter stream: ' . $url . ' with params: ' . str_replace("\n", '', + var_export($requestParams, TRUE))); + + /** + * Open socket connection to make POST request. It'd be nice to use stream_context_create with the native + * HTTP transport but it hides/abstracts too many required bits (like HTTP error responses). + */ + $errNo = $errStr = NULL; + $scheme = ($urlParts['scheme'] == 'https') ? 'ssl://' : 'tcp://'; + $port = ($urlParts['scheme'] == 'https') ? 443 : 80; + + /** + * We must perform manual host resolution here as Twitter's IP regularly rotates (ie: DNS TTL of 60 seconds) and + * PHP appears to cache it the result if in a long running process (as per Phirehose). + */ + $streamIPs = gethostbynamel($urlParts['host']); + if (empty($streamIPs)) { + throw new PhirehoseNetworkException("Unable to resolve hostname: '" . $urlParts['host'] . '"'); + } + + // Choose one randomly (if more than one) + $this->log('Resolved host ' . $urlParts['host'] . ' to ' . implode(', ', $streamIPs)); + $streamIP = $streamIPs[rand(0, (count($streamIPs) - 1))]; + $this->log('Connecting to ' . $streamIP); + + @$this->conn = fsockopen($scheme . $streamIP, $port, $errNo, $errStr, $this->connectTimeout); + + // No go - handle errors/backoff + if (!$this->conn || !is_resource($this->conn)) { + $this->lastErrorMsg = $errStr; + $this->lastErrorNo = $errNo; + $connectFailures ++; + if ($connectFailures > $this->connectFailuresMax) { + $msg = 'TCP failure limit exceeded with ' . $connectFailures . ' failures. Last error: ' . $errStr; + $this->log($msg,'error'); + throw new PhirehoseConnectLimitExceeded($msg, $errNo); // Throw an exception for other code to handle + } + // Increase retry/backoff up to max + $tcpRetry = ($tcpRetry < $this->tcpBackoffMax) ? $tcpRetry * 2 : $this->tcpBackoffMax; + $this->log('TCP failure ' . $connectFailures . ' of ' . $this->connectFailuresMax . ' connecting to stream: ' . + $errStr . ' (' . $errNo . '). Sleeping for ' . $tcpRetry . ' seconds.','info'); + sleep($tcpRetry); + continue; + } + + // TCP connect OK, clear last error (if present) + $this->log('Connection established to ' . $streamIP); + $this->lastErrorMsg = NULL; + $this->lastErrorNo = NULL; + + // If we have a socket connection, we can attempt a HTTP request - Ensure blocking read for the moment + stream_set_blocking($this->conn, 1); + + // Encode request data + $postData = http_build_query($requestParams); + + // Oauth tokens + $oauthHeader = $this->getOAuthHeader('POST', $url); + + // Do it + fwrite($this->conn, "POST " . $urlParts['path'] . " HTTP/1.1\r\n"); + fwrite($this->conn, "Host: " . $urlParts['host'].':'.$port . "\r\n"); + #fwrite($this->conn, "Content-type: application/x-www-form-urlencoded\r\n"); + #fwrite($this->conn, "Content-length: " . strlen($postData) . "\r\n"); + #fwrite($this->conn, "Accept: */*\r\n"); + #fwrite($this->conn, 'Authorization: Basic ' . $authCredentials . "\r\n"); + fwrite($this->conn, $oauthHeader."\r\n"); + #fwrite($this->conn, 'User-Agent: ' . self::USER_AGENT . "\r\n"); + fwrite($this->conn, "\r\n"); + fwrite($this->conn, $postData . "\r\n"); + fwrite($this->conn, "\r\n"); + + $this->log("POST " . $urlParts['path'] . " HTTP/1.1"); + $this->log("Host: " . $urlParts['host'].':'.$port); + #$this->log("Content-type: application/x-www-form-urlencoded"); + #$this->log("Content-length: " . strlen($postData)); + #$this->log("Accept: */*"); + #$this->log('Authorization: Basic ' . $authCredentials); + $this->log($oauthHeader); + #$this->log('User-Agent: ' . self::USER_AGENT); + $this->log(''); + $this->log($postData); + $this->log(''); + + // First line is response + list($httpVer, $httpCode, $httpMessage) = preg_split('/\s+/', trim(fgets($this->conn, 1024)), 3); + + // Response buffers + $respHeaders = $respBody = ''; + + // Consume each header response line until we get to body + while ($hLine = trim(fgets($this->conn, 4096))) { + $respHeaders .= $hLine; + } + + // If we got a non-200 response, we need to backoff and retry + if ($httpCode != 200) { + $connectFailures ++; + + // Twitter will disconnect on error, but we want to consume the rest of the response body (which is useful) + while ($bLine = trim(fgets($this->conn, 4096))) { + $respBody .= $bLine; + } + + // Construct error + $errStr = 'HTTP ERROR ' . $httpCode . ': ' . $httpMessage . ' (' . $respBody . ')'; + + // Set last error state + $this->lastErrorMsg = $errStr; + $this->lastErrorNo = $httpCode; + + // Have we exceeded maximum failures? + if ($connectFailures > $this->connectFailuresMax) { + $msg = 'Connection failure limit exceeded with ' . $connectFailures . ' failures. Last error: ' . $errStr; + $this->log($msg,'error'); + throw new PhirehoseConnectLimitExceeded($msg, $httpCode); // We eventually throw an exception for other code to handle + } + // Increase retry/backoff up to max + $httpRetry = ($httpRetry < $this->httpBackoffMax) ? $httpRetry * 2 : $this->httpBackoffMax; + $this->log('HTTP failure ' . $connectFailures . ' of ' . $this->connectFailuresMax . ' connecting to stream: ' . + $errStr . '. Sleeping for ' . $httpRetry . ' seconds.','info'); + sleep($httpRetry); + continue; + + } // End if not http 200 + + // Loop until connected OK + } while (!is_resource($this->conn) || $httpCode != 200); + + // Connected OK, reset connect failures + $connectFailures = 0; + $this->lastErrorMsg = NULL; + $this->lastErrorNo = NULL; + + // Switch to non-blocking to consume the stream (important) + stream_set_blocking($this->conn, 0); + + // Connect always causes the filterChanged status to be cleared + $this->filterChanged = FALSE; + + // Flush stream buffer & (re)assign fdrPool (for reconnect) + $this->fdrPool = array($this->conn); + $this->buff = ''; + + } + + + protected function prepareParameters($method = null, $url = null, $params = null) { + if(empty($method) || empty($url)) + return false; + + $oauth['oauth_consumer_key'] = TWITTER_CONSUMER_KEY; + $oauth['oauth_token'] = $this->username; + $oauth['oauth_nonce'] = md5(uniqid(rand(), true)); + $oauth['oauth_timestamp'] = time(); + $oauth['oauth_signature_method'] = 'HMAC-SHA1'; + if(isset($params['oauth_verifier'])) + { + $oauth['oauth_verifier'] = $params['oauth_verifier']; + unset($params['oauth_verifier']); + } + $oauth['oauth_version'] = '1.0'; + // encode all oauth values + foreach($oauth as $k => $v) + $oauth[$k] = $this->encode_rfc3986($v); + + // encode all non '@' params + // keep sigParams for signature generation (exclude '@' params) + // rename '@key' to 'key' + $sigParams = array(); + $hasFile = false; + if(is_array($params)) + { + foreach($params as $k => $v) + { + if(strncmp('@',$k,1) !== 0) + { + $sigParams[$k] = $this->encode_rfc3986($v); + $params[$k] = $this->encode_rfc3986($v); + } + else + { + $params[substr($k, 1)] = $v; + unset($params[$k]); + $hasFile = true; + } + } + + if($hasFile === true) + $sigParams = array(); + } + + $sigParams = array_merge($oauth, (array)$sigParams); + + // sorting + ksort($sigParams); + + print_r($sigParams); + + // signing + $oauth['oauth_signature'] = $this->encode_rfc3986($this->generateSignature($method, $url, $sigParams)); + return array('request' => $params, 'oauth' => $oauth); + } + + + protected function encode_rfc3986($string) { + return str_replace('+', ' ', str_replace('%7E', '~', rawurlencode(($string)))); + } + + + protected function generateSignature($method = null, $url = null, $params = null) { + if(empty($method) || empty($url)) + return false; + + // concatenating and encode + $concat = ''; + foreach((array)$params as $key => $value) + $concat .= "{$key}={$value}&"; + $concat = substr($concat, 0, -1); + $concatenatedParams = $this->encode_rfc3986($concat); + + // normalize url + $urlParts = parse_url($url); + $scheme = strtolower($urlParts['scheme']); + $host = strtolower($urlParts['host']); + $port = isset($urlParts['port']) ? intval($urlParts['port']) : 0; + $retval = strtolower($scheme) . '://' . strtolower($host); + if(!empty($port) && (($scheme === 'http' && $port != 80) || ($scheme === 'https' && $port != 443))) + $retval .= ":{$port}"; + + $retval .= $urlParts['path']; + if(!empty($urlParts['query'])) + $retval .= "?{$urlParts['query']}"; + + $normalizedUrl = $this->encode_rfc3986($retval); + $method = $this->encode_rfc3986($method); // don't need this but why not? + + $signatureBaseString = "{$method}&{$normalizedUrl}&{$concatenatedParams}"; + var_dump($signatureBaseString); + + # sign the signature string + $key = $this->encode_rfc3986(TWITTER_CONSUMER_SECRET) . '&' . $this->encode_rfc3986($this->password); + return base64_encode(hash_hmac('sha1', $signatureBaseString, $key, true)); + } + + + protected function getOAuthHeader($method, $url) { + $params = $this->prepareParameters($method, $url); + $oauthHeaders = $params['oauth']; + $urlParts = parse_url($url); + $oauth = 'Authorization: OAuth realm="' . $urlParts['scheme'] . '://' . $urlParts['host'] . $urlParts['path'] . '", '; + foreach($oauthHeaders as $name => $value) + { + $oauth .= "{$name}=\"{$value}\", "; + } + $oauth = substr($oauth, 0, -2); + return $oauth; + } + +}