00001 #include "condition.h" 00002 #include "mutex.h" 00003 #include "loadbalance.h" 00004 00005 00006 00007 00008 LoadPackage::LoadPackage() 00009 { 00010 completion_lock = new Condition(0, "LoadPackage::completion_lock"); 00011 } 00012 LoadPackage::~LoadPackage() 00013 { 00014 delete completion_lock; 00015 } 00016 00017 00018 00019 00020 00021 00022 00023 00024 00025 LoadClient::LoadClient(LoadServer *server) 00026 : Thread() 00027 { 00028 Thread::set_synchronous(1); 00029 this->server = server; 00030 done = 0; 00031 package_number = 0; 00032 input_lock = new Condition(0, "LoadClient::input_lock"); 00033 completion_lock = new Condition(0, "LoadClient::completion_lock"); 00034 } 00035 00036 LoadClient::LoadClient() 00037 : Thread() 00038 { 00039 Thread::set_synchronous(1); 00040 server = 0; 00041 done = 0; 00042 package_number = 0; 00043 input_lock = new Condition(0, "LoadClient::input_lock"); 00044 completion_lock = new Condition(0, "LoadClient::completion_lock"); 00045 } 00046 00047 LoadClient::~LoadClient() 00048 { 00049 done = 1; 00050 input_lock->unlock(); 00051 Thread::join(); 00052 delete input_lock; 00053 delete completion_lock; 00054 } 00055 00056 int LoadClient::get_package_number() 00057 { 00058 return package_number; 00059 } 00060 00061 LoadServer* LoadClient::get_server() 00062 { 00063 return server; 00064 } 00065 00066 00067 void LoadClient::run() 00068 { 00069 while(!done) 00070 { 00071 input_lock->lock("LoadClient::run"); 00072 00073 if(!done) 00074 { 00075 // Read packet 00076 LoadPackage *package; 00077 00078 00079 server->client_lock->lock("LoadClient::run"); 00080 if(server->current_package < server->total_packages) 00081 { 00082 package_number = server->current_package; 00083 package = server->packages[server->current_package++]; 00084 server->client_lock->unlock(); 00085 input_lock->unlock(); 00086 00087 process_package(package); 00088 00089 package->completion_lock->unlock(); 00090 } 00091 else 00092 { 00093 completion_lock->unlock(); 00094 
server->client_lock->unlock(); 00095 } 00096 } 00097 } 00098 } 00099 00100 00101 00102 00103 00104 00105 00106 00107 LoadServer::LoadServer(int total_clients, int total_packages) 00108 { 00109 if(total_clients <= 0) 00110 printf("LoadServer::LoadServer total_clients == %d\n", total_clients); 00111 this->total_clients = total_clients; 00112 this->total_packages = total_packages; 00113 current_package = 0; 00114 clients = 0; 00115 packages = 0; 00116 client_lock = new Mutex("LoadServer::client_lock"); 00117 } 00118 00119 LoadServer::~LoadServer() 00120 { 00121 delete_clients(); 00122 delete_packages(); 00123 delete client_lock; 00124 } 00125 00126 void LoadServer::delete_clients() 00127 { 00128 if(clients) 00129 { 00130 for(int i = 0; i < total_clients; i++) 00131 delete clients[i]; 00132 delete [] clients; 00133 } 00134 clients = 0; 00135 } 00136 00137 void LoadServer::delete_packages() 00138 { 00139 if(packages) 00140 { 00141 for(int i = 0; i < total_packages; i++) 00142 delete packages[i]; 00143 delete [] packages; 00144 } 00145 packages = 0; 00146 } 00147 00148 void LoadServer::set_package_count(int total_packages) 00149 { 00150 delete_packages(); 00151 this->total_packages = total_packages; 00152 create_packages(); 00153 } 00154 00155 00156 void LoadServer::create_clients() 00157 { 00158 if(!clients) 00159 { 00160 clients = new LoadClient*[total_clients]; 00161 for(int i = 0; i < total_clients; i++) 00162 { 00163 clients[i] = new_client(); 00164 clients[i]->server = this; 00165 clients[i]->start(); 00166 } 00167 } 00168 } 00169 00170 void LoadServer::create_packages() 00171 { 00172 if(!packages) 00173 { 00174 packages = new LoadPackage*[total_packages]; 00175 for(int i = 0; i < total_packages; i++) 00176 packages[i] = new_package(); 00177 } 00178 } 00179 00180 LoadPackage* LoadServer::get_package(int number) 00181 { 00182 return packages[number]; 00183 } 00184 00185 LoadClient* LoadServer::get_client(int number) 00186 { 00187 return clients[number]; 00188 } 
00189 00190 int LoadServer::get_total_packages() 00191 { 00192 return total_packages; 00193 } 00194 00195 int LoadServer::get_total_clients() 00196 { 00197 return total_clients; 00198 } 00199 00200 void LoadServer::process_packages() 00201 { 00202 if(!clients) create_clients(); 00203 if(!packages) create_packages(); 00204 00205 00206 00207 // Set up packages 00208 init_packages(); 00209 00210 current_package = 0; 00211 // Start all clients 00212 for(int i = 0; i < total_clients; i++) 00213 { 00214 clients[i]->input_lock->unlock(); 00215 } 00216 00217 // Wait for packages to get finished 00218 for(int i = 0; i < total_packages; i++) 00219 { 00220 packages[i]->completion_lock->lock("LoadServer::process_packages 1"); 00221 } 00222 00223 // Wait for clients to finish before allowing changes to packages 00224 for(int i = 0; i < total_clients; i++) 00225 { 00226 clients[i]->completion_lock->lock("LoadServer::process_packages 2"); 00227 } 00228 } 00229
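// 1.4.4  -- stray version string, presumably left by the tool that generated this listing; not program text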