由于 MySQL 的建表语句与 Doris 的建表语句存在差异,我写了一个脚本来直接完成转换;分桶和副本数仍需根据各自的实际情况调整。下面把它封装成了接口和网页的形式,方便使用和调用。这个脚本主要用于解决我当前遇到的建表转换问题,如有不足之处欢迎指出,互相学习。

import re
from typing import List, Dict, Tuple, Optional
from fastapi import FastAPI, HTTPException
from fastapi.staticfiles import StaticFiles
from fastapi.responses import HTMLResponse
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
 
# Type mapping configuration: lower-cased MySQL type name -> Doris type.
# A value is one of:
#   * a plain string (parse_mysql_field may append a length / precision),
#   * a dict with a 'default' type and regex 'rules' that are searched in the
#     text found inside the type's parentheses (e.g. "1" for TINYINT(1)),
#   * a callable that receives that parenthesised text and returns the type.


def _enum_to_varchar(params: str) -> str:
    """Size a Doris VARCHAR for a MySQL ENUM value list such as "'a','bb'".

    Keeps the original estimate of 10 bytes per value, but never returns fewer
    bytes than the longest literal needs in UTF-8, so long values are not
    truncated or rejected when loaded.
    """
    if not params:
        return 'varchar(255)'
    values = [value.strip().strip('\'"') for value in params.split(',')]
    longest = max(len(value.encode('utf-8')) for value in values)
    return f"varchar({max(len(values) * 10, longest)})"


def _bit_to_doris(params: str) -> str:
    """Map MySQL BIT(M) to the narrowest signed Doris type that holds M bits.

    Accepts both "1" and "(1)"; an empty value means BIT, i.e. BIT(1) in MySQL.
    """
    text = params.strip().strip('()').strip()
    if not text:
        return 'boolean'
    if not text.isdigit():
        return 'int'
    width = int(text)
    if width <= 1:
        return 'boolean'
    if width <= 31:
        return 'int'
    if width <= 63:
        return 'bigint'
    return 'largeint'


TYPE_MAPPING = {
    'tinyint': {
        'default': 'tinyint',
        # Matches the parenthesised text "1" (and, for tolerance, "(1)").
        # NOTE(review): TINYINT(1) is converted to boolean by convention; a
        # column that stores values other than 0/1 would lose information.
        'rules': [(r'^\(?\s*1\s*\)?$', 'boolean')]
    },
    'smallint': 'SMALLINT',
    'mediumint': 'int',
    'int': 'int',
    'integer': 'int',
    'bigint': 'bigint',
    'float': 'float',
    'double': 'double',
    'decimal': 'decimal',
    'bool': 'boolean',
    'boolean': 'boolean',
    'char': 'char',
    'varchar': 'varchar',
    'tinytext': 'string',
    'text': 'string',
    'mediumtext': 'string',
    'longtext': 'string',
    'json': 'json',
    'date': 'DATE',
    'datetime': 'datetime',
    'timestamp': 'datetime',
    'enum': _enum_to_varchar,
    'bit': _bit_to_doris,
}
 
# Lower-cased MySQL DEFAULT keyword / function name -> Doris spelling.
# Unknown keywords are not listed, so a lookup with .get() yields None and the
# default is dropped.
DEFAULT_VALUES_MAP = {
    'current_timestamp': 'CURRENT_TIMESTAMP',
    # MySQL synonyms of CURRENT_TIMESTAMP that are valid in DEFAULT clauses.
    'now': 'CURRENT_TIMESTAMP',
    'localtime': 'CURRENT_TIMESTAMP',
    'localtimestamp': 'CURRENT_TIMESTAMP',
    'null': 'NULL',
}
 
# String literals survive normalisation verbatim; block/line comments and
# whitespace runs are collapsed to a single space.
_SQL_LEXEME = re.compile(
    r"(?P<literal>'(?:[^'\\]|\\.|'')*'|\"(?:[^\"\\]|\\.|\"\")*\")"
    r"|/\*.*?\*/|--[^\n]*|\s+",
    re.DOTALL,
)
# Optional "CONSTRAINT [name]" prefix in front of a table-level key clause.
_CONSTRAINT_PREFIX = re.compile(
    r"^constraint\s+(?:(?!(?:primary|unique|foreign|check)\b)(?:`[^`]*`|\w+)\s*)?"
)
# "PRIMARY KEY [USING x] (col, col(10), ...)"; allows one level of nested parens.
_PRIMARY_KEY = re.compile(
    r"^primary\s+key\s*(?:using\s+\w+\s*)?\(((?:[^()]|\([^()]*\))*)\)"
)
# Table-level COMMENT option, with or without "=".
_TABLE_COMMENT = re.compile(
    r"\bcomment\s*=?\s*('(?:[^'\\]|\\.|'')*'|\"(?:[^\"\\]|\\.|\"\")*\")"
)


