由于 MySQL 的建表语句与 Doris 的建表语句有所差别,我写了一个脚本用于直接转换;分桶和副本的配置仍需根据个人的实际情况进行调整。下面将其封装为接口和页面的形式,方便使用和调用。该脚本主要用于解决当前的建表问题,若有不足之处欢迎指出,互相学习。
import re
from typing import List, Dict, Tuple, Optional
from fastapi import FastAPI, HTTPException
from fastapi.staticfiles import StaticFiles
from fastapi.responses import HTMLResponse
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
# MySQL -> Doris column type mapping.
#
# Each value is one of:
#   * a plain string  - the Doris type name to emit;
#   * a dict          - a 'default' type plus 'rules', a list of
#                       (regex, replacement) pairs tested against the type
#                       parameters;
#   * a callable      - receives the type parameters and returns the Doris type.
#
# The field parser hands over the text found *between* the parentheses of the
# MySQL type (e.g. "1" for tinyint(1)), so the parameter checks below accept
# the bare value as well as the parenthesised "(1)" spelling.
TYPE_MAPPING = {
    'tinyint': {
        'default': 'tinyint',
        'rules': [(r'^\s*\(?\s*1\s*\)?\s*$', 'boolean')]
    },
    'smallint': 'SMALLINT',
    'mediumint': 'int',
    'int': 'int',
    'bigint': 'bigint',
    'float': 'float',
    'double': 'double',
    'decimal': 'decimal',
    'char': 'char',
    'varchar': 'varchar',
    'text': 'string',
    'longtext': 'string',
    'json': 'json',
    'date': 'DATE',
    'datetime': 'datetime',
    'timestamp': 'datetime',
    # Ten characters are reserved per enum member; varchar(255) when the
    # member list is empty.
    'enum': lambda params: f"varchar({len(params.split(',')) * 10})" if params else 'varchar(255)',
    # bit(1) is a flag; any other width is stored as an integer.
    'bit': lambda params: 'boolean' if params.strip('() ') == '1' else 'int'
}
# Recognised non-literal DEFAULT values (looked up by lower-cased keyword).
DEFAULT_VALUES_MAP = {
    'current_timestamp': 'CURRENT_TIMESTAMP',
    'null': 'NULL'
}
def _scan_table_body(sql: str, open_idx: int) -> Tuple[List[str], int]:
    """Split the parenthesised table body that opens at ``sql[open_idx]``.

    Walks the text once, tracking parenthesis depth and quoted regions
    (single quotes, double quotes and backticks), so that commas and
    parentheses inside string literals or type parameters do not end an
    element.

    Returns:
        ``(elements, close_idx)`` - the top-level, comma-separated elements
        (stripped, empty ones dropped at the tail) and the index of the
        parenthesis that closes the body.

    Raises:
        ValueError: if the body is never closed.
    """
    elements: List[str] = []
    current: List[str] = []
    depth = 0
    quote = None
    i = open_idx
    length = len(sql)
    while i < length:
        ch = sql[i]
        if quote:
            current.append(ch)
            if ch == '\\' and quote != '`' and i + 1 < length:
                # Backslash escape inside a string literal: keep both chars.
                current.append(sql[i + 1])
                i += 2
                continue
            if ch == quote:
                if i + 1 < length and sql[i + 1] == quote:
                    # Doubled quote is an escaped quote, not a terminator.
                    current.append(quote)
                    i += 2
                    continue
                quote = None
        elif ch in '\'"`':
            quote = ch
            current.append(ch)
        elif ch == '(':
            depth += 1
            if depth > 1:
                current.append(ch)
        elif ch == ')':
            depth -= 1
            if depth == 0:
                tail = ''.join(current).strip()
                if tail:
                    elements.append(tail)
                return elements, i
            current.append(ch)
        elif ch == ',' and depth == 1:
            elements.append(''.join(current).strip())
            current = []
        else:
            current.append(ch)
        i += 1
    raise ValueError("Invalid table structure")


