Skip to content

Commit

Permalink
Timestamp support (#565)
Browse files Browse the repository at this point in the history
* Add add support for Timestamp type
* Add timestamp tests
* Fix timestamp pack overflow
* Add timestamp value support
* Support timestamp in unpackValue
* Use AirSpec for integration with ScalaCheck
* Fix ValueFactoryTest
* Remove scalatest
* Remove xerial-core dependency
* Remove unnecessary dependencies
* Variable.writeTo roundtrip tests
* Use consistent code style

Co-authored-by: Sadayuki Furuhashi <[email protected]>
  • Loading branch information
xerial and frsyuki authored May 17, 2021
1 parent 2864da3 commit 1b2ec92
Show file tree
Hide file tree
Showing 31 changed files with 1,934 additions and 956 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ Here is a list of sbt commands for daily development:
> ~test:compile # Compile both source and test codes
> ~test # Run tests upon source code change
> ~testOnly *MessagePackTest # Run tests in the specified class
> ~testOnly *MessagePackTest -- -n prim # Run the test tagged as "prim"
> ~testOnly *MessagePackTest -- (pattern) # Run tests matching the pattern
> project msgpack-core # Focus on a specific project
> package # Create a jar file in the target folder of each project
> findbugs # Produce findbugs report in target/findbugs
Expand Down
18 changes: 12 additions & 6 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ Global / concurrentRestrictions := Seq(
Tags.limit(Tags.Test, 1)
)

val AIRFRAME_VERSION = "20.4.1"

// Use dynamic snapshot version strings for non tagged versions
ThisBuild / dynverSonatypeSnapshots := true
// Use coursier friendly version separator
Expand Down Expand Up @@ -71,15 +73,19 @@ lazy val msgpackCore = Project(id = "msgpack-core", base = file("msgpack-core"))
"org.msgpack.value",
"org.msgpack.value.impl"
),
testFrameworks += new TestFramework("wvlet.airspec.Framework"),
libraryDependencies ++= Seq(
// msgpack-core should have no external dependencies
junitInterface,
"org.scalatest" %% "scalatest" % "3.2.8" % "test",
"org.scalacheck" %% "scalacheck" % "1.15.4" % "test",
"org.xerial" %% "xerial-core" % "3.6.0" % "test",
"org.msgpack" % "msgpack" % "0.6.12" % "test",
"commons-codec" % "commons-codec" % "1.12" % "test",
"com.typesafe.akka" %% "akka-actor" % "2.5.23" % "test"
"org.wvlet.airframe" %% "airframe-json" % AIRFRAME_VERSION % "test",
"org.wvlet.airframe" %% "airspec" % AIRFRAME_VERSION % "test",
// Add property testing support with forAll methods
"org.scalacheck" %% "scalacheck" % "1.15.4" % "test",
// For performance comparison with msgpack v6
"org.msgpack" % "msgpack" % "0.6.12" % "test",
// For integration test with Akka
"com.typesafe.akka" %% "akka-actor" % "2.5.23" % "test",
"org.scala-lang.modules" %% "scala-collection-compat" % "2.4.3" % "test"
)
)

Expand Down
2 changes: 2 additions & 0 deletions msgpack-core/src/main/java/org/msgpack/core/MessagePack.java
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,8 @@ public static final boolean isFixedRaw(byte b)
public static final byte MAP32 = (byte) 0xdf;

public static final byte NEGFIXINT_PREFIX = (byte) 0xe0;

public static final byte EXT_TIMESTAMP = (byte) -1;
}

private MessagePack()
Expand Down
107 changes: 107 additions & 0 deletions msgpack-core/src/main/java/org/msgpack/core/MessagePacker.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import java.nio.charset.CharsetEncoder;
import java.nio.charset.CoderResult;
import java.nio.charset.CodingErrorAction;
import java.time.Instant;

import static org.msgpack.core.MessagePack.Code.ARRAY16;
import static org.msgpack.core.MessagePack.Code.ARRAY32;
Expand All @@ -41,6 +42,7 @@
import static org.msgpack.core.MessagePack.Code.EXT16;
import static org.msgpack.core.MessagePack.Code.EXT32;
import static org.msgpack.core.MessagePack.Code.EXT8;
import static org.msgpack.core.MessagePack.Code.EXT_TIMESTAMP;
import static org.msgpack.core.MessagePack.Code.FALSE;
import static org.msgpack.core.MessagePack.Code.FIXARRAY_PREFIX;
import static org.msgpack.core.MessagePack.Code.FIXEXT1;
Expand Down Expand Up @@ -798,6 +800,111 @@ else if (s.length() < (1 << 16)) {
return this;
}