def _normalize_sql(sql: str) -> str:
    """Drop comments, collapse whitespace and lower-case everything except string literals."""
    parts: List[str] = []
    need_space = False
    pos = 0
    for match in _SQL_LEXEME.finditer(sql):
        literal = match.group('literal')
        for piece in (sql[pos:match.start()].lower(), literal):
            if piece:
                if need_space and parts:
                    parts.append(' ')
                parts.append(piece)
                need_space = False
        if literal is None:
            # Comment or whitespace run: remember that a separator is needed.
            need_space = True
        pos = match.end()
    tail = sql[pos:].lower()
    if tail:
        if need_space and parts:
            parts.append(' ')
        parts.append(tail)
    return ''.join(parts)


def _split_table_elements(sql: str, open_idx: int) -> Tuple[List[str], int]:
    """Split the table body opened by sql[open_idx] == '(' into top-level elements.

    Commas and parentheses inside quoted text (', ", `) are ignored.  Returns the
    non-empty elements and the index just past the matching ')'.

    Raises:
        ValueError: if the body is never closed.
    """
    elements: List[str] = []
    current: List[str] = []
    depth = 0
    quote = ''
    i = open_idx + 1
    while i < len(sql):
        ch = sql[i]
        if quote:
            current.append(ch)
            if ch == '\\' and quote != '`' and i + 1 < len(sql):
                current.append(sql[i + 1])  # keep the escaped character verbatim
                i += 1
            elif ch == quote:
                quote = ''
        elif ch in '\'"`':
            quote = ch
            current.append(ch)
        elif ch == '(':
            depth += 1
            current.append(ch)
        elif ch == ')' and depth == 0:
            elements.append(''.join(current).strip())
            return [element for element in elements if element], i + 1
        elif ch == ')':
            depth -= 1
            current.append(ch)
        elif ch == ',' and depth == 0:
            elements.append(''.join(current).strip())
            current = []
        else:
            current.append(ch)
        i += 1
    raise ValueError("Invalid table structure")


def _key_column_names(column_list: str) -> List[str]:
    """Turn "`a`, `b`(10) desc" into ['a', 'b'] (prefix lengths and ASC/DESC dropped)."""
    names = []
    for raw in column_list.split(','):
        raw = re.sub(r'\(\s*\d+\s*\)', '', raw).strip()
        if raw:
            names.append(raw.split()[0].strip('`'))
    return names


def _sql_literal_body(token: str) -> str:
    """Return the inside of a quoted literal, escaped so it can be wrapped in '...'."""
    body = token[1:-1]
    if token.startswith('"'):
        body = body.replace('""', '"').replace("'", "''")
    return body


def parse_mysql_sql(mysql_sql: str) -> Tuple[str, str, List[Dict], Dict, str]:
    """Parse a MySQL CREATE TABLE statement.

    Identifiers and keywords are lower-cased; quoted string literals (column
    and table comments, string defaults) keep their original text.

    Returns:
        (database_name, table_name, fields, constraints, table_comment), where
        database_name is '' when the table is not qualified, fields is a list of
        parse_mysql_field() dicts, constraints is
        {'primary_keys': [...], 'indexes': {name: parse_mysql_index() dict}}
        with primary_keys holding the PRIMARY KEY columns followed by UNIQUE KEY
        columns (deduplicated), and table_comment is '' when absent.

    Raises:
        ValueError: if the table name or the parenthesised table body is missing.
    """
    sql = _normalize_sql(mysql_sql)

    # Table name, optionally qualified as database.table (with or without backticks).
    table_match = re.search(r'create table (?:if not exists )?`?(\w+)`?\.`?(\w+)`?', sql)
    if table_match:
        database_name, table_name = table_match.group(1), table_match.group(2)
    else:
        table_match = re.search(r'create table (?:if not exists )?`?(\w+)`?(?:\s|\(|$)', sql)
        if not table_match:
            raise ValueError("Invalid MySQL CREATE TABLE syntax: Missing table name")
        database_name, table_name = "", table_match.group(1)

    # Locate the body by matching parentheses instead of a regex, so a ')'
    # followed by COMMENT inside a column definition cannot end it early.
    open_idx = sql.find('(', table_match.end(table_match.lastindex))
    if open_idx < 0:
        raise ValueError("Invalid table structure")
    elements, body_end = _split_table_elements(sql, open_idx)

    fields: List[Dict] = []
    pk_columns: List[str] = []
    unique_columns: List[str] = []
    indexes: Dict[str, Dict] = {}

    for elem in elements:
        if elem.startswith('`'):
            field = parse_mysql_field(elem)
            if field:
                fields.append(field)
                continue

        clause = _CONSTRAINT_PREFIX.sub('', elem, count=1)
        if re.match(r'primary\s+key\b', clause):
            pk_match = _PRIMARY_KEY.match(clause)
            if pk_match:
                pk_columns.extend(_key_column_names(pk_match.group(1)))
        elif re.match(r'unique\b', clause):
            # UNIQUE KEY columns join the Doris UNIQUE KEY (see generate_doris_sql).
            index = parse_mysql_index(clause)
            if index:
                unique_columns.extend(index['columns'])
        elif re.match(r'(?:key|index)\b', clause):
            index = parse_mysql_index(clause)
            if index:
                indexes[index['name']] = index
        elif re.match(r'(?:constraint|foreign|fulltext|spatial|check)\b', clause):
            pass  # no Doris equivalent; ignored
        elif not elem.startswith('`'):
            # Column definition whose name is not backtick-quoted.
            field = parse_mysql_field(elem)
            if field:
                fields.append(field)

    # Only the table options after the body can hold the table comment.
    comment_match = _TABLE_COMMENT.search(sql, body_end)
    table_comment = _sql_literal_body(comment_match.group(1)) if comment_match else ''

    primary_keys = list(dict.fromkeys(pk_columns + unique_columns))
    return database_name, table_name, fields, {'primary_keys': primary_keys, 'indexes': indexes}, table_comment
 
