254 lines
10 KiB
Python
254 lines
10 KiB
Python
import copy
|
||
|
||
import uuid
|
||
from django.db import connection, OperationalError
|
||
from django.apps import apps
|
||
import hashlib
|
||
import json
|
||
from django.db import connection, models
|
||
from rest_framework import serializers
|
||
|
||
"""
|
||
------------------------------------------------------------------------------------------------------------------------------------------------------------
|
||
PART I
|
||
动态MODEL部分
|
||
"""
|
||
|
||
|
||
def table_exists(table_name):
    """
    Check whether a table exists in the database.

    :param table_name: name of the table to look for
    :return: True if the table exists, otherwise False
    """
    with connection.cursor() as cursor:
        # Hand the already-open cursor to the introspection layer so it does
        # not have to open a second one just to list the tables.
        return table_name in connection.introspection.table_names(cursor)
|
||
|
||
|
||
def _detect_renamed_fields(old_fields, new_fields):
    """Map old field names to new field names for fields that were only renamed.

    A field counts as renamed when its name disappeared from the old model, a
    name that did not exist before appeared in the new model, and both fields
    deconstruct to the same (path, args, kwargs) definition.
    """
    renamed = {}
    claimed = set()
    for old_name, old_field in old_fields.items():
        if old_name in new_fields:
            # Still present under the same name: not a rename candidate.
            continue
        old_definition = old_field.deconstruct()[1:]
        for new_name, new_field in new_fields.items():
            if new_name in old_fields or new_name in claimed:
                continue
            # Element 0 of deconstruct() is the field name, which necessarily
            # differs for a rename, so only the rest is compared.
            if new_field.deconstruct()[1:] == old_definition:
                renamed[old_name] = new_name
                claimed.add(new_name)
                break
    return renamed


def judge_diff_be_two_models_and_change(old_model, new_model):
    """Bring the database schema of ``old_model`` in line with ``new_model``.

    Steps, in order: rename the table when ``db_table`` differs, drop fields
    that disappeared, add fields that are new, rename fields whose definition
    is unchanged but whose name changed, and finally alter fields whose
    definition changed.

    :param old_model: model class describing the current schema
    :param new_model: model class describing the desired schema
    :return: None
    """
    # Table name
    old_table_name = old_model._meta.db_table
    new_table_name = new_model._meta.db_table
    if old_table_name != new_table_name:
        alter_model(old_model, old_table_name, new_table_name)
        # NOTE(review): the field operations below still pass ``old_model``,
        # whose ``db_table`` is the old name. Confirm they address the right
        # table after a rename.

    # Concrete fields of both models, keyed by field name
    fields1 = {f.name: f for f in copy.deepcopy(old_model._meta.get_fields()) if f.concrete}
    fields2 = {f.name: f for f in copy.deepcopy(new_model._meta.get_fields()) if f.concrete}
    # The ``id`` field is never migrated; tolerate models that have no field
    # called ``id`` instead of raising KeyError.
    fields1.pop('id', None)
    fields2.pop('id', None)

    # Detect renamed fields
    renamed_fields = _detect_renamed_fields(fields1, fields2)

    with connection.schema_editor() as schema_editor:
        # Drop fields present only in the old model (renamed ones excluded)
        for field_name in fields1.keys() - fields2.keys() - renamed_fields.keys():
            schema_editor.remove_field(old_model, fields1[field_name])

        # Add fields present only in the new model (rename targets excluded)
        for field_name in fields2.keys() - fields1.keys() - set(renamed_fields.values()):
            schema_editor.add_field(old_model, fields2[field_name])

        # Apply renames
        for old_name, new_name in renamed_fields.items():
            if not isinstance(fields1[old_name], models.ManyToManyField):
                rename_column(old_model._meta.db_table, old_name, new_name, get_column_definition(fields2[new_name]))
            else:
                # A many-to-many field has no column of its own; its through
                # table is renamed instead.
                # NOTE(review): the names are built from the through class
                # ``__name__``, not from its ``_meta.db_table`` — confirm they
                # match the real table names.
                re_old_name = old_model._meta.app_label + "_" + fields1[old_name].remote_field.through.__name__
                re_new_name = old_model._meta.app_label + "_" + fields2[new_name].remote_field.through.__name__
                alter_model(fields1[old_name].remote_field.through, re_old_name, re_new_name)

        # Alter fields that kept their name but changed definition
        for field_name in fields1.keys() & fields2.keys():
            field1 = fields1[field_name]
            field2 = fields2[field_name]
            if field1.deconstruct() == field2.deconstruct():
                continue
            if not isinstance(field2, models.ManyToManyField):
                try:
                    schema_editor.alter_field(old_model, field1, field2)
                except OperationalError:
                    # Fall back to drop-and-recreate when an in-place ALTER is
                    # rejected; existing values of that column are lost.
                    schema_editor.remove_field(old_model, field1)
                    schema_editor.add_field(old_model, field2)
            else:
                # Many-to-many change: rebuild the through table.
                delete_model(field1.remote_field.through)
                if field2.remote_field.through:
                    install_model(field2.remote_field.through)