def _key_columns(definition: str) -> List[str]:
    """Return the column names of the first parenthesised list in a key clause.

    Prefix lengths (```name`(20)``) and trailing ASC/DESC are removed.
    """
    start = definition.find('(')
    if start == -1:
        return []
    depth = 0
    inner = None
    for pos in range(start, len(definition)):
        if definition[pos] == '(':
            depth += 1
        elif definition[pos] == ')':
            depth -= 1
            if depth == 0:
                inner = definition[start + 1:pos]
                break
    if inner is None:
        return []
    inner = re.sub(r'\(\s*\d+\s*\)', '', inner)
    columns = []
    for part in inner.split(','):
        part = re.sub(r'\s+(?:asc|desc)$', '', part.strip(), flags=re.IGNORECASE)
        part = part.strip('` ')
        if part:
            columns.append(part)
    return columns


def parse_mysql_sql(mysql_sql: str) -> Tuple[str, str, List[Dict], Dict, str]:
    """Parse a MySQL CREATE TABLE statement.

    The statement is matched case-insensitively but is NOT lower-cased, so
    identifiers, comments and default strings keep their original spelling.

    Args:
        mysql_sql: the CREATE TABLE statement; ``/* */`` and ``--`` comments
            are removed and runs of whitespace are collapsed before parsing.

    Returns:
        ``(database_name, table_name, fields, constraints, table_comment)``
        where ``database_name`` is ``""`` when the table name is unqualified,
        ``fields`` holds the dicts produced by ``parse_mysql_field`` and
        ``constraints`` is ``{'primary_keys': [...], 'indexes': {...}}``.
        ``primary_keys`` lists the PRIMARY KEY columns followed by every
        UNIQUE KEY column, without duplicates.

    Raises:
        ValueError: if no table name or no table body can be found.
    """
    # Pre-process: strip comments and normalise whitespace.
    sql = re.sub(r'/\*.*?\*/', '', mysql_sql, flags=re.DOTALL)
    sql = re.sub(r'--.*', '', sql)
    sql = re.sub(r'\s+', ' ', sql).strip()

    # Table name: supports `database`.`table` / database.table, falling back
    # to an unqualified table name.
    table_match = re.search(
        r'create table (?:if not exists )?`?(\w+)`?\.`?(\w+)`?', sql, re.IGNORECASE)
    if table_match:
        database_name = table_match.group(1)
        table_name = table_match.group(2)
    else:
        table_match = re.search(
            r'create table (?:if not exists )?`?(\w+)`?(?:\s|\(|$)', sql, re.IGNORECASE)
        if not table_match:
            raise ValueError("Invalid MySQL CREATE TABLE syntax: Missing table name")
        database_name = ""
        table_name = table_match.group(1)

    # Table body: the first "(" after the table name up to its matching ")".
    # Matching parentheses (instead of a regex) keeps column definitions such
    # as "varchar(50) comment '...'" from ending the body early.
    open_idx = sql.find('(', table_match.end(table_match.lastindex))
    if open_idx == -1:
        raise ValueError("Invalid table structure")
    elements, close_idx = _scan_table_body(sql, open_idx)

    # Table comment lives in the table options after the body; "=" is optional.
    table_comment = ''
    comment_match = re.search(
        r"\bcomment\s*=?\s*(['\"])(.*?)\1", sql[close_idx + 1:], re.IGNORECASE)
    if comment_match:
        table_comment = comment_match.group(2)

    # Classify each element as a column, a primary/unique key or an index.
    fields = []
    pk_columns: List[str] = []
    unique_columns: List[str] = []
    indexes = {}
    for elem in elements:
        if not elem:
            continue
        if elem.startswith('`'):
            field = parse_mysql_field(elem)
            if field:
                fields.append(field)
            continue
        lowered = elem.lower()
        if lowered.startswith('primary key'):
            pk_columns.extend(_key_columns(elem))
        elif 'unique key' in lowered or 'unique index' in lowered:
            # Unique-key columns are folded into the key list used for the
            # Doris UNIQUE KEY model.
            unique_columns.extend(_key_columns(elem))
        elif 'key' in lowered or 'index' in lowered:
            index = parse_mysql_index(elem)
            if index:
                indexes[index['name']] = index

    # Primary-key columns first regardless of clause order; map each column to
    # the declared spelling (MySQL column names are case-insensitive) and drop
    # repeats while preserving order.
    declared = {field['name'].lower(): field['name'] for field in fields}
    primary_keys = list(dict.fromkeys(
        declared.get(col.lower(), col) for col in pk_columns + unique_columns))

    return database_name, table_name, fields, {'primary_keys': primary_keys, 'indexes': indexes}, table_comment
