Java实现Mysql增量同步


最近公司有个基于Mysql做增量数据同步的需求需要我要完成。源端是两个不同业务系统数据库的两张表,需要把这两张表的数据字段做一些过滤和处理,然后增量同步到本地服务的数据库中。由于数据量不大,源端两个表都是几十万的数据,因此当时首先想到的就是通过java定时读取表数据的方式做数据同步,但这种方式太过呆板,而且延迟也比较大。所以目前的思路想通过mysql binlog日志,使mysql中的数据变化实时同步到本地数据库,根据业务需求我只需要监听数据的insert,update,delete这几种event。监听mysql binlog工具有很多,目前大多数使用

canal使用集群方式部署,系统比较臃肿,复杂性也相对较高,可定制性也比较差,所以我没有考虑这种方案,采用了mysql-binlog-connector-java

工作原理

将mysql的日志格式设置为row模式,使用mysql-binlog-connector对日志进行读取,将读取到的mysql操作事件进行解析封装,最终封装为事件对象,进而对mysql的日志进行处理。但区别在于canal是模拟mysql slave端,主动从master端拉取日志数据。而mysql-binlog-connector只是一个解析库,它有两种模式:BinaryLogFileReader日志读取模式,和BinaryLogClient客户端访问模式。但似乎BinaryLogFileReader日志读取模式更适合于增量同步,它是可中断可指定position读取的的模式。下面是BinaryLogFileReader日志读取模式的示例代码

import com.github.shyiko.mysql.binlog.BinaryLogFileReader;
import com.github.shyiko.mysql.binlog.event.Event;
import com.github.shyiko.mysql.binlog.event.deserialization.ChecksumType;
import com.github.shyiko.mysql.binlog.event.deserialization.EventDeserializer;

import java.io.File;
import java.io.IOException;

public class Test {
    public static void main(String[] args) throws IOException {
        String filePath="D:\\DATA\\mysql-bin.000987";
        File binlogFile = new File(filePath);
        EventDeserializer eventDeserializer = new EventDeserializer();
        eventDeserializer.setChecksumType(ChecksumType.CRC32);
        BinaryLogFileReader reader = new BinaryLogFileReader(binlogFile, eventDeserializer);

        for (Event event; (event = reader.readEvent()) != null; ) {
            System.out.println(event.toString());
        }
        reader.close();
    }
}

上述代码,实现了针对某个sql日志文件进行读取,解析mysql事件,并封装为Event的功能。这样只能读取本地的Mysql日志文件,也就是说用这种方式,那么同步服务只能和源端数据库在同一台机器上,这显然不是我想要的方式,下面是主要说BinaryLogFileReader日志读取模式,它可监听远程的Mysql日志文件变化。

使用步骤

1.Maven配置

        <dependency>
            <groupId>com.github.shyiko</groupId>
            <artifactId>mysql-binlog-connector-java</artifactId>
            <version>0.18.1</version>
        </dependency>

2.Mysql源端数据库配置

      server-id=1
      log-bin=mysql-bin
      binlog-format=ROW
      expire_logs_days = 10
      max_binlog_size  = 100M

在Mysql的my.cnf配置文件中需增加以上几个属性(windows环境为my.ini文件),然后重启数据库。

3.Java代码

注意如果要监听多个源端,需要开启多个线程。因为这个开源组件中,监听源端日志文件的代码是阻塞的。所以如果在单线程中监听多个源端,那么只会有第一个数据源能监听到。由于我需要监听两个源端,因此开启了两个线程去获取源端的bing-log。下面是测试类

public class Test {

    public static void main(String[] args){
        Thread thread1=new Thread(new ThreadTesta());
        Thread thread2=new Thread(new ThreadTestb());
        thread1.start();
        thread2.start();
    }
}

源端A对应的线程代码

public class ThreadTesta implements Runnable{
    @Override
    public void run() {
        testRemote();
    }

