Main Page | Class Hierarchy | Alphabetical List | Class List | Directories | File List | Class Members | File Members

renderfarm.C

Go to the documentation of this file.
00001 #include "asset.h"
00002 #include "brender.h"
00003 #include "clip.h"
00004 #include "defaults.h"
00005 #include "edl.h"
00006 #include "filesystem.h"
00007 #include "filexml.h"
00008 #include "language.h"
00009 #include "mutex.h"
00010 #include "packagedispatcher.h"
00011 #include "preferences.h"
00012 #include "render.h"
00013 #include "renderfarm.h"
00014 //#include "renderfarmfsserver.h"
00015 #include "bctimer.h"
00016 #include "transportque.h"
00017 
00018 
00019 #include <arpa/inet.h>
00020 #include <errno.h>
00021 #include <fcntl.h>
00022 #include <netdb.h>
00023 #include <netinet/in.h>
00024 #include <stdio.h>
00025 #include <string.h>
00026 #include <sys/socket.h>
00027 #include <sys/types.h>
00028 #include <sys/un.h>
00029 #include <unistd.h>
00030 
00031 
00032 
00033 
00034 RenderFarmServer::RenderFarmServer(ArrayList<PluginServer*> *plugindb, 
00035         PackageDispatcher *packages,
00036         Preferences *preferences,
00037         int use_local_rate,
00038         int *result_return,
00039         int64_t *total_return,
00040         Mutex *total_return_lock,
00041         Asset *default_asset,
00042         EDL *edl,
00043         BRender *brender)
00044 {
00045         this->plugindb = plugindb;
00046         this->packages = packages;
00047         this->preferences = preferences;
00048         this->use_local_rate = use_local_rate;
00049         this->result_return = result_return;
00050         this->total_return = total_return;
00051         this->total_return_lock = total_return_lock;
00052         this->default_asset = default_asset;
00053         this->edl = edl;
00054         this->brender = brender;
00055         client_lock = new Mutex("RenderFarmServer::client_lock");
00056 }
00057 
00058 RenderFarmServer::~RenderFarmServer()
00059 {
00060         clients.remove_all_objects();
00061         delete client_lock;
00062 }
00063 
00064 // Open connections to clients.
00065 int RenderFarmServer::start_clients()
00066 {
00067         int result = 0;
00068 
00069         for(int i = 0; i < preferences->get_enabled_nodes() && !result; i++)
00070         {
00071                 client_lock->lock("RenderFarmServer::start_clients");
00072                 RenderFarmServerThread *client = new RenderFarmServerThread(plugindb, 
00073                         this, 
00074                         i);
00075                 clients.append(client);
00076 
00077                 result = client->start_loop();
00078                 client_lock->unlock();
00079 //usleep(100000);
00080 // Fails to connect all without a delay
00081         }
00082 
00083         return result;
00084 }
00085 
00086 // The render farm must wait for all the clients to finish.
00087 int RenderFarmServer::wait_clients()
00088 {
00089 //printf("RenderFarmServer::wait_clients 1\n");
00090         clients.remove_all_objects();
00091 //printf("RenderFarmServer::wait_clients 2\n");
00092         return 0;
00093 }
00094 
00095 
00096 
00097 
00098 
00099 
00100 
00101 
00102 
00103 
00104 
00105 
00106 // Waits for requests from every client.
00107 // Joins when the client is finished.
00108 RenderFarmServerThread::RenderFarmServerThread(ArrayList<PluginServer*> *plugindb, 
00109         RenderFarmServer *server, 
00110         int number)
00111  : Thread()
00112 {
00113         this->plugindb = plugindb;
00114         this->server = server;
00115         this->number = number;
00116         socket_fd = -1;
00117         frames_per_second = 0;
00118         Thread::set_synchronous(1);
00119 }
00120 
00121 
00122 
00123 RenderFarmServerThread::~RenderFarmServerThread()
00124 {
00125 //printf("RenderFarmServerThread::~RenderFarmServerThread 1 %p\n", this);
00126         Thread::join();
00127 //printf("RenderFarmServerThread::~RenderFarmServerThread 1\n");
00128         if(socket_fd >= 0) close(socket_fd);
00129 //printf("RenderFarmServerThread::~RenderFarmServerThread 2\n");
00130 }
00131 
00132 
00133 int RenderFarmServerThread::start_loop()
00134 {
00135         int result = 0;
00136         char *hostname = server->preferences->get_node_hostname(number);
00137 //printf("RenderFarmServerThread::start_loop 1\n");
00138 
00139 // Open file for master node
00140         if(hostname[0] == '/')
00141         {
00142                 if((socket_fd = socket(PF_UNIX, SOCK_STREAM, 0)) < 0)
00143                 {
00144                         perror(_("RenderFarmServerThread::start_loop: socket\n"));
00145                         result = 1;
00146                 }
00147                 else
00148                 {
00149                         struct sockaddr_un addr;
00150                         addr.sun_family = AF_FILE;
00151                         strcpy(addr.sun_path, hostname);
00152                         int size = (offsetof(struct sockaddr_un, sun_path) + 
00153                                 strlen(hostname) + 1);
00154 
00155 // The master node is always created by BRender.  Keep trying for 30 seconds.
00156 
00157 #define ATTEMPT_DELAY 100000
00158                         int done = 0;
00159                         int attempt = 0;
00160 //printf("RenderFarmServerThread::start_loop 2 %s\n", hostname);
00161                         do
00162                         {
00163 //printf("RenderFarmServerThread::start_loop 3\n");
00164                                 if(connect(socket_fd, (struct sockaddr*)&addr, size) < 0)
00165                                 {
00166                                         attempt++;
00167                                         if(attempt > 30000000 / ATTEMPT_DELAY)
00168                                         {
00169 //printf("RenderFarmServerThread::start_loop 4 %s\n", hostname);
00170                                                 fprintf(stderr, _("RenderFarmServerThread::start_loop: %s: %s\n"), 
00171                                                         hostname, 
00172                                                         strerror(errno));
00173                                                 result = 1;
00174                                         }
00175                                         else
00176                                                 usleep(ATTEMPT_DELAY);
00177 //printf("RenderFarmServerThread::start_loop 5 %s\n", hostname);
00178                                 }
00179                                 else
00180                                         done = 1;
00181                         }while(!result && !done);
00182 //printf("RenderFarmServerThread::start_loop 6\n");
00183                 }
00184         }
00185         else
00186 // Open socket
00187         {
00188                 if((socket_fd = socket(PF_INET, SOCK_STREAM, 0)) < 0)
00189                 {
00190                         perror(_("RenderFarmServerThread::start_loop: socket"));
00191                         result = 1;
00192                 }
00193                 else
00194                 {
00195 // Open port
00196                         struct sockaddr_in addr;
00197                         struct hostent *hostinfo;
00198                         addr.sin_family = AF_INET;
00199                         addr.sin_port = htons(server->preferences->get_node_port(number));
00200                         hostinfo = gethostbyname(hostname);
00201                         if(hostinfo == NULL)
00202                 {
00203                         fprintf(stderr, _("RenderFarmServerThread::start_loop: unknown host %s.\n"), 
00204                                         server->preferences->get_node_hostname(number));
00205                         result = 1;
00206                 }
00207                         else
00208                         {
00209                                 addr.sin_addr = *(struct in_addr *) hostinfo->h_addr;   
00210 
00211                                 if(connect(socket_fd, (struct sockaddr*)&addr, sizeof(addr)) < 0)
00212                                 {
00213                                         fprintf(stderr, _("RenderFarmServerThread::start_loop: %s: %s\n"), 
00214                                                 server->preferences->get_node_hostname(number), 
00215                                                 strerror(errno));
00216                                         result = 1;
00217                                 }
00218                         }
00219                 }
00220         }
00221 //printf("RenderFarmServerThread::start_loop 7\n");
00222 
00223         if(!result) Thread::start();
00224 
00225         return result;
00226 }
00227 
00228 
00229 int RenderFarmServerThread::read_socket(int socket_fd, char *data, int len, int timeout)
00230 {
00231         int bytes_read = 0;
00232         int offset = 0;
00233 //timeout = 0;
00234 
00235 //printf("RenderFarmServerThread::read_socket 1\n");
00236         while(len > 0 && bytes_read >= 0)
00237         {
00238                 int result = 0;
00239                 if(timeout > 0)
00240                 {
00241                         fd_set read_fds;
00242                         struct timeval tv;
00243 
00244                         FD_ZERO(&read_fds);
00245                         FD_SET(socket_fd, &read_fds);
00246                         tv.tv_sec = timeout;
00247                         tv.tv_usec = 0;
00248 
00249                         result = select(socket_fd + 1, 
00250                                 &read_fds, 
00251                                 0, 
00252                                 0, 
00253                                 &tv);
00254                         FD_ZERO(&read_fds);
00255 //printf("RenderFarmServerThread::read_socket 1 %d\n", result);
00256                 }
00257                 else
00258                         result = 1;
00259 
00260                 if(result)
00261                 {
00262                         bytes_read = read(socket_fd, data + offset, len);
00263                         if(bytes_read > 0)
00264                         {
00265                                 len -= bytes_read;
00266                                 offset += bytes_read;
00267                         }
00268                         else
00269                         {
00270 //printf("RenderFarmServerThread::read_socket got 0 len=%d\n", len);
00271                                 break;
00272                         }
00273                 }
00274                 else
00275                 {
00276 printf("RenderFarmServerThread::read_socket timed out. len=%d\n", len);
00277                         break;
00278                 }
00279         }
00280 //printf("RenderFarmServerThread::read_socket 2\n");
00281 
00282         return offset;
00283 }
00284 
00285 int RenderFarmServerThread::write_socket(int socket_fd, char *data, int len, int timeout)
00286 {
00287         int result = 0;
00288         if(timeout > 0)
00289         {
00290                 fd_set write_fds;
00291                 struct timeval tv;
00292                 FD_ZERO(&write_fds);
00293                 FD_SET(socket_fd, &write_fds);
00294                 tv.tv_sec = timeout;
00295                 tv.tv_usec = 0;
00296 //printf("RenderFarmServerThread::write_socket 1\n");
00297                 result = select(socket_fd + 1, 
00298                                 0, 
00299                                 &write_fds, 
00300                                 0, 
00301                                 &tv);
00302 //printf("RenderFarmServerThread::write_socket 2\n");
00303                 FD_ZERO(&write_fds);
00304                 if(!result)
00305                 {
00306 printf("RenderFarmServerThread::write_socket 1 socket timed out. len=%d\n", len);
00307                         return 0;
00308                 }
00309         }
00310 
00311         return write(socket_fd, data, len);
00312 }
00313 
00314 int RenderFarmServerThread::read_socket(char *data, int len, int timeout)
00315 {
00316         return read_socket(socket_fd, data, len, timeout);
00317 }
00318 
00319 int RenderFarmServerThread::write_socket(char *data, int len, int timeout)
00320 {
00321         return write_socket(socket_fd, data, len, timeout);
00322 }
00323 
00324 void RenderFarmServerThread::reallocate_buffer(int size)
00325 {
00326         if(buffer && buffer_allocated < size)
00327         {
00328                 delete [] buffer;
00329                 buffer = 0;
00330         }
00331 
00332         if(!buffer && size)
00333         {
00334                 buffer = new unsigned char[size];
00335                 buffer_allocated = size;
00336         }
00337 }
00338 
00339 void RenderFarmServerThread::run()
00340 {
00341 // Wait for requests
00342         unsigned char header[5];
00343         int done = 0;
00344 
00345 
00346         buffer = 0;
00347         buffer_allocated = 0;
00348 //      fs_server = new RenderFarmFSServer(this);
00349 //      fs_server->initialize();
00350         while(!done)
00351         {
00352 
00353 //printf("RenderFarmServerThread::run 1\n");
00354 // Wait for requests.
00355 // Requests consist of request ID's and accompanying buffers.
00356 // Get request ID.
00357                 if(read_socket(socket_fd, (char*)header, 5, -1) != 5)
00358                 {
00359                         done = 1;
00360                         continue;
00361                 }
00362 
00363                 int request_id = header[0];
00364                 int64_t request_size = (((u_int32_t)header[1]) << 24) |
00365                                                         (((u_int32_t)header[2]) << 16) |
00366                                                         (((u_int32_t)header[3]) << 8)  |
00367                                                         (u_int32_t)header[4];
00368 
00369 //printf("RenderFarmServerThread::run 2 %d %lld\n", request_id, request_size);
00370                 reallocate_buffer(request_size);
00371 
00372 // Get accompanying buffer
00373                 if(read_socket((char*)buffer, request_size, RENDERFARM_TIMEOUT) != request_size)
00374                 {
00375                         done = 1;
00376                         continue;
00377                 }
00378 
00379                 switch(request_id)
00380                 {
00381                         case RENDERFARM_PREFERENCES:
00382                                 send_preferences();
00383                                 break;
00384                         
00385                         case RENDERFARM_ASSET:
00386                                 send_asset();
00387                                 break;
00388                         
00389                         case RENDERFARM_EDL:
00390                                 send_edl();
00391                                 break;
00392                         
00393                         case RENDERFARM_PACKAGE:
00394                                 send_package(buffer);
00395                                 break;
00396                         
00397                         case RENDERFARM_PROGRESS:
00398                                 set_progress(buffer);
00399                                 break;
00400 
00401                         case RENDERFARM_SET_RESULT:
00402                                 set_result(buffer);
00403                                 break;
00404 
00405                         case RENDERFARM_SET_VMAP:
00406                                 set_video_map(buffer);
00407                                 break;
00408 
00409                         case RENDERFARM_GET_RESULT:
00410                                 get_result();
00411                                 break;
00412 
00413                         case RENDERFARM_DONE:
00414                                 done = 1;
00415                                 break;
00416 
00417 
00418                         default:
00419 //                              if(!fs_server->handle_request(request_id, request_size, (unsigned char*)buffer))
00420                                 {
00421                                         printf(_("RenderFarmServerThread::run: unknown request %02x\n"), request_id);
00422                                 }
00423                                 break;
00424                 }
00425 //printf("RenderFarmServerThread::run 10 %d %lld\n", request_id, request_size);
00426         }
00427         
00428         if(buffer) delete [] buffer;
00429 //      delete fs_server;
00430 }
00431 
00432 int RenderFarmServerThread::write_string(int socket_fd, char *string)
00433 {
00434         unsigned char *datagram;
00435         int i, len;
00436         i = 0;
00437 
00438         len = strlen(string) + 1;
00439         datagram = new unsigned char[len + 4];
00440         STORE_INT32(len);
00441         memcpy(datagram + i, string, len);
00442         write_socket(socket_fd, (char*)datagram, len + 4, RENDERFARM_TIMEOUT);
00443 //printf("RenderFarmServerThread::write_string %02x%02x%02x%02x\n",
00444 //      datagram[0], datagram[1], datagram[2], datagram[3]);
00445 
00446         delete [] datagram;
00447 }
00448 
00449 void RenderFarmServerThread::send_preferences()
00450 {
00451         Defaults defaults;
00452         char *string;
00453 
00454         server->preferences->save_defaults(&defaults);
00455         defaults.save_string(string);
00456         write_string(socket_fd, string);
00457 
00458         delete [] string;
00459 }
00460 
00461 void RenderFarmServerThread::send_asset()
00462 {
00463         Defaults defaults;
00464         char *string1;
00465 
00466 // The asset must be sent in two segments.
00467 // One segment is stored in the EDL and contains decoding information.
00468 // One segment is stored in the asset and contains encoding information.
00469         server->default_asset->save_defaults(&defaults, 
00470                 0, 
00471                 1,
00472                 1,
00473                 1,
00474                 1,
00475                 1);
00476         defaults.save_string(string1);
00477 
00478         FileXML file;
00479         server->default_asset->write(&file, 0, 0);
00480         file.terminate_string();
00481 
00482         write_string(socket_fd, string1);
00483         write_string(socket_fd, file.string);
00484         delete [] string1;
00485 }
00486 
00487 
00488 void RenderFarmServerThread::send_edl()
00489 {
00490         FileXML file;
00491 
00492 // Save the XML
00493         server->edl->save_xml(plugindb,
00494                 &file, 
00495                 0,
00496                 0,
00497                 0);
00498         file.terminate_string();
00499 //printf("RenderFarmServerThread::send_edl\n%s\n\n", file.string);
00500 
00501         write_string(socket_fd, file.string);
00502 //printf("RenderFarmServerThread::send_edl 2\n");
00503 }
00504 
00505 
00506 void RenderFarmServerThread::send_package(unsigned char *buffer)
00507 {
00508         this->frames_per_second = (double)((((u_int32_t)buffer[0]) << 24) |
00509                 (((u_int32_t)buffer[1]) << 16) |
00510                 (((u_int32_t)buffer[2]) << 8)  |
00511                 ((u_int32_t)buffer[3])) / 
00512                 65536.0;
00513 
00514 //printf("RenderFarmServerThread::send_package 1 %f\n", frames_per_second);
00515         RenderPackage *package = 
00516                 server->packages->get_package(frames_per_second, 
00517                         number, 
00518                         server->use_local_rate);
00519 
00520 //printf("RenderFarmServerThread::send_package 2\n");
00521 
00522         char datagram[BCTEXTLEN];
00523 
00524 // No more packages
00525         if(!package)
00526         {
00527 //printf("RenderFarmServerThread::send_package 1\n");
00528                 datagram[0] = datagram[1] = datagram[2] = datagram[3] = 0;
00529                 write_socket(datagram, 4, RENDERFARM_TIMEOUT);
00530         }
00531         else
00532 // Encode package
00533         {
00534 //printf("RenderFarmServerThread::send_package 10\n");
00535                 int i = 4;
00536                 strcpy(&datagram[i], package->path);
00537                 i += strlen(package->path);
00538                 datagram[i++] = 0;
00539 
00540                 STORE_INT32(package->audio_start);
00541                 STORE_INT32(package->audio_end);
00542                 STORE_INT32(package->video_start);
00543                 STORE_INT32(package->video_end);
00544                 int use_brender = (server->brender ? 1 : 0);
00545                 STORE_INT32(use_brender);
00546 
00547                 int len = i;
00548                 i = 0;
00549                 STORE_INT32(len - 4);
00550 
00551                 write_socket(datagram, len, RENDERFARM_TIMEOUT);
00552         }
00553 }
00554 
00555 
00556 void RenderFarmServerThread::set_progress(unsigned char *buffer)
00557 {
00558         server->total_return_lock->lock("RenderFarmServerThread::set_progress");
00559         *server->total_return += (int64_t)(((u_int32_t)buffer[0]) << 24) |
00560                                                                                         (((u_int32_t)buffer[1]) << 16) |
00561                                                                                         (((u_int32_t)buffer[2]) << 8)  |
00562                                                                                         ((u_int32_t)buffer[3]);
00563         server->total_return_lock->unlock();
00564 }
00565 
00566 int RenderFarmServerThread::set_video_map(unsigned char *buffer)
00567 {
00568         if(server->brender)
00569         {
00570                 server->brender->set_video_map((int64_t)(((u_int32_t)buffer[0]) << 24) |
00571                                                         (((u_int32_t)buffer[1]) << 16) |
00572                                                         (((u_int32_t)buffer[2]) << 8)  |
00573                                                         ((u_int32_t)buffer[3]),
00574                                                         (int64_t)(((u_int32_t)buffer[4]) << 24) |
00575                                                         (((u_int32_t)buffer[5]) << 16) |
00576                                                         (((u_int32_t)buffer[6]) << 8)  |
00577                                                         ((u_int32_t)buffer[7]));
00578                 char return_value[1];
00579                 return_value[0] = 0;
00580                 write_socket(return_value, 1, RENDERFARM_TIMEOUT);
00581                 return 0;
00582         }
00583         return 1;
00584 }
00585 
00586 
00587 void RenderFarmServerThread::set_result(unsigned char *buffer)
00588 {
00589 //printf("RenderFarmServerThread::set_result %p\n", buffer);
00590         if(!*server->result_return)
00591                 *server->result_return = buffer[0];
00592 }
00593 
00594 
00595 void RenderFarmServerThread::get_result()
00596 {
00597         unsigned char data[1];
00598         data[0] = *server->result_return;
00599         write_socket((char*)data, 1, RENDERFARM_TIMEOUT);
00600 }
00601 
00602 

Generated on Sun Jan 8 13:39:00 2006 for Cinelerra-svn by  doxygen 1.4.4