libavformat/rtmpproto.c
Go to the documentation of this file.
00001 /*
00002  * RTMP network protocol
00003  * Copyright (c) 2009 Kostya Shishkov
00004  *
00005  * This file is part of Libav.
00006  *
00007  * Libav is free software; you can redistribute it and/or
00008  * modify it under the terms of the GNU Lesser General Public
00009  * License as published by the Free Software Foundation; either
00010  * version 2.1 of the License, or (at your option) any later version.
00011  *
00012  * Libav is distributed in the hope that it will be useful,
00013  * but WITHOUT ANY WARRANTY; without even the implied warranty of
00014  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
00015  * Lesser General Public License for more details.
00016  *
00017  * You should have received a copy of the GNU Lesser General Public
00018  * License along with Libav; if not, write to the Free Software
00019  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
00020  */
00021 
00027 #include "libavcodec/bytestream.h"
00028 #include "libavutil/avstring.h"
00029 #include "libavutil/intfloat.h"
00030 #include "libavutil/lfg.h"
00031 #include "libavutil/sha.h"
00032 #include "avformat.h"
00033 #include "internal.h"
00034 
00035 #include "network.h"
00036 
00037 #include "flv.h"
00038 #include "rtmp.h"
00039 #include "rtmppkt.h"
00040 #include "url.h"
00041 
00042 //#define DEBUG
00043 
/**
 * RTMP protocol handler state.
 *
 * The numeric order of the enumerators matters: rtmp_close() uses relational
 * comparisons on the state to decide which teardown commands to send.
 */
typedef enum {
    STATE_START = 0,  ///< set in rtmp_open() before the handshake is attempted
    STATE_HANDSHAKED, ///< handshake finished, 'connect' is about to be / has been sent
    STATE_RELEASING,  ///< output only: releaseStream and FCPublish have been sent
    STATE_FCPUBLISH,  ///< output only: one _result seen while releasing
    STATE_CONNECTING, ///< waiting for the createStream result
    STATE_READY,      ///< play or publish command sent, waiting for onStatus
    STATE_PLAYING,    ///< server reported NetStream.Play.Start
    STATE_PUBLISHING, ///< server reported NetStream.Publish.Start
    STATE_STOPPED,    ///< server reported Play.Stop or Play.UnpublishNotify
} ClientState;
00056 
00058 typedef struct RTMPContext {
00059     URLContext*   stream;                     
00060     RTMPPacket    prev_pkt[2][RTMP_CHANNELS]; 
00061     int           chunk_size;                 
00062     int           is_input;                   
00063     char          playpath[256];              
00064     char          app[128];                   
00065     ClientState   state;                      
00066     int           main_channel_id;            
00067     uint8_t*      flv_data;                   
00068     int           flv_size;                   
00069     int           flv_off;                    
00070     RTMPPacket    out_pkt;                    
00071     uint32_t      client_report_size;         
00072     uint32_t      bytes_read;                 
00073     uint32_t      last_bytes_read;            
00074     int           skip_bytes;                 
00075     uint8_t       flv_header[11];             
00076     int           flv_header_bytes;           
00077     int           nb_invokes;                 
00078     int           create_stream_invoke;       
00079 } RTMPContext;
00080 
/** Length of the leading (textual) part of the client key, used alone for the first client digest. */
#define PLAYER_KEY_OPEN_PART_LEN 30

/** Client key used for digest signing: 30 bytes of text followed by 32 binary bytes. */
static const uint8_t rtmp_player_key[] = {
    /* "Genuine Adobe Flash Player 001" (no terminating NUL) */
    'G', 'e', 'n', 'u', 'i', 'n', 'e', ' ',
    'A', 'd', 'o', 'b', 'e', ' ',
    'F', 'l', 'a', 's', 'h', ' ',
    'P', 'l', 'a', 'y', 'e', 'r', ' ',
    '0', '0', '1',

    /* binary tail */
    0xF0, 0xEE, 0xC2, 0x4A, 0x80, 0x68, 0xBE, 0xE8,
    0x2E, 0x00, 0xD0, 0xD1, 0x02, 0x9E, 0x7E, 0x57,
    0x6E, 0xEC, 0x5D, 0x2D, 0x29, 0x80, 0x6F, 0xAB,
    0x93, 0xB8, 0xE6, 0x36, 0xCF, 0xEB, 0x31, 0xAE
};
00091 
/** Length of the leading (textual) part of the server key, used alone for the first server digest. */
#define SERVER_KEY_OPEN_PART_LEN 36

