ocfs2: o2hb: add some user/debug log
1 /* -*- mode: c; c-basic-offset: 8; -*-
2  * vim: noexpandtab sw=8 ts=8 sts=0:
3  *
4  * Copyright (C) 2004, 2005 Oracle.  All rights reserved.
5  *
6  * This program is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU General Public
8  * License as published by the Free Software Foundation; either
9  * version 2 of the License, or (at your option) any later version.
10  *
11  * This program is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14  * General Public License for more details.
15  *
16  * You should have received a copy of the GNU General Public
17  * License along with this program; if not, write to the
18  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
19  * Boston, MA 02111-1307, USA.
20  */
21
22 #include <linux/kernel.h>
23 #include <linux/sched.h>
24 #include <linux/jiffies.h>
25 #include <linux/module.h>
26 #include <linux/fs.h>
27 #include <linux/bio.h>
28 #include <linux/blkdev.h>
29 #include <linux/delay.h>
30 #include <linux/file.h>
31 #include <linux/kthread.h>
32 #include <linux/configfs.h>
33 #include <linux/random.h>
34 #include <linux/crc32.h>
35 #include <linux/time.h>
36 #include <linux/debugfs.h>
37 #include <linux/slab.h>
38 #include <linux/bitmap.h>
39 #include <linux/ktime.h>
40 #include "heartbeat.h"
41 #include "tcp.h"
42 #include "nodemanager.h"
43 #include "quorum.h"
44
45 #include "masklog.h"
46
47
48 /*
49  * The first heartbeat pass had one global thread that would serialize all hb
50  * callback calls.  This global serializing sem should only be removed once
51  * we've made sure that all callees can deal with being called concurrently
52  * from multiple hb region threads.
53  */
54 static DECLARE_RWSEM(o2hb_callback_sem);
55
56 /*
57  * multiple hb threads are watching multiple regions.  A node is live
58  * whenever any of the threads sees activity from the node in its region.
59  */
60 static DEFINE_SPINLOCK(o2hb_live_lock);
61 static struct list_head o2hb_live_slots[O2NM_MAX_NODES];
62 static unsigned long o2hb_live_node_bitmap[BITS_TO_LONGS(O2NM_MAX_NODES)];
63 static LIST_HEAD(o2hb_node_events);
64 static DECLARE_WAIT_QUEUE_HEAD(o2hb_steady_queue);
65
66 /*
67  * In global heartbeat, we maintain a series of region bitmaps.
68  *      - o2hb_region_bitmap allows us to limit the region number to max region.
69  *      - o2hb_live_region_bitmap tracks live regions (seen steady iterations).
70  *      - o2hb_quorum_region_bitmap tracks live regions that have seen all nodes
71  *              heartbeat on it.
72  *      - o2hb_failed_region_bitmap tracks the regions that have seen io timeouts.
73  */
74 static unsigned long o2hb_region_bitmap[BITS_TO_LONGS(O2NM_MAX_REGIONS)];
75 static unsigned long o2hb_live_region_bitmap[BITS_TO_LONGS(O2NM_MAX_REGIONS)];
76 static unsigned long o2hb_quorum_region_bitmap[BITS_TO_LONGS(O2NM_MAX_REGIONS)];
77 static unsigned long o2hb_failed_region_bitmap[BITS_TO_LONGS(O2NM_MAX_REGIONS)];
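/*
 * The bitmaps nest: a bit set in o2hb_quorum_region_bitmap is expected to
 * also be set in o2hb_live_region_bitmap and o2hb_region_bitmap, and a bit
 * in o2hb_failed_region_bitmap marks a quorum region whose writes are
 * currently timing out (see o2hb_write_timeout()).
 */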
78
79 #define O2HB_DB_TYPE_LIVENODES          0
80 #define O2HB_DB_TYPE_LIVEREGIONS        1
81 #define O2HB_DB_TYPE_QUORUMREGIONS      2
82 #define O2HB_DB_TYPE_FAILEDREGIONS      3
83 #define O2HB_DB_TYPE_REGION_LIVENODES   4
84 #define O2HB_DB_TYPE_REGION_NUMBER      5
85 #define O2HB_DB_TYPE_REGION_ELAPSED_TIME        6
86 #define O2HB_DB_TYPE_REGION_PINNED      7
87 struct o2hb_debug_buf {
88         int db_type;
89         int db_size;
90         int db_len;
91         void *db_data;
92 };
93
94 static struct o2hb_debug_buf *o2hb_db_livenodes;
95 static struct o2hb_debug_buf *o2hb_db_liveregions;
96 static struct o2hb_debug_buf *o2hb_db_quorumregions;
97 static struct o2hb_debug_buf *o2hb_db_failedregions;
98
99 #define O2HB_DEBUG_DIR                  "o2hb"
100 #define O2HB_DEBUG_LIVENODES            "livenodes"
101 #define O2HB_DEBUG_LIVEREGIONS          "live_regions"
102 #define O2HB_DEBUG_QUORUMREGIONS        "quorum_regions"
103 #define O2HB_DEBUG_FAILEDREGIONS        "failed_regions"
104 #define O2HB_DEBUG_REGION_NUMBER        "num"
105 #define O2HB_DEBUG_REGION_ELAPSED_TIME  "elapsed_time_in_ms"
106 #define O2HB_DEBUG_REGION_PINNED        "pinned"
107
108 static struct dentry *o2hb_debug_dir;
109 static struct dentry *o2hb_debug_livenodes;
110 static struct dentry *o2hb_debug_liveregions;
111 static struct dentry *o2hb_debug_quorumregions;
112 static struct dentry *o2hb_debug_failedregions;
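/*
 * The dentries above become read-only files under the "o2hb" debugfs
 * directory.  For example, assuming debugfs is mounted at the usual
 * /sys/kernel/debug, a quick look at the live regions might be:
 *
 *	# cat /sys/kernel/debug/o2hb/live_regions
 *	0 2
 *
 * (illustrative output only; each number is a bit currently set in
 * o2hb_live_region_bitmap, snapshotted when the file is opened)
 */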
113
114 static LIST_HEAD(o2hb_all_regions);
115
116 static struct o2hb_callback {
117         struct list_head list;
118 } o2hb_callbacks[O2HB_NUM_CB];
119
120 static struct o2hb_callback *hbcall_from_type(enum o2hb_callback_type type);
121
122 #define O2HB_DEFAULT_BLOCK_BITS       9
123
124 enum o2hb_heartbeat_modes {
125         O2HB_HEARTBEAT_LOCAL            = 0,
126         O2HB_HEARTBEAT_GLOBAL,
127         O2HB_HEARTBEAT_NUM_MODES,
128 };
129
130 char *o2hb_heartbeat_mode_desc[O2HB_HEARTBEAT_NUM_MODES] = {
131                 "local",        /* O2HB_HEARTBEAT_LOCAL */
132                 "global",       /* O2HB_HEARTBEAT_GLOBAL */
133 };
134
135 unsigned int o2hb_dead_threshold = O2HB_DEFAULT_DEAD_THRESHOLD;
136 unsigned int o2hb_heartbeat_mode = O2HB_HEARTBEAT_LOCAL;
137
138 /*
139  * o2hb_dependent_users tracks the number of registered callbacks that depend
140  * on heartbeat. o2net and o2dlm are two entities that register this callback.
141  * However, only o2dlm depends on the heartbeat. It does not want the heartbeat
142  * to stop while a dlm domain is still active.
143  */
144 unsigned int o2hb_dependent_users;
145
146 /*
147  * In global heartbeat mode, all regions are pinned if there are one or more
148  * dependent users and the quorum region count is <= O2HB_PIN_CUT_OFF. All
149  * regions are unpinned if the region count exceeds the cut off or the number
150  * of dependent users falls to zero.
151  */
152 #define O2HB_PIN_CUT_OFF                3
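/*
 * Example: with at least one dependent user and three or fewer quorum
 * regions, every region stays pinned.  As soon as a fourth region reaches
 * quorum, o2hb_set_quorum_device() calls o2hb_region_unpin(NULL) and the
 * pins are dropped again.
 */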
153
154 /*
155  * In local heartbeat mode, we assume the dlm domain name to be the same as
156  * region uuid. This is true for domains created for the file system but not
157  * necessarily true for userdlm domains. This is a known limitation.
158  *
159  * In global heartbeat mode, we pin/unpin all o2hb regions. This solution
160  * works for both file system and userdlm domains.
161  */
162 static int o2hb_region_pin(const char *region_uuid);
163 static void o2hb_region_unpin(const char *region_uuid);
164
165 /* Only sets a new threshold if there are no active regions.
166  *
167  * No locking or otherwise interesting code is required for reading
168  * o2hb_dead_threshold as it can't change once regions are active and
169  * it's not interesting to anyone until then anyway. */
170 static void o2hb_dead_threshold_set(unsigned int threshold)
171 {
172         if (threshold > O2HB_MIN_DEAD_THRESHOLD) {
173                 spin_lock(&o2hb_live_lock);
174                 if (list_empty(&o2hb_all_regions))
175                         o2hb_dead_threshold = threshold;
176                 spin_unlock(&o2hb_live_lock);
177         }
178 }
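/*
 * The threshold is a count of heartbeat iterations rather than a time; the
 * effective dead time used elsewhere in this file is
 *
 *	dead_ms = o2hb_dead_threshold * O2HB_REGION_TIMEOUT_MS
 *
 * (see o2hb_prepare_block() and o2hb_check_slot()).  Assuming the defaults
 * from heartbeat.h of a 31 iteration threshold and a 2000 ms region timeout,
 * a silent node is declared dead after roughly 62 seconds.
 */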
179
180 static int o2hb_global_heartbeat_mode_set(unsigned int hb_mode)
181 {
182         int ret = -1;
183
184         if (hb_mode < O2HB_HEARTBEAT_NUM_MODES) {
185                 spin_lock(&o2hb_live_lock);
186                 if (list_empty(&o2hb_all_regions)) {
187                         o2hb_heartbeat_mode = hb_mode;
188                         ret = 0;
189                 }
190                 spin_unlock(&o2hb_live_lock);
191         }
192
193         return ret;
194 }
195
196 struct o2hb_node_event {
197         struct list_head        hn_item;
198         enum o2hb_callback_type hn_event_type;
199         struct o2nm_node        *hn_node;
200         int                     hn_node_num;
201 };
202
203 struct o2hb_disk_slot {
204         struct o2hb_disk_heartbeat_block *ds_raw_block;
205         u8                      ds_node_num;
206         u64                     ds_last_time;
207         u64                     ds_last_generation;
208         u16                     ds_equal_samples;
209         u16                     ds_changed_samples;
210         struct list_head        ds_live_item;
211 };
212
213 /* each thread owns a region.  When we're asked to tear down the region
214  * we ask the thread to stop, and it cleans up the region */
215 struct o2hb_region {
216         struct config_item      hr_item;
217
218         struct list_head        hr_all_item;
219         unsigned                hr_unclean_stop:1,
220                                 hr_aborted_start:1,
221                                 hr_item_pinned:1,
222                                 hr_item_dropped:1,
223                                 hr_node_deleted:1;
224
225         /* protected by the hr_callback_sem */
226         struct task_struct      *hr_task;
227
228         unsigned int            hr_blocks;
229         unsigned long long      hr_start_block;
230
231         unsigned int            hr_block_bits;
232         unsigned int            hr_block_bytes;
233
234         unsigned int            hr_slots_per_page;
235         unsigned int            hr_num_pages;
236
237         struct page             **hr_slot_data;
238         struct block_device     *hr_bdev;
239         struct o2hb_disk_slot   *hr_slots;
240
241         /* live node map of this region */
242         unsigned long           hr_live_node_bitmap[BITS_TO_LONGS(O2NM_MAX_NODES)];
243         unsigned int            hr_region_num;
244
245         struct dentry           *hr_debug_dir;
246         struct dentry           *hr_debug_livenodes;
247         struct dentry           *hr_debug_regnum;
248         struct dentry           *hr_debug_elapsed_time;
249         struct dentry           *hr_debug_pinned;
250         struct o2hb_debug_buf   *hr_db_livenodes;
251         struct o2hb_debug_buf   *hr_db_regnum;
252         struct o2hb_debug_buf   *hr_db_elapsed_time;
253         struct o2hb_debug_buf   *hr_db_pinned;
254
255         /* let the person setting up hb wait until it has reached a
256          * 'steady' state before returning.  This will be fixed when we have
257          * a more complete api that doesn't lead to this sort of fragility. */
258         atomic_t                hr_steady_iterations;
259
260         /* terminate o2hb thread if it does not reach steady state
261          * (hr_steady_iterations == 0) within hr_unsteady_iterations */
262         atomic_t                hr_unsteady_iterations;
263
264         char                    hr_dev_name[BDEVNAME_SIZE];
265
266         unsigned int            hr_timeout_ms;
267
268         /* randomized as the region goes up and down so that a node
269          * recognizes a node going up and down in one iteration */
270         u64                     hr_generation;
271
272         struct delayed_work     hr_write_timeout_work;
273         unsigned long           hr_last_timeout_start;
274
275         /* negotiate timer, used to negotiate extending hb timeout. */
276         struct delayed_work     hr_nego_timeout_work;
277         unsigned long           hr_nego_node_bitmap[BITS_TO_LONGS(O2NM_MAX_NODES)];
278
279         /* Used during o2hb_check_slot to hold a copy of the block
280          * being checked because we temporarily have to zero out the
281          * crc field. */
282         struct o2hb_disk_heartbeat_block *hr_tmp_block;
283
284         /* Message key for negotiate timeout message. */
285         unsigned int            hr_key;
286         struct list_head        hr_handler_list;
287 };
288
289 struct o2hb_bio_wait_ctxt {
290         atomic_t          wc_num_reqs;
291         struct completion wc_io_complete;
292         int               wc_error;
293 };
294
295 #define O2HB_NEGO_TIMEOUT_MS (O2HB_MAX_WRITE_TIMEOUT_MS/2)
296
297 enum {
298         O2HB_NEGO_TIMEOUT_MSG = 1,
299         O2HB_NEGO_APPROVE_MSG = 2,
300 };
301
302 struct o2hb_nego_msg {
303         u8 node_num;
304 };
305
306 static void o2hb_write_timeout(struct work_struct *work)
307 {
308         int failed, quorum;
309         struct o2hb_region *reg =
310                 container_of(work, struct o2hb_region,
311                              hr_write_timeout_work.work);
312
313         mlog(ML_ERROR, "Heartbeat write timeout to device %s after %u "
314              "milliseconds\n", reg->hr_dev_name,
315              jiffies_to_msecs(jiffies - reg->hr_last_timeout_start));
316
317         if (o2hb_global_heartbeat_active()) {
318                 spin_lock(&o2hb_live_lock);
319                 if (test_bit(reg->hr_region_num, o2hb_quorum_region_bitmap))
320                         set_bit(reg->hr_region_num, o2hb_failed_region_bitmap);
321                 failed = bitmap_weight(o2hb_failed_region_bitmap,
322                                         O2NM_MAX_REGIONS);
323                 quorum = bitmap_weight(o2hb_quorum_region_bitmap,
324                                         O2NM_MAX_REGIONS);
325                 spin_unlock(&o2hb_live_lock);
326
327                 mlog(ML_HEARTBEAT, "Number of regions %d, failed regions %d\n",
328                      quorum, failed);
329
330                 /*
331                  * Fence if the number of failed regions >= half the number
332                  * of  quorum regions
333                  */
334                 if ((failed << 1) < quorum)
335                         return;
336         }
337
338         o2quo_disk_timeout();
339 }
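/*
 * Worked example of the fencing check in o2hb_write_timeout() above, with
 * quorum = 3 regions:
 *
 *	failed = 1:  (1 << 1) = 2 <  3  -> timeout ignored, no fencing
 *	failed = 2:  (2 << 1) = 4 >= 3  -> fall through to o2quo_disk_timeout()
 *
 * i.e. fencing only starts once at least half of the quorum regions have
 * timed out.  In local heartbeat mode the global branch is skipped and a
 * single region timeout goes straight to o2quo_disk_timeout().
 */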
340
341 static void o2hb_arm_timeout(struct o2hb_region *reg)
342 {
343         /* Arm writeout only after thread reaches steady state */
344         if (atomic_read(&reg->hr_steady_iterations) != 0)
345                 return;
346
347         mlog(ML_HEARTBEAT, "Queue write timeout for %u ms\n",
348              O2HB_MAX_WRITE_TIMEOUT_MS);
349
350         if (o2hb_global_heartbeat_active()) {
351                 spin_lock(&o2hb_live_lock);
352                 clear_bit(reg->hr_region_num, o2hb_failed_region_bitmap);
353                 spin_unlock(&o2hb_live_lock);
354         }
355         cancel_delayed_work(&reg->hr_write_timeout_work);
356         reg->hr_last_timeout_start = jiffies;
357         schedule_delayed_work(&reg->hr_write_timeout_work,
358                               msecs_to_jiffies(O2HB_MAX_WRITE_TIMEOUT_MS));
359
360         cancel_delayed_work(&reg->hr_nego_timeout_work);
361         /* negotiate timeout must be less than write timeout. */
362         schedule_delayed_work(&reg->hr_nego_timeout_work,
363                               msecs_to_jiffies(O2HB_NEGO_TIMEOUT_MS));
364         memset(reg->hr_nego_node_bitmap, 0, sizeof(reg->hr_nego_node_bitmap));
365 }
366
367 static void o2hb_disarm_timeout(struct o2hb_region *reg)
368 {
369         cancel_delayed_work_sync(&reg->hr_write_timeout_work);
370         cancel_delayed_work_sync(&reg->hr_nego_timeout_work);
371 }
372
373 static int o2hb_send_nego_msg(int key, int type, u8 target)
374 {
375         struct o2hb_nego_msg msg;
376         int status, ret;
377
378         msg.node_num = o2nm_this_node();
379 again:
380         ret = o2net_send_message(type, key, &msg, sizeof(msg),
381                         target, &status);
382
383         if (ret == -EAGAIN || ret == -ENOMEM) {
384                 msleep(100);
385                 goto again;
386         }
387
388         return ret;
389 }
390
391 static void o2hb_nego_timeout(struct work_struct *work)
392 {
393         unsigned long live_node_bitmap[BITS_TO_LONGS(O2NM_MAX_NODES)];
394         int master_node, i, ret;
395         struct o2hb_region *reg;
396
397         reg = container_of(work, struct o2hb_region, hr_nego_timeout_work.work);
398         o2hb_fill_node_map(live_node_bitmap, sizeof(live_node_bitmap));
399         /* lowest live node acts as master to make the negotiation decision. */
400         master_node = find_next_bit(live_node_bitmap, O2NM_MAX_NODES, 0);
401
402         if (master_node == o2nm_this_node()) {
403                 if (!test_bit(master_node, reg->hr_nego_node_bitmap)) {
404                         printk(KERN_NOTICE "o2hb: node %d hb write hung for %ds on region %s (%s).\n",
405                                 o2nm_this_node(), O2HB_NEGO_TIMEOUT_MS/1000,
406                                 config_item_name(&reg->hr_item), reg->hr_dev_name);
407                         set_bit(master_node, reg->hr_nego_node_bitmap);
408                 }
409                 if (memcmp(reg->hr_nego_node_bitmap, live_node_bitmap,
410                                 sizeof(reg->hr_nego_node_bitmap))) {
411                         /* check negotiate bitmap every second to do timeout
412                          * approve decision.
413                          */
414                         schedule_delayed_work(&reg->hr_nego_timeout_work,
415                                 msecs_to_jiffies(1000));
416
417                         return;
418                 }
419
420                 printk(KERN_NOTICE "o2hb: all nodes hb write hung, maybe region %s (%s) is down.\n",
421                         config_item_name(&reg->hr_item), reg->hr_dev_name);
422                 /* approve negotiate timeout request. */
423                 o2hb_arm_timeout(reg);
424
425                 i = -1;
426                 while ((i = find_next_bit(live_node_bitmap,
427                                 O2NM_MAX_NODES, i + 1)) < O2NM_MAX_NODES) {
428                         if (i == master_node)
429                                 continue;
430
431                         mlog(ML_HEARTBEAT, "send NEGO_APPROVE msg to node %d\n", i);
432                         ret = o2hb_send_nego_msg(reg->hr_key,
433                                         O2HB_NEGO_APPROVE_MSG, i);
434                         if (ret)
435                                 mlog(ML_ERROR, "send NEGO_APPROVE msg to node %d fail %d\n",
436                                         i, ret);
437                 }
438         } else {
439                 /* negotiate timeout with master node. */
440                 printk(KERN_NOTICE "o2hb: node %d hb write hung for %ds on region %s (%s), negotiate timeout with node %d.\n",
441                         o2nm_this_node(), O2HB_NEGO_TIMEOUT_MS/1000, config_item_name(&reg->hr_item),
442                         reg->hr_dev_name, master_node);
443                 ret = o2hb_send_nego_msg(reg->hr_key, O2HB_NEGO_TIMEOUT_MSG,
444                                 master_node);
445                 if (ret)
446                         mlog(ML_ERROR, "send NEGO_TIMEOUT msg to node %d fail %d\n",
447                                 master_node, ret);
448         }
449 }
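/*
 * Summary of the negotiation above: the lowest live node number acts as
 * master.  When a non-master node's heartbeat write has hung for
 * O2HB_NEGO_TIMEOUT_MS it sends O2HB_NEGO_TIMEOUT_MSG to the master; the
 * master records its own hang and every requester in hr_nego_node_bitmap
 * and rechecks once a second.  Only when every node in the live map has
 * asked for an extension does the master re-arm its own timeout via
 * o2hb_arm_timeout() and broadcast O2HB_NEGO_APPROVE_MSG so the others
 * re-arm as well (o2hb_nego_approve_handler()).  If any live node is still
 * writing successfully the negotiation never completes and the normal
 * write timeout eventually fires.
 */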
450
451 static int o2hb_nego_timeout_handler(struct o2net_msg *msg, u32 len, void *data,
452                                 void **ret_data)
453 {
454         struct o2hb_region *reg = data;
455         struct o2hb_nego_msg *nego_msg;
456
457         nego_msg = (struct o2hb_nego_msg *)msg->buf;
458         printk(KERN_NOTICE "o2hb: received negotiate timeout message from node %d on region %s (%s).\n",
459                 nego_msg->node_num, config_item_name(&reg->hr_item), reg->hr_dev_name);
460         if (nego_msg->node_num < O2NM_MAX_NODES)
461                 set_bit(nego_msg->node_num, reg->hr_nego_node_bitmap);
462         else
463                 mlog(ML_ERROR, "got nego timeout message from bad node.\n");
464
465         return 0;
466 }
467
468 static int o2hb_nego_approve_handler(struct o2net_msg *msg, u32 len, void *data,
469                                 void **ret_data)
470 {
471         struct o2hb_region *reg = data;
472
473         printk(KERN_NOTICE "o2hb: negotiate timeout approved by master node on region %s (%s).\n",
474                 config_item_name(&reg->hr_item), reg->hr_dev_name);
475         o2hb_arm_timeout(reg);
476         return 0;
477 }
478
479 static inline void o2hb_bio_wait_init(struct o2hb_bio_wait_ctxt *wc)
480 {
481         atomic_set(&wc->wc_num_reqs, 1);
482         init_completion(&wc->wc_io_complete);
483         wc->wc_error = 0;
484 }
485
486 /* Used in error paths too */
487 static inline void o2hb_bio_wait_dec(struct o2hb_bio_wait_ctxt *wc,
488                                      unsigned int num)
489 {
490         /* sadly atomic_sub_and_test() isn't available on all platforms.  The
491          * good news is that the fast path only completes one at a time */
492         while(num--) {
493                 if (atomic_dec_and_test(&wc->wc_num_reqs)) {
494                         BUG_ON(num > 0);
495                         complete(&wc->wc_io_complete);
496                 }
497         }
498 }
499
500 static void o2hb_wait_on_io(struct o2hb_region *reg,
501                             struct o2hb_bio_wait_ctxt *wc)
502 {
503         o2hb_bio_wait_dec(wc, 1);
504         wait_for_completion(&wc->wc_io_complete);
505 }
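/*
 * The wait context is a simple biased count: o2hb_bio_wait_init() starts
 * wc_num_reqs at 1 so that completions arriving before the submitter has
 * finished can't signal early, each submitted bio adds one, and each
 * o2hb_bio_end_io() drops one.  The waiter finally drops the initial bias
 * in o2hb_wait_on_io() and blocks until the count reaches zero.  For two
 * bios, one possible ordering is:
 *
 *	init                 -> 1
 *	submit + submit      -> 3
 *	end_io + end_io      -> 1
 *	o2hb_wait_on_io()    -> 0, complete() wakes the waiter
 */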
506
507 static void o2hb_bio_end_io(struct bio *bio)
508 {
509         struct o2hb_bio_wait_ctxt *wc = bio->bi_private;
510
511         if (bio->bi_error) {
512                 mlog(ML_ERROR, "IO Error %d\n", bio->bi_error);
513                 wc->wc_error = bio->bi_error;
514         }
515
516         o2hb_bio_wait_dec(wc, 1);
517         bio_put(bio);
518 }
519
520 /* Setup a Bio to cover I/O against num_slots slots starting at
521  * start_slot. */
522 static struct bio *o2hb_setup_one_bio(struct o2hb_region *reg,
523                                       struct o2hb_bio_wait_ctxt *wc,
524                                       unsigned int *current_slot,
525                                       unsigned int max_slots)
526 {
527         int len, current_page;
528         unsigned int vec_len, vec_start;
529         unsigned int bits = reg->hr_block_bits;
530         unsigned int spp = reg->hr_slots_per_page;
531         unsigned int cs = *current_slot;
532         struct bio *bio;
533         struct page *page;
534
535         /* Testing has shown this allocation to take long enough under
536          * GFP_KERNEL that the local node can get fenced. It would be
537          * nicest if we could pre-allocate these bios and avoid this
538          * altogether. */
539         bio = bio_alloc(GFP_ATOMIC, 16);
540         if (!bio) {
541                 mlog(ML_ERROR, "Could not alloc slots BIO!\n");
542                 bio = ERR_PTR(-ENOMEM);
543                 goto bail;
544         }
545
546         /* Must put everything in 512 byte sectors for the bio... */
547         bio->bi_iter.bi_sector = (reg->hr_start_block + cs) << (bits - 9);
548         bio->bi_bdev = reg->hr_bdev;
549         bio->bi_private = wc;
550         bio->bi_end_io = o2hb_bio_end_io;
551
552         vec_start = (cs << bits) % PAGE_SIZE;
553         while(cs < max_slots) {
554                 current_page = cs / spp;
555                 page = reg->hr_slot_data[current_page];
556
557                 vec_len = min(PAGE_SIZE - vec_start,
558                               (max_slots-cs) * (PAGE_SIZE/spp) );
559
560                 mlog(ML_HB_BIO, "page %d, vec_len = %u, vec_start = %u\n",
561                      current_page, vec_len, vec_start);
562
563                 len = bio_add_page(bio, page, vec_len, vec_start);
564                 if (len != vec_len) break;
565
566                 cs += vec_len / (PAGE_SIZE/spp);
567                 vec_start = 0;
568         }
569
570 bail:
571         *current_slot = cs;
572         return bio;
573 }
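/*
 * Illustrative sketch only (kept under #if 0, never built, and the helper
 * name is made up): how a slot index maps onto the device and onto the
 * pages prepared above.  The arithmetic mirrors o2hb_setup_one_bio(); e.g.
 * with 512-byte blocks (hr_block_bits == 9) and hr_start_block == 0, slot 5
 * is sector 5, page 5 / hr_slots_per_page, byte offset (5 * 512) % PAGE_SIZE.
 */
#if 0
static void o2hb_slot_geometry_example(struct o2hb_region *reg,
                                       unsigned int slot)
{
        unsigned int bits = reg->hr_block_bits;
        unsigned int spp = reg->hr_slots_per_page;
        /* heartbeat blocks are addressed in 512-byte sectors on the bio */
        sector_t sector = (reg->hr_start_block + slot) << (bits - 9);
        /* which preallocated page holds the slot, and where inside it */
        unsigned int page_idx = slot / spp;
        unsigned int page_off = (slot << bits) % PAGE_SIZE;

        mlog(ML_HB_BIO, "slot %u: sector %llu, page %u, offset %u\n",
             slot, (unsigned long long)sector, page_idx, page_off);
}
#endif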
574
575 static int o2hb_read_slots(struct o2hb_region *reg,
576                            unsigned int max_slots)
577 {
578         unsigned int current_slot=0;
579         int status;
580         struct o2hb_bio_wait_ctxt wc;
581         struct bio *bio;
582
583         o2hb_bio_wait_init(&wc);
584
585         while(current_slot < max_slots) {
586                 bio = o2hb_setup_one_bio(reg, &wc, &current_slot, max_slots);
587                 if (IS_ERR(bio)) {
588                         status = PTR_ERR(bio);
589                         mlog_errno(status);
590                         goto bail_and_wait;
591                 }
592
593                 atomic_inc(&wc.wc_num_reqs);
594                 submit_bio(READ, bio);
595         }
596
597         status = 0;
598
599 bail_and_wait:
600         o2hb_wait_on_io(reg, &wc);
601         if (wc.wc_error && !status)
602                 status = wc.wc_error;
603
604         return status;
605 }
606
607 static int o2hb_issue_node_write(struct o2hb_region *reg,
608                                  struct o2hb_bio_wait_ctxt *write_wc)
609 {
610         int status;
611         unsigned int slot;
612         struct bio *bio;
613
614         o2hb_bio_wait_init(write_wc);
615
616         slot = o2nm_this_node();
617
618         bio = o2hb_setup_one_bio(reg, write_wc, &slot, slot+1);
619         if (IS_ERR(bio)) {
620                 status = PTR_ERR(bio);
621                 mlog_errno(status);
622                 goto bail;
623         }
624
625         atomic_inc(&write_wc->wc_num_reqs);
626         submit_bio(WRITE_SYNC, bio);
627
628         status = 0;
629 bail:
630         return status;
631 }
632
633 static u32 o2hb_compute_block_crc_le(struct o2hb_region *reg,
634                                      struct o2hb_disk_heartbeat_block *hb_block)
635 {
636         __le32 old_cksum;
637         u32 ret;
638
639         /* We want to compute the block crc with a 0 value in the
640          * hb_cksum field. Save it off here and replace after the
641          * crc. */
642         old_cksum = hb_block->hb_cksum;
643         hb_block->hb_cksum = 0;
644
645         ret = crc32_le(0, (unsigned char *) hb_block, reg->hr_block_bytes);
646
647         hb_block->hb_cksum = old_cksum;
648
649         return ret;
650 }
651
652 static void o2hb_dump_slot(struct o2hb_disk_heartbeat_block *hb_block)
653 {
654         mlog(ML_ERROR, "Dump slot information: seq = 0x%llx, node = %u, "
655              "cksum = 0x%x, generation 0x%llx\n",
656              (long long)le64_to_cpu(hb_block->hb_seq),
657              hb_block->hb_node, le32_to_cpu(hb_block->hb_cksum),
658              (long long)le64_to_cpu(hb_block->hb_generation));
659 }
660
661 static int o2hb_verify_crc(struct o2hb_region *reg,
662                            struct o2hb_disk_heartbeat_block *hb_block)
663 {
664         u32 read, computed;
665
666         read = le32_to_cpu(hb_block->hb_cksum);
667         computed = o2hb_compute_block_crc_le(reg, hb_block);
668
669         return read == computed;
670 }
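/*
 * A freshly formatted heartbeat area may not have a valid checksum yet, so
 * a failure here is not necessarily fatal: o2hb_check_slot() stays quiet
 * about a bad crc on a slot that is not live and only logs an error once
 * the node is already considered live.
 */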
671
672 /*
673  * Compare the slot data with what we wrote in the last iteration.
674  * If the match fails, print an appropriate error message. This is to
675  * detect errors like... another node heartbeating on the same slot,
676  * flaky device that is losing writes, etc.
677  * Returns 1 if check succeeds, 0 otherwise.
678  */
679 static int o2hb_check_own_slot(struct o2hb_region *reg)
680 {
681         struct o2hb_disk_slot *slot;
682         struct o2hb_disk_heartbeat_block *hb_block;
683         char *errstr;
684
685         slot = &reg->hr_slots[o2nm_this_node()];
686         /* Don't check on our 1st timestamp */
687         if (!slot->ds_last_time)
688                 return 0;
689
690         hb_block = slot->ds_raw_block;
691         if (le64_to_cpu(hb_block->hb_seq) == slot->ds_last_time &&
692             le64_to_cpu(hb_block->hb_generation) == slot->ds_last_generation &&
693             hb_block->hb_node == slot->ds_node_num)
694                 return 1;
695
696 #define ERRSTR1         "Another node is heartbeating on device"
697 #define ERRSTR2         "Heartbeat generation mismatch on device"
698 #define ERRSTR3         "Heartbeat sequence mismatch on device"
699
700         if (hb_block->hb_node != slot->ds_node_num)
701                 errstr = ERRSTR1;
702         else if (le64_to_cpu(hb_block->hb_generation) !=
703                  slot->ds_last_generation)
704                 errstr = ERRSTR2;
705         else
706                 errstr = ERRSTR3;
707
708         mlog(ML_ERROR, "%s (%s): expected(%u:0x%llx, 0x%llx), "
709              "ondisk(%u:0x%llx, 0x%llx)\n", errstr, reg->hr_dev_name,
710              slot->ds_node_num, (unsigned long long)slot->ds_last_generation,
711              (unsigned long long)slot->ds_last_time, hb_block->hb_node,
712              (unsigned long long)le64_to_cpu(hb_block->hb_generation),
713              (unsigned long long)le64_to_cpu(hb_block->hb_seq));
714
715         return 0;
716 }
717
718 static inline void o2hb_prepare_block(struct o2hb_region *reg,
719                                       u64 generation)
720 {
721         int node_num;
722         u64 cputime;
723         struct o2hb_disk_slot *slot;
724         struct o2hb_disk_heartbeat_block *hb_block;
725
726         node_num = o2nm_this_node();
727         slot = &reg->hr_slots[node_num];
728
729         hb_block = (struct o2hb_disk_heartbeat_block *)slot->ds_raw_block;
730         memset(hb_block, 0, reg->hr_block_bytes);
731         /* TODO: time stuff */
732         cputime = CURRENT_TIME.tv_sec;
733         if (!cputime)
734                 cputime = 1;
735
736         hb_block->hb_seq = cpu_to_le64(cputime);
737         hb_block->hb_node = node_num;
738         hb_block->hb_generation = cpu_to_le64(generation);
739         hb_block->hb_dead_ms = cpu_to_le32(o2hb_dead_threshold * O2HB_REGION_TIMEOUT_MS);
740
741         /* This step must always happen last! */
742         hb_block->hb_cksum = cpu_to_le32(o2hb_compute_block_crc_le(reg,
743                                                                    hb_block));
744
745         mlog(ML_HB_BIO, "our node generation = 0x%llx, cksum = 0x%x\n",
746              (long long)generation,
747              le32_to_cpu(hb_block->hb_cksum));
748 }
749
750 static void o2hb_fire_callbacks(struct o2hb_callback *hbcall,
751                                 struct o2nm_node *node,
752                                 int idx)
753 {
754         struct o2hb_callback_func *f;
755
756         list_for_each_entry(f, &hbcall->list, hc_item) {
757                 mlog(ML_HEARTBEAT, "calling funcs %p\n", f);
758                 (f->hc_func)(node, idx, f->hc_data);
759         }
760 }
761
762 /* Will run the list in order until we process the passed event */
763 static void o2hb_run_event_list(struct o2hb_node_event *queued_event)
764 {
765         struct o2hb_callback *hbcall;
766         struct o2hb_node_event *event;
767
768         /* Holding callback sem assures we don't alter the callback
769          * lists when doing this, and serializes ourselves with other
770          * processes wanting callbacks. */
771         down_write(&o2hb_callback_sem);
772
773         spin_lock(&o2hb_live_lock);
774         while (!list_empty(&o2hb_node_events)
775                && !list_empty(&queued_event->hn_item)) {
776                 event = list_entry(o2hb_node_events.next,
777                                    struct o2hb_node_event,
778                                    hn_item);
779                 list_del_init(&event->hn_item);
780                 spin_unlock(&o2hb_live_lock);
781
782                 mlog(ML_HEARTBEAT, "Node %s event for %d\n",
783                      event->hn_event_type == O2HB_NODE_UP_CB ? "UP" : "DOWN",
784                      event->hn_node_num);
785
786                 hbcall = hbcall_from_type(event->hn_event_type);
787
788                 /* We should *never* have gotten on to the list with a
789                  * bad type... This isn't something that we should try
790                  * to recover from. */
791                 BUG_ON(IS_ERR(hbcall));
792
793                 o2hb_fire_callbacks(hbcall, event->hn_node, event->hn_node_num);
794
795                 spin_lock(&o2hb_live_lock);
796         }
797         spin_unlock(&o2hb_live_lock);
798
799         up_write(&o2hb_callback_sem);
800 }
801
802 static void o2hb_queue_node_event(struct o2hb_node_event *event,
803                                   enum o2hb_callback_type type,
804                                   struct o2nm_node *node,
805                                   int node_num)
806 {
807         assert_spin_locked(&o2hb_live_lock);
808
809         BUG_ON((!node) && (type != O2HB_NODE_DOWN_CB));
810
811         event->hn_event_type = type;
812         event->hn_node = node;
813         event->hn_node_num = node_num;
814
815         mlog(ML_HEARTBEAT, "Queue node %s event for node %d\n",
816              type == O2HB_NODE_UP_CB ? "UP" : "DOWN", node_num);
817
818         list_add_tail(&event->hn_item, &o2hb_node_events);
819 }
820
821 static void o2hb_shutdown_slot(struct o2hb_disk_slot *slot)
822 {
823         struct o2hb_node_event event =
824                 { .hn_item = LIST_HEAD_INIT(event.hn_item), };
825         struct o2nm_node *node;
826         int queued = 0;
827
828         node = o2nm_get_node_by_num(slot->ds_node_num);
829         if (!node)
830                 return;
831
832         spin_lock(&o2hb_live_lock);
833         if (!list_empty(&slot->ds_live_item)) {
834                 mlog(ML_HEARTBEAT, "Shutdown, node %d leaves region\n",
835                      slot->ds_node_num);
836
837                 list_del_init(&slot->ds_live_item);
838
839                 if (list_empty(&o2hb_live_slots[slot->ds_node_num])) {
840                         clear_bit(slot->ds_node_num, o2hb_live_node_bitmap);
841
842                         o2hb_queue_node_event(&event, O2HB_NODE_DOWN_CB, node,
843                                               slot->ds_node_num);
844                         queued = 1;
845                 }
846         }
847         spin_unlock(&o2hb_live_lock);
848
849         if (queued)
850                 o2hb_run_event_list(&event);
851
852         o2nm_node_put(node);
853 }
854
855 static void o2hb_set_quorum_device(struct o2hb_region *reg)
856 {
857         if (!o2hb_global_heartbeat_active())
858                 return;
859
860         /* Prevent race with o2hb_heartbeat_group_drop_item() */
861         if (kthread_should_stop())
862                 return;
863
864         /* Tag region as quorum only after thread reaches steady state */
865         if (atomic_read(&reg->hr_steady_iterations) != 0)
866                 return;
867
868         spin_lock(&o2hb_live_lock);
869
870         if (test_bit(reg->hr_region_num, o2hb_quorum_region_bitmap))
871                 goto unlock;
872
873         /*
874          * A region can be added to the quorum only when it sees all
875          * live nodes heartbeat on it. In other words, the region has been
876          * added to all nodes.
877          */
878         if (memcmp(reg->hr_live_node_bitmap, o2hb_live_node_bitmap,
879                    sizeof(o2hb_live_node_bitmap)))
880                 goto unlock;
881
882         printk(KERN_NOTICE "o2hb: Region %s (%s) is now a quorum device\n",
883                config_item_name(&reg->hr_item), reg->hr_dev_name);
884
885         set_bit(reg->hr_region_num, o2hb_quorum_region_bitmap);
886
887         /*
888          * If global heartbeat active, unpin all regions if the
889          * region count > CUT_OFF
890          */
891         if (bitmap_weight(o2hb_quorum_region_bitmap,
892                            O2NM_MAX_REGIONS) > O2HB_PIN_CUT_OFF)
893                 o2hb_region_unpin(NULL);
894 unlock:
895         spin_unlock(&o2hb_live_lock);
896 }
897
898 static int o2hb_check_slot(struct o2hb_region *reg,
899                            struct o2hb_disk_slot *slot)
900 {
901         int changed = 0, gen_changed = 0;
902         struct o2hb_node_event event =
903                 { .hn_item = LIST_HEAD_INIT(event.hn_item), };
904         struct o2nm_node *node;
905         struct o2hb_disk_heartbeat_block *hb_block = reg->hr_tmp_block;
906         u64 cputime;
907         unsigned int dead_ms = o2hb_dead_threshold * O2HB_REGION_TIMEOUT_MS;
908         unsigned int slot_dead_ms;
909         int tmp;
910         int queued = 0;
911
912         memcpy(hb_block, slot->ds_raw_block, reg->hr_block_bytes);
913
914         /*
915          * If a node is no longer configured but is still in the livemap, we
916          * may need to clear that bit from the livemap.
917          */
918         node = o2nm_get_node_by_num(slot->ds_node_num);
919         if (!node) {
920                 spin_lock(&o2hb_live_lock);
921                 tmp = test_bit(slot->ds_node_num, o2hb_live_node_bitmap);
922                 spin_unlock(&o2hb_live_lock);
923                 if (!tmp)
924                         return 0;
925         }
926
927         if (!o2hb_verify_crc(reg, hb_block)) {
928                 /* all paths from here will drop o2hb_live_lock for
929                  * us. */
930                 spin_lock(&o2hb_live_lock);
931
932                 /* Don't print an error on the console in this case -
933                  * a freshly formatted heartbeat area will not have a
934                  * crc set on it. */
935                 if (list_empty(&slot->ds_live_item))
936                         goto out;
937
938                 /* The node is live but pushed out a bad crc. We
939                  * consider it a transient miss but don't populate any
940                  * other values as they may be junk. */
941                 mlog(ML_ERROR, "Node %d has written a bad crc to %s\n",
942                      slot->ds_node_num, reg->hr_dev_name);
943                 o2hb_dump_slot(hb_block);
944
945                 slot->ds_equal_samples++;
946                 goto fire_callbacks;
947         }
948
949         /* we don't care if these wrap.. the state transitions below
950          * clear at the right places */
951         cputime = le64_to_cpu(hb_block->hb_seq);
952         if (slot->ds_last_time != cputime)
953                 slot->ds_changed_samples++;
954         else
955                 slot->ds_equal_samples++;
956         slot->ds_last_time = cputime;
957
958         /* The node changed heartbeat generations. We assume this to
959          * mean it dropped off but came back before we timed out. We
960          * want to consider it down for the time being but don't want
961          * to lose any changed_samples state we might build up to
962          * considering it live again. */
963         if (slot->ds_last_generation != le64_to_cpu(hb_block->hb_generation)) {
964                 gen_changed = 1;
965                 slot->ds_equal_samples = 0;
966                 mlog(ML_HEARTBEAT, "Node %d changed generation (0x%llx "
967                      "to 0x%llx)\n", slot->ds_node_num,
968                      (long long)slot->ds_last_generation,
969                      (long long)le64_to_cpu(hb_block->hb_generation));
970         }
971
972         slot->ds_last_generation = le64_to_cpu(hb_block->hb_generation);
973
974         mlog(ML_HEARTBEAT, "Slot %d gen 0x%llx cksum 0x%x "
975              "seq %llu last %llu changed %u equal %u\n",
976              slot->ds_node_num, (long long)slot->ds_last_generation,
977              le32_to_cpu(hb_block->hb_cksum),
978              (unsigned long long)le64_to_cpu(hb_block->hb_seq),
979              (unsigned long long)slot->ds_last_time, slot->ds_changed_samples,
980              slot->ds_equal_samples);
981
982         spin_lock(&o2hb_live_lock);
983
984 fire_callbacks:
985         /* dead nodes only come to life after some number of
986          * changes at any time during their dead time */
987         if (list_empty(&slot->ds_live_item) &&
988             slot->ds_changed_samples >= O2HB_LIVE_THRESHOLD) {
989                 mlog(ML_HEARTBEAT, "Node %d (id 0x%llx) joined my region\n",
990                      slot->ds_node_num, (long long)slot->ds_last_generation);
991
992                 set_bit(slot->ds_node_num, reg->hr_live_node_bitmap);
993
994                 /* first on the list generates a callback */
995                 if (list_empty(&o2hb_live_slots[slot->ds_node_num])) {
996                         mlog(ML_HEARTBEAT, "o2hb: Add node %d to live nodes "
997                              "bitmap\n", slot->ds_node_num);
998                         set_bit(slot->ds_node_num, o2hb_live_node_bitmap);
999
1000                         o2hb_queue_node_event(&event, O2HB_NODE_UP_CB, node,
1001                                               slot->ds_node_num);
1002
1003                         changed = 1;
1004                         queued = 1;
1005                 }
1006
1007                 list_add_tail(&slot->ds_live_item,
1008                               &o2hb_live_slots[slot->ds_node_num]);
1009
1010                 slot->ds_equal_samples = 0;
1011
1012                 /* We want to be sure that all nodes agree on the
1013                  * number of milliseconds before a node will be
1014                  * considered dead. The self-fencing timeout is
1015                  * computed from this value, and a discrepancy might
1016                  * result in heartbeat calling a node dead when it
1017                  * hasn't self-fenced yet. */
1018                 slot_dead_ms = le32_to_cpu(hb_block->hb_dead_ms);
1019                 if (slot_dead_ms && slot_dead_ms != dead_ms) {
1020                         /* TODO: Perhaps we can fail the region here. */
1021                         mlog(ML_ERROR, "Node %d on device %s has a dead count "
1022                              "of %u ms, but our count is %u ms.\n"
1023                              "Please double check your configuration values "
1024                              "for 'O2CB_HEARTBEAT_THRESHOLD'\n",
1025                              slot->ds_node_num, reg->hr_dev_name, slot_dead_ms,
1026                              dead_ms);
1027                 }
1028                 goto out;
1029         }
1030
1031         /* if the list is dead, we're done.. */
1032         if (list_empty(&slot->ds_live_item))
1033                 goto out;
1034
1035         /* live nodes only go dead after enough consecutive missed
1036          * samples..  reset the missed counter whenever we see
1037          * activity */
1038         if (slot->ds_equal_samples >= o2hb_dead_threshold || gen_changed) {
1039                 mlog(ML_HEARTBEAT, "Node %d left my region\n",
1040                      slot->ds_node_num);
1041
1042                 clear_bit(slot->ds_node_num, reg->hr_live_node_bitmap);
1043
1044                 /* last off the live_slot generates a callback */
1045                 list_del_init(&slot->ds_live_item);
1046                 if (list_empty(&o2hb_live_slots[slot->ds_node_num])) {
1047                         mlog(ML_HEARTBEAT, "o2hb: Remove node %d from live "
1048                              "nodes bitmap\n", slot->ds_node_num);
1049                         clear_bit(slot->ds_node_num, o2hb_live_node_bitmap);
1050
1051                         /* node can be null */
1052                         o2hb_queue_node_event(&event, O2HB_NODE_DOWN_CB,
1053                                               node, slot->ds_node_num);
1054
1055                         changed = 1;
1056                         queued = 1;
1057                 }
1058
1059                 /* We don't clear this because the node is still
1060                  * actually writing new blocks. */
1061                 if (!gen_changed)
1062                         slot->ds_changed_samples = 0;
1063                 goto out;
1064         }
1065         if (slot->ds_changed_samples) {
1066                 slot->ds_changed_samples = 0;
1067                 slot->ds_equal_samples = 0;
1068         }
1069 out:
1070         spin_unlock(&o2hb_live_lock);
1071
1072         if (queued)
1073                 o2hb_run_event_list(&event);
1074
1075         if (node)
1076                 o2nm_node_put(node);
1077         return changed;
1078 }
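/*
 * In short, the per-slot state machine above is: a dead slot comes back to
 * life once O2HB_LIVE_THRESHOLD distinct sequence numbers have been seen
 * (ds_changed_samples), and a live slot is declared dead once
 * o2hb_dead_threshold identical reads have accumulated (ds_equal_samples)
 * or its generation changes, which is treated as the node having bounced.
 */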
1079
1080 static int o2hb_highest_node(unsigned long *nodes, int numbits)
1081 {
1082         return find_last_bit(nodes, numbits);
1083 }
1084
1085 static int o2hb_do_disk_heartbeat(struct o2hb_region *reg)
1086 {
1087         int i, ret, highest_node;
1088         int membership_change = 0, own_slot_ok = 0;
1089         unsigned long configured_nodes[BITS_TO_LONGS(O2NM_MAX_NODES)];
1090         unsigned long live_node_bitmap[BITS_TO_LONGS(O2NM_MAX_NODES)];
1091         struct o2hb_bio_wait_ctxt write_wc;
1092
1093         ret = o2nm_configured_node_map(configured_nodes,
1094                                        sizeof(configured_nodes));
1095         if (ret) {
1096                 mlog_errno(ret);
1097                 goto bail;
1098         }
1099
1100         /*
1101          * If a node is not configured but is in the livemap, we still need
1102          * to read the slot so as to be able to remove it from the livemap.
1103          */
1104         o2hb_fill_node_map(live_node_bitmap, sizeof(live_node_bitmap));
1105         i = -1;
1106         while ((i = find_next_bit(live_node_bitmap,
1107                                   O2NM_MAX_NODES, i + 1)) < O2NM_MAX_NODES) {
1108                 set_bit(i, configured_nodes);
1109         }
1110
1111         highest_node = o2hb_highest_node(configured_nodes, O2NM_MAX_NODES);
1112         if (highest_node >= O2NM_MAX_NODES) {
1113                 mlog(ML_NOTICE, "o2hb: No configured nodes found!\n");
1114                 ret = -EINVAL;
1115                 goto bail;
1116         }
1117
1118         /* No sense in reading the slots of nodes that don't exist
1119          * yet. Of course, if the node definitions have holes in them
1120          * then we're reading an empty slot anyway... Consider this
1121          * best-effort. */
1122         ret = o2hb_read_slots(reg, highest_node + 1);
1123         if (ret < 0) {
1124                 mlog_errno(ret);
1125                 goto bail;
1126         }
1127
1128         /* With an up to date view of the slots, we can check that no
1129          * other node has been improperly configured to heartbeat in
1130          * our slot. */
1131         own_slot_ok = o2hb_check_own_slot(reg);
1132
1133         /* fill in the proper info for our next heartbeat */
1134         o2hb_prepare_block(reg, reg->hr_generation);
1135
1136         ret = o2hb_issue_node_write(reg, &write_wc);
1137         if (ret < 0) {
1138                 mlog_errno(ret);
1139                 goto bail;
1140         }
1141
1142         i = -1;
1143         while((i = find_next_bit(configured_nodes,
1144                                  O2NM_MAX_NODES, i + 1)) < O2NM_MAX_NODES) {
1145                 membership_change |= o2hb_check_slot(reg, &reg->hr_slots[i]);
1146         }
1147
1148         /*
1149          * We have to be sure we've advertised ourselves on disk
1150          * before we can go to steady state.  This ensures that
1151          * people we find in our steady state have seen us.
1152          */
1153         o2hb_wait_on_io(reg, &write_wc);
1154         if (write_wc.wc_error) {
1155                 /* Do not re-arm the write timeout on I/O error - we
1156                  * can't be sure that the new block ever made it to
1157                  * disk */
1158                 mlog(ML_ERROR, "Write error %d on device \"%s\"\n",
1159                      write_wc.wc_error, reg->hr_dev_name);
1160                 ret = write_wc.wc_error;
1161                 goto bail;
1162         }
1163
1164         /* Skip disarming the timeout if own slot has stale/bad data */
1165         if (own_slot_ok) {
1166                 o2hb_set_quorum_device(reg);
1167                 o2hb_arm_timeout(reg);
1168         }
1169
1170 bail:
1171         /* let the person who launched us know when things are steady */
1172         if (atomic_read(&reg->hr_steady_iterations) != 0) {
1173                 if (!ret && own_slot_ok && !membership_change) {
1174                         if (atomic_dec_and_test(&reg->hr_steady_iterations))
1175                                 wake_up(&o2hb_steady_queue);
1176                 }
1177         }
1178
1179         if (atomic_read(&reg->hr_steady_iterations) != 0) {
1180                 if (atomic_dec_and_test(&reg->hr_unsteady_iterations)) {
1181                         printk(KERN_NOTICE "o2hb: Unable to stabilize "
1182                                "heartbeat on region %s (%s)\n",
1183                                config_item_name(&reg->hr_item),
1184                                reg->hr_dev_name);
1185                         atomic_set(&reg->hr_steady_iterations, 0);
1186                         reg->hr_aborted_start = 1;
1187                         wake_up(&o2hb_steady_queue);
1188                         ret = -EIO;
1189                 }
1190         }
1191
1192         return ret;
1193 }
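/*
 * Start-up accounting for the pass above: hr_steady_iterations only counts
 * down on a pass with no I/O error, a clean own slot and no membership
 * change, and whoever is blocked on o2hb_steady_queue is woken once it
 * reaches zero.  Every pass that still leaves hr_steady_iterations non-zero
 * also counts down hr_unsteady_iterations; if that hits zero first the
 * start is aborted with -EIO and hr_aborted_start is set so o2hb_thread()
 * can bail out.
 */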
1194
1195 /*
1196  * we ride the region ref that the region dir holds.  before the region
1197  * dir is removed and drops its ref it will wait to tear down this
1198  * thread.
1199  */
1200 static int o2hb_thread(void *data)
1201 {
1202         int i, ret;
1203         struct o2hb_region *reg = data;
1204         struct o2hb_bio_wait_ctxt write_wc;
1205         ktime_t before_hb, after_hb;
1206         unsigned int elapsed_msec;
1207
1208         mlog(ML_HEARTBEAT|ML_KTHREAD, "hb thread running\n");
1209
1210         set_user_nice(current, MIN_NICE);
1211
1212         /* Pin node */
1213         ret = o2nm_depend_this_node();
1214         if (ret) {
1215                 mlog(ML_ERROR, "Node has been deleted, ret = %d\n", ret);
1216                 reg->hr_node_deleted = 1;
1217                 wake_up(&o2hb_steady_queue);
1218                 return 0;
1219         }
1220
1221         while (!kthread_should_stop() &&
1222                !reg->hr_unclean_stop && !reg->hr_aborted_start) {
1223                 /* We track the time spent inside
1224                  * o2hb_do_disk_heartbeat so that we avoid more than
1225                  * hr_timeout_ms between disk writes. On busy systems
1226                  * this should result in a heartbeat which is less
1227                  * likely to time itself out. */
1228                 before_hb = ktime_get_real();
1229
1230                 ret = o2hb_do_disk_heartbeat(reg);
1231
1232                 after_hb = ktime_get_real();
1233
1234                 elapsed_msec = (unsigned int)
1235                                 ktime_ms_delta(after_hb, before_hb);
1236
1237                 mlog(ML_HEARTBEAT,
1238                      "start = %lld, end = %lld, msec = %u, ret = %d\n",
1239                      before_hb.tv64, after_hb.tv64, elapsed_msec, ret);
1240
1241                 if (!kthread_should_stop() &&
1242                     elapsed_msec < reg->hr_timeout_ms) {
1243                         /* the kthread api has blocked signals for us so no
1244                          * need to record the return value. */
1245                         msleep_interruptible(reg->hr_timeout_ms - elapsed_msec);
1246                 }
1247         }
1248
1249         o2hb_disarm_timeout(reg);
1250
1251         /* unclean stop is only used in very bad situations */
1252         for(i = 0; !reg->hr_unclean_stop && i < reg->hr_blocks; i++)
1253                 o2hb_shutdown_slot(&reg->hr_slots[i]);
1254
1255         /* Explicit down notification - avoid forcing the other nodes
1256          * to timeout on this region when we could just as easily
1257          * write a clear generation - thus indicating to them that
1258          * this node has left this region.
1259          */
1260         if (!reg->hr_unclean_stop && !reg->hr_aborted_start) {
1261                 o2hb_prepare_block(reg, 0);
1262                 ret = o2hb_issue_node_write(reg, &write_wc);
1263                 if (ret == 0)
1264                         o2hb_wait_on_io(reg, &write_wc);
1265                 else
1266                         mlog_errno(ret);
1267         }
1268
1269         /* Unpin node */
1270         o2nm_undepend_this_node();
1271
1272         mlog(ML_HEARTBEAT|ML_KTHREAD, "o2hb thread exiting\n");
1273
1274         return 0;
1275 }
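/*
 * Timing example for the loop above: with hr_timeout_ms of 2000, a disk
 * heartbeat pass that takes 300 ms is followed by a ~1700 ms sleep, keeping
 * successive writes roughly hr_timeout_ms apart; a pass that takes longer
 * than hr_timeout_ms starts the next one immediately.  (2000 ms is only an
 * example value; hr_timeout_ms is configured per region.)
 */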
1276
1277 #ifdef CONFIG_DEBUG_FS
1278 static int o2hb_debug_open(struct inode *inode, struct file *file)
1279 {
1280         struct o2hb_debug_buf *db = inode->i_private;
1281         struct o2hb_region *reg;
1282         unsigned long map[BITS_TO_LONGS(O2NM_MAX_NODES)];
1283         unsigned long lts;
1284         char *buf = NULL;
1285         int i = -1;
1286         int out = 0;
1287
1288         /* max_nodes should be the largest bitmap we pass here */
1289         BUG_ON(sizeof(map) < db->db_size);
1290
1291         buf = kmalloc(PAGE_SIZE, GFP_KERNEL);
1292         if (!buf)
1293                 goto bail;
1294
1295         switch (db->db_type) {
1296         case O2HB_DB_TYPE_LIVENODES:
1297         case O2HB_DB_TYPE_LIVEREGIONS:
1298         case O2HB_DB_TYPE_QUORUMREGIONS:
1299         case O2HB_DB_TYPE_FAILEDREGIONS:
1300                 spin_lock(&o2hb_live_lock);
1301                 memcpy(map, db->db_data, db->db_size);
1302                 spin_unlock(&o2hb_live_lock);
1303                 break;
1304
1305         case O2HB_DB_TYPE_REGION_LIVENODES:
1306                 spin_lock(&o2hb_live_lock);
1307                 reg = (struct o2hb_region *)db->db_data;
1308                 memcpy(map, reg->hr_live_node_bitmap, db->db_size);
1309                 spin_unlock(&o2hb_live_lock);
1310                 break;
1311
1312         case O2HB_DB_TYPE_REGION_NUMBER:
1313                 reg = (struct o2hb_region *)db->db_data;
1314                 out += snprintf(buf + out, PAGE_SIZE - out, "%d\n",
1315                                 reg->hr_region_num);
1316                 goto done;
1317
1318         case O2HB_DB_TYPE_REGION_ELAPSED_TIME:
1319                 reg = (struct o2hb_region *)db->db_data;
1320                 lts = reg->hr_last_timeout_start;
1321                 /* If 0, it has never been set before */
1322                 if (lts)
1323                         lts = jiffies_to_msecs(jiffies - lts);
1324                 out += snprintf(buf + out, PAGE_SIZE - out, "%lu\n", lts);
1325                 goto done;
1326
1327         case O2HB_DB_TYPE_REGION_PINNED:
1328                 reg = (struct o2hb_region *)db->db_data;
1329                 out += snprintf(buf + out, PAGE_SIZE - out, "%u\n",
1330                                 !!reg->hr_item_pinned);
1331                 goto done;
1332
1333         default:
1334                 goto done;
1335         }
1336
1337         while ((i = find_next_bit(map, db->db_len, i + 1)) < db->db_len)
1338                 out += snprintf(buf + out, PAGE_SIZE - out, "%d ", i);
1339         out += snprintf(buf + out, PAGE_SIZE - out, "\n");
1340
1341 done:
1342         i_size_write(inode, out);
1343
1344         file->private_data = buf;
1345
1346         return 0;
1347 bail:
1348         return -ENOMEM;
1349 }
1350
1351 static int o2hb_debug_release(struct inode *inode, struct file *file)
1352 {
1353         kfree(file->private_data);
1354         return 0;
1355 }
1356
1357 static ssize_t o2hb_debug_read(struct file *file, char __user *buf,
1358                                  size_t nbytes, loff_t *ppos)
1359 {
1360         return simple_read_from_buffer(buf, nbytes, ppos, file->private_data,
1361                                        i_size_read(file->f_mapping->host));
1362 }
1363 #else
1364 static int o2hb_debug_open(struct inode *inode, struct file *file)
1365 {
1366         return 0;
1367 }
1368 static int o2hb_debug_release(struct inode *inode, struct file *file)
1369 {
1370         return 0;
1371 }
1372 static ssize_t o2hb_debug_read(struct file *file, char __user *buf,
1373                                size_t nbytes, loff_t *ppos)
1374 {
1375         return 0;
1376 }
1377 #endif  /* CONFIG_DEBUG_FS */
1378
1379 static const struct file_operations o2hb_debug_fops = {
1380         .open =         o2hb_debug_open,
1381         .release =      o2hb_debug_release,
1382         .read =         o2hb_debug_read,
1383         .llseek =       generic_file_llseek,
1384 };
1385
1386 void o2hb_exit(void)
1387 {
1388         debugfs_remove(o2hb_debug_failedregions);
1389         debugfs_remove(o2hb_debug_quorumregions);
1390         debugfs_remove(o2hb_debug_liveregions);
1391         debugfs_remove(o2hb_debug_livenodes);
1392         debugfs_remove(o2hb_debug_dir);
1393         kfree(o2hb_db_livenodes);
1394         kfree(o2hb_db_liveregions);
1395         kfree(o2hb_db_quorumregions);
1396         kfree(o2hb_db_failedregions);
1397 }
1398
1399 static struct dentry *o2hb_debug_create(const char *name, struct dentry *dir,
1400                                         struct o2hb_debug_buf **db, int db_len,
1401                                         int type, int size, int len, void *data)
1402 {
1403         *db = kmalloc(db_len, GFP_KERNEL);
1404         if (!*db)
1405                 return NULL;
1406
1407         (*db)->db_type = type;
1408         (*db)->db_size = size;
1409         (*db)->db_len = len;
1410         (*db)->db_data = data;
1411
1412         return debugfs_create_file(name, S_IFREG|S_IRUSR, dir, *db,
1413                                    &o2hb_debug_fops);
1414 }
1415
1416 static int o2hb_debug_init(void)
1417 {
1418         int ret = -ENOMEM;
1419
1420         o2hb_debug_dir = debugfs_create_dir(O2HB_DEBUG_DIR, NULL);
1421         if (!o2hb_debug_dir) {
1422                 mlog_errno(ret);
1423                 goto bail;
1424         }
1425
1426         o2hb_debug_livenodes = o2hb_debug_create(O2HB_DEBUG_LIVENODES,
1427                                                  o2hb_debug_dir,
1428                                                  &o2hb_db_livenodes,
1429                                                  sizeof(*o2hb_db_livenodes),
1430                                                  O2HB_DB_TYPE_LIVENODES,
1431                                                  sizeof(o2hb_live_node_bitmap),
1432                                                  O2NM_MAX_NODES,
1433                                                  o2hb_live_node_bitmap);
1434         if (!o2hb_debug_livenodes) {
1435                 mlog_errno(ret);
1436                 goto bail;
1437         }
1438
1439         o2hb_debug_liveregions = o2hb_debug_create(O2HB_DEBUG_LIVEREGIONS,
1440                                                    o2hb_debug_dir,
1441                                                    &o2hb_db_liveregions,
1442                                                    sizeof(*o2hb_db_liveregions),
1443                                                    O2HB_DB_TYPE_LIVEREGIONS,
1444                                                    sizeof(o2hb_live_region_bitmap),
1445                                                    O2NM_MAX_REGIONS,
1446                                                    o2hb_live_region_bitmap);
1447         if (!o2hb_debug_liveregions) {
1448                 mlog_errno(ret);
1449                 goto bail;
1450         }
1451
1452         o2hb_debug_quorumregions =
1453                         o2hb_debug_create(O2HB_DEBUG_QUORUMREGIONS,
1454                                           o2hb_debug_dir,
1455                                           &o2hb_db_quorumregions,
1456                                           sizeof(*o2hb_db_quorumregions),
1457                                           O2HB_DB_TYPE_QUORUMREGIONS,
1458                                           sizeof(o2hb_quorum_region_bitmap),
1459                                           O2NM_MAX_REGIONS,
1460                                           o2hb_quorum_region_bitmap);
1461         if (!o2hb_debug_quorumregions) {
1462                 mlog_errno(ret);
1463                 goto bail;
1464         }
1465
1466         o2hb_debug_failedregions =
1467                         o2hb_debug_create(O2HB_DEBUG_FAILEDREGIONS,
1468                                           o2hb_debug_dir,
1469                                           &o2hb_db_failedregions,
1470                                           sizeof(*o2hb_db_failedregions),
1471                                           O2HB_DB_TYPE_FAILEDREGIONS,
1472                                           sizeof(o2hb_failed_region_bitmap),
1473                                           O2NM_MAX_REGIONS,
1474                                           o2hb_failed_region_bitmap);
1475         if (!o2hb_debug_failedregions) {
1476                 mlog_errno(ret);
1477                 goto bail;
1478         }
1479
1480         ret = 0;
1481 bail:
1482         if (ret)
1483                 o2hb_exit();
1484
1485         return ret;
1486 }
1487
1488 int o2hb_init(void)
1489 {
1490         int i;
1491
1492         for (i = 0; i < ARRAY_SIZE(o2hb_callbacks); i++)
1493                 INIT_LIST_HEAD(&o2hb_callbacks[i].list);
1494
1495         for (i = 0; i < ARRAY_SIZE(o2hb_live_slots); i++)
1496                 INIT_LIST_HEAD(&o2hb_live_slots[i]);
1497
1498         INIT_LIST_HEAD(&o2hb_node_events);
1499
1500         memset(o2hb_live_node_bitmap, 0, sizeof(o2hb_live_node_bitmap));
1501         memset(o2hb_region_bitmap, 0, sizeof(o2hb_region_bitmap));
1502         memset(o2hb_live_region_bitmap, 0, sizeof(o2hb_live_region_bitmap));
1503         memset(o2hb_quorum_region_bitmap, 0, sizeof(o2hb_quorum_region_bitmap));
1504         memset(o2hb_failed_region_bitmap, 0, sizeof(o2hb_failed_region_bitmap));
1505
1506         o2hb_dependent_users = 0;
1507
1508         return o2hb_debug_init();
1509 }
1510
1511 /* if we're already in a callback then we're already serialized by the sem */
1512 static void o2hb_fill_node_map_from_callback(unsigned long *map,
1513                                              unsigned bytes)
1514 {
1515         BUG_ON(bytes < (BITS_TO_LONGS(O2NM_MAX_NODES) * sizeof(unsigned long)));
1516
1517         memcpy(map, &o2hb_live_node_bitmap, bytes);
1518 }
1519
1520 /*
1521  * get a map of all nodes that are heartbeating in any regions
1522  */
1523 void o2hb_fill_node_map(unsigned long *map, unsigned bytes)
1524 {
1525         /* callers want to serialize this map and callbacks so that they
1526          * can trust that they don't miss nodes coming to the party */
1527         down_read(&o2hb_callback_sem);
1528         spin_lock(&o2hb_live_lock);
1529         o2hb_fill_node_map_from_callback(map, bytes);
1530         spin_unlock(&o2hb_live_lock);
1531         up_read(&o2hb_callback_sem);
1532 }
1533 EXPORT_SYMBOL_GPL(o2hb_fill_node_map);
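/*
 * Typical caller pattern (an illustrative sketch; it mirrors the
 * o2hb_check_node_heartbeating() helpers further down in this file):
 *
 *	unsigned long map[BITS_TO_LONGS(O2NM_MAX_NODES)];
 *
 *	o2hb_fill_node_map(map, sizeof(map));
 *	if (test_bit(node_num, map))
 *		pr_info("node %u is heartbeating somewhere\n", node_num);
 */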
1534
1535 /*
1536  * heartbeat configfs bits.  The heartbeat set is a default set under
1537  * the cluster set in nodemanager.c.
1538  */
1539
1540 static struct o2hb_region *to_o2hb_region(struct config_item *item)
1541 {
1542         return item ? container_of(item, struct o2hb_region, hr_item) : NULL;
1543 }
1544
1545 /* drop_item only drops its ref after killing the thread, so nothing should
1546  * be using the region anymore.  This has to clean up any state that the
1547  * attributes might have built up. */
1548 static void o2hb_region_release(struct config_item *item)
1549 {
1550         int i;
1551         struct page *page;
1552         struct o2hb_region *reg = to_o2hb_region(item);
1553
1554         mlog(ML_HEARTBEAT, "hb region release (%s)\n", reg->hr_dev_name);
1555
1556         kfree(reg->hr_tmp_block);
1557
1558         if (reg->hr_slot_data) {
1559                 for (i = 0; i < reg->hr_num_pages; i++) {
1560                         page = reg->hr_slot_data[i];
1561                         if (page)
1562                                 __free_page(page);
1563                 }
1564                 kfree(reg->hr_slot_data);
1565         }
1566
1567         if (reg->hr_bdev)
1568                 blkdev_put(reg->hr_bdev, FMODE_READ|FMODE_WRITE);
1569
1570         kfree(reg->hr_slots);
1571
1572         debugfs_remove(reg->hr_debug_livenodes);
1573         debugfs_remove(reg->hr_debug_regnum);
1574         debugfs_remove(reg->hr_debug_elapsed_time);
1575         debugfs_remove(reg->hr_debug_pinned);
1576         debugfs_remove(reg->hr_debug_dir);
1577         kfree(reg->hr_db_livenodes);
1578         kfree(reg->hr_db_regnum);
1579         kfree(reg->hr_db_elapsed_time);
1580         kfree(reg->hr_db_pinned);
1581
1582         spin_lock(&o2hb_live_lock);
1583         list_del(&reg->hr_all_item);
1584         spin_unlock(&o2hb_live_lock);
1585
1586         o2net_unregister_handler_list(&reg->hr_handler_list);
1587         kfree(reg);
1588 }
1589
1590 static int o2hb_read_block_input(struct o2hb_region *reg,
1591                                  const char *page,
1592                                  unsigned long *ret_bytes,
1593                                  unsigned int *ret_bits)
1594 {
1595         unsigned long bytes;
1596         char *p = (char *)page;
1597
1598         bytes = simple_strtoul(p, &p, 0);
1599         if (!p || (*p && (*p != '\n')))
1600                 return -EINVAL;
1601
1602         /* Heartbeat and fs min / max block sizes are the same. */
1603         if (bytes > 4096 || bytes < 512)
1604                 return -ERANGE;
1605         if (hweight16(bytes) != 1)
1606                 return -EINVAL;
1607
1608         if (ret_bytes)
1609                 *ret_bytes = bytes;
1610         if (ret_bits)
1611                 *ret_bits = ffs(bytes) - 1;
1612
1613         return 0;
1614 }
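/*
 * Worked examples for the parser above: "512\n" gives *ret_bytes = 512 and
 * *ret_bits = 9, "4096" gives 4096 and 12, "1000" fails the power-of-two
 * check with -EINVAL, and "8192" falls outside 512..4096 and returns
 * -ERANGE.
 */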
1615
1616 static ssize_t o2hb_region_block_bytes_show(struct config_item *item,
1617                                             char *page)
1618 {
1619         return sprintf(page, "%u\n", to_o2hb_region(item)->hr_block_bytes);
1620 }
1621
1622 static ssize_t o2hb_region_block_bytes_store(struct config_item *item,
1623                                              const char *page,
1624                                              size_t count)
1625 {
1626         struct o2hb_region *reg = to_o2hb_region(item);
1627         int status;
1628         unsigned long block_bytes;
1629         unsigned int block_bits;
1630
1631         if (reg->hr_bdev)
1632                 return -EINVAL;
1633
1634         status = o2hb_read_block_input(reg, page, &block_bytes,
1635                                        &block_bits);
1636         if (status)
1637                 return status;
1638
1639         reg->hr_block_bytes = (unsigned int)block_bytes;
1640         reg->hr_block_bits = block_bits;
1641
1642         return count;
1643 }
1644
1645 static ssize_t o2hb_region_start_block_show(struct config_item *item,
1646                                             char *page)
1647 {
1648         return sprintf(page, "%llu\n", to_o2hb_region(item)->hr_start_block);
1649 }
1650
1651 static ssize_t o2hb_region_start_block_store(struct config_item *item,
1652                                              const char *page,
1653                                              size_t count)
1654 {
1655         struct o2hb_region *reg = to_o2hb_region(item);
1656         unsigned long long tmp;
1657         char *p = (char *)page;
1658
1659         if (reg->hr_bdev)
1660                 return -EINVAL;
1661
1662         tmp = simple_strtoull(p, &p, 0);
1663         if (!p || (*p && (*p != '\n')))
1664                 return -EINVAL;
1665
1666         reg->hr_start_block = tmp;
1667
1668         return count;
1669 }
1670
1671 static ssize_t o2hb_region_blocks_show(struct config_item *item, char *page)
1672 {
1673         return sprintf(page, "%d\n", to_o2hb_region(item)->hr_blocks);
1674 }
1675
1676 static ssize_t o2hb_region_blocks_store(struct config_item *item,
1677                                         const char *page,
1678                                         size_t count)
1679 {
1680         struct o2hb_region *reg = to_o2hb_region(item);
1681         unsigned long tmp;
1682         char *p = (char *)page;
1683
1684         if (reg->hr_bdev)
1685                 return -EINVAL;
1686
1687         tmp = simple_strtoul(p, &p, 0);
1688         if (!p || (*p && (*p != '\n')))
1689                 return -EINVAL;
1690
1691         if (tmp > O2NM_MAX_NODES || tmp == 0)
1692                 return -ERANGE;
1693
1694         reg->hr_blocks = (unsigned int)tmp;
1695
1696         return count;
1697 }
1698
1699 static ssize_t o2hb_region_dev_show(struct config_item *item, char *page)
1700 {
1701         unsigned int ret = 0;
1702
1703         if (to_o2hb_region(item)->hr_bdev)
1704                 ret = sprintf(page, "%s\n", to_o2hb_region(item)->hr_dev_name);
1705
1706         return ret;
1707 }
1708
1709 static void o2hb_init_region_params(struct o2hb_region *reg)
1710 {
1711         reg->hr_slots_per_page = PAGE_SIZE >> reg->hr_block_bits;
1712         reg->hr_timeout_ms = O2HB_REGION_TIMEOUT_MS;
1713
1714         mlog(ML_HEARTBEAT, "hr_start_block = %llu, hr_blocks = %u\n",
1715              reg->hr_start_block, reg->hr_blocks);
1716         mlog(ML_HEARTBEAT, "hr_block_bytes = %u, hr_block_bits = %u\n",
1717              reg->hr_block_bytes, reg->hr_block_bits);
1718         mlog(ML_HEARTBEAT, "hr_timeout_ms = %u\n", reg->hr_timeout_ms);
1719         mlog(ML_HEARTBEAT, "dead threshold = %u\n", o2hb_dead_threshold);
1720 }
1721
1722 static int o2hb_map_slot_data(struct o2hb_region *reg)
1723 {
1724         int i, j;
1725         unsigned int last_slot;
1726         unsigned int spp = reg->hr_slots_per_page;
1727         struct page *page;
1728         char *raw;
1729         struct o2hb_disk_slot *slot;
1730
1731         reg->hr_tmp_block = kmalloc(reg->hr_block_bytes, GFP_KERNEL);
1732         if (reg->hr_tmp_block == NULL)
1733                 return -ENOMEM;
1734
1735         reg->hr_slots = kcalloc(reg->hr_blocks,
1736                                 sizeof(struct o2hb_disk_slot), GFP_KERNEL);
1737         if (reg->hr_slots == NULL)
1738                 return -ENOMEM;
1739
1740         for(i = 0; i < reg->hr_blocks; i++) {
1741                 slot = &reg->hr_slots[i];
1742                 slot->ds_node_num = i;
1743                 INIT_LIST_HEAD(&slot->ds_live_item);
1744                 slot->ds_raw_block = NULL;
1745         }
1746
1747         reg->hr_num_pages = (reg->hr_blocks + spp - 1) / spp;
1748         mlog(ML_HEARTBEAT, "Going to require %u pages to cover %u blocks "
1749                            "at %u blocks per page\n",
1750              reg->hr_num_pages, reg->hr_blocks, spp);
1751
1752         reg->hr_slot_data = kcalloc(reg->hr_num_pages, sizeof(struct page *),
1753                                     GFP_KERNEL);
1754         if (!reg->hr_slot_data)
1755                 return -ENOMEM;
1756
1757         for(i = 0; i < reg->hr_num_pages; i++) {
1758                 page = alloc_page(GFP_KERNEL);
1759                 if (!page)
1760                         return -ENOMEM;
1761
1762                 reg->hr_slot_data[i] = page;
1763
1764                 last_slot = i * spp;
1765                 raw = page_address(page);
1766                 for (j = 0;
1767                      (j < spp) && ((j + last_slot) < reg->hr_blocks);
1768                      j++) {
1769                         BUG_ON((j + last_slot) >= reg->hr_blocks);
1770
1771                         slot = &reg->hr_slots[j + last_slot];
1772                         slot->ds_raw_block =
1773                                 (struct o2hb_disk_heartbeat_block *) raw;
1774
1775                         raw += reg->hr_block_bytes;
1776                 }
1777         }
1778
1779         return 0;
1780 }
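/*
 * Layout example (assuming 4096 byte pages): with hr_block_bytes = 512,
 * hr_slots_per_page is 8, so slot i's ds_raw_block points into page
 * hr_slot_data[i / 8] at byte offset (i % 8) * 512, and a 32 block region
 * needs hr_num_pages = 4.
 */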
1781
1782 /* Read in all the slots available and populate the tracking
1783  * structures so that we can start with a baseline idea of what's
1784  * there. */
1785 static int o2hb_populate_slot_data(struct o2hb_region *reg)
1786 {
1787         int ret, i;
1788         struct o2hb_disk_slot *slot;
1789         struct o2hb_disk_heartbeat_block *hb_block;
1790
1791         ret = o2hb_read_slots(reg, reg->hr_blocks);
1792         if (ret)
1793                 goto out;
1794
1795         /* We only want to get an idea of the values initially in each
1796          * slot, so we do no verification - o2hb_check_slot will
1797          * actually determine if each configured slot is valid and
1798          * whether any values have changed. */
1799         for(i = 0; i < reg->hr_blocks; i++) {
1800                 slot = &reg->hr_slots[i];
1801                 hb_block = (struct o2hb_disk_heartbeat_block *) slot->ds_raw_block;
1802
1803                 /* Only fill the values that o2hb_check_slot uses to
1804                  * determine changing slots */
1805                 slot->ds_last_time = le64_to_cpu(hb_block->hb_seq);
1806                 slot->ds_last_generation = le64_to_cpu(hb_block->hb_generation);
1807         }
1808
1809 out:
1810         return ret;
1811 }
1812
1813 /* this is acting as commit; we set up all of hr_bdev and hr_task or nothing */
1814 static ssize_t o2hb_region_dev_store(struct config_item *item,
1815                                      const char *page,
1816                                      size_t count)
1817 {
1818         struct o2hb_region *reg = to_o2hb_region(item);
1819         struct task_struct *hb_task;
1820         long fd;
1821         int sectsize;
1822         char *p = (char *)page;
1823         struct fd f;
1824         struct inode *inode;
1825         ssize_t ret = -EINVAL;
1826         int live_threshold;
1827
1828         if (reg->hr_bdev)
1829                 goto out;
1830
1831         /* We can't heartbeat until our node number has been
1832          * configured. */
1833         if (o2nm_this_node() == O2NM_MAX_NODES)
1834                 goto out;
1835
1836         fd = simple_strtol(p, &p, 0);
1837         if (!p || (*p && (*p != '\n')))
1838                 goto out;
1839
1840         if (fd < 0 || fd >= INT_MAX)
1841                 goto out;
1842
1843         f = fdget(fd);
1844         if (f.file == NULL)
1845                 goto out;
1846
1847         if (reg->hr_blocks == 0 || reg->hr_start_block == 0 ||
1848             reg->hr_block_bytes == 0)
1849                 goto out2;
1850
1851         inode = igrab(f.file->f_mapping->host);
1852         if (inode == NULL)
1853                 goto out2;
1854
1855         if (!S_ISBLK(inode->i_mode))
1856                 goto out3;
1857
1858         reg->hr_bdev = I_BDEV(f.file->f_mapping->host);
1859         ret = blkdev_get(reg->hr_bdev, FMODE_WRITE | FMODE_READ, NULL);
1860         if (ret) {
1861                 reg->hr_bdev = NULL;
1862                 goto out3;
1863         }
1864         inode = NULL;
1865
1866         bdevname(reg->hr_bdev, reg->hr_dev_name);
1867
1868         sectsize = bdev_logical_block_size(reg->hr_bdev);
1869         if (sectsize != reg->hr_block_bytes) {
1870                 mlog(ML_ERROR,
1871                      "blocksize %u incorrect for device, expected %d",
1872                      reg->hr_block_bytes, sectsize);
1873                 ret = -EINVAL;
1874                 goto out3;
1875         }
1876
1877         o2hb_init_region_params(reg);
1878
1879         /* Generation of zero is invalid */
1880         do {
1881                 get_random_bytes(&reg->hr_generation,
1882                                  sizeof(reg->hr_generation));
1883         } while (reg->hr_generation == 0);
1884
1885         ret = o2hb_map_slot_data(reg);
1886         if (ret) {
1887                 mlog_errno(ret);
1888                 goto out3;
1889         }
1890
1891         ret = o2hb_populate_slot_data(reg);
1892         if (ret) {
1893                 mlog_errno(ret);
1894                 goto out3;
1895         }
1896
1897         INIT_DELAYED_WORK(&reg->hr_write_timeout_work, o2hb_write_timeout);
1898         INIT_DELAYED_WORK(&reg->hr_nego_timeout_work, o2hb_nego_timeout);
1899
1900         /*
1901          * A node is considered live after it has beat LIVE_THRESHOLD
1902          * times.  We're not steady until we've given them a chance
1903          * _after_ our first read.
1904          * The default threshold is the bare minimum so as to limit the delay
1905          * during mounts. For global heartbeat, the threshold is doubled for
1906          * the first region.
1907          */
1908         live_threshold = O2HB_LIVE_THRESHOLD;
1909         if (o2hb_global_heartbeat_active()) {
1910                 spin_lock(&o2hb_live_lock);
1911                 if (bitmap_weight(o2hb_region_bitmap, O2NM_MAX_REGIONS) == 1)
1912                         live_threshold <<= 1;
1913                 spin_unlock(&o2hb_live_lock);
1914         }
1915         ++live_threshold;
1916         atomic_set(&reg->hr_steady_iterations, live_threshold);
1917         /* unsteady_iterations is triple the steady_iterations */
1918         atomic_set(&reg->hr_unsteady_iterations, (live_threshold * 3));
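        /*
         * With the values above, steady_iterations is O2HB_LIVE_THRESHOLD + 1
         * for local heartbeat (2 * O2HB_LIVE_THRESHOLD + 1 for the first
         * global region) and unsteady_iterations is three times that.
         */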
1919
1920         hb_task = kthread_run(o2hb_thread, reg, "o2hb-%s",
1921                               reg->hr_item.ci_name);
1922         if (IS_ERR(hb_task)) {
1923                 ret = PTR_ERR(hb_task);
1924                 mlog_errno(ret);
1925                 goto out3;
1926         }
1927
1928         spin_lock(&o2hb_live_lock);
1929         reg->hr_task = hb_task;
1930         spin_unlock(&o2hb_live_lock);
1931
1932         ret = wait_event_interruptible(o2hb_steady_queue,
1933                                 atomic_read(&reg->hr_steady_iterations) == 0 ||
1934                                 reg->hr_node_deleted);
1935         if (ret) {
1936                 atomic_set(&reg->hr_steady_iterations, 0);
1937                 reg->hr_aborted_start = 1;
1938         }
1939
1940         if (reg->hr_aborted_start) {
1941                 ret = -EIO;
1942                 goto out3;
1943         }
1944
1945         if (reg->hr_node_deleted) {
1946                 ret = -EINVAL;
1947                 goto out3;
1948         }
1949
1950         /* Ok, we were woken.  Make sure it wasn't by drop_item() */
1951         spin_lock(&o2hb_live_lock);
1952         hb_task = reg->hr_task;
1953         if (o2hb_global_heartbeat_active())
1954                 set_bit(reg->hr_region_num, o2hb_live_region_bitmap);
1955         spin_unlock(&o2hb_live_lock);
1956
1957         if (hb_task)
1958                 ret = count;
1959         else
1960                 ret = -EIO;
1961
1962         if (hb_task && o2hb_global_heartbeat_active())
1963                 printk(KERN_NOTICE "o2hb: Heartbeat started on region %s (%s)\n",
1964                        config_item_name(&reg->hr_item), reg->hr_dev_name);
1965
1966 out3:
1967         iput(inode);
1968 out2:
1969         fdput(f);
1970 out:
1971         if (ret < 0) {
1972                 if (reg->hr_bdev) {
1973                         blkdev_put(reg->hr_bdev, FMODE_READ|FMODE_WRITE);
1974                         reg->hr_bdev = NULL;
1975                 }
1976         }
1977         return ret;
1978 }
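/*
 * Usage note (illustrative): the value written to the "dev" attribute is
 * not a device name but the number of a file descriptor that the writing
 * process already holds open on the heartbeat block device; the store
 * routine above resolves it with fdget() and takes its own blkdev_get()
 * reference.  In practice this write comes from the userspace cluster
 * tools rather than being done by hand.
 */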
1979
1980 static ssize_t o2hb_region_pid_show(struct config_item *item, char *page)
1981 {
1982         struct o2hb_region *reg = to_o2hb_region(item);
1983         pid_t pid = 0;
1984
1985         spin_lock(&o2hb_live_lock);
1986         if (reg->hr_task)
1987                 pid = task_pid_nr(reg->hr_task);
1988         spin_unlock(&o2hb_live_lock);
1989
1990         if (!pid)
1991                 return 0;
1992
1993         return sprintf(page, "%u\n", pid);
1994 }
1995
1996 CONFIGFS_ATTR(o2hb_region_, block_bytes);
1997 CONFIGFS_ATTR(o2hb_region_, start_block);
1998 CONFIGFS_ATTR(o2hb_region_, blocks);
1999 CONFIGFS_ATTR(o2hb_region_, dev);
2000 CONFIGFS_ATTR_RO(o2hb_region_, pid);
2001
2002 static struct configfs_attribute *o2hb_region_attrs[] = {
2003         &o2hb_region_attr_block_bytes,
2004         &o2hb_region_attr_start_block,
2005         &o2hb_region_attr_blocks,
2006         &o2hb_region_attr_dev,
2007         &o2hb_region_attr_pid,
2008         NULL,
2009 };
2010
2011 static struct configfs_item_operations o2hb_region_item_ops = {
2012         .release                = o2hb_region_release,
2013 };
2014
2015 static struct config_item_type o2hb_region_type = {
2016         .ct_item_ops    = &o2hb_region_item_ops,
2017         .ct_attrs       = o2hb_region_attrs,
2018         .ct_owner       = THIS_MODULE,
2019 };
2020
2021 /* heartbeat set */
2022
2023 struct o2hb_heartbeat_group {
2024         struct config_group hs_group;
2025         /* some stuff? */
2026 };
2027
2028 static struct o2hb_heartbeat_group *to_o2hb_heartbeat_group(struct config_group *group)
2029 {
2030         return group ?
2031                 container_of(group, struct o2hb_heartbeat_group, hs_group)
2032                 : NULL;
2033 }
2034
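/*
 * Build the per-region debugfs tree: a directory named after the region
 * under o2hb_debug_dir, holding the region's live node bitmap, its region
 * number, the elapsed time since hr_last_timeout_start and its pinned
 * state.  File names come from the O2HB_DEBUG_* macros in heartbeat.h.
 */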
2035 static int o2hb_debug_region_init(struct o2hb_region *reg, struct dentry *dir)
2036 {
2037         int ret = -ENOMEM;
2038
2039         reg->hr_debug_dir =
2040                 debugfs_create_dir(config_item_name(&reg->hr_item), dir);
2041         if (!reg->hr_debug_dir) {
2042                 mlog_errno(ret);
2043                 goto bail;
2044         }
2045
2046         reg->hr_debug_livenodes =
2047                         o2hb_debug_create(O2HB_DEBUG_LIVENODES,
2048                                           reg->hr_debug_dir,
2049                                           &(reg->hr_db_livenodes),
2050                                           sizeof(*(reg->hr_db_livenodes)),
2051                                           O2HB_DB_TYPE_REGION_LIVENODES,
2052                                           sizeof(reg->hr_live_node_bitmap),
2053                                           O2NM_MAX_NODES, reg);
2054         if (!reg->hr_debug_livenodes) {
2055                 mlog_errno(ret);
2056                 goto bail;
2057         }
2058
2059         reg->hr_debug_regnum =
2060                         o2hb_debug_create(O2HB_DEBUG_REGION_NUMBER,
2061                                           reg->hr_debug_dir,
2062                                           &(reg->hr_db_regnum),
2063                                           sizeof(*(reg->hr_db_regnum)),
2064                                           O2HB_DB_TYPE_REGION_NUMBER,
2065                                           0, O2NM_MAX_NODES, reg);
2066         if (!reg->hr_debug_regnum) {
2067                 mlog_errno(ret);
2068                 goto bail;
2069         }
2070
2071         reg->hr_debug_elapsed_time =
2072                         o2hb_debug_create(O2HB_DEBUG_REGION_ELAPSED_TIME,
2073                                           reg->hr_debug_dir,
2074                                           &(reg->hr_db_elapsed_time),
2075                                           sizeof(*(reg->hr_db_elapsed_time)),
2076                                           O2HB_DB_TYPE_REGION_ELAPSED_TIME,
2077                                           0, 0, reg);
2078         if (!reg->hr_debug_elapsed_time) {
2079                 mlog_errno(ret);
2080                 goto bail;
2081         }
2082
2083         reg->hr_debug_pinned =
2084                         o2hb_debug_create(O2HB_DEBUG_REGION_PINNED,
2085                                           reg->hr_debug_dir,
2086                                           &(reg->hr_db_pinned),
2087                                           sizeof(*(reg->hr_db_pinned)),
2088                                           O2HB_DB_TYPE_REGION_PINNED,
2089                                           0, 0, reg);
2090         if (!reg->hr_debug_pinned) {
2091                 mlog_errno(ret);
2092                 goto bail;
2093         }
2094
2095         ret = 0;
2096 bail:
2097         return ret;
2098 }
2099
2100 static struct config_item *o2hb_heartbeat_group_make_item(struct config_group *group,
2101                                                           const char *name)
2102 {
2103         struct o2hb_region *reg = NULL;
2104         int ret;
2105
2106         reg = kzalloc(sizeof(struct o2hb_region), GFP_KERNEL);
2107         if (reg == NULL)
2108                 return ERR_PTR(-ENOMEM);
2109
2110         if (strlen(name) > O2HB_MAX_REGION_NAME_LEN) {
2111                 ret = -ENAMETOOLONG;
2112                 goto free;
2113         }
2114
2115         spin_lock(&o2hb_live_lock);
2116         reg->hr_region_num = 0;
2117         if (o2hb_global_heartbeat_active()) {
2118                 reg->hr_region_num = find_first_zero_bit(o2hb_region_bitmap,
2119                                                          O2NM_MAX_REGIONS);
2120                 if (reg->hr_region_num >= O2NM_MAX_REGIONS) {
2121                         spin_unlock(&o2hb_live_lock);
2122                         ret = -EFBIG;
2123                         goto free;
2124                 }
2125                 set_bit(reg->hr_region_num, o2hb_region_bitmap);
2126         }
2127         list_add_tail(&reg->hr_all_item, &o2hb_all_regions);
2128         spin_unlock(&o2hb_live_lock);
2129
2130         config_item_init_type_name(&reg->hr_item, name, &o2hb_region_type);
2131
2132         /* This is the same way the dlm generates its message key.  For local
2133          * heartbeat the region name is also the same, so use a different
2134          * initial crc value to avoid a message key conflict.
2135          */
2136         reg->hr_key = crc32_le(reg->hr_region_num + O2NM_MAX_REGIONS,
2137                 name, strlen(name));
2138         INIT_LIST_HEAD(&reg->hr_handler_list);
2139         ret = o2net_register_handler(O2HB_NEGO_TIMEOUT_MSG, reg->hr_key,
2140                         sizeof(struct o2hb_nego_msg),
2141                         o2hb_nego_timeout_handler,
2142                         reg, NULL, &reg->hr_handler_list);
2143         if (ret)
2144                 goto free;
2145
2146         ret = o2net_register_handler(O2HB_NEGO_APPROVE_MSG, reg->hr_key,
2147                         sizeof(struct o2hb_nego_msg),
2148                         o2hb_nego_approve_handler,
2149                         reg, NULL, &reg->hr_handler_list);
2150         if (ret)
2151                 goto unregister_handler;
2152
2153         ret = o2hb_debug_region_init(reg, o2hb_debug_dir);
2154         if (ret) {
2155                 config_item_put(&reg->hr_item);
2156                 goto unregister_handler;
2157         }
2158
2159         return &reg->hr_item;
2160
2161 unregister_handler:
2162         o2net_unregister_handler_list(&reg->hr_handler_list);
2163 free:
2164         kfree(reg);
2165         return ERR_PTR(ret);
2166 }
2167
2168 static void o2hb_heartbeat_group_drop_item(struct config_group *group,
2169                                            struct config_item *item)
2170 {
2171         struct task_struct *hb_task;
2172         struct o2hb_region *reg = to_o2hb_region(item);
2173         int quorum_region = 0;
2174
2175         /* stop the thread when the user removes the region dir */
2176         spin_lock(&o2hb_live_lock);
2177         hb_task = reg->hr_task;
2178         reg->hr_task = NULL;
2179         reg->hr_item_dropped = 1;
2180         spin_unlock(&o2hb_live_lock);
2181
2182         if (hb_task)
2183                 kthread_stop(hb_task);
2184
2185         if (o2hb_global_heartbeat_active()) {
2186                 spin_lock(&o2hb_live_lock);
2187                 clear_bit(reg->hr_region_num, o2hb_region_bitmap);
2188                 clear_bit(reg->hr_region_num, o2hb_live_region_bitmap);
2189                 if (test_bit(reg->hr_region_num, o2hb_quorum_region_bitmap))
2190                         quorum_region = 1;
2191                 clear_bit(reg->hr_region_num, o2hb_quorum_region_bitmap);
2192                 spin_unlock(&o2hb_live_lock);
2193                 printk(KERN_NOTICE "o2hb: Heartbeat %s on region %s (%s)\n",
2194                        ((atomic_read(&reg->hr_steady_iterations) == 0) ?
2195                         "stopped" : "start aborted"), config_item_name(item),
2196                        reg->hr_dev_name);
2197         }
2198
2199         /*
2200          * If we're racing a write to the dev attribute, we need to wake it
2201          * up.  It will check reg->hr_task.
2202          */
2203         if (atomic_read(&reg->hr_steady_iterations) != 0) {
2204                 reg->hr_aborted_start = 1;
2205                 atomic_set(&reg->hr_steady_iterations, 0);
2206                 wake_up(&o2hb_steady_queue);
2207         }
2208
2209         config_item_put(item);
2210
2211         if (!o2hb_global_heartbeat_active() || !quorum_region)
2212                 return;
2213
2214         /*
2215          * If global heartbeat is active and there are dependent users,
2216          * pin all regions if the quorum region count is <= O2HB_PIN_CUT_OFF.
2217          */
2218         spin_lock(&o2hb_live_lock);
2219
2220         if (!o2hb_dependent_users)
2221                 goto unlock;
2222
2223         if (bitmap_weight(o2hb_quorum_region_bitmap,
2224                            O2NM_MAX_REGIONS) <= O2HB_PIN_CUT_OFF)
2225                 o2hb_region_pin(NULL);
2226
2227 unlock:
2228         spin_unlock(&o2hb_live_lock);
2229 }
2230
2231 static ssize_t o2hb_heartbeat_group_threshold_show(struct config_item *item,
2232                 char *page)
2233 {
2234         return sprintf(page, "%u\n", o2hb_dead_threshold);
2235 }
2236
2237 static ssize_t o2hb_heartbeat_group_threshold_store(struct config_item *item,
2238                 const char *page, size_t count)
2239 {
2240         unsigned long tmp;
2241         char *p = (char *)page;
2242
2243         tmp = simple_strtoul(p, &p, 10);
2244         if (!p || (*p && (*p != '\n')))
2245                 return -EINVAL;
2246
2247         /* this will validate ranges for us. */
2248         o2hb_dead_threshold_set((unsigned int) tmp);
2249
2250         return count;
2251 }
2252
2253 static ssize_t o2hb_heartbeat_group_mode_show(struct config_item *item,
2254                 char *page)
2255 {
2256         return sprintf(page, "%s\n",
2257                        o2hb_heartbeat_mode_desc[o2hb_heartbeat_mode]);
2258 }
2259
2260 static ssize_t o2hb_heartbeat_group_mode_store(struct config_item *item,
2261                 const char *page, size_t count)
2262 {
2263         unsigned int i;
2264         int ret;
2265         size_t len;
2266
2267         len = (page[count - 1] == '\n') ? count - 1 : count;
2268         if (!len)
2269                 return -EINVAL;
2270
2271         for (i = 0; i < O2HB_HEARTBEAT_NUM_MODES; ++i) {
2272                 if (strncasecmp(page, o2hb_heartbeat_mode_desc[i], len))
2273                         continue;
2274
2275                 ret = o2hb_global_heartbeat_mode_set(i);
2276                 if (!ret)
2277                         printk(KERN_NOTICE "o2hb: Heartbeat mode set to %s\n",
2278                                o2hb_heartbeat_mode_desc[i]);
2279                 return count;
2280         }
2281
2282         return -EINVAL;
2283
2284 }
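/*
 * Example (illustrative): writing one of the strings from
 * o2hb_heartbeat_mode_desc to the heartbeat group's "mode" attribute
 * (case-insensitively, with or without a trailing newline) selects that
 * mode, e.g. "echo global > .../cluster/<name>/heartbeat/mode"; anything
 * else returns -EINVAL.
 */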
2285
2286 CONFIGFS_ATTR(o2hb_heartbeat_group_, threshold);
2287 CONFIGFS_ATTR(o2hb_heartbeat_group_, mode);
2288
2289 static struct configfs_attribute *o2hb_heartbeat_group_attrs[] = {
2290         &o2hb_heartbeat_group_attr_threshold,
2291         &o2hb_heartbeat_group_attr_mode,
2292         NULL,
2293 };
2294
2295 static struct configfs_group_operations o2hb_heartbeat_group_group_ops = {
2296         .make_item      = o2hb_heartbeat_group_make_item,
2297         .drop_item      = o2hb_heartbeat_group_drop_item,
2298 };
2299
2300 static struct config_item_type o2hb_heartbeat_group_type = {
2301         .ct_group_ops   = &o2hb_heartbeat_group_group_ops,
2302         .ct_attrs       = o2hb_heartbeat_group_attrs,
2303         .ct_owner       = THIS_MODULE,
2304 };
2305
2306 /* this is just here to avoid touching group in heartbeat.h which the
2307  * entire damn world #includes */
2308 struct config_group *o2hb_alloc_hb_set(void)
2309 {
2310         struct o2hb_heartbeat_group *hs = NULL;
2311         struct config_group *ret = NULL;
2312
2313         hs = kzalloc(sizeof(struct o2hb_heartbeat_group), GFP_KERNEL);
2314         if (hs == NULL)
2315                 goto out;
2316
2317         config_group_init_type_name(&hs->hs_group, "heartbeat",
2318                                     &o2hb_heartbeat_group_type);
2319
2320         ret = &hs->hs_group;
2321 out:
2322         if (ret == NULL)
2323                 kfree(hs);
2324         return ret;
2325 }
2326
2327 void o2hb_free_hb_set(struct config_group *group)
2328 {
2329         struct o2hb_heartbeat_group *hs = to_o2hb_heartbeat_group(group);
2330         kfree(hs);
2331 }
2332
2333 /* hb callback registration and issuing */
2334
2335 static struct o2hb_callback *hbcall_from_type(enum o2hb_callback_type type)
2336 {
2337         if (type == O2HB_NUM_CB)
2338                 return ERR_PTR(-EINVAL);
2339
2340         return &o2hb_callbacks[type];
2341 }
2342
2343 void o2hb_setup_callback(struct o2hb_callback_func *hc,
2344                          enum o2hb_callback_type type,
2345                          o2hb_cb_func *func,
2346                          void *data,
2347                          int priority)
2348 {
2349         INIT_LIST_HEAD(&hc->hc_item);
2350         hc->hc_func = func;
2351         hc->hc_data = data;
2352         hc->hc_priority = priority;
2353         hc->hc_type = type;
2354         hc->hc_magic = O2HB_CB_MAGIC;
2355 }
2356 EXPORT_SYMBOL_GPL(o2hb_setup_callback);
2357
2358 /*
2359  * In local heartbeat mode, region_uuid passed matches the dlm domain name.
2360  * In global heartbeat mode, region_uuid passed is NULL.
2361  *
2362  * In local, we only pin the matching region. In global we pin all the active
2363  * regions.
2364  */
2365 static int o2hb_region_pin(const char *region_uuid)
2366 {
2367         int ret = 0, found = 0;
2368         struct o2hb_region *reg;
2369         char *uuid;
2370
2371         assert_spin_locked(&o2hb_live_lock);
2372
2373         list_for_each_entry(reg, &o2hb_all_regions, hr_all_item) {
2374                 if (reg->hr_item_dropped)
2375                         continue;
2376
2377                 uuid = config_item_name(&reg->hr_item);
2378
2379                 /* local heartbeat */
2380                 if (region_uuid) {
2381                         if (strcmp(region_uuid, uuid))
2382                                 continue;
2383                         found = 1;
2384                 }
2385
2386                 if (reg->hr_item_pinned || reg->hr_item_dropped)
2387                         goto skip_pin;
2388
2389                 /* Ignore ENOENT only for local hb (userdlm domain) */
2390                 ret = o2nm_depend_item(&reg->hr_item);
2391                 if (!ret) {
2392                         mlog(ML_CLUSTER, "Pin region %s\n", uuid);
2393                         reg->hr_item_pinned = 1;
2394                 } else {
2395                         if (ret == -ENOENT && found)
2396                                 ret = 0;
2397                         else {
2398                                 mlog(ML_ERROR, "Pin region %s fails with %d\n",
2399                                      uuid, ret);
2400                                 break;
2401                         }
2402                 }
2403 skip_pin:
2404                 if (found)
2405                         break;
2406         }
2407
2408         return ret;
2409 }
2410
2411 /*
2412  * In local heartbeat mode, region_uuid passed matches the dlm domain name.
2413  * In global heartbeat mode, region_uuid passed is NULL.
2414  *
2415  * In local, we only unpin the matching region. In global we unpin all the
2416  * active regions.
2417  */
2418 static void o2hb_region_unpin(const char *region_uuid)
2419 {
2420         struct o2hb_region *reg;
2421         char *uuid;
2422         int found = 0;
2423
2424         assert_spin_locked(&o2hb_live_lock);
2425
2426         list_for_each_entry(reg, &o2hb_all_regions, hr_all_item) {
2427                 if (reg->hr_item_dropped)
2428                         continue;
2429
2430                 uuid = config_item_name(&reg->hr_item);
2431                 if (region_uuid) {
2432                         if (strcmp(region_uuid, uuid))
2433                                 continue;
2434                         found = 1;
2435                 }
2436
2437                 if (reg->hr_item_pinned) {
2438                         mlog(ML_CLUSTER, "Unpin region %s\n", uuid);
2439                         o2nm_undepend_item(&reg->hr_item);
2440                         reg->hr_item_pinned = 0;
2441                 }
2442                 if (found)
2443                         break;
2444         }
2445 }
2446
2447 static int o2hb_region_inc_user(const char *region_uuid)
2448 {
2449         int ret = 0;
2450
2451         spin_lock(&o2hb_live_lock);
2452
2453         /* local heartbeat */
2454         if (!o2hb_global_heartbeat_active()) {
2455             ret = o2hb_region_pin(region_uuid);
2456             goto unlock;
2457         }
2458
2459         /*
2460          * if global heartbeat is active and this is the first dependent user,
2461          * pin all regions if the quorum region count is <= O2HB_PIN_CUT_OFF
2462          */
2463         o2hb_dependent_users++;
2464         if (o2hb_dependent_users > 1)
2465                 goto unlock;
2466
2467         if (bitmap_weight(o2hb_quorum_region_bitmap,
2468                            O2NM_MAX_REGIONS) <= O2HB_PIN_CUT_OFF)
2469                 ret = o2hb_region_pin(NULL);
2470
2471 unlock:
2472         spin_unlock(&o2hb_live_lock);
2473         return ret;
2474 }
2475
2476 void o2hb_region_dec_user(const char *region_uuid)
2477 {
2478         spin_lock(&o2hb_live_lock);
2479
2480         /* local heartbeat */
2481         if (!o2hb_global_heartbeat_active()) {
2482             o2hb_region_unpin(region_uuid);
2483             goto unlock;
2484         }
2485
2486         /*
2487          * if global heartbeat is active and there are no dependent users,
2488          * unpin all quorum regions
2489          */
2490         o2hb_dependent_users--;
2491         if (!o2hb_dependent_users)
2492                 o2hb_region_unpin(NULL);
2493
2494 unlock:
2495         spin_unlock(&o2hb_live_lock);
2496 }
2497
2498 int o2hb_register_callback(const char *region_uuid,
2499                            struct o2hb_callback_func *hc)
2500 {
2501         struct o2hb_callback_func *f;
2502         struct o2hb_callback *hbcall;
2503         int ret;
2504
2505         BUG_ON(hc->hc_magic != O2HB_CB_MAGIC);
2506         BUG_ON(!list_empty(&hc->hc_item));
2507
2508         hbcall = hbcall_from_type(hc->hc_type);
2509         if (IS_ERR(hbcall)) {
2510                 ret = PTR_ERR(hbcall);
2511                 goto out;
2512         }
2513
2514         if (region_uuid) {
2515                 ret = o2hb_region_inc_user(region_uuid);
2516                 if (ret) {
2517                         mlog_errno(ret);
2518                         goto out;
2519                 }
2520         }
2521
2522         down_write(&o2hb_callback_sem);
2523
2524         list_for_each_entry(f, &hbcall->list, hc_item) {
2525                 if (hc->hc_priority < f->hc_priority) {
2526                         list_add_tail(&hc->hc_item, &f->hc_item);
2527                         break;
2528                 }
2529         }
2530         if (list_empty(&hc->hc_item))
2531                 list_add_tail(&hc->hc_item, &hbcall->list);
2532
2533         up_write(&o2hb_callback_sem);
2534         ret = 0;
2535 out:
2536         mlog(ML_CLUSTER, "returning %d on behalf of %p for funcs %p\n",
2537              ret, __builtin_return_address(0), hc);
2538         return ret;
2539 }
2540 EXPORT_SYMBOL_GPL(o2hb_register_callback);
2541
2542 void o2hb_unregister_callback(const char *region_uuid,
2543                               struct o2hb_callback_func *hc)
2544 {
2545         BUG_ON(hc->hc_magic != O2HB_CB_MAGIC);
2546
2547         mlog(ML_CLUSTER, "on behalf of %p for funcs %p\n",
2548              __builtin_return_address(0), hc);
2549
2550         /* XXX Can this happen _with_ a region reference? */
2551         if (list_empty(&hc->hc_item))
2552                 return;
2553
2554         if (region_uuid)
2555                 o2hb_region_dec_user(region_uuid);
2556
2557         down_write(&o2hb_callback_sem);
2558
2559         list_del_init(&hc->hc_item);
2560
2561         up_write(&o2hb_callback_sem);
2562 }
2563 EXPORT_SYMBOL_GPL(o2hb_unregister_callback);
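/*
 * Callback lifecycle sketch (illustrative only; the handler and data names
 * below are made up, real users live in the dlm and the stack glue):
 *
 *	static struct o2hb_callback_func my_down_cb;
 *
 *	o2hb_setup_callback(&my_down_cb, O2HB_NODE_DOWN_CB,
 *			    my_node_down_handler, my_private_data, priority);
 *	ret = o2hb_register_callback(region_uuid, &my_down_cb);
 *	...
 *	o2hb_unregister_callback(region_uuid, &my_down_cb);
 *
 * Passing a non-NULL region_uuid also takes a dependent user reference so
 * the region(s) stay pinned while the callback is registered.
 */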
2564
2565 int o2hb_check_node_heartbeating(u8 node_num)
2566 {
2567         unsigned long testing_map[BITS_TO_LONGS(O2NM_MAX_NODES)];
2568
2569         o2hb_fill_node_map(testing_map, sizeof(testing_map));
2570         if (!test_bit(node_num, testing_map)) {
2571                 mlog(ML_HEARTBEAT,
2572                      "node (%u) does not have heartbeating enabled.\n",
2573                      node_num);
2574                 return 0;
2575         }
2576
2577         return 1;
2578 }
2579 EXPORT_SYMBOL_GPL(o2hb_check_node_heartbeating);
2580
2581 int o2hb_check_node_heartbeating_no_sem(u8 node_num)
2582 {
2583         unsigned long testing_map[BITS_TO_LONGS(O2NM_MAX_NODES)];
2584
2585         spin_lock(&o2hb_live_lock);
2586         o2hb_fill_node_map_from_callback(testing_map, sizeof(testing_map));
2587         spin_unlock(&o2hb_live_lock);
2588         if (!test_bit(node_num, testing_map)) {
2589                 mlog(ML_HEARTBEAT,
2590                      "node (%u) does not have heartbeating enabled.\n",
2591                      node_num);
2592                 return 0;
2593         }
2594
2595         return 1;
2596 }
2597 EXPORT_SYMBOL_GPL(o2hb_check_node_heartbeating_no_sem);
2598
2599 int o2hb_check_node_heartbeating_from_callback(u8 node_num)
2600 {
2601         unsigned long testing_map[BITS_TO_LONGS(O2NM_MAX_NODES)];
2602
2603         o2hb_fill_node_map_from_callback(testing_map, sizeof(testing_map));
2604         if (!test_bit(node_num, testing_map)) {
2605                 mlog(ML_HEARTBEAT,
2606                      "node (%u) does not have heartbeating enabled.\n",
2607                      node_num);
2608                 return 0;
2609         }
2610
2611         return 1;
2612 }
2613 EXPORT_SYMBOL_GPL(o2hb_check_node_heartbeating_from_callback);
2614
2615 /* Makes sure our local node is configured with a node number, and is
2616  * heartbeating. */
2617 int o2hb_check_local_node_heartbeating(void)
2618 {
2619         u8 node_num;
2620
2621         /* if this node was set then we have networking */
2622         node_num = o2nm_this_node();
2623         if (node_num == O2NM_MAX_NODES) {
2624                 mlog(ML_HEARTBEAT, "this node has not been configured.\n");
2625                 return 0;
2626         }
2627
2628         return o2hb_check_node_heartbeating(node_num);
2629 }
2630 EXPORT_SYMBOL_GPL(o2hb_check_local_node_heartbeating);
2631
2632 /*
2633  * this is just a hack until we get the plumbing which flips file systems
2634  * read only and drops the hb ref instead of killing the node dead.
2635  */
2636 void o2hb_stop_all_regions(void)
2637 {
2638         struct o2hb_region *reg;
2639
2640         mlog(ML_ERROR, "stopping heartbeat on all active regions.\n");
2641
2642         spin_lock(&o2hb_live_lock);
2643
2644         list_for_each_entry(reg, &o2hb_all_regions, hr_all_item)
2645                 reg->hr_unclean_stop = 1;
2646
2647         spin_unlock(&o2hb_live_lock);
2648 }
2649 EXPORT_SYMBOL_GPL(o2hb_stop_all_regions);
2650
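/*
 * Copy the names of all active regions into region_uuids, which the caller
 * must size to at least max_regions * O2HB_MAX_REGION_NAME_LEN bytes.  The
 * return value is the total number of active regions, which can exceed
 * max_regions; only the first max_regions names are copied.
 */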
2651 int o2hb_get_all_regions(char *region_uuids, u8 max_regions)
2652 {
2653         struct o2hb_region *reg;
2654         int numregs = 0;
2655         char *p;
2656
2657         spin_lock(&o2hb_live_lock);
2658
2659         p = region_uuids;
2660         list_for_each_entry(reg, &o2hb_all_regions, hr_all_item) {
2661                 if (reg->hr_item_dropped)
2662                         continue;
2663
2664                 mlog(0, "Region: %s\n", config_item_name(&reg->hr_item));
2665                 if (numregs < max_regions) {
2666                         memcpy(p, config_item_name(&reg->hr_item),
2667                                O2HB_MAX_REGION_NAME_LEN);
2668                         p += O2HB_MAX_REGION_NAME_LEN;
2669                 }
2670                 numregs++;
2671         }
2672
2673         spin_unlock(&o2hb_live_lock);
2674
2675         return numregs;
2676 }
2677 EXPORT_SYMBOL_GPL(o2hb_get_all_regions);
2678
2679 int o2hb_global_heartbeat_active(void)
2680 {
2681         return (o2hb_heartbeat_mode == O2HB_HEARTBEAT_GLOBAL);
2682 }
2683 EXPORT_SYMBOL(o2hb_global_heartbeat_active);