Files
ZERO_CODE/ZeroCodeProject/apis/ZeroCodeMain/BaseApi/models.py

194 lines
7.3 KiB
Python
Raw Normal View History

2026-01-11 00:11:34 +08:00
import copy
import uuid
from django.apps import apps
from django.core.validators import ValidationError
from django.db import models
from django.db.models import ManyToManyRel
from dependency.base.utils import get_models_field_detail, get_app_info, judge_the_same, \
judge_diff_be_two_models_and_change, table_exists, build_serializers, install_model
from django.core.exceptions import FieldDoesNotExist
def get_model(model_name, queryset_dict=None):
if queryset_dict is None:
model_infos = Model.objects.get(name=model_name)
else:
model_infos = Model.objects.filter(**queryset_dict)
get_serializers = build_serializers(Model.__name__, Model, {}, False)
model_data = get_serializers(model_infos).data
app_config = apps.get_app_config(model_data["app"]["name"])
return app_config.get_model(model_name)
def get_model_status(model_name, queryset_dict=None):
if queryset_dict is None:
model_infos = Model.objects.get(name=model_name)
else:
model_infos = Model.objects.filter(**queryset_dict)
return model_infos
def get_model_fields(model_name):
model_id = Model.objects.get(name=model_name).id
get_serializers = build_serializers(ModelField.__name__, ModelField, {}, False)
return get_serializers(ModelField.objects.filter(model_id=model_id), many=True).data
def reverse_add_sub_data(add_data, sub_model_instance=None):
return_datas = []
for items in add_data:
need_del_key = []
need_add_sub_list = {}
for key, value in items.items():
if isinstance(value, list) and len(value) > 0:
sub_model = sub_model_instance._meta.get_field(key).remote_field.model
res = reverse_add_sub_data(value, sub_model_instance=sub_model)
need_add_sub_list.update({key: res})
need_del_key.append(key)
elif isinstance(value, list) and len(value) == 0:
need_del_key.append(key)
# 数据库添加数据
for keys in need_del_key:
del items[keys]
items["uuid"] = str(uuid.uuid4())
if 'id' in items:
sub_item_ins = sub_model_instance.objects.get(id=items["id"])
else:
sub_item_ins = sub_model_instance.objects.create(**items)
return_datas.append(sub_item_ins)
for add_sub_item_k, add_sub_item_v in need_add_sub_list.items():
for add_sub_item in add_sub_item_v:
many_to_many_manager = getattr(sub_item_ins, add_sub_item_k)
many_to_many_manager.add(add_sub_item)
return return_datas
def def_new_model(name, fields=None, app_label='', module='', options=None):
class Meta:
# Using type('Meta', ...) gives a dictproxy error during model creation
pass
if app_label:
# app_label必须用Meta内部类来设定
setattr(Meta, 'app_label', app_label)
# 若提供了options参数就要用更新Meta类
if options is not None and not isinstance(options, bool):
for key, value in options.iteritems():
setattr(Meta, key, value)
# 创建一个字典来模拟类的声明module和当前所在的module对应
attrs = {'__module__': module, 'Meta': Meta}
# 创建这个类这会触发ModelBase来处理
model = type(name, (models.Model,), attrs)
# 加入所有提供的字段
if fields:
for item, value in fields.items():
if isinstance(value, models.CharField):
value.max_length = int(value.max_length)
value.contribute_to_class(model, item)
return model
def create_model(name, fields=None, app_label='', module='', options=None):
"""
创建指定model
"""
# 查询当前app内的模型是否注册
all_models = get_app_info(app_label)
app_config = apps.get_app_config(app_label)
for model in all_models:
if model.__name__ == name:
exist_list = get_models_field_detail(model)
new_list = get_models_field_detail({}, fields=fields)
add_item = []
for item in model._meta.get_fields():
if isinstance(item, ManyToManyRel):
add_item = get_models_field_detail({}, fields={item.name: item})
new_list.extend(add_item)
for field in model._meta.get_fields():
if isinstance(field, models.ManyToManyField):
# 获取关联模型的序列化器
related_model = field.remote_field.through
if related_model.__name__ in app_config.models:
del app_config.models[related_model.__name__]
if judge_the_same(exist_list, new_list):
## 清理模型缓存
apps.clear_cache()
# 从 sys.modules 中移除旧模型
model_key = f"{name.lower()}"
if model_key in app_config.models.keys():
del app_config.models[model_key]
# 重新定义模型
new_model = def_new_model(name, fields, app_label, module, options)
judge_diff_be_two_models_and_change(model, new_model)
return new_model
else:
return model
model = def_new_model(name, fields, app_label, module, options)
if not table_exists((app_label + "_" + name)):
print("模型初始化:", app_label + "_" + name)
install_model(model)
return model
class App(models.Model):
name = models.CharField(max_length=255)
name_cn = models.CharField(max_length=255)
module = models.CharField(max_length=255)
def __str__(self):
return self.name
class Model(models.Model):
app = models.ForeignKey(App, related_name='models', on_delete=models.CASCADE)
name = models.CharField(max_length=255)
name_cn = models.CharField(max_length=255)
def __str__(self):
return self.name
def get_django_model(self):
"Returns a functional Django model based on current data"
# Get all associated fields into a list ready for dict()
fields = [(f.name, f.get_django_field()) for f in self.model_fields.all()]
# Use the create_model function defined above
return create_model(self.name, dict(fields), self.app.name, self.app.module)
class Meta:
unique_together = (('app', 'name'),)
class ModelField(models.Model):
model = models.ForeignKey(Model, related_name='model_fields', on_delete=models.CASCADE)
name = models.CharField(max_length=255)
name_cn = models.CharField(max_length=255)
type = models.CharField(max_length=255)
is_show = models.BooleanField(default=True)
def get_django_field(self):
"Returns the correct field type, instantiated with applicable settings"
# Get all associated settings into a list ready for dict()
settings = [(s.name, s.value) for s in self.model_settings.all()]
# Instantiate the field with the settings as **kwargs
return getattr(models, self.type)(**dict(settings))
class Meta:
unique_together = (('model', 'name'),)
class ModelSetting(models.Model):
field = models.ForeignKey(ModelField, related_name='model_settings', on_delete=models.CASCADE)
name = models.CharField(max_length=255)
name_cn = models.CharField(max_length=255)
value = models.CharField(max_length=255)
class Meta:
unique_together = (('field', 'name'),)