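Purpose: take the daily DAU figures that MySQL stores for each product and aggregate them into MongoDB, because the application's data lives directly in MongoDB. An offline Python script rolls the daily rows up and writes them to the weekly table in MongoDB. All of the MySQL data is simulated; a program fills the table with random values.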
1. MySQL database:
plat_dau_detail
Table description: daily DAU
Field | Type | Description |
prod_id | varchar(20) | Product identifier |
d_date | int | Unix timestamp |
dau | int | DAU (daily active users) |
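Since the MySQL rows are simulated, here is a minimal sketch of how such random data could be generated. The helper name `make_fake_rows`, the product IDs, and the DAU range are assumptions for illustration only; they are not taken from the original generator, which is not shown here.

```python
import random
import time
from datetime import date, timedelta


def make_fake_rows(prod_ids, start, days, seed=None):
    """Return (prod_id, d_date, dau) tuples, one per product per day.

    Hypothetical helper: the value range 1000..50000 is an arbitrary choice.
    """
    rng = random.Random(seed)
    rows = []
    for offset in range(days):
        day = start + timedelta(days=offset)
        # d_date is an int column, so store local midnight as a Unix timestamp
        ts = int(time.mktime(day.timetuple()))
        for prod_id in prod_ids:
            rows.append((prod_id, ts, rng.randint(1000, 50000)))
    return rows


# Parameterized statement matching the plat_dau_detail columns
INSERT_SQL = "INSERT INTO plat_dau_detail (prod_id, d_date, dau) VALUES (%s, %s, %s)"

rows = make_fake_rows(["prod_a", "prod_b"], date(2014, 10, 1), 7, seed=42)
print(len(rows))  # 7 days x 2 products = 14 rows
```

With a DB-API cursor such as the MySQLdb one used in the main program, the rows could then be written with `cursor.executemany(INSERT_SQL, rows)` followed by a commit on the connection.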
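2. MongoDB weekly database design:
plat_dau_week
Table description: weekly DAU
Field | Type | Description |
prod_id | string | Product identifier |
d_date | int | Unix timestamp (Monday of the week) |
d_quarter | int | Quarter (1, 2, 3, 4) |
d_month | int | Month |
d_week | int | The n-th week of the month |
d_last | int | 1 means the last week of the month |
dau | int | DAU summed over the week |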
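The weekly rollup behind this table can also be sketched in plain Python. This is an illustrative alternative, not the method the script uses: it assumes the daily rows have already been loaded as `(prod_id, date, dau)` tuples, and the names `week_start` and `aggregate_weekly` are made up for the example.

```python
from collections import defaultdict
from datetime import date, timedelta


def week_start(day):
    """Return the Monday of the week that contains `day`."""
    return day - timedelta(days=day.isoweekday() - 1)


def aggregate_weekly(daily_rows):
    """Sum dau per (prod_id, Monday of week).

    daily_rows: iterable of (prod_id, date, dau) tuples (assumed input shape).
    """
    totals = defaultdict(int)
    for prod_id, day, dau in daily_rows:
        totals[(prod_id, week_start(day))] += dau
    return dict(totals)


daily = [
    ("prod_a", date(2014, 9, 29), 100),  # Monday
    ("prod_a", date(2014, 10, 1), 150),  # Wednesday, same week
    ("prod_a", date(2014, 10, 6), 200),  # following Monday
    ("prod_b", date(2014, 10, 1), 70),
]
weekly = aggregate_weekly(daily)
for (prod_id, monday), dau in sorted(weekly.items()):
    print(prod_id, monday, dau)
```

Keying on the Monday date rather than on a week number alone keeps weeks from different years apart, because the date carries the year with it.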
Main program dauMgt.py:
#!/usr/bin/env python
# -*- coding:utf-8 -*-
'management dau data from mysql'
import MySQLdb
import pymongo
import string
from datetime import datetime,date,time
import dateManage

DB_HOST = '127.0.0.1'
DB_USER = 'root'
DB_PASS = ''
DB_NAME = 'platdau'
MONGO_HOST = '127.0.0.1'
MONGO_PORT = 27017

myConn = MySQLdb.connect(host=DB_HOST,user=DB_USER,passwd=DB_PASS,db=DB_NAME,charset="utf8")
cursor = myConn.cursor()

# Fetch the DAU detail data, summed per product and per week of the year
# sql = 'select * from plat_dau_detail'
sql = 'SELECT sum( dau ) dau , prod_id, from_unixtime( d_date ) d_date \
FROM `plat_dau_detail` \
GROUP BY weekofyear( from_unixtime( d_date ) ) , prod_id'
resultCnt = cursor.execute(sql)
platDauData = cursor.fetchall()

# Turn each result row (dau, prod_id, d_date) into a dict with a 'YYYY-MM-DD' date string
dataList = []
for item in platDauData:
    prodId = item[1]
    d_date = item[2]
    d_date = str(d_date.date())
    dau = item[0]
    dict1 = {'prodId':prodId,'d_date':str(d_date),'dau':dau}
    dataList.append(dict1)
# print dataList

# pymongo.Connection is the legacy client class (removed in PyMongo 3.0, where MongoClient replaces it)
mongoCon = pymongo.Connection(MONGO_HOST,MONGO_PORT)
db = mongoCon.dau
# Write one weekly document per row, with the date fields filled in by dateManage
for item in dataList:
    d_date = item['d_date']
    prodId = item['prodId']
    dau = item['dau']
    dateInfo = dateManage.getDateInfo(d_date)
    db.plat_dau_week.insert({'prod_id':prodId,'d_date':dateInfo['timestamp'],'d_quarter':dateInfo['quarter'],'d_month':dateInfo['month'],'d_week':dateInfo['week'],'d_last':dateInfo['last'],'dau':int(dau)})
Date-handling module dateManage.py:
#!/usr/bin/env python
# -*- coding:utf-8 -*-
"""date management"""
from datetime import datetime,date,timedelta
import math
import time

# Get the detailed information for a date given as 'YYYY-MM-DD'
def getDateInfo(param):
    param1 = param + ' 00:00:00'
    d = datetime.strptime(param1,"%Y-%m-%d %H:%M:%S").date()
    # Step back to the Monday of the week that contains d
    weekday = d.isoweekday()
    dayDif = weekday - 1
    monday = d - timedelta(days=dayDif)
    year = monday.year
    month = monday.month
    quarter = int(math.ceil(float(month/3.0)))
    timestamp = int(time.mktime(time.strptime(str(monday), "%Y-%m-%d")))
    # Collect every Monday that falls in this month
    month1 = date(year,month,1)
    monthMonday1 = month1.isoweekday()
    tempMonth = month1.month
    monthMonday = []
    # Use == rather than <=: in December the next month is 1, so <= would never stop
    while tempMonth == month :
        monthMonday1 = month1.isoweekday()
        if monthMonday1 != 1:
            # Move forward to the first Monday of the month
            dayDiff = 7-monthMonday1 + 1
            month1 = month1 + timedelta(days=dayDiff)
        curDate = str(month1)
        monthMonday.append(curDate)
        month1 = month1 + timedelta(days=7)
        tempMonth = month1.month
    # Position of this Monday within the month, and whether it is the last one
    weekNum = monthMonday.index(str(monday)) + 1
    lastNum = 0
    if len(monthMonday) == weekNum :
        lastNum = 1
    result = {'timestamp':timestamp,'quarter':quarter,'month':month,'week':weekNum,'last':lastNum}
    return result

# a = '2014-10-01';
# a = getDateInfo(a)
# print a
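For comparison, the same fields can be derived with plain arithmetic instead of listing every Monday of the month. This is a sketch of an alternative approach, not a drop-in copy of the module above; the function name `date_info` is hypothetical. It relies on the fact that the n-th Monday of a month always falls on days 7(n-1)+1 through 7n.

```python
import time
from datetime import datetime, timedelta


def date_info(day_str):
    """Return week-level fields for a 'YYYY-MM-DD' string (hypothetical helper)."""
    d = datetime.strptime(day_str, "%Y-%m-%d").date()
    # Monday of the week that contains d
    monday = d - timedelta(days=d.isoweekday() - 1)
    # The n-th Monday of a month falls on day 7*(n-1)+1 .. 7*n
    week = (monday.day - 1) // 7 + 1
    # It is the last Monday if the next one lands in a different month
    last = 1 if (monday + timedelta(days=7)).month != monday.month else 0
    return {
        "timestamp": int(time.mktime(monday.timetuple())),  # local midnight
        "quarter": (monday.month - 1) // 3 + 1,
        "month": monday.month,
        "week": week,
        "last": last,
    }


info = date_info("2014-10-01")
print(info["month"], info["week"], info["last"], info["quarter"])
```

2014-10-01 is a Wednesday, so its week starts on Monday 2014-09-29. The result is therefore reported under September: month 9, week 5, last 1, quarter 3. The timestamp depends on the local time zone, as it does with `time.mktime` in the module above.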
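Run results:
PS: This was my first practice project, so it is bound to have shortcomings. I am writing it down here to look back on later.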