# Doris length limits, in bytes, for the fixed/variable-length string types.
_DORIS_CHAR_MAX_LEN = 255
_DORIS_VARCHAR_MAX_LEN = 65533

# Doris has no UNSIGNED integer types, so unsigned MySQL integers are widened
# to the next signed Doris type that covers the whole unsigned range.
_UNSIGNED_WIDENING = {
    'tinyint': 'smallint',
    'smallint': 'int',
    'int': 'bigint',
    'bigint': 'largeint',
}

# An unquoted element starting with one of these words is a table-level clause
# (key, constraint, ...) rather than a column definition.
_TABLE_CLAUSE_WORDS = frozenset({
    'primary', 'unique', 'key', 'index', 'constraint',
    'foreign', 'fulltext', 'spatial', 'check',
})

# "`name` type(params)": the fixed-order head of a column definition.
_FIELD_HEAD = re.compile(
    r"\s*(?:`(?P<quoted_name>[^`]+)`|(?P<name>\w+))\s+"
    r"(?P<type>\w+)"
    r"(?:\s*\((?P<params>(?:'(?:[^'\\]|\\.|'')*'|\"(?:[^\"\\]|\\.|\"\")*\"|[^)'\"])*)\))?",
    re.IGNORECASE,
)
# Tokens of the attribute list after the type: quoted literals, numbers,
# words and single punctuation characters.
_FIELD_TOKEN = re.compile(
    r"'(?:[^'\\]|\\.|'')*'|\"(?:[^\"\\]|\\.|\"\")*\"|[-+]?\d+(?:\.\d+)?|\w+|\S"
)
_FIELD_NUMBER = re.compile(r"[-+]?\d+(?:\.\d+)?")


def _field_literal_body(token: str) -> str:
    """Return the inside of a quoted literal, escaped so it can be wrapped in '...'."""
    body = token[1:-1]
    if token.startswith('"'):
        body = body.replace('""', '"').replace("'", "''")
    return body


def _field_skip_group(tokens: List[str], start: int) -> int:
    """Return the index of the ')' that closes the '(' at tokens[start]."""
    depth = 0
    for pos in range(start, len(tokens)):
        if tokens[pos] == '(':
            depth += 1
        elif tokens[pos] == ')':
            depth -= 1
            if depth == 0:
                return pos
    return len(tokens) - 1


def _field_doris_type(mysql_type: str, params: str) -> str:
    """Translate a lower-cased MySQL type plus its parenthesised text via TYPE_MAPPING."""
    handler = TYPE_MAPPING.get(mysql_type)
    if not handler:
        return 'string'
    if callable(handler):
        return handler(params)
    if isinstance(handler, dict):
        for pattern, replacement in handler['rules']:
            if re.search(pattern, params):
                return replacement
        return handler['default']

    doris_type = handler
    if params and mysql_type in ('varchar', 'char'):
        # NOTE(review): 2.5 bytes per character is the original heuristic; Doris
        # string lengths are byte counts and utf8mb4 text can need up to 4 bytes
        # per character -- confirm the factor suits your data.
        length = max(int(int(params) * 2.5), 1)
        if length > _DORIS_VARCHAR_MAX_LEN:
            return 'string'
        if mysql_type == 'char' and length > _DORIS_CHAR_MAX_LEN:
            doris_type = 'varchar'
        return f'{doris_type}({length})'
    if params and mysql_type == 'decimal':
        return f'{doris_type}({params})'
    if mysql_type == 'float' and params.isdigit() and int(params) > 24:
        # MySQL FLOAT(p) with p > 24 is a double-precision column.
        return 'double'
    # Doris FLOAT/DOUBLE take no (M,D) arguments, so any params are dropped.
    return doris_type


def parse_mysql_field(line: str) -> Optional[Dict]:
    """Parse one MySQL column definition.

    The head "`name` type(params)" must come first; the remaining attributes
    (UNSIGNED, [NOT] NULL, DEFAULT, AUTO_INCREMENT, ON UPDATE, CHARACTER SET,
    COLLATE, COMMENT) may appear in any order, and unrecognised ones are ignored.

    Returns:
        {'name', 'type', 'nullable', 'default', 'comment'}, where 'nullable' is
        'NOT NULL' or 'NULL', 'default' is a ready-to-emit SQL value (quoted
        literal, CURRENT_TIMESTAMP, NULL) or None, and 'comment' is the comment
        text escaped for use inside single quotes ('' if absent).  Returns None
        when the line is not a column definition.
    """
    head = _FIELD_HEAD.match(line)
    if not head:
        return None
    name = head.group('quoted_name') or head.group('name')
    if head.group('quoted_name') is None and name.lower() in _TABLE_CLAUSE_WORDS:
        return None

    doris_type = _field_doris_type(head.group('type').lower(), (head.group('params') or '').strip())

    tokens = _FIELD_TOKEN.findall(line, head.end())
    words = [token.lower() for token in tokens]
    unsigned = False
    nullable = 'NULL'
    default = None
    comment = ''

    i = 0
    while i < len(tokens):
        word = words[i]
        has_next = i + 1 < len(tokens)
        if word == 'unsigned':
            unsigned = True
        elif word == 'not' and has_next and words[i + 1] == 'null':
            nullable = 'NOT NULL'
            i += 1
        elif word == 'default' and has_next:
            i += 1
            value = tokens[i]
            if value[0] in '\'"':
                default = f"'{_field_literal_body(value)}'"
            elif _FIELD_NUMBER.fullmatch(value):
                # Quoted, matching the DEFAULT "value" form in the Doris docs.
                default = f"'{value}'"
            elif value == '(':
                # Expression default, e.g. DEFAULT (uuid()): no Doris equivalent.
                default = None
                i = _field_skip_group(tokens, i)
            else:
                default = DEFAULT_VALUES_MAP.get(words[i])
                if i + 1 < len(tokens) and tokens[i + 1] == '(':
                    # Swallow a call suffix such as now() or current_timestamp(3).
                    i = _field_skip_group(tokens, i + 1)
        elif word == 'comment' and has_next and tokens[i + 1][0] in '\'"':
            comment = _field_literal_body(tokens[i + 1])
            i += 1
        i += 1

    if unsigned:
        doris_type = _UNSIGNED_WIDENING.get(doris_type.lower(), doris_type)

    return {
        'name': name,
        'type': doris_type,
        'nullable': nullable,
        'default': default,
        'comment': comment,
    }
 
def parse_mysql_index(line: str) -> Optional[Dict]:
    """Parse a MySQL index clause such as "key `idx` (`a`, `b`(10) desc) using btree".

    Recognises KEY, INDEX, UNIQUE, UNIQUE KEY and UNIQUE INDEX, with an optional
    (backtick-)quoted name and an optional "USING x" before the column list.
    Column prefix lengths and ASC/DESC are stripped from the column names.  An
    unnamed index is named after its first column, as MySQL does.

    Returns:
        {'type': e.g. 'KEY' / 'INDEX' / 'UNIQUE_KEY', 'name': str, 'columns': [str]},
        or None when the line is not such a clause or lists no columns.
    """
    index_pattern = (
        r'(?P<type>unique\s+(?:key|index)|unique|key|index)\b\s*'
        r'(?:`(?P<quoted_name>[^`]+)`|(?P<name>\w+))?\s*'
        r'(?:using\s+\w+\s*)?'
        # One level of nested parentheses allows prefix lengths like name(10).
        r'\((?P<columns>(?:[^()]|\([^()]*\))*)\)'
    )
    match = re.match(index_pattern, line.strip(), re.IGNORECASE)
    if not match:
        return None

    columns = []
    for raw in match.group('columns').split(','):
        raw = re.sub(r'\(\s*\d+\s*\)', '', raw).strip()
        if raw:
            columns.append(raw.split()[0].strip('`'))
    if not columns:
        return None

    return {
        'type': re.sub(r'\s+', '_', match.group('type').upper()),
        'name': match.group('quoted_name') or match.group('name') or columns[0],
        'columns': columns,
    }
 
def reorder_fields_by_unique_key(fields: List[Dict], unique_key_columns: List[str]) -> List[Dict]:
    """Move the unique-key columns to the front of the field list.

    Key columns come first, in the order given by *unique_key_columns* (names
    not present in *fields* are skipped, repeats are ignored); every other
    field follows in its original order.  When there are no key columns the
    input list itself is returned.
    """
    if not unique_key_columns:
        return fields

    # Name -> field; for duplicated names the last definition wins.
    lookup = {}
    for field in fields:
        lookup[field['name']] = field

    placed = set()
    leading = []
    for column in unique_key_columns:
        if column in placed or column not in lookup:
            continue
        placed.add(column)
        leading.append(lookup[column])

    trailing = []
    for field in fields:
        column = field['name']
        if column in placed:
            continue
        placed.add(column)
        trailing.append(field)

    return leading + trailing
 
def create_insert_sql(original_table_name, reordered_fields, target_table_name):
    '''
    Build an INSERT ... SELECT statement that copies every parsed column from
    the source table into the target table.

    :param original_table_name: source table, optionally dot-qualified (e.g. "db.t")
    :param reordered_fields: field dicts (each with a 'name' key) in target column order
    :param target_table_name: target table, optionally dot-qualified
    :return: the "-- 插入数据SQL:" header line followed by the statement; when no
             fields are given, a comment line replaces the (otherwise invalid) statement
    '''
    def quote_table(name):
        # Backtick-quote every dot-separated part so reserved words stay valid;
        # backticks that are already present are not doubled.
        return '.'.join(f"`{part.strip('`')}`" for part in str(name).split('.'))

    suggestions = ["-- 插入数据SQL:"]
    if not reordered_fields:
        suggestions.append("-- 未解析到任何字段,无法生成插入语句")
        return '\n'.join(suggestions)

    # Column names are quoted too: names such as `order` or `key` are reserved words.
    column_list = ', '.join(f"`{field['name']}`" for field in reordered_fields)
    suggestions.append(
        f"INSERT INTO {quote_table(target_table_name)} ({column_list}) "
        f"SELECT {column_list} FROM {quote_table(original_table_name)};"
    )
    return '\n'.join(suggestions)
 
