Files
vermouth789 fe39320977 123
123
2026-01-11 00:11:34 +08:00

254 lines
10 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import copy
import uuid
from django.db import connection, OperationalError
from django.apps import apps
import hashlib
import json
from django.db import connection, models
from rest_framework import serializers
"""
------------------------------------------------------------------------------------------------------------------------------------------------------------
PART I
动态MODEL部分
"""
def table_exists(table_name):
"""
检查表是否存在于数据库中
:param table_name: 表名
:return: True 如果表存在,否则 False
"""
with connection.cursor() as cursor:
return table_name in connection.introspection.table_names()
def judge_diff_be_two_models_and_change(old_model, new_model):
# 获取表名
old_table_name = old_model._meta.db_table
new_table_name = new_model._meta.db_table
if old_table_name != new_table_name:
alter_model(old_model, old_table_name, new_table_name)
# 修改表字段
fields1 = {f.name: f for f in copy.deepcopy(old_model._meta.get_fields()) if f.concrete}
fields2 = {f.name: f for f in copy.deepcopy(new_model._meta.get_fields()) if f.concrete}
del fields1['id']
del fields2['id']
# 检测字段重命名
renamed_fields = {}
for field_name1, field_x1 in fields1.items():
for field_name2, field_x2 in fields2.items():
list_field_x1 = list(field_x1.deconstruct())
list_field_x2 = list(field_x2.deconstruct())
list_field_x2[0] = field_name2
if list_field_x2 == list_field_x1 and field_name1 != field_name2:
renamed_fields[field_name1] = field_name2
break
with connection.schema_editor() as schema_editor:
# 删除旧模型中有但新模型中没有的字段(排除重命名字段)
for field_name in set(fields1.keys()) - set(fields2.keys()) - set(renamed_fields.keys()):
schema_editor.remove_field(old_model, fields1[field_name])
# 添加新模型中有但旧模型中没有的字段
for field_name in set(fields2.keys()) - set(fields1.keys()) - set(renamed_fields.values()):
schema_editor.add_field(old_model, fields2[field_name])
# 处理字段重命名
for old_name, new_name in renamed_fields.items():
if not isinstance(fields1[old_name], models.ManyToManyField):
rename_column(old_model._meta.db_table, old_name, new_name, get_column_definition(fields2[new_name]))
else:
re_old_name = old_model._meta.app_label + "_" + fields1[old_name].remote_field.through.__name__
re_new_name = old_model._meta.app_label + "_" + fields2[new_name].remote_field.through.__name__
alter_model(fields1[old_name].remote_field.through, re_old_name, re_new_name)
# 修改字段属性
for field_name in set(fields1.keys()) & set(fields2.keys()):
if field_name not in renamed_fields:
field1 = fields1[field_name]
field2 = fields2[field_name]
if field1.deconstruct() != field2.deconstruct() and not isinstance(field2, models.ManyToManyField):
try:
schema_editor.alter_field(old_model, field1, field2)
except OperationalError:
schema_editor.remove_field(old_model, field1)
schema_editor.add_field(old_model, field2)
elif field1.deconstruct() != field2.deconstruct() and isinstance(field2, models.ManyToManyField):
delete_model(field1.remote_field.through)
if field2.remote_field.through:
install_model(field2.remote_field.through)
def get_column_definition(field):
FIELD_TYPES = {
"IntegerField": "INT",
"CharField": "VARCHAR",
"JSONField": "JSON",
"ManyToManyField": "ManyToManyField",
"DateTimeField": "DATETIME",
"FloatField": "FLOAT"
}
if field.get_internal_type() == 'CharField':
max_length = getattr(field, 'max_length', None)
is_null = getattr(field, 'null', None)
if is_null == 'True':
return f'VARCHAR({max_length}) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci'
return f'VARCHAR({max_length}) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL'
else:
is_null = getattr(field, 'null', None)
if is_null == 'True':
return FIELD_TYPES[field.get_internal_type()] + "CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci"
return FIELD_TYPES[field.get_internal_type()] + "CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci" + "NOT NULL"
def rename_column(table_name, old_column, new_column, column_definition):
sql = f'ALTER TABLE `zero_code`.`{table_name}` CHANGE COLUMN `{old_column}` `{new_column}` {column_definition};'
print(sql)
with connection.schema_editor() as schema_editor:
schema_editor.execute(sql)
def install_model(model):
"""动态创建模型的数据库表"""
with connection.schema_editor() as schema_editor:
schema_editor.create_model(model)
def delete_model(model):
"""删除模型对应的数据库表"""
with connection.schema_editor() as schema_editor:
schema_editor.delete_model(model)
def alter_model(model, old_table_name, new_table_name):
with connection.schema_editor() as schema_editor:
schema_editor.alter_db_table(model, old_table_name, new_table_name)
def get_app_info(app):
# 获取应用的配置
app_config = apps.get_app_config(app) # 'myapp' 是应用的名称
# 获取应用中注册的所有模型
app_models = app_config.get_models()
return app_models
def find_target_field(name, fields):
for k in fields.keys():
if name == k:
return fields[k]
def get_models_field_detail(target_model, fields=None):
model_filed_detail_list = []
if fields is None:
for field in target_model._meta.get_fields():
if field.name != 'id':
settings = {
'name': field.name,
'type': field.get_internal_type(),
'settings': {
'max_length': getattr(field, 'max_length', None),
'null': getattr(field, 'null', False),
'blank': getattr(field, 'blank', False),
'default': getattr(field, 'default', None),
'to': getattr(field, 'to', None),
'through': getattr(field, 'through', None),
}
}
model_filed_detail_list.append(settings)
return model_filed_detail_list
else:
for k, v in fields.items():
settings = {
'name': k,
'type': v.get_internal_type(),
'settings': {
'max_length': getattr(v, 'max_length', None),
'null': getattr(v, 'null', False),
'blank': getattr(v, 'blank', False),
'default': getattr(v, 'default', None),
'to': getattr(v, 'to', None),
'through': getattr(v, 'through', None),
}
}
model_filed_detail_list.append(settings)
return model_filed_detail_list
def convert_to_serializable(obj):
"""将不可序列化的对象转换为可序列化的形式"""
if isinstance(obj, type): # 处理类对象
return str(obj)
elif isinstance(obj, dict): # 处理字典
return {k: convert_to_serializable(v) for k, v in obj.items()}
elif isinstance(obj, (list, tuple)): # 处理列表或元组
return [convert_to_serializable(item) for item in obj]
else:
return obj
def judge_the_same(list1, list2):
if len(list1) != len(list2):
return True
for item1 in list1:
for item2 in list2:
if item1["name"] == item2["name"]:
if item1 != item2:
return True
return False
"""
------------------------------------------------------------------------------------------------------------------------------------------------------------
PART II
序列化器部分
"""
def build_serializers(table_name, now_model, attrs, active_related, depth=1):
if depth > 10:
raise ValueError("序列化嵌套超过深度限制")
# 创建 Meta 类
meta_class = type('Meta', (), {
'model': now_model,
'fields': "__all__",
})
# 将 Meta 类作为属性传递给 type() 创建的类
attrs['Meta'] = meta_class
# TODO 加入验证
# 数据验证通过元对象所存放的json中进行读取数据根传入的参数进行比对
# 检测多对多字段并创建嵌套序列化器
for field in now_model._meta.get_fields():
if isinstance(field, models.ManyToManyField) and not active_related:
# 获取关联模型的序列化器
related_model = field.remote_field.model
related_serializer = build_serializers(related_model.__name__, related_model, {}, False, depth + 1)
attrs[field.name] = related_serializer(many=True, read_only=True)
elif isinstance(field, models.ManyToManyRel) and active_related:
related_model = field.remote_field.model
related_serializer = build_serializers(related_model.__name__, related_model, {}, False, depth + 1)
# 将嵌套序列化器添加到 attrs 中
attrs[field.name] = related_serializer(many=True, read_only=True)
elif isinstance(field, models.ForeignKey):
# 获取关联模型的序列化器
related_model = field.remote_field.model
related_serializer = type(related_model.__name__, (serializers.ModelSerializer,), {
'Meta': type('Meta', (), {
'model': related_model,
'fields': '__all__',
'depth': 10
}),
})
# 将嵌套序列化器添加到 attrs 中
attrs[field.name] = related_serializer(read_only=True)
model_class = type(table_name + "_Serializer", (serializers.ModelSerializer,), attrs)
return model_class