.. _UC09:

Use Case 09 - Replicate MN to MN
--------------------------------

.. index:: Use Case 09, UC09, Replicate MN, replicate

Revisions
  View document revision history_.

Goal
  Replicate data from Member Node to Member Node.

Summary
  Replication of content between Member Nodes (MNs) is done to improve the
  persistence of information (avoiding data loss if an MN is lost) and to
  improve accessibility (more choices for content retrieval can lower the
  bandwidth requirements on any particular MN). The process of replication is
  controlled by a Coordinating Node (CN). A full copy of science data and
  metadata is made during the replication process, so the original science
  metadata and data are copied to the recipient MN. Data is copied across as
  an exact copy. Science metadata may be transformed into another format if
  the original cannot be supported. It is important that the original
  metadata is preserved on the CNs, as it is always possible that the
  original MN where the content was published may go offline or be removed
  from the DataONE system.

Actors
  Two Member Nodes, one or more Coordinating Nodes

Preconditions
  - Content is present on a Member Node
  - The content has been registered with the DataONE system (i.e., Member
    Node synchronization has occurred for the data and metadata)

Triggers
  - A Coordinating Node detects that there are insufficient copies of the
    object(s) in question.
  - Information on a Member Node is altered
  - The capabilities of a Member Node change (accepting more or less content)
  - The replication policy of DataONE or a Member Node changes
  - A Member Node goes offline

Post Conditions
  - Content is present on the recipient Member Node
  - System metadata is updated to reflect the change
  - Watchers are notified of the change
  - Member Node and Coordinating Node logs are updated

.. @startuml images/09_uc.png
   usecase "12. Authentication" as authen
   package "DataONE"
   actor "Coordinating Node" as CN
   actor "Member Node 1" as MN1
   actor "Member Node 2" as MN2
   usecase "13. Authorization" as author
   usecase "01. Get Object" as GET
   usecase "04. Create object" as CREATE
   usecase "06. Synchronize content" as SYNC
   usecase "16. Log event" as log
   usecase "21. Notify subscribers" as subscribe
   CN -- CREATE
   CN -- SYNC
   MN1 -- CREATE
   MN2 -- GET
   MN1 -- GET
   GET ..> author: <<includes>>
   GET ..> authen: <<includes>>
   GET ..> log: <<includes>>
   GET ..> subscribe: <<includes>>
   CREATE ..> author: <<includes>>
   CREATE ..> log: <<includes>>
   CREATE ..> subscribe: <<includes>>
   @enduml

.. image:: images/09_uc.png

*Figure 1.* Use case diagram indicating the actors involved in the process of
Member Node replication.

.. @startuml images/09_seq.png
   skinparam notebordercolor #AAAAAA
   skinparam notefontcolor #222222
   title Replicate an object between two Member Nodes\n\n
   participant "mnA : MNode" as mnA <>
   participant "mnB : MNode" as mnB <>
   participant "cnXReplService : ReplicationService" as cnXrepl <>
   participant "cnZReplService : ReplicationService" as cnZ <>

   == Replication Event ==

   [-> cnXrepl : hzSystemMetadata.put(pid, sysmeta)
   activate cnXrepl #D74F57
   note right
     The synchronization service adds an entry to the SystemMetadata map
     managed by Hazelcast, and an EntryEvent is fired
   end note
   cnXrepl -> cnXrepl : entryAdded(EntryEvent)
   cnXrepl -> cnXrepl : queueEvent(pid)
   cnXrepl -> cnXrepl : itemAdded(pid)
   note right
     hzReplicationEvents.offer(pid) is called to keep track of frequent
     hzSystemMetadata change events. When popped off of the queue,
     identifiers are placed into the hzHandledReplicationEvents set until
     the evaluation to create a task or not is complete.
     This prevents multiple task creation across CNs for the same event.
   end note
   cnXrepl -> cnXrepl : createAndQueueTasks(pid)
   loop for each ReplicationTask
   cnXrepl -> cnXrepl : taskid = idGenerator.newId()
   note right
     Hazelcast.getIdGenerator("task-ids") has been called in the
     ReplicationService constructor
   end note
   cnXrepl -> cnXrepl : hzReplicationTaskQueue.put(taskid, task)
   note right
     Hazelcast distributes Replication Tasks to all CNs
   end note
   cnXrepl -> cnXrepl: setReplicationStatus(session, pid,\n nodeRef, ReplicationStatus.QUEUED)
   deactivate cnXrepl
   end loop

   == Process Replication Tasks ==

   cnZ -> cnZ: itemAdded(task)
   activate cnZ #D74F57
   cnZ -> cnZ: hzReplicationTaskQueue.poll()
   note left
     Each ReplicationService polls the replication task queue when events
     are fired. The first to get the lock handles the task. The others will
     also get the lock, but during evaluation will not create a task because
     of the new state of the replica in the system metadata.
   end note
   cnZ -> cnZ: ExecutorService.submit(task)
   activate cnZ #DarkSalmon
   cnZ -> cnZ: replicationTask.call(pid)
   cnZ -> mnB: replicate(cnZSession, mnASession, pid)
   activate mnB #D74F57
   mnB --> cnZ: replicateResponse
   deactivate cnZ
   cnZ -> cnZ: setReplicationStatus(session, pid,\n nodeRef, ReplicationStatus.REQUESTED)
   note left
     Object's system metadata gets updated
   end note
   cnZ -> cnZ: updateSystemMetadata(pid)
   cnZ --> cnZ: statusResponse
   mnB -> mnA: getReplica(mnBSession, pid)
   deactivate mnB
   activate mnA #D74F57
   mnA -> cnZ: isNodeAuthorized(mnASession, mnBSubject, pid)
   cnZ --> mnA: authorizationResponse
   mnA --> mnB: replicaBytes
   deactivate mnA
   activate mnB #D74F57
   mnB -> cnZ: setReplicationStatus(session, pid,\n nodeRef, ReplicationStatus.COMPLETE)
   deactivate mnB
   cnZ -> mnA: mnA.getChecksum(pid)
   activate mnA #D74F57
   mnA --> cnZ : checksum
   note right
     Object's system metadata gets updated
   end note
   deactivate mnA
   cnZ -> cnZ: updateSystemMetadata(pid)
   cnZ --> cnZ: statusResponse
   deactivate cnZ
   deactivate cnZ
   deactivate cnZ
   @enduml

.. image:: images/09_seq.png

.. @startuml images/09_seq_audit_1.png
   skinparam notebordercolor #AAAAAA
   skinparam notefontcolor #222222
   title Audit replicas on Member Nodes.\n\n
   participant "ReplicaAuditQuartzJob" as cnQuartz <>
   participant "ReplicaAuditService" as cnAudit <>
   participant "ReplicaAuditDao" as auditDao <>
   participant "DistributedExecutorService" as cnZ <>

   == Replication Audit ==

   note left of cnQuartz
     Quartz scheduled execution of replica audit processing.
   end note
   cnQuartz -> cnAudit : auditReplicas()
   activate cnAudit #D74F57
   cnAudit -> cnAudit : acquireReplicaAuditingLock();
   note over auditDao
     Query the Metacat replica system metadata replica status table for pids
     of replica records which have not been audited for the audit period
     length of time. May be a paged result.
   end note
   cnAudit -> auditDao : getReplicasByDate()
   cnAudit <-- auditDao : List pidsToAudit
   loop for each Identifier pid in pidsToAudit
   note over cnAudit
     Create ReplicaAuditTasks if not already being handled. Tasks can be
     configured to batch several Identifiers into tasks or to create a task
     for each Identifier.
   end note
   alt if pid NOT in hzProcessingAuditIdentifiers
   cnAudit -> cnAudit : replicaAuditTasks.add(new ReplicaAuditTask(pid));
   end
   alt if replicaAuditTasks.size >= replicaBatchSize
   note over cnAudit
     The accumulatedTaskFutures list holds futures generated on this
     iteration. futuresToHandle holds futures generated on the last
     iteration.
     The assumption is that futures provided to the distributed execution
     service on the previous iteration should already have executed
     asynchronously.
   end note
   cnAudit -> cnAudit : futuresToHandle = accumulatedTaskFutures; // futures from last batch
   cnAudit -> cnAudit : accumulatedTaskFutures.clear();
   loop for each ReplicaAuditTask auditTask in replicaAuditTasks
   note over cnZ
     The Hazelcast distributed executor service is used to distribute audit
     tasks among CN nodes.
   end note
   cnAudit -> cnZ : submit(auditTask);
   cnAudit -> cnAudit : hzProcessingAuditIdentifiers.removeAll(auditTask.pid);
   cnZ --> cnAudit : Future replicaTaskFuture;
   cnAudit -> cnAudit : accumulatedTaskFutures.add(replicaTaskFuture);
   end loop
   cnAudit -> cnAudit : replicaAuditTasks.clear(); // prepare for next batch of tasks.
   loop for each Future replicaTaskFuture in futuresToHandle
   cnAudit -> cnAudit : handleResult(replicaTaskFuture.get());
   end loop
   end
   end loop
   note over cnAudit
     Handle futures from the last batch of audit tasks
   end note
   loop for each Future replicaTaskFuture in accumulatedTaskFutures
   cnAudit -> cnAudit : handleResult(replicaTaskFuture);
   end loop
   deactivate cnAudit
   @enduml

*Figure 2.* Interactions for use case 09. The diagram describes the transfer
of a single object from MN_A to MN_B as directed by a CN. It is assumed that
the object does not already exist on MN_B and that the object has been
identified as requiring replication by the CN checking its status in the
system metadata. The end state of a replicate operation is that the content
is available on MN_B, MN_B has notified the CN of such, and the CN will
schedule a synchronize operation that will verify the copy as legitimate.

Implementation Details
----------------------

Replication of objects between Member Nodes (MNs) within the DataONE system
is managed by the Coordinating Nodes (CNs). CNs are aware of the replication
policies of each object (through system metadata) and the capabilities of
each MN (through node capabilities), and populate a distributed queue of
replication tasks to be processed by all of the CNs.

Replication can be initiated in three ways:

1) CN synchronization: harvesting of system and science metadata

2) CN timed replication: periodic sweep of all system objects

3) MN event-based replication: an MN sends a replication request to a CN
   (not implemented)

Replication Events
------------------

The CNs maintain a synchronized, distributed Hazelcast Map of system metadata
(hzSystemMetadata). This map reflects the current state of the DataONE
system's object store. This in-memory map is also bound to the backing
Metacat object store via the Hazelcast MapStore and MapLoader interfaces. The
hzSystemMetadata map serves as an object-level locking mechanism across CNs,
and any service that will make changes to an object's system metadata needs
to gain a lock on the given object identifier in the map. The hzSystemMetadata
map is set to be persisted (backed up) on 3 CNs.

As the CN Synchronization Service becomes aware of create, update, and delete
events for MN objects through harvesting, it updates the hzSystemMetadata
map. The Replication Service monitors this map for entry changes and builds a
list of ReplicationTask objects for each changed identifier in the map. This
is done by calling ReplicationService.createReplicationTaskList(pid).
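For illustration, the entry-listener pattern described above might be
sketched as follows. This is a minimal sketch assuming Hazelcast 3.x APIs;
the listener class, the plain-String identifiers, and the Object value type
are simplifications for the example, not the actual CN implementation.

.. code-block:: java

   import com.hazelcast.core.EntryEvent;
   import com.hazelcast.core.Hazelcast;
   import com.hazelcast.core.HazelcastInstance;
   import com.hazelcast.core.IMap;
   import com.hazelcast.core.IQueue;
   import com.hazelcast.map.listener.EntryAddedListener;

   /**
    * Illustrative sketch (Hazelcast 3.x style): react to new system metadata
    * entries and queue the identifier for replication evaluation. The map and
    * queue names follow the text; the class itself is hypothetical.
    */
   public class ReplicationEventListener implements EntryAddedListener<String, Object> {

       private final IQueue<String> replicationEvents;

       public ReplicationEventListener(HazelcastInstance hz) {
           // Distributed queue of identifiers awaiting replication evaluation
           this.replicationEvents = hz.getQueue("hzReplicationEvents");
       }

       @Override
       public void entryAdded(EntryEvent<String, Object> event) {
           // A system metadata entry was added (e.g. by the synchronization
           // service); queue the identifier so one CN can evaluate whether
           // replication tasks need to be created for it.
           replicationEvents.offer(event.getKey());
       }

       public static void main(String[] args) {
           HazelcastInstance hz = Hazelcast.newHazelcastInstance();
           // Value type shown as Object for brevity; in DataONE this would be
           // the SystemMetadata type.
           IMap<String, Object> systemMetadata = hz.getMap("hzSystemMetadata");
           systemMetadata.addEntryListener(new ReplicationEventListener(hz), false);
           systemMetadata.put("urn:pid:example", new Object()); // fires entryAdded
       }
   }

Queueing the identifier rather than creating tasks directly in the listener
lets a single CN claim the event and evaluate it, which is how duplicate task
creation across CNs is avoided (see the hzReplicationEvents /
hzHandledReplicationEvents notes in Figure 2).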
The Replication Service evaluates the ReplicationPolicy of the given object's
system metadata, evaluates the capabilities and availability of the potential
target MNs, and creates a ReplicationTask for each MN replication target, up
to the numberOfReplicas in the object's ReplicationPolicy. Each
ReplicationTask is listed in priority order. The Replication Service then
iterates through the returned task list and populates the hzReplicationTasks
queue with the ordered tasks. Each item offered to the queue consists of a
task identifier and a ReplicationTask.

.. Note::
   TODO: Describe the CN time-based population of the replication task queue
   that periodically does a full sweep of the object store.

.. Note::
   TODO: Describe the MN-based replication via a CNReplication API request
   (not implemented).

Processing Replication Tasks
----------------------------

As the hzReplicationTasks queue is populated, each CN's Replication Service
receives entry added events and polls the queue with a short timeout to
submit new tasks for processing. A CN Replication Service instance's
entryAdded() method is fired, and it in turn polls the task queue and submits
the ReplicationTask to the cluster-wide Executor Service. One of the CNs will
execute the task by calling the ReplicationTask.call() method. This call
initiates MN replication.

The CN calls replicate() on the target MN (mnB), passing in the cnZ token
(cnZToken), the originating node reference (mnA), and the identifier of the
object to be replicated (pid). This call triggers the MN (mnB) to call
getReplica() on the originating MN (mnA), passing in the mnB token (mnBToken)
and the identifier of the object to be replicated (pid). In turn, the CN
updates the system metadata for the object, setting the ReplicationStatus to
REQUESTED after gaining the lock on the object. The lock is immediately
released.

Before responding to getReplica(), mnA checks for replication authorization
by calling isNodeAuthorized() on the CN, passing in the mnA token (mnAToken),
the Subject listed in the mnBToken (mnBSubject), the object identifier (pid),
and the desired replication permission (replicationPermission). The
Replication Service looks up the Subject in the LDAP replication group and
returns the response. Upon successful authorization, mnA replicates the
object (replicaBytes) to the target MN (mnB). mnB in turn sends a successful
replication response to the CN (replicateResponse). The CN Replication
Service once again updates the system metadata for the object after gaining a
lock in the hzSystemMetadata map. The lock is immediately released, and the
statusResponse is sent to the CN.

Note (2011.01.07 CWB): This simple authentication scheme will not work on
Member Nodes that have their own access control rules. In this scheme, each
Member Node will need to have knowledge of the administrative (or
replication) credentials for each of the other Member Nodes. The CN needs to
handle the login actions for both of the MNs involved and send an
authenticated token from MN_A to MN_B so that it can use that credential to
successfully get the document. This is only the case if the document on MN_A
is read protected. If it is public, no token is needed.

Note that the call to setReplicationStatus() with a value of *COMPLETE* is
functionally equivalent to the *notify(objectCreated, identifier)* call
indicated in use case 06.

Replication Auditing
--------------------

.. image:: images/09_seq_audit_1.png
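The per-replica check performed by each audit task (detailed in the following
figure) amounts to comparing the checksum reported by each replica-holding MN
against the checksum recorded in the object's system metadata, recording a
verification date for matches, and marking mismatches as INVALID so that
replication can be re-queued. The sketch below illustrates this logic; the
CoordinatingNode, MemberNode, and SystemMetadata interfaces are simplified,
hypothetical stand-ins rather than the published DataONE client API.

.. code-block:: java

   import java.util.Date;
   import java.util.List;

   /** Illustrative sketch of the per-object replica audit check. */
   public class ReplicaAuditSketch {

       // Hypothetical, simplified stand-ins for the DataONE service APIs.
       interface MemberNode {
           String getChecksum(String pid, String algorithm);
       }

       interface CoordinatingNode {
           SystemMetadata getSystemMetadata(String pid);
           MemberNode getMemberNode(String nodeId);
           void setReplicaVerified(String pid, String nodeId, Date verified);
           void setReplicaInvalid(String pid, String nodeId);
           void requestReplication(String pid); // re-queue replication tasks
       }

       interface SystemMetadata {
           String getChecksum();
           String getChecksumAlgorithm();
           int getNumberOfReplicas();           // from the ReplicationPolicy
           List<String> getReplicaNodeIds();
       }

       /** Audit every recorded replica of one object, as in the audit task. */
       static void auditReplicas(CoordinatingNode cn, String pid) {
           SystemMetadata sysmeta = cn.getSystemMetadata(pid);
           int verified = 0;
           for (String nodeId : sysmeta.getReplicaNodeIds()) {
               String reported = cn.getMemberNode(nodeId)
                       .getChecksum(pid, sysmeta.getChecksumAlgorithm());
               if (sysmeta.getChecksum().equals(reported)) {
                   verified++;                                 // replica is valid
                   cn.setReplicaVerified(pid, nodeId, new Date());
               } else {
                   cn.setReplicaInvalid(pid, nodeId);          // mark INVALID
                   cn.requestReplication(pid);                 // replace the bad copy
               }
           }
           // Too few verified copies: re-evaluate replication tasks.
           if (verified != sysmeta.getNumberOfReplicas()) {
               cn.requestReplication(pid);
           }
       }
   }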
.. @startuml images/09_seq_audit_2.png
   skinparam notebordercolor #AAAAAA
   skinparam notefontcolor #222222
   title Replica Audit Task Processing.\n\n
   participant "ReplicaAuditService" as cnAudit <>
   participant "DistributedExecutorService" as cnZ <>
   participant "ReplicaAuditTask" as auditTask <>
   participant "d1Client.CNode : CNodeClient" as cNodeClient <>
   participant "d1Client.MNode : MNodeClient" as mnA <>
   participant "ReplicationManager" as repManager <>

   == Process replication audit. ==

   cnAudit -> cnZ : submit(replicaAuditTask);
   cnZ -> cnZ: execute(replicaAuditTask);
   cnZ -> auditTask : call()
   activate auditTask #D74F57
   loop for each Identifier pid in auditTask.pids
   auditTask -> cNodeClient : getSystemMetadata(pid);
   cNodeClient --> auditTask : sysMetadata;
   auditTask -> auditTask : verifiedReplicaCount = 0;
   loop for each Replica replica in sysMetadata.getReplicaList()
   auditTask -> auditTask : mnA = getMNodeClient(replica.getReplicaMemberNode());
   auditTask -> mnA : replicaChecksum = getChecksum(pid, sysMetadata.getChecksum());
   alt if replicaChecksum == sysMetadata.getChecksum()
   auditTask -> auditTask : verifiedReplicaCount++;
   auditTask -> auditTask : replica.updateVerifiedDate(today);
   auditTask -> cNodeClient : updateReplicationMetadata(pid, replica);
   else
   auditTask -> auditTask : replica.setReplicationStatus(INVALID);
   auditTask -> cNodeClient : updateReplicationMetadata(pid, replica);
   auditTask -> repManager : createAndQueueTasks(pid);
   end
   end loop
   alt if sysMetadata.getReplicationPolicy().getNumberReplicas != verifiedReplicaCount
   auditTask -> repManager : createAndQueueTasks(pid);
   end
   end loop
   auditTask --> cnZ : String audit result
   deactivate auditTask
   cnZ --> cnAudit : Future - audit results including exceptions
   @enduml

.. image:: images/09_seq_audit_2.png


.. _history: https://redmine.dataone.org/projects/d1/repository/changes/documents/Projects/cicore/architecture/api-documentation/source/design/UseCases/09_uc.txt
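To complement the Processing Replication Tasks section above, the following
sketch shows how a serializable replication task might be handed to a
Hazelcast (3.x) distributed executor so that any CN in the cluster can run
it. The ReplicationTask and TargetMemberNode types and the executor name are
illustrative assumptions, not the actual CN implementation.

.. code-block:: java

   import java.io.Serializable;
   import java.util.concurrent.Callable;
   import java.util.concurrent.Future;

   import com.hazelcast.core.Hazelcast;
   import com.hazelcast.core.HazelcastInstance;
   import com.hazelcast.core.IExecutorService;

   /** Illustrative sketch of distributed replication task execution. */
   public class ReplicationTaskSketch {

       /** Hypothetical stand-in for the target MN's replicate() call. */
       interface TargetMemberNode extends Serializable {
           boolean replicate(String sourceNodeRef, String pid);
       }

       /** One task: ask the target MN to pull pid from the source MN. */
       static class ReplicationTask implements Callable<String>, Serializable {
           private final String pid;
           private final String sourceNodeRef;
           private final TargetMemberNode targetNode;

           ReplicationTask(String pid, String sourceNodeRef, TargetMemberNode targetNode) {
               this.pid = pid;
               this.sourceNodeRef = sourceNodeRef;
               this.targetNode = targetNode;
           }

           @Override
           public String call() {
               // The target MN is expected to call getReplica() back on the
               // source MN and report its status to the CN (see Figure 2).
               boolean accepted = targetNode.replicate(sourceNodeRef, pid);
               return accepted ? "REQUESTED" : "FAILED";
           }
       }

       public static void main(String[] args) throws Exception {
           HazelcastInstance hz = Hazelcast.newHazelcastInstance();
           IExecutorService executor = hz.getExecutorService("replication-tasks");
           TargetMemberNode mnB = (sourceNodeRef, pid) -> true;   // stub target MN
           Future<String> status = executor.submit(
                   new ReplicationTask("urn:pid:example", "urn:node:mnA", mnB));
           System.out.println("Replication status: " + status.get());
           hz.shutdown();
       }
   }

In the running system the target node would be resolved from a node reference
via the DataONE client library rather than passed in directly; it is shown as
an interface here only to keep the example self-contained.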