您的位置:首页 > 数据库 > Mongodb

续【将数据从MongoDB迁移到mysql】

2017-11-23 10:38 567 查看
上一篇博客介绍了将数据从MongoDB迁移到mysql的方法。最近在帮同事导入数据的过程中出现了一些问题,其中最主要的是:对于列(字段)不固定的mongo collection,迁移时很可能丢失数据。为了解决这个问题,对算法做了如下改进:

/// <summary>
/// Copies every document of the configured MongoDB collection into a MySQL table.
/// The documents are first dumped to a delimited temp file ("temp.txt") while the
/// set of column names is collected, then the target table is created from that
/// column set and the file is bulk-loaded with <c>MySqlBulkLoader</c>.
/// </summary>
/// <remarks>
/// Progress text is appended to <c>_message.messageA</c>. The row counter
/// <c>ncount</c> is declared outside this method and is not reset here.
/// Columns are discovered incrementally, so a row written before a column was first
/// seen contains fewer fields than later rows.
/// NOTE(review): this relies on the set enumerating in insertion order and on the
/// bulk load filling the missing trailing fields with defaults - confirm.
/// </remarks>
private async Task OnTranseDataAsync()
{
    // ---- MongoDB side: resolve the source collection ----
    string connetMongoDB = "mongodb://" + MongoDBIP.Text;
    var Client = new MongoClient(connetMongoDB);
    _message.messageA += "MongDB " + MongoDBDataBaseName.Text + "连接成功···\n";
    var DataBase = Client.GetDatabase(MongoDBDataBaseName.Text);
    var DataCollection = DataBase.GetCollection<BsonDocument>(MongoDBTableName.Text);

    // Make sure the target MySQL database exists before connecting to it.
    CreatMysqlDB();

    // ---- MySQL side ----
    string connectMysql = "server = " + MysqlIP.Text + "; database =" + MysqlDatabase.Text + "; user id = " + MysqlUserName.Text + "; password = " + MysqlPassword.Text + ";pooling=true;CharSet=utf8;port=3306";

    // Fields named "Is<Upper>..." are treated as boolean flags (tinyint in MySQL).
    var IsRegex = new Regex("^Is[A-Z]");

    using (var Conn = new MySqlConnection(connectMysql))
    {
        Conn.Open();
        _message.messageA += "连接Mysql数据库-" + MysqlDatabase.Text + "-成功···\n";

        // Every column name seen so far, across all documents read.
        var SetColumns = new HashSet<string>();

        // ---- Step 1: dump MongoDB documents into the delimited temp file ----
        _message.messageA += "开始从MongoDB-" + MongoDBDataBaseName.Text + "-表读取数据···\n";
        var FileName = "temp.txt";
        using (var Cursor = DataCollection.Find(Builders<BsonDocument>.Filter.Empty).ToCursor())
        using (StreamWriter Writer = new StreamWriter(FileName, false, Encoding.UTF8))
        {
            while (await Cursor.MoveNextAsync())
            {
                foreach (var Document in Cursor.Current)
                {
                    // Row start marker (matches Bulk.LinePrefix below).
                    Writer.Write("<%RS%>");

                    // HashSet.Add is a no-op for names that are already known.
                    foreach (var ele in Document.Elements)
                    {
                        SetColumns.Add(ele.Name);
                    }

                    foreach (var col in SetColumns)
                    {
                        string field;
                        if (IsRegex.IsMatch(col))
                        {
                            // bool ignores interpolation format specifiers, so "{b:1?0}"
                            // would emit "True"/"False"; write an explicit 1/0 instead.
                            // ToBoolean() also tolerates flags stored as non-boolean types.
                            field = Document.GetValue(col, false).ToBoolean() ? "1" : "0";
                        }
                        else if (col == "_id")
                        {
                            // ToString() yields the same text as AsObjectId.ToString() for
                            // ObjectId keys and does not throw for other _id types.
                            field = Document.GetValue(col).ToString();
                        }
                        else
                        {
                            // Missing fields become empty text; non-string values are
                            // stringified instead of throwing InvalidCastException.
                            var value = Document.GetValue(col, string.Empty);
                            if (value.IsBsonNull)
                            {
                                field = string.Empty;
                            }
                            else if (value.IsString)
                            {
                                field = value.AsString;
                            }
                            else
                            {
                                field = value.ToString();
                            }
                        }

                        Writer.Write(field);
                        Writer.Write("<%COL%>");
                    }

                    // Row end marker (matches Bulk.LineTerminator below).
                    Writer.Write("<%RE%>");
                    ncount++;
                    if (ncount % 10000 == 0)
                    {
                        _message.messageA += "已读取" + ncount + "条数据···\n";
                    }
                }
            }
        }
        _message.messageA += "共读取" + ncount + "条数据···\n";

        // ---- Step 2: create the MySQL table from the collected columns ----
        var createBuilder = new StringBuilder("CREATE TABLE IF NOT EXISTS " + MysqlTable.Text + " (");
        bool isFirstColumn = true;
        foreach (string colloumn in SetColumns)
        {
            if (isFirstColumn)
            {
                // The first collected column becomes the primary key.
                createBuilder.Append(colloumn + " VarChar(255) not null primary key,");
                isFirstColumn = false;
            }
            else if (IsRegex.IsMatch(colloumn))
            {
                createBuilder.Append(colloumn + " tinyint(4),");
            }
            else if (colloumn == "Html" || colloumn == "Content" || colloumn == "ContentHtml")
            {
                // These columns get longtext instead of text.
                createBuilder.Append(colloumn + " longtext,");
            }
            else
            {
                createBuilder.Append(" " + colloumn + " text,");
            }
        }
        // Drop the trailing character (the last comma) before closing the column list.
        createBuilder.Length -= 1;
        createBuilder.Append(") ENGINE=MyISAM DEFAULT CHARSET=utf8");

        using (MySqlCommand cmd = new MySqlCommand(createBuilder.ToString(), Conn))
        {
            try
            {
                cmd.ExecuteNonQuery();
                _message.messageA += "创建Mysql-" + MysqlTable.Text + "表成功···\n";
            }
            catch (Exception ex)
            {
                MessageBox.Show(ex.Message);
                _message.messageA += "创建Mysql-" + MysqlTable.Text + "表失败···\n";
            }
        }

        // ---- Step 3: bulk-load the temp file into the table ----
        _message.messageA += "开始向Mysql-" + MysqlTable.Text + "表写入数据···\n";
        var Bulk = new MySqlBulkLoader(Conn);
        Bulk.TableName = MysqlTable.Text;
        Bulk.ConflictOption = MySqlBulkLoaderConflictOption.Ignore;
        Bulk.Local = true;
        Bulk.Timeout = 10 * 60 * 1000;
        Bulk.CharacterSet = "utf8mb4";
        // The delimiters must match the markers written in step 1.
        Bulk.LinePrefix = "<%RS%>";
        Bulk.LineTerminator = "<%RE%>";
        Bulk.FieldTerminator = "<%COL%>";
        Bulk.EscapeCharacter = '\b';
        Bulk.FileName = FileName;
        ncount = Bulk.Load();

        _message.messageA += "已写入" + ncount.ToString() + "条数据\n";
        _message.messageA += "向Mysql-" + MysqlTable.Text + "表写入数据完成\n";

        // Restore the start button once the migration has finished.
        button.IsEnabled = true;
        button.Content = "开始迁移数据";
    }
}

