Saturating a 10Gbps Link: My Experience Downloading 100TB of Code

Stargazer ZJ

Recently I was tasked with downloading a massive amount of code-related data. I found that my knowledge of operating systems was genuinely useful for the work. This post documents my journey.

Infrastructure

We have a large CPU cluster managed by k8s. It features a 10Gbps CN2 link to the internet, which is not only faster than most home connections but also more stable when connecting to foreign servers. The cluster has shared GPFS storage, fast and capacious. Although the host servers have ample CPU and memory resources, for technical reasons we are only allowed to run our downloading programs in 4C8G interactive Docker containers.

Because of the limited user-space CPU and memory resources, we later switched to a bare-metal server featuring an Intel(R) Xeon(R) Gold 6330 112-core CPU, 512GB of memory, and a 1.84 TiB INTEL SSDPE2KX020T8 data drive. Downloaded data is uploaded to an internally hosted S3 server.

GH Archive & SWH Graph

I first downloaded GitHub Archive and the Software Heritage Graph. GitHub Archive, despite the name, does not store actual code content from GitHub; it is a third-party project that aims to capture all the ‘events’ on GitHub, including project metadata and user interactions. Software Heritage does store code content, but it is too large to download and analyze for now. Instead, we chose to download the Software Heritage Graph, a database describing the file structure of all GitHub repositories.

I used the aforementioned cluster to download both of them. Retrieving the download URLs is relatively easy: GH Archive links can be enumerated by date and hour, and the SWH Graph is hosted on Amazon S3, whose direct download links I extracted with a few s3cmd commands.
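The date-and-hour enumeration can be sketched in a few lines. This is an illustrative helper, not the script I actually ran, and it assumes GH Archive's standard naming scheme (one gzipped JSON file per hour, with a non-zero-padded hour):

```python
from datetime import date, timedelta

def gharchive_urls(start: date, end: date):
    """Yield one GH Archive URL per hour, from start (inclusive) to end (exclusive).

    Assumes the usual naming scheme:
    https://data.gharchive.org/YYYY-MM-DD-H.json.gz (hour not zero-padded).
    """
    day = start
    while day < end:
        for hour in range(24):
            yield f"https://data.gharchive.org/{day.isoformat()}-{hour}.json.gz"
        day += timedelta(days=1)

# One day of archives = 24 links
urls = list(gharchive_urls(date(2025, 6, 1), date(2025, 6, 2)))
```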

The download method is also simple. I used Python multithreading to spawn at most 256 simultaneous aria2c processes, each responsible for one link. The parameters -k 4 -x 4 -c were used for aria2c (raising the connection cap further showed no marginal benefit). Error handling is minimal because faulty downloads can be retried by simply re-running the script, and aria2c will continue where it left off using the .aria2 progress file.

There are a few key points in writing the download CLI. First and foremost, it is important to add begin and end parameters so that only a slice of the files is downloaded, both for early trial runs and for splitting the workload across multiple servers. Secondly, displaying a tqdm progress bar is helpful for viewing download progress in real time.

Asynchronous programming had no edge here because, at least with Python asyncio, a thread must still be created for every spawned subprocess.

I discovered that a container with merely 4 cores can handle thousands of TCP connections and achieve about 5Gbps average download speed. Presumably, the main reason is that the network adapter driver and other networking heavy lifting run on the kernel side, whose CPU usage is not subject to the user-space limits imposed by Docker. I couldn’t saturate the downstream link further because other people were downloading at the time.

The entire GitHub Archive up to June 2025 is about 3TB, and the May 2025 version of the SWH Graph accounts for roughly 20TB.

GitHub repositories

We moved on to cloning GitHub. We analyzed GH Archive and collected a list of all GitHub repositories with at least 5 stars, ranked by star count. At the time of writing I have cloned all repositories with at least 20 stars, totaling about 160,000 repositories and occupying about 60TB.

Git cloning is CPU intensive, so we switched to the bare-metal server for this stage. Every repository is first cloned with git clone --bare, tarred, uploaded to an internal S3 server using boto3, and removed from the local cache. The process is pipelined: workers claim jobs from a queue.Queue() that is filled by the workers of the previous pipeline stage.

A very important addition in this download script was a SQLite3 database for progress tracking, which made the script robust against interruptions and code modifications. The database records the downloading stage of each repository, along with the failure reason if one exists. Any repository not reaching the final stage is retried afresh on every run.

A dedicated ZFS pool was created and tuned on the data drive as a local cache, using the following configuration:

ZFS configuration

```shell
sudo zpool create \
    -o ashift=12 \
    -o autotrim=on \
    gittb \
    /dev/disk/by-id/nvme-INTEL_SSDPE2KX020T8_PHLJ221000R42P0BGN
sudo zfs create -o mountpoint=/git-data \
    -o recordsize=1M \
    -o compression=off \
    -o atime=off \
    -o primarycache=all \
    -o sync=disabled \
    -o logbias=throughput \
    gittb/repos
```

Notably, recordsize=1M is used because git clone --bare mostly creates a few large .pack files, rather than the many small files of a normal working checkout.

A concurrency of 256 is enough to saturate the 10Gbps downstream link as well as the host’s CPU.

I also implemented safeguards against the disk filling up. When disk usage reached a predefined high watermark (HWM), all cloning stopped automatically, and would only resume once the uploader reduced disk usage below a low watermark (LWM). As it turned out, this safeguard was never triggered; peak disk usage was about 600GB.

During initial testing, I found that the upload speed was slower than the peak download speed of git cloning, despite the large number of workers. Since boto3 is based on Python urllib3, I suspected that the Python GIL was to blame. Switching to an external s3cmd or rclone for uploading was not attractive because each boto3 worker thread maintains a persistent connection pool across uploads.

Thus I switched to Python 3.13t, the new free-threaded build of Python. pandas, which I used to load the repository lists, is not compatible with this build, so I simply offloaded that job to a dedicated regular Python process.

The full cloning code is documented here:

Code

```python
#!/usr/bin/env python
# gh_bulk_clone.py
"""
Parallel bare-cloner + tar + uploader for GitHub
================================================

• Reads a parquet that contains (repo_id, repo_name, star_count, …)
• Filters and sorts by stars
• Clones with `git clone --bare` over SSH or HTTPS
• Handles 256 clone workers, 128 upload workers (configurable)
• zfs space management : pauses cloning when /git-data > high watermark
• Exponential back-off on transient network errors only
• Saves real-time state & errors to an SQLite data-base (resumable)
"""

from __future__ import annotations
import argparse, os, shutil, sqlite3, subprocess, sys, tarfile, time, queue, threading, logging, json
from concurrent.futures import ThreadPoolExecutor, as_completed
from dataclasses import dataclass
from datetime import datetime
from functools import partial
from pathlib import Path
from typing import Iterable, Union, Any

import boto3
import botocore.config
from boto3.s3.transfer import TransferConfig
from dotenv import load_dotenv
from tenacity import retry, wait_exponential, stop_after_attempt, retry_if_exception_type
from tqdm import tqdm
from rich.console import Console

# ------------------------------------------------------------------------------
# Configuration & CLI
# ------------------------------------------------------------------------------
DEF_CLONE_WORKERS = 256
DEF_UPLOAD_WORKERS = 256
DEF_STAR_THRESH = 64
DEF_HWM = 0.65
DEF_LWM = 0.25

console = Console()
log = logging.getLogger("gh_bulk")
logging.basicConfig(level=logging.INFO,
                    format="%(asctime)s %(levelname)7s - %(message)s",
                    datefmt="%H:%M:%S")
SENTINEL = object()

@dataclass
class Repo:
    repo_id: int
    repo_name: str  # "owner/name"
    star_count: int
    issue_count: int
    pr_count: int
    contributor_count: int

# Global counters for real-time tracking
clone_failed_count = 0
tar_failed_count = 0
upload_failed_count = 0
clone_failed_lock = threading.Lock()
tar_failed_lock = threading.Lock()
upload_failed_lock = threading.Lock()

# ------------------------------------------------------------------------------
# SQLite helper
# ------------------------------------------------------------------------------
DB_SCHEMA = """
CREATE TABLE IF NOT EXISTS repo_status(
    repo_id INTEGER PRIMARY KEY,
    repo_name TEXT,
    cloned INTEGER DEFAULT 0,
    tarred INTEGER DEFAULT 0,
    uploaded INTEGER DEFAULT 0,
    error_stage TEXT,
    error_msg TEXT,
    ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
"""

class DB:
    def __init__(self, path: Path):
        self.path = path
        self._lock = threading.Lock()
        self._conn = sqlite3.connect(path, check_same_thread=False, timeout=60)
        with self._conn:
            self._conn.executescript(DB_SCHEMA)

    def init_repo(self, repo_id: int, repo_name: str):
        """Initialize a repo record with its name if it doesn't exist."""
        with self._lock, self._conn:
            self._conn.execute("""
                INSERT INTO repo_status(repo_id, repo_name)
                VALUES(?, ?)
                ON CONFLICT(repo_id) DO UPDATE SET repo_name=?;
            """, (repo_id, repo_name, repo_name))

    def mark(self, repo_id: int, stage: str, ok: bool, errmsg: str | None = None):
        with self._lock, self._conn:
            if ok:
                self._conn.execute(f"""
                    INSERT INTO repo_status(repo_id, repo_name, {stage})
                    VALUES(?, (SELECT repo_name FROM repo_status WHERE repo_id=?), 1)
                    ON CONFLICT(repo_id) DO UPDATE SET {stage}=1, error_stage=NULL, error_msg=NULL, ts=CURRENT_TIMESTAMP;
                """, (repo_id, repo_id))
            else:
                self._conn.execute("""
                    INSERT INTO repo_status(repo_id, repo_name, error_stage, error_msg)
                    VALUES(?, (SELECT repo_name FROM repo_status WHERE repo_id=?), ?,?)
                    ON CONFLICT(repo_id) DO UPDATE SET error_stage=?, error_msg=?, ts=CURRENT_TIMESTAMP;
                """, (repo_id, repo_id, stage, errmsg, stage, errmsg))

    def already_done(self, repo_id: int) -> bool:
        cur = self._conn.execute("SELECT uploaded FROM repo_status WHERE repo_id=?", (repo_id,))
        row = cur.fetchone()
        return row is not None and row[0] == 1

# ------------------------------------------------------------------------------
# Disk utilisation helpers
# ------------------------------------------------------------------------------
def zfs_usage(path="/git-data") -> tuple[int, int]:
    """Return (used_bytes, total_bytes) for the *pool* that contains path.
    Fast: uses `statvfs`; querying the disk here is cheap enough.
    """
    st = os.statvfs(path)
    used = (st.f_blocks - st.f_bfree) * st.f_frsize
    total = st.f_blocks * st.f_frsize
    return used, total

# ------------------------------------------------------------------------------
# Git clone
# ------------------------------------------------------------------------------
def git_url(repo: Repo, use_ssh: bool) -> str:
    if use_ssh:
        return f"git@github.com:{repo.repo_name}.git"
    return f"https://github.com/{repo.repo_name}.git"

class CloneError(Exception): pass

@retry(wait=wait_exponential(multiplier=1, min=2, max=64),
       stop=stop_after_attempt(5),
       retry=retry_if_exception_type(CloneError))
def clone_repo(repo: Repo, dest_dir: Path, use_ssh: bool):
    if dest_dir.exists():
        return  # already cloned in this session
    url = git_url(repo, use_ssh)
    cmd = ["git", "clone", "--bare", url, str(dest_dir)]
    try:
        subprocess.run(cmd, check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    except subprocess.CalledProcessError as e:
        # network-ish?
        if b"Could not resolve host" in e.stderr or b"Connection timed out" in e.stderr:
            raise CloneError(e.stderr.decode())
        else:
            raise  # non-retriable (repo deleted, private …)

# ------------------------------------------------------------------------------
# Tar + Upload
# ------------------------------------------------------------------------------
def tar_path(repo_id: int) -> Path:
    return Path(f"/git-data/{repo_id}.tar")
    # return Path(f"/dev/shm/{repo_id}.tar")

def make_tar(src_dir: Path, dst_tar: Path):
    if dst_tar.exists():
        return
    with tarfile.open(dst_tar, "w") as tar:
        tar.add(src_dir, arcname=src_dir.name)

# Thread-local storage for S3 clients
thread_local_data = threading.local()

def get_s3_client():
    """Get or create an S3 client for the current thread.

    Uses thread-local storage to reuse S3 clients per thread, which is more
    efficient than creating a new session and client for each task.
    """
    if not hasattr(thread_local_data, 's3_client'):
        config = botocore.config.Config(
            max_pool_connections=1024,
            retries={'max_attempts': 3}
        )

        sess = boto3.session.Session()
        thread_local_data.s3_client = sess.client("s3",
            endpoint_url=os.environ["S3_ENDPOINT"],
            aws_access_key_id=os.environ["AWS_ACCESS_KEY_ID"],
            aws_secret_access_key=os.environ["AWS_SECRET_ACCESS_KEY"],
            region_name=os.environ.get("AWS_DEFAULT_REGION", "us-east-1"),
            verify=False,
            config=config)

    return thread_local_data.s3_client

TRANSFER_CONFIG = TransferConfig(
    multipart_threshold = 64*1024*1024,  # 64 MB
    multipart_chunksize = 64*1024*1024,
    max_concurrency = 16,
)

def upload_tar(tar_file: Path, bucket: str, clnt):
    clnt.upload_file(str(tar_file), bucket, tar_file.name, Config=TRANSFER_CONFIG)

# ------------------------------------------------------------------------------
# Worker thread functions
# ------------------------------------------------------------------------------
def clone_worker(repo: Repo, args, db: DB):
    dest = Path(f"/git-data/{repo.repo_id}.git")
    try:
        clone_repo(repo, dest, args.ssh)
        db.mark(repo.repo_id, "cloned", True)
    except Exception as e:
        # For subprocess errors, use stderr which contains more useful information
        if isinstance(e, subprocess.CalledProcessError) and e.stderr:
            try:
                error_msg = e.stderr.decode() if isinstance(e.stderr, bytes) else str(e.stderr)
            except Exception:
                error_msg = str(e)
        else:
            error_msg = str(e)
        db.mark(repo.repo_id, "cloned", False, error_msg)
        raise

def tar_worker(repo: Repo, db: DB):
    try:
        make_tar(Path(f"/git-data/{repo.repo_id}.git"), tar_path(repo.repo_id))
        db.mark(repo.repo_id, "tarred", True)
    except Exception as e:
        db.mark(repo.repo_id, "tarred", False, str(e))
        raise

def upload_worker(repo: Repo, bucket: str, db: DB):
    try:
        # Get the thread-local S3 client (reused per thread)
        clnt = get_s3_client()
        upload_tar(tar_path(repo.repo_id), bucket, clnt)
        db.mark(repo.repo_id, "uploaded", True)
        # free space
        shutil.rmtree(f"/git-data/{repo.repo_id}.git", ignore_errors=True)
        tar_path(repo.repo_id).unlink(missing_ok=True)
    except Exception as e:
        db.mark(repo.repo_id, "uploaded", False, str(e))
        raise

# ------------------------------------------------------------------------------
# Main orchestrator
# ------------------------------------------------------------------------------
def main(argv: list[str] | None = None):

    load_dotenv()

    ap = argparse.ArgumentParser()
    ap.add_argument("parquet", help="Input parquet with repo info")
    ap.add_argument("--star-threshold", type=int, default=DEF_STAR_THRESH)
    ap.add_argument("--from-index", type=int, default=0,
                    help="start (inclusive) slice index after filtering/sorting")
    ap.add_argument("--to-index", type=int, default=None,
                    help="end (exclusive)")
    ap.add_argument("-j", "--clone-workers", type=int, default=DEF_CLONE_WORKERS)
    ap.add_argument("-u", "--upload-workers", type=int, default=DEF_UPLOAD_WORKERS)
    ap.add_argument("--ssh", action="store_true", help="use SSH instead of HTTPS")
    ap.add_argument("--db", default="progress.sqlite3")
    ap.add_argument("--hwm", type=float, default=DEF_HWM)
    ap.add_argument("--lwm", type=float, default=DEF_LWM)
    args = ap.parse_args(argv)

    if any(Path("/git-data").iterdir()):
        console.print("[bold red]/git-data is not empty, aborting.[/]")
        sys.exit(1)

    db = DB(Path(args.db))
    bucket = os.environ["S3_BUCKET"]

    # Run preprocessing script to filter and sort repos
    console.print("[bold]Running preprocessing...")
    preprocess_cmd = [
        sys.executable, "preprocess.py", args.parquet,
        "--star-threshold", str(args.star_threshold),
        "--from-index", str(args.from_index),
    ]
    if args.to_index is not None:
        preprocess_cmd.extend(["--to-index", str(args.to_index)])

    try:
        subprocess.run(preprocess_cmd, check=True)
    except subprocess.CalledProcessError as e:
        console.print(f"[red]Preprocessing failed: {e}[/]")
        sys.exit(1)

    # Load processed repos from JSON
    console.print("[bold]Loading processed repos...")
    repos_file = Path("/dev/shm/repos.json")
    if not repos_file.exists():
        console.print(f"[red]Preprocessed file not found: {repos_file}[/]")
        sys.exit(1)

    with open(repos_file, 'r') as f:
        repos_data = json.load(f)

    repos: list[Repo] = [Repo(**row) for row in repos_data]

    # Clean up temporary file
    repos_file.unlink(missing_ok=True)

    # Remove already uploaded
    repos = [r for r in repos if not db.already_done(r.repo_id)]
    total = len(repos)
    console.print(f"Total to process: {total:,}")

    clone_q : queue.Queue[Repo] = queue.Queue()
    tar_q : queue.Queue[Repo] = queue.Queue()
    upload_q : queue.Queue[Repo] = queue.Queue()

    for r in repos:
        clone_q.put(r)

    del repos

    # ----------------------------------------------------------------------
    # ThreadPool executors
    # ----------------------------------------------------------------------
    clone_pool = ThreadPoolExecutor(max_workers=args.clone_workers)
    tar_pool = ThreadPoolExecutor(max_workers=args.clone_workers)
    upload_pool = ThreadPoolExecutor(max_workers=args.upload_workers)

    clone_pbar = tqdm(total=total, desc="Cloned", position=0)
    tar_pbar = tqdm(total=total, desc="Tarred", position=1)
    upl_pbar = tqdm(total=total, desc="Upload", position=2)  # remove 'ed' to make the words of equal length

    # Per-stage failure counters
    clone_fail_pbar = tqdm(total=0, desc="Clone Failed", position=3, bar_format='{desc}: {n}')
    tar_fail_pbar = tqdm(total=0, desc="Tar Failed", position=4, bar_format='{desc}: {n}')
    upl_fail_pbar = tqdm(total=0, desc="Upload Failed", position=5, bar_format='{desc}: {n}')

    # ----------------------------------------------------------------------
    # Driver loops
    # ----------------------------------------------------------------------
    def launch_clone():
        while True:
            try:
                repo = clone_q.get_nowait()
            except queue.Empty:
                break
            clone_pool.submit(clone_task, repo)

    def clone_task(repo):
        global clone_failed_count

        try:
            db.init_repo(repo.repo_id, repo.repo_name)

            # Disk space guard - check before actually cloning
            while True:
                # I expected all new cloning to stop and start together, which is
                # not what the code here does. I didn't fix it because this part
                # is never triggered anyway.
                used, tot = zfs_usage("/git-data")
                usage_ratio = used / tot
                if usage_ratio < args.hwm:
                    break
                # Disk is getting full (>= HWM), wait for uploads to free space until < LWM
                while usage_ratio >= args.lwm:
                    time.sleep(5)
                    used, tot = zfs_usage("/git-data")
                    usage_ratio = used / tot

            clone_worker(repo, args, db)
            clone_pbar.update(1)
            tar_q.put(repo)
        except Exception:
            with clone_failed_lock:
                clone_failed_count += 1
                clone_fail_pbar.n = clone_failed_count
                clone_fail_pbar.refresh()

    def tar_task(repo):
        global tar_failed_count

        try:
            tar_worker(repo, db)
            tar_pbar.update(1)
            upload_q.put(repo)
        except Exception:
            with tar_failed_lock:
                tar_failed_count += 1
                tar_fail_pbar.n = tar_failed_count
                tar_fail_pbar.refresh()

    def upload_task(repo):
        global upload_failed_count

        try:
            upload_worker(repo, bucket, db)
            upl_pbar.update(1)
        except Exception:
            with upload_failed_lock:
                upload_failed_count += 1
                upl_fail_pbar.n = upload_failed_count
                upl_fail_pbar.refresh()

    # Start background tar/upload pump threads ----------------------------
    def pump_tar():
        while True:
            repo = tar_q.get()
            if repo is SENTINEL:
                break
            tar_pool.submit(tar_task, repo)
            tar_q.task_done()

    def pump_upload():
        while True:
            repo = upload_q.get()
            if repo is SENTINEL:
                break
            upload_pool.submit(upload_task, repo)
            upload_q.task_done()

    tar_thread = threading.Thread(target=pump_tar, daemon=True)
    upl_thread = threading.Thread(target=pump_upload, daemon=True)
    tar_thread.start()
    upl_thread.start()

    # Main cloning loop ----------------------------------------------------
    launch_clone()
    clone_pool.shutdown(wait=True)

    tar_q.put(SENTINEL)
    tar_thread.join()
    tar_pool.shutdown(wait=True)

    upload_q.put(SENTINEL)
    upl_thread.join()
    upload_pool.shutdown(wait=True)

    clone_pbar.close(); tar_pbar.close(); upl_pbar.close()
    clone_fail_pbar.close(); tar_fail_pbar.close(); upl_fail_pbar.close()
    console.print("[green]All done![/]")

# ------------------------------------------------------------------------------
if __name__ == "__main__":
    try:
        main()
    except KeyboardInterrupt:
        console.print("[red]Interrupted by user[/]")
```
  • Created at: 2025-07-09 13:14:32
  • Updated at: 2025-07-09 15:40:31
  • Link: https://ji-z.net/2025/07/09/Saturating-a-10Gbps-Link-My-Experience-Downloading-100TB-of-Code/
  • License: This work is licensed under CC BY-NC-SA 4.0.