基于docker的canal-server部署作业_chenzuancong9642的博客-程序员秘密

技术标签: 运维  大数据  

基于docker的canal-server部署作业

1. 目标

启动一个canal-server实例监听目标mysql的操作

2. 准备工作

  • Ubuntu下安装Docker,并配置国内镜像
  • 部署一个mysql实例
    $ docker run -p 3306:3306 --name mysql-master -e MYSQL_ROOT_PASSWORD=123456 -d mysql
    
  • 配置mysql (参见canal Quickstart)
    • 启用binlog
    [mysqld]
    log-bin=mysql-bin
    binlog-format=ROW #选择row模式
    server_id=9527 #配置mysql replaction需要定义,不能和canal的slaveId重复
    
    • 添加slave权限
    CREATE USER canal IDENTIFIED BY 'canal';  
    GRANT SELECT, SHOW VIEW, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
    FLUSH PRIVILEGES;
    
    canal Quickstart文档中,并没有要求添加SHOW VIEW权限,但是本作业在执行中会产生如下异常:
    Caused by: java.io.IOException: ErrorPacket [errorNumber=1142, fieldCount=-1, message=SHOW VIEW command denied to user 'canal'@'172.17.0.1' for table 'host_summary', sqlState=42000, sqlStateMarker=#]
    with command: show create table `sys`.`host_summary`; ...
    
    因为canal-server在启动时,会去dump表结构并缓存。
    /*com.alibaba.otter.canal.parse.inbound.mysql.tsdb.DatabaseTableMeta*/
    
    /**
     * 初始化的时候dump一下表结构
     */
    private boolean dumpTableMeta(MysqlConnection connection, final CanalEventFilter filter) {
        try {
            ResultSetPacket packet = connection.query("show databases");
            List<String> schemas = new ArrayList<String>();
            for (String schema : packet.getFieldValues()) {
                schemas.add(schema);
            }
    
            for (String schema : schemas) {
                packet = connection.query("show tables from `" + schema + "`");
                List<String> tables = new ArrayList<String>();
                for (String table : packet.getFieldValues()) {
                    String fullName = schema + "." + table;
                    if (blackFilter == null || !blackFilter.filter(fullName)) {
                        if (filter == null || filter.filter(fullName)) {
                            tables.add(table);
                        }
                    }
                }
    
                if (tables.isEmpty()) {
                    continue;
                }
    
                StringBuilder sql = new StringBuilder();
                for (String table : tables) {
                    sql.append("show create table `" + schema + "`.`" + table + "`;");
                }
    
                List<ResultSetPacket> packets = connection.queryMulti(sql.toString());
                for (ResultSetPacket onePacket : packets) {
                    if (onePacket.getFieldValues().size() > 1) {
                        String oneTableCreateSql = onePacket.getFieldValues().get(1);
                        memoryTableMeta.apply(INIT_POSITION, schema, oneTableCreateSql, null);
                    }
                }
            }
    
            return true;
        } catch (IOException e) {
            throw new CanalParseException(e);
        }
    }
    
    所以首次启动成功后,再将 SHOW VIEW 权限回收并不会影响正常作业(表结果不变的情况下)。这也为我在作业中重现异常带来了一点小麻烦。为了快速跑通QuickStart,遇到问题后我第一时间给canal账号赋予了所有权限:
    GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;
    FLUSH PRIVILEGES;
    
    然后重启canal-server,作业正常。然后回收权限,但异常并不能重现,但canal-server工作正常。
    REVOKE ALL PRIVILEGES ON *.* FROM 'canal'@'%' ;
    GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
    FLUSH PRIVILEGES;
    

3. 部署canal-server

官网文档 Canal Docker QuickStart。 文档中使用canal提供的run.sh启动,脚本的工作就是为执行docker run准备参数,在本作业的目标是启动一个canal-server实例,大多参数都使用canal默认配置就可以,所以为了简单,直接使用docker命令进行启动。配置相应的master地址和用户名密码,映射必要的监听端口就足够了。详细配置可参考Canal AdminGuide

docker run --name canal-server \
-e canal.instance.master.address=192.168.83.128:3306 \
-e canal.instance.dbUsername=canal \
-e canal.instance.dbPassword=canal \
-p 11111:11111 \
-d canal/canal-server:v1.1.0

canal-server:v1.1.0有一个小bug,如果canal.properties中没有配置canal.instance.parser.parallelThreadSize(默认情况,该配置项是被注释掉的):

#canal.instance.parser.parallelThreadSize = 16

那么,MysqlMultiStageCoprocessor的parserThreadCount会被设为0,从而在启动线程池的时候会抛IllegalArgumentException异常。

/*com.alibaba.otter.canal.parse.inbound.AbstractEventParser*/
protected Integer parallelThreadSize = Runtime.getRuntime().availableProcessors() * 60 / 100;     //60%的能力跑解析,剩余部分处理网络

public void setParallelThreadSize(Integer parallelThreadSize) {
    if (parallelThreadSize != null) {
        this.parallelThreadSize = parallelThreadSize;
    }
}

protected MultiStageCoprocessor buildMultiStageCoprocessor() {
    MysqlMultiStageCoprocessor mysqlMultiStageCoprocessor = new MysqlMultiStageCoprocessor(parallelBufferSize,
        parallelThreadSize,
        (LogEventConvert) binlogParser,
        transactionBuffer,
        destination);
    mysqlMultiStageCoprocessor.setEventsPublishBlockingTime(eventsPublishBlockingTime);
    return mysqlMultiStageCoprocessor;
}

在canal的master分支上,该bug已经修复,保证线程池至少有一个线程:

Ensure at least 1 thread for thread pool.

/*com.alibaba.otter.canal.parse.inbound.mysql.MysqlMultiStageCoprocessor*/
int tc = parserThreadCount > 0 ? parserThreadCount : 1;
this.parserExecutor = Executors.newFixedThreadPool(tc,
    new NamedThreadFactory("MultiStageCoprocessor-Parser-" + destination));

本作业是在虚拟机上进行,就分配了一个cup,因此虽然canal日志中显示已经启动:

# cat /home/admin/canal-server/logs/canal/canal.log
2018-08-31 19:36:18.970 [main] INFO  com.alibaba.otter.canal.deployer.CanalLauncher - ## the canal server is running now ......

但实际上example实例并不能工作。

# cat /home/admin/canal-server/logs/example/example.log
2018-08-31 19:36:37.345 [destination = example , address = /192.168.83.128:3306 , EventParser] ERROR c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - dump address /192.168.83.128:3306 has an error, retrying. caused by
java.lang.IllegalArgumentException: null
        at java.util.concurrent.ThreadPoolExecutor.<init>(ThreadPoolExecutor.java:1314) ~[na:1.8.0_181]
        at java.util.concurrent.ThreadPoolExecutor.<init>(ThreadPoolExecutor.java:1237) ~[na:1.8.0_181]
        at java.util.concurrent.Executors.newFixedThreadPool(Executors.java:151) ~[na:1.8.0_181]
        at com.alibaba.otter.canal.parse.inbound.mysql.MysqlMultiStageCoprocessor.start(MysqlMultiStageCoprocessor.java:84) ~[canal.parse-1.1.0.jar:na]
        at com.alibaba.otter.canal.parse.inbound.AbstractEventParser$3.run(AbstractEventParser.java:238) ~[canal.parse-1.1.0.jar:na]
        at java.lang.Thread.run(Thread.java:748) [na:1.8.0_181]

解决办法是更改虚拟机配置,设置成2个或以上cpu就能解决问题。通过对问题的分析,对于单cpu的物理机,无法通过修改cpu数量的情况,可以通过配置文件解决。

canal.instance.parser.parallelThreadSize = 1

4. 查看运行效果

对mysql进行update操作,就能看到日下日志:

# cat /home/admin/canal-server/logs/example/meta.log
2018-08-31 20:02:05.289 - clientId:1001 cursor:[mysql-bin.000007,3802,1535716924000,1002,] address[192.168.83.128/192.168.83.128:3306]

也可以运行ClientExample查看效果。

至此,作业完毕。

转载于:https://my.oschina.net/amhuman/blog/1941540

版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://blog.csdn.net/chenzuancong9642/article/details/101040754

智能推荐

[转载] 程序员喝酒喝出的计算机文化_weixin_30872867的博客-程序员秘密

大家喝的是啤酒。这时你入座了。 你给自己倒了杯可乐,这叫低配置。 你给自已倒了杯啤酒,这叫标准配置。 你给自己倒了杯茶水,这茶的颜色还跟啤酒一样,这叫木马。 你给自己倒了杯可乐,还滴了几滴醋,不仅颜色跟啤酒一样,而且不冒热气还有泡泡,这叫超级木马。 你的同事给你倒了杯白酒,这叫推荐配置。 人到齐了,酒席开始了。 你先一个人喝了一小口,这叫单元测试。 你跟旁边的人说哥们咱们随意,这叫交叉测试。 但是...

找到你的工作意义_banbi5215的博客-程序员秘密

题目: 找到你的工作意义经文: 弗6:5–9节一 目标立足于基督二 心志立足于服侍三 眼光立足于将来讨论:人们为什么要工作? A为了实现自我内心的价值 B为了改变人们的行为和思想 C为了养家糊口 D为了回报这个社会 E 为了让我的人生变得有趣 F ...

python图形用户界面学习-按钮事件改变文本内容_python按钮点一下变文本_程序猿沙弥的博客-程序员秘密

目录开发工具代码运行效果总结开发工具python版本: python-3.8.1-amd64python开发工具: JetBrains PyCharm 2018.3.6 x64python图形用户界面开发库: wxPython安装wxPython代码(这里指定了阿里云镜像安装源下载安装会更快)pip install wxPython -i http://mirrors.aliyun.com/pypi/simple/代码# coding = utf-8import wx# 自

对matlab中gradient函数的理解与C++中的应用_方圆以外0的博客-程序员秘密

在将matlab代码改写为C++时碰到了gradient函数,大概看了下matlab中gradient函数的代码实现,可分为两个部分:1、[f,ndim,loc,rflag] = parse_inputs(f,v):分析输入数据的情况,比如确定输入数据的维数等。2、varargout = gradient(f,varargin):这是核心部分。以二维矩阵作为输入对象,精简后,思路如下:

Oracle实验_笑脸呀的博客-程序员秘密

操作表数据//插入记录的语法格式:insert into [方案名].表名[列名] values (值)学号姓名专业名性别出生时间总学分备注061102王平计算机女1986-09-0238NULL061103王燕计算机女1985-10-0640NULL061104韦严平计算机男1986-08-2640NULL...

随便推点

条件编译#ifdef MACRO_A和#if defined(MACRO_A)的区别_macro ifdef_David_xtd的博客-程序员秘密

在查阅linux内核源码的过程中,发现存在两种不同类型的条件编译语句:#ifdef MACRO_A和#if defined(MACRO_A)或#if !defined(MACRO_A)#ifdef和#ifdefined()之间的区别实际上,两者并无本质的差别,但后者的应用范围更广,能支持多个预编译变量的检查。#if defined(MACRO_A) && !defined(

MySQL绘制POI的实体图_poi 画图工具类_夜看满天繁星的博客-程序员秘密

package com.kehua.framework.utils;import java.io.FileOutputStream;import org.apache.poi.ss.usermodel.Cell;import org.apache.poi.ss.usermodel.Row;import org.apache.poi.ss.util.CellRangeAddress;import o...

unity3d远程加载资源模型到本地并加载(一)打包资源_阿帅_的博客-程序员秘密

由于模型放在远程服务器,fbx格式是不能加载的所以可以做成预设或是AssetBundle格式进行远程加载。首先打包AssetBundle资源!在unity资源文件夹下新建一下一个Editor文件夹 下方一个脚本代码如下!(代码可复制!!!) using UnityEngine; using System.Collections; using UnityEditor;public class E

php经常用到的数据过滤的方法_weixin_30916125的博客-程序员秘密

&lt;?php/** * global.func.php 公共函数库 *//** * 返回经addslashes处理过的字符串或数组 * @param $string 需要处理的字符串或数组 * @return mixed */function new_addslashes($string){ if(!is_array($string)) ...

1号店6个CSS样式文件(全6页)_七号男技师的博客-程序员秘密

公用样式文件/*公共样式模块*//*引入字体图标*/@font-face { font-family: 'icomoon'; src: url('../fonts/icomoon.eot?qicgwf'); src: url('../fonts/icomoon.eot?qicgwf#iefix') format('embedded-opentype'), url('../fonts/icomoon.ttf?qicgwf') format('truetype'), url('../

shell脚本编写-基础练习_编写一个脚本eq.sh,输入两个非零整数x和y,判断两个数是否相等,相等输出yes,不相等_2.wa的博客-程序员秘密

#! /bin/shecho "please input x y";read x y; #连续输入一组参数 以空格隔开z=`expr $x+$y`;echo "The sum is $z"

推荐文章

热门文章

相关标签