Smart Email Marketing with a Markov Model on Spark

2017-11-17 19:07
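Goal: A customer's purchases may look random, but viewed as a time-ordered sequence they may follow a pattern. For example, a customer may buy more every month around payday, or around certain times of the year (Singles' Day on November 11, a birthday).
A Markov model can capture such temporal patterns. If we can infer a customer's next purchase date from their previous purchase record, we can send a marketing email at the predicted time.
The products to promote in the email can come from other recommendation algorithms.
Input:
<customerID>,<transactionID>,<purchaseDate>,<amount>
...
ZSY40NYPS6,1381872876,2013-01-01,110
ZSY40NYPS6,1381872920,2013-01-11,32
ZSY40NYPS6,1381873821,2013-03-04,111
ZSY40NYPS6,1381874034,2013-04-09,65
...
Step 1: Generate <customerID>,<purchaseDate1>,<amount1>,<purchaseDate2>,<amount2>,<purchaseDate3>,<amount3>... where purchaseDate1 <= purchaseDate2 <= purchaseDate3 ...
Each customer's transactions must be sorted by date. Key techniques: secondary sort, composite key.
...
ZSY40NYPS6,2013-01-01,110,2013-01-11,32,2013-03-04,111,2013-04-09,65
...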
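The grouping and sorting of step 1 can be sketched on plain Scala collections. This is a minimal illustration rather than the secondary-sort job itself: the object name Step1Sketch and the function customerLines are hypothetical, and an in-memory Seq stands in for the distributed input.

```scala
import java.time.LocalDate

object Step1Sketch {
  // Hypothetical helper: turns raw lines of the form
  // <customerID>,<transactionID>,<purchaseDate>,<amount>
  // into one line per customer with the transactions sorted by date.
  def customerLines(lines: Seq[String]): Seq[String] =
    lines
      .map(_.split(","))
      .groupBy(tokens => tokens(0))          // group by customerID
      .toSeq
      .sortBy(_._1)                          // stable output order across customers
      .map { case (id, records) =>
        // ISO dates (yyyy-MM-dd) are parsed and compared as days since the epoch.
        val sorted = records.sortBy(r => LocalDate.parse(r(2)).toEpochDay)
        // Keep only date and amount; the transactionID is dropped.
        (id +: sorted.flatMap(r => Seq(r(2), r(3)))).mkString(",")
      }
}
```

Called on the four sample records in any order, customerLines returns the single step 1 line shown above for ZSY40NYPS6.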
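Step 2: Convert the transaction sequence into a state sequence
For each customer's transaction sequence, take two consecutive transactions at a time, for example: 2013-01-01,110,2013-01-11,32 and 2013-03-04,111,2013-04-09,65
Each pair is labeled with a state according to the time gap and the amount difference between the two transactions. A state is a two-letter code: the first letter is the elapsed time (S: fewer than 30 days, M: fewer than 60 days, L: 60 days or more) and the second letter compares the earlier amount with the later one (L: below 0.9 times the later amount, E: roughly equal, G: 1.1 times the later amount or more), which gives nine possible states.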

ZSY40NYPS6,ME,SL
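The state labeling can be sketched as a pure function, assuming the 30/60-day and 0.9/1.1 thresholds used in the Spark program in this article. The names Step2Sketch, state and states are hypothetical.

```scala
import java.time.LocalDate
import java.time.temporal.ChronoUnit

object Step2Sketch {
  // Labels one pair of consecutive transactions with a two-letter state.
  def state(prevDate: LocalDate, prevAmount: Double,
            nextDate: LocalDate, nextAmount: Double): String = {
    val days = ChronoUnit.DAYS.between(prevDate, nextDate)
    val elapsed = if (days < 30) "S" else if (days < 60) "M" else "L"
    val ratio = prevAmount / nextAmount
    val amount = if (ratio < 0.9) "L" else if (ratio < 1.1) "E" else "G"
    elapsed + amount
  }

  // One state per pair of adjacent transactions in a date-sorted sequence.
  def states(txs: Seq[(LocalDate, Double)]): Seq[String] =
    txs.zip(txs.drop(1)).map { case ((d1, a1), (d2, a2)) => state(d1, a1, d2, a2) }
}
```

Under these thresholds the four sample transactions of ZSY40NYPS6 give SG, ML, MG (gaps of 10, 52 and 36 days), which suggests the ME,SL line above is best read as an illustration of the output format.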
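Step 3: Build the Markov state transition matrix by counting the state transitions in the output of the previous step
Each transition between adjacent states emits a pair such as ([ME,SL],1), and the counts are summed per pair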
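The counting can be sketched without Spark as follows; groupBy plays the role that reduceByKey plays on an RDD. The names Step3Sketch and transitionCounts are hypothetical.

```scala
object Step3Sketch {
  // Counts how often each (fromState, toState) pair occurs
  // across all customers' state sequences.
  def transitionCounts(sequences: Seq[Seq[String]]): Map[(String, String), Int] =
    sequences
      .flatMap(s => s.zip(s.drop(1)))   // adjacent pairs within one sequence
      .groupBy(identity)
      .map { case (pair, occurrences) => pair -> occurrences.size }
}
```

Sequences with fewer than two states contribute no pairs, so they need no separate filtering here.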
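Step 4: Derive the transition probability table from the Markov state transition matrix. Each row is one from-state, each column one to-state, and every row sums to 1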
0.05033,0.008262,0.7487,0.1432,0.0003689,0.01423,0.03306,8.889e-05,0.001791
0.4791,0.01265,0.4071,0.07468,0.0002040,0.009386,0.01612,0.0002040,0.0006121
0.6671,0.008839,0.1261,0.1463,0.0009289,0.01387,0.03505,0.0002426,0.001555
0.04773,0.0004718,0.7681,0.01487,0.0001862,0.1385,0.02863,1.242e-05,0.001490
0.6215,0.002151,0.2925,0.01075,0.006452,0.05161,0.008602,0.002151,0.004301
0.1072,0.002772,0.7044,0.1364,0.0003616,0.01374,0.03247,8.036e-05,0.002612
0.06196,0.0004748,0.7678,0.02008,0.0001424,0.1262,0.003988,4.748e-05,0.01937
0.5036,0.007299,0.3431,0.04380,0.007299,0.05839,0.007299,0.007299,0.02190
0.1834,0.001920,0.6313,0.02544,0.0004801,0.1167,0.03889,0.0009602,0.0009602
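Turning counts into a table like the one above amounts to dividing each count by its row total. The sketch below rests on two assumptions that the article does not state: the state order SL, SE, SG, ML, ME, MG, LL, LE, LG, and a uniform row for a from-state that was never observed. Step4Sketch, States and transitionTable are hypothetical names.

```scala
object Step4Sketch {
  // Assumed state order: SL, SE, SG, ML, ME, MG, LL, LE, LG.
  val States: Seq[String] =
    for (t <- Seq("S", "M", "L"); a <- Seq("L", "E", "G")) yield t + a

  // Row i holds the probabilities of moving from States(i) to each state.
  def transitionTable(counts: Map[(String, String), Int]): Seq[Seq[Double]] =
    States.map { from =>
      val row = States.map(to => counts.getOrElse((from, to), 0).toDouble)
      val total = row.sum
      // Assumption: an unobserved from-state gets a uniform distribution.
      if (total == 0.0) row.map(_ => 1.0 / States.size) else row.map(_ / total)
    }
}
```

The uniform fallback only keeps every row a valid distribution; a smoothing scheme could be used instead.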
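Step 5: Use the Markov model to predict the date of the next smart marketing email
The Spark program below covers steps 1 to 3: it writes the count of each state transition, from which the probability table of step 4 is derived.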
import java.time.LocalDate
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext

object Markov {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("Markov").setMaster("local")
    val sc = new SparkContext(sparkConf)
    val input = "file:///media/chenjie/0009418200012FF3/ubuntu/java/Test1/input/smart_email_training.txt"
    val output = "file:///media/chenjie/0009418200012FF3/ubuntu/markov"
    val transactions = sc.textFile(input)

    // (customerID, (purchase date in days since the epoch, amount))
    val customers = transactions.map(line => {
      val tokens = line.split(",")
      // Whole calendar days avoid off-by-one errors that millisecond
      // arithmetic can produce across daylight saving time changes.
      (tokens(0), (LocalDate.parse(tokens(2)).toEpochDay, tokens(3).toDouble))
    })
    // Gather all transactions of each customer
    val customerGrouped = customers.groupByKey()
    // Sort each customer's transactions by date
    val sortedByDate = customerGrouped.mapValues(_.toList.sortBy(_._1))

    // One state per pair of consecutive transactions
    val stateSequence = sortedByDate.mapValues(list => {
      list.zip(list.drop(1)).map {
        case ((currentDate, currentPurchase), (nextDate, nextPurchase)) =>
          val elapsedTime = (nextDate - currentDate) match {
            case diff if diff < 30 => "S" // small gap
            case diff if diff < 60 => "M" // medium gap
            case _                 => "L" // large gap
          }
          val amountRange = (currentPurchase / nextPurchase) match {
            case ratio if ratio < 0.9 => "L" // earlier amount significantly less than the later one
            case ratio if ratio < 1.1 => "E" // more or less the same
            case _                    => "G" // earlier amount significantly greater than the later one
          }
          elapsedTime + amountRange
      }
    })

    // Emit ((fromState, toState), 1) for every pair of adjacent states
    val model = stateSequence.filter(_._2.size >= 2).flatMap { case (_, states) =>
      states.zip(states.drop(1)).map(pair => (pair, 1))
    }

    // Sum the counts of each state transition
    val markovModel = model.reduceByKey(_ + _)

    // Output format: fromState,toState<TAB>count
    val markovModelFormatted = markovModel.map { case ((from, to), count) => s"$from,$to\t$count" }
    markovModelFormatted.foreach(println)

    markovModelFormatted.saveAsTextFile(output)
    sc.stop()
  }
}
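The program above stops at the transition counts, so step 5 is not shown in code. One way to sketch the prediction: take the customer's last state, pick the most probable next state from the probability table, and add a number of days that depends on the predicted elapsed-time letter. The offsets of 15, 45 and 90 days, the state order, and the names Step5Sketch, offsetDays and nextEmailDate are all assumptions made for illustration.

```scala
import java.time.LocalDate

object Step5Sketch {
  // Assumed state order of the table rows and columns.
  val States: Seq[String] =
    for (t <- Seq("S", "M", "L"); a <- Seq("L", "E", "G")) yield t + a

  // Assumed offsets: roughly the middle of each elapsed-time bucket.
  def offsetDays(state: String): Int = state.head match {
    case 'S' => 15
    case 'M' => 45
    case _   => 90
  }

  // Predicts the next email date from the last purchase date, the last
  // observed state, and a 9x9 transition probability table.
  def nextEmailDate(lastPurchase: LocalDate, lastState: String,
                    table: Seq[Seq[Double]]): LocalDate = {
    val from = States.indexOf(lastState)
    require(from >= 0, s"unknown state: $lastState")
    val row = table(from)
    val nextState = States(row.indexOf(row.max)) // most probable next state
    lastPurchase.plusDays(offsetDays(nextState).toLong)
  }
}
```

Taking the single most probable state is the simplest choice; weighting the offsets by the whole row of probabilities would be an alternative.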

                                            