# Doris has no UNSIGNED integer types; an unsigned MySQL integer is widened
# to the next larger signed type so its full value range still fits.
_UNSIGNED_PROMOTION = {
    'tinyint': 'smallint',
    'smallint': 'int',
    'mediumint': 'int',
    'int': 'bigint',
    'bigint': 'largeint',
}
# A single-quoted SQL string literal; group 1 is the raw content (escapes kept).
_SQL_STRING = r"'((?:[^'\\]|\\.|'')*)'"


def parse_mysql_field(line: str) -> Optional[Dict]:
    """Parse one MySQL column definition into a Doris-oriented description.

    Only the leading ```name` type(params)`` part is matched positionally.
    The remaining attributes (UNSIGNED, NOT NULL, DEFAULT, COMMENT) are
    located independently, so clauses the converter does not model -
    CHARACTER SET, COLLATE, ON UPDATE ..., AUTO_INCREMENT - may appear
    anywhere without hiding the attributes that follow them.

    Args:
        line: a column definition starting with a back-quoted column name.

    Returns:
        A dict with keys ``name``, ``type`` (Doris type), ``nullable``
        (``'NOT NULL'`` or ``'NULL'``), ``default`` (SQL text or ``None``)
        and ``comment`` (raw text, ``''`` if absent); ``None`` when the line
        does not look like a column definition.
    """
    head = re.match(
        r'`(?P<name>[^`]+)`\s+(?P<type>\w+)(?:\((?P<params>[^)]*)\))?(?P<rest>.*)',
        line,
        re.IGNORECASE | re.DOTALL,
    )
    if not head:
        return None
    groups = head.groupdict()
    mysql_type = groups['type'].lower()
    params = groups['params']
    rest = groups['rest']

    # Type conversion; unknown MySQL types fall back to string.
    type_handler = TYPE_MAPPING.get(mysql_type)
    if not type_handler:
        doris_type = 'string'
    elif callable(type_handler):
        doris_type = type_handler(params or '')
    elif isinstance(type_handler, dict):
        doris_type = type_handler['default']
        for rule_pattern, replacement in type_handler['rules']:
            if re.search(rule_pattern, params or ''):
                doris_type = replacement
                break
    else:
        doris_type = type_handler
    if params and mysql_type in ['varchar', 'char']:
        # The declared length is scaled by 2.5 for the Doris column.
        doris_type += f'({int(int(params) * 2.5)})'
    elif params and mysql_type in ['decimal', 'float', 'double']:
        doris_type += f'({params})'

    # Walk the string literals left to right. The keyword right before a
    # literal tells whether it is the COMMENT or the DEFAULT value; literals
    # are replaced by '' in the keyword text so their content can never be
    # mistaken for an attribute keyword.
    comment = ''
    default_str = None
    keyword_parts = []
    cursor = 0
    for literal in re.finditer(_SQL_STRING, rest):
        before = rest[cursor:literal.start()]
        keyword_parts.append(before)
        introducer = before.rstrip().lower()
        if introducer.endswith('comment'):
            comment = literal.group(1)
        elif introducer.endswith('default') and default_str is None:
            default_str = literal.group(1)
        cursor = literal.end()
    keyword_parts.append(rest[cursor:])
    keywords = "''".join(keyword_parts)

    # Unsigned integers are widened instead of emitting an UNSIGNED suffix.
    if re.search(r'\bunsigned\b', keywords, re.IGNORECASE) and 'int' in doris_type.lower():
        doris_type = _UNSIGNED_PROMOTION.get(mysql_type, doris_type)

    # Default value: string literal, number, or a recognised keyword.
    default = None
    if default_str is not None:
        default = f"'{default_str}'"
    else:
        default_match = re.search(
            r'\bdefault\s+(?:(?P<num>[-+]?\d+\.?\d*)|(?P<func>\w+))',
            keywords,
            re.IGNORECASE,
        )
        if default_match:
            if default_match.group('num') is not None:
                default = default_match.group('num')
            else:
                # Keywords outside DEFAULT_VALUES_MAP yield no default.
                default = DEFAULT_VALUES_MAP.get(default_match.group('func').lower())

    not_null = re.search(r'\bnot\s+null\b', keywords, re.IGNORECASE) is not None
    return {
        'name': groups['name'],
        'type': doris_type,
        'nullable': 'NOT NULL' if not_null else 'NULL',
        'default': default,
        'comment': comment
    }