/**
* Writes a Timestamp value.
*
* <p>
* This method writes a timestamp value using timestamp format family.
*
* @param instant the timestamp to be written
* @return this packer
* @throws IOException when underlying output throws IOException
*/
public MessagePacker packTimestamp(Instant instant)
throws IOException
{
return packTimestamp(instant.getEpochSecond(), instant.getNano());
}

/**
* Writes a Timesamp value using a millisecond value (e.g., System.currentTimeMillis())
* @param millis the millisecond value
* @return this packer
* @throws IOException when underlying output throws IOException
*/
public MessagePacker packTimestamp(long millis)
throws IOException
{
return packTimestamp(Instant.ofEpochMilli(millis));
}

private static final long NANOS_PER_SECOND = 1000000000L;

/**
* Writes a Timestamp value.
*
* <p>
* This method writes a timestamp value using timestamp format family.
*
* @param epochSecond the number of seconds from 1970-01-01T00:00:00Z
* @param nanoAdjustment the nanosecond adjustment to the number of seconds, positive or negative
* @return this
* @throws IOException when underlying output throws IOException
* @throws ArithmeticException when epochSecond plus nanoAdjustment in seconds exceeds the range of long
*/
public MessagePacker packTimestamp(long epochSecond, int nanoAdjustment)
throws IOException, ArithmeticException
{
long sec = Math.addExact(epochSecond, Math.floorDiv(nanoAdjustment, NANOS_PER_SECOND));
long nsec = Math.floorMod((long) nanoAdjustment, NANOS_PER_SECOND);

if (sec >>> 34 == 0) {
// sec can be serialized in 34 bits.
long data64 = (nsec << 34) | sec;
if ((data64 & 0xffffffff00000000L) == 0L) {
// sec can be serialized in 32 bits and nsec is 0.
// use timestamp 32
writeTimestamp32((int) sec);
}
else {
// sec exceeded 32 bits or nsec is not 0.
// use timestamp 64
writeTimestamp64(data64);
}
}
else {
// use timestamp 96 format
writeTimestamp96(sec, (int) nsec);
}
return this;
}

private void writeTimestamp32(int sec)
throws IOException
{
// timestamp 32 in fixext 4
ensureCapacity(6);
buffer.putByte(position++, FIXEXT4);
buffer.putByte(position++, EXT_TIMESTAMP);
buffer.putInt(position, sec);
position += 4;
}

private void writeTimestamp64(long data64)
throws IOException
{
// timestamp 64 in fixext 8
ensureCapacity(10);
buffer.putByte(position++, FIXEXT8);
buffer.putByte(position++, EXT_TIMESTAMP);
buffer.putLong(position, data64);
position += 8;
}

private void writeTimestamp96(long sec, int nsec)
throws IOException
{
// timestamp 96 in ext 8
ensureCapacity(15);
buffer.putByte(position++, EXT8);
buffer.putByte(position++, (byte) 12); // length of nsec and sec
buffer.putByte(position++, EXT_TIMESTAMP);
buffer.putInt(position, nsec);
position += 4;
buffer.putLong(position, sec);
position += 8;
}

