修改变量里的目标地址IP及源DB的DB名称或URL连接名称,修改copy的collection名称
const BATCH_SIZE = 2000; let srcConnection = "10.192.168.105_1"; let srcDb = "csms"; let dstConnection = "positioning[开发]"; let dstDb = "csms"; use(dstDb); //idPolicy: overwrite_with_same_id|always_insert_with_new_id|insert_with_new_id_if_id_exists|skip_documents_with_existing_id|abort_if_id_already_exists|drop_collection_first|log_errors let toCopyCollections = [ { srcCollection: "zltest.address_type_param_random", query: {}, projection: {}, dstCollection: "zltest.address_type_param", idPolicy: "log_errors"}, { srcCollection: "zltest.alarm_measure_random", query: {}, projection: {}, dstCollection: "zltest.alarm_measure", idPolicy: "log_errors"}, { srcCollection: "zltest.analog_five_his_random", query: {}, projection: {}, dstCollection: "zltest.analog_five_his", idPolicy: "log_errors"}, { srcCollection: "zltest.analog_minute_his_random", query: {}, projection: {}, dstCollection: "zltest.analog_minute_his", idPolicy: "log_errors"}, { srcCollection: "zltest.base_param_random", query: {}, projection: {}, dstCollection: "zltest.base_param", idPolicy: "log_errors"}, { srcCollection: "zltest.control_chargingRecord_random", query: {}, projection: {}, dstCollection: "zltest.control_chargingRecord", idPolicy: "log_errors"}, { srcCollection: "zltest.control_logic_random", query: {}, projection: {}, dstCollection: "zltest.control_logic", idPolicy: "log_errors"}, { srcCollection: "zltest.dev_alarm_his_random", query: {}, projection: {}, dstCollection: "zltest.dev_alarm_his", idPolicy: "log_errors"}, { srcCollection: "zltest.dev_calibration_his_random", query: {}, projection: {}, dstCollection: "zltest.dev_calibration_his", idPolicy: "log_errors"}, { srcCollection: "zltest.dev_his_value_random", query: {}, projection: {}, dstCollection: "zltest.dev_his_value", idPolicy: "log_errors"}, { srcCollection: "zltest.dev_realtime_alarm_random", query: {}, projection: {}, dstCollection: "zltest.dev_realtime_alarm", idPolicy: "log_errors"}, { srcCollection: "zltest.dev_realtime_value_random", query: {}, projection: {}, dstCollection: "zltest.dev_realtime_value", idPolicy: "log_errors"}, { srcCollection: "zltest.device_random", query: {}, projection: {}, dstCollection: "zltest.device", idPolicy: "log_errors"}, { srcCollection: "zltest.device_type_random", query: {}, projection: {}, dstCollection: "zltest.device_type", idPolicy: "log_errors"}, { srcCollection: "zltest.device_version_random", query: {}, projection: {}, dstCollection: "zltest.device_version", idPolicy: "log_errors"}, { srcCollection: "zltest.intensive_monitoring_random", query: {}, projection: {}, dstCollection: "zltest.intensive_monitoring", idPolicy: "log_errors"}, { srcCollection: "zltest.list_edit_random", query: {}, projection: {}, dstCollection: "zltest.list_edit", idPolicy: "log_errors"}, { srcCollection: "zltest.log_info_random", query: {}, projection: {}, dstCollection: "zltest.log_info", idPolicy: "log_errors"}, { srcCollection: "zltest.power_diagnosis_info_random", query: {}, projection: {}, dstCollection: "zltest.power_diagnosis_info", idPolicy: "log_errors"}, { srcCollection: "zltest.power_out_channel_data_random", query: {}, projection: {}, dstCollection: "zltest.power_out_channel_data", idPolicy: "log_errors"}, { srcCollection: "zltest.print_edit_random", query: {}, projection: {}, dstCollection: "zltest.print_edit", idPolicy: "log_errors"}, { srcCollection: "zltest.selfDiagnosis_handoverRecord_random", query: {}, projection: {}, dstCollection: "zltest.selfDiagnosis_handoverRecord", idPolicy: "log_errors"}, { srcCollection: "zltest.switching_change_his_random", query: {}, projection: {}, dstCollection: "zltest.switching_change_his", idPolicy: "log_errors"}, { srcCollection: "zltest.team_random", query: {}, projection: {}, dstCollection: "zltest.team", idPolicy: "log_errors"}, { srcCollection: "zhangluTest.alarm_his_random", query: {}, projection: {}, dstCollection: "zhangluTest.alarm_his", idPolicy: "log_errors"}, { srcCollection: "zhangluTest.alarm_measure_random", query: {}, projection: {}, dstCollection: "zhangluTest.alarm_measure", idPolicy: "log_errors"}, { srcCollection: "zhangluTest.allowance_random", query: {}, projection: {}, dstCollection: "zhangluTest.allowance", idPolicy: "log_errors"}, { srcCollection: "zhangluTest.area_random", query: {}, projection: {}, dstCollection: "zhangluTest.area", idPolicy: "log_errors"}, { srcCollection: "zhangluTest.call_answered_history_random", query: {}, projection: {}, dstCollection: "zhangluTest.call_answered_history", idPolicy: "log_errors"}, { srcCollection: "zhangluTest.car_random", query: {}, projection: {}, dstCollection: "zhangluTest.car", idPolicy: "log_errors"}, { srcCollection: "zhangluTest.classes_random", query: {}, projection: {}, dstCollection: "zhangluTest.classes", idPolicy: "log_errors"}, { srcCollection: "zhangluTest.communication_log_random", query: {}, projection: {}, dstCollection: "zhangluTest.communication_log", idPolicy: "log_errors"}, { srcCollection: "zhangluTest.config_info_random", query: {}, projection: {}, dstCollection: "zhangluTest.config_info", idPolicy: "log_errors"}, { srcCollection: "zhangluTest.dept_random", query: {}, projection: {}, dstCollection: "zhangluTest.dept", idPolicy: "log_errors"}, { srcCollection: "zhangluTest.device_random", query: {}, projection: {}, dstCollection: "zhangluTest.device", idPolicy: "log_errors"}, { srcCollection: "zhangluTest.device_config_random", query: {}, projection: {}, dstCollection: "zhangluTest.device_config", idPolicy: "log_errors"}, { srcCollection: "zhangluTest.gatewayRecord_random", query: {}, projection: {}, dstCollection: "zhangluTest.gatewayRecord", idPolicy: "log_errors"}, { srcCollection: "zhangluTest.job_random", query: {}, projection: {}, dstCollection: "zhangluTest.job", idPolicy: "log_errors"}, { srcCollection: "zhangluTest.locator_card_random", query: {}, projection: {}, dstCollection: "zhangluTest.locator_card", idPolicy: "log_errors"}, { srcCollection: "zhangluTest.log_info_random", query: {}, projection: {}, dstCollection: "zhangluTest.log_info", idPolicy: "log_errors"}, { srcCollection: "zhangluTest.pass_area_history_random", query: {}, projection: {}, dstCollection: "zhangluTest.pass_area_history", idPolicy: "log_errors"}, { srcCollection: "zhangluTest.pass_station_history_random", query: {}, projection: {}, dstCollection: "zhangluTest.pass_station_history", idPolicy: "log_errors"}, { srcCollection: "zhangluTest.patrol_line_random", query: {}, projection: {}, dstCollection: "zhangluTest.patrol_line", idPolicy: "log_errors"}, { srcCollection: "zhangluTest.patrol_plan_random", query: {}, projection: {}, dstCollection: "zhangluTest.patrol_plan", idPolicy: "log_errors"}, { srcCollection: "zhangluTest.personnel_random", query: {}, projection: {}, dstCollection: "zhangluTest.personnel", idPolicy: "log_errors"}, { srcCollection: "zhangluTest.position_history_random", query: {}, projection: {}, dstCollection: "zhangluTest.position_history", idPolicy: "log_errors"}, { srcCollection: "zhangluTest.post_random", query: {}, projection: {}, dstCollection: "zhangluTest.post", idPolicy: "log_errors"}, { srcCollection: "zhangluTest.profession_random", query: {}, projection: {}, dstCollection: "zhangluTest.profession", idPolicy: "log_errors"}, { srcCollection: "zhangluTest.query_task_record_random", query: {}, projection: {}, dstCollection: "zhangluTest.query_task_record", idPolicy: "log_errors"}, { srcCollection: "zhangluTest.realtime_alarm_random", query: {}, projection: {}, dstCollection: "zhangluTest.realtime_alarm", idPolicy: "log_errors"}, { srcCollection: "zhangluTest.realtime_log_random", query: {}, projection: {}, dstCollection: "zhangluTest.realtime_log", idPolicy: "log_errors"}, { srcCollection: "zhangluTest.realtime_value_random", query: {}, projection: {}, dstCollection: "zhangluTest.realtime_value", idPolicy: "log_errors"}, { srcCollection: "zhangluTest.roadway_random", query: {}, projection: {}, dstCollection: "zhangluTest.roadway", idPolicy: "log_errors"}, { srcCollection: "zhangluTest.station_status_random", query: {}, projection: {}, dstCollection: "zhangluTest.station_status", idPolicy: "log_errors"}, { srcCollection: "zhangluTest.take_classes_random", query: {}, projection: {}, dstCollection: "zhangluTest.take_classes", idPolicy: "log_errors"}, { srcCollection: "zhangluTest.task_config_random", query: {}, projection: {}, dstCollection: "zhangluTest.task_config", idPolicy: "log_errors"}, { srcCollection: "zhangluTest.title_random", query: {}, projection: {}, dstCollection: "zhangluTest.title", idPolicy: "log_errors"}, { srcCollection: "zhangluTest.uniqueness_check_random", query: {}, projection: {}, dstCollection: "zhangluTest.uniqueness_check", idPolicy: "log_errors"}, { srcCollection: "zhangluTest.uniqueness_check_history_random", query: {}, projection: {}, dstCollection: "zhangluTest.uniqueness_check_history", idPolicy: "log_errors"}, { srcCollection: "zhangluTest.vehicleArchives_random", query: {}, projection: {}, dstCollection: "zhangluTest.vehicleArchives", idPolicy: "log_errors"}, { srcCollection: "zhangluTest.workforce_random", query: {}, projection: {}, dstCollection: "zhangluTest.workforce", idPolicy: "log_errors"}, { srcCollection: "zhangluTest.workingTime_random", query: {}, projection: {}, dstCollection: "zhangluTest.workingTime", idPolicy: "log_errors"}, { srcCollection: "zhangluTest.app_ip_random", query: {}, projection: {}, dstCollection: "zhangluTest.app_ip", idPolicy: "log_errors"}, { srcCollection: "zhangluTest.dictionary_random", query: {}, projection: {}, dstCollection: "zhangluTest.dictionary", idPolicy: "log_errors"}, { srcCollection: "zhangluTest.log_info_random", query: {}, projection: {}, dstCollection: "zhangluTest.log_info", idPolicy: "log_errors"}, { srcCollection: "zhangluTest.resource_random", query: {}, projection: {}, dstCollection: "zhangluTest.resource", idPolicy: "log_errors"}, { srcCollection: "zhangluTest.selfDiagnosis_handoverRecord_random", query: {}, projection: {}, dstCollection: "zhangluTest.selfDiagnosis_handoverRecord", idPolicy: "log_errors"}, { srcCollection: "zhangluTest.user_info_random", query: {}, projection: {}, dstCollection: "zhangluTest.user_info", idPolicy: "log_errors"}, { srcCollection: "zhangluTest.user_role_random", query: {}, projection: {}, dstCollection: "zhangluTest.user_role", idPolicy: "log_errors"}, { srcCollection: "zhangluTest.config_random", query: {}, projection: {}, dstCollection: "zhangluTest.config", idPolicy: "log_errors"}, { srcCollection: "zhangluTest.sensor_value_random", query: {}, projection: {}, dstCollection: "zhangluTest.sensor_value", idPolicy: "log_errors"}, { srcCollection: "zhangluTest.station_rule_random", query: {}, projection: {}, dstCollection: "zhangluTest.station_rule", idPolicy: "log_errors"}, { srcCollection: "zhangluTest.tag_random", query: {}, projection: {}, dstCollection: "zhangluTest.tag", idPolicy: "log_errors"}, { srcCollection: "zhangluTest.device_random", query: {}, projection: {}, dstCollection: "zhangluTest.device", idPolicy: "log_errors"}, { srcCollection: "zhangluTest.his_realtime_value_random", query: {}, projection: {}, dstCollection: "zhangluTest.his_realtime_value", idPolicy: "log_errors"}, { srcCollection: "zhangluTest.realtime_value_random", query: {}, projection: {}, dstCollection: "zhangluTest.realtime_value", idPolicy: "log_errors"} ]; let toRecreateViews = [ ] // if have more indexes than default index (_id), create index after copy data. let shouldCreateIndex = true; let totalCopyResult={ result:{}, fails:[], } function copyCollection(params) { let { srcCollection, dstCollection, query, projection, idPolicy } = params; console.log(`Copy docs from ${srcConnection}:${srcDb}:${srcCollection} to ${dstConnection}:${dstDb}:${dstCollection} start...`); let isSameCollection = srcConnection === dstConnection && srcDb === dstDb && srcCollection === dstCollection; if (isSameCollection) { if (toCopyCollections.length === 1) shouldCreateIndex = false; else params.shouldCreateIndex = false; } if (idPolicy === "drop_collection_first") { // srcCollection is same to dstCollection, can not drop dstCollection (equal to drop srcCollection) // drop dst collection and copy from same collection, means nothing to do. if (isSameCollection) return; mb.dropCollection({ connection: dstConnection, db: dstDb, collection: dstCollection }); console.log(`drop collection ${dstDb}.${dstCollection}`); } totalCopyResult.result[dstCollection] = { nInserted: 0, nModified: 0, nSkipped: 0, failed: 0, }; let collectionRst = totalCopyResult.result[dstCollection]; let limitReadCount = Number.MAX_SAFE_INTEGER; if (isSameCollection){ limitReadCount = mb.runScript({ connection: srcConnection, db: srcDb, script: `db.getCollection(${tojson(srcCollection)}).find(${tojson(query)}).count()` }) } const batchSize = limitReadCount > BATCH_SIZE ? BATCH_SIZE : limitReadCount; const cursor = mb.getCursorFromCollection({ connection: srcConnection, db: srcDb, collection: srcCollection, query, projection}); const totalCount=cursor.size? cursor.size() : cursor.count(); await(mb.batchReadFromCursor(cursor, batchSize, (docs) => { return async(() => { let readLength = docs.length; if (!readLength) return; let copyResult = await(mb.writeToDb({ connection: dstConnection, db: dstDb, collection: dstCollection, docs, idPolicy })); let failed = copyResult.errors.length; let success = copyResult.nInserted + copyResult.nModified; collectionRst.nInserted += copyResult.nInserted; collectionRst.nModified += copyResult.nModified; collectionRst.nSkipped += copyResult.nSkipped; collectionRst.failed += failed; const processedCount=collectionRst.nInserted+collectionRst.nModified+collectionRst.nSkipped+collectionRst.failed; const percent = (processedCount / totalCount * 100).toFixed(1); console.log(`${dstCollection}: ${percent}% ${processedCount}/${totalCount} ${collectionRst.nInserted + collectionRst.nModified} docs successfully imported, ${collectionRst.failed} docs failed.`); if (failed) { console.log("Failed objects", copyResult.errors); } totalCopyResult.fails = [...totalCopyResult.fails, ...copyResult.errors]; })(); })); sleep(100); console.log(`copy docs from ${srcConnection}:${srcDb}:${srcCollection} to ${dstConnection}:${dstDb}:${dstCollection} finished. `); } //Copy collections for (let collection of toCopyCollections) { await(copyCollection(collection)); } //Recreate database readonly views for (let view of toRecreateViews) { const viewOn = tojson(view.viewOn); const viewName= tojson(view.name); const script= `var dropIfExists=true; if (dropIfExists){ db.getCollection(${viewName}).drop(); } db.getCollection(${viewOn}).aggregate(${tojson(view.pipeline)}).saveAsView(${viewName}, ${tojson(view.options)})`; const rst=await(mb.runScript({connection: dstConnection, db: dstDb, script})); if (!(rst && rst.ok)){ console.error(rst.message); }else{ console.log(`re-created readonly view ${ viewName } `); } } if (shouldCreateIndex){ let indexCreationPrompted = false; function indexCreationPrompt(){ if (indexCreationPrompted) return; const waitTime = 3; console.log(`Index creating will start in ${waitTime} seconds...`) sleep(1000*waitTime); indexCreationPrompted = true; } let srcCollections = toCopyCollections.filter(it=>it["shouldCreateIndex"] !== false).map(it => it.srcCollection) srcCollections.forEach(it => { let indexes = mb.runScript({connection: srcConnection, db: srcDb, script: `db.getCollection(${tojson(it)}).getIndexes();` }); if (indexes.length > 1){ let toCopyCollection = _.find(toCopyCollections, {srcCollection: it}); if (!toCopyCollection) return; let dstCollection = toCopyCollection.dstCollection; indexes.forEach(index => { if (index.name === "_id_") return; indexCreationPrompt(); let newIndex = _.cloneDeep(index); // remove index version and engine info, these info may fail createIndexes operator. delete newIndex.v; delete newIndex.textIndexVersion; delete newIndex["2dsphereIndexVersion"]; delete newIndex.storageEngine; newIndex.ns = `${dstDb}.${dstCollection}`; console.log(`create index ${newIndex.name} for ${dstDb}.${dstCollection}`); db.runCommand({ createIndexes: dstCollection, indexes: [newIndex] }); }) } }); if (indexCreationPrompted) console.log("create index complete.") } if(totalCopyResult.result){ let success=0; let failed=0; let collections=_.keys(totalCopyResult.result); collections.forEach((key)=>{ let obj=totalCopyResult.result[key]; success+=obj.nInserted+obj.nModified; failed+=obj.failed; }); console.log(` ${success} document(s) of ${collections.length} collection(s) have been copied.\n`,totalCopyResult.result); if(failed){ console.log(`${failed} document(s) haven't been copied, please check failed list below.`); }else{ console.log("All documents copied successfully."); } } if(totalCopyResult.fails.length){ console.log("All failed objects\n",totalCopyResult.fails); }
修改变量里的目标地址IP及源DB的DB名称或URL连接名称,修改copy的collection名称
const BATCH_SIZE = 2000; let srcConnection = "10.192.168.99"; let srcDb = "supervision"; let dstConnection = "positioning[开发]"; let dstDb = "test"; use(dstDb); //idPolicy: overwrite_with_same_id|always_insert_with_new_id|insert_with_new_id_if_id_exists|skip_documents_with_existing_id|abort_if_id_already_exists|drop_collection_first|log_errors let toCopyCollections = [ { srcCollection: "zhangluTest.key_manage_file.files", query: {}, projection: {}, dstCollection: "zhangluTest.key_manage_file.files", idPolicy: "log_errors"}, { srcCollection: "zhangluTest.key_manage_file.chunks", query: {}, projection: {}, dstCollection: "zhangluTest.key_manage_file.chunks", idPolicy: "log_errors"} ]; let toRecreateViews = [ ] // if have more indexes than default index (_id), create index after copy data. let shouldCreateIndex = true; let totalCopyResult={ result:{}, fails:[], } function copyCollection(params) { let { srcCollection, dstCollection, query, projection, idPolicy } = params; console.log(`Copy docs from ${srcConnection}:${srcDb}:${srcCollection} to ${dstConnection}:${dstDb}:${dstCollection} start...`); let isSameCollection = srcConnection === dstConnection && srcDb === dstDb && srcCollection === dstCollection; if (isSameCollection) { if (toCopyCollections.length === 1) shouldCreateIndex = false; else params.shouldCreateIndex = false; } if (idPolicy === "drop_collection_first") { // srcCollection is same to dstCollection, can not drop dstCollection (equal to drop srcCollection) // drop dst collection and copy from same collection, means nothing to do. if (isSameCollection) return; mb.dropCollection({ connection: dstConnection, db: dstDb, collection: dstCollection }); console.log(`drop collection ${dstDb}.${dstCollection}`); } totalCopyResult.result[dstCollection] = { nInserted: 0, nModified: 0, nSkipped: 0, failed: 0, }; let collectionRst = totalCopyResult.result[dstCollection]; let limitReadCount = Number.MAX_SAFE_INTEGER; if (isSameCollection){ limitReadCount = mb.runScript({ connection: srcConnection, db: srcDb, script: `db.getCollection(${tojson(srcCollection)}).find(${tojson(query)}).count()` }) } const batchSize = limitReadCount > BATCH_SIZE ? BATCH_SIZE : limitReadCount; const cursor = mb.getCursorFromCollection({ connection: srcConnection, db: srcDb, collection: srcCollection, query, projection}); const totalCount=cursor.size? cursor.size() : cursor.count(); await(mb.batchReadFromCursor(cursor, batchSize, (docs) => { return async(() => { let readLength = docs.length; if (!readLength) return; let copyResult = await(mb.writeToDb({ connection: dstConnection, db: dstDb, collection: dstCollection, docs, idPolicy })); let failed = copyResult.errors.length; let success = copyResult.nInserted + copyResult.nModified; collectionRst.nInserted += copyResult.nInserted; collectionRst.nModified += copyResult.nModified; collectionRst.nSkipped += copyResult.nSkipped; collectionRst.failed += failed; const processedCount=collectionRst.nInserted+collectionRst.nModified+collectionRst.nSkipped+collectionRst.failed; const percent = (processedCount / totalCount * 100).toFixed(1); console.log(`${dstCollection}: ${percent}% ${processedCount}/${totalCount} ${collectionRst.nInserted + collectionRst.nModified} docs successfully imported, ${collectionRst.failed} docs failed.`); if (failed) { console.log("Failed objects", copyResult.errors); } totalCopyResult.fails = [...totalCopyResult.fails, ...copyResult.errors]; })(); })); sleep(100); console.log(`copy docs from ${srcConnection}:${srcDb}:${srcCollection} to ${dstConnection}:${dstDb}:${dstCollection} finished. `); } //Copy collections for (let collection of toCopyCollections) { await(copyCollection(collection)); } //Recreate database readonly views for (let view of toRecreateViews) { const viewOn = tojson(view.viewOn); const viewName= tojson(view.name); const script= `var dropIfExists=false; if (dropIfExists){ db.getCollection(${viewName}).drop(); } db.getCollection(${viewOn}).aggregate(${tojson(view.pipeline)}).saveAsView(${viewName}, ${tojson(view.options)})`; const rst=await(mb.runScript({connection: dstConnection, db: dstDb, script})); if (!(rst && rst.ok)){ console.error(rst.message); }else{ console.log(`re-created readonly view ${ viewName } `); } } if (shouldCreateIndex){ let indexCreationPrompted = false; function indexCreationPrompt(){ if (indexCreationPrompted) return; const waitTime = 3; console.log(`Index creating will start in ${waitTime} seconds...`) sleep(1000*waitTime); indexCreationPrompted = true; } let srcCollections = toCopyCollections.filter(it=>it["shouldCreateIndex"] !== false).map(it => it.srcCollection) srcCollections.forEach(it => { let indexes = mb.runScript({connection: srcConnection, db: srcDb, script: `db.getCollection(${tojson(it)}).getIndexes();` }); if (indexes.length > 1){ let toCopyCollection = _.find(toCopyCollections, {srcCollection: it}); if (!toCopyCollection) return; let dstCollection = toCopyCollection.dstCollection; indexes.forEach(index => { if (index.name === "_id_") return; indexCreationPrompt(); let newIndex = _.cloneDeep(index); // remove index version and engine info, these info may fail createIndexes operator. delete newIndex.v; delete newIndex.textIndexVersion; delete newIndex["2dsphereIndexVersion"]; delete newIndex.storageEngine; newIndex.ns = `${dstDb}.${dstCollection}`; console.log(`create index ${newIndex.name} for ${dstDb}.${dstCollection}`); db.runCommand({ createIndexes: dstCollection, indexes: [newIndex] }); }) } }); if (indexCreationPrompted) console.log("create index complete.") } if(totalCopyResult.result){ let success=0; let failed=0; let collections=_.keys(totalCopyResult.result); collections.forEach((key)=>{ let obj=totalCopyResult.result[key]; success+=obj.nInserted+obj.nModified; failed+=obj.failed; }); console.log(` ${success} document(s) of ${collections.length} collection(s) have been copied.\n`,totalCopyResult.result); if(failed){ console.log(`${failed} document(s) haven't been copied, please check failed list below.`); }else{ console.log("All documents copied successfully."); } } if(totalCopyResult.fails.length){ console.log("All failed objects\n",totalCopyResult.fails); }