Files
vermouth789 fe39320977 123
123
2026-01-11 00:11:34 +08:00

347 lines
14 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
@project:serviceone
@File:DynamicModelClass.py
@IDE:PyCharm
@Author:徐彬程
@Date:2023/12/2 18:09
"""
import copy
import os.path
import re
from abc import abstractmethod
from django.db import models
from rest_framework import serializers
from django.utils import timezone
import json
from django.apps import apps
import requests
import configparser # 这里一定要全小写,不然会报找不到模块错误
from django.conf import settings
from dependency.classes.tree.treeClass import PythonCodeTreeNode, PythonCodeTreeController
cf = configparser.ConfigParser()
path = os.path.join(settings.BASE_DIR, 'dependency', 'json_data_in_need', 'ZeroCode.conf')
cf.read(path)
"""
data模版
data = [
{
"name": "uuid",
"field_type": "CharField",
"options": {
'max_length': 200
}
},
{
"name": "path_name",
"field_type": "CharField",
"options": {
"to": "Article",
"null":True,
"blank":"True",
"on_delete":"models.SET_NULL",
"related_name":"'fileRecord'",
}
}
]
"""
class BuildDynamicModelClass:
"""
Factory pattern.

Builds Django model classes and the matching serializer classes at runtime
from field specifications.
"""
# Assigned by get_zero_code_model_fields with the parsed contents of data.json.
ZeroCodeModelInfoData = []
@classmethod
def model_atr_transform(cls, data):
    """Turn a list of field specifications into Django field instances.

    Each item of ``data`` is read through its ``"name"``, ``"field_type"``
    and ``"options"`` keys.  The input is deep-copied before any option is
    rewritten, so the caller's structures are left untouched.

    :param data: iterable of field specification dicts.
    :return: dict mapping each spec's ``"name"`` to the instantiated field.
        Specs whose ``"field_type"`` is ``'ManyToManyField'`` are not part
        of the result.
    :raises KeyError: if a ``"field_type"`` is not in the supported table or
        a spec lacks one of the keys read here.
    """
    # Supported field type names -> Django field classes.
    field_classes = {
        "CharField": models.CharField,
        "IntegerField": models.IntegerField,
        "JSONField": models.JSONField,
        "ImageField": models.ImageField,
        "IPAddressField": models.IPAddressField,
        "TextField": models.TextField,
        "TimeField": models.TimeField,
        "ForeignKey": models.ForeignKey,
        "ManyToManyField": models.ManyToManyField,
        "DateTimeField": models.DateTimeField,
        "BigIntegerField": models.BigIntegerField,
        "FloatField": models.FloatField
    }
    # Option values given as text that stand for a real Python object.
    symbolic_values = {
        "models.SET_NULL": models.SET_NULL,
        "models.CASCADE": models.CASCADE,
        "timezone.now": timezone.now
    }
    # Options removed before instantiation: either always ('any') or only
    # when the option carries exactly the value listed here.
    droppable_options = {
        "is_visible": "any",
        "is_editable": "any",
        "form_type": "any",
        "data_show_fields": [],
        "null": "False",
        "blank": "False",
        "primary_key": "False",
        "unique": "False"
    }

    built_fields = {}
    for spec in copy.deepcopy(data):
        field_cls = field_classes[spec["field_type"]]

        if spec["field_type"] == 'ManyToManyField':
            # The options are narrowed on the local copy only; the spec is
            # then skipped and produces no entry in the result.
            spec["options"] = {
                opt_name: opt_value
                for opt_name, opt_value in spec["options"].items()
                if opt_name in ("to", "through")
            }
            continue

        options = spec["options"]

        # Replace symbolic strings by the objects they name.  Only existing
        # keys are reassigned, so the dict size is stable while iterating.
        for opt_name, opt_value in options.items():
            if isinstance(opt_value, str) and opt_value in symbolic_values:
                options[opt_name] = symbolic_values[opt_value]

        # Strip the options listed in the drop table.
        for drop_name, drop_value in droppable_options.items():
            if drop_name not in options:
                continue
            if drop_value == options[drop_name] or drop_value == 'any':
                del options[drop_name]

        built_fields[spec["name"]] = field_cls(**options)
    return built_fields
@classmethod
def get_model(cls, table_name):
    """Return the model class for the table called ``table_name``.

    The field specifications are taken from the ``"model_info_json"`` entry
    of the metadata returned by ``get_model_info``; looking up or building
    the model is delegated to ``get_model_by_data``.

    :param table_name: name used both for the metadata lookup and as the
        model name inside the ``dependency`` app.
    :return: the registered model class, or a newly built one.
    :raises TypeError: ``get_model_info`` can return a plain message string
        when no metadata row matches; indexing that string fails here.
    """
    query = {
        "return_type": "False",
        "target_model_info": {
            "model_name": table_name,
        }
    }
    model_info = cls.get_model_info(query)
    return cls.get_model_by_data(table_name, model_info["model_info_json"])
@classmethod
def get_model_by_data(cls, table_name, data):
    """Return the ``dependency`` model named ``table_name``, building it if needed.

    The app registry is consulted first.  Only when the model is not
    registered are the field specifications in ``data`` converted to field
    instances and handed to ``build_model``; an already registered model is
    returned without instantiating any throwaway fields.

    :param table_name: model name inside the ``dependency`` app.
    :param data: list of field specification dicts, in the format accepted
        by ``model_atr_transform``.
    :return: the model class.
    """
    try:
        now_model = apps.get_model('dependency', table_name)
    except LookupError:
        now_model = None

    if now_model is None:
        # Not registered yet: this is the only path that needs real fields.
        now_model = cls.build_model(table_name, cls.model_atr_transform(data))
    return now_model
@classmethod
def get_model_sign(cls, table_name):
    """Return the ``"sign"`` value stored in the metadata of ``table_name``.

    :param table_name: value matched against ``model_name`` in the lookup.
    :return: the ``"sign"`` entry of the single metadata row found.
    """
    query = {
        "return_type": "False",
        "target_model_info": {"model_name": table_name},
    }
    model_info = BuildDynamicModelClass.get_model_info(query)
    return model_info["sign"]
@classmethod
def get_model_field(cls, table_name):
    """Return the field specifications stored in the metadata of ``table_name``.

    :param table_name: value matched against ``model_name`` in the lookup.
    :return: the ``"model_info_json"`` entry of the single metadata row found.
    """
    query = {
        "return_type": "False",
        "target_model_info": {"model_name": table_name},
    }
    model_info = BuildDynamicModelClass.get_model_info(query)
    return model_info["model_info_json"]
@classmethod
def get_model_name_by_sign(cls, sign):
    """Return the ``"model_name"`` of the metadata row identified by ``sign``.

    :param sign: value matched against ``sign`` in the lookup.
    :return: the ``"model_name"`` entry of the single metadata row found.
    """
    query = {
        "return_type": "False",
        "target_model_info": {"sign": sign},
    }
    model_info = BuildDynamicModelClass.get_model_info(query)
    return model_info["model_name"]
@classmethod
def get_serializers(cls, table_name):
    """Build the serializer class for the table called ``table_name``.

    The metadata row of the table supplies the field specifications.  They
    are converted to field instances (passed to the serializer builder under
    the ``'extend'`` key) and, separately, used to obtain the model class.

    :param table_name: table whose serializer is wanted.
    :return: a serializer class named ``<table_name>_serializers``.
    """
    query = {
        "return_type": "False",
        "target_model_info": {"model_name": table_name},
    }
    model_info = BuildDynamicModelClass.get_model_info(query)

    # Field instances for the serializer's 'extend' entry.
    extend_fields = BuildDynamicModelClass.model_atr_transform(model_info["model_info_json"])
    # Model class backing the serializer.
    model_cls = cls.get_model_by_data(table_name, model_info["model_info_json"])

    return BuildSerializers.build_serializers(
        table_name + "_serializers",
        model_cls,
        {'extend': extend_fields},
    )
@classmethod
def build_model(cls, table_name, fields, module=''):
    """Create a Django model class at runtime.

    :param table_name: used as the class name and as ``Meta.db_table``.
    :param fields: mapping of attribute name to field instance.  It is
        copied before use, so the caller's mapping is not modified.
    :param module: value stored as the new class's ``__module__``.
    :return: the new ``models.Model`` subclass, with ``Meta.app_label`` set
        to ``'dependency'``.
    """
    # Shallow copy: the 'Meta' and '__module__' entries added below must not
    # leak into a dict the caller may keep using (e.g. as serializer input).
    attrs = dict(fields)

    class Meta:
        app_label = 'dependency'
        db_table = table_name

    attrs['Meta'] = Meta
    attrs['__module__'] = module
    return type(table_name, (models.Model,), attrs)
@classmethod
def get_zero_code_model_fields(cls):
    """Load ``data.json`` and return its parsed contents.

    The file is read on every call (the previous cache check could never
    succeed because the cache was cleared right before it).  The path is
    anchored at ``settings.BASE_DIR`` so the lookup does not depend on the
    process working directory.

    :return: the parsed JSON document, also stored on
        ``cls.ZeroCodeModelInfoData``.
    :raises OSError: if the file cannot be opened.
    :raises json.JSONDecodeError: if the file is not valid JSON.
    """
    # Cleared first so that a failed read does not leave an earlier result
    # in place on the class.
    cls.ZeroCodeModelInfoData = []
    data_path = os.path.join(settings.BASE_DIR, 'dependency', 'json_data_in_need', 'data.json')
    with open(data_path, encoding='utf-8') as data_file:
        cls.ZeroCodeModelInfoData = json.load(data_file)
    return cls.ZeroCodeModelInfoData
@classmethod
def get_model_info(cls, data):
    """Query the metadata table and return the serialized result.

    The metadata table itself is described by ``data.json`` (keys
    ``"table_name"`` and ``"table_fields"``); its model is taken from the
    app registry or built on the fly.

    :param data: dict with
        ``"return_type"`` - the string ``'True'`` selects a multi-row
        ``filter``; anything else selects a single-row ``get``;
        ``"target_model_info"`` - lookup keyword arguments.  This dict is
        modified in place: ``model_status`` is set to ``1``.
    :return: serializer data for the matching row(s).  When a single row is
        requested and none exists, a plain message string is returned
        instead, so callers that index the result must be prepared for it.
    """
    meta_source = cls.get_zero_code_model_fields()
    if not len(meta_source):
        meta_source = cls.get_zero_code_model_fields()

    meta = dict(meta_source)
    meta_fields = cls.model_atr_transform(meta["table_fields"])
    try:
        meta_model = apps.get_model('dependency', meta["table_name"])
    except LookupError:
        meta_model = cls.build_model(meta["table_name"], meta_fields)

    # Every lookup is restricted to rows whose model_status is 1.
    data['target_model_info']['model_status'] = 1

    want_many = bool(data['return_type'] == 'True')
    # The JSON round trip yields a detached copy of the lookup arguments.
    lookup = json.loads(json.dumps(data['target_model_info']))

    if want_many:
        target = meta_model.objects.filter(**lookup)
    else:
        try:
            target = meta_model.objects.get(**lookup)
        except meta_model.DoesNotExist:
            return '查找的表不存在'

    # Build the serializer class, then serialize the row(s).
    serializer_cls = BuildSerializers.build_serializers('zero_code_model', meta_model, {})
    return serializer_cls(target, many=want_many).data
@classmethod
def get_table_relation(cls, table_sign, now_table):
"""
Collect relation information from the rows of "middle_index_table".