/**
* Writes header of an Array value.
* <p>
Expand Down
62 changes: 60 additions & 2 deletions msgpack-core/src/main/java/org/msgpack/core/MessageUnpacker.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,9 @@
import java.nio.charset.CharsetDecoder;
import java.nio.charset.CoderResult;
import java.nio.charset.CodingErrorAction;
import java.time.Instant;

import static org.msgpack.core.MessagePack.Code.EXT_TIMESTAMP;
import static org.msgpack.core.Preconditions.checkNotNull;

/**
Expand Down Expand Up @@ -595,6 +597,12 @@ private static MessagePackException unexpected(String expected, byte b)
}
}

private static MessagePackException unexpectedExtension(String expected, int expectedType, int actualType)
{
return new MessageTypeException(String.format("Expected extension type %s (%d), but got extension type %d",
expected, expectedType, actualType));
}

public ImmutableValue unpackValue()
throws IOException
{
Expand Down Expand Up @@ -643,7 +651,12 @@ public ImmutableValue unpackValue()
}
case EXTENSION: {
ExtensionTypeHeader extHeader = unpackExtensionTypeHeader();
return ValueFactory.newExtension(extHeader.getType(), readPayload(extHeader.getLength()));
switch (extHeader.getType()) {
case EXT_TIMESTAMP:
return ValueFactory.newTimestamp(unpackTimestamp(extHeader));
default:
return ValueFactory.newExtension(extHeader.getType(), readPayload(extHeader.getLength()));
}
}
default:
throw new MessageNeverUsedFormatException("Unknown value type");
Expand Down Expand Up @@ -707,7 +720,13 @@ public Variable unpackValue(Variable var)
}
case EXTENSION: {
ExtensionTypeHeader extHeader = unpackExtensionTypeHeader();
var.setExtensionValue(extHeader.getType(), readPayload(extHeader.getLength()));
switch (extHeader.getType()) {
case EXT_TIMESTAMP:
var.setTimestampValue(unpackTimestamp(extHeader));
break;
default:
var.setExtensionValue(extHeader.getType(), readPayload(extHeader.getLength()));
}
return var;
}
default:
Expand Down Expand Up @@ -1257,6 +1276,45 @@ private String decodeStringFastPath(int length)
}
}

public Instant unpackTimestamp()
throws IOException
{
ExtensionTypeHeader ext = unpackExtensionTypeHeader();
return unpackTimestamp(ext);
}

/**
* Internal method that can be used only when the extension type header is already read.
*/
private Instant unpackTimestamp(ExtensionTypeHeader ext) throws IOException
{
if (ext.getType() != EXT_TIMESTAMP) {
throw unexpectedExtension("Timestamp", EXT_TIMESTAMP, ext.getType());
}
switch (ext.getLength()) {
case 4: {
// Need to convert Java's int (int32) to uint32
long u32 = readInt() & 0xffffffffL;
return Instant.ofEpochSecond(u32);
}
case 8: {
long data64 = readLong();
int nsec = (int) (data64 >>> 34);
long sec = data64 & 0x00000003ffffffffL;
return Instant.ofEpochSecond(sec, nsec);
}
case 12: {
// Need to convert Java's int (int32) to uint32
long nsecU32 = readInt() & 0xffffffffL;
long sec = readLong();
return Instant.ofEpochSecond(sec, nsecU32);
}
default:
throw new MessageFormatException(String.format("Timestamp extension type (%d) expects 4, 8, or 12 bytes of payload but got %d bytes",
EXT_TIMESTAMP, ext.getLength()));
}
}

/**
* Reads header of an array.
*
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
//
// MessagePack for Java
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
package org.msgpack.value;

/**
* Immutable representation of MessagePack's Timestamp type.
*
* @see org.msgpack.value.TimestampValue
*/
public interface ImmutableTimestampValue
extends TimestampValue, ImmutableValue
{
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,4 +47,7 @@ public interface ImmutableValue

@Override
public ImmutableStringValue asStringValue();

@Override
public ImmutableTimestampValue asTimestampValue();
}
33 changes: 33 additions & 0 deletions msgpack-core/src/main/java/org/msgpack/value/TimestampValue.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
//
// MessagePack for Java
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
package org.msgpack.value;

import java.time.Instant;

/**
* Value representation of MessagePack's Timestamp type.
*/
public interface TimestampValue
extends ExtensionValue
{
long getEpochSecond();

int getNano();

long toEpochMillis();

Instant toInstant();
}
18 changes: 18 additions & 0 deletions msgpack-core/src/main/java/org/msgpack/value/Value.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package org.msgpack.value;

import org.msgpack.core.MessagePacker;
import org.msgpack.core.MessageTypeCastException;

import java.io.IOException;

Expand Down Expand Up @@ -180,6 +181,14 @@ public interface Value
*/
boolean isExtensionValue();

/**
* Returns true if the type of this value is Timestamp.
*
* If this method returns true, {@code asTimestamp} never throws exceptions.
* Note that you can't use <code>instanceof</code> or cast <code>((MapValue) thisValue)</code> to check type of a value because type of a mutable value is variable.
*/
boolean isTimestampValue();

/**
* Returns the value as {@code NilValue}. Otherwise throws {@code MessageTypeCastException}.
*
Expand Down Expand Up @@ -280,6 +289,15 @@ public interface Value
*/
ExtensionValue asExtensionValue();

/**
* Returns the value as {@code TimestampValue}. Otherwise throws {@code MessageTypeCastException}.
*
* Note that you can't use <code>instanceof</code> or cast <code>((TimestampValue) thisValue)</code> to check type of a value because type of a mutable value is variable.
*
* @throws MessageTypeCastException If type of this value is not Map.
*/
TimestampValue asTimestampValue();

/**
* Serializes the value using the specified {@code MessagePacker}
*
Expand Down
Loading

0 comments on commit 1b2ec92

Please sign in to comment.