您的位置:首页 > 数据库 > Mongodb

C# 从MongoDB导入数据到mysql

2017-11-16 09:55 507 查看
因为项目需要,经常导入MongoDB数据到Mysql,故而做了一个小工具来进行数据迁移。

在这个过程中,因为效率问题,探索了很多方法,单条遍历拼接插入命令、使用事务进行处理(也是拼接插入命令)、使用MySqlBulkLoad批量导入数据到mysql。

下边分别描述一下三种处理方法:

a.从MongoDB中获取指定的collection

#region 从MongoDB中获取数据
string connetMongoDB = "mongodb://" + MongoDBIP.Text;
var Client = new MongoClient(connetMongoDB);
_message.messageA += "MongDB 连接成功\n";
var DataBase = Client.GetDatabase(MongoDBDataBaseName.Text);
var DataCollection = DataBase.GetCollection<BsonDocument>(MongoDBTableName.Text);
var documents = DataCollection.Find(new BsonDocument()).Limit(100).ToListAsync().Result;
#endregion

b.从collection中获取数据表的列信息

foreach (BsonDocument bs in documents)
{
var obj = bs.ToDictionary();
if (tableColloumns.Count < obj.Keys.Count)
{
int i = 0;
foreach (var key in obj.Keys)
{
string hearder = key.ToString();
if (CheckHeader(tableColloumns, hearder))
{
tableColloumns.Insert(i, RemoveSpecialLetter(hearder));
i++;
}
}
}
}
c.创建Mysql数据库
private void CreatMysqlDB()
{
string connectMysql = "server = " + MysqlIP.Text + "; database =" + "; user id = " + MysqlUserName.Text + "; password = " + MysqlPassword.Text + ";pooling=true;CharSet=utf8;port=3306";
MySqlConnection conn = new MySqlConnection(connectMysql);
conn.Open();
string creatMysqlDB = "CREATE DATABASE " + MysqlDatabase.Text + ";";
MySqlCommand cmd = new MySqlCommand(creatMysqlDB, conn);
_message.messageA += "连接Mysql成功···\n";
try
{
cmd.ExecuteNonQuery();
_message.messageA += "创建Mysql数据库成功···\n";

}
catch (Exception e)
{
MessageBox.Show(e.Message);
}

}d.创建mysql数据库表
using (var Conn = new MySqlConnection(connectMysql))
{

Conn.Open();
_message.messageA += "连接Mysql成功···\n";
#region 创建数据库表
string createStatement = "CREATE TABLE " + MysqlTable.Text + " (";
int i = 0;
var IsRegex = new Regex("^Is[A-Z]");
foreach (string colloumn in tableColloumns)
{//增加数据库表列的属性设置
if (i == 0)
{
createStatement += colloumn + " VarChar(255) not null primary key,";
i++;
}
else if (IsRegex.Match(colloumn).Success)
{
createStatement += colloumn + " tinyint(4),";
i++;
}
else if (colloumn == "Html")
{
createStatement += colloumn + " longtext,";
i++;
}
else if (colloumn == "Content")
{
createStatement += colloumn + " longtext,";
i++;
}
else if (colloumn == "ContentHtml")
{
createStatement += colloumn + " longtext,";
i++;
}
else
{
createStatement += " " + colloumn + " text,";
i++;
}
}
createStatement = createStatement.Remove(createStatement.Length - 1);
createStatement += ") ENGINE=MyISAM DEFAULT CHARSET=utf8";//表设置
using (MySqlCommand cmd = new MySqlCommand(createStatement, Conn))
{
try
{
cmd.ExecuteNonQuery();
_message.messageA += "创建Mysql表成功···\n";
}
catch (Exception ex)
{
MessageBox.Show(ex.Message);
}
}
#endregion
}e.开始写入数据

方法1.遍历collection,拼接插入命令(单条执行)

public class ItemClass
{
public List<string> values = new List<string>();
}
public List<string> ChangeToStrlist(Dictionary<string, object>.KeyCollection list)
{
List<string> listStr = new List<string>();
foreach (var item in list)
listStr.Add(RemoveSpecialLetter(item.ToString()));
return listStr;

}

public static List<string> ChangeToStrlist(Dictionary<string, object>.ValueCollection list)
{
List<string> listStr = new List<string>();
foreach (var item in list)
listStr.Add(item.ToString());
return listStr;
}

#region 单条处理数据
string strsql = GetCommand();
int count = 0;
await DataCollection.Find(new BsonDocument()).ForEachAsync(bs =>
{
using (var Conn2 = new MySqlConnection(connectMysql))
{
Conn2.Open();
MySqlCommand cmd = new MySqlCommand();
cmd.Connection = Conn2;
cmd.CommandText = strsql;
cmd.Parameters.Clear();
ItemClass item = new ItemClass();
var obj = bs.ToDictionary();
var keys = ChangeToStrlist(obj.Keys);
var values = ChangeToStrlist(obj.Values);
int temp = 0;
foreach (var colloumn in tableColloumns)
{
if (colloumn == keys[temp])
{
item.values.Add(values[temp]);
temp++;
}
else
{
item.values.Add(string.Empty);
}
}
temp = 0;
foreach (string colloumn in tableColloumns)
{
cmd.Parameters.AddWithValue(colloumn, item.values[temp]);
temp++;
}
try
{
cmd.ExecuteNonQuery();
ncount++;
}
catch (Exception EX)
{
MessageBox.Show(EX.Message);
}
if (ncount% 10000 == 0)
{
_message.messageA += "已写入" + ncount.ToString() + "条数据\n";
}
}
}
);
_message.messageA += "已写入" + ncount.ToString() + "条数据\n";
_message.messageA += "向Mysql表写入数据完成\n";
#endregion

