C# 从MongoDB导入数据到MySQL
2017-11-16 09:55
507 查看
因为项目需要,经常要把MongoDB中的数据导入MySQL,所以做了一个小工具来进行数据迁移。
在这个过程中,为了提高效率,尝试了多种方法:逐条遍历并执行参数化插入命令、使用事务分批执行插入命令,以及使用MySqlBulkLoader批量导入数据到MySQL。
下面分别介绍这三种处理方法:
a.从MongoDB中获取指定的collection
b.从collection中获取数据表的列信息
// Discover the MySQL column list from the sampled documents.
// A document is only examined while it has more keys than columns collected so far.
// Each key accepted by CheckHeader is sanitised with RemoveSpecialLetter and inserted at
// the next slot, counting from the front of tableColloumns.
// NOTE(review): CheckHeader receives the raw key but the sanitised key is what gets stored;
// if RemoveSpecialLetter changes a name, a membership check on the raw name may never match
// and the column could be added more than once — confirm CheckHeader's contract.
foreach (BsonDocument document in documents)
{
    var fields = document.ToDictionary();
    if (tableColloumns.Count >= fields.Keys.Count)
    {
        continue;
    }

    int slot = 0;
    foreach (var rawKey in fields.Keys)
    {
        string header = rawKey.ToString();
        if (!CheckHeader(tableColloumns, header))
        {
            continue;
        }
        tableColloumns.Insert(slot++, RemoveSpecialLetter(header));
    }
}
c.创建MySQL数据库
/// <summary>
/// Creates the target MySQL database named in MysqlDatabase.Text.
/// Connection failures propagate to the caller (as before); a failing CREATE DATABASE
/// (for example because the database already exists) is reported with a message box.
/// </summary>
private void CreatMysqlDB()
{
    // No database is selected in the connection string: the target database does not exist yet.
    string connectMysql = "server = " + MysqlIP.Text + "; database =" + "; user id = " + MysqlUserName.Text + "; password = " + MysqlPassword.Text + ";pooling=true;CharSet=utf8;port=3306";
    // Identifiers cannot be passed as parameters, so back-quote the name and double any
    // embedded back-quote instead of concatenating raw UI text into the statement.
    string creatMysqlDB = "CREATE DATABASE `" + MysqlDatabase.Text.Replace("`", "``") + "`;";
    // Dispose the connection and command; the original leaked both.
    using (MySqlConnection conn = new MySqlConnection(connectMysql))
    using (MySqlCommand cmd = new MySqlCommand(creatMysqlDB, conn))
    {
        conn.Open();
        _message.messageA += "连接Mysql成功···\n";
        try
        {
            cmd.ExecuteNonQuery();
            _message.messageA += "创建Mysql数据库成功···\n";
        }
        catch (Exception e)
        {
            MessageBox.Show(e.Message);
        }
    }
}
d.创建mysql数据库表
using (var Conn = new MySqlConnection(connectMysql))
{
    Conn.Open();
    _message.messageA += "连接Mysql成功···\n";
    #region 创建数据库表
    // Column typing rules:
    //  - the first column becomes the primary key;
    //  - names matching ^Is[A-Z] hold booleans and become tinyint(4);
    //  - Html / Content / ContentHtml become longtext;
    //  - everything else becomes text.
    var IsRegex = new Regex("^Is[A-Z]");
    var columnDefinitions = new List<string>(tableColloumns.Count);
    foreach (string colloumn in tableColloumns)
    {
        string sqlType;
        if (columnDefinitions.Count == 0)
        {
            sqlType = "VarChar(255) not null primary key";
        }
        else if (IsRegex.Match(colloumn).Success)
        {
            sqlType = "tinyint(4)";
        }
        else if (colloumn == "Html" || colloumn == "Content" || colloumn == "ContentHtml")
        {
            sqlType = "longtext";
        }
        else
        {
            sqlType = "text";
        }
        // Back-quote identifiers (doubling embedded back-quotes), as the INSERT built by
        // GetCommand does, so names that are MySQL reserved words (Order, Key, Group, ...)
        // do not break the DDL.
        columnDefinitions.Add("`" + colloumn.Replace("`", "``") + "` " + sqlType);
    }
    // An empty column list yields "()", which MySQL rejects; that error is shown below.
    // NOTE(review): the table charset is utf8 (3-byte) while the bulk loader sends utf8mb4,
    // so 4-byte characters (e.g. emoji) will not fit. Switching to utf8mb4 also needs a
    // shorter key or another engine: VarChar(255) in utf8mb4 exceeds MyISAM's 1000-byte key limit.
    string createStatement = "CREATE TABLE `" + MysqlTable.Text.Replace("`", "``") + "` ("
        + string.Join(",", columnDefinitions)
        + ") ENGINE=MyISAM DEFAULT CHARSET=utf8"; // table options
    using (MySqlCommand cmd = new MySqlCommand(createStatement, Conn))
    {
        try
        {
            cmd.ExecuteNonQuery();
            _message.messageA += "创建Mysql表成功···\n";
        }
        catch (Exception ex)
        {
            MessageBox.Show(ex.Message);
        }
    }
    #endregion
}
e.开始写入数据
// NOTE(review): this fragment appears to be the body of ItemClass with its
// `public class ItemClass` header lost; as written it is not valid C#.
// It holds one row's column values as strings.
{
public List<string> values = new List<string>();
}
#region 单条处理数据
// Method 1: insert documents one at a time with the parameterised INSERT from GetCommand.
// One connection and one command are reused for the whole cursor; the original opened a
// new connection (and leaked a MySqlCommand) for every document.
string strsql = GetCommand();
using (var Conn2 = new MySqlConnection(connectMysql))
{
    Conn2.Open();
    using (MySqlCommand cmd = new MySqlCommand(strsql, Conn2))
    {
        // ForEachAsync runs this synchronous callback once per document, sequentially,
        // so reusing `cmd` across iterations is safe.
        await DataCollection.Find(new BsonDocument()).ForEachAsync(bs =>
        {
            cmd.Parameters.Clear();
            var obj = bs.ToDictionary();
            var keys = ChangeToStrlist(obj.Keys);
            var values = ChangeToStrlist(obj.Values);

            // Index the document by sanitised key so each column is matched by name.
            // The original matched positionally (keys[temp]). That threw
            // ArgumentOutOfRangeException when trailing columns were missing. It also blanked
            // every later column once a document had a key absent from tableColloumns.
            var byColumn = new Dictionary<string, string>();
            for (int k = 0; k < keys.Count; k++)
            {
                if (!byColumn.ContainsKey(keys[k]))
                {
                    byColumn.Add(keys[k], values[k]);
                }
            }

            foreach (string colloumn in tableColloumns)
            {
                string value;
                cmd.Parameters.AddWithValue(colloumn, byColumn.TryGetValue(colloumn, out value) ? value : string.Empty);
            }

            try
            {
                cmd.ExecuteNonQuery();
                ncount++;
                // Report progress only after a successful insert, so a failed row does not
                // re-print the same count.
                if (ncount % 10000 == 0)
                {
                    _message.messageA += "已写入" + ncount.ToString() + "条数据\n";
                }
            }
            catch (Exception EX)
            {
                MessageBox.Show(EX.Message);
            }
        });
    }
}
_message.messageA += "已写入" + ncount.ToString() + "条数据\n";
_message.messageA += "向Mysql表写入数据完成\n";
#endregion
以上就是三种解决办法,效率依次提高,因此推荐使用第三种方法(MySqlBulkLoader)。
在这个过程中,为了提高效率,尝试了多种方法:逐条遍历并执行参数化插入命令、使用事务分批执行插入命令,以及使用MySqlBulkLoader批量导入数据到MySQL。
下面分别介绍这三种处理方法:
a.从MongoDB中获取指定的collection
#region 从MongoDB中获取数据
string connetMongoDB = "mongodb://" + MongoDBIP.Text;
var Client = new MongoClient(connetMongoDB);
// NOTE(review): MongoClient connects lazily, so this message is emitted before any
// round-trip to the server has actually succeeded.
_message.messageA += "MongDB 连接成功\n";
var DataBase = Client.GetDatabase(MongoDBDataBaseName.Text);
var DataCollection = DataBase.GetCollection<BsonDocument>(MongoDBTableName.Text);
// Only the first 100 documents are sampled to discover the column set; fields that first
// appear in later documents will not become columns.
// The driver's synchronous ToList() replaces ToListAsync().Result: the result is the same, but
// there is no sync-over-async blocking and errors are not wrapped in AggregateException.
var documents = DataCollection.Find(new BsonDocument()).Limit(100).ToList();
#endregion
b.从collection中获取数据表的列信息
// Discover the MySQL column list from the sampled documents.
// A document is only examined while it has more keys than columns collected so far.
// Each key accepted by CheckHeader is sanitised with RemoveSpecialLetter and inserted at
// the next slot, counting from the front of tableColloumns.
// NOTE(review): CheckHeader receives the raw key but the sanitised key is what gets stored;
// if RemoveSpecialLetter changes a name, a membership check on the raw name may never match
// and the column could be added more than once — confirm CheckHeader's contract.
foreach (BsonDocument document in documents)
{
    var fields = document.ToDictionary();
    if (tableColloumns.Count >= fields.Keys.Count)
    {
        continue;
    }

    int slot = 0;
    foreach (var rawKey in fields.Keys)
    {
        string header = rawKey.ToString();
        if (!CheckHeader(tableColloumns, header))
        {
            continue;
        }
        tableColloumns.Insert(slot++, RemoveSpecialLetter(header));
    }
}
c.创建MySQL数据库
/// <summary>
/// Creates the target MySQL database named in MysqlDatabase.Text.
/// Connection failures propagate to the caller (as before); a failing CREATE DATABASE
/// (for example because the database already exists) is reported with a message box.
/// </summary>
private void CreatMysqlDB()
{
    // No database is selected in the connection string: the target database does not exist yet.
    string connectMysql = "server = " + MysqlIP.Text + "; database =" + "; user id = " + MysqlUserName.Text + "; password = " + MysqlPassword.Text + ";pooling=true;CharSet=utf8;port=3306";
    // Identifiers cannot be passed as parameters, so back-quote the name and double any
    // embedded back-quote instead of concatenating raw UI text into the statement.
    string creatMysqlDB = "CREATE DATABASE `" + MysqlDatabase.Text.Replace("`", "``") + "`;";
    // Dispose the connection and command; the original leaked both.
    using (MySqlConnection conn = new MySqlConnection(connectMysql))
    using (MySqlCommand cmd = new MySqlCommand(creatMysqlDB, conn))
    {
        conn.Open();
        _message.messageA += "连接Mysql成功···\n";
        try
        {
            cmd.ExecuteNonQuery();
            _message.messageA += "创建Mysql数据库成功···\n";
        }
        catch (Exception e)
        {
            MessageBox.Show(e.Message);
        }
    }
}
d.创建mysql数据库表
using (var Conn = new MySqlConnection(connectMysql))
{
    Conn.Open();
    _message.messageA += "连接Mysql成功···\n";
    #region 创建数据库表
    // Column typing rules:
    //  - the first column becomes the primary key;
    //  - names matching ^Is[A-Z] hold booleans and become tinyint(4);
    //  - Html / Content / ContentHtml become longtext;
    //  - everything else becomes text.
    var IsRegex = new Regex("^Is[A-Z]");
    var columnDefinitions = new List<string>(tableColloumns.Count);
    foreach (string colloumn in tableColloumns)
    {
        string sqlType;
        if (columnDefinitions.Count == 0)
        {
            sqlType = "VarChar(255) not null primary key";
        }
        else if (IsRegex.Match(colloumn).Success)
        {
            sqlType = "tinyint(4)";
        }
        else if (colloumn == "Html" || colloumn == "Content" || colloumn == "ContentHtml")
        {
            sqlType = "longtext";
        }
        else
        {
            sqlType = "text";
        }
        // Back-quote identifiers (doubling embedded back-quotes), as the INSERT built by
        // GetCommand does, so names that are MySQL reserved words (Order, Key, Group, ...)
        // do not break the DDL.
        columnDefinitions.Add("`" + colloumn.Replace("`", "``") + "` " + sqlType);
    }
    // An empty column list yields "()", which MySQL rejects; that error is shown below.
    // NOTE(review): the table charset is utf8 (3-byte) while the bulk loader sends utf8mb4,
    // so 4-byte characters (e.g. emoji) will not fit. Switching to utf8mb4 also needs a
    // shorter key or another engine: VarChar(255) in utf8mb4 exceeds MyISAM's 1000-byte key limit.
    string createStatement = "CREATE TABLE `" + MysqlTable.Text.Replace("`", "``") + "` ("
        + string.Join(",", columnDefinitions)
        + ") ENGINE=MyISAM DEFAULT CHARSET=utf8"; // table options
    using (MySqlCommand cmd = new MySqlCommand(createStatement, Conn))
    {
        try
        {
            cmd.ExecuteNonQuery();
            _message.messageA += "创建Mysql表成功···\n";
        }
        catch (Exception ex)
        {
            MessageBox.Show(ex.Message);
        }
    }
    #endregion
}
e.开始写入数据
方法1:遍历collection,逐条执行参数化插入命令(单条执行)
/// <summary>
/// One row destined for MySQL: a string value per column, in the same order as the
/// column list it was built against.
/// </summary>
public class ItemClass
{
    /// <summary>Column values, positionally aligned with the column list.</summary>
    public List<string> values = new List<string>();
}
/// <summary>
/// Converts a document's keys into column names by passing each key through RemoveSpecialLetter.
/// </summary>
/// <param name="list">The key collection of a document dictionary.</param>
/// <returns>The sanitised keys, in enumeration order.</returns>
public List<string> ChangeToStrlist(Dictionary<string, object>.KeyCollection list)
{
    List<string> listStr = new List<string>(list.Count);
    foreach (var item in list)
    {
        listStr.Add(RemoveSpecialLetter(item.ToString()));
    }
    return listStr;
}

