您的位置:首页 > 数据库

sqlalchemy连接postgresql

2017-10-27 15:52 459 查看
如果出现权限问题请把url里面的地址改为ip地址,不要用localhost

App和User里面的tablename对应数据库里面的表名

执行之前手动连一下postgresql数据库查看一下数据

from sqlalchemy import Column,String,create_engine

from sqlalchemy.types import CHAR,Integer,String,Text

from sqlalchemy.ext.declarative import declarative_base

from sqlalchemy.orm import sessionmaker

from flask import jsonify

from sqlalchemy import func

Base = declarative_base()

class App(Base):

    __tablename__ = 'app'    

    id = Column(Integer,primary_key=True)

    name = Column(String(32))

    cn_name = Column(String(32))

    desc = Column(Text)

    status = Column(Integer)

class User(Base):

    __tablename__ = 't_user'

    userid = Column('userid',Integer,primary_key=True,nullable=True)

    username = Column('username',String(64),nullable=True)

    password = Column('password',String(64),nullable=True)

engine = create_engine('postgresql://postgres:postgres@172.17.0.37:5432/skynet',echo=True)

DBsession = sessionmaker(bind=engine)

session = DBsession()

query = session.query(App)

print session.query(func.count(App.id)).one()[0]统计多少

for i in query.all():                        #列出所有

    print i.id,i.name,i.cn_name,i.desc,i.status

几种常见sqlalchemy查询:

#简单查询    

print(session.query(User).all())

print(session.query(User.name, User.fullname).all())    

print(session.query(User, User.name).all())        

#带条件查询    

print(session.query(User).filter_by(name='user1').all())    

print(session.query(User).filter(User.name == "user").all())    

print(session.query(User).filter(User.name.like("user%")).all())      

  

#多条件查询    

print(session.query(User).filter(and_(User.name.like("user%"), User.fullname.like("first%"))).all())    

print(session.query(User).filter(or_(User.name.like("user%"), User.password != None)).all())        

#sql过滤    

print(session.query(User).filter("id>:id").params(id=1).all())        

#关联查询     

print(session.query(User, Address).filter(User.id == Address.user_id).all())    

print(session.query(User).join(User.addresses).all())    

print(session.query(User).outerjoin(User.addresses).all())        

#聚合查询    

print(session.query(User.name, func.count('*').label("user_count")).group_by(User.name).all())    

print(session.query(User.name, func.sum(User.id).label("user_id_sum")).group_by(User.name).all())        

#子查询    

stmt = session.query(Address.user_id, func.count('*').label("address_count")).group_by(Address.user_id).subquery()    

print(session.query(User, stmt.c.address_count).outerjoin((stmt, User.id == stmt.c.user_id)).order_by(User.id).all())     

   

#exists    

print(session.query(User).filter(exists().where(Address.user_id == User.id)))    

print(session.query(User).filter(User.addresses.any()))

限制返回字段查询

person = session.query(Person.name, Person.created_at,Person.updated_at).filter_by(name="zhongwei").order_by(Person.created_at).first()

记录总数查询:

from sqlalchemy import func

# count User records, without

# using a subquery.

session.query(func.count(User.id))

# return count of user "id" grouped

# by "name"

session.query(func.count(User.id)).\

        group_by(User.name)

from sqlalchemy import distinct

# count distinct "name" values

session.query(func.count(distinct(User.name)))
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: