Skip to content

Commit

Permalink
first commit
Browse files Browse the repository at this point in the history
  • Loading branch information
bain2018 committed Apr 28, 2022
0 parents commit a59fede
Show file tree
Hide file tree
Showing 47 changed files with 5,442 additions and 0 deletions.
Binary file added .DS_Store
Binary file not shown.
5 changes: 5 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
composer.lock
.idea/
test/
vendor/
.php_cs.cache
113 changes: 113 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
# canal-php

## 一.canal-php 简介

canal-php 是阿里巴巴开源项目 [Canal](https://github.com/alibaba/canal)是阿里巴巴mysql数据库binlog的增量订阅&消费组件 的 php 客户端。为 php 开发者提供一个更友好的使用 Canal 的方式。Canal 是mysql数据库binlog的增量订阅&消费组件。

基于日志增量订阅&消费支持的业务:

1. 数据库镜像
2. 数据库实时备份
3. 多级索引 (卖家和买家各自分库索引)
4. search build
5. 业务cache刷新
6. 价格变化等重要业务消息

关于 Canal 的更多信息请访问 https://github.com/alibaba/canal/wiki

## 二.应用场景

canal-php 作为Canal的客户端,其应用场景就是Canal的应用场景。关于应用场景在Canal介绍一节已有概述。举一些实际的使用例子:

1.代替使用轮询数据库方式来监控数据库变更,有效改善轮询耗费数据库资源。

2.根据数据库的变更实时更新搜索引擎,比如电商场景下商品信息发生变更,实时同步到商品搜索引擎 Elasticsearch、solr等

3.根据数据库的变更实时更新缓存,比如电商场景下商品价格、库存发生变更实时同步到redis

4.数据库异地备份、数据同步

5.根据数据库变更触发某种业务,比如电商场景下,创建订单超过xx时间未支付被自动取消,我们获取到这条订单数据的状态变更即可向用户推送消息。

6.将数据库变更整理成自己的数据格式发送到kafka等消息队列,供消息队列的消费者进行消费。

## 三.工作原理

canal-php 是 Canal 的 php 客户端,它与 Canal 是采用的Socket来进行通信的,传输协议是TCP,交互协议采用的是 Google Protocol Buffer 3.0。

## 四.工作流程

1.Canal连接到mysql数据库,模拟slave

2.canal-php 与 Canal 建立连接

3.数据库发生变更写入到binlog

4.Canal向数据库发送dump请求,获取binlog并解析

5.canal-php 向 Canal 请求数据库变更

6.Canal 发送解析后的数据给canal-php

7.canal-php收到数据,消费成功,发送回执。(可选)

8.Canal记录消费位置。

![架构图](assets/architecture.png)

## 五.快速启动

### 安装Canal

Canal 的安装以及配置使用请查看 https://github.com/alibaba/canal/wiki/QuickStart


### 构建canal php客户端

````shell
$ composer require xingwenge/canal_php

or

$ git clone https://github.com/xingwenge/canal-php.git
$ cd canal-php
$ composer update
````

### 建立与Canal的连接
````php
try {
$client = CanalConnectorFactory::createClient(CanalClient::TYPE_SOCKET_CLUE);
# $client = CanalConnectorFactory::createClient(CanalClient::TYPE_SWOOLE);

$client->connect("127.0.0.1", 11111);
$client->checkValid();
$client->subscribe("1001", "example", ".*\\..*");
# $client->subscribe("1001", "example", "db_name.tb_name"); # 设置过滤

while (true) {
$message = $client->get(100);
if ($entries = $message->getEntries()) {
foreach ($entries as $entry) {
Fmt::println($entry);
}
}
sleep(1);
}

$client->disConnect();
} catch (\Exception $e) {
echo $e->getMessage(), PHP_EOL;
}
````

![运行效果图](assets/effect.gif)

更多详情请查看 [Sample](https://github.com/xingwenge/canal-php/blob/master/src/sample/client.php)

### 性能
本地开发环境

消费速度:sql insert 10000 事件,32秒消耗完成。消费速度 312.5 条/s。

内存使用:4 M。
Binary file added assets/architecture.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file added assets/chat.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file added assets/effect.gif
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
25 changes: 25 additions & 0 deletions composer.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
{
"name": "bain2018/canal",
"description": "connect canal server via php client. fixed bugs",
"type": "library",
"license": "MIT",
"authors": [
{
"name": "xingwenge",
"email": "[email protected]"
}
],
"minimum-stability": "dev",
"require": {
"google/protobuf": "^3.20",
"php": ">=8.0",
"clue/socket-raw": "^1.6"
},
"autoload": {
"psr-4": {
"Com\\Alibaba\\Otter\\Canal\\Protocol\\": "src/protocol/Com/Alibaba/Otter/Canal/Protocol/",
"GPBMetadata\\": "src/protocol/GPBMetadata/",
"xingwenge\\canal_php\\": "src/"
}
}
}
9 changes: 9 additions & 0 deletions src/CanalClient.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
<?php
namespace xingwenge\canal_php;

class CanalClient
{
const TYPE_SOCKET = 1;
const TYPE_SWOOLE = 2;
const TYPE_SOCKET_CLUE = 3;
}
31 changes: 31 additions & 0 deletions src/CanalConnectorFactory.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
<?php
namespace xingwenge\canal_php;

use xingwenge\canal_php\adapter\CanalConnectorBase;

class CanalConnectorFactory
{
private function __construct()
{

}

/**
* @param $clientType
* @return CanalConnectorBase
* @throws \Exception
*/
public static function createClient($clientType)
{
switch($clientType){
case CanalClient::TYPE_SOCKET:
return new \xingwenge\canal_php\adapter\socket\CanalConnector();
case CanalClient::TYPE_SWOOLE:
return new \xingwenge\canal_php\adapter\swoole\CanalConnector();
case CanalClient::TYPE_SOCKET_CLUE:
return new \xingwenge\canal_php\adapter\clue\CanalConnector();
default:
throw new \Exception("Unknown client type");
}
}
}
59 changes: 59 additions & 0 deletions src/Fmt.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
<?php
namespace xingwenge\canal_php;

use Com\Alibaba\Otter\Canal\Protocol\Column;
use Com\Alibaba\Otter\Canal\Protocol\Entry;
use Com\Alibaba\Otter\Canal\Protocol\EntryType;
use Com\Alibaba\Otter\Canal\Protocol\EventType;
use Com\Alibaba\Otter\Canal\Protocol\RowChange;
use Com\Alibaba\Otter\Canal\Protocol\RowData;

class Fmt
{
/**
* @param Entry $entry
* @throws \Exception
*/
public static function println($entry)
{
switch ($entry->getEntryType()) {
case EntryType::TRANSACTIONBEGIN:
case EntryType::TRANSACTIONEND:
return;
break;
}

$rowChange = new RowChange();
$rowChange->mergeFromString($entry->getStoreValue());
$evenType = $rowChange->getEventType();
$header = $entry->getHeader();

echo sprintf("================> binlog[%s : %d],name[%s,%s], eventType: %s", $header->getLogfileName(), $header->getLogfileOffset(), $header->getSchemaName(), $header->getTableName(), $header->getEventType()), PHP_EOL;
echo $rowChange->getSql(), PHP_EOL;

/** @var RowData $rowData */
foreach ($rowChange->getRowDatas() as $rowData) {
switch ($evenType) {
case EventType::DELETE:
self::ptColumn($rowData->getBeforeColumns());
break;
case EventType::INSERT:
self::ptColumn($rowData->getAfterColumns());
break;
default:
echo '-------> before', PHP_EOL;
self::ptColumn($rowData->getBeforeColumns());
echo '-------> after', PHP_EOL;
self::ptColumn($rowData->getAfterColumns());
break;
}
}
}

private static function ptColumn($columns) {
/** @var Column $column */
foreach ($columns as $column) {
echo sprintf("%s : %s update= %s", $column->getName(), $column->getValue(), var_export($column->getUpdated(), true)), PHP_EOL;
}
}
}
15 changes: 15 additions & 0 deletions src/ICanalConnector.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
<?php
namespace xingwenge\canal_php;

interface ICanalConnector
{
public function connect();
public function disConnect();
public function checkValid();
public function subscribe();
public function unSubscribe();
public function get();
public function getWithoutAck();
public function ack();
public function rollback();
}
44 changes: 44 additions & 0 deletions src/Message.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
<?php
namespace xingwenge\canal_php;

use Com\Alibaba\Otter\Canal\Protocol\Entry;

class Message
{
/** @var int */
private $id;
/** @var array */
private $entries;

/**
* @return int
*/
public function getId()
{
return $this->id;
}

/**
* @param int $id
*/
public function setId( $id )
{
$this->id = $id;
}

/**
* @return array
*/
public function getEntries()
{
return $this->entries;
}

/**
* @param Entry $entry
*/
public function addEntries( $entry )
{
$this->entries[] = $entry;
}
}
Loading

0 comments on commit a59fede

Please sign in to comment.