|
||
|
||
|
||
def get_column_definition(field):
    """Build the MySQL column definition used when renaming a column.

    :param field: field object exposing ``get_internal_type()`` and,
        optionally, ``max_length`` and ``null``
    :return: column definition string, e.g. ``"INT NOT NULL"``
    :raises KeyError: if the field's internal type is not in the mapping
    """
    FIELD_TYPES = {
        "IntegerField": "INT",
        "CharField": "VARCHAR",
        "JSONField": "JSON",
        "ManyToManyField": "ManyToManyField",
        "DateTimeField": "DATETIME",
        "FloatField": "FLOAT"
    }
    internal_type = field.get_internal_type()
    if internal_type == 'CharField':
        max_length = getattr(field, 'max_length', None)
        definition = f'VARCHAR({max_length}) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci'
    else:
        # A character set / collation clause only applies to string columns,
        # so the other types are emitted as the bare type name.
        definition = FIELD_TYPES[internal_type]
    # ``null`` is a boolean on the field; the former comparison against the
    # string 'True' never matched, so nullable columns were forced NOT NULL.
    if not getattr(field, 'null', False):
        definition += ' NOT NULL'
    return definition
|
||
|
||
|
||
def rename_column(table_name, old_column, new_column, column_definition):
    """Rename a column with a raw ``ALTER TABLE ... CHANGE COLUMN`` statement.

    The statement targets the ``zero_code`` schema, is printed, and is then
    executed through a schema editor.

    :param table_name: table that owns the column
    :param old_column: current column name
    :param new_column: desired column name
    :param column_definition: column definition appended after the new name
    """
    statement = (
        f'ALTER TABLE `zero_code`.`{table_name}` '
        f'CHANGE COLUMN `{old_column}` `{new_column}` {column_definition};'
    )
    print(statement)
    with connection.schema_editor() as editor:
        editor.execute(statement)
|
||
|
||
|
||
def install_model(model):
    """Create the database table for a dynamically built model."""
    editor_context = connection.schema_editor()
    with editor_context as editor:
        editor.create_model(model)
|
||
|
||
|
||
def delete_model(model):
    """Drop the database table that belongs to the given model."""
    editor_context = connection.schema_editor()
    with editor_context as editor:
        editor.delete_model(model)
|
||
|
||
|
||
def alter_model(model, old_table_name, new_table_name):
    """Rename a model's table from ``old_table_name`` to ``new_table_name``."""
    editor_context = connection.schema_editor()
    with editor_context as editor:
        editor.alter_db_table(model, old_table_name, new_table_name)
|
||
|
||
|
||
def get_app_info(app):
    """Return the models registered for the app whose name is ``app``.

    :param app: application name passed to ``apps.get_app_config``
    :return: whatever ``AppConfig.get_models()`` yields for that app
    """
    return apps.get_app_config(app).get_models()
|
||
|
||
|
||
def find_target_field(name, fields):
    """Look up a field by name.

    :param name: key to search for
    :param fields: mapping of field name to field
    :return: the matching value, or None when ``name`` is not a key
    """
    # Direct lookup replaces the former linear scan over every key.
    return fields.get(name)
|
||
|
||
|
||
def _describe_field(name, field):
    """Describe one field as a ``{'name', 'type', 'settings'}`` dictionary."""
    return {
        'name': name,
        'type': field.get_internal_type(),
        'settings': {
            'max_length': getattr(field, 'max_length', None),
            'null': getattr(field, 'null', False),
            'blank': getattr(field, 'blank', False),
            'default': getattr(field, 'default', None),
            'to': getattr(field, 'to', None),
            'through': getattr(field, 'through', None),
        }
    }