方法2:使用事务处理

#region 使用事务处理,一次处理500条数据(有一个问题,当mongdb数据量很大的时候,会内存崩溃,这个问题可以采用分段获取的方式解决,请自行解决)
int count = 500;
List<ItemClass> items = new List<ItemClass>();
foreach (BsonDocument bs in DataCollection.Find(new BsonDocument()).ToListAsync().Result)
{
if (count < 500)
{
var obj = bs.ToDictionary();
ItemClass item = new ItemClass();
var keys = ChangeToStrlist(obj.Keys);
var values = ChangeToStrlist(obj.Values);
int temp = 0;
foreach (var colloumn in tableColloumns)
{
if (colloumn == keys[temp])
{
item.values.Add(values[temp]);
temp++;
}
else
{
item.values.Add(string.Empty);
}
}
items.Add(item);
count++;
}
else
{

ExecuteSqlTran(items);
ncount += count;
_message.messageA += "已写入" + ncount.ToString() + "条数据\n";
items.Clear();
count = 0;
}
}
ExecuteSqlTran(items);
ncount += count;
items.Clear();
_message.messageA += "已写入" + ncount.ToString() + "条数据\n";
_message.messageA += "向Mysql表写入数据完成\n";
#endregion

public void ExecuteSqlTran(List<ItemClass> items)
{
string connectMysql2 = "server = " + MysqlIP.Text + "; database =" + MysqlDatabase.Text + "; user id = " + MysqlUserName.Text + "; password = " + MysqlPassword.Text + ";pooling=true;CharSet=utf8;port=3306";
using (MySqlConnection conn = new MySqlConnection(connectMysql2))
{
conn.Open();
MySqlCommand cmd = new MySqlCommand();
cmd.Connection = conn;
MySqlTransaction tx = conn.BeginTransaction();
cmd.Transaction = tx;
string strsql = GetCommand();
try
{
for (int n = 0; n < items.Count; n++)
{
cmd.CommandText = strsql;
cmd.Parameters.Clear();

var item = items
;
int i = 0;
foreach (string colloumn in tableColloumns)
{
cmd.Parameters.AddWithValue(colloumn, item.values[i]);
i++;
}
cmd.ExecuteNonQuery();
if (n > 0 && (n % 500 == 0 || n == items.Count - 1))
{
tx.Commit();
counts += items.Count;
Console.WriteLine("写入" + counts + "条数据成功");
tx = conn.BeginTransaction();
}
}
}
catch (System.Data.SqlClient.SqlException E)
{
Console.WriteLine("写入数据失败,回滚");
Console.ReadLine();
tx.Rollback();
MessageBox.Show(E.Message);
//throw new Exception(E.Message);
}
}
}
private string GetCommand()
{

// string cmdstr = "update WIRELESS_PERSON_T set PersonName=@PersonName, PersonSex='" + person.getPersonSex() + "', YID=@YID, caseinfoid='" + person.getCaseinfoid() + "', Kind='" + person.getKind()+ "', caseremark=@Caseremark, ArrivalKind='" + person.getArrivalKind() + "' where PersonId=" + person.getPersonId();

string command = @"insert into `" + MongoDBTableName.Text + "` (";
for (int k = 0; k < tableColloumns.Count; k++)
{
if (k == tableColloumns.Count - 1)
command += "`" + tableColloumns[k] + "`)";
else
command += "`" + tableColloumns[k] + "`,";
}
command += "values( ";
for (int j = 0; j < tableColloumns.Count; j++)
{
if (j == tableColloumns.Count - 1)
command += "?" + tableColloumns[j] + ")";
else
command += "?" + tableColloumns[j] + ",";
}
return command;
}

3.使用MysqlBulkLoad进行数据迁移

#region 批处理处理数据
var Filter = Builders<BsonDocument>.Filter;
var Cursor = DataCollection.Find(Filter.Empty).ToCursor();
var FileName = "temp.txt";
using (StreamWriter Writer = new StreamWriter(FileName, false, Encoding.UTF8))
{
while (await Cursor.MoveNextAsync())
{
foreach (var Document in Cursor.Current)
{
Writer.Write("<%RS%>");
foreach (var col in tableColloumns)
{
if (IsRegex.Match(col).Success)
{
Writer.Write($"{Document.GetValue(col, false).AsBoolean:1?0}<%COL%>");
}
else if (col == "_id")
{
Writer.Write($"{Document.GetValue(col).AsObjectId.ToString()}<%COL%>");
}
else
{
Writer.Write($"{Document.GetValue(col, string.Empty).AsString}<%COL%>");
}
}
Writer.Write("<%RE%>");
}
}
}

var Bulk = new MySqlBulkLoader(Conn);
Bulk.TableName = MysqlTable.Text;
Bulk.ConflictOption = MySqlBulkLoaderConflictOption.Ignore;
Bulk.Local = true;
Bulk.Timeout =10* 60 * 1000;
Bulk.CharacterSet = "utf8mb4";
Bulk.LinePrefix = "<%RS%>";
Bulk.LineTerminator = "<%RE%>";
Bulk.FieldTerminator = "<%COL%>";
Bulk.EscapeCharacter = '\b';
Bulk.FileName = FileName;
ncount = Bulk.Load();
#endregion
_message.messageA += "已写入" + ncount.ToString() + "条数据\n";
_message.messageA += "向Mysql表写入数据完成\n";


以上就是三种解决办法,是从最笨的方法慢慢提高效率,所以推荐使用第三种方法。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: