twitter user stream

This commit is contained in:
Gabor Adam Toth 2011-08-24 10:57:50 +02:00
parent 7840f73c91
commit 319d7329bf
3 changed files with 326 additions and 47 deletions

View File

@ -0,0 +1,44 @@
#include <net.h>
#include <ht/http.h>
inherit NET_PATH "http/fetch";
int buffer_content(string data) {
P2(("%O got data:\n%O\n", ME, data))
mixed *waiter;
foreach (waiter : qToArray(ME)) {
funcall(waiter[0], data, waiter[1] ? fheaders : copy(fheaders), http_status, 1);
}
next_input_to(#'buffer_content); //'
return 1;
}
disconnected(string data) {
P2(("%O got disconnected:\n%O\n", ME, data))
headers["_fetchtime"] = isotime(ctime(time()), 1);
if (headers["last-modified"])
rheaders["if-modified-since"] = headers["last-modified"];
//if (headers["etag"])
// rheaders["if-none-match"] = headers["etag"]; // heise does not work with etag
fheaders = headers;
buffer = headers = 0;
switch (http_status) {
default:
mixed *waiter;
while (qSize(ME)) {
waiter = shift(ME);
P2(("%O calls back.. body is %O\n", ME, data))
funcall(waiter[0], data, waiter[1] ? fheaders : copy(fheaders), http_status, 0);
}
if (http_status == R_OK) break;
// doesn't seem to get here when HTTP returns 301 or 302. strange.
// fall thru
case R_NOTMODIFIED:
qDel(ME);
qInit(ME, 150, 5);
}
fetching = 0;
return 1; // presume this disc was expected
}

View File

@ -11,15 +11,19 @@
#include <tls.h> #include <tls.h>
#include <ht/http.h> #include <ht/http.h>
string consumer_key; volatile string consumer_key;
string consumer_secret; volatile string consumer_secret;
string request_token_url; volatile string request_token_url;
mapping request_params = ([ ]); persistent mapping request_params = ([ ]);
mapping access_params = ([ ]); persistent mapping access_params = ([ ]);
string access_token_url; volatile string access_token_url;
string authorize_url; volatile string authorize_url;
string callback_url = HTTPS_OR_HTTP_URL + "/oauth"; volatile string callback_url = HTTPS_OR_HTTP_URL + "/oauth";
object user; volatile object user;
volatile int authorized = 0;
oauth_success() {}
oauth_error() {}
varargs void fetch(object ua, string url, string method, mapping post, mapping oauth) { varargs void fetch(object ua, string url, string method, mapping post, mapping oauth) {
P3((">> oauth:fetch(%O, %O, %O)\n", url, method, oauth)) P3((">> oauth:fetch(%O, %O, %O)\n", url, method, oauth))
@ -60,12 +64,16 @@ void parse_request_token(string body, mapping headers, int http_status) {
if (strlen(request_params["oauth_token"]) && strlen(request_params["oauth_token_secret"])) { if (strlen(request_params["oauth_token"]) && strlen(request_params["oauth_token_secret"])) {
shared_memory("oauth_request_tokens")[request_params["oauth_token"]] = ME; shared_memory("oauth_request_tokens")[request_params["oauth_token"]] = ME;
//P3((">>> adding token: %O to shm: %O\n", request_params["oauth_token"], shared_memory("oauth_request_tokens"))) //P3((">>> adding token: %O to shm: %O\n", request_params["oauth_token"], shared_memory("oauth_request_tokens")))
sendmsg(user, "_notice_oauth_authorize_url", "Open [_url] to perform authorization.", if (user) sendmsg(user, "_notice_oauth_authorize_url", "Open [_url] to perform authorization.",
(["_url": authorize_url + "?oauth_token=" + request_params["oauth_token"]])); (["_url": authorize_url + "?oauth_token=" + request_params["oauth_token"]]));
P1(("OAuth: open %s to perform authorization.\n", authorize_url + "?oauth_token=" + request_params["oauth_token"]));
return; return;
} }
} }
sendmsg(user, "_error_oauth_token_request", "OAuth failed: could not get a request token."); if (user) sendmsg(user, "_error_oauth_token_request", "OAuth failed: could not get a request token.");
P1(("OAuth failed: could not get a request token.\n"));
authorized = -1;
oauth_error();
} }
void parse_access_token(string body, mapping headers, int http_status) { void parse_access_token(string body, mapping headers, int http_status) {
@ -74,11 +82,17 @@ void parse_access_token(string body, mapping headers, int http_status) {
access_params = ([]); access_params = ([]);
url_parse_query(access_params, body); url_parse_query(access_params, body);
if (strlen(access_params["oauth_token"]) && strlen(access_params["oauth_token_secret"])) { if (strlen(access_params["oauth_token"]) && strlen(access_params["oauth_token_secret"])) {
sendmsg(user, "_notice_oauth_success", "OAuth successful."); if (user) sendmsg(user, "_notice_oauth_success", "OAuth successful.");
P1(("OAuth successful.\n"));
authorized = 1;
oauth_success();
return; return;
} }
} }
sendmsg(user, "_error_oauth_token_access", "OAuth failed: could not get an access token."); if (user) sendmsg(user, "_error_oauth_token_access", "OAuth failed: could not get an access token.");
P1(("OAuth failed: could not get an access token.\n"));
authorized = -1;
oauth_error();
} }
void verified(string verifier) { void verified(string verifier) {
@ -88,18 +102,30 @@ void verified(string verifier) {
fetch(ua, access_token_url, "POST", 0, (["oauth_verifier": verifier])); fetch(ua, access_token_url, "POST", 0, (["oauth_verifier": verifier]));
} }
object load(object usr, string key, string secret, string request, string access, string authorize) { void oauth() {
if (usr) user = usr; if (!request_token_url) return;
if (key) consumer_key = key;
if (secret) consumer_secret = secret; request_params = ([ ]);
if (request) request_token_url = request; access_params = ([ ]);
if (access) access_token_url = access; authorized = 0;
if (authorize) authorize_url = authorize;
if (request_token_url && user) {
object ua = clone_object(NET_PATH "http/fetch"); object ua = clone_object(NET_PATH "http/fetch");
ua->content(#'parse_request_token, 1, 1); //'); ua->content(#'parse_request_token, 1, 1); //');
fetch(ua, request_token_url, "POST", 0, (["oauth_callback": callback_url])); fetch(ua, request_token_url, "POST", 0, (["oauth_callback": callback_url]));
} }
object load(object usr, mapping opts) {
if (usr) user = usr;
unless (mappingp(opts)) opts = ([]);
if (opts["consumer_key"]) consumer_key = opts["consumer_key"];
if (opts["consumer_secret"]) consumer_secret = opts["consumer_secret"];
if (opts["request_token_url"]) request_token_url = opts["request_token_url"];
if (opts["access_token_url"]) access_token_url = opts["access_token_url"];
if (opts["authorize_url"]) authorize_url = opts["authorize_url"];
if (access_params["oauth_token"])
authorized = 1;
else
oauth();
return ME; return ME;
} }

View File

@ -7,34 +7,72 @@
* - app type: browser * - app type: browser
* - callback url: http://your.host/oauth * - callback url: http://your.host/oauth
* (actually the url psyced sends will be used but you have to type in something) * (actually the url psyced sends will be used but you have to type in something)
* - access type: read/write
* - then in local.h #define TWITTER_KEY & TWITTER_SECRET * - then in local.h #define TWITTER_KEY & TWITTER_SECRET
*
* - enabling the user stream
* - #define TWITTER_ADMIN which should contain a user name who will receive oauth messages
* - #define USE_TWITTER_STREAM
* - add this to local/config.c:
# ifdef USE_TWITTER_STREAM
D(" " NET_PATH "twitter/client\n");
load_object(NET_PATH "twitter/client")->home_stream();
# endif
*/ */
#include <net.h> #include <net.h>
#include <ht/http.h> #include <ht/http.h>
inherit NET_PATH "http/oauth"; inherit NET_PATH "http/oauth";
inherit NET_PATH "queue";
string name = "twitter"; persistent mixed lastid;
string display_name = "twitter";
string api_base_url = "http://api.twitter.com/1";
int status_max_len = 140; volatile string api_url = "https://api.twitter.com/1";
volatile string userstream_url = "https://userstream.twitter.com/2";
object load(object usr, string key, string secret, string request, string access, string authorize) { volatile int status_max_len = 140;
unless (consumer_key) consumer_key = TWITTER_KEY; volatile int send_to_user = 0;
unless (consumer_secret) consumer_secret = TWITTER_SECRET; volatile int wait = 0;
unless (request_token_url) request_token_url = "http://twitter.com/oauth/request_token"; volatile mapping friends;
unless (access_token_url) access_token_url = "http://twitter.com/oauth/access_token";
unless (authorize_url) authorize_url = "http://twitter.com/oauth/authorize";
return ::load(usr, key, secret, request, access, authorize); user_stream();
string object_file_name() {
return DATA_PATH "twitter/" + (user ? user->qNameLower() : "-default");
} }
void parse_status_update(string body, string headers, int http_status) { save() {
mkdir(DATA_PATH "twitter");
save_object(object_file_name());
}
create() {
return load();
}
object load(object usr, mapping opts) {
#ifdef TWITTER_ADMIN
unless (usr) usr = user = summon_person(TWITTER_ADMIN, NET_PATH "user");
#endif
unless (mappingp(opts)) opts = ([]);
send_to_user = opts["send_to_user"];
unless (consumer_key) consumer_key = TWITTER_KEY;
unless (consumer_secret) consumer_secret = TWITTER_SECRET;
unless (request_token_url) request_token_url = "https://twitter.com/oauth/request_token";
unless (access_token_url) access_token_url = "https://twitter.com/oauth/access_token";
unless (authorize_url) authorize_url = "https://twitter.com/oauth/authorize";
restore_object(object_file_name());
qCreate();
qInit(ME, 100, 5);
return ::load(usr, opts);
}
void check_status_update(string body, string headers, int http_status) {
P3(("twitter/client:parse_status_update(%O, %O, %O)\n", body, headers, http_status)) P3(("twitter/client:parse_status_update(%O, %O, %O)\n", body, headers, http_status))
if (http_status != R_OK) if (http_status != R_OK)
sendmsg(user, "_error_"+name+"_status_update", "Error: failed to post status update on [_name].", (["_name": display_name])); sendmsg(user, "_error_twitter_status_update", "Error: failed to post status update on twitter.");
} }
void status_update(string text) { void status_update(string text) {
@ -42,19 +80,190 @@ void status_update(string text) {
if (status_max_len && strlen(text) > status_max_len) text = text[0..status_max_len-4] + "..."; if (status_max_len && strlen(text) > status_max_len) text = text[0..status_max_len-4] + "...";
object ua = clone_object(NET_PATH "http/fetch"); object ua = clone_object(NET_PATH "http/fetch");
ua->content(#'parse_status_update, 1, 1); //'); ua->content(#'check_status_update, 1, 1); //');
fetch(ua, api_base_url + "/statuses/update.json", "POST", (["status": text])); fetch(ua, api_url + "/statuses/update.json", "POST", (["status": text]));
} }
#if 0 //not used, just an example parse_statuses(string data) {
void parse_home_timeline(string body, string headers, int http_status) { mixed wurst;
string nick;
object o;
mapping d, p;
int i;
if (!data || data == "") return;
wurst = parse_json(data);
if (mappingp(wurst))
wurst = ({ wurst });
else unless (pointerp(wurst)) {
monitor_report("_failure_network_fetch_twitter_broken",
"[_source] failed to parse a status message");
return;
}
unless (sizeof(wurst)) {
monitor_report("_failure_network_fetch_twitter_empty",
"[_source] received an empty structure.");
return;
}
// this used to fail on MAX_INT turning the ints to negative.. it would work for
// a while longer using floats, but since floating point mantissa in lpc is only
// 32 bits wide, it's just a question of time until we hit that roof again (when
// status_id reaches 4294967296). so let's use bignums instead. funny to run into
// such a weird problem only after years that twitter has been in existence.
if (lastid && bignum_cmp(to_string(wurst[0]["id"]), to_string(lastid)) <= 0) {
P1(("%O received %d old updates (id0 %O <= lastid %O).\n",
ME, sizeof(wurst), wurst[0]["id"], lastid))
return;
}
lastid = wurst[0]["id"];
save();
for (i=sizeof(wurst)-1; i>=0; i--) {
d = wurst[i];
unless (mappingp(d)) {
P1(("%O got a broken tweet: %O.\n", ME, d))
continue;
}
p = d["user"];
unless (mappingp(p)) {
P1(("%O got a userless tweet.\n", ME))
continue;
}
unless (nick = p["screen_name"]) {
P1(("%O got a nickless tweeter.\n", ME))
continue;
}
o = send_to_user ? user : find_place(nick);
// _message_twitter ? not so convincing.. a lot of the
// things are converted rss newsfeeds, and when private
// people are "chatting" over twitter, they are still
// "broadcasting" each message to a random conjunction
// of friends and strangers (we don't follow private
// twitters with this gateway!) ... thus it is quite
// appropriate that twitters are not given the same
// relevance as a _message. still you can /highlight
// particular senders in your client...
//
sendmsg(o,
// "_notice_headline_twitter", "([_nick]) [_headline]",
"_message_twitter", d["text"],
([
"_headline": d["text"], // should i send text as _action?
"_nick": nick,
// _count seems to be the better word for this
"_amount_updates": p["statuses_count"],
"_amount_followers": p["followers_count"],
"_amount_sources": p["friends_count"],
"_color": "#"+ p["profile_sidebar_fill_color"],
"_description": p["description"] || "",
"_page": p["url"] || "",
"_name": p["name"] || "",
// "_contact_twitter": p["id"],
"_description_agent_HTML": d["source"],
"_reference_reply": d["in_reply_to_screen_name"],
// "_twit": d["id"],
"_uniform_photo": p["profile_image_url"] || "",
"_uniform_photo_background":
p["profile_background_image_url"] || ""
]), "/"); // send as root
// der spiegel u.a. twittern übrigens in latin-1
// während psyc utf-8 erwartet.. eine char guess engine
// muss her.. FIXME
}
}
parse_home_timeline(string body, string headers, int http_status) {
P3(("twitter/client:parse_home_timeline(%O, %O, %O)\n", body, headers, http_status)) P3(("twitter/client:parse_home_timeline(%O, %O, %O)\n", body, headers, http_status))
if (http_status == 401) {
oauth();
home_timeline();
}
if (http_status != R_OK || !body || body == "") return;
parse_statuses(body);
} }
void home_timeline() { home_timeline(mixed *next) {
P3(("twitter/client:home_timeline()\n")) P3(("twitter/client:home_timeline()\n"))
if (!authorized) return enqueue(ME, ({ #'home_timeline })); //'}));
object ua = clone_object(NET_PATH "http/fetch"); object ua = clone_object(NET_PATH "http/fetch");
ua->content(#'parse_home_timeline, 1, 1); //'); ua->content(#'parse_home_timeline, 1, 1); //');
fetch(ua, api_base_url + "/statuses/home_timeline.json"); if (next) ua->content(next, 1, 1); //');
fetch(ua, api_url + "/statuses/home_timeline.json");
return ua;
}
home_stream() {
home_timeline(#'user_stream);
}
// handle one line in the user stream which contains a full message in json format
// or the user's friend if this is the first line in the stream
user_stream_data(string data, string headers, int http_status, int fetching) {
P3(("twitter/client:user_stream_data(..., %O, %O, %O)\n%O\n", headers, http_status, fetching, data))
if (http_status == R_OK && data && data != "") {
if (!friends)
friends = parse_json(data);
else
parse_statuses(data);
}
if (fetching) {
wait = 0;
return;
} else {
P1(("%O disconnected with status %d, headers: %O\n", ME, http_status, headers))
}
switch (http_status) {
case 401: // unauthorized
oauth();
home_stream();
case 403: // forbidden
case 404: // unknown
case 406: // not acceptable
case 413: // too long
case 416: // range unacceptable
return;
case 200:
break;
case 420: // rate limited
case 500: // server internal error
case 503: // service overloaded
default:
wait *= 2;
}
if (!wait) wait = 10;
if (wait > 240) wait = 240;
P1(("%O reconnecting in %d seconds.\n", ME, wait))
call_out(#'home_stream, wait); //');
}
user_stream() {
P3(("twitter/client:user_stream()\n"))
if (!authorized) return enqueue(ME, ({ #'user_stream })); //'}));
friends = 0;
object user_ua = clone_object(NET_PATH "http/fetch_stream");
user_ua->content(#'user_stream_data, 1, 1); //');
fetch(user_ua, userstream_url + "/user.json");
}
oauth_success() {
P3(("twitter/client:oauth_success()\n"))
save();
mixed *waiter;
while (qSize(ME)) {
waiter = shift(ME);
funcall(waiter[0]);
}
}
oauth_error() {
P3(("twitter/client:oauth_error()\n"))
call_out(#'oauth, 60); //');
} }
#endif