Skip to content

Commit

Permalink
Merge pull request #26 from WGrape/testing
Browse files Browse the repository at this point in the history
style: 添加方法注释和binlog数据处理过程文档等
  • Loading branch information
WGrape authored Dec 30, 2021
2 parents ac62fde + e2061bb commit 2681a2c
Show file tree
Hide file tree
Showing 16 changed files with 479 additions and 50 deletions.
14 changes: 12 additions & 2 deletions HOWTOCODE.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,10 @@
-     [1、基于Canal](#11)
-     [2、ES文档更新](#12)
-     [3、完整架构](#13)
- [二、执行原理](#2)
- [二、底层原理](#2)
-     [1、生命周期](#21)
-     [2、命令执行](#22)
-     [3、binlog数据处理过程](#23)
- [三、应用配置](#3)
-     [1、消费配置](#31)
-     [2、数据库配置](#32)
Expand Down Expand Up @@ -43,7 +44,7 @@ ESUpdater提供了从消费Kafka中的数据库增量数据,到ES文档增量

<img src="https://user-images.githubusercontent.com/35942268/147027126-1df83ddf-8698-44dd-a988-5499f7eeb063.png" width="625">

## <span id="2">二、执行原理</span>
## <span id="2">二、底层原理</span>
ESUpdater的核心由```Consumer```进程和```Worker```进程组成,其中根目录下的```/esupdater.php```为入口文件

### <span id="21">1、生命周期</span>
Expand Down Expand Up @@ -79,6 +80,15 @@ ESUpdater的核心由```Consumer```进程和```Worker```进程组成,其中根
#### <span id="223">(3) work</span>
```Consumer```进程使用```php esupdater work```命令启动```Worker```进程时,```Worker```进程会记录下```/runtime/esupdater-worker-{pid}.pid```进程ID文件,只有当结束后才会删除此文件。

### <span id="23">3、binlog数据处理过程</span>
处理过程为```binlog => canalData => urlencode(canalData)```,可以参考文件 [/framework/Canal.php](./framework/Canal.php)

1. Canal将```binlog```数据解析为```json```格式并投递至kafka
2. Consumer进程消费kafka,使用```urlencode```方式编码获取到的消息数据
3. Consumer进程把编码后的消息数据,传递至Worker进程
4. Worker进程再依次拆解数据即可


## <span id="3">三、应用配置</span>

### <span id="31">1、消费配置</span>
Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ bash ./restart.sh
项目共有如下3个的文档,以便查看了解

- [README](./README.md) :项目本身的文档,快速了解项目
- [HOWTOCODE](./HOWTOCODE.md) :更深的了解项目,包括架构设计、执行原理、应用配置、单元测试等
- [HOWTOCODE](./HOWTOCODE.md) :更深的了解项目,包括架构设计、底层原理、应用配置、单元测试等
- [HELP](./HELP.md) :解决安装和部署过程中问题的帮助手册,包括镜像制作帮助、容器部署帮助等

### <span id="52">2、参与项目</span>
Expand Down
27 changes: 24 additions & 3 deletions app/alpha/user/UserHandler.php
Original file line number Diff line number Diff line change
Expand Up @@ -12,18 +12,39 @@

class UserHandler
{
/**
* The event callback on database insert.
*
* @param array $parsedCanalData
*
* @return bool
*/
public function onInsert(array $parsedCanalData): bool
{
return (new UserService())->handleInsert($parsedCanalData);
return (new UserService())->doInsert($parsedCanalData);
}

/**
* The event callback on database update.
*
* @param array $parsedCanalData
*
* @return bool
*/
public function onUpdate(array $parsedCanalData): bool
{
return (new UserService())->handleUpdate($parsedCanalData);
return (new UserService())->doUpdate($parsedCanalData);
}

/**
* The event callback on database delete.
*
* @param array $parsedCanalData
*
* @return bool
*/
public function onDelete(array $parsedCanalData): bool
{
return (new UserService())->handleDelete($parsedCanalData);
return (new UserService())->doDelete($parsedCanalData);
}
}
40 changes: 34 additions & 6 deletions app/alpha/user/UserService.php
Original file line number Diff line number Diff line change
Expand Up @@ -12,26 +12,54 @@

class UserService
{
/**
* Get userid of user.
*
* @param int $userid
*
* @return int
*/
public function getUserId(int $userid): int
{
return ($userid * 100) + 5;
}

public function handleInsert(array $parsedCanalData): bool
/**
* Do insert business things.
*
* @param array $parsedCanalData
*
* @return bool
*/
public function doInsert(array $parsedCanalData): bool
{
Logger::logInfo("result(UserService -> handleInsert) : success");
Logger::logInfo("result(UserService -> doInsert) : success");
return true;
}

public function handleUpdate(array $parsedCanalData): bool
/**
* Do update business things.
*
* @param array $parsedCanalData
*
* @return bool
*/
public function doUpdate(array $parsedCanalData): bool
{
Logger::logInfo("result(UserService -> handleUpdate) : success");
Logger::logInfo("result(UserService -> doUpdate) : success");
return true;
}

public function handleDelete(array $parsedCanalData): bool
/**
* Do delete business things.
*
* @param array $parsedCanalData
*
* @return bool
*/
public function doDelete(array $parsedCanalData): bool
{
Logger::logInfo("result(UserService -> handleDelete) : success");
Logger::logInfo("result(UserService -> doDelete) : success");
return true;
}
}
53 changes: 52 additions & 1 deletion app/common/DBService.php
Original file line number Diff line number Diff line change
Expand Up @@ -10,16 +10,67 @@

class DBService
{

/**
* The host for accessing database.
*
* @var string
*/
private $host;

/**
* The port for accessing database.
*
* @var string
*/
private $port;

/**
* The account name for accessing database.
*
* @var string
*/
private $username;

/**
* The password for accessing database.
*
* @var string
*/
private $password;

/**
* The database name for accessing database.
*
* @var string
*/
private $database;

/**
* The charset for accessing database.
*
* @var string
*/
private $charset;

/**
* The mysqli instance.
*
* @var \mysqli
*/
private $mysqliObject;

public function __construct($host, $port, $username, $password, $database, $charset)
/**
* Constructs a database service.
*
* @param string $host
* @param string $port
* @param string $username
* @param string $password
* @param string $database
* @param string $charset
*/
public function __construct(string $host, string $port, string $username, string $password, string $database, string $charset)
{
$this->host = $host;
$this->port = $port;
Expand Down
78 changes: 78 additions & 0 deletions app/common/ESService.php
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,44 @@

class ESService
{
/**
* The host for accessing elasticsearch.
*
* @var string
*/
public $host;

/**
* The port for accessing elasticsearch.
*
* @var string
*/
public $port;

/**
* The user:password for accessing elasticsearch.
*
* @var string
*/
public $userPassword;

/**
* The field of _type for accessing elasticsearch.
*
* @var string
*/
public $documentType;

/**
* The index for accessing elasticsearch.
*
* @var string
*/
public $index;

/**
* The different curl method.
*/
const METHOD_GET = 'GET';
const METHOD_POST = 'POST';
const METHOD_PUT = 'PUT';
Expand All @@ -35,12 +66,32 @@ public function __construct($index)
$this->index = $index;
}

/**
* Is the changed filed need to update elasticsearch.
*
* @param array $changedFieldList
*
* @param array $needToUpdateFieldList
*
* @return bool
*/
public function isNeedToUpdate(array $changedFieldList, array $needToUpdateFieldList): bool
{
$result = array_intersect($changedFieldList, $needToUpdateFieldList);
return !empty($result);
}

/**
* Update a document.
*
* @param $documentId
*
* @param $updateList
*
* @param false $upsert
*
* @return bool
*/
public function updateDoc($documentId, $updateList, $upsert = false): bool
{
$url = "{$this->host}:{$this->port}/{$this->index}/{$this->documentType}/{$documentId}/_update";
Expand All @@ -61,6 +112,15 @@ public function updateDoc($documentId, $updateList, $upsert = false): bool
return $this->curlRequest($url, self::METHOD_POST, $data);
}

/**
* Put a document.
*
* @param $documentId
*
* @param $document
*
* @return bool
*/
public function putDoc($documentId, $document): bool
{
$url = "{$this->host}:{$this->port}/{$this->index}/{$this->documentType}/{$documentId}";
Expand All @@ -75,11 +135,29 @@ public function putDoc($documentId, $document): bool
return $this->curlRequest($url, self::METHOD_PUT, $data);
}

/**
* Is success or not.
*
* @param int $httpCode
*
* @return bool
*/
public function isSuccess(int $httpCode): bool
{
return intval($httpCode) >= 200 && intval($httpCode) < 300;
}

/**
* Request the elasticsearch by http.
*
* @param string $url
*
* @param string $method
*
* @param string $data
*
* @return bool
*/
public function curlRequest(string $url, string $method, string $data): bool
{
$ch = curl_init();
Expand Down
Loading

0 comments on commit 2681a2c

Please sign in to comment.