347 lines
14 KiB
Python
347 lines
14 KiB
Python
|
|
"""
|
|||
|
|
@project:serviceone
|
|||
|
|
@File:DynamicModelClass.py
|
|||
|
|
@IDE:PyCharm
|
|||
|
|
@Author:徐彬程
|
|||
|
|
@Date:2023/12/2 18:09
|
|||
|
|
"""
|
|||
|
|
import copy
|
|||
|
|
import os.path
|
|||
|
|
import re
|
|||
|
|
from abc import abstractmethod
|
|||
|
|
from django.db import models
|
|||
|
|
from rest_framework import serializers
|
|||
|
|
from django.utils import timezone
|
|||
|
|
import json
|
|||
|
|
from django.apps import apps
|
|||
|
|
import requests
|
|||
|
|
import configparser # 这里一定要全小写,不然会报找不到模块错误
|
|||
|
|
from django.conf import settings
|
|||
|
|
from dependency.classes.tree.treeClass import PythonCodeTreeNode, PythonCodeTreeController
|
|||
|
|
|
|||
|
|
cf = configparser.ConfigParser()
|
|||
|
|
path = os.path.join(settings.BASE_DIR, 'dependency', 'json_data_in_need', 'ZeroCode.conf')
|
|||
|
|
cf.read(path)
|
|||
|
|
"""
|
|||
|
|
data模版
|
|||
|
|
data = [
|
|||
|
|
{
|
|||
|
|
"name": "uuid",
|
|||
|
|
"field_type": "CharField",
|
|||
|
|
"options": {
|
|||
|
|
'max_length': 200
|
|||
|
|
}
|
|||
|
|
},
|
|||
|
|
{
|
|||
|
|
"name": "path_name",
|
|||
|
|
"field_type": "CharField",
|
|||
|
|
"options": {
|
|||
|
|
"Article",
|
|||
|
|
"null":True,
|
|||
|
|
"blank":"True",
|
|||
|
|
"on_delete":"models.SET_NULL",
|
|||
|
|
"related_name":"'fileRecord'",
|
|||
|
|
}
|
|||
|
|
}
|
|||
|
|
]
|
|||
|
|
"""
|
|||
|
|
|
|||
|
|
|
|||
|
|
class BuildDynamicModelClass:
|
|||
|
|
"""
|
|||
|
|
工厂模式
|
|||
|
|
"""
|
|||
|
|
ZeroCodeModelInfoData = []
|
|||
|
|
|
|||
|
|
@classmethod
|
|||
|
|
def model_atr_transform(cls, data):
|
|||
|
|
FIELD_TYPES = {
|
|||
|
|
"CharField": models.CharField,
|
|||
|
|
"IntegerField": models.IntegerField,
|
|||
|
|
"JSONField": models.JSONField,
|
|||
|
|
"ImageField": models.ImageField,
|
|||
|
|
"IPAddressField": models.IPAddressField,
|
|||
|
|
"TextField": models.TextField,
|
|||
|
|
"TimeField": models.TimeField,
|
|||
|
|
"ForeignKey": models.ForeignKey,
|
|||
|
|
"ManyToManyField": models.ManyToManyField,
|
|||
|
|
"DateTimeField": models.DateTimeField,
|
|||
|
|
"BigIntegerField": models.BigIntegerField,
|
|||
|
|
"FloatField": models.FloatField
|
|||
|
|
}
|
|||
|
|
OPTIONAL_FIELD_TYPES = {
|
|||
|
|
"models.SET_NULL": models.SET_NULL,
|
|||
|
|
"models.CASCADE": models.CASCADE,
|
|||
|
|
"timezone.now": timezone.now
|
|||
|
|
}
|
|||
|
|
IGNORE_OPTIONS_LIST = {
|
|||
|
|
"is_visible": "any",
|
|||
|
|
"is_editable": "any",
|
|||
|
|
"form_type": "any",
|
|||
|
|
"data_show_fields": [],
|
|||
|
|
"null": "False",
|
|||
|
|
"blank": "False",
|
|||
|
|
"primary_key": "False",
|
|||
|
|
"unique": "False"
|
|||
|
|
}
|
|||
|
|
model_fields = {}
|
|||
|
|
new_data = copy.deepcopy(data)
|
|||
|
|
for f in new_data:
|
|||
|
|
field_klass = FIELD_TYPES[f["field_type"]]
|
|||
|
|
if f["field_type"] == 'ManyToManyField':
|
|||
|
|
keys_to_keep = ["to", "through"]
|
|||
|
|
f["options"] = {key: value for key, value in f["options"].items() if key in keys_to_keep}
|
|||
|
|
continue
|
|||
|
|
for k, v in f["options"].items():
|
|||
|
|
if isinstance(v, str):
|
|||
|
|
if v in OPTIONAL_FIELD_TYPES:
|
|||
|
|
f["options"][k] = OPTIONAL_FIELD_TYPES[v]
|
|||
|
|
for ignore_k, ignore_v in IGNORE_OPTIONS_LIST.items():
|
|||
|
|
if ignore_k in f["options"].keys() and ignore_v == f["options"][ignore_k]:
|
|||
|
|
del f["options"][ignore_k]
|
|||
|
|
elif ignore_v == 'any' and ignore_k in f["options"].keys():
|
|||
|
|
del f["options"][ignore_k]
|
|||
|
|
model_fields[f["name"]] = field_klass(**f["options"])
|
|||
|
|
return model_fields
|
|||
|
|
|
|||
|
|
@classmethod
|
|||
|
|
def get_model(cls, table_name):
|
|||
|
|
fields = cls.get_model_info({
|
|||
|
|
"return_type": "False",
|
|||
|
|
"target_model_info": {
|
|||
|
|
"model_name": table_name,
|
|||
|
|
}
|
|||
|
|
})
|
|||
|
|
# 结构化数据
|
|||
|
|
progress_data = cls.model_atr_transform(fields["model_info_json"])
|
|||
|
|
# 构建model类
|
|||
|
|
try:
|
|||
|
|
now_model = apps.get_model('dependency', table_name)
|
|||
|
|
except LookupError:
|
|||
|
|
now_model = cls.build_model(table_name, progress_data)
|
|||
|
|
return now_model
|
|||
|
|
|
|||
|
|
@classmethod
|
|||
|
|
def get_model_by_data(cls, table_name, data):
|
|||
|
|
# 结构化数据
|
|||
|
|
progress_data = cls.model_atr_transform(data)
|
|||
|
|
# 构建model类
|
|||
|
|
try:
|
|||
|
|
now_model = apps.get_model('dependency', table_name)
|
|||
|
|
except LookupError:
|
|||
|
|
now_model = cls.build_model(table_name, progress_data)
|
|||
|
|
return now_model
|
|||
|
|
|
|||
|
|
@classmethod
|
|||
|
|
def get_model_sign(cls, table_name):
|
|||
|
|
fields = BuildDynamicModelClass.get_model_info({
|
|||
|
|
"return_type": "False",
|
|||
|
|
"target_model_info": {
|
|||
|
|
"model_name": table_name,
|
|||
|
|
}
|
|||
|
|
})
|
|||
|
|
return fields["sign"]
|
|||
|
|
|
|||
|
|
@classmethod
|
|||
|
|
def get_model_field(cls, table_name):
|
|||
|
|
fields = BuildDynamicModelClass.get_model_info({
|
|||
|
|
"return_type": "False",
|
|||
|
|
"target_model_info": {
|
|||
|
|
"model_name": table_name,
|
|||
|
|
}
|
|||
|
|
})
|
|||
|
|
return fields["model_info_json"]
|
|||
|
|
|
|||
|
|
@classmethod
|
|||
|
|
def get_model_name_by_sign(cls, sign):
|
|||
|
|
fields = BuildDynamicModelClass.get_model_info({
|
|||
|
|
"return_type": "False",
|
|||
|
|
"target_model_info": {
|
|||
|
|
"sign": sign,
|
|||
|
|
}
|
|||
|
|
})
|
|||
|
|
return fields["model_name"]
|
|||
|
|
|
|||
|
|
@classmethod
|
|||
|
|
def get_serializers(cls, table_name):
|
|||
|
|
fields = BuildDynamicModelClass.get_model_info({
|
|||
|
|
"return_type": "False",
|
|||
|
|
"target_model_info": {
|
|||
|
|
"model_name": table_name,
|
|||
|
|
}
|
|||
|
|
})
|
|||
|
|
# 结构化数据
|
|||
|
|
progress_data = BuildDynamicModelClass.model_atr_transform(fields["model_info_json"])
|
|||
|
|
# 构建model类
|
|||
|
|
now_model = cls.get_model_by_data(table_name, fields["model_info_json"])
|
|||
|
|
# 构建序列化类
|
|||
|
|
serializers_build = BuildSerializers.build_serializers(table_name + "_serializers",
|
|||
|
|
now_model,
|
|||
|
|
{'extend': progress_data})
|
|||
|
|
return serializers_build
|
|||
|
|
|
|||
|
|
@classmethod
|
|||
|
|
def build_model(cls, table_name, fields, module=''):
|
|||
|
|
# 获取表格数据
|
|||
|
|
attrs = fields
|
|||
|
|
|
|||
|
|
# 放入表名称
|
|||
|
|
class Meta:
|
|||
|
|
app_label = 'dependency'
|
|||
|
|
db_table = table_name
|
|||
|
|
|
|||
|
|
# 设置表名以及相关model属性
|
|||
|
|
attrs['Meta'] = Meta
|
|||
|
|
attrs['__module__'] = module
|
|||
|
|
|
|||
|
|
# 进行类创建
|
|||
|
|
model_class = type(table_name, (models.Model,), attrs)
|
|||
|
|
return model_class
|
|||
|
|
|
|||
|
|
@classmethod
|
|||
|
|
def get_zero_code_model_fields(cls):
|
|||
|
|
cls.ZeroCodeModelInfoData = []
|
|||
|
|
if not len(cls.ZeroCodeModelInfoData) == 0:
|
|||
|
|
return cls.ZeroCodeModelInfoData
|
|||
|
|
else:
|
|||
|
|
with open('dependency/json_data_in_need/data.json',encoding='utf-8') as data:
|
|||
|
|
cls.ZeroCodeModelInfoData = json.load(data)
|
|||
|
|
return cls.ZeroCodeModelInfoData
|
|||
|
|
|
|||
|
|
@classmethod
|
|||
|
|
def get_model_info(cls, data):
|
|||
|
|
res = cls.get_zero_code_model_fields()
|
|||
|
|
if len(res) == 0:
|
|||
|
|
res = cls.get_zero_code_model_fields()
|
|||
|
|
progress_data = cls.model_atr_transform(dict(res)["table_fields"])
|
|||
|
|
try:
|
|||
|
|
now_model = apps.get_model('dependency', dict(res)["table_name"])
|
|||
|
|
except LookupError:
|
|||
|
|
now_model = cls.build_model(dict(res)["table_name"], progress_data)
|
|||
|
|
data['target_model_info'].update({'model_status': 1})
|
|||
|
|
if not bool(data['return_type'] == 'True'):
|
|||
|
|
try:
|
|||
|
|
|
|||
|
|
target = now_model.objects.get(**json.loads(json.dumps(data['target_model_info'])))
|
|||
|
|
except now_model.DoesNotExist as e:
|
|||
|
|
return str('查找的表不存在')
|
|||
|
|
else:
|
|||
|
|
target = now_model.objects.filter(**json.loads(json.dumps(data['target_model_info'])))
|
|||
|
|
# 构建序列化类
|
|||
|
|
serializers_build = BuildSerializers.build_serializers('zero_code_model', now_model, {})
|
|||
|
|
# 序列化类使用
|
|||
|
|
serializers = serializers_build(target, many=bool(data['return_type'] == 'True'))
|
|||
|
|
return serializers.data
|
|||
|
|
|
|||
|
|
@classmethod
|
|||
|
|
def get_table_relation(cls, table_sign, now_table):
|
|||
|
|
return_data = {
|
|||
|
|
"columns": [],
|
|||
|
|
"serializers_list": {},
|
|||
|
|
}
|
|||
|
|
fields = cls.get_model_info({
|
|||
|
|
"return_type": "False",
|
|||
|
|
"target_model_info": {
|
|||
|
|
"model_name": "middle_index_table",
|
|||
|
|
}})
|
|||
|
|
# 结构化数据
|
|||
|
|
progress_data = BuildDynamicModelClass.model_atr_transform(fields["model_info_json"])
|
|||
|
|
# 构建model类
|
|||
|
|
try:
|
|||
|
|
now_model = apps.get_model('dependency', "middle_index_table")
|
|||
|
|
except LookupError:
|
|||
|
|
now_model = BuildDynamicModelClass.build_model("middle_index_table", progress_data)
|
|||
|
|
target = now_model.objects.all()
|
|||
|
|
# 构建序列化类
|
|||
|
|
serializers_build = BuildSerializers.build_serializers("middle_index_table" + "_serializers",
|
|||
|
|
now_model,
|
|||
|
|
{"extend": progress_data})
|
|||
|
|
serializers = serializers_build(target, many=True)
|
|||
|
|
middle_serializer = None
|
|||
|
|
middle_model = None
|
|||
|
|
column_data = None
|
|||
|
|
for item in serializers.data:
|
|||
|
|
# 获取关联表信息
|
|||
|
|
if fields["sign"] == item["table_list"][0] and not item["middle_table_json_info"] == {}:
|
|||
|
|
middle_data = item["middle_table_json_info"]["middle_json_info"]
|
|||
|
|
middle_model = cls.get_model_by_data(item["middle_table_json_info"]["middle_name"], middle_data)
|
|||
|
|
progress_data = BuildDynamicModelClass.model_atr_transform(middle_data)
|
|||
|
|
middle_serializer = BuildSerializers.build_serializers(item["middle_table_json_info"]["middle_name"],
|
|||
|
|
middle_model, {'extend': progress_data})
|
|||
|
|
# 是否关联表判定
|
|||
|
|
if table_sign in item["table_list"] and not item["middle_table_json_info"] == {} and 'empty' not in item[
|
|||
|
|
"table_list"]:
|
|||
|
|
# 写入当前表的预设置字段信息
|
|||
|
|
if middle_model is not None and middle_serializer is not None:
|
|||
|
|
try:
|
|||
|
|
target = middle_model.objects.get(middle_index_table_sign=item["sign"])
|
|||
|
|
column_data = middle_serializer(target).data
|
|||
|
|
except middle_model.DoesNotExist:
|
|||
|
|
print(123)
|
|||
|
|
item = dict(item)
|
|||
|
|
# 结构化数据
|
|||
|
|
middle_progress_data = BuildDynamicModelClass.model_atr_transform(
|
|||
|
|
item["middle_table_json_info"]["middle_json_info"])
|
|||
|
|
# 构建model类
|
|||
|
|
try:
|
|||
|
|
middle_now_model = apps.get_model('dependency', item["middle_table_json_info"]["middle_name"])
|
|||
|
|
except LookupError:
|
|||
|
|
middle_now_model = BuildDynamicModelClass.build_model(item["middle_table_json_info"]["middle_name"],
|
|||
|
|
middle_progress_data)
|
|||
|
|
# 构建序列化类
|
|||
|
|
filter_data = {
|
|||
|
|
now_table + "_sign": item["sign"]
|
|||
|
|
}
|
|||
|
|
middle_target = middle_now_model.objects.filter(**filter_data)
|
|||
|
|
middle_serializers_build = BuildSerializers.build_serializers(
|
|||
|
|
item["middle_table_json_info"]["middle_name"] + "_serializers",
|
|||
|
|
middle_now_model,
|
|||
|
|
{"extend": middle_progress_data})
|
|||
|
|
return_data["serializers_list"].update({"middle_table_serializers": middle_serializers_build})
|
|||
|
|
return_data["serializers_list"].update({"middle_now_model": middle_now_model})
|
|||
|
|
return_data["serializers_list"].update(
|
|||
|
|
{"fields": [{x['name']: ""} for x in item["middle_table_json_info"]["middle_json_info"] if
|
|||
|
|
x['name'] != 'sign']})
|
|||
|
|
if column_data is not None:
|
|||
|
|
relation_columns_model = cls.get_model("relation_columns").objects.get(
|
|||
|
|
sign=column_data["relation_columns_sign"])
|
|||
|
|
columns_infos = cls.get_serializers("relation_columns")(relation_columns_model).data
|
|||
|
|
frontend_show_model = cls.get_model("frontend_show").objects.get(
|
|||
|
|
sign=columns_infos["frontend_show"])
|
|||
|
|
frontend_show_infos = cls.get_serializers("frontend_show")(frontend_show_model).data
|
|||
|
|
for columns in columns_infos["columns_infos"]:
|
|||
|
|
for additional in frontend_show_infos["show_items"]:
|
|||
|
|
if columns["name"] == additional["field_name"]:
|
|||
|
|
columns["additional"] = additional["additional"]
|
|||
|
|
return_data.update({'columns': columns_infos["columns_infos"]})
|
|||
|
|
return return_data
|
|||
|
|
|
|||
|
|
|
|||
|
|
class BuildSerializers:
|
|||
|
|
@classmethod
|
|||
|
|
def build_serializers(cls, table_name, now_model, attrs):
|
|||
|
|
extra_kwargs = {}
|
|||
|
|
if 'extend' in attrs.keys():
|
|||
|
|
for item in attrs['extend'].keys():
|
|||
|
|
extra_kwargs.update({item: {'validators': []}})
|
|||
|
|
del attrs["extend"]
|
|||
|
|
attr = attrs
|
|||
|
|
else:
|
|||
|
|
attr = attrs
|
|||
|
|
|
|||
|
|
# 创建 Meta 类
|
|||
|
|
meta_class = type('Meta', (), {
|
|||
|
|
'model': now_model,
|
|||
|
|
'fields': '__all__',
|
|||
|
|
'depth': 1,
|
|||
|
|
"extra_kwargs": extra_kwargs
|
|||
|
|
})
|
|||
|
|
|
|||
|
|
# 将 Meta 类作为属性传递给 type() 创建的类
|
|||
|
|
attr['Meta'] = meta_class
|
|||
|
|
|
|||
|
|
# 数据验证通过元对象所存放的json中进行读取数据,根传入的参数进行比对
|
|||
|
|
|
|||
|
|
model_class = type(table_name, (serializers.ModelSerializer,), attr)
|
|||
|
|
return model_class
|