def parse_mysql_index(line: str) -> Optional[Dict]:
    """Parse a MySQL ``KEY`` / ``INDEX`` / ``UNIQUE KEY`` clause.

    Column prefix lengths (```name`(20)``) and trailing ASC/DESC are accepted
    and removed, so ``columns`` contains bare column names only.

    Args:
        line: the index clause; it must start with the index keyword.

    Returns:
        ``{'type': ..., 'name': ..., 'columns': [...]}`` where ``type`` is the
        upper-cased keyword with spaces replaced by underscores, or ``None``
        when the line is not an index clause.
    """
    index_pattern = (
        r'(?P<type>unique key|key|index)\s+'
        r'`?(?P<name>\w+)`?\s*'
        # Column list; a nested "(<digits>)" is a prefix length, not the end.
        r'\((?P<columns>(?:[^()]|\(\s*\d+\s*\))+)\)'
    )
    match = re.match(index_pattern, line, re.IGNORECASE)
    if not match:
        return None
    columns = []
    for raw in match.group('columns').split(','):
        column = re.sub(r'\(\s*\d+\s*\)', '', raw).strip()
        column = re.sub(r'\s+(?:asc|desc)$', '', column, flags=re.IGNORECASE)
        columns.append(column.strip('` '))
    return {
        'type': match.group('type').upper().replace(' ', '_'),
        'name': match.group('name'),
        'columns': columns
    }
def reorder_fields_by_unique_key(fields: List[Dict], unique_key_columns: List[str]) -> List[Dict]:
    """Move the key columns to the front of the field list.

    Key columns come first, in the order given by ``unique_key_columns``;
    all remaining fields follow in their original order. Key names that
    match no field are skipped, and each field name appears at most once
    in the result. When ``unique_key_columns`` is empty the input list is
    returned unchanged.
    """
    if not unique_key_columns:
        return fields

    # Name -> field lookup (a later field with the same name wins).
    by_name = {}
    for item in fields:
        by_name[item['name']] = item

    seen = set()
    ordered = []

    # Key columns first, keeping the key's own ordering.
    for key_col in unique_key_columns:
        if key_col in seen or key_col not in by_name:
            continue
        seen.add(key_col)
        ordered.append(by_name[key_col])

    # Then everything that has not been placed yet.
    for item in fields:
        name = item['name']
        if name in seen:
            continue
        seen.add(name)
        ordered.append(item)

    return ordered
def create_insert_sql(original_table_name, reordered_fields, target_table_name):
    """Build an ``INSERT INTO ... SELECT`` statement copying source to target.

    :param original_table_name: table the rows are selected from
    :param reordered_fields: field dicts; their ``name`` values form the
        column list of both the INSERT and the SELECT
    :param target_table_name: table the rows are inserted into
    :return: a header comment line followed by the INSERT statement
    """
    column_list = ', '.join(column['name'] for column in reordered_fields)
    statement = (
        f"INSERT INTO {target_table_name} ({column_list}) "
        f"SELECT {column_list} FROM {original_table_name};"
    )
    return '\n'.join(["-- 插入数据SQL:", statement])