/// <summary>
/// Converts a document's values into strings, in enumeration order.
/// </summary>
/// <param name="list">The value collection of a document dictionary.</param>
/// <returns>
/// The values' string forms. Null entries (BsonDocument.ToDictionary yields null for BSON
/// nulls) become string.Empty instead of throwing NullReferenceException as before.
/// </returns>
public static List<string> ChangeToStrlist(Dictionary<string, object>.ValueCollection list)
{
    List<string> listStr = new List<string>(list.Count);
    foreach (var item in list)
    {
        listStr.Add(item == null ? string.Empty : item.ToString());
    }
    return listStr;
}
#region 单条处理数据
// Method 1: insert documents one at a time with the parameterised INSERT from GetCommand.
// One connection and one command are reused for the whole cursor; the original opened a
// new connection (and leaked a MySqlCommand) for every document.
string strsql = GetCommand();
using (var Conn2 = new MySqlConnection(connectMysql))
{
    Conn2.Open();
    using (MySqlCommand cmd = new MySqlCommand(strsql, Conn2))
    {
        // ForEachAsync runs this synchronous callback once per document, sequentially,
        // so reusing `cmd` across iterations is safe.
        await DataCollection.Find(new BsonDocument()).ForEachAsync(bs =>
        {
            cmd.Parameters.Clear();
            var obj = bs.ToDictionary();
            var keys = ChangeToStrlist(obj.Keys);
            var values = ChangeToStrlist(obj.Values);

            // Index the document by sanitised key so each column is matched by name.
            // The original matched positionally (keys[temp]). That threw
            // ArgumentOutOfRangeException when trailing columns were missing. It also blanked
            // every later column once a document had a key absent from tableColloumns.
            var byColumn = new Dictionary<string, string>();
            for (int k = 0; k < keys.Count; k++)
            {
                if (!byColumn.ContainsKey(keys[k]))
                {
                    byColumn.Add(keys[k], values[k]);
                }
            }

            foreach (string colloumn in tableColloumns)
            {
                string value;
                cmd.Parameters.AddWithValue(colloumn, byColumn.TryGetValue(colloumn, out value) ? value : string.Empty);
            }

            try
            {
                cmd.ExecuteNonQuery();
                ncount++;
                // Report progress only after a successful insert, so a failed row does not
                // re-print the same count.
                if (ncount % 10000 == 0)
                {
                    _message.messageA += "已写入" + ncount.ToString() + "条数据\n";
                }
            }
            catch (Exception EX)
            {
                MessageBox.Show(EX.Message);
            }
        });
    }
}
_message.messageA += "已写入" + ncount.ToString() + "条数据\n";
_message.messageA += "向Mysql表写入数据完成\n";
#endregion
方法2:使用事务分批处理
#region 使用事务处理,一次处理500条数据
// Method 2: collect rows into batches of `batchSize` and write each batch with ExecuteSqlTran.
// The collection is streamed through a cursor instead of being materialised with
// ToListAsync().Result, so memory use stays bounded by one batch (the original note warned
// about running out of memory on large collections).
// Fixes:
//  - the counter started at 500, so the first document was dropped and 500 phantom rows counted;
//  - the document that triggered a flush was never added, so every 501st document was lost;
//  - positional key matching threw when trailing columns were missing; columns now match by name.
const int batchSize = 500;
int count = 0;
List<ItemClass> items = new List<ItemClass>(batchSize);
using (var cursor = DataCollection.Find(new BsonDocument()).ToCursor())
{
    while (cursor.MoveNext())
    {
        foreach (BsonDocument bs in cursor.Current)
        {
            var obj = bs.ToDictionary();
            var keys = ChangeToStrlist(obj.Keys);
            var values = ChangeToStrlist(obj.Values);

            // Index the document by sanitised key, then emit one value per table column.
            var byColumn = new Dictionary<string, string>();
            for (int k = 0; k < keys.Count; k++)
            {
                if (!byColumn.ContainsKey(keys[k]))
                {
                    byColumn.Add(keys[k], values[k]);
                }
            }
            ItemClass item = new ItemClass();
            foreach (var colloumn in tableColloumns)
            {
                string value;
                item.values.Add(byColumn.TryGetValue(colloumn, out value) ? value : string.Empty);
            }
            items.Add(item);
            count++;

            if (count == batchSize)
            {
                ExecuteSqlTran(items);
                ncount += count;
                _message.messageA += "已写入" + ncount.ToString() + "条数据\n";
                items.Clear();
                count = 0;
            }
        }
    }
}
// Flush the final partial batch; skip the round-trip when nothing is pending.
if (count > 0)
{
    ExecuteSqlTran(items);
    ncount += count;
    items.Clear();
}
_message.messageA += "已写入" + ncount.ToString() + "条数据\n";
_message.messageA += "向Mysql表写入数据完成\n";
#endregion
/// <summary>
/// Inserts <paramref name="items"/> into MySQL inside transactions, committing every 500
/// rows and after the last row.
/// </summary>
/// <param name="items">Rows whose values are positionally aligned with tableColloumns.</param>
/// <remarks>
/// On a database error the current (uncommitted) transaction is rolled back and the error
/// is shown; rows from earlier commits in this call stay written.
/// </remarks>
public void ExecuteSqlTran(List<ItemClass> items)
{
    // Nothing to write: avoid opening a connection for an empty batch.
    if (items == null || items.Count == 0)
    {
        return;
    }

    const int commitInterval = 500;
    string connectMysql2 = "server = " + MysqlIP.Text + "; database =" + MysqlDatabase.Text + "; user id = " + MysqlUserName.Text + "; password = " + MysqlPassword.Text + ";pooling=true;CharSet=utf8;port=3306";
    string strsql = GetCommand();
    using (MySqlConnection conn = new MySqlConnection(connectMysql2))
    {
        conn.Open();
        using (MySqlCommand cmd = new MySqlCommand())
        {
            cmd.Connection = conn;
            cmd.CommandText = strsql;
            MySqlTransaction tx = conn.BeginTransaction();
            cmd.Transaction = tx;
            int pending = 0;
            try
            {
                for (int n = 0; n < items.Count; n++)
                {
                    cmd.Parameters.Clear();
                    // The original read `var item = items ;` (index lost), which does not compile.
                    var item = items[n];
                    int i = 0;
                    foreach (string colloumn in tableColloumns)
                    {
                        cmd.Parameters.AddWithValue(colloumn, item.values[i]);
                        i++;
                    }
                    cmd.ExecuteNonQuery();
                    pending++;

                    // Commit every `commitInterval` rows and after the last row. The original
                    // required n > 0, so a one-row batch was never committed. It also added the
                    // whole batch size to `counts` on every commit.
                    bool isLast = n == items.Count - 1;
                    if (pending == commitInterval || isLast)
                    {
                        tx.Commit();
                        counts += pending;
                        pending = 0;
                        Console.WriteLine("写入" + counts + "条数据成功");
                        if (!isLast)
                        {
                            tx = conn.BeginTransaction();
                            cmd.Transaction = tx;
                        }
                    }
                }
            }
            // MySQL errors derive from DbException; the original caught
            // System.Data.SqlClient.SqlException (SQL Server only), so rollback never ran.
            catch (System.Data.Common.DbException E)
            {
                Console.WriteLine("写入数据失败,回滚");
                tx.Rollback();
                MessageBox.Show(E.Message);
            }
        }
    }
}
/// <summary>
/// Builds the parameterised INSERT statement for the target MySQL table, e.g.
/// <c>insert into `T` (`a`,`b`) values (?a,?b)</c>.
/// </summary>
/// <returns>The INSERT text; parameters are named "?" + column name.</returns>
private string GetCommand()
{
    // Target the table created by CREATE TABLE and used by MySqlBulkLoader (MysqlTable.Text).
    // The original used MongoDBTableName.Text, which only worked when both names matched.
    string table = "`" + MysqlTable.Text.Replace("`", "``") + "`";
    var columnList = new List<string>(tableColloumns.Count);
    var parameterList = new List<string>(tableColloumns.Count);
    foreach (string column in tableColloumns)
    {
        // Back-quote identifiers, doubling any embedded back-quote.
        columnList.Add("`" + column.Replace("`", "``") + "`");
        parameterList.Add("?" + column);
    }
    return "insert into " + table + " (" + string.Join(",", columnList) + ") values (" + string.Join(",", parameterList) + ")";
}
方法3:使用MySqlBulkLoader进行数据迁移
#region 批处理处理数据
// Method 3: dump the collection to a delimited text file, then load it with
// LOAD DATA LOCAL INFILE through MySqlBulkLoader.
// Row layout: <%RS%>field1<%COL%>field2<%COL%>...fieldN<%RE%>
var Filter = Builders<BsonDocument>.Filter;
var FileName = "temp.txt";
try
{
    using (var Cursor = DataCollection.Find(Filter.Empty).ToCursor())
    using (StreamWriter Writer = new StreamWriter(FileName, false, Encoding.UTF8))
    {
        var fields = new List<string>(tableColloumns.Count);
        while (await Cursor.MoveNextAsync())
        {
            foreach (var Document in Cursor.Current)
            {
                fields.Clear();
                foreach (var col in tableColloumns)
                {
                    if (IsRegex.Match(col).Success)
                    {
                        // Is* columns are tinyint(4), so write 1/0. The original "{x:1?0}"
                        // passed "1?0" as a format string, which bool ignores; it wrote
                        // "True"/"False" and MySQL stored 0 for every row. ToBoolean() also
                        // tolerates explicit nulls and numeric flags, where AsBoolean threw.
                        fields.Add(Document.GetValue(col, false).ToBoolean() ? "1" : "0");
                    }
                    else if (col == "_id")
                    {
                        // ToString() covers ObjectId (hex) as well as string or numeric ids;
                        // AsObjectId threw for the latter.
                        fields.Add(Document.GetValue(col).ToString());
                    }
                    else
                    {
                        var value = Document.GetValue(col, string.Empty);
                        // AsString threw for numbers, dates, nulls, ...: write nulls as empty
                        // and other non-string values via their string form.
                        fields.Add(value.IsBsonNull ? string.Empty : (value.IsString ? value.AsString : value.ToString()));
                    }
                }
                // Separators only between fields: the original wrote a trailing <%COL%>,
                // giving every row one more field than the table has columns.
                Writer.Write("<%RS%>");
                Writer.Write(string.Join("<%COL%>", fields));
                Writer.Write("<%RE%>");
            }
        }
    }
    var Bulk = new MySqlBulkLoader(Conn);
    Bulk.TableName = MysqlTable.Text;
    Bulk.ConflictOption = MySqlBulkLoaderConflictOption.Ignore;
    Bulk.Local = true;
    Bulk.Timeout = 10 * 60 * 1000;
    Bulk.CharacterSet = "utf8mb4";
    Bulk.LinePrefix = "<%RS%>";
    Bulk.LineTerminator = "<%RE%>";
    Bulk.FieldTerminator = "<%COL%>";
    // Backspace as the escape character presumably keeps MySQL from interpreting
    // backslashes in the data — TODO confirm.
    Bulk.EscapeCharacter = '\b';
    Bulk.FileName = FileName;
    ncount = Bulk.Load();
}
finally
{
    // The dump is only an intermediate file; File.Delete is a no-op if it was never created.
    File.Delete(FileName);
}
#endregion
_message.messageA += "已写入" + ncount.ToString() + "条数据\n";
_message.messageA += "向Mysql表写入数据完成\n";
以上就是三种解决办法,效率依次提高,因此推荐使用第三种方法(MySqlBulkLoader)。
相关文章推荐
- 从mysql导入数据到mongodb的方法
- C#中mySQL 使用语句LOAD DATA LOCAL INFILE 'record.log' INTO TABLE PT_LOG;导入数据时乱码问题
- 将数据从MongoDB导入到MySQL
- C# 使用MySqlBulkLoader 批量导入数据到Mysql
- 将mysql数据导入mongodb
- <MongoDB | Mysql>亿级别---数据生成及高效率导入
- 将MySQL数据导入MongoDB
- 将mongodb 数据指定字段导出,然后指定字段导入mysql 实例 及相关问题解决
- 将mysql数据导入mongodb
- 从mysql导入数据到mongodb
- mongodb导出数据导入mysql
- 通过Python将MongoDB导出的json数据转换成Mysql的insert语句导入
- 将mysql数据导为csv再导入到mongodb中
- Hadoop+Spark+MongoDB+MySQL+C#大数据开发项目最佳实践
- 将数据从MongoDB导入到MySQL
- 从MongoDB抽取数据导入mysql
- mongodb mysql数据互相导入
- C#将Excel数据导入数据库(MySQL或Sql Server)
- 一个实现数据批量从mongodb导入Mysql的方案
- 使用pandas把mysql的数据导入MongoDB。