def _doris_column_definition(field: Dict, is_key: bool) -> str:
    """Render one parse_mysql_field() dict as a Doris column definition."""
    parts = [f"`{field['name']}` {field['type']}"]
    default = field['default']
    if field['nullable'] == 'NOT NULL':
        parts.append("NOT NULL")
    elif not is_key or default == 'NULL':
        # NOTE(review): nullable MySQL columns are marked NULL explicitly instead
        # of relying on Doris's implicit nullability, which has differed between
        # versions -- confirm against your target version.
        parts.append("NULL")
    if default and default != 'NULL':
        # DEFAULT NULL is already expressed by NULL; never emit "NOT NULL NULL".
        parts.append(f"DEFAULT {default}")
    if field['comment']:
        parts.append(f"COMMENT '{field['comment']}'")
    return ' '.join(parts)


def _doris_distribution_clause(
    bucket_strategy: Optional[str],
    bucket_key: Optional[str],
    buckets: Optional[int],
    key_columns: List[str],
    unique_model: bool,
    column_names: List[str],
) -> Tuple[str, List[str]]:
    """Build the DISTRIBUTED BY clause, plus '-- ' note lines for any adjustment made."""
    notes: List[str] = []
    strategy = (bucket_strategy or 'auto').strip().lower()
    bucket_count = buckets or 10
    if strategy == 'random' and unique_model:
        # NOTE(review): Doris documents RANDOM distribution as unsupported for
        # UNIQUE KEY tables, so fall back to HASH on a key column.
        notes.append("-- 注意: UNIQUE KEY 模型不支持 RANDOM 分桶,已改用 HASH 分桶")
        strategy = 'hash'
    if strategy == 'hash':
        # For the UNIQUE model the hash column has to be a key column.
        candidates = key_columns if unique_model else column_names
        wanted = (bucket_key or '').strip().strip('`')
        chosen = next((c for c in candidates if c.lower() == wanted.lower()), None) if wanted else None
        if wanted and chosen is None:
            scope = 'UNIQUE KEY 列' if unique_model else '表字段'
            notes.append(f"-- 注意: 分桶键 `{wanted}` 不在{scope}中,已改用 `{key_columns[0]}` 分桶")
        return f"DISTRIBUTED BY HASH(`{chosen or key_columns[0]}`) BUCKETS {bucket_count}", notes
    if strategy == 'random':
        return f"DISTRIBUTED BY RANDOM BUCKETS {bucket_count}", notes
    # 'auto' and any unrecognised value: let Doris choose the bucket count.
    if unique_model:
        return f"DISTRIBUTED BY HASH(`{key_columns[0]}`) BUCKETS AUTO", notes
    return "DISTRIBUTED BY RANDOM BUCKETS AUTO", notes


def generate_doris_sql(
    database_name: str,
    table_name: str,
    fields: List[Dict],
    constraints: Dict,
    table_comment: str,
    bucket_strategy: str = 'auto',
    bucket_key: Optional[str] = None,
    buckets: Optional[int] = None,
    replication_num: int = 3
) -> str:
    """Generate the Doris CREATE TABLE statement plus index and INSERT suggestions.

    Tables with key columns in constraints['primary_keys'] use the UNIQUE KEY
    model (key columns moved to the front); tables without any use the
    DUPLICATE KEY model keyed on the first column.

    Args:
        database_name: database qualifier ('' for none).
        table_name: table name.
        fields: parse_mysql_field() dicts.
        constraints: {'primary_keys': [...], 'indexes': {...}} from parse_mysql_sql().
        table_comment: table comment text ('' for none).
        bucket_strategy: 'hash', 'random' or anything else for automatic bucketing.
        bucket_key: hash column; empty/unknown falls back to the first key column.
        buckets: bucket count for hash/random; None or 0 means 10.
        replication_num: replica count written to "replication_allocation".

    Returns:
        Optional note lines, the CREATE TABLE statement, the index suggestions
        and the INSERT ... SELECT statement, separated by blank lines.

    Raises:
        ValueError: if no columns were parsed, buckets is negative, or
            replication_num is not a positive integer.
    """
    if not fields:
        raise ValueError("No column definitions found in the MySQL statement")
    if buckets is not None and buckets < 0:
        raise ValueError("buckets must not be negative")
    if replication_num is None or replication_num < 1:
        raise ValueError("replication_num must be a positive integer")

    # A column may appear in both the PRIMARY KEY and a UNIQUE KEY; keep it once.
    unique_columns = list(dict.fromkeys(constraints['primary_keys']))
    unique_model = bool(unique_columns)
    ordered_fields = reorder_fields_by_unique_key(fields, unique_columns)
    key_columns = unique_columns if unique_model else [ordered_fields[0]['name']]
    key_set = set(key_columns)
    column_defs = [_doris_column_definition(field, field['name'] in key_set) for field in ordered_fields]

    full_table_name = f"`{database_name}`.`{table_name}`" if database_name else f"`{table_name}`"
    key_list = ', '.join(f"`{column}`" for column in key_columns)
    statement = [
        f"CREATE TABLE {full_table_name} (",
        "  " + ",\n  ".join(column_defs),
        ') ENGINE=OLAP',
        f"{'UNIQUE' if unique_model else 'DUPLICATE'} KEY ({key_list})",
    ]
    # Doris clause order: KEY ... COMMENT ... DISTRIBUTED BY ... PROPERTIES.
    if table_comment:
        statement.append(f"COMMENT '{table_comment}'")

    distribution, notes = _doris_distribution_clause(
        bucket_strategy, bucket_key, buckets, key_columns, unique_model,
        [field['name'] for field in ordered_fields],
    )
    statement.append(distribution)

    properties = [
        f'"replication_allocation" = "tag.location.default: {replication_num}"',
        '"in_memory" = "false"',
        '"storage_format" = "V2"',
    ]
    statement.append("PROPERTIES (\n  " + ",\n  ".join(properties) + "\n);")

    index_notes = generate_index_suggestions(table_name, constraints['indexes'])
    target_table_name = f"{database_name}.{table_name}" if database_name else table_name
    insert_sql = create_insert_sql(
        original_table_name=table_name,
        reordered_fields=ordered_fields,
        target_table_name=target_table_name,
    )

    result = '\n'.join(statement) + '\n\n' + index_notes + '\n\n' + insert_sql
    if notes:
        result = '\n'.join(notes) + '\n\n' + result
    return result
 