def _resolve_bucket_key(bucket_key: Optional[str], primary_keys: List[str], fields: List[Dict]) -> str:
    """Pick the hash-distribution column: explicit key, first key column, first field."""
    if bucket_key:
        return bucket_key
    if primary_keys:
        return primary_keys[0]
    if fields:
        return fields[0]['name']
    raise ValueError("Cannot choose a bucket key: the table has no columns")


def generate_doris_sql(
    database_name: str,
    table_name: str,
    fields: List[Dict],
    constraints: Dict,
    table_comment: str,
    bucket_strategy: str = 'auto',
    bucket_key: Optional[str] = None,
    buckets: Optional[int] = None,
    replication_num: int = 3
) -> str:
    """Generate a Doris CREATE TABLE statement plus helper SQL.

    Args:
        database_name: database qualifier, or ``""`` for none.
        table_name: table name.
        fields: field dicts with ``name``, ``type``, ``nullable``,
            ``default`` and ``comment`` keys.
        constraints: ``{'primary_keys': [...], 'indexes': {...}}``.
        table_comment: table comment, or ``""`` for none.
        bucket_strategy: ``'hash'``, ``'random'``; any other value (``'auto'``)
            hashes on the bucket key with ``BUCKETS AUTO``.
        bucket_key: hash column; when empty, the first key column is used,
            then the first field.
        buckets: bucket count for ``'hash'`` / ``'random'`` (10 when falsy).
        replication_num: replica count written to ``replication_allocation``.

    Returns:
        The CREATE TABLE statement, the index suggestions and an
        INSERT ... SELECT statement, separated by blank lines.

    Raises:
        ValueError: if a hash bucket key is needed but the table has no
            key columns and no fields.
    """
    # Key columns must lead the column list, so reorder the fields first.
    primary_keys = constraints['primary_keys']
    reordered_fields = reorder_fields_by_unique_key(fields, primary_keys)

    # Column definitions.
    field_defs = []
    for field in reordered_fields:
        parts = [f"`{field['name']}` {field['type']}"]
        if field['nullable'] == 'NOT NULL':
            parts.append("NOT NULL")
        if field['default']:
            if field['default'] == 'NULL':
                parts.append("NULL")
            else:
                parts.append(f"DEFAULT {field['default']}")
        if field['comment']:
            parts.append(f"COMMENT '{field['comment']}'")
        field_defs.append(' '.join(parts))

    # Qualify the table with the database name when one was given.
    full_table_name = f"`{database_name}`.`{table_name}`" if database_name else f"`{table_name}`"
    doris_sql = [
        f"CREATE TABLE {full_table_name} (",
        ",\n ".join(field_defs),
        ') ENGINE=OLAP'
    ]

    # Data model.
    if primary_keys:
        pk_columns = [f"`{col}`" for col in primary_keys]
        doris_sql.append(f"UNIQUE KEY ({', '.join(pk_columns)})")
    else:
        # NOTE(review): placeholder key - no such column exists, so the
        # statement must be edited by hand when the source has no keys.
        doris_sql.append("UNIQUE KEY (`__dummy_key`)")

    # The table comment sits between the key clause and the distribution.
    if table_comment:
        doris_sql.append(f"COMMENT '{table_comment}'")

    # Bucketing strategy.
    if bucket_strategy == 'random':
        # NOTE(review): Doris rejects random distribution on UNIQUE KEY
        # tables; kept because the caller asked for it explicitly.
        doris_sql.append(f"DISTRIBUTED BY RANDOM BUCKETS {buckets or 10}")
    else:
        hash_key = _resolve_bucket_key(bucket_key, primary_keys, fields)
        if bucket_strategy == 'hash':
            doris_sql.append(f"DISTRIBUTED BY HASH(`{hash_key}`) BUCKETS {buckets or 10}")
        else:  # auto: let Doris choose the bucket count
            doris_sql.append(f"DISTRIBUTED BY HASH(`{hash_key}`) BUCKETS AUTO")

    # Table properties.
    properties = [
        f'"replication_allocation" = "tag.location.default: {replication_num}"',
        '"in_memory" = "false"',
        '"storage_format" = "V2"'
    ]
    doris_sql.append("PROPERTIES (\n " + ",\n ".join(properties) + "\n);")

    # Index suggestions derived from the MySQL secondary indexes.
    index_notes = generate_index_suggestions(table_name, constraints['indexes'])

    # Companion INSERT ... SELECT statement.
    target_table_name = '.'.join([database_name, table_name] if database_name != '' else [table_name])
    insert_sql = create_insert_sql(
        original_table_name=table_name,
        reordered_fields=reordered_fields,
        target_table_name=target_table_name,
    )
    return '\n'.join(doris_sql) + '\n\n' + index_notes + '\n\n' + insert_sql
