Python: querying MySQL and converting the returned table to JSON
2016-07-18 13:59
The full test script: it first runs through the Spark SQL examples bundled with Spark (DataFrames built from rows, from tuples, and from a JSON file), then queries MySQL through pymysql and serializes the result set to JSON.

```python
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
from __future__ import print_function

import os
import sys
import json

import pymysql
from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.sql.types import Row, StructField, StructType, StringType, IntegerType

if __name__ == "__main__":
    sc = SparkContext(appName="PythonSQL")
    sqlContext = SQLContext(sc)

    # RDD is created from a list of rows
    some_rdd = sc.parallelize([Row(name="John", age=19),
                               Row(name="Smith", age=23),
                               Row(name="Sarah", age=18)])
    # Infer schema from the first row, create a DataFrame and print the schema
    some_df = sqlContext.createDataFrame(some_rdd)
    some_df.printSchema()
    # root
    #  |-- age: integer (nullable = true)
    #  |-- name: string (nullable = true)

    # Another RDD is created from a list of tuples
    another_rdd = sc.parallelize([("John", 19), ("Smith", 23), ("Sarah", 18)])
    # Schema with two fields - person_name and person_age
    schema = StructType([StructField("person_name", StringType(), False),
                         StructField("person_age", IntegerType(), False)])
    # Create a DataFrame by applying the schema to the RDD and print the schema
    another_df = sqlContext.createDataFrame(another_rdd, schema)
    another_df.printSchema()
    # root
    #  |-- person_name: string (nullable = false)
    #  |-- person_age: integer (nullable = false)

    # A JSON dataset is pointed to by path.
    # The path can be either a single text file or a directory storing text files.
    # The stock Spark example derives it from SPARK_HOME / sys.argv:
    # if len(sys.argv) < 2:
    #     path = "file://" + \
    #         os.path.join(os.environ['SPARK_HOME'], "examples/src/main/resources/people.json")
    # else:
    #     path = sys.argv[1]
    path = r"D:\spark-1.6.0-bin-hadoop2.6\data\mllib\people.json"

    # Create a DataFrame from the file(s) pointed to by path
    # (jsonFile() is deprecated in Spark 1.6; use the DataFrameReader instead)
    people = sqlContext.read.json(path)

    # The inferred schema can be visualized using the printSchema() method.
    people.printSchema()
    # root
    #  |-- age: IntegerType
    #  |-- name: StringType

    # Register this DataFrame as a temporary table.
    people.registerTempTable("people")

    # SQL statements can be run by using the sql methods provided by sqlContext
    teenagers = sqlContext.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")
    for each in teenagers.collect():
        print(each[0])

    # Write a hand-built JSON-ish string to a local file
    # (note: the single quotes make this invalid JSON; json.dumps() is used below instead)
    file_object = open(r"D:\spark-1.6.0-bin-hadoop2.6\data\mllib\peoplenew.json", 'w')
    file_object.write("{'name':'222'}")
    file_object.close()

    # Abandoned attempts:
    # teenagers.append("namesAndAges.parquet", "parquet")
    # teenagers.save(r"D:\spark-1.6.0-bin-hadoop2.6\data\mllib\peoplenew.json", "json", "append")
    # sc.parallelize(teenagers.collect()).saveAsTextFile(...)  # error
    # Repartitioning the DataFrame's underlying RDD to one partition does work:
    teenagers.rdd.repartition(1).saveAsTextFile(
        r"D:\spark-1.6.0-bin-hadoop2.6\data\mllib\peoplenew1.json")

    # Query MySQL and turn the result set into JSON
    conn = pymysql.connect(host='aliyun.ovalcn.com', port=3306, user='root',
                           passwd='oval163', db='pos_wanli_combine', charset='utf8')
    cur = conn.cursor()
    cur.execute("SELECT * FROM biz_dms_order LIMIT 2")
    results = cur.fetchall()

    orders = []
    data = {}
    # cur.description holds one (name, type_code, ...) tuple per result column,
    # so it supplies the dict keys that a plain tuple row lacks.
    for row in results:
        orderDict = {}
        for i in range(len(cur.description)):
            colName = cur.description[i][0]
            # str() keeps non-JSON-serializable values (datetime, Decimal) printable
            orderDict.setdefault(colName, str(row[i]))
        orders.append(orderDict)

    data['code'] = 0
    data['orders'] = orders
    jsonStr = json.dumps(data)
    print(jsonStr)

    cur.close()
    conn.close()
    sc.stop()
```
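One caveat in the script above: `saveAsTextFile` on `teenagers.rdd` writes the `repr` of each Row, not JSON, which is likely why the earlier save attempts were abandoned. Spark 1.6's DataFrame API can emit JSON directly; a minimal sketch of the two standard options (the output paths here are hypothetical):

```python
# Option 1: have Spark write one JSON object per line
# (produces a directory of part-* files, not a single file).
teenagers.coalesce(1).write.mode("overwrite").json(r"D:\tmp\teenagers_json")

# Option 2: convert each Row to a JSON string first, then save as text.
teenagers.toJSON().repartition(1).saveAsTextFile(r"D:\tmp\teenagers_json_txt")
```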
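The MySQL-to-JSON step itself needs neither Spark nor the manual `cur.description` loop: pymysql ships a `DictCursor` that returns each row as a column-name-to-value dict. A minimal standalone sketch (host, password, and the other connection parameters are placeholders):

```python
import json
import pymysql
import pymysql.cursors

# DictCursor makes fetchall() return dicts keyed by column name,
# replacing the manual cur.description loop above.
conn = pymysql.connect(host='localhost', port=3306, user='root',
                       passwd='secret', db='pos_wanli_combine',
                       charset='utf8', cursorclass=pymysql.cursors.DictCursor)
try:
    with conn.cursor() as cur:
        cur.execute("SELECT * FROM biz_dms_order LIMIT 2")
        orders = cur.fetchall()  # already a list of dicts
finally:
    conn.close()

# default=str stringifies values json cannot encode natively (datetime,
# Decimal), mirroring the str(row[i]) cast in the original loop.
print(json.dumps({'code': 0, 'orders': orders}, default=str))
```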