00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00027 #include "libavcodec/bytestream.h"
00028 #include "libavutil/avstring.h"
00029 #include "libavutil/intfloat.h"
00030 #include "libavutil/lfg.h"
00031 #include "libavutil/sha.h"
00032 #include "avformat.h"
00033 #include "internal.h"
00034
00035 #include "network.h"
00036
00037 #include "flv.h"
00038 #include "rtmp.h"
00039 #include "rtmppkt.h"
00040 #include "url.h"
00041
00042
00043
00045 typedef enum {
00046 STATE_START,
00047 STATE_HANDSHAKED,
00048 STATE_RELEASING,
00049 STATE_FCPUBLISH,
00050 STATE_CONNECTING,
00051 STATE_READY,
00052 STATE_PLAYING,
00053 STATE_PUBLISHING,
00054 STATE_STOPPED,
00055 } ClientState;
00056
00058 typedef struct RTMPContext {
00059 URLContext* stream;
00060 RTMPPacket prev_pkt[2][RTMP_CHANNELS];
00061 int chunk_size;
00062 int is_input;
00063 char playpath[256];
00064 char app[128];
00065 ClientState state;
00066 int main_channel_id;
00067 uint8_t* flv_data;
00068 int flv_size;
00069 int flv_off;
00070 RTMPPacket out_pkt;
00071 uint32_t client_report_size;
00072 uint32_t bytes_read;
00073 uint32_t last_bytes_read;
00074 int skip_bytes;
00075 uint8_t flv_header[11];
00076 int flv_header_bytes;
00077 int nb_invokes;
00078 int create_stream_invoke;
00079 } RTMPContext;
00080
00081 #define PLAYER_KEY_OPEN_PART_LEN 30 ///< length of partial key used for first client digest signing
00082
00083 static const uint8_t rtmp_player_key[] = {
00084 'G', 'e', 'n', 'u', 'i', 'n', 'e', ' ', 'A', 'd', 'o', 'b', 'e', ' ',
00085 'F', 'l', 'a', 's', 'h', ' ', 'P', 'l', 'a', 'y', 'e', 'r', ' ', '0', '0', '1',
00086
00087 0xF0, 0xEE, 0xC2, 0x4A, 0x80, 0x68, 0xBE, 0xE8, 0x2E, 0x00, 0xD0, 0xD1, 0x02,
00088 0x9E, 0x7E, 0x57, 0x6E, 0xEC, 0x5D, 0x2D, 0x29, 0x80, 0x6F, 0xAB, 0x93, 0xB8,
00089 0xE6, 0x36, 0xCF, 0xEB, 0x31, 0xAE
00090 };
00091
00092 #define SERVER_KEY_OPEN_PART_LEN 36 ///< length of partial key used for first server digest signing
00093
00094 static const uint8_t rtmp_server_key[] = {
00095 'G', 'e', 'n', 'u', 'i', 'n', 'e', ' ', 'A', 'd', 'o', 'b', 'e', ' ',
00096 'F', 'l', 'a', 's', 'h', ' ', 'M', 'e', 'd', 'i', 'a', ' ',
00097 'S', 'e', 'r', 'v', 'e', 'r', ' ', '0', '0', '1',
00098
00099 0xF0, 0xEE, 0xC2, 0x4A, 0x80, 0x68, 0xBE, 0xE8, 0x2E, 0x00, 0xD0, 0xD1, 0x02,
00100 0x9E, 0x7E, 0x57, 0x6E, 0xEC, 0x5D, 0x2D, 0x29, 0x80, 0x6F, 0xAB, 0x93, 0xB8,
00101 0xE6, 0x36, 0xCF, 0xEB, 0x31, 0xAE
00102 };
00103
00107 static void gen_connect(URLContext *s, RTMPContext *rt, const char *proto,
00108 const char *host, int port)
00109 {
00110 RTMPPacket pkt;
00111 uint8_t ver[64], *p;
00112 char tcurl[512];
00113
00114 ff_rtmp_packet_create(&pkt, RTMP_SYSTEM_CHANNEL, RTMP_PT_INVOKE, 0, 4096);
00115 p = pkt.data;
00116
00117 ff_url_join(tcurl, sizeof(tcurl), proto, NULL, host, port, "/%s", rt->app);
00118 ff_amf_write_string(&p, "connect");
00119 ff_amf_write_number(&p, ++rt->nb_invokes);
00120 ff_amf_write_object_start(&p);
00121 ff_amf_write_field_name(&p, "app");
00122 ff_amf_write_string(&p, rt->app);
00123
00124 if (rt->is_input) {
00125 snprintf(ver, sizeof(ver), "%s %d,%d,%d,%d", RTMP_CLIENT_PLATFORM, RTMP_CLIENT_VER1,
00126 RTMP_CLIENT_VER2, RTMP_CLIENT_VER3, RTMP_CLIENT_VER4);
00127 } else {
00128 snprintf(ver, sizeof(ver), "FMLE/3.0 (compatible; %s)", LIBAVFORMAT_IDENT);
00129 ff_amf_write_field_name(&p, "type");
00130 ff_amf_write_string(&p, "nonprivate");
00131 }
00132 ff_amf_write_field_name(&p, "flashVer");
00133 ff_amf_write_string(&p, ver);
00134 ff_amf_write_field_name(&p, "tcUrl");
00135 ff_amf_write_string(&p, tcurl);
00136 if (rt->is_input) {
00137 ff_amf_write_field_name(&p, "fpad");
00138 ff_amf_write_bool(&p, 0);
00139 ff_amf_write_field_name(&p, "capabilities");
00140 ff_amf_write_number(&p, 15.0);
00141 ff_amf_write_field_name(&p, "audioCodecs");
00142 ff_amf_write_number(&p, 1639.0);
00143 ff_amf_write_field_name(&p, "videoCodecs");
00144 ff_amf_write_number(&p, 252.0);
00145 ff_amf_write_field_name(&p, "videoFunction");
00146 ff_amf_write_number(&p, 1.0);
00147 }
00148 ff_amf_write_object_end(&p);
00149
00150 pkt.size = p - pkt.data;
00151
00152 ff_rtmp_packet_write(rt->stream, &pkt, rt->chunk_size, rt->prev_pkt[1]);
00153 ff_rtmp_packet_destroy(&pkt);
00154 }
00155
00160 static void gen_release_stream(URLContext *s, RTMPContext *rt)
00161 {
00162 RTMPPacket pkt;
00163 uint8_t *p;
00164
00165 ff_rtmp_packet_create(&pkt, RTMP_SYSTEM_CHANNEL, RTMP_PT_INVOKE, 0,
00166 29 + strlen(rt->playpath));
00167
00168 av_log(s, AV_LOG_DEBUG, "Releasing stream...\n");
00169 p = pkt.data;
00170 ff_amf_write_string(&p, "releaseStream");
00171 ff_amf_write_number(&p, ++rt->nb_invokes);
00172 ff_amf_write_null(&p);
00173 ff_amf_write_string(&p, rt->playpath);
00174
00175 ff_rtmp_packet_write(rt->stream, &pkt, rt->chunk_size, rt->prev_pkt[1]);
00176 ff_rtmp_packet_destroy(&pkt);
00177 }
00178
00183 static void gen_fcpublish_stream(URLContext *s, RTMPContext *rt)
00184 {
00185 RTMPPacket pkt;
00186 uint8_t *p;
00187
00188 ff_rtmp_packet_create(&pkt, RTMP_SYSTEM_CHANNEL, RTMP_PT_INVOKE, 0,
00189 25 + strlen(rt->playpath));
00190
00191 av_log(s, AV_LOG_DEBUG, "FCPublish stream...\n");
00192 p = pkt.data;
00193 ff_amf_write_string(&p, "FCPublish");
00194 ff_amf_write_number(&p, ++rt->nb_invokes);
00195 ff_amf_write_null(&p);
00196 ff_amf_write_string(&p, rt->playpath);
00197
00198 ff_rtmp_packet_write(rt->stream, &pkt, rt->chunk_size, rt->prev_pkt[1]);
00199 ff_rtmp_packet_destroy(&pkt);
00200 }
00201
00206 static void gen_fcunpublish_stream(URLContext *s, RTMPContext *rt)
00207 {
00208 RTMPPacket pkt;
00209 uint8_t *p;
00210
00211 ff_rtmp_packet_create(&pkt, RTMP_SYSTEM_CHANNEL, RTMP_PT_INVOKE, 0,
00212 27 + strlen(rt->playpath));
00213
00214 av_log(s, AV_LOG_DEBUG, "UnPublishing stream...\n");
00215 p = pkt.data;
00216 ff_amf_write_string(&p, "FCUnpublish");
00217 ff_amf_write_number(&p, ++rt->nb_invokes);
00218 ff_amf_write_null(&p);
00219 ff_amf_write_string(&p, rt->playpath);
00220
00221 ff_rtmp_packet_write(rt->stream, &pkt, rt->chunk_size, rt->prev_pkt[1]);
00222 ff_rtmp_packet_destroy(&pkt);
00223 }
00224
00229 static void gen_create_stream(URLContext *s, RTMPContext *rt)
00230 {
00231 RTMPPacket pkt;
00232 uint8_t *p;
00233
00234 av_log(s, AV_LOG_DEBUG, "Creating stream...\n");
00235 ff_rtmp_packet_create(&pkt, RTMP_SYSTEM_CHANNEL, RTMP_PT_INVOKE, 0, 25);
00236
00237 p = pkt.data;
00238 ff_amf_write_string(&p, "createStream");
00239 ff_amf_write_number(&p, ++rt->nb_invokes);
00240 ff_amf_write_null(&p);
00241 rt->create_stream_invoke = rt->nb_invokes;
00242
00243 ff_rtmp_packet_write(rt->stream, &pkt, rt->chunk_size, rt->prev_pkt[1]);
00244 ff_rtmp_packet_destroy(&pkt);
00245 }
00246
00247
00252 static void gen_delete_stream(URLContext *s, RTMPContext *rt)
00253 {
00254 RTMPPacket pkt;
00255 uint8_t *p;
00256
00257 av_log(s, AV_LOG_DEBUG, "Deleting stream...\n");
00258 ff_rtmp_packet_create(&pkt, RTMP_SYSTEM_CHANNEL, RTMP_PT_INVOKE, 0, 34);
00259
00260 p = pkt.data;
00261 ff_amf_write_string(&p, "deleteStream");
00262 ff_amf_write_number(&p, ++rt->nb_invokes);
00263 ff_amf_write_null(&p);
00264 ff_amf_write_number(&p, rt->main_channel_id);
00265
00266 ff_rtmp_packet_write(rt->stream, &pkt, rt->chunk_size, rt->prev_pkt[1]);
00267 ff_rtmp_packet_destroy(&pkt);
00268 }
00269
00274 static void gen_play(URLContext *s, RTMPContext *rt)
00275 {
00276 RTMPPacket pkt;
00277 uint8_t *p;
00278
00279 av_log(s, AV_LOG_DEBUG, "Sending play command for '%s'\n", rt->playpath);
00280 ff_rtmp_packet_create(&pkt, RTMP_VIDEO_CHANNEL, RTMP_PT_INVOKE, 0,
00281 20 + strlen(rt->playpath));
00282 pkt.extra = rt->main_channel_id;
00283
00284 p = pkt.data;
00285 ff_amf_write_string(&p, "play");
00286 ff_amf_write_number(&p, ++rt->nb_invokes);
00287 ff_amf_write_null(&p);
00288 ff_amf_write_string(&p, rt->playpath);
00289
00290 ff_rtmp_packet_write(rt->stream, &pkt, rt->chunk_size, rt->prev_pkt[1]);
00291 ff_rtmp_packet_destroy(&pkt);
00292
00293
00294 ff_rtmp_packet_create(&pkt, RTMP_NETWORK_CHANNEL, RTMP_PT_PING, 1, 10);
00295
00296 p = pkt.data;
00297 bytestream_put_be16(&p, 3);
00298 bytestream_put_be32(&p, 1);
00299 bytestream_put_be32(&p, 256);
00300
00301 ff_rtmp_packet_write(rt->stream, &pkt, rt->chunk_size, rt->prev_pkt[1]);
00302 ff_rtmp_packet_destroy(&pkt);
00303 }
00304
00308 static void gen_publish(URLContext *s, RTMPContext *rt)
00309 {
00310 RTMPPacket pkt;
00311 uint8_t *p;
00312
00313 av_log(s, AV_LOG_DEBUG, "Sending publish command for '%s'\n", rt->playpath);
00314 ff_rtmp_packet_create(&pkt, RTMP_SOURCE_CHANNEL, RTMP_PT_INVOKE, 0,
00315 30 + strlen(rt->playpath));
00316 pkt.extra = rt->main_channel_id;
00317
00318 p = pkt.data;
00319 ff_amf_write_string(&p, "publish");
00320 ff_amf_write_number(&p, ++rt->nb_invokes);
00321 ff_amf_write_null(&p);
00322 ff_amf_write_string(&p, rt->playpath);
00323 ff_amf_write_string(&p, "live");
00324
00325 ff_rtmp_packet_write(rt->stream, &pkt, rt->chunk_size, rt->prev_pkt[1]);
00326 ff_rtmp_packet_destroy(&pkt);
00327 }
00328
00332 static void gen_pong(URLContext *s, RTMPContext *rt, RTMPPacket *ppkt)
00333 {
00334 RTMPPacket pkt;
00335 uint8_t *p;
00336
00337 ff_rtmp_packet_create(&pkt, RTMP_NETWORK_CHANNEL, RTMP_PT_PING, ppkt->timestamp + 1, 6);
00338 p = pkt.data;
00339 bytestream_put_be16(&p, 7);
00340 bytestream_put_be32(&p, AV_RB32(ppkt->data+2));
00341 ff_rtmp_packet_write(rt->stream, &pkt, rt->chunk_size, rt->prev_pkt[1]);
00342 ff_rtmp_packet_destroy(&pkt);
00343 }
00344
00348 static void gen_bytes_read(URLContext *s, RTMPContext *rt, uint32_t ts)
00349 {
00350 RTMPPacket pkt;
00351 uint8_t *p;
00352
00353 ff_rtmp_packet_create(&pkt, RTMP_NETWORK_CHANNEL, RTMP_PT_BYTES_READ, ts, 4);
00354 p = pkt.data;
00355 bytestream_put_be32(&p, rt->bytes_read);
00356 ff_rtmp_packet_write(rt->stream, &pkt, rt->chunk_size, rt->prev_pkt[1]);
00357 ff_rtmp_packet_destroy(&pkt);
00358 }
00359
00360
00361 #define HMAC_IPAD_VAL 0x36
00362 #define HMAC_OPAD_VAL 0x5C
00363
00375 static void rtmp_calc_digest(const uint8_t *src, int len, int gap,
00376 const uint8_t *key, int keylen, uint8_t *dst)
00377 {
00378 struct AVSHA *sha;
00379 uint8_t hmac_buf[64+32] = {0};
00380 int i;
00381
00382 sha = av_mallocz(av_sha_size);
00383
00384 if (keylen < 64) {
00385 memcpy(hmac_buf, key, keylen);
00386 } else {
00387 av_sha_init(sha, 256);
00388 av_sha_update(sha,key, keylen);
00389 av_sha_final(sha, hmac_buf);
00390 }
00391 for (i = 0; i < 64; i++)
00392 hmac_buf[i] ^= HMAC_IPAD_VAL;
00393
00394 av_sha_init(sha, 256);
00395 av_sha_update(sha, hmac_buf, 64);
00396 if (gap <= 0) {
00397 av_sha_update(sha, src, len);
00398 } else {
00399 av_sha_update(sha, src, gap);
00400 av_sha_update(sha, src + gap + 32, len - gap - 32);
00401 }
00402 av_sha_final(sha, hmac_buf + 64);
00403
00404 for (i = 0; i < 64; i++)
00405 hmac_buf[i] ^= HMAC_IPAD_VAL ^ HMAC_OPAD_VAL;
00406 av_sha_init(sha, 256);
00407 av_sha_update(sha, hmac_buf, 64+32);
00408 av_sha_final(sha, dst);
00409
00410 av_free(sha);
00411 }
00412
00420 static int rtmp_handshake_imprint_with_digest(uint8_t *buf)
00421 {
00422 int i, digest_pos = 0;
00423
00424 for (i = 8; i < 12; i++)
00425 digest_pos += buf[i];
00426 digest_pos = (digest_pos % 728) + 12;
00427
00428 rtmp_calc_digest(buf, RTMP_HANDSHAKE_PACKET_SIZE, digest_pos,
00429 rtmp_player_key, PLAYER_KEY_OPEN_PART_LEN,
00430 buf + digest_pos);
00431 return digest_pos;
00432 }
00433
00441 static int rtmp_validate_digest(uint8_t *buf, int off)
00442 {
00443 int i, digest_pos = 0;
00444 uint8_t digest[32];
00445
00446 for (i = 0; i < 4; i++)
00447 digest_pos += buf[i + off];
00448 digest_pos = (digest_pos % 728) + off + 4;
00449
00450 rtmp_calc_digest(buf, RTMP_HANDSHAKE_PACKET_SIZE, digest_pos,
00451 rtmp_server_key, SERVER_KEY_OPEN_PART_LEN,
00452 digest);
00453 if (!memcmp(digest, buf + digest_pos, 32))
00454 return digest_pos;
00455 return 0;
00456 }
00457
00464 static int rtmp_handshake(URLContext *s, RTMPContext *rt)
00465 {
00466 AVLFG rnd;
00467 uint8_t tosend [RTMP_HANDSHAKE_PACKET_SIZE+1] = {
00468 3,
00469 0, 0, 0, 0,
00470 RTMP_CLIENT_VER1,
00471 RTMP_CLIENT_VER2,
00472 RTMP_CLIENT_VER3,
00473 RTMP_CLIENT_VER4,
00474 };
00475 uint8_t clientdata[RTMP_HANDSHAKE_PACKET_SIZE];
00476 uint8_t serverdata[RTMP_HANDSHAKE_PACKET_SIZE+1];
00477 int i;
00478 int server_pos, client_pos;
00479 uint8_t digest[32];
00480
00481 av_log(s, AV_LOG_DEBUG, "Handshaking...\n");
00482
00483 av_lfg_init(&rnd, 0xDEADC0DE);
00484
00485 for (i = 9; i <= RTMP_HANDSHAKE_PACKET_SIZE; i++)
00486 tosend[i] = av_lfg_get(&rnd) >> 24;
00487 client_pos = rtmp_handshake_imprint_with_digest(tosend + 1);
00488
00489 ffurl_write(rt->stream, tosend, RTMP_HANDSHAKE_PACKET_SIZE + 1);
00490 i = ffurl_read_complete(rt->stream, serverdata, RTMP_HANDSHAKE_PACKET_SIZE + 1);
00491 if (i != RTMP_HANDSHAKE_PACKET_SIZE + 1) {
00492 av_log(s, AV_LOG_ERROR, "Cannot read RTMP handshake response\n");
00493 return -1;
00494 }
00495 i = ffurl_read_complete(rt->stream, clientdata, RTMP_HANDSHAKE_PACKET_SIZE);
00496 if (i != RTMP_HANDSHAKE_PACKET_SIZE) {
00497 av_log(s, AV_LOG_ERROR, "Cannot read RTMP handshake response\n");
00498 return -1;
00499 }
00500
00501 av_log(s, AV_LOG_DEBUG, "Server version %d.%d.%d.%d\n",
00502 serverdata[5], serverdata[6], serverdata[7], serverdata[8]);
00503
00504 if (rt->is_input && serverdata[5] >= 3) {
00505 server_pos = rtmp_validate_digest(serverdata + 1, 772);
00506 if (!server_pos) {
00507 server_pos = rtmp_validate_digest(serverdata + 1, 8);
00508 if (!server_pos) {
00509 av_log(s, AV_LOG_ERROR, "Server response validating failed\n");
00510 return -1;
00511 }
00512 }
00513
00514 rtmp_calc_digest(tosend + 1 + client_pos, 32, 0,
00515 rtmp_server_key, sizeof(rtmp_server_key),
00516 digest);
00517 rtmp_calc_digest(clientdata, RTMP_HANDSHAKE_PACKET_SIZE-32, 0,
00518 digest, 32,
00519 digest);
00520 if (memcmp(digest, clientdata + RTMP_HANDSHAKE_PACKET_SIZE - 32, 32)) {
00521 av_log(s, AV_LOG_ERROR, "Signature mismatch\n");
00522 return -1;
00523 }
00524
00525 for (i = 0; i < RTMP_HANDSHAKE_PACKET_SIZE; i++)
00526 tosend[i] = av_lfg_get(&rnd) >> 24;
00527 rtmp_calc_digest(serverdata + 1 + server_pos, 32, 0,
00528 rtmp_player_key, sizeof(rtmp_player_key),
00529 digest);
00530 rtmp_calc_digest(tosend, RTMP_HANDSHAKE_PACKET_SIZE - 32, 0,
00531 digest, 32,
00532 tosend + RTMP_HANDSHAKE_PACKET_SIZE - 32);
00533
00534
00535 ffurl_write(rt->stream, tosend, RTMP_HANDSHAKE_PACKET_SIZE);
00536 } else {
00537 ffurl_write(rt->stream, serverdata+1, RTMP_HANDSHAKE_PACKET_SIZE);
00538 }
00539
00540 return 0;
00541 }
00542
00549 static int rtmp_parse_result(URLContext *s, RTMPContext *rt, RTMPPacket *pkt)
00550 {
00551 int i, t;
00552 const uint8_t *data_end = pkt->data + pkt->size;
00553
00554 #ifdef DEBUG
00555 ff_rtmp_packet_dump(s, pkt);
00556 #endif
00557
00558 switch (pkt->type) {
00559 case RTMP_PT_CHUNK_SIZE:
00560 if (pkt->size != 4) {
00561 av_log(s, AV_LOG_ERROR,
00562 "Chunk size change packet is not 4 bytes long (%d)\n", pkt->size);
00563 return -1;
00564 }
00565 if (!rt->is_input)
00566 ff_rtmp_packet_write(rt->stream, pkt, rt->chunk_size, rt->prev_pkt[1]);
00567 rt->chunk_size = AV_RB32(pkt->data);
00568 if (rt->chunk_size <= 0) {
00569 av_log(s, AV_LOG_ERROR, "Incorrect chunk size %d\n", rt->chunk_size);
00570 return -1;
00571 }
00572 av_log(s, AV_LOG_DEBUG, "New chunk size = %d\n", rt->chunk_size);
00573 break;
00574 case RTMP_PT_PING:
00575 t = AV_RB16(pkt->data);
00576 if (t == 6)
00577 gen_pong(s, rt, pkt);
00578 break;
00579 case RTMP_PT_CLIENT_BW:
00580 if (pkt->size < 4) {
00581 av_log(s, AV_LOG_ERROR,
00582 "Client bandwidth report packet is less than 4 bytes long (%d)\n",
00583 pkt->size);
00584 return -1;
00585 }
00586 av_log(s, AV_LOG_DEBUG, "Client bandwidth = %d\n", AV_RB32(pkt->data));
00587 rt->client_report_size = AV_RB32(pkt->data) >> 1;
00588 break;
00589 case RTMP_PT_INVOKE:
00590
00591 if (ff_amf_match_string(pkt->data, pkt->size, "_error")) {
00592 uint8_t tmpstr[256];
00593
00594 if (!ff_amf_get_field_value(pkt->data + 9, data_end,
00595 "description", tmpstr, sizeof(tmpstr)))
00596 av_log(s, AV_LOG_ERROR, "Server error: %s\n",tmpstr);
00597 return -1;
00598 } else if (ff_amf_match_string(pkt->data, pkt->size, "_result")) {
00599 switch (rt->state) {
00600 case STATE_HANDSHAKED:
00601 if (!rt->is_input) {
00602 gen_release_stream(s, rt);
00603 gen_fcpublish_stream(s, rt);
00604 rt->state = STATE_RELEASING;
00605 } else {
00606 rt->state = STATE_CONNECTING;
00607 }
00608 gen_create_stream(s, rt);
00609 break;
00610 case STATE_FCPUBLISH:
00611 rt->state = STATE_CONNECTING;
00612 break;
00613 case STATE_RELEASING:
00614 rt->state = STATE_FCPUBLISH;
00615
00616
00617 if (!pkt->data[10]) {
00618 int pkt_id = av_int2double(AV_RB64(pkt->data + 11));
00619 if (pkt_id == rt->create_stream_invoke)
00620 rt->state = STATE_CONNECTING;
00621 }
00622 if (rt->state != STATE_CONNECTING)
00623 break;
00624 case STATE_CONNECTING:
00625
00626 if (pkt->data[10] || pkt->data[19] != 5 || pkt->data[20]) {
00627 av_log(s, AV_LOG_WARNING, "Unexpected reply on connect()\n");
00628 } else {
00629 rt->main_channel_id = av_int2double(AV_RB64(pkt->data + 21));
00630 }
00631 if (rt->is_input) {
00632 gen_play(s, rt);
00633 } else {
00634 gen_publish(s, rt);
00635 }
00636 rt->state = STATE_READY;
00637 break;
00638 }
00639 } else if (ff_amf_match_string(pkt->data, pkt->size, "onStatus")) {
00640 const uint8_t* ptr = pkt->data + 11;
00641 uint8_t tmpstr[256];
00642
00643 for (i = 0; i < 2; i++) {
00644 t = ff_amf_tag_size(ptr, data_end);
00645 if (t < 0)
00646 return 1;
00647 ptr += t;
00648 }
00649 t = ff_amf_get_field_value(ptr, data_end,
00650 "level", tmpstr, sizeof(tmpstr));
00651 if (!t && !strcmp(tmpstr, "error")) {
00652 if (!ff_amf_get_field_value(ptr, data_end,
00653 "description", tmpstr, sizeof(tmpstr)))
00654 av_log(s, AV_LOG_ERROR, "Server error: %s\n",tmpstr);
00655 return -1;
00656 }
00657 t = ff_amf_get_field_value(ptr, data_end,
00658 "code", tmpstr, sizeof(tmpstr));
00659 if (!t && !strcmp(tmpstr, "NetStream.Play.Start")) rt->state = STATE_PLAYING;
00660 if (!t && !strcmp(tmpstr, "NetStream.Play.Stop")) rt->state = STATE_STOPPED;
00661 if (!t && !strcmp(tmpstr, "NetStream.Play.UnpublishNotify")) rt->state = STATE_STOPPED;
00662 if (!t && !strcmp(tmpstr, "NetStream.Publish.Start")) rt->state = STATE_PUBLISHING;
00663 }
00664 break;
00665 }
00666 return 0;
00667 }
00668
00680 static int get_packet(URLContext *s, int for_header)
00681 {
00682 RTMPContext *rt = s->priv_data;
00683 int ret;
00684 uint8_t *p;
00685 const uint8_t *next;
00686 uint32_t size;
00687 uint32_t ts, cts, pts=0;
00688
00689 if (rt->state == STATE_STOPPED)
00690 return AVERROR_EOF;
00691
00692 for (;;) {
00693 RTMPPacket rpkt = { 0 };
00694 if ((ret = ff_rtmp_packet_read(rt->stream, &rpkt,
00695 rt->chunk_size, rt->prev_pkt[0])) <= 0) {
00696 if (ret == 0) {
00697 return AVERROR(EAGAIN);
00698 } else {
00699 return AVERROR(EIO);
00700 }
00701 }
00702 rt->bytes_read += ret;
00703 if (rt->bytes_read > rt->last_bytes_read + rt->client_report_size) {
00704 av_log(s, AV_LOG_DEBUG, "Sending bytes read report\n");
00705 gen_bytes_read(s, rt, rpkt.timestamp + 1);
00706 rt->last_bytes_read = rt->bytes_read;
00707 }
00708
00709 ret = rtmp_parse_result(s, rt, &rpkt);
00710 if (ret < 0) {
00711 ff_rtmp_packet_destroy(&rpkt);
00712 return -1;
00713 }
00714 if (rt->state == STATE_STOPPED) {
00715 ff_rtmp_packet_destroy(&rpkt);
00716 return AVERROR_EOF;
00717 }
00718 if (for_header && (rt->state == STATE_PLAYING || rt->state == STATE_PUBLISHING)) {
00719 ff_rtmp_packet_destroy(&rpkt);
00720 return 0;
00721 }
00722 if (!rpkt.size || !rt->is_input) {
00723 ff_rtmp_packet_destroy(&rpkt);
00724 continue;
00725 }
00726 if (rpkt.type == RTMP_PT_VIDEO || rpkt.type == RTMP_PT_AUDIO ||
00727 (rpkt.type == RTMP_PT_NOTIFY &&
00728 ff_amf_match_string(rpkt.data, rpkt.size, "onMetaData"))) {
00729 ts = rpkt.timestamp;
00730
00731
00732 rt->flv_off = 0;
00733 rt->flv_size = rpkt.size + 15;
00734 rt->flv_data = p = av_realloc(rt->flv_data, rt->flv_size);
00735 bytestream_put_byte(&p, rpkt.type);
00736 bytestream_put_be24(&p, rpkt.size);
00737 bytestream_put_be24(&p, ts);
00738 bytestream_put_byte(&p, ts >> 24);
00739 bytestream_put_be24(&p, 0);
00740 bytestream_put_buffer(&p, rpkt.data, rpkt.size);
00741 bytestream_put_be32(&p, 0);
00742 ff_rtmp_packet_destroy(&rpkt);
00743 return 0;
00744 } else if (rpkt.type == RTMP_PT_METADATA) {
00745
00746 rt->flv_off = 0;
00747 rt->flv_size = rpkt.size;
00748 rt->flv_data = av_realloc(rt->flv_data, rt->flv_size);
00749
00750 next = rpkt.data;
00751 ts = rpkt.timestamp;
00752 while (next - rpkt.data < rpkt.size - 11) {
00753 next++;
00754 size = bytestream_get_be24(&next);
00755 p=next;
00756 cts = bytestream_get_be24(&next);
00757 cts |= bytestream_get_byte(&next) << 24;
00758 if (pts==0)
00759 pts=cts;
00760 ts += cts - pts;
00761 pts = cts;
00762 bytestream_put_be24(&p, ts);
00763 bytestream_put_byte(&p, ts >> 24);
00764 next += size + 3 + 4;
00765 }
00766 memcpy(rt->flv_data, rpkt.data, rpkt.size);
00767 ff_rtmp_packet_destroy(&rpkt);
00768 return 0;
00769 }
00770 ff_rtmp_packet_destroy(&rpkt);
00771 }
00772 }
00773
00774 static int rtmp_close(URLContext *h)
00775 {
00776 RTMPContext *rt = h->priv_data;
00777
00778 if (!rt->is_input) {
00779 rt->flv_data = NULL;
00780 if (rt->out_pkt.size)
00781 ff_rtmp_packet_destroy(&rt->out_pkt);
00782 if (rt->state > STATE_FCPUBLISH)
00783 gen_fcunpublish_stream(h, rt);
00784 }
00785 if (rt->state > STATE_HANDSHAKED)
00786 gen_delete_stream(h, rt);
00787
00788 av_freep(&rt->flv_data);
00789 ffurl_close(rt->stream);
00790 return 0;
00791 }
00792
00802 static int rtmp_open(URLContext *s, const char *uri, int flags)
00803 {
00804 RTMPContext *rt = s->priv_data;
00805 char proto[8], hostname[256], path[1024], *fname;
00806 uint8_t buf[2048];
00807 int port;
00808 int ret;
00809
00810 rt->is_input = !(flags & AVIO_FLAG_WRITE);
00811
00812 av_url_split(proto, sizeof(proto), NULL, 0, hostname, sizeof(hostname), &port,
00813 path, sizeof(path), s->filename);
00814
00815 if (port < 0)
00816 port = RTMP_DEFAULT_PORT;
00817 ff_url_join(buf, sizeof(buf), "tcp", NULL, hostname, port, NULL);
00818
00819 if (ffurl_open(&rt->stream, buf, AVIO_FLAG_READ_WRITE,
00820 &s->interrupt_callback, NULL) < 0) {
00821 av_log(s , AV_LOG_ERROR, "Cannot open connection %s\n", buf);
00822 goto fail;
00823 }
00824
00825 rt->state = STATE_START;
00826 if (rtmp_handshake(s, rt))
00827 goto fail;
00828
00829 rt->chunk_size = 128;
00830 rt->state = STATE_HANDSHAKED;
00831
00832 if (!strncmp(path, "/ondemand/", 10)) {
00833 fname = path + 10;
00834 memcpy(rt->app, "ondemand", 9);
00835 } else {
00836 char *p = strchr(path + 1, '/');
00837 if (!p) {
00838 fname = path + 1;
00839 rt->app[0] = '\0';
00840 } else {
00841 char *c = strchr(p + 1, ':');
00842 fname = strchr(p + 1, '/');
00843 if (!fname || c < fname) {
00844 fname = p + 1;
00845 av_strlcpy(rt->app, path + 1, p - path);
00846 } else {
00847 fname++;
00848 av_strlcpy(rt->app, path + 1, fname - path - 1);
00849 }
00850 }
00851 }
00852 if (!strchr(fname, ':') &&
00853 (!strcmp(fname + strlen(fname) - 4, ".f4v") ||
00854 !strcmp(fname + strlen(fname) - 4, ".mp4"))) {
00855 memcpy(rt->playpath, "mp4:", 5);
00856 } else {
00857 rt->playpath[0] = 0;
00858 }
00859 strncat(rt->playpath, fname, sizeof(rt->playpath) - 5);
00860
00861 rt->client_report_size = 1048576;
00862 rt->bytes_read = 0;
00863 rt->last_bytes_read = 0;
00864
00865 av_log(s, AV_LOG_DEBUG, "Proto = %s, path = %s, app = %s, fname = %s\n",
00866 proto, path, rt->app, rt->playpath);
00867 gen_connect(s, rt, proto, hostname, port);
00868
00869 do {
00870 ret = get_packet(s, 1);
00871 } while (ret == EAGAIN);
00872 if (ret < 0)
00873 goto fail;
00874
00875 if (rt->is_input) {
00876
00877 rt->flv_size = 13;
00878 rt->flv_data = av_realloc(rt->flv_data, rt->flv_size);
00879 rt->flv_off = 0;
00880 memcpy(rt->flv_data, "FLV\1\5\0\0\0\011\0\0\0\0", rt->flv_size);
00881 } else {
00882 rt->flv_size = 0;
00883 rt->flv_data = NULL;
00884 rt->flv_off = 0;
00885 rt->skip_bytes = 13;
00886 }
00887
00888 s->max_packet_size = rt->stream->max_packet_size;
00889 s->is_streamed = 1;
00890 return 0;
00891
00892 fail:
00893 rtmp_close(s);
00894 return AVERROR(EIO);
00895 }
00896
00897 static int rtmp_read(URLContext *s, uint8_t *buf, int size)
00898 {
00899 RTMPContext *rt = s->priv_data;
00900 int orig_size = size;
00901 int ret;
00902
00903 while (size > 0) {
00904 int data_left = rt->flv_size - rt->flv_off;
00905
00906 if (data_left >= size) {
00907 memcpy(buf, rt->flv_data + rt->flv_off, size);
00908 rt->flv_off += size;
00909 return orig_size;
00910 }
00911 if (data_left > 0) {
00912 memcpy(buf, rt->flv_data + rt->flv_off, data_left);
00913 buf += data_left;
00914 size -= data_left;
00915 rt->flv_off = rt->flv_size;
00916 return data_left;
00917 }
00918 if ((ret = get_packet(s, 0)) < 0)
00919 return ret;
00920 }
00921 return orig_size;
00922 }
00923
00924 static int rtmp_write(URLContext *s, const uint8_t *buf, int size)
00925 {
00926 RTMPContext *rt = s->priv_data;
00927 int size_temp = size;
00928 int pktsize, pkttype;
00929 uint32_t ts;
00930 const uint8_t *buf_temp = buf;
00931
00932 do {
00933 if (rt->skip_bytes) {
00934 int skip = FFMIN(rt->skip_bytes, size_temp);
00935 buf_temp += skip;
00936 size_temp -= skip;
00937 rt->skip_bytes -= skip;
00938 continue;
00939 }
00940
00941 if (rt->flv_header_bytes < 11) {
00942 const uint8_t *header = rt->flv_header;
00943 int copy = FFMIN(11 - rt->flv_header_bytes, size_temp);
00944 bytestream_get_buffer(&buf_temp, rt->flv_header + rt->flv_header_bytes, copy);
00945 rt->flv_header_bytes += copy;
00946 size_temp -= copy;
00947 if (rt->flv_header_bytes < 11)
00948 break;
00949
00950 pkttype = bytestream_get_byte(&header);
00951 pktsize = bytestream_get_be24(&header);
00952 ts = bytestream_get_be24(&header);
00953 ts |= bytestream_get_byte(&header) << 24;
00954 bytestream_get_be24(&header);
00955 rt->flv_size = pktsize;
00956
00957
00958 if (((pkttype == RTMP_PT_VIDEO || pkttype == RTMP_PT_AUDIO) && ts == 0) ||
00959 pkttype == RTMP_PT_NOTIFY) {
00960 if (pkttype == RTMP_PT_NOTIFY)
00961 pktsize += 16;
00962 rt->prev_pkt[1][RTMP_SOURCE_CHANNEL].channel_id = 0;
00963 }
00964
00965
00966 ff_rtmp_packet_create(&rt->out_pkt, RTMP_SOURCE_CHANNEL, pkttype, ts, pktsize);
00967 rt->out_pkt.extra = rt->main_channel_id;
00968 rt->flv_data = rt->out_pkt.data;
00969
00970 if (pkttype == RTMP_PT_NOTIFY)
00971 ff_amf_write_string(&rt->flv_data, "@setDataFrame");
00972 }
00973
00974 if (rt->flv_size - rt->flv_off > size_temp) {
00975 bytestream_get_buffer(&buf_temp, rt->flv_data + rt->flv_off, size_temp);
00976 rt->flv_off += size_temp;
00977 size_temp = 0;
00978 } else {
00979 bytestream_get_buffer(&buf_temp, rt->flv_data + rt->flv_off, rt->flv_size - rt->flv_off);
00980 size_temp -= rt->flv_size - rt->flv_off;
00981 rt->flv_off += rt->flv_size - rt->flv_off;
00982 }
00983
00984 if (rt->flv_off == rt->flv_size) {
00985 rt->skip_bytes = 4;
00986
00987 ff_rtmp_packet_write(rt->stream, &rt->out_pkt, rt->chunk_size, rt->prev_pkt[1]);
00988 ff_rtmp_packet_destroy(&rt->out_pkt);
00989 rt->flv_size = 0;
00990 rt->flv_off = 0;
00991 rt->flv_header_bytes = 0;
00992 }
00993 } while (buf_temp - buf < size);
00994 return size;
00995 }
00996
00997 URLProtocol ff_rtmp_protocol = {
00998 .name = "rtmp",
00999 .url_open = rtmp_open,
01000 .url_read = rtmp_read,
01001 .url_write = rtmp_write,
01002 .url_close = rtmp_close,
01003 .priv_data_size = sizeof(RTMPContext),
01004 .flags = URL_PROTOCOL_FLAG_NETWORK,
01005 };