diff --git a/ocaml/database/db_connections.ml b/ocaml/database/db_connections.ml index 9b390967fce..791386a8f92 100644 --- a/ocaml/database/db_connections.ml +++ b/ocaml/database/db_connections.ml @@ -81,7 +81,7 @@ let dec_and_read_db_flush_thread_refcount () = let pre_exit_hook () = (* We're about to exit. Close the active redo logs. *) - Redo_log.with_active_redo_logs Redo_log.shutdown ; + Redo_log.with_active_redo_logs Redo_log.shutdown' ; R.debug "Closed all active redo logs." (* The connection flushing calls each lock the connection they're flushing to. diff --git a/ocaml/database/redo_log.ml b/ocaml/database/redo_log.ml index ecb289c1b02..0cf3ecf5715 100644 --- a/ocaml/database/redo_log.ml +++ b/ocaml/database/redo_log.ml @@ -15,75 +15,93 @@ open Xapi_stdext_pervasives.Pervasiveext open Xapi_stdext_std.Xstringext open Xapi_stdext_unix -module type LockWitness = sig - type 'a t (** a unique mutex type with a type witness for its locking state *) +module type LockWitness = sig + (** a unique mutex type with a type witness for its locking state *) + type 'a t + (** type witness for the unlocked mutex {!t} *) + type unlocked - type unlocked (** type witness for the unlocked mutex {!t} *) - type locked (** type witness for the locked mutex {!t} *) + (** type witness for the locked mutex {!t} *) + type locked val mutex : unlocked t (** [mutex] is the unique mutex of type ['a t], initially in an unlocked state *) - val with_lock: unlocked t -> (locked -> unit) -> unit + val with_lock : unlocked t -> (locked -> 'a) -> 'a (** [with_lock mutex f] calls [f locked_witness] with a type witness for its locked state. The [locked_witness] can be used in {!MakeRecord.assert_locked} to "unlock" the fields of a record, without having to modify the record itself (except its type parameter) *) - module MakeRecord(T: sig type 'a t end): sig - val assert_locked: locked -> (unit * locked) T.t -> (locked * locked) T.t + module MakeRecord (T : sig + type 'a t + end) : sig + val assert_locked : locked -> (unit * locked) T.t -> (locked * locked) T.t (** [assert_locked locked_witness record] checks that [record] holds the [locked_witness]. [locked_witness] can only be obtained while inside {!with_lock}. *) end end -module MakeMutex( (** generative functor, ensuring a new, unique type is created on every application *) ) : LockWitness = struct +module MakeMutex () : LockWitness = +(* generative functor, ensuring a new, unique type is created on every application *) +struct type _ t = Mutex.t type unlocked = unit + type locked = unit let mutex = Mutex.create () let with_lock = Xapi_stdext_threads.Threadext.Mutex.execute - module MakeRecord(T: sig type _ t end) = struct + module MakeRecord (T : sig + type _ t + end) = + struct type _t = unit T.t + let assert_locked () t = t end end module LockedRef : sig (** Opening this module will shadow the Stdlib [ref] operators, to ensure they are not used unsafely. *) - - type ('a, 'mutex) t (** a ref containing values of type ['a] protected by ['mutex] *) - val ref: (module LockWitness with type locked = 'locked) -> 'a -> ('a, unit * 'locked) t + (** a ref containing values of type ['a] protected by ['mutex] *) + type ('a, 'mutex) t - val (!): ('a, 'locked * 'locked) t -> 'a + val ref : + (module LockWitness with type locked = 'locked) + -> 'a + -> ('a, unit * 'locked) t + + val ( ! ) : ('a, 'locked * 'locked) t -> 'a (** [!a] dereferences [a], requiring for its lock to be held. Note that multiple values can share the same mutex. *) - val (:=) : ('a, 'locked * 'locked) t -> 'a -> unit + val ( := ) : ('a, 'locked * 'locked) t -> 'a -> unit (** [r := value] assigns [value] to [r] while requiring its lock to be held. *) - end = struct type ('a, _) t = 'a ref let ref _ default = Stdlib.ref default - let (!) = Stdlib.(!) - let (:=) = Stdlib.(:=) + + let ( ! ) = Stdlib.( ! ) + + let ( := ) = Stdlib.( := ) end (* can be implemented more efficiently inside LockedRef, but it wouldn't ensure we hold the mutex *) -let update (type a b) (module LW:LockWitness with type locked = b) t f = - let module R = LW.MakeRecord(struct type nonrec 'b t = (a, 'b) LockedRef.t end) in - LW.with_lock LW.mutex @@ fun locked -> - let t = R.assert_locked locked t in - LockedRef.(t := f locked !t) - +let update (type a b) (module LW : LockWitness with type locked = b) t f = + let module R = LW.MakeRecord (struct + type nonrec 'b t = (a, 'b) LockedRef.t + end) in + LW.with_lock LW.mutex @@ fun locked -> + let t = R.assert_locked locked t in + LockedRef.(t := f locked !t) open LockedRef @@ -140,9 +158,10 @@ type 'a redo_log_conf = { ; num_dying_processes: int Atomic.t } -module CreationMutex = MakeMutex() +module CreationMutex = MakeMutex () type locked = CreationMutex.locked * CreationMutex.locked + type unlocked = unit * CreationMutex.locked type ('a, 'b) redo_log = 'b redo_log_conf @@ -538,7 +557,8 @@ let action_write_delta marker generation_count data flush_db_fn sock R.debug "Performing writedelta (generation %Ld)" generation_count ; (* Compute desired response time *) let latest_response_time = - get_latest_response_time Stdlib.(!Db_globs.redo_log_max_block_time_writedelta) + get_latest_response_time + Stdlib.(!Db_globs.redo_log_max_block_time_writedelta) in (* Write *) let str = @@ -619,7 +639,7 @@ let maybe_retry f log = (* Close any existing socket and kill the corresponding process. *) -let shutdown log = +let shutdown' log = if is_enabled log then ( D.debug "Shutting down connection to I/O process for '%s'" log.name ; try @@ -671,7 +691,7 @@ let shutdown log = let broken log = set_time_of_last_failure log ; - shutdown log ; + shutdown' log ; cannot_connect_fn log let healthy log = @@ -680,9 +700,9 @@ let healthy log = exception TooManyProcesses -module RedologRecord = CreationMutex.MakeRecord( - struct type 'a t = 'a redo_log_conf end -) +module RedologRecord = CreationMutex.MakeRecord (struct + type 'a t = 'a redo_log_conf +end) let with_locked_log log f = CreationMutex.with_lock CreationMutex.mutex @@ fun locked -> @@ -736,7 +756,8 @@ let startup' log = () (* We're already connected *) | None -> let latest_connect_time = - get_latest_response_time Stdlib.(!Db_globs.redo_log_max_startup_time) + get_latest_response_time + Stdlib.(!Db_globs.redo_log_max_startup_time) in (* Now connect to the process via the socket *) let s = connect ctrlsockpath latest_connect_time in @@ -788,7 +809,7 @@ let startup' log = let startup log = with_locked_log log startup' let switch log vdi_reason = - with_locked_log log shutdown; + with_locked_log log shutdown' ; Atomic.set log.device (get_static_device vdi_reason) ; startup log @@ -850,41 +871,50 @@ let create ~name ~state_change_callback ~read_only = ; currently_accessible= ref (module CreationMutex) true ; state_change_callback ; time_of_last_failure= ref (module CreationMutex) 0. - ; backoff_delay= ref (module CreationMutex) Db_globs.redo_log_initial_backoff_delay + ; backoff_delay= + ref (module CreationMutex) Db_globs.redo_log_initial_backoff_delay ; sock= ref (module CreationMutex) None ; pid= ref (module CreationMutex) None ; num_dying_processes= Atomic.make 0 } in - update (module CreationMutex) all_redo_logs (fun locked all_redo_logs -> + update + (module CreationMutex) + all_redo_logs + (fun locked all_redo_logs -> let instance = RedologRecord.assert_locked locked instance in RedoLogSet.add instance all_redo_logs - ) ; + ) ; instance let create_rw = create ~read_only:false let create_ro = create ~read_only:true +let shutdown log = with_locked_log log shutdown' + let delete log = - shutdown log ; + update (module CreationMutex) all_redo_logs @@ fun locked all_redo_logs -> + let log = RedologRecord.assert_locked locked log in + shutdown' log ; disable log ; - update (module CreationMutex) all_redo_logs (fun _locked all_redo_logs -> - RedoLogSet.remove log all_redo_logs - ) + RedoLogSet.remove log all_redo_logs (* -------------------------------------------------------- *) (* Helper functions for interacting with multiple redo_logs *) let with_active_redo_logs f = - update (module CreationMutex) all_redo_logs (fun _locked all_redo_logs -> + update + (module CreationMutex) + all_redo_logs + (fun _locked all_redo_logs -> let active_redo_logs = RedoLogSet.filter (fun log -> is_enabled log && not log.read_only) all_redo_logs in - RedoLogSet.iter f active_redo_logs; + RedoLogSet.iter f active_redo_logs ; all_redo_logs - ) + ) (* --------------------------------------------------------------- *) (* Functions which interact with the redo log on the block device. *) @@ -927,6 +957,7 @@ let write_delta generation_count t flush_db_fn log = let apply fn_db fn_delta log = if is_enabled log then ( + with_locked_log log @@ fun log -> (* Turn off writing to the database while we are applying deltas. *) Atomic.set ready_to_write false ; finally @@ -943,14 +974,16 @@ let apply fn_db fn_delta log = (** Functions which operate on all active redo_logs. *) (* Flush the database to the given redo_log instance. *) -let flush_db_to_redo_log db log = +let flush_db_to_redo_log' db log = D.info "Flushing database to redo_log [%s]" log.name ; let write_db_to_fd out_fd = Db_xml.To.fd out_fd db in write_db (Db_cache_types.Manifest.generation (Db_cache_types.Database.manifest db)) - write_db_to_fd log ; + write_db_to_fd log; !(log.currently_accessible) +let flush_db_to_redo_log db log = with_locked_log log @@ fun log -> flush_db_to_redo_log' db log + let flush_db_exn db log = assert (not log.read_only) ; (* phantom type parameter ensures this only gets called with RW *) @@ -959,18 +992,16 @@ let flush_db_exn db log = raise (RedoLogFailure "Cannot connect to redo log") let enable_and_flush db log reason = - with_locked_log log @@ fun log -> enable_existing log reason ; flush_db_exn db log let enable_block_and_flush db log path = - with_locked_log log @@ fun log -> enable_block_existing log path ; flush_db_exn db log (* Write the given database to all active redo_logs *) let flush_db_to_all_active_redo_logs db = D.info "Flushing database to all active redo-logs" ; - with_active_redo_logs (fun log -> ignore (flush_db_to_redo_log db log)) + with_active_redo_logs (fun log -> ignore (flush_db_to_redo_log' db log)) (* Write a delta to all active redo_logs *) let database_callback event db = @@ -1024,7 +1055,7 @@ let database_callback event db = entry (fun () -> (* the function which will be invoked if a database write is required instead of a delta *) - ignore (flush_db_to_redo_log db log) + ignore (flush_db_to_redo_log' db log) ) log ) diff --git a/ocaml/database/redo_log.mli b/ocaml/database/redo_log.mli index 4d9f8b8682e..e94db17fb6a 100644 --- a/ocaml/database/redo_log.mli +++ b/ocaml/database/redo_log.mli @@ -66,7 +66,10 @@ val redo_log_events : (string * bool) Event.channel val startup : (_, unlocked) redo_log -> unit (** Start the I/O process. Will do nothing if it's already started. *) -val shutdown : (_, locked) redo_log -> unit +val shutdown' : (_, locked) redo_log -> unit +(** Stop the I/O process. Will do nothing if it's not already started. *) + +val shutdown : (_, unlocked) redo_log -> unit (** Stop the I/O process. Will do nothing if it's not already started. *) val switch : (_, unlocked) redo_log -> string -> unit @@ -82,7 +85,7 @@ val create_rw : name:string -> state_change_callback:(bool -> unit) option -> ([> `RW], unlocked) redo_log (** Create a RW redo log instance and add it to the set. *) -val delete : (_, locked) redo_log -> unit +val delete : (_, unlocked) redo_log -> unit (** Shutdown a redo_log instance and remove it from the set. *) (** {Finding active redo_log instances} *) @@ -107,7 +110,7 @@ type t = val apply : (Generation.t -> Unix.file_descr -> int -> float -> unit) -> (Generation.t -> t -> unit) - -> ([< `RO | `RW], locked) redo_log + -> ([< `RO | `RW], unlocked) redo_log -> unit (** Read from the block device. This function is best-effort only and does not raise any exceptions in the case of error. @@ -116,7 +119,7 @@ val apply : For each database, [db_fn] is invoked with the database's generation count, a file descriptor from which to read the database's contents, the length of the database in bytes and the latest response time. The [db_fn] function may raise {!Unixext.Timeout} if the transfer is not complete by the latest response time. For each database delta, [delta_fn] is invoked with the delta's generation count and the value of the delta. *) -val flush_db_to_redo_log : Db_cache_types.Database.t -> ([< `RW], locked) redo_log -> bool +val flush_db_to_redo_log : Db_cache_types.Database.t -> ([< `RW], unlocked) redo_log -> bool (** Immediately write the given database to the given redo_log instance *) val flush_db_to_all_active_redo_logs : Db_cache_types.Database.t -> unit diff --git a/ocaml/xapi/redo_log_usage.mli b/ocaml/xapi/redo_log_usage.mli index 1bbd9c8888b..eaa3ade5409 100644 --- a/ocaml/xapi/redo_log_usage.mli +++ b/ocaml/xapi/redo_log_usage.mli @@ -16,9 +16,9 @@ *) val read_from_redo_log : - [< `RO | `RW] Redo_log.redo_log -> string -> Db_ref.t -> unit + ([< `RO | `RW], Redo_log.unlocked) Redo_log.redo_log -> string -> Db_ref.t -> unit (** Connect to the block device and write the latest version of the database * on it to a file with a given name. *) -val stop_using_redo_log : _ Redo_log.redo_log -> unit +val stop_using_redo_log : (_, Redo_log.unlocked) Redo_log.redo_log -> unit (** Disconnect from the block device. May be safely called even when not currently connected. *) diff --git a/ocaml/xapi/xapi_ha.ml b/ocaml/xapi/xapi_ha.ml index d5840f2ddb9..a9441256def 100644 --- a/ocaml/xapi/xapi_ha.ml +++ b/ocaml/xapi/xapi_ha.ml @@ -2095,7 +2095,7 @@ let before_clean_shutdown_or_reboot ~__context ~host = Note that Xha_metadata_vdi is a VDI with reason = ha_metadata_vdi_reason and type=`redo_log: type=`metadata is for DR *) debug "About to close active redo logs" ; - Redo_log.with_active_redo_logs Redo_log.shutdown ; + Redo_log.with_active_redo_logs Redo_log.shutdown' ; (* We cannot call ha_release_resources because we want to keep HA armed on reboot *) debug "About to detach static VDIs" ; List.iter Static_vdis.detach_only (Static_vdis.list ()) ; diff --git a/ocaml/xapi/xapi_ha.mli b/ocaml/xapi/xapi_ha.mli index d6967db9807..0f527956019 100644 --- a/ocaml/xapi/xapi_ha.mli +++ b/ocaml/xapi/xapi_ha.mli @@ -15,7 +15,7 @@ (** Functions for implementing 'High Availability' (HA). @group High Availability (HA) *) -val ha_redo_log : [`RW] Redo_log.redo_log +val ha_redo_log : ([`RW], Redo_log.unlocked) Redo_log.redo_log (** The redo log instance used for HA *) (******************************************************************************) diff --git a/ocaml/xapi/xapi_vdi_helpers.ml b/ocaml/xapi/xapi_vdi_helpers.ml index 2e3355ef1f4..67ccb2e1e8a 100644 --- a/ocaml/xapi/xapi_vdi_helpers.ml +++ b/ocaml/xapi/xapi_vdi_helpers.ml @@ -57,7 +57,7 @@ let assert_managed ~__context ~vdi = let redo_log_lifecycle_mutex = Mutex.create () let metadata_replication : - (API.ref_VDI, API.ref_VBD * [`RW] Redo_log.redo_log) Hashtbl.t = + (API.ref_VDI, API.ref_VBD * ([`RW], Redo_log.unlocked) Redo_log.redo_log) Hashtbl.t = Hashtbl.create Xapi_globs.redo_log_max_instances let get_master_dom0 ~__context =