def generate_index_suggestions(table_name: str, indexes: Dict) -> str:
    """Turn MySQL secondary indexes into Doris index suggestions.

    A single-column index becomes a ``CREATE INDEX`` statement. For a
    composite index, each leading prefix of two or three columns becomes a
    commented-out ``ADD ROLLUP`` suggestion. A column combination is
    suggested at most once, and only combinations that were actually emitted
    count as already handled.

    Args:
        table_name: table the statements refer to.
        indexes: index dicts (``name``, ``columns``) keyed by index name.

    Returns:
        A header comment line followed by one suggestion per line.
    """
    suggestions = [
        "-- Doris索引创建:",
    ]
    emitted = set()
    for idx in indexes.values():
        columns = idx['columns']
        combo = tuple(columns)
        # Full-column index: only generated for single-column indexes.
        if len(columns) == 1 and combo not in emitted:
            column_sql = ', '.join(f'`{c}`' for c in columns)
            suggestions.append(f"CREATE INDEX idx_{idx['name']} ON {table_name} ({column_sql});")
            emitted.add(combo)
        # Prefix rollups of 2 and 3 leading columns (3 columns at most).
        for i in range(1, min(3, len(columns))):
            prefix = tuple(columns[:i + 1])
            if prefix not in emitted:
                prefix_sql = ', '.join(f'`{c}`' for c in prefix)
                suggestions.append(
                    f"-- ALTER TABLE {table_name} ADD ROLLUP {idx['name']}_prefix{i + 1} ({prefix_sql});")
                emitted.add(prefix)
    return '\n'.join(suggestions)
def start(mysql_create_table_sql):
    """Convert one MySQL CREATE TABLE statement and print the Doris SQL.

    Uses hash bucketing with a single bucket and no explicit bucket key.
    A ``ValueError`` raised during conversion is printed as ``Error: ...``
    instead of propagating.
    """
    try:
        parsed = parse_mysql_sql(mysql_create_table_sql)
        database_name, table_name, fields, constraints, table_comment = parsed
        doris_sql = generate_doris_sql(
            database_name=database_name,
            table_name=table_name,
            fields=fields,
            constraints=constraints,
            table_comment=table_comment,
            bucket_strategy='hash',
            bucket_key='',
            buckets=1,
            replication_num=3,
        )
        print(doris_sql)
    except ValueError as e:
        print(f"Error: {e}")
# FastAPI application object; the route decorators below register the
# conversion endpoint, the web page and the health check on it.
app = FastAPI(title="MySQL to Doris Converter API",
              description="Convert MySQL CREATE TABLE statements to Doris CREATE TABLE statements",
              version="1.0.0")
# Request body of POST /convert.
class ConversionRequest(BaseModel):
    # MySQL CREATE TABLE statement to convert.
    mysql_sql: str
    # Bucketing strategy forwarded to generate_doris_sql (hash/random/auto).
    bucket_strategy: str = 'hash'
    # Bucket column; an empty value lets generate_doris_sql choose one.
    bucket_key: Optional[str] = ''
    # Number of buckets.
    buckets: Optional[int] = 1
    # Replica count forwarded to generate_doris_sql.
    replication_num: int = 3
# Response body of POST /convert.
class ConversionResponse(BaseModel):
    # True when the conversion succeeded.
    success: bool
    # Generated Doris SQL; set on success.
    doris_sql: Optional[str] = None
    # Error message; set on failure.
    error: Optional[str] = None
