00001
00002
#include "../include/FS.h"
#include "../include/command.h"
#include "../include/utility.h"
#include "../include/cond_comp.h"
#include <netdb.h>
#include <arpa/inet.h>
#include <libgen.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <limits.h>
#include <stdint.h>
#include <cstdlib>
#include <cstring>
00014
// Shared registry of the files served by this process. The code in this file
// reads and writes `database.files` (indexed by file name, yielding a
// file_struct*) only while holding `database.mutex`.
00016 database_struct database;
00017
// Text used as the "FS <id>" prefix of log messages. main() sets it to the
// local "address:port" of the socket connected to the DFR; it is empty
// until then.
00019 string FS_id;
00020
/**
 * Returns the size in bytes of the file at `filename`.
 *
 * @param filename path handed to stat(); may be NULL.
 * @return the size reported by stat(), or -1 when `filename` is NULL,
 *         stat() fails, or the size does not fit in an int.
 */
int get_filesize(const char* filename){
    struct stat file_info;
    if(!filename || stat(filename, &file_info) != 0)
        return -1;
    // st_size is an off_t and may be wider than int: report an error rather
    // than hand the caller a silently truncated (possibly negative) length.
    if(file_info.st_size < 0 || file_info.st_size > INT_MAX)
        return -1;
    return (int)file_info.st_size;
}
00027
00028 void* req_server(void* csk) {
00029 pthread_detach(pthread_self());
00030 int sock = (int)csk, CL_port, fd, cont, f_size;
00031 sockaddr_in CL_addr;
00032 socklen_t CL_addrlen = sizeof(CL_addr);
00033 char buf1[INET_ADDRSTRLEN];
00034 char buf2[INT_AS_STRING_BYTES];
00035 char* f_buf;
00036 char* b;
00037 command com;
00038 string CL_address, full_file_name;
00039 file_struct* f;
00040 int ret = getpeername(sock, (sockaddr*)&CL_addr, &CL_addrlen);
00041 if(ret < 0) {
00042 LOG(printf("FS %s -> could not get client address\n",
00043 FS_id.c_str()));
00044 close(sock);
00045 pthread_exit(NULL);
00046 }
00047 CL_address = inet_ntop(AF_INET, &(CL_addr.sin_addr), buf1, INET_ADDRSTRLEN);
00048 CL_port = ntohs(CL_addr.sin_port);
00049 sprintf(buf2, "%i", CL_port);
00050 CL_address += ':';
00051 CL_address += buf2;
00052 LOG(printf("FS %s -> client at %s successfully connected\n",
00053 FS_id.c_str(), CL_address.c_str()));
00054 ret = receive_command(sock, &com);
00055 if(ret < 0) {
00056 LOG(printf("FS %s -> could not receive a command from client at %s\n",
00057 FS_id.c_str(), CL_address.c_str()));
00058 close(sock);
00059 pthread_exit(NULL);
00060 }
00061 switch(com.id) {
00062 case get_file:
00063 pthread_mutex_lock(&database.mutex);
00064 f = database.files[com.file_name];
00065 if(!f) {
00066 pthread_mutex_unlock(&database.mutex);
00067 LOG(printf("FS %s -> client at %s asked for non-existent file %s\n",
00068 FS_id.c_str(), CL_address.c_str(), com.file_name.c_str()));
00069 reply_command(sock, ERR_FILE_NOT_EXISTS, 0, NULL);
00070 break;
00071 } else if(f->cur_clients >= f->max_clients) {
00072 pthread_mutex_unlock(&database.mutex);
00073 LOG(printf("FS %s -> client at %s asked for file %s, which is too busy\n",
00074 FS_id.c_str(), CL_address.c_str(), com.file_name.c_str()));
00075 reply_command(sock, ERR_TOO_MANY_CLIENTS, 0, NULL);
00076 break;
00077 }
00078 full_file_name = f->full_name;
00079 f->cur_clients++;
00080 pthread_mutex_unlock(&database.mutex);
00081
00082 f_size = get_filesize(full_file_name.c_str());
00083 if(f_size < 0) {
00084 pthread_mutex_lock(&database.mutex);
00085 f = database.files[com.file_name];
00086 f->cur_clients--;
00087 pthread_mutex_unlock(&database.mutex);
00088 LOG(printf("FS %s -> could not read dimension for client at %s, file %s\n",
00089 FS_id.c_str(), CL_address.c_str(), com.file_name.c_str()));
00090 reply_command(sock, ERR_UNKNOWN, 0, NULL);
00091 break;
00092 }
00093 fd = open(full_file_name.c_str(), O_RDONLY);
00094 f_buf = new char[f_size];
00095 b = f_buf;
00096 cont = f_size;
00097 while (cont > 0) {
00098 ret = read(fd, b, cont);
00099 if(ret == -1 || ret == 0) {
00100 pthread_mutex_lock(&database.mutex);
00101 f = database.files[com.file_name];
00102 f->cur_clients--;
00103 pthread_mutex_unlock(&database.mutex);
00104 LOG(printf("FS %s -> error during file reading for client at %s, file %s\n",
00105 FS_id.c_str(), CL_address.c_str(), com.file_name.c_str()));
00106 reply_command(sock, ERR_UNKNOWN, 0, NULL);
00107 close(fd);
00108 delete(f_buf);
00109 break;
00110 }
00111 cont -= ret;
00112 b += ret;
00113 }
00114 if(cont > 0)
00115 break;
00116 pthread_mutex_lock(&database.mutex);
00117 f = database.files[com.file_name];
00118 f->cur_clients--;
00119 pthread_mutex_unlock(&database.mutex);
00120 LOG(printf("FS %s -> file transfer complete for client at %s, file %s\n",
00121 FS_id.c_str(), CL_address.c_str(), com.file_name.c_str()));
00122 reply_command(sock, OK, f_size, f_buf);
00123 delete(f_buf);
00124 close(fd);
00125 break;
00126 default:
00127 LOG(printf("FS %s -> invalid command from client at %s\n",
00128 FS_id.c_str(), CL_address.c_str()));
00129 break;
00130 }
00131 close(sock);
00132 pthread_exit(NULL);
00133 }
00134
00135 void* listen (void* sock) {
00136 int lsock = (int)sock, csock, ret;
00137 sockaddr_in c_addr;
00138 socklen_t c_addrlen = sizeof(c_addr);
00139 int err_counter = 0;
00140 pthread_t new_t;
00141 while(1) {
00142 csock = accept(lsock, (sockaddr*)&c_addr, &c_addrlen);
00143 if(csock < 0) {
00144 LOG(printf("FS %s -> could not accept a client\n",
00145 FS_id.c_str()));
00146 err_counter++;
00147 if(err_counter >= MAX_FS_ERROR_COUNTER)
00148 pthread_exit(NULL);
00149 else continue;
00150 }
00151 ret = pthread_create(&new_t, 0, req_server, (void*)csock);
00152 if(ret != 0) {
00153 LOG(printf("FS %s -> could not create a request server thread\n",
00154 FS_id.c_str()));
00155 pthread_exit(NULL);
00156 }
00157 }
00158 }
00159
00160 int main(int argc, char* argv[]) {
00161 int DFR_port, port_CL, lsock, ret, DFRsock, loc_port;
00162 string DFR_ip_address, ip_address_CL;
00163 pthread_t lth;
00164 sockaddr_in loc_addr;
00165 socklen_t loc_addrlen = sizeof(loc_addr);
00166 int d_size;
00167 char* add_data;
00168 if(argc < 6 || (argc % 2 != 0)) {
00169 LOG(printf("FS -> usage: FS <DFR IP address or domain name for FSs> "));
00170 LOG(printf("<DFR port for FSs> <port for CLs> "));
00171 LOG(printf("<file1> <max clients for file1> "));
00172 LOG(printf("<file2> <max clients for file2> "));
00173 LOG(printf("<file3> <max clients for file3> ...\n"));
00174 exit(1);
00175 }
00176 hostent* DFR;
00177 char buf1[INET_ADDRSTRLEN];
00178 char buf2[INT_AS_STRING_BYTES];
00179 DFR = gethostbyname(argv[1]);
00180 if(DFR == NULL) {
00181 LOG(printf("CL -> DFR IP address or domain name not valid\n"));
00182 exit(1);
00183 }
00184 DFR_ip_address = inet_ntop(AF_INET, DFR->h_addr, buf1, INET_ADDRSTRLEN);
00185 DEB(fprintf(stderr, "ip address: %i %s\n", DFR->h_length, DFR_ip_address.c_str()));
00186 DFR_port = atoi(argv[2]);
00187 if(DFR_port <= 0 || DFR_port > 65535) {
00188 LOG(printf("FS -> DFR port not valid\n"));
00189 exit(1);
00190 }
00191 port_CL = atoi(argv[3]);
00192 if(port_CL <= 0 || port_CL > 65535) {
00193 LOG(printf("FS -> port for clients not valid\n"));
00194 exit(1);
00195 }
00196
00197 DFRsock = init_sd_cli(DFR_port, DFR_ip_address);
00198 if(DFRsock < 0) {
00199 LOG(printf("FS -> could not connect with DFR at %s:%i\n",
00200 DFR_ip_address.c_str(), DFR_port));
00201 exit(1);
00202 }
00203 ret = getsockname(DFRsock, (sockaddr*)&loc_addr, &loc_addrlen);
00204 if(ret < 0) {
00205 LOG(printf("FS %s -> could not get local socket address\n",
00206 FS_id.c_str()));
00207 close(DFRsock);
00208 exit(1);
00209 }
00210 ip_address_CL = inet_ntop(AF_INET, &(loc_addr.sin_addr), buf1, INET_ADDRSTRLEN);
00211 loc_port = ntohs(loc_addr.sin_port);
00212 sprintf(buf2, "%i", loc_port);
00213 FS_id = ip_address_CL;
00214 FS_id += ':';
00215 FS_id += buf2;
00216
00217
00218 lsock = init_sd_serv(port_CL, ip_address_CL);
00219 if(lsock < 0) {
00220 LOG(printf("FS %s -> listening socket for clients could not be initialized\n",
00221 FS_id.c_str()));
00222 close(DFRsock);
00223 exit(1);
00224 }
00225 LOG(printf("FS %s -> listening for clients on %s:%i\n",
00226 FS_id.c_str(), ip_address_CL.c_str(), port_CL));
00227 ret = pthread_create(<h, 0, listen, (void*)lsock);
00228 if(ret != 0) {
00229 LOG(printf("FS %s -> could not create listening thread\n",
00230 FS_id.c_str()));
00231 close(DFRsock);
00232 close(lsock);
00233 exit(1);
00234 }
00235
00236 file_struct* f;
00237 char* s_cpy1, *s_cpy2;
00238 string file_name = "";
00239 command c(register_file, file_name.c_str(), 0, port_CL);
00240 for(int i = 4; i < argc; i+=2) {
00241 s_cpy1 = new char[strlen(argv[i])+1];
00242 s_cpy2 = new char[strlen(argv[i])+1];
00243 strcpy(s_cpy1, argv[i]);
00244 strcpy(s_cpy2, argv[i]);
00245 f = new file_struct;
00246 file_name = basename(s_cpy1);
00247 f->max_clients = atoi(argv[i+1]);
00248 if(f->max_clients <= 0) {
00249 LOG(printf("FS %s -> number of max clients not valid for file %s\n",
00250 FS_id.c_str(), argv[i]));
00251 close(DFRsock);
00252 close(lsock);
00253 exit(1);
00254 }
00255 f->cur_clients = 0;
00256 f->full_name = dirname(s_cpy2);
00257 f->full_name += '/'+file_name;
00258 delete(s_cpy1);
00259 delete(s_cpy2);
00260
00261 pthread_mutex_lock(&database.mutex);
00262 database.files[file_name] = f;
00263 pthread_mutex_unlock(&database.mutex);
00264
00265 c.file_name = file_name;
00266 c.max_clients = f->max_clients;
00267 ret = send_command(DFRsock, c, &d_size, &add_data);
00268 delete(add_data);
00269 if(ret != OK) {
00270 LOG(printf("FS %s -> invalid reply from DFR when registering file %s\n",
00271 FS_id.c_str(), file_name.c_str()));
00272 close(DFRsock);
00273 close(lsock);
00274 exit(1);
00275 }
00276 DEB(fprintf(stderr, "%s, %i\n", database.files[file_name]->full_name.c_str(),
00277 database.files[file_name]->max_clients));
00278 }
00279 pthread_join(lth, 0);
00280 close(DFRsock);
00281 close(lsock);
00282 return 0;
00283 }