使用微博API(nearby timeline接口)搜集含GPS新浪微博数据
2014-10-31 20:45
1131 查看
上篇文章讲述了数据搜集思路中的如何使用“关键字+时间段+区域”搜集新浪微博数据,接下来将详细介绍如何搜集含GPS的微博数据。含GPS的微博数据比较重要,可用于研究社会行为、个体行为轨迹、城市迁徙以及功能分区等等。
其中主要的参数有access_token,lat,long,range,starttime,endtime,count,page。
第一步:选择多个中心点,以10km为半径做buffer覆盖整个城市;
第二步:圆形区域较多,可采用多线程进行。一个buffer对应一个圆形区域,对应一个线程;
第三步:用额外的线程将采集到的微博数据入库。
其中,在稍长一段时间内包含的GPS数据数量巨大。如果不细化时间段,实际返回的数据可能会缩水。为了克服这一点,可以用starttime和endtime来控制返回数据量,尽可能多地返回的数据。这里我们将starttime和endtime设置为一个小时。
收集某个特定区域的GPS微博数据,继承Thread类。对于更大区域,使用线程池效率更好(有时间再写)。
数据入库类
主程序入口
如果想编译成windows窗口可执行文件,参见github !!
1、切入点
新浪微博提供了专门搜索含GPS微博的API,即位置服务接口下的nearby_timeline接口(http://open.weibo.com/wiki/2/place/nearby_timeline)。接口参数如下:必选 | 类型及范围 | 说明 | |
---|---|---|---|
source | false | string | 采用OAuth授权方式不需要此参数,其他授权方式为必填参数,数值为应用的AppKey。 |
access_token | false | string | 采用OAuth授权方式为必填参数,其他授权方式不需要此参数,OAuth授权后获得。 |
lat | true | float | 纬度。有效范围:-90.0到+90.0,+表示北纬。 |
long | true | float | 经度。有效范围:-180.0到+180.0,+表示东经。 |
range | false | int | 搜索范围,单位米,默认2000米,最大11132米。 |
starttime | false | int | 开始时间,Unix时间戳。 |
endtime | false | int | 结束时间,Unix时间戳。 |
sort | false | int | 排序方式。默认为0,按时间排序;为1时按与中心点距离进行排序。 |
count | false | int | 单页返回的记录条数,最大为50,默认为20。 |
page | false | int | 返回结果的页码,默认为1。 |
base_app | false | int | 是否只获取当前应用的数据。0为否(所有数据),1为是(仅当前应用),默认为0。 |
offset | false | int | 传入的经纬度是否是纠偏过,0:没纠偏、1:纠偏过,默认为0。 |
2、采集思路
由于搜索区域最大为11公里的圆,一个大城市需要多个圆才能覆盖。所以:第一步:选择多个中心点,以10km为半径做buffer覆盖整个城市;
第二步:圆形区域较多,可采用多线程进行。一个buffer对应一个圆形区域,对应一个线程;
第三步:用额外的线程将采集到的微博数据入库。
其中,在稍长一段时间内包含的GPS数据数量巨大。如果不细化时间段,实际返回的数据可能会缩水。为了克服这一点,可以用starttime和endtime来控制返回数据量,尽可能多地返回的数据。这里我们将starttime和endtime设置为一个小时。
3、具体实现
收集某个特定区域某个时间段的GPS微博数据,不断改变时间段,可收集不同短时间段内的GPS数据''' CollectGeoInPeriod can collect geospatial weibo data of defined zone that is circular region in period In this class, just need change the period hour after hour to fetch weibo of the defined zone so that collect as much data as possible ''' class CollectGeoInPeriod: ''' constructor @paraments: accessToken: the access token for calling weibo api lat, longt: the center of defined circular zone, which is defined by latitude and longitude radius: the radius of defined circular zone queue: the synchronized container to hold weibo data ''' def __init__(self, accessToken, lat, longt, queue, radius=10000): self.logger = logging.getLogger('main.geoInPeriod') self.client = self.initWBAPI(accessToken) self.lat = lat self.longt = longt self.radius = radius self.queue = queue def logSep(self): self.logger.info('-----------------------------------------------------') def log(self, info): self.logger.info(info) self.logger.info('Latitude: ' + str(self.lat)) self.logger.info('Longitude: ' + str(self.longt)) self.logger.info('Radius: ' + str(self.radius)) ''' initialize the weibo api client @paraments: accessToken: the access token for calling weibo api. (such as '2.00BTaqXF06XASO33243564b69kVghB') @return: client: a client of weibo api ''' def initWBAPI(self, accessToken): client = weibo.APIClient() client.set_access_token(accessToken) return client ''' transfer the format time to unix time @paraments: date: a date string which has strict format. (Date Format Example: 2013-06-09 00:30:00) @return: unix timestamp, integer numbers, which must be required by the weibo api ''' def getUnixTime(self, date): return int(time.mktime(time.strptime(date, '%Y-%m-%d %H:%M:%S'))) ''' call the weibo api for weibo data @paraments: the request paraments are listed in url:http://open.weibo.com/wiki/2/place/nearby_timeline @return: the dict contains response weibo data or null, the format turns to example in page(http://open.weibo.com/wiki/2/place/nearby_timeline) ''' def fetchContent(self, page, count, starttime, endtime): return self.client.place.nearby_timeline.get(lat=self.lat, long=self.longt, starttime=self.getUnixTime(starttime), endtime=self.getUnixTime(endtime), count=count, range=self.radius, page=page) ''' Give a circular region, collect the data in short period and store them in queue. @paraments: starttime: the start time of the period endtime: the end time of the period maxTryNum: set max numbers to try when the internet is poor ''' def downloadInPeriod(self, starttime, endtime, maxTryNum = 4): page = 1 count = 50 actualSize = 0 expectedTotal = 0 isReapeated = '' while(True): for tryNum in range(maxTryNum): try: content = self.fetchContent(page, count, starttime, endtime) break except Exception, e: if tryNum < (maxTryNum-1): time.sleep(10) self.logger.info('Retry...') self.logSep() continue else: self.log('Exception: ' + str(e)) self.logger.info('TimeScope: ' + starttime + ' -- ' + endtime) self.logSep() return False ## check whether the response is null or not if type(content) == list: self.logger.info('Return Null!!!') self.logger.info('Expected Total Number: ' + str(expectedTotal)) self.logger.info('Actual Weibo Number: ' + str(actualSize)) self.logger.info('TimeScope: ' + starttime + ' -- ' + endtime + ' IS OVER!') self.logSep() return True expectedTotal = content['total_number'] statusList = content['statuses'] ## check whether the return is empty or not if (not statusList) or (not len(statusList)): self.logger.info('Return Zero!!!') self.logger.info('Expected Total Number: ' + str(expectedTotal)) self.logger.info('Actual Weibo Number: ' + str(actualSize)) self.logger.info('TimeScope: ' + starttime + ' -- ' + endtime + ' IS OVER!') self.logSep() return True ## check whether the returning contents are repeated or not if isReapeated == statusList[0]['mid']: self.log('Reapeat!!! #Page' + str(page)) self.logger.info('TimeScope: ' + starttime + ' -- ' + endtime) self.logger.info('Expected Total Number: ' + str(expectedTotal)) self.logger.info('Actual Weibo Number: ' + str(actualSize)) self.logSep() self.logger.info('sleeping 80 seconds...') time.sleep(80) page += 1 continue else: isReapeated = statusList[0]['mid'] ## store the status collected in queue for status in statusList: self.queue.put(status) ## check whether is over and recompute the next count curSize = len(statusList) actualSize += curSize if expectedTotal == actualSize: self.logger.info('Return Full...') self.logger.info('TimeScope: ' + starttime + ' -- ' + endtime + ' IS OVER!') self.logSep() return True elif expectedTotal - actualSize >= 50: count = 50 elif expectedTotal - actualSize >= 20: count = expectedTotal - actualSize else: count = 20 ## ready for next page page += 1
收集某个特定区域的GPS微博数据,继承Thread类。对于更大区域,使用线程池效率更好(有时间再写)。
''' GraspGeo is the class to grasp the statuses in special area, extending the Thread class Use a instance of GraspGeo to collect a specified area within specified period ''' class GraspGeo(threading.Thread): def __init__(self, queue, threadName, accessToken, lat, longt, starttime, endtime, hasEnd=1): threading.Thread.__init__(self, name=threadName) self.name = threadName self.collecGeo = CollectGeoInPeriod(accessToken, lat, longt, queue) self.starttime = starttime self.endtime = self.getEndtime(starttime) self.hasEnd = hasEnd self.END = endtime self.logger = logging.getLogger('main.geoNearPoint.' + self.name) self.start() def getEndtime(self, starttime, interval = 60*60): start_datetime = datetime.datetime.fromtimestamp(time.mktime(time.strptime(starttime, '%Y-%m-%d %H:%M:%S'))) end_datetime = start_datetime + datetime.timedelta(seconds = interval) endtime = end_datetime.strftime('%Y-%m-%d %H:%M:%S') return endtime def notEnd(self): if self.hasEnd: return (time.strptime(self.starttime,'%Y-%m-%d %H:%M:%S')) < (time.strptime(self.END,'%Y-%m-%d %H:%M:%S')) else: return True def run(self): while self.notEnd(): self.logger.info('TimeScope: ' + self.starttime + ' -- ' + self.endtime) if self.collecGeo.downloadInPeriod(self.starttime, self.endtime): self.starttime = self.endtime self.endtime = self.getEndtime(self.starttime) else: self.collecGeo.log('Intenet Error!') self.logger.error('TimeScope: ' + self.starttime + ' -- ' + self.endtime) else: self.logger.info('+++++++++++++++++++++++++++++++++++++++++++++++++++++') self.logger.info(self.name + ' Task Overs!') self.collecGeo.log('Task Infomation') self.logger.info('TimeScope: ' + self.starttime + ' -- ' + self.endtime) self.logger.info('+++++++++++++++++++++++++++++++++++++++++++++++++++++') self.collecGeo = None
数据入库类
''' Import the collected date into the mongodb ''' class ImportDB(threading.Thread): def __init__(self, queue): threading.Thread.__init__(self) self.queue = queue self.conn = pymongo.MongoClient(dbURL) self.status = conn[database][collection] self.goon = True self.logger = logging.getLogger('main.importDB') self.start() def formatTime(self, starttime): return datetime.datetime.fromtimestamp(time.mktime(time.strptime(starttime, '%a %b %d %H:%M:%S +0800 %Y'))) def run(self): while self.goon: try: ## extract one from queue record = self.queue.get(block=True, timeout=120) ## import record into MongoDB ## exchange the position of latitude and longitude to maximum compatibility of the mongodb geospatial index if record and ('geo' in record) and record['geo'] and ('coordinates' in record['geo']): record['geo']['coordinates'] = record['geo']['coordinates'][::-1] record['created_at'] = self.formatTime(record['created_at']) try: self.status.insert(record) ## signals to queue job is done self.queue.task_done() except Exception, e: #self.logger.debug(str(e)) pass else: time.sleep(3) except Exception,e: self.goon = False self.conn.close()
主程序入口
## initialize logging logger = logging.getLogger('main') logger.setLevel(logging.DEBUG) filehandler = logging.FileHandler('collect.log') filehandler.setLevel(logging.DEBUG) streamhandler = logging.StreamHandler() streamhandler.setLevel(logging.INFO) formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s: %(message)s') filehandler.setFormatter(formatter) streamhandler.setFormatter(formatter) logger.addHandler(filehandler) logger.addHandler(streamhandler) ## initialize the paraments config = open('config.yaml') params = yaml.load(config) config.close() points = params['points'] for point in points: logger.info('name:%s' %point['name']) logger.info('latitude:%s longtude:%s' %(point['lat'], point['longt'])) starttime = params['starttime'] endtime = params['endtime'] dbURL = params['dbURL'] dbThreadNum = params['dbThreadNum'] database = params['database'] collection = params['collection'] logger.info('starttime:%s endtime:%s' %(starttime, endtime)) logger.info('dbThreadNum:%s' %dbThreadNum) logger.info('database:%s collection:%s' %(database, collection)) def main(): queue = Queue.Queue(0) p = [] q = [] for point in points: t = GraspGeo(queue, point['name'], point['accessToken'], point['lat'], point['longt'], starttime, endtime) p.append(t) #import data to mongodb for j in xrange(dbThreadNum): dt = ImportDB(queue) q.append(dt) #wait on the queue until everything has been processed for m in xrange(dbThreadNum): if q[m].isAlive():q[m].join() queue.join() #print 'ALL OVER!' #logger.info('ALL OVER!') if __name__ == '__main__': main()
如果想编译成windows窗口可执行文件,参见github !!
相关文章推荐
- 使用新浪微博官方API抓取微博数据(Python版)
- 求助:php通过新浪微博接口 api 如果获取某一地区下所有用户的微博列表?或使用包含某一关键词的微博列表
- Java使用新浪微博API开发微博应用的基本方法
- PHP下使用CURL方式POST数据至API接口的代码
- 使用Sina API获取新浪财经的证券股票数据接口(时价 K线等)
- ELK研究(一):elasticsearch java api接口操作ES集群 ---TransportClient的使用介绍 bulk批量提交数据
- PHP下使用CURL方式POST数据至API接口的方法
- Python微博地点签到大数据实战(一)微博API的使用
- 使用Yahoo API获取雅虎的证券股票数据接口(时价 K线等)
- Java使用新浪微博API通过账号密码方式登陆微博的实例
- python使用新浪微博api上传图片到微博示例
- 腾讯微博API时间线相关接口返回的微博信息中head值使用问题
- Java调用 新浪微博API 接口发微博,逐条讲解,绝对清晰
- Python使用新浪微博API发送微博的例子
- 使用网页爬虫(高级搜索功能)搜集含关键词新浪微博数据
- Python使用新浪微博API发送微博的例子
- 使用新浪微博API的OAuth认证发布微博实例
- ELK研究(一):elasticsearch java api接口操作ES集群 ---TransportClient的使用介绍 bulk批量提交数据
- 使用新浪微博API的OAuth认证发布微博
- Python微博地点签到大数据实战 微博API的使用