ad3ec7df15471322172183664e80cceea12449c8
[linux-2.6-block.git] / drivers / md / md-cluster.c
1 /*
2  * Copyright (C) 2015, SUSE
3  *
4  * This program is free software; you can redistribute it and/or modify
5  * it under the terms of the GNU General Public License as published by
6  * the Free Software Foundation; either version 2, or (at your option)
7  * any later version.
8  *
9  */
10
11
12 #include <linux/module.h>
13 #include <linux/dlm.h>
14 #include <linux/sched.h>
15 #include <linux/raid/md_p.h>
16 #include "md.h"
17 #include "bitmap.h"
18 #include "md-cluster.h"
19
20 #define LVB_SIZE        64
21 #define NEW_DEV_TIMEOUT 5000
22
23 struct dlm_lock_resource {
24         dlm_lockspace_t *ls;
25         struct dlm_lksb lksb;
26         char *name; /* lock name. */
27         uint32_t flags; /* flags to pass to dlm_lock() */
28         struct completion completion; /* completion for synchronized locking */
29         void (*bast)(void *arg, int mode); /* blocking AST function pointer*/
30         struct mddev *mddev; /* pointing back to mddev. */
31         int mode;
32 };
33
34 struct suspend_info {
35         int slot;
36         sector_t lo;
37         sector_t hi;
38         struct list_head list;
39 };
40
41 struct resync_info {
42         __le64 lo;
43         __le64 hi;
44 };
45
46 /* md_cluster_info flags */
47 #define         MD_CLUSTER_WAITING_FOR_NEWDISK          1
48 #define         MD_CLUSTER_SUSPEND_READ_BALANCING       2
49 #define         MD_CLUSTER_BEGIN_JOIN_CLUSTER           3
50
51 /* Lock the send communication. This is done through
52  * bit manipulation as opposed to a mutex in order to
53  * accomodate lock and hold. See next comment.
54  */
55 #define         MD_CLUSTER_SEND_LOCK                    4
56 /* If cluster operations must lock the communication channel,
57  * so as to perform extra operations (and no other operation
58  * is allowed on the MD, such as adding a disk. Token needs
59  * to be locked and held until the operation completes with
60  * a md_update_sb(), which would eventually release the lock.
61  */
62 #define         MD_CLUSTER_SEND_LOCKED_ALREADY          5
63
64
65 struct md_cluster_info {
66         /* dlm lock space and resources for clustered raid. */
67         dlm_lockspace_t *lockspace;
68         int slot_number;
69         struct completion completion;
70         struct mutex recv_mutex;
71         struct dlm_lock_resource *bitmap_lockres;
72         struct dlm_lock_resource **other_bitmap_lockres;
73         struct dlm_lock_resource *resync_lockres;
74         struct list_head suspend_list;
75         spinlock_t suspend_lock;
76         struct md_thread *recovery_thread;
77         unsigned long recovery_map;
78         /* communication loc resources */
79         struct dlm_lock_resource *ack_lockres;
80         struct dlm_lock_resource *message_lockres;
81         struct dlm_lock_resource *token_lockres;
82         struct dlm_lock_resource *no_new_dev_lockres;
83         struct md_thread *recv_thread;
84         struct completion newdisk_completion;
85         wait_queue_head_t wait;
86         unsigned long state;
87 };
88
89 enum msg_type {
90         METADATA_UPDATED = 0,
91         RESYNCING,
92         NEWDISK,
93         REMOVE,
94         RE_ADD,
95         BITMAP_NEEDS_SYNC,
96 };
97
98 struct cluster_msg {
99         __le32 type;
100         __le32 slot;
101         /* TODO: Unionize this for smaller footprint */
102         __le64 low;
103         __le64 high;
104         char uuid[16];
105         __le32 raid_slot;
106 };
107
108 static void sync_ast(void *arg)
109 {
110         struct dlm_lock_resource *res;
111
112         res = arg;
113         complete(&res->completion);
114 }
115
116 static int dlm_lock_sync(struct dlm_lock_resource *res, int mode)
117 {
118         int ret = 0;
119
120         ret = dlm_lock(res->ls, mode, &res->lksb,
121                         res->flags, res->name, strlen(res->name),
122                         0, sync_ast, res, res->bast);
123         if (ret)
124                 return ret;
125         wait_for_completion(&res->completion);
126         if (res->lksb.sb_status == 0)
127                 res->mode = mode;
128         return res->lksb.sb_status;
129 }
130
131 static int dlm_unlock_sync(struct dlm_lock_resource *res)
132 {
133         return dlm_lock_sync(res, DLM_LOCK_NL);
134 }
135
136 static struct dlm_lock_resource *lockres_init(struct mddev *mddev,
137                 char *name, void (*bastfn)(void *arg, int mode), int with_lvb)
138 {
139         struct dlm_lock_resource *res = NULL;
140         int ret, namelen;
141         struct md_cluster_info *cinfo = mddev->cluster_info;
142
143         res = kzalloc(sizeof(struct dlm_lock_resource), GFP_KERNEL);
144         if (!res)
145                 return NULL;
146         init_completion(&res->completion);
147         res->ls = cinfo->lockspace;
148         res->mddev = mddev;
149         res->mode = DLM_LOCK_IV;
150         namelen = strlen(name);
151         res->name = kzalloc(namelen + 1, GFP_KERNEL);
152         if (!res->name) {
153                 pr_err("md-cluster: Unable to allocate resource name for resource %s\n", name);
154                 goto out_err;
155         }
156         strlcpy(res->name, name, namelen + 1);
157         if (with_lvb) {
158                 res->lksb.sb_lvbptr = kzalloc(LVB_SIZE, GFP_KERNEL);
159                 if (!res->lksb.sb_lvbptr) {
160                         pr_err("md-cluster: Unable to allocate LVB for resource %s\n", name);
161                         goto out_err;
162                 }
163                 res->flags = DLM_LKF_VALBLK;
164         }
165
166         if (bastfn)
167                 res->bast = bastfn;
168
169         res->flags |= DLM_LKF_EXPEDITE;
170
171         ret = dlm_lock_sync(res, DLM_LOCK_NL);
172         if (ret) {
173                 pr_err("md-cluster: Unable to lock NL on new lock resource %s\n", name);
174                 goto out_err;
175         }
176         res->flags &= ~DLM_LKF_EXPEDITE;
177         res->flags |= DLM_LKF_CONVERT;
178
179         return res;
180 out_err:
181         kfree(res->lksb.sb_lvbptr);
182         kfree(res->name);
183         kfree(res);
184         return NULL;
185 }
186
187 static void lockres_free(struct dlm_lock_resource *res)
188 {
189         int ret;
190
191         if (!res)
192                 return;
193
194         /* cancel a lock request or a conversion request that is blocked */
195         res->flags |= DLM_LKF_CANCEL;
196 retry:
197         ret = dlm_unlock(res->ls, res->lksb.sb_lkid, 0, &res->lksb, res);
198         if (unlikely(ret != 0)) {
199                 pr_info("%s: failed to unlock %s return %d\n", __func__, res->name, ret);
200
201                 /* if a lock conversion is cancelled, then the lock is put
202                  * back to grant queue, need to ensure it is unlocked */
203                 if (ret == -DLM_ECANCEL)
204                         goto retry;
205         }
206         res->flags &= ~DLM_LKF_CANCEL;
207         wait_for_completion(&res->completion);
208
209         kfree(res->name);
210         kfree(res->lksb.sb_lvbptr);
211         kfree(res);
212 }
213
214 static void add_resync_info(struct dlm_lock_resource *lockres,
215                             sector_t lo, sector_t hi)
216 {
217         struct resync_info *ri;
218
219         ri = (struct resync_info *)lockres->lksb.sb_lvbptr;
220         ri->lo = cpu_to_le64(lo);
221         ri->hi = cpu_to_le64(hi);
222 }
223
224 static struct suspend_info *read_resync_info(struct mddev *mddev, struct dlm_lock_resource *lockres)
225 {
226         struct resync_info ri;
227         struct suspend_info *s = NULL;
228         sector_t hi = 0;
229
230         dlm_lock_sync(lockres, DLM_LOCK_CR);
231         memcpy(&ri, lockres->lksb.sb_lvbptr, sizeof(struct resync_info));
232         hi = le64_to_cpu(ri.hi);
233         if (hi > 0) {
234                 s = kzalloc(sizeof(struct suspend_info), GFP_KERNEL);
235                 if (!s)
236                         goto out;
237                 s->hi = hi;
238                 s->lo = le64_to_cpu(ri.lo);
239         }
240         dlm_unlock_sync(lockres);
241 out:
242         return s;
243 }
244
245 static void recover_bitmaps(struct md_thread *thread)
246 {
247         struct mddev *mddev = thread->mddev;
248         struct md_cluster_info *cinfo = mddev->cluster_info;
249         struct dlm_lock_resource *bm_lockres;
250         char str[64];
251         int slot, ret;
252         struct suspend_info *s, *tmp;
253         sector_t lo, hi;
254
255         while (cinfo->recovery_map) {
256                 slot = fls64((u64)cinfo->recovery_map) - 1;
257
258                 /* Clear suspend_area associated with the bitmap */
259                 spin_lock_irq(&cinfo->suspend_lock);
260                 list_for_each_entry_safe(s, tmp, &cinfo->suspend_list, list)
261                         if (slot == s->slot) {
262                                 list_del(&s->list);
263                                 kfree(s);
264                         }
265                 spin_unlock_irq(&cinfo->suspend_lock);
266
267                 snprintf(str, 64, "bitmap%04d", slot);
268                 bm_lockres = lockres_init(mddev, str, NULL, 1);
269                 if (!bm_lockres) {
270                         pr_err("md-cluster: Cannot initialize bitmaps\n");
271                         goto clear_bit;
272                 }
273
274                 ret = dlm_lock_sync(bm_lockres, DLM_LOCK_PW);
275                 if (ret) {
276                         pr_err("md-cluster: Could not DLM lock %s: %d\n",
277                                         str, ret);
278                         goto clear_bit;
279                 }
280                 ret = bitmap_copy_from_slot(mddev, slot, &lo, &hi, true);
281                 if (ret) {
282                         pr_err("md-cluster: Could not copy data from bitmap %d\n", slot);
283                         goto dlm_unlock;
284                 }
285                 if (hi > 0) {
286                         /* TODO:Wait for current resync to get over */
287                         set_bit(MD_RECOVERY_NEEDED, &mddev->recovery);
288                         if (lo < mddev->recovery_cp)
289                                 mddev->recovery_cp = lo;
290                         md_check_recovery(mddev);
291                 }
292 dlm_unlock:
293                 dlm_unlock_sync(bm_lockres);
294 clear_bit:
295                 clear_bit(slot, &cinfo->recovery_map);
296         }
297 }
298
299 static void recover_prep(void *arg)
300 {
301         struct mddev *mddev = arg;
302         struct md_cluster_info *cinfo = mddev->cluster_info;
303         set_bit(MD_CLUSTER_SUSPEND_READ_BALANCING, &cinfo->state);
304 }
305
306 static void __recover_slot(struct mddev *mddev, int slot)
307 {
308         struct md_cluster_info *cinfo = mddev->cluster_info;
309
310         set_bit(slot, &cinfo->recovery_map);
311         if (!cinfo->recovery_thread) {
312                 cinfo->recovery_thread = md_register_thread(recover_bitmaps,
313                                 mddev, "recover");
314                 if (!cinfo->recovery_thread) {
315                         pr_warn("md-cluster: Could not create recovery thread\n");
316                         return;
317                 }
318         }
319         md_wakeup_thread(cinfo->recovery_thread);
320 }
321
322 static void recover_slot(void *arg, struct dlm_slot *slot)
323 {
324         struct mddev *mddev = arg;
325         struct md_cluster_info *cinfo = mddev->cluster_info;
326
327         pr_info("md-cluster: %s Node %d/%d down. My slot: %d. Initiating recovery.\n",
328                         mddev->bitmap_info.cluster_name,
329                         slot->nodeid, slot->slot,
330                         cinfo->slot_number);
331         /* deduct one since dlm slot starts from one while the num of
332          * cluster-md begins with 0 */
333         __recover_slot(mddev, slot->slot - 1);
334 }
335
336 static void recover_done(void *arg, struct dlm_slot *slots,
337                 int num_slots, int our_slot,
338                 uint32_t generation)
339 {
340         struct mddev *mddev = arg;
341         struct md_cluster_info *cinfo = mddev->cluster_info;
342
343         cinfo->slot_number = our_slot;
344         /* completion is only need to be complete when node join cluster,
345          * it doesn't need to run during another node's failure */
346         if (test_bit(MD_CLUSTER_BEGIN_JOIN_CLUSTER, &cinfo->state)) {
347                 complete(&cinfo->completion);
348                 clear_bit(MD_CLUSTER_BEGIN_JOIN_CLUSTER, &cinfo->state);
349         }
350         clear_bit(MD_CLUSTER_SUSPEND_READ_BALANCING, &cinfo->state);
351 }
352
353 /* the ops is called when node join the cluster, and do lock recovery
354  * if node failure occurs */
355 static const struct dlm_lockspace_ops md_ls_ops = {
356         .recover_prep = recover_prep,
357         .recover_slot = recover_slot,
358         .recover_done = recover_done,
359 };
360
361 /*
362  * The BAST function for the ack lock resource
363  * This function wakes up the receive thread in
364  * order to receive and process the message.
365  */
366 static void ack_bast(void *arg, int mode)
367 {
368         struct dlm_lock_resource *res = arg;
369         struct md_cluster_info *cinfo = res->mddev->cluster_info;
370
371         if (mode == DLM_LOCK_EX)
372                 md_wakeup_thread(cinfo->recv_thread);
373 }
374
375 static void __remove_suspend_info(struct md_cluster_info *cinfo, int slot)
376 {
377         struct suspend_info *s, *tmp;
378
379         list_for_each_entry_safe(s, tmp, &cinfo->suspend_list, list)
380                 if (slot == s->slot) {
381                         list_del(&s->list);
382                         kfree(s);
383                         break;
384                 }
385 }
386
387 static void remove_suspend_info(struct mddev *mddev, int slot)
388 {
389         struct md_cluster_info *cinfo = mddev->cluster_info;
390         spin_lock_irq(&cinfo->suspend_lock);
391         __remove_suspend_info(cinfo, slot);
392         spin_unlock_irq(&cinfo->suspend_lock);
393         mddev->pers->quiesce(mddev, 2);
394 }
395
396
397 static void process_suspend_info(struct mddev *mddev,
398                 int slot, sector_t lo, sector_t hi)
399 {
400         struct md_cluster_info *cinfo = mddev->cluster_info;
401         struct suspend_info *s;
402
403         if (!hi) {
404                 remove_suspend_info(mddev, slot);
405                 set_bit(MD_RECOVERY_NEEDED, &mddev->recovery);
406                 md_wakeup_thread(mddev->thread);
407                 return;
408         }
409         s = kzalloc(sizeof(struct suspend_info), GFP_KERNEL);
410         if (!s)
411                 return;
412         s->slot = slot;
413         s->lo = lo;
414         s->hi = hi;
415         mddev->pers->quiesce(mddev, 1);
416         mddev->pers->quiesce(mddev, 0);
417         spin_lock_irq(&cinfo->suspend_lock);
418         /* Remove existing entry (if exists) before adding */
419         __remove_suspend_info(cinfo, slot);
420         list_add(&s->list, &cinfo->suspend_list);
421         spin_unlock_irq(&cinfo->suspend_lock);
422         mddev->pers->quiesce(mddev, 2);
423 }
424
425 static void process_add_new_disk(struct mddev *mddev, struct cluster_msg *cmsg)
426 {
427         char disk_uuid[64];
428         struct md_cluster_info *cinfo = mddev->cluster_info;
429         char event_name[] = "EVENT=ADD_DEVICE";
430         char raid_slot[16];
431         char *envp[] = {event_name, disk_uuid, raid_slot, NULL};
432         int len;
433
434         len = snprintf(disk_uuid, 64, "DEVICE_UUID=");
435         sprintf(disk_uuid + len, "%pU", cmsg->uuid);
436         snprintf(raid_slot, 16, "RAID_DISK=%d", le32_to_cpu(cmsg->raid_slot));
437         pr_info("%s:%d Sending kobject change with %s and %s\n", __func__, __LINE__, disk_uuid, raid_slot);
438         init_completion(&cinfo->newdisk_completion);
439         set_bit(MD_CLUSTER_WAITING_FOR_NEWDISK, &cinfo->state);
440         kobject_uevent_env(&disk_to_dev(mddev->gendisk)->kobj, KOBJ_CHANGE, envp);
441         wait_for_completion_timeout(&cinfo->newdisk_completion,
442                         NEW_DEV_TIMEOUT);
443         clear_bit(MD_CLUSTER_WAITING_FOR_NEWDISK, &cinfo->state);
444 }
445
446
447 static void process_metadata_update(struct mddev *mddev, struct cluster_msg *msg)
448 {
449         struct md_cluster_info *cinfo = mddev->cluster_info;
450         mddev->good_device_nr = le32_to_cpu(msg->raid_slot);
451         set_bit(MD_RELOAD_SB, &mddev->flags);
452         dlm_lock_sync(cinfo->no_new_dev_lockres, DLM_LOCK_CR);
453         md_wakeup_thread(mddev->thread);
454 }
455
456 static void process_remove_disk(struct mddev *mddev, struct cluster_msg *msg)
457 {
458         struct md_rdev *rdev = md_find_rdev_nr_rcu(mddev,
459                                                    le32_to_cpu(msg->raid_slot));
460
461         if (rdev) {
462                 set_bit(ClusterRemove, &rdev->flags);
463                 set_bit(MD_RECOVERY_NEEDED, &mddev->recovery);
464                 md_wakeup_thread(mddev->thread);
465         }
466         else
467                 pr_warn("%s: %d Could not find disk(%d) to REMOVE\n",
468                         __func__, __LINE__, le32_to_cpu(msg->raid_slot));
469 }
470
471 static void process_readd_disk(struct mddev *mddev, struct cluster_msg *msg)
472 {
473         struct md_rdev *rdev = md_find_rdev_nr_rcu(mddev,
474                                                    le32_to_cpu(msg->raid_slot));
475
476         if (rdev && test_bit(Faulty, &rdev->flags))
477                 clear_bit(Faulty, &rdev->flags);
478         else
479                 pr_warn("%s: %d Could not find disk(%d) which is faulty",
480                         __func__, __LINE__, le32_to_cpu(msg->raid_slot));
481 }
482
483 static void process_recvd_msg(struct mddev *mddev, struct cluster_msg *msg)
484 {
485         if (WARN(mddev->cluster_info->slot_number - 1 == le32_to_cpu(msg->slot),
486                 "node %d received it's own msg\n", le32_to_cpu(msg->slot)))
487                 return;
488         switch (le32_to_cpu(msg->type)) {
489         case METADATA_UPDATED:
490                 process_metadata_update(mddev, msg);
491                 break;
492         case RESYNCING:
493                 process_suspend_info(mddev, le32_to_cpu(msg->slot),
494                                      le64_to_cpu(msg->low),
495                                      le64_to_cpu(msg->high));
496                 break;
497         case NEWDISK:
498                 process_add_new_disk(mddev, msg);
499                 break;
500         case REMOVE:
501                 process_remove_disk(mddev, msg);
502                 break;
503         case RE_ADD:
504                 process_readd_disk(mddev, msg);
505                 break;
506         case BITMAP_NEEDS_SYNC:
507                 __recover_slot(mddev, le32_to_cpu(msg->slot));
508                 break;
509         default:
510                 pr_warn("%s:%d Received unknown message from %d\n",
511                         __func__, __LINE__, msg->slot);
512         }
513 }
514
515 /*
516  * thread for receiving message
517  */
518 static void recv_daemon(struct md_thread *thread)
519 {
520         struct md_cluster_info *cinfo = thread->mddev->cluster_info;
521         struct dlm_lock_resource *ack_lockres = cinfo->ack_lockres;
522         struct dlm_lock_resource *message_lockres = cinfo->message_lockres;
523         struct cluster_msg msg;
524         int ret;
525
526         mutex_lock(&cinfo->recv_mutex);
527         /*get CR on Message*/
528         if (dlm_lock_sync(message_lockres, DLM_LOCK_CR)) {
529                 pr_err("md/raid1:failed to get CR on MESSAGE\n");
530                 mutex_unlock(&cinfo->recv_mutex);
531                 return;
532         }
533
534         /* read lvb and wake up thread to process this message_lockres */
535         memcpy(&msg, message_lockres->lksb.sb_lvbptr, sizeof(struct cluster_msg));
536         process_recvd_msg(thread->mddev, &msg);
537
538         /*release CR on ack_lockres*/
539         ret = dlm_unlock_sync(ack_lockres);
540         if (unlikely(ret != 0))
541                 pr_info("unlock ack failed return %d\n", ret);
542         /*up-convert to PR on message_lockres*/
543         ret = dlm_lock_sync(message_lockres, DLM_LOCK_PR);
544         if (unlikely(ret != 0))
545                 pr_info("lock PR on msg failed return %d\n", ret);
546         /*get CR on ack_lockres again*/
547         ret = dlm_lock_sync(ack_lockres, DLM_LOCK_CR);
548         if (unlikely(ret != 0))
549                 pr_info("lock CR on ack failed return %d\n", ret);
550         /*release CR on message_lockres*/
551         ret = dlm_unlock_sync(message_lockres);
552         if (unlikely(ret != 0))
553                 pr_info("unlock msg failed return %d\n", ret);
554         mutex_unlock(&cinfo->recv_mutex);
555 }
556
557 /* lock_token()
558  * Takes the lock on the TOKEN lock resource so no other
559  * node can communicate while the operation is underway.
560  */
561 static int lock_token(struct md_cluster_info *cinfo)
562 {
563         int error;
564
565         error = dlm_lock_sync(cinfo->token_lockres, DLM_LOCK_EX);
566         if (error)
567                 pr_err("md-cluster(%s:%d): failed to get EX on TOKEN (%d)\n",
568                                 __func__, __LINE__, error);
569
570         /* Lock the receive sequence */
571         mutex_lock(&cinfo->recv_mutex);
572         return error;
573 }
574
575 /* lock_comm()
576  * Sets the MD_CLUSTER_SEND_LOCK bit to lock the send channel.
577  */
578 static int lock_comm(struct md_cluster_info *cinfo)
579 {
580         wait_event(cinfo->wait,
581                    !test_and_set_bit(MD_CLUSTER_SEND_LOCK, &cinfo->state));
582
583         return lock_token(cinfo);
584 }
585
586 static void unlock_comm(struct md_cluster_info *cinfo)
587 {
588         WARN_ON(cinfo->token_lockres->mode != DLM_LOCK_EX);
589         mutex_unlock(&cinfo->recv_mutex);
590         dlm_unlock_sync(cinfo->token_lockres);
591         clear_bit(MD_CLUSTER_SEND_LOCK, &cinfo->state);
592         wake_up(&cinfo->wait);
593 }
594
595 /* __sendmsg()
596  * This function performs the actual sending of the message. This function is
597  * usually called after performing the encompassing operation
598  * The function:
599  * 1. Grabs the message lockresource in EX mode
600  * 2. Copies the message to the message LVB
601  * 3. Downconverts message lockresource to CW
602  * 4. Upconverts ack lock resource from CR to EX. This forces the BAST on other nodes
603  *    and the other nodes read the message. The thread will wait here until all other
604  *    nodes have released ack lock resource.
605  * 5. Downconvert ack lockresource to CR
606  */
607 static int __sendmsg(struct md_cluster_info *cinfo, struct cluster_msg *cmsg)
608 {
609         int error;
610         int slot = cinfo->slot_number - 1;
611
612         cmsg->slot = cpu_to_le32(slot);
613         /*get EX on Message*/
614         error = dlm_lock_sync(cinfo->message_lockres, DLM_LOCK_EX);
615         if (error) {
616                 pr_err("md-cluster: failed to get EX on MESSAGE (%d)\n", error);
617                 goto failed_message;
618         }
619
620         memcpy(cinfo->message_lockres->lksb.sb_lvbptr, (void *)cmsg,
621                         sizeof(struct cluster_msg));
622         /*down-convert EX to CW on Message*/
623         error = dlm_lock_sync(cinfo->message_lockres, DLM_LOCK_CW);
624         if (error) {
625                 pr_err("md-cluster: failed to convert EX to CW on MESSAGE(%d)\n",
626                                 error);
627                 goto failed_ack;
628         }
629
630         /*up-convert CR to EX on Ack*/
631         error = dlm_lock_sync(cinfo->ack_lockres, DLM_LOCK_EX);
632         if (error) {
633                 pr_err("md-cluster: failed to convert CR to EX on ACK(%d)\n",
634                                 error);
635                 goto failed_ack;
636         }
637
638         /*down-convert EX to CR on Ack*/
639         error = dlm_lock_sync(cinfo->ack_lockres, DLM_LOCK_CR);
640         if (error) {
641                 pr_err("md-cluster: failed to convert EX to CR on ACK(%d)\n",
642                                 error);
643                 goto failed_ack;
644         }
645
646 failed_ack:
647         error = dlm_unlock_sync(cinfo->message_lockres);
648         if (unlikely(error != 0)) {
649                 pr_err("md-cluster: failed convert to NL on MESSAGE(%d)\n",
650                         error);
651                 /* in case the message can't be released due to some reason */
652                 goto failed_ack;
653         }
654 failed_message:
655         return error;
656 }
657
658 static int sendmsg(struct md_cluster_info *cinfo, struct cluster_msg *cmsg)
659 {
660         int ret;
661
662         lock_comm(cinfo);
663         ret = __sendmsg(cinfo, cmsg);
664         unlock_comm(cinfo);
665         return ret;
666 }
667
668 static int gather_all_resync_info(struct mddev *mddev, int total_slots)
669 {
670         struct md_cluster_info *cinfo = mddev->cluster_info;
671         int i, ret = 0;
672         struct dlm_lock_resource *bm_lockres;
673         struct suspend_info *s;
674         char str[64];
675         sector_t lo, hi;
676
677
678         for (i = 0; i < total_slots; i++) {
679                 memset(str, '\0', 64);
680                 snprintf(str, 64, "bitmap%04d", i);
681                 bm_lockres = lockres_init(mddev, str, NULL, 1);
682                 if (!bm_lockres)
683                         return -ENOMEM;
684                 if (i == (cinfo->slot_number - 1))
685                         continue;
686
687                 bm_lockres->flags |= DLM_LKF_NOQUEUE;
688                 ret = dlm_lock_sync(bm_lockres, DLM_LOCK_PW);
689                 if (ret == -EAGAIN) {
690                         memset(bm_lockres->lksb.sb_lvbptr, '\0', LVB_SIZE);
691                         s = read_resync_info(mddev, bm_lockres);
692                         if (s) {
693                                 pr_info("%s:%d Resync[%llu..%llu] in progress on %d\n",
694                                                 __func__, __LINE__,
695                                                 (unsigned long long) s->lo,
696                                                 (unsigned long long) s->hi, i);
697                                 spin_lock_irq(&cinfo->suspend_lock);
698                                 s->slot = i;
699                                 list_add(&s->list, &cinfo->suspend_list);
700                                 spin_unlock_irq(&cinfo->suspend_lock);
701                         }
702                         ret = 0;
703                         lockres_free(bm_lockres);
704                         continue;
705                 }
706                 if (ret) {
707                         lockres_free(bm_lockres);
708                         goto out;
709                 }
710
711                 /* Read the disk bitmap sb and check if it needs recovery */
712                 ret = bitmap_copy_from_slot(mddev, i, &lo, &hi, false);
713                 if (ret) {
714                         pr_warn("md-cluster: Could not gather bitmaps from slot %d", i);
715                         lockres_free(bm_lockres);
716                         continue;
717                 }
718                 if ((hi > 0) && (lo < mddev->recovery_cp)) {
719                         set_bit(MD_RECOVERY_NEEDED, &mddev->recovery);
720                         mddev->recovery_cp = lo;
721                         md_check_recovery(mddev);
722                 }
723
724                 dlm_unlock_sync(bm_lockres);
725                 lockres_free(bm_lockres);
726         }
727 out:
728         return ret;
729 }
730
731 static int join(struct mddev *mddev, int nodes)
732 {
733         struct md_cluster_info *cinfo;
734         int ret, ops_rv;
735         char str[64];
736
737         cinfo = kzalloc(sizeof(struct md_cluster_info), GFP_KERNEL);
738         if (!cinfo)
739                 return -ENOMEM;
740
741         INIT_LIST_HEAD(&cinfo->suspend_list);
742         spin_lock_init(&cinfo->suspend_lock);
743         init_completion(&cinfo->completion);
744         set_bit(MD_CLUSTER_BEGIN_JOIN_CLUSTER, &cinfo->state);
745         init_waitqueue_head(&cinfo->wait);
746         mutex_init(&cinfo->recv_mutex);
747
748         mddev->cluster_info = cinfo;
749
750         memset(str, 0, 64);
751         sprintf(str, "%pU", mddev->uuid);
752         ret = dlm_new_lockspace(str, mddev->bitmap_info.cluster_name,
753                                 DLM_LSFL_FS, LVB_SIZE,
754                                 &md_ls_ops, mddev, &ops_rv, &cinfo->lockspace);
755         if (ret)
756                 goto err;
757         wait_for_completion(&cinfo->completion);
758         if (nodes < cinfo->slot_number) {
759                 pr_err("md-cluster: Slot allotted(%d) is greater than available slots(%d).",
760                         cinfo->slot_number, nodes);
761                 ret = -ERANGE;
762                 goto err;
763         }
764         /* Initiate the communication resources */
765         ret = -ENOMEM;
766         cinfo->recv_thread = md_register_thread(recv_daemon, mddev, "cluster_recv");
767         if (!cinfo->recv_thread) {
768                 pr_err("md-cluster: cannot allocate memory for recv_thread!\n");
769                 goto err;
770         }
771         cinfo->message_lockres = lockres_init(mddev, "message", NULL, 1);
772         if (!cinfo->message_lockres)
773                 goto err;
774         cinfo->token_lockres = lockres_init(mddev, "token", NULL, 0);
775         if (!cinfo->token_lockres)
776                 goto err;
777         cinfo->ack_lockres = lockres_init(mddev, "ack", ack_bast, 0);
778         if (!cinfo->ack_lockres)
779                 goto err;
780         cinfo->no_new_dev_lockres = lockres_init(mddev, "no-new-dev", NULL, 0);
781         if (!cinfo->no_new_dev_lockres)
782                 goto err;
783
784         /* get sync CR lock on ACK. */
785         if (dlm_lock_sync(cinfo->ack_lockres, DLM_LOCK_CR))
786                 pr_err("md-cluster: failed to get a sync CR lock on ACK!(%d)\n",
787                                 ret);
788         /* get sync CR lock on no-new-dev. */
789         if (dlm_lock_sync(cinfo->no_new_dev_lockres, DLM_LOCK_CR))
790                 pr_err("md-cluster: failed to get a sync CR lock on no-new-dev!(%d)\n", ret);
791
792
793         pr_info("md-cluster: Joined cluster %s slot %d\n", str, cinfo->slot_number);
794         snprintf(str, 64, "bitmap%04d", cinfo->slot_number - 1);
795         cinfo->bitmap_lockres = lockres_init(mddev, str, NULL, 1);
796         if (!cinfo->bitmap_lockres)
797                 goto err;
798         if (dlm_lock_sync(cinfo->bitmap_lockres, DLM_LOCK_PW)) {
799                 pr_err("Failed to get bitmap lock\n");
800                 ret = -EINVAL;
801                 goto err;
802         }
803
804         cinfo->resync_lockres = lockres_init(mddev, "resync", NULL, 0);
805         if (!cinfo->resync_lockres)
806                 goto err;
807
808         ret = gather_all_resync_info(mddev, nodes);
809         if (ret)
810                 goto err;
811
812         return 0;
813 err:
814         lockres_free(cinfo->message_lockres);
815         lockres_free(cinfo->token_lockres);
816         lockres_free(cinfo->ack_lockres);
817         lockres_free(cinfo->no_new_dev_lockres);
818         lockres_free(cinfo->resync_lockres);
819         lockres_free(cinfo->bitmap_lockres);
820         if (cinfo->lockspace)
821                 dlm_release_lockspace(cinfo->lockspace, 2);
822         mddev->cluster_info = NULL;
823         kfree(cinfo);
824         return ret;
825 }
826
827 static void resync_bitmap(struct mddev *mddev)
828 {
829         struct md_cluster_info *cinfo = mddev->cluster_info;
830         struct cluster_msg cmsg = {0};
831         int err;
832
833         cmsg.type = cpu_to_le32(BITMAP_NEEDS_SYNC);
834         err = sendmsg(cinfo, &cmsg);
835         if (err)
836                 pr_err("%s:%d: failed to send BITMAP_NEEDS_SYNC message (%d)\n",
837                         __func__, __LINE__, err);
838 }
839
840 static void unlock_all_bitmaps(struct mddev *mddev);
841 static int leave(struct mddev *mddev)
842 {
843         struct md_cluster_info *cinfo = mddev->cluster_info;
844
845         if (!cinfo)
846                 return 0;
847
848         /* BITMAP_NEEDS_SYNC message should be sent when node
849          * is leaving the cluster with dirty bitmap, also we
850          * can only deliver it when dlm connection is available */
851         if (cinfo->slot_number > 0 && mddev->recovery_cp != MaxSector)
852                 resync_bitmap(mddev);
853
854         md_unregister_thread(&cinfo->recovery_thread);
855         md_unregister_thread(&cinfo->recv_thread);
856         lockres_free(cinfo->message_lockres);
857         lockres_free(cinfo->token_lockres);
858         lockres_free(cinfo->ack_lockres);
859         lockres_free(cinfo->no_new_dev_lockres);
860         lockres_free(cinfo->bitmap_lockres);
861         unlock_all_bitmaps(mddev);
862         dlm_release_lockspace(cinfo->lockspace, 2);
863         return 0;
864 }
865
866 /* slot_number(): Returns the MD slot number to use
867  * DLM starts the slot numbers from 1, wheras cluster-md
868  * wants the number to be from zero, so we deduct one
869  */
870 static int slot_number(struct mddev *mddev)
871 {
872         struct md_cluster_info *cinfo = mddev->cluster_info;
873
874         return cinfo->slot_number - 1;
875 }
876
877 /*
878  * Check if the communication is already locked, else lock the communication
879  * channel.
880  * If it is already locked, token is in EX mode, and hence lock_token()
881  * should not be called.
882  */
883 static int metadata_update_start(struct mddev *mddev)
884 {
885         struct md_cluster_info *cinfo = mddev->cluster_info;
886
887         wait_event(cinfo->wait,
888                    !test_and_set_bit(MD_CLUSTER_SEND_LOCK, &cinfo->state) ||
889                    test_and_clear_bit(MD_CLUSTER_SEND_LOCKED_ALREADY, &cinfo->state));
890
891         /* If token is already locked, return 0 */
892         if (cinfo->token_lockres->mode == DLM_LOCK_EX)
893                 return 0;
894
895         return lock_token(cinfo);
896 }
897
898 static int metadata_update_finish(struct mddev *mddev)
899 {
900         struct md_cluster_info *cinfo = mddev->cluster_info;
901         struct cluster_msg cmsg;
902         struct md_rdev *rdev;
903         int ret = 0;
904         int raid_slot = -1;
905
906         memset(&cmsg, 0, sizeof(cmsg));
907         cmsg.type = cpu_to_le32(METADATA_UPDATED);
908         /* Pick up a good active device number to send.
909          */
910         rdev_for_each(rdev, mddev)
911                 if (rdev->raid_disk > -1 && !test_bit(Faulty, &rdev->flags)) {
912                         raid_slot = rdev->desc_nr;
913                         break;
914                 }
915         if (raid_slot >= 0) {
916                 cmsg.raid_slot = cpu_to_le32(raid_slot);
917                 ret = __sendmsg(cinfo, &cmsg);
918         } else
919                 pr_warn("md-cluster: No good device id found to send\n");
920         clear_bit(MD_CLUSTER_SEND_LOCKED_ALREADY, &cinfo->state);
921         unlock_comm(cinfo);
922         return ret;
923 }
924
925 static void metadata_update_cancel(struct mddev *mddev)
926 {
927         struct md_cluster_info *cinfo = mddev->cluster_info;
928         clear_bit(MD_CLUSTER_SEND_LOCKED_ALREADY, &cinfo->state);
929         unlock_comm(cinfo);
930 }
931
932 static int resync_start(struct mddev *mddev)
933 {
934         struct md_cluster_info *cinfo = mddev->cluster_info;
935         cinfo->resync_lockres->flags |= DLM_LKF_NOQUEUE;
936         return dlm_lock_sync(cinfo->resync_lockres, DLM_LOCK_EX);
937 }
938
939 static int resync_info_update(struct mddev *mddev, sector_t lo, sector_t hi)
940 {
941         struct md_cluster_info *cinfo = mddev->cluster_info;
942         struct resync_info ri;
943         struct cluster_msg cmsg = {0};
944
945         /* do not send zero again, if we have sent before */
946         if (hi == 0) {
947                 memcpy(&ri, cinfo->bitmap_lockres->lksb.sb_lvbptr, sizeof(struct resync_info));
948                 if (le64_to_cpu(ri.hi) == 0)
949                         return 0;
950         }
951
952         add_resync_info(cinfo->bitmap_lockres, lo, hi);
953         /* Re-acquire the lock to refresh LVB */
954         dlm_lock_sync(cinfo->bitmap_lockres, DLM_LOCK_PW);
955         cmsg.type = cpu_to_le32(RESYNCING);
956         cmsg.low = cpu_to_le64(lo);
957         cmsg.high = cpu_to_le64(hi);
958
959         return sendmsg(cinfo, &cmsg);
960 }
961
962 static int resync_finish(struct mddev *mddev)
963 {
964         struct md_cluster_info *cinfo = mddev->cluster_info;
965         cinfo->resync_lockres->flags &= ~DLM_LKF_NOQUEUE;
966         dlm_unlock_sync(cinfo->resync_lockres);
967         return resync_info_update(mddev, 0, 0);
968 }
969
970 static int area_resyncing(struct mddev *mddev, int direction,
971                 sector_t lo, sector_t hi)
972 {
973         struct md_cluster_info *cinfo = mddev->cluster_info;
974         int ret = 0;
975         struct suspend_info *s;
976
977         if ((direction == READ) &&
978                 test_bit(MD_CLUSTER_SUSPEND_READ_BALANCING, &cinfo->state))
979                 return 1;
980
981         spin_lock_irq(&cinfo->suspend_lock);
982         if (list_empty(&cinfo->suspend_list))
983                 goto out;
984         list_for_each_entry(s, &cinfo->suspend_list, list)
985                 if (hi > s->lo && lo < s->hi) {
986                         ret = 1;
987                         break;
988                 }
989 out:
990         spin_unlock_irq(&cinfo->suspend_lock);
991         return ret;
992 }
993
994 /* add_new_disk() - initiates a disk add
995  * However, if this fails before writing md_update_sb(),
996  * add_new_disk_cancel() must be called to release token lock
997  */
998 static int add_new_disk(struct mddev *mddev, struct md_rdev *rdev)
999 {
1000         struct md_cluster_info *cinfo = mddev->cluster_info;
1001         struct cluster_msg cmsg;
1002         int ret = 0;
1003         struct mdp_superblock_1 *sb = page_address(rdev->sb_page);
1004         char *uuid = sb->device_uuid;
1005
1006         memset(&cmsg, 0, sizeof(cmsg));
1007         cmsg.type = cpu_to_le32(NEWDISK);
1008         memcpy(cmsg.uuid, uuid, 16);
1009         cmsg.raid_slot = cpu_to_le32(rdev->desc_nr);
1010         lock_comm(cinfo);
1011         ret = __sendmsg(cinfo, &cmsg);
1012         if (ret)
1013                 return ret;
1014         cinfo->no_new_dev_lockres->flags |= DLM_LKF_NOQUEUE;
1015         ret = dlm_lock_sync(cinfo->no_new_dev_lockres, DLM_LOCK_EX);
1016         cinfo->no_new_dev_lockres->flags &= ~DLM_LKF_NOQUEUE;
1017         /* Some node does not "see" the device */
1018         if (ret == -EAGAIN)
1019                 ret = -ENOENT;
1020         if (ret)
1021                 unlock_comm(cinfo);
1022         else {
1023                 dlm_lock_sync(cinfo->no_new_dev_lockres, DLM_LOCK_CR);
1024                 set_bit(MD_CLUSTER_SEND_LOCKED_ALREADY, &cinfo->state);
1025                 wake_up(&cinfo->wait);
1026         }
1027         return ret;
1028 }
1029
1030 static void add_new_disk_cancel(struct mddev *mddev)
1031 {
1032         struct md_cluster_info *cinfo = mddev->cluster_info;
1033         clear_bit(MD_CLUSTER_SEND_LOCKED_ALREADY, &cinfo->state);
1034         unlock_comm(cinfo);
1035 }
1036
1037 static int new_disk_ack(struct mddev *mddev, bool ack)
1038 {
1039         struct md_cluster_info *cinfo = mddev->cluster_info;
1040
1041         if (!test_bit(MD_CLUSTER_WAITING_FOR_NEWDISK, &cinfo->state)) {
1042                 pr_warn("md-cluster(%s): Spurious cluster confirmation\n", mdname(mddev));
1043                 return -EINVAL;
1044         }
1045
1046         if (ack)
1047                 dlm_unlock_sync(cinfo->no_new_dev_lockres);
1048         complete(&cinfo->newdisk_completion);
1049         return 0;
1050 }
1051
1052 static int remove_disk(struct mddev *mddev, struct md_rdev *rdev)
1053 {
1054         struct cluster_msg cmsg = {0};
1055         struct md_cluster_info *cinfo = mddev->cluster_info;
1056         cmsg.type = cpu_to_le32(REMOVE);
1057         cmsg.raid_slot = cpu_to_le32(rdev->desc_nr);
1058         return sendmsg(cinfo, &cmsg);
1059 }
1060
1061 static int lock_all_bitmaps(struct mddev *mddev)
1062 {
1063         int slot, my_slot, ret, held = 1, i = 0;
1064         char str[64];
1065         struct md_cluster_info *cinfo = mddev->cluster_info;
1066
1067         cinfo->other_bitmap_lockres = kzalloc((mddev->bitmap_info.nodes - 1) *
1068                                              sizeof(struct dlm_lock_resource *),
1069                                              GFP_KERNEL);
1070         if (!cinfo->other_bitmap_lockres) {
1071                 pr_err("md: can't alloc mem for other bitmap locks\n");
1072                 return 0;
1073         }
1074
1075         my_slot = slot_number(mddev);
1076         for (slot = 0; slot < mddev->bitmap_info.nodes; slot++) {
1077                 if (slot == my_slot)
1078                         continue;
1079
1080                 memset(str, '\0', 64);
1081                 snprintf(str, 64, "bitmap%04d", slot);
1082                 cinfo->other_bitmap_lockres[i] = lockres_init(mddev, str, NULL, 1);
1083                 if (!cinfo->other_bitmap_lockres[i])
1084                         return -ENOMEM;
1085
1086                 cinfo->other_bitmap_lockres[i]->flags |= DLM_LKF_NOQUEUE;
1087                 ret = dlm_lock_sync(cinfo->other_bitmap_lockres[i], DLM_LOCK_PW);
1088                 if (ret)
1089                         held = -1;
1090                 i++;
1091         }
1092
1093         return held;
1094 }
1095
1096 static void unlock_all_bitmaps(struct mddev *mddev)
1097 {
1098         struct md_cluster_info *cinfo = mddev->cluster_info;
1099         int i;
1100
1101         /* release other node's bitmap lock if they are existed */
1102         if (cinfo->other_bitmap_lockres) {
1103                 for (i = 0; i < mddev->bitmap_info.nodes - 1; i++) {
1104                         if (cinfo->other_bitmap_lockres[i]) {
1105                                 dlm_unlock_sync(cinfo->other_bitmap_lockres[i]);
1106                                 lockres_free(cinfo->other_bitmap_lockres[i]);
1107                         }
1108                 }
1109                 kfree(cinfo->other_bitmap_lockres);
1110         }
1111 }
1112
1113 static int gather_bitmaps(struct md_rdev *rdev)
1114 {
1115         int sn, err;
1116         sector_t lo, hi;
1117         struct cluster_msg cmsg = {0};
1118         struct mddev *mddev = rdev->mddev;
1119         struct md_cluster_info *cinfo = mddev->cluster_info;
1120
1121         cmsg.type = cpu_to_le32(RE_ADD);
1122         cmsg.raid_slot = cpu_to_le32(rdev->desc_nr);
1123         err = sendmsg(cinfo, &cmsg);
1124         if (err)
1125                 goto out;
1126
1127         for (sn = 0; sn < mddev->bitmap_info.nodes; sn++) {
1128                 if (sn == (cinfo->slot_number - 1))
1129                         continue;
1130                 err = bitmap_copy_from_slot(mddev, sn, &lo, &hi, false);
1131                 if (err) {
1132                         pr_warn("md-cluster: Could not gather bitmaps from slot %d", sn);
1133                         goto out;
1134                 }
1135                 if ((hi > 0) && (lo < mddev->recovery_cp))
1136                         mddev->recovery_cp = lo;
1137         }
1138 out:
1139         return err;
1140 }
1141
1142 static struct md_cluster_operations cluster_ops = {
1143         .join   = join,
1144         .leave  = leave,
1145         .slot_number = slot_number,
1146         .resync_start = resync_start,
1147         .resync_finish = resync_finish,
1148         .resync_info_update = resync_info_update,
1149         .metadata_update_start = metadata_update_start,
1150         .metadata_update_finish = metadata_update_finish,
1151         .metadata_update_cancel = metadata_update_cancel,
1152         .area_resyncing = area_resyncing,
1153         .add_new_disk = add_new_disk,
1154         .add_new_disk_cancel = add_new_disk_cancel,
1155         .new_disk_ack = new_disk_ack,
1156         .remove_disk = remove_disk,
1157         .gather_bitmaps = gather_bitmaps,
1158         .lock_all_bitmaps = lock_all_bitmaps,
1159         .unlock_all_bitmaps = unlock_all_bitmaps,
1160 };
1161
1162 static int __init cluster_init(void)
1163 {
1164         pr_warn("md-cluster: EXPERIMENTAL. Use with caution\n");
1165         pr_info("Registering Cluster MD functions\n");
1166         register_md_cluster_operations(&cluster_ops, THIS_MODULE);
1167         return 0;
1168 }
1169
1170 static void cluster_exit(void)
1171 {
1172         unregister_md_cluster_operations();
1173 }
1174
1175 module_init(cluster_init);
1176 module_exit(cluster_exit);
1177 MODULE_AUTHOR("SUSE");
1178 MODULE_LICENSE("GPL");
1179 MODULE_DESCRIPTION("Clustering support for MD");