2013.09 from http://www./blog/archives/737.html Comet 技術(shù)就是常見(jiàn)的 Web 服務(wù)器"推"技術(shù), 用于向網(wǎng)頁(yè)實(shí)時(shí)地推送數(shù)據(jù). 最常見(jiàn)的 Comet 技術(shù)應(yīng)用在網(wǎng)頁(yè)聊天, 當(dāng)然還可以應(yīng)用于很多的方面, 如微博更新, 熱點(diǎn)新聞推送, 股票即時(shí)行情等等, 甚至是網(wǎng)頁(yè)游戲! Comet 技術(shù)如此重要, 但市面上并沒(méi)有真正流行通用的 Comet 服務(wù)器和解決方案, 比較知名的互聯(lián)網(wǎng)公司大多是自己開發(fā), 或者基于開源服務(wù)器進(jìn)行二次開發(fā), 例如基于 Jetty(一個(gè)開源 Java Web 容器), 而 Facebook 的聊天系統(tǒng)的 Comet 服務(wù)器是基于 Mochiweb(一個(gè)開源的 Erlang Web 服務(wù)器). 當(dāng)然還有比較知名的以 nginx 模塊形式出現(xiàn)的 nginx-push-stream, 但根據(jù)實(shí)際使用經(jīng)驗(yàn), 這個(gè)模塊無(wú)法穩(wěn)定支撐 10 萬(wàn)個(gè)并發(fā)連接, 更別談百萬(wàn)同時(shí)在線了. 這也是這個(gè)模塊為什么沒(méi)有被普遍大規(guī)模應(yīng)用的原因. 既然大家都開發(fā)自己的 Comet 服務(wù)器, 那必然有其中的道理, 說(shuō)是核心技術(shù)倒說(shuō)不上, 不過(guò)是為了便于擴(kuò)展, 能很好地和現(xiàn)有系統(tǒng)整合, 易于運(yùn)維和管理而已. 那么, 要開發(fā)一個(gè) Comet 服務(wù)器到底有多難呢? 其實(shí), 一個(gè)最簡(jiǎn)單的 Comet 服務(wù)器只需要 150 行 C 語(yǔ)言代碼! 先說(shuō)一下 Comet 技術(shù), 從瀏覽器支持考慮, long-polling 技術(shù)顯然是最佳的選擇, 又從跨域方面考慮, 那必然是 script tag long-polling 技術(shù)獲勝. 這也是 Facebook 的選擇. 所以, 最簡(jiǎn)單的 Comet 服務(wù)器只支持 Script tag long-polling 即可. Long-polling 技術(shù)要求瀏覽器的每一個(gè)網(wǎng)頁(yè)和服務(wù)器保持一個(gè) HTTP 請(qǐng)求連接(TCP 連接), 服務(wù)器收到這樣的連接后, 會(huì)立即返回 HTTP 首部, 接著通過(guò) chunk 傳輸編碼, 源源不斷地將一個(gè)個(gè)消息發(fā)送給瀏覽器. 一個(gè)完整的 chunk 編碼的 HTTP 響應(yīng)如下: HTTP/1.1 200 OK Date: Fri, 31 Dec 1999 23:59:59 GMT Content-Type: text/plain Transfer-Encoding: chunked 1a; ignore-stuff-here abcdefghijklmnopqrstuvwxyz 10 1234567890abcdef 0 [blank line here] 只要服務(wù)器不返回只有"0"的那一行以及緊接著的空白行, 那么就可以保持向網(wǎng)頁(yè)推數(shù)據(jù). 最簡(jiǎn)單的 Comet 服務(wù)器使用了 libevent 框架, 你可以在這里得到它的代碼: https://github.com/ideawu/icomet. 歡迎對(duì) Comet 了解的前端工程師貢獻(xiàn) JavaScript 相關(guān)的代碼! 使用方式: 訂閱: curl -v "http://127.0.0.1:8000/sub?id=12" 推送: curl -v "http://127.0.0.1:8000/pub?id=12&content=hi" 這個(gè) Comet 服務(wù)器的最大并發(fā)數(shù)并沒(méi)有進(jìn)行測(cè)試, 但 last.fm 的 CTO 對(duì)一個(gè)同樣是基于 libevent 的類似程序進(jìn)行測(cè)試, 100 萬(wàn)連接只需要 2GB 內(nèi)存. #include <stdio.h> #include <stdlib.h> #include <string.h> #include <sys/types.h> #include <sys/socket.h> #include <signal.h> #include <fcntl.h> #include <unistd.h> #include <event.h> #include <evhttp.h> #include <event2/event.h> #include <event2/http.h> #include <event2/buffer.h> #include <event2/util.h> #include <event2/keyvalq_struct.h> #include <netinet/in.h> #include <arpa/inet.h> #define MAX_CHANNELS 1000 struct Channel{ int id; struct evhttp_request *req; }; struct Channel channels[MAX_CHANNELS]; void init(){ int i; for(i=0; i<MAX_CHANNELS; i++){ channels[i].id = i; channels[i].req = NULL; } } // called when user disconnects void cleanup(struct evhttp_connection *evcon, void *arg){ struct Channel *sub = (struct Channel *)arg; printf("disconnected uid %d\n", sub->id); sub->req = NULL; } void sub_handler(struct evhttp_request *req, void *arg) { struct evkeyvalq params; const char *uri = evhttp_request_get_uri(req); evhttp_parse_query(uri, ¶ms); struct evbuffer *buf; int uid = -1; struct evkeyval *kv; for(kv = params.tqh_first; kv; kv = kv->next.tqe_next){ if(strcmp(kv->key, "id") == 0){ uid = atoi(kv->value); } } if(uid < 0 || uid >= MAX_CHANNELS){ buf = evbuffer_new(); evhttp_send_reply_start(req, HTTP_NOTFOUND, "Not Found"); evbuffer_free(buf); return; } printf("sub: %d\n", uid); struct Channel *sub = &channels[uid]; sub->req = req; buf = evbuffer_new(); evhttp_send_reply_start(req, HTTP_OK, "OK"); evhttp_add_header(req->output_headers, "Content-Type", "text/html; charset=utf-8"); evbuffer_add_printf(buf, "{type: \"welcome\", id: \"%d\", content: \"hello world!\"}\n", uid); evhttp_send_reply_chunk(req, buf); evbuffer_free(buf); evhttp_connection_set_closecb(req->evcon, cleanup, &channels[uid]); } void pub_handler(struct evhttp_request *req, void *arg){ struct evkeyvalq params; const char *uri = evhttp_request_get_uri(req); evhttp_parse_query(uri, ¶ms); struct evbuffer *buf; int uid = -1; const char *content = ""; struct evkeyval *kv; for(kv = params.tqh_first; kv; kv = kv->next.tqe_next){ if(strcmp(kv->key, "id") == 0){ uid = atoi(kv->value); }else if(strcmp(kv->key, "content") == 0){ content = kv->value; } } struct Channel *sub = NULL; if(uid < 0 || uid >= MAX_CHANNELS){ sub = NULL; }else{ sub = &channels[uid]; } if(sub && sub->req){ printf("pub: %d content: %s\n", uid, content); // push to browser buf = evbuffer_new(); evbuffer_add_printf(buf, "{type: \"data\", id: \"%d\", content: \"%s\"}\n", uid, content); evhttp_send_reply_chunk(sub->req, buf); evbuffer_free(buf); // response to publisher buf = evbuffer_new(); evhttp_add_header(req->output_headers, "Content-Type", "text/html; charset=utf-8"); evbuffer_add_printf(buf, "ok\n"); evhttp_send_reply(req, 200, "OK", buf); evbuffer_free(buf); }else{ buf = evbuffer_new(); evbuffer_add_printf(buf, "id: %d not connected\n", uid); evhttp_send_reply(req, 404, "Not Found", buf); evbuffer_free(buf); } } int main(int argc, char **argv){ signal(SIGPIPE, SIG_IGN); struct event_base *base; struct evhttp *http; struct evhttp_bound_socket *handle; unsigned short port = 8000; init(); base = event_base_new(); http = evhttp_new(base); evhttp_set_cb(http, "/sub", sub_handler, NULL); evhttp_set_cb(http, "/pub", pub_handler, NULL); //evhttp_set_gencb(http, request_handler, NULL); handle = evhttp_bind_socket_with_handle(http, "0.0.0.0", port); printf("server listen at 127.0.0.1:%d\n", port); event_base_dispatch(base); return 0; } Related posts: |
|