def generate_index_suggestions(table_name: str, indexes: Dict) -> str:
    """Turn parsed MySQL indexes into Doris index / rollup suggestions.

    A single-column index becomes a ``CREATE INDEX idx_<name>`` statement.  For
    a multi-column index, commented-out ``ADD ROLLUP`` hints are produced for
    its 2- and 3-column prefixes.  A column combination is suggested only once
    across all indexes.
    """
    lines = ["-- Doris索引创建:"]
    emitted = set()

    for index in indexes.values():
        cols = index['columns']
        name = index['name']

        whole = tuple(cols)
        if len(cols) == 1 and whole not in emitted:
            quoted = ', '.join(f'`{c}`' for c in cols)
            lines.append(f"CREATE INDEX idx_{name} ON {table_name} ({quoted});")
            emitted.add(whole)

        # Prefix widths 2 .. min(3, number of columns).
        for width in range(2, min(3, len(cols)) + 1):
            prefix = tuple(cols[:width])
            if prefix in emitted:
                continue
            quoted = ', '.join(f'`{c}`' for c in prefix)
            lines.append(f"-- ALTER TABLE {table_name} ADD ROLLUP {name}_prefix{width} ({quoted});")
            emitted.add(prefix)

    return '\n'.join(lines)
 
def start(mysql_create_table_sql):
    """Convert *mysql_create_table_sql* and print the resulting Doris SQL.

    Command-line helper with fixed options: hash bucketing on the default key,
    1 bucket, replication_num=3.  A ValueError raised while parsing or
    generating is printed as ``Error: ...`` instead of propagating.
    """
    try:
        parsed = parse_mysql_sql(mysql_create_table_sql)
        database_name, table_name, fields, constraints, table_comment = parsed
        ddl = generate_doris_sql(
            database_name=database_name,
            table_name=table_name,
            fields=fields,
            constraints=constraints,
            table_comment=table_comment,
            bucket_strategy='hash',
            bucket_key='',
            buckets=1,
            replication_num=3,
        )
    except ValueError as err:
        print(f"Error: {err}")
    else:
        print(ddl)
 
# ASGI application; the routes below register /convert, the web page (/) and /health on it.
app = FastAPI(
    title="MySQL to Doris Converter API",
    description="Convert MySQL CREATE TABLE statements to Doris CREATE TABLE statements",
    version="1.0.0",
)
 
 
class ConversionRequest(BaseModel):
    # JSON body accepted by POST /convert. mysql_sql goes to parse_mysql_sql;
    # the remaining fields are passed through to generate_doris_sql.
    # The MySQL CREATE TABLE statement to convert.
    mysql_sql: str
    # 'hash' or 'random'; any other value takes the generator's 'auto' branch.
    bucket_strategy: str = 'hash'
    # Hash bucketing column; an empty value lets the generator pick a default.
    bucket_key: Optional[str] = ''
    # Bucket count for hash/random bucketing; a falsy value makes the
    # generator fall back to 10.
    buckets: Optional[int] = 1
    # Forwarded to generate_doris_sql as replication_num.
    replication_num: int = 3
 
class ConversionResponse(BaseModel):
    # Response body of POST /convert. Failures are reported in-band with
    # success=False rather than through an HTTP error status.
    # True when the conversion succeeded.
    success: bool
    # Generated Doris SQL; set only on success.
    doris_sql: Optional[str] = None
    # str() of the exception raised during conversion; set only on failure.
    error: Optional[str] = None
 
