Skip to content

Commit

Permalink
WiP: per-log locks, not workign yet
Browse files Browse the repository at this point in the history
  • Loading branch information
edwintorok committed Aug 31, 2023
1 parent 32aa239 commit 915d154
Show file tree
Hide file tree
Showing 2 changed files with 65 additions and 63 deletions.
89 changes: 46 additions & 43 deletions ocaml/database/redo_log.ml
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,9 @@ module type LockWitness = sig
*)

module MakeRecord (T : sig
type 'a t
type ('a, 'b) t
end) : sig
val assert_locked : locked -> (unit * locked) T.t -> (locked * locked) T.t
val assert_locked : locked -> (unit, locked) T.t -> (locked, locked) T.t
(** [assert_locked locked_witness record] checks that [record] holds the [locked_witness].
[locked_witness] can only be obtained while inside {!with_lock}.
*)
Expand All @@ -58,10 +58,10 @@ struct
let with_lock = Xapi_stdext_threads.Threadext.Mutex.execute

module MakeRecord (T : sig
type _ t
type (_, _) t
end) =
struct
type _t = unit T.t
type _t = (unit, unit) T.t

let assert_locked () t = t
end
Expand Down Expand Up @@ -97,11 +97,11 @@ end
(* can be implemented more efficiently inside LockedRef, but it wouldn't ensure we hold the mutex *)
let update (type a b) (module LW : LockWitness with type locked = b) t f =
let module R = LW.MakeRecord (struct
type nonrec 'b t = (a, 'b) LockedRef.t
type nonrec ('b, 'c) t = (a, 'b * 'c) LockedRef.t
end) in
LW.with_lock LW.mutex @@ fun locked ->
let t = R.assert_locked locked t in
LockedRef.(t := f locked !t)
LockedRef.(t := f !t)

open LockedRef

Expand Down Expand Up @@ -144,6 +144,8 @@ let redo_log_sm_config = [("type", "raw")]
(* ---------------------------------------------------- *)
(* Encapsulate the state of a single redo_log instance. *)

type 'a lock_witness = (module LockWitness with type locked = 'a)

type 'a redo_log_conf = {
name: string
; marker: string
Expand All @@ -156,20 +158,18 @@ type 'a redo_log_conf = {
; sock: (Unix.file_descr option, 'a) LockedRef.t
; pid: ((Forkhelpers.pidty * string * string) option, 'a) LockedRef.t
; num_dying_processes: int Atomic.t
; mutex: 'locked lock_witness
}
constraint 'a = _ * 'locked

module CreationMutex = MakeMutex ()

type locked = CreationMutex.locked * CreationMutex.locked

type unlocked = unit * CreationMutex.locked

type ('a, 'b) redo_log = 'b redo_log_conf
type 'a redo_log = R : (unit * 'b) redo_log_conf -> 'a redo_log

module RedoLogSet = Set.Make (struct
type t = (unit, locked) redo_log
type t = unit redo_log

let compare log1 log2 = compare log1.marker log2.marker
let compare (R log1) (R log2) = compare log1.marker log2.marker
end)

let _redo_log_creation_mutex = CreationMutex.mutex
Expand Down Expand Up @@ -700,15 +700,19 @@ let healthy log =

exception TooManyProcesses

module RedologRecord = CreationMutex.MakeRecord (struct
type 'a t = 'a redo_log_conf
end)

let with_locked_log log f =
CreationMutex.with_lock CreationMutex.mutex @@ fun locked ->
let with_locked_log (type locked) log f =
let module LW = (val (log.mutex : locked lock_witness)) in
let module RedologRecord = LW.MakeRecord (struct
type ('a, 'b) t = ('a * 'b) redo_log_conf
end) in
LW.with_lock LW.mutex @@ fun locked ->
let log = RedologRecord.assert_locked locked log in
f log

let with_locked_log log f =
let R log = log in
with_locked_log log f

