00001 #include "condition.h" 00002 #include "mutex.h" 00003 #include "loadbalance.h" 00004 00005 00006 00007 00008 LoadPackage::LoadPackage() 00009 { 00010 completion_lock = new Condition(0, "LoadPackage::completion_lock"); 00011 } 00012 LoadPackage::~LoadPackage() 00013 { 00014 delete completion_lock; 00015 } 00016 00017 00018 00019 00020 00021 00022 00023 00024 00025 LoadClient::LoadClient(LoadServer *server) 00026 : Thread() 00027 { 00028 Thread::set_synchronous(1); 00029 this->server = server; 00030 done = 0; 00031 package_number = 0; 00032 input_lock = new Condition(0, "LoadClient::input_lock"); 00033 completion_lock = new Condition(0, "LoadClient::completion_lock"); 00034 } 00035 00036 LoadClient::LoadClient() 00037 : Thread() 00038 { 00039 Thread::set_synchronous(1); 00040 server = 0; 00041 done = 0; 00042 package_number = 0; 00043 input_lock = new Condition(0, "LoadClient::input_lock"); 00044 completion_lock = new Condition(0, "LoadClient::completion_lock"); 00045 } 00046 00047 LoadClient::~LoadClient() 00048 { 00049 done = 1; 00050 input_lock->unlock(); 00051 Thread::join(); 00052 delete input_lock; 00053 delete completion_lock; 00054 } 00055 00056 int LoadClient::get_package_number() 00057 { 00058 return package_number; 00059 } 00060 00061 LoadServer* LoadClient::get_server() 00062 { 00063 return server; 00064 } 00065 00066 00067 void LoadClient::run() 00068 { 00069 while(!done) 00070 { 00071 input_lock->lock("LoadClient::run"); 00072 00073 if(!done) 00074 { 00075 // Read packet 00076 LoadPackage *package; 00077 00078 00079 server->client_lock->lock("LoadClient::run"); 00080 if(server->current_package < server->total_packages) 00081 { 00082 package_number = server->current_package; 00083 package = server->packages[server->current_package++]; 00084 server->client_lock->unlock(); 00085 input_lock->unlock(); 00086 00087 process_package(package); 00088 00089 package->completion_lock->unlock(); 00090 } 00091 else 00092 { 00093 completion_lock->unlock(); 00094 
server->client_lock->unlock(); 00095 } 00096 } 00097 } 00098 } 00099 00100 void LoadClient::run_single() 00101 { 00102 if(server->total_packages) 00103 process_package(server->packages[0]); 00104 } 00105 00106 void LoadClient::process_package(LoadPackage *package) 00107 { 00108 printf("LoadClient::process_package\n"); 00109 } 00110 00111 00112 00113 00114 00115 LoadServer::LoadServer(int total_clients, int total_packages) 00116 { 00117 if(total_clients <= 0) 00118 printf("LoadServer::LoadServer total_clients == %d\n", total_clients); 00119 this->total_clients = total_clients; 00120 this->total_packages = total_packages; 00121 current_package = 0; 00122 clients = 0; 00123 packages = 0; 00124 client_lock = new Mutex("LoadServer::client_lock"); 00125 is_single = 0; 00126 single_client = 0; 00127 } 00128 00129 LoadServer::~LoadServer() 00130 { 00131 delete_clients(); 00132 delete_packages(); 00133 delete client_lock; 00134 } 00135 00136 void LoadServer::delete_clients() 00137 { 00138 if(clients) 00139 { 00140 for(int i = 0; i < total_clients; i++) 00141 delete clients[i]; 00142 delete [] clients; 00143 } 00144 if(single_client) delete single_client; 00145 clients = 0; 00146 single_client = 0; 00147 } 00148 00149 void LoadServer::delete_packages() 00150 { 00151 if(packages) 00152 { 00153 for(int i = 0; i < total_packages; i++) 00154 delete packages[i]; 00155 delete [] packages; 00156 } 00157 packages = 0; 00158 } 00159 00160 void LoadServer::set_package_count(int total_packages) 00161 { 00162 delete_packages(); 00163 this->total_packages = total_packages; 00164 create_packages(); 00165 } 00166 00167 00168 void LoadServer::create_clients() 00169 { 00170 if(!is_single && !clients) 00171 { 00172 clients = new LoadClient*[total_clients]; 00173 for(int i = 0; i < total_clients; i++) 00174 { 00175 clients[i] = new_client(); 00176 clients[i]->server = this; 00177 clients[i]->start(); 00178 } 00179 } 00180 00181 if(is_single && !single_client) 00182 { 00183 single_client = 
new_client(); 00184 single_client->server = this; 00185 } 00186 } 00187 00188 void LoadServer::create_packages() 00189 { 00190 if(!packages) 00191 { 00192 packages = new LoadPackage*[total_packages]; 00193 for(int i = 0; i < total_packages; i++) 00194 packages[i] = new_package(); 00195 } 00196 } 00197 00198 LoadPackage* LoadServer::get_package(int number) 00199 { 00200 return packages[number]; 00201 } 00202 00203 LoadClient* LoadServer::get_client(int number) 00204 { 00205 return clients[number]; 00206 } 00207 00208 int LoadServer::get_total_packages() 00209 { 00210 if(is_single) return 1; 00211 return total_packages; 00212 } 00213 00214 int LoadServer::get_total_clients() 00215 { 00216 if(is_single) return 1; 00217 return total_clients; 00218 } 00219 00220 void LoadServer::process_packages() 00221 { 00222 is_single = 0; 00223 create_clients(); 00224 create_packages(); 00225 00226 00227 00228 // Set up packages 00229 init_packages(); 00230 00231 current_package = 0; 00232 // Start all clients 00233 for(int i = 0; i < total_clients; i++) 00234 { 00235 clients[i]->input_lock->unlock(); 00236 } 00237 00238 // Wait for packages to get finished 00239 for(int i = 0; i < total_packages; i++) 00240 { 00241 packages[i]->completion_lock->lock("LoadServer::process_packages 1"); 00242 } 00243 00244 // Wait for clients to finish before allowing changes to packages 00245 for(int i = 0; i < total_clients; i++) 00246 { 00247 clients[i]->completion_lock->lock("LoadServer::process_packages 2"); 00248 } 00249 } 00250 00251 void LoadServer::process_single() 00252 { 00253 is_single = 1; 00254 create_clients(); 00255 create_packages(); 00256 init_packages(); 00257 current_package = 0; 00258 single_client->run_single(); 00259 } 00260
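// 1.5.5 -- stray version stamp, presumably left by the tool that generated this listing; not part of the program.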