Skip to content

Commit

Permalink
fixup! WiP: redo log lock state tracking
Browse files Browse the repository at this point in the history
  • Loading branch information
edwintorok committed Aug 31, 2023
1 parent cb1e964 commit 32aa239
Show file tree
Hide file tree
Showing 7 changed files with 92 additions and 58 deletions.
2 changes: 1 addition & 1 deletion ocaml/database/db_connections.ml
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ let dec_and_read_db_flush_thread_refcount () =

let pre_exit_hook () =
(* We're about to exit. Close the active redo logs. *)
Redo_log.with_active_redo_logs Redo_log.shutdown ;
Redo_log.with_active_redo_logs Redo_log.shutdown' ;
R.debug "Closed all active redo logs."

(* The connection flushing calls each lock the connection they're flushing to.
Expand Down
127 changes: 79 additions & 48 deletions ocaml/database/redo_log.ml
Original file line number Diff line number Diff line change
Expand Up @@ -15,75 +15,93 @@ open Xapi_stdext_pervasives.Pervasiveext
open Xapi_stdext_std.Xstringext
open Xapi_stdext_unix

module type LockWitness = sig
type 'a t (** a unique mutex type with a type witness for its locking state *)
module type LockWitness = sig
(** a unique mutex type with a type witness for its locking state *)
type 'a t

(** type witness for the unlocked mutex {!t} *)
type unlocked

type unlocked (** type witness for the unlocked mutex {!t} *)
type locked (** type witness for the locked mutex {!t} *)
(** type witness for the locked mutex {!t} *)
type locked

val mutex : unlocked t
(** [mutex] is the unique mutex of type ['a t], initially in an unlocked state *)

val with_lock: unlocked t -> (locked -> unit) -> unit
val with_lock : unlocked t -> (locked -> 'a) -> 'a
(** [with_lock mutex f] calls [f locked_witness], where [locked_witness] is a type witness
for the locked state of [mutex], and returns the result of [f].
The [locked_witness] can be passed to {!MakeRecord.assert_locked} to "unlock" the fields of a record
without modifying the record itself (only its type parameter changes).
*)

module MakeRecord(T: sig type 'a t end): sig
val assert_locked: locked -> (unit * locked) T.t -> (locked * locked) T.t
module MakeRecord (T : sig
type 'a t
end) : sig
val assert_locked : locked -> (unit * locked) T.t -> (locked * locked) T.t
(** [assert_locked locked_witness record] checks that [record] holds the [locked_witness].
[locked_witness] can only be obtained while inside {!with_lock}.
*)
end
end

module MakeMutex( (** generative functor, ensuring a new, unique type is created on every application *) ) : LockWitness = struct
module MakeMutex () : LockWitness =
(* generative functor, ensuring a new, unique type is created on every application *)
struct
type _ t = Mutex.t

type unlocked = unit

type locked = unit

let mutex = Mutex.create ()

let with_lock = Xapi_stdext_threads.Threadext.Mutex.execute

module MakeRecord(T: sig type _ t end) = struct
module MakeRecord (T : sig
type _ t
end) =
struct
type _t = unit T.t

let assert_locked () t = t
end
end

module LockedRef : sig
(** Opening this module will shadow the Stdlib [ref] operators, to ensure they are not used unsafely. *)

type ('a, 'mutex) t (** a ref containing values of type ['a] protected by ['mutex] *)

val ref: (module LockWitness with type locked = 'locked) -> 'a -> ('a, unit * 'locked) t
(** a ref containing values of type ['a] protected by ['mutex] *)
type ('a, 'mutex) t

val (!): ('a, 'locked * 'locked) t -> 'a
val ref :
(module LockWitness with type locked = 'locked)
-> 'a
-> ('a, unit * 'locked) t

val ( ! ) : ('a, 'locked * 'locked) t -> 'a
(** [!a] dereferences [a], requiring its lock to be held.
Note that multiple values can share the same mutex. *)

val (:=) : ('a, 'locked * 'locked) t -> 'a -> unit
val ( := ) : ('a, 'locked * 'locked) t -> 'a -> unit
(** [r := value] assigns [value] to [r] while requiring its lock to be held. *)

end = struct
type ('a, _) t = 'a ref

let ref _ default = Stdlib.ref default
let (!) = Stdlib.(!)
let (:=) = Stdlib.(:=)

let ( ! ) = Stdlib.( ! )

let ( := ) = Stdlib.( := )
end

(* can be implemented more efficiently inside LockedRef, but it wouldn't ensure we hold the mutex *)
let update (type a b) (module LW:LockWitness with type locked = b) t f =
let module R = LW.MakeRecord(struct type nonrec 'b t = (a, 'b) LockedRef.t end) in
LW.with_lock LW.mutex @@ fun locked ->
let t = R.assert_locked locked t in
LockedRef.(t := f locked !t)

let update (type a b) (module LW : LockWitness with type locked = b) t f =
let module R = LW.MakeRecord (struct
type nonrec 'b t = (a, 'b) LockedRef.t
end) in
LW.with_lock LW.mutex @@ fun locked ->
let t = R.assert_locked locked t in
LockedRef.(t := f locked !t)

open LockedRef

Expand Down Expand Up @@ -140,9 +158,10 @@ type 'a redo_log_conf = {
; num_dying_processes: int Atomic.t
}

module CreationMutex = MakeMutex()
module CreationMutex = MakeMutex ()

(** Lock-state tag of a redo log for which a [CreationMutex] witness has been
    presented: this is the result shape of [assert_locked] in
    [CreationMutex.MakeRecord]. *)
type locked = CreationMutex.locked * CreationMutex.locked

(** Lock-state tag of a redo log for which no witness has been presented yet:
    this is the argument shape accepted by [assert_locked] in
    [CreationMutex.MakeRecord]. *)
type unlocked = unit * CreationMutex.locked

(** A redo log. ['a] does not appear on the right-hand side, so it is a purely
    phantom parameter (the interface instantiates it with [`RO] / [`RW]).
    ['b] is the lock-state tag ({!locked} or {!unlocked}) and is passed on to
    [redo_log_conf]. *)
type ('a, 'b) redo_log = 'b redo_log_conf
Expand Down Expand Up @@ -538,7 +557,8 @@ let action_write_delta marker generation_count data flush_db_fn sock
R.debug "Performing writedelta (generation %Ld)" generation_count ;
(* Compute desired response time *)
let latest_response_time =
get_latest_response_time Stdlib.(!Db_globs.redo_log_max_block_time_writedelta)
get_latest_response_time
Stdlib.(!Db_globs.redo_log_max_block_time_writedelta)
in
(* Write *)
let str =
Expand Down Expand Up @@ -619,7 +639,7 @@ let maybe_retry f log =

(* Close any existing socket and kill the corresponding process. *)

let shutdown log =
let shutdown' log =
if is_enabled log then (
D.debug "Shutting down connection to I/O process for '%s'" log.name ;
try
Expand Down Expand Up @@ -671,7 +691,7 @@ let shutdown log =

let broken log =
set_time_of_last_failure log ;
shutdown log ;
shutdown' log ;
cannot_connect_fn log

let healthy log =
Expand All @@ -680,9 +700,9 @@ let healthy log =

exception TooManyProcesses

module RedologRecord = CreationMutex.MakeRecord(
struct type 'a t = 'a redo_log_conf end
)
module RedologRecord = CreationMutex.MakeRecord (struct
type 'a t = 'a redo_log_conf
end)

let with_locked_log log f =
CreationMutex.with_lock CreationMutex.mutex @@ fun locked ->
Expand Down Expand Up @@ -736,7 +756,8 @@ let startup' log =
() (* We're already connected *)
| None ->
let latest_connect_time =
get_latest_response_time Stdlib.(!Db_globs.redo_log_max_startup_time)
get_latest_response_time
Stdlib.(!Db_globs.redo_log_max_startup_time)
in
(* Now connect to the process via the socket *)
let s = connect ctrlsockpath latest_connect_time in
Expand Down Expand Up @@ -788,7 +809,7 @@ let startup' log =
(* Entry point for callers holding an unlocked log: [startup'] is run through
   [with_locked_log] on the locked view of [log]. *)
let startup log = with_locked_log log (fun locked_log -> startup' locked_log)

let switch log vdi_reason =
with_locked_log log shutdown;
with_locked_log log shutdown' ;
Atomic.set log.device (get_static_device vdi_reason) ;
startup log

Expand Down Expand Up @@ -850,41 +871,50 @@ let create ~name ~state_change_callback ~read_only =
; currently_accessible= ref (module CreationMutex) true
; state_change_callback
; time_of_last_failure= ref (module CreationMutex) 0.
; backoff_delay= ref (module CreationMutex) Db_globs.redo_log_initial_backoff_delay
; backoff_delay=
ref (module CreationMutex) Db_globs.redo_log_initial_backoff_delay
; sock= ref (module CreationMutex) None
; pid= ref (module CreationMutex) None
; num_dying_processes= Atomic.make 0
}
in
update (module CreationMutex) all_redo_logs (fun locked all_redo_logs ->
update
(module CreationMutex)
all_redo_logs
(fun locked all_redo_logs ->
let instance = RedologRecord.assert_locked locked instance in
RedoLogSet.add instance all_redo_logs
) ;
) ;
instance

(* Read-write flavour of [create]. *)
let create_rw ~name ~state_change_callback =
  create ~name ~state_change_callback ~read_only:false

(* Read-only flavour of [create]. *)
let create_ro ~name ~state_change_callback =
  create ~name ~state_change_callback ~read_only:true

(* Counterpart of [shutdown'] for callers holding an unlocked log: the locked
   view of [log] is obtained through [with_locked_log]. *)
let shutdown log = with_locked_log log (fun locked_log -> shutdown' locked_log)

let delete log =
shutdown log ;
update (module CreationMutex) all_redo_logs @@ fun locked all_redo_logs ->
let log = RedologRecord.assert_locked locked log in
shutdown' log ;
disable log ;
update (module CreationMutex) all_redo_logs (fun _locked all_redo_logs ->
RedoLogSet.remove log all_redo_logs
)
RedoLogSet.remove log all_redo_logs

(* -------------------------------------------------------- *)
(* Helper functions for interacting with multiple redo_logs *)
let with_active_redo_logs f =
update (module CreationMutex) all_redo_logs (fun _locked all_redo_logs ->
update
(module CreationMutex)
all_redo_logs
(fun _locked all_redo_logs ->
let active_redo_logs =
RedoLogSet.filter
(fun log -> is_enabled log && not log.read_only)
all_redo_logs
in
RedoLogSet.iter f active_redo_logs;
RedoLogSet.iter f active_redo_logs ;
all_redo_logs
)
)

(* --------------------------------------------------------------- *)
(* Functions which interact with the redo log on the block device. *)
Expand Down Expand Up @@ -927,6 +957,7 @@ let write_delta generation_count t flush_db_fn log =

let apply fn_db fn_delta log =
if is_enabled log then (
with_locked_log log @@ fun log ->
(* Turn off writing to the database while we are applying deltas. *)
Atomic.set ready_to_write false ;
finally
Expand All @@ -943,14 +974,16 @@ let apply fn_db fn_delta log =
(** Functions which operate on all active redo_logs. *)

(* Flush the database to the given redo_log instance. *)
let flush_db_to_redo_log db log =
let flush_db_to_redo_log' db log =
D.info "Flushing database to redo_log [%s]" log.name ;
let write_db_to_fd out_fd = Db_xml.To.fd out_fd db in
write_db
(Db_cache_types.Manifest.generation (Db_cache_types.Database.manifest db))
write_db_to_fd log ;
write_db_to_fd log;
!(log.currently_accessible)

(* Variant of [flush_db_to_redo_log'] for callers holding an unlocked log: the
   flush runs through [with_locked_log] and its result is returned unchanged. *)
let flush_db_to_redo_log db log =
  with_locked_log log (fun locked_log -> flush_db_to_redo_log' db locked_log)

let flush_db_exn db log =
assert (not log.read_only) ;
(* phantom type parameter ensures this only gets called with RW *)
Expand All @@ -959,18 +992,16 @@ let flush_db_exn db log =
raise (RedoLogFailure "Cannot connect to redo log")

let enable_and_flush db log reason =
with_locked_log log @@ fun log ->
enable_existing log reason ; flush_db_exn db log

let enable_block_and_flush db log path =
with_locked_log log @@ fun log ->
enable_block_existing log path ;
flush_db_exn db log

(* Write the given database to all active redo_logs *)
let flush_db_to_all_active_redo_logs db =
D.info "Flushing database to all active redo-logs" ;
with_active_redo_logs (fun log -> ignore (flush_db_to_redo_log db log))
with_active_redo_logs (fun log -> ignore (flush_db_to_redo_log' db log))

(* Write a delta to all active redo_logs *)
let database_callback event db =
Expand Down Expand Up @@ -1024,7 +1055,7 @@ let database_callback event db =
entry
(fun () ->
(* the function which will be invoked if a database write is required instead of a delta *)
ignore (flush_db_to_redo_log db log)
ignore (flush_db_to_redo_log' db log)
)
log
)
Expand Down
11 changes: 7 additions & 4 deletions ocaml/database/redo_log.mli
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,10 @@ val redo_log_events : (string * bool) Event.channel
val startup : (_, unlocked) redo_log -> unit
(** Start the I/O process. Will do nothing if it's already started. *)

val shutdown : (_, locked) redo_log -> unit
val shutdown' : (_, locked) redo_log -> unit
(** Stop the I/O process of a redo log that is already in the [locked] state.
Will do nothing if it's not already started. *)

val shutdown : (_, unlocked) redo_log -> unit
(** Stop the I/O process of a redo log in the [unlocked] state.
Will do nothing if it's not already started. *)

val switch : (_, unlocked) redo_log -> string -> unit
Expand All @@ -82,7 +85,7 @@ val create_rw :
name:string -> state_change_callback:(bool -> unit) option -> ([> `RW], unlocked) redo_log
(** Create a RW redo log instance and add it to the set. *)

val delete : (_, locked) redo_log -> unit
val delete : (_, unlocked) redo_log -> unit
(** Shut down and disable a redo_log instance, then remove it from the set. *)

(** {2 Finding active redo_log instances} *)
Expand All @@ -107,7 +110,7 @@ type t =
val apply :
(Generation.t -> Unix.file_descr -> int -> float -> unit)
-> (Generation.t -> t -> unit)
-> ([< `RO | `RW], locked) redo_log
-> ([< `RO | `RW], unlocked) redo_log
-> unit
(** Read from the block device.
This function is best-effort only and does not raise any exceptions in the case of error.
Expand All @@ -116,7 +119,7 @@ val apply :
For each database, [db_fn] is invoked with the database's generation count, a file descriptor from which to read the database's contents, the length of the database in bytes and the latest response time. The [db_fn] function may raise {!Unixext.Timeout} if the transfer is not complete by the latest response time.
For each database delta, [delta_fn] is invoked with the delta's generation count and the value of the delta. *)

val flush_db_to_redo_log : Db_cache_types.Database.t -> ([< `RW], locked) redo_log -> bool
val flush_db_to_redo_log : Db_cache_types.Database.t -> ([< `RW], unlocked) redo_log -> bool
(** Immediately write the given database to the given redo_log instance *)

val flush_db_to_all_active_redo_logs : Db_cache_types.Database.t -> unit
Expand Down
4 changes: 2 additions & 2 deletions ocaml/xapi/redo_log_usage.mli
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@
*)

val read_from_redo_log :
[< `RO | `RW] Redo_log.redo_log -> string -> Db_ref.t -> unit
([< `RO | `RW], Redo_log.unlocked) Redo_log.redo_log -> string -> Db_ref.t -> unit
(** Connect to the block device and write the latest version of the database
* on it to a file with a given name. *)

val stop_using_redo_log : _ Redo_log.redo_log -> unit
val stop_using_redo_log : (_, Redo_log.unlocked) Redo_log.redo_log -> unit
(** Disconnect from the block device. May be safely called even when not currently connected. *)
2 changes: 1 addition & 1 deletion ocaml/xapi/xapi_ha.ml
Original file line number Diff line number Diff line change
Expand Up @@ -2095,7 +2095,7 @@ let before_clean_shutdown_or_reboot ~__context ~host =
Note that Xha_metadata_vdi is a VDI with reason = ha_metadata_vdi_reason and type=`redo_log:
type=`metadata is for DR *)
debug "About to close active redo logs" ;
Redo_log.with_active_redo_logs Redo_log.shutdown ;
Redo_log.with_active_redo_logs Redo_log.shutdown' ;
(* We cannot call ha_release_resources because we want to keep HA armed on reboot *)
debug "About to detach static VDIs" ;
List.iter Static_vdis.detach_only (Static_vdis.list ()) ;
Expand Down
2 changes: 1 addition & 1 deletion ocaml/xapi/xapi_ha.mli
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
(** Functions for implementing 'High Availability' (HA).
@group High Availability (HA) *)

val ha_redo_log : [`RW] Redo_log.redo_log
val ha_redo_log : ([`RW], Redo_log.unlocked) Redo_log.redo_log
(** The redo log instance used for HA *)

(******************************************************************************)
Expand Down
2 changes: 1 addition & 1 deletion ocaml/xapi/xapi_vdi_helpers.ml
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ let assert_managed ~__context ~vdi =
let redo_log_lifecycle_mutex = Mutex.create ()

let metadata_replication :
(API.ref_VDI, API.ref_VBD * [`RW] Redo_log.redo_log) Hashtbl.t =
(API.ref_VDI, API.ref_VBD * ([`RW], Redo_log.unlocked) Redo_log.redo_log) Hashtbl.t =
Hashtbl.create Xapi_globs.redo_log_max_instances

let get_master_dom0 ~__context =
Expand Down

0 comments on commit 32aa239

Please sign in to comment.