let startup' log =
if is_enabled log then (
try
Expand Down Expand Up @@ -862,29 +866,29 @@ let connect_and_perform_action f desc log =
(* Functions for handling creation and deletion of redo log instances. *)

let create ~name ~state_change_callback ~read_only =
let module M = MakeMutex () in
let mutex = (module M : LockWitness with type locked = M.locked) in
let instance =
{
name
; marker= Uuidx.to_string (Uuidx.make ())
; read_only
; device= Atomic.make None
; currently_accessible= ref (module CreationMutex) true
; state_change_callback
; time_of_last_failure= ref (module CreationMutex) 0.
; backoff_delay=
ref (module CreationMutex) Db_globs.redo_log_initial_backoff_delay
; sock= ref (module CreationMutex) None
; pid= ref (module CreationMutex) None
; num_dying_processes= Atomic.make 0
}
R
{
name
; marker= Uuidx.to_string (Uuidx.make ())
; read_only
; device= Atomic.make None
; currently_accessible= ref mutex true
; state_change_callback
; time_of_last_failure= ref mutex 0.
; backoff_delay= ref mutex Db_globs.redo_log_initial_backoff_delay
; sock= ref mutex None
; pid= ref mutex None
; num_dying_processes= Atomic.make 0
; mutex
}
in
update
(module CreationMutex)
all_redo_logs
(fun locked all_redo_logs ->
let instance = RedologRecord.assert_locked locked instance in
RedoLogSet.add instance all_redo_logs
) ;
(fun all_redo_logs -> RedoLogSet.add instance all_redo_logs) ;
instance

let create_rw = create ~read_only:false
Expand All @@ -894,10 +898,8 @@ let create_ro = create ~read_only:true
let shutdown log = with_locked_log log shutdown'

let delete log =
update (module CreationMutex) all_redo_logs @@ fun locked all_redo_logs ->
let log = RedologRecord.assert_locked locked log in
shutdown' log ;
disable log ;
let () = with_locked_log log @@ fun log -> shutdown' log ; disable log in
update (module CreationMutex) all_redo_logs @@ fun all_redo_logs ->
RedoLogSet.remove log all_redo_logs

(* -------------------------------------------------------- *)
Expand Down Expand Up @@ -979,10 +981,11 @@ let flush_db_to_redo_log' db log =
let write_db_to_fd out_fd = Db_xml.To.fd out_fd db in
write_db
(Db_cache_types.Manifest.generation (Db_cache_types.Database.manifest db))
write_db_to_fd log;
write_db_to_fd log ;
!(log.currently_accessible)

let flush_db_to_redo_log db log = with_locked_log log @@ fun log -> flush_db_to_redo_log' db log
let flush_db_to_redo_log db log =
with_locked_log log @@ fun log -> flush_db_to_redo_log' db log

let flush_db_exn db log =
assert (not log.read_only) ;
Expand Down
39 changes: 19 additions & 20 deletions ocaml/database/redo_log.mli
Original file line number Diff line number Diff line change
Expand Up @@ -27,31 +27,28 @@ val recommended_vdi_size : int64
val redo_log_sm_config : (string * string) list
(** SM config for redo log VDI *)

type unlocked (** type witness for unlocked mutex *)
type locked (** type witness for locked mutex *)

(** {redo_log data type} *)
type ('a, 'b) redo_log
type 'a redo_log

(** {2 Enabling and disabling writing} *)

val is_enabled : _ redo_log -> bool
(** Returns [true] iff writing deltas to the block device is enabled. *)

val enable_existing : ([< `RO | `RW], unlocked) redo_log -> string -> unit
val enable_existing : [< `RO | `RW] redo_log -> string -> unit
(** Enables writing deltas to the block device. Subsequent modifications to the database will be persisted to the block device. Takes a static-VDI reason as argument to select the device to use.
The redo log is expected to contain some data to be played back.
*)

val enable_and_flush :
Db_cache_types.Database.t -> ([< `RW], unlocked) redo_log -> string -> unit
Db_cache_types.Database.t -> [< `RW] redo_log -> string -> unit
(** Like {!enable_existing} but the redo log is freshly created and will trigger an immediate database flush *)

val enable_block_existing : ([< `RO], unlocked) redo_log -> string -> unit
val enable_block_existing : [< `RO] redo_log -> string -> unit
(** Enables writing deltas to the block device. Subsequent modifications to the database will be persisted to the block device. Takes a path as argument to select the device to use. *)

val enable_block_and_flush :
Db_cache_types.Database.t -> ([< `RW], unlocked) redo_log -> string -> unit
Db_cache_types.Database.t -> [< `RW] redo_log -> string -> unit
(** Like {!enable_block_existing} but the redo log is freshly created and will trigger an immediate database flush *)

val disable : _ redo_log -> unit
Expand All @@ -63,34 +60,35 @@ val redo_log_events : (string * bool) Event.channel

(** {2 Lifecycle of I/O process} *)

val startup : (_, unlocked) redo_log -> unit
val startup : _ redo_log -> unit
(** Start the I/O process. Will do nothing if it's already started. *)

val shutdown' : (_, locked) redo_log -> unit
(** Stop the I/O process. Will do nothing if it's not already started. *)

val shutdown : (_, unlocked) redo_log -> unit
val shutdown : _ redo_log -> unit
(** Stop the I/O process. Will do nothing if it's not already started. *)

val switch : (_, unlocked) redo_log -> string -> unit
val switch : _ redo_log -> string -> unit
(** Start using the VDI with the given reason as redo-log, discarding the current one. *)

(** {Keeping track of existing redo_log instances} *)

val create_ro :
name:string -> state_change_callback:(bool -> unit) option -> ([> `RO], unlocked) redo_log
name:string
-> state_change_callback:(bool -> unit) option
-> [> `RO] redo_log
(** Create a RO redo log instance and add it to the set. *)

val create_rw :
name:string -> state_change_callback:(bool -> unit) option -> ([> `RW], unlocked) redo_log
name:string
-> state_change_callback:(bool -> unit) option
-> [> `RW] redo_log
(** Create a RW redo log instance and add it to the set. *)

val delete : (_, unlocked) redo_log -> unit
val delete : _ redo_log -> unit
(** Shutdown a redo_log instance and remove it from the set. *)

(** {Finding active redo_log instances} *)

val with_active_redo_logs : ((_, locked) redo_log -> unit) -> unit
val with_active_redo_logs : (_ redo_log -> unit) -> unit
(* Apply the supplied function to all active redo_logs. *)

(** {2 Interacting with the block device} *)
Expand All @@ -110,7 +108,7 @@ type t =
val apply :
(Generation.t -> Unix.file_descr -> int -> float -> unit)
-> (Generation.t -> t -> unit)
-> ([< `RO | `RW], unlocked) redo_log
-> ([< `RO | `RW]) redo_log
-> unit
(** Read from the block device.
This function is best-effort only and does not raise any exceptions in the case of error.
Expand All @@ -119,7 +117,8 @@ val apply :
For each database, [db_fn] is invoked with the database's generation count, a file descriptor from which to read the database's contents, the length of the database in bytes and the latest response time. The [db_fn] function may raise {!Unixext.Timeout} if the transfer is not complete by the latest response time.
For each database delta, [delta_fn] is invoked with the delta's generation count and the value of the delta. *)

val flush_db_to_redo_log : Db_cache_types.Database.t -> ([< `RW], unlocked) redo_log -> bool
val flush_db_to_redo_log :
Db_cache_types.Database.t -> ([< `RW]) redo_log -> bool
(** Immediately write the given database to the given redo_log instance *)

val flush_db_to_all_active_redo_logs : Db_cache_types.Database.t -> unit
Expand Down

0 comments on commit 915d154

Please sign in to comment.