:param table_sign: value tested for membership in each row's "table_list".
:param now_table: prefix of the "<now_table>_sign" key used to filter the
    middle table.
:return: dict with the keys "columns" (list) and "serializers_list" (dict).
    The statements below can add "middle_table_serializers",
    "middle_now_model" and "fields" to "serializers_list", and replace
    "columns" with the "columns_infos" of a relation_columns row.

NOTE(review): statement nesting is not visible in this copy of the file
(indentation is missing), so the comments describe individual statements
only - confirm the block structure against the original.
"""
return_data = {
"columns": [],
"serializers_list": {},
}
# Metadata row describing "middle_index_table" itself.
fields = cls.get_model_info({
"return_type": "False",
"target_model_info": {
"model_name": "middle_index_table",
}})
# Convert the field specifications into field instances
progress_data = BuildDynamicModelClass.model_atr_transform(fields["model_info_json"])
# Get the model class from the registry, or build it
try:
now_model = apps.get_model('dependency', "middle_index_table")
except LookupError:
now_model = BuildDynamicModelClass.build_model("middle_index_table", progress_data)
target = now_model.objects.all()
# Build the serializer class
serializers_build = BuildSerializers.build_serializers("middle_index_table" + "_serializers",
now_model,
{"extend": progress_data})
# NOTE: this local name shadows the imported `serializers` module inside this method.
serializers = serializers_build(target, many=True)
middle_serializer = None
middle_model = None
column_data = None
for item in serializers.data:
# Get the related-table information: rows whose first "table_list" entry is
# the sign of middle_index_table and whose "middle_table_json_info" is not empty
if fields["sign"] == item["table_list"][0] and not item["middle_table_json_info"] == {}:
middle_data = item["middle_table_json_info"]["middle_json_info"]
middle_model = cls.get_model_by_data(item["middle_table_json_info"]["middle_name"], middle_data)
progress_data = BuildDynamicModelClass.model_atr_transform(middle_data)
middle_serializer = BuildSerializers.build_serializers(item["middle_table_json_info"]["middle_name"],
middle_model, {'extend': progress_data})
# Decide whether the row relates to table_sign: it must list table_sign, carry
# middle table info and must not contain 'empty' in "table_list"
if table_sign in item["table_list"] and not item["middle_table_json_info"] == {} and 'empty' not in item[
"table_list"]:
# Read the preset field information stored for the current table
if middle_model is not None and middle_serializer is not None:
try:
target = middle_model.objects.get(middle_index_table_sign=item["sign"])
column_data = middle_serializer(target).data
except middle_model.DoesNotExist:
# NOTE(review): looks like leftover debug output - confirm
print(123)
# Copy the row into a plain dict
item = dict(item)
# Convert the middle table's field specifications into field instances
middle_progress_data = BuildDynamicModelClass.model_atr_transform(
item["middle_table_json_info"]["middle_json_info"])
# Get the middle table's model class from the registry, or build it
try:
middle_now_model = apps.get_model('dependency', item["middle_table_json_info"]["middle_name"])
except LookupError:
middle_now_model = BuildDynamicModelClass.build_model(item["middle_table_json_info"]["middle_name"],
middle_progress_data)
# Build the serializer class
filter_data = {
now_table + "_sign": item["sign"]
}
# NOTE(review): middle_target is not read again in this method
middle_target = middle_now_model.objects.filter(**filter_data)
middle_serializers_build = BuildSerializers.build_serializers(
item["middle_table_json_info"]["middle_name"] + "_serializers",
middle_now_model,
{"extend": middle_progress_data})
return_data["serializers_list"].update({"middle_table_serializers": middle_serializers_build})
return_data["serializers_list"].update({"middle_now_model": middle_now_model})
# One {"<field name>": ""} entry per middle-table field, leaving out 'sign'
return_data["serializers_list"].update(
{"fields": [{x['name']: ""} for x in item["middle_table_json_info"]["middle_json_info"] if
x['name'] != 'sign']})
if column_data is not None:
relation_columns_model = cls.get_model("relation_columns").objects.get(
sign=column_data["relation_columns_sign"])
columns_infos = cls.get_serializers("relation_columns")(relation_columns_model).data
frontend_show_model = cls.get_model("frontend_show").objects.get(
sign=columns_infos["frontend_show"])
frontend_show_infos = cls.get_serializers("frontend_show")(frontend_show_model).data
# Copy "additional" from each show item onto the column whose "name"
# equals the item's "field_name"
for columns in columns_infos["columns_infos"]:
for additional in frontend_show_infos["show_items"]:
if columns["name"] == additional["field_name"]:
columns["additional"] = additional["additional"]
return_data.update({'columns': columns_infos["columns_infos"]})
return return_data
class BuildSerializers:
    """Factory for ``ModelSerializer`` subclasses created at runtime."""

    @classmethod
    def build_serializers(cls, table_name, now_model, attrs):
        """Create a ``ModelSerializer`` subclass for ``now_model``.

        :param table_name: name given to the generated serializer class.
        :param now_model: model class assigned to ``Meta.model``.
        :param attrs: extra class attributes for the serializer.  An optional
            ``'extend'`` entry is not turned into an attribute; instead each
            name it yields gets ``{'validators': []}`` in
            ``Meta.extra_kwargs``.  ``attrs`` is copied, so the caller's dict
            is left unchanged.
        :return: the generated serializer class.  Its ``Meta`` uses
            ``fields = '__all__'`` and ``depth = 1``.
        """
        # Work on a copy: previously 'extend' was deleted from, and 'Meta'
        # written into, the dict owned by the caller.
        attr = dict(attrs)

        extra_kwargs = {}
        if 'extend' in attr:
            # A fresh dict per field so the entries never share a list.
            extra_kwargs = {field_name: {'validators': []} for field_name in attr.pop('extend')}

        # Create the Meta class and attach it as a class attribute.
        attr['Meta'] = type('Meta', (), {
            'model': now_model,
            'fields': '__all__',
            'depth': 1,
            "extra_kwargs": extra_kwargs
        })
        return type(table_name, (serializers.ModelSerializer,), attr)