重慶分公司,新征程啟航
為企業(yè)提供網(wǎng)站建設(shè)、域名注冊、服務(wù)器等服務(wù)
為企業(yè)提供網(wǎng)站建設(shè)、域名注冊、服務(wù)器等服務(wù)
這篇文章主要講解了“flink sql怎么實(shí)時計算當(dāng)天pv寫入MySQL”,文中的講解內(nèi)容簡單清晰,易于學(xué)習(xí)與理解,下面請大家跟著小編的思路慢慢深入,一起來研究和學(xué)習(xí)“flink sql怎么實(shí)時計算當(dāng)天pv寫入mysql”吧!
創(chuàng)新互聯(lián)公司是一家專注于網(wǎng)站制作、成都網(wǎng)站建設(shè)與策劃設(shè)計,茫崖網(wǎng)站建設(shè)哪家好?創(chuàng)新互聯(lián)公司做網(wǎng)站,專注于網(wǎng)站建設(shè)十多年,網(wǎng)設(shè)計領(lǐng)域的專業(yè)建站公司;建站業(yè)務(wù)涵蓋:茫崖等地區(qū)。茫崖做網(wǎng)站價格咨詢:13518219792
首先我們還是使用datagen生成測試數(shù)據(jù),隨機(jī)生成一些用戶id
String sourceSql = "CREATE TABLE datagen (\n" +
" userid int,\n" +
" proctime as PROCTIME()\n" +
") WITH (\n" +
" 'connector' = 'datagen',\n" +
" 'rows-per-second'='100',\n" +
" 'fields.userid.kind'='random',\n" +
" 'fields.userid.min'='1',\n" +
" 'fields.userid.max'='100'\n" +
")";
定義mysql的sink,這里mysql是作為了一個upsert的sink,所以必須要一個主鍵,在mysql建表的時候我們指定了當(dāng)天的日期作為主鍵,mysql ddl如下
CREATE TABLE `pv` (
`day_str` varchar(100) NOT NULL,
`pv` bigint(10) DEFAULT NULL,
PRIMARY KEY (`day_str`)
)
Flink中的ddl要和mysql中對的上,也要指定主鍵。
String mysqlsql = "CREATE TABLE pv (\n" +
" day_str STRING,\n" +
" pv bigINT,\n" +
" PRIMARY KEY (day_str) NOT ENFORCED\n" +
") WITH (\n" +
" 'connector' = 'jdbc',\n" +
" 'username' = 'root',\n" +
" 'password' = 'root',\n" +
" 'url' = 'jdbc:mysql://localhost:3306/test',\n" +
" 'table-name' = 'pv'\n" +
")";
接下來我們寫一個簡單的查詢:
tEnv.executeSql("insert into pv SELECT DATE_FORMAT(proctime, 'yyyy-MM-dd') as day_str, count(*) \n" +
"FROM datagen \n" +
"GROUP BY DATE_FORMAT(proctime, 'yyyy-MM-dd')");
可能對于以前一直做批處理的同學(xué)來說會感到疑惑,對于流式處理來說,group by將會返回一個可撤回流(RetractStream),轉(zhuǎn)化成datastream,將會得到一個Tuple2
類似的需求我們還可以使用flink的窗口來實(shí)現(xiàn),定義一個窗口周期是一天的窗口,然后自定義一個觸發(fā)器,比如每秒鐘觸發(fā)一次,然后將結(jié)果輸出寫入第三方sink。
感謝各位的閱讀,以上就是“flink sql怎么實(shí)時計算當(dāng)天pv寫入mysql”的內(nèi)容了,經(jīng)過本文的學(xué)習(xí)后,相信大家對flink sql怎么實(shí)時計算當(dāng)天pv寫入mysql這一問題有了更深刻的體會,具體使用情況還需要大家實(shí)踐驗(yàn)證。這里是創(chuàng)新互聯(lián),小編將為大家推送更多相關(guān)知識點(diǎn)的文章,歡迎關(guān)注!