-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathlocaldb.py
149 lines (120 loc) · 4.22 KB
/
localdb.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
#-*- coding=utf-8 -*-
from sqlalchemy import Column, String, create_engine,Integer,Boolean
from sqlalchemy.sql.expression import func
from sqlalchemy.orm import sessionmaker
from sqlalchemy.ext.declarative import declarative_base
import os
basedir=os.path.abspath('.')
db_path=os.path.join(basedir,'local.db')
# 创建对象的基类:
Base = declarative_base()
# 定义Post对象:
class Post(Base): #文章数据
__tablename__ = 'posts'
id = Column(String(64), primary_key=True)
name = Column(String(50))
poster=Column(String(200))
category=Column(String(50))
tags=Column(String(50))
status=Column(Boolean)
def __init__(self,**kwargs):
for key,value in kwargs.items():
setattr(self,key,value)
class Picture(Base): #图片数据
__tablename__ = 'pictures'
pid = Column(String(64), primary_key=True)
subid = Column(Integer, primary_key=True)
url=Column(String(200))
def __init__(self,**kwargs):
for key,value in kwargs.items():
setattr(self,key,value)
class ID(Base):
__tablename__ = 'id_table'
id = Column(String(64), primary_key=True)
postnum = Column(Integer)
def __init__(self,**kwargs):
for key,value in kwargs.items():
setattr(self,key,value)
# 初始化数据库连接:
engine = create_engine('sqlite:///'+db_path)
# 创建DBSession类型:
DBSession = sessionmaker(bind=engine)
if not os.path.exists(db_path):
Base.metadata.create_all(engine)
class DB():
def __init__(self,table):
self.type=table
self.session=DBSession()
def insertnew(self,**kwargs): #接收ID 和 最大图片数量
exists=True
if self.type=='Post':
id=kwargs['id']
table=Post #定义的一个Post类
print(table.id)
if self.session.query(table).filter(table.id==id).first() is None:
exists=False
elif self.type=='Picture':
pid=kwargs['pid'] #图片的id
subid=kwargs['subid']
table=Picture
if self.session.query(table).filter(table.pid==pid,table.subid==subid).first() is None:
exists=False
else:
id=kwargs['id']
table=ID
if self.session.query(table).filter(table.id==id).first() is None:
exists=False
if not exists:
#就算是false也不见得是错的哦
new_item=table(**kwargs)
self.session.add(new_item)
self.session.commit()
return exists
def update(self,**kwargs): #上传本地
if self.type=='Post':
print('upload to local database')
id=kwargs['id']
status=kwargs['status']
item=self.session.query(Post).filter(Post.id==id).first()
item.status=status
self.session.add(item)
self.session.commit()
elif self.type=='ID':
id=kwargs['id']
postnum=kwargs['postnum']
item=self.session.query(ID).filter(ID.id==id).first()
item.postnum=postnum
self.session.add(item)
self.session.commit()
def get_a_item(self):
#post=self.session.query(Post).order_by(func.rand()).first() #mysql
post=self.session.query(Post).filter(Post.status==False).order_by(func.random()).first() #pgsql/sqlite
if post is None:
return None,None
pictures=self.session.query(Picture).filter(Picture.pid==post.id).all()
return post,pictures
def select(self,table,**kwargs):
sql='self.session.query({}).filter('.format(table)
its=kwargs.items()
argnum=len(its)
for idx,kv in enumerate(its):
k,v=kv
sql+='{}.{}=="{}"'.format(table,k,v)
if idx==argnum-1:
sql+=')'
else:
sql+=','
sql+='.all()'
results=eval(sql)
if len(results)==0:
results=[None]
return results
# session = DBSession()
# # 创建新User对象:
# new_user = User(id='5', name='Bob')
# # 添加到session:
# session.add(new_user)
# # 提交即保存到数据库:
# session.commit()
# # 关闭session:
# session.close()