    private static void testRemote(){
        final BinaryLogClient client = new BinaryLogClient("118.25.36.79", 3306, "root", "root");
        // 指定从何处开始增量
        client.setBinlogFilename("mysql-bin.000003");
        client.setBinlogPosition(154);
        EventDeserializer eventDeserializer = new EventDeserializer();
        eventDeserializer.setCompatibilityMode(
                EventDeserializer.CompatibilityMode.DATE_AND_TIME_AS_LONG,
                EventDeserializer.CompatibilityMode.CHAR_AND_BINARY_AS_BYTE_ARRAY
        );
        client.setEventDeserializer(eventDeserializer);
        client.registerEventListener(event -> {
            EventData data = event.getData();
            event.getHeader();
            if (data instanceof UpdateRowsEventData) {
                System.out.println("-----------Update--------------");
                List<Map.Entry<Serializable[], Serializable[]>> list = ((UpdateRowsEventData) data).getRows();
                updateFormat(list);
                // System.out.println(data.toString());
            } else if (data instanceof WriteRowsEventData) {
                System.out.println("----------- Insert---------------");
                List<Serializable[]> list = ((WriteRowsEventData) data).getRows();
                deleteOrInsertFormat(list);
                // System.out.println(data.toString());
            } else if (data instanceof DeleteRowsEventData) {
                System.out.println("-------------Delete--------------");
                List<Serializable[]> list = ((DeleteRowsEventData) data).getRows();
                deleteOrInsertFormat(list);
                //System.out.println(data.toString());
            }if(data instanceof TableMapEventData){
                TableMapEventData tableMapEventData = (TableMapEventData) data;
                String dataBase = tableMapEventData.getDatabase();
                String tableName = tableMapEventData.getTable();
                System.out.println("------dataBase:"+dataBase+"-----tableName:"+tableName+"-------");
                //tableMap.put(tableMapEventData.getTableId(), tableMapEventData.getTable());
            }
        });
        try {
            client.connect();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    private static void deleteOrInsertFormat(List<Serializable[]> list){
        Iterator var2 = list.iterator();
        while (var2.hasNext()){
            System.out.println("-----row-data-----");
            Object[] objects = (Object[])var2.next();
            List<String> beforeList = new ArrayList<>();
            for(Object obj : objects){
                if(obj instanceof Integer){
                    beforeList.add(obj+"");
                }else if(obj instanceof String){
                    beforeList.add(obj+"");
                }else if(obj instanceof byte[]){
                    beforeList.add(new String((byte[]) obj));
                }else {
                    beforeList.add(obj+"");
                }
            }
            System.out.println("insert-or-delete:"+JSON.toJSONString(beforeList));
        }
    }

    private static void updateFormat(List<Map.Entry<Serializable[], Serializable[]>> list){
        Iterator var2 = list.iterator();
        while(var2.hasNext()) {
            System.out.println("-----rows-----");
            Map.Entry row = (Map.Entry)var2.next();
            Object[] beforeData = (Object[])row.getKey();
            List<String> beforeList = new ArrayList<>();
            for(Object obj : beforeData){
                if(obj instanceof Integer){
                    beforeList.add(obj+"");
                }else if(obj instanceof Double){
                    beforeList.add(obj+"");
                }else if(obj instanceof Long){
                    beforeList.add(obj+"");
                }else if(obj instanceof byte[]){
                    beforeList.add(new String((byte[]) obj));
                }else {
                    beforeList.add(obj+"");
                }
            }
            System.out.println(JSON.toJSONString("before-data:"+beforeList));
            Object[] afterData =(Object[])row.getValue();
            List<String> afterList = new ArrayList<>();
            for(Object obj : afterData){
                if(obj instanceof Integer){
                    afterList.add(obj+"");
                }else if(obj instanceof Double){
                    beforeList.add(obj+"");
                }else if(obj instanceof Long){
                    beforeList.add(obj+"");
                }else if(obj instanceof byte[]){
                    afterList.add(new String((byte[]) obj));
                }else {
                    afterList.add(obj+"");
                }
            }
            System.out.println(JSON.toJSONString("after-data:"+afterList));
        }
    }
}

源端B对应的线程代码

public class ThreadTestb implements Runnable {
    @Override
    public void run() {
        testLocal();
    }

    private static void testLocal(){
        final BinaryLogClient client = new BinaryLogClient("127.0.0.1", 3306, "root", "root");
        // 指定从何处开始增量
        client.setBinlogFilename("mysql-bin.000001");
        client.setBinlogPosition(3754);
        EventDeserializer eventDeserializer = new EventDeserializer();
        eventDeserializer.setCompatibilityMode(
                EventDeserializer.CompatibilityMode.DATE_AND_TIME_AS_LONG,
                EventDeserializer.CompatibilityMode.CHAR_AND_BINARY_AS_BYTE_ARRAY
        );
        client.setEventDeserializer(eventDeserializer);
        client.registerEventListener(event -> {
            EventData data = event.getData();

            if (data instanceof UpdateRowsEventData) {
                System.out.println("-----------Update--------------");
                List<Map.Entry<Serializable[], Serializable[]>> list = ((UpdateRowsEventData) data).getRows();
                updateFormat(list);
                // System.out.println(data.toString());
            } else if (data instanceof WriteRowsEventData) {
                System.out.println("----------- Insert---------------");
                List<Serializable[]> list = ((WriteRowsEventData) data).getRows();
                deleteOrInsertFormat(list);
                // System.out.println(data.toString());
            } else if (data instanceof DeleteRowsEventData) {
                System.out.println("-------------Delete--------------");
                List<Serializable[]> list = ((DeleteRowsEventData) data).getRows();
                deleteOrInsertFormat(list);
                //System.out.println(data.toString());
            }if(data instanceof TableMapEventData){
                TableMapEventData tableMapEventData = (TableMapEventData) data;
                String dataBase = tableMapEventData.getDatabase();
                String tableName = tableMapEventData.getTable();
                System.out.println("------dataBase:"+dataBase+"-----tableName:"+tableName+"-------");
                //tableMap.put(tableMapEventData.getTableId(), tableMapEventData.getTable());
            }
        });
        try {
            client.connect();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    private static void deleteOrInsertFormat(List<Serializable[]> list){
        Iterator var2 = list.iterator();
        while (var2.hasNext()){
            System.out.println("-----row-data-----");
            Object[] objects = (Object[])var2.next();
            List<String> beforeList = new ArrayList<>();
            for(Object obj : objects){
                if(obj instanceof Integer){
                    beforeList.add(obj+"");
                }else if(obj instanceof String){
                    beforeList.add(obj+"");
                }else if(obj instanceof byte[]){
                    beforeList.add(new String((byte[]) obj));
                }else {
                    beforeList.add(obj+"");
                }
            }
            System.out.println("insert-or-delete:"+JSON.toJSONString(beforeList));
        }
    }

    private static void updateFormat(List<Map.Entry<Serializable[], Serializable[]>> list){
        Iterator var2 = list.iterator();
        while(var2.hasNext()) {
            System.out.println("-----rows-----");
            Map.Entry row = (Map.Entry)var2.next();
            Object[] beforeData = (Object[])row.getKey();
            List<String> beforeList = new ArrayList<>();
            for(Object obj : beforeData){
                if(obj instanceof Integer){
                    beforeList.add(obj+"");
                }else if(obj instanceof Double){
                    beforeList.add(obj+"");
                }else if(obj instanceof Long){
                    beforeList.add(obj+"");
                }else if(obj instanceof byte[]){
                    beforeList.add(new String((byte[]) obj));
                }else {
                    beforeList.add(obj+"");
                }
            }
            System.out.println(JSON.toJSONString("before-data:"+beforeList));
            Object[] afterData =(Object[])row.getValue();
            List<String> afterList = new ArrayList<>();
            for(Object obj : afterData){
                if(obj instanceof Integer){
                    afterList.add(obj+"");
                }else if(obj instanceof Double){
                    beforeList.add(obj+"");
                }else if(obj instanceof Long){
                    beforeList.add(obj+"");
                }else if(obj instanceof byte[]){
                    afterList.add(new String((byte[]) obj));
                }else {
                    afterList.add(obj+"");
                }
            }
            System.out.println(JSON.toJSONString("after-data:"+afterList));
        }
    }
}

这里需要注意的是,源端数据库的账户。如果使用非root用户需要给相应的权限。此外在获取日志文件中的数据中,可以指定从哪个文件哪个位置开始监听。以防止数据丢失,可通过以下sql来查询当前数据库正处于的位置

show master status;

-- 结果
-- +------------------+----------+--------------+------------------+-------------------+
-- | File             | Position | Binlog_Do_DB | Binlog_Ignore_DB | Executed_Gtid_Set |
-- +------------------+----------+--------------+------------------+-------------------+
-- | mysql-bin.000005 | 154      |              |                  |                   |
-- +------------------+----------+--------------+------------------+-------------------+

如果在代码运行中,不确定源端是否开启了bin-log,可通过下面这个sql来查询

show variables like '%log_bin%';

-- 结果
-- +---------------------------------+-----------------------------------------------------------+
-- | Variable_name                   | Value                                                     |
-- +---------------------------------+-----------------------------------------------------------+
-- | log_bin                         | ON                                                        |
-- +---------------------------------+-----------------------------------------------------------+
-- | log_bin_basename                | C:\ProgramData\MySQL\MySQL Server 5.7\Data\mysql-bin      |
-- +---------------------------------+-----------------------------------------------------------+
-- | log_bin_index                   | C:\ProgramData\MySQL\MySQL Server 5.7\Data\mysql-bin.index|
-- +---------------------------------+-----------------------------------------------------------+
-- | log_bin_trust_function_creators | OFF                                                       |
-- +---------------------------------+-----------------------------------------------------------+
-- | log_bin_use_v1_row_events         | OFF                                                       |
-- +---------------------------------+-----------------------------------------------------------+
-- | sql_log_bin                     | ON                                                        |
-- +---------------------------------+-----------------------------------------------------------+

结果中log-bin的值为ON表示,Mysql数据库已经开启了bin-log。关于Myql的日志模式的理解。以上就是全部代码了,唯一有点不方便的地方就是该组件可以获取到表变更的数据但不能获取到表的字段名,字段名只能通过另外的方式获取,可登陆源端数据库,通过下面这个sql获取。

select COLUMN_NAME from information_schema.COLUMNS where table_name = 'dev_device_camera'; 

Mysql的三种日志

MySQL binlog日志有三种格式,分别为Statement,MiXED和ROW。下面总结了各自的优缺点

1.Statement:每一条会修改数据的sql都会记录在binlog中。

优点 缺点
出现最早,兼容较好,binlog文件较小 存在安全隐患,可能导致主从不一致
日志是包含用户执行的原始SQL,方便统计和审计 对一些系统函数不能准确复制或是不能复制

2.ROW:不记录sql语句上下文相关信息,仅保存哪条记录被修改。

优点 缺点
相比statement更加安全的复制格式 binlog比较大(myql5.6支持binlog_row_image)
在某些情况下复制速度更快(SQL复杂,表有主键) 单语句更新(删除)表的行数过多,会形成大量binlog
系统的特殊函数也可以复制 无法从binlog看见用户执行SQL

3.Mixed: 是以上两种level的混合使用,一般的语句修改使用statment格式保存binlog,如一些函数,statement无法完成主从复制的操作,则采用row格式保存binlog,MySQL会根据执行的每一条具体的sql语句来区分对待记录的日志形式,也就是在Statement和Row之间选择一种.新版本的MySQL中队row level模式也被做了优化,并不是所有的修改都会以row level来记录,像遇到表结构变更的时候就会以statement模式来记录。至于update或者delete等修改数据的语句,还是会记录所有行的变更。


Author: 顺坚
Reprint policy: All articles in this blog are used except for special statements CC BY 4.0 reprint polocy. If reproduced, please indicate source 顺坚 !
评论
 Previous
深入Java线程池实现源码 深入Java线程池实现源码
Java线程池是使用频率很高的开源框架。也是在面试中常被问到的组件。它的实现源码在J.U.C包下,本人也经常使用线程池,简单方便。大多是浮于表面的一些API的调用,对于框架实现中具体做了哪些事情,却是知之甚少。本文将从源码角度,深入了聊一聊
2019-06-11
Next 
面试总结 面试总结
又到一年金三银四的黄金跳槽季,相信很多人都在这期间蠢蠢欲动,不是正在离职,就是在纠结要不要跳槽。各大公司呢,也开始招兵买马为今年公司的发展储备人才。2019年的金三银四可以说比较特殊了,人们都说,互联网寒冬来了。各大互联网公司裁员,在互联网
2019-05-15
  TOC