Course Evaluation (Final grade: A)

Another interesting course! I would recommend taking this course if you want to read related papers or do small projects across different topics in cloud computing.

The most interesting project is training a large model using Spark. The average workload is not very heavy, but depending on how the work is distributed you may need to spend many hours during one or two weeks of the semester.

Blog Chapters

  1. Chapter 1: Overview of Cloud Computing
  2. Chapter 2: Building a Cloud
  3. Chapter 3: Encapsulation
  4. Chapter 4: Programming Models and Frameworks
  5. Chapter 5: Cloud Storage
  6. Chapter 6: Scheduling Computation
  7. Chapter 7:

** Chapter 1: Overview of Cloud Computing **

Definitions

Properties:
  • Computing utility, always available, accessible through the networks
  • Simplified interface
  • Statistical multiplexing, sharing resources
  • Economies of scale from consolidation, costs lower
  • Capital costs converted to operating costs
  • Rapid and easy variation of usage
  • Appearance of infinite resources to small users
  • Pay only for what you use
  • Cost associativity: 1 unit for 1000 hours costs the same as 1000 units for 1 hour
Consolidation, sharing, elasticity
  • Central Limit Theorem (CLT) argument
  • Users with widely varying needs place a considerably less variable aggregate load on a huge provider, allowing the provider to do less overprovisioning.
    • Because of the CLT, the overall load is more predictable, which means less overprovisioning.
  • Users perceive exactly what they need all the time, as long as their needs are “small” (so the resources accessed appear infinite)
SaaS, PaaS, IaaS
  • SaaS: the application itself is the service (Salesforce)
    • the consumer does not manage or control the underlying cloud infrastructure (network, servers, operating systems, storage), with at most limited application configuration settings.
  • PaaS: high-level programming model for cloud computing, Turing complete but with resource management hidden. (Google AppEngine)
    • Only App and data are controlled by user
    • consumer does not manage or control the underlying cloud infrastructure including network, servers, operating systems, storage but with control over the deployed applications and possibly configuration settings for the application-hosting environment.
  • IaaS: low-level computing model for cloud computing (AWS)
    • The consumer does not manage or control the underlying cloud infrastructure but has control over operating systems, storage, and deployed application
    • Can manage App, Data, Runtime, Middleware, OS, Cannot manage Virtualization, servers, storage, networking
XXX as a Service
  • Data as a Service, Network as a Service, Communication as a Service (no-hardware private VoIP switching), IT as a Service (IT provided as services)...
Deployment models
public cloud: provider sells computing to unrelated consumers
private cloud: provider and consumers belong to the same organization, with largely unrelated components and divisions acting as the consumers
community cloud: providers and consumers are different organizations with strong shared concerns
hybrid cloud: private plus public resources combined by the same consumer. Better availability, overflow from private to public, load balancing to increase elasticity
Larry Ellison’s objection
  • the definition includes too much
Obstacles of cloud computing
  • Privacy & security
  • Privacy in the world tends to rely on regulation
  • Utility issues
  • Physical utilities tend to rely on regulation
  • High cost of networking combined with always remote
  • Performance unpredictability & in situ development/debugging
  • Software licensing – $/yr/CPU is neither elastic nor pay-as-you-go
load balancing approach
1. DNS load balancing
  • DNS reorders the list of server addresses for each client asking for name translation
  • PRO: easy to scale, unrelated to actual TCP/HTTP requests
  • CON: a newly added server may see little traffic; changing the mapping dynamically is hard because existing bindings persist (DNS TTLs, caching in network middleboxes).
2. Router distribute TCP connections
  • Router does the mapping: (client_ip+client_port) <-> (server_ip+server_port)
    • for SYN packets
    • not exposed to client about the ip address(like NAT)
  • PRO: the router doesn’t have to think or remember much; it just selects the server with the fewest connections for each new connection (see the sketch after this list).
  • CON: all traffic goes through the router (harder to scale), and the decision is sticky because it applies to the entire session.

3. Router distributes individual requests embedded in connections

  • PRO: most dynamic
  • CON: requires the most processing and state in the router; CPU load and memory use go up because of the per-request routing decisions.
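
A minimal sketch of the least-connections selection used in approach 2 (server addresses and counters are made up; a real L4 balancer would keep this state per TCP connection):

```python
# Hedged sketch of least-connections server selection (approach 2 above).
# Server addresses and counters are made up; a real L4 balancer would keep
# this state per TCP connection and update it on SYN/FIN.
connections = {"10.0.0.1": 12, "10.0.0.2": 7, "10.0.0.3": 9}

def pick_server(conns):
    """Return the backend with the fewest active connections."""
    return min(conns, key=conns.get)

def on_new_connection(conns):
    server = pick_server(conns)
    conns[server] += 1          # remember the mapping for this session
    return server

def on_connection_closed(conns, server):
    conns[server] -= 1

print(on_new_connection(connections))  # -> "10.0.0.2"
```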

Elasticity: How? The elasticity controller

  • An elasticity controller adjusts capacity based on the current load status
  • Monitoring: resource usage, request sequences (patterns)
  • Triggering: simple conditions (e.g., thresholds), schedules, or more complex models based on monitored instances
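
A minimal sketch of such a threshold-based controller, assuming hypothetical monitoring and scaling hooks:

```python
import statistics

# Hedged sketch of a threshold-based elasticity controller.
# get_cpu_samples() and scale_to() would be hooks into the monitoring system
# and the IaaS API; only the triggering logic is shown here.
SCALE_OUT_THRESHOLD = 0.75   # average CPU above this -> add an instance
SCALE_IN_THRESHOLD = 0.25    # average CPU below this -> remove an instance
MIN_INSTANCES, MAX_INSTANCES = 2, 20

def control_step(current_instances, cpu_samples):
    """One monitoring/triggering cycle; returns the desired instance count."""
    avg_cpu = statistics.mean(cpu_samples)
    if avg_cpu > SCALE_OUT_THRESHOLD and current_instances < MAX_INSTANCES:
        return current_instances + 1
    if avg_cpu < SCALE_IN_THRESHOLD and current_instances > MIN_INSTANCES:
        return current_instances - 1
    return current_instances

print(control_step(4, [0.9, 0.8, 0.85]))  # -> 5 (scale out)
```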

Elasticity: Scale-out or Scale up

  • Horizontal scaling: adding more instances (Common approach)
  • Vertical scaling: resizing the resources (bandwidth, CPU cores, memory) allocated to an existing instance; more challenging (OS-dependent, may require a reboot).

Two-tier services

  • web-database
  • The web-server tier is easy to put in the cloud; at the beginning, order-taking was not in the cloud.
  • Elasticity in IaaS: database scaling is more difficult with state(consistency)
  • PaaS ("P" = web-service platform): built-in elastic load balancing and scheduled actions for containers; persistent key-value store (datastore) and non-persistent memcache for a simple database tier; users can instantiate Backends; user code can request (actuate) horizontal scaling; running traditional database services is possible, but their scaling is still hard.
Load-balancing method affect how much statistics we can get.
  • Router-based load balancing: firewall, intrusion detection, accelerator
  • scaling middleboxes: CPU intensive tasks. (OpenFlow, split flows)
  • bandwidth allocation by switches/routers
Service parallelization: Load Balancer
  • AWS CloudWatch
  • Load balancer is not necessarily elastic
Scalable relational database
  • Separate data at rest(distributed pay-for-use storage (HDFS)) from ongoing or recent access & mutation
  • Recent access & mutation servers are elastic (called Owning Transaction Managers)
  • Partitioned but all transactions restricted to one partition: transactions block on locks and bottleneck performance scaling
  • Fault tolerance of the elasticity controller: the controller itself gets its reliability through replication; it can re-assign partitions while a server is down or starting up.
ElasTraS architecture scales OTM machines
  • Transactions are limited to interacting with data from only one partition to avoid the complexity of distributed transactions.
  • TODO

Paper reading notes

Armbrust2010
Referring to http://doi.acm.org/10.1145/1721654.1721672
Cloud computing: what makes it attractive? Avoiding large capital outlays and the over-/under-provisioning problem...

Definition: refers to both the applications delivered as services over the Internet and the hardware and systems software in the data centers that provide those services. The services themselves are SaaS; the service being sold is utility computing; cloud computing = SaaS + utility computing. A data center has to be large enough to be called a cloud. What is new in hardware provisioning/pricing: 1. the appearance of infinite computing resources available on demand; 2. the elimination of an up-front commitment by cloud users, who can add resources only when needed; 3. the ability to pay for the use of computing resources on a short-term basis. *The construction and operation of extremely large-scale, commodity-computer data centers at low-cost locations was the key enabler of cloud computing: such providers can offer services below the cost of a medium-sized data center and still make a profit.

Utility computing classes: EC2 offers low-level control but less automatic scalability and failover (the application may need to manage replication itself). Google AppEngine (a domain-specific platform) is at the other end. Azure is in between.

Economics: two cases favor cloud computing over conventional hosting: 1. demand for the service changes over time; 2. demand is unknown in advance.
Usage-based pricing economically benefits the buyer.
Elasticity helps reduce costs. Underprovisioning has a cost that is difficult to measure: users who are turned away may never come back. Scale-up elasticity is an operational requirement, and scale-down elasticity allows the steady-state expenditure to more closely match the steady-state workload.
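
The paper's cloud-vs-datacenter tradeoff can be sketched roughly as follows (the numbers are illustrative, not from the paper):

```python
# Rough sketch of the tradeoff discussed by Armbrust et al.:
# cloud is favored when  hours_cloud * (revenue - cost_cloud)
#                     >= hours_dc    * (revenue - cost_dc / utilization).
# All numbers below are made-up illustrations, not data from the paper.
def cloud_is_favored(hours_cloud, hours_dc, revenue_per_hour,
                     cost_cloud_per_hour, cost_dc_per_hour, utilization):
    profit_cloud = hours_cloud * (revenue_per_hour - cost_cloud_per_hour)
    # Datacenter capacity is paid for even when idle, hence the division.
    profit_dc = hours_dc * (revenue_per_hour - cost_dc_per_hour / utilization)
    return profit_cloud >= profit_dc

# Pay-as-you-go serves every user-hour (elasticity), while a fixed datacenter
# at 40% utilization effectively pays 2.5x per useful hour.
print(cloud_is_favored(1000, 1000, 1.0, 0.40, 0.20, 0.4))  # -> True
```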


Obstacles for Cloud computing:

1-3(adoption):
1. Business Continuity and Service Availability (hard to ensure availability, single failure still exists for a service provider)
2. Data Lock-In (public+private sharing by sharing API)
3. Data Confidentiality/Auditability: from other user/provider 

4-8(growth):
4. Data Transfer Bottlenecks: Applications continue to become more data-intensive
5. Performance Unpredictability: I/O interference between virtual machines; also concerns scheduling of virtual machines for some classes of batch processing programs, specifically high-performance computing
6. Scalable Storage
7. Bugs in Large Scale Distributed Systems: bugs cannot be reproduced in smaller configurations
8. Scaling Quickly

9-10(policy and business):
9. Reputation Fate Sharing, legal liability (one customer's behavior can cause unexpected downtime or legal exposure for others)
10. Software Licensing



Opportunities: 
- improve architectures and operating systems to efficiently virtualize interrupts and I/O channels
- flash memory will decrease I/O interference.
- offer something like “gang scheduling” for cloud computing
- create a storage system that would not only meet existing programmer expectations, but combine them with the cloud advantages of scaling arbitrarily up and down on demand.
- reliance on virtual machines in cloud computing.(7)
- automatically scale quickly up and down in response to load in order to save money
- create reputation-guarding services similar to the “trusted email” services 
Referring to NISTdef2011
Characteristics:
- On-demand self-service
- Broad network access
- Rapid elasticity
- Resource pooling (multi-tenant)
Vaquero11
Dynamically Scaling Applications in the Cloud
vertical scaling is hard: rebooting..
load balancers need to support

Server scalability: a per-tier controller, a single controller for the whole application (for all tiers)
How and When to Scale: Feeding Controller with Rules and Policies
- adding load balancers
- LB scalability requires the total time taken to forward each request to the corresponding server to be negligible for small loads and should grow no faster than O(p) 
-  CPU-intensive web app: a LB to split computation among many instances.
-  network intensive: CPU-powerful standalone instance to maximize throughput
-  for more network-intensive applications, it may be necessary to use DNS load balancing

Scaling the network:
- Virtualization: VLAN(L2)
-  periodically measure actual per-application network usage so that unused capacity can be used by other applications; increase network utilization by virtually “slicing” it: “network as a service”
- statistical multiplexing


Scaling the platform:
PaaS cloud platforms
- Container Scalability: the execution environment for user applications. Scalability of the container layer is crucial, as it must efficiently manage and distribute resources to meet the demands of potentially numerous applications running concurrently.

Multitenant containers and Isolation: requires strong isolation to prevent security issues
Individual Containers per User: simplifies isolation but requires effective management of numerous containers.
Horizontal Scaling via Container Replication: achieved through automatic scaling in IaaS systems or by inherent capabilities of the platform itself
Components should ideally be stateless to cope with the dynamic nature of container instantiation and disposal; support for stateful components is also necessary, requiring sophisticated load balancing (LB) and container management to handle session data (soft-state replication, distributed caching systems).

- Database Scalability: for example, implementing horizontal scaling strategies, such as replicating the database across multiple nodes to handle increased load and ensure data availability. (demands on PaaS platforms can often surpass the capacity of any single machine)
NoSQL Databases: These databases provide high scalability and availability, fitting well in cloud environments with high demand. However, they offer eventual consistency rather than immediate consistency, leading to limitations in transaction support and SQL functionalities.
Replication Mechanisms: In-core Solutions, Middleware Solutions

Ideal Elastic Cloud: scale through VM or container replication, reconfiguration, and dynamic load balancing, possibly using DNS for the latter. allow dynamic allocation of network resources. 
Ideal PaaS Platform Features: capable of instantiating or releasing instances of user components based on demand changes and distributing the load transparently among them. Implementing session concepts and supporting transparent data replication are essential. Access to traditional relational databases with ACID transaction support is crucial. However, the system must address the increased latency due to consistency maintenance across replicas, especially under high demand.
  • Infrastructure-as-Code tools provide a high-level software interface (e.g., Python / Ruby or JSON / YAML) that allows developers to specify their infrastructure requirements, software dependencies, and the process for building the infrastructure and deploying it to the cloud

Back to Blog Chapters

** Chapter 2: Building a CMU Cloud **

Model

aim for less costs
Client ->
App
OS
Instance(s)
--------------------
Hypervisor (shared)
Building Blocks
Usage Monitor
Hardware (shared)
(shared Amazon EC2)

Problem: used mostly around deadlines? 2 methods, renting...
Build a cloud
1. Users rent, so we need a provisioner (resources, monitoring, ... -> assignment of users to machines)
- Bin-packing, NP-hard. Can be done with simplifying assumptions -> less complexity (see the first-fit sketch after this list)
- Migration also has costs
2. Scheduler: which user jobs/processes to run: Prioritization(pay more), Oversubscription, Workload constraints
3. Encapsulation: compute, storage, networking and data
4. Virtualization
5. Fault tolerance: 
    why does the scale of the data center matter? Economies of scale, cost of operations
    state replication, logging, storage replication
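
A first-fit-decreasing sketch of the provisioner's bin-packing problem from item 1 (capacities and requests are illustrative; real provisioners track several resource dimensions):

```python
# Hedged sketch of the provisioner's assignment problem: packing user requests
# onto machines is bin packing (NP-hard), so a simple first-fit-decreasing
# heuristic is often used. Capacities and requests (in cores) are illustrative.
def first_fit_decreasing(requests, machine_capacity):
    """Assign each request to the first machine with enough free capacity."""
    machines = []   # remaining capacity per machine
    placement = {}
    for req_id, cores in sorted(requests.items(), key=lambda kv: -kv[1]):
        for i, free in enumerate(machines):
            if cores <= free:
                machines[i] -= cores
                placement[req_id] = i
                break
        else:  # no existing machine fits: provision a new one
            machines.append(machine_capacity - cores)
            placement[req_id] = len(machines) - 1
    return placement, machines

placement, leftover = first_fit_decreasing({"u1": 8, "u2": 4, "u3": 6, "u4": 2}, 10)
print(placement)  # -> {'u1': 0, 'u3': 1, 'u2': 1, 'u4': 0}
```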

Storage services - Scalable, fault tolerant
Tools - Programming models, frameworks
Automation - Reactive systems & elastic scaling

Elastic scaling - based on monitoring and diagnosis
traditional: provision for peaks

  • Cloud users & Services
    • App user: availability, performance; no interfaces exposed by the cloud service provider (the app provider handles that)
    • Application Deployment User: Dashboard, management interfaces exposed
    • Admin: Management…
  • Orchestration: automatic deployment for user
OpenStack
  • independent parts, 6 core services
  • communicate through public and well-defined APIs
  • Identity -> Dashboard -> Compute(?)/Network -> Image -> Object storage(get the image for VM) -> Block Storage(volume) -> initiate VM
  • Distributed storage to accelerate image initialization (something that took Azure many years to fix)
Referring to sotomayor2009
Virtual Infrastructure Management in Private & Hybrid Clouds
Virtual Infrastructure (VI) management in the context of private/hybrid cloud environments: Uniform Resource View, Full Lifecycle Management of VMs, Configurable Resource Allocation Policies, Adaptability to Changing Resource Needs. provides primitives to schedule and manage VMs across multiple physical hosts


OpenNebula: This is a VI manager that allows organizations to deploy and manage VMs, either individually or in groups. It automates the VM setup process (including preparing disk images and setting up networking) and is compatible with various virtualization layers (Xen, KVM, VMware) and external clouds (EC2, ElasticHosts).

Haizea: This acts as a lease manager and can serve as a scheduling backend for OpenNebula. It introduces leasing capabilities not present in other cloud systems, such as advance reservations and resource preemption, which are particularly valuable for private cloud environments.



Traditional VI Management Tools: lack certain features necessary for building IaaS clouds, such as public cloud-like interfaces and the ability to deploy VMs on external clouds.
Cloud toolkits can help transform existing infrastructure into an IaaS cloud with cloudlike interfaces. VI management capabilities are not as robust.
    - Gap: There's a noticeable gap between cloud management and VI management. Cloud toolkits attempt to cover both areas but often fall short in delivering comprehensive VI management functionalities. Integrating cloud management solutions with VI managers is complex due to the lack of open, standard interfaces and certain key features in existing VI managers.
    
    OpenNebula: overcome these challenges, Scaling to external clouds. A flexible and open architecture for easy extension and integration with other software. A variety of placement policies and support for scheduling, deploying, and configuring groups of VMs.
    
    Integration of Haizea with OpenNebula: work as a scheduler for OpenNebula, This integration allows OpenNebula to offer resource leases as a fundamental provisioning abstraction, and Haizea to operate with real hardware through OpenNebula. This combination provides advanced features like advance reservation of capacity, which is not offered by other VI managers.

    The integration is particularly beneficial for private clouds with limited resources, enabling sophisticated VM placement strategies that support queues, priorities, and advance reservations (ARs).

OpenNebula: 
core:  image and storage technologies for preparing disk images for VMs, network fabric (such as Dynamic Host Configuration Protocol [DHCP] servers, firewalls, or switches) for providing VMs with a virtual network environment, hypervisors for creating and controlling VMs. Performs operations through pluggable drivers. 
A separate scheduler component makes VM placement decisions.
Management interfaces: the libvirt API to integrate with other data center management tools, and a cloud interface exposed to external users.
Cloud drivers to interface with external clouds

The Haizea Lease Manager: Haizea is an open source resource lease manager and can act as a VM scheduler for OpenNebula.
Advance Reservation (AR) Leases: Resources are guaranteed to be available at a specific future time. This is beneficial for scenarios requiring resource certainty.
Best-Effort Leases: Resources are provisioned as soon as possible, with requests queued if immediate provisioning isn't possible.
Immediate Leases: Resources are provided immediately upon request or not at all, suitable for urgent needs without flexibility.
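
A minimal sketch of an advance-reservation admission check against a slot table, assuming whole-node granularity (a big simplification of Haizea's actual scheduler):

```python
# Hedged sketch of an advance-reservation (AR) admission check against a
# slot table at whole-node granularity. Haizea's real slot table tracks
# per-node resources over time; this only counts reserved nodes per time slot.
def ar_fits(slot_table, start, end, nodes_needed, total_nodes):
    """slot_table maps a time slot -> nodes already reserved in that slot."""
    return all(total_nodes - slot_table.get(t, 0) >= nodes_needed
               for t in range(start, end))

def admit_ar(slot_table, start, end, nodes_needed, total_nodes):
    if not ar_fits(slot_table, start, end, nodes_needed, total_nodes):
        return False  # would require preempting best-effort leases instead
    for t in range(start, end):
        slot_table[t] = slot_table.get(t, 0) + nodes_needed
    return True

slots = {3: 2, 4: 2}                 # 2 of 4 nodes already reserved at t=3..4
print(admit_ar(slots, 2, 5, 2, 4))   # -> True (2 free nodes in every slot)
print(admit_ar(slots, 3, 4, 1, 4))   # -> False (slot 3 is now fully reserved)
```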
    
Haizea addresses the challenge of resource underutilization, often a downside of AR, by leveraging VMs. It allows for efficient support of ARs through resource preemption - suspending lower-priority VMs to free up resources for higher-priority needs and resuming them later.
It uses optimizations such as reusing disk images across leases to minimize the impact of preparation overhead and schedules runtime overheads (like suspending, resuming, and migrating VMs) efficiently.
Scheduling is based on a resource slot table, representing all physical nodes managed by Haizea over time.
Best-effort leases are managed using a first-come-first-serve queue with backfilling, optimizing queue-based systems.
AR leases utilize a greedy algorithm for selecting physical resources to minimize preemptions.

There are ongoing efforts to extend OpenNebula's capabilities, including the implementation of the libvirt interface, VM consolidation schedulers for minimizing energy consumption, and tools for service elasticity management, VM placement, public cloud interface support, and policy-driven dynamic placement optimization.

Haizea shows that VM-based approaches with suspend/resume capabilities can address utilization issues typically associated with advance reservation (AR) use.

Back to Blog Chapters

Xen and the Art of Virtualization
Xen, a high-performance, resource-managed VMM that effectively allows multiple commodity operating systems to share hardware resources safely and efficiently

Advantages: isolation of virtual machines to prevent adverse performance effects, accommodation of diverse operating systems, and minimization of performance overhead introduced by virtualization

- Design requirements: support existing application binary interfaces (ABIs), support full multi-application operating systems and complex server configurations within a single guest OS instance, and accept paravirtualization as necessary for high performance on complex architectures like x86.

Resource virtualization: ensures both correctness and performance by giving guest operating systems visibility of real as well as virtual resources, enabling better support for time-sensitive tasks and optimal use of physical resources such as superpages or page coloring.

Memory Management:
Segmentation and Paging: Guest OSes are responsible for managing hardware page tables with minimal Xen involvement for safety. Xen resides in a 64MB section at the top of every address space to avoid TLB flush when switching to and from the hypervisor. Segmentation is virtualized by validating updates to hardware segment descriptor tables.
Guest OS Restrictions: Guest OSes cannot install fully-privileged segment descriptors and must not overlap with the top end of the linear address space. They have direct read access to hardware page tables but updates are batched and validated by Xen.


CPU:
Privilege Levels: Guest OSes run at a lower privilege level than Xen to protect the hypervisor from OS misbehavior and ensure domain isolation. x86's four privilege levels are leveraged, with guest OSes typically moved from ring 0 to ring 1.
Exceptions and System Calls: Guest OSes must register descriptor tables for exception handlers with Xen. They may install a ‘fast’ handler for system calls to avoid indirecting through Xen for every call. Page faults and system calls are the most performance-critical exceptions.

Device I/O:
Virtual Devices: Instead of emulating hardware devices, Xen introduces efficient and simple device abstractions. Data transfer is done using shared-memory, asynchronous buffer-descriptor rings, allowing Xen to perform validation checks.
Event System: Hardware interrupts are replaced with a lightweight event system. Xen supports a mechanism for sending asynchronous notifications to a domain, similar to hardware interrupts, but with more control and efficiency.

Porting Operating Systems to Xen: Code Modifications in architecture-independent sections, virtual network drivers, block-device drivers, and Xen-specific (non-driver) areas. Both operating systems required substantial alterations in their architecture-specific sections. 

Role of Domain0: A domain created at boot time, termed Domain0, is responsible for hosting application-level management software. It uses the control interface to manage the system, including creating and terminating other domains and controlling their resources.

Control Transfer: Hypercalls and Events:

Hypercalls: Synchronous calls from a domain to Xen, analogous to system calls in conventional OSes. Used for operations like requesting page-table updates.
Event Mechanism: Asynchronous notifications from Xen to domains, replacing traditional interrupt delivery mechanisms. Events are used for lightweight notifications like domain-termination requests or indicating that new data has been received over the network.
Data Transfer: I/O Rings:

Efficient data transfer mechanism that allows data to move vertically through the system with minimal overhead. The design focuses on resource management and event notification.
I/O rings are circular queues of descriptors allowing asynchronous, out-of-band data transfer between Xen and guest OSes. They support a variety of device paradigms and enable efficient zero-copy transfer.

Subsystem Virtualization:
CPU Scheduling: Xen employs the Borrowed Virtual Time (BVT) scheduling algorithm for domain scheduling, ensuring low-latency wake-up and fair resource sharing.
Time and Timers: Xen provides notions of real time, virtual time, and wall-clock time to guest OSes. Domains can program alarm timers for real and virtual time, with timeouts delivered using Xen's event mechanism.

Virtual Address Translation: Xen avoids the overhead of shadow page tables by allowing guest OSes to manage hardware page tables directly, with Xen involved only in updates validation. This approach minimizes the number of hypercalls required for page table management.

Physical Memory: Memory is statically partitioned between domains, providing strong isolation. Guest OSes can adjust their memory usage dynamically, with mechanisms like the balloon driver in XenoLinux facilitating this interaction.

Network: Xen introduces the concept of a virtual firewall-router (VFR), with each domain having virtual network interfaces (VIFs) attached to the VFR. Domains transmit packets via enqueueing buffer descriptors on I/O rings, and Xen handles packet reception efficiently by exchanging packets directly with page frames.
Disk: Only Domain0 has direct access to physical disks. Other domains access persistent storage through virtual block devices (VBDs). Disk requests are batched and serviced in a round-robin fashion by Xen, ensuring fair access and good throughput.

Advantages of Delegating Domain Construction to Domain0:
Reduced Hypervisor Complexity: Building a domain within Domain0 rather than entirely within Xen simplifies the hypervisor's design, focusing its functionality on core tasks and leaving the domain setup process to a more specialized and capable component.

Improved Robustness: The process of setting up a new domain involves numerous delicate operations. Performing these operations within Domain0 allows for better error checking and handling.

Metrics:
Performance Comparison with Other Virtualization Technologies
Efficient Data Transfer and Subsystem Virtualization
Performance Isolation
Scalability
Network Performance
Concurrent Virtual Machines
Microbenchmarks 

Key points:
Efficient Paravirtualization
Resource Management and Performance Isolation
Scalability
Minimal Performance Overhead
Generality of the Interface
Facilitation of Network-Centric Services

** Chapter 3: Encapsulation **

Options available for encapsulation
- Isolation: others can't read/write your data
- - others can't impact your performance (DoS)
- Processes may need memory sharing, get resource allocations, have their own address space
1. Bare metal
+: good isolation, performance, very good software freedom
-: granularity limitation
2. Process: just application
3. Containers: with application & library/fs/etc. that looks like an OS
+: performance, decent software freedom
-: security issues
4. Virtual Machines: physical machine-like software container
HW <-> VMM <-> (OS, Library/fs/etc/, App)
+: isolation properties, good software freedom
-: performance overhead, imperfect performance isolation

Comparison
  • Process, Container, VM, BM
  • Lower management <-> Better isolation/fidelity
interface is the key
  • between APP/Library/OS/HW
Linux namespaces
  • pid, gid, …
    resource allocation: Linux cgroups (control groups)
Limitation of containers
  • share the same host OS: security + flexibility concerns (you have to work with that OS)
Virtualization
  • VM: vm OS <–virtual machine interface–> VMM <–machine interface–> HW
  • The interfaces can be different
HW Virtualization Principles
  • Fidelity, software operation keeps identical
  • Isolation, guest cannot affect other guest/VMM
  • Performance, most operations execute natively

Encapsulation Principle

- Basic: execute VM software in de-privileged mode
- - prevent privileged instructions from escaping containment
Privilege Level
App User/Ring 3
Lib User/Ring 3
OS  Supervisor/Ring-0
HW


Option 1
Privilege Level
App User/Ring 3
Lib User/Ring 3
OS  User/Ring-3
VMM Supervisor/Ring-0
HW
Overhead - isolation holds, but the VMM has to handle every operation from the OS... not ideal

Option 2
Privilege Level
App User/Ring 3
Lib User/Ring 3
OS  Ring-1
VMM Supervisor/Ring-0
HW

----> later
Option 3
Privilege Level
App User/Ring 3*
Lib User/Ring 3*
OS  Ring-0* (doesn't have full power; cannot change the fundamental page tables, ...)
VMM Supervisor/Ring-0
HW
Priv Ops
- Most executions run directly
- VMM needs to handle OS attempts to execute privileged ops, non-CPU devices...
- - popf, pushf...

https://www.vmware.com/content/dam/digitalmarketing/vmware/en/pdf/techpaper/VMware_paravirtualization.pdf

Handle: 
Trap & emulate: the VMM handles the trapped privileged instruction
Static software re-writing/paravirtualization: rewrite the guest OS to use VMM hypercalls (better performance, sacrificing transparency)
Dynamic software re-writing: the VMM rewrites the guest's privileged code, coalescing traps; the VMM becomes complex, but performance is good


Referring to stackoverflow.com/questions/21462581
Paravirtualization is virtualization in which the guest operating system (the one being virtualized) is aware that it is a guest and accordingly has drivers that, instead of issuing hardware commands, simply issue commands directly to the host operating system. This also includes memory and thread management as well, which usually require unavailable privileged instructions in the processor.

Full Virtualization is virtualization in which the guest operating system is unaware that it is in a virtualized environment, and therefore hardware is virtualized by the host operating system so that the guest can issue commands to what it thinks is actual hardware, but really are just simulated hardware devices created by the host.

Hardware Assisted Virtualization is a type of Full Virtualization where the microprocessor architecture has special instructions to aid the virtualization of hardware. These instructions might allow a virtual context to be setup so that the guest can execute privileged instructions directly on the processor without affecting the host. Such a feature set is often called a Hypervisor. If said instructions do not exist, Full Virtualization is still possible, however it must be done via software techniques such as Dynamic Recompilation where the host recompiles on the fly privileged instructions in the guest to be able to run in a non-privileged way on the host.

There is also a combination of Para Virtualization and Full Virtualization called Hybrid Virtualization where parts of the guest operating system use paravirtualization for certain hardware drivers, and the host uses full virtualization for other features. This often produces superior performance on the guest without the need for the guest to be completely paravirtualized. An example of this: The guest uses full virtualization for privileged instructions in the kernel but paravirtualization for IO requests using a special driver in the guest. This way the guest operating system does not need to be fully paravirtualized, since this is sometimes not available, but can still enjoy some paravirtualized features by implementing special drivers for the guest.

Handling memory: process
- OS: page mapping from virtual page to physical page
- User process: typically sees a contiguous address space


Multiple OSes:
Guest OSes manage: Mapping guest virtual to guest physical
VMM manages guest physical to host physical

other split: cores, time sharing..

SW: Shadow page tables
HW: Extended page tables(EPT)
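
A minimal sketch of the two-stage mapping (guest virtual -> guest physical -> host physical) that shadow page tables or EPT effectively collapse into one lookup; the page numbers are made up:

```python
# Hedged sketch of two-stage address translation under a VMM.
# The guest OS owns the guest-virtual -> guest-physical mapping;
# the VMM owns guest-physical -> host-physical. Shadow page tables (SW)
# or EPT (HW) effectively cache the composed mapping.
PAGE = 4096
guest_page_table = {0x10: 0x2A}   # guest virtual page -> guest physical page
vmm_page_table   = {0x2A: 0x7F}   # guest physical page -> host physical page

def translate(guest_vaddr):
    gvpn, offset = divmod(guest_vaddr, PAGE)
    gppn = guest_page_table[gvpn]      # first stage: guest OS's page table
    hppn = vmm_page_table[gppn]        # second stage: VMM's mapping
    return hppn * PAGE + offset

# Composed ("shadow") table built once, so each access needs a single lookup:
shadow = {gv: vmm_page_table[gp] for gv, gp in guest_page_table.items()}
vaddr = 0x10 * PAGE + 0x123
assert translate(vaddr) == shadow[0x10] * PAGE + 0x123
print(hex(translate(vaddr)))  # -> 0x7f123
```
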
Virtualization Devices
- Could be fully virtualized (trap accesses and emulate), but performance is worse

For performance
1. Mapping, control given to a host (hw support)
2. Partition, (disk..)
3. Guest enhancement (special VM -> VMM calls)
4. Virtualization-enhanced devices (NICs with VMDq)

Security issues of virtualization/containerization
  • Meltdown
  • bit flips (Rowhammer)

Back to Blog Chapters

Web servers
  • Online retail stores
  • parallel programming: contention, failures, consistency(shared resources), load balancing,..
HPC
  • scaling for N processes (lower costs, faster computation)
  • strong/weak scaling: does the same problem finish faster on N processors (strong)? does an N-times-larger problem finish in the same time (weak)?
  • Bulk Synchronous Processing..
  • MPI, resource allocators, schedulers, ft
  • Very manual, steep learning curve, few commercial runaway successes
    Grid Computing
  • easier to use
  • emphasized geographical sharing
  • jobs selected from batch queue, take over cluster
  • workload diversity
    Cloud Computing
  • low initial cost
  • workload diversity +
  • may cost efficiency for easier programming
Motivation for CC programming framework
  • using many commodity* machines
  • tolerate failures
  • locality(reduce cost of communication), parallelism
Batch processing of large datasets on a cluster: framework offers..
  • Job orchestration
  • Data staging and movement
MapReduce
- read large data set
- ind process the input data in chunks
- shuffle and sort data (by key)
- reduce(merge)
- locality optimization for racks
- shuffle begins after first round of map (master responsible for scheduling this)

Split
RR (record reader) -> (K,V) -> Map -> (K', V') ->
Partitioner: hash(key) mod R -> reducer
Sort (a reducer cannot start reducing until all map outputs for its partition have arrived)
Reduce
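
A word-count sketch in the Hadoop Streaming spirit (mapper and reducer as plain functions, with the shuffle/sort stage faked by a local sort); this is an illustration, not project code:

```python
# Hedged word-count sketch: the framework splits the input, runs the mapper
# per split, shuffles and sorts by key, then runs the reducer over each key's
# values. Here everything runs locally and a plain sort stands in for shuffle.
from itertools import groupby

def mapper(lines):                        # record -> (word, 1) pairs
    for line in lines:
        for word in line.split():
            yield word.lower(), 1

def reducer(sorted_pairs):                # (word, [1, 1, ...]) -> (word, count)
    for word, group in groupby(sorted_pairs, key=lambda kv: kv[0]):
        yield word, sum(count for _, count in group)

if __name__ == "__main__":
    text = ["the cloud is elastic", "the cloud is shared"]
    shuffled = sorted(mapper(text))       # stand-in for the shuffle/sort phase
    for word, count in reducer(shuffled):
        print(f"{word}\t{count}")         # cloud 2, elastic 1, is 2, shared 1, the 2
```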

Map data size vs shuffle data size
  • ngram(small->large)
Spark
Fewer disk operations for iterative apps
RDDs as in-memory storage (immutable); recompute lost data instead of storing it in persistent storage
transformations create new RDDs and can be lazy
actions produce values; RDDs are (re)computed when an action needs them

** Chapter 4: Programming Models and Frameworks **

Map Reduce
Core Concepts of MapReduce:

The MapReduce model involves two primary functions: the map function and the reduce function.
The map function takes key/value pairs as input and produces a set of intermediate key/value pairs.
The reduce function then merges all intermediate values associated with the same intermediate key.

Automatic Parallelization and Distribution:
MapReduce abstracts the complexity of parallelization, fault-tolerance, data distribution, and load balancing

Scalability and Ease of Use:
The MapReduce implementation can process vast amounts of data (many terabytes) across thousands of machines.
It has a user-friendly nature, evidenced by the extensive use within Google, with hundreds of MapReduce programs implemented and thousands of jobs executed daily.

Motivation and Design Philosophy:
The motivation behind MapReduce stemmed from the need to process large sets of raw data (like web documents, request logs) into computed forms (like inverted indices, summaries) efficiently.
MapReduce was designed to simplify the computations by abstracting the complex code required for parallelization, data distribution, and fault tolerance into a library.

Programming Model:
The computation in MapReduce takes input key/value pairs and produces output key/value pairs, expressed through Map and Reduce functions defined by the user.
It handles the grouping of intermediate values by keys and provides these to the Reduce function, which then merges the values per key.

Examples and Applications:
The abstract and subsequent sections provide practical examples, like counting the number of occurrences of each word in a document collection, showcasing the model's applicability to real-world tasks.
It also highlights the versatility of MapReduce in tasks such as distributed grep, counting URL access frequency, reversing web-link graphs, and computing term-vectors per host.
Inverted Index: A fundamental operation in document searching where the map function processes documents, emitting word and document ID pairs, and the reduce function groups these by word, creating a list of document IDs for each word.
Distributed Sort: A sorting operation where the map function extracts keys from records and the reduce function sorts these keys, relying on the system's partitioning and ordering capabilities.

Implementation and Execution Overview:
designed for large clusters of commodity PCs, considering the typical hardware specifications and the nature of the network and storage systems.
The execution process involves dividing the input data into splits, assigning map and reduce tasks to workers, processing the data through the user-defined map and reduce functions, handling intermediate data storage and retrieval, and finally producing the output after all tasks are completed.

Fault Tolerance:
Fault tolerance is a critical aspect, given the scale of operation and the likelihood of machine failures. The system handles worker failures by reassigning tasks and redoing work if necessary. The master node monitors worker status and orchestrates the reassignment of tasks as needed.
The system is designed to ensure that, despite failures, the output is consistent with what would be produced by a faultless, sequential execution, as long as the map and reduce functions are deterministic.

Master Data Structures:
The master node maintains data structures to keep track of the status of each task and the locations of intermediate data. This information is crucial for coordinating the work of map and reduce workers and for ensuring that data is correctly routed through the system.


Locality
Network Bandwidth Optimization: importance of conserving network bandwidth, a relatively scarce resource in large computing environments.
Data Locality: The approach involves scheduling map tasks on machines that contain a replica of the input data, or at least are network-local to the data. This strategy significantly reduces network bandwidth usage as most data is read locally.

Granularity of Map and Reduce Phases: The map phase is subdivided into M pieces, and the reduce phase into R pieces. Ideally, M and R should be much larger than the number of worker machines.
Dynamic Load Balancing and Recovery: This setup improves dynamic load balancing and expedites recovery from worker failures, as the tasks a worker has completed can be redistributed across other workers.
Handling Stragglers: machines that take unusually long to complete tasks. Causes for stragglers include hardware issues, resource contention, or bugs.

Custom Partitioning: While a default partitioning function based on hashing is provided, users have the option to specify a custom partitioning function, allowing more control over how data is partitioned across reduce tasks.

Intermediate Key/Value Pair Ordering: The framework ensures that within a partition, intermediate key/value pairs are processed in increasing key order. This ordering guarantee is particularly beneficial when the output needs to be sorted or efficiently accessible by key.

Data Aggregation at Mapper Nodes: The Combiner function allows for partial merging of intermediate data on the mapper nodes before sending it over the network. This feature is especially useful when there's significant repetition in the intermediate keys, and the reduce function is commutative and associative.

Flexible Data Formats: The MapReduce library supports various formats for input data, providing flexibility to handle different types of data sources.
Spark
Spark, a new cluster computing framework designed to address the limitations of MapReduce and its variants in handling certain types of applications, particularly those involving iterative operations and interactive data analysis. 

- Limitations of MapReduce and Similar Systems:
they are primarily built around an acyclic data flow model. This model is not suitable for applications that need to reuse a working set of data across multiple parallel operations.

Iterative Jobs: Common in machine learning, these jobs require applying a function repeatedly to the same dataset. MapReduce's need to reload data from disk for each iteration causes significant performance penalties.

Interactive Analytics: Users often run ad-hoc queries on large datasets using SQL interfaces. With MapReduce, each query incurs high latency because it operates as a separate job and needs to read data from disk.

Spark aims to support applications with working sets while retaining the scalability and fault tolerance characteristics of MapReduce.
The key abstraction in Spark is the resilient distributed dataset (RDD), a read-only collection of objects partitioned across a set of machines. RDDs can be explicitly cached in memory and reused in multiple parallel operations, significantly improving performance for certain types of applications.


Fault Tolerance through Lineage:
RDDs are fault-tolerant, using a concept called lineage. If a partition of an RDD is lost, the system has sufficient information on how the RDD was derived from other RDDs to rebuild just the lost partition.

Implementation and Usability of Spark:
Spark is implemented in Scala, allowing for a high-level, statically typed, and functional programming interface. It can also be used interactively, which is a novel feature for a system of this kind, enabling users to process large datasets on a cluster interactively.

Spark can significantly outperform Hadoop in iterative machine learning workloads and provide interactive querying capabilities with sub-second latency for large datasets.



- Programming Model
Spark introduces two primary abstractions for parallel programming: resilient distributed datasets (RDDs) and parallel operations on these datasets, and shared variables.

RDDs are read-only collections of objects partitioned across machines, capable of being rebuilt if a partition is lost.
RDDs can be created directly from a file in a shared filesystem like HDFS, by parallelizing a collection in the driver program, or by transforming an existing RDD.
RDDs are lazy and ephemeral by default, meaning they are computed on demand and not stored persistently after use.
However, users can modify this behavior using two actions:
The cache action suggests keeping the dataset in memory after its initial computation for future reuse.
The save action evaluates and persists the dataset to a distributed filesystem, like HDFS.
This approach to persistence is a design choice in Spark to ensure continued operation under memory constraints or node failures, drawing a parallel to the concept of virtual memory.

Parallel Operations
Operations on RDDs:
Spark supports various parallel operations on RDDs, such as reduce (combining elements using a function), collect (sending all elements to the driver program), and foreach (applying a function to each element for its side effects).

Shared Variables
Handling Variables in Parallel Operations:
When parallel operations like map and filter are performed, the closures (functions) used can refer to variables in their creation scope. By default, these variables are copied to each worker node.

Types of Shared Variables:
Spark introduces two types of shared variables for common usage patterns:
Broadcast Variables: Used for distributing large, read-only pieces of data efficiently across workers.
Accumulators: These are "add-only" variables for workers and readable only by the driver, suitable for implementing counters and parallel sums.


-Text Search
The first example is a simple text search to count the number of lines containing "ERROR" in a large log file.
The process involves creating an RDD from the file, filtering lines containing "ERROR", mapping each line to 1, and then reducing by summing these ones.
This example illustrates Spark's lazy evaluation and in-memory data sharing capabilities, which allows for efficient data processing without materializing intermediate datasets.
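
A rough PySpark version of this text-search example (assumes a working Spark installation and a log path that actually exists):

```python
# Hedged PySpark version of the text-search example described above.
# Assumes pyspark is installed and "hdfs://namenode:9000/logs/app.log"
# is replaced with a real path.
from pyspark import SparkContext

sc = SparkContext(appName="error-count")
lines = sc.textFile("hdfs://namenode:9000/logs/app.log")        # lazy RDD
errors = lines.filter(lambda line: "ERROR" in line)             # transformation (lazy)
errors.cache()                                                  # keep in memory for reuse
count = errors.map(lambda line: 1).reduce(lambda a, b: a + b)   # action triggers the work
print(count)
```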


- Logistic Regression
This program demonstrates an iterative machine learning algorithm, logistic regression, which benefits significantly from Spark's ability to cache data in memory across iterations.
The program reads points from a file, caches them, and then iteratively updates a vector w using a gradient computed in parallel across the points.
The use of accumulators for summing the gradient and the syntax of Spark make the code resemble an imperative serial program while being executed in parallel.
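
A rough PySpark sketch of this iterative pattern; the paper's version uses an accumulator for the gradient, a plain reduce is used here for brevity, and the data is synthetic:

```python
# Hedged sketch of the iterative logistic-regression pattern described above,
# over a cached RDD of (features, label) pairs. Data is synthetic; assumes
# numpy is available on the workers.
import numpy as np
from pyspark import SparkContext

sc = SparkContext(appName="logreg")
data = [(np.random.randn(5), 1.0 if i % 2 else -1.0) for i in range(1000)]
points = sc.parallelize(data).cache()      # reused across every iteration

w = np.zeros(5)
for _ in range(10):
    gradient = points.map(
        lambda p: (1.0 / (1.0 + np.exp(-p[1] * w.dot(p[0]))) - 1.0) * p[1] * p[0]
    ).reduce(lambda a, b: a + b)
    w -= gradient
print(w)
```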

- Alternating Least Squares (ALS)

Implementation
The section details Spark's implementation, including its reliance on Mesos for cluster management, the structure of RDDs, task scheduling for parallel operations, handling of shared variables, and integration with the Scala interpreter.

Resilient Distributed Datasets (RDDs): RDDs are implemented as a chain of objects capturing their lineage, allowing efficient recomputation in case of node failures. Different types of RDDs (e.g., for files or transformed datasets) implement a standard interface for partitioning, iteration, and task scheduling.
Parallel Operations and Task Scheduling: Spark creates tasks for each RDD partition and tries to schedule them based on data locality. It uses a technique called delay scheduling for efficiency.
Handling of Shared Variables: Broadcast variables and accumulators are implemented with custom serialization formats to ensure efficient distribution and fault tolerance.
Interpreter Integration: Spark integrates with the Scala interpreter, allowing interactive processing of large datasets. Modifications were made to ensure that closures and state are correctly serialized and distributed to worker nodes.

5 Results
Logistic Regression Performance: Spark significantly outperforms Hadoop in iterative machine learning workloads, with up to 10x faster performance due to data caching.
Alternating Least Squares (ALS) Performance: The use of broadcast variables for distributing the ratings matrix results in substantial performance improvements in the ALS job.
Interactive Spark Usage: Spark enables interactive querying of a large dataset with sub-second response times after initial data loading, providing a much faster and more interactive experience than Hadoop.


MR ML model training
  • MapReduce may scale well, but the overhead is high: ML training is just not well suited to stateless, deterministic functions
    Spark is better
Parameter Servers
  1. Shared Memory for Parameters
  2. Atomic Operations on Parameters
  3. Efficient Communication via RPCs
  4. Avoid Repartitioning Input Data
Sync?
  1. MR Approach: Requires strict synchronization (barrier synchronization) at the end of each stage, ensuring consistency but potentially leading to idle time as all nodes must wait for the slowest one.
  2. Parameter Server Approach: Offers flexibility. Synchronous mode ensures consistency but might slow down the training due to waiting times. Asynchronous mode can significantly speed up training as it allows nodes to compute and update independently, but it introduces some level of noise in the updates, which the training process must tolerate.
Parameter Servers

Sequential (BSP - Bulk Synchronous Parallel):

Traditional distributed computing, like in Spark, utilizes synchronous communication. Each iteration requires all tasks to be completed before moving to the next.
Advantages: Broad applicability and high convergence quality per iteration.
Disadvantages: Each iteration waits for the slowest task, leading to longer overall task computation times.
Bounded Delay (SSP - Staleness Synchronous Parallel):

Sets a maximum delay time, known as the staleness value, allowing a certain degree of inconsistency in task progress. When staleness = 0, it equates to the Sequential consistency model; when staleness = ∞, it becomes the Eventual consistency model.
Advantages: Reduces waiting time between tasks to a certain extent, offering faster computation speed and allowing developers to balance between algorithm convergence rate and system performance.
Disadvantages: The convergence quality per iteration may not be as high as BSP, potentially requiring more iterations to achieve similar convergence; not as broadly applicable as BSP, with some algorithms not suitable.
Eventual (ASP - Asynchronous Parallel):

Asynchronous communication, where tasks do not need to wait for each other. Tasks that complete first can proceed to the next iteration immediately.
Advantages: No waiting for other tasks, leading to fast computation speed.
Disadvantages: Poor applicability, potential decrease in convergence rate, or even non-convergence.
The synchronization restrictions of the three consistency models progressively relax to pursue faster computation speeds. In practice, the consistency model and related parameters should be adjusted based on the changes in metrics to balance convergence and computational speed.
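
A minimal sketch of the SSP rule (staleness = 0 gives BSP, staleness = infinity gives ASP); the worker clocks are illustrative:

```python
# Hedged sketch of the Stale Synchronous Parallel (SSP) rule: a worker at
# iteration `my_clock` may proceed only if the slowest worker is no more
# than `staleness` iterations behind it.
def can_proceed(worker_clocks, my_clock, staleness):
    return my_clock - min(worker_clocks.values()) <= staleness

clocks = {"w1": 10, "w2": 8, "w3": 9}
print(can_proceed(clocks, my_clock=10, staleness=2))  # True: slowest is at 8
print(can_proceed(clocks, my_clock=11, staleness=2))  # False: must wait for w2
```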

User-defined Filters:

The system supports user-defined filters to filter out certain entries, thus reducing network bandwidth. Common filters include the significantly modified filter, which only pushes entries that have changed beyond a certain threshold, and the KKT filter, which uses conditions from optimization problems to filter out entries that have minimal impact on weights.

Vector Clock:
The parameter server uses a range vector clock to record the timestamp of each node's parameters. 
This helps in tracking the state of data and avoiding the resending of data. Since parameters are pushed and pulled in ranges, parameters within the same key range can share a timestamp. 
This approach compresses the traditional vector clock, reducing memory and network bandwidth overhead.

Messages:
Messages sent between nodes consist of a range vector clock and (key, value) pairs.
Message Compression:
Due to frequent updates of model parameters, the parameter server employs two methods to compress messages to reduce network bandwidth overhead:

Key Compression:
Since training data typically doesn't change during iterations, it's unnecessary for workers to send the same key lists each time. 
The server can cache the key lists upon the first reception. Subsequently, only the hash values of the key lists need to be sent for matching.

Value Compression:
Some parameter updates are not significant for final optimization, so users can define filter rules to discard unnecessary parameters. 
For instance, a large number of values being 0 or very small gradients can be inefficient in gradient descent and can be filtered out.

Consistency and Replication:
The parameter server uses consistent hashing to map keys and servers onto a ring according to a certain hash algorithm. 
Each server manages the key range from its insertion point counter-clockwise to another server. The server closest in the clockwise direction to a key on the ring is known as the primary server for that key range. 
Each server also backs up key ranges counter-clockwise, and these servers are known as backup servers for that key range. 
A physical server is often represented as multiple virtual servers to improve load balancing and fault tolerance.
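
A minimal sketch of consistent hashing with virtual servers, as described above (hash function and server names are arbitrary; backups would be the next distinct servers clockwise):

```python
# Hedged sketch of consistent hashing with virtual servers, as used by the
# parameter server to map key ranges to servers. Each physical server is
# inserted at several points on the ring; a key belongs to the first server
# found clockwise from the key's hash.
import bisect
import hashlib

def h(s):
    return int(hashlib.md5(s.encode()).hexdigest(), 16)

class HashRing:
    def __init__(self, servers, virtual_nodes=3):
        self.ring = sorted((h(f"{s}#{i}"), s)
                           for s in servers for i in range(virtual_nodes))
        self.points = [p for p, _ in self.ring]

    def owner(self, key):
        idx = bisect.bisect(self.points, h(key)) % len(self.ring)
        return self.ring[idx][1]          # primary server for this key

ring = HashRing(["server-1", "server-2", "server-3"])
print(ring.owner("weights[42]"))
```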

Chain Replication vs. Replication after Aggregation:
Chain Replication:
In the paper's example, worker 1 pushes an update to x; server 1 processes the data with a user-defined function f(x) and then backs up f(x) to server 2.
The push is completed only after worker 1 receives an acknowledgment. This backup method can cause significant network bandwidth overhead for algorithms requiring frequent parameter updates.

Replication after Aggregation:
Here, the server aggregates updates from all workers before backing up, and only then sends acknowledgments to the workers.
Waiting for aggregation can introduce latency in pulling updates, but this can be mitigated by relaxing the consistency model.

Server Management:
Adding a Server:
The server manager assigns a key range to the new server, and other servers adjust their key ranges accordingly.
The new server acquires the key range it will maintain as the primary server and the key ranges it will back up as a backup server.
The server manager broadcasts the changes to nodes.
Removing a Server:
When the server manager detects a server failure through heartbeat signals, it assigns the key range of the failed server to a new server and removes the failed server.
Worker Management:
Adding a Worker:
The task scheduler assigns data to the new worker.
The new worker loads training data and then fetches parameters from the server.
The task scheduler broadcasts node changes, which may cause other workers to release some training data.
Removing a Worker:
Losing a small portion of training data typically doesn't affect the training results. Moreover, restoring a worker requires more overhead than restoring a server. 
Therefore, removing a worker is usually done by simply disregarding the node. This can be used to terminate the slowest worker, mitigating the performance impact of stragglers. 
However, users can also choose to replace the lost worker with a new one.




Back to Blog Chapters

** Chapter 5: Cloud Storage **

Types of cloud storage
---------------------------------
blob
arbitrary-sized “files” with simplified interface
limited interface and semantics
Example: AWS S3
Minimal promises re: sharing/concurrency, interrupted writes
- a big server
- a scalable service: aggregation of disk-equipped machines

But with additional opportunities for simplification
Object independence simplifies scaling (reduced coordination)
Disallowing (or discouraging) object updates simplifies code
paths, caching, use of space-saving encodings, etc.


Non-FS interface also separates from legacy/kernel code
• Standardized DFS implementations persist for a long time
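
A small sketch of the blob-store interface using AWS S3 through boto3 (bucket and key names are made up; credentials and region are assumed to come from the environment):

```python
# Hedged sketch of the blob-store interface using AWS S3 via boto3.
# Bucket/key names are made up; objects are written and read whole, with no
# in-place updates or POSIX-style sharing semantics, which keeps the service simple.
import boto3

s3 = boto3.client("s3")
s3.put_object(Bucket="my-example-bucket", Key="logs/2024-01-01.txt",
              Body=b"hello blob store")
obj = s3.get_object(Bucket="my-example-bucket", Key="logs/2024-01-01.txt")
print(obj["Body"].read())   # b'hello blob store'
```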

---------------------------------
Block stores (virtual disks)
separate but attachable to any VM instance
Guest OS running in a VM has code for FSs on disks
So, give it a “disk” to use
Virtual disk looks to guest OS just like real disk
Most cloud infrastructures have this option
AWS Elastic Block Store (EBS), OpenStack Cinder

VDs often implemented as files
A file is a sequence of bytes
 can hold a sequence of fixed-sized blocks

Thin provisioning: Promise more, Allocate based on need
Performance interference
– Each VM may have a virtual disk

IOFlow: a Software-Defined Storage Architecture.
---------------------------------
“Local disk” as part of VM instance
exists for lifetime of instance
Key difference from #2-style block stores: visibility
– visible only to the instance it comes with
• makes sense, but can’t be attached to a different VM instance
– exists only as long as the VM instance
• can’t be attached to a different instance later
---------------------------------
“Traditional” distributed FS (DIY or aaS)

set up long-running instance(s) to be the DFS server(s)
with block stores (#2) or storage-enhanced instance(s) (#3)
running traditional DFS server software on the instance(s)
DFS services can be provided by CSP or third party
    charge for file service
CSP can implement a scalable file service and sell access

---------------------------------
Provide a “union” filesystem on each client

Make a single FS view from multiple FSs
Implemented by a layer atop the individual FSs
Each operation accesses “unioned” FSs as appropriate
    For read-only: Look in “first FS” first, then “second FS” if needed, …
    For creates/writes: put into “first” non-read-only FS
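
A minimal sketch of the union lookup rule just described (layer paths are hypothetical):

```python
# Hedged sketch of the union-FS rule above: reads search the layers in order;
# creates/writes go to the first writable layer, which shadows lower layers.
import os

class UnionFS:
    def __init__(self, layers, writable_layer):
        self.layers = layers              # ordered list of directory paths
        self.writable = writable_layer    # the "first non-read-only FS"

    def read(self, name):
        for layer in self.layers:         # look in the first FS, then the next...
            path = os.path.join(layer, name)
            if os.path.exists(path):
                with open(path, "rb") as f:
                    return f.read()
        raise FileNotFoundError(name)

    def write(self, name, data):
        path = os.path.join(self.writable, name)
        os.makedirs(os.path.dirname(path) or ".", exist_ok=True)
        with open(path, "wb") as f:
            f.write(data)

# fs = UnionFS(["/upper", "/base-image"], writable_layer="/upper")
```
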
DFS in Cloud


Back to Blog Chapters

** Chapter 6: Scheduling Computation **

Cluster resource scheduler
  • schedule between tasks and machines
Start by assuming each machine is the same and is allocated as an atomic unit
Start by assuming each job requires a single full machine, with extensive user effort applied (the user is willing to help)
Simplest case: machine checkout
User looks at the list of available machines, picks one, and checks it out
* Centralized component is critical to success (avail list)
Scheduler comes into play
instead of the user, the scheduler picks the machine
can run jobs on the machine:
send to the VMM or to an agent that executes on the machine

VMM or agent tells the scheduler to free resource
Still requires a central "list"
Resource requested comes in (RAM, MHz or cores of CPU)
Scheduler picks a machine with enough resource
- need to keep track 

Assume the resource request is sufficient for the need
the picked machine's VMM or agent enforces the allocation fractions
interference among jobs is ignored
unused fractions of the machine are ignored

User’s resource request can be imperfect
Overcommit - monitor and use
- assign more
- dealing with resources running out: kill, migrate VMs, shrink allocations...
- Slack resources, time sharing
- Imagine that only 1/2 of the CPU has 


Users give more info about the resource request
- Reservation: min amount that must be provided
- Limit: upper bound amount can used
- Share: relative importance of different jobs


Machines differ
Scheduler still works in largely the same way
- special features require pruning set of options considered
- Exposing or hiding features? 
Heterogeneity in AWS
EC2 types...
Changing previous decisions
Scheduler has algorithm to change decisions of allocations

change is not free - comes with tradeoffs
Examples:
VM migration, shoot and restart, moving jobs between machines..
Non-resource constraints
e.g., some resources must be allocated close to one another for performance

VMware constraint examples:
- Affinity: identifies VMs that would benefit from being on the same machine for faster communication
- Anti-affinity: ensures two VMs do not crash together when one machine fails
Multi-machine tasks
Scheduler makes decisions based on the full set of requested resources
- some may give a subset ASAP rather than waiting
- may also improve assignments after knowing the full set

-> To hoard or not to hoard
- hold back resources for large requests?
- otherwise large requests may wait forever

Example: job and scheduling in MapReduce

Data Center: ToR - EoR - Core

MR assumes a tree structure for the DC; bandwidth is higher within the same rack than off-rack
JobTracker -> Master Node
TaskTrackers -> one or many slaves, coordinate with the JobTracker
Send a heartbeat to the JobTracker every 5 seconds: liveness, availability, progress of current tasks... (a toy heartbeat sketch follows the TaskTracker notes below)
JobTracker combines updates to produce a global view
- JobTracker adds the job to a queue
- scheduler picks it up and initializes it


TaskTracker
- Fixed # slots for M & R tasks depending on resources
- When assigned a task, copy JAR from HDFS to local FS
- creates TaskRunner: launches a child JVM, runs the task, communicates progress to the TaskTracker
- Then the TaskTracker reports to the JobTracker what happened
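
A toy sketch of this heartbeat/aggregation loop, with invented class and field names rather than Hadoop's actual API: each TaskTracker reports liveness, free slots, and task progress every few seconds, and the JobTracker folds those reports into a global view.

```python
import time

class JobTracker:
    def __init__(self, timeout=15.0):
        self.view = {}          # tracker name -> last report
        self.timeout = timeout  # consider a tracker dead after this long

    def heartbeat(self, name, free_map_slots, free_reduce_slots, progress):
        # Combine per-node updates into a global view.
        self.view[name] = {
            "last_seen": time.time(),
            "free_map_slots": free_map_slots,
            "free_reduce_slots": free_reduce_slots,
            "progress": progress,           # task id -> fraction done
        }

    def live_trackers(self):
        now = time.time()
        return [n for n, r in self.view.items()
                if now - r["last_seen"] < self.timeout]

jt = JobTracker()
# A TaskTracker would send something like this roughly every 5 seconds:
jt.heartbeat("tt-42", free_map_slots=1, free_reduce_slots=2,
             progress={"map_0003": 0.6})
print(jt.live_trackers())
```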

Job Scheduler
- Fills Map slots before reduce slots


* Centralized job scheduler, default FIFO, Others are pluggable (Fair Scheduler, Capacity Scheduler)
* considers data-locality, failures, ...
* Reduce initiates the shuffle; Map just stores its output on local disk


FIFO Default Job Scheduler:
- a job takes all cluster resources
- schedules in order of submission
- only when the running job finishes do we start a new job
- starvation with long-running jobs (small jobs are starved waiting for large ones...)
- No job preemption
- No evaluation of job size



Fair Job Scheduler (Facebook)
Each user is given a fair share of cluster capacity over time
Jobs in pools of equal size for each user
Supports preemption: kills tasks in pools running over capacity to serve pools that have not received enough

Resources are shared when a task from another user comes in
Depends on concurrent users

Capacity Job Scheduler (Yahoo!)
Designed for large clusters
- Multiple independent consumers

Job queues:
- each configured with a number of slots (capacity)
Designed to preempt

Prioritization within Queue by default
Map Task Scheduling & Reduce Task Scheduling
Default per node 2 map slots, 2 reduce slots

Fault Tolerance
Task fails
- speculative execution
- - Drawbacks? in heterogeneous environments, speculates without checking TT speed/locality/load
- locate stragglers? using Hadoop monitoring, progress score
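
A minimal sketch of progress-score-based straggler detection, assuming an invented threshold: flag any task whose progress score trails the mean of its peers by more than the gap (real Hadoop speculation also weighs how long a task has been running).

```python
def find_stragglers(progress, gap=0.2):
    """progress: task id -> progress score in [0, 1].
    Flag tasks whose score trails the mean by more than `gap`."""
    if not progress:
        return []
    mean = sum(progress.values()) / len(progress)
    return [t for t, p in progress.items() if mean - p > gap]

scores = {"map_01": 0.9, "map_02": 0.85, "map_03": 0.3, "map_04": 0.88}
print(find_stragglers(scores))   # ['map_03'] -> candidate for speculation
```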


JVM fails
- why one task per JVM? isolation

Task Tracker fails: what does it mean? 
 
-> JT re-executes all mappers that previously ran on that node
regenerates all intermediate data
we don't know what has / has not been shuffled
-> JT re-executes all reducers that were in progress on the failed TT

Node fails..

data failure -> HDFS deals with corrupted files, faulty nodes

Kubernetes
Microservices
- independent services, loosely coupled, well-defined API
Cloud-native apps: a collection of small microservices
- We want elasticity and ubiquity
- Design principles: Auto-scaling, Design for Failure, Modularity, Stable Internal APIs, Automation


We use Kubernetes as a cloud-native orchestration framework:
- Load balancing
- Self-healing
- Horizontal scaling
- Rolling updates & rollbacks
- Configurable & extensible (APIs)

Kubernetes Architecture
Control Plane (controllers managed by the Controller Manager)
One controller controls one framework (like MR)
All requests from nodes go to the API server
-> write to etcd (persisted, so requests survive a failure)
-> new (unbound) pod to the scheduler
<- scheduler binds the pod
-> write the binding to etcd
-> bound pod to the Kubelet
<- Kubelet updates status
-> write the status to etcd

State module: saves state so it survives failures


Containers in the same pod communicate via IPC

Scheduler
Pod is in a Pending state

Filter() + Prioritize()
Filter: filter out all nodes which can't run the pod, configurable set of node predicates
- PodFitsResources
- PodMatchNodeSelector
- MatchInterPodAffinity (same/diff node as another pod)


Prioritize between remaining nodes
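
A toy version of the filter-then-prioritize structure, with simplified stand-ins for predicates like PodFitsResources and PodMatchNodeSelector and an invented scoring rule (not kube-scheduler's real plugin API):

```python
def pod_fits_resources(pod, node):
    return (node["free_cpu"] >= pod["cpu"] and
            node["free_mem"] >= pod["mem"])

def pod_matches_node_selector(pod, node):
    return all(node["labels"].get(k) == v
               for k, v in pod.get("node_selector", {}).items())

PREDICATES = [pod_fits_resources, pod_matches_node_selector]

def schedule(pod, nodes):
    # Filter: drop every node that cannot run the pod.
    feasible = [n for n in nodes if all(p(pod, n) for p in PREDICATES)]
    if not feasible:
        return None                     # pod stays Pending
    # Prioritize: here, prefer the node with the most resources left over.
    return max(feasible,
               key=lambda n: (n["free_cpu"] - pod["cpu"]) +
                             (n["free_mem"] - pod["mem"]))

nodes = [
    {"name": "n1", "free_cpu": 2, "free_mem": 4, "labels": {"disk": "ssd"}},
    {"name": "n2", "free_cpu": 8, "free_mem": 16, "labels": {"disk": "hdd"}},
]
pod = {"cpu": 1, "mem": 2, "node_selector": {"disk": "ssd"}}
print(schedule(pod, nodes)["name"])     # n1: the only node passing the selector
```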


Multi-level scheduling
historically, just ran separate clusters, buying hardware for specific computations
Could just use a monolithic scheduler
  • can theoretically achieve the optimal schedule
  • complexity: hard to scale and ensure resilience
  • hard to anticipate future frameworks’ requirements
  • need to refactor existing frameworks to yield control to the central scheduler
Two-level scheduler
Global meta-scheduler (which framework and how many resources) determines when, which and how much
 -> framework scheduler (what to do within each resource it is given)

+ simple, easier to port
- distributed scheduling decisions -> may be suboptimal; need to balance awareness with coordination overhead

Meta-scheduler may send resource offers to frameworks
and the frameworks themselves select an offer and which tasks to run in the accepted resources
a resource offer looks like: <#CPU, #GPU, ..>
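
A rough sketch of that offer flow under invented names (not the Mesos API): the meta-scheduler decides which framework gets an offer like <#CPU, mem>; the framework scheduler decides which of its pending tasks to launch inside the resources it accepts.

```python
class FrameworkScheduler:
    def __init__(self, pending_tasks):
        self.pending = pending_tasks   # list of dicts: {"name", "cpu", "mem"}

    def on_offer(self, offer):
        """Pick pending tasks that fit into the offered resources."""
        launched, cpu, mem = [], offer["cpu"], offer["mem"]
        for task in list(self.pending):
            if task["cpu"] <= cpu and task["mem"] <= mem:
                launched.append(task)
                cpu -= task["cpu"]
                mem -= task["mem"]
                self.pending.remove(task)
        return launched                 # empty list == decline the offer

fw = FrameworkScheduler([{"name": "t1", "cpu": 2, "mem": 4},
                         {"name": "t2", "cpu": 4, "mem": 8}])
# Meta-scheduler decides *which framework* gets *how much*:
offer = {"agent": "node-7", "cpu": 4, "mem": 8}
# Framework decides *what to run* inside the accepted resources:
print([t["name"] for t in fw.on_offer(offer)])   # ['t1'] fits; t2 waits
```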

challenges:
- allocation changes
- planning ahead, distributed hoarding
- limited visibility for frameworks into overall cluster state 



-> Remove the global scheduler

Instead, use shared state..
Exposed to all framework schedulers
each framework makes and enacts decisions independently
allows scheduling into the future


Apache YARN
runs any kind of job
flexibility for the user: Spark, Dryad, MR, ...

meta-scheduler
framework scheduler: one master per job


YARN Architecture

Resource Manager: for the whole cluster
One per cluster
Request-based scheduler
tracks resource usage and node liveness (info from Node manager)
enforces allocation and arbitrates contention among competing jobs
- Fair, Capacity, Locality
Dynamically allocates lease to applications
Interacts with NM to assemble a global view
Can reclaim allocated resources by
- Collaborating with AMs
- Killing containers directly through the NM


Application Manager: every job has a master (AM)
manages running the AMs
- One per job
Manages the lifecycle of a job
plans & coordinates execution

Application Master:
each AM manages the job's individual tasks
each task runs within a container on a VM
has a view of the running status

Node Manager:
one Node Manager per node
Authenticates container leases
Monitors container execution
Reports usage through heartbeats to the RM
Kills containers as directed by the RM or AM

A container is a bundle of resources

RM-NM protocol:
RM -> start the AM (via an NM), give it a view of the cluster
AM -> RM: register itself
RM -> AM: register response
AM -> RM: allocate request
...
MR example
1. get a job ID
2. save the job's task files into HDFS
3. submit the job to the RM
4. RM asks a Node Manager to start the first container (for the AM)
5. AM gets launched and runs the job....


YARN schedulers..
Early binding: get a view of the cluster from the higher level, ask for specific nodes, then get them
Late binding: have a view and pick, but the resource manager has to check and make the actual decision
Failures
RM: single point of failure; recovers from persistent storage
NM failure: detected via missed heartbeats; RM notifies all affected AMs (their tasks are dead!), and the AMs request new resources
AM failure: the RM restarts the AM, which syncs with nodes and tasks and reruns the parts required
Task failure: the AM's responsibility
Guest lecture: MACARON - Multi-cloud/region Aware Cache Auto-ReconfiguratiON
  • Autoscales the intermediate “cache”, using mini-scale sampling of miss ratio vs. cache size
  • Decaying factors to diminish the impact of stale statistics
  • Simulator: trace-based, discrete event-driven, to reduce the cost of evaluating on a real cloud
  • Belady's algorithm to reduce miss ratios (compares the values of cached objects; sketch below)
  • Decides which items to cache in the object-storage cache based on the cost of storing data vs. the cost of re-transferring it
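
For reference, a minimal sketch of Belady's rule mentioned above: on a miss with a full cache, evict the object whose next use lies farthest in the future (or never occurs). It needs the full future trace, so it is an offline lower bound on misses rather than an online policy; the example trace is made up.

```python
def belady_misses(trace, cache_size):
    """Count misses under Belady's optimal (farthest-next-use) eviction."""
    cache, misses = set(), 0
    for i, obj in enumerate(trace):
        if obj in cache:
            continue
        misses += 1
        if len(cache) < cache_size:
            cache.add(obj)
            continue
        # Evict the cached object reused farthest in the future (or never).
        def next_use(o):
            for j in range(i + 1, len(trace)):
                if trace[j] == o:
                    return j
            return float("inf")
        cache.remove(max(cache, key=next_use))
        cache.add(obj)
    return misses

trace = ["a", "b", "c", "a", "b", "d", "a", "b", "c", "d"]
print(belady_misses(trace, cache_size=2))   # minimum possible miss count
```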

** Chapter 7: Diagnosis via monitoring **

Diagnosis via monitoring
  • why difficult? hard to find the source of a bottleneck across the layers:
  • users
  • apps
  • VMs
Three primary data gathering methods
1. Gathering information from performance counters
Example: AWS CloudWatch, Ganglia
+ Lightweight, commonly available
- Black-box, aggregates, per node


2. Logging events of interest
detailed text about the system's behavior
widely available for most systems
+ White-box approach 
- High overhead, per-node


3. End-to-end activity tracing
Similar to logging, but workflow-based
Example: Dapper, Stardust
+: white box, distinguishes workflows
-: requires software modifications

Gives more information with specific structure
A key cloud-specific issue
  • cloud providers and users do not want to share detailed information with each other
  • provider logs often not visible to users
  • users do not want to share their algorithms with cloud providers
Example: Ganglia
Massie04

Designed for HPC environments, bare-metal hardware
Collects and aggregates counters
Within a cluster, counters are visible everywhere
Counters from multiple clusters are aggregated


gmond, one per node in the cluster:
periodically gathers statistics
periodically broadcasts all stats to the other nodes in the same cluster (XDR via UDP)

gmetad (can run anywhere):
periodically polls the statistics from one of the gmonds (XML via TCP)
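
A toy sketch of the gmond broadcast pattern, using a JSON payload as a stand-in for XDR and an illustrative port and interval: each node periodically pushes its local stats over UDP to its peers, so every gmond holds a cluster-wide view that a gmetad-like poller could later pull over TCP.

```python
import json
import socket
import time

PEERS = [("127.0.0.1", 8649)]   # other gmonds in the same cluster (example)
INTERVAL = 5.0                  # seconds between broadcasts (example value)

def local_stats():
    # Stand-in for reading real performance counters.
    return {"host": socket.gethostname(),
            "time": time.time(),
            "load_1min": 0.42}

def broadcast_once(sock):
    msg = json.dumps(local_stats()).encode()   # real gmond uses XDR
    for peer in PEERS:
        sock.sendto(msg, peer)                 # UDP: fire and forget

if __name__ == "__main__":
    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    for _ in range(2):                         # a couple of rounds for the demo
        broadcast_once(sock)
        time.sleep(INTERVAL)
```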
End-to-end tracing
Focus of many research efforts; currently used at Google, Bing, etc.
Traces show causally-related activity
- Trace: set of events from different threads/machines, merged & sorted by causality
- Shows the flow of individual requests


A DAG-based request flow
* time between nodes can be a bit off due to clock drift
events in the same process on the same machine -> processing the same request
timestamps -> order
propagate the request ID to the other machine, so we know both are handling the same request

* Implementation

Tracing infrastructure tracks trace points touched by individual requests
- Some trace points start traces (generate a unique trace ID)
- Others propagate the ID: transitivity of the request ID
Traces are obtained by stitching together the trace points accessed by each request (sketch below)
(this can be done by background processes)
- Hard to account for async and batched work
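
A minimal sketch of trace points and ID propagation, with an invented record format: the first component generates the trace ID, downstream calls carry it along, and a background step stitches records that share the ID into one request flow.

```python
import time
import uuid

RECORDS = []   # in a real system these go to per-node logs, stitched later

def trace_point(trace_id, component, event):
    RECORDS.append({"trace_id": trace_id, "component": component,
                    "event": event, "ts": time.time()})

def frontend(request):
    trace_id = uuid.uuid4().hex          # start of a new trace
    trace_point(trace_id, "frontend", "recv")
    backend(request, trace_id)           # propagate the ID downstream
    trace_point(trace_id, "frontend", "reply")

def backend(request, trace_id):          # would normally be another machine
    trace_point(trace_id, "backend", "recv")
    trace_point(trace_id, "backend", "reply")

frontend({"op": "get", "key": "x"})

# "Stitching": group records by trace ID and order them by timestamp.
by_trace = {}
for r in sorted(RECORDS, key=lambda r: r["ts"]):
    by_trace.setdefault(r["trace_id"], []).append((r["component"], r["event"]))
print(by_trace)
```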

If we do this for every request, the log can grow too huge..
If we make per-log-entry sampling decisions for storage, we cannot get the whole picture of a request
If we decide based on the request ID -> what most systems do (sketch below)

If we decide to sample only "interesting" requests, how do we know in advance? A log collected after a crash doesn't help... dilemma
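
A sketch of deciding based on the request ID, assuming a hash-based rule and an example 1% rate: because the keep/drop decision is a deterministic function of the trace ID, every node makes the same choice, so a sampled request is logged end to end.

```python
import hashlib

SAMPLE_RATE = 0.01   # keep ~1% of requests end to end (example rate)

def sampled(trace_id: str) -> bool:
    """Same ID -> same decision on every node, so traces stay complete."""
    h = int(hashlib.sha256(trace_id.encode()).hexdigest(), 16)
    return (h % 10_000) < SAMPLE_RATE * 10_000

kept = sum(sampled(f"req-{i}") for i in range(100_000))
print(f"kept {kept} of 100000 requests")   # roughly 1%
```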
Example: Dapper
Sigelman10

Google's implementation of end-to-end tracing, in production since 2008
Trace records are gathered in an external system
- median collection latency ~15 s; 89th percentile > hours
End-to-end tracing analysis tools
Example: Spectroscope
Localizes performance degradations
by identifying changed request flows


Output: Groups of before/after request flows
Some changes are automatically identified


Compare request flows between before and after, and find what changed
structure, time, ...
1. Structural change identification (requests with the same structure should have the same trace points), via heuristics
2. Response-time-only change identification (the time between two trace points grows)
Then do the ranking

These tell you where to look in the code and what to fix (a toy version of the before/after comparison is sketched below)
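
A toy illustration of the comparison step, with invented data and a fixed ratio threshold (Spectroscope itself uses statistical tests, not a fixed cutoff): group request flows by structure, flag categories whose response time grew from the before period to the after period, and rank them.

```python
from collections import defaultdict
from statistics import mean

def by_category(requests):
    """Group request flows by structure: the tuple of trace points visited."""
    groups = defaultdict(list)
    for path, latency in requests:
        groups[tuple(path)].append(latency)
    return groups

def changed_categories(before, after, threshold=1.5):
    b, a = by_category(before), by_category(after)
    flagged = []
    for cat in b.keys() & a.keys():
        if mean(a[cat]) > threshold * mean(b[cat]):
            flagged.append((cat, mean(b[cat]), mean(a[cat])))
    # Rank the flagged categories by how much latency they gained.
    return sorted(flagged, key=lambda x: x[2] - x[1], reverse=True)

before = [(["fe", "cache", "fe"], 5), (["fe", "db", "fe"], 40)]
after  = [(["fe", "cache", "fe"], 6), (["fe", "db", "fe"], 90)]
print(changed_categories(before, after))   # the db path stands out
```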


Guest lecture: Practical Use of Machine Learning in Amazon Redshift
Guest lecture: Building a Cloud-Native Platform for the Future of AI
Cloud Co-location and Attacks on Public Cloud
- Sandbox: run untrusted code
Confinement
- Hardware isolation, complex with more users
- VM
- Process: System call interposition, Containers

Threat Model

AWS - Cloud Cartography
WHOIS -> IP-to-instance mapping across accounts
How to prevent this?


Paper
Xen hypervisor


Now - Side channels

- Securing the cloud with a sandbox: gVisor

Privileged instructions: manage page tables, I/O


Back to Blog Chapters