/// <summary>
/// Connects to the MySQL server without selecting a database and issues
/// <c>CREATE DATABASE IF NOT EXISTS</c> for the database named in
/// <c>MysqlDatabase.Text</c>. Failures are reported through a message box and
/// progress text is appended to <c>_message.messageA</c>; nothing is thrown to the caller.
/// </summary>
private void CreatMysqlDB()
{
    // The "database" key is intentionally left empty: the target database may not exist yet.
    string connectMysql = "server = " + MysqlIP.Text + "; database =" + "; user id = " + MysqlUserName.Text + "; password = " + MysqlPassword.Text + ";pooling=true;CharSet=utf8;port=3306";
    MySqlConnection conn = null;
    try
    {
        try
        {
            conn = new MySqlConnection(connectMysql);
            conn.Open();
        }
        catch (Exception)
        {
            MessageBox.Show("mysql连接失败");
            return;
        }

        string creatMysqlDB = "CREATE DATABASE IF NOT EXISTS " + MysqlDatabase.Text + ";";
        using (MySqlCommand cmd = new MySqlCommand(creatMysqlDB, conn))
        {
            _message.messageA += "连接Mysql数据库" + MysqlDatabase.Text + "成功···\n";
            try
            {
                cmd.ExecuteNonQuery();
                _message.messageA += "创建Mysql数据库" + MysqlDatabase.Text + "成功···\n";
            }
            catch (Exception e)
            {
                MessageBox.Show(e.Message);
                _message.messageA += "创建数据库失败";
            }
        }
    }
    finally
    {
        // Release the connection on every path; previously it was closed only
        // when the CREATE DATABASE statement succeeded.
        if (conn != null)
        {
            conn.Dispose();
        }
    }
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: