First, a minimal example with the MongoDB Java driver:

import com.mongodb.MongoClient;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoDatabase;
import org.bson.Document;

// Get a mongo client
MongoClient mongoClient = new MongoClient("localhost");
// Get the database
MongoDatabase database = mongoClient.getDatabase("test_database");
// Get the collection
MongoCollection<Document> collection = database.getCollection("test_database");
// Query the first document in the collection
Document myDoc = collection.find().first();
System.out.println(myDoc);
// Release resources
mongoClient.close();
The same flow with the Scala driver:

import org.mongodb.scala.MongoClient

val mongoClient = MongoClient("mongodb://localhost:27017")
val database = mongoClient.getDatabase("test_basedata")
val collection = database.getCollection("test_basedata")
val document = collection.find().first()
println(document.toString)
mongoClient.close()
When using the Scala MongoDB connector on its own, I ran into the following error:
com.mongodb.ConnectionString.getThreadsAllowedToBlockForConnectionMultiplier()Ljava/lang/Integer;
The fix was to add the following dependency:
<dependency>
    <groupId>org.mongodb</groupId>
    <artifactId>mongo-java-driver</artifactId>
    <version>3.10.0</version>
</dependency>
The exact cause is unclear, though a missing-method error like this usually means two binary-incompatible driver versions ended up on the classpath, and pinning mongo-java-driver resolves the mismatch.
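If you hit a similar conflict, one way to check which org.mongodb artifacts Maven is actually pulling in is the dependency tree (a diagnostic sketch; adjust for your module layout):

mvn dependency:tree -Dincludes=org.mongodb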
The code for the Spark connector is as follows:
import com.mongodb.spark.MongoSpark
import org.apache.spark.sql.SparkSession

// TODO set up the environment
val spark = SparkSession.builder().master("local")
  .config("spark.mongodb.input.uri", "mongodb://127.0.0.1/test_database.test_database")
  .config("spark.mongodb.output.uri", "mongodb://127.0.0.1/test_database.test_database")
  .getOrCreate()
// TODO operate on the data
val testDF = MongoSpark.load(spark)
testDF.show(20)
// TODO tear down the environment
spark.close()
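Since the session also sets spark.mongodb.output.uri, the same connector can write a DataFrame back to MongoDB. A minimal sketch under that configuration (the overwrite mode is an illustrative choice, not from the original post):

import com.mongodb.spark.MongoSpark

// Write testDF back to the collection configured in spark.mongodb.output.uri
MongoSpark.save(testDF.write.mode("overwrite"))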
Flink does not ship a built-in MongoDB source, so the dependency used below is a third-party driver (Casbah) with which we hand-roll a connector.
In practice you will rarely need MongoDB as a Flink data source anyway.
The required dependency:
<!-- https://mvnrepository.com/artifact/org.mongodb/casbah-core -->
<dependency>
    <groupId>org.mongodb</groupId>
    <artifactId>casbah-core_2.11</artifactId>
    <version>3.1.1</version>
</dependency>
Below is the custom source:
package com.myFlink.test

import com.mongodb.BasicDBObject
import com.mongodb.casbah.{MongoClient, MongoClientURI}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.source.{RichSourceFunction, SourceFunction}

class MongodbSource extends RichSourceFunction[User] {

  var client: MongoClient = _

  // Runs once when the source is created
  override def open(parameters: Configuration): Unit = {
    // Connect to the local MongoDB
    client = MongoClient(MongoClientURI("mongodb://localhost:27017"))
  }

  // The function that produces records
  override def run(sourceContext: SourceFunction.SourceContext[User]): Unit = {
    // TODO get the database
    val database = client("test_database")
    // TODO get the collection
    val coll = database("test_database")
    // TODO fetch the data
    val query = new BasicDBObject("date", "20171203")
    val cursor = coll.find(query)
    if (cursor.nonEmpty) {
      // Take one document
      val oneData = cursor.next()
      sourceContext.collect(
        User(
          name = oneData.get("name").toString,
          date = oneData.get("date").toString
        )
      )
    }
  }

  // Called when the job is cancelled
  override def cancel(): Unit = {
    if (client != null) {
      client.close()
    }
  }
}

case class User(name: String, date: String)
Then insert a document into the local MongoDB:
db.getCollection("test_database").insert({"name":"kone", "date":"20171203"})
The job then prints:
4> User(kone,20171203)
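The driver program that produced this output is not shown in the post; a minimal sketch of wiring the source into a streaming job might look like this (the object and job names are assumptions):

import org.apache.flink.streaming.api.scala._

object MongoSourceJob {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Attach the custom source; print() prefixes each record with the subtask index, e.g. "4>"
    env.addSource(new MongodbSource).print()
    env.execute("mongodb-source-test")
  }
}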
The matching custom sink:

package com.myFlink.test

import com.mongodb.BasicDBObject
import com.mongodb.casbah.{MongoClient, MongoClientURI}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction

class MongoDBSink extends RichSinkFunction[User] {

  var client: MongoClient = _

  override def open(parameters: Configuration): Unit = {
    client = MongoClient(MongoClientURI("mongodb://localhost:27017"))
  }

  override def invoke(value: User): Unit = {
    // TODO get the database
    val database = client("test_database")
    // TODO get the collection
    val coll = database("test_database")
    // TODO write the record
    val obj = new BasicDBObject("name", value.name).append("date", value.date)
    coll.insert(obj)
  }

  override def close(): Unit = {
    if (client != null) {
      client.close()
    }
  }
}
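Attaching the sink in the same job skeleton is a one-line change (again a sketch, not from the original post):

// Read from MongoDB and write each User straight back through the custom sink
env.addSource(new MongodbSource).addSink(new MongoDBSink)
env.execute("mongodb-sink-test")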
To be continued.