有人知道一个快速简单的方法来迁移SQLite3数据库到MySQL吗?


当前回答

最简单的方法可能是使用sqlite .dump命令,在本例中创建示例数据库的转储。

sqlite3 sample.db .dump > dump.sql

然后(理论上)您可以使用root用户将其导入到mysql数据库中,在本例中是数据库服务器127.0.0.1上的测试数据库。

mysql -p -u root -h 127.0.0.1 test < dump.sql

我说在理论上,因为语法之间有一些差异。

在sqlite事务开始

BEGIN TRANSACTION;
...
COMMIT;

MySQL只使用

BEGIN;
...
COMMIT;

还有其他类似的问题(varchars和双引号会回到脑海中),但没有查找和替换不能解决的问题。

也许你应该问问为什么要迁移,如果性能/数据库大小是问题,也许应该考虑重新规划模式,如果系统正在迁移到更强大的产品,这可能是计划数据未来的理想时机。

其他回答

下面是一个python脚本,基于Shalmanese的回答和Alex martelli在翻译Perl到python中的一些帮助

我让它成为社区维基,所以请随时编辑和重构,只要它不破坏功能(谢天谢地,我们可以回滚)-它很丑,但工作

像这样使用(假设脚本名为dump_for_mysql.py:

sqlite3 sample.db .dump | python dump_for_mysql.py > dump.sql

然后你可以导入到mysql

注意-你需要手动添加外键约束,因为sqlite实际上不支持它们

以下是剧本:

#!/usr/bin/env python

import re
import fileinput

def this_line_is_useless(line):
    useless_es = [
        'BEGIN TRANSACTION',
        'COMMIT',
        'sqlite_sequence',
        'CREATE UNIQUE INDEX',
        'PRAGMA foreign_keys=OFF',
    ]
    for useless in useless_es:
        if re.search(useless, line):
            return True

def has_primary_key(line):
    return bool(re.search(r'PRIMARY KEY', line))

searching_for_end = False
for line in fileinput.input():
    if this_line_is_useless(line):
        continue

    # this line was necessary because '');
    # would be converted to \'); which isn't appropriate
    if re.match(r".*, ''\);", line):
        line = re.sub(r"''\);", r'``);', line)

    if re.match(r'^CREATE TABLE.*', line):
        searching_for_end = True

    m = re.search('CREATE TABLE "?(\w*)"?(.*)', line)
    if m:
        name, sub = m.groups()
        line = "DROP TABLE IF EXISTS %(name)s;\nCREATE TABLE IF NOT EXISTS `%(name)s`%(sub)s\n"
        line = line % dict(name=name, sub=sub)
    else:
        m = re.search('INSERT INTO "(\w*)"(.*)', line)
        if m:
            line = 'INSERT INTO %s%s\n' % m.groups()
            line = line.replace('"', r'\"')
            line = line.replace('"', "'")
    line = re.sub(r"([^'])'t'(.)", "\1THIS_IS_TRUE\2", line)
    line = line.replace('THIS_IS_TRUE', '1')
    line = re.sub(r"([^'])'f'(.)", "\1THIS_IS_FALSE\2", line)
    line = line.replace('THIS_IS_FALSE', '0')

    # Add auto_increment if it is not there since sqlite auto_increments ALL
    # primary keys
    if searching_for_end:
        if re.search(r"integer(?:\s+\w+)*\s*PRIMARY KEY(?:\s+\w+)*\s*,", line):
            line = line.replace("PRIMARY KEY", "PRIMARY KEY AUTO_INCREMENT")
        # replace " and ' with ` because mysql doesn't like quotes in CREATE commands 
        if line.find('DEFAULT') == -1:
            line = line.replace(r'"', r'`').replace(r"'", r'`')
        else:
            parts = line.split('DEFAULT')
            parts[0] = parts[0].replace(r'"', r'`').replace(r"'", r'`')
            line = 'DEFAULT'.join(parts)

    # And now we convert it back (see above)
    if re.match(r".*, ``\);", line):
        line = re.sub(r'``\);', r"'');", line)

    if searching_for_end and re.match(r'.*\);', line):
        searching_for_end = False

    if re.match(r"CREATE INDEX", line):
        line = re.sub('"', '`', line)

    if re.match(r"AUTOINCREMENT", line):
        line = re.sub("AUTOINCREMENT", "AUTO_INCREMENT", line)

    print line,

哈……我希望我先发现这个!我对这篇文章的回应是……脚本转换mysql转储SQL文件的格式,可以导入到sqlite3 db

这两者的结合正是我所需要的:


当sqlite3数据库将与ruby一起使用时,您可能需要更改:

tinyint([0-9]*) 

to:

sed 's/ tinyint(1*) / boolean/g ' |
sed 's/ tinyint([0|2-9]*) / integer /g' |

唉,这只是一半的工作,因为即使你插入1和0到一个标记为布尔的字段,sqlite3将它们存储为1和0,所以你必须通过并做一些类似的事情:

Table.find(:all, :conditions => {:column => 1 }).each { |t| t.column = true }.each(&:save)
Table.find(:all, :conditions => {:column => 0 }).each { |t| t.column = false}.each(&:save)

但是通过查看SQL文件来查找所有布尔值是很有帮助的。

基于Jims的解决方案: 快速简单的方法迁移SQLite3到MySQL?

sqlite3 your_sql3_database.db .dump | python ./dump.py > your_dump_name.sql
cat your_dump_name.sql | sed '1d' | mysql --user=your_mysql_user --default-character-set=utf8 your_mysql_db -p  

这对我很有用。我只使用sed来丢弃第一行,这与mysql不一样,但是您也可以修改dump.py脚本来丢弃这一行。

如果你使用的是Python/Django,这很简单:

在settings.py中创建两个数据库(如https://docs.djangoproject.com/en/1.11/topics/db/multi-db/)

然后就这样做:

objlist = ModelObject.objects.using('sqlite').all()

for obj in objlist:
    obj.save(using='mysql')

我用Python3编写了这个简单的脚本。它可以作为包含的类或通过终端shell调用的独立脚本使用。默认情况下,它将所有整数导入为int(11),将字符串导入为varchar(300),但所有这些都可以分别在构造函数或脚本参数中进行调整。

注意:它需要MySQL连接器/Python 2.0.4或更高版本

如果你觉得下面的代码很难阅读,这是GitHub上的源代码链接:https://github.com/techouse/sqlite3-to-mysql

#!/usr/bin/env python3

__author__ = "Klemen Tušar"
__email__ = "techouse@gmail.com"
__copyright__ = "GPL"
__version__ = "1.0.1"
__date__ = "2015-09-12"
__status__ = "Production"

import os.path, sqlite3, mysql.connector
from mysql.connector import errorcode


class SQLite3toMySQL:
    """
    Use this class to transfer an SQLite 3 database to MySQL.

    NOTE: Requires MySQL Connector/Python 2.0.4 or higher (https://dev.mysql.com/downloads/connector/python/)
    """
    def __init__(self, **kwargs):
        self._properties = kwargs
        self._sqlite_file = self._properties.get('sqlite_file', None)
        if not os.path.isfile(self._sqlite_file):
            print('SQLite file does not exist!')
            exit(1)
        self._mysql_user = self._properties.get('mysql_user', None)
        if self._mysql_user is None:
            print('Please provide a MySQL user!')
            exit(1)
        self._mysql_password = self._properties.get('mysql_password', None)
        if self._mysql_password is None:
            print('Please provide a MySQL password')
            exit(1)
        self._mysql_database = self._properties.get('mysql_database', 'transfer')
        self._mysql_host = self._properties.get('mysql_host', 'localhost')

        self._mysql_integer_type = self._properties.get('mysql_integer_type', 'int(11)')
        self._mysql_string_type = self._properties.get('mysql_string_type', 'varchar(300)')

        self._sqlite = sqlite3.connect(self._sqlite_file)
        self._sqlite.row_factory = sqlite3.Row
        self._sqlite_cur = self._sqlite.cursor()

        self._mysql = mysql.connector.connect(
            user=self._mysql_user,
            password=self._mysql_password,
            host=self._mysql_host
        )
        self._mysql_cur = self._mysql.cursor(prepared=True)
        try:
            self._mysql.database = self._mysql_database
        except mysql.connector.Error as err:
            if err.errno == errorcode.ER_BAD_DB_ERROR:
                self._create_database()
            else:
                print(err)
                exit(1)

    def _create_database(self):
        try:
            self._mysql_cur.execute("CREATE DATABASE IF NOT EXISTS `{}` DEFAULT CHARACTER SET 'utf8'".format(self._mysql_database))
            self._mysql_cur.close()
            self._mysql.commit()
            self._mysql.database = self._mysql_database
            self._mysql_cur = self._mysql.cursor(prepared=True)
        except mysql.connector.Error as err:
            print('_create_database failed creating databse {}: {}'.format(self._mysql_database, err))
            exit(1)

    def _create_table(self, table_name):
        primary_key = ''
        sql = 'CREATE TABLE IF NOT EXISTS `{}` ( '.format(table_name)
        self._sqlite_cur.execute('PRAGMA table_info("{}")'.format(table_name))
        for row in self._sqlite_cur.fetchall():
            column = dict(row)
            sql += ' `{name}` {type} {notnull} {auto_increment}, '.format(
                name=column['name'],
                type=self._mysql_string_type if column['type'].upper() == 'TEXT' else self._mysql_integer_type,
                notnull='NOT NULL' if column['notnull'] else 'NULL',
                auto_increment='AUTO_INCREMENT' if column['pk'] else ''
            )
            if column['pk']:
                primary_key = column['name']
        sql += ' PRIMARY KEY (`{}`) ) ENGINE = InnoDB CHARACTER SET utf8'.format(primary_key)
        try:
            self._mysql_cur.execute(sql)
            self._mysql.commit()
        except mysql.connector.Error as err:
            print('_create_table failed creating table {}: {}'.format(table_name, err))
            exit(1)

    def transfer(self):
        self._sqlite_cur.execute("SELECT name FROM sqlite_master WHERE type='table' AND name NOT LIKE 'sqlite_%'")
        for row in self._sqlite_cur.fetchall():
            table = dict(row)
            # create the table
            self._create_table(table['name'])
            # populate it
            print('Transferring table {}'.format(table['name']))
            self._sqlite_cur.execute('SELECT * FROM "{}"'.format(table['name']))
            columns = [column[0] for column in self._sqlite_cur.description]
            try:
                self._mysql_cur.executemany("INSERT IGNORE INTO `{table}` ({fields}) VALUES ({placeholders})".format(
                    table=table['name'],
                    fields=('`{}`, ' * len(columns)).rstrip(' ,').format(*columns),
                    placeholders=('%s, ' * len(columns)).rstrip(' ,')
                ), (tuple(data) for data in self._sqlite_cur.fetchall()))
                self._mysql.commit()
            except mysql.connector.Error as err:
                print('_insert_table_data failed inserting data into table {}: {}'.format(table['name'], err))
                exit(1)
        print('Done!')


def main():
    """ For use in standalone terminal form """
    import sys, argparse
    parser = argparse.ArgumentParser()
    parser.add_argument('--sqlite-file', dest='sqlite_file', default=None, help='SQLite3 db file')
    parser.add_argument('--mysql-user', dest='mysql_user', default=None, help='MySQL user')
    parser.add_argument('--mysql-password', dest='mysql_password', default=None, help='MySQL password')
    parser.add_argument('--mysql-database', dest='mysql_database', default=None, help='MySQL host')
    parser.add_argument('--mysql-host', dest='mysql_host', default='localhost', help='MySQL host')
    parser.add_argument('--mysql-integer-type', dest='mysql_integer_type', default='int(11)', help='MySQL default integer field type')
    parser.add_argument('--mysql-string-type', dest='mysql_string_type', default='varchar(300)', help='MySQL default string field type')
    args = parser.parse_args()

    if len(sys.argv) == 1:
        parser.print_help()
        exit(1)

    converter = SQLite3toMySQL(
        sqlite_file=args.sqlite_file,
        mysql_user=args.mysql_user,
        mysql_password=args.mysql_password,
        mysql_database=args.mysql_database,
        mysql_host=args.mysql_host,
        mysql_integer_type=args.mysql_integer_type,
        mysql_string_type=args.mysql_string_type
    )
    converter.transfer()

if __name__ == '__main__':
    main()