/** Server key used for digest signing: 36 bytes of text followed by 32 binary bytes. */
static const uint8_t rtmp_server_key[] = {
    /* "Genuine Adobe Flash Media Server 001" (no terminating NUL) */
    'G', 'e', 'n', 'u', 'i', 'n', 'e', ' ',
    'A', 'd', 'o', 'b', 'e', ' ',
    'F', 'l', 'a', 's', 'h', ' ',
    'M', 'e', 'd', 'i', 'a', ' ',
    'S', 'e', 'r', 'v', 'e', 'r', ' ',
    '0', '0', '1',

    /* binary tail */
    0xF0, 0xEE, 0xC2, 0x4A, 0x80, 0x68, 0xBE, 0xE8,
    0x2E, 0x00, 0xD0, 0xD1, 0x02, 0x9E, 0x7E, 0x57,
    0x6E, 0xEC, 0x5D, 0x2D, 0x29, 0x80, 0x6F, 0xAB,
    0x93, 0xB8, 0xE6, 0x36, 0xCF, 0xEB, 0x31, 0xAE
};
00103 
00107 static void gen_connect(URLContext *s, RTMPContext *rt, const char *proto,
00108                         const char *host, int port)
00109 {
00110     RTMPPacket pkt;
00111     uint8_t ver[64], *p;
00112     char tcurl[512];
00113 
00114     ff_rtmp_packet_create(&pkt, RTMP_SYSTEM_CHANNEL, RTMP_PT_INVOKE, 0, 4096);
00115     p = pkt.data;
00116 
00117     ff_url_join(tcurl, sizeof(tcurl), proto, NULL, host, port, "/%s", rt->app);
00118     ff_amf_write_string(&p, "connect");
00119     ff_amf_write_number(&p, ++rt->nb_invokes);
00120     ff_amf_write_object_start(&p);
00121     ff_amf_write_field_name(&p, "app");
00122     ff_amf_write_string(&p, rt->app);
00123 
00124     if (rt->is_input) {
00125         snprintf(ver, sizeof(ver), "%s %d,%d,%d,%d", RTMP_CLIENT_PLATFORM, RTMP_CLIENT_VER1,
00126                  RTMP_CLIENT_VER2, RTMP_CLIENT_VER3, RTMP_CLIENT_VER4);
00127     } else {
00128         snprintf(ver, sizeof(ver), "FMLE/3.0 (compatible; %s)", LIBAVFORMAT_IDENT);
00129         ff_amf_write_field_name(&p, "type");
00130         ff_amf_write_string(&p, "nonprivate");
00131     }
00132     ff_amf_write_field_name(&p, "flashVer");
00133     ff_amf_write_string(&p, ver);
00134     ff_amf_write_field_name(&p, "tcUrl");
00135     ff_amf_write_string(&p, tcurl);
00136     if (rt->is_input) {
00137         ff_amf_write_field_name(&p, "fpad");
00138         ff_amf_write_bool(&p, 0);
00139         ff_amf_write_field_name(&p, "capabilities");
00140         ff_amf_write_number(&p, 15.0);
00141         ff_amf_write_field_name(&p, "audioCodecs");
00142         ff_amf_write_number(&p, 1639.0);
00143         ff_amf_write_field_name(&p, "videoCodecs");
00144         ff_amf_write_number(&p, 252.0);
00145         ff_amf_write_field_name(&p, "videoFunction");
00146         ff_amf_write_number(&p, 1.0);
00147     }
00148     ff_amf_write_object_end(&p);
00149 
00150     pkt.size = p - pkt.data;
00151 
00152     ff_rtmp_packet_write(rt->stream, &pkt, rt->chunk_size, rt->prev_pkt[1]);
00153     ff_rtmp_packet_destroy(&pkt);
00154 }
00155 
00160 static void gen_release_stream(URLContext *s, RTMPContext *rt)
00161 {
00162     RTMPPacket pkt;
00163     uint8_t *p;
00164 
00165     ff_rtmp_packet_create(&pkt, RTMP_SYSTEM_CHANNEL, RTMP_PT_INVOKE, 0,
00166                           29 + strlen(rt->playpath));
00167 
00168     av_log(s, AV_LOG_DEBUG, "Releasing stream...\n");
00169     p = pkt.data;
00170     ff_amf_write_string(&p, "releaseStream");
00171     ff_amf_write_number(&p, ++rt->nb_invokes);
00172     ff_amf_write_null(&p);
00173     ff_amf_write_string(&p, rt->playpath);
00174 
00175     ff_rtmp_packet_write(rt->stream, &pkt, rt->chunk_size, rt->prev_pkt[1]);
00176     ff_rtmp_packet_destroy(&pkt);
00177 }
00178 
00183 static void gen_fcpublish_stream(URLContext *s, RTMPContext *rt)
00184 {
00185     RTMPPacket pkt;
00186     uint8_t *p;
00187 
00188     ff_rtmp_packet_create(&pkt, RTMP_SYSTEM_CHANNEL, RTMP_PT_INVOKE, 0,
00189                           25 + strlen(rt->playpath));
00190 
00191     av_log(s, AV_LOG_DEBUG, "FCPublish stream...\n");
00192     p = pkt.data;
00193     ff_amf_write_string(&p, "FCPublish");
00194     ff_amf_write_number(&p, ++rt->nb_invokes);
00195     ff_amf_write_null(&p);
00196     ff_amf_write_string(&p, rt->playpath);
00197 
00198     ff_rtmp_packet_write(rt->stream, &pkt, rt->chunk_size, rt->prev_pkt[1]);
00199     ff_rtmp_packet_destroy(&pkt);
00200 }
00201 
00206 static void gen_fcunpublish_stream(URLContext *s, RTMPContext *rt)
00207 {
00208     RTMPPacket pkt;
00209     uint8_t *p;
00210 
00211     ff_rtmp_packet_create(&pkt, RTMP_SYSTEM_CHANNEL, RTMP_PT_INVOKE, 0,
00212                           27 + strlen(rt->playpath));
00213 
00214     av_log(s, AV_LOG_DEBUG, "UnPublishing stream...\n");
00215     p = pkt.data;
00216     ff_amf_write_string(&p, "FCUnpublish");
00217     ff_amf_write_number(&p, ++rt->nb_invokes);
00218     ff_amf_write_null(&p);
00219     ff_amf_write_string(&p, rt->playpath);
00220 
00221     ff_rtmp_packet_write(rt->stream, &pkt, rt->chunk_size, rt->prev_pkt[1]);
00222     ff_rtmp_packet_destroy(&pkt);
00223 }
00224 
00229 static void gen_create_stream(URLContext *s, RTMPContext *rt)
00230 {
00231     RTMPPacket pkt;
00232     uint8_t *p;
00233 
00234     av_log(s, AV_LOG_DEBUG, "Creating stream...\n");
00235     ff_rtmp_packet_create(&pkt, RTMP_SYSTEM_CHANNEL, RTMP_PT_INVOKE, 0, 25);
00236 
00237     p = pkt.data;
00238     ff_amf_write_string(&p, "createStream");
00239     ff_amf_write_number(&p, ++rt->nb_invokes);
00240     ff_amf_write_null(&p);
00241     rt->create_stream_invoke = rt->nb_invokes;
00242 
00243     ff_rtmp_packet_write(rt->stream, &pkt, rt->chunk_size, rt->prev_pkt[1]);
00244     ff_rtmp_packet_destroy(&pkt);
00245 }
00246 
00247 
00252 static void gen_delete_stream(URLContext *s, RTMPContext *rt)
00253 {
00254     RTMPPacket pkt;
00255     uint8_t *p;
00256 
00257     av_log(s, AV_LOG_DEBUG, "Deleting stream...\n");
00258     ff_rtmp_packet_create(&pkt, RTMP_SYSTEM_CHANNEL, RTMP_PT_INVOKE, 0, 34);
00259 
00260     p = pkt.data;
00261     ff_amf_write_string(&p, "deleteStream");
00262     ff_amf_write_number(&p, ++rt->nb_invokes);
00263     ff_amf_write_null(&p);
00264     ff_amf_write_number(&p, rt->main_channel_id);
00265 
00266     ff_rtmp_packet_write(rt->stream, &pkt, rt->chunk_size, rt->prev_pkt[1]);
00267     ff_rtmp_packet_destroy(&pkt);
00268 }
00269 
00274 static void gen_play(URLContext *s, RTMPContext *rt)
00275 {
00276     RTMPPacket pkt;
00277     uint8_t *p;
00278 
00279     av_log(s, AV_LOG_DEBUG, "Sending play command for '%s'\n", rt->playpath);
00280     ff_rtmp_packet_create(&pkt, RTMP_VIDEO_CHANNEL, RTMP_PT_INVOKE, 0,
00281                           20 + strlen(rt->playpath));
00282     pkt.extra = rt->main_channel_id;
00283 
00284     p = pkt.data;
00285     ff_amf_write_string(&p, "play");
00286     ff_amf_write_number(&p, ++rt->nb_invokes);
00287     ff_amf_write_null(&p);
00288     ff_amf_write_string(&p, rt->playpath);
00289 
00290     ff_rtmp_packet_write(rt->stream, &pkt, rt->chunk_size, rt->prev_pkt[1]);
00291     ff_rtmp_packet_destroy(&pkt);
00292 
00293     // set client buffer time disguised in ping packet
00294     ff_rtmp_packet_create(&pkt, RTMP_NETWORK_CHANNEL, RTMP_PT_PING, 1, 10);
00295 
00296     p = pkt.data;
00297     bytestream_put_be16(&p, 3);
00298     bytestream_put_be32(&p, 1);
00299     bytestream_put_be32(&p, 256); //TODO: what is a good value here?
00300 
00301     ff_rtmp_packet_write(rt->stream, &pkt, rt->chunk_size, rt->prev_pkt[1]);
00302     ff_rtmp_packet_destroy(&pkt);
00303 }
00304 
00308 static void gen_publish(URLContext *s, RTMPContext *rt)
00309 {
00310     RTMPPacket pkt;
00311     uint8_t *p;
00312 
00313     av_log(s, AV_LOG_DEBUG, "Sending publish command for '%s'\n", rt->playpath);
00314     ff_rtmp_packet_create(&pkt, RTMP_SOURCE_CHANNEL, RTMP_PT_INVOKE, 0,
00315                           30 + strlen(rt->playpath));
00316     pkt.extra = rt->main_channel_id;
00317 
00318     p = pkt.data;
00319     ff_amf_write_string(&p, "publish");
00320     ff_amf_write_number(&p, ++rt->nb_invokes);
00321     ff_amf_write_null(&p);
00322     ff_amf_write_string(&p, rt->playpath);
00323     ff_amf_write_string(&p, "live");
00324 
00325     ff_rtmp_packet_write(rt->stream, &pkt, rt->chunk_size, rt->prev_pkt[1]);
00326     ff_rtmp_packet_destroy(&pkt);
00327 }
00328 
00332 static void gen_pong(URLContext *s, RTMPContext *rt, RTMPPacket *ppkt)
00333 {
00334     RTMPPacket pkt;
00335     uint8_t *p;
00336 
00337     ff_rtmp_packet_create(&pkt, RTMP_NETWORK_CHANNEL, RTMP_PT_PING, ppkt->timestamp + 1, 6);
00338     p = pkt.data;
00339     bytestream_put_be16(&p, 7);
00340     bytestream_put_be32(&p, AV_RB32(ppkt->data+2));
00341     ff_rtmp_packet_write(rt->stream, &pkt, rt->chunk_size, rt->prev_pkt[1]);
00342     ff_rtmp_packet_destroy(&pkt);
00343 }
00344 
00348 static void gen_bytes_read(URLContext *s, RTMPContext *rt, uint32_t ts)
00349 {
00350     RTMPPacket pkt;
00351     uint8_t *p;
00352 
00353     ff_rtmp_packet_create(&pkt, RTMP_NETWORK_CHANNEL, RTMP_PT_BYTES_READ, ts, 4);
00354     p = pkt.data;
00355     bytestream_put_be32(&p, rt->bytes_read);
00356     ff_rtmp_packet_write(rt->stream, &pkt, rt->chunk_size, rt->prev_pkt[1]);
00357     ff_rtmp_packet_destroy(&pkt);
00358 }
00359 
00360 //TODO: Move HMAC code somewhere. Eventually.
00361 #define HMAC_IPAD_VAL 0x36
00362 #define HMAC_OPAD_VAL 0x5C
00363 
00375 static void rtmp_calc_digest(const uint8_t *src, int len, int gap,
00376                              const uint8_t *key, int keylen, uint8_t *dst)
00377 {
00378     struct AVSHA *sha;
00379     uint8_t hmac_buf[64+32] = {0};
00380     int i;
00381 
00382     sha = av_mallocz(av_sha_size);
00383 
00384     if (keylen < 64) {
00385         memcpy(hmac_buf, key, keylen);
00386     } else {
00387         av_sha_init(sha, 256);
00388         av_sha_update(sha,key, keylen);
00389         av_sha_final(sha, hmac_buf);
00390     }
00391     for (i = 0; i < 64; i++)
00392         hmac_buf[i] ^= HMAC_IPAD_VAL;
00393 
00394     av_sha_init(sha, 256);
00395     av_sha_update(sha, hmac_buf, 64);
00396     if (gap <= 0) {
00397         av_sha_update(sha, src, len);
00398     } else { //skip 32 bytes used for storing digest
00399         av_sha_update(sha, src, gap);
00400         av_sha_update(sha, src + gap + 32, len - gap - 32);
00401     }
00402     av_sha_final(sha, hmac_buf + 64);
00403 
00404     for (i = 0; i < 64; i++)
00405         hmac_buf[i] ^= HMAC_IPAD_VAL ^ HMAC_OPAD_VAL; //reuse XORed key for opad
00406     av_sha_init(sha, 256);
00407     av_sha_update(sha, hmac_buf, 64+32);
00408     av_sha_final(sha, dst);
00409 
00410     av_free(sha);
00411 }
00412 
00420 static int rtmp_handshake_imprint_with_digest(uint8_t *buf)
00421 {
00422     int i, digest_pos = 0;
00423 
00424     for (i = 8; i < 12; i++)
00425         digest_pos += buf[i];
00426     digest_pos = (digest_pos % 728) + 12;
00427 
00428     rtmp_calc_digest(buf, RTMP_HANDSHAKE_PACKET_SIZE, digest_pos,
00429                      rtmp_player_key, PLAYER_KEY_OPEN_PART_LEN,
00430                      buf + digest_pos);
00431     return digest_pos;
00432 }
00433 
00441 static int rtmp_validate_digest(uint8_t *buf, int off)
00442 {
00443     int i, digest_pos = 0;
00444     uint8_t digest[32];
00445 
00446     for (i = 0; i < 4; i++)
00447         digest_pos += buf[i + off];
00448     digest_pos = (digest_pos % 728) + off + 4;
00449 
00450     rtmp_calc_digest(buf, RTMP_HANDSHAKE_PACKET_SIZE, digest_pos,
00451                      rtmp_server_key, SERVER_KEY_OPEN_PART_LEN,
00452                      digest);
00453     if (!memcmp(digest, buf + digest_pos, 32))
00454         return digest_pos;
00455     return 0;
00456 }
00457 
00464 static int rtmp_handshake(URLContext *s, RTMPContext *rt)
00465 {
00466     AVLFG rnd;
00467     uint8_t tosend    [RTMP_HANDSHAKE_PACKET_SIZE+1] = {
00468         3,                // unencrypted data
00469         0, 0, 0, 0,       // client uptime
00470         RTMP_CLIENT_VER1,
00471         RTMP_CLIENT_VER2,
00472         RTMP_CLIENT_VER3,
00473         RTMP_CLIENT_VER4,
00474     };
00475     uint8_t clientdata[RTMP_HANDSHAKE_PACKET_SIZE];
00476     uint8_t serverdata[RTMP_HANDSHAKE_PACKET_SIZE+1];
00477     int i;
00478     int server_pos, client_pos;
00479     uint8_t digest[32];
00480 
00481     av_log(s, AV_LOG_DEBUG, "Handshaking...\n");
00482 
00483     av_lfg_init(&rnd, 0xDEADC0DE);
00484     // generate handshake packet - 1536 bytes of pseudorandom data
00485     for (i = 9; i <= RTMP_HANDSHAKE_PACKET_SIZE; i++)
00486         tosend[i] = av_lfg_get(&rnd) >> 24;
00487     client_pos = rtmp_handshake_imprint_with_digest(tosend + 1);
00488 
00489     ffurl_write(rt->stream, tosend, RTMP_HANDSHAKE_PACKET_SIZE + 1);
00490     i = ffurl_read_complete(rt->stream, serverdata, RTMP_HANDSHAKE_PACKET_SIZE + 1);
00491     if (i != RTMP_HANDSHAKE_PACKET_SIZE + 1) {
00492         av_log(s, AV_LOG_ERROR, "Cannot read RTMP handshake response\n");
00493         return -1;
00494     }
00495     i = ffurl_read_complete(rt->stream, clientdata, RTMP_HANDSHAKE_PACKET_SIZE);
00496     if (i != RTMP_HANDSHAKE_PACKET_SIZE) {
00497         av_log(s, AV_LOG_ERROR, "Cannot read RTMP handshake response\n");
00498         return -1;
00499     }
00500 
00501     av_log(s, AV_LOG_DEBUG, "Server version %d.%d.%d.%d\n",
00502            serverdata[5], serverdata[6], serverdata[7], serverdata[8]);
00503 
00504     if (rt->is_input && serverdata[5] >= 3) {
00505         server_pos = rtmp_validate_digest(serverdata + 1, 772);
00506         if (!server_pos) {
00507             server_pos = rtmp_validate_digest(serverdata + 1, 8);
00508             if (!server_pos) {
00509                 av_log(s, AV_LOG_ERROR, "Server response validating failed\n");
00510                 return -1;
00511             }
00512         }
00513 
00514         rtmp_calc_digest(tosend + 1 + client_pos, 32, 0,
00515                          rtmp_server_key, sizeof(rtmp_server_key),
00516                          digest);
00517         rtmp_calc_digest(clientdata, RTMP_HANDSHAKE_PACKET_SIZE-32, 0,
00518                          digest, 32,
00519                          digest);
00520         if (memcmp(digest, clientdata + RTMP_HANDSHAKE_PACKET_SIZE - 32, 32)) {
00521             av_log(s, AV_LOG_ERROR, "Signature mismatch\n");
00522             return -1;
00523         }
00524 
00525         for (i = 0; i < RTMP_HANDSHAKE_PACKET_SIZE; i++)
00526             tosend[i] = av_lfg_get(&rnd) >> 24;
00527         rtmp_calc_digest(serverdata + 1 + server_pos, 32, 0,
00528                          rtmp_player_key, sizeof(rtmp_player_key),
00529                          digest);
00530         rtmp_calc_digest(tosend,  RTMP_HANDSHAKE_PACKET_SIZE - 32, 0,
00531                          digest, 32,
00532                          tosend + RTMP_HANDSHAKE_PACKET_SIZE - 32);
00533 
00534         // write reply back to the server
00535         ffurl_write(rt->stream, tosend, RTMP_HANDSHAKE_PACKET_SIZE);
00536     } else {
00537         ffurl_write(rt->stream, serverdata+1, RTMP_HANDSHAKE_PACKET_SIZE);
00538     }
00539 
00540     return 0;
00541 }
00542 
00549 static int rtmp_parse_result(URLContext *s, RTMPContext *rt, RTMPPacket *pkt)
00550 {
00551     int i, t;
00552     const uint8_t *data_end = pkt->data + pkt->size;
00553 
00554 #ifdef DEBUG
00555     ff_rtmp_packet_dump(s, pkt);
00556 #endif
00557 
00558     switch (pkt->type) {
00559     case RTMP_PT_CHUNK_SIZE:
00560         if (pkt->size != 4) {
00561             av_log(s, AV_LOG_ERROR,
00562                    "Chunk size change packet is not 4 bytes long (%d)\n", pkt->size);
00563             return -1;
00564         }
00565         if (!rt->is_input)
00566             ff_rtmp_packet_write(rt->stream, pkt, rt->chunk_size, rt->prev_pkt[1]);
00567         rt->chunk_size = AV_RB32(pkt->data);
00568         if (rt->chunk_size <= 0) {
00569             av_log(s, AV_LOG_ERROR, "Incorrect chunk size %d\n", rt->chunk_size);
00570             return -1;
00571         }
00572         av_log(s, AV_LOG_DEBUG, "New chunk size = %d\n", rt->chunk_size);
00573         break;
00574     case RTMP_PT_PING:
00575         t = AV_RB16(pkt->data);
00576         if (t == 6)
00577             gen_pong(s, rt, pkt);
00578         break;
00579     case RTMP_PT_CLIENT_BW:
00580         if (pkt->size < 4) {
00581             av_log(s, AV_LOG_ERROR,
00582                    "Client bandwidth report packet is less than 4 bytes long (%d)\n",
00583                    pkt->size);
00584             return -1;
00585         }
00586         av_log(s, AV_LOG_DEBUG, "Client bandwidth = %d\n", AV_RB32(pkt->data));
00587         rt->client_report_size = AV_RB32(pkt->data) >> 1;
00588         break;
00589     case RTMP_PT_INVOKE:
00590         //TODO: check for the messages sent for wrong state?
00591         if (ff_amf_match_string(pkt->data, pkt->size, "_error")) {
00592             uint8_t tmpstr[256];
00593 
00594             if (!ff_amf_get_field_value(pkt->data + 9, data_end,
00595                                         "description", tmpstr, sizeof(tmpstr)))
00596                 av_log(s, AV_LOG_ERROR, "Server error: %s\n",tmpstr);
00597             return -1;
00598         } else if (ff_amf_match_string(pkt->data, pkt->size, "_result")) {
00599             switch (rt->state) {
00600             case STATE_HANDSHAKED:
00601                 if (!rt->is_input) {
00602                     gen_release_stream(s, rt);
00603                     gen_fcpublish_stream(s, rt);
00604                     rt->state = STATE_RELEASING;
00605                 } else {
00606                     rt->state = STATE_CONNECTING;
00607                 }
00608                 gen_create_stream(s, rt);
00609                 break;
00610             case STATE_FCPUBLISH:
00611                 rt->state = STATE_CONNECTING;
00612                 break;
00613             case STATE_RELEASING:
00614                 rt->state = STATE_FCPUBLISH;
00615                 /* hack for Wowza Media Server, it does not send result for
00616                  * releaseStream and FCPublish calls */
00617                 if (!pkt->data[10]) {
00618                     int pkt_id = av_int2double(AV_RB64(pkt->data + 11));
00619                     if (pkt_id == rt->create_stream_invoke)
00620                         rt->state = STATE_CONNECTING;
00621                 }
00622                 if (rt->state != STATE_CONNECTING)
00623                     break;
00624             case STATE_CONNECTING:
00625                 //extract a number from the result
00626                 if (pkt->data[10] || pkt->data[19] != 5 || pkt->data[20]) {
00627                     av_log(s, AV_LOG_WARNING, "Unexpected reply on connect()\n");
00628                 } else {
00629                     rt->main_channel_id = av_int2double(AV_RB64(pkt->data + 21));
00630                 }
00631                 if (rt->is_input) {
00632                     gen_play(s, rt);
00633                 } else {
00634                     gen_publish(s, rt);
00635                 }
00636                 rt->state = STATE_READY;
00637                 break;
00638             }
00639         } else if (ff_amf_match_string(pkt->data, pkt->size, "onStatus")) {
00640             const uint8_t* ptr = pkt->data + 11;
00641             uint8_t tmpstr[256];
00642 
00643             for (i = 0; i < 2; i++) {
00644                 t = ff_amf_tag_size(ptr, data_end);
00645                 if (t < 0)
00646                     return 1;
00647                 ptr += t;
00648             }
00649             t = ff_amf_get_field_value(ptr, data_end,
00650                                        "level", tmpstr, sizeof(tmpstr));
00651             if (!t && !strcmp(tmpstr, "error")) {
00652                 if (!ff_amf_get_field_value(ptr, data_end,
00653                                             "description", tmpstr, sizeof(tmpstr)))
00654                     av_log(s, AV_LOG_ERROR, "Server error: %s\n",tmpstr);
00655                 return -1;
00656             }
00657             t = ff_amf_get_field_value(ptr, data_end,
00658                                        "code", tmpstr, sizeof(tmpstr));
00659             if (!t && !strcmp(tmpstr, "NetStream.Play.Start")) rt->state = STATE_PLAYING;
00660             if (!t && !strcmp(tmpstr, "NetStream.Play.Stop")) rt->state = STATE_STOPPED;
00661             if (!t && !strcmp(tmpstr, "NetStream.Play.UnpublishNotify")) rt->state = STATE_STOPPED;
00662             if (!t && !strcmp(tmpstr, "NetStream.Publish.Start")) rt->state = STATE_PUBLISHING;
00663         }
00664         break;
00665     }
00666     return 0;
00667 }
00668 
00680 static int get_packet(URLContext *s, int for_header)
00681 {
00682     RTMPContext *rt = s->priv_data;
00683     int ret;
00684     uint8_t *p;
00685     const uint8_t *next;
00686     uint32_t size;
00687     uint32_t ts, cts, pts=0;
00688 
00689     if (rt->state == STATE_STOPPED)
00690         return AVERROR_EOF;
00691 
00692     for (;;) {
00693         RTMPPacket rpkt = { 0 };
00694         if ((ret = ff_rtmp_packet_read(rt->stream, &rpkt,
00695                                        rt->chunk_size, rt->prev_pkt[0])) <= 0) {
00696             if (ret == 0) {
00697                 return AVERROR(EAGAIN);
00698             } else {
00699                 return AVERROR(EIO);
00700             }
00701         }
00702         rt->bytes_read += ret;
00703         if (rt->bytes_read > rt->last_bytes_read + rt->client_report_size) {
00704             av_log(s, AV_LOG_DEBUG, "Sending bytes read report\n");
00705             gen_bytes_read(s, rt, rpkt.timestamp + 1);
00706             rt->last_bytes_read = rt->bytes_read;
00707         }
00708 
00709         ret = rtmp_parse_result(s, rt, &rpkt);
00710         if (ret < 0) {//serious error in current packet
00711             ff_rtmp_packet_destroy(&rpkt);
00712             return -1;
00713         }
00714         if (rt->state == STATE_STOPPED) {
00715             ff_rtmp_packet_destroy(&rpkt);
00716             return AVERROR_EOF;
00717         }
00718         if (for_header && (rt->state == STATE_PLAYING || rt->state == STATE_PUBLISHING)) {
00719             ff_rtmp_packet_destroy(&rpkt);
00720             return 0;
00721         }
00722         if (!rpkt.size || !rt->is_input) {
00723             ff_rtmp_packet_destroy(&rpkt);
00724             continue;
00725         }
00726         if (rpkt.type == RTMP_PT_VIDEO || rpkt.type == RTMP_PT_AUDIO ||
00727            (rpkt.type == RTMP_PT_NOTIFY &&
00728             ff_amf_match_string(rpkt.data, rpkt.size, "onMetaData"))) {
00729             ts = rpkt.timestamp;
00730 
00731             // generate packet header and put data into buffer for FLV demuxer
00732             rt->flv_off  = 0;
00733             rt->flv_size = rpkt.size + 15;
00734             rt->flv_data = p = av_realloc(rt->flv_data, rt->flv_size);
00735             bytestream_put_byte(&p, rpkt.type);
00736             bytestream_put_be24(&p, rpkt.size);
00737             bytestream_put_be24(&p, ts);
00738             bytestream_put_byte(&p, ts >> 24);
00739             bytestream_put_be24(&p, 0);
00740             bytestream_put_buffer(&p, rpkt.data, rpkt.size);
00741             bytestream_put_be32(&p, 0);
00742             ff_rtmp_packet_destroy(&rpkt);
00743             return 0;
00744         } else if (rpkt.type == RTMP_PT_METADATA) {
00745             // we got raw FLV data, make it available for FLV demuxer
00746             rt->flv_off  = 0;
00747             rt->flv_size = rpkt.size;
00748             rt->flv_data = av_realloc(rt->flv_data, rt->flv_size);
00749             /* rewrite timestamps */
00750             next = rpkt.data;
00751             ts = rpkt.timestamp;
00752             while (next - rpkt.data < rpkt.size - 11) {
00753                 next++;
00754                 size = bytestream_get_be24(&next);
00755                 p=next;
00756                 cts = bytestream_get_be24(&next);
00757                 cts |= bytestream_get_byte(&next) << 24;
00758                 if (pts==0)
00759                     pts=cts;
00760                 ts += cts - pts;
00761                 pts = cts;
00762                 bytestream_put_be24(&p, ts);
00763                 bytestream_put_byte(&p, ts >> 24);
00764                 next += size + 3 + 4;
00765             }
00766             memcpy(rt->flv_data, rpkt.data, rpkt.size);
00767             ff_rtmp_packet_destroy(&rpkt);
00768             return 0;
00769         }
00770         ff_rtmp_packet_destroy(&rpkt);
00771     }
00772 }
00773 
00774 static int rtmp_close(URLContext *h)
00775 {
00776     RTMPContext *rt = h->priv_data;
00777 
00778     if (!rt->is_input) {
00779         rt->flv_data = NULL;
00780         if (rt->out_pkt.size)
00781             ff_rtmp_packet_destroy(&rt->out_pkt);
00782         if (rt->state > STATE_FCPUBLISH)
00783             gen_fcunpublish_stream(h, rt);
00784     }
00785     if (rt->state > STATE_HANDSHAKED)
00786         gen_delete_stream(h, rt);
00787 
00788     av_freep(&rt->flv_data);
00789     ffurl_close(rt->stream);
00790     return 0;
00791 }
00792 
00802 static int rtmp_open(URLContext *s, const char *uri, int flags)
00803 {
00804     RTMPContext *rt = s->priv_data;
00805     char proto[8], hostname[256], path[1024], *fname;
00806     uint8_t buf[2048];
00807     int port;
00808     int ret;
00809 
00810     rt->is_input = !(flags & AVIO_FLAG_WRITE);
00811 
00812     av_url_split(proto, sizeof(proto), NULL, 0, hostname, sizeof(hostname), &port,
00813                  path, sizeof(path), s->filename);
00814 
00815     if (port < 0)
00816         port = RTMP_DEFAULT_PORT;
00817     ff_url_join(buf, sizeof(buf), "tcp", NULL, hostname, port, NULL);
00818 
00819     if (ffurl_open(&rt->stream, buf, AVIO_FLAG_READ_WRITE,
00820                    &s->interrupt_callback, NULL) < 0) {
00821         av_log(s , AV_LOG_ERROR, "Cannot open connection %s\n", buf);
00822         goto fail;
00823     }
00824 
00825     rt->state = STATE_START;
00826     if (rtmp_handshake(s, rt))
00827         goto fail;
00828 
00829     rt->chunk_size = 128;
00830     rt->state = STATE_HANDSHAKED;
00831     //extract "app" part from path
00832     if (!strncmp(path, "/ondemand/", 10)) {
00833         fname = path + 10;
00834         memcpy(rt->app, "ondemand", 9);
00835     } else {
00836         char *p = strchr(path + 1, '/');
00837         if (!p) {
00838             fname = path + 1;
00839             rt->app[0] = '\0';
00840         } else {
00841             char *c = strchr(p + 1, ':');
00842             fname = strchr(p + 1, '/');
00843             if (!fname || c < fname) {
00844                 fname = p + 1;
00845                 av_strlcpy(rt->app, path + 1, p - path);
00846             } else {
00847                 fname++;
00848                 av_strlcpy(rt->app, path + 1, fname - path - 1);
00849             }
00850         }
00851     }
00852     if (!strchr(fname, ':') &&
00853         (!strcmp(fname + strlen(fname) - 4, ".f4v") ||
00854          !strcmp(fname + strlen(fname) - 4, ".mp4"))) {
00855         memcpy(rt->playpath, "mp4:", 5);
00856     } else {
00857         rt->playpath[0] = 0;
00858     }
00859     strncat(rt->playpath, fname, sizeof(rt->playpath) - 5);
00860 
00861     rt->client_report_size = 1048576;
00862     rt->bytes_read = 0;
00863     rt->last_bytes_read = 0;
00864 
00865     av_log(s, AV_LOG_DEBUG, "Proto = %s, path = %s, app = %s, fname = %s\n",
00866            proto, path, rt->app, rt->playpath);
00867     gen_connect(s, rt, proto, hostname, port);
00868 
00869     do {
00870         ret = get_packet(s, 1);
00871     } while (ret == EAGAIN);
00872     if (ret < 0)
00873         goto fail;
00874 
00875     if (rt->is_input) {
00876         // generate FLV header for demuxer
00877         rt->flv_size = 13;
00878         rt->flv_data = av_realloc(rt->flv_data, rt->flv_size);
00879         rt->flv_off  = 0;
00880         memcpy(rt->flv_data, "FLV\1\5\0\0\0\011\0\0\0\0", rt->flv_size);
00881     } else {
00882         rt->flv_size = 0;
00883         rt->flv_data = NULL;
00884         rt->flv_off  = 0;
00885         rt->skip_bytes = 13;
00886     }
00887 
00888     s->max_packet_size = rt->stream->max_packet_size;
00889     s->is_streamed     = 1;
00890     return 0;
00891 
00892 fail:
00893     rtmp_close(s);
00894     return AVERROR(EIO);
00895 }
00896 
00897 static int rtmp_read(URLContext *s, uint8_t *buf, int size)
00898 {
00899     RTMPContext *rt = s->priv_data;
00900     int orig_size = size;
00901     int ret;
00902 
00903     while (size > 0) {
00904         int data_left = rt->flv_size - rt->flv_off;
00905 
00906         if (data_left >= size) {
00907             memcpy(buf, rt->flv_data + rt->flv_off, size);
00908             rt->flv_off += size;
00909             return orig_size;
00910         }
00911         if (data_left > 0) {
00912             memcpy(buf, rt->flv_data + rt->flv_off, data_left);
00913             buf  += data_left;
00914             size -= data_left;
00915             rt->flv_off = rt->flv_size;
00916             return data_left;
00917         }
00918         if ((ret = get_packet(s, 0)) < 0)
00919            return ret;
00920     }
00921     return orig_size;
00922 }
00923 
00924 static int rtmp_write(URLContext *s, const uint8_t *buf, int size)
00925 {
00926     RTMPContext *rt = s->priv_data;
00927     int size_temp = size;
00928     int pktsize, pkttype;
00929     uint32_t ts;
00930     const uint8_t *buf_temp = buf;
00931 
00932     do {
00933         if (rt->skip_bytes) {
00934             int skip = FFMIN(rt->skip_bytes, size_temp);
00935             buf_temp       += skip;
00936             size_temp      -= skip;
00937             rt->skip_bytes -= skip;
00938             continue;
00939         }
00940 
00941         if (rt->flv_header_bytes < 11) {
00942             const uint8_t *header = rt->flv_header;
00943             int copy = FFMIN(11 - rt->flv_header_bytes, size_temp);
00944             bytestream_get_buffer(&buf_temp, rt->flv_header + rt->flv_header_bytes, copy);
00945             rt->flv_header_bytes += copy;
00946             size_temp            -= copy;
00947             if (rt->flv_header_bytes < 11)
00948                 break;
00949 
00950             pkttype = bytestream_get_byte(&header);
00951             pktsize = bytestream_get_be24(&header);
00952             ts = bytestream_get_be24(&header);
00953             ts |= bytestream_get_byte(&header) << 24;
00954             bytestream_get_be24(&header);
00955             rt->flv_size = pktsize;
00956 
00957             //force 12bytes header
00958             if (((pkttype == RTMP_PT_VIDEO || pkttype == RTMP_PT_AUDIO) && ts == 0) ||
00959                 pkttype == RTMP_PT_NOTIFY) {
00960                 if (pkttype == RTMP_PT_NOTIFY)
00961                     pktsize += 16;
00962                 rt->prev_pkt[1][RTMP_SOURCE_CHANNEL].channel_id = 0;
00963             }
00964 
00965             //this can be a big packet, it's better to send it right here
00966             ff_rtmp_packet_create(&rt->out_pkt, RTMP_SOURCE_CHANNEL, pkttype, ts, pktsize);
00967             rt->out_pkt.extra = rt->main_channel_id;
00968             rt->flv_data = rt->out_pkt.data;
00969 
00970             if (pkttype == RTMP_PT_NOTIFY)
00971                 ff_amf_write_string(&rt->flv_data, "@setDataFrame");
00972         }
00973 
00974         if (rt->flv_size - rt->flv_off > size_temp) {
00975             bytestream_get_buffer(&buf_temp, rt->flv_data + rt->flv_off, size_temp);
00976             rt->flv_off += size_temp;
00977             size_temp = 0;
00978         } else {
00979             bytestream_get_buffer(&buf_temp, rt->flv_data + rt->flv_off, rt->flv_size - rt->flv_off);
00980             size_temp   -= rt->flv_size - rt->flv_off;
00981             rt->flv_off += rt->flv_size - rt->flv_off;
00982         }
00983 
00984         if (rt->flv_off == rt->flv_size) {
00985             rt->skip_bytes = 4;
00986 
00987             ff_rtmp_packet_write(rt->stream, &rt->out_pkt, rt->chunk_size, rt->prev_pkt[1]);
00988             ff_rtmp_packet_destroy(&rt->out_pkt);
00989             rt->flv_size = 0;
00990             rt->flv_off = 0;
00991             rt->flv_header_bytes = 0;
00992         }
00993     } while (buf_temp - buf < size);
00994     return size;
00995 }
00996 
00997 URLProtocol ff_rtmp_protocol = {
00998     .name           = "rtmp",
00999     .url_open       = rtmp_open,
01000     .url_read       = rtmp_read,
01001     .url_write      = rtmp_write,
01002     .url_close      = rtmp_close,
01003     .priv_data_size = sizeof(RTMPContext),
01004     .flags          = URL_PROTOCOL_FLAG_NETWORK,
01005 };