00001 #include "asset.h"
00002 #include "brender.h"
00003 #include "clip.h"
00004 #include "condition.h"
00005 #include "bchash.h"
00006 #include "edl.h"
00007 #include "filesystem.h"
00008 #include "filexml.h"
00009 #include "language.h"
00010 #include "mutex.h"
00011 #include "packagedispatcher.h"
00012 #include "preferences.h"
00013 #include "render.h"
00014 #include "renderfarm.h"
00015 #include "renderfarmclient.h"
00016 #include "bctimer.h"
00017 #include "transportque.h"
00018
00019
00020 #include <arpa/inet.h>
00021 #include <errno.h>
00022 #include <fcntl.h>
00023 #include <netdb.h>
00024 #include <netinet/in.h>
00025 #include <signal.h>
00026 #include <stdio.h>
00027 #include <string.h>
00028 #include <sys/socket.h>
00029 #include <sys/types.h>
00030 #include <sys/un.h>
00031 #include <unistd.h>
00032
00033
00034
00035
00036 RenderFarmServer::RenderFarmServer(ArrayList<PluginServer*> *plugindb,
00037 PackageDispatcher *packages,
00038 Preferences *preferences,
00039 int use_local_rate,
00040 int *result_return,
00041 int64_t *total_return,
00042 Mutex *total_return_lock,
00043 Asset *default_asset,
00044 EDL *edl,
00045 BRender *brender)
00046 {
00047 this->plugindb = plugindb;
00048 this->packages = packages;
00049 this->preferences = preferences;
00050 this->use_local_rate = use_local_rate;
00051 this->result_return = result_return;
00052 this->total_return = total_return;
00053 this->total_return_lock = total_return_lock;
00054 this->default_asset = default_asset;
00055 this->edl = edl;
00056 this->brender = brender;
00057 client_lock = new Mutex("RenderFarmServer::client_lock");
00058 }
00059
00060 RenderFarmServer::~RenderFarmServer()
00061 {
00062 clients.remove_all_objects();
00063 delete client_lock;
00064 }
00065
00066
00067 int RenderFarmServer::start_clients()
00068 {
00069 int result = 0;
00070
00071 for(int i = 0; i < preferences->get_enabled_nodes() && !result; i++)
00072 {
00073 client_lock->lock("RenderFarmServer::start_clients");
00074 RenderFarmServerThread *client = new RenderFarmServerThread(plugindb,
00075 this,
00076 i);
00077 clients.append(client);
00078
00079 result = client->start_loop();
00080 client_lock->unlock();
00081 }
00082
00083 return result;
00084 }
00085
00086
00087 int RenderFarmServer::wait_clients()
00088 {
00089
00090 clients.remove_all_objects();
00091
00092 return 0;
00093 }
00094
00095
00096
00097
00098
00099
00100
00101
00102
00103
00104
00105
00106
00107
00108 RenderFarmServerThread::RenderFarmServerThread(ArrayList<PluginServer*> *plugindb,
00109 RenderFarmServer *server,
00110 int number)
00111 : Thread()
00112 {
00113 this->plugindb = plugindb;
00114 this->server = server;
00115 this->number = number;
00116 socket_fd = -1;
00117 frames_per_second = 0;
00118 watchdog = 0;
00119 buffer = 0;
00120 datagram = 0;
00121 Thread::set_synchronous(1);
00122 }
00123
00124
00125
00126 RenderFarmServerThread::~RenderFarmServerThread()
00127 {
00128
00129 Thread::join();
00130
00131 if(socket_fd >= 0) close(socket_fd);
00132 if(watchdog) delete watchdog;
00133 if(buffer) delete [] buffer;
00134 if(datagram) delete [] datagram;
00135
00136 }
00137
00138
00139 int RenderFarmServerThread::open_client(char *hostname, int port)
00140 {
00141 int socket_fd = -1;
00142 int result = 0;
00143
00144
00145 if(hostname[0] == '/')
00146 {
00147 if((socket_fd = socket(PF_UNIX, SOCK_STREAM, 0)) < 0)
00148 {
00149 perror(_("RenderFarmServerThread::start_loop: socket\n"));
00150 result = 1;
00151 }
00152 else
00153 {
00154 struct sockaddr_un addr;
00155 addr.sun_family = AF_FILE;
00156 strcpy(addr.sun_path, hostname);
00157 int size = (offsetof(struct sockaddr_un, sun_path) +
00158 strlen(hostname) + 1);
00159
00160
00161
00162 #define ATTEMPT_DELAY 100000
00163 int done = 0;
00164 int attempt = 0;
00165
00166 do
00167 {
00168 if(connect(socket_fd, (struct sockaddr*)&addr, size) < 0)
00169 {
00170 attempt++;
00171 if(attempt > 30000000 / ATTEMPT_DELAY)
00172 {
00173 fprintf(stderr, _("RenderFarmServerThread::open_client: %s: %s\n"),
00174 hostname,
00175 strerror(errno));
00176 result = 1;
00177 }
00178 else
00179 usleep(ATTEMPT_DELAY);
00180 }
00181 else
00182 done = 1;
00183 }while(!result && !done);
00184 }
00185 }
00186 else
00187
00188 {
00189 if((socket_fd = socket(PF_INET, SOCK_STREAM, 0)) < 0)
00190 {
00191 perror(_("RenderFarmServerThread::start_loop: socket"));
00192 result = 1;
00193 }
00194 else
00195 {
00196
00197 struct sockaddr_in addr;
00198 struct hostent *hostinfo;
00199 addr.sin_family = AF_INET;
00200 addr.sin_port = htons(port);
00201 hostinfo = gethostbyname(hostname);
00202 if(hostinfo == NULL)
00203 {
00204 fprintf(stderr, _("RenderFarmServerThread::open_client: unknown host %s.\n"),
00205 hostname);
00206 result = 1;
00207 }
00208 else
00209 {
00210 addr.sin_addr = *(struct in_addr *) hostinfo->h_addr;
00211
00212 if(connect(socket_fd, (struct sockaddr*)&addr, sizeof(addr)) < 0)
00213 {
00214 fprintf(stderr, _("RenderFarmServerThread::open_client: %s: %s\n"),
00215 hostname,
00216 strerror(errno));
00217 result = 1;
00218 }
00219 }
00220 }
00221 }
00222
00223 if(result) socket_fd = -1;
00224
00225 return socket_fd;
00226 }
00227
00228 int RenderFarmServerThread::start_loop()
00229 {
00230 int result = 0;
00231
00232 socket_fd = open_client(server->preferences->get_node_hostname(number),
00233 server->preferences->get_node_port(number));
00234
00235 if(socket_fd < 0) result = 1;
00236
00237 if(!result)
00238 {
00239 watchdog = new RenderFarmWatchdog(this, 0);
00240 watchdog->start();
00241 }
00242
00243 if(!result) Thread::start();
00244
00245 return result;
00246 }
00247
00248
00249
00250
00251
00252
00253
00254
00255
00256
00257
00258 int64_t RenderFarmServerThread::read_int64(int *error)
00259 {
00260 int temp = 0;
00261 if(!error) error = &temp;
00262
00263 unsigned char data[sizeof(int64_t)];
00264 *error = (read_socket((char*)data, sizeof(int64_t)) !=
00265 sizeof(int64_t));
00266
00267
00268
00269 int64_t result = 1;
00270 if(!*error)
00271 {
00272 result = (((int64_t)data[0]) << 56) |
00273 (((uint64_t)data[1]) << 48) |
00274 (((uint64_t)data[2]) << 40) |
00275 (((uint64_t)data[3]) << 32) |
00276 (((uint64_t)data[4]) << 24) |
00277 (((uint64_t)data[5]) << 16) |
00278 (((uint64_t)data[6]) << 8) |
00279 data[7];
00280 }
00281 return result;
00282 }
00283
00284 int RenderFarmServerThread::write_int64(int64_t value)
00285 {
00286 unsigned char data[sizeof(int64_t)];
00287 data[0] = (value >> 56) & 0xff;
00288 data[1] = (value >> 48) & 0xff;
00289 data[2] = (value >> 40) & 0xff;
00290 data[3] = (value >> 32) & 0xff;
00291 data[4] = (value >> 24) & 0xff;
00292 data[5] = (value >> 16) & 0xff;
00293 data[6] = (value >> 8) & 0xff;
00294 data[7] = value & 0xff;
00295 return (write_socket((char*)data, sizeof(int64_t)) !=
00296 sizeof(int64_t));
00297 }
00298
00299
00300
00301 int RenderFarmServerThread::read_socket(char *data, int len)
00302 {
00303 int bytes_read = 0;
00304 int offset = 0;
00305 watchdog->begin_request();
00306 while(len > 0 && bytes_read >= 0)
00307 {
00308 enable_cancel();
00309 bytes_read = read(socket_fd, data + offset, len);
00310 disable_cancel();
00311 if(bytes_read > 0)
00312 {
00313 len -= bytes_read;
00314 offset += bytes_read;
00315 }
00316 else
00317 if(bytes_read < 0)
00318 break;
00319 }
00320 watchdog->end_request();
00321
00322 return offset;
00323 }
00324
00325 int RenderFarmServerThread::write_socket(char *data, int len)
00326 {
00327 return write(socket_fd, data, len);
00328 }
00329
00330 void RenderFarmServerThread::reallocate_buffer(int size)
00331 {
00332 if(buffer && buffer_allocated < size)
00333 {
00334 delete [] buffer;
00335 buffer = 0;
00336 }
00337
00338 if(!buffer && size)
00339 {
00340 buffer = new unsigned char[size];
00341 buffer_allocated = size;
00342 }
00343 }
00344
00345 void RenderFarmServerThread::run()
00346 {
00347
00348 unsigned char header[5];
00349 int done = 0;
00350 int bytes_read = 0;
00351
00352
00353 buffer = 0;
00354 buffer_allocated = 0;
00355
00356
00357
00358
00359
00360
00361 write_int64(RENDERFARM_PACKAGES);
00362
00363
00364
00365 while(!done)
00366 {
00367
00368
00369
00370
00371 bytes_read = read_socket((char*)header, 5);
00372
00373 if(bytes_read != 5)
00374 {
00375 done = 1;
00376 continue;
00377 }
00378
00379 int request_id = header[0];
00380 int64_t request_size = (((u_int32_t)header[1]) << 24) |
00381 (((u_int32_t)header[2]) << 16) |
00382 (((u_int32_t)header[3]) << 8) |
00383 (u_int32_t)header[4];
00384
00385 reallocate_buffer(request_size);
00386
00387
00388 bytes_read = read_socket((char*)buffer, request_size);
00389
00390 if(bytes_read != request_size)
00391 {
00392 done = 1;
00393 continue;
00394 }
00395
00396
00397 switch(request_id)
00398 {
00399 case RENDERFARM_PREFERENCES:
00400 send_preferences();
00401 break;
00402
00403 case RENDERFARM_ASSET:
00404 send_asset();
00405 break;
00406
00407 case RENDERFARM_EDL:
00408 send_edl();
00409 break;
00410
00411 case RENDERFARM_PACKAGE:
00412 send_package(buffer);
00413 break;
00414
00415 case RENDERFARM_PROGRESS:
00416 set_progress(buffer);
00417 break;
00418
00419 case RENDERFARM_SET_RESULT:
00420 set_result(buffer);
00421 break;
00422
00423 case RENDERFARM_SET_VMAP:
00424 set_video_map(buffer);
00425 break;
00426
00427 case RENDERFARM_GET_RESULT:
00428 get_result();
00429 break;
00430
00431 case RENDERFARM_DONE:
00432
00433 done = 1;
00434 break;
00435
00436 case RENDERFARM_KEEPALIVE:
00437 break;
00438
00439 default:
00440
00441 {
00442 printf(_("RenderFarmServerThread::run: unknown request %02x\n"), request_id);
00443 }
00444 break;
00445 }
00446
00447 }
00448
00449
00450
00451 if(watchdog)
00452 {
00453
00454 delete watchdog;
00455 watchdog = 0;
00456 }
00457
00458
00459 }
00460
00461 int RenderFarmServerThread::write_string(char *string)
00462 {
00463 int i, len;
00464 i = 0;
00465
00466 len = strlen(string) + 1;
00467 datagram = new char[len + 4];
00468 STORE_INT32(len);
00469 memcpy(datagram + i, string, len);
00470 write_socket((char*)datagram, len + 4);
00471
00472
00473
00474 delete [] datagram;
00475 datagram = 0;
00476 }
00477
00478 void RenderFarmServerThread::send_preferences()
00479 {
00480 BC_Hash defaults;
00481 char *string;
00482
00483 server->preferences->save_defaults(&defaults);
00484 defaults.save_string(string);
00485 write_string(string);
00486
00487 delete [] string;
00488 }
00489
00490 void RenderFarmServerThread::send_asset()
00491 {
00492 BC_Hash defaults;
00493 char *string1;
00494
00495
00496
00497
00498 server->default_asset->save_defaults(&defaults,
00499 0,
00500 1,
00501 1,
00502 1,
00503 1,
00504 1);
00505 defaults.save_string(string1);
00506
00507 FileXML file;
00508 server->default_asset->write(&file, 0, 0);
00509 file.terminate_string();
00510
00511 write_string(string1);
00512 write_string(file.string);
00513 delete [] string1;
00514 }
00515
00516
00517 void RenderFarmServerThread::send_edl()
00518 {
00519 FileXML file;
00520
00521
00522 server->edl->save_xml(plugindb,
00523 &file,
00524 0,
00525 0,
00526 0);
00527 file.terminate_string();
00528
00529
00530 write_string(file.string);
00531
00532 }
00533
00534
00535 void RenderFarmServerThread::send_package(unsigned char *buffer)
00536 {
00537 this->frames_per_second = (double)((((u_int32_t)buffer[0]) << 24) |
00538 (((u_int32_t)buffer[1]) << 16) |
00539 (((u_int32_t)buffer[2]) << 8) |
00540 ((u_int32_t)buffer[3])) /
00541 65536.0;
00542
00543
00544 RenderPackage *package =
00545 server->packages->get_package(frames_per_second,
00546 number,
00547 server->use_local_rate);
00548
00549
00550 datagram = new char[BCTEXTLEN];
00551
00552
00553 if(!package)
00554 {
00555
00556 datagram[0] = datagram[1] = datagram[2] = datagram[3] = 0;
00557 write_socket(datagram, 4);
00558 }
00559 else
00560
00561 {
00562
00563 int i = 4;
00564 strcpy(&datagram[i], package->path);
00565 i += strlen(package->path);
00566 datagram[i++] = 0;
00567
00568 STORE_INT32(package->audio_start);
00569 STORE_INT32(package->audio_end);
00570 STORE_INT32(package->video_start);
00571 STORE_INT32(package->video_end);
00572 int use_brender = (server->brender ? 1 : 0);
00573 STORE_INT32(use_brender);
00574 STORE_INT32(package->audio_do);
00575 STORE_INT32(package->video_do);
00576
00577 int len = i;
00578 i = 0;
00579 STORE_INT32(len - 4);
00580
00581 write_socket(datagram, len);
00582 }
00583 delete [] datagram;
00584 datagram = 0;
00585 }
00586
00587
00588 void RenderFarmServerThread::set_progress(unsigned char *buffer)
00589 {
00590 server->total_return_lock->lock("RenderFarmServerThread::set_progress");
00591 *server->total_return += (int64_t)(((u_int32_t)buffer[0]) << 24) |
00592 (((u_int32_t)buffer[1]) << 16) |
00593 (((u_int32_t)buffer[2]) << 8) |
00594 ((u_int32_t)buffer[3]);
00595 server->total_return_lock->unlock();
00596 }
00597
00598 int RenderFarmServerThread::set_video_map(unsigned char *buffer)
00599 {
00600 if(server->brender)
00601 {
00602 server->brender->set_video_map((int64_t)(((u_int32_t)buffer[0]) << 24) |
00603 (((u_int32_t)buffer[1]) << 16) |
00604 (((u_int32_t)buffer[2]) << 8) |
00605 ((u_int32_t)buffer[3]),
00606 (int64_t)(((u_int32_t)buffer[4]) << 24) |
00607 (((u_int32_t)buffer[5]) << 16) |
00608 (((u_int32_t)buffer[6]) << 8) |
00609 ((u_int32_t)buffer[7]));
00610 char return_value[1];
00611 return_value[0] = 0;
00612 write_socket(return_value, 1);
00613 return 0;
00614 }
00615 return 1;
00616 }
00617
00618
00619 void RenderFarmServerThread::set_result(unsigned char *buffer)
00620 {
00621
00622 if(!*server->result_return)
00623 *server->result_return = buffer[0];
00624 }
00625
00626
00627 void RenderFarmServerThread::get_result()
00628 {
00629 unsigned char data[1];
00630 data[0] = *server->result_return;
00631 write_socket((char*)data, 1);
00632 }
00633
00634
00635
00636
00637
00638
00639
00640
00641
00642
00643
00644
00645
00646
00647 RenderFarmWatchdog::RenderFarmWatchdog(
00648 RenderFarmServerThread *server,
00649 RenderFarmClientThread *client)
00650 : Thread(1, 0, 0)
00651 {
00652 this->server = server;
00653 this->client = client;
00654 next_request = new Condition(0, "RenderFarmWatchdog::next_request", 0);
00655 request_complete = new Condition(0, "RenderFarmWatchdog::request_complete", 0);
00656 done = 0;
00657 }
00658
00659 RenderFarmWatchdog::~RenderFarmWatchdog()
00660 {
00661 done = 1;
00662 next_request->unlock();
00663 request_complete->unlock();
00664 join();
00665 delete next_request;
00666 delete request_complete;
00667 }
00668
00669 void RenderFarmWatchdog::begin_request()
00670 {
00671 next_request->unlock();
00672 }
00673
00674 void RenderFarmWatchdog::end_request()
00675 {
00676 request_complete->unlock();
00677 }
00678
00679 void RenderFarmWatchdog::run()
00680 {
00681 while(!done)
00682 {
00683 next_request->lock("RenderFarmWatchdog::run");
00684
00685 int result = request_complete->timed_lock(RENDERFARM_TIMEOUT * 1000000,
00686 "RenderFarmWatchdog::run");
00687
00688 if(result)
00689 {
00690 if(client)
00691 {
00692 printf("RenderFarmWatchdog::run 1 killing pid %d\n", client->pid);
00693
00694 kill(client->pid, SIGKILL);
00695 }
00696 else
00697 if(server)
00698 {
00699 printf("RenderFarmWatchdog::run 1 killing thread %p\n", server);
00700 server->cancel();
00701 unsigned char buffer[4];
00702 buffer[0] = 1;
00703 server->set_result(buffer);
00704 }
00705
00706 done = 1;
00707 }
00708 }
00709 }
00710
00711
00712
00713