@app.post("/convert", response_model=ConversionResponse, summary="Convert MySQL to Doris SQL")
async def convert_mysql_to_doris(request: ConversionRequest):
    """
    将MySQL建表语句转换为Doris建表语句
    
    参数:
    - mysql_sql: MySQL建表语句
    - bucket_strategy: 分桶策略 (hash/random/auto),默认为hash
    - bucket_key: 分桶键,当bucket_strategy为hash时生效
    - buckets: 分桶数,默认为1
    - replication_num: 副本数,默认为3
    """
    # The docstring above is the endpoint's OpenAPI description, so it is kept verbatim.
    # Any failure (bad SQL, bad options, unexpected bugs) is returned in-band as
    # success=False so the web page can display the message.
    try:
        parsed = parse_mysql_sql(request.mysql_sql)
        db_name, tbl_name, columns, key_info, tbl_comment = parsed
        options = {
            'bucket_strategy': request.bucket_strategy,
            'bucket_key': request.bucket_key,
            'buckets': request.buckets,
            'replication_num': request.replication_num,
        }
        ddl = generate_doris_sql(
            database_name=db_name,
            table_name=tbl_name,
            fields=columns,
            constraints=key_info,
            table_comment=tbl_comment,
            **options,
        )
    except Exception as exc:
        return ConversionResponse(success=False, error=str(exc))
    return ConversionResponse(success=True, doris_sql=ddl)
 
 