@app.post("/convert", response_model=ConversionResponse, summary="Convert MySQL to Doris SQL")
async def convert_mysql_to_doris(request: ConversionRequest):
    """
    Convert a MySQL CREATE TABLE statement into a Doris CREATE TABLE statement.

    Request fields:
    - mysql_sql: the MySQL CREATE TABLE statement
    - bucket_strategy: bucketing strategy (hash/random/auto), defaults to hash
    - bucket_key: bucket column, used when bucket_strategy is hash
    - buckets: number of buckets, defaults to 1
    - replication_num: number of replicas, defaults to 3

    Any exception raised during conversion is reported in the response
    (success=False, error=<message>) rather than as an HTTP error.
    """
    try:
        parsed = parse_mysql_sql(request.mysql_sql)
        database_name, table_name, fields, constraints, table_comment = parsed
        converted = generate_doris_sql(
            database_name=database_name,
            table_name=table_name,
            fields=fields,
            constraints=constraints,
            table_comment=table_comment,
            bucket_strategy=request.bucket_strategy,
            bucket_key=request.bucket_key,
            buckets=request.buckets,
            replication_num=request.replication_num,
        )
        return ConversionResponse(success=True, doris_sql=converted)
    except Exception as e:
        failure_message = str(e)
        return ConversionResponse(success=False, error=failure_message)
