00001 #include "asset.h"
00002 #include "brender.h"
00003 #include "clip.h"
00004 #include "defaults.h"
00005 #include "edl.h"
00006 #include "filesystem.h"
00007 #include "filexml.h"
00008 #include "language.h"
00009 #include "mutex.h"
00010 #include "packagedispatcher.h"
00011 #include "preferences.h"
00012 #include "render.h"
00013 #include "renderfarm.h"
00014
00015 #include "bctimer.h"
00016 #include "transportque.h"
00017
00018
00019 #include <arpa/inet.h>
00020 #include <errno.h>
00021 #include <fcntl.h>
00022 #include <netdb.h>
00023 #include <netinet/in.h>
00024 #include <stdio.h>
00025 #include <string.h>
00026 #include <sys/socket.h>
00027 #include <sys/types.h>
00028 #include <sys/un.h>
00029 #include <unistd.h>
00030
00031
00032
00033
00034 RenderFarmServer::RenderFarmServer(ArrayList<PluginServer*> *plugindb,
00035 PackageDispatcher *packages,
00036 Preferences *preferences,
00037 int use_local_rate,
00038 int *result_return,
00039 int64_t *total_return,
00040 Mutex *total_return_lock,
00041 Asset *default_asset,
00042 EDL *edl,
00043 BRender *brender)
00044 {
00045 this->plugindb = plugindb;
00046 this->packages = packages;
00047 this->preferences = preferences;
00048 this->use_local_rate = use_local_rate;
00049 this->result_return = result_return;
00050 this->total_return = total_return;
00051 this->total_return_lock = total_return_lock;
00052 this->default_asset = default_asset;
00053 this->edl = edl;
00054 this->brender = brender;
00055 client_lock = new Mutex("RenderFarmServer::client_lock");
00056 }
00057
00058 RenderFarmServer::~RenderFarmServer()
00059 {
00060 clients.remove_all_objects();
00061 delete client_lock;
00062 }
00063
00064
00065 int RenderFarmServer::start_clients()
00066 {
00067 int result = 0;
00068
00069 for(int i = 0; i < preferences->get_enabled_nodes() && !result; i++)
00070 {
00071 client_lock->lock("RenderFarmServer::start_clients");
00072 RenderFarmServerThread *client = new RenderFarmServerThread(plugindb,
00073 this,
00074 i);
00075 clients.append(client);
00076
00077 result = client->start_loop();
00078 client_lock->unlock();
00079
00080
00081 }
00082
00083 return result;
00084 }
00085
00086
00087 int RenderFarmServer::wait_clients()
00088 {
00089
00090 clients.remove_all_objects();
00091
00092 return 0;
00093 }
00094
00095
00096
00097
00098
00099
00100
00101
00102
00103
00104
00105
00106
00107
00108 RenderFarmServerThread::RenderFarmServerThread(ArrayList<PluginServer*> *plugindb,
00109 RenderFarmServer *server,
00110 int number)
00111 : Thread()
00112 {
00113 this->plugindb = plugindb;
00114 this->server = server;
00115 this->number = number;
00116 socket_fd = -1;
00117 frames_per_second = 0;
00118 Thread::set_synchronous(1);
00119 }
00120
00121
00122
00123 RenderFarmServerThread::~RenderFarmServerThread()
00124 {
00125
00126 Thread::join();
00127
00128 if(socket_fd >= 0) close(socket_fd);
00129
00130 }
00131
00132
00133 int RenderFarmServerThread::start_loop()
00134 {
00135 int result = 0;
00136 char *hostname = server->preferences->get_node_hostname(number);
00137
00138
00139
00140 if(hostname[0] == '/')
00141 {
00142 if((socket_fd = socket(PF_UNIX, SOCK_STREAM, 0)) < 0)
00143 {
00144 perror(_("RenderFarmServerThread::start_loop: socket\n"));
00145 result = 1;
00146 }
00147 else
00148 {
00149 struct sockaddr_un addr;
00150 addr.sun_family = AF_FILE;
00151 strcpy(addr.sun_path, hostname);
00152 int size = (offsetof(struct sockaddr_un, sun_path) +
00153 strlen(hostname) + 1);
00154
00155
00156
00157 #define ATTEMPT_DELAY 100000
00158 int done = 0;
00159 int attempt = 0;
00160
00161 do
00162 {
00163
00164 if(connect(socket_fd, (struct sockaddr*)&addr, size) < 0)
00165 {
00166 attempt++;
00167 if(attempt > 30000000 / ATTEMPT_DELAY)
00168 {
00169
00170 fprintf(stderr, _("RenderFarmServerThread::start_loop: %s: %s\n"),
00171 hostname,
00172 strerror(errno));
00173 result = 1;
00174 }
00175 else
00176 usleep(ATTEMPT_DELAY);
00177
00178 }
00179 else
00180 done = 1;
00181 }while(!result && !done);
00182
00183 }
00184 }
00185 else
00186
00187 {
00188 if((socket_fd = socket(PF_INET, SOCK_STREAM, 0)) < 0)
00189 {
00190 perror(_("RenderFarmServerThread::start_loop: socket"));
00191 result = 1;
00192 }
00193 else
00194 {
00195
00196 struct sockaddr_in addr;
00197 struct hostent *hostinfo;
00198 addr.sin_family = AF_INET;
00199 addr.sin_port = htons(server->preferences->get_node_port(number));
00200 hostinfo = gethostbyname(hostname);
00201 if(hostinfo == NULL)
00202 {
00203 fprintf(stderr, _("RenderFarmServerThread::start_loop: unknown host %s.\n"),
00204 server->preferences->get_node_hostname(number));
00205 result = 1;
00206 }
00207 else
00208 {
00209 addr.sin_addr = *(struct in_addr *) hostinfo->h_addr;
00210
00211 if(connect(socket_fd, (struct sockaddr*)&addr, sizeof(addr)) < 0)
00212 {
00213 fprintf(stderr, _("RenderFarmServerThread::start_loop: %s: %s\n"),
00214 server->preferences->get_node_hostname(number),
00215 strerror(errno));
00216 result = 1;
00217 }
00218 }
00219 }
00220 }
00221
00222
00223 if(!result) Thread::start();
00224
00225 return result;
00226 }
00227
00228
00229 int RenderFarmServerThread::read_socket(int socket_fd, char *data, int len, int timeout)
00230 {
00231 int bytes_read = 0;
00232 int offset = 0;
00233
00234
00235
00236 while(len > 0 && bytes_read >= 0)
00237 {
00238 int result = 0;
00239 if(timeout > 0)
00240 {
00241 fd_set read_fds;
00242 struct timeval tv;
00243
00244 FD_ZERO(&read_fds);
00245 FD_SET(socket_fd, &read_fds);
00246 tv.tv_sec = timeout;
00247 tv.tv_usec = 0;
00248
00249 result = select(socket_fd + 1,
00250 &read_fds,
00251 0,
00252 0,
00253 &tv);
00254 FD_ZERO(&read_fds);
00255
00256 }
00257 else
00258 result = 1;
00259
00260 if(result)
00261 {
00262 bytes_read = read(socket_fd, data + offset, len);
00263 if(bytes_read > 0)
00264 {
00265 len -= bytes_read;
00266 offset += bytes_read;
00267 }
00268 else
00269 {
00270
00271 break;
00272 }
00273 }
00274 else
00275 {
00276 printf("RenderFarmServerThread::read_socket timed out. len=%d\n", len);
00277 break;
00278 }
00279 }
00280
00281
00282 return offset;
00283 }
00284
00285 int RenderFarmServerThread::write_socket(int socket_fd, char *data, int len, int timeout)
00286 {
00287 int result = 0;
00288 if(timeout > 0)
00289 {
00290 fd_set write_fds;
00291 struct timeval tv;
00292 FD_ZERO(&write_fds);
00293 FD_SET(socket_fd, &write_fds);
00294 tv.tv_sec = timeout;
00295 tv.tv_usec = 0;
00296
00297 result = select(socket_fd + 1,
00298 0,
00299 &write_fds,
00300 0,
00301 &tv);
00302
00303 FD_ZERO(&write_fds);
00304 if(!result)
00305 {
00306 printf("RenderFarmServerThread::write_socket 1 socket timed out. len=%d\n", len);
00307 return 0;
00308 }
00309 }
00310
00311 return write(socket_fd, data, len);
00312 }
00313
00314 int RenderFarmServerThread::read_socket(char *data, int len, int timeout)
00315 {
00316 return read_socket(socket_fd, data, len, timeout);
00317 }
00318
00319 int RenderFarmServerThread::write_socket(char *data, int len, int timeout)
00320 {
00321 return write_socket(socket_fd, data, len, timeout);
00322 }
00323
00324 void RenderFarmServerThread::reallocate_buffer(int size)
00325 {
00326 if(buffer && buffer_allocated < size)
00327 {
00328 delete [] buffer;
00329 buffer = 0;
00330 }
00331
00332 if(!buffer && size)
00333 {
00334 buffer = new unsigned char[size];
00335 buffer_allocated = size;
00336 }
00337 }
00338
00339 void RenderFarmServerThread::run()
00340 {
00341
00342 unsigned char header[5];
00343 int done = 0;
00344
00345
00346 buffer = 0;
00347 buffer_allocated = 0;
00348
00349
00350 while(!done)
00351 {
00352
00353
00354
00355
00356
00357 if(read_socket(socket_fd, (char*)header, 5, -1) != 5)
00358 {
00359 done = 1;
00360 continue;
00361 }
00362
00363 int request_id = header[0];
00364 int64_t request_size = (((u_int32_t)header[1]) << 24) |
00365 (((u_int32_t)header[2]) << 16) |
00366 (((u_int32_t)header[3]) << 8) |
00367 (u_int32_t)header[4];
00368
00369
00370 reallocate_buffer(request_size);
00371
00372
00373 if(read_socket((char*)buffer, request_size, RENDERFARM_TIMEOUT) != request_size)
00374 {
00375 done = 1;
00376 continue;
00377 }
00378
00379 switch(request_id)
00380 {
00381 case RENDERFARM_PREFERENCES:
00382 send_preferences();
00383 break;
00384
00385 case RENDERFARM_ASSET:
00386 send_asset();
00387 break;
00388
00389 case RENDERFARM_EDL:
00390 send_edl();
00391 break;
00392
00393 case RENDERFARM_PACKAGE:
00394 send_package(buffer);
00395 break;
00396
00397 case RENDERFARM_PROGRESS:
00398 set_progress(buffer);
00399 break;
00400
00401 case RENDERFARM_SET_RESULT:
00402 set_result(buffer);
00403 break;
00404
00405 case RENDERFARM_SET_VMAP:
00406 set_video_map(buffer);
00407 break;
00408
00409 case RENDERFARM_GET_RESULT:
00410 get_result();
00411 break;
00412
00413 case RENDERFARM_DONE:
00414 done = 1;
00415 break;
00416
00417
00418 default:
00419
00420 {
00421 printf(_("RenderFarmServerThread::run: unknown request %02x\n"), request_id);
00422 }
00423 break;
00424 }
00425
00426 }
00427
00428 if(buffer) delete [] buffer;
00429
00430 }
00431
00432 int RenderFarmServerThread::write_string(int socket_fd, char *string)
00433 {
00434 unsigned char *datagram;
00435 int i, len;
00436 i = 0;
00437
00438 len = strlen(string) + 1;
00439 datagram = new unsigned char[len + 4];
00440 STORE_INT32(len);
00441 memcpy(datagram + i, string, len);
00442 write_socket(socket_fd, (char*)datagram, len + 4, RENDERFARM_TIMEOUT);
00443
00444
00445
00446 delete [] datagram;
00447 }
00448
00449 void RenderFarmServerThread::send_preferences()
00450 {
00451 Defaults defaults;
00452 char *string;
00453
00454 server->preferences->save_defaults(&defaults);
00455 defaults.save_string(string);
00456 write_string(socket_fd, string);
00457
00458 delete [] string;
00459 }
00460
00461 void RenderFarmServerThread::send_asset()
00462 {
00463 Defaults defaults;
00464 char *string1;
00465
00466
00467
00468
00469 server->default_asset->save_defaults(&defaults,
00470 0,
00471 1,
00472 1,
00473 1,
00474 1,
00475 1);
00476 defaults.save_string(string1);
00477
00478 FileXML file;
00479 server->default_asset->write(&file, 0, 0);
00480 file.terminate_string();
00481
00482 write_string(socket_fd, string1);
00483 write_string(socket_fd, file.string);
00484 delete [] string1;
00485 }
00486
00487
00488 void RenderFarmServerThread::send_edl()
00489 {
00490 FileXML file;
00491
00492
00493 server->edl->save_xml(plugindb,
00494 &file,
00495 0,
00496 0,
00497 0);
00498 file.terminate_string();
00499
00500
00501 write_string(socket_fd, file.string);
00502
00503 }
00504
00505
00506 void RenderFarmServerThread::send_package(unsigned char *buffer)
00507 {
00508 this->frames_per_second = (double)((((u_int32_t)buffer[0]) << 24) |
00509 (((u_int32_t)buffer[1]) << 16) |
00510 (((u_int32_t)buffer[2]) << 8) |
00511 ((u_int32_t)buffer[3])) /
00512 65536.0;
00513
00514
00515 RenderPackage *package =
00516 server->packages->get_package(frames_per_second,
00517 number,
00518 server->use_local_rate);
00519
00520
00521
00522 char datagram[BCTEXTLEN];
00523
00524
00525 if(!package)
00526 {
00527
00528 datagram[0] = datagram[1] = datagram[2] = datagram[3] = 0;
00529 write_socket(datagram, 4, RENDERFARM_TIMEOUT);
00530 }
00531 else
00532
00533 {
00534
00535 int i = 4;
00536 strcpy(&datagram[i], package->path);
00537 i += strlen(package->path);
00538 datagram[i++] = 0;
00539
00540 STORE_INT32(package->audio_start);
00541 STORE_INT32(package->audio_end);
00542 STORE_INT32(package->video_start);
00543 STORE_INT32(package->video_end);
00544 int use_brender = (server->brender ? 1 : 0);
00545 STORE_INT32(use_brender);
00546
00547 int len = i;
00548 i = 0;
00549 STORE_INT32(len - 4);
00550
00551 write_socket(datagram, len, RENDERFARM_TIMEOUT);
00552 }
00553 }
00554
00555
00556 void RenderFarmServerThread::set_progress(unsigned char *buffer)
00557 {
00558 server->total_return_lock->lock("RenderFarmServerThread::set_progress");
00559 *server->total_return += (int64_t)(((u_int32_t)buffer[0]) << 24) |
00560 (((u_int32_t)buffer[1]) << 16) |
00561 (((u_int32_t)buffer[2]) << 8) |
00562 ((u_int32_t)buffer[3]);
00563 server->total_return_lock->unlock();
00564 }
00565
00566 int RenderFarmServerThread::set_video_map(unsigned char *buffer)
00567 {
00568 if(server->brender)
00569 {
00570 server->brender->set_video_map((int64_t)(((u_int32_t)buffer[0]) << 24) |
00571 (((u_int32_t)buffer[1]) << 16) |
00572 (((u_int32_t)buffer[2]) << 8) |
00573 ((u_int32_t)buffer[3]),
00574 (int64_t)(((u_int32_t)buffer[4]) << 24) |
00575 (((u_int32_t)buffer[5]) << 16) |
00576 (((u_int32_t)buffer[6]) << 8) |
00577 ((u_int32_t)buffer[7]));
00578 char return_value[1];
00579 return_value[0] = 0;
00580 write_socket(return_value, 1, RENDERFARM_TIMEOUT);
00581 return 0;
00582 }
00583 return 1;
00584 }
00585
00586
00587 void RenderFarmServerThread::set_result(unsigned char *buffer)
00588 {
00589
00590 if(!*server->result_return)
00591 *server->result_return = buffer[0];
00592 }
00593
00594
00595 void RenderFarmServerThread::get_result()
00596 {
00597 unsigned char data[1];
00598 data[0] = *server->result_return;
00599 write_socket((char*)data, 1, RENDERFARM_TIMEOUT);
00600 }
00601
00602