@app.get("/", response_class=HTMLResponse, summary="Web Interface")
async def web_interface():
    """
    返回网页界面
    """
    # Serve the single-page converter UI. The docstring above is the OpenAPI
    # description and is kept verbatim.
    # Fixes in the page:
    #   * The sample SQL had garbled link artefacts in place of `id` / `username`.
    #     They broke the sample and put a stray "\a" (BEL) escape in this
    #     non-raw string.
    #   * Stale responses no longer overwrite newer ones.
    #   * Changing an option re-runs the conversion.
    #   * HTTP error bodies (e.g. 422 "detail") are shown instead of "undefined".
    html_content = '''<!DOCTYPE html>
<html lang="zh-CN">
<head>
    <meta charset="UTF-8">
    <meta name="viewport" content="width=device-width, initial-scale=1.0">
    <title>MySQL to Doris 转换器</title>
    <style>
        body {
            font-family: Arial, sans-serif;
            margin: 0;
            padding: 20px;
            background-color: #f5f5f5;
        }
        .container {
            display: flex;
            height: 80vh;
            gap: 20px;
        }
        .panel {
            flex: 1;
            display: flex;
            flex-direction: column;
        }
        .input-panel {
            background-color: white;
            border-radius: 8px;
            box-shadow: 0 2px 10px rgba(0,0,0,0.1);
        }
        .output-panel {
            background-color: white;
            border-radius: 8px;
            box-shadow: 0 2px 10px rgba(0,0,0,0.1);
        }
        h2 {
            margin: 0;
            padding: 15px;
            background-color: #4CAF50;
            color: white;
            font-size: 18px;
        }
        textarea {
            width: 100%;
            height: calc(100% - 60px);
            padding: 15px;
            border: none;
            resize: none;
            font-family: monospace;
            font-size: 14px;
            box-sizing: border-box;
        }
        .config-panel {
            margin-top: 20px;
            background-color: white;
            padding: 15px;
            border-radius: 8px;
            box-shadow: 0 2px 10px rgba(0,0,0,0.1);
        }
        .config-row {
            display: flex;
            align-items: center;
            margin-bottom: 10px;
        }
        label {
            width: 120px;
            font-weight: bold;
        }
        input, select {
            flex: 1;
            padding: 8px;
            border: 1px solid #ddd;
            border-radius: 4px;
        }
        button {
            background-color: #4CAF50;
            color: white;
            padding: 10px 20px;
            border: none;
            border-radius: 4px;
            cursor: pointer;
            font-size: 16px;
            margin-top: 10px;
        }
        button:hover {
            background-color: #45a049;
        }
        .loading {
            text-align: center;
            padding: 20px;
            color: #666;
        }
        .error {
            color: red;
            padding: 10px;
            background-color: #ffe6e6;
            border-radius: 4px;
            margin-top: 10px;
        }
    </style>
</head>
<body>
    <h1 style="text-align: center; color: #333;">MySQL to Doris 转换器</h1>
    
    <div class="container">
        <div class="panel input-panel">
            <h2>MySQL 建表语句</h2>
            <textarea id="mysqlInput" placeholder="请粘贴您的 MySQL CREATE TABLE 语句...">CREATE TABLE `mydb`.`test_user_info` (
  `id` int(11) NOT NULL AUTO_INCREMENT COMMENT '用户ID',
  `username` varchar(50) NOT NULL COMMENT '用户名',
  `email` varchar(100) DEFAULT NULL COMMENT '邮箱',
  `age` tinyint(3) unsigned DEFAULT NULL COMMENT '年龄',
  `balance` decimal(10,2) DEFAULT 0.00 COMMENT '余额',
  `created_at` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
  `updated_at` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
  `status` tinyint(1) DEFAULT 1 COMMENT '状态:1-启用,0-禁用',
  PRIMARY KEY (`id`),
  UNIQUE KEY `uk_username` (`username`) USING BTREE,
  KEY `idx_email` (`email`) USING BTREE,
  KEY `idx_status` (`status`) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci COMMENT='用户信息表';</textarea>
        </div>
        
        <div class="panel output-panel">
            <h2>Doris 建表语句</h2>
            <textarea id="dorisOutput" readonly placeholder="转换结果将显示在这里..."></textarea>
        </div>
    </div>
    
    <div class="config-panel">
        <h2 style="margin-top: 0;">转换配置</h2>
        <div class="config-row">
            <label for="bucketStrategy">分桶策略:</label>
            <select id="bucketStrategy">
                <option value="hash">Hash</option>
                <option value="random">Random</option>
                <option value="auto">Auto</option>
            </select>
        </div>
        <div class="config-row">
            <label for="bucketKey">分桶键:</label>
            <input type="text" id="bucketKey" placeholder="如: id">
        </div>
        <div class="config-row">
            <label for="buckets">分桶数:</label>
            <input type="number" id="buckets" value="1">
        </div>
        <div class="config-row">
            <label for="replicationNum">副本数:</label>
            <input type="number" id="replicationNum" value="3">
        </div>
        <button id="convertBtn">立即转换</button>
        <button id="clearBtn">清空</button>
    </div>
    
    <script>
        document.addEventListener('DOMContentLoaded', function() {
            const mysqlInput = document.getElementById('mysqlInput');
            const dorisOutput = document.getElementById('dorisOutput');
            const convertBtn = document.getElementById('convertBtn');
            const clearBtn = document.getElementById('clearBtn');
            const bucketStrategy = document.getElementById('bucketStrategy');
            const bucketKey = document.getElementById('bucketKey');
            const buckets = document.getElementById('buckets');
            const replicationNum = document.getElementById('replicationNum');
            
            // 实时转换功能
            let timeoutId;
            // Id of the newest request; responses belonging to older requests are ignored.
            let latestRequestId = 0;
            function scheduleConversion() {
                clearTimeout(timeoutId);
                timeoutId = setTimeout(() => {
                    performConversion();
                }, 500); // 延迟500毫秒执行转换
            }
            mysqlInput.addEventListener('input', scheduleConversion);
            // Option changes also affect the output, so re-run the conversion.
            [bucketStrategy, bucketKey, buckets, replicationNum].forEach(function(el) {
                el.addEventListener('change', scheduleConversion);
            });
            
            // 手动转换按钮点击事件
            convertBtn.addEventListener('click', performConversion);
            
            // 清空按钮点击事件
            clearBtn.addEventListener('click', function() {
                clearTimeout(timeoutId);
                latestRequestId++; // discard any response still in flight
                mysqlInput.value = '';
                dorisOutput.value = '';
            });
            
            // 执行转换函数
            function performConversion() {
                clearTimeout(timeoutId);
                const requestId = ++latestRequestId;
                const sql = mysqlInput.value.trim();
                if (!sql) {
                    dorisOutput.value = '';
                    return;
                }
                
                // 显示加载状态
                dorisOutput.value = '转换中...';
                
                fetch('/convert', {
                    method: 'POST',
                    headers: {
                        'Content-Type': 'application/json'
                    },
                    body: JSON.stringify({
                        mysql_sql: sql,
                        bucket_strategy: bucketStrategy.value,
                        bucket_key: bucketKey.value,
                        buckets: parseInt(buckets.value) || 1,
                        replication_num: parseInt(replicationNum.value) || 3
                    })
                })
                .then(response => response.json().then(data => ({ ok: response.ok, data: data })))
                .then(result => {
                    if (requestId !== latestRequestId) {
                        return; // a newer request has been issued meanwhile
                    }
                    const data = result.data;
                    if (result.ok && data.success) {
                        dorisOutput.value = data.doris_sql;
                    } else {
                        // FastAPI validation errors (HTTP 422) carry "detail" instead of "error".
                        dorisOutput.value = `错误: ${data.error || JSON.stringify(data.detail || data)}`;
                    }
                })
                .catch(error => {
                    if (requestId === latestRequestId) {
                        dorisOutput.value = `请求错误: ${error.message}`;
                    }
                });
            }
            
            // 初始化转换
            performConversion();
        });
    </script>
</body>
</html>'''
    return HTMLResponse(content=html_content)
 
@app.get("/health", summary="API Health Check")
async def health_check():
    """
    检查API是否正常运行
    """
    # Static liveness payload (the docstring above is the OpenAPI description);
    # nothing else is probed.
    status = "healthy"
    message = "MySQL to Doris converter API is running"
    return {"status": status, "message": message}
 
if __name__ == "__main__":
    # Development entry point: serve the app on every interface, port 8000.
    from uvicorn import run as serve_app

    serve_app(app, host="0.0.0.0", port=8000)

点赞(0)

评论列表 共有 0 条评论

暂无评论