@app.get("/", response_class=HTMLResponse, summary="Web Interface")
async def web_interface():
    """
    Return the single-page web UI.

    The page posts the textarea content and the bucketing options to
    /convert and shows the returned Doris SQL (or the error message).
    The sample statement in the textarea uses plain back-quoted column
    names so that it converts as-is when the page loads.
    """
    html_content = '''<!DOCTYPE html>
<html lang="zh-CN">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>MySQL to Doris 转换器</title>
<style>
body {
font-family: Arial, sans-serif;
margin: 0;
padding: 20px;
background-color: #f5f5f5;
}
.container {
display: flex;
height: 80vh;
gap: 20px;
}
.panel {
flex: 1;
display: flex;
flex-direction: column;
}
.input-panel {
background-color: white;
border-radius: 8px;
box-shadow: 0 2px 10px rgba(0,0,0,0.1);
}
.output-panel {
background-color: white;
border-radius: 8px;
box-shadow: 0 2px 10px rgba(0,0,0,0.1);
}
h2 {
margin: 0;
padding: 15px;
background-color: #4CAF50;
color: white;
font-size: 18px;
}
textarea {
width: 100%;
height: calc(100% - 60px);
padding: 15px;
border: none;
resize: none;
font-family: monospace;
font-size: 14px;
box-sizing: border-box;
}
.config-panel {
margin-top: 20px;
background-color: white;
padding: 15px;
border-radius: 8px;
box-shadow: 0 2px 10px rgba(0,0,0,0.1);
}
.config-row {
display: flex;
align-items: center;
margin-bottom: 10px;
}
label {
width: 120px;
font-weight: bold;
}
input, select {
flex: 1;
padding: 8px;
border: 1px solid #ddd;
border-radius: 4px;
}
button {
background-color: #4CAF50;
color: white;
padding: 10px 20px;
border: none;
border-radius: 4px;
cursor: pointer;
font-size: 16px;
margin-top: 10px;
}
button:hover {
background-color: #45a049;
}
.loading {
text-align: center;
padding: 20px;
color: #666;
}
.error {
color: red;
padding: 10px;
background-color: #ffe6e6;
border-radius: 4px;
margin-top: 10px;
}
</style>
</head>
<body>
<h1 style="text-align: center; color: #333;">MySQL to Doris 转换器</h1>
<div class="container">
<div class="panel input-panel">
<h2>MySQL 建表语句</h2>
<textarea id="mysqlInput" placeholder="请粘贴您的 MySQL CREATE TABLE 语句...">CREATE TABLE `mydb`.`test_user_info` (
`id` int(11) NOT NULL AUTO_INCREMENT COMMENT '用户ID',
`username` varchar(50) NOT NULL COMMENT '用户名',
`email` varchar(100) DEFAULT NULL COMMENT '邮箱',
`age` tinyint(3) unsigned DEFAULT NULL COMMENT '年龄',
`balance` decimal(10,2) DEFAULT 0.00 COMMENT '余额',
`created_at` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
`updated_at` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
`status` tinyint(1) DEFAULT 1 COMMENT '状态:1-启用,0-禁用',
PRIMARY KEY (`id`),
UNIQUE KEY `uk_username` (`username`) USING BTREE,
KEY `idx_email` (`email`) USING BTREE,
KEY `idx_status` (`status`) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci COMMENT='用户信息表';</textarea>
</div>
<div class="panel output-panel">
<h2>Doris 建表语句</h2>
<textarea id="dorisOutput" readonly placeholder="转换结果将显示在这里..."></textarea>
</div>
</div>
<div class="config-panel">
<h2 style="margin-top: 0;">转换配置</h2>
<div class="config-row">
<label for="bucketStrategy">分桶策略:</label>
<select id="bucketStrategy">
<option value="hash">Hash</option>
<option value="random">Random</option>
<option value="auto">Auto</option>
</select>
</div>
<div class="config-row">
<label for="bucketKey">分桶键:</label>
<input type="text" id="bucketKey" placeholder="如: id">
</div>
<div class="config-row">
<label for="buckets">分桶数:</label>
<input type="number" id="buckets" value="1">
</div>
<div class="config-row">
<label for="replicationNum">副本数:</label>
<input type="number" id="replicationNum" value="3">
</div>
<button id="convertBtn">立即转换</button>
<button id="clearBtn">清空</button>
</div>
<script>
document.addEventListener('DOMContentLoaded', function() {
const mysqlInput = document.getElementById('mysqlInput');
const dorisOutput = document.getElementById('dorisOutput');
const convertBtn = document.getElementById('convertBtn');
const clearBtn = document.getElementById('clearBtn');
const bucketStrategy = document.getElementById('bucketStrategy');
const bucketKey = document.getElementById('bucketKey');
const buckets = document.getElementById('buckets');
const replicationNum = document.getElementById('replicationNum');
// 实时转换功能
let timeoutId;
mysqlInput.addEventListener('input', function() {
clearTimeout(timeoutId);
timeoutId = setTimeout(() => {
performConversion();
}, 500); // 延迟500毫秒执行转换
});
// 手动转换按钮点击事件
convertBtn.addEventListener('click', performConversion);
// 清空按钮点击事件
clearBtn.addEventListener('click', function() {
mysqlInput.value = '';
dorisOutput.value = '';
});
// 执行转换函数
function performConversion() {
const sql = mysqlInput.value.trim();
if (!sql) {
dorisOutput.value = '';
return;
}
// 显示加载状态
dorisOutput.value = '转换中...';
fetch('/convert', {
method: 'POST',
headers: {
'Content-Type': 'application/json'
},
body: JSON.stringify({
mysql_sql: sql,
bucket_strategy: bucketStrategy.value,
bucket_key: bucketKey.value,
buckets: parseInt(buckets.value) || 1,
replication_num: parseInt(replicationNum.value) || 3
})
})
.then(response => response.json())
.then(data => {
if (data.success) {
dorisOutput.value = data.doris_sql;
} else {
dorisOutput.value = `错误: ${data.error}`;
}
})
.catch(error => {
dorisOutput.value = `请求错误: ${error.message}`;
});
}
// 初始化转换
performConversion();
});
</script>
</body>
</html>'''
    return HTMLResponse(content=html_content)
@app.get("/health", summary="API Health Check")
async def health_check():
    """
    Report that the API is up and running.
    """
    payload = {
        "status": "healthy",
        "message": "MySQL to Doris converter API is running",
    }
    return payload
# Script entry point: serve the app with uvicorn on all network interfaces
# (0.0.0.0), port 8000. uvicorn is imported here so that importing this
# module elsewhere does not require it.
if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)

发表评论 取消回复