def get_models_field_detail(target_model, fields=None):
    """Collect a description of every field of a model or of a field mapping.

    :param target_model: model whose ``_meta.get_fields()`` is inspected when
        ``fields`` is None; the field named ``id`` is skipped in that case
    :param fields: optional mapping of name to field; when given, it is
        described instead of ``target_model`` and nothing is skipped
    :return: list of dictionaries with ``name``, ``type`` and ``settings``
    """
    if fields is None:
        return [
            _describe_field(field.name, field)
            for field in target_model._meta.get_fields()
            if field.name != 'id'
        ]
    return [_describe_field(name, field) for name, field in fields.items()]
|
||
|
||
|
||
def convert_to_serializable(obj):
    """Return a copy of ``obj`` with class objects replaced by their ``str()``.

    Dictionaries are rebuilt with converted values (keys are kept as they
    are), lists and tuples both become lists of converted items, and any other
    value is returned unchanged.
    """
    # Class objects
    if isinstance(obj, type):
        return str(obj)

    # Dictionaries
    if isinstance(obj, dict):
        converted = {}
        for key, value in obj.items():
            converted[key] = convert_to_serializable(value)
        return converted

    # Lists and tuples
    if isinstance(obj, (list, tuple)):
        return list(map(convert_to_serializable, obj))

    return obj
|
||
|
||
|
||
def judge_the_same(list1, list2):
    """Tell whether two field-description lists differ.

    Despite the name, the return value is True when the lists are DIFFERENT
    and False when they describe the same fields.

    :param list1: list of dictionaries, each with a ``"name"`` key
    :param list2: list of dictionaries, each with a ``"name"`` key
    :return: True if the lengths differ, if an entry of ``list1`` has no
        same-named entry in ``list2``, or if a same-named pair is not equal;
        otherwise False
    """
    if len(list1) != len(list2):
        return True
    for item1 in list1:
        counterparts = [item2 for item2 in list2 if item2["name"] == item1["name"]]
        # An entry without a counterpart (e.g. a renamed field) is a
        # difference too; previously it slipped through as "no change".
        if not counterparts:
            return True
        if any(item1 != item2 for item2 in counterparts):
            return True
    return False
|
||
|
||
|
||
"""
|
||
------------------------------------------------------------------------------------------------------------------------------------------------------------
|
||
PART II
|
||
序列化器部分
|
||
"""
|
||
|
||
|
||
def build_serializers(table_name, now_model, attrs, active_related, depth=1):
    """Build a ``ModelSerializer`` subclass for ``now_model`` at runtime.

    ``attrs`` is modified in place: a ``Meta`` class (all fields) is stored in
    it, and a read-only nested serializer is stored for each relation that is
    expanded.

    :param table_name: prefix of the generated class name
        (``<table_name>_Serializer``)
    :param now_model: model class the serializer is built for
    :param attrs: dictionary of class attributes for the new serializer
    :param active_related: when falsy, forward many-to-many fields are
        expanded; when truthy, reverse many-to-many relations are expanded
    :param depth: current nesting level of the recursion
    :return: the generated serializer class
    :raises ValueError: if ``depth`` exceeds 10
    """
    if depth > 10:
        raise ValueError("序列化嵌套超过深度限制")

    # The Meta class is handed to type() below as a plain class attribute.
    attrs['Meta'] = type('Meta', (), {
        'model': now_model,
        'fields': "__all__",
    })

    # TODO add validation: read the data from the JSON stored on the meta
    # object and compare it with the incoming parameters.

    for field in now_model._meta.get_fields():
        expand_many_to_many = (
            (isinstance(field, models.ManyToManyField) and not active_related)
            or (isinstance(field, models.ManyToManyRel) and active_related)
        )
        if expand_many_to_many:
            # Nested serializer for the model on the other side, built
            # recursively one level deeper.
            target_model = field.remote_field.model
            nested_class = build_serializers(target_model.__name__, target_model, {}, False, depth + 1)
            attrs[field.name] = nested_class(many=True, read_only=True)
        elif isinstance(field, models.ForeignKey):
            # Foreign keys get a standalone serializer that relies on the
            # Meta ``depth`` option rather than on recursion.
            target_model = field.remote_field.model
            nested_meta = type('Meta', (), {
                'model': target_model,
                'fields': '__all__',
                'depth': 10
            })
            nested_class = type(target_model.__name__, (serializers.ModelSerializer,), {
                'Meta': nested_meta,
            })
            attrs[field.name] = nested_class(read_only=True)

    return type(table_name + "_Serializer", (serializers.ModelSerializer,), attrs)