Presto's engine has complex data structures, so the memory footprint of Java objects has to be accounted for correctly. This is not always trivial: the accounting must include Java object overhead, and a piece of memory that is reachable through multiple references must not be counted more than once. For the first problem, the JOL (Java Object Layout) library [3] provides APIs to calculate the retained size of Java objects. The second can only be handled by carefully accounting for the data structures across the whole engine so that nothing is counted twice.
LongBigArray.java
    private static final int INSTANCE_SIZE = ClassLayout.parseClass(LongBigArray.class).instanceSize(); // JOL API
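The double-counting concern can be illustrated with a minimal sketch. It is not Presto code: the class name, the 16-byte array header, and the helper methods are assumptions made for illustration, and real sizes depend on the JVM (which is what JOL measures). The sketch sums the size of an array of segments and uses an identity-based set so that a segment referenced twice is counted once.

```java
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.Set;

final class RetainedSizeSketch
{
    // Assumed array header size, for illustration only; the real value depends on the JVM.
    static final long ARRAY_HEADER_BYTES = 16;

    private RetainedSizeSketch() {}

    // Estimated size of one long[]: assumed header plus 8 bytes per element.
    static long sizeOf(long[] array)
    {
        return ARRAY_HEADER_BYTES + 8L * array.length;
    }

    // Sums the size of every distinct segment. Identity (not equals) decides
    // whether two references point at the same piece of memory.
    // The outer array itself is deliberately left out of the total.
    static long retainedSize(long[][] segments)
    {
        Set<long[]> seen = Collections.newSetFromMap(new IdentityHashMap<>());
        long total = 0;
        for (long[] segment : segments) {
            if (segment != null && seen.add(segment)) {
                total += sizeOf(segment);
            }
        }
        return total;
    }

    public static void main(String[] args)
    {
        long[] shared = new long[1024];
        // "shared" appears twice but contributes its size only once.
        long[][] segments = {shared, shared, new long[1024]};
        System.out.println(retainedSize(segments));
    }
}
```

A naive sum over the three references would report three segments' worth of memory even though only two arrays exist.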
    @Override
    public synchronized ListenableFuture<?> setBytes(long bytes)
    {
        checkState(!closed, "SimpleLocalMemoryContext is already closed");
        checkArgument(bytes >= 0, "bytes cannot be negative");

        if (bytes == usedBytes) {
            return NOT_BLOCKED;
        }

        // update the parent first as it may throw a runtime exception (e.g., ExceededMemoryLimitException)
        ListenableFuture<?> future = parentMemoryContext.updateBytes(allocationTag, bytes - usedBytes, false);
        usedBytes = bytes;
        return future;
    }
    @Override
    synchronized ListenableFuture<?> updateBytes(String allocationTag, long bytes, boolean enforceBroadcastMemoryLimit)
    {
        checkState(!isClosed(), "RootAggregatedMemoryContext is already closed");
        ListenableFuture<?> future = reservationHandler.reserveMemory(allocationTag, bytes, enforceBroadcastMemoryLimit);
        addBytes(bytes);
        if (enforceBroadcastMemoryLimit) {
            addBroadcastBytes(bytes);
        }
        // make sure we never block queries below guaranteedMemory
        if (getBytes() < guaranteedMemory) {
            future = NOT_BLOCKED;
        }
        return future;
    }
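The two methods above show a local context forwarding only the change in its usage to a parent, which keeps the aggregate. The following is a simplified sketch of that pattern, not the Presto classes: RootContext, LocalContext, and the hard limit check are hypothetical stand-ins, and futures, allocation tags, and broadcast accounting are left out.

```java
final class MemoryContextSketch
{
    private MemoryContextSketch() {}

    // Hypothetical stand-in for the root context: keeps the total and enforces a limit.
    static final class RootContext
    {
        private final long limitBytes;
        private long usedBytes;

        RootContext(long limitBytes)
        {
            this.limitBytes = limitBytes;
        }

        synchronized void updateBytes(long delta)
        {
            if (usedBytes + delta > limitBytes) {
                throw new IllegalStateException("memory limit exceeded");
            }
            usedBytes += delta;
        }

        synchronized long getBytes()
        {
            return usedBytes;
        }
    }

    // Hypothetical stand-in for a local (per-operator) context.
    static final class LocalContext
    {
        private final RootContext parent;
        private long usedBytes;

        LocalContext(RootContext parent)
        {
            this.parent = parent;
        }

        synchronized void setBytes(long bytes)
        {
            if (bytes < 0) {
                throw new IllegalArgumentException("bytes cannot be negative");
            }
            if (bytes == usedBytes) {
                return;
            }
            // Update the parent first: if it throws, the local counter stays unchanged,
            // so local and aggregated views never diverge.
            parent.updateBytes(bytes - usedBytes);
            usedBytes = bytes;
        }

        synchronized long getBytes()
        {
            return usedBytes;
        }
    }

    public static void main(String[] args)
    {
        RootContext root = new RootContext(100);
        LocalContext a = new LocalContext(root);
        LocalContext b = new LocalContext(root);
        a.setBytes(60);
        b.setBytes(30);
        a.setBytes(10); // shrinking sends a negative delta to the parent
        System.out.println(root.getBytes());
    }
}
```

Because setBytes takes the new absolute value rather than a delta, callers can simply report their current size after every change.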
    /**
     * Reserves the given number of bytes. Caller should wait on the returned future, before allocating more memory.
     */
    public ListenableFuture<?> reserve(QueryId queryId, String allocationTag, long bytes)
    {
        checkArgument(bytes >= 0, "bytes is negative");

        ListenableFuture<?> result;
        synchronized (this) {
            if (bytes != 0) {
                queryMemoryReservations.merge(queryId, bytes, Long::sum);
                updateTaggedMemoryAllocations(queryId, allocationTag, bytes);
            }
            reservedBytes += bytes;
            if (getFreeBytes() <= 0) {
                if (future == null) {
                    future = NonCancellableMemoryFuture.create();
                }
                checkState(!future.isDone(), "future is already completed");
                result = future;
            }
            else {
                result = NOT_BLOCKED;
            }
        }
        // ...
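The reservation always succeeds, and the returned future only tells the caller whether to wait before allocating more. A minimal sketch of that contract follows, using the JDK's CompletableFuture in place of Guava's ListenableFuture. The class MemoryPoolSketch, its free method, and the string query ids are assumptions for illustration; in particular, how and when the real pool completes its future is not shown in the excerpt above.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

final class MemoryPoolSketch
{
    private static final CompletableFuture<Void> NOT_BLOCKED = CompletableFuture.completedFuture(null);

    private final long maxBytes;
    private final Map<String, Long> queryReservations = new HashMap<>();
    private long reservedBytes;
    private CompletableFuture<Void> future; // shared by all callers blocked on this pool

    MemoryPoolSketch(long maxBytes)
    {
        this.maxBytes = maxBytes;
    }

    synchronized long getFreeBytes()
    {
        return maxBytes - reservedBytes;
    }

    // The bytes are recorded unconditionally; the future signals back-pressure.
    synchronized CompletableFuture<Void> reserve(String queryId, long bytes)
    {
        if (bytes < 0) {
            throw new IllegalArgumentException("bytes is negative");
        }
        if (bytes != 0) {
            queryReservations.merge(queryId, bytes, Long::sum);
        }
        reservedBytes += bytes;
        if (getFreeBytes() <= 0) {
            if (future == null) {
                future = new CompletableFuture<>();
            }
            return future;
        }
        return NOT_BLOCKED;
    }

    // Assumed counterpart: releasing memory unblocks waiters once the pool has free bytes again.
    synchronized void free(String queryId, long bytes)
    {
        long remaining = queryReservations.getOrDefault(queryId, 0L) - bytes;
        if (remaining <= 0) {
            queryReservations.remove(queryId);
        }
        else {
            queryReservations.put(queryId, remaining);
        }
        reservedBytes -= bytes;
        if (getFreeBytes() > 0 && future != null) {
            CompletableFuture<Void> toComplete = future;
            future = null;
            toComplete.complete(null);
        }
    }

    public static void main(String[] args)
    {
        MemoryPoolSketch pool = new MemoryPoolSketch(100);
        CompletableFuture<Void> first = pool.reserve("q1", 40);
        CompletableFuture<Void> second = pool.reserve("q2", 60); // pool is now full
        System.out.println(first.isDone() + " " + second.isDone());
        pool.free("q1", 40);
        System.out.println(second.isDone());
    }
}
```

For brevity the sketch completes the future while holding the lock; a production implementation would normally avoid running callbacks under a lock.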
config.getMemoryRevokingThreshold(): Revoke memory when the memory pool is filled over this threshold.
config.getMemoryRevokingTarget(): When revoking memory, try to revoke enough that the pool is filled below this target at the end.
config.getTaskSpillingStrategy(): PER_TASK_MEMORY_THRESHOLD is not supported by MemoryRevokingScheduler.
config.isQueryLimitSpillEnabled(): Spill whenever the total memory used by the query (including revocable and non-revocable memory) exceeds maxTotalMemoryPerNode. When enabled, spilling starts once a query's memory on the node reaches that limit; otherwise the decision is based on how full the memory pool is.
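The interaction between the threshold and the target can be made concrete with a little arithmetic. The helper below is a sketch under stated assumptions, not the scheduler's actual code: the method name and signature are hypothetical, and both settings are treated as fractions of the pool size between 0 and 1.

```java
final class RevokeTargetSketch
{
    private RevokeTargetSketch() {}

    // Returns how many bytes to ask operators to revoke, or 0 if the pool
    // is not filled over the threshold.
    static long bytesToRevoke(long maxBytes, long reservedBytes, double threshold, double target)
    {
        if (reservedBytes <= maxBytes * threshold) {
            return 0;
        }
        // Revoke enough to bring the pool down to the target fill level.
        long targetReservedBytes = (long) (maxBytes * target);
        return Math.max(0, reservedBytes - targetReservedBytes);
    }

    public static void main(String[] args)
    {
        // Pool of 1000 bytes, threshold 0.9, target 0.5.
        System.out.println(bytesToRevoke(1000, 800, 0.9, 0.5)); // below threshold
        System.out.println(bytesToRevoke(1000, 950, 0.9, 0.5)); // over threshold
    }
}
```

With a 1000-byte pool, 950 bytes reserved is over the 0.9 threshold, so the helper asks for 450 bytes to reach the 0.5 target; at 800 bytes reserved nothing is revoked. Keeping the target well below the threshold avoids revoking again immediately after each round.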
The member variable memoryPoolListener is declared as: private final MemoryPoolListener memoryPoolListener = this::onMemoryReserved; When a memory reservation completes, MemoryPoolListener#onMemoryReserved is called (queryMemoryReservation: the total amount of memory reserved by the query (revocable and regular)).
    /**
     * After calling this method operator should revoke all reserved revocable memory.
     * As soon as memory is revoked returned future should be marked as done.
     * Spawned threads can not modify OperatorContext because it's not thread safe.
     * For this purpose implement {@link #finishMemoryRevoke()}
     * <p>
     * Since memory revoking signal is delivered asynchronously to the Operator, implementation
     * must gracefully handle the case when there no longer is any revocable memory allocated.
     * After this method is called on Operator the Driver is disallowed to call any
     * processing methods on it (isBlocked/needsInput/addInput/getOutput) until
     * {@link #finishMemoryRevoke()} is called.
     */
    default ListenableFuture<?> startMemoryRevoke()
    {
        return NOT_BLOCKED;
    }
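The two-phase contract in the Javadoc can be sketched as follows. This is a toy model, not a Presto operator: RevocableOperatorSketch, its in-memory "spilled" list standing in for disk, and the use of CompletableFuture instead of ListenableFuture are all assumptions. The background task only copies data out; the bookkeeping is reset in finishMemoryRevoke on the caller's thread, mirroring the rule that spawned threads must not touch the non-thread-safe context.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

final class RevocableOperatorSketch
{
    private final List<long[]> buffered = new ArrayList<>();
    private final List<long[]> spilled = new ArrayList<>(); // stands in for a spill file
    private long revocableBytes;
    private boolean revoking;

    void addInput(long[] page)
    {
        // The driver must not call processing methods between start and finish.
        if (revoking) {
            throw new IllegalStateException("memory revoke in progress");
        }
        buffered.add(page);
        revocableBytes += 8L * page.length;
    }

    CompletableFuture<Void> startMemoryRevoke()
    {
        revoking = true;
        // The signal is asynchronous, so there may be nothing left to revoke.
        if (buffered.isEmpty()) {
            return CompletableFuture.completedFuture(null);
        }
        List<long[]> snapshot = new ArrayList<>(buffered);
        // Background work touches only the snapshot and the spill target.
        return CompletableFuture.runAsync(() -> {
            synchronized (spilled) {
                spilled.addAll(snapshot);
            }
        });
    }

    // Called on the driver thread after the future completes.
    void finishMemoryRevoke()
    {
        buffered.clear();
        revocableBytes = 0;
        revoking = false;
    }

    long getRevocableBytes()
    {
        return revocableBytes;
    }

    int getSpilledPageCount()
    {
        synchronized (spilled) {
            return spilled.size();
        }
    }

    public static void main(String[] args)
    {
        RevocableOperatorSketch operator = new RevocableOperatorSketch();
        operator.addInput(new long[128]);
        operator.addInput(new long[128]);
        operator.startMemoryRevoke().join(); // wait for the "spill" to finish
        operator.finishMemoryRevoke();
        System.out.println(operator.getRevocableBytes() + " " + operator.getSpilledPageCount());
    }
}
```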