Changeset 444

Show
Ignore:
Timestamp:
05/08/08 08:46:35 (8 months ago)
Author:
mgoldman
Message:

Phase 1 refactor of service_manager()

Files:

Legend:

Unmodified
Added
Removed
Modified
Copied
Moved
  • branches/trunk-pmd-intproto/server/master.py

    r440 r444  
    3838        def __init__(self): 
    3939                self.clients = server.Server() 
    40                 self.manager = server.Server('', 55555, accept_fun=accept_helper) 
     40                self.manager = server.Server('', 55555, accept_func=accept_helper) 
    4141 
    4242        def service_client(self): 
    4343                pass 
     44 
     45        def updatep(self, (mgr, info), msg): 
     46                msg.type = "response" 
     47                msg.body = self.clients_needing_update() # plug in here 
     48                return msg 
     49 
     50        def update(self, (mgr,info), msg): 
     51                if msg.body == "all": 
     52                        to_update = self.clients_needing_update() 
     53                else: 
     54                        to_update = val.body 
     55                ret = [] 
     56                for c in to_update: 
     57                        ret.append((c,self.update_client(c))) 
     58                msg.body = ret 
     59                return msg 
     60 
     61        def updategroup(self, (mgr,info), msg): 
     62                msg.type = 'response' 
     63                if msg.body in storage.list_all_layers(storage_root): 
     64                        l = Layer(os.path.join(os.path.dirname(storage_root, msg.body))) 
     65                        ret = [] 
     66                        for c in l.list_nodes(): 
     67                                ret.append((c, self.update_client(c))) 
     68                        msg.body = ret 
     69                else: 
     70                        msg.body = [] 
     71                return msg 
     72 
     73        def bools(self, (mgr,info), msg): 
     74                self.clients.send_all(msg) 
     75                res = [] 
     76                for (c, i) in self.clients.get_clients().items(): 
     77                        ret = pickle.load(c.makefile()) 
     78                        n = storage.NodeStatus(i[0], storage_root) 
     79                        for bool in val.body.keys(): 
     80                                n.set_bool_status(bool, msg.body[bool]) 
     81                        n.update() 
     82                        res.append((ret, i)) 
     83                return res 
     84 
     85        def list(self, (mgr,info), msg): 
     86                msg.type = 'response' 
     87                if msg.body == 'groups': 
     88                        msg.body = storage.list_all_layers(storage_root) 
     89                elif msg.body == 'reachable': 
     90                        msg.body = [x[0] for x in self.clients.get_clients().values()] 
     91                elif msg.body == 'all': 
     92                        l = storage.Layer(storage_root) 
     93                        msg.body = l.list_nodes() 
     94                else: 
     95                        msg.body = [] 
     96                return res 
     97 
     98        def storeadd(self, (mgr, info), msg): 
     99                msg.type = 'response' 
     100                result = [] 
     101                tmp_dir = tempfile.mkdtemp(prefix='senetwork-manager') 
     102                group, file_list = msg.body 
     103                l = storage.Layer(os.path.join(os.path.dirname(storage_root), group)) 
     104                for file_name, sum in file_list: 
     105                        base = os.path.basename(file_name) 
     106                        scp_command = 'scp ' + info[0] + ':' + file_name + ' "' + tmp_dir + '"' 
     107                        os.system(scp_command) 
     108                        tmp_sum = file_util.checksum(os.path.join(tmp_dir, base)) 
     109                        result.append((basefile_name, sum == tmp_sum)) 
     110                        preadd_latest = l.Entries['.'].LatestTime 
     111                        if sum == tmp_sum: 
     112                                shutil.move(os.path.join(tmp_dir, base), os.path.join(l.Name, base)) 
     113                                l.add(base) 
     114                os.rmdir(tmp_dir) 
     115                if l.Entries['.'].AppliedTime == preadd_latest: 
     116                        l.rollback() 
     117                l.update() 
     118                for client in l.list_nodes(): 
     119                        self.update_client(client) 
     120                msg.body = result 
     121                return msg 
     122 
     123        def storeremove(self, (mgr, info), msg): 
     124                msg.type = 'response' 
     125                group, file_list = msg.body 
     126                l = storage.Layer(os.path.join(os.path.dirname(storage_root), group)) 
     127                controlled_files = l.list_controlled_files() 
     128                for file_name in file_list: 
     129                        if file_name not in controlled_files: 
     130                                continue 
     131                        l.remove(file_name) 
     132                        os.remove(os.path.join(l.Name, file_name)) 
     133                l.update() 
     134                for client in l.list_nodes(): 
     135                        self.update_client(client) 
     136                msg.body = True 
    44137 
    45138        def service_manager(self, (mgr, info)): 
     
    50143                                val = pickle.load(mgr.makefile()) 
    51144                                if val.msg == "update?": 
    52                                         val.type = "response" 
    53                                         val.body = self.clients_needing_update() # plug in here 
    54                                         res = val 
     145                                        res = self.updatep((mgr,info), val) 
    55146                                elif val.msg == "update": 
    56                                         if val.body == "all": 
    57                                                 to_update = self.clients_needing_update() 
    58                                         else: 
    59                                                 to_update = val.body 
    60                                         ret = [] 
    61                                         for c in to_update: 
    62                                                 ret.append((c,self.update_client(c))) 
    63                                         val.body = ret 
    64                                         res = val 
     147                                        res = self.update((mgr,info), val) 
    65148                                elif val.msg == 'update group': 
    66                                         val.type = 'response' 
    67                                         if val.body in storage.list_all_layers(storage_root): 
    68                                                 l = Layer(os.path.join(os.path.dirname(storage_root, val.body))) 
    69                                                 ret = [] 
    70                                                 for c in l.list_nodes(): 
    71                                                         ret.append((c, self.update_client(c))) 
    72                                                 val.body = ret 
    73                                         else: 
    74                                                 val.body = [] 
    75                                         res = val 
     149                                        res = self.updategroup((mgr, info), val) 
    76150                                elif val.msg == 'bools': 
    77                                         self.clients.send_all(val) 
    78                                         res = [] 
    79                                         for (c, i) in self.clients.get_clients().items(): 
    80                                                 ret = pickle.load(c.makefile()) 
    81                                                 n = storage.NodeStatus(i[0], storage_root) 
    82                                                 for bool in val.body.keys(): 
    83                                                         n.set_bool_status(bool, val.body[bool]) 
    84                                                 n.update() 
    85                                                 res.append((ret, i)) 
     151                                        res = self.bools((mgr, info), val) 
    86152                                elif val.msg == 'list': 
    87                                         val.type = 'response' 
    88                                         if val.body == 'groups': 
    89                                                 val.body = storage.list_all_layers(storage_root) 
    90                                         elif val.body == 'reachable': 
    91                                                 val.body = [x[0] for x in self.clients.get_clients().values()] 
    92                                         elif val.body == 'all': 
    93                                                 l = storage.Layer(storage_root) 
    94                                                 val.body = l.list_nodes() 
    95                                         else: 
    96                                                 val.body = [] 
    97                                         res = val 
     153                                        res = self.list((mgr, info), val) 
    98154                                elif val.msg == 'add': 
    99                                         val.type = 'response' 
    100                                         result = [] 
    101                                         tmp_dir = tempfile.mkdtemp(prefix='senetwork-manager') 
    102                                         group, file_list = msg.body 
    103                                         l = storage.Layer(os.path.join(os.path.dirname(storage_root), group)) 
    104                                         for file_name, sum in file_list: 
    105                                                 base = os.path.basename(file_name) 
    106                                                 scp_command = 'scp ' + info[0] + ':' + file_name + ' "' + tmp_dir + '"' 
    107                                                 os.system(scp_command) 
    108                                                 tmp_sum = file_util.checksum(os.path.join(tmp_dir, base)) 
    109                                                 result.append((basefile_name, sum == tmp_sum)) 
    110                                                 preadd_latest = l.Entries['.'].LatestTime 
    111                                                 if sum == tmp_sum: 
    112                                                         shutil.move(os.path.join(tmp_dir, base), os.path.join(l.Name, base)) 
    113                                                         l.add(base) 
    114                                         os.rmdir(tmp_dir) 
    115                                         if l.Entries['.'].AppliedTime == preadd_latest: 
    116                                                 l.rollback() 
    117                                         l.update() 
    118                                         for client in l.list_nodes(): 
    119                                                 self.update_client(client) 
    120                                         val.body = result 
    121                                         res = val 
     155                                        res = self.storeadd((mgr, info), val) 
    122156                                elif val.msg == 'remove': 
    123                                         val.type = 'response' 
    124                                         group, file_list = msg.body 
    125                                         l = storage.Layer(os.path.join(os.path.dirname(storage_root), group)) 
    126                                         controlled_files = l.list_controlled_files() 
    127                                         for file_name in file_list: 
    128                                                 if file_name not in controlled_files: 
    129                                                         continue 
    130                                                 l.remove(file_name) 
    131                                                 os.remove(os.path.join(l.Name, file_name)) 
    132                                         l.update() 
    133                                         for client in l.list_nodes(): 
    134                                                 self.update_client(client) 
    135                                         val.body = True 
    136157                                        res = val 
    137158                                else: