md-cluster/raid10: support add disk under grow mode
drivers/md/md-cluster.c
/*
 * Copyright (C) 2015, SUSE
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation; either version 2, or (at your option)
 * any later version.
 *
 */


#include <linux/module.h>
#include <linux/kthread.h>
#include <linux/dlm.h>
#include <linux/sched.h>
#include <linux/raid/md_p.h>
#include "md.h"
#include "md-bitmap.h"
#include "md-cluster.h"

#define LVB_SIZE	64
#define NEW_DEV_TIMEOUT 5000

struct dlm_lock_resource {
	dlm_lockspace_t *ls;
	struct dlm_lksb lksb;
	char *name; /* lock name. */
	uint32_t flags; /* flags to pass to dlm_lock() */
	wait_queue_head_t sync_locking; /* wait queue for synchronized locking */
	bool sync_locking_done;
	void (*bast)(void *arg, int mode); /* blocking AST function pointer */
	struct mddev *mddev; /* pointing back to mddev. */
	int mode;
};

struct suspend_info {
	int slot;
	sector_t lo;
	sector_t hi;
	struct list_head list;
};

struct resync_info {
	__le64 lo;
	__le64 hi;
};

/* md_cluster_info flags */
#define MD_CLUSTER_WAITING_FOR_NEWDISK 1
#define MD_CLUSTER_SUSPEND_READ_BALANCING 2
#define MD_CLUSTER_BEGIN_JOIN_CLUSTER 3

/* Lock the send communication. This is done through
 * bit manipulation as opposed to a mutex in order to
 * accommodate lock and hold. See next comment.
 */
#define MD_CLUSTER_SEND_LOCK 4
/* Cluster operations (such as adding a disk) must lock the
 * communication channel, so as to perform extra operations
 * (update metadata) while no other operation is allowed on the
 * MD. The token needs to be locked and held until the operation
 * completes with a md_update_sb(), which would eventually release
 * the lock.
 */
#define MD_CLUSTER_SEND_LOCKED_ALREADY 5
/* We should receive messages after the node joined the cluster and
 * set up all the related infos such as bitmap and personality */
#define MD_CLUSTER_ALREADY_IN_CLUSTER 6
#define MD_CLUSTER_PENDING_RECV_EVENT 7
#define MD_CLUSTER_HOLDING_MUTEX_FOR_RECVD 8

struct md_cluster_info {
	struct mddev *mddev; /* the md device which md_cluster_info belongs to */
	/* dlm lock space and resources for clustered raid. */
	dlm_lockspace_t *lockspace;
	int slot_number;
	struct completion completion;
	struct mutex recv_mutex;
	struct dlm_lock_resource *bitmap_lockres;
	struct dlm_lock_resource **other_bitmap_lockres;
	struct dlm_lock_resource *resync_lockres;
	struct list_head suspend_list;
	spinlock_t suspend_lock;
	struct md_thread *recovery_thread;
	unsigned long recovery_map;
	/* communication lock resources */
	struct dlm_lock_resource *ack_lockres;
	struct dlm_lock_resource *message_lockres;
	struct dlm_lock_resource *token_lockres;
	struct dlm_lock_resource *no_new_dev_lockres;
	struct md_thread *recv_thread;
	struct completion newdisk_completion;
	wait_queue_head_t wait;
	unsigned long state;
	/* record the region in RESYNCING message */
	sector_t sync_low;
	sector_t sync_hi;
};

enum msg_type {
	METADATA_UPDATED = 0,
	RESYNCING,
	NEWDISK,
	REMOVE,
	RE_ADD,
	BITMAP_NEEDS_SYNC,
	CHANGE_CAPACITY,
	BITMAP_RESIZE,
};

struct cluster_msg {
	__le32 type;
	__le32 slot;
	/* TODO: Unionize this for smaller footprint */
	__le64 low;
	__le64 high;
	char uuid[16];
	__le32 raid_slot;
};

static void sync_ast(void *arg)
{
	struct dlm_lock_resource *res;

	res = arg;
	res->sync_locking_done = true;
	wake_up(&res->sync_locking);
}

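/*
 * Take or convert a DLM lock synchronously: queue the asynchronous
 * dlm_lock() request, wait for sync_ast() to signal completion, and
 * record the granted mode before returning the status block result.
 */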
static int dlm_lock_sync(struct dlm_lock_resource *res, int mode)
{
	int ret = 0;

	ret = dlm_lock(res->ls, mode, &res->lksb,
			res->flags, res->name, strlen(res->name),
			0, sync_ast, res, res->bast);
	if (ret)
		return ret;
	wait_event(res->sync_locking, res->sync_locking_done);
	res->sync_locking_done = false;
	if (res->lksb.sb_status == 0)
		res->mode = mode;
	return res->lksb.sb_status;
}

static int dlm_unlock_sync(struct dlm_lock_resource *res)
{
	return dlm_lock_sync(res, DLM_LOCK_NL);
}

/*
 * A variation of dlm_lock_sync() that allows the lock request to be
 * interrupted
 */
static int dlm_lock_sync_interruptible(struct dlm_lock_resource *res, int mode,
				       struct mddev *mddev)
{
	int ret = 0;

	ret = dlm_lock(res->ls, mode, &res->lksb,
			res->flags, res->name, strlen(res->name),
			0, sync_ast, res, res->bast);
	if (ret)
		return ret;

	wait_event(res->sync_locking, res->sync_locking_done
				      || kthread_should_stop()
				      || test_bit(MD_CLOSING, &mddev->flags));
	if (!res->sync_locking_done) {
		/*
		 * the convert queue contains the lock request when the request
		 * is interrupted, and sync_ast could still run, so we need to
		 * cancel the request and reset the completion
		 */
		ret = dlm_unlock(res->ls, res->lksb.sb_lkid, DLM_LKF_CANCEL,
			&res->lksb, res);
		res->sync_locking_done = false;
		if (unlikely(ret != 0))
			pr_info("failed to cancel previous lock request %s return %d\n",
				res->name, ret);
		return -EPERM;
	} else
		res->sync_locking_done = false;
	if (res->lksb.sb_status == 0)
		res->mode = mode;
	return res->lksb.sb_status;
}

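/*
 * Allocate a DLM lock resource in the mddev's lockspace, optionally
 * with a lock value block (LVB), and take it in NL mode so that all
 * later requests on this resource can be lock conversions.
 */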
static struct dlm_lock_resource *lockres_init(struct mddev *mddev,
		char *name, void (*bastfn)(void *arg, int mode), int with_lvb)
{
	struct dlm_lock_resource *res = NULL;
	int ret, namelen;
	struct md_cluster_info *cinfo = mddev->cluster_info;

	res = kzalloc(sizeof(struct dlm_lock_resource), GFP_KERNEL);
	if (!res)
		return NULL;
	init_waitqueue_head(&res->sync_locking);
	res->sync_locking_done = false;
	res->ls = cinfo->lockspace;
	res->mddev = mddev;
	res->mode = DLM_LOCK_IV;
	namelen = strlen(name);
	res->name = kzalloc(namelen + 1, GFP_KERNEL);
	if (!res->name) {
		pr_err("md-cluster: Unable to allocate resource name for resource %s\n", name);
		goto out_err;
	}
	strlcpy(res->name, name, namelen + 1);
	if (with_lvb) {
		res->lksb.sb_lvbptr = kzalloc(LVB_SIZE, GFP_KERNEL);
		if (!res->lksb.sb_lvbptr) {
			pr_err("md-cluster: Unable to allocate LVB for resource %s\n", name);
			goto out_err;
		}
		res->flags = DLM_LKF_VALBLK;
	}

	if (bastfn)
		res->bast = bastfn;

	res->flags |= DLM_LKF_EXPEDITE;

	ret = dlm_lock_sync(res, DLM_LOCK_NL);
	if (ret) {
		pr_err("md-cluster: Unable to lock NL on new lock resource %s\n", name);
		goto out_err;
	}
	res->flags &= ~DLM_LKF_EXPEDITE;
	res->flags |= DLM_LKF_CONVERT;

	return res;
out_err:
	kfree(res->lksb.sb_lvbptr);
	kfree(res->name);
	kfree(res);
	return NULL;
}

static void lockres_free(struct dlm_lock_resource *res)
{
	int ret = 0;

	if (!res)
		return;

	/*
	 * use the FORCEUNLOCK flag, so we can unlock even if the lock is on
	 * the waiting or convert queue
	 */
	ret = dlm_unlock(res->ls, res->lksb.sb_lkid, DLM_LKF_FORCEUNLOCK,
		&res->lksb, res);
	if (unlikely(ret != 0))
		pr_err("failed to unlock %s return %d\n", res->name, ret);
	else
		wait_event(res->sync_locking, res->sync_locking_done);

	kfree(res->name);
	kfree(res->lksb.sb_lvbptr);
	kfree(res);
}

static void add_resync_info(struct dlm_lock_resource *lockres,
			    sector_t lo, sector_t hi)
{
	struct resync_info *ri;

	ri = (struct resync_info *)lockres->lksb.sb_lvbptr;
	ri->lo = cpu_to_le64(lo);
	ri->hi = cpu_to_le64(hi);
}

static struct suspend_info *read_resync_info(struct mddev *mddev, struct dlm_lock_resource *lockres)
{
	struct resync_info ri;
	struct suspend_info *s = NULL;
	sector_t hi = 0;

	dlm_lock_sync(lockres, DLM_LOCK_CR);
	memcpy(&ri, lockres->lksb.sb_lvbptr, sizeof(struct resync_info));
	hi = le64_to_cpu(ri.hi);
	if (hi > 0) {
		s = kzalloc(sizeof(struct suspend_info), GFP_KERNEL);
		if (!s)
			goto out;
		s->hi = hi;
		s->lo = le64_to_cpu(ri.lo);
	}
	dlm_unlock_sync(lockres);
out:
	return s;
}

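/*
 * Thread body for bitmap recovery: for every slot flagged in
 * recovery_map, grab the failed node's bitmap lock, merge its dirty
 * bits into our own bitmap, drop the matching suspend_area, and kick
 * off a resync from the lowest recovered offset if one is needed.
 */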
static void recover_bitmaps(struct md_thread *thread)
{
	struct mddev *mddev = thread->mddev;
	struct md_cluster_info *cinfo = mddev->cluster_info;
	struct dlm_lock_resource *bm_lockres;
	char str[64];
	int slot, ret;
	struct suspend_info *s, *tmp;
	sector_t lo, hi;

	while (cinfo->recovery_map) {
		slot = fls64((u64)cinfo->recovery_map) - 1;

		snprintf(str, 64, "bitmap%04d", slot);
		bm_lockres = lockres_init(mddev, str, NULL, 1);
		if (!bm_lockres) {
			pr_err("md-cluster: Cannot initialize bitmaps\n");
			goto clear_bit;
		}

		ret = dlm_lock_sync_interruptible(bm_lockres, DLM_LOCK_PW, mddev);
		if (ret) {
			pr_err("md-cluster: Could not DLM lock %s: %d\n",
					str, ret);
			goto clear_bit;
		}
		ret = md_bitmap_copy_from_slot(mddev, slot, &lo, &hi, true);
		if (ret) {
			pr_err("md-cluster: Could not copy data from bitmap %d\n", slot);
			goto clear_bit;
		}

		/* Clear suspend_area associated with the bitmap */
		spin_lock_irq(&cinfo->suspend_lock);
		list_for_each_entry_safe(s, tmp, &cinfo->suspend_list, list)
			if (slot == s->slot) {
				list_del(&s->list);
				kfree(s);
			}
		spin_unlock_irq(&cinfo->suspend_lock);

		if (hi > 0) {
			if (lo < mddev->recovery_cp)
				mddev->recovery_cp = lo;
			/* wake up thread to continue resync in case resync
			 * is not finished */
			if (mddev->recovery_cp != MaxSector) {
				/*
				 * clear the REMOTE flag since we will launch
				 * resync thread in current node.
				 */
				clear_bit(MD_RESYNCING_REMOTE,
					  &mddev->recovery);
				set_bit(MD_RECOVERY_NEEDED, &mddev->recovery);
				md_wakeup_thread(mddev->thread);
			}
		}
clear_bit:
		lockres_free(bm_lockres);
		clear_bit(slot, &cinfo->recovery_map);
	}
}

static void recover_prep(void *arg)
{
	struct mddev *mddev = arg;
	struct md_cluster_info *cinfo = mddev->cluster_info;
	set_bit(MD_CLUSTER_SUSPEND_READ_BALANCING, &cinfo->state);
}

static void __recover_slot(struct mddev *mddev, int slot)
{
	struct md_cluster_info *cinfo = mddev->cluster_info;

	set_bit(slot, &cinfo->recovery_map);
	if (!cinfo->recovery_thread) {
		cinfo->recovery_thread = md_register_thread(recover_bitmaps,
				mddev, "recover");
		if (!cinfo->recovery_thread) {
			pr_warn("md-cluster: Could not create recovery thread\n");
			return;
		}
	}
	md_wakeup_thread(cinfo->recovery_thread);
}

static void recover_slot(void *arg, struct dlm_slot *slot)
{
	struct mddev *mddev = arg;
	struct md_cluster_info *cinfo = mddev->cluster_info;

	pr_info("md-cluster: %s Node %d/%d down. My slot: %d. Initiating recovery.\n",
			mddev->bitmap_info.cluster_name,
			slot->nodeid, slot->slot,
			cinfo->slot_number);
	/* deduct one since dlm slot numbers start at one while the slot
	 * numbers of cluster-md begin with 0 */
	__recover_slot(mddev, slot->slot - 1);
}

static void recover_done(void *arg, struct dlm_slot *slots,
		int num_slots, int our_slot,
		uint32_t generation)
{
	struct mddev *mddev = arg;
	struct md_cluster_info *cinfo = mddev->cluster_info;

	cinfo->slot_number = our_slot;
	/* the completion only needs to be completed when a node joins the
	 * cluster; it doesn't need to run during another node's failure */
	if (test_bit(MD_CLUSTER_BEGIN_JOIN_CLUSTER, &cinfo->state)) {
		complete(&cinfo->completion);
		clear_bit(MD_CLUSTER_BEGIN_JOIN_CLUSTER, &cinfo->state);
	}
	clear_bit(MD_CLUSTER_SUSPEND_READ_BALANCING, &cinfo->state);
}

/* These ops are called when a node joins the cluster, and they perform
 * lock recovery if a node failure occurs */
static const struct dlm_lockspace_ops md_ls_ops = {
	.recover_prep = recover_prep,
	.recover_slot = recover_slot,
	.recover_done = recover_done,
};

/*
 * The BAST function for the ack lock resource
 * This function wakes up the receive thread in
 * order to receive and process the message.
 */
static void ack_bast(void *arg, int mode)
{
	struct dlm_lock_resource *res = arg;
	struct md_cluster_info *cinfo = res->mddev->cluster_info;

	if (mode == DLM_LOCK_EX) {
		if (test_bit(MD_CLUSTER_ALREADY_IN_CLUSTER, &cinfo->state))
			md_wakeup_thread(cinfo->recv_thread);
		else
			set_bit(MD_CLUSTER_PENDING_RECV_EVENT, &cinfo->state);
	}
}

static void __remove_suspend_info(struct md_cluster_info *cinfo, int slot)
{
	struct suspend_info *s, *tmp;

	list_for_each_entry_safe(s, tmp, &cinfo->suspend_list, list)
		if (slot == s->slot) {
			list_del(&s->list);
			kfree(s);
			break;
		}
}

static void remove_suspend_info(struct mddev *mddev, int slot)
{
	struct md_cluster_info *cinfo = mddev->cluster_info;
	mddev->pers->quiesce(mddev, 1);
	spin_lock_irq(&cinfo->suspend_lock);
	__remove_suspend_info(cinfo, slot);
	spin_unlock_irq(&cinfo->suspend_lock);
	mddev->pers->quiesce(mddev, 0);
}

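/*
 * A peer reported a resync region: suspend I/O to [lo, hi] locally by
 * queueing a suspend_info entry, or tear the suspension down again
 * when the peer signals completion with hi == 0.
 */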
static void process_suspend_info(struct mddev *mddev,
		int slot, sector_t lo, sector_t hi)
{
	struct md_cluster_info *cinfo = mddev->cluster_info;
	struct suspend_info *s;

	if (!hi) {
		/*
		 * clear the REMOTE flag since resync or recovery is finished
		 * in remote node.
		 */
		clear_bit(MD_RESYNCING_REMOTE, &mddev->recovery);
		remove_suspend_info(mddev, slot);
		set_bit(MD_RECOVERY_NEEDED, &mddev->recovery);
		md_wakeup_thread(mddev->thread);
		return;
	}

	/*
	 * The bitmaps are not the same for different nodes. If RESYNCING is
	 * happening in one node, then the node which received the RESYNCING
	 * message probably will perform a resync with the region [lo, hi]
	 * again, so we could reduce resync time a lot if we can ensure that
	 * the bitmaps among different nodes match up well.
	 *
	 * sync_low/hi is used to record the region which
	 * arrived in the previous RESYNCING message,
	 *
	 * Call md_bitmap_sync_with_cluster to clear NEEDED_MASK and set
	 * RESYNC_MASK since the resync thread is running in another node,
	 * so we don't need to do the resync again with the same section.
	 */
	md_bitmap_sync_with_cluster(mddev, cinfo->sync_low, cinfo->sync_hi, lo, hi);
	cinfo->sync_low = lo;
	cinfo->sync_hi = hi;

	s = kzalloc(sizeof(struct suspend_info), GFP_KERNEL);
	if (!s)
		return;
	s->slot = slot;
	s->lo = lo;
	s->hi = hi;
	mddev->pers->quiesce(mddev, 1);
	spin_lock_irq(&cinfo->suspend_lock);
	/* Remove existing entry (if exists) before adding */
	__remove_suspend_info(cinfo, slot);
	list_add(&s->list, &cinfo->suspend_list);
	spin_unlock_irq(&cinfo->suspend_lock);
	mddev->pers->quiesce(mddev, 0);
}

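/*
 * A peer is adding a disk: raise an ADD_DEVICE uevent carrying the
 * device uuid and raid slot so userspace can attach the disk, then
 * wait (up to NEW_DEV_TIMEOUT) for the confirmation that arrives
 * through new_disk_ack().
 */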
static void process_add_new_disk(struct mddev *mddev, struct cluster_msg *cmsg)
{
	char disk_uuid[64];
	struct md_cluster_info *cinfo = mddev->cluster_info;
	char event_name[] = "EVENT=ADD_DEVICE";
	char raid_slot[16];
	char *envp[] = {event_name, disk_uuid, raid_slot, NULL};
	int len;

	len = snprintf(disk_uuid, 64, "DEVICE_UUID=");
	sprintf(disk_uuid + len, "%pU", cmsg->uuid);
	snprintf(raid_slot, 16, "RAID_DISK=%d", le32_to_cpu(cmsg->raid_slot));
	pr_info("%s:%d Sending kobject change with %s and %s\n", __func__, __LINE__, disk_uuid, raid_slot);
	init_completion(&cinfo->newdisk_completion);
	set_bit(MD_CLUSTER_WAITING_FOR_NEWDISK, &cinfo->state);
	kobject_uevent_env(&disk_to_dev(mddev->gendisk)->kobj, KOBJ_CHANGE, envp);
	wait_for_completion_timeout(&cinfo->newdisk_completion,
			NEW_DEV_TIMEOUT);
	clear_bit(MD_CLUSTER_WAITING_FOR_NEWDISK, &cinfo->state);
}

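/*
 * A peer updated the metadata: reload the superblock from the reported
 * good device once we either grab reconfig_mutex ourselves or see that
 * a local sender already holds it (MD_CLUSTER_HOLDING_MUTEX_FOR_RECVD).
 */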
static void process_metadata_update(struct mddev *mddev, struct cluster_msg *msg)
{
	int got_lock = 0;
	struct md_cluster_info *cinfo = mddev->cluster_info;
	mddev->good_device_nr = le32_to_cpu(msg->raid_slot);

	dlm_lock_sync(cinfo->no_new_dev_lockres, DLM_LOCK_CR);
	wait_event(mddev->thread->wqueue,
		   (got_lock = mddev_trylock(mddev)) ||
		   test_bit(MD_CLUSTER_HOLDING_MUTEX_FOR_RECVD, &cinfo->state));
	md_reload_sb(mddev, mddev->good_device_nr);
	if (got_lock)
		mddev_unlock(mddev);
}

static void process_remove_disk(struct mddev *mddev, struct cluster_msg *msg)
{
	struct md_rdev *rdev;

	rcu_read_lock();
	rdev = md_find_rdev_nr_rcu(mddev, le32_to_cpu(msg->raid_slot));
	if (rdev) {
		set_bit(ClusterRemove, &rdev->flags);
		set_bit(MD_RECOVERY_NEEDED, &mddev->recovery);
		md_wakeup_thread(mddev->thread);
	} else
		pr_warn("%s: %d Could not find disk(%d) to REMOVE\n",
			__func__, __LINE__, le32_to_cpu(msg->raid_slot));
	rcu_read_unlock();
}

static void process_readd_disk(struct mddev *mddev, struct cluster_msg *msg)
{
	struct md_rdev *rdev;

	rcu_read_lock();
	rdev = md_find_rdev_nr_rcu(mddev, le32_to_cpu(msg->raid_slot));
	if (rdev && test_bit(Faulty, &rdev->flags))
		clear_bit(Faulty, &rdev->flags);
	else
		pr_warn("%s: %d Could not find disk(%d) which is faulty",
			__func__, __LINE__, le32_to_cpu(msg->raid_slot));
	rcu_read_unlock();
}

static int process_recvd_msg(struct mddev *mddev, struct cluster_msg *msg)
{
	int ret = 0;

	if (WARN(mddev->cluster_info->slot_number - 1 == le32_to_cpu(msg->slot),
		"node %d received its own msg\n", le32_to_cpu(msg->slot)))
		return -1;
	switch (le32_to_cpu(msg->type)) {
	case METADATA_UPDATED:
		process_metadata_update(mddev, msg);
		break;
	case CHANGE_CAPACITY:
		set_capacity(mddev->gendisk, mddev->array_sectors);
		revalidate_disk(mddev->gendisk);
		break;
	case RESYNCING:
		set_bit(MD_RESYNCING_REMOTE, &mddev->recovery);
		process_suspend_info(mddev, le32_to_cpu(msg->slot),
				     le64_to_cpu(msg->low),
				     le64_to_cpu(msg->high));
		break;
	case NEWDISK:
		process_add_new_disk(mddev, msg);
		break;
	case REMOVE:
		process_remove_disk(mddev, msg);
		break;
	case RE_ADD:
		process_readd_disk(mddev, msg);
		break;
	case BITMAP_NEEDS_SYNC:
		__recover_slot(mddev, le32_to_cpu(msg->slot));
		break;
	case BITMAP_RESIZE:
		if (le64_to_cpu(msg->high) != mddev->pers->size(mddev, 0, 0))
			ret = md_bitmap_resize(mddev->bitmap,
					       le64_to_cpu(msg->high), 0, 0);
		break;
	default:
		ret = -1;
		pr_warn("%s:%d Received unknown message from %d\n",
			__func__, __LINE__, le32_to_cpu(msg->slot));
	}
	return ret;
}

/*
 * thread for receiving message
 */
static void recv_daemon(struct md_thread *thread)
{
	struct md_cluster_info *cinfo = thread->mddev->cluster_info;
	struct dlm_lock_resource *ack_lockres = cinfo->ack_lockres;
	struct dlm_lock_resource *message_lockres = cinfo->message_lockres;
	struct cluster_msg msg;
	int ret;

	mutex_lock(&cinfo->recv_mutex);
	/* get CR on Message */
	if (dlm_lock_sync(message_lockres, DLM_LOCK_CR)) {
		pr_err("md/raid1: failed to get CR on MESSAGE\n");
		mutex_unlock(&cinfo->recv_mutex);
		return;
	}

	/* read lvb and wake up thread to process this message_lockres */
	memcpy(&msg, message_lockres->lksb.sb_lvbptr, sizeof(struct cluster_msg));
	ret = process_recvd_msg(thread->mddev, &msg);
	if (ret)
		goto out;

	/* release CR on ack_lockres */
	ret = dlm_unlock_sync(ack_lockres);
	if (unlikely(ret != 0))
		pr_info("unlock ack failed return %d\n", ret);
	/* up-convert to PR on message_lockres */
	ret = dlm_lock_sync(message_lockres, DLM_LOCK_PR);
	if (unlikely(ret != 0))
		pr_info("lock PR on msg failed return %d\n", ret);
	/* get CR on ack_lockres again */
	ret = dlm_lock_sync(ack_lockres, DLM_LOCK_CR);
	if (unlikely(ret != 0))
		pr_info("lock CR on ack failed return %d\n", ret);
out:
	/* release CR on message_lockres */
	ret = dlm_unlock_sync(message_lockres);
	if (unlikely(ret != 0))
		pr_info("unlock msg failed return %d\n", ret);
	mutex_unlock(&cinfo->recv_mutex);
}

/* lock_token()
 * Takes the lock on the TOKEN lock resource so no other
 * node can communicate while the operation is underway.
 */
static int lock_token(struct md_cluster_info *cinfo, bool mddev_locked)
{
	int error, set_bit = 0;
	struct mddev *mddev = cinfo->mddev;

	/*
	 * If the resync thread runs after the raid1d thread, then
	 * process_metadata_update could not continue if raid1d held
	 * reconfig_mutex (and raid1d is blocked since another node already
	 * got EX on Token and is waiting for the EX of Ack), so let resync
	 * wake up the thread in case the flag is set.
	 */
	if (mddev_locked && !test_bit(MD_CLUSTER_HOLDING_MUTEX_FOR_RECVD,
				      &cinfo->state)) {
		error = test_and_set_bit_lock(MD_CLUSTER_HOLDING_MUTEX_FOR_RECVD,
					      &cinfo->state);
		WARN_ON_ONCE(error);
		md_wakeup_thread(mddev->thread);
		set_bit = 1;
	}
	error = dlm_lock_sync(cinfo->token_lockres, DLM_LOCK_EX);
	if (set_bit)
		clear_bit_unlock(MD_CLUSTER_HOLDING_MUTEX_FOR_RECVD, &cinfo->state);

	if (error)
		pr_err("md-cluster(%s:%d): failed to get EX on TOKEN (%d)\n",
				__func__, __LINE__, error);

	/* Lock the receive sequence */
	mutex_lock(&cinfo->recv_mutex);
	return error;
}

/* lock_comm()
 * Sets the MD_CLUSTER_SEND_LOCK bit to lock the send channel.
 */
static int lock_comm(struct md_cluster_info *cinfo, bool mddev_locked)
{
	wait_event(cinfo->wait,
		   !test_and_set_bit(MD_CLUSTER_SEND_LOCK, &cinfo->state));

	return lock_token(cinfo, mddev_locked);
}

static void unlock_comm(struct md_cluster_info *cinfo)
{
	WARN_ON(cinfo->token_lockres->mode != DLM_LOCK_EX);
	mutex_unlock(&cinfo->recv_mutex);
	dlm_unlock_sync(cinfo->token_lockres);
	clear_bit(MD_CLUSTER_SEND_LOCK, &cinfo->state);
	wake_up(&cinfo->wait);
}

/* __sendmsg()
 * This function performs the actual sending of the message. This function is
 * usually called after performing the encompassing operation
 * The function:
 * 1. Grabs the message lockresource in EX mode
 * 2. Copies the message to the message LVB
 * 3. Downconverts message lockresource to CW
 * 4. Upconverts ack lock resource from CR to EX. This forces the BAST on other nodes
 *    and the other nodes read the message. The thread will wait here until all other
 *    nodes have released ack lock resource.
 * 5. Downconvert ack lockresource to CR
 */
static int __sendmsg(struct md_cluster_info *cinfo, struct cluster_msg *cmsg)
{
	int error;
	int slot = cinfo->slot_number - 1;

	cmsg->slot = cpu_to_le32(slot);
	/* get EX on Message */
	error = dlm_lock_sync(cinfo->message_lockres, DLM_LOCK_EX);
	if (error) {
		pr_err("md-cluster: failed to get EX on MESSAGE (%d)\n", error);
		goto failed_message;
	}

	memcpy(cinfo->message_lockres->lksb.sb_lvbptr, (void *)cmsg,
			sizeof(struct cluster_msg));
	/* down-convert EX to CW on Message */
	error = dlm_lock_sync(cinfo->message_lockres, DLM_LOCK_CW);
	if (error) {
		pr_err("md-cluster: failed to convert EX to CW on MESSAGE(%d)\n",
				error);
		goto failed_ack;
	}

	/* up-convert CR to EX on Ack */
	error = dlm_lock_sync(cinfo->ack_lockres, DLM_LOCK_EX);
	if (error) {
		pr_err("md-cluster: failed to convert CR to EX on ACK(%d)\n",
				error);
		goto failed_ack;
	}

	/* down-convert EX to CR on Ack */
	error = dlm_lock_sync(cinfo->ack_lockres, DLM_LOCK_CR);
	if (error) {
		pr_err("md-cluster: failed to convert EX to CR on ACK(%d)\n",
				error);
		goto failed_ack;
	}

failed_ack:
	error = dlm_unlock_sync(cinfo->message_lockres);
	if (unlikely(error != 0)) {
		pr_err("md-cluster: failed convert to NL on MESSAGE(%d)\n",
			error);
		/* in case the message can't be released due to some reason */
		goto failed_ack;
	}
failed_message:
	return error;
}

static int sendmsg(struct md_cluster_info *cinfo, struct cluster_msg *cmsg,
		   bool mddev_locked)
{
	int ret;

	lock_comm(cinfo, mddev_locked);
	ret = __sendmsg(cinfo, cmsg);
	unlock_comm(cinfo);
	return ret;
}

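/*
 * Scan every peer's bitmap lock at join time: record in-flight resyncs
 * (PW lock busy) in the suspend_list, and merge dirty bits from any
 * unclean bitmap so a resync can be scheduled.
 */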
static int gather_all_resync_info(struct mddev *mddev, int total_slots)
{
	struct md_cluster_info *cinfo = mddev->cluster_info;
	int i, ret = 0;
	struct dlm_lock_resource *bm_lockres;
	struct suspend_info *s;
	char str[64];
	sector_t lo, hi;


	for (i = 0; i < total_slots; i++) {
		memset(str, '\0', 64);
		snprintf(str, 64, "bitmap%04d", i);
		bm_lockres = lockres_init(mddev, str, NULL, 1);
		if (!bm_lockres)
			return -ENOMEM;
		if (i == (cinfo->slot_number - 1)) {
			lockres_free(bm_lockres);
			continue;
		}

		bm_lockres->flags |= DLM_LKF_NOQUEUE;
		ret = dlm_lock_sync(bm_lockres, DLM_LOCK_PW);
		if (ret == -EAGAIN) {
			s = read_resync_info(mddev, bm_lockres);
			if (s) {
				pr_info("%s:%d Resync[%llu..%llu] in progress on %d\n",
						__func__, __LINE__,
						(unsigned long long) s->lo,
						(unsigned long long) s->hi, i);
				spin_lock_irq(&cinfo->suspend_lock);
				s->slot = i;
				list_add(&s->list, &cinfo->suspend_list);
				spin_unlock_irq(&cinfo->suspend_lock);
			}
			ret = 0;
			lockres_free(bm_lockres);
			continue;
		}
		if (ret) {
			lockres_free(bm_lockres);
			goto out;
		}

		/* Read the disk bitmap sb and check if it needs recovery */
		ret = md_bitmap_copy_from_slot(mddev, i, &lo, &hi, false);
		if (ret) {
			pr_warn("md-cluster: Could not gather bitmaps from slot %d", i);
			lockres_free(bm_lockres);
			continue;
		}
		if ((hi > 0) && (lo < mddev->recovery_cp)) {
			set_bit(MD_RECOVERY_NEEDED, &mddev->recovery);
			mddev->recovery_cp = lo;
			md_check_recovery(mddev);
		}

		lockres_free(bm_lockres);
	}
out:
	return ret;
}

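/*
 * join() brings this node into the cluster: it creates the DLM
 * lockspace named after the array uuid, waits for a slot, then sets up
 * the message/token/ack/no-new-dev/bitmap/resync lock resources and
 * the receive thread.
 */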
static int join(struct mddev *mddev, int nodes)
{
	struct md_cluster_info *cinfo;
	int ret, ops_rv;
	char str[64];

	cinfo = kzalloc(sizeof(struct md_cluster_info), GFP_KERNEL);
	if (!cinfo)
		return -ENOMEM;

	INIT_LIST_HEAD(&cinfo->suspend_list);
	spin_lock_init(&cinfo->suspend_lock);
	init_completion(&cinfo->completion);
	set_bit(MD_CLUSTER_BEGIN_JOIN_CLUSTER, &cinfo->state);
	init_waitqueue_head(&cinfo->wait);
	mutex_init(&cinfo->recv_mutex);

	mddev->cluster_info = cinfo;
	cinfo->mddev = mddev;

	memset(str, 0, 64);
	sprintf(str, "%pU", mddev->uuid);
	ret = dlm_new_lockspace(str, mddev->bitmap_info.cluster_name,
				DLM_LSFL_FS, LVB_SIZE,
				&md_ls_ops, mddev, &ops_rv, &cinfo->lockspace);
	if (ret)
		goto err;
	wait_for_completion(&cinfo->completion);
	if (nodes < cinfo->slot_number) {
		pr_err("md-cluster: Slot allotted(%d) is greater than available slots(%d).",
			cinfo->slot_number, nodes);
		ret = -ERANGE;
		goto err;
	}
	/* Initiate the communication resources */
	ret = -ENOMEM;
	cinfo->recv_thread = md_register_thread(recv_daemon, mddev, "cluster_recv");
	if (!cinfo->recv_thread) {
		pr_err("md-cluster: cannot allocate memory for recv_thread!\n");
		goto err;
	}
	cinfo->message_lockres = lockres_init(mddev, "message", NULL, 1);
	if (!cinfo->message_lockres)
		goto err;
	cinfo->token_lockres = lockres_init(mddev, "token", NULL, 0);
	if (!cinfo->token_lockres)
		goto err;
	cinfo->no_new_dev_lockres = lockres_init(mddev, "no-new-dev", NULL, 0);
	if (!cinfo->no_new_dev_lockres)
		goto err;

	ret = dlm_lock_sync(cinfo->token_lockres, DLM_LOCK_EX);
	if (ret) {
		ret = -EAGAIN;
		pr_err("md-cluster: can't join cluster to avoid lock issue\n");
		goto err;
	}
	cinfo->ack_lockres = lockres_init(mddev, "ack", ack_bast, 0);
	if (!cinfo->ack_lockres) {
		ret = -ENOMEM;
		goto err;
	}
	/* get sync CR lock on ACK. */
	if (dlm_lock_sync(cinfo->ack_lockres, DLM_LOCK_CR))
		pr_err("md-cluster: failed to get a sync CR lock on ACK!(%d)\n",
				ret);
	dlm_unlock_sync(cinfo->token_lockres);
	/* get sync CR lock on no-new-dev. */
	if (dlm_lock_sync(cinfo->no_new_dev_lockres, DLM_LOCK_CR))
		pr_err("md-cluster: failed to get a sync CR lock on no-new-dev!(%d)\n", ret);

	pr_info("md-cluster: Joined cluster %s slot %d\n", str, cinfo->slot_number);
	snprintf(str, 64, "bitmap%04d", cinfo->slot_number - 1);
	cinfo->bitmap_lockres = lockres_init(mddev, str, NULL, 1);
	if (!cinfo->bitmap_lockres) {
		ret = -ENOMEM;
		goto err;
	}
	if (dlm_lock_sync(cinfo->bitmap_lockres, DLM_LOCK_PW)) {
		pr_err("Failed to get bitmap lock\n");
		ret = -EINVAL;
		goto err;
	}

	cinfo->resync_lockres = lockres_init(mddev, "resync", NULL, 0);
	if (!cinfo->resync_lockres) {
		ret = -ENOMEM;
		goto err;
	}

	return 0;
err:
	set_bit(MD_CLUSTER_HOLDING_MUTEX_FOR_RECVD, &cinfo->state);
	md_unregister_thread(&cinfo->recovery_thread);
	md_unregister_thread(&cinfo->recv_thread);
	lockres_free(cinfo->message_lockres);
	lockres_free(cinfo->token_lockres);
	lockres_free(cinfo->ack_lockres);
	lockres_free(cinfo->no_new_dev_lockres);
	lockres_free(cinfo->resync_lockres);
	lockres_free(cinfo->bitmap_lockres);
	if (cinfo->lockspace)
		dlm_release_lockspace(cinfo->lockspace, 2);
	mddev->cluster_info = NULL;
	kfree(cinfo);
	return ret;
}

static void load_bitmaps(struct mddev *mddev, int total_slots)
{
	struct md_cluster_info *cinfo = mddev->cluster_info;

	/* load all the node's bitmap info for resync */
	if (gather_all_resync_info(mddev, total_slots))
		pr_err("md-cluster: failed to gather all resync infos\n");
	set_bit(MD_CLUSTER_ALREADY_IN_CLUSTER, &cinfo->state);
	/* wake up recv thread in case something needs to be handled */
	if (test_and_clear_bit(MD_CLUSTER_PENDING_RECV_EVENT, &cinfo->state))
		md_wakeup_thread(cinfo->recv_thread);
}

static void resync_bitmap(struct mddev *mddev)
{
	struct md_cluster_info *cinfo = mddev->cluster_info;
	struct cluster_msg cmsg = {0};
	int err;

	cmsg.type = cpu_to_le32(BITMAP_NEEDS_SYNC);
	err = sendmsg(cinfo, &cmsg, 1);
	if (err)
		pr_err("%s:%d: failed to send BITMAP_NEEDS_SYNC message (%d)\n",
			__func__, __LINE__, err);
}

static void unlock_all_bitmaps(struct mddev *mddev);
static int leave(struct mddev *mddev)
{
	struct md_cluster_info *cinfo = mddev->cluster_info;

	if (!cinfo)
		return 0;

	/* The BITMAP_NEEDS_SYNC message should be sent when a node
	 * is leaving the cluster with a dirty bitmap. Also, we can
	 * only deliver it when the dlm connection is available. */
	if (cinfo->slot_number > 0 && mddev->recovery_cp != MaxSector)
		resync_bitmap(mddev);

	set_bit(MD_CLUSTER_HOLDING_MUTEX_FOR_RECVD, &cinfo->state);
	md_unregister_thread(&cinfo->recovery_thread);
	md_unregister_thread(&cinfo->recv_thread);
	lockres_free(cinfo->message_lockres);
	lockres_free(cinfo->token_lockres);
	lockres_free(cinfo->ack_lockres);
	lockres_free(cinfo->no_new_dev_lockres);
	lockres_free(cinfo->resync_lockres);
	lockres_free(cinfo->bitmap_lockres);
	unlock_all_bitmaps(mddev);
	dlm_release_lockspace(cinfo->lockspace, 2);
	kfree(cinfo);
	return 0;
}

/* slot_number(): Returns the MD slot number to use
 * DLM starts the slot numbers from 1, whereas cluster-md
 * wants the number to be from zero, so we deduct one
 */
static int slot_number(struct mddev *mddev)
{
	struct md_cluster_info *cinfo = mddev->cluster_info;

	return cinfo->slot_number - 1;
}

/*
 * Check if the communication is already locked, else lock the communication
 * channel.
 * If it is already locked, the token is in EX mode, and hence lock_token()
 * should not be called.
 */
static int metadata_update_start(struct mddev *mddev)
{
	struct md_cluster_info *cinfo = mddev->cluster_info;
	int ret;

	/*
	 * metadata_update_start is always called with the protection of
	 * reconfig_mutex, so set MD_CLUSTER_HOLDING_MUTEX_FOR_RECVD here.
	 */
	ret = test_and_set_bit_lock(MD_CLUSTER_HOLDING_MUTEX_FOR_RECVD,
				    &cinfo->state);
	WARN_ON_ONCE(ret);
	md_wakeup_thread(mddev->thread);

	wait_event(cinfo->wait,
		   !test_and_set_bit(MD_CLUSTER_SEND_LOCK, &cinfo->state) ||
		   test_and_clear_bit(MD_CLUSTER_SEND_LOCKED_ALREADY, &cinfo->state));

	/* If token is already locked, return 0 */
	if (cinfo->token_lockres->mode == DLM_LOCK_EX) {
		clear_bit_unlock(MD_CLUSTER_HOLDING_MUTEX_FOR_RECVD, &cinfo->state);
		return 0;
	}

	ret = lock_token(cinfo, 1);
	clear_bit_unlock(MD_CLUSTER_HOLDING_MUTEX_FOR_RECVD, &cinfo->state);
	return ret;
}

static int metadata_update_finish(struct mddev *mddev)
{
	struct md_cluster_info *cinfo = mddev->cluster_info;
	struct cluster_msg cmsg;
	struct md_rdev *rdev;
	int ret = 0;
	int raid_slot = -1;

	memset(&cmsg, 0, sizeof(cmsg));
	cmsg.type = cpu_to_le32(METADATA_UPDATED);
	/* Pick up a good active device number to send.
	 */
	rdev_for_each(rdev, mddev)
		if (rdev->raid_disk > -1 && !test_bit(Faulty, &rdev->flags)) {
			raid_slot = rdev->desc_nr;
			break;
		}
	if (raid_slot >= 0) {
		cmsg.raid_slot = cpu_to_le32(raid_slot);
		ret = __sendmsg(cinfo, &cmsg);
	} else
		pr_warn("md-cluster: No good device id found to send\n");
	clear_bit(MD_CLUSTER_SEND_LOCKED_ALREADY, &cinfo->state);
	unlock_comm(cinfo);
	return ret;
}

static void metadata_update_cancel(struct mddev *mddev)
{
	struct md_cluster_info *cinfo = mddev->cluster_info;
	clear_bit(MD_CLUSTER_SEND_LOCKED_ALREADY, &cinfo->state);
	unlock_comm(cinfo);
}

static int update_bitmap_size(struct mddev *mddev, sector_t size)
{
	struct md_cluster_info *cinfo = mddev->cluster_info;
	struct cluster_msg cmsg = {0};
	int ret;

	cmsg.type = cpu_to_le32(BITMAP_RESIZE);
	cmsg.high = cpu_to_le64(size);
	ret = sendmsg(cinfo, &cmsg, 0);
	if (ret)
		pr_err("%s:%d: failed to send BITMAP_RESIZE message (%d)\n",
			__func__, __LINE__, ret);
	return ret;
}

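/*
 * resize_bitmaps() is used when the array grows (e.g. when a disk is
 * added under grow mode): first ask every node to resize its bitmap
 * via a BITMAP_RESIZE message, then update the page count in each
 * unoccupied slot's bitmap. If any node cannot resize, revert all
 * nodes to the old size.
 */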
static int resize_bitmaps(struct mddev *mddev, sector_t newsize, sector_t oldsize)
{
	struct bitmap_counts *counts;
	char str[64];
	struct dlm_lock_resource *bm_lockres;
	struct bitmap *bitmap = mddev->bitmap;
	unsigned long my_pages = bitmap->counts.pages;
	int i, rv;

	/*
	 * We need to ensure all the nodes can grow to a larger
	 * bitmap size before we make the reshape.
	 */
	rv = update_bitmap_size(mddev, newsize);
	if (rv)
		return rv;

	for (i = 0; i < mddev->bitmap_info.nodes; i++) {
		if (i == md_cluster_ops->slot_number(mddev))
			continue;

		bitmap = get_bitmap_from_slot(mddev, i);
		if (IS_ERR(bitmap)) {
			pr_err("can't get bitmap from slot %d\n", i);
			goto out;
		}
		counts = &bitmap->counts;

		/*
		 * If we can hold the bitmap lock of one node then
		 * the slot is not occupied, update the pages.
		 */
		snprintf(str, 64, "bitmap%04d", i);
		bm_lockres = lockres_init(mddev, str, NULL, 1);
		if (!bm_lockres) {
			pr_err("Cannot initialize %s lock\n", str);
			goto out;
		}
		bm_lockres->flags |= DLM_LKF_NOQUEUE;
		rv = dlm_lock_sync(bm_lockres, DLM_LOCK_PW);
		if (!rv)
			counts->pages = my_pages;
		lockres_free(bm_lockres);

		if (my_pages != counts->pages)
			/*
			 * Let's revert the bitmap size if one node
			 * can't resize its bitmap
			 */
			goto out;
	}

	return 0;
out:
	md_bitmap_free(bitmap);
	update_bitmap_size(mddev, oldsize);
	return -1;
}

/*
 * return 0 if all the bitmaps have the same sync_size
 */
static int cluster_check_sync_size(struct mddev *mddev)
{
	int i, rv;
	bitmap_super_t *sb;
	unsigned long my_sync_size, sync_size = 0;
	int node_num = mddev->bitmap_info.nodes;
	int current_slot = md_cluster_ops->slot_number(mddev);
	struct bitmap *bitmap = mddev->bitmap;
	char str[64];
	struct dlm_lock_resource *bm_lockres;

	sb = kmap_atomic(bitmap->storage.sb_page);
	my_sync_size = sb->sync_size;
	kunmap_atomic(sb);

	for (i = 0; i < node_num; i++) {
		if (i == current_slot)
			continue;

		bitmap = get_bitmap_from_slot(mddev, i);
		if (IS_ERR(bitmap)) {
			pr_err("can't get bitmap from slot %d\n", i);
			return -1;
		}

		/*
		 * If we can hold the bitmap lock of one node then
		 * the slot is not occupied, update the sb.
		 */
		snprintf(str, 64, "bitmap%04d", i);
		bm_lockres = lockres_init(mddev, str, NULL, 1);
		if (!bm_lockres) {
			pr_err("md-cluster: Cannot initialize %s\n", str);
			md_bitmap_free(bitmap);
			return -1;
		}
		bm_lockres->flags |= DLM_LKF_NOQUEUE;
		rv = dlm_lock_sync(bm_lockres, DLM_LOCK_PW);
		if (!rv)
			md_bitmap_update_sb(bitmap);
		lockres_free(bm_lockres);

		sb = kmap_atomic(bitmap->storage.sb_page);
		if (sync_size == 0)
			sync_size = sb->sync_size;
		else if (sync_size != sb->sync_size) {
			kunmap_atomic(sb);
			md_bitmap_free(bitmap);
			return -1;
		}
		kunmap_atomic(sb);
		md_bitmap_free(bitmap);
	}

	return (my_sync_size == sync_size) ? 0 : -1;
}

/*
 * Updating the size for cluster raid is a little more complex, we perform it
 * by the steps:
 * 1. hold token lock and update superblock in initiator node.
 * 2. send METADATA_UPDATED msg to other nodes.
 * 3. The initiator node continues to check each bitmap's sync_size, if all
 *    bitmaps have the same value of sync_size, then we can set capacity and
 *    let other nodes perform it. If one node can't update sync_size
 *    accordingly, we need to revert to the previous value.
 */
static void update_size(struct mddev *mddev, sector_t old_dev_sectors)
{
	struct md_cluster_info *cinfo = mddev->cluster_info;
	struct cluster_msg cmsg;
	struct md_rdev *rdev;
	int ret = 0;
	int raid_slot = -1;

	md_update_sb(mddev, 1);
	lock_comm(cinfo, 1);

	memset(&cmsg, 0, sizeof(cmsg));
	cmsg.type = cpu_to_le32(METADATA_UPDATED);
	rdev_for_each(rdev, mddev)
		if (rdev->raid_disk >= 0 && !test_bit(Faulty, &rdev->flags)) {
			raid_slot = rdev->desc_nr;
			break;
		}
	if (raid_slot >= 0) {
		cmsg.raid_slot = cpu_to_le32(raid_slot);
		/*
		 * We can only change capacity after all the nodes can do it,
		 * so we need to wait until the other nodes have received the
		 * msg and handled the change
		 */
		ret = __sendmsg(cinfo, &cmsg);
		if (ret) {
			pr_err("%s:%d: failed to send METADATA_UPDATED msg\n",
			       __func__, __LINE__);
			unlock_comm(cinfo);
			return;
		}
	} else {
		pr_err("md-cluster: No good device id found to send\n");
		unlock_comm(cinfo);
		return;
	}

	/*
	 * check the sync_size from other nodes' bitmaps; if sync_size
	 * has already been updated in other nodes as expected, send an
	 * empty metadata msg to permit the change of capacity
	 */
	if (cluster_check_sync_size(mddev) == 0) {
		memset(&cmsg, 0, sizeof(cmsg));
		cmsg.type = cpu_to_le32(CHANGE_CAPACITY);
		ret = __sendmsg(cinfo, &cmsg);
		if (ret)
			pr_err("%s:%d: failed to send CHANGE_CAPACITY msg\n",
			       __func__, __LINE__);
		set_capacity(mddev->gendisk, mddev->array_sectors);
		revalidate_disk(mddev->gendisk);
	} else {
		/* revert to previous sectors */
		ret = mddev->pers->resize(mddev, old_dev_sectors);
		if (!ret)
			revalidate_disk(mddev->gendisk);
		ret = __sendmsg(cinfo, &cmsg);
		if (ret)
			pr_err("%s:%d: failed to send METADATA_UPDATED msg\n",
			       __func__, __LINE__);
	}
	unlock_comm(cinfo);
}

static int resync_start(struct mddev *mddev)
{
	struct md_cluster_info *cinfo = mddev->cluster_info;
	return dlm_lock_sync_interruptible(cinfo->resync_lockres, DLM_LOCK_EX, mddev);
}

static int resync_info_update(struct mddev *mddev, sector_t lo, sector_t hi)
{
	struct md_cluster_info *cinfo = mddev->cluster_info;
	struct resync_info ri;
	struct cluster_msg cmsg = {0};

	/* do not send zero again, if we have sent before */
	if (hi == 0) {
		memcpy(&ri, cinfo->bitmap_lockres->lksb.sb_lvbptr, sizeof(struct resync_info));
		if (le64_to_cpu(ri.hi) == 0)
			return 0;
	}

	add_resync_info(cinfo->bitmap_lockres, lo, hi);
	/* Re-acquire the lock to refresh LVB */
	dlm_lock_sync(cinfo->bitmap_lockres, DLM_LOCK_PW);
	cmsg.type = cpu_to_le32(RESYNCING);
	cmsg.low = cpu_to_le64(lo);
	cmsg.high = cpu_to_le64(hi);

	/*
	 * mddev_lock is held if resync_info_update is called from
	 * resync_finish (md_reap_sync_thread -> resync_finish)
	 */
	if (lo == 0 && hi == 0)
		return sendmsg(cinfo, &cmsg, 1);
	else
		return sendmsg(cinfo, &cmsg, 0);
}

static int resync_finish(struct mddev *mddev)
{
	struct md_cluster_info *cinfo = mddev->cluster_info;
	int ret = 0;

	clear_bit(MD_RESYNCING_REMOTE, &mddev->recovery);

	/*
	 * If the resync thread is interrupted so we can't say resync is
	 * finished, another node will launch a resync thread to continue.
	 */
	if (!test_bit(MD_CLOSING, &mddev->flags))
		ret = resync_info_update(mddev, 0, 0);
	dlm_unlock_sync(cinfo->resync_lockres);
	return ret;
}

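/*
 * Tell the personality whether [lo, hi] overlaps a region another node
 * is resyncing (so writes must be held off), or whether read balancing
 * should be suspended entirely while DLM recovery is in progress.
 */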
static int area_resyncing(struct mddev *mddev, int direction,
		sector_t lo, sector_t hi)
{
	struct md_cluster_info *cinfo = mddev->cluster_info;
	int ret = 0;
	struct suspend_info *s;

	if ((direction == READ) &&
		test_bit(MD_CLUSTER_SUSPEND_READ_BALANCING, &cinfo->state))
		return 1;

	spin_lock_irq(&cinfo->suspend_lock);
	if (list_empty(&cinfo->suspend_list))
		goto out;
	list_for_each_entry(s, &cinfo->suspend_list, list)
		if (hi > s->lo && lo < s->hi) {
			ret = 1;
			break;
		}
out:
	spin_unlock_irq(&cinfo->suspend_lock);
	return ret;
}

/* add_new_disk() - initiates a disk add
 * However, if this fails before writing md_update_sb(),
 * add_new_disk_cancel() must be called to release the token lock
 */
static int add_new_disk(struct mddev *mddev, struct md_rdev *rdev)
{
	struct md_cluster_info *cinfo = mddev->cluster_info;
	struct cluster_msg cmsg;
	int ret = 0;
	struct mdp_superblock_1 *sb = page_address(rdev->sb_page);
	char *uuid = sb->device_uuid;

	memset(&cmsg, 0, sizeof(cmsg));
	cmsg.type = cpu_to_le32(NEWDISK);
	memcpy(cmsg.uuid, uuid, 16);
	cmsg.raid_slot = cpu_to_le32(rdev->desc_nr);
	lock_comm(cinfo, 1);
	ret = __sendmsg(cinfo, &cmsg);
	if (ret) {
		unlock_comm(cinfo);
		return ret;
	}
	cinfo->no_new_dev_lockres->flags |= DLM_LKF_NOQUEUE;
	ret = dlm_lock_sync(cinfo->no_new_dev_lockres, DLM_LOCK_EX);
	cinfo->no_new_dev_lockres->flags &= ~DLM_LKF_NOQUEUE;
	/* Some node does not "see" the device */
	if (ret == -EAGAIN)
		ret = -ENOENT;
	if (ret)
		unlock_comm(cinfo);
	else {
		dlm_lock_sync(cinfo->no_new_dev_lockres, DLM_LOCK_CR);
		/* Since MD_CHANGE_DEVS will be set in add_bound_rdev which
		 * will run soon after add_new_disk, the below path will be
		 * invoked:
		 *	md_wakeup_thread(mddev->thread)
		 *	-> conf->thread (raid1d)
		 *	-> md_check_recovery -> md_update_sb
		 *	-> metadata_update_start/finish
		 * MD_CLUSTER_SEND_LOCKED_ALREADY will be cleared eventually.
		 *
		 * For other failure cases, metadata_update_cancel and
		 * add_new_disk_cancel clear the bit below as well.
		 */
		set_bit(MD_CLUSTER_SEND_LOCKED_ALREADY, &cinfo->state);
		wake_up(&cinfo->wait);
	}
	return ret;
}

static void add_new_disk_cancel(struct mddev *mddev)
{
	struct md_cluster_info *cinfo = mddev->cluster_info;
	clear_bit(MD_CLUSTER_SEND_LOCKED_ALREADY, &cinfo->state);
	unlock_comm(cinfo);
}

static int new_disk_ack(struct mddev *mddev, bool ack)
{
	struct md_cluster_info *cinfo = mddev->cluster_info;

	if (!test_bit(MD_CLUSTER_WAITING_FOR_NEWDISK, &cinfo->state)) {
		pr_warn("md-cluster(%s): Spurious cluster confirmation\n", mdname(mddev));
		return -EINVAL;
	}

	if (ack)
		dlm_unlock_sync(cinfo->no_new_dev_lockres);
	complete(&cinfo->newdisk_completion);
	return 0;
}

static int remove_disk(struct mddev *mddev, struct md_rdev *rdev)
{
	struct cluster_msg cmsg = {0};
	struct md_cluster_info *cinfo = mddev->cluster_info;
	cmsg.type = cpu_to_le32(REMOVE);
	cmsg.raid_slot = cpu_to_le32(rdev->desc_nr);
	return sendmsg(cinfo, &cmsg, 1);
}

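/*
 * Try to take every other node's bitmap lock in PW mode without
 * queueing. Returns 1 if all locks were acquired (no other node holds
 * its bitmap), -1 if at least one lock is busy, and 0 or -ENOMEM on
 * allocation failures.
 */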
static int lock_all_bitmaps(struct mddev *mddev)
{
	int slot, my_slot, ret, held = 1, i = 0;
	char str[64];
	struct md_cluster_info *cinfo = mddev->cluster_info;

	cinfo->other_bitmap_lockres =
		kcalloc(mddev->bitmap_info.nodes - 1,
			sizeof(struct dlm_lock_resource *), GFP_KERNEL);
	if (!cinfo->other_bitmap_lockres) {
		pr_err("md: can't alloc mem for other bitmap locks\n");
		return 0;
	}

	my_slot = slot_number(mddev);
	for (slot = 0; slot < mddev->bitmap_info.nodes; slot++) {
		if (slot == my_slot)
			continue;

		memset(str, '\0', 64);
		snprintf(str, 64, "bitmap%04d", slot);
		cinfo->other_bitmap_lockres[i] = lockres_init(mddev, str, NULL, 1);
		if (!cinfo->other_bitmap_lockres[i])
			return -ENOMEM;

		cinfo->other_bitmap_lockres[i]->flags |= DLM_LKF_NOQUEUE;
		ret = dlm_lock_sync(cinfo->other_bitmap_lockres[i], DLM_LOCK_PW);
		if (ret)
			held = -1;
		i++;
	}

	return held;
}

static void unlock_all_bitmaps(struct mddev *mddev)
{
	struct md_cluster_info *cinfo = mddev->cluster_info;
	int i;

	/* release other nodes' bitmap locks if they exist */
	if (cinfo->other_bitmap_lockres) {
		for (i = 0; i < mddev->bitmap_info.nodes - 1; i++) {
			if (cinfo->other_bitmap_lockres[i])
				lockres_free(cinfo->other_bitmap_lockres[i]);
		}
		kfree(cinfo->other_bitmap_lockres);
	}
}

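/*
 * Used when a device is re-added: ask the peers to RE_ADD it, then
 * merge every other node's bitmap bits locally so the resync covers
 * all regions any node has dirtied.
 */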
static int gather_bitmaps(struct md_rdev *rdev)
{
	int sn, err;
	sector_t lo, hi;
	struct cluster_msg cmsg = {0};
	struct mddev *mddev = rdev->mddev;
	struct md_cluster_info *cinfo = mddev->cluster_info;

	cmsg.type = cpu_to_le32(RE_ADD);
	cmsg.raid_slot = cpu_to_le32(rdev->desc_nr);
	err = sendmsg(cinfo, &cmsg, 1);
	if (err)
		goto out;

	for (sn = 0; sn < mddev->bitmap_info.nodes; sn++) {
		if (sn == (cinfo->slot_number - 1))
			continue;
		err = md_bitmap_copy_from_slot(mddev, sn, &lo, &hi, false);
		if (err) {
			pr_warn("md-cluster: Could not gather bitmaps from slot %d", sn);
			goto out;
		}
		if ((hi > 0) && (lo < mddev->recovery_cp))
			mddev->recovery_cp = lo;
	}
out:
	return err;
}

static struct md_cluster_operations cluster_ops = {
	.join   = join,
	.leave  = leave,
	.slot_number = slot_number,
	.resync_start = resync_start,
	.resync_finish = resync_finish,
	.resync_info_update = resync_info_update,
	.metadata_update_start = metadata_update_start,
	.metadata_update_finish = metadata_update_finish,
	.metadata_update_cancel = metadata_update_cancel,
	.area_resyncing = area_resyncing,
	.add_new_disk = add_new_disk,
	.add_new_disk_cancel = add_new_disk_cancel,
	.new_disk_ack = new_disk_ack,
	.remove_disk = remove_disk,
	.load_bitmaps = load_bitmaps,
	.gather_bitmaps = gather_bitmaps,
	.resize_bitmaps = resize_bitmaps,
	.lock_all_bitmaps = lock_all_bitmaps,
	.unlock_all_bitmaps = unlock_all_bitmaps,
	.update_size = update_size,
};

static int __init cluster_init(void)
{
	pr_warn("md-cluster: support raid1 and raid10 (limited support)\n");
	pr_info("Registering Cluster MD functions\n");
	register_md_cluster_operations(&cluster_ops, THIS_MODULE);
	return 0;
}

static void cluster_exit(void)
{
	unregister_md_cluster_operations();
}

module_init(cluster_init);
module_exit(cluster_exit);
MODULE_AUTHOR("SUSE");
MODULE_LICENSE("GPL");
MODULE_DESCRIPTION("Clustering support for MD");