347 lines
14 KiB
Python
347 lines
14 KiB
Python
"""
|
||
@project:serviceone
|
||
@File:DynamicModelClass.py
|
||
@IDE:PyCharm
|
||
@Author:徐彬程
|
||
@Date:2023/12/2 18:09
|
||
"""
|
||
import copy
|
||
import os.path
|
||
import re
|
||
from abc import abstractmethod
|
||
from django.db import models
|
||
from rest_framework import serializers
|
||
from django.utils import timezone
|
||
import json
|
||
from django.apps import apps
|
||
import requests
|
||
import configparser # 这里一定要全小写,不然会报找不到模块错误
|
||
from django.conf import settings
|
||
from dependency.classes.tree.treeClass import PythonCodeTreeNode, PythonCodeTreeController
|
||
|
||
cf = configparser.ConfigParser()
|
||
path = os.path.join(settings.BASE_DIR, 'dependency', 'json_data_in_need', 'ZeroCode.conf')
|
||
cf.read(path)
|
||
"""
|
||
data模版
|
||
data = [
|
||
{
|
||
"name": "uuid",
|
||
"field_type": "CharField",
|
||
"options": {
|
||
'max_length': 200
|
||
}
|
||
},
|
||
{
|
||
"name": "path_name",
|
||
"field_type": "CharField",
|
||
"options": {
|
||
"Article",
|
||
"null":True,
|
||
"blank":"True",
|
||
"on_delete":"models.SET_NULL",
|
||
"related_name":"'fileRecord'",
|
||
}
|
||
}
|
||
]
|
||
"""
|
||
|
||
|
||
class BuildDynamicModelClass:
|
||
"""
|
||
工厂模式
|
||
"""
|
||
ZeroCodeModelInfoData = []
|
||
|
||
@classmethod
|
||
def model_atr_transform(cls, data):
|
||
FIELD_TYPES = {
|
||
"CharField": models.CharField,
|
||
"IntegerField": models.IntegerField,
|
||
"JSONField": models.JSONField,
|
||
"ImageField": models.ImageField,
|
||
"IPAddressField": models.IPAddressField,
|
||
"TextField": models.TextField,
|
||
"TimeField": models.TimeField,
|
||
"ForeignKey": models.ForeignKey,
|
||
"ManyToManyField": models.ManyToManyField,
|
||
"DateTimeField": models.DateTimeField,
|
||
"BigIntegerField": models.BigIntegerField,
|
||
"FloatField": models.FloatField
|
||
}
|
||
OPTIONAL_FIELD_TYPES = {
|
||
"models.SET_NULL": models.SET_NULL,
|
||
"models.CASCADE": models.CASCADE,
|
||
"timezone.now": timezone.now
|
||
}
|
||
IGNORE_OPTIONS_LIST = {
|
||
"is_visible": "any",
|
||
"is_editable": "any",
|
||
"form_type": "any",
|
||
"data_show_fields": [],
|
||
"null": "False",
|
||
"blank": "False",
|
||
"primary_key": "False",
|
||
"unique": "False"
|
||
}
|
||
model_fields = {}
|
||
new_data = copy.deepcopy(data)
|
||
for f in new_data:
|
||
field_klass = FIELD_TYPES[f["field_type"]]
|
||
if f["field_type"] == 'ManyToManyField':
|
||
keys_to_keep = ["to", "through"]
|
||
f["options"] = {key: value for key, value in f["options"].items() if key in keys_to_keep}
|
||
continue
|
||
for k, v in f["options"].items():
|
||
if isinstance(v, str):
|
||
if v in OPTIONAL_FIELD_TYPES:
|
||
f["options"][k] = OPTIONAL_FIELD_TYPES[v]
|
||
for ignore_k, ignore_v in IGNORE_OPTIONS_LIST.items():
|
||
if ignore_k in f["options"].keys() and ignore_v == f["options"][ignore_k]:
|
||
del f["options"][ignore_k]
|
||
elif ignore_v == 'any' and ignore_k in f["options"].keys():
|
||
del f["options"][ignore_k]
|
||
model_fields[f["name"]] = field_klass(**f["options"])
|
||
return model_fields
|
||
|
||
@classmethod
|
||
def get_model(cls, table_name):
|
||
fields = cls.get_model_info({
|
||
"return_type": "False",
|
||
"target_model_info": {
|
||
"model_name": table_name,
|
||
}
|
||
})
|
||
# 结构化数据
|
||
progress_data = cls.model_atr_transform(fields["model_info_json"])
|
||
# 构建model类
|
||
try:
|
||
now_model = apps.get_model('dependency', table_name)
|
||
except LookupError:
|
||
now_model = cls.build_model(table_name, progress_data)
|
||
return now_model
|
||
|
||
@classmethod
|
||
def get_model_by_data(cls, table_name, data):
|
||
# 结构化数据
|
||
progress_data = cls.model_atr_transform(data)
|
||
# 构建model类
|
||
try:
|
||
now_model = apps.get_model('dependency', table_name)
|
||
except LookupError:
|
||
now_model = cls.build_model(table_name, progress_data)
|
||
return now_model
|
||
|
||
@classmethod
|
||
def get_model_sign(cls, table_name):
|
||
fields = BuildDynamicModelClass.get_model_info({
|
||
"return_type": "False",
|
||
"target_model_info": {
|
||
"model_name": table_name,
|
||
}
|
||
})
|
||
return fields["sign"]
|
||
|
||
@classmethod
|
||
def get_model_field(cls, table_name):
|
||
fields = BuildDynamicModelClass.get_model_info({
|
||
"return_type": "False",
|
||
"target_model_info": {
|
||
"model_name": table_name,
|
||
}
|
||
})
|
||
return fields["model_info_json"]
|
||
|
||
@classmethod
|
||
def get_model_name_by_sign(cls, sign):
|
||
fields = BuildDynamicModelClass.get_model_info({
|
||
"return_type": "False",
|
||
"target_model_info": {
|
||
"sign": sign,
|
||
}
|
||
})
|
||
return fields["model_name"]
|
||
|
||
@classmethod
|
||
def get_serializers(cls, table_name):
|
||
fields = BuildDynamicModelClass.get_model_info({
|
||
"return_type": "False",
|
||
"target_model_info": {
|
||
"model_name": table_name,
|
||
}
|
||
})
|
||
# 结构化数据
|
||
progress_data = BuildDynamicModelClass.model_atr_transform(fields["model_info_json"])
|
||
# 构建model类
|
||
now_model = cls.get_model_by_data(table_name, fields["model_info_json"])
|
||
# 构建序列化类
|
||
serializers_build = BuildSerializers.build_serializers(table_name + "_serializers",
|
||
now_model,
|
||
{'extend': progress_data})
|
||
return serializers_build
|
||
|
||
@classmethod
|
||
def build_model(cls, table_name, fields, module=''):
|
||
# 获取表格数据
|
||
attrs = fields
|
||
|
||
# 放入表名称
|
||
class Meta:
|
||
app_label = 'dependency'
|
||
db_table = table_name
|
||
|
||
# 设置表名以及相关model属性
|
||
attrs['Meta'] = Meta
|
||
attrs['__module__'] = module
|
||
|
||
# 进行类创建
|
||
model_class = type(table_name, (models.Model,), attrs)
|
||
return model_class
|
||
|
||
@classmethod
|
||
def get_zero_code_model_fields(cls):
|
||
cls.ZeroCodeModelInfoData = []
|
||
if not len(cls.ZeroCodeModelInfoData) == 0:
|
||
return cls.ZeroCodeModelInfoData
|
||
else:
|
||
with open('dependency/json_data_in_need/data.json',encoding='utf-8') as data:
|
||
cls.ZeroCodeModelInfoData = json.load(data)
|
||
return cls.ZeroCodeModelInfoData
|
||
|
||
@classmethod
|
||
def get_model_info(cls, data):
|
||
res = cls.get_zero_code_model_fields()
|
||
if len(res) == 0:
|
||
res = cls.get_zero_code_model_fields()
|
||
progress_data = cls.model_atr_transform(dict(res)["table_fields"])
|
||
try:
|
||
now_model = apps.get_model('dependency', dict(res)["table_name"])
|
||
except LookupError:
|
||
now_model = cls.build_model(dict(res)["table_name"], progress_data)
|
||
data['target_model_info'].update({'model_status': 1})
|
||
if not bool(data['return_type'] == 'True'):
|
||
try:
|
||
|
||
target = now_model.objects.get(**json.loads(json.dumps(data['target_model_info'])))
|
||
except now_model.DoesNotExist as e:
|
||
return str('查找的表不存在')
|
||
else:
|
||
target = now_model.objects.filter(**json.loads(json.dumps(data['target_model_info'])))
|
||
# 构建序列化类
|
||
serializers_build = BuildSerializers.build_serializers('zero_code_model', now_model, {})
|
||
# 序列化类使用
|
||
serializers = serializers_build(target, many=bool(data['return_type'] == 'True'))
|
||
return serializers.data
|
||
|
||
@classmethod
|
||
def get_table_relation(cls, table_sign, now_table):
|
||
return_data = {
|
||
"columns": [],
|
||
"serializers_list": {},
|
||
}
|
||
fields = cls.get_model_info({
|
||
"return_type": "False",
|
||
"target_model_info": {
|
||
"model_name": "middle_index_table",
|
||
}})
|
||
# 结构化数据
|
||
progress_data = BuildDynamicModelClass.model_atr_transform(fields["model_info_json"])
|
||
# 构建model类
|
||
try:
|
||
now_model = apps.get_model('dependency', "middle_index_table")
|
||
except LookupError:
|
||
now_model = BuildDynamicModelClass.build_model("middle_index_table", progress_data)
|
||
target = now_model.objects.all()
|
||
# 构建序列化类
|
||
serializers_build = BuildSerializers.build_serializers("middle_index_table" + "_serializers",
|
||
now_model,
|
||
{"extend": progress_data})
|
||
serializers = serializers_build(target, many=True)
|
||
middle_serializer = None
|
||
middle_model = None
|
||
column_data = None
|
||
for item in serializers.data:
|
||
# 获取关联表信息
|
||
if fields["sign"] == item["table_list"][0] and not item["middle_table_json_info"] == {}:
|
||
middle_data = item["middle_table_json_info"]["middle_json_info"]
|
||
middle_model = cls.get_model_by_data(item["middle_table_json_info"]["middle_name"], middle_data)
|
||
progress_data = BuildDynamicModelClass.model_atr_transform(middle_data)
|
||
middle_serializer = BuildSerializers.build_serializers(item["middle_table_json_info"]["middle_name"],
|
||
middle_model, {'extend': progress_data})
|
||
# 是否关联表判定
|
||
if table_sign in item["table_list"] and not item["middle_table_json_info"] == {} and 'empty' not in item[
|
||
"table_list"]:
|
||
# 写入当前表的预设置字段信息
|
||
if middle_model is not None and middle_serializer is not None:
|
||
try:
|
||
target = middle_model.objects.get(middle_index_table_sign=item["sign"])
|
||
column_data = middle_serializer(target).data
|
||
except middle_model.DoesNotExist:
|
||
print(123)
|
||
item = dict(item)
|
||
# 结构化数据
|
||
middle_progress_data = BuildDynamicModelClass.model_atr_transform(
|
||
item["middle_table_json_info"]["middle_json_info"])
|
||
# 构建model类
|
||
try:
|
||
middle_now_model = apps.get_model('dependency', item["middle_table_json_info"]["middle_name"])
|
||
except LookupError:
|
||
middle_now_model = BuildDynamicModelClass.build_model(item["middle_table_json_info"]["middle_name"],
|
||
middle_progress_data)
|
||
# 构建序列化类
|
||
filter_data = {
|
||
now_table + "_sign": item["sign"]
|
||
}
|
||
middle_target = middle_now_model.objects.filter(**filter_data)
|
||
middle_serializers_build = BuildSerializers.build_serializers(
|
||
item["middle_table_json_info"]["middle_name"] + "_serializers",
|
||
middle_now_model,
|
||
{"extend": middle_progress_data})
|
||
return_data["serializers_list"].update({"middle_table_serializers": middle_serializers_build})
|
||
return_data["serializers_list"].update({"middle_now_model": middle_now_model})
|
||
return_data["serializers_list"].update(
|
||
{"fields": [{x['name']: ""} for x in item["middle_table_json_info"]["middle_json_info"] if
|
||
x['name'] != 'sign']})
|
||
if column_data is not None:
|
||
relation_columns_model = cls.get_model("relation_columns").objects.get(
|
||
sign=column_data["relation_columns_sign"])
|
||
columns_infos = cls.get_serializers("relation_columns")(relation_columns_model).data
|
||
frontend_show_model = cls.get_model("frontend_show").objects.get(
|
||
sign=columns_infos["frontend_show"])
|
||
frontend_show_infos = cls.get_serializers("frontend_show")(frontend_show_model).data
|
||
for columns in columns_infos["columns_infos"]:
|
||
for additional in frontend_show_infos["show_items"]:
|
||
if columns["name"] == additional["field_name"]:
|
||
columns["additional"] = additional["additional"]
|
||
return_data.update({'columns': columns_infos["columns_infos"]})
|
||
return return_data
|
||
|
||
|
||
class BuildSerializers:
|
||
@classmethod
|
||
def build_serializers(cls, table_name, now_model, attrs):
|
||
extra_kwargs = {}
|
||
if 'extend' in attrs.keys():
|
||
for item in attrs['extend'].keys():
|
||
extra_kwargs.update({item: {'validators': []}})
|
||
del attrs["extend"]
|
||
attr = attrs
|
||
else:
|
||
attr = attrs
|
||
|
||
# 创建 Meta 类
|
||
meta_class = type('Meta', (), {
|
||
'model': now_model,
|
||
'fields': '__all__',
|
||
'depth': 1,
|
||
"extra_kwargs": extra_kwargs
|
||
})
|
||
|
||
# 将 Meta 类作为属性传递给 type() 创建的类
|
||
attr['Meta'] = meta_class
|
||
|
||
# 数据验证通过元对象所存放的json中进行读取数据,根传入的参数进行比对
|
||
|
||
model_class = type(table_name, (serializers.ModelSerializer